
digitalmars.D.learn - Starting and managing threads

reply Bagomot <bagomot gmail.com> writes:
Hello everybody!

My program uses the fswatch library to track changes in a 
directory. It runs on the main thread of the program. I need it 
to do its work in a separate thread, without blocking the main 
one. In addition, I need to be able to terminate the thread at 
the moment I want from the main thread of the program.

I tried to get my head around Thread and Fiber but still didn't 
figure out how to properly start and manage threads. When I use 
Thread, the thread turns into a zombie when the main thread of the 
program ends.

I will not even write my code here, because it is at the level of 
examples from the documentation. Please tell me how to start 
threads correctly, how to manage them and how to end them without 
turning them into zombies.
Dec 27 2021
parent reply Ali Çehreli <acehreli yahoo.com> writes:
On 12/27/21 1:33 AM, Bagomot wrote:

 separate thread, without blocking the main one.
I think you can use std.concurrency there. I have a chapter here:

  http://ddili.org/ders/d.en/concurrency.html

Look for 'struct Exit' to see how the main thread signals workers to stop running.

And some std.concurrency hints appear in my DConf Online 2020 presentation here:

  https://dconf.org/2020/online/#ali1

Ali
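
For readers who want the idea in one place, here is a minimal sketch of signalling a worker to stop with a message, using only std.concurrency. It is not the code from the chapter: the names `Exit`, `worker` and `runAndStopWorker` and the timings are made up for illustration. The worker polls its mailbox with `receiveTimeout` between slices of work and also stops if its owner thread terminates.

```d
import core.thread : Thread;
import core.time : msecs;
import std.concurrency : OwnerTerminated, ownerTid, receiveOnly,
    receiveTimeout, send, spawn, Tid;

// Hypothetical message type: its only job is to tell a worker to stop.
struct Exit {}

// Worker body. It is a plain function (not a delegate), so spawn accepts it.
void worker()
{
    int iterations = 0;
    bool running = true;
    while (running)
    {
        // Do one slice of work here, then check the mailbox briefly.
        ++iterations;
        receiveTimeout(10.msecs,
            (Exit _) { running = false; },             // owner asked us to stop
            (OwnerTerminated _) { running = false; }); // owner thread has ended
    }
    // Report back so the owner knows the worker has finished.
    send(ownerTid, iterations);
}

// Starts the worker, lets it run briefly, stops it and returns its count.
int runAndStopWorker()
{
    Tid tid = spawn(&worker);
    Thread.sleep(50.msecs);
    send(tid, Exit());
    return receiveOnly!int();
}

void main()
{
    import std.stdio : writeln;
    writeln("worker iterations: ", runAndStopWorker());
}
```

Waiting for the final `int` message is what keeps the worker from being left behind: the owner only continues once the worker has confirmed that it is done.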
Dec 27 2021
next sibling parent reply Bagomot <bagomot gmail.com> writes:
I tried to run it with std.concurrency via spawn, but it does not work for me because in my program the thread is started not from main, but from an object. It looks something like this:
```d
import std.concurrency;
import core.thread;

void main() {
	Test.getInstance.run;
}

class Test {
	private {
		__gshared Test instance;
		Watcher[] watchers;
	}

	protected this() {
	}

	public static Test getInstance() {
		if (!instance) {
			synchronized (Test.classinfo) {
				if (!instance)
					instance = new Test;
			}
		}

		return instance;
	}

	public void run() {
		foreach (Watcher watcher; this.watchers) {
			spawn(&watcher.run);
		}
	}
}

class Watcher {
	public void run() {
		while (true) {
			// job
		}
	}
}
```
Error: template `std.concurrency.spawn` cannot deduce function from argument types `!()(void delegate())`.

I would not want to do this from main because it breaks the structure of my program. Is there a way to do it the way I want?
Dec 28 2021
parent reply Tejas <notrealemail gmail.com> writes:
Yes, you'll have to make the function that you want to run static, and all the arguments must be `shared` qualified, i.e., no thread-local (TLS) data is allowed.
```d
import std.concurrency;
import core.thread;
import std.stdio : writeln;

void main() {
	Test.getInstance.run;
}

class Test {
	private {
		__gshared Test instance;
		// Notice the shared. I used 2 values just to show some output.
		shared Watcher[] watchers = [new Watcher(), new Watcher()];
	}

	protected this() {
	}

	public static Test getInstance() {
		if (!instance) {
			synchronized (Test.classinfo) {
				if (!instance)
					instance = new Test;
			}
		}

		return instance;
	}

	public void run() {
		// Use ref to ensure no copies are made. I don't know the right thing
		// to do here; the errors went away when I used ref, so...
		foreach (ref watcher; this.watchers) {
			spawn(&Watcher.run, watcher);
		}
	}
}

class Watcher {
	// Now static. The watcher is sent as an argument since the function
	// can't have an invisible `this` parameter anymore.
	static public void run(shared Watcher watcher) {
		while (true) {
			// job
			writeln("It works now :D");
			break; // wrote this so that you can copy-paste to run.dlang.io
		}
	}
}
```
Dec 28 2021
parent reply Bagomot <bagomot gmail.com> writes:
I can't do it according to your example, my Watcher list fills up at runtime.
Dec 28 2021
parent reply Tejas <notrealemail gmail.com> writes:
On Tuesday, 28 December 2021 at 16:29:05 UTC, Bagomot wrote:

 I can't do it according to your example, my Watcher list fills 
 up at runtime.
Yes, it's possible to do it at runtime as well (it already _was_ happening at runtime), although I'll be using a `cast` for convenience now.
```d
import std.concurrency;
import core.thread;
import std.stdio : writeln, readf;

void main() {
	writeln("Please enter num of elements");
	int a;
	readf!"%d"(a);
	foreach (number; 0 .. a) {
		// You will have to use operations from core.atomic if you want to
		// read/write shared variables; that's why I didn't declare the
		// array as shared.
		Test.getInstance.watchers ~= new Watcher();
	}
	Test.getInstance.run;
}

class Test {
	private {
		__gshared Test instance;
		/+shared+/ Watcher[] watchers;
	}

	protected this() {
	}

	public static Test getInstance() {
		if (!instance) {
			synchronized (Test.classinfo) {
				if (!instance)
					instance = new Test;
			}
		}

		return instance;
	}

	public void run() {
		// Using a cast so that the thread-local (TLS) restriction is lifted.
		foreach (ref watcher; cast(shared) this.watchers) {
			spawn(&Watcher.run, watcher);
		}
	}
}

class Watcher {
	static public void run(shared Watcher watcher) {
		while (true) {
			// job
			writeln("It works now :D");
			break;
		}
	}
}
```
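
As a possible alternative to casting to `shared`, here is a sketch that passes only immutable data (a directory name) to `spawn` and creates the watcher object inside the new thread, so the object stays thread-local. The `Watcher` below is a hypothetical stand-in with a made-up `describe` method, and `watcherThread` and `startWatchers` are names invented for illustration. Whether this fits depends on the watcher not being needed by the spawning thread.

```d
import std.algorithm.sorting : sort;
import std.concurrency : ownerTid, receiveOnly, send, spawn;

// Hypothetical stand-in for the real watcher class.
class Watcher
{
    private string dir;

    this(string dir) { this.dir = dir; }

    string describe() const { return "watching " ~ dir; }
}

// A free function, so spawn accepts it. A string is immutable(char)[],
// which has no mutable aliasing, so it can be passed without shared.
void watcherThread(string dir)
{
    auto watcher = new Watcher(dir); // created in and owned by this thread
    send(ownerTid, watcher.describe());
}

// Spawns one thread per directory (the list can be built at runtime)
// and collects one report from each of them.
string[] startWatchers(string[] dirs)
{
    foreach (dir; dirs)
        spawn(&watcherThread, dir);

    string[] reports;
    foreach (_; dirs)
        reports ~= receiveOnly!string();

    sort(reports); // arrival order is not deterministic
    return reports;
}

void main()
{
    import std.stdio : writeln;
    writeln(startWatchers(["temp", "logs"]));
}
```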
Dec 28 2021
parent reply Bagomot <bagomot gmail.com> writes:
Thanks! It works.
If more difficulties come up, I will write here.
Dec 28 2021
parent reply Bagomot <bagomot gmail.com> writes:
Good day! I keep running into problems.
Tejas helped me a lot above, but it still doesn't work.
I gave up on the fswatch library, thinking that the problem 
was in it.
Now I am trying to do it with libasync.

Here is the code that runs on the main thread; it blocks further 
actions on that thread.
```d

/+ dub.sdl:
	dependency "libasync" version="~>0.8.6"
+/

import std.stdio;
import std.file;
import std.algorithm;
import core.thread;
import std.concurrency;

import libasync;
import libasync.watcher;
import libasync.threads;

void main() {

	string testDir = "temp";
	if (!testDir.exists)
		mkdir(testDir);

	Guard guard = Guard.getInstance;
	guard.addWatchedDir(testDir, false);
	guard.run;

	writeln("Some kind of action...");
}

class Guard {
	private {
		__gshared Guard instance;
		static EventLoop eventLoop;
		WatchedDir[] watchedDirs;
	}

	protected this() {
		this.eventLoop = getThreadEventLoop();
	}

	shared static ~this() {
		destroyAsyncThreads();
	}

	public static Guard getInstance() {
		if (!instance) {
			synchronized (Guard.classinfo) {
				if (!instance)
					instance = new Guard;
			}
		}

		return instance;
	}

	public void run() {
		while (eventLoop.loop()) {
			continue;
		}
	}

	public void addWatchedDir(string dir, bool recursive = true,
			string[] exclude = []) {
		if (this.watchedDirs.canFind!(a => a.dir == dir))
			return;

		this.watchedDirs ~= new WatchedDir(dir, recursive, exclude);
	}

	class WatchedDir {
		private {
			string dir;
			bool recursive;
			string[] exclude;
			AsyncDirectoryWatcher watcher;
			DWChangeInfo[8] changeBuf;
		}

		this(string dir, bool recursive, string[] exclude) {
			this.dir = dir;
			this.recursive = recursive;
			this.exclude = exclude;
			this.watcher = new AsyncDirectoryWatcher(eventLoop);

			this.watcher.run({
				DWChangeInfo[] changes = changeBuf[];
				uint cnt;

				do {
					cnt = this.watcher.readChanges(changes);
					foreach (i; 0 .. cnt) {
						writeln("Main Callback got directory event: ", changes[i]);
					}
				}
				while (cnt > 0);
			});
			this.watcher.watchDir(this.dir, DWFileEvent.ALL, this.recursive);
		}
	}
}
```
If I change the run method of the Guard class so that it starts a 
new thread, the program just does nothing:
```d
public void run() {
	spawn((shared EventLoop eventLoop) {
		while ((cast() eventLoop).loop()) {
			continue;
		}
	}, cast(shared) this.eventLoop);
}
```
Why? What am I doing wrong?
Jan 12 2022
next sibling parent Bagomot <bagomot gmail.com> writes:
This is still relevant!
Jan 15 2022
prev sibling next sibling parent reply frame <frame86 live.com> writes:
On Wednesday, 12 January 2022 at 08:50:09 UTC, Bagomot wrote:

 Why? What am I doing wrong?
I guess your main() exits and just ends all threads?
Jan 15 2022
parent Bagomot <bagomot gmail.com> writes:
On Saturday, 15 January 2022 at 19:07:20 UTC, frame wrote:
 On Wednesday, 12 January 2022 at 08:50:09 UTC, Bagomot wrote:

 Why? What am I doing wrong?
I guess your main() exits and just ends all threads?
No, the program continues to run. I also tested it with a `while` loop in main.
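
On the question of what happens to threads when main() ends: as far as I know, the D runtime waits for non-daemon threads after main returns, while threads marked with `isDaemon = true` are simply ended with the process. The sketch below uses `core.thread.Thread` directly with an atomic stop flag and `join`, so the thread is always shut down on purpose. The `Worker` class and `runWorkerBriefly` are hypothetical names, not code from this thread. Unlike `spawn`, the `Thread` constructor accepts a delegate, so a non-static member function can be used as the thread body.

```d
import core.atomic : atomicLoad, atomicStore;
import core.thread : Thread;
import core.time : msecs;

// Hypothetical worker that owns its thread and a stop flag.
class Worker
{
    private shared bool stopRequested;
    private Thread thread;
    private int ticks; // written only by the worker thread, read after join()

    void start()
    {
        thread = new Thread(&loop); // a delegate is fine here
        thread.start();
    }

    // Asks the loop to finish and waits until the thread has ended.
    void stop()
    {
        atomicStore(stopRequested, true);
        thread.join();
    }

    private void loop()
    {
        while (!atomicLoad(stopRequested))
        {
            ++ticks;                // the real job would go here
            Thread.sleep(5.msecs);
        }
    }
}

// Runs a worker for a short time, stops it and returns its tick count.
int runWorkerBriefly()
{
    auto w = new Worker;
    w.start();
    Thread.sleep(30.msecs);
    w.stop();
    return w.ticks;
}

void main()
{
    import std.stdio : writeln;
    writeln("ticks before stop: ", runWorkerBriefly());
}
```

Because `stop` joins the thread, the main thread can call it at whatever moment it chooses and be sure that the worker is gone when the call returns.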
Jan 15 2022
prev sibling parent reply Ali Çehreli <acehreli yahoo.com> writes:
On 1/12/22 00:50, Bagomot wrote:

 If I change the run method of the Guard class so that it starts a new
 thread, the program just does nothing:
 ```d
 public void run() {
      spawn((shared EventLoop eventLoop) {
          while ((cast() eventLoop).loop()) {
              continue;
 ```
The program does nothing probably because of that continue. (?)
 ```d
          }
      }, cast(shared) this.eventLoop);
 }
 ```
So, the event loop is in a separate thread. What should happen when the events trigger? Do you want this thread to handle the events, or should this thread send a message to the main thread (or perhaps to a separate thread that responds to these events)?

Ali
Jan 15 2022
parent Bagomot <bagomot gmail.com> writes:
 The program does nothing probably because of that continue. (?)
No, it does work inside the loop.
 So, the event loop is in a separate thread. What should happen 
 when the events trigger? Do you want this thread to handle the 
 events or should this thread send a message to the main 
 thread (or perhaps a separate thread that responds to these 
 events).
I want the events to be handled on the same thread (not on the main).
Jan 16 2022
prev sibling parent forkit <forkit gmail.com> writes:
On Monday, 27 December 2021 at 10:59:07 UTC, Ali Çehreli wrote:
 ...my DConf Online 2020 presentation here:
   https://dconf.org/2020/online/#ali1

 Ali
Hey, that is a really great presentation! Many more people should watch it, and learn ;-)
Jan 16 2022