www.digitalmars.com         C & C++   DMDScript  

digitalmars.D.learn - Implementing a Monitor

reply "Minas Mina" <minas_mina1990 hotmail.co.uk> writes:
I'm using condition variables to implement a monitor that 
simulates the producer-consumer(unbounded buffer) scenario.

Note:
monitor is __gshared and is initialized in main(). Then the other 
threads are created and run.

There is one producer thread, and 5 consumer threads.

void producer()
{
	while( 1 )
	{
		monitor.produce();
	}
}

void consumer()
{
	while( 1 )
	{
		monitor.consume();
	}
}

class Monitor
{
	Cond cond;
	
	char[] buffer;
	ulong sz;
	
	this()
	{
		// necessary: The condition variable constructor requires a 
mutex passed to it
		cond = new Cond(new Mutex());
	}
	
	void produce()
	{
		// produce a letter a-z
		char c = 'a' + rand() % 26;
		writeln("* Producer: Produced a '", c, "' buffer.length = ", 
buffer.length, "\n");
		
		// put it into the buffer
		++buffer.length;
		buffer[buffer.length - 1] = c;
		
		//if( buffer.length > 1 )
		notify(cond); // calls condition.notify()
		
		//Thread.sleep(dur!"msecs"(1000));
	}
	
	void consume()
	{
		if( buffer.length == 0 )
		{
			writeln("put to sleep");
			cwait(cond); // calls Condition.wait()
		}
		
		// take
		char c = buffer[buffer.length-1];
		--buffer.length;
		
		writeln("# Consumer has taken a '", c, "' buffer = [", buffer, 
"] (", buffer.length, " elements)\n");
	}
}





The output is something like:
put to sleep
put to sleep
put to sleep
put to sleep
* Producer: Produced a 'n' buffer.length = 0

put to sleep
* Producer: Produced a 'w' buffer.length = 1

* Producer: Produced a 'l' buffer.length = 2

* Producer: Produced a 'r' buffer.length = 3

* Producer: Produced a 'b' buffer.length = 4

* Producer: Produced a 'b' buffer.length = 5

* Producer: Produced a 'm' buffer.length = 6

* Producer: Produced a 'q' buffer.length = 7

...

Even though the producer calls notify() when he finishes, none of 
the consumer threads is ever resumed... (I ran the program for a 
while and redirected output to a file. Then I searched for "#" 
that the consumer prints but nothing).

Why is that happening? Is there something wrong with 
Condition.notify()?

Also, is there a function that I can call to immediatly suspend 
the running thread?

Thanks
Jul 27 2012
next sibling parent Sean Kelly <sean invisibleduck.org> writes:
You need to lock the mutex to synchronize access to the shared data.

On Jul 27, 2012, at 2:31 PM, Minas Mina <minas_mina1990 hotmail.co.uk> =
wrote:
=20
 class Monitor
 {
 	Cond cond;

 =09
 	char[] buffer;
 	ulong sz;
 =09
 	this()
 	{
 		// necessary: The condition variable constructor =

mutex =3D new Mutex; cond =3D new Cond(mutex);
 	}
 =09
 	void produce()
 	{
 		// produce a letter a-z
 		char c =3D 'a' + rand() % 26;
 		writeln("* Producer: Produced a '", c, "' buffer.length =

synchronized(mutex) {
 	=09
 		// put it into the buffer
 		++buffer.length;
 		buffer[buffer.length - 1] =3D c;
 	=09
 		//if( buffer.length > 1 )
 		notify(cond); // calls condition.notify()

}
 	=09
 		//Thread.sleep(dur!"msecs"(1000));
 	}
 =09
 	void consume()
 	{

synchronized(mutex) { // note changed if to while
 		while( buffer.length =3D=3D 0 )
 		{
 			writeln("put to sleep");
 			cwait(cond); // calls Condition.wait()
 		}
 	=09
 		// take
 		char c =3D buffer[buffer.length-1];
 		--buffer.length;

}
 Also, is there a function that I can call to immediatly suspend the =

Thread.yield(). Though Cond.wait() will suspend the thread as well.=
Jul 30 2012
prev sibling parent "Minas" <minas_mina1990 hotmail.co.uk> writes:
Thank you for your reply.

I have some more questions:

I simplified my example. Now my monitor is used for mutual 
exclusion, to understand how monitors work first.

// 3 threads are running P() - the main thread is not one of them

void P()
{
	while( run )
	{
		monitor.EnterCritical2();
		
		// critical section
		writeln(count);
		
		monitor.ExitCritical2();
	}
}

class Monitor
{
	Mutex mutex;
	Cond cond;
	bool in_use = false;
	
	this()
	{
		mutex = new Mutex();
		cond = new Cond(mutex);
	}
	
	void EnterCritical()
	{
		// The synchronized statement is required to ensure only one 
thread will be able
		// to access the block
		synchronized(mutex)
		{
			while( in_use )
				cwait(cond);	
			in_use = true;
			
			++count;
		}
	}
	
	void ExitCritical()
	{
		synchronized(mutex)
		{
			--count;
			
			in_use = false;
			cnotify(cond);
		}
	}
}

1) In class (in college), I have learned that monitors "by 
nature" grant access only to one thread in their functions. 
That's what synchronized(mutex) is used for right?

2) Why synchronized(mutex) and not synchronized(this) (I know 
this is not good practise, but is there something more?) or not 
synchronized(some else object)?

3) You correctly changed my "EnterCritical()" code to "While" 
instead of "if", because, and please correct me if I'm wrong, we 
are using notify() and not signal().

4) I wrote my own version of signal. It's:
void csignal(Condition cond)
{
	cond.notify();
	Thread.yield();
}

The logic is that it notifies another Thread and then forces a 
context switch, so it's like calling signal(). Is it correct?

I changed my EnterCritical() to use "if" instead of "while" 
because I use my own version of signal() in ExitCritical().

void EnterCritical2()
	{
		synchronized(mutex)
		{
			if( in_use )
				cwait(cond);
			in_use = true;
			
			++count;
		}
	}
	
	void ExitCritical2()
	{
		synchronized(mutex)
		{
			--count;
			
			in_use = false;
			csignal(cond);
		}
	}

However, when "count" is printed in P(), its value is 3 (it was 1 
before which it ensured that mutual exclusion was correct). If I 
change the "if" to "while" it works, but that's not the purpose 
of signal(). I'm pretty sure that when using "signal" it's "if", 
not "while". What am I doing wrong?


5) Are the threads running concurrently on one core or in 
parallel (provided that the PC has more than one cores). Mine has 
2 cores x 2 threads each.

Thank you.

PS: I'm writing some small programs that demonstrate concurrency 
in D. The reason is I suggested it to my university professor as 
a way to learn about this stuff in a practical manner. So I guess 
if he likes them, D will be tought in a university :)
Jul 31 2012