
digitalmars.D.learn - Simple parallel foreach and summation/reduction

reply Chris Katko <ckatko gmail.com> writes:
All I want to do is loop from 0 to [constant] with a for or 
foreach, and have it split up across however many cores I have.

     ulong sum;
     foreach(i; [0 to 1 trillion])
     {
         // roll a die
         float die_value = uniform(0F, 12F);
         if(die_value > [constant]) sum++;
     }
     writefln("The sum is %d", sum);

However, there are two caveats:

  - One: I can't throw a range of values into an array and foreach 
on that like many examples use, because 1 trillion (counting from 
zero) might be a little big for an array. (I'm using 1 trillion 
to illustrate a specific bottleneck / problem form.)

  - Two: I want to merge the results at the end.

Which means I either need to use mutexes (BAD. NO. BOO. HISS.), 
or each "thread" would need to keep its own separate sum, and 
then, once all were completed, add those sums together.
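Sketched, that no-mutex idea looks like this with std.parallelism's workerLocalStorage (a mechanism the replies below don't mention; the counter name and the scaled-down count are made up for illustration):

```
import std.parallelism : parallel, taskPool;
import std.random : uniform;
import std.range : iota;
import std.stdio : writeln;

void main()
{
    // one counter per worker thread: no mutex, no sharing
    auto counters = taskPool.workerLocalStorage!ulong(0);

    foreach (i; parallel(iota(10_000_000)))
        if (uniform(0F, 12F) > 6F)
            counters.get++;   // each thread bumps only its own slot

    // once the loop is done, merge the per-thread counters
    ulong sum;
    foreach (c; counters.toRange)
        sum += c;
    writeln("The sum is ", sum);
}
```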

I know this is an incredibly simple conceptual problem to solve, 
so I feel like I'm missing some huge, obvious answer for doing 
it elegantly in D.

And this just occurred to me: if I have a trillion foreach 
iterations, will that make 1 trillion threads? What I want is, 
IIRC, what OpenMP does. It divides up your range (blocks of 
sequential numbers) by the number of threads. So a domain of 
[1 to 1000] with ten threads would become workloads on the 
indexes [1-100], [101-200], [201-300], and so on, one per CPU. 
They each get a 100-element chunk.

So I guess a plain foreach won't work here, will it? Hmmm...

  ----> But again, conceptually this is simple: I have, say, 1 
trillion sequential numbers. I want to assign a "block" (or 
"range") to each CPU core. And since their math does not actually 
interfere with each other, I can simply sum each core's results 
at the end.
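What that block-per-core version would look like, as a minimal sketch with std.parallelism (the chunk math and the scaled-down N are assumptions for the example; the replies below suggest higher-level alternatives):

```
import std.parallelism : parallel, totalCPUs;
import std.random : uniform;
import std.range : iota;
import std.stdio : writeln;

void main()
{
    enum ulong N = 10_000_000;          // stand-in for the huge count
    immutable size_t nChunks = totalCPUs;
    auto partial = new ulong[nChunks];  // one slot per chunk, so no sharing

    // one work unit per chunk; each chunk sums into its own slot
    foreach (c; parallel(iota(nChunks), 1))
    {
        ulong local;
        foreach (i; c * N / nChunks .. (c + 1) * N / nChunks)
            if (uniform(0F, 12F) > 6F)
                local++;
        partial[c] = local;
    }

    ulong sum;
    foreach (p; partial)
        sum += p;                       // merge per-chunk results at the end
    writeln("The sum is ", sum);
}
```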

Thanks,
--Chris
Sep 19 2018
parent reply Neia Neutuladh <neia ikeran.org> writes:
On Thursday, 20 September 2018 at 05:34:42 UTC, Chris Katko wrote:
 All I want to do is loop from 0 to [constant] with a for or 
 foreach, and have it split up across however many cores I have.
You're looking at std.parallelism.TaskPool, especially the amap 
and reduce functions. Should do pretty much exactly what you're 
asking.

    auto taskpool = new TaskPool();
    taskpool.reduce!((a, b) => a + b)(iota(1_000_000_000_000L));
Sep 19 2018
parent reply Chris Katko <ckatko gmail.com> writes:
On Thursday, 20 September 2018 at 05:51:17 UTC, Neia Neutuladh 
wrote:
 On Thursday, 20 September 2018 at 05:34:42 UTC, Chris Katko 
 wrote:
 All I want to do is loop from 0 to [constant] with a for or 
 foreach, and have it split up across however many cores I have.
 You're looking at std.parallelism.TaskPool, especially the amap 
 and reduce functions. Should do pretty much exactly what you're 
 asking.

     auto taskpool = new TaskPool();
     taskpool.reduce!((a, b) => a + b)(iota(1_000_000_000_000L));

I get "Error: template instance `reduce!((a, b) => a + b)` cannot 
use local __lambda1 as parameter to non-global template 
reduce(functions...)" when trying to compile that using the 
online D editor with DMD and LDC.

Any ideas?
Sep 21 2018
next sibling parent Dennis <dkorpel gmail.com> writes:
On Friday, 21 September 2018 at 07:25:17 UTC, Chris Katko wrote:
 I get "Error: template instance `reduce!((a, b) => a + b)` 
 cannot use local __lambda1 as parameter to non-global template 
 reduce(functions...)" when trying to compile that using the 
 online D editor with DMD and LDC.

 Any ideas?
That's a long-standing issue: 
https://issues.dlang.org/show_bug.cgi?id=5710

Using a string for the expression does work though:

```
import std.stdio, std.parallelism, std.range;

void main() {
    taskPool.reduce!"a + b"(iota(1_000L)).writeln;
}
```
Sep 21 2018
prev sibling parent reply Ali Çehreli <acehreli yahoo.com> writes:
On 09/21/2018 12:25 AM, Chris Katko wrote:
 On Thursday, 20 September 2018 at 05:51:17 UTC, Neia Neutuladh wrote:
 On Thursday, 20 September 2018 at 05:34:42 UTC, Chris Katko wrote:
 All I want to do is loop from 0 to [constant] with a for or foreach, 
 and have it split up across however many cores I have.
 You're looking at std.parallelism.TaskPool, especially the amap 
 and reduce functions. Should do pretty much exactly what you're 
 asking.

     auto taskpool = new TaskPool();
     taskpool.reduce!((a, b) => a + b)(iota(1_000_000_000_000L));

 I get "Error: template instance `reduce!((a, b) => a + b)` 
 cannot use local __lambda1 as parameter to non-global template 
 reduce(functions...)" when trying to compile that using the 
 online D editor with DMD and LDC. Any ideas?

You can use a free-standing function as a workaround, which is 
included in the following chapter that explains most of 
std.parallelism:

  http://ddili.org/ders/d.en/parallelism.html

That chapter is missing e.g. the newly-added fold():

  https://dlang.org/phobos/std_parallelism.html#.TaskPool.fold

Ali
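A minimal sketch of that free-standing-function workaround (the add helper is a made-up name, and the count is scaled down for the example):

```
import std.parallelism : taskPool;
import std.range : iota;
import std.stdio : writeln;

// module-level function: sidesteps the local-lambda limitation
// of taskPool.reduce (issue 5710)
ulong add(ulong a, ulong b) { return a + b; }

void main()
{
    taskPool.reduce!add(iota(1_000_000UL)).writeln;  // 0 + 1 + ... + 999_999
}
```

fold() should accept the same shape of call, optionally with explicit seed values.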
Sep 21 2018
parent reply Chris Katko <ckatko gmail.com> writes:
On Friday, 21 September 2018 at 12:15:59 UTC, Ali Çehreli wrote:
 On 09/21/2018 12:25 AM, Chris Katko wrote:
 On Thursday, 20 September 2018 at 05:51:17 UTC, Neia Neutuladh 
 wrote:
 On Thursday, 20 September 2018 at 05:34:42 UTC, Chris Katko 
 wrote:
 All I want to do is loop from 0 to [constant] with a for or 
 foreach, and have it split up across however many cores I 
 have.
 [...]
 You can use a free-standing function as a workaround, which is 
 included in the following chapter that explains most of 
 std.parallelism:

   http://ddili.org/ders/d.en/parallelism.html

 That chapter is missing e.g. the newly-added fold():

   https://dlang.org/phobos/std_parallelism.html#.TaskPool.fold

 Ali
Okay... so I've got it running. The problem is, it uses tons of 
RAM. In fact, proportional to the working set.

    T test(T)(T x, T y) { return x + y; }

    double monte(T)(T x)
    {
        double v = uniform(-1F, 1F);
        double u = uniform(-1F, 1F);
        if(sqrt(v*v + u*u) < 1.0) { return 1; }
        else                      { return 0; }
    }

    auto taskpool = new TaskPool();
    sum = taskpool.reduce!(test)(
        taskpool.amap!monte(iota(num))
    );
    taskpool.finish(true);

1000000 becomes ~8MB
10000000 becomes 80MB
100000000, I can't even run because it says "Exception: Memory 
Allocation failed"
Sep 21 2018
parent reply Chris Katko <ckatko gmail.com> writes:
On Saturday, 22 September 2018 at 02:13:58 UTC, Chris Katko wrote:
 On Friday, 21 September 2018 at 12:15:59 UTC, Ali Çehreli wrote:
 On 09/21/2018 12:25 AM, Chris Katko wrote:
 [...]
 [...]
Also, when I don't call .finish(true) at the end, it just sits 
there forever (after running), like one of the threads won't 
terminate, requiring a Ctrl-C. But the docs and examples don't 
seem to indicate I should need that...
Sep 21 2018
parent reply Chris Katko <ckatko gmail.com> writes:
On Saturday, 22 September 2018 at 02:26:41 UTC, Chris Katko wrote:
 On Saturday, 22 September 2018 at 02:13:58 UTC, Chris Katko 
 wrote:
 On Friday, 21 September 2018 at 12:15:59 UTC, Ali Çehreli 
 wrote:
 On 09/21/2018 12:25 AM, Chris Katko wrote:
 [...]
 [...]
 Also, when I don't call .finish(true) at the end, it just sits 
 there forever (after running), like one of the threads won't 
 terminate, requiring a Ctrl-C. But the docs and examples don't 
 seem to indicate I should need that...
So I looked into it. It's amap that explodes in RAM. Per the 
docs, amap has "less overhead but more memory usage", while map 
has more overhead but less memory usage and "avoids the need to 
keep all results in memory."

But if I make a call to map... it doesn't compile! I get:

    Error: no [] operator overload for type 
    std.parallelism.TaskPool.map!(monte).map!(Result).map.Map

simply from changing amap to map here:

    sum = taskPool.reduce!(test)(
        taskPool.map!(monte)(range)
    );
Sep 23 2018
parent reply Chris Katko <ckatko gmail.com> writes:
On Monday, 24 September 2018 at 05:59:20 UTC, Chris Katko wrote:
 On Saturday, 22 September 2018 at 02:26:41 UTC, Chris Katko 
 wrote:
 On Saturday, 22 September 2018 at 02:13:58 UTC, Chris Katko 
 wrote:
 On Friday, 21 September 2018 at 12:15:59 UTC, Ali Çehreli 
 wrote:
 On 09/21/2018 12:25 AM, Chris Katko wrote:
 [...]
 [...]
 So I looked into it. It's amap that explodes in RAM. Per the 
 docs, amap has "less overhead but more memory usage", while map 
 has more overhead but less memory usage and "avoids the need to 
 keep all results in memory."

 But if I make a call to map... it doesn't compile! I get:

     Error: no [] operator overload for type 
     std.parallelism.TaskPool.map!(monte).map!(Result).map.Map

 simply from changing amap to map here:

     sum = taskPool.reduce!(test)(
         taskPool.map!(monte)(range)
     );
Actually, I just realized/remembered that the error occurs inside 
parallelism itself, and MANY times at that:

    /usr/include/dmd/phobos/std/parallelism.d(2590): Error: no [] operator overload for type std.parallelism.TaskPool.map!(monte).map!(Result).map.Map
    /usr/include/dmd/phobos/std/parallelism.d(2596): Error: (same)
    /usr/include/dmd/phobos/std/parallelism.d(2616): Error: (same, 6 times)
    /usr/include/dmd/phobos/std/parallelism.d(2626): Error: (same, 6 times)
    /usr/include/dmd/phobos/std/parallelism.d(2634): Error: (same)
    monte.d(64): Error: template instance 
    std.parallelism.TaskPool.reduce!(test).reduce!(Map) error instantiating

Though I tried looking up the git version of parallelism.d and 
the lines don't quite line up:
https://github.com/dlang/phobos/blob/master/std/parallelism.d
Sep 24 2018
parent reply Chris Katko <ckatko gmail.com> writes:
On Monday, 24 September 2018 at 07:13:24 UTC, Chris Katko wrote:
 On Monday, 24 September 2018 at 05:59:20 UTC, Chris Katko wrote:
         [...]
 Actually, I just realized/remembered that the error occurs 
 inside parallelism itself, and MANY times at that: [...]
This JUST occurred to me. When I use an outer taskPool.[a]map, am 
I NOT supposed to use the taskPool version of reduce?! But 
instead, the std.algorithm one? Because this is running with 
both/all cores, and only using 2.7MB of RAM:

    sum = taskPool.reduce!(test)(
        map!(monte)(range)  // map, not taskPool.map
    );

If that's the correct case, the docs did NOT make that obvious!

FYI, I went from ~5200 samples / mSec to 7490 samples / mSec. 36% 
difference for the second "real" core. Better than nothing, I 
guess. I'll have to try it on my main machine with a proper CPU.
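A self-contained sketch of that working combination, parallel taskPool.reduce over a lazy std.algorithm.map, using the same Monte Carlo kernel from earlier in the thread (monte is rewritten here as a plain function over long, an assumption to keep the sketch minimal; the pi printout is just a sanity check):

```
import std.algorithm : map;
import std.math : sqrt;
import std.parallelism : taskPool;
import std.random : uniform;
import std.range : iota;
import std.stdio : writeln;

// 1 if the random point lands inside the unit circle, else 0
double monte(long x)
{
    immutable v = uniform(-1.0, 1.0);
    immutable u = uniform(-1.0, 1.0);
    return sqrt(v * v + u * u) < 1.0 ? 1.0 : 0.0;
}

void main()
{
    enum n = 10_000_000L;
    // lazy map: no giant results array, so memory stays flat
    immutable sum = taskPool.reduce!"a + b"(iota(n).map!monte);
    writeln("sum = ", sum, ", pi ~ ", 4.0 * sum / n);
}
```

Russel's reply below explains why this pairing works: reduce is the single parallel layer, and the lazy map just feeds it.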
Sep 24 2018
parent Russel Winder <russel winder.org.uk> writes:
Hi,

Apologies for coming late to this thread.

I started with:

   import std.random: uniform;
   import std.range: iota;
   import std.stdio: writeln;

   void main() {
   	ulong sum;
   	foreach(i; iota(1_000_000_000)) {
   		if (uniform(0F,12F) > 6F) sum++;
   	}
   	writeln("The sum is ", sum);
   }

and then transformed it to:

   import std.algorithm: map, reduce;
   import std.random: uniform;
   import std.range: iota;
   import std.stdio: writeln;

   void main() {
   	ulong sum = iota(1_000_000_000).map!((_) => uniform(0F,12F) > 6F ? 1 : 0).reduce!"a + b";
   	writeln("The sum is ", sum);
   }

and then made use of std.parallelism:

   import std.algorithm: map;
   import std.array: array;
   import std.parallelism: taskPool;
   import std.random: uniform;
   import std.range: iota;
   import std.stdio: writeln;

   void main() {
   	ulong sum = taskPool().reduce!"a + b"(iota(1_000_000_000).map!((_) => uniform(0F,12F) > 6F ? 1 : 0));
   	writeln("The sum is ", sum);
   }

I am not entirely sure how to capture the memory used, but 
roughly (since this is a one-off measure and not a statistically 
significant experiment):

first takes 30s
second takes 30s
third takes 4s

on an ancient twin Xeon workstation, so 8 cores but all ancient and slow.

The issue here is that std.parallelism.reduce, std.parallelism.map, and 
std.parallelism.amap are all "top level" work-scattering functions; they 
all assume total control of the resources. So the above is a parallel 
reduce using a sequential map, which works fine. Trying to mix parallel 
reduce and parallel map or amap ends up with two different attempts to 
make use of the resources to create tasks.

std.parallelism isn't really a fork/join framework in the Java sense; if 
you want tree-structured parallelism, you have to do things with futures.
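A minimal sketch of that futures approach via std.parallelism's task and yieldForce (the count helper and the half-and-half split are made up for the example):

```
import std.parallelism : task, taskPool;
import std.random : uniform;
import std.stdio : writeln;

// each future counts successes over its own block, entirely independently
ulong count(ulong n)
{
    ulong local;
    foreach (i; 0 .. n)
        if (uniform(0F, 12F) > 6F)
            local++;
    return local;
}

void main()
{
    enum ulong n = 1_000_000_000;
    auto lower = task!count(n / 2);     // first half of the range
    auto upper = task!count(n - n / 2); // second half
    taskPool.put(lower);
    taskPool.put(upper);
    // yieldForce blocks until each future completes, then we merge
    writeln("The sum is ", lower.yieldForce + upper.yieldForce);
}
```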

-- 
Russel.
===========================================
Dr Russel Winder      t: +44 20 7585 2200
41 Buckmaster Road    m: +44 7770 465 077
London SW11 1EN, UK   w: www.russel.org.uk
Sep 24 2018