www.digitalmars.com         C & C++   DMDScript  

digitalmars.D - std.concurrency.receive() and event demultiplexer

reply "Ruslan Mullakhmetov" <nobody example.com> writes:
Hi,

I'm playing around with std.concurrency and found it quite 
interesting. I drop all praise words (they all already said) and 
go to the question itself.

With std.concurrency we could have a number of asynchronously 
operating routines doing job "linearly", for instance, reading 
from socket and sending received data to consumer thread. that 
ok. but what if socket or generally handling task blocks 
indefinitely? We lose ability to respond to external world (other 
threads). Okay, we could employ async event-based model for 
handling our task (reading socket), but now we would block in 
event demultiplexer loop.

Now we could go further and overcome it with complication of 
logic:
   - timed wait for event on socket
   - when timeout occur check receive for incoming messages again 
with timeout
   - switch back to events waiting

The drawbacks are obvious:
   - complicated logic
   - artificial switching between two demultiplexers: event loop 
and std.concurrency.receive()
   - need to choose good timeout to meet both: responsiveness and 
cpu load

Alternatively it is possible to take away all blocking operations 
to another child thread, but this does not eliminate problem of 
resource freeing. with socket example this is dangerous with 
hanging socket descriptor and (1) not telling peer socket to shut 
up conversation (2) overflow of number of open file descriptors

The problem would be solved quite elegant if either (1) receive() 
could handle different events, not just communication messages, 
(2) it would be possible to get waitable descriptor for receive() 
that could be registered in 3-party event demultiplexer.

Of course receive() is not aimed for (1) and i know no ways to 
get (2) working.

So the question: how to overcome the problem i described if i 
described it clear.
Aug 09 2013
next sibling parent Sean Kelly <sean invisibleduck.org> writes:
On Aug 9, 2013, at 10:59 AM, Ruslan Mullakhmetov <nobody example.com> =
wrote:
=20
 With std.concurrency we could have a number of asynchronously =
operating routines doing job "linearly", for instance, reading from = socket and sending received data to consumer thread. that ok. but what = if socket or generally handling task blocks indefinitely? We lose = ability to respond to external world (other threads). Okay, we could = employ async event-based model for handling our task (reading socket), = but now we would block in event demultiplexer loop. The current design was deliberate, for a few reasons. First, Phobos = doesn't have much in the way of network support. std.socket has been on = the chopping block for ages, and we haven't yet gotten a performant = option to replace it. So in large part I simply didn't feel we had a = network package to integrate. Second, while integrating IO events with = receive is appealing as far as convenience, I had difficulty coming up = with a way that was sufficiently performant to satisfy most people who = would actually be doing event-based IO. To be acceptable the method = would have to avoid per-event allocations entirely, and work in a = similar manner on both Windows (which uses a proctor pattern) and *nix = (which uses a reactor pattern). I had some ideas for how to approach = this, but it seemed like the first step would be to simply provide an = adaptor that acted as a proxy between the IO event loop and the = send/receive loop, running in a separate thread. That would handle the = convenience for casual use, and the performance-minded folks could just = ignore it and do their own thing, which is what they'd want to do = anyway.
 Alternatively it is possible to take away all blocking operations to =
another child thread, but this does not eliminate problem of resource = freeing. with socket example this is dangerous with hanging socket = descriptor and (1) not telling peer socket to shut up conversation (2) = overflow of number of open file descriptors Could you expand on this?
 The problem would be solved quite elegant if either (1) receive() =
could handle different events, not just communication messages, (2) it = would be possible to get waitable descriptor for receive() that could be = registered in 3-party event demultiplexer.
=20
 Of course receive() is not aimed for (1) and i know no ways to get (2) =
