digitalmars.D - Re: Streaming library

reply Kagamin <spam here.lot> writes:
Denis Koroskin Wrote:

 // A generic stream
 interface Stream
 {
      @property InputStream input();
      @property OutputStream output();
      @property SeekableStream seekable();
      @property bool endOfStream();
      void close();
 }

I think it's better to have Stream inherit from InputStream and OutputStream. Do you even need endOfStream? In my experience, it's fine to do a blocking read and detect the end of the stream when 0 bytes are read. Even if you read from the network, is there a point in a non-blocking read?
 InputStream doesn't really have many methods:
 
 interface InputStream
 {
 	// reads up to buffer.length bytes from a stream
 	// returns number of bytes read
 	// throws on error
 	size_t read(ubyte[] buffer);
 

I've found ubyte[] read(ubyte[] buffer) more usable. Compare:

    ubyte[] buffer=new ubyte[sz];
    size_t rd=stream.read(buffer);
    ubyte[] rdata=buffer[0..rd];

with:

    ubyte[] buffer=new ubyte[sz];
    ubyte[] rdata=stream.read(buffer);

And you can instantly pass the read data to some other function:

    myProcessor.Process(stream.read(buffer));
 	// reads from current position
 	AsyncReadRequest readAsync(ubyte[] buffer, Mailbox* mailbox = null);
 }

I also have an implementation of an asynchronous stream with an interface similar to the one in .NET, though recently I came to another design.

    /// What one can call "an actual stream": it holds the handle
    interface AsyncStreamSource
    {
        /// Advances position on each read
        AsyncStream createStream();
        /// Leaves position intact - adjust manually or rather don't adjust
        AsyncStream createStillStream();
    }

    /// An accessor for AsyncStreamSource that wraps an I/O completion port or its analogue.
    /// Contains the stream position at which I/O is done on AsyncStreamSource (that's why
    /// it's a stream - it works like an unshared blocking stream with asynchronous access).
    interface AsyncStream
    {
        void beginRead(ubyte[] buffer);
        ubyte[] endRead();
        long position() @property;
        void position(long newPosition) @property;
    }

Multiple AsyncStreams can be created for one AsyncStreamSource. So effectively one AsyncStreamSource can be shared through different AsyncStreams, while individual AsyncStreams cannot be shared. With this design you don't have to allocate a new AsyncResult for each I/O operation. Though such a design can be problematic on Linux, as its async I/O functionality is quite... errmmm... the Linux way, as I remember.
Oct 15 2010
parent reply "Denis Koroskin" <2korden gmail.com> writes:
On Fri, 15 Oct 2010 22:48:20 +0400, Kagamin <spam here.lot> wrote:

 Denis Koroskin Wrote:

 // A generic stream
 interface Stream
 {
      @property InputStream input();
      @property OutputStream output();
      @property SeekableStream seekable();
      @property bool endOfStream();
      void close();
 }

I think it's better to have Stream inherit from InputStream and OutputStream. Do you even need endOfStream? In my experience, it's fine to do a blocking read and detect the end of the stream when 0 bytes are read. Even if you read from the network, is there a point in a non-blocking read?

Probably, I think I'll try both ways and see which one turns out to be better.
 InputStream doesn't really have many methods:

 interface InputStream
 {
 	// reads up to buffer.length bytes from a stream
 	// returns number of bytes read
 	// throws on error
 	size_t read(ubyte[] buffer);

I've found ubyte[] read(ubyte[] buffer) more usable. Compare:

    ubyte[] buffer=new ubyte[sz];
    size_t rd=stream.read(buffer);
    ubyte[] rdata=buffer[0..rd];

with:

    ubyte[] buffer=new ubyte[sz];
    ubyte[] rdata=stream.read(buffer);

And you can instantly pass the read data to some other function:

    myProcessor.Process(stream.read(buffer));

Either way is fine with me. But I agree yours is handy, too. I was actually thinking about a plain ubyte[] read(); method:

    struct BufferedStream
    {
        ubyte[] read(); // just give me something
    }

because in many cases you don't really care about the buffer size or don't even know the amount of data you can read (e.g. a socket stream).
 	// reads from current position
 	AsyncReadRequest readAsync(ubyte[] buffer, Mailbox* mailbox = null);
 }

I also have an implementation of an asynchronous stream with an interface similar to the one in .NET, though recently I came to another design.

    /// What one can call "an actual stream": it holds the handle
    interface AsyncStreamSource
    {
        /// Advances position on each read
        AsyncStream createStream();
        /// Leaves position intact - adjust manually or rather don't adjust
        AsyncStream createStillStream();
    }

    /// An accessor for AsyncStreamSource that wraps an I/O completion port or its analogue.
    /// Contains the stream position at which I/O is done on AsyncStreamSource (that's why
    /// it's a stream - it works like an unshared blocking stream with asynchronous access).
    interface AsyncStream
    {
        void beginRead(ubyte[] buffer);
        ubyte[] endRead();
        long position() @property;
        void position(long newPosition) @property;
    }

Multiple AsyncStreams can be created for one AsyncStreamSource. So effectively one AsyncStreamSource can be shared through different AsyncStreams, while individual AsyncStreams cannot be shared. With this design you don't have to allocate a new AsyncResult for each I/O operation.

Interesting, I think I'll give it a try. This will reduce basic Stream interface size, and some implementations can return null unless they support async read/write.
 Though such a design can be problematic on Linux, as its async I/O
 functionality is quite... errmmm... the Linux way, as I remember.

:)
Oct 15 2010
next sibling parent reply Kagamin <spam here.lot> writes:
Denis Koroskin Wrote:

 I think, it's better to inherit Stream from InputStream and OutputStream.
 Do you even need endOfStream? From my experience, it's ok to  
 blocked-read and determine end when 0 is read. Even if you read from  
 network, is there a point in non-blocking read?

Probably, I think I'll try both ways and see which one turns out to be better.

I should say that the implementation will be somewhat tricky, as different kinds of streams handle reads beyond the end in different ways. Say, reading from a pipe whose write end is closed results in an error.
 Either way is fine with me. But I agree yours is handy, too.
 I was actually thinking about a plain ubyte[] read(); method:
 
 struct BufferedStream
 {
 	ubyte[] read(); // just give me something
 }

Funny idea. Here we can also think about MemoryStream: when you have all the data in memory, you don't need user side buffer, and can just return direct slice to data as const(ubyte)[].
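
A minimal sketch of the MemoryStream idea, assuming a hypothetical read(size_t) method (neither the type nor the method is an existing Phobos API). Because all the data is already in memory, read() can hand back a slice of it instead of filling a caller-supplied buffer:

```d
// Hypothetical sketch: MemoryStream and read() are illustrative names,
// not an existing library API.
struct MemoryStream
{
    private const(ubyte)[] data; // the whole stream already lives in memory
    private size_t pos;          // current read position

    this(const(ubyte)[] data) { this.data = data; }

    // Returns up to maxBytes bytes as a slice of the underlying data.
    // Nothing is copied; an empty slice signals end of stream.
    const(ubyte)[] read(size_t maxBytes)
    {
        immutable remaining = data.length - pos;
        immutable n = maxBytes < remaining ? maxBytes : remaining;
        auto chunk = data[pos .. pos + n];
        pos += n;
        return chunk;
    }
}

unittest
{
    ubyte[] bytes = [1, 2, 3, 4, 5];
    auto s = MemoryStream(bytes);
    auto head = s.read(2);
    assert(head.length == 2);
    assert(head.ptr is bytes.ptr);   // same memory, no copy was made
    assert(s.read(10).length == 3);  // short read at the end
    assert(s.read(10).length == 0);  // end of stream
}
```

Returning const(ubyte)[] keeps callers from modifying the stream's backing data through the slice.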
Oct 15 2010
parent reply Andrei Alexandrescu <SeeWebsiteForEmail erdani.org> writes:
On 10/15/10 14:54 CDT, Kagamin wrote:
 Denis Koroskin Wrote:

 I think, it's better to inherit Stream from InputStream and OutputStream.
 Do you even need endOfStream? From my experience, it's ok to
 blocked-read and determine end when 0 is read. Even if you read from
 network, is there a point in non-blocking read?

Probably, I think I'll try both ways and see which one turns out to be better.

I should say that the implementation will be somewhat tricky, as different kinds of streams handle reads beyond the end in different ways. Say, reading from a pipe whose write end is closed results in an error.
 Either way is fine with me. But I agree yours is handy, too.
 I was actually thinking about a plain ubyte[] read(); method:

 struct BufferedStream
 {
 	ubyte[] read(); // just give me something
 }

Funny idea. Here we can also think about MemoryStream: when you have all the data in memory, you don't need user side buffer, and can just return direct slice to data as const(ubyte)[].

We've circled all the way back to ranges a la byChunk.

    bool empty();
    ubyte[] front();
    void popFront();

Look ma, no copying, no fuss, no muss. Whatever interface(s) we find work best for various kinds of streams, we should make them play nice with ranges. Burst streams (the kind that offer data in variable-size chunks) work great with a range interface.

Andrei
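
The three primitives above can be sketched as a small input range. This is only an illustration under stated assumptions: ChunkRange is a made-up name, and it slices an in-memory array so the example stays self-contained, whereas a real stream would refill an internal buffer in popFront().

```d
// Hypothetical sketch: ChunkRange is an illustrative name, not a Phobos type.
struct ChunkRange
{
    private const(ubyte)[] rest; // data not yet consumed
    private size_t chunkSize;    // maximum length of front

    this(const(ubyte)[] data, size_t chunkSize)
    {
        assert(chunkSize > 0);
        this.rest = data;
        this.chunkSize = chunkSize;
    }

    @property bool empty() const { return rest.length == 0; }

    // The current chunk: a slice, not a copy. The last chunk may be short.
    @property const(ubyte)[] front() const
    {
        immutable n = chunkSize < rest.length ? chunkSize : rest.length;
        return rest[0 .. n];
    }

    void popFront() { rest = rest[front.length .. $]; }
}

unittest
{
    ubyte[] data = [1, 2, 3, 4, 5];
    size_t[] lengths;
    foreach (chunk; ChunkRange(data, 2)) // foreach uses empty/front/popFront
        lengths ~= chunk.length;
    assert(lengths.length == 3);
    assert(lengths[0] == 2 && lengths[1] == 2 && lengths[2] == 1);
}
```

Because the type exposes empty, front and popFront, it works with foreach and with generic range algorithms without any extra adapter.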
Oct 15 2010
parent reply Kagamin <spam here.lot> writes:
Andrei Alexandrescu Wrote:

 Whatever interface(s) we find work best for various kinds of streams, we 
 should make them play nice with ranges. Burst streams (the kind that 
 offer data in variable-size chunks) work great with a range interface.

I was thinking about the chunk size being supplied by the user, like this:

    int readInt()
    {
        ubyte[] buffer=read(4);
        assert(buffer.length==4); // can this trigger?
        return *cast(int*)buffer.ptr;
    }
Oct 15 2010
parent reply Andrei Alexandrescu <SeeWebsiteForEmail erdani.org> writes:
On 10/15/10 15:54 CDT, Kagamin wrote:
 Andrei Alexandrescu Wrote:

 Whatever interface(s) we find work best for various kinds of streams, we
 should make them play nice with ranges. Burst streams (the kind that
 offer data in variable-size chunks) work great with a range interface.

I was thinking about the chunk size being supplied by the user, like this:

    int readInt()
    {
        ubyte[] buffer=read(4);
        assert(buffer.length==4); // can this trigger?
        return *cast(int*)buffer.ptr;
    }

That's a fair point. I don't think you can assert, there could always be the situation that there was not enough data. Anyhow, wrt byChunk I was thinking of adding a property for changing the chunk size prior to popFront():

    auto chunks = File("file.bin").byChunk(8);
    // now chunks.front() is a ubyte[] containing 8 bytes
    chunks.chunkSize = 4;
    chunks.popFront();
    // now chunks.front() is a ubyte[] containing 4 bytes
    chunks.chunkSize = 4096;
    for (; !chunks.empty; chunks.popFront()) {
         // handle 4KB at a time
    }

Andrei
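
On the "not enough data" point, one possible shape for readInt is sketched below. It assumes the size_t read(ubyte[] buffer) signature from the InputStream proposal quoted earlier in the thread; TrickleStream, readFully and this readInt are hypothetical names invented for the example. The helper loops over short reads, and a genuinely short stream raises an exception instead of tripping an assert.

```d
// Mirrors the InputStream signature proposed in this thread.
interface InputStream
{
    size_t read(ubyte[] buffer); // returns 0 at end of stream
}

// Hypothetical test stream: hands out at most `step` bytes per call,
// the way a socket might.
class TrickleStream : InputStream
{
    private const(ubyte)[] data;
    private size_t step;

    this(const(ubyte)[] data, size_t step) { this.data = data; this.step = step; }

    size_t read(ubyte[] buffer)
    {
        size_t n = buffer.length;
        if (n > step) n = step;
        if (n > data.length) n = data.length;
        buffer[0 .. n] = data[0 .. n];
        data = data[n .. $];
        return n;
    }
}

// Keeps reading until the buffer is full or the stream ends.
// Returns the part of the buffer that was actually filled.
ubyte[] readFully(InputStream s, ubyte[] buffer)
{
    size_t filled = 0;
    while (filled < buffer.length)
    {
        immutable n = s.read(buffer[filled .. $]);
        if (n == 0) break; // end of stream
        filled += n;
    }
    return buffer[0 .. filled];
}

// Reads a native-endian int; throws on a short read instead of asserting.
int readInt(InputStream s)
{
    int value;
    auto storage = (cast(ubyte*) &value)[0 .. int.sizeof];
    if (readFully(s, storage).length != int.sizeof)
        throw new Exception("unexpected end of stream");
    return value;
}
```

Reading directly into the int's own storage avoids the alignment question raised by casting an arbitrary ubyte pointer to int*.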
Oct 15 2010
parent Kagamin <spam here.lot> writes:
Denis Koroskin Wrote:

 ubyte[] read() and ubyte[] read(size_t size) both require buffering (e.g.
 BufferedStream adapter). As such, it can provide stronger guarantees over
 raw streams.

What do you plan to do if the user requests too much data from BufferedStream? Ideally the stream could allocate a big buffer and keep it behind a weak pointer, which would be efficient in terms of both memory and allocations, but we don't have weak pointers, do we?
Oct 15 2010
prev sibling next sibling parent "Denis Koroskin" <2korden gmail.com> writes:
On Fri, 15 Oct 2010 23:54:32 +0400, Kagamin <spam here.lot> wrote:

 Denis Koroskin Wrote:

 I think, it's better to inherit Stream from InputStream and OutputStream.
 Do you even need endOfStream? From my experience, it's ok to
 blocked-read and determine end when 0 is read. Even if you read from
 network, is there a point in non-blocking read?

Probably, I think I'll try both ways and see which one turns out to be better.

I should say that the implementation will be somewhat tricky, as different kinds of streams handle reads beyond the end in different ways. Say, reading from a pipe whose write end is closed results in an error.
 Either way is fine with me. But I agree yours is handy, too.
 I was actually thinking about a plain ubyte[] read(); method:

 struct BufferedStream
 {
 	ubyte[] read(); // just give me something
 }

Funny idea. Here we can also think about MemoryStream: when you have all the data in memory, you don't need user side buffer, and can just return direct slice to data as const(ubyte)[].

Yeah. I'm also looking into reading/sending multiple buffers at once (aka scatter-gather I/O: http://www.delorie.com/gnu/docs/glibc/libc_246.html). It most likely won't be part of the Stream interface, because I'd like to support different types of buffer ranges, and that calls for a templated implementation:

    size_t writeRange(Range)(Range buffers);

Each element of Range needs to be of type ubyte[]; there are no other requirements. It returns the number of bytes written (not the number of buffers, because data transmission might stop in the middle of a buffer).
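
A rough sketch of how such a writeRange could behave. Only the signature comes from the description above; BoundedSink and its write() method are assumptions made up so the example can run without a real socket or file. The sink accepts a limited number of bytes, which lets the example show a transfer stopping partway through a buffer.

```d
import std.range : ElementType, isInputRange;

// Hypothetical sink standing in for a socket or file that may accept
// fewer bytes than it is offered.
struct BoundedSink
{
    ubyte[] stored;  // bytes accepted so far
    size_t capacity; // total number of bytes the sink will accept

    // Accepts as much of buffer as still fits; returns the count accepted.
    size_t write(const(ubyte)[] buffer)
    {
        immutable room = capacity - stored.length;
        immutable n = buffer.length < room ? buffer.length : room;
        stored ~= buffer[0 .. n];
        return n;
    }

    // Writes each buffer in turn and returns the number of BYTES written.
    // Stops at the first partial write, possibly in the middle of a buffer.
    size_t writeRange(Range)(Range buffers)
        if (isInputRange!Range && is(ElementType!Range : const(ubyte)[]))
    {
        size_t total = 0;
        foreach (buffer; buffers)
        {
            immutable n = write(buffer);
            total += n;
            if (n < buffer.length) break;
        }
        return total;
    }
}

unittest
{
    ubyte[] first = [1, 2, 3];
    ubyte[] second = [4, 5, 6];
    ubyte[][] buffers = [first, second];
    auto sink = BoundedSink(null, 5);
    assert(sink.writeRange(buffers) == 5); // stopped inside the second buffer
    assert(sink.stored.length == 5);
}
```

A real implementation would more likely collect the buffers and pass them to a single vectored system call; the loop here only illustrates the byte-count return value.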
Oct 15 2010
prev sibling next sibling parent "Denis Koroskin" <2korden gmail.com> writes:
On Sat, 16 Oct 2010 01:01:33 +0400, Andrei Alexandrescu  
<SeeWebsiteForEmail erdani.org> wrote:

 On 10/15/10 15:54 CDT, Kagamin wrote:
 Andrei Alexandrescu Wrote:

 Whatever interface(s) we find work best for various kinds of streams,  
 we
 should make them play nice with ranges. Burst streams (the kind that
 offer data in variable-size chunks) work great with a range interface.

I was thinking about the chunk size being supplied by the user, like this:

    int readInt()
    {
        ubyte[] buffer=read(4);
        assert(buffer.length==4); // can this trigger?
        return *cast(int*)buffer.ptr;
    }


ubyte[] read() and ubyte[] read(size_t size) both require buffering (e.g. BufferedStream adapter). As such, it can provide stronger guarantees over raw streams.
 That's a fair point. I don't think you can assert, there could always be  
 the situation that there was not enough data. Anyhow, wrt byChunk I was  
 thinking of adding a property for changing the chunk size prior to  
 popFront():

 auto chunks = File("file.bin").byChunk(8);
 // now chunks.front() is a ubyte[] containing 8 bytes
 chunks.chunkSize = 4;
 chunks.popFront();
 // now chunks.front() is a ubyte[] containing 4 bytes
 chunks.chunkSize = 4096;
 for (; !chunks.empty; chunks.popFront()) {
      // handle 4KB at a time
 }


 Andrei

Oct 15 2010
prev sibling parent "Denis Koroskin" <2korden gmail.com> writes:
On Sat, 16 Oct 2010 01:25:35 +0400, Kagamin <spam here.lot> wrote:

 Denis Koroskin Wrote:

 ubyte[] read() and ubyte[] read(size_t size) both require buffering
 (e.g.
 BufferedStream adapter). As such, it can provide stronger guarantees
 over
 raw streams.

What do you plan to do if user requests too much data from BufferedStream?

Allocate, read, return.
 Ideally the stream could allocate a big buffer and keep it behind a weak
 pointer, which would be efficient in terms of both memory and
 allocations, but we don't have weak pointers, do we?

I don't see anything that would prevent weak references from working. I'm storing some of my pointers as size_t, and they are being garbage-collected as intended (the dtor updates the dead reference). I will try implementing a WeakRef template and see how it turns out later.
Oct 15 2010