digitalmars.D - Support for feeding data to engine threads?

reply Jerry Quinn <jlquinn optonline.net> writes:
I'm looking at porting an app that maintains a work queue to be processed by
one of N engines and written out in order.  At first glance, std.parallelism
already provides the queue, but the Task concept appears to assume that there's
no startup cost per thread.

Am I missing something or do I need to roll a shared queue object?
Sep 12 2011
next sibling parent reply Timon Gehr <timon.gehr gmx.ch> writes:
On 09/12/2011 07:23 PM, Jerry Quinn wrote:
 I'm looking at porting an app that maintains a work queue to be processed by
one of N engines and written out in order.  At first glance, std.parallelism
already provides the queue, but the Task concept appears to assume that there's
no startup cost per thread.

 Am I missing something or do I need to roll a shared queue object?

I don't know if I get you right, but std.parallelism uses a task pool. Usually no threads are started or stopped during processing.
Sep 12 2011
parent reply Jerry Quinn <jlquinn optonline.net> writes:
Timon Gehr Wrote:

I don't know if I get you right, but std.parallelism uses a task pool. Usually no threads are started or stopped during processing.

OK I guess what I'm looking for is WorkerLocalStorage. I can create an engine per thread. However, I probably need to have each thread do the initialization work. If I create the engine on the main thread, it won't be properly accessed by the worker thread, right? I.e. I need each thread to run engine.init() which will do a whole pile of loading and setup first before I can start feeding data to the pool.

The impression I get is that

    for (int i=0; i < nthreads; i++)
      taskPool.workerLocalStorage(new engine(datafile))

will not get me what I want.
Sep 12 2011
next sibling parent reply Timon Gehr <timon.gehr gmx.ch> writes:
On 09/12/2011 08:01 PM, Jerry Quinn wrote:
 OK I guess what I'm looking for is WorkerLocalStorage. I can create an engine per thread. However, I probably need to have each thread do the initialization work. If I create the engine on the main thread, it won't be properly accessed by the worker thread, right? I.e. I need each thread to run engine.init() which will do a whole pile of loading and setup first before I can start feeding data to the pool.
 The impression I get is that
 for (int i=0; i < nthreads; i++)
   taskPool.workerLocalStorage(new engine(datafile))
 will not get me what I want.

Calling workerLocalStorage once suffices.

    auto engine = taskPool.workerLocalStorage(new Engine(datafile));

This will create one engine per worker thread, each constructed from the same datafile. You can access the engine from each thread with engine.get.

What is the exact role of datafile? Does it have to be distinct for each engine?
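
A minimal sketch of what that single call produces, assuming a trivial stand-in `Engine` class (hypothetical, not the poster's real engine). The helper `oneEnginePerSlot` is also invented for illustration; it walks the storage with `toRange` and checks that every slot holds its own instance built from the same `datafile`.

```d
import std.parallelism : taskPool;
import std.stdio : writeln;

// Hypothetical stand-in for the expensive engine discussed in the thread.
final class Engine
{
    string datafile;
    this(string datafile) { this.datafile = datafile; }
}

// Returns true if every slot holds a distinct Engine built from `datafile`.
bool oneEnginePerSlot(string datafile)
{
    // One call; the lazy argument is evaluated once for each slot.
    auto engines = taskPool.workerLocalStorage(new Engine(datafile));

    Engine[] seen;
    foreach (e; engines.toRange)
    {
        foreach (other; seen)
            if (other is e)
                return false;        // two slots sharing one instance
        if (e.datafile != datafile)
            return false;
        seen ~= e;
    }

    // engines.get is the slot that belongs to the calling thread.
    return engines.get !is null;
}

void main()
{
    writeln(oneEnginePerSlot("params.cfg"));
}
```

Note that this only shows that the instances are distinct; it says nothing about which thread constructed them, which is the point clarified later in the thread.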
Sep 12 2011
parent Jerry Quinn <jlquinn optonline.net> writes:
Timon Gehr Wrote:

 Calling workerLocalStorage once suffices.
     auto engine = taskPool.workerLocalStorage(new Engine(datafile));
 This will create one engine per worker thread, each constructed from the same datafile. You can access the engine from each thread with engine.get.
 What is the exact role of datafile? Does it have to be distinct for each engine?

datafile is just a set of parameters for configuring the engine, i.e. the location of data, parameter values, etc. In this setting it would be the same for each engine.
Sep 12 2011
prev sibling parent reply dsimcha <dsimcha yahoo.com> writes:
== Quote from Jerry Quinn (jlquinn optonline.net)'s article
 OK I guess what I'm looking for is WorkerLocalStorage. I can create an engine per thread. However, I probably need to have each thread do the initialization work. If I create the engine on the main thread, it won't be properly accessed by the worker thread, right? I.e. I need each thread to run engine.init() which will do a whole pile of loading and setup first before I can start feeding data to the pool.
 The impression I get is that
 for (int i=0; i < nthreads; i++)
   taskPool.workerLocalStorage(new engine(datafile))
 will not get me what I want.

The parameter for taskPool.workerLocalStorage is lazy, so it's even easier:

    auto engines = taskPool.workerLocalStorage(new engine(datafile));

will create one new engine **per thread** because the lazy parameter is passed to it and evaluated **once per thread**. I'm not sure what an engine is in this context, though.

As far as initialization, you could do something like:

    auto isInitialized = taskPool.workerLocalStorage(false);

    void doStuff() {
        // Each thread checks and updates only its own flag.
        if(!isInitialized.get) {
            engines.get.initialize();
            isInitialized.get = true;
        }

        // Do some real processing.
    }
Sep 12 2011
parent reply Jerry Quinn <jlquinn optonline.net> writes:
dsimcha Wrote:

The parameter for taskPool.workerLocalStorage is lazy, so it's even easier: taskPool.workerLocalStorage(new engine(datafile)); will create one new engine **per thread** because the lazy parameter is passed to it and evaluated **once per thread**. I'm not sure what an engine is in this context, though.

An engine is simply a complex class that takes a lot of memory and setup work to prepare. I'm working on language processing, so we use large multi-GB models.

Great, I'm getting closer. So the evaluation happens in the worker thread, not the main thread, right?

The other thing I think I have to do is sequence the initializations. Otherwise the disk/network will thrash trying to load N copies simultaneously. I imagine this can be done with a shared variable inside the engine.

 As far as initialization, you could do something like:
 
 auto isInitialized = taskPool.workerLocalStorage(false);
 
 void doStuff() {
     if(!isInitialized.get) {
         engines.get.initialize();
         isInitialized.get = true;
     }
 
     // Do some real processing.
 }

I prefer to do all the initializations before work starts so that timing can be more accurate. These things take a lot to get fully loaded and ready to run.

Thanks for all the help! It might be useful to indicate in the workerLocalStorage documentation whether the lazy processing happens in the worker's thread.

Jerry
Sep 12 2011
next sibling parent dsimcha <dsimcha yahoo.com> writes:
On 9/12/2011 3:32 PM, Jerry Quinn wrote:
 Thanks for all the help!  It might be useful to indicate in the
workerLocalStorage documentation whether the lazy processing happens in the worker's thread.

Sorry for the misunderstanding. When workerLocalStorage() is called, the lazy variable is evaluated once per worker thread, but it's evaluated **in the thread that workerLocalStorage() is called from**, **before workerLocalStorage() returns**.
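
A small probe can make this behaviour visible. The sketch below is an illustration only: `InitReport`, `probeLazyInit` and `makeSlot` are hypothetical names, and `makeSlot` stands in for an expensive constructor. It counts how often the lazy argument is evaluated and checks whether each evaluation ran in the thread that called workerLocalStorage().

```d
import core.thread : Thread;
import std.parallelism : taskPool;
import std.stdio : writeln;

struct InitReport
{
    size_t evaluations;      // how many times the lazy argument ran
    bool allInCallingThread; // whether every run happened in the caller
}

InitReport probeLazyInit()
{
    auto caller = Thread.getThis();
    InitReport report;
    report.allInCallingThread = true;

    // Stand-in for an expensive constructor call.
    int makeSlot()
    {
        ++report.evaluations;
        if (Thread.getThis() !is caller)
            report.allInCallingThread = false;
        return 0;
    }

    // makeSlot() is passed lazily and evaluated inside workerLocalStorage.
    auto slots = taskPool.workerLocalStorage(makeSlot());
    return report;
}

void main()
{
    auto report = probeLazyInit();
    writeln("evaluations: ", report.evaluations,
            ", pool size: ", taskPool.size,
            ", all in calling thread: ", report.allInCallingThread);
}
```

If the description above holds, the last field prints true, which is why a constructor that must run in the worker thread cannot simply be passed as the lazy argument.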
Sep 12 2011
prev sibling parent dsimcha <dsimcha yahoo.com> writes:
On 9/12/2011 3:32 PM, Jerry Quinn wrote:
 The other thing I think I have to do is sequence the initializations. 
Otherwise the disk/network will thrash trying to load N copies simultaneously. 
I imagine this can be done with a shared variable inside the engine.

This can easily be done by creating null engines in the WorkerLocalStorage and then initializing them inside a synchronized block:

    auto wlEngines = taskPool.workerLocalStorage(Engine.init);
    auto wlIsInitialized = taskPool.workerLocalStorage(false);

    // This gets executed on the pool in a Task:
    void taskFunction() {
        if(!wlIsInitialized.get) {
            // Do initialization of thread-local engine in worker thread.
            // The synchronized block lets only one thread load at a time.
            synchronized {
                wlEngines.get = new Engine(ctorArgs);
            }
            wlIsInitialized.get = true;
        }

        // Do main processing.
    }

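
The same idea as a self-contained sketch. Assumptions: `Engine` is a trivial hypothetical stand-in, `processAll` is an invented helper, and a parallel foreach replaces explicit Task submission to keep the example short. A null check on the slot plays the role of the separate isInitialized flag.

```d
import std.parallelism : taskPool;
import std.stdio : writeln;

// Hypothetical stand-in for the expensive engine discussed in the thread.
final class Engine
{
    private string config;
    this(string config) { this.config = config; } // real code would load models here
    string process(string line) { return config ~ ": " ~ line; }
}

string[] processAll(string[] lines, string config)
{
    // Every slot starts out null; nothing expensive happens here.
    auto wlEngines = taskPool.workerLocalStorage(Engine.init);
    auto results = new string[lines.length];

    foreach (i, line; taskPool.parallel(lines, 1))
    {
        if (wlEngines.get is null)
        {
            // Runs in the thread that will use the engine. The synchronized
            // block lets only one thread construct an engine at a time.
            synchronized
            {
                wlEngines.get = new Engine(config);
            }
        }
        // Each iteration writes its own element, so input order is kept.
        results[i] = wlEngines.get.process(line);
    }
    return results;
}

void main()
{
    foreach (r; processAll(["alpha", "beta", "gamma"], "cfg"))
        writeln(r);
}
```

As in the original snippet, the first item handled by each thread pays the construction cost, so this does not meet the requirement of finishing all initialization before work starts.
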
 I prefer to do all the initializations before work starts so that timing can
be more accurate.  These things take a lot to get fully loaded and ready to run.

Unfortunately I don't have a good answer for this constraint. If you can offer a concrete enhancement request for std.parallelism, I'll certainly consider it, but I can't think of a good design for this kind of per-thread initialization routine off the top of my head, either in terms of API or implementation.
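
One possible way around the constraint is to step outside std.parallelism and use std.concurrency, with one dedicated thread per engine. This is a swapped-in technique, not something proposed in the thread. In the sketch below, `Engine`, `engineWorker`, `runAll` and the message structs are all hypothetical names. Each worker builds its engine in its own thread and reports `Ready`. The owner waits for that message before spawning the next worker, so the engines load one at a time and all of them are ready before any work is sent.

```d
import std.concurrency : receive, receiveOnly, send, spawn, thisTid, Tid;
import std.stdio : writeln;

// Hypothetical stand-in for the expensive engine discussed in the thread.
final class Engine
{
    private string config;
    this(string config) { this.config = config; } // real code would load models here
    string process(string line) { return config ~ ": " ~ line; }
}

struct Ready {}
struct Stop {}
struct Work { size_t index; string line; }
struct Result { size_t index; string text; }

void engineWorker(Tid owner, string config)
{
    auto engine = new Engine(config); // constructed in the thread that uses it
    send(owner, Ready());

    bool running = true;
    while (running)
    {
        receive(
            (Work w) { send(owner, Result(w.index, engine.process(w.line))); },
            (Stop s) { running = false; }
        );
    }
}

string[] runAll(string[] lines, string config, size_t nWorkers)
{
    assert(nWorkers > 0);

    Tid[] workers;
    foreach (_; 0 .. nWorkers)
    {
        workers ~= spawn(&engineWorker, thisTid, config);
        receiveOnly!Ready(); // wait here so engines load one at a time
    }

    // All engines are ready; timing of the real work could start here.
    foreach (i, line; lines)
        send(workers[i % nWorkers], Work(i, line)); // simple round-robin

    // Results may arrive in any order; the index restores input order.
    auto results = new string[lines.length];
    foreach (_; 0 .. lines.length)
    {
        auto r = receiveOnly!Result();
        results[r.index] = r.text;
    }

    foreach (w; workers)
        send(w, Stop());
    return results;
}

void main()
{
    foreach (r; runAll(["alpha", "beta", "gamma"], "cfg", 2))
        writeln(r);
}
```

Round-robin distribution is the simplest choice here; it does not give the "any free engine takes the next item" behaviour of a shared work queue, which would need the workers to request work instead.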
Sep 12 2011
prev sibling next sibling parent reply dsimcha <dsimcha yahoo.com> writes:
== Quote from Jerry Quinn (jlquinn optonline.net)'s article
 I'm looking at porting an app that maintains a work queue to be processed by
one of N engines and written out in order.  At first glance, std.parallelism
already provides the queue, but the Task concept appears to assume that there's
no startup cost per thread.

 Am I missing something or do I need to roll a shared queue object?

Not sure if I'm misunderstanding what you're asking, but I'll answer the question I think you're asking. std.parallelism's Task can be executed in two ways. executeInNewThread() does start a new thread for every Task. However, you can also submit a Task to a TaskPool and have it executed by an existing worker thread. The whole point of using TaskPool to execute a Task is to avoid paying the thread start-up cost for every Task.
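
The two execution paths side by side, as a short sketch. `square`, `viaNewThread` and `viaPool` are hypothetical names used only for illustration.

```d
import std.parallelism : task, taskPool;
import std.stdio : writeln;

int square(int x) { return x * x; }

// Pays the thread start-up cost for this one Task.
int viaNewThread(int x)
{
    auto t = task!square(x);
    t.executeInNewThread();
    return t.yieldForce;
}

// Reuses a worker thread that the pool already owns.
int viaPool(int x)
{
    auto t = task!square(x);
    taskPool.put(t);
    return t.yieldForce;
}

void main()
{
    writeln(viaNewThread(7), " ", viaPool(7));
}
```

Both functions return the same value; only the thread that runs the work differs.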
Sep 12 2011
parent reply Jerry Quinn <jlquinn optonline.net> writes:
dsimcha Wrote:

Not sure if I'm misunderstanding what you're asking, but I'll answer the question I think you're asking. std.parallelism's Task can be executed in two ways. executeInNewThread() does start a new thread for every Task. However, you can also submit a Task to a TaskPool and have it executed by an existing worker thread. The whole point of using TaskPool to execute a Task is to avoid paying the thread start-up cost for every Task.

The question I was asking was how to execute a huge amount of per-thread initialization once per thread in the TaskPool framework.
Sep 12 2011
parent reply dsimcha <dsimcha yahoo.com> writes:
== Quote from Jerry Quinn (jlquinn optonline.net)'s article
 The question I was asking was how to execute a huge amount of per-thread initialization once per thread in the TaskPool framework.

Hmm, currently there isn't an easy/obvious way, though I'm thinking maybe one should be added. I've kind of needed it, too, and I don't anticipate it being very hard to implement. If you can suggest a good API for this, I'll work on it for next release.
Sep 12 2011
parent reply Jerry Quinn <jlquinn optonline.net> writes:
dsimcha Wrote:

Hmm, currently there isn't an easy/obvious way, though I'm thinking maybe one should be added. I've kind of needed it, too, and I don't anticipate it being very hard to implement. If you can suggest a good API for this, I'll work on it for next release.

I'd probably naturally want to do something like:

    auto engines = taskPool.init(new Engine(params));
    auto lines = File("foo.txt").byLine();
    auto results = taskPool.map!(engines.process)(lines);
    foreach (r; results)
        writeln(r);

This would create one new Engine object per thread, each initialized within its own thread as thread-local data. Each Engine object has a process() call taking a line as input and returning a string in this case. Any free engine can process more work as long as there is still work available.

For the large-scale use case, it could take an extra argument and allow only N simultaneous init's under the covers.
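
For comparison, a sketch of the part of this that taskPool.map already covers: results come back in input order even though the work runs in parallel. It does not provide the per-thread engine set-up proposed above. The `process` free function is a hypothetical stand-in for Engine.process, and `processInOrder` is an invented helper. The input is a string[] rather than File.byLine(), because byLine reuses its buffer and each line would need copying first.

```d
import std.parallelism : taskPool;
import std.stdio : writeln;

// Hypothetical stand-in for Engine.process. It is a free function because
// taskPool.map takes its function as a template alias.
string process(string line)
{
    return "[" ~ line ~ "]";
}

string[] processInOrder(string[] lines)
{
    string[] results;
    // Elements are computed in parallel but yielded in input order.
    foreach (r; taskPool.map!process(lines))
        results ~= r;
    return results;
}

void main()
{
    foreach (r; processInOrder(["first", "second", "third"]))
        writeln(r);
}
```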
Sep 12 2011
parent dsimcha <dsimcha yahoo.com> writes:
== Quote from Jerry Quinn (jlquinn optonline.net)'s article
 Hmm, currently there isn't an easy/obvious way, though I'm thinking maybe one
 should be added.  I've kind of needed it, too, and I don't anticipate it being
 very hard to implement.  If you can suggest a good API for this, I'll work on
it
 for next release.

 I'd probably naturally want to do something like:

     auto engines = taskPool.init(new Engine(params));
     auto lines = File("foo.txt").byLine();
     auto results = taskPool.map!(engines.process)(lines);
     foreach (r; results)
         writeln(r);

 This would create one new Engine object per thread, each initialized within its own thread as thread-local data. Each Engine object has a process() call taking a line as input and returning a string in this case. Any free engine can process more work as long as there is still work available.
 For the large-scale use case, it could take an extra argument and allow only N simultaneous init's under the covers.

From reading your later posts, it seems like what you really want is worker-local storage that's initialized from the thread that owns it rather than the thread that workerLocalStorage() was called from. This may be doable, but I'll have to think about the consequences and the implementation. In the meantime, see my other posts for a workaround.
Sep 12 2011
prev sibling parent reply David Nadlinger <see klickverbot.at> writes:
On 9/12/11 7:23 PM, Jerry Quinn wrote:
 Am I missing something or do I need to roll a shared queue object?

A concurrent queue (SharedQueue?) would be a nice addition to Phobos regardless, imho. David
Sep 13 2011
parent dsimcha <dsimcha yahoo.com> writes:
== Quote from David Nadlinger (see klickverbot.at)'s article
 On 9/12/11 7:23 PM, Jerry Quinn wrote:
 Am I missing something or do I need to roll a shared queue object?

 A concurrent queue (SharedQueue?) would be a nice addition to Phobos
regardless, imho. David

A lock-free queue is one of the examples in TDPL if I recall correctly. Maybe we should just put a slightly embellished version of that in.
Sep 13 2011