www.digitalmars.com         C & C++   DMDScript  

digitalmars.D.announce - the semi-resident thread pool

reply zsxxsz <zhengshuxin hexun.com> writes:
Hi, I written one thread pool in which each thread is semi-resident. The
thread-pool is different from the Tango's one. Any thread of the thread-pool
will exit when it is idle for the timeout. That is to say, all threads for
jobs, and no job no thread. The thread-pool was from my C version. With D, I
wrote it more easier with the delegate function.
Below is the source code:

module adl.thread_pool;

import core.sys.posix.pthread;  // just for pthread_self()
import core.thread;
import core.sync.mutex;
import core.sync.condition;
import std.c.time;

private struct Job
{
        Job *next;
        void function() fn;
        void delegate() dg;
        void *arg;
        int   call;
}

/**
 * semi-daemon thread of thread pool
 */
class CThreadPool
{
public:
        /**
         * Constructs a CThreadPool
         *  param nMaxThread {int} the max number threads in thread pool
         *  param idleTimeout {int} when > 0, the idle thread will
         *  exit after idleTimeout seconds, if == 0, the idle thread
         *  will not exit
         *  param sz {size_t} when > 0, the thread will be created which
         *  stack size is sz.
         */
        this(int nMaxThread, int idleTimeout, size_t sz = 0)
        {
                m_nMaxThread = nMaxThread;
                m_idleTimeout = idleTimeout;
                m_stackSize = sz;
                m_mutex = new Mutex;
                m_cond = new Condition(m_mutex);
        }

        /**
         * Append one task into the thread pool's task queue
         *  param fn {void function()}
         */
        void append(void function() fn)
        {
                Job *job;
                char  buf[256];

                if (fn == null)
                        throw new Exception("fn null");

                job = new Job;
                job.fn = fn;
                job.next = null;
                job.call = Call.FN;

                m_mutex.lock();
                append(job);
                m_mutex.unlock();
        }

        /**
         * Append one task into the thread pool's task queue
         *  param dg {void delegate()}
         */
        void append(void delegate() dg)
        {
                Job *job;
                char  buf[256];

                if (dg == null)
                        throw new Exception("dg null");

                job = new Job;
                job.dg = dg;
                job.next = null;
                job.call = Call.DG;

                m_mutex.lock();
                append(job);
                m_mutex.unlock();
        }

        /**
         * If dg not null, when one new thread is created, dg will be called.
         *  param dg {void delegate()}
         */
        void onThreadInit(void delegate() dg)
        {
                m_onThreadInit = dg;
        }

        /**
         * If dg not null, before one thread exits, db will be called.
         *  param dg {void delegate()}
         */
        void onThreadExit(void delegate() dg)
        {
                m_onThreadExit = dg;
        }

private:
        enum Call { NO, FN, DG }

        Mutex m_mutex;
        Condition m_cond;
        size_t m_stackSize = 0;

        Job* m_jobHead = null, m_jobTail = null;
        int m_nJob = 0;
        bool m_isQuit = false;
        int m_nThread = 0;
        int m_nMaxThread;
        int m_nIdleThread = 0;
        int m_overloadTimeWait = 0;
        int m_idleTimeout;
        time_t m_lastWarn;

