
digitalmars.D.learn - Simplest multithreading example

Brian <brian.patrick.forbes gmail.com> writes:
Hello, I am trying to get the most trivial example of 
multithreading working, but can't seem to figure it out.
I want to split a task across threads, and wait for all those 
tasks to finish before moving to the next line of code.

The following two attempts have failed:

-----------------------------------------------------
Trial 1 :
-----------------------------------------------------

auto I = std.range.iota(0,500);
int [] X; // output
foreach (i; parallel(I) )
     X ~= i;
core.thread.thread_joinAll(); // Apparently not applicable here?
writeln(X); // some random subset of indices

------------------------------------------------
Trial 2 : (closer to Java)
------------------------------------------------
class DerivedThread : Thread
{
     int [] X;
     int i;
     this(int [] X, int i){
         this.X = X;
         this.i = i;
         super(&run);
     }

     private:
         void run(){
             X ~= i;
         }
}

void main(){
	auto I = std.range.iota(0,500);
	int [] X; // output
	Thread [] threads;
	foreach (i; I )
		threads ~= new DerivedThread( X,i);
	foreach( thread; threads)
		thread.start();
	foreach( thread; threads)
		thread.join(); // does not seem to do anything
	core.thread.thread_joinAll(); // also not doing anything

	writeln(X); // X contains nothing at all
}

How can I get the program to wait until all threads have finished 
before moving to the next line of code?

Thank you!
Aug 31 2017
Michael Coulombe <kirsybuu gmail.com> writes:
On Friday, 1 September 2017 at 01:59:07 UTC, Brian wrote:
 How can I get the program to wait until all threads have 
 finished before moving to the next line of code ?

Just like a sequential loop, when you do "foreach (i; parallel(I) ) { ... }", execution will not continue past the foreach loop until all the tasks associated with each element of I have finished. Your particular example of "X ~= i" in the body of the loop is not thread-safe, so if that is the code you really intend to run, you should protect X with a Mutex or something comparable.
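To make the Mutex suggestion concrete, here is a minimal sketch, assuming the goal is still just to collect the indices. `collectIndices` is a hypothetical helper name, and `core.sync.mutex.Mutex` is one choice of lock (a `synchronized` block would also do). Because the lock serializes the appends, this only pays off when most of the work in the loop body happens outside the locked region; the order of the collected elements still varies between runs.

```d
// Sketch: parallel foreach with the shared append guarded by a Mutex.
import core.sync.mutex : Mutex;
import std.parallelism : parallel;
import std.range : iota;
import std.stdio : writeln;

int[] collectIndices(int n)
{
    int[] x;                       // shared output, guarded by `lock`
    auto lock = new Mutex;
    foreach (i; parallel(iota(0, n)))
    {
        lock.lock();
        scope (exit) lock.unlock();
        x ~= i;                    // only one worker appends at a time
    }
    // parallel foreach returns only after every iteration has finished
    return x;
}

void main()
{
    auto x = collectIndices(500);
    writeln(x.length);             // element order varies between runs
}
```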
Aug 31 2017
Ali Çehreli <acehreli yahoo.com> writes:
On 08/31/2017 06:59 PM, Brian wrote:
 auto I = std.range.iota(0,500);
 int [] X; // output
 foreach (i; parallel(I) )
     X ~= i;
 core.thread.thread_joinAll(); // Apparently no applicable here ?
As Michael Coulombe said, parallel() does that implicitly.

If the problem is to generate numbers in parallel, I restructured the code by letting each thread touch only its element of a results array that has already been resized for all the results (so that there is no race condition):

----
import std.stdio;
import std.parallelism;
import std.range;

void main() {
    auto arrs = new int[][](totalCPUs);
    const perWorker = 10;

    foreach (i, arr; parallel(arrs)) {
        const beg = cast(int)i * perWorker;
        const end = beg + perWorker;
        arrs[i] = std.range.iota(beg,end).array;
    }

    writeln(arrs);
}
----

If needed, std.algorithm.joiner can be used to make it a single sequence of ints:

----
import std.algorithm;
writeln(arrs.joiner);
----

Ali
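Applied directly to the 0..500 example from the original post, the same "each iteration touches only its own, pre-sized element" idea might look like the sketch below. `indicesInParallel` is a hypothetical name; `foreach (i, ref elem; parallel(arr))` is the indexed, by-reference form that std.parallelism supports.

```d
import std.algorithm : equal;
import std.parallelism : parallel;
import std.range : iota;
import std.stdio : writeln;

int[] indicesInParallel(size_t n)
{
    auto x = new int[](n);           // sized up front: no appends, no reallocation
    foreach (i, ref slot; parallel(x))
        slot = cast(int) i;          // each iteration writes only its own element
    return x;                        // every iteration is done once the loop exits
}

void main()
{
    auto x = indicesInParallel(500);
    writeln(x.equal(iota(0, 500))); // true
}
```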
Aug 31 2017
Brian <brian.patrick.forbes gmail.com> writes:
On Friday, 1 September 2017 at 04:43:29 UTC, Ali Çehreli wrote:
Hello, thank you very much for your quick replies! I was trying to make a trivial example, but the 'real' problem is trying to split a huge calculation across different threads. Schematically:

----
double [] hugeCalc(int i){
    // Code that takes a long time
}
----

so if I do

----
double[][int] _hugeCalcCache;
foreach(i ; I)
    _hugeCalcCache[i] = hugeCalc(i);
----

of course the required time is I.length * (a long time), so I wanted to shorten this by splitting it across different threads:

----
foreach(i ; parallel(I) )
    _hugeCalcCache[i] = hugeCalc(i);
----

but as you can guess, it doesn't work that easily.

Very interesting approach about letting each thread touch only a particular element, I will try that.

FYI I did manage to make the following work, but I'm not sure if this is really still multi-threaded?

----
int [] I;
foreach (i; 0 .. 500) I ~= i;
int [] X; // output

class DerivedThread : Thread {
    private int [] i;
    this(int [] i){
        this.i = i;
        super(&run);
    }
    private void run(){
        synchronized{ // Need synchronization here!
            foreach( i0; i) X ~= i0;
        }
    }
}

Thread [] threads;
foreach (i; std.range.chunks( I, 50 ) )
    threads ~= new DerivedThread( i);
foreach( thread; threads)
    thread.start();

core.thread.thread_joinAll(); // Does in fact seem to 'join all' threads
writeln(X);
----
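If you want to keep an explicit `Thread` subclass, note that the `synchronized` block above wraps the entire body of `run()`, so the workers end up taking turns rather than running concurrently. A sketch that avoids the lock, following the per-element idea from Ali's reply, gives each thread its own disjoint slice of a pre-sized output array and then `join()`s every thread. `ChunkWorker`, `squaresWithThreads`, and the squaring step are hypothetical stand-ins for the real calculation.

```d
import core.thread : Thread;
import std.array : array;
import std.range : chunks, iota;
import std.stdio : writeln;

// Each worker reads its own chunk and writes into its own slice of the result.
class ChunkWorker : Thread
{
    private int[] input;
    private int[] output;          // disjoint slice of the shared result array

    this(int[] input, int[] output)
    {
        this.input = input;
        this.output = output;
        super(&run);
    }

    private void run()
    {
        foreach (k, v; input)
            output[k] = v * v;     // hypothetical stand-in for the real work
    }
}

int[] squaresWithThreads(int n, size_t chunkSize)
{
    auto input = iota(0, n).array;
    auto output = new int[](n);    // allocated before any thread starts

    Thread[] threads;
    size_t offset = 0;
    foreach (chunk; input.chunks(chunkSize))
    {
        threads ~= new ChunkWorker(chunk, output[offset .. offset + chunk.length]);
        offset += chunk.length;
    }
    foreach (t; threads)
        t.start();
    foreach (t; threads)
        t.join();                  // returns once that thread's run() has finished
    return output;
}

void main()
{
    writeln(squaresWithThreads(10, 3)); // [0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
}
```

No lock is needed because no two threads ever write to the same element, and the output array's length never changes while the threads run.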
Aug 31 2017
Ali Çehreli <acehreli yahoo.com> writes:
On 08/31/2017 10:27 PM, Brian wrote:

 the 'real' problem is trying
 to split a huge calculation to different threads.
I still think you can take advantage of std.parallelism:

  https://dlang.org/phobos/std_parallelism.html

Unfortunately, its features like asyncBuf, map, and amap do not stand out in the documentation. Here's my interpretation of them:

  http://ddili.org/ders/d.en/parallelism.html

Ali
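As a sketch of how `amap` could fit Brian's problem (assuming `hugeCalc` can be evaluated independently for each index), `taskPool.amap` replaces the hand-written loop: it runs the function on the pool's worker threads and returns the results in a freshly allocated array, in input order, so no shared container is written concurrently. The body of `hugeCalc` and the helper name `computeAll` are hypothetical.

```d
import std.parallelism : taskPool;
import std.range : iota;
import std.stdio : writeln;

// Hypothetical stand-in for Brian's long-running hugeCalc.
double[] hugeCalc(int i)
{
    return [i * 0.5];
}

double[][] computeAll(int n)
{
    // amap evaluates hugeCalc on every element using the task pool and
    // returns an array holding the results in the same order as the input.
    return taskPool.amap!hugeCalc(iota(0, n));
}

void main()
{
    writeln(computeAll(4));
}
```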
Sep 01 2017
ag0aep6g <anonymous example.com> writes:
On 09/01/2017 07:27 AM, Brian wrote:
 double [] hugeCalc(int i){
      // Code that takes a long time
 }
 
 so if I do
 
 
 double[][int] _hugeCalcCache;
 foreach(i ; I)
     _hugeCalcCache[i] = hugeCalc(i);
 
 of course the required time is I.length * (a long time), so I wanted to 
 shorten this by splitting to different threads :
 
 foreach(i ; parallel(I) )
     _hugeCalcCache[i] = hugeCalc(i);
 
 but as you can guess, it doesn't work that easily.
Works pretty well for me:

----
double [] hugeCalc(int i)
{
    // Code that takes a long time
    import core.thread: Thread;
    import std.datetime: seconds;
    Thread.sleep(1.seconds);
    return [i];
}

void main()
{
    static import std.range;
    import std.parallelism: parallel;
    auto I = std.range.iota(0, 10);
    double[][int] _hugeCalcCache;
    foreach(i ; parallel(I))
        _hugeCalcCache[i] = hugeCalc(i);
}
----

That runs in about 3 seconds here. The serial version would of course take about 10 seconds. So, parallelism achieved!

Though I don't know if it's safe to access an associative array concurrently like that. I'd use a normal dynamic array instead and initialize it before going parallel:

----
auto _hugeCalcCache = new double[][](10);
----
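Putting the two halves of that suggestion together, a complete sketch might look like this. The array is allocated serially with its final length, so the parallel loop only assigns to existing slots and never changes the array's length or reallocates it. `computeCache` and the body of `hugeCalc` are hypothetical.

```d
import std.parallelism : parallel;
import std.range : iota;
import std.stdio : writeln;

// Hypothetical stand-in for the long-running calculation.
double[] hugeCalc(int i)
{
    return [i * 0.5];
}

double[][] computeCache(int n)
{
    auto cache = new double[][](n);   // all n slots exist before going parallel
    foreach (i; parallel(iota(0, n)))
        cache[i] = hugeCalc(i);       // each iteration assigns only its own slot
    return cache;
}

void main()
{
    auto cache = computeCache(10);
    writeln(cache.length);
}
```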
Sep 01 2017
Brian <brian.patrick.forbes gmail.com> writes:
On Friday, 1 September 2017 at 20:02:23 UTC, ag0aep6g wrote:
Thanks very much for your help, I finally had time to try your suggestions. The initial example you showed does indeed have the same problem of not iterating over all values:

----
double [] hugeCalc(int i){
    // Code that takes a long time
    import core.thread: Thread;
    import std.datetime: seconds;
    Thread.sleep(1.seconds);
    return [i];
}

static import std.range;
import std.parallelism: parallel;
auto I = std.range.iota(0, 100);
double[][int] _hugeCalcCache;
foreach(i ; parallel(I))
    _hugeCalcCache[i] = hugeCalc(i);

writeln( _hugeCalcCache.keys ); // this is some random subset of (0,100)
----

but this does seem to work using your other method of initialization:

----
auto _hugeCalcCache = new double[][](100);
foreach(i ; parallel(I))
    _hugeCalcCache[i] = hugeCalc(i);

foreach( double[] x ; _hugeCalcCache)
    writeln( x ); // this now contains all values
----

so I guess initializing the whole array at compile time makes it thread safe? (The second case runs in 16 seconds on my computer.)

Anyway, it seems to work, thanks again!
Sep 04 2017
ag0aep6g <anonymous example.com> writes:
On 09/05/2017 03:15 AM, Brian wrote:
 Thanks very much for your help, I finally had time to try your 
 suggestions. The initial example you showed does indeed have the same 
 problem of not iterating over all values :
 
 
      double [] hugeCalc(int i){
      // Code that takes a long time
      import core.thread: Thread;
      import std.datetime: seconds;
      Thread.sleep(1.seconds);
      return [i];
      }
 
      static import std.range;
      import std.parallelism: parallel;
      auto I = std.range.iota(0, 100);
      double[][int] _hugeCalcCache;
      foreach(i ; parallel(I))
          _hugeCalcCache[i] = hugeCalc(i);
 
 
       writeln( _hugeCalcCache.keys ); // this is some random subset of (0,100)
Yeah. As expected, associative array accesses are apparently not thread-safe.

A simple writeln is a terrible way to figure that out, though. I'd suggest sorting the keys and comparing that to `I`:

----
import std.algorithm: equal, sort;
auto sortedKeys = _hugeCalcCache.keys.sort;
assert(sortedKeys.equal(I));
----
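If an associative array really is needed, one option (not one tried in this thread) is to let the expensive call run in parallel and serialize only the insertion with a `core.sync.mutex.Mutex`; the sorted-keys check then confirms that every key arrived. `fillLocked` and the body of `hugeCalc` are hypothetical.

```d
import core.sync.mutex : Mutex;
import std.algorithm : equal, sort;
import std.parallelism : parallel;
import std.range : iota;

// Hypothetical stand-in for the long-running calculation.
double[] hugeCalc(int i)
{
    return [cast(double) i];
}

double[][int] fillLocked(int n)
{
    double[][int] cache;
    auto lock = new Mutex;
    foreach (i; parallel(iota(0, n)))
    {
        auto value = hugeCalc(i);  // the expensive part runs without the lock
        lock.lock();
        scope (exit) lock.unlock();
        cache[i] = value;          // associative-array insertion is serialized
    }
    return cache;
}

void main()
{
    auto cache = fillLocked(100);
    auto sortedKeys = cache.keys.sort;
    assert(sortedKeys.equal(iota(0, 100)));
}
```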
 but this does seem to work using your other method of initialization :
 
 
      auto _hugeCalcCache = new double[][](100);
      foreach(i ; parallel(I))
          _hugeCalcCache[i] = hugeCalc(i);
 
      foreach( double[] x ; _hugeCalcCache)
      writeln( x ); // this now contains all values
 
 
 so I guess initializing the whole array at compile time makes it thread 
 safe ?
There's nothing compile-timey about the code. The initialization is done at run-time, but before the parallel stuff starts.

Note that the type of `_hugeCalcCache` here is different from above. Here it's `double[][]`, i.e. a dynamic array. Above it's `double[][int]`, i.e. an associative array. Those types are quite different, despite their similar names.

You can prepare an associative array in a similar way, before doing the parallel stuff. Then it might be thread-safe (not sure):

----
double[][int] _hugeCalcCache; /* associative array */

/* First initialize the elements serially: */
foreach(i; I) _hugeCalcCache[i] = [];

/* Then do the huge calculations in parallel: */
foreach(i; parallel(I)) _hugeCalcCache[i] = hugeCalc(i);
----

But if your keys are consecutive numbers, I see no point in using an associative array.
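When the keys are not consecutive, a sketch that sidesteps the open question of concurrent associative-array writes is to compute into a pre-sized dynamic array indexed by position, then copy the results into the associative array from a single thread. `buildCache` and the body of `hugeCalc` are hypothetical.

```d
import std.parallelism : parallel;
import std.stdio : writeln;

// Hypothetical stand-in for the long-running calculation.
double[] hugeCalc(int i)
{
    return [i * 2.0];
}

double[][int] buildCache(int[] keys)
{
    auto results = new double[][](keys.length); // one slot per key, preallocated
    foreach (idx, key; parallel(keys))
        results[idx] = hugeCalc(key);           // workers touch only their own slot

    double[][int] cache;
    foreach (idx, key; keys)
        cache[key] = results[idx];              // the AA is filled by one thread only
    return cache;
}

void main()
{
    auto cache = buildCache([3, 17, 42, 1000]);
    writeln(cache[42]);
}
```

The expensive calls still run in parallel; only the cheap copy into the associative array is serial.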
Sep 05 2017