working. For (1) I think it's more that not every platform provides a performant = way to signal the receipt of multiple kinds of messages. On Windows you = can use WaitForMultipleObjects, and on *nix you can kind of fake it with = poll, but once you move to an IO event library like libev or libevent = things get a lot trickier, which leads into (2). To solve (2), one = approach would be to have each "thread" created by spawn() be a kernel = thread running a Fiber (potentially multiple fibers at some point once = the TLS/FLS issue is sorted out). Then when receive() determines that = waiting for a signal is necessary, the call could yield() back to the = event loop and let something else process. This is basically how Vibe.d = works today, but obviously without the nifty std.concurrency = integration. And it's a direction I'd like to consider heading, but = first we really need a solution to the TLS/FLS problem, which becomes = pretty complicated if we want to support dynamic libraries (we do).=
Aug 12 2013
prev sibling parent "JR" <zorael gmail.com> writes:
On Friday, 9 August 2013 at 17:59:33 UTC, Ruslan Mullakhmetov 
wrote:
 Now we could go further and overcome it with complication of 
 logic:
   - timed wait for event on socket
   - when timeout occur check receive for incoming messages 
 again with timeout
   - switch back to events waiting

 The drawbacks are obvious:
   - complicated logic
   - artificial switching between two demultiplexers: event loop 
 and std.concurrency.receive()
   - need to choose good timeout to meet both: responsiveness 
 and cpu load

 Alternatively it is possible to take away all blocking 
 operations to another child thread, but this does not eliminate 
 problem of resource freeing. with socket example this is 
 dangerous with hanging socket descriptor and (1) not telling 
 peer socket to shut up conversation (2) overflow of number of 
 open file descriptors
I ended going that way with my small toy IRC bot; one thread to *read* from the connected stream and pass on incoming lines, only briefly checking for messages inbetween (short) stream read timeouts; another thread to *write* to the same stream, indefinitely blocking in std.concurrency.receive() until a string comes along. Somewhat dumbed-down excerpt with added clarifying comments; /* --8<----8<----8<----8<----8<----8<----8<----8<----8<----8<--*/ __gshared Socket __gsocket; // *right* in the pride D: __gshared SocketStream __gstream; void serverRead() { bool halt; char[512] buf; char[] slice; Tid broker = locateTid("broker"); register("reader"); // thread string identifier __gsocket.setOption(SocketOptionLevel.SOCKET, SocketOption.RCVTIMEO, 5.seconds); // arbitrary value // some template mixins to reduce duplicate code mixin MessageActionLocal!(halt,true,Imperative.Abort) killswitch; mixin MessageActionLocal!(halt,true,OwnerTerminated) ownerTerm; mixin MessageAction!Variant variant; // for debugging while (!halt) { slice = __gstream.readLine(buf); if (slice.length) broker.send(slice.idup); // we just want to check for queued messages, // not block waiting for new ones, so timeout immediately receiveTimeout(0.seconds, &killswitch.trigger, // these set halt = true &ownerTerm.trigger, // ^ &variant.doPrint // this just prints what it received ); } } void serverWrite() { bool halt; long prev; register("writer"); // even so, ugh for the boilerplate that remains mixin MessageActionLocal!(halt,true,Imperative.Abort) killswitch; mixin MessageActionLocal!(halt,true,OwnerTerminated) ownerTerm; mixin MessageAction!Variant variant; // likewise while (!halt) { receive( (string text) { __gstream.sendLine(text); // heavily abbreviated; with only this you'll soon // get kicked due to spam }, &killswitch.trigger, &ownerTerm.trigger, &variant.doPrint ); } } /* --8<----8<----8<----8<----8<----8<----8<----8<----8<----8<--*/ In particular I'm not happy about the __gshared resources, but this works well enough for my purposes (again, IRC bot). I initialize said socket and stream in the thread that spawns these two, so while used by both, neither will close them when exiting scope. But yes, the reader keeps switching around. Both need to be able to catch OwnerTerminated and some other choice imperatives of import. The longest stall will naturally be when it's sent a message while blocked reading from the stream, but in this context 5 seconds is not that big a deal. Still, I wish I knew some other way -- to salvage my pride, if nothing else.
Aug 16 2013