        void delegate() m_onThreadInit;
        void delegate() m_onThreadExit;
        void append(Job *job)
        {
                if (m_jobHead == null)
                        m_jobHead = job;
                else
                        m_jobTail.next = job;
                m_jobTail = job;

                m_nJob++;

                if (m_nIdleThread > 0) {
                        m_cond.notify();
                } else if (m_nThread < m_nMaxThread) {
                        Thread thread = new Thread(&doJob);
                        thread.isDaemon = true;
                        thread.start();
                        m_nThread++;
                } else if (m_nJob > 10 * m_nMaxThread) {
                        time_t now = time(null);
                        if (now - m_lastWarn >= 2) {
                                m_lastWarn = now;
                        }
                        if (m_overloadTimeWait > 0) {
                                Thread.sleep(m_overloadTimeWait);
                        }
                }
        }
        void doJob()
        {
                Job *job;
                int status;
                bool timedout;
                long period = m_idleTimeout * 10_000_000;

                if (m_onThreadInit != null)
                        m_onThreadInit();

                m_mutex.lock();
                for (;;) {
                        timedout = false;

                        while (m_jobHead == null && !m_isQuit) {
                                m_nIdleThread++;
                                if (period > 0) {
                                        try {
                                                if (m_cond.wait(period) ==
false) {
                                                        timedout = true;
                                                        break;
                                                }
                                        } catch (SyncException e) {
                                                m_nIdleThread--;
                                                m_nThread--;
                                                m_mutex.unlock();
                                                if (m_onThreadExit != null)
                                                        m_onThreadExit();
                                                throw e;
                                        }
                                } else {
                                        m_cond.wait();
                                }
                                m_nIdleThread--;
                        }  /* end while */
                        job = m_jobHead;

                        if (job != null) {
                                m_jobHead = job.next;
                                m_nJob--;
                                if (m_jobTail == job)
                                        m_jobTail = null;
                                /* the lock shuld be unlocked before enter
working processs */
                                m_mutex.unlock();
                                switch (job.call) {
                                case Call.FN:
                                        job.fn();
                                        break;
                                case Call.DG:
                                        job.dg();
                                        break;
                                default:
                                        break;
                                }

                                /* lock again */
                                m_mutex.lock();
                        }
                        if (m_jobHead == null && m_isQuit) {
                                m_nThread--;
                                if (m_nThread == 0)
                                        m_cond.notifyAll();
                                break;
                        }
                        if (m_jobHead == null && timedout) {
                                m_nThread--;
                                break;
                        }
                }

                m_mutex.unlock();

                writefln("Thread(%d) of ThreadPool exit now", pthread_self());
                if (m_onThreadExit != null)
                        m_onThreadExit();
        }
}

