
digitalmars.D.learn - Help with concurrency - traversing a DAG

reply "tbttfox" <nospam nospam.nospam> writes:
So I've got a project in mind, and at the core of that project is 
a DAG with lots of nodes. This seems to be a great candidate for 
concurrent evaluation. The problem is that a node can only be 
evaluated after all of its parents have.

I'm really new to D and static typing in general (coming from 
Python-it was way too slow for this), and I'm having trouble 
navigating the shared/const/immutables and keeping within the 
concurrency structure in D.


The actual question: Will the following strategy work? I've tried 
coding this a couple times, and it just doesn't seem to be 
working. Is there something wrong with my logic?

I make a shared list of linked nodes and pass that to each thread 
on spawn. Then the indexes of any node without a parent go into a 
queue.  The main thread then passes list indexes to the worker 
threads from that queue. When a worker thread gets an index, it 
makes a local copy of that node, evaluates it, and stores the 
data it computes with the node.

Then every child of that node is checked to see if its parents 
have all been processed.  If so, that child is added to the queue 
and the thread requests another item from the queue. When the 
queue is empty, the job is done.
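For what it's worth, the dependency bookkeeping this strategy describes is essentially Kahn's topological sort. A single-threaded sketch of just that bookkeeping (the `Node` layout here is hypothetical, and the actual evaluation and threading are left out) might look like:

```d
import std.container : DList;

// Hypothetical node layout -- just enough structure for the scheduling logic.
struct Node {
    int[] parents;   // indexes of parent nodes
    int[] children;  // indexes of child nodes
}

// Returns an order in which every node appears only after all of its
// parents: the ready-queue bookkeeping described above (Kahn's algorithm).
int[] evaluationOrder(Node[] nodes) {
    auto unmet = new int[nodes.length];     // unmet-parent count per node
    DList!int queue;
    foreach (i, ref n; nodes) {
        unmet[i] = cast(int) n.parents.length;
        if (unmet[i] == 0)
            queue.insertBack(cast(int) i);  // parentless nodes start ready
    }
    int[] order;
    while (!queue.empty) {
        int idx = queue.front;
        queue.removeFront();
        order ~= idx;                       // "evaluate" node idx here
        foreach (c; nodes[idx].children)
            if (--unmet[c] == 0)            // last parent just finished
                queue.insertBack(c);
    }
    return order;  // shorter than nodes.length would indicate a cycle
}
```

The concurrent version distributes the `while` body across workers, but the unmet-parent counting stays the same.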
Oct 18 2013
"qznc" <qznc web.de> writes:
On Friday, 18 October 2013 at 07:20:07 UTC, tbttfox wrote:
 So I've got a project in mind, and at the core of that project 
 is a DAG with lots of nodes. This seems to be a great candidate 
 for concurrent evaluation. The problem is that a node can only 
 be evaluated after all of its parents have.

 I'm really new to D and static typing in general (coming from 
 Python-it was way too slow for this), and I'm having trouble 
 navigating the shared/const/immutables and keeping within the 
 concurrency structure in D.


 The actual question: Will the following strategy work? I've 
 tried coding this a couple times, and it just doesn't seem to 
 be working. Is there something wrong with my logic?

 I make a shared list of linked nodes and pass that to each 
 thread on spawn. Then the indexes of any node without a parent 
 go into a queue.  The main thread then passes list indexes to 
 the worker threads from that queue. When a worker thread gets 
 an index, it makes a local copy of that node, evaluates it, and 
 stores the data it computes with the node.

 Then every child of that node is checked to see if its parents 
 have all been processed.  If so, that child is added to the 
 queue and the thread requests another item from the queue. When 
 the queue is empty, the job is done.
I assume your goal is to exploit parallelism for a speedup? How expensive is the "evaluates it" step? Is the traversal or the evaluation dominating the run time in the single-threaded version? How scalable do you need to be (e.g. desktop multicore, cluster, or cloud)?

In terms of logic, I can see no error, though it is not clear whether you use push or pull. The master might push work to the workers, or the workers might pull from the master.
Oct 18 2013
"tbttfox" <nospam nospam.nospam> writes:
On Friday, 18 October 2013 at 11:40:24 UTC, qznc wrote:
 I assume your goal is to exploit parallelism for a speedup? How 
 expensive is the "evaluates it" step? Is the traversal or the 
 evaluation dominating the run time in the single-threaded 
 version? How scalable do you need to be (e.g. desktop 
 multicore, cluster, or cloud)?

 In terms of logic, I can see no error, though it is not clear 
 whether you use push or pull. The master might push work to the 
 workers, or the workers might pull from the master.
Correct, I'm trying to exploit the parallelism for a speedup. The evaluation will eventually be the expensive part (once I write it, of course). And this is for a desktop application, so only multicore.

My current, not-yet-working implementation has the workers make a pull request from the master. I'll post the code when I have a minute later in the day.

And thanks much for the help
~tbttfox
Oct 18 2013
"qznc" <qznc web.de> writes:
On Friday, 18 October 2013 at 16:31:13 UTC, tbttfox wrote:
 My current, not-yet-working implementation has the workers 
 make a pull request from the master.
As far as I understand, you just want something working right now? Then my suggestion would be to look into std.parallelism [0]. Create a task for each node without a parent, and let the tasks create new tasks for their children. This would implement your concept quite well.

The queue is actually the task queue of a thread pool. The nice thing is that all this queueing and synchronization is already implemented for you in the standard library.

You could also try to make the DAG immutable and create a secondary data structure for the evaluation results.

[0] http://dlang.org/phobos/std_parallelism.html
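A minimal sketch of that suggestion, under some assumptions: the `Node` struct, the global `dag` and `unmet` arrays, and the `evaluate` function are all made up for illustration, and the final "wait for everything" step is omitted (it turns out to be the tricky part):

```d
import std.parallelism : task, taskPool;
import core.atomic : atomicOp;

// Hypothetical node layout: only the child links matter for scheduling.
struct Node {
    int[] children;  // indexes of child nodes
}

__gshared Node[] dag;  // the DAG itself, effectively read-only after setup
shared int[] unmet;    // unmet-parent count per node, decremented atomically

void evaluate(int idx) {
    // ... compute the value of node idx and store the result ...
    foreach (c; dag[idx].children) {
        // Whichever parent finishes last submits the child as a new task.
        if (atomicOp!"-="(unmet[c], 1) == 0)
            taskPool.put(task!evaluate(c));
    }
}

// In main: fill dag and unmet, then seed the pool with
// task!evaluate(i) for every node i whose unmet count starts at zero.
```

The atomic decrement guarantees exactly one parent sees the count reach zero, so each child is submitted exactly once even when its parents finish simultaneously on different workers.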
Oct 18 2013
"tbttfox" <nospam nospam.nospam> writes:
 As far as I understand you just want something working right 
 now? Then my suggestion would be to look into std.parallelism 
 [0]. Create a task for each node without a parent. Let the 
 tasks create new tasks for their children.
Yeah, I'm just trying to get something working right now, so I will certainly look into that. Luckily I have time this weekend to grok more of this and try some new things out (assuming I don't get sidetracked by anything too shiny).
 You could try to make the DAG immutable and create a secondary 
 data structure for the evaluation results.
Again, certainly something I'll look into, but I'm a little suspicious about storing the evaluation results separately, because I'll constantly be accessing and adding to them. They'll have to be shared anyway, right? And if that's the case, why not just read the DAG links from the already-shared data?
Oct 18 2013
"tbttfox" <nospam nospam.nospam> writes:
On Friday, 18 October 2013 at 19:47:37 UTC, qznc wrote:
 Create a task for each node without a parent. Let the tasks 
 create new tasks for their children.
I was finally able to try this out, but I'm having a problem. My test case right now is a simple linked list, and what I think happens is that the queue empties and execution makes it past pool.finish() before a worker thread can add the next task to the queue.

Here's the code: http://pastebin.com/LLfMyKVp

Thanks again
Oct 22 2013
"qznc" <qznc web.de> writes:
On Tuesday, 22 October 2013 at 08:13:57 UTC, tbttfox wrote:
 On Friday, 18 October 2013 at 19:47:37 UTC, qznc wrote:
 Create a task for each node without a parent. Let the tasks 
 create new tasks for their children.
 I was finally able to try this out, but I'm having a problem. 
 My test case right now is a simple linked list, and what I 
 think happens is that the queue empties and execution makes it 
 past pool.finish() before a worker thread can add the next 
 task to the queue.

 Here's the code: http://pastebin.com/LLfMyKVp
pool.finish seems to be the wrong way to wait here: it only tells the workers to terminate once the queue runs empty, so tasks submitted afterwards may never run. I think you have to implement your own completion signal.
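One possible shape for such a completion signal (a sketch, not the only option): keep an atomic count of unevaluated nodes and let the main thread block on a semaphore until the last task drops the count to zero. The names here are invented for illustration.

```d
import core.atomic : atomicOp;
import core.sync.semaphore : Semaphore;

shared int remaining;        // number of nodes not yet evaluated
__gshared Semaphore allDone; // signalled once remaining hits zero

// Each task calls this after it finishes evaluating its node.
void nodeFinished() {
    if (atomicOp!"-="(remaining, 1) == 0)
        allDone.notify();    // the last node wakes the main thread
}

// In main, instead of pool.finish():
//   remaining = cast(int) numNodes;
//   allDone = new Semaphore(0);
//   ...submit the root tasks...
//   allDone.wait();  // returns only after every node has finished
```

Unlike pool.finish, this waits on the amount of *work done* rather than the state of the queue, so a momentarily empty queue can't end the run early.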
Oct 23 2013