www.digitalmars.com         C & C++   DMDScript  

digitalmars.D.announce - the semi-resident thread pool

reply zsxxsz <zhengshuxin hexun.com> writes:
Hi, I written one thread pool in which each thread is semi-resident. The
thread-pool is different from the Tango's one. Any thread of the thread-pool
will exit when it is idle for the timeout. That is to say, all threads for
jobs, and no job no thread. The thread-pool was from my C version. With D, I
wrote it more easier with the delegate function.
Below is the source code:

module adl.thread_pool;

import core.sys.posix.pthread;  // just for pthread_self()
import core.thread;
import core.sync.mutex;
import core.sync.condition;
import std.c.time;

private struct Job
{
        Job *next;
        void function() fn;
        void delegate() dg;
        void *arg;
        int   call;
}

/**
 * semi-daemon thread of thread pool
 */
class CThreadPool
{
public:
        /**
         * Constructs a CThreadPool
         *  param nMaxThread {int} the max number threads in thread pool
         *  param idleTimeout {int} when > 0, the idle thread will
         *  exit after idleTimeout seconds, if == 0, the idle thread
         *  will not exit
         *  param sz {size_t} when > 0, the thread will be created which
         *  stack size is sz.
         */
        this(int nMaxThread, int idleTimeout, size_t sz = 0)
        {
                m_nMaxThread = nMaxThread;
                m_idleTimeout = idleTimeout;
                m_stackSize = sz;
                m_mutex = new Mutex;
                m_cond = new Condition(m_mutex);
        }

        /**
         * Append one task into the thread pool's task queue
         *  param fn {void function()}
         */
        void append(void function() fn)
        {
                Job *job;
                char  buf[256];

                if (fn == null)
                        throw new Exception("fn null");

                job = new Job;
                job.fn = fn;
                job.next = null;
                job.call = Call.FN;

                m_mutex.lock();
                append(job);
                m_mutex.unlock();
        }

        /**
         * Append one task into the thread pool's task queue
         *  param dg {void delegate()}
         */
        void append(void delegate() dg)
        {
                Job *job;
                char  buf[256];

                if (dg == null)
                        throw new Exception("dg null");

                job = new Job;
                job.dg = dg;
                job.next = null;
                job.call = Call.DG;

                m_mutex.lock();
                append(job);
                m_mutex.unlock();
        }

        /**
         * If dg not null, when one new thread is created, dg will be called.
         *  param dg {void delegate()}
         */
        void onThreadInit(void delegate() dg)
        {
                m_onThreadInit = dg;
        }

        /**
         * If dg not null, before one thread exits, db will be called.
         *  param dg {void delegate()}
         */
        void onThreadExit(void delegate() dg)
        {
                m_onThreadExit = dg;
        }

private:
        enum Call { NO, FN, DG }

        Mutex m_mutex;
        Condition m_cond;
        size_t m_stackSize = 0;

        Job* m_jobHead = null, m_jobTail = null;
        int m_nJob = 0;
        bool m_isQuit = false;
        int m_nThread = 0;
        int m_nMaxThread;
        int m_nIdleThread = 0;
        int m_overloadTimeWait = 0;
        int m_idleTimeout;
        time_t m_lastWarn;

