Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
222 changes: 181 additions & 41 deletions src/java/net/jpountz/lz4/LZ4BlockInputStream.java
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,8 @@
*/
public class LZ4BlockInputStream extends FilterInputStream {

private final LZ4FastDecompressor decompressor;
private final LZ4FastDecompressor fastDecompressor;
private final LZ4SafeDecompressor safeDecompressor;
private final Checksum checksum;
private final boolean stopOnEmptyBlock;
private byte[] buffer;
Expand All @@ -56,52 +57,51 @@ public class LZ4BlockInputStream extends FilterInputStream {
* Creates a new LZ4 input stream to read from the specified underlying InputStream.
*
* @param in the {@link InputStream} to poll
* @param decompressor the {@link LZ4FastDecompressor decompressor} instance to
* @param fastDecompressor the {@link LZ4FastDecompressor} instance to
* use
* @param checksum the {@link Checksum} instance to use, must be
* equivalent to the instance which has been used to
* write the stream
* @param stopOnEmptyBlock whether read is stopped on an empty block
* @deprecated Use {@link #newBuilder()} instead.
*/
public LZ4BlockInputStream(InputStream in, LZ4FastDecompressor decompressor, Checksum checksum, boolean stopOnEmptyBlock) {
super(in);
this.decompressor = decompressor;
this.checksum = checksum;
this.stopOnEmptyBlock = stopOnEmptyBlock;
this.buffer = new byte[0];
this.compressedBuffer = new byte[HEADER_LENGTH];
o = originalLen = 0;
finished = false;
@Deprecated
public LZ4BlockInputStream(InputStream in, LZ4FastDecompressor fastDecompressor, Checksum checksum, boolean stopOnEmptyBlock) {
this(in, fastDecompressor, null, checksum, stopOnEmptyBlock);
}

/**
* Creates a new LZ4 input stream to read from the specified underlying InputStream.
*
* @param in the {@link InputStream} to poll
* @param decompressor the {@link LZ4FastDecompressor decompressor} instance to
* @param fastDecompressor the {@link LZ4FastDecompressor} instance to
* use
* @param checksum the {@link Checksum} instance to use, must be
* equivalent to the instance which has been used to
* write the stream
*
* @see #LZ4BlockInputStream(InputStream, LZ4FastDecompressor, Checksum, boolean)
* @deprecated Use {@link #newBuilder()} instead.
*/
public LZ4BlockInputStream(InputStream in, LZ4FastDecompressor decompressor, Checksum checksum) {
this(in, decompressor, checksum, true);
@Deprecated
public LZ4BlockInputStream(InputStream in, LZ4FastDecompressor fastDecompressor, Checksum checksum) {
this(in, fastDecompressor, checksum, true);
}

/**
* Creates a new LZ4 input stream to read from the specified underlying InputStream, using {@link XXHash32} for checksuming.
*
* @param in the {@link InputStream} to poll
* @param decompressor the {@link LZ4FastDecompressor decompressor} instance to
* @param fastDecompressor the {@link LZ4FastDecompressor} instance to
* use
*
* @see #LZ4BlockInputStream(InputStream, LZ4FastDecompressor, Checksum, boolean)
* @see StreamingXXHash32#asChecksum()
* @deprecated Use {@link #newBuilder()} instead.
*/
public LZ4BlockInputStream(InputStream in, LZ4FastDecompressor decompressor) {
this(in, decompressor, XXHashFactory.fastestInstance().newStreamingHash32(DEFAULT_SEED).asChecksum(), true);
@Deprecated
public LZ4BlockInputStream(InputStream in, LZ4FastDecompressor fastDecompressor) {
this(in, fastDecompressor, XXHashFactory.fastestInstance().newStreamingHash32(DEFAULT_SEED).asChecksum(), true);
}

/**
Expand All @@ -113,7 +113,9 @@ public LZ4BlockInputStream(InputStream in, LZ4FastDecompressor decompressor) {
* @see #LZ4BlockInputStream(InputStream, LZ4FastDecompressor, Checksum, boolean)
* @see LZ4Factory#fastestInstance()
* @see StreamingXXHash32#asChecksum()
* @deprecated Use {@link #newBuilder()} instead.
*/
@Deprecated
public LZ4BlockInputStream(InputStream in, boolean stopOnEmptyBlock) {
this(in, LZ4Factory.fastestInstance().fastDecompressor(), XXHashFactory.fastestInstance().newStreamingHash32(DEFAULT_SEED).asChecksum(), stopOnEmptyBlock);
}
Expand All @@ -125,11 +127,54 @@ public LZ4BlockInputStream(InputStream in, boolean stopOnEmptyBlock) {
*
* @see #LZ4BlockInputStream(InputStream, LZ4FastDecompressor)
* @see LZ4Factory#fastestInstance()
* @deprecated Use {@link #newBuilder()} instead.
*/
@Deprecated
public LZ4BlockInputStream(InputStream in) {
this(in, LZ4Factory.fastestInstance().fastDecompressor());
}

/**
* Creates a new LZ4 input stream to read from the specified underlying InputStream.
*
* @param in the {@link InputStream} to poll
* @param fastDecompressor the {@link LZ4FastDecompressor} instance to
* use
* @param safeDecompressor the {@link LZ4SafeDecompressor} instance to
* use (if both fastDecompressor and safeDecompressor are
* specified then the fastDecompressor gets used)
* @param checksum the {@link Checksum} instance to use, must be
* equivalent to the instance which has been used to
* write the stream
* @param stopOnEmptyBlock whether read is stopped on an empty block
*/
private LZ4BlockInputStream(InputStream in, LZ4FastDecompressor fastDecompressor, LZ4SafeDecompressor safeDecompressor,
Checksum checksum, boolean stopOnEmptyBlock) {
super(in);

this.fastDecompressor = fastDecompressor;
this.safeDecompressor = safeDecompressor;
this.checksum = checksum;
this.stopOnEmptyBlock = stopOnEmptyBlock;
this.buffer = new byte[0];
this.compressedBuffer = new byte[HEADER_LENGTH];
o = originalLen = 0;
finished = false;
}

/**
* Creates a new LZ4 block input stream builder. The following are defaults:
* <ul>
* <li> decompressor - {@code LZ4Factory.fastestInstance().safeDecompressor()} </li>
* <li> checksum - {@link XXHash32} </li>
* <li> stopOnEmptyBlock - {@code true} </li>
* </ul>
* @return new instance of {@link Builder} to be used to configure and build new LZ4 input stream
*/
public static Builder newBuilder() {
return new Builder();
}

@Override
public int available() throws IOException {
return originalLen - o;
Expand Down Expand Up @@ -213,11 +258,11 @@ private void refill() throws IOException {
final int check = SafeUtils.readIntLE(compressedBuffer, MAGIC_LENGTH + 9);
assert HEADER_LENGTH == MAGIC_LENGTH + 13;
if (originalLen > 1 << compressionLevel
|| originalLen < 0
|| compressedLen < 0
|| (originalLen == 0 && compressedLen != 0)
|| (originalLen != 0 && compressedLen == 0)
|| (compressionMethod == COMPRESSION_METHOD_RAW && originalLen != compressedLen)) {
|| originalLen < 0
|| compressedLen < 0
|| (originalLen == 0 && compressedLen != 0)
|| (originalLen != 0 && compressedLen == 0)
|| (compressionMethod == COMPRESSION_METHOD_RAW && originalLen != compressedLen)) {
throw new IOException("Stream is corrupted");
}
if (originalLen == 0 && compressedLen == 0) {
Expand All @@ -235,25 +280,32 @@ private void refill() throws IOException {
buffer = new byte[Math.max(originalLen, buffer.length * 3 / 2)];
}
switch (compressionMethod) {
case COMPRESSION_METHOD_RAW:
readFully(buffer, originalLen);
break;
case COMPRESSION_METHOD_LZ4:
if (compressedBuffer.length < compressedLen) {
compressedBuffer = new byte[Math.max(compressedLen, compressedBuffer.length * 3 / 2)];
}
readFully(compressedBuffer, compressedLen);
try {
final int compressedLen2 = decompressor.decompress(compressedBuffer, 0, buffer, 0, originalLen);
if (compressedLen != compressedLen2) {
throw new IOException("Stream is corrupted");
case COMPRESSION_METHOD_RAW:
readFully(buffer, originalLen);
break;
case COMPRESSION_METHOD_LZ4:
if (compressedBuffer.length < compressedLen) {
compressedBuffer = new byte[Math.max(compressedLen, compressedBuffer.length * 3 / 2)];
}
} catch (LZ4Exception e) {
throw new IOException("Stream is corrupted", e);
}
break;
default:
throw new AssertionError();
readFully(compressedBuffer, compressedLen);
try {
if (fastDecompressor == null) {
final int decompressedLen = safeDecompressor.decompress(compressedBuffer, 0, compressedLen, buffer, 0, originalLen);
if (decompressedLen != originalLen) {
throw new IOException("Stream is corrupted");
}
} else {
final int compressedLen2 = fastDecompressor.decompress(compressedBuffer, 0, buffer, 0, originalLen);
if (compressedLen != compressedLen2) {
throw new IOException("Stream is corrupted");
}
}
} catch (LZ4Exception e) {
throw new IOException("Stream is corrupted", e);
}
break;
default:
throw new AssertionError();
}
checksum.reset();
checksum.update(buffer, 0, originalLen);
Expand Down Expand Up @@ -304,7 +356,95 @@ public void reset() throws IOException {
@Override
public String toString() {
return getClass().getSimpleName() + "(in=" + in
+ ", decompressor=" + decompressor + ", checksum=" + checksum + ")";
+ ", decompressor=" + (fastDecompressor != null ? fastDecompressor : safeDecompressor)
+ ", checksum=" + checksum + ")";
}

/**
* Builder for {@link LZ4BlockInputStream}
*/
public static final class Builder {
private boolean stopOnEmptyBlock = true;
private LZ4FastDecompressor fastDecompressor;
private LZ4SafeDecompressor safeDecompressor;
private Checksum checksum;

private Builder() {
}

/**
* Registers value of stopOnEmptyBlock to be used by the builder
*
* @param stopOnEmptyBlock whether read is stopped on an empty block
* @return current builder instance
*/
public Builder withStopOnEmptyBlock(boolean stopOnEmptyBlock) {
this.stopOnEmptyBlock = stopOnEmptyBlock;
return this;
}

/**
* Registers {@link LZ4FastDecompressor} to be used by the builder as a decompressor. Overrides one set by
* {@link #withDecompressor(LZ4SafeDecompressor)}
*
* @param fastDecompressor the {@link LZ4FastDecompressor} instance to use
* @return current builder instance
*/
public Builder withDecompressor(LZ4FastDecompressor fastDecompressor) {
this.fastDecompressor = fastDecompressor;
this.safeDecompressor = null;
return this;
}

/**
* Registers {@link LZ4SafeDecompressor} to be used by the builder as a decompressor. Overrides one set by
* {@link #withDecompressor(LZ4FastDecompressor)}
*
* @param safeDecompressor the {@link LZ4SafeDecompressor} instance to use.
* @return current builder instance
*/
public Builder withDecompressor(LZ4SafeDecompressor safeDecompressor) {
this.safeDecompressor = safeDecompressor;
this.fastDecompressor = null;
return this;
}

/**
* Registers {@link Checksum} to be used by the builder
*
* @param checksum the {@link Checksum} instance to use, must be
* equivalent to the instance which has been used to
* write the stream
* @return current builder instance
*/
public Builder withChecksum(Checksum checksum) {
this.checksum = checksum;
return this;
}

/**
* Creates a new LZ4 input stream to read from the specified InputStream with specified parameters
*
* @param in the {@link InputStream} to poll
* @return new instance of {@link LZ4BlockInputStream} using parameters set in the builder and provided InputStream
*
* @see #withChecksum(Checksum)
* @see #withDecompressor(LZ4FastDecompressor)
* @see #withDecompressor(LZ4SafeDecompressor)
* @see #withStopOnEmptyBlock(boolean)
*/
public LZ4BlockInputStream build(InputStream in) {
Checksum checksum = this.checksum;
LZ4FastDecompressor fastDecompressor = this.fastDecompressor;
LZ4SafeDecompressor safeDecompressor = this.safeDecompressor;

if (checksum == null) {
checksum = XXHashFactory.fastestInstance().newStreamingHash32(DEFAULT_SEED).asChecksum();
}
if (fastDecompressor == null && safeDecompressor == null) {
safeDecompressor = LZ4Factory.fastestInstance().safeDecompressor();
}
return new LZ4BlockInputStream(in, fastDecompressor, safeDecompressor, checksum, stopOnEmptyBlock);
}
}
}
Loading