www.digitalmars.com         C & C++   DMDScript  

digitalmars.D.learn - parallel threads stalls until all thread batches are finished.

reply Joe bloow.edu <Joe bloow.edu> writes:
I use

foreach(s; taskPool.parallel(files, numParallel))
{ L(s); } // L(s) represents the work to be done.

to download files from the internet.

Everything works. The issue is this:

the foreach will download 8 files at once. BUT it will not start 
the next batch of 8 *until* ALL of the previous 8 are done. It 
seems that taskPool.parallel will not immediately start a new 
thread once a task is done

E.g., I get

L(s1);
L(s2);
...
L(s8);
--- // nothing below is executed until all L(s1) through L(s8) 
are finished.
L(s9);
L(s10);
...

My expectation is that, say, when the first task is complete, say 
L(s4), that L(s9) is then executed.

The reason why this causes me problems is that the downloaded 
files, which are cashed to a temporary file, stick around and do 
not free up space(think of it just as using memory) and this can 
cause some problems some of the time. Also, the point of parallel 
tasks is to allow paralleling but the way the code is working is 
that it starts the tasks in parallel but then essentially stalls 
the paralleling a large portion of the time. E.g.,

If there are a bunch of small downloads but one large one, then 
that one large download stalls the everything. E.g., say L(s5) is 
a very long download while all others are very quick. Then L(s5) 
will prevent downloading anything afterwards until it is 
finished(I'll get L(s1) through L(s8) but nothing else until 
L(s5) is finished).

What's going on and how to reconcile?
Aug 23 2023
next sibling parent reply Sergey <kornburn yandex.ru> writes:
On Wednesday, 23 August 2023 at 13:03:36 UTC, Joe wrote:
 I use

 foreach(s; taskPool.parallel(files, numParallel))
 { L(s); } // L(s) represents the work to be done.
If you make for example that L function return “ok” in case file successfully downloaded, you can try to use TaskPool.amap. The other option - use std.concurrency probably.
Aug 23 2023
next sibling parent Joe bloow.edu <Joe bloow.edu> writes:
On Wednesday, 23 August 2023 at 14:43:33 UTC, Sergey wrote:
 On Wednesday, 23 August 2023 at 13:03:36 UTC, Joe wrote:
 I use

 foreach(s; taskPool.parallel(files, numParallel))
 { L(s); } // L(s) represents the work to be done.
If you make for example that L function return “ok” in case file successfully downloaded, you can try to use TaskPool.amap. The other option - use std.concurrency probably.
Any idea why it is behaving the way it is?
Aug 25 2023
prev sibling parent reply Joe bloow.edu <Joe bloow.edu> writes:
On Wednesday, 23 August 2023 at 14:43:33 UTC, Sergey wrote:
 On Wednesday, 23 August 2023 at 13:03:36 UTC, Joe wrote:
 I use

 foreach(s; taskPool.parallel(files, numParallel))
 { L(s); } // L(s) represents the work to be done.
If you make for example that L function return “ok” in case file successfully downloaded, you can try to use TaskPool.amap. The other option - use std.concurrency probably.
I think I might know what is going on but not sure: The tasks are split up in batches and each batch gets a thread. What happens then is some long task will block it's entire batch and there will be no re-balancing of the batches. "A work unit is a set of consecutive elements of range to be processed by a worker thread between communication with any other thread. The number of elements processed per work unit is controlled by the workUnitSize parameter. " So the question is how to rebalance these work units? E.g., when a worker thread is done with it's batch it should look to help finish that batch rather than terminating and leaving all the work for the last thread. This seems like a flaw in the design. E.g., if one happens to have n batches and every batch but one has tasks that finish instantly then essentially one has no parallelization.
Aug 25 2023
parent reply =?UTF-8?Q?Ali_=C3=87ehreli?= <acehreli yahoo.com> writes:
On 8/25/23 14:27, Joe bloow.edu wrote:

 "A work unit is a set of consecutive elements of range to be processed
 by a worker thread between communication with any other thread. The
 number of elements processed per work unit is controlled by the
 workUnitSize parameter. "

 So the question is how to rebalance these work units?
