digitalmars.D.learn - std.concurrency thread communication problem

Charles Hixson via Digitalmars-d-learn writes:
I'm building a program which I intend to have many threads that can each send 
messages to (and receive messages from) each other.  The obvious way to do 
this would be to have a shared array of Tids, but this seems to not work.  I'm 
continually fighting the system to get it to compile, and this makes me think 
it should probably be done some other way...but what?

One possibility is to have each thread maintain a separate array that contains 
all the threads, which would mean that they would need to be initialized after 
they were created.  This would avoid the problems of shared Tids, but each Tid 
contains a private mailbox, so this would be being duplicated, and that 
bothers me...it seems like a poor idea.  (Maybe I'm wrong about that...but I 
don't know.)

I do know that I want an n-by-n communication matrix (leaving out the main 
thread), with each thread sending messages to all the others.  (Well, except 
for a few that I haven't really defined yet, but which handle separate 
functions.)  My plan was to have each thread run an execution loop which 
frequently checked for messages received in between performing its own 
functions.  They are not intended to synchronize with each other.  They are 
not intended to be temporary, i.e., each of these threads would be started 
shortly after program initialization, and continue running until program 
termination.  But how should I get them to know each other's address?

I don't want the main thread to need to act as a switchboard between all the 
others, though I guess that would "sort of" work.  (Actually, if I need to do 
that, that job would be pulled off into yet another thread...and I end up with 
more threads than processors.  Still, that's a design that is possible, IIUC.)

Any comments or suggestions?
May 17 2014
"John Colvin" <john.loughran.colvin gmail.com> writes:
On Saturday, 17 May 2014 at 18:43:25 UTC, Charles Hixson via 
Digitalmars-d-learn wrote:
> I'm building a program which I intend to have many threads that can each send
> messages to (and receive messages from) each other.  The obvious way to do
> this would be to have a shared array of Tids, but this seems to not work.  I'm
> continually fighting the system to get it to compile, and this makes me think
> it should probably be done some other way...but what?
>
> One possibility is to have each thread maintain a separate array that contains
> all the threads, which would mean that they would need to be initialized after
> they were created.  This would avoid the problems of shared Tids, but each Tid
> contains a private mailbox, so this would be being duplicated, and that
> bothers me...it seems like a poor idea.  (Maybe I'm wrong about that...but I
> don't know.)

If my understanding is correct, each Tid contains a reference to the corresponding thread's MessageBox (MessageBox is a class, so a Tid holds a reference to it), not an independent instance. You should be fine to just have an array of the relevant Tids in each thread. Alternatively, a single __gshared array of Tids should work, provided you are sufficiently careful with it. Remember, if no one is doing any writing then you don't need to do any synchronisation of reads.
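
A minimal sketch of the point about Tid copies, assuming only the documented std.concurrency calls (spawn, send, receiveOnly, ownerTid). The names adder and sendThroughCopies are made up for illustration. One value is sent through the original Tid and one through a copy; the worker should see both only if the two handles refer to the same mailbox.

```d
import std.concurrency;
import std.stdio;

// Hypothetical worker: waits for two ints and reports their sum to its owner.
void adder()
{
    int first = receiveOnly!int();
    int second = receiveOnly!int();
    ownerTid.send(first + second);
}

// Sends one value through the original Tid and one through a copy of it,
// then returns the sum reported back by the worker.
int sendThroughCopies(int a, int b)
{
    Tid original = spawn(&adder);
    Tid copy = original;   // copies the handle, not the mailbox behind it

    original.send(a);
    copy.send(b);
    return receiveOnly!int();
}

void main()
{
    writeln(sendThroughCopies(1, 2));   // prints 3
}
```

If the copy carried its own mailbox, the second value would never reach the worker and the program would hang instead of printing the sum.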
May 17 2014
Ali Çehreli <acehreli yahoo.com> writes:
The following is what I've come up with. I had to use a number of 
shared-related casts.

import std.stdio;
import std.concurrency;
import std.datetime;
import std.random;
import core.thread;

enum threadCount = 5;
enum messagePerThread = 3;

// Represents messages sent to threads to start their tasks
struct Start
{}

// Receives the number (id) of this thread and the workers to send messages to
void workerFunc(size_t id, shared(Tid)[] workers)
{
    receiveOnly!Start();

    // A local function to reduce code duplication
    bool checkMessageForMe(Duration timeout)
    {
        return receiveTimeout(
            timeout,
            (size_t from) {
                writefln("%s received from %s", id, from);
            });
    }

    // My main task is to send messages to others:
    size_t totalSent = 0;
    while (totalSent < messagePerThread) {
        auto to = uniform(0, workers.length);

        // Only send to others; not to self
        if (to != id) {
            auto chosen = cast(Tid)workers[to];
            writefln("%s sending to %s", id, to);
            chosen.send(id);
            ++totalSent;
        }

        checkMessageForMe(0.seconds);
    }

    // Process trailing messages sent to me
    bool received = false;
    do {
        received = checkMessageForMe(10.msecs);
    } while (received);
}

void main()
{
    auto workers = new shared(Tid)[threadCount];

    foreach (id; 0 .. threadCount) {
        auto worker = spawn(&workerFunc, id, workers);
        workers[id] = cast(shared(Tid))worker;
    }

    foreach (sharedWorker; workers) {
        auto worker = cast(Tid)sharedWorker;
        worker.send(Start());
    }

    thread_joinAll();
}

Sample output:

0 sending to 2
4 sending to 3
4 sending to 2
1 sending to 4
3 received from 4
3 sending to 2
0 sending to 1
4 received from 1
1 received from 0
1 sending to 0
0 received from 1
0 sending to 1
1 received from 0
1 sending to 0
0 received from 1
3 sending to 2
4 sending to 2
2 sending to 0
2 received from 0
2 received from 4
3 sending to 1
2 sending to 3
0 received from 2
1 received from 3
2 received from 3
2 sending to 0
3 received from 2
0 received from 2
2 received from 3
2 received from 4

Ali
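
For comparison, here is a sketch of a different technique that nobody in this thread proposed: std.concurrency's name registry (register and locate), which lets threads look each other up by name and so avoids both the shared array and the casts. The names workerName, worker and workerCount, and the "worker<N>" naming scheme, are assumptions made for illustration. It waits for Start with receive rather than receiveOnly, so that an id message arriving early from a faster peer stays queued instead of causing a MessageMismatch.

```d
import std.concurrency;
import std.conv : to;
import std.stdio;

enum size_t workerCount = 3;

// Signals that every worker has been registered and lookups are safe.
struct Start {}

// Hypothetical naming scheme used as the registry key for worker `id`.
string workerName(size_t id)
{
    return "worker" ~ id.to!string;
}

void worker(size_t id)
{
    // Wait for Start; any id messages that arrive first stay in the mailbox.
    receive((Start msg) {});

    // Look up every peer by name and send it this worker's id.
    foreach (peer; 0 .. workerCount)
    {
        if (peer == id)
            continue;
        Tid peerTid = locate(workerName(peer));
        peerTid.send(id);
    }

    // Every other worker sends exactly one id, so expect workerCount - 1.
    foreach (i; 0 .. workerCount - 1)
    {
        auto from = receiveOnly!size_t();
        writefln("%s received from %s", id, from);
    }
}

void main()
{
    Tid[] tids;
    foreach (id; 0 .. workerCount)
    {
        auto tid = spawn(&worker, id);
        register(workerName(id), tid);   // publish the Tid under its name
        tids ~= tid;
    }

    // Release the workers only after all names are registered.
    foreach (tid; tids)
        tid.send(Start());
}
```

The order of the output lines varies from run to run. A name lookup goes through the registry on every call, so a long-running worker would probably cache the Tids it gets from locate in a thread-local array.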
May 17 2014
Charles Hixson via Digitalmars-d-learn writes:
Thank you immensely. That is precisely the kind of information I was hoping for.
May 17 2014