digitalmars.D - Multi-processing : processes, mutexes and shared data

Hello there,

I wanted to create a multi-process program and share some data 
between the processes (for a heavy task, so a thread wouldn't be 
efficient). However, I didn't see any way to run a function in a 
separate process with druntime/phobos, so I created a simple Process 
class:
```
module process;

import core.sys.posix.unistd : fork, _exit;
import core.sys.posix.sys.wait;
import std.process : ProcessException;
import std.functional : toDelegate;

class Process
{
private:
     pid_t pid;

public:
     this(T, Args...)(T function(Args) fn, Args args)
     {
         this(fn.toDelegate, args);
     }

     this(T, Args...)(T delegate(Args) dg, Args args)
         if (is(T == int) || is(T == void))
     {
         pid_t pid = fork();
         if (pid < 0)
         {
             throw ProcessException.newFromErrno("Failed to spawn new process");
         }
         else if (pid == 0)
         {
             // child
             static if (is(T == int))
             {
                 _exit(dg(args));
             }
             else
             {
                 dg(args);
                 _exit(0);
             }
         }
         else
         {
             // parent
             this.pid = pid;
         }
     }

     int wait()
     {
         int status = void;
         waitpid(pid, &status, 0);
         return WEXITSTATUS(status);
     }
}
```

Nice. Now, let's use it :

```
import process;
import std.stdio;
import core.thread.osthread : Thread;
import core.time;


void job(int i)
{
     Thread.sleep(dur!"seconds"(1));
     writeln(i);
}

void main()
{
     enum NB_PROCESS = 15;

     Process[NB_PROCESS] processes;
     foreach(i ; 0 .. NB_PROCESS)
     {
         processes[i] = new Process(&job, i);
     }

     scope(exit)
     {
         foreach(i ; 0 .. NB_PROCESS)
         {
             processes[i].wait();
         }
     }
}
```
[x] create multiple processes.
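
A side note on `wait()` above: `WEXITSTATUS` is only meaningful when the child exited normally, so a child killed by a signal would be reported with a misleading exit code. A minimal sketch of a stricter check follows. The helper name `runInChild` and the choice of `-1` as the "did not exit normally" value are assumptions made for illustration, not part of the class above.

```d
import core.sys.posix.sys.types : pid_t;
import core.sys.posix.sys.wait;
import core.sys.posix.unistd : fork, _exit;

/// Hypothetical helper: runs `fn` in a forked child and returns its exit
/// code, or -1 if fork/waitpid failed or the child did not exit normally
/// (for example because it was killed by a signal).
int runInChild(int function() fn)
{
    pid_t pid = fork();
    if (pid < 0)
        return -1; // fork failed

    if (pid == 0)
        _exit(fn()); // child: never returns to the caller

    // parent
    int status;
    if (waitpid(pid, &status, 0) != pid)
        return -1;

    // WEXITSTATUS is only valid when WIFEXITED is true
    return WIFEXITED(status) ? WEXITSTATUS(status) : -1;
}
```

Something like `runInChild(() => 7)` should then give back the child's exit code in the parent.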

Ok, now we need to share data between all the processes.

Maybe the `synchronized` keyword ? 
[no](https://dlang.org/spec/statement.html#SynchronizedStatement), that's only
for threads.
Let's use a Mutex from 
[core.sync.mutex](https://dlang.org/phobos/core_sync_mutex.html) 
then. Wait, nope, it does not support multiple processes: the 
mutex's process-shared attribute is never set to 
`PTHREAD_PROCESS_SHARED` with `pthread_mutexattr_setpshared` [[source 
code](https://github.com/dlang/dmd/blob/master/druntime/src/core/sync/mutex.d#L92=)][[man](https://man7.org/linux/man-pages/man3/pthread_mutexattr_getpshared.3.html)].

So, I need to modify it. Let's copy-pasta `core.sync.mutex` and 
insert the following check at [line 
92](https://github.com/dlang/dmd/blob/69ab16a7e81c24bc893851d1fbf68a0ae8baeb53/druntime/src/cor
/sync/mutex.d#L92=) :
```
             !pthread_mutexattr_setpshared(&attr, PTHREAD_PROCESS_SHARED) ||
                 abort("Error: pthread_mutexattr_setpshared failed.");
```
Rename the module to avoid conflicts, and voilà!

[x] Mutex for multiple processes.
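
For comparison, the same idea can be sketched without copying `core.sync.mutex`, by calling the pthread API directly. This is only a minimal sketch under a few assumptions: `SharedCounter` and `countAcrossProcesses` are made-up names, the memory is an anonymous `MAP_SHARED` mapping (so it is only visible to processes forked after the mapping was created), `MAP_ANON` is available on the target platform, and error handling is reduced to `enforce`.

```d
import core.sys.posix.pthread;
import core.sys.posix.sys.mman;
import core.sys.posix.sys.types : pid_t;
import core.sys.posix.sys.wait : waitpid;
import core.sys.posix.unistd : fork, _exit;
import std.exception : enforce;

// Hypothetical layout: the mutex lives next to the data it protects,
// both inside the shared mapping.
struct SharedCounter
{
    pthread_mutex_t lock;
    ulong value;
}

ulong countAcrossProcesses(int nbProcesses, int incrementsPerProcess)
{
    // Anonymous shared mapping, inherited by the children across fork()
    void* mem = mmap(null, SharedCounter.sizeof, PROT_READ | PROT_WRITE,
                     MAP_SHARED | MAP_ANON, -1, 0);
    enforce(mem != MAP_FAILED, "mmap failed");
    auto counter = cast(SharedCounter*) mem;
    counter.value = 0;

    // Mark the mutex as usable from several processes
    pthread_mutexattr_t attr;
    enforce(pthread_mutexattr_init(&attr) == 0);
    enforce(pthread_mutexattr_setpshared(&attr, PTHREAD_PROCESS_SHARED) == 0);
    enforce(pthread_mutex_init(&counter.lock, &attr) == 0);
    pthread_mutexattr_destroy(&attr);

    auto pids = new pid_t[nbProcesses];
    foreach (ref pid; pids)
    {
        pid = fork();
        enforce(pid >= 0, "fork failed");
        if (pid == 0)
        {
            // child: increment under the shared lock, then leave
            foreach (_; 0 .. incrementsPerProcess)
            {
                pthread_mutex_lock(&counter.lock);
                ++counter.value;
                pthread_mutex_unlock(&counter.lock);
            }
            _exit(0);
        }
    }

    // parent: wait for every child before reading the result
    foreach (pid; pids)
    {
        int status;
        waitpid(pid, &status, 0);
    }

    ulong total = counter.value;
    pthread_mutex_destroy(&counter.lock);
    munmap(mem, SharedCounter.sizeof);
    return total;
}
```

If the lock works across processes, `countAcrossProcesses(4, 1000)` should return `nbProcesses * incrementsPerProcess`; without `PTHREAD_PROCESS_SHARED` the behaviour is not guaranteed.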

Now let's give it a try. But how do we check that it works correctly? 
We need to create a shared memory space. Maybe a `shared` 
variable? 
[No](http://ddili.org/ders/d.en/concurrency_shared.html), same 
problem as before; it's only for multi-threading. Argh, got it. D 
isn't made for multi-processing, so I'll code it myself.

*3 hours later* :
```
module shared_memory;

import core.sys.posix.sys.mman;
import core.sys.posix.unistd;
import core.sys.posix.fcntl;
import std.conv : emplace;

class SharedMemory(T, Args...)
{
private:
     static if (is(T == class))
         T cl;
     T* ptr;
     string ident;

public:
     @nogc
     this(string identifier, Args args)
     {
         ident = identifier;

         static if (is(T == class))
             size_t size = __traits(classInstanceSize, T);
         else
             size_t size = T.sizeof;

         int shm_fd = shm_open(ident.ptr, O_CREAT | O_RDWR, 
0x1b6); // 0x1b6 = 0666 (octal)
         ftruncate(shm_fd, size);
         void[] memory = mmap(null, size, PROT_READ | PROT_WRITE, 
MAP_SHARED, shm_fd, 0)[0..size];

         static if (is(T == class))
         {
             cl = emplace!(T, Args)(memory, args);
             ptr = &cl;
         }
         else
         {
             ptr = emplace!(T, Args)(memory, args);
         }
     }

     // @nogc
     void unlink()
     {
         static if (is(T == class))
             destroy(cl);
         // shm_unlink only removes the name; it does not close the file
         // descriptor, and the mapping stays valid until it is unmapped
         shm_unlink(ident.ptr);
     }

     T* data()
     {
         return ptr;
     }
}
```
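
Two things in the class above are easy to trip over: `ident.ptr` is only guaranteed to be null-terminated for string literals, and the return values of `shm_open`, `ftruncate` and `mmap` are never checked. A hedged sketch of a stricter mapping step could look like this. The helper name `openShared` is hypothetical, and since `toStringz` may allocate, this version cannot be `@nogc`.

```d
import core.sys.posix.fcntl : O_CREAT, O_RDWR;
import core.sys.posix.sys.mman;
import core.sys.posix.unistd : close, ftruncate;
import std.conv : octal;
import std.exception : errnoEnforce;
import std.string : toStringz;

/// Hypothetical helper: creates (or opens) the named POSIX shared memory
/// object and maps `size` bytes of it. Throws an ErrnoException on failure.
void[] openShared(string name, size_t size)
{
    // toStringz guarantees the terminating '\0' that shm_open expects
    int fd = shm_open(name.toStringz, O_CREAT | O_RDWR, octal!666);
    errnoEnforce(fd >= 0, "shm_open failed");
    scope (exit) close(fd); // the mapping outlives the descriptor

    errnoEnforce(ftruncate(fd, size) == 0, "ftruncate failed");

    void* p = mmap(null, size, PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);
    errnoEnforce(p != MAP_FAILED, "mmap failed");
    return p[0 .. size];
}
```

For portability, POSIX recommends names of the form `"/something"` (a leading slash and no other slashes), so `openShared("/sum", ulong.sizeof)` would be a safer call than using the bare `"sum"`.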

Seems to work, now let's create a test program :
```
import process;
import mutex;
import shared_memory;

import std.stdio;
import core.thread.osthread : Thread;
import core.time;

void job(int i, SharedMemory!Mutex mutex, SharedMemory!ulong sm)
{
     auto m = *mutex.data;
     // force the scheduler to change the execution context
     Thread.sleep(dur!"msecs"(1000));
     m.lock();
     Thread.sleep(dur!"msecs"(100)); // ditto
     foreach (j; 1 .. 100)
         *sm.data += i * j - i + i*i*i*i; // "random" calculations
     m.unlock();
}

void main()
{
     enum NB_PROCESS = 100;

     auto sum = new SharedMemory!ulong("sum");
     auto mutex = new SharedMemory!Mutex("mutex");

     *sum.data = 0;

     Process[NB_PROCESS] processes;
     foreach(i ; 0 .. NB_PROCESS)
     {
         processes[i] = new Process(&job, i, mutex, sum);
     }

     foreach(i ; 0 .. NB_PROCESS)
     {
         processes[i].wait();
     }

     if (*sum.data != 193107012120)
     {
         writeln("failed ! sum: ", *sum.data);
     }

     import core.memory : GC;
     GC.collect(); // force collection

     sum.unlink();
     mutex.unlink();
}
```
Let's run our program a hundred times: `for i in {1..100}; do 
./mulproc; done`. No output, so everything worked properly! Yaay!

[x] Share data between processes

*Achievement get: multi-processing in D.*

---

Fine. This was fun to do.

However, I have two questions:
- Why wasn't this implemented in druntime/phobos?
- Can we implement this (also for non-POSIX systems) in 
druntime/phobos? If yes, how should it be done properly (i.e. without 
using my code)?

Bests,
Luhrel
Jul 15 2022