Skip to content
This repository has been archived by the owner on Dec 27, 2021. It is now read-only.

Commit

Permalink
A bunch of changes to fix biod/sambamba#393
Browse files Browse the repository at this point in the history
  • Loading branch information
pjotrp committed Nov 28, 2019
1 parent c9a165e commit 3f89719
Show file tree
Hide file tree
Showing 11 changed files with 248 additions and 118 deletions.
33 changes: 24 additions & 9 deletions Makefile
Original file line number Diff line number Diff line change
@@ -1,6 +1,10 @@
# Simple Makefile
#
# make sharedlibrary : make shared library
# make shared : make shared lib
# make lib : make static lib (nyi)
# make check
#
# You can also use 'dub' and 'dub test' instead

D_COMPILER=ldc2
DFLAGS = -wi -g -relocation-model=pic -Icontrib/undead -L-lz
Expand All @@ -17,32 +21,43 @@ endif
DLIBS = $(LIBRARY_PATH)/libphobos2-ldc.a $(LIBRARY_PATH)/libdruntime-ldc.a
DLIBS_DEBUG = $(LIBRARY_PATH)/libphobos2-ldc-debug.a $(LIBRARY_PATH)/libdruntime-ldc-debug.a

SRC = $(wildcard contrib/undead/*.d) contrib/undead/*/*.d $(wildcard bio/*.d bio/*/*.d bio/*/*/*.d bio/*/*/*/*.d bio/*/*/*/*/*.d bio/*/*/*/*/*/*.d) test/unittests.d
SRC = $(wildcard contrib/undead/*.d) contrib/undead/*/*.d $(wildcard bio/*.d bio/*/*.d bio/*/*/*.d bio/*/*/*/*.d bio/*/*/*/*/*.d bio/*/*/*/*/*/*.d) test/unittests.d test/read_bam_file.d

OBJ = $(SRC:.d=.o)
BIN = bin/biod_tests
sharedlibrary: BIN = libbiod.so
shared: LIB = libbiod.so
lib: LIB = libbiod

debug check: DFLAGS += -O0 -d-debug -unittest -link-debuglib
# debug check: DFLAGS += -O0 -d-debug -unittest -link-debuglib
check: DFLAGS += -O0 -d-debug -link-debuglib -unittest
release static: DFLAGS += -O3 -release -enable-inlining -boundscheck=off
static: DFLAGS += -static -L-Bstatic
sharedlibrary: DFLAGS += -shared
shared: DFLAGS += -shared
lib: DFLAGS += -lib

all: debug

default: all

default debug release static sharedlibrary: $(BIN)
default debug release static: $(BIN)
shared lib: $(LIB)

%.o: %.d
$(D_COMPILER) $(DFLAGS) -c $< -od=$(dir $@)

$(LIB): $(OBJ)
$(info linking lib...)
$(D_COMPILER) $(DFLAGS) $(OBJ) -of=$(LIB)

$(BIN): $(OBJ)
$(info linking...)
$(D_COMPILER) -main $(DFLAGS) $(OBJ) -of=$(BIN)
$(D_COMPILER) $(DFLAGS) $(OBJ) -of=$(BIN)

check: $(BIN)
$(info running tests...)
$(BIN) "--DRT-gcopt=gc:precise disable:1 cleanup:none"
$(info Make check running tests...)
$(BIN)

# $(BIN) "--DRT-gcopt=gc:precise disable:1 cleanup:none"

clean:
rm -vf $(OBJ)
Expand Down
6 changes: 6 additions & 0 deletions RELEASE-NOTES.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,9 @@
## ChangeLog v0.2.4 (2019xxxx)

+ Fixed dub and make files
+ Dub test still fails because of GC
+ Debian package (thanks https://github.com/atille https://github.com/biod/BioD/issues/50)

## ChangeLog v0.2.3 (20191119)

+ Compiles and tests pass on Debian with dub and ldc 1.17.0
Expand Down
2 changes: 1 addition & 1 deletion VERSION
Original file line number Diff line number Diff line change
@@ -1 +1 @@
0.2.2
0.2.3
45 changes: 32 additions & 13 deletions bio/core/bgzf/block.d
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
module bio.core.bgzf.block;

import bio.std.hts.bam.constants;
import bio.core.utils.memoize;
// import bio.core.utils.memoize;
import bio.core.utils.zlib;

import std.array;
Expand All @@ -35,6 +35,9 @@ import std.exception;
/**
Structure representing BGZF block.
In general, users shouldn't use it, as it is EXTREMELY low-level.
Note it is a struct that has support for comparison based
on its crc32 value.
*/
struct BgzfBlock {
// field types are as in the SAM/BAM specification
Expand Down Expand Up @@ -89,40 +92,54 @@ struct BgzfBlock {
}
}

import std.stdio;

/**
Struct representing decompressed BgzfBlock
Start offset is needed to be able to tell current virtual offset,
and yet be able to decompress blocks in parallel.
*/
struct DecompressedBgzfBlock {
ulong start_offset;
ulong end_offset;
ubyte[] decompressed_data;
/* For the class version:
this(ulong start, ulong end, ubyte[] buf) {
start_offset = start;
end_offset = end;
decompressed_data = buf;
}
~this() {
stderr.writeln("destroy DecompressedBgzfBlock ",start_offset,":",end_offset," ",decompressed_data.sizeof);
};
*/

ulong start_offset;
ulong end_offset;
ubyte[] decompressed_data;
}

///
alias Cache!(BgzfBlock, DecompressedBgzfBlock) BgzfBlockCache;
// alias Cache!(BgzfBlock, DecompressedBgzfBlock) BgzfBlockCache;

/// Function for BGZF block decompression.
/// Reuses buffer allocated for storing compressed data,
/// i.e. after execution buffer of the passed $(D block)
/// is overwritten with uncompressed data.
DecompressedBgzfBlock decompressBgzfBlock(BgzfBlock block,
BgzfBlockCache cache=null)
DecompressedBgzfBlock decompressBgzfBlock(BgzfBlock block)
{
if (block.input_size == 0) {
return DecompressedBgzfBlock(block.start_offset,
block.start_offset + block.bsize + 1,
cast(ubyte[])[]); // EOF marker
// TODO: add check for correctness of EOF marker
return DecompressedBgzfBlock(block.start_offset,
block.start_offset + block.bsize + 1,
cast(ubyte[])[]); // EOF marker
// TODO: add check for correctness of EOF marker
}

/*
if (cache !is null) {
auto ptr = cache.lookup(block);
if (ptr !is null)
return *ptr;
}
*/

int err = void;

Expand Down Expand Up @@ -169,6 +186,7 @@ DecompressedBgzfBlock decompressBgzfBlock(BgzfBlock block,

assert(block.crc32 == crc32(0, uncompressed[]));

/*
if (cache !is null) {
BgzfBlock compressed_bgzf_block = block;
compressed_bgzf_block._buffer = block._buffer.dup;
Expand All @@ -180,6 +198,7 @@ DecompressedBgzfBlock decompressBgzfBlock(BgzfBlock block,
}
cache.put(compressed_bgzf_block, decompressed_bgzf_block);
}
*/

// Now copy back to block._buffer, overwriting existing data.
// It should have enough bytes already allocated.
Expand All @@ -192,6 +211,6 @@ DecompressedBgzfBlock decompressBgzfBlock(BgzfBlock block,
block._buffer[0 .. block.input_size] = uncompressed[];
block.dirty = true;

return DecompressedBgzfBlock(block.start_offset, block.end_offset,
block._buffer[0 .. block.input_size]);
auto decompressed = DecompressedBgzfBlock(block.start_offset, block.end_offset, block._buffer[0 .. block.input_size]);
return decompressed;
}
50 changes: 35 additions & 15 deletions bio/core/bgzf/inputstream.d
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,15 @@ class BgzfException : Exception {
this(string msg) { super(msg); }
}

/*
Called by
randomaccessmanager.d: fillBgzfBufferFromStream(stream, true, &block, buf.ptr);
inputstream.d: auto result = fillBgzfBufferFromStream(_stream, _seekable, block, buffer,
*/

bool fillBgzfBufferFromStream(Stream stream, bool is_seekable,
BgzfBlock* block, ubyte* buffer,
size_t *number_of_bytes_read=null)
Expand Down Expand Up @@ -336,12 +345,13 @@ class StreamChunksSupplier : BgzfBlockSupplier {
}

///
// Provided an uncompressed stream block by class
class BgzfInputStream : Stream {
private {
BgzfBlockSupplier _supplier;
ubyte[] _data;

BgzfBlockCache _cache;
// BgzfBlockCache _cache;

ubyte[] _read_buffer;
VirtualOffset _current_vo;
Expand All @@ -355,24 +365,26 @@ class BgzfInputStream : Stream {
TaskPool _pool;
enum _max_block_size = BGZF_MAX_BLOCK_SIZE * 2;

alias Task!(decompressBgzfBlock, BgzfBlock, BgzfBlockCache)
DecompressionTask;
DecompressionTask[] _task_buf;
alias Task!(decompressBgzfBlock, BgzfBlock) DecompressionTask;
// DecompressionTask[] _task_buf;

// static here means that BlockAux has no access to
// its surrounding scope https://dlang.org/spec/struct.html
static struct BlockAux {
BgzfBlock block;
ushort skip_start;
ushort skip_end;
BgzfBlock block;
ushort skip_start;
ushort skip_end;

DecompressionTask* task;
alias task this;
DecompressionTask* task;
// alias task this; // https://dlang.org/spec/class.html#AliasThis
}

RoundBuf!BlockAux _tasks = void;

size_t _offset;

bool fillNextBlock() {
// Sets up a decompression task and pushes it on the roundbuf _tasks
ubyte* p = _data.ptr + _offset;
BlockAux b = void;
if (_supplier.getNextBgzfBlock(&b.block, p,
Expand All @@ -388,15 +400,21 @@ class BgzfInputStream : Stream {
stderr.writeln("[creating task] ", b.block.start_offset, " / ", b.skip_start, " / ", b.skip_end);
}

/*
DecompressionTask tmp = void;
tmp = scopedTask!decompressBgzfBlock(b.block, _cache);
tmp = scopedTask!decompressBgzfBlock(b.block);
auto t = _task_buf.ptr + _offset / _max_block_size;
import core.stdc.string : memcpy;
memcpy(t, &tmp, DecompressionTask.sizeof);
b.task = t;
_tasks.put(b);
_pool.put(b.task);

*/
// tmp = scopedTask!decompressBgzfBlock(b.block);
auto task = task!decompressBgzfBlock(b.block);
b.task = task;
_tasks.put(b); // _tasks is roundbuf
_pool.put(b.task); // _pool is thread pool
_offset += _max_block_size;
if (_offset == _data.length)
_offset = 0;
Expand Down Expand Up @@ -436,20 +454,22 @@ class BgzfInputStream : Stream {

this(BgzfBlockSupplier supplier,
TaskPool pool=taskPool,
BgzfBlockCache cache=null,
// BgzfBlockCache cache=null,
size_t buffer_size=0)
{
_supplier = supplier;
_compressed_size = _supplier.totalCompressedSize();
_pool = pool;
_cache = cache;
// _cache = cache;

// The roundbuf size (n_tasks) should be at least
// the number of threads
size_t n_tasks = max(pool.size, 1) * 2;
if (buffer_size > 0)
n_tasks = max(n_tasks, buffer_size / BGZF_MAX_BLOCK_SIZE);

// n_tasks is 13 on my machine
_tasks = RoundBuf!BlockAux(n_tasks);
_task_buf = uninitializedArray!(DecompressionTask[])(n_tasks);
// _task_buf = uninitializedArray!(DecompressionTask[])(n_tasks);

This comment has been minimized.

Copy link
@pjotrp

pjotrp Dec 3, 2019

Author Member

This was the main fix


_data = uninitializedArray!(ubyte[])(n_tasks * _max_block_size);

Expand Down
16 changes: 8 additions & 8 deletions bio/core/bgzf/outputstream.d
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,10 @@
the rights to use, copy, modify, merge, publish, distribute, sublicense,
and/or sell copies of the Software, and to permit persons to whom the
Software is furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in
all copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
Expand Down Expand Up @@ -67,9 +67,9 @@ class BgzfOutputStream : Stream {

/// Create new BGZF output stream which will use
/// provided $(D task_pool) to do multithreaded compression.
this(Stream output_stream,
int compression_level=-1,
TaskPool task_pool=taskPool,
this(Stream output_stream,
int compression_level=-1,
TaskPool task_pool=taskPool,
size_t buffer_size=0,
size_t max_block_size=BGZF_MAX_BLOCK_SIZE,
size_t block_size=BGZF_BLOCK_SIZE)
Expand Down Expand Up @@ -132,7 +132,7 @@ class BgzfOutputStream : Stream {
}

/// Force flushing current block, even if it is not yet filled.
/// Should be used when it's not desired to have records crossing block borders.
/// Should be used when it's not desired to have records crossing block borders.
void flushCurrentBlock() {

if (_current_size == 0)
Expand Down Expand Up @@ -197,7 +197,7 @@ class BgzfOutputStream : Stream {
writeResult(block);
_compression_tasks.popFront();
}

_stream.flush();
_current_size = 0;
}
Expand All @@ -218,7 +218,7 @@ class BgzfOutputStream : Stream {

/// Adds EOF block. This function is called in close() method.
void addEofBlock() {
_stream.writeExact(BGZF_EOF.ptr, BGZF_EOF.length);
_stream.writeExact(BGZF_EOF.ptr, BGZF_EOF.length);
}
}

Expand Down
Loading

0 comments on commit 3f89719

Please sign in to comment.