digitalmars.D.learn - Synchronize Class fields between different threads

reply DrCataclysm <4002600 ba-glauchau.de> writes:
I am trying to understand concurrent/parallel programming with D, 
but I just don't get how I should use some of the concepts.

This is the code I am using to try things out.

public class TCPListener {
     ubyte[] _messageBuffer;
     Socket _server;
     Socket _client;

     // define server in constructor
     this(string address, ushort port) {
         _server = new TcpSocket();
         _server.setOption(SocketOptionLevel.SOCKET, SocketOption.REUSEADDR, true);
     }

     // starts accept() in a new thread
     void startListening(){...}

     // accepts connection and assigns client
     // after that start receiving in new thread and keep accepting
     void accept() {...}

     // receives data and adds to buffer
     // emits message if lineend in buffer
     void receiving() {...}

     // gets the client and sends the data (preferably in its own task/thread)
     // emits signal when completed
     void send(string data) {...}

}

And I would like to use it in the following manner:

void main()
{
     auto o1 = new SpecialisedTCPListener();

     //do other stuff
}

class SpecialisedTCPListener{
     TCPListener _listener;

     this(){
         _listener= new TCPListener("127.0.0.1", 10000);
         _listener.connect(&MessageReceived);
         _listener.connect(&SendCallback);
         _listener.startListening();
     }

     public void MessageReceived(string message){
         auto answer = doSomeThings(message);
         // send the answer
         _listener.send(answer);
     }

     // did it send correctly?
     public void SendCallback(CallBackData e){...}
}


When I tried this approach it did not work. Accepting and 
receiving worked normally, but sending was impossible because 
_client appeared to be thread-local and was null in the sending 
thread.

How do I pass fields to the different threads?

I tried using spawn to start the threads but that only works with 
functions and not with class methods.
What would be better ways to do something like this?
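
For readers hitting the same wall with spawn: one option that does accept a class method is core.thread.Thread, whose constructor takes a delegate. The sketch below is only an illustration; the Worker class and its members are hypothetical names, not part of the code above.

```d
import core.thread : Thread;
import std.stdio : writeln;

// Hypothetical class, used only to show a method running in another thread.
class Worker
{
    private int _result;

    // Runs in the new thread; `this` is the same heap object main() holds.
    void run()
    {
        _result = 42;
    }

    int result() { return _result; }
}

void main()
{
    auto w = new Worker;

    // &w.run is a delegate (object reference + method), which Thread accepts.
    auto t = new Thread(&w.run);
    t.start();
    t.join(); // wait until run() has finished before reading the field

    writeln(w.result); // prints 42
}
```

std.parallelism.task also accepts a delegate such as &w.run, which is the route taken later in this thread.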
Nov 10
parent reply rikki cattermole <rikki cattermole.co.nz> writes:
Remember this bit: everything on the heap is not thread-local, it is 
global. This includes everything inside a class instance.

When you synchronize (with the synchronized statement), you are locking 
and then unlocking a mutex. Every class object has a mutex, simple! It 
only prevents multiple threads from modifying a single thing at the 
same time, that's all.
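
A minimal sketch of the synchronized statement described above, assuming a made-up Counter class (not from the original code). Four threads increment one heap object, and the lock on the object's mutex keeps the increments from interleaving.

```d
import core.thread : Thread;

// Hypothetical class used only for illustration.
class Counter
{
    private int _value;

    void increment()
    {
        // Locks this object's mutex on entry, unlocks it on exit.
        synchronized (this)
        {
            ++_value;
        }
    }

    int value()
    {
        synchronized (this)
        {
            return _value;
        }
    }
}

void main()
{
    auto c = new Counter;
    Thread[] threads;

    foreach (i; 0 .. 4)
    {
        // Every thread works on the same heap-allocated Counter.
        auto t = new Thread({
            foreach (j; 0 .. 1000)
                c.increment();
        });
        t.start();
        threads ~= t;
    }

    foreach (t; threads)
        t.join();

    assert(c.value == 4000);
}
```

Without the synchronized blocks the final count could come out lower, because ++_value is a read-modify-write that two threads can overlap.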
Nov 10
parent reply DrCataclysm <4002600 ba-glauchau.de> writes:
On Friday, 10 November 2017 at 13:50:56 UTC, rikki cattermole 
wrote:
 [...]
This is my implementation of Accept:

     private void Accept(){
         // start accepting in a different thread
         try{
             _client = _server.accept();
             emit(ClientConnected(_client.remoteAddress.toAddrString));
             auto _acceptTask = task(&this.Accept);
             _acceptTask.executeInNewThread();
             Receive();
         }
         catch (SocketAcceptException e){
             writeln("Error while accepting connection: " ~ e.msg);
         }
     }

Is _client on the heap or the stack? If it is on the stack, how would I get it on the heap?
Nov 10
next sibling parent rikki cattermole <rikki cattermole.co.nz> writes:
On 10/11/2017 2:13 PM, DrCataclysm wrote:
 [...]
Assuming _client is in a class, heap.
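
To make the distinction concrete, here is a small sketch under the assumption of a made-up Holder class and a module-level variable named perThread (both hypothetical). In D, module-level and static variables are thread-local by default, while a field of a class instance lives in the one heap object that every thread sees.

```d
import core.thread : Thread;

int perThread; // module-level: each thread gets its own copy by default

// Hypothetical class used only for illustration.
class Holder
{
    int field;
}

void main()
{
    auto h = new Holder;

    auto t = new Thread({
        perThread = 1; // changes only the new thread's copy
        h.field = 1;   // changes the single heap object
    });
    t.start();
    t.join();

    assert(perThread == 0); // main thread's copy is untouched
    assert(h.field == 1);   // the write to the class field is visible here
}
```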
Nov 10
prev sibling parent reply bauss <jj_1337 live.dk> writes:
On Friday, 10 November 2017 at 14:13:26 UTC, DrCataclysm wrote:
 [...]
_client is allocated on the heap. Socket.accept() returns the socket created by Socket.accepting(), which looks like this:

     protected Socket accepting() pure nothrow
     {
         return new Socket;
     }
Nov 10
parent reply DrCataclysm <4002600 ba-glauchau.de> writes:
On Friday, 10 November 2017 at 14:27:41 UTC, bauss wrote:
 [...]
Thank you, I thought I was going mad. It is working now. The problem was that the debugger in Eclipse DDT seems to be completely broken. If I run it directly from bash it is working.

One last question: is there a function in the standard library to wait for a task to finish within a time limit?
Nov 10
next sibling parent reply bauss <jj_1337 live.dk> writes:
On Friday, 10 November 2017 at 14:36:03 UTC, DrCataclysm wrote:
 [...]
Not an ideal solution, but it should work (I'm not aware of any built-in solution for Phobos tasks):

```
import core.thread : Thread;
import core.time : dur;

size_t timeLimit = 1000; // wait for the task for up to roughly 1000 milliseconds

while (!task.done && timeLimit)
{
    Thread.sleep(dur!("msecs")(1)); // keeps the CPU from going nuts
    timeLimit--;
}

if (task.done)
{
    auto value = task.yieldForce();
}
```

Could make it a function though:

```
bool yieldTimeLimit(Task)(Task task)
{
    import core.thread : Thread;
    import core.time : dur;

    while (!task.done && timeLimit)
    {
        Thread.sleep(dur!("msecs")(1));
        timeLimit--;
    }

    return task.done;
}

...

if (yieldTimeLimit(task))
{
    auto value = task.yieldForce();
}
```
Nov 10
parent bauss <jj_1337 live.dk> writes:
On Friday, 10 November 2017 at 15:01:30 UTC, bauss wrote:
 [...]
Pardon my brain fart. The last bit should be:

```
bool yieldTimeLimit(Task)(Task task, size_t timeLimit)
{
    import core.thread : Thread;
    import core.time : dur;

    while (!task.done && timeLimit)
    {
        Thread.sleep(dur!("msecs")(1));
        timeLimit--;
    }

    return task.done;
}

...

if (task.yieldTimeLimit(1000)) // waits up to roughly 1000 milliseconds
{
    auto value = task.yieldForce();
}
```
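
The polling idea above counts sleep calls, so the real wait can run longer than the nominal limit. A variant that checks a monotonic clock against a deadline might look like the sketch below. The names waitFor and slowAnswer are made up for this example; task, done, executeInNewThread and yieldForce are the std.parallelism calls already used in this thread.

```d
import core.thread : Thread;
import core.time : Duration, MonoTime, msecs;
import std.parallelism : task;

/// Polls `t.done` until the task has finished or `limit` has elapsed.
/// Returns true if the task finished in time.
bool waitFor(T)(T t, Duration limit)
{
    auto deadline = MonoTime.currTime + limit;
    while (!t.done && MonoTime.currTime < deadline)
        Thread.sleep(1.msecs); // short sleep so the loop does not spin the CPU
    return t.done;
}

// Hypothetical workload standing in for the real task.
int slowAnswer()
{
    Thread.sleep(20.msecs);
    return 42;
}

void main()
{
    auto t = task!slowAnswer();
    t.executeInNewThread();

    if (waitFor(t, 2000.msecs))
    {
        auto value = t.yieldForce(); // does not block here, the task is done
        assert(value == 42);
    }
}
```

Note that done rethrows an exception if the task itself threw one, so callers may want a try/catch around the wait.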
Nov 10
prev sibling parent crimaniak <crimaniak gmail.com> writes:
On Friday, 10 November 2017 at 14:36:03 UTC, DrCataclysm wrote:

 It is working now. The problem was that the debugger in Eclipse 
 DDT seems to be completely broken. If I run it directly from bash 
 it is working.
Be careful with such statements. Typically, this situation means that there are Heisenbugs in the code that appear only under certain conditions.
Nov 10