import std.stdio;
unittest
{
        CThreadPool pool = new CThreadPool(10, 10);

        void testThreadInit(string s)
        {
                void onThreadInit()
                {
                        writefln("thread(%d) was created now, s: %s",
pthread_self(), s);
                }
                pool.onThreadInit(&onThreadInit);
        }

        void testThreadExit(string s)
        {
                void onThreadExit()
                {
                        writefln("thread(%d) was to exit now, s: %s",
pthread_self(), s);
                }
                pool.onThreadExit(&onThreadExit);
        }

        void testAddJobs(string s)
        {
                void threadFun()
                {
                        writef("doJob thread id: %d, str: %s\n",
pthread_self(), s);
                        Thread.sleep(10_000_000);
                        writef("doJob thread id: %d, wakeup now\n",
pthread_self());
                }
                pool.append(&threadFun);
                pool.append(&threadFun);
                pool.append(&threadFun);
        }

        string s = "hello world";
        string s1 = "new thread was ok now";
        string s2 = "thread exited now";
        testThreadInit(s1);
        testThreadExit(s2);

        testAddJobs(s);

        Thread.sleep(100_000_000);
}
May 29 2009
next sibling parent reply Robert Fraser <fraserofthenight gmail.com> writes:
zsxxsz wrote:
 Hi, I written one thread pool in which each thread is semi-resident. The
 thread-pool is different from the Tango's one. Any thread of the thread-pool
 will exit when it is idle for the timeout. That is to say, all threads for
 jobs, and no job no thread. The thread-pool was from my C version. With D, I
 wrote it more easier with the delegate function.
 Below is the source code:
 
 module adl.thread_pool;
 
 import core.sys.posix.pthread;  // just for pthread_self()
 import core.thread;
 import core.sync.mutex;
 import core.sync.condition;
 import std.c.time;
 
 private struct Job
 {
         Job *next;
         void function() fn;
         void delegate() dg;
         void *arg;
         int   call;
 }
 
 /**
  * semi-daemon thread of thread pool
  */
 class CThreadPool
 {
 public:
         /**
          * Constructs a CThreadPool
          *  param nMaxThread {int} the max number threads in thread pool
          *  param idleTimeout {int} when > 0, the idle thread will
          *  exit after idleTimeout seconds, if == 0, the idle thread
          *  will not exit
          *  param sz {size_t} when > 0, the thread will be created which
          *  stack size is sz.
          */
         this(int nMaxThread, int idleTimeout, size_t sz = 0)
         {
                 m_nMaxThread = nMaxThread;
                 m_idleTimeout = idleTimeout;
                 m_stackSize = sz;
                 m_mutex = new Mutex;
                 m_cond = new Condition(m_mutex);
         }
 
         /**
          * Append one task into the thread pool's task queue
          *  param fn {void function()}
          */
         void append(void function() fn)
         {
                 Job *job;
                 char  buf[256];
 
                 if (fn == null)
                         throw new Exception("fn null");
 
                 job = new Job;
                 job.fn = fn;
                 job.next = null;
                 job.call = Call.FN;
 
                 m_mutex.lock();
                 append(job);
                 m_mutex.unlock();
         }
 
         /**
          * Append one task into the thread pool's task queue
          *  param dg {void delegate()}
          */
         void append(void delegate() dg)
         {
                 Job *job;
                 char  buf[256];
 
                 if (dg == null)
                         throw new Exception("dg null");
 
                 job = new Job;
                 job.dg = dg;
                 job.next = null;
                 job.call = Call.DG;
 
                 m_mutex.lock();
                 append(job);
                 m_mutex.unlock();
         }
 
         /**
          * If dg not null, when one new thread is created, dg will be called.
          *  param dg {void delegate()}
          */
         void onThreadInit(void delegate() dg)
         {
                 m_onThreadInit = dg;
         }
 
         /**
          * If dg not null, before one thread exits, db will be called.
          *  param dg {void delegate()}
          */
         void onThreadExit(void delegate() dg)
         {
                 m_onThreadExit = dg;
         }
 
 private:
         enum Call { NO, FN, DG }
 
         Mutex m_mutex;
         Condition m_cond;
         size_t m_stackSize = 0;
 
         Job* m_jobHead = null, m_jobTail = null;
         int m_nJob = 0;
         bool m_isQuit = false;
         int m_nThread = 0;
         int m_nMaxThread;
         int m_nIdleThread = 0;
         int m_overloadTimeWait = 0;
         int m_idleTimeout;
         time_t m_lastWarn;
 
         void delegate() m_onThreadInit;
         void delegate() m_onThreadExit;
         void append(Job *job)
         {
                 if (m_jobHead == null)
                         m_jobHead = job;
                 else
                         m_jobTail.next = job;
                 m_jobTail = job;
 
                 m_nJob++;
 
                 if (m_nIdleThread > 0) {
                         m_cond.notify();
                 } else if (m_nThread < m_nMaxThread) {
                         Thread thread = new Thread(&doJob);
                         thread.isDaemon = true;
                         thread.start();
                         m_nThread++;
                 } else if (m_nJob > 10 * m_nMaxThread) {
                         time_t now = time(null);
                         if (now - m_lastWarn >= 2) {
                                 m_lastWarn = now;
                         }
                         if (m_overloadTimeWait > 0) {
                                 Thread.sleep(m_overloadTimeWait);
                         }
                 }
         }
         void doJob()
         {
                 Job *job;
                 int status;
                 bool timedout;
                 long period = m_idleTimeout * 10_000_000;
 
                 if (m_onThreadInit != null)
                         m_onThreadInit();
 
                 m_mutex.lock();
                 for (;;) {
                         timedout = false;
 
                         while (m_jobHead == null && !m_isQuit) {
                                 m_nIdleThread++;
                                 if (period > 0) {
                                         try {
                                                 if (m_cond.wait(period) ==
 false) {
                                                         timedout = true;
                                                         break;
                                                 }
                                         } catch (SyncException e) {
                                                 m_nIdleThread--;
                                                 m_nThread--;
                                                 m_mutex.unlock();
                                                 if (m_onThreadExit != null)
                                                         m_onThreadExit();
                                                 throw e;
                                         }
                                 } else {
                                         m_cond.wait();
                                 }
                                 m_nIdleThread--;
                         }  /* end while */
                         job = m_jobHead;
 
                         if (job != null) {
                                 m_jobHead = job.next;
                                 m_nJob--;
                                 if (m_jobTail == job)
                                         m_jobTail = null;
                                 /* the lock shuld be unlocked before enter
 working processs */
                                 m_mutex.unlock();
                                 switch (job.call) {
                                 case Call.FN:
                                         job.fn();
                                         break;
                                 case Call.DG:
                                         job.dg();
                                         break;
                                 default:
                                         break;
                                 }
 
                                 /* lock again */
                                 m_mutex.lock();
                         }
                         if (m_jobHead == null && m_isQuit) {
                                 m_nThread--;
                                 if (m_nThread == 0)
                                         m_cond.notifyAll();
                                 break;
                         }
                         if (m_jobHead == null && timedout) {
                                 m_nThread--;
                                 break;
                         }
                 }
 
                 m_mutex.unlock();
 
                 writefln("Thread(%d) of ThreadPool exit now", pthread_self());
                 if (m_onThreadExit != null)
                         m_onThreadExit();
         }
 }
 
 import std.stdio;
 unittest
 {
         CThreadPool pool = new CThreadPool(10, 10);
 
         void testThreadInit(string s)
         {
                 void onThreadInit()
                 {
                         writefln("thread(%d) was created now, s: %s",
 pthread_self(), s);
                 }
                 pool.onThreadInit(&onThreadInit);
         }
 
         void testThreadExit(string s)
         {
                 void onThreadExit()
                 {
                         writefln("thread(%d) was to exit now, s: %s",
 pthread_self(), s);
                 }
                 pool.onThreadExit(&onThreadExit);
         }
 
         void testAddJobs(string s)
         {
                 void threadFun()
                 {
                         writef("doJob thread id: %d, str: %s\n",
 pthread_self(), s);
                         Thread.sleep(10_000_000);
                         writef("doJob thread id: %d, wakeup now\n",
 pthread_self());
                 }
                 pool.append(&threadFun);
                 pool.append(&threadFun);
                 pool.append(&threadFun);
         }
 
         string s = "hello world";
         string s1 = "new thread was ok now";
         string s2 = "thread exited now";
         testThreadInit(s1);
         testThreadExit(s2);
 
         testAddJobs(s);
 
         Thread.sleep(100_000_000);
 }