        void delegate() m_onThreadInit;
        void delegate() m_onThreadExit;
        void append(Job *job)
        {
                if (m_jobHead == null)
                        m_jobHead = job;
                else
                        m_jobTail.next = job;
                m_jobTail = job;

                m_nJob++;

                if (m_nIdleThread > 0) {
                        m_cond.notify();
                } else if (m_nThread < m_nMaxThread) {
                        Thread thread = new Thread(&doJob);
                        thread.isDaemon = true;
                        thread.start();
                        m_nThread++;
                } else if (m_nJob > 10 * m_nMaxThread) {
                        time_t now = time(null);
                        if (now - m_lastWarn >= 2) {
                                m_lastWarn = now;
                        }
                        if (m_overloadTimeWait > 0) {
                                Thread.sleep(m_overloadTimeWait);
                        }
                }
        }
        void doJob()
        {
                Job *job;
                int status;
                bool timedout;
                long period = m_idleTimeout * 10_000_000;

                if (m_onThreadInit != null)
                        m_onThreadInit();

                m_mutex.lock();
                for (;;) {
                        timedout = false;

                        while (m_jobHead == null && !m_isQuit) {
                                m_nIdleThread++;
                                if (period > 0) {
                                        try {
                                                if (m_cond.wait(period) ==
false) {
                                                        timedout = true;
                                                        break;
                                                }
                                        } catch (SyncException e) {
                                                m_nIdleThread--;
                                                m_nThread--;
                                                m_mutex.unlock();
                                                if (m_onThreadExit != null)
                                                        m_onThreadExit();
                                                throw e;
                                        }
                                } else {
                                        m_cond.wait();
                                }
                                m_nIdleThread--;
                        }  /* end while */
                        job = m_jobHead;

                        if (job != null) {
                                m_jobHead = job.next;
                                m_nJob--;
                                if (m_jobTail == job)
                                        m_jobTail = null;
                                /* the lock shuld be unlocked before enter
working processs */
                                m_mutex.unlock();
                                switch (job.call) {
                                case Call.FN:
                                        job.fn();
                                        break;
                                case Call.DG:
                                        job.dg();
                                        break;
                                default:
                                        break;
                                }

                                /* lock again */
                                m_mutex.lock();
                        }
                        if (m_jobHead == null && m_isQuit) {
                                m_nThread--;
                                if (m_nThread == 0)
                                        m_cond.notifyAll();
                                break;
                        }
                        if (m_jobHead == null && timedout) {
                                m_nThread--;
                                break;
                        }
                }

                m_mutex.unlock();

                writefln("Thread(%d) of ThreadPool exit now", pthread_self());
                if (m_onThreadExit != null)
                        m_onThreadExit();
        }
}

