digitalmars.D.learn - How to create Multi Producer-Single Consumer concurrency

adnan338 <relay.dev.adnan protonmail.com> writes:
I have a list of files to download from the internet. Each of 
those downloads is a time-consuming task.

What I want to do is download them concurrently and, when each of 
those downloads finishes, activate something in my main 
thread, which will update a progress bar in the GUI thread.

So there are multiple "download finished" message producers, and 
one consumer of those messages. Furthermore, that producer has a 
callback that triggers a UI object.

For example,

auto list = [...]; // list of URLs
auto downloadableItems = list.length;
__gshared int downloaded = 0;
auto progressBar = // a gtk progressbar;

How should I proceed? I can do a parallel foreach, call 
download() on each URL and update `downloaded`, but I 
don't know how to listen for updates to `downloaded` so that 
I can make changes in `progressBar`.

Note that my UI library is not thread-safe and I cannot access UI 
objects from different threads.
Jun 12 2020
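For reference, the "parallel foreach" idea from the question could look roughly like the sketch below. It is only an illustration under stated assumptions: `download` is an empty placeholder and `downloadAll` is a hypothetical helper name, neither comes from the original post. The counter is updated with `core.atomic` instead of a plain `__gshared` increment so that concurrent updates are not lost.

```d
import core.atomic : atomicLoad, atomicOp;
import std.parallelism : parallel;
import std.stdio : writefln;

// Placeholder for the real, slow download (hypothetical).
void download(string url) {}

// Runs download() for every URL on the default task pool and
// returns how many downloads finished.
int downloadAll(string[] list) {
    shared int downloaded = 0;
    foreach (url; parallel(list)) {
        download(url);
        // Several workers may get here at once, so increment atomically.
        atomicOp!"+="(downloaded, 1);
    }
    return atomicLoad(downloaded);
}

void main() {
    auto list = ["dlang.org", "ddili.org"];
    // This call returns only after every download has finished.
    const done = downloadAll(list);
    writefln!"%s of %s downloaded"(done, list.length);
}
```

The parallel foreach blocks the calling thread until all iterations are done, so a GUI thread that runs it cannot repaint the progress bar in the meantime. That is the gap the question is asking about.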
Ali Çehreli <acehreli yahoo.com> writes:
On 6/12/20 3:02 PM, adnan338 wrote:

> So there are multiple "download finished" message producers, and one
> consumer of those messages. Furthermore, that producer has a callback
> that triggers an UI object.

That's almost exactly what I do in some of my programs. I use std.concurrency and the following is a working sketch of what I do.

I assumed you get finer individual granularity of progress as opposed to the binary 0% -> 100%.

import std.stdio;
import std.concurrency;
import std.algorithm;
import std.range;
import std.exception;
import std.format;
import core.thread;

struct Progress {
  Tid tid;          // The id of the reporting thread
  size_t amount;    // The amount of progress so far
  size_t total;     // Total progress (can be file size)
}

void display(Progress[Tid] progresses) {
  const amount = progresses.byValue.map!(p => p.amount).sum;
  const total = progresses.byValue.map!(p => p.total).sum;
  writefln!"%6.2f%%"(100.0 * amount / total);
}

// The worker thread function
void download(string url) {
  writefln!"Worker %s downloading %s."(thisTid, url);
  enum total = 20;
  foreach (i; 0 .. total) {
    // Imitate some progress
    Thread.sleep(100.msecs);

    // Report progress to owner
    ownerTid.send(Progress(thisTid, i + 1, total));
  }
}

void main() {
  auto list = [ "dlang.org", "ddili.org" ];
  auto downloaders = list.length
                     .iota
                     .map!(i => spawnLinked(&download, list[i]))
                     .array;

  Progress[Tid] progresses;
  size_t finished = 0;

  while (finished != list.length) {
    receive(
      (LinkTerminated arg) {
        ++finished;

        // Check whether this thread is exiting prematurely
        enforce((arg.tid in progresses) &&
                (progresses[arg.tid].amount ==
                 progresses[arg.tid].total),
                format!"Thread %s exited unexpectedly"(arg.tid));
      },

      (Progress progress) {
        progresses[progress.tid] = progress;
        progresses.display();
      }
    );
  }

  writeln("Processing the downloaded files.");
}

Ali
Jun 16 2020
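The sketch above blocks in `receive`, which a GUI main thread usually cannot afford. One possible adaptation, shown here only as a sketch, is to poll the mailbox with `receiveTimeout` and a zero duration from whatever timer or idle hook the GUI toolkit offers. The `Done` message type and the `drainFinished` helper are hypothetical names introduced for illustration, and the sleeping loop in `main` merely stands in for the toolkit's event loop; no real GTK calls are shown.

```d
import core.thread : Thread;
import core.time : Duration, msecs;
import std.concurrency;
import std.stdio : writefln;

// Message sent by a worker when its download is complete (hypothetical type).
struct Done { string url; }

// Worker thread function; the sleep stands in for the real download.
void download(string url) {
    Thread.sleep(50.msecs);
    ownerTid.send(Done(url));
}

// Takes every pending Done message without blocking and returns the count.
// Meant to be called on the thread that spawned the workers.
size_t drainFinished() {
    size_t n = 0;
    // With a zero duration, receiveTimeout returns false as soon as
    // the mailbox holds no matching message.
    while (receiveTimeout(Duration.zero, (Done d) { ++n; })) {}
    return n;
}

void main() {
    auto list = ["dlang.org", "ddili.org"];
    foreach (url; list)
        spawn(&download, url);

    size_t downloaded = 0;
    while (downloaded != list.length) {
        Thread.sleep(10.msecs);  // stand-in for one GUI loop iteration
        const n = drainFinished();
        if (n > 0) {
            downloaded += n;
            // A real program would update the progress bar here.
            writefln!"%s of %s downloaded"(downloaded, list.length);
        }
    }
}
```

Because only the owner thread reads its mailbox and touches `downloaded`, the UI object is never accessed from a worker thread, which matches the constraint stated in the question.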
Bagomot <bagomot gmail.com> writes:
On Tuesday, 16 June 2020 at 09:10:09 UTC, Ali Çehreli wrote:
> That's almost exactly what I do in some of my programs. I use std.concurrency and the following is a working sketch of what I do. [...]

How to do the same with `taskPool` instead of `spawnLinked`?
Jul 13 2022
Ali Çehreli <acehreli yahoo.com> writes:
On 7/13/22 02:25, Bagomot wrote:

> How to do the same with `taskPool` instead of `spawnLinked`?

You are hitting the nail on the head. :)

std.parallelism, which taskPool is a part of, is for cases where operations are independent. However, producer and consumer are by definition dependent, so it's a problem for std.concurrency, which involves message boxes.

You can do the same with std.parallelism or core.thread but you would be implementing some of what std.concurrency already provides.

The following are my understandings of these topics:

  http://ddili.org/ders/d.en/parallelism.html

  http://ddili.org/ders/d.en/concurrency.html

  http://ddili.org/ders/d.en/concurrency_shared.html

The introduction section of the Concurrency chapter lists some differences.

Ali
Jul 13 2022
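To make the distinction concrete, here is a small sketch of the kind of job std.parallelism handles well: each item is processed independently and nothing is reported while the work is in flight. `fakeDownload` is a hypothetical stand-in that just returns the length of the URL string; it is an assumption for illustration, not part of the thread.

```d
import std.parallelism : taskPool;
import std.stdio : writeln;

// Hypothetical stand-in for a download; returns a fake "size".
size_t fakeDownload(string url) {
    return url.length;
}

void main() {
    auto list = ["dlang.org", "ddili.org"];
    // amap is eager: it returns only when every result is ready,
    // so there is no natural place to report intermediate progress.
    auto sizes = taskPool.amap!fakeDownload(list);
    writeln(sizes);
}
```

Reporting progress from inside such tasks would require adding a channel back to the consumer by hand, which is what std.concurrency's `send` and `receive` already provide.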
Bagomot <bagomot gmail.com> writes:
On Wednesday, 13 July 2022 at 19:06:48 UTC, Ali Çehreli wrote:
> You are hitting the nail on the head. :) [...]

Thank you! I now understand the difference between `std.parallelism` and `std.concurrency`. My question is no longer relevant :)
Jul 13 2022