www.digitalmars.com         C & C++   DMDScript  

digitalmars.D.learn - Thread communication

reply "Chris" <wendlec tcd.ie> writes:
Is there a good way to stop work-intensive threads via thread 
communication (instead of using a shared variable)? The example 
below is very basic and naive and only meant to exemplify the 
basic problem.

I want to stop (and abort) the worker as soon as new input 
arrives. However, while executing the function that contains the 
foreach-loop the worker thread doesn't listen, because it's busy, 
of course. I've tried a few solutions with send and receive in 
this block, but somehow none of them work perfectly.

//=======
import std.stdio : readln, writefln, writeln;
import std.string : strip;
import std.concurrency;
import core.thread;

Tid thread1;

struct Exit {}

void main()
{
   string input;
   bool exists;
   while ((input = readln.strip) != null)
   {
     if (exists)
     {
       thread1.send(Exit());
     }
     thread1 = spawn(&worker);
     exists = true;
     thread1.send(input.idup);
   }
}

void worker()
{
   bool run = true;
   while (run)
   {
     receive(
       (string input)
       {
         foreach (ref i; 0..10)
         {
           writefln("%d.\tDoing something with input %s", i+1, 
input);
           Thread.sleep(500.msecs);
         }
         run = false;
       },
       (Exit exit)
       {
         run = false;
       }
     );
   }
   writeln("End of thread worker");
}
//=======
Aug 04 2015
next sibling parent reply "Dicebot" <public dicebot.lv> writes:
receiveTimeout
Aug 04 2015
parent reply =?UTF-8?B?QWxpIMOHZWhyZWxp?= <acehreli yahoo.com> writes:
On 08/04/2015 09:19 AM, Dicebot wrote:
 receiveTimeout
I think the problem here is that the worker is busy, not even able to call that. This sounds like sending a signal to the specific thread (with pthread_kill()) but I don't know the details of it nor whether Phobos supports it. Ali
Aug 04 2015
parent reply "Chris" <wendlec tcd.ie> writes:
On Tuesday, 4 August 2015 at 18:15:08 UTC, Ali Çehreli wrote:
 On 08/04/2015 09:19 AM, Dicebot wrote:
 receiveTimeout
I think the problem here is that the worker is busy, not even able to call that. This sounds like sending a signal to the specific thread (with pthread_kill()) but I don't know the details of it nor whether Phobos supports it. Ali
The problem is that it works up to a certain extent with receiveTimeout. However, if the input arrives in very short intervals, all the solutions I've come up with so far (including data sharing) fail sooner or later. New threads are spawned faster than old ones can be given the abort signal. There are ways to wait, till a given thread dies, say with a shared variable isAlive `while (isAlive) {}`, but even here I've come across problems when the input comes very fast. I don't know how to solve this problem, because message passing follows a linear protocol (as far as I understand it) and shared variables give rise to data races. Something like pthread_kill() would indeed be useful, to terminate a thread at random. I wonder if fibers would be an option. D-threads seem to be based on the assumption that there is no need to abort threads at random, any time. Or am I mistaken?
Aug 05 2015
parent reply "Marc =?UTF-8?B?U2Now7x0eiI=?= <schuetzm gmx.net> writes:
On Wednesday, 5 August 2015 at 11:23:28 UTC, Chris wrote:
 The problem is that it works up to a certain extent with 
 receiveTimeout. However, if the input arrives in very short 
 intervals, all the solutions I've come up with so far 
 (including data sharing) fail sooner or later. New threads are 
 spawned faster than old ones can be given the abort signal. 
 There are ways to wait, till a given thread dies, say with a 
 shared variable isAlive `while (isAlive) {}`, but even here 
 I've come across problems when the input comes very fast.
You could use a thread pool, thereby limiting the number of threads that can run at any one time. But I guess you want the processing of new data to start as soon as possible, in which case that wouldn't help you.
 I don't know how to solve this problem, because message passing 
 follows a linear protocol (as far as I understand it) and 
 shared variables give rise to data races. Something like 
 pthread_kill() would indeed be useful, to terminate a thread at 
 random. I wonder if fibers would be an option.

 D-threads seem to be based on the assumption that there is no 
 need to abort threads at random, any time. Or am I mistaken?