Ok, your question brings me back from summer hibernation. :) This is what I do: - Sort the tasks in decreasing time order; the ones that will take the most time should go first. - Use a work unit size of 1. The longest running task will start first. You can't get better than that. When I print some progress reporting, I see that most of the time N-1 tasks have finished and we are waiting for that one longest running task. Ali "back to sleep"
Aug 25 2023
parent reply Joe bloow.edu <Joe bloow.edu> writes:
On Friday, 25 August 2023 at 21:31:37 UTC, Ali Çehreli wrote:
 On 8/25/23 14:27, Joe bloow.edu wrote:

 "A work unit is a set of consecutive elements of range to be
processed
 by a worker thread between communication with any other
thread. The
 number of elements processed per work unit is controlled by
the
 workUnitSize parameter. "

 So the question is how to rebalance these work units?
Ok, your question brings me back from summer hibernation. :) This is what I do: - Sort the tasks in decreasing time order; the ones that will take the most time should go first. - Use a work unit size of 1. The longest running task will start first. You can't get better than that. When I print some progress reporting, I see that most of the time N-1 tasks have finished and we are waiting for that one longest running task. Ali "back to sleep"
I do not know the amount of time they will run. They are files that are being downloaded and I neither know the file size nor the download rate(in fact, the actual download happens externally). While I could use work unit of size 1 then problem then is I would be downloading N files at once and that will cause other problems if N is large(and sometimes it is). There should be a "work unit size" and a "max simultaneous workers". Then I could set the work unit size to 1 and say the max simultaneous workers to 8 to get 8 simultaneous downloads without stalling.
Aug 25 2023
parent reply =?UTF-8?Q?Christian_K=c3=b6stlin?= <christian.koestlin gmail.com> writes:
On 26.08.23 05:39, Joe bloow.edu wrote:
 On Friday, 25 August 2023 at 21:31:37 UTC, Ali Çehreli wrote:
 On 8/25/23 14:27, Joe bloow.edu wrote:

 "A work unit is a set of consecutive elements of range to be
processed
 by a worker thread between communication with any other
thread. The
 number of elements processed per work unit is controlled by
the
 workUnitSize parameter. "

 So the question is how to rebalance these work units?
Ok, your question brings me back from summer hibernation. :) This is what I do: - Sort the tasks in decreasing time order; the ones that will take the most time should go first. - Use a work unit size of 1. The longest running task will start first. You can't get better than that. When I print some progress reporting, I see that most of the time N-1 tasks have finished and we are waiting for that one longest running task. Ali "back to sleep"
I do not know the amount of time they will run. They are files that are being downloaded and I neither know the file size nor the download rate(in fact, the actual download happens externally). While I could use work unit of size 1 then problem then is I would be downloading N files at once and that will cause other problems if N is large(and sometimes it is). There should be a "work unit size" and a "max simultaneous workers". Then I could set the work unit size to 1 and say the max simultaneous workers to 8 to get 8 simultaneous downloads without stalling.
I think thats what is implemented atm ... `taskPool` creates a `TaskPool` of size `defaultPoolThreads` (defaulting to totalCPUs - 1). The work unit size is only there to optimize for small workloads where task / thread switching would be a big performance problem (I guess). So in your case a work unit size of 1 should be good. Did you try this already? Kind regards, Christian
Aug 28 2023
parent reply Joe bloow.edu <Joe bloow.edu> writes:
On Monday, 28 August 2023 at 10:33:15 UTC, Christian Köstlin 
wrote:
 On 26.08.23 05:39, Joe bloow.edu wrote:
 On Friday, 25 August 2023 at 21:31:37 UTC, Ali Çehreli wrote:
 On 8/25/23 14:27, Joe bloow.edu wrote:

 "A work unit is a set of consecutive elements of range to be
processed
 by a worker thread between communication with any other
thread. The
 number of elements processed per work unit is controlled by
the
 workUnitSize parameter. "

 So the question is how to rebalance these work units?
Ok, your question brings me back from summer hibernation. :) This is what I do: - Sort the tasks in decreasing time order; the ones that will take the most time should go first. - Use a work unit size of 1. The longest running task will start first. You can't get better than that. When I print some progress reporting, I see that most of the time N-1 tasks have finished and we are waiting for that one longest running task. Ali "back to sleep"
I do not know the amount of time they will run. They are files that are being downloaded and I neither know the file size nor the download rate(in fact, the actual download happens externally). While I could use work unit of size 1 then problem then is I would be downloading N files at once and that will cause other problems if N is large(and sometimes it is). There should be a "work unit size" and a "max simultaneous workers". Then I could set the work unit size to 1 and say the max simultaneous workers to 8 to get 8 simultaneous downloads without stalling.
I think thats what is implemented atm ... `taskPool` creates a `TaskPool` of size `defaultPoolThreads` (defaulting to totalCPUs - 1). The work unit size is only there to optimize for small workloads where task / thread switching would be a big performance problem (I guess). So in your case a work unit size of 1 should be good. Did you try this already? Kind regards, Christian
Well, I have 32 cores so that would spawn 64-1 threads with hyper threading so not really a solution as it is too many simultaneous downs IMO. "These properties get and set the number of worker threads in the TaskPool instance returned by taskPool. The default value is totalCPUs - 1. Calling the setter after the first call to taskPool does not changes number of worker threads in the instance returned by taskPool. " I guess I could try to see if I can change this but I don't know what the "first call" is(and I'm using parallel to create it). Seems that the code should simply be made more robust. Probably a just a few lines of code to change/add at most. Maybe the constructor and parallel should take an argument to set the "totalCPUs" which defaults to getting the total number rather than it being hard coded. I currently don't need or have 32+ downlaods to test ATM so... this() trusted { this(totalCPUs - 1); } /** Allows for custom number of worker threads. */ this(size_t nWorkers) trusted { Basically everything is hard coded to use totalCPU's and that is the ultimate problem. Not all tasks should use all CPU's. What happens when we get 128 cores? or even 32k at some point? It shouldn't be a hard coded value, it's really that simple and where the problem originates because someone didn't think ahead.
Aug 28 2023
next sibling parent reply =?UTF-8?Q?Ali_=C3=87ehreli?= <acehreli yahoo.com> writes:
On 8/28/23 15:37, Joe bloow.edu wrote:

 Basically everything is hard coded to use totalCPU's
parallel() is a function that dispatches to a default TaskPool object, which uses totalCPUs. It's convenient but as you say, not all problems should use it. In such cases, you would create your own TaskPool object and call .parallel on it: https://youtu.be/dRORNQIB2wA?t=1611s Ali
Aug 28 2023
parent Joe bloow.edu <Joe bloow.edu> writes:
On Monday, 28 August 2023 at 22:43:56 UTC, Ali Çehreli wrote:
 On 8/28/23 15:37, Joe bloow.edu wrote:

 Basically everything is hard coded to use totalCPU's
parallel() is a function that dispatches to a default TaskPool object, which uses totalCPUs. It's convenient but as you say, not all problems should use it. In such cases, you would create your own TaskPool object and call .parallel on it: https://youtu.be/dRORNQIB2wA?t=1611s Ali
Thanks. Seems to work. Didn't realize it was that easy ;)
Aug 29 2023
prev sibling parent =?UTF-8?Q?Christian_K=c3=b6stlin?= <christian.koestlin gmail.com> writes:
On 29.08.23 00:37, Joe bloow.edu wrote:
 Well, I have 32 cores so that would spawn 64-1 threads with hyper 
 threading so not really a solution as it is too many simultaneous downs 
 IMO.
 
 
 "These properties get and set the number of worker threads in the 
 TaskPool instance returned by taskPool. The default value is totalCPUs - 
 1. Calling the setter after the first call to taskPool does not changes 
 number of worker threads in the instance returned by taskPool. "
 
 I guess I could try to see if I can change this but I don't know what 
 the "first call" is(and I'm using parallel to create it).
The call that is interesting in your case is the call to `taskPool` in your foreach loop. So if you call `defaultPoolThreads(8)` before your loop you should be good.
 Seems that the code should simply be made more robust. Probably a just a 
 few lines of code to change/add at most. Maybe the constructor and 
 parallel should take an argument to set the "totalCPUs" which defaults 
 to getting the total number rather than it being hard coded.
 
 I currently don't need or have 32+ downlaods to test ATM so...
 
 
     this()  trusted
      {
          this(totalCPUs - 1);
      }
 
      /**
      Allows for custom number of worker threads.
      */
      this(size_t nWorkers)  trusted
      {
 
 
 Basically everything is hard coded to use totalCPU's and that is the 
 ultimate problem. Not all tasks should use all CPU' >
 What happens when we get 128 cores? or even 32k at some poin >
 It shouldn't be a hard coded value, it's really that simple and where 
 the problem originates because someone didn't think ahead.
You have the option to not use the default value. Kind regards, Christian
Aug 29 2023
prev sibling parent reply Adam D Ruppe <destructionator gmail.com> writes:
On Wednesday, 23 August 2023 at 13:03:36 UTC, Joe wrote:
 to download files from the internet.
Are they particularly big files? You might consider using one of the other libs that does it all in one thread. (i ask about size cuz mine ive never tested doing big files at once, i usually use it for smaller things, but i think it can do it)
 The reason why this causes me problems is that the downloaded 
 files, which are cashed to a temporary file, stick around and 
 do not free up space(think of it just as using memory) and this 
 can cause some problems some of the time.
this is why im a lil worried about my thing, like do they have to be temporary files or can it be memory that is recycled?
Aug 25 2023
parent Joe bloow.edu <Joe bloow.edu> writes:
On Friday, 25 August 2023 at 21:43:26 UTC, Adam D Ruppe wrote:
 On Wednesday, 23 August 2023 at 13:03:36 UTC, Joe wrote:
 to download files from the internet.
Are they particularly big files? You might consider using one of the other libs that does it all in one thread. (i ask about size cuz mine ive never tested doing big files at once, i usually use it for smaller things, but i think it can do it)
 The reason why this causes me problems is that the downloaded 
 files, which are cashed to a temporary file, stick around and 
 do not free up space(think of it just as using memory) and 
 this can cause some problems some of the time.
this is why im a lil worried about my thing, like do they have to be temporary files or can it be memory that is recycled?
The downloading is simply a wrapper that provides some caching to a ram drive and management of other things and doesn't have any clue how or what is being downloaded. It passes a link to something like youtube-dl or yt-dlp and has it do the downloaded. Everything works great except for the bottle neck when things are not balancing out. It's not a huge deal since it does work and, for the most part, gets everything downloaded but sorta defeats the purpose of having multiple downloads(which is much faster since each download seems to be throttled). Increasing the work unit size will make the problem worse while reducing it to 1 will flood the downloads(e.g., having 200 or even 2000 downloads at once). Ultimately this seems like a design flaw in ThreadPool which should auto rebalance the threads and not treat the number of threads as identical to the worker unit size(well, length/workerunitsize). e.g., suppose we have 1000 tasks and set worker unit size to 100. This gives 10 workers and 10 workers will be spawned(not sure if this is limited to total number of cpu threads or not) What would be nice is to be able to set worker unit size to 1 and this gives 1000 workers but limit concurent workers to, say 10. So we would have at any time 10 workers each working on 1 element. When one gets finished it can be repurposed for any unfinished tasks. The second case is preferable since there should be no issues with balancing but one still gets 10 workers. The stalling comes from the algorithm design and not anything innate in the problem or workload itself.
Aug 25 2023