digitalmars.D - std.parallelism Question

reply Jonathan Crapuchettes <jcrapuchettes gmail.com> writes:
I've been looking at your std.parallelism module and am really interested in it,
but I have run into one problem that I can't seem to find a solution to. If I
have code that requires a database connection in each thread, I don't really
want to be creating and then deleting a lot (e.g. 500,000) of connections while
using taskPool.parallel. Normally I would create a set of objects that extend
Thread and create a single connection in each. Can you offer a suggestion for
how I might handle this situation? Here is an example of my problem:

string[] array = ...;
foreach (string element; taskPool.parallel(array))
{
     //Create database connection
     //Run some complex code with database query
}

I did come up with a solution, but it was a bit of a hack. I changed the 
definition of TaskPool to be a templated class:

final class TaskPool(ThreadType : Thread = Thread)

And this allowed me to create instances of my own worker class. The final usage 
looks like the following:

//Init the thread pool
TaskPool!(Worker) pool = new TaskPool!(Worker)(THREADS);

Worker mainThreadWorker = new Worker();
try {
     foreach (string element; pool.parallel(list, 4))
     {
         if (thread_isMainThread())
             mainThreadWorker.work(element);
         else
             (cast(Worker)Thread.getThis()).work(element);
     }
} finally {
     pool.finish();
     thread_joinAll();
}

class Worker : Thread
{
	private MysqlConnection db;

	public this(void delegate() dg)
	{
		super(dg);
		db = new MysqlConnection();
	}
}


The code works and I am using it to do millions of calculations. Thoughts?
Thank you,
Jonathan
Apr 19 2011
parent reply dsimcha <dsimcha yahoo.com> writes:
== Quote from Jonathan Crapuchettes (jcrapuchettes gmail.com)'s article
 I've been looking at your std.parallelism module and am really interested in it,
 but there is one problem that I have run into that I can't seem to find a
 solution. If I have code that requires a database connection in each thread, I
 don't really want to be creating and then deleting a lot (e.g. 500,000) of
 connections while using taskPool.parallel. Normally I would create a set of
 objects that extend Thread and create a single connection in each. Can you offer
 a suggestion how I might handle this situation?

These kinds of situations are **exactly** what worker-local storage is for. Roughly:

// new MysqlConnection() is lazy, you get one per worker thread.
auto connections = taskPool.workerLocalStorage(new MysqlConnection());

foreach(element; taskPool.parallel(array)) {
    // Get the worker-local instance of connection.
    auto connection = connections.get;

    // Do stuff.
}

// Now that we're done with the parallel part of the algorithm,
// we can access all the connections to free them.
foreach(c; connections.toRange) {
    c.free();
}
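
For anyone who wants to try the pattern without a database, here is a minimal self-contained sketch of the same idea. FakeConnection, Stats and run are hypothetical names made up for illustration. FakeConnection just stands in for MysqlConnection and counts how many times it is constructed. The sketch uses a private TaskPool rather than the global taskPool so the number of workers is explicit.

```d
import core.atomic : atomicLoad, atomicOp;
import std.parallelism : TaskPool;
import std.range : iota;
import std.stdio : writeln;

// Hypothetical stand-in for MysqlConnection (not a real library class).
final class FakeConnection
{
    static shared int created; // how many connections were ever opened
    int queries;               // queries run on this particular connection

    this() { atomicOp!"+="(created, 1); }

    void query(int element) { ++queries; } // pretend to hit the database
    void free() {}                         // pretend to close the connection
}

struct Stats
{
    int created; // connections opened during this run
    int queries; // queries executed, summed over all connections
}

Stats run(uint workers, int items)
{
    immutable before = atomicLoad(FakeConnection.created);

    auto pool = new TaskPool(workers);
    scope (exit) pool.finish(); // let the worker threads terminate

    // The argument is lazy, so the expression is evaluated once per
    // storage slot instead of once per loop iteration.
    auto connections = pool.workerLocalStorage(new FakeConnection());

    foreach (element; pool.parallel(iota(items)))
    {
        // Each thread only ever touches its own slot.
        connections.get.query(element);
    }

    // Serial part: walk every slot to collect results and clean up.
    Stats stats;
    foreach (c; connections.toRange)
    {
        stats.queries += c.queries;
        c.free();
    }
    stats.created = atomicLoad(FakeConnection.created) - before;
    return stats;
}

void main()
{
    auto stats = run(4, 1000);
    // Expect a handful of connections (no more than workers + 1),
    // not one per element.
    writeln(stats.created, " connections for ", stats.queries, " queries");
}
```

The point of the sketch is that the connection count depends on the size of the pool and not on the length of the input. Note that toRange is only meant for the serial part, after the parallel loop has finished.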
Apr 19 2011
parent Jonathan Crapuchettes <jcrapuchettes gmail.com> writes:
Thank you for your quick reply. I am now starting to see how it should be used.

As a little bit of encouragement, your code is being used in in-house
applications doing millions to trillions of calculations that are creating
leading-edge economic data.

Thank you again,
Jonathan

Apr 19 2011