Skip to content

Commit

Permalink
Merge pull request #24 from lilab-bcb/boli
Browse files Browse the repository at this point in the history
Fixed a bug in izlib.h and improved error messages
  • Loading branch information
yihming authored Aug 18, 2022
2 parents 09cecb1 + 977f72c commit b483047
Show file tree
Hide file tree
Showing 5 changed files with 221 additions and 84 deletions.
30 changes: 23 additions & 7 deletions ReadParser.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -130,15 +130,28 @@ class ReadParser {
std::vector<std::thread> parsingThreads_;


bool load_one_tuple(std::vector<iGZipFile>& input_streams, ReadTuple *rt) {
int load_one_tuple(std::vector<iGZipFile>& input_streams, ReadTuple *rt) {
int cnt = 0;
for (int i = 0; i < n_mates_; ++i)
cnt += input_streams[i].next((*rt)[i]);
if (cnt > 0 && cnt != n_mates_) {
printf("Detected mate files with different number of lines!\n");
exit(-1);
return cnt;
}

bool check_error(int file_cnt, int n_load, std::vector<iGZipFile>& input_streams) {
int n_err = 0;
for (int i = 0; i < n_mates_; ++i) {
n_err += input_streams[i].check_error(file_cnt);
}
return cnt > 0;
if (n_err > 0) return true;

if (n_load > 0 && n_load != n_mates_) {
printf("Detected mate files with different number of reads! The following files reached end of file with %d reads while other files are not:\n", file_cnt);
for (int i = 0; i < n_mates_; ++i)
if (input_streams[i].eof()) printf(" %s\n", input_streams[i].get_input_file().c_str());
return true;
}

return false;
}

void parse_read_tuples(moodycamel::ConsumerToken* ctSpace, moodycamel::ProducerToken* ptRead) {
Expand All @@ -155,14 +168,16 @@ class ReadParser {
cur_size = 0;
rt = &((*buffer)[cur_size]);

int n_load, file_cnt;
while (fileQueue_.try_dequeue(fid)) {
// prepare input gzip "streams"
input_streams.clear();
for (int i = 0; i < n_mates_; ++i) input_streams.emplace_back(input_files_[fid][i]);

// load read tuples
while (load_one_tuple(input_streams, rt)) {
++cur_size;
file_cnt = 0;
while (n_load = load_one_tuple(input_streams, rt), n_load == n_mates_) {
++cur_size; ++file_cnt;
if (cur_size == chunkSize_) {
buffer->nreads = cur_size;
curMaxDelay = MIN_BACKOFF_ITERS;
Expand All @@ -173,6 +188,7 @@ class ReadParser {
}
rt = &((*buffer)[cur_size]);
}
if (check_error(file_cnt, n_load, input_streams)) exit(-1);
}

if (cur_size > 0) {
Expand Down
170 changes: 133 additions & 37 deletions compress.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,33 @@

#include <cstdio>
#include <cstdlib>
#include <cstdint>
#include <thread>
#include <libdeflate.h>

const size_t compressor_buffer_size = 1ULL << 24;
const double compression_ratio_upper_bound = 5.0; // 1:5
const size_t compressor_buffer_size = 1ULL << 23; // 8M buffer

const size_t bgzf_block_size = 0xff00; // same as htslib
const size_t bgzf_block_header_length = 18;
const size_t bgzf_block_footer_length = 8;

// The annotation below is from htslib
/* BGZF/GZIP header (specialized from RFC 1952; little endian):
+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+
| 31|139| 8| 4| 0| 0|255| 6| 66| 67| 2|BLK_LEN|
+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+
BGZF extension:
^ ^ ^ ^
| | | |
FLG.EXTRA XLEN B C
BGZF format is compatible with GZIP. It limits the size of each compressed
block to 2^16 bytes and adds and an extra "BC" field in the gzip header which
records the size.
*/
const uint8_t bgzf_block_header[19] = "\037\213\010\4\0\0\0\0\0\377\6\0\102\103\2\0\0\0";
const uint8_t bgzf_empty_block[29] = "\037\213\010\4\0\0\0\0\0\377\6\0\102\103\2\0\033\0\3\0\0\0\0\0\0\0\0\0";


// Base class
Expand All @@ -17,22 +39,19 @@ struct Compressor {
virtual void writeToBuffer(char c) {}
virtual void writeToBuffer(const char* data, size_t size) {}
virtual size_t compress() { return 1; } // return number of bytes after compression
virtual void flushOut(FILE* fo, size_t out_size) {} // write outBuffer to file
virtual void flushOut(FILE* fo, size_t out_size, bool last_flush = false) {} // write outBuffer to file
};


struct SingleThreadCompressor: public Compressor {
size_t datsize, bufsize, out_bufsize;
char *inBuffer;
void *outBuffer;
int compression_level;
uint8_t *inBuffer, *outBuffer;
libdeflate_compressor* compressor;
bool is_bgzf;

SingleThreadCompressor(size_t buffer_size = compressor_buffer_size, int compression_level = 6) {
datsize = 0;
bufsize = buffer_size;
out_bufsize = size_t(buffer_size / compression_ratio_upper_bound + 0.5);

inBuffer = (char*)malloc(buffer_size);
SingleThreadCompressor(size_t buffer_size = compressor_buffer_size, int compression_level = 6, bool is_bgzf = false) : datsize(0), bufsize(buffer_size), compression_level(compression_level), is_bgzf(is_bgzf) {
inBuffer = (uint8_t*)malloc(buffer_size);
if (inBuffer == NULL) {
printf("Failed to allocate an inBuffer of size %zu in SingleThreadCompressor!\n", buffer_size);
exit(-1);
Expand All @@ -44,7 +63,20 @@ struct SingleThreadCompressor: public Compressor {
exit(-1);
}

outBuffer = malloc(out_bufsize);
out_bufsize = 0;
if (!is_bgzf) {
out_bufsize = libdeflate_gzip_compress_bound(compressor, buffer_size);
}
else {
size_t block_size = libdeflate_deflate_compress_bound(compressor, bgzf_block_size) + bgzf_block_header_length + bgzf_block_footer_length;
out_bufsize = (buffer_size / bgzf_block_size) * block_size;
size_t rest_size = buffer_size % bgzf_block_size;
if (rest_size > 0) {
out_bufsize += libdeflate_deflate_compress_bound(compressor, rest_size) + bgzf_block_header_length + bgzf_block_footer_length;
}
}

outBuffer = (uint8_t*)malloc(out_bufsize);
if (outBuffer == NULL) {
printf("Failed to allocate an outBuffer of size %zu in SingleThreadCompressor!\n", out_bufsize);
exit(-1);
Expand Down Expand Up @@ -72,37 +104,98 @@ struct SingleThreadCompressor: public Compressor {
}

size_t compress() {
size_t bound, out_size;

if (datsize == 0) return 0;

bound = libdeflate_gzip_compress_bound(compressor, datsize);
size_t out_size = 0;

if (out_bufsize < bound) {
outBuffer = realloc(outBuffer, bound);
if (outBuffer == NULL) {
printf("Failed to realloc outBuffer in compress function, SingleThreadCompressor!\n");
if (datsize == 0) return out_size;

if (!is_bgzf) {
out_size = libdeflate_gzip_compress(compressor, inBuffer, datsize, outBuffer, out_bufsize);
if (out_size == 0) {
printf("Failed to compress inBuffer in compress, SingleThreadCompressor!\n");
exit(-1);
}
out_bufsize = bound;
}

out_size = libdeflate_gzip_compress(compressor, inBuffer, datsize, outBuffer, bound);
if (out_size == 0) {
printf("Failed to compress inBuffer in compress, SingleThreadCompressor!\n");
exit(-1);
else {
size_t in_nbytes = 0, out_block_size = 0, bsize;
uint8_t *in_buff = nullptr, *out_buff = nullptr;
uint32_t crc;

for (size_t i = 0; i < datsize; i += bgzf_block_size) {
in_buff = inBuffer + i;
out_buff = outBuffer + out_size;

in_nbytes = datsize - i;
if (in_nbytes > bgzf_block_size) in_nbytes = bgzf_block_size;

// write header
memcpy(out_buff, bgzf_block_header, bgzf_block_header_length);

if (compression_level == 0) {
out_buff[bgzf_block_header_length] = 1; // BFINAL=1, BTYPE=00; see RFC1951; from htslib
out_buff[bgzf_block_header_length + 1] = in_nbytes & 0xff;
out_buff[bgzf_block_header_length + 2] = (in_nbytes >> 8) & 0xff;
out_buff[bgzf_block_header_length + 3] = (~in_nbytes) & 0xff;
out_buff[bgzf_block_header_length + 4] = ((~in_nbytes) >> 8) & 0xff;
memcpy(out_buff + bgzf_block_header_length + 5, in_buff, in_nbytes);
out_block_size = in_nbytes + 5 + bgzf_block_header_length + bgzf_block_footer_length;
}
else {
out_block_size = libdeflate_deflate_compress(compressor, in_buff, in_nbytes, out_buff + bgzf_block_header_length, out_bufsize - out_size - bgzf_block_header_length);
if (out_block_size == 0) {
printf("Failed to compress BGZF block %d in compress, SingleThreadCompressor!\n", int(i / bgzf_block_size));
exit(-1);
}
out_block_size += bgzf_block_header_length + bgzf_block_footer_length;
}

// fill in BSIZE
bsize = out_block_size - 1;
out_buff[bgzf_block_header_length - 2] = bsize & 0xff;
out_buff[bgzf_block_header_length - 1] = (bsize >> 8) & 0xff;

// writer footer
out_buff += out_block_size - bgzf_block_footer_length;

crc = libdeflate_crc32(0, in_buff, in_nbytes);

out_buff[0] = crc & 0xff;
out_buff[1] = (crc >> 8) & 0xff;
out_buff[2] = (crc >> 16) & 0xff;
out_buff[3] = (crc >> 24) & 0xff;

out_buff[4] = in_nbytes & 0xff;
out_buff[5] = (in_nbytes >> 8) & 0xff;
out_buff[6] = (in_nbytes >> 16) & 0xff;
out_buff[7] = (in_nbytes >> 24) & 0xff;

out_size += out_block_size;
}
}

datsize = 0;

return out_size;
}

void flushOut(FILE* fo, size_t out_size) {
size_t written_size = fwrite(outBuffer, 1, out_size, fo);
if (out_size != written_size) {
printf("Cannot write the full compressed buffer to disk, flushOut, SingleThreadCompressor!\n");
exit(-1);
void flushOut(FILE* fo, size_t out_size, bool last_flush = false) {
size_t written_size;

if (out_size > 0) {
written_size = fwrite(outBuffer, 1, out_size, fo);
if (written_size != out_size) {
printf("Cannot write the full compressed buffer to disk, flushOut, SingleThreadCompressor!\n");
exit(-1);
}
out_size = 0;
}

if (last_flush && is_bgzf) { // write last empty block for BGZF
memcpy(outBuffer, bgzf_empty_block, 28);
written_size = fwrite(outBuffer, 1, 28, fo);
if (written_size != 28) {
printf("Cannot write the empty BGZF block to disk, flushOut, SingleThreadCompressor!\n");
exit(-1);
}
}
}
};
Expand All @@ -117,13 +210,14 @@ struct MultiThreadsCompressor: public Compressor {
SingleThreadCompressor **compressors;
size_t *out_sizes;
std::thread **threads;
bool is_bgzf;

MultiThreadsCompressor(int num_threads, size_t buffer_size = compressor_buffer_size, int compression_level = 6) : num_threads(num_threads), cur_pos(0) {
MultiThreadsCompressor(int num_threads, size_t buffer_size = compressor_buffer_size, int compression_level = 6, bool is_bgzf = false) : num_threads(num_threads), cur_pos(0), is_bgzf(is_bgzf) {
compressors = new SingleThreadCompressor*[num_threads];
out_sizes = new size_t[num_threads]();
threads = new std::thread*[num_threads];
for (int i = 0; i < num_threads; ++i) {
compressors[i] = new SingleThreadCompressor(buffer_size, compression_level);
compressors[i] = new SingleThreadCompressor(buffer_size, compression_level, is_bgzf);
}
}

Expand Down Expand Up @@ -160,9 +254,8 @@ struct MultiThreadsCompressor: public Compressor {
return out_sizes[0];
}

for (int i = 0; i < n_active; ++i) {
for (int i = 0; i < n_active; ++i)
threads[i] = new std::thread(perform_compression, compressors[i], &out_sizes[i]);
}
for (int i = 0; i < n_active; ++i) threads[i]->join();
for (int i = 0; i < n_active; ++i) delete threads[i];

Expand All @@ -172,13 +265,16 @@ struct MultiThreadsCompressor: public Compressor {
return out_size;
}

void flushOut(FILE* fo, size_t out_size) {
void flushOut(FILE* fo, size_t out_size, bool last_flush = false) {
int i = 0;
while (i < num_threads && out_sizes[i] > 0) {
compressors[i]->flushOut(fo, out_sizes[i]);
out_sizes[i] = 0;
++i;
}

if (last_flush && is_bgzf)
compressors[0]->flushOut(fo, 0, true);
}
};

Expand Down
Loading

0 comments on commit b483047

Please sign in to comment.