It was a conscious decision not to provide a kill method for threads, because it is impossible to guarantee that your program is still consistent afterwards. Maybe we can lift this restriction if we know that the thread's main function is pure and takes no references to mutable data, because then it can by definition never mess up the program's state. OTOH, the GC might be running at the time the thread is killed, which could again lead to inconsistencies...
Aug 05 2015
next sibling parent reply "Alex Parrill" <initrd.gz gmail.com> writes:
On Wednesday, 5 August 2015 at 14:31:20 UTC, Marc Schütz wrote:
 Maybe we can lift this restriction if we know that the thread's 
 main function is pure and takes no references to mutable data, 
 because then it can by definition never mess up the program's 
 state.
That'd be a pretty useless thread; how would it communicate results back to the main thread (or wherever it should go)?
Aug 05 2015
parent "Marc =?UTF-8?B?U2Now7x0eiI=?= <schuetzm gmx.net> writes:
On Wednesday, 5 August 2015 at 14:34:42 UTC, Alex Parrill wrote:
 On Wednesday, 5 August 2015 at 14:31:20 UTC, Marc Schütz wrote:
 Maybe we can lift this restriction if we know that the 
 thread's main function is pure and takes no references to 
 mutable data, because then it can by definition never mess up 
 the program's state.
That'd be a pretty useless thread; how would it communicate results back to the main thread (or wherever it should go)?
It could return something. `std.concurrency.Tid` would have to be extended with a `join()` method that returns its result. Or we could somehow allow sending and receiving data.
Aug 05 2015
prev sibling parent =?UTF-8?B?IuWyqeWAiSDmvqoi?= <mio.iwakura gmail.com> writes:
On Wednesday, 5 August 2015 at 14:31:20 UTC, Marc Schütz wrote:
 It was a conscious decision not to provide a kill method for 
 threads, because it is impossible to guarantee that your 
 program is still consistent afterwards.
What about the situation where we want to kill worker threads off when closing a program? For example, I have a program with a thread that does some heavy computation in the background. When the application is closed, I want it to abort that computation, however I can't just slap a receiveTimeout in the worker thread because it is doing its work in a parallel foreach loop.
Aug 05 2015
prev sibling next sibling parent "thedeemon" <dlang thedeemon.com> writes:
On Tuesday, 4 August 2015 at 15:19:51 UTC, Chris wrote:

 I want to stop (and abort) the worker as soon as new input 
 arrives. However, while executing the function that contains 
 the foreach-loop the worker thread doesn't listen, because it's 
 busy, of course.
I think this is a matter of architecture. If you want to use message-passing and you want the worker to react quickly to new events, this means it needs to check for new messages (via receiveTimeout) often enough, there's no way around it.
Aug 05 2015
prev sibling parent reply "Kagamin" <spam here.lot> writes:
On Tuesday, 4 August 2015 at 15:19:51 UTC, Chris wrote:
         foreach (ref i; 0..10)
         {
           writefln("%d.\tDoing something with input %s", i+1, 
 input);
           Thread.sleep(500.msecs);
         }
AFAIK, boost does it by integrating support for interruption into various functions, so IO, waits and locks reply to interrupt requests appropriately. You can do something similar.
Aug 06 2015
parent "Chris" <wendlec tcd.ie> writes:
On Thursday, 6 August 2015 at 08:40:58 UTC, Kagamin wrote:        
}
 AFAIK, boost does it by integrating support for interruption 
 into various functions, so IO, waits and locks reply to 
 interrupt requests appropriately. You can do something similar.
I understand the philosophy behind D-threads. However, I have a situation where waiting for a thread to react to an abort signal (if it reacts at all) and finish according to a protocol can cause a delay that may not be acceptable to the user or cause inconsistencies. Instant abortion works best with data sharing. However, then I have the ugly situation where I have to place the abort flag at strategical places in several functions/blocks to make sure the task will not be pursued, because you never know when exactly the new input will arrive. In this way it can be intercepted. Unfortunately, this is messy and it is not easy to avoid data races. A possible solution would be to halt all threads except for the main thread, spawn a new thread, and end the old thread silently behind the scenes. I'm not sure, if this is possible though. I also wonder, if it would be possible to use some sort of observer that never sleeps.
Aug 06 2015