Sweet! Does the code want a license?
May 30 2009
parent reply zsxxsz <zhengshuxin hexun.com> writes:
 Sweet! Does the code want a license?

The thread-pool is just one little part of my plan migrating acl_project written with C to adl_project written with D. The original acl_project has many server framework. Anyone can use it under the GPL.
May 30 2009
parent "Jarl =?UTF-8?B?QW5kcsOpIg==?= <jarl.andre gmail.com> writes:
On Saturday, 30 May 2009 at 13:36:41 UTC, zsxxsz wrote:
 Sweet! Does the code want a license?

The thread-pool is just one little part of my plan migrating acl_project written with C to adl_project written with D. The original acl_project has many server framework. Anyone can use it under the GPL.

I have copied the source in this article into my own source files and have just started to use the thread pool. Since you did not provide any license it will effectively be licensed under the GPL v2 license as stated for my own source. I'll add a comment stating who made the particular code above the code.
May 25 2012
prev sibling parent "Denis Koroskin" <2korden gmail.com> writes:
On Sat, 30 May 2009 08:14:08 +0400, zsxxsz <zhengshuxin hexun.com> wrote:

 Hi, I written one thread pool in which each thread is semi-resident. The
 thread-pool is different from the Tango's one. Any thread of the  
 thread-pool
 will exit when it is idle for the timeout. That is to say, all threads  
 for
 jobs, and no job no thread. The thread-pool was from my C version. With  
 D, I
 wrote it more easier with the delegate function.

You may consider adding your code to a scrapple project at dsource.org (http://www.dsource.org/projects/scrapple) This way you will keep your code up-to-date and more people will be able to use it.
May 30 2009