digitalmars.D - std.concurrency extension

jdrewsen <jdrewsen nospam.com> writes:
Hi,

Currently, each thread spawned with std.concurrency has an associated
Tid. When calling receive(), this is always the Tid (and associated
message box) that you receive from.

This becomes a problem when you spawn several worker threads that send
messages to the main thread, since each call to receive() must know how
to handle all kinds of messages.

For example, the current version of the curl wrapper I'm working on
receives HTTP data from worker threads using receive(). The user of this
wrapper can also create his own worker threads that send data to the
main thread. This means that the curl wrapper must be able to handle the
user's worker-thread messages.

The Go language has solved this problem with channels. My suggestion is
to allow a thread to create additional Tids that can act just like
channels. Then, by adding a receive() call that accepts a Tid as its
first argument, we can use them as channels.

e.g.
-------------------------------------------------
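void run() {
	// Receive the channel to reply on from the spawning thread.
	auto channel1 = receiveOnly!Tid();
	channel1.send("Hello world");
}

// Proposed: create an additional Tid (mailbox) owned by this thread.
Tid channel1 = Tid();
auto tid1 = spawn(&run);
tid1.send(channel1);

// Proposed: receive only messages sent on the specified channel.
writeln(channel1.receiveOnly!string());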
-------------------------------------------------

This would decouple the handling of different worker threads where
needed.

I think this is something I need for the curl wrapper. Would it be
accepted upstream? If so, I can create a patch for it.

/Jonas
Jun 07 2011
Jose Armando Garcia <jsancio gmail.com> writes:
On Tue, Jun 7, 2011 at 7:07 PM, jdrewsen <jdrewsen nospam.com> wrote:
If I were you, I would build channels on top of Tid/mbox. How do you efficiently wait on multiple channels?
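Jose's suggestion can be sketched with the existing API, assuming present-day Phobos std.concurrency: give each logical channel its own message type, so that a receive() listing only that type leaves every other message queued in the thread's mailbox. The names `Chan`, `sendOn`, `receiveOn`, `curlWorker`, `userWorker` and `demoChannels` are hypothetical, introduced only for this illustration; they are not part of std.concurrency.

```d
import std.concurrency;
import std.conv : to;
import std.stdio;

// Hypothetical channel type: a message wrapper whose type is unique per
// (channel name, payload type) pair, so receive() can tell channels apart.
struct Chan(string name, T)
{
    T payload;
}

// Hypothetical helper: send `value` on channel `name` to thread `dest`.
void sendOn(string name, T)(Tid dest, T value)
{
    dest.send(Chan!(name, T)(value));
}

// Hypothetical helper: block until a message arrives on channel `name`.
// Messages for other channels are not consumed; they stay in the mailbox.
T receiveOn(string name, T)()
{
    T result;
    receive((Chan!(name, T) m) { result = m.payload; });
    return result;
}

// Two independent producers writing to the same owner mailbox.
void curlWorker(Tid owner) { owner.sendOn!"curl"("HTTP body"); }
void userWorker(Tid owner) { owner.sendOn!"user"(7); }

string demoChannels()
{
    spawn(&userWorker, thisTid);
    spawn(&curlWorker, thisTid);
    // Each call matches only its own channel type, whatever the arrival order.
    string data = receiveOn!("curl", string)();
    int n = receiveOn!("user", int)();
    return data ~ " / " ~ to!string(n);
}

void main()
{
    writeln(demoChannels());
}
```

A single receive() call with one handler per channel type would wait on several such channels at once, which is one possible answer to the question above. The channels still share the receiving thread's one mailbox, though, so they are distinguished by type rather than being separate queues.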
Jun 07 2011
Jonas Drewsen <jdrewsen nospam.com> writes:
On 08/06/11 00.53, Jose Armando Garcia wrote:
If I were you, I would build channels on top of Tid/mbox. How do you efficiently wait on multiple channels?
As I see it, building channels on top of the current Tid/mbox model would only add an extra indirection. Since there is only one Tid per thread, it would need a central receive() to dispatch to the channels. In addition, it would have to tune the mailbox size to accommodate all channels, etc. Adding the option to pass a Tid to the receive call seems much simpler.

Waiting on multiple channels (i.e. Tids) would require some changes to the MessageBox class and a new condition variable. But that is a bigger change that I do not currently need myself, so I probably would not include it in a patch.

/Jonas
Jun 08 2011
Jesse Phillips <jessekphillips+D gmail.com> writes:
jdrewsen wrote:

I believe Garcia is right. From the documentation: "This is a low-level messaging API upon which more structured or restrictive APIs may be built." I think a channel API probably should go into Phobos, but then again I don't know much about the multi-processing stuff.
Jun 07 2011
Sean Kelly <sean invisibleduck.org> writes:
On Jun 7, 2011, at 3:07 PM, jdrewsen wrote:

A Tid is a thread ID, thus "tid". The current design allows you to design any message format you want. In Erlang (on which this design is based), it's common practice to include the sender's Tid in the message. As for channels... I hope that they can be built on top of the existing messaging system. They might not be as optimal as a from-scratch approach, though, since a bit of extra dispatching would need to be done in the wrapper.
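The Erlang convention Sean describes might look like this in D, assuming present-day Phobos std.concurrency: each worker sends its own Tid (thisTid) as the first part of a two-part message, and the receiver uses it to tell the replies apart. `worker` and `collectReplies` are hypothetical names chosen for this sketch.

```d
import std.concurrency;
import std.stdio;

// Each worker tags its reply with its own Tid, Erlang style.
void worker(string name)
{
    ownerTid.send(thisTid, name ~ " done");
}

// Spawns two workers and files each reply under the worker that sent it.
string[2] collectReplies()
{
    auto a = spawn(&worker, "A");
    auto b = spawn(&worker, "B");
    string[2] replies;
    foreach (_; 0 .. 2)
    {
        // Matches messages sent as (Tid, string); the Tid identifies the sender.
        receive((Tid from, string msg) {
            if (from == a)
                replies[0] = msg;
            else if (from == b)
                replies[1] = msg;
        });
    }
    return replies;
}

void main()
{
    auto replies = collectReplies();
    writeln(replies[0]);
    writeln(replies[1]);
}
```

The replies may arrive in either order; comparing the embedded Tid against the values returned by spawn() is what routes each one to the right slot.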
Jun 08 2011
jdrewsen <jdrewsen nospam.com> writes:
On 08-06-2011 16:47, Sean Kelly wrote:
A Tid is a thread ID, thus "tid". The current design allows you to design any message format you want. In Erlang (on which this design is based), it's common practice to have the sender's Tid included in the message. As for channels... I hope that they can be built on top of the existing messaging system. They might not be as optimal as a from-scratch approach though, since a bit of extra dispatching would need to be done in the wrapper.
I took a look at the std.concurrency code and noticed that one of my assumptions was wrong: a receive call does not necessarily handle the messages in the message box in the order they arrive. If a message cannot be handled by the receive call, the next message in the message box is tried, and so on. My assumption was that it would throw. Now that I've checked TDPL, I can see that this is actually documented there. The library API documentation on the web could use some fleshing out, though.

Sorry about the noise.

/Jonas
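The matching behaviour described above can be observed in a single thread, assuming present-day Phobos std.concurrency: a thread may send to its own mailbox, and a receive() that supplies only a string handler skips a queued int instead of throwing. `matchOrder` is a hypothetical name for this sketch.

```d
import std.concurrency;
import std.conv : to;
import std.stdio;

// Returns the order in which the two queued messages are actually handled.
string[] matchOrder()
{
    string[] log;
    // The int is queued first, then the string.
    thisTid.send(42);
    thisTid.send("hello");

    // Only a string handler is given: the int is skipped, not rejected,
    // and stays in the mailbox.
    receive((string s) { log ~= "string: " ~ s; });

    // The skipped int is still available to a later receive().
    receive((int n) { log ~= "int: " ~ to!string(n); });
    return log;
}

void main()
{
    writeln(matchOrder());
}
```

By contrast, receiveOnly!T() does throw a MessageMismatch when the next queued message is not a T, which is closer to the behaviour originally assumed.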
Jun 08 2011