digitalmars.D.learn - Simplest multithreading example
- Brian (47/47) Aug 31 2017 Hello, I am trying to get the most trivial example of
- Michael Coulombe (8/55) Aug 31 2017 Just like a sequential loop, when you do "foreach (i; parallel(I)
- Ali Çehreli (24/37) Aug 31 2017 As Michael Coulombe said, parallel() does that implicitly.
- Brian (46/89) Aug 31 2017 Hello, thank you very much for your quick replies !
- Ali Çehreli (7/9) Sep 01 2017 I still think you can take advantage of std.parallelism:
- ag0aep6g (29/47) Sep 01 2017 Works pretty well for me:
Hello, I am trying to get the most trivial example of
multithreading working, but can't seem to figure it out.
I want to split a task across threads, and wait for all those
tasks to finish before moving to the next line of code.
The following two attempts have failed:
-----------------------------------------------------
Trial 1 :
-----------------------------------------------------
auto I = std.range.iota(0,500);
int [] X; // output
foreach (i; parallel(I) )
    X ~= i;
core.thread.thread_joinAll(); // Apparently not applicable here?
writeln(X); // some random subset of indices
------------------------------------------------
Trial 2 : (closer to Java)
------------------------------------------------
class DerivedThread : Thread
{
    int [] X;
    int i;
    this(int [] X, int i){
        this.X = X;
        this.i = i;
        super(&run);
    }
private:
    void run(){
        X ~= i;
    }
}
void main(){
    auto I = std.range.iota(0,500);
    int [] X; // output
    Thread [] threads;
    foreach (i; I )
        threads ~= new DerivedThread( X,i);
    foreach( thread; threads)
        thread.start();
    foreach( thread; threads)
        thread.join(); // does not seem to do anything
    core.thread.thread_joinAll(); // also not doing anything
    writeln(X); // X contains nothing at all
}
How can I get the program to wait until all threads have finished
before moving to the next line of code?
Thank you !
Aug 31 2017
Replying to Brian's post of Friday, 1 September 2017 at 01:59:07 UTC:
Just like a sequential loop, when you do "foreach (i; parallel(I)) { ... }",
execution will not continue past the foreach loop until all the tasks
associated with each element of I have finished.
Your particular example of "X ~= i" in the body of the loop is
not thread-safe, so if that is the code you really intend to run,
you should protect X with a Mutex or something comparable.
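A minimal sketch of Michael's suggestion, assuming the goal really is to collect every index: each iteration appends under a core.sync.mutex.Mutex, and the parallel foreach itself is the join point, so no thread_joinAll() is needed. Iterations finish in no particular order, so the result is sorted afterwards. `collectIndices` is a hypothetical helper name, not something from the thread.

```d
import core.sync.mutex : Mutex;
import std.algorithm : sort;
import std.parallelism : parallel;
import std.range : iota;
import std.stdio : writeln;

// Hypothetical helper: appends every index in [0, n) from a parallel loop.
int[] collectIndices(int n)
{
    int[] X;                 // shared output, guarded by `lock`
    auto lock = new Mutex;

    foreach (i; parallel(iota(0, n)))
    {
        // Only one worker thread at a time may append to X.
        lock.lock();
        scope (exit) lock.unlock();
        X ~= i;
    }
    // The parallel foreach has returned, so every iteration has finished.

    X.sort();                // append order depends on thread scheduling
    return X;
}

void main()
{
    writeln(collectIndices(500));
}
```

Note that the lock serializes the appends themselves; the pattern only pays off when the real per-element work happens outside the locked region.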
Aug 31 2017
On 08/31/2017 06:59 PM, Brian wrote:
foreach (i; parallel(I) )
X ~= i;
core.thread.thread_joinAll(); // Apparently no applicable here ?
As Michael Coulombe said, parallel() does that implicitly.
If the problem is to generate numbers in parallel, I restructured the
code by letting each thread touch only its element of a results array
that has already been resized for all the results (so that there is no
race condition):
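import std.stdio;
import std.parallelism;
import std.range;
import std.array : array;
void main() {
    // One result slot per worker, allocated before the parallel loop starts
    auto arrs = new int[][](totalCPUs);
    const perWorker = 10;
    foreach (i, ref arr; parallel(arrs)) {
        // Each iteration writes only to its own slot, so there is no race
        const beg = cast(int)i * perWorker;
        const end = beg + perWorker;
        arr = iota(beg, end).array;
    }
    writeln(arrs);
}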
If needed, std.algorithm.joiner can be used to make it a single sequence
of ints:
import std.algorithm;
writeln(arrs.joiner);
Ali
Aug 31 2017
Replying to Ali Çehreli's post of Friday, 1 September 2017 at 04:43:29 UTC:
Hello, thank you very much for your quick replies !
I was trying to make a trivial example, but the 'real' problem is trying to split a huge calculation to different threads.
Schematically :
double [] hugeCalc(int i){
    // Code that takes a long time
}
so if I do
double[][int] _hugeCalcCache;
foreach(i ; I)
    _hugeCalcCache[i] = hugeCalc(i);
of course the required time is I.length * (a long time), so I wanted to shorten this by splitting to different threads :
foreach(i ; parallel(I) )
    _hugeCalcCache[i] = hugeCalc(i);
but as you can guess, it doesn't work that easily.
Very interesting approach about letting only the thread touch a particular element, I will try that.
FYI I did manage to make the following work, but not sure if this is really still multi-threaded ?
int [] I;
foreach (i; 0 .. 500) I ~= i;
int [] X; // output
class DerivedThread : Thread {
    private int [] i;
    this(int [] i){
        this.i = i;
        super(&run);
    }
    private void run(){
        synchronized{ // Need synchronization here !
            foreach( i0; i)
                X ~= i0;
        }
    }
}
Thread [] threads;
foreach (i; std.range.chunks( I, 50 ) )
    threads ~= new DerivedThread( i);
foreach( thread; threads)
    thread.start();
core.thread.thread_joinAll(); // Does in fact seem to 'join all' threads
writeln(X);
Aug 31 2017
On 08/31/2017 10:27 PM, Brian wrote:
the 'real' problem is trying to split a huge calculation to different threads.
I still think you can take advantage of std.parallelism:
https://dlang.org/phobos/std_parallelism.html
Unfortunately, its features like asyncBuf, map, and amap do not stand out in the documentation. Here's my interpretation of them:
http://ddili.org/ders/d.en/parallelism.html
Ali
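As a sketch of what Ali is pointing at: `taskPool.amap` from std.parallelism evaluates a function over a random-access range on the pool's worker threads and returns a new array of results, in input order, once every call has finished. Here `hugeCalc` is a cheap hypothetical stand-in for Brian's expensive function, so this only illustrates the shape of the call, not its timing.

```d
import std.parallelism : taskPool;
import std.range : iota;
import std.stdio : writeln;

// Hypothetical stand-in for the expensive per-index calculation.
double[] hugeCalc(int i)
{
    return [i * 0.5, i * 2.0];
}

void main()
{
    auto I = iota(0, 100);

    // amap allocates the result array itself and each call's result goes
    // into the slot matching its input position, so the caller needs no
    // lock and no pre-sized cache.
    double[][] results = taskPool.amap!hugeCalc(I);

    writeln(results[10]); // same value as hugeCalc(10)
}
```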
Sep 01 2017
On 09/01/2017 07:27 AM, Brian wrote:
double [] hugeCalc(int i){
// Code that takes a long time
}
so if I do
double[][int] _hugeCalcCache;
foreach(i ; I)
_hugeCalcCache[i] = hugeCalc(i);
of course the required time is I.length * (a long time), so I wanted to
shorten this by splitting to different threads :
foreach(i ; parallel(I) )
_hugeCalcCache[i] = hugeCalc(i);
but as you can guess, it doesn't work that easily.
Works pretty well for me:
----
double [] hugeCalc(int i)
{
    // Code that takes a long time
    import core.thread: Thread;
    import std.datetime: seconds;
    Thread.sleep(1.seconds);
    return [i];
}
void main()
{
    static import std.range;
    import std.parallelism: parallel;
    auto I = std.range.iota(0, 10);
    double[][int] _hugeCalcCache;
    foreach(i ; parallel(I))
        _hugeCalcCache[i] = hugeCalc(i);
}
----
That runs in about 3 seconds here. The serial version would of course
take about 10 seconds. So, parallelism achieved!
Though I don't know if it's safe to access an associative array
concurrently like that. I'd use a normal dynamic array instead and
initialize it before going parallel:
----
auto _hugeCalcCache = new double[][](10);
----
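Putting ag0aep6g's suggestion into a complete program, a minimal sketch: the dynamic array is sized before the parallel loop, so each iteration writes only its own element and the array is never resized while workers are running. `fillCache` is a hypothetical helper name, and `hugeCalc` here is a placeholder with no real work in it.

```d
import std.parallelism : parallel;
import std.range : iota;
import std.stdio : writeln;

// Placeholder for the expensive calculation from the thread.
double[] hugeCalc(int i)
{
    return [cast(double) i];
}

// Hypothetical helper: computes hugeCalc(0 .. n) in parallel.
double[][] fillCache(int n)
{
    // Allocate every slot up front, before any worker thread starts.
    auto cache = new double[][](n);

    // Iteration i writes only cache[i], so no two iterations touch the
    // same element and the array itself is never resized.
    foreach (i; parallel(iota(0, n)))
        cache[i] = hugeCalc(i);

    return cache; // the loop has already waited for every iteration
}

void main()
{
    auto cache = fillCache(100);
    writeln(cache[42]);
}
```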
Sep 01 2017
Replying to ag0aep6g's post of Friday, 1 September 2017 at 20:02:23 UTC:
Thanks very much for your help, I finally had time to try your suggestions. The initial example you showed does indeed have the same problem of not iterating over all values :
double [] hugeCalc(int i){
    // Code that takes a long time
    import core.thread: Thread;
    import std.datetime: seconds;
    Thread.sleep(1.seconds);
    return [i];
}
static import std.range;
import std.parallelism: parallel;
auto I = std.range.iota(0, 100);
double[][int] _hugeCalcCache;
foreach(i ; parallel(I))
    _hugeCalcCache[i] = hugeCalc(i);
writeln( _hugeCalcCache.keys ); // this is some random subset of (0,100)
but this does seem to work using your other method of initialization :
auto _hugeCalcCache = new double[][](100);
foreach(i ; parallel(I))
    _hugeCalcCache[i] = hugeCalc(i);
foreach( double[] x ; _hugeCalcCache)
    writeln( x ); // this now contains all values
so I guess initializing the whole array at compile time makes it thread safe ? (The second case runs in 16 seconds on my computer.)
Anyways it seems to work, thanks again !
Sep 04 2017
On 09/05/2017 03:15 AM, Brian wrote:
double[][int] _hugeCalcCache;
foreach(i ; parallel(I))
    _hugeCalcCache[i] = hugeCalc(i);
writeln( _hugeCalcCache.keys ); // this is some random subset of (0,100)
Yeah. As expected, associative array accesses are apparently not thread-safe.
A simple writeln is a terrible way to figure that out, though. I'd suggest sorting the keys and comparing that to `I`:
----
import std.algorithm: equal, sort;
auto sortedKeys = _hugeCalcCache.keys.sort;
assert(sortedKeys.equal(I));
----
Brian wrote:
but this does seem to work using your other method of initialization :
auto _hugeCalcCache = new double[][](100);
foreach(i ; parallel(I))
    _hugeCalcCache[i] = hugeCalc(i);
foreach( double[] x ; _hugeCalcCache)
    writeln( x ); // this now contains all values
so I guess initializing the whole array at compile time makes it thread safe ?
There's nothing compile-timey about the code. The initialization is done at run-time, but before the parallel stuff starts.
Note that the type of `_hugeCalcCache` here is different from above. Here it's `double[][]`, i.e. a dynamic array. Above it's `double[][int]`, i.e. an associative array. Those types are quite different, despite their similar names.
You can prepare an associative array in a similar way, before doing the parallel stuff. Then it might be thread-safe (not sure):
----
double[][int] _hugeCalcCache; /* associative array */
/* First initialize the elements serially: */
foreach(i; I) _hugeCalcCache[i] = [];
/* Then do the huge calculations in parallel: */
foreach(i; parallel(I)) _hugeCalcCache[i] = hugeCalc(i);
----
But if your keys are consecutive numbers, I see no point in using an associative array.
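If an associative array is still wanted, one way to sidestep the "might be thread-safe (not sure)" question entirely (a swapped-in alternative, not something tried in this thread) is to do the parallel work into a pre-sized dynamic array and only build the associative array afterwards, on a single thread. The sorted-keys check suggested above then confirms no key went missing. `buildCache` is a hypothetical name and `hugeCalc` is a placeholder.

```d
import std.algorithm : equal, sort;
import std.parallelism : parallel;
import std.range : iota;
import std.stdio : writeln;

// Placeholder for the expensive calculation.
double[] hugeCalc(int i)
{
    return [cast(double) i];
}

// Hypothetical helper: parallel computation, serial AA construction.
double[][int] buildCache(int n)
{
    auto results = new double[][](n);

    // Parallel phase: iteration k writes only results[k].
    foreach (k; parallel(iota(0, n)))
        results[k] = hugeCalc(k);

    // Serial phase: the associative array is filled by this thread only,
    // after all parallel iterations have finished.
    double[][int] cache;
    foreach (k; 0 .. n)
        cache[k] = results[k];
    return cache;
}

void main()
{
    auto I = iota(0, 100);
    auto cache = buildCache(100);

    // The check suggested in the thread: the sorted keys must equal I.
    auto sortedKeys = cache.keys.sort;
    assert(sortedKeys.equal(I));
    writeln(cache[7]);
}
```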
Sep 05 2017









Michael Coulombe <kirsybuu gmail.com> 