import std.stdio;
unittest
{
        CThreadPool pool = new CThreadPool(10, 10);

        void testThreadInit(string s)
        {
                void onThreadInit()
                {
                        writefln("thread(%d) was created now, s: %s",
pthread_self(), s);
                }
                pool.onThreadInit(&onThreadInit);
        }

        void testThreadExit(string s)
        {
                void onThreadExit()
                {
                        writefln("thread(%d) was to exit now, s: %s",
pthread_self(), s);
                }
                pool.onThreadExit(&onThreadExit);
        }

        void testAddJobs(string s)
        {
                void threadFun()
                {
                        writef("doJob thread id: %d, str: %s\n",
pthread_self(), s);
                        Thread.sleep(10_000_000);
                        writef("doJob thread id: %d, wakeup now\n",
pthread_self());
                }
                pool.append(&threadFun);
                pool.append(&threadFun);
                pool.append(&threadFun);
        }

        string s = "hello world";
        string s1 = "new thread was ok now";
        string s2 = "thread exited now";
        testThreadInit(s1);
        testThreadExit(s2);

        testAddJobs(s);

        Thread.sleep(100_000_000);
}
May 29 2009
next sibling parent reply Robert Fraser <fraserofthenight gmail.com> writes:
zsxxsz wrote:
 Hi, I written one thread pool in which each thread is semi-resident. The
 thread-pool is different from the Tango's one. Any thread of the thread-pool
 will exit when it is idle for the timeout. That is to say, all threads for
 jobs, and no job no thread. The thread-pool was from my C version. With D, I
 wrote it more easier with the delegate function.
 Below is the source code:
 
 module adl.thread_pool;
 
 import core.sys.posix.pthread;  // just for pthread_self()
 import core.thread;
 import core.sync.mutex;
 import core.sync.condition;
 import std.c.time;
 
 private struct Job
 {
         Job *next;
         void function() fn;
         void delegate() dg;
         void *arg;
         int   call;
 }
 
 /**
  * semi-daemon thread of thread pool
  */
 class CThreadPool
 {
 public:
         /**
          * Constructs a CThreadPool
          *  param nMaxThread {int} the max number threads in thread pool
          *  param idleTimeout {int} when > 0, the idle thread will
          *  exit after idleTimeout seconds, if == 0, the idle thread
          *  will not exit
          *  param sz {size_t} when > 0, the thread will be created which
          *  stack size is sz.
          */
         this(int nMaxThread, int idleTimeout, size_t sz = 0)
         {
                 m_nMaxThread = nMaxThread;
                 m_idleTimeout = idleTimeout;
                 m_stackSize = sz;
                 m_mutex = new Mutex;
                 m_cond = new Condition(m_mutex);
         }
 
         /**
          * Append one task into the thread pool's task queue
          *  param fn {void function()}
          */
         void append(void function() fn)
         {
                 Job *job;
                 char  buf[256];
 
                 if (fn == null)
                         throw new Exception("fn null");
 
                 job = new Job;
                 job.fn = fn;
                 job.next = null;
                 job.call = Call.FN;
 
                 m_mutex.lock();
                 append(job);
                 m_mutex.unlock();
         }
 
         /**
          * Append one task into the thread pool's task queue
          *  param dg {void delegate()}
          */
         void append(void delegate() dg)
         {
                 Job *job;
                 char  buf[256];
 
                 if (dg == null)
                         throw new Exception("dg null");
 
                 job = new Job;
                 job.dg = dg;
                 job.next = null;
                 job.call = Call.DG;
 
                 m_mutex.lock();
                 append(job);
                 m_mutex.unlock();
         }
 
         /**
          * If dg not null, when one new thread is created, dg will be called.
          *  param dg {void delegate()}
          */
         void onThreadInit(void delegate() dg)
         {
                 m_onThreadInit = dg;
         }
 
         /**
          * If dg not null, before one thread exits, db will be called.
          *  param dg {void delegate()}
          */
         void onThreadExit(void delegate() dg)
         {
                 m_onThreadExit = dg;
         }
 
 private:
         enum Call { NO, FN, DG }
 
         Mutex m_mutex;
         Condition m_cond;
         size_t m_stackSize = 0;
 
         Job* m_jobHead = null, m_jobTail = null;
         int m_nJob = 0;
         bool m_isQuit = false;
         int m_nThread = 0;
         int m_nMaxThread;
         int m_nIdleThread = 0;
         int m_overloadTimeWait = 0;
         int m_idleTimeout;
         time_t m_lastWarn;
 
         void delegate() m_onThreadInit;
         void delegate() m_onThreadExit;
         void append(Job *job)
         {
                 if (m_jobHead == null)
                         m_jobHead = job;
                 else
                         m_jobTail.next = job;
                 m_jobTail = job;
 
                 m_nJob++;
 
                 if (m_nIdleThread > 0) {
                         m_cond.notify();
                 } else if (m_nThread < m_nMaxThread) {
                         Thread thread = new Thread(&doJob);
                         thread.isDaemon = true;
                         thread.start();
                         m_nThread++;
                 } else if (m_nJob > 10 * m_nMaxThread) {
                         time_t now = time(null);
                         if (now - m_lastWarn >= 2) {
                                 m_lastWarn = now;
                         }
                         if (m_overloadTimeWait > 0) {
                                 Thread.sleep(m_overloadTimeWait);
                         }
                 }
         }
         void doJob()
         {
                 Job *job;
                 int status;
                 bool timedout;
                 long period = m_idleTimeout * 10_000_000;
 
                 if (m_onThreadInit != null)
                         m_onThreadInit();
 
                 m_mutex.lock();
                 for (;;) {
                         timedout = false;
 
                         while (m_jobHead == null && !m_isQuit) {
                                 m_nIdleThread++;
                                 if (period > 0) {
                                         try {
                                                 if (m_cond.wait(period) ==
 false) {
                                                         timedout = true;
                                                         break;
                                                 }
                                         } catch (SyncException e) {
                                                 m_nIdleThread--;
                                                 m_nThread--;
                                                 m_mutex.unlock();
                                                 if (m_onThreadExit != null)
                                                         m_onThreadExit();
                                                 throw e;
                                         }
                                 } else {
                                         m_cond.wait();
                                 }
                                 m_nIdleThread--;
                         }  /* end while */
                         job = m_jobHead;
 
                         if (job != null) {
                                 m_jobHead = job.next;
                                 m_nJob--;
                                 if (m_jobTail == job)
                                         m_jobTail = null;
                                 /* the lock shuld be unlocked before enter
 working processs */
                                 m_mutex.unlock();
                                 switch (job.call) {
                                 case Call.FN:
                                         job.fn();
                                         break;
                                 case Call.DG:
                                         job.dg();
                                         break;
                                 default:
                                         break;
                                 }
 
                                 /* lock again */
                                 m_mutex.lock();
                         }
                         if (m_jobHead == null && m_isQuit) {
                                 m_nThread--;
                                 if (m_nThread == 0)
                                         m_cond.notifyAll();
                                 break;
                         }
                         if (m_jobHead == null && timedout) {
                                 m_nThread--;
                                 break;
                         }
                 }
 
                 m_mutex.unlock();
 
                 writefln("Thread(%d) of ThreadPool exit now", pthread_self());
                 if (m_onThreadExit != null)
                         m_onThreadExit();
         }
 }
 
 import std.stdio;
 unittest
 {
         CThreadPool pool = new CThreadPool(10, 10);
 
         void testThreadInit(string s)
         {
                 void onThreadInit()
                 {
                         writefln("thread(%d) was created now, s: %s",
 pthread_self(), s);
                 }
                 pool.onThreadInit(&onThreadInit);
         }
 
         void testThreadExit(string s)
         {
                 void onThreadExit()
                 {
                         writefln("thread(%d) was to exit now, s: %s",
 pthread_self(), s);
                 }
                 pool.onThreadExit(&onThreadExit);
         }
 
         void testAddJobs(string s)
         {
                 void threadFun()
                 {
                         writef("doJob thread id: %d, str: %s\n",
 pthread_self(), s);
                         Thread.sleep(10_000_000);
                         writef("doJob thread id: %d, wakeup now\n",
 pthread_self());
                 }
                 pool.append(&threadFun);
                 pool.append(&threadFun);
                 pool.append(&threadFun);
         }
 
         string s = "hello world";
         string s1 = "new thread was ok now";
         string s2 = "thread exited now";
         testThreadInit(s1);
         testThreadExit(s2);
 
         testAddJobs(s);
 
         Thread.sleep(100_000_000);
 }

Sweet! Does the code want a license?
May 30 2009
parent zsxxsz <zhengshuxin hexun.com> writes:
 Sweet! Does the code want a license?

with C to adl_project written with D. The original acl_project has many server framework. Anyone can use it under the GPL.
May 30 2009
prev sibling parent "Denis Koroskin" <2korden gmail.com> writes:
On Sat, 30 May 2009 08:14:08 +0400, zsxxsz <zhengshuxin hexun.com> wrote:

 Hi, I written one thread pool in which each thread is semi-resident. The
 thread-pool is different from the Tango's one. Any thread of the  
 thread-pool
 will exit when it is idle for the timeout. That is to say, all threads  
 for
 jobs, and no job no thread. The thread-pool was from my C version. With  
 D, I
 wrote it more easier with the delegate function.

You may consider adding your code to a scrapple project at dsource.org (http://www.dsource.org/projects/scrapple) This way you will keep your code up-to-date and more people will be able to use it.
May 30 2009