digitalmars.D.learn - Shutting down thread with Socket blocking for connection

Vidar Wahlberg <canidae exent.net> writes:
Coming from a C++/Java world I find D's approach to concurrency slightly 
difficult to grasp, perhaps someone could help me out a bit on this problem:

I'd like to have a method that spawns a new thread which sets up a 
socket and listens for connections (in a blocking fashion). This part is 
easy, the hard part is nicely shutting down the thread listening for 
connections.
Here's some quick code (ignore the fact that even if "sock" was shared, 
you would not be guaranteed that it was initialized correctly by the 
time you call "shutdown()" and "close()"):
*************
import std.concurrency;
import std.socket;
/*shared*/ Socket sock;
void main() {
         spawn(&listen);

         /* do stuff */

         sock.shutdown(SocketShutdown.BOTH);
         sock.close();
}
void listen() {
         sock = new TcpSocket();
         sock.blocking = true;
         sock.bind(new InternetAddress(8080));
         sock.listen(10);
         sock.accept(); // assume no connection was made, thus still blocking
}
*************

If I make "sock" shared, then I'm not allowed to call any methods on 
"sock" in "listen()" ("is not callable using argument types (...) shared").
I came across a post about a similar issue in this mail group from 
Andrew Wiley (2011-02-15 23:59) which had some example code, but I could 
not get that code to compile. Neither could I find an example in TDPL 
that shows how to deal with threads that are blocked.

Basically, how would I go on about nicely stopping a thread that's 
waiting for connections?
Mar 04 2012
Dmitry Olshansky <dmitry.olsh gmail.com> writes:
On 05.03.2012 1:46, Vidar Wahlberg wrote:
 Coming from a C++/Java world I find D's approach to concurrency slightly
 difficult to grasp, perhaps someone could help me out a bit on this
 problem:

 I'd like to have a method that spawns a new thread which sets up a
 socket and listens for connections (in a blocking fashion). This part is
 easy, the hard part is nicely shutting down the thread listening for
 connections.

 Here's some quick code (ignore the fact that even if "sock" was shared,
 you would not be guaranteed that it was initialized correctly by the
 time you call "shutdown()" and "close()"):

 *************
 import std.concurrency;
 import std.socket;
 /*shared*/ Socket sock;

 void main() {
 spawn(&listen);

 /* do stuff */

 sock.shutdown(SocketShutdown.BOTH);
 sock.close();
 }
 void listen() {
 sock = new TcpSocket();
 sock.blocking = true;
 sock.bind(new InternetAddress(8080));
 sock.listen(10);
 sock.accept(); // assume no connection was made, thus still blocking
 }
 *************

 If I make "sock" shared, then I'm not allowed to call any methods on
 "sock" in "listen()" ("is not callable using argument types (...) shared").

Everything is thread-local by default, and shared adds extra protection on top: for instance, you *can* do atomic ops on shared data (core.atomic), but the sockets API wasn't designed with shared in mind (obviously). In any case, old-school sharing like that is a recipe for race conditions and bad scalability.
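To illustrate the core.atomic remark, here is a minimal sketch of a shared stop flag that a worker polls. The names stopRequested and worker are made up for this example, and the 10 ms sleep stands in for real work. Note that a flag alone cannot wake a thread that is stuck inside a blocking accept(); it only helps when the worker returns to check it regularly.

```d
import core.atomic : atomicLoad, atomicStore;
import core.thread : Thread;
import core.time : msecs;
import std.concurrency : spawn;

// Hypothetical flag name. Module-level `shared` data is visible to all threads.
shared bool stopRequested = false;

// Hypothetical worker: checks the flag between short sleeps instead of
// blocking indefinitely.
void worker()
{
    while (!atomicLoad(stopRequested))
        Thread.sleep(10.msecs); // stand-in for real work
}

void main()
{
    spawn(&worker);

    /* do stuff */

    atomicStore(stopRequested, true); // the worker sees this on its next check
}
```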
 I came across a post about a similar issue in this mail group from
 Andrew Wiley (2011-02-15 23:59) which had some example code, but I could
 not get that code to compile. Neither could I find an example in TDPL
 that shows how to deal with threads that are blocked.

 Basically, how would I go on about nicely stopping a thread that's
 waiting for connections?

Even in C I would put all responsibility on the listen thread. In D, you have message passing in std(!), so do a select-polling loop and check for messages from the main thread (*not tested*). It may be that e.g. null is not accepted by select as an empty set; in that case just create a new empty SocketSet.

*************
import core.time;
import std.concurrency;
import std.socket;

void main() {
    Tid listenT = spawn(&listen);

    /* do stuff */

    send(listenT, 42); // usually messages have extra info ;)
}

void listen() {
    bool working = true;
    Socket sock = new TcpSocket();
    sock.blocking = true;
    sock.bind(new InternetAddress(8080));
    scope(exit) {
        sock.shutdown(SocketShutdown.BOTH);
        sock.close();
    }
    sock.listen(10);
    SocketSet set = new SocketSet();
    set.add(sock);
    while(working){
        // select is a static member of Socket; 10 usec wait, may do plain 0
        if(Socket.select(set, null, null, dur!"usecs"(10)) > 0){
            sock.accept(); // no blocking here
        }
        // select modifies the set, so rebuild it for the next round
        set.reset();
        set.add(sock);
        // any int message from the main thread means "stop"
        receiveTimeout(dur!"usecs"(1), (int code){ working = false; });
    }
}
*************

What the heck, in such a convenient manner you can also send it a message to bind to another port, etc.

--
Dmitry Olshansky
Mar 05 2012
"Regan Heath" <regan netmail.co.nz> writes:
A more efficient approach is to use async socket routines and an event  
object.

So, in main you create a shared event object, then start the listen thread.
In listen you call an async select or accept, and then wait on that /and/  
the shared event object.

To stop listen you set the shared event, which wakes it, and it notices  
the event is set and performs the cleanup/stops itself.

I'm not sure if phobos has support for this sort of thing yet, but you  
could always leverage the underlying C library.  I may be able to offer  
some pointers on Windows, but I haven't done something like this on any  
unix platform for a while..

Regan
Mar 05 2012
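A rough, portable approximation of the "wait on the socket /and/ an event" idea, using only std.socket: instead of an OS event object and async routines, a connected socket pair from socketPair() serves as the wake-up signal, and a blocking select waits on both the listening socket and one end of the pair. This is a swapped-in technique, not the async API described above. The names listener and stopListener are invented for the sketch, and the casts to and from shared assume that each socket end is used by only one thread after the hand-over.

```d
import std.concurrency : spawn;
import std.socket;

// Hypothetical listener: blocks in select until a client connects or the
// wake-up end of the pair becomes readable.
void listener(shared Socket sharedWake)
{
    auto wake = cast(Socket) sharedWake; // assumption: only this thread uses this end
    auto server = new TcpSocket();
    scope (exit) { server.close(); wake.close(); }
    server.bind(new InternetAddress(8080));
    server.listen(10);

    auto set = new SocketSet();
    for (;;)
    {
        set.reset();
        set.add(server);
        set.add(wake);
        // No timeout argument: blocks until one of the sockets is readable.
        if (Socket.select(set, null, null) <= 0)
            continue; // interrupted, try again
        if (set.isSet(wake))
            break; // the main thread asked us to stop
        if (set.isSet(server))
        {
            auto client = server.accept(); // will not block here
            client.close();                // placeholder for real handling
        }
    }
}

// Hypothetical helper: "sets the event" by making the other end readable.
void stopListener(Socket wakeSender)
{
    ubyte[1] token = [1];
    wakeSender.send(token[]);
}

void main()
{
    Socket[2] pair = socketPair();
    scope (exit) pair[0].close();
    auto wakeEnd = pair[1];
    spawn(&listener, cast(shared) wakeEnd);

    /* do stuff */

    stopListener(pair[0]);
}
```

Because the wake-up byte stays queued in the socket buffer, it does not matter whether stopListener runs before or after the listener reaches select; the next select call returns immediately either way.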
Dmitry Olshansky <dmitry.olsh gmail.com> writes:
On 05.03.2012 16:46, Regan Heath wrote:
 A more efficient approach is to use async socket routines and an event
 object.

 So, in main you create a shared event object, then start the listen thread.
 In listen you call an async select or accept, and then wait on that
 /and/ the shared event object.

 To stop listen you set the shared event, which wakes it, and it notices
 the event is set and performs the cleanup/stops itself.

 I'm not sure if phobos has support for this sort of thing yet, but you
 could always leverage the underlying C library. I may be able to offer
 some pointers on Windows, but I haven't done something like this on any
 unix platform for a while..

Yeah, the main problem with event systems & async I/O is that they are not cross-platform at all. And looking at Linux, they are not even consistent within one OS. To wrap them sanely on all platforms is no trivial task. I wish we had it in Phobos though.

--
Dmitry Olshansky
Mar 05 2012
Vidar Wahlberg <canidae exent.net> writes:
On 2012-03-05 09:38, Dmitry Olshansky wrote:
 ...
 while(working){

 if(select(set, null, null, 10) > 0){ //10 usec wait on a socket, may do plain 0
 sock.accept(); // no blocking here
 }
 set.reset();
 set.add(sock);
 receiveTimeout(dur!"us"(1), (int code){ working = false; });
 }
...

Thanks for the answers. However, I still have a problem:

What I'd really like to do is have one thread listening for incoming connections and spawn a new thread for each connection (it's a simple HTTP server; I'm interested in learning about sockets & threads, I'm not looking for an HTTP server). "Socket.accept()" returns a Socket whenever a connection is made. My intention was to pass this to a new thread, but as I pointed out in the initial post I can't find a way to do so, and I'm unable to find a solution to this problem in your replies.

Does this mean that it's recommended to keep the listening socket and all the sockets it spawns in a single thread? I don't know a whole lot about how sockets work, but can't that easily cause poor performance when you're reading from or writing to multiple sockets?

I've not been able to find any D2 code that uses multiple threads in conjunction with sockets, but if anyone knows of some examples I'd be happy to read them.
Mar 06 2012
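One possible way to hand an accepted Socket to a new thread, sketched under assumptions rather than offered as the recommended design: std.concurrency.spawn rejects arguments with unshared aliasing, so the socket is cast to shared for the hand-over and cast back in the receiving thread, on the understanding that the accepting thread never touches it again. The names handleClient and acceptAndDispatch, and the fixed HTTP response, are made up for illustration.

```d
import std.concurrency : spawn;
import std.socket;

// Hypothetical per-connection handler: reads whatever the client sent,
// answers with a fixed response and closes the connection.
void handleClient(shared Socket sharedClient)
{
    auto client = cast(Socket) sharedClient; // this thread is now the sole user
    scope (exit) client.close();
    ubyte[1024] request;
    client.receive(request[]); // the request itself is ignored in this sketch
    client.send("HTTP/1.0 200 OK\r\nContent-Length: 2\r\n\r\nok");
}

// Accepts one connection (blocking) and passes it to a new thread.
void acceptAndDispatch(Socket server)
{
    Socket client = server.accept();
    spawn(&handleClient, cast(shared) client);
}

void main()
{
    auto server = new TcpSocket();
    scope (exit) server.close();
    server.bind(new InternetAddress(8080));
    server.listen(10);
    for (;;)
        acceptAndDispatch(server);
}
```

The cast sidesteps the type system's protection, so correctness rests on the convention that ownership moves with the socket. A thread per connection is fine for learning, though it is unlikely to scale to many simultaneous clients.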
"Danny Arends" <Fanny.Arends gmail.com> writes:
You could have a look at my attempt:

https://github.com/DannyArends/D-coding/tree/master/src/web
Mar 15 2012