
digitalmars.D.learn - Re: Multithreaded file IO?

reply Jerry Quinn <jlquinn optonline.net> writes:
Jonathan M Davis Wrote:

 On Saturday, September 24, 2011 01:05:52 Jerry Quinn wrote:
 Jonathan M Davis Wrote:
 On Friday, September 23, 2011 23:01:17 Jerry Quinn wrote:
 
 A direct rewrite would involve using shared and synchronized (either on
 the class or a synchronized block around the code that you want to
 lock). However, the more idiomatic way to do it would be to use
 std.concurrency and have the threads pass messages to each other using
 send and receive.

I'm trying the direct rewrite but having problems with shared and synchronized.

 class queue {
     File file;
     this(string infile) { file.open(infile); }
     synchronized void put(string s) { file.writeln(s); }
 }

 queue.d(10): Error: template std.stdio.File.writeln(S...) does not match any function template declaration
 queue.d(10): Error: template std.stdio.File.writeln(S...) cannot deduce template function from argument types !()(string)

Remove the synchronized and it compiles fine with 2.055.

Technically, synchronized should go on the _class_ and not the function, but I'm not sure if dmd is currently correctly implemented in that respect (since if it is, it should actually be an error to stick synchronized on a function). Regardless of that, however, unless you use shared (I don't know if you are), each instance of queue is going to be on its own thread. So, no mutexes will be necessary, but you won't be able to have multiple threads writing to the same File. It could get messy.

I get similar errors if I put synchronized on the class, or shared. My best guess is that a File struct cannot currently (2.055) be shared or accessed in a synchronized context. If I make the class synchronized and use __gshared on the File then it compiles. I don't know if it works, though.
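For concreteness, the synchronized-class-plus-__gshared variant that compiled might look like this (an untested sketch reconstructed from the description; the "w" open mode is my addition, since the queue is written to):

```d
import std.stdio;

synchronized class Queue
{
    // __gshared takes the File out of thread-local storage but also
    // out of the shared type checks, so the synchronized methods are
    // the only thing protecting it.
    private __gshared File file;

    this(string outfile)
    {
        file.open(outfile, "w");
    }

    void put(string s)
    {
        file.writeln(s);
    }
}
```

Whether this is actually safe depends on the synchronized methods really being the only access path to the File.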
 You could use a synchronized block instead of synchronizing the class
 
 void put(string s)
 {
     synchronized(this)
     {
         file.writeln(s);
     }
 }
 
 and see if that works. But you still need the variable to be shared.

I'll try that.
 So, what you'd probably do is spawn 3 threads from the main thread. One
 would read the file and send the data to another thread. That second
 thread would process the data, then it would send it to the third
 thread, which would write it to disk.

I think that would become messy when you have multiple processing threads. The reader and writer would have to handshake with all the processors.

The reader thread would be free to read at whatever rate it can. And the processing thread would be free to process at whatever rate it can. The writer thread would be free to write at whatever rate it can. I/O issues would be reduced (especially if the files being read and written to are on separate drives), since the reading and writing threads would be separate. I don't see why it would be particularly messy. It's essentially what TDPL suggests a file copy function should do, except that there's a thread in the middle which does some processing of the data before sending it to the thread doing the writing. Really, this is a classic example of the sort of situation which std.concurrency's message passing is intended for.
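A minimal sketch of that three-thread layout with std.concurrency (illustrative only; the file names, the bool end-of-input marker, and the process function are placeholders, not from TDPL):

```d
import std.concurrency;
import std.stdio;

// Stand-in for the real per-line work.
string process(string line)
{
    return line;
}

void writer()
{
    auto outFile = File("out.txt", "w");  // placeholder output path
    bool done;
    while (!done)
        receive((string line) { outFile.writeln(line); },
                (bool stop)   { done = true; });
}

void processor(Tid writerTid)
{
    bool done;
    while (!done)
        receive((string line) { writerTid.send(process(line)); },
                (bool stop)   { writerTid.send(true); done = true; });
}

void main()
{
    auto writerTid = spawn(&writer);
    auto procTid   = spawn(&processor, writerTid);
    foreach (line; File("in.txt").byLine())
        procTid.send(line.idup);  // idup: byLine reuses its buffer
    procTid.send(true);           // signal end of input
}
```

Each stage just pulls from its mailbox at its own pace, which is what keeps the three rates independent.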

If you only have one processing thread, this is true. However if you have, say, 16 processing threads, the code becomes messy. The reader has to handshake with each processor and select which processor to send the next chunk of input data to. The same dance has to happen when the processors are ready to write, since I want all the output ordered correctly. Broadcast (which isn't present in std.concurrency) wouldn't work because only one processor should get an input chunk. The problem is really a single-writer, multiple-reader problem.
 std.parallelism actually looks the closest to what I want.  Not sure if I
 can make it work easily though.

For std.parallelism to work, each iteration of a parallel foreach _must_ be _completely_ separate from the others. They can't access the same data. So, for instance, they could access separate elements in an array, but they can't ever access the same element (unless none of them write to it), and something like a file is not going to work at all, since then you'd be trying to write to the file from multiple threads at once with no synchronization whatsoever. std.parallelism is for when you do the same thing many times _in parallel_ with each other, and your use case does not sound like that at all. I really think that std.concurrency is what you should be using if you don't want to do it the C/C++ way and use shared.
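For contrast, this is the kind of fully independent work parallel foreach is built for: each iteration writes only its own array element, so no two threads ever touch the same data (a minimal sketch, not from the thread):

```d
import std.parallelism;

void main()
{
    auto squares = new double[10_000];
    // Each iteration writes only squares[i]; no element is shared
    // between threads, so no locking is needed.
    foreach (i, ref x; parallel(squares))
        x = cast(double) i * i;
}
```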

Well, I would have used the C++ shared way if I could get it to compile :-) I actually am doing the same thing many times in parallel. I have a single input file, but each line is processed independently by a worker thread. The results are output to a single file in order. Is map() what I want, perhaps?

With both parallel foreach and parallel map, it wasn't clear if all processors need to finish a work item before more work items get used. The issue is that each processor may take a different amount of time, and I want to keep all processors busy continuously. My reading of the std.parallelism docs suggested that a set of inputs is passed to the processors and all must finish before the next set is handled.

Thanks, Jerry
Sep 24 2011
next sibling parent Jonathan M Davis <jmdavisProg gmx.com> writes:
On Sunday, September 25, 2011 02:26:18 Jerry Quinn wrote:
 Jonathan M Davis Wrote:
 On Saturday, September 24, 2011 01:05:52 Jerry Quinn wrote:
 Jonathan M Davis Wrote:
 On Friday, September 23, 2011 23:01:17 Jerry Quinn wrote:
 
 A direct rewrite would involve using shared and synchronized
 (either on the class or a synchronized block around the code
 that you want to lock). However, the more idiomatic way to do
 it would be to use std.concurrency and have the threads pass
 messages to each other using send and receive.

I'm trying the direct rewrite but having problems with shared and synchronized.

 class queue {
     File file;
     this(string infile) { file.open(infile); }
     synchronized void put(string s) { file.writeln(s); }
 }

 queue.d(10): Error: template std.stdio.File.writeln(S...) does not match any function template declaration
 queue.d(10): Error: template std.stdio.File.writeln(S...) cannot deduce template function from argument types !()(string)

Remove the synchronized and it compiles fine with 2.055.

Technically, synchronized should go on the _class_ and not the function, but I'm not sure if dmd is currently correctly implemented in that respect (since if it is, it should actually be an error to stick synchronized on a function). Regardless of that, however, unless you use shared (I don't know if you are), each instance of queue is going to be on its own thread. So, no mutexes will be necessary, but you won't be able to have multiple threads writing to the same File. It could get messy.

I get similar errors if I put synchronized on the class, or shared. My best guess is that a File struct cannot currently (2.055) be shared or accessed in a synchronized context. If I make the class synchronized and use __gshared on the File then it compiles. I don't know if it works, though.
 You could use a synchronized block instead of synchronizing the class
 
 void put(string s)
 {
 
     synchronized(this)
     {
     
         file.writeln(s);
     
     }
 
 }
 
 and see if that works. But you still need the variable to be shared.

I'll try that.
 So, what you'd probably do is spawn 3 threads from the main
 thread. One would read the file and send the data to another
 thread. That second thread would process the data, then it
 would send it to the third thread, which would write it to
 disk.

I think that would become messy when you have multiple processing threads. The reader and writer would have to handshake with all the processors.

And the processing thread would be free to process at whatever rate it can. The writer thread would be free to write at whatever rate it can. I/O issues would be reduced (especially if the files being read and written to are on separate drives), since the reading and writing threads would be separate. I don't see why it would be particularly messy. It's essentially what TDPL suggests a file copy function should do, except that there's a thread in the middle which does some processing of the data before sending it to the thread doing the writing. Really, this is a classic example of the sort of situation which std.concurrency's message passing is intended for.

If you only have one processing thread, this is true. However if you have, say, 16 processing threads, the code becomes messy. The reader has to handshake with each processor and select which processor to send the next chunk of input data to. The same dance has to happen when the processors are ready to write, since I want all the output ordered correctly. Broadcast (which isn't present in std.concurrency) wouldn't work because only one processor should get an input chunk. The problem is really a single-writer, multiple-reader problem.
 std.parallelism actually looks the closest to what I want.  Not sure
 if I can make it work easily though.

For std.parallelism to work, each iteration of a parallel foreach _must_ be _completely_ separate from the others. They can't access the same data. So, for instance, they could access separate elements in an array, but they can't ever access the same element (unless none of them write to it), and something like a file is not going to work at all, since then you'd be trying to write to the file from multiple threads at once with no synchronization whatsoever. std.parallelism is for when you do the same thing many times _in parallel_ with each other, and your use case does not sound like that at all. I really think that std.concurrency is what you should be using if you don't want to do it the C/C++ way and use shared.

Well, I would have used the C++ shared way if I could get it to compile :-) I actually am doing the same thing many times in parallel. I have a single input file, but each line is processed independently by a worker thread. The results are output to a single file in order. Is map() what I want, perhaps? With both parallel foreach and parallel map, it wasn't clear if all processors need to finish a work item before more work items get used. The issue is that each processor may take a different amount of time, and I want to keep all processors busy continuously. My reading of the std.parallelism docs suggested that a set of inputs is passed to the processors and all must finish before the next set is handled.

I'm really not all that familiar with std.parallelism and the details of how it works, but from everything I know of it, using it to read and write from a file would be a _bad_ idea, because its worker units must be completely independent, and reading and writing from a single file is _not_ something that you can do independently in multiple threads. Maybe what you should do is use std.concurrency as I suggested but use std.parallelism in the processing thread to separate the processing stuff into multiple threads, but I don't know. The tools are there, and I'm sure that there's a way to get it to do what you need to do, but I don't know the best way to do it with std.parallelism.

I'm sure that dsimcha would be of help (he's the author of std.parallelism), but I don't think that he's subscribed to this list. If you want his help, you'll probably need to either post in the main D newsgroup or ask on stackoverflow (since he does pay attention there).

- Jonathan M Davis
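For reference, std.parallelism's lazy taskPool.map may fit the single-reader/many-workers/ordered-writer shape: it keeps a buffer of lines in flight on the pool, hands each worker the next line as soon as it finishes, and yields results in the original input order. A hedged sketch (the file names, buffer size of 100, and the per-line process function are placeholders, untested):

```d
import std.algorithm : map;
import std.parallelism;
import std.stdio;

// Placeholder for the real per-line work.
string process(string line)
{
    return line;
}

void main()
{
    // idup each line because byLine reuses its internal buffer.
    auto lines  = File("in.txt").byLine.map!(l => l.idup);
    auto output = File("out.txt", "w");
    // taskPool.map is lazy and order-preserving: only this foreach
    // touches the output file, so no locking is needed there.
    foreach (result; taskPool.map!process(lines, 100))
        output.writeln(result);
}
```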
Sep 24 2011
prev sibling parent Lutger Blijdestijn <lutger.blijdestijn gmail.com> writes:
Jerry Quinn wrote:

 Jonathan M Davis Wrote:
 
 On Saturday, September 24, 2011 01:05:52 Jerry Quinn wrote:
 Jonathan M Davis Wrote:
 On Friday, September 23, 2011 23:01:17 Jerry Quinn wrote:
 
 A direct rewrite would involve using shared and synchronized (either
 on the class or a synchronized block around the code that you want to
 lock). However, the more idiomatic way to do it would be to use
 std.concurrency and have the threads pass messages to each other
 using send and receive.

I'm trying the direct rewrite but having problems with shared and synchronized.

 class queue {
     File file;
     this(string infile) { file.open(infile); }
     synchronized void put(string s) { file.writeln(s); }
 }

 queue.d(10): Error: template std.stdio.File.writeln(S...) does not match any function template declaration
 queue.d(10): Error: template std.stdio.File.writeln(S...) cannot deduce template function from argument types !()(string)

Remove the synchronized and it compiles fine with 2.055.

Technically, synchronized should go on the _class_ and not the function, but I'm not sure if dmd is currently correctly implemented in that respect (since if it is, it should actually be an error to stick synchronized on a function). Regardless of that, however, unless you use shared (I don't know if you are), each instance of queue is going to be on its own thread. So, no mutexes will be necessary, but you won't be able to have multiple threads writing to the same File. It could get messy.

I get similar errors if I put synchronized on the class, or shared. My best guess is that a File struct cannot currently (2.055) be shared or accessed in a synchronized context. If I make the class synchronized and use __gshared on the File then it compiles. I don't know if it works, though.

You could embed a File pointer in the synchronized queue, make sure it is the sole owner, and then cast to File* when using it. The concurrency chapter has a paragraph on this method that explains why it is not covered by the type system. I believe this is closest to your C++ example; I never did this myself, though, so I'm not 100% sure it works.
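A rough sketch of that approach (untested; the casts deliberately step outside the type system, as the concurrency chapter warns, and the class and file names are invented for illustration):

```d
import std.stdio;

class Queue
{
    // The queue is the sole owner of this File; it lives behind a
    // shared pointer and is only ever touched under the lock below.
    private shared(File)* file;

    this(string outfile)
    {
        file = cast(shared(File)*) new File(outfile, "w");
    }

    void put(string s)
    {
        synchronized (this)
        {
            // Only one thread is in here at a time, so casting the
            // shared pointer back to a plain File* is safe by
            // convention, not by type-system guarantee.
            (cast(File*) file).writeln(s);
        }
    }
}
```

The safety argument rests entirely on no other reference to the File escaping the class.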
Sep 25 2011