digitalmars.D.learn - Using iopipe to stream a gzipped file

reply Andrew <aabrown24 hotmail.com> writes:
Hi,

I have a very large gzipped text file (all ASCII characters and 
~500GB) that I want to stream and process line by line. I 
thought the iopipe library would be perfect for this, but I can't 
get it to work. So far, this is the closest I have come:

import iopipe.textpipe;
import iopipe.zip;
import iopipe.bufpipe;
import iopipe.stream;

void main()
{
   auto fileToRead = openDev("file.gz").bufd.unzip(CompressionFormat.gzip);

   foreach (line; fileToRead.assumeText.byLineRange!false)
   {
      // do stuff
   }
}

but this only processes the first ~200 odd lines (I guess the 
initial read into the buffer). Can anyone help me out?

Thanks very much

Andrew
Jan 03 2018
parent reply Steven Schveighoffer <schveiguy yahoo.com> writes:
Do you have a sample file I can play with? Your iopipe chain looks correct, so I'm not sure why it wouldn't work.

-Steve
Jan 03 2018
parent reply Andrew <aabrown24 hotmail.com> writes:
A sample file (about 250MB) can be found here:

ftp://ftp.1000genomes.ebi.ac.uk/vol1/ftp/release/20130502/ALL.chr22.phase3_shapeit2_mvncall_integrated_v5a.20130502.genotypes.vcf.gz

It should have 1,103,800 lines, but the following code only reports 256:

import iopipe.textpipe;
import iopipe.zip;
import iopipe.bufpipe;
import iopipe.stream;
import std.stdio;

void main()
{
   auto fileToRead = openDev("ALL.chr22.phase3_shapeit2_mvncall_integrated_v5a.20130502.genotypes.vcf.gz").bufd.unzip(CompressionFormat.gzip);

   auto counter = 0;
   foreach (line; fileToRead.assumeText.byLineRange!false)
   {
      counter++;
   }
   writeln(counter);
}

Thanks for looking into this.

Andrew
Jan 03 2018
parent reply Steven Schveighoffer <schveiguy yahoo.com> writes:
On 1/3/18 12:03 PM, Andrew wrote:

 Thanks for looking into this.
 
So it looks like the file you have is a concatenated gzip file. If I gunzip the file and recompress it, it works properly.

Looking at the docs of zlib inflate [1]:

"Unlike the gunzip utility and gzread() ..., inflate() will not automatically decode concatenated gzip streams. inflate() will return Z_STREAM_END at the end of the gzip stream. The state would need to be reset to continue decoding a subsequent gzip stream."

So what is happening is the inflate function is returning Z_STREAM_END, and I'm considering the stream done from that return code.

I'm not sure yet how to fix this. I suppose I can check if any more data exists, and then re-init and continue. I have to look up what a concatenated gzip file is. gzread isn't good for generic purposes, because it requires an actual file input (I want to support any input type, including memory data).

-Steve

[1] https://github.com/dlang/phobos/blob/master/etc/c/zlib.d#L874
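
For reference, the "check if any more data exists, then re-init and continue" idea can be sketched directly against the zlib bindings that ship with Phobos (etc.c.zlib). This is only an illustration under stated assumptions, not the code that went into iopipe: the name inflateAllMembers is made up, the whole input is assumed to be in memory, and any bytes left over after Z_STREAM_END are assumed to begin another gzip member (trailing padding would make it throw).

```d
import etc.c.zlib;

/// Hypothetical helper: inflate every gzip member found in `input`,
/// resetting the zlib state whenever inflate() reports Z_STREAM_END
/// while unread input bytes remain.
ubyte[] inflateAllMembers(const(ubyte)[] input)
{
    z_stream zs;
    // windowBits 15 + 16 tells zlib to expect a gzip header and trailer.
    if (inflateInit2(&zs, 15 + 16) != Z_OK)
        throw new Exception("inflateInit2 failed");
    scope (exit) inflateEnd(&zs);

    zs.next_in = cast(typeof(zs.next_in)) input.ptr;
    zs.avail_in = cast(uint) input.length;

    ubyte[] output;
    ubyte[4096] chunk;
    while (true)
    {
        zs.next_out = chunk.ptr;
        zs.avail_out = cast(uint) chunk.length;
        immutable rc = inflate(&zs, Z_NO_FLUSH);
        // Keep whatever was produced by this call.
        output ~= chunk[0 .. chunk.length - zs.avail_out];

        if (rc == Z_STREAM_END)
        {
            if (zs.avail_in == 0)
                break; // nothing follows the member that just ended
            // Assumption: the remaining bytes start another gzip member.
            if (inflateReset(&zs) != Z_OK)
                throw new Exception("inflateReset failed");
        }
        else if (rc != Z_OK)
        {
            // Includes Z_BUF_ERROR, which is what empty or truncated input yields.
            throw new Exception("inflate failed");
        }
    }
    return output;
}
```

Given two gzip members joined back to back, this should return the two payloads joined in order. A streaming version would refill next_in from the source instead of holding everything in memory.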
Jan 03 2018
parent reply Andrew <aabrown24 hotmail.com> writes:
Ah, thank you, that makes sense. These types of files are compressed using the bgzip utility so that the file can be indexed, meaning specific rows can be extracted quickly (there are more details here: http://www.htslib.org/doc/tabix.html and the code can be found here: https://github.com/samtools/htslib/blob/develop/bgzf.c)
Jan 04 2018
parent reply Steven Schveighoffer <schveiguy yahoo.com> writes:
Hm... that utility seems to say it will result in a bgz file extension? So this must be an extraction from one of those files?

In any case, I'll figure out how to deal with concatenated gzip files, and update iopipe. Next version will focus on a bunch of stuff relating to the 2 zip threads recently posted here.

Thanks!

-Steve
Jan 04 2018
parent reply Andrew <aabrown24 hotmail.com> writes:
That would be really great for me, thank you!

By default bgzip produces a file with the standard .gz extension. Looking at the code, it adds an extra field to the standard gzip header:

/* BGZF/GZIP header (specialized from RFC 1952; little endian):
 +---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+
 | 31|139|  8|  4|              0|  0|255|      6| 66| 67|      2|BLK_LEN|
 +---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+
  BGZF extension:
                ^                              ^   ^   ^
                |                              |   |   |
               FLG.EXTRA                     XLEN  B   C

  BGZF format is compatible with GZIP. It limits the size of each compressed
  block to 2^16 bytes and adds an extra "BC" field in the gzip header which
  records the size.
*/

Thanks again!

Andrew
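
To make the layout above concrete, here is a minimal D sketch that checks whether a buffer starts with exactly that 18-byte header and, if so, reads the BLK_LEN field. The function name bgzfBlkLen and the -1 "no match" return value are made up for illustration. It assumes XLEN is exactly 6, as in the diagram, so a gzip header carrying additional extra subfields would be rejected. As far as I recall, the BGZF section of the SAM specification defines the stored value as the total block size minus one, but please check that before relying on it.

```d
/// Hypothetical helper: returns the raw BLK_LEN value from a BGZF block
/// header, or -1 if `block` does not start with the 18-byte layout
/// shown in the htslib comment.
int bgzfBlkLen(const(ubyte)[] block)
{
    if (block.length < 18)
        return -1;
    // gzip magic (31, 139), deflate method (8), FLG with only EXTRA set (4).
    if (block[0] != 31 || block[1] != 139 || block[2] != 8 || block[3] != 4)
        return -1;
    // Bytes 4-9 (MTIME, XFL, OS) are not checked here.
    // XLEN at bytes 10-11, little endian, must be 6 for this layout.
    if (block[10] != 6 || block[11] != 0)
        return -1;
    // Subfield id 'B','C' followed by a 2-byte subfield length of 2.
    if (block[12] != 'B' || block[13] != 'C' || block[14] != 2 || block[15] != 0)
        return -1;
    // BLK_LEN at bytes 16-17, little endian.
    return block[16] | (block[17] << 8);
}
```

A reader that wants to hop from block to block without inflating anything could call this at each block start and use the returned value to find the next header.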
Jan 04 2018
parent reply Steven Schveighoffer <schveiguy yahoo.com> writes:
On 1/4/18 7:23 AM, Andrew wrote:
 On Thursday, 4 January 2018 at 12:15:27 UTC, Steven Schveighoffer wrote:
 In any case, I'll figure out how to deal with concatenated gzip file, 
 and update iopipe. Next version will focus on a bunch of stuff 
 relating to the 2 zip threads recently posted here.
 That would be really great for me, thank you!
It's now been updated, see version 0.0.3.

Note, the performance isn't something I focused on. I'll note that gzcat | wc -l is 2x faster than your simple example on that file.

I can think of a couple of reasons for this:

1. gzcat may use mmap to increase read speed
2. gzcat may read larger chunks at once (this can be tuned using iopipe as well, just set the optimizedReadSize).
3. gzcat file.gz | iopipe_byline -nooutput is about 20% faster than using wc -l, so it's definitely not the line parsing.

Let me know if this works correctly for your other test cases! If not, file an issue: https://github.com/schveiguy/iopipe/issues

-Steve
Jan 04 2018
parent Andrew <aabrown24 hotmail.com> writes:
That works perfectly, thank you very much!
Jan 04 2018