digitalmars.D.learn - Question on shared memory concurrency

reply Andy Valencia <dont spam.me> writes:
I tried a shared memory parallel increment.  Yes, it's basically 
a cache line thrasher, but I wanted to see what's involved in 
shared memory programming.  Even though I tried to follow all the 
rules to make true shared memory (not thread local) it appears I 
failed, as the wait loop at the end only sees its own local 250 
million increments?

import core.atomic : atomicFetchAdd;
import std.stdio : writeln;
import std.concurrency : spawn;
import core.time : msecs;
import core.thread : Thread;

const uint NSWEPT = 1_000_000_000;
const uint NCPU = 4;

void
doadd(ref shared(uint) val)
{
     for (uint count = 0; count < NSWEPT/NCPU; ++count) {
         atomicFetchAdd(val, 1);
     }
}

void
main()
{
     shared(uint) val = 0;

     for (int x = 0; x < NCPU-1; ++x) {
         spawn(&doadd, val);
     }
     doadd(val);
     while (val != NSWEPT) {
         Thread.sleep(1.msecs);
     }
}
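
A likely explanation for the behaviour described above, offered as a sketch rather than a definitive diagnosis: `spawn` copies its arguments into the new thread, so the `ref` parameter of each spawned `doadd` binds to that thread's private copy of `val`, and only the direct call from `main` touches the original. Passing the address of the counter avoids the copy. The helper name `countShared` and the reduced iteration count are assumptions made for illustration.

```d
import core.atomic : atomicFetchAdd, atomicLoad;
import core.thread : Thread;
import core.time : msecs;
import std.concurrency : spawn;

// Each worker receives the ADDRESS of the counter, so every thread
// increments the same memory location instead of a private copy.
void doadd(shared(uint)* val, uint n)
{
    foreach (_; 0 .. n)
        atomicFetchAdd(*val, 1);
}

// Hypothetical helper: run `ncpu` workers (the caller counts as one)
// and return the final counter value.
uint countShared(uint total, uint ncpu)
{
    shared(uint) val = 0;
    uint perWorker = total / ncpu;

    foreach (_; 0 .. ncpu - 1)
        spawn(&doadd, &val, perWorker);
    doadd(&val, perWorker);

    // Poll until all increments have landed; `val` lives on this
    // stack frame, so we must not return before the workers finish.
    while (atomicLoad(val) != perWorker * ncpu)
        Thread.sleep(1.msecs);
    return atomicLoad(val);
}

void main()
{
    // Reduced from the original 1_000_000_000 to keep the run short.
    assert(countShared(1_000_000, 4) == 1_000_000);
}
```

The pointer must stay valid for as long as any worker uses it, which is why the helper waits before returning.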
Mar 03
parent reply "Richard (Rikki) Andrew Cattermole" <richard cattermole.co.nz> writes:
A way to do this without spawning threads manually:

```d
import std.parallelism : TaskPool, parallel, taskPool, defaultPoolThreads;
import std.stdio : writeln;
import std.range : iota;

enum NSWEPT = 1_000_000;
enum NCPU = 4;

void main() {
     import core.atomic : atomicLoad, atomicOp;

     shared(uint) value;

     defaultPoolThreads(NCPU);
     TaskPool pool = taskPool();

     foreach(_; pool.parallel(iota(NSWEPT))) {
         atomicOp!"+="(value, 1);
     }

     writeln(pool.size);
     writeln(atomicLoad(value));
}
```

Unfortunately I could only use the default task pool; creating a new one 
took too long on run.dlang.io.

I also had to decrease NSWEPT because anything larger would take too long.
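
A variation on the snippet above, as a rough sketch: instead of iterating in parallel over every increment, iterate in parallel over the workers and let each one run its own inner loop, which keeps the shape of the original `doadd`. The function name `countWithPool` is hypothetical; `parallel` here is the free function from `std.parallelism` that uses the default task pool.

```d
import core.atomic : atomicLoad, atomicOp;
import std.parallelism : parallel;
import std.range : iota;

// Hypothetical helper: `workers` parallel iterations, each performing
// total / workers atomic increments on one shared counter.
uint countWithPool(uint total, uint workers)
{
    shared(uint) value;

    // Work unit size 1: each index of the range is its own unit of work.
    foreach (_; parallel(iota(workers), 1))
    {
        foreach (i; 0 .. total / workers)
            atomicOp!"+="(value, 1);
    }

    // The parallel foreach returns only after every iteration is done.
    return atomicLoad(value);
}

void main()
{
    assert(countWithPool(1_000_000, 4) == 1_000_000);
}
```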
Mar 03
parent reply Andy Valencia <dont spam.me> writes:
On Monday, 4 March 2024 at 03:42:48 UTC, Richard (Rikki) Andrew 
Cattermole wrote:
 A way to do this without spawning threads manually:
...
Thank you! Of course, a thread dispatch per atomic increment is going to be s.l.o.w., so not surprising you had to trim the iterations. But I still hope to be able to share memory between spawned threads, and if it isn't a shared ref of a shared variable, then what would it be? Do I have to use the memory allocator?
Mar 04
next sibling parent evilrat <evilrat666 gmail.com> writes:
On Monday, 4 March 2024 at 16:02:50 UTC, Andy Valencia wrote:
 On Monday, 4 March 2024 at 03:42:48 UTC, Richard (Rikki) Andrew 
 Cattermole wrote:
 A way to do this without spawning threads manually:
...
 Thank you! Of course, a thread dispatch per atomic increment is going to be s.l.o.w., so not surprising you had to trim the iterations. But I still hope to be able to share memory between spawned threads, and if it isn't a shared ref of a shared variable, then what would it be? Do I have to use the memory allocator?
There is the `__gshared` storage class, but unlike plain `shared`, it is up to you to ensure correct concurrent access, as stated in the docs: https://dlang.org/spec/const3.html#shared_global
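
A minimal sketch of what that could look like, assuming a mutex is used for the synchronization that `__gshared` does not provide. The names `counter`, `counterLock`, `bump`, and `countGshared` are made up for this example.

```d
import core.sync.mutex : Mutex;
import core.thread : Thread;

__gshared uint counter;      // one instance per process, not thread-local
__gshared Mutex counterLock; // guards `counter`; nothing enforces its use

// Increment the global counter `n` times, taking the lock each time.
void bump(uint n)
{
    foreach (_; 0 .. n)
    {
        counterLock.lock();
        scope (exit) counterLock.unlock();
        ++counter;
    }
}

// Hypothetical helper: start `workers` threads, wait for all of them,
// and return the final counter value.
uint countGshared(uint workers, uint perThread)
{
    counter = 0;
    counterLock = new Mutex;

    Thread[] threads;
    foreach (_; 0 .. workers)
    {
        auto t = new Thread({ bump(perThread); });
        t.start();
        threads ~= t;
    }
    foreach (t; threads)
        t.join();

    return counter;
}

void main()
{
    assert(countGshared(4, 100_000) == 400_000);
}
```

Dropping the lock in `bump` would still compile, which is the trade-off being described: the compiler does not check access to `__gshared` data.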
Mar 04
prev sibling parent reply Andy Valencia <dont spam.me> writes:
On Monday, 4 March 2024 at 16:02:50 UTC, Andy Valencia wrote:
 On Monday, 4 March 2024 at 03:42:48 UTC, Richard (Rikki) Andrew 
 Cattermole wrote:
 ... I still hope to be able to share memory between spawned 
 threads, and if it isn't a shared ref of a shared variable, 
 then what would it be?  Do I have to use the memory allocator?
For any other newbie dlang voyagers, here's a version which works as expected using the system memory allocator. On my little i7 I get 1.48 secs wallclock with 5.26 CPU seconds.

import core.atomic : atomicFetchAdd;
import std.concurrency : spawn;
import core.time : msecs;
import core.thread : Thread;
import core.memory : GC;

const uint NSWEPT = 100_000_000;
const uint NCPU = 4;

void
doadd(shared uint *buf)
{
     for (uint count = 0; count < NSWEPT/NCPU; ++count) {
         atomicFetchAdd(buf[0], 1);
     }
}

void
main()
{
     shared uint *buf =
         cast(shared uint *)GC.calloc(uint.sizeof * 1, GC.BlkAttr.NO_SCAN);

     for (uint x = 0; x < NCPU-1; ++x) {
         spawn(&doadd, buf);
     }
     doadd(buf);
     while (buf[0] != NSWEPT) {
         Thread.sleep(1.msecs);
     }
}
Mar 04
parent Andy Valencia <dont spam.me> writes:
On Monday, 4 March 2024 at 18:08:52 UTC, Andy Valencia wrote:
 For any other newbie dlang voyagers, here's a version which 
 works as expected using the system memory allocator.  On my 
 little i7 I get 1.48 secs wallclock with 5.26 CPU seconds.
...
Using a technique I found in a unit test in std/concurrency.d, I managed to share process memory without GC. It counted up to 1,000,000,000 on my low end i7 in:

real    0m15.666s
user    0m59.913s
sys     0m0.004s

import core.atomic : atomicFetchAdd;
import std.concurrency : spawn, send, receiveOnly, ownerTid;
import core.thread : Thread;

const uint NSWEPT = 1_000_000_000;
const uint NCPU = 4;

void
doadd()
{
     auto val = receiveOnly!(shared(int)[]);
     for (uint count = 0; count < NSWEPT/NCPU; ++count) {
         atomicFetchAdd(val[0], 1);
     }
     ownerTid.send(true);
}

void
main()
{
     static shared int[] val = new shared(int)[1];

     // Parallel workers
     for (int x = 0; x < NCPU; ++x) {
         auto tid = spawn(&doadd);
         tid.send(val);
     }

     // Pick up all completed workers
     for (int x = 0; x < NCPU; ++x) {
         receiveOnly!(bool);
     }
     assert(val[0] == NSWEPT);
}
Mar 05