diff --git a/src/java/net/jpountz/lz4/LZ4BlockInputStream.java b/src/java/net/jpountz/lz4/LZ4BlockInputStream.java index 23fd5e34..eff16421 100644 --- a/src/java/net/jpountz/lz4/LZ4BlockInputStream.java +++ b/src/java/net/jpountz/lz4/LZ4BlockInputStream.java @@ -43,7 +43,8 @@ */ public class LZ4BlockInputStream extends FilterInputStream { - private final LZ4FastDecompressor decompressor; + private final LZ4FastDecompressor fastDecompressor; + private final LZ4SafeDecompressor safeDecompressor; private final Checksum checksum; private final boolean stopOnEmptyBlock; private byte[] buffer; @@ -56,52 +57,51 @@ public class LZ4BlockInputStream extends FilterInputStream { * Creates a new LZ4 input stream to read from the specified underlying InputStream. * * @param in the {@link InputStream} to poll - * @param decompressor the {@link LZ4FastDecompressor decompressor} instance to + * @param fastDecompressor the {@link LZ4FastDecompressor} instance to * use * @param checksum the {@link Checksum} instance to use, must be * equivalent to the instance which has been used to * write the stream * @param stopOnEmptyBlock whether read is stopped on an empty block + * @deprecated Use {@link #newBuilder()} instead. */ - public LZ4BlockInputStream(InputStream in, LZ4FastDecompressor decompressor, Checksum checksum, boolean stopOnEmptyBlock) { - super(in); - this.decompressor = decompressor; - this.checksum = checksum; - this.stopOnEmptyBlock = stopOnEmptyBlock; - this.buffer = new byte[0]; - this.compressedBuffer = new byte[HEADER_LENGTH]; - o = originalLen = 0; - finished = false; + @Deprecated + public LZ4BlockInputStream(InputStream in, LZ4FastDecompressor fastDecompressor, Checksum checksum, boolean stopOnEmptyBlock) { + this(in, fastDecompressor, null, checksum, stopOnEmptyBlock); } /** * Creates a new LZ4 input stream to read from the specified underlying InputStream. * * @param in the {@link InputStream} to poll - * @param decompressor the {@link LZ4FastDecompressor decompressor} instance to + * @param fastDecompressor the {@link LZ4FastDecompressor} instance to * use * @param checksum the {@link Checksum} instance to use, must be * equivalent to the instance which has been used to * write the stream * * @see #LZ4BlockInputStream(InputStream, LZ4FastDecompressor, Checksum, boolean) + * @deprecated Use {@link #newBuilder()} instead. */ - public LZ4BlockInputStream(InputStream in, LZ4FastDecompressor decompressor, Checksum checksum) { - this(in, decompressor, checksum, true); + @Deprecated + public LZ4BlockInputStream(InputStream in, LZ4FastDecompressor fastDecompressor, Checksum checksum) { + this(in, fastDecompressor, checksum, true); } /** * Creates a new LZ4 input stream to read from the specified underlying InputStream, using {@link XXHash32} for checksuming. * * @param in the {@link InputStream} to poll - * @param decompressor the {@link LZ4FastDecompressor decompressor} instance to + * @param fastDecompressor the {@link LZ4FastDecompressor} instance to * use * * @see #LZ4BlockInputStream(InputStream, LZ4FastDecompressor, Checksum, boolean) * @see StreamingXXHash32#asChecksum() + * @deprecated Use {@link #newBuilder()} instead. */ - public LZ4BlockInputStream(InputStream in, LZ4FastDecompressor decompressor) { - this(in, decompressor, XXHashFactory.fastestInstance().newStreamingHash32(DEFAULT_SEED).asChecksum(), true); + @Deprecated + public LZ4BlockInputStream(InputStream in, LZ4FastDecompressor fastDecompressor) { + this(in, fastDecompressor, XXHashFactory.fastestInstance().newStreamingHash32(DEFAULT_SEED).asChecksum(), true); } /** @@ -113,7 +113,9 @@ public LZ4BlockInputStream(InputStream in, LZ4FastDecompressor decompressor) { * @see #LZ4BlockInputStream(InputStream, LZ4FastDecompressor, Checksum, boolean) * @see LZ4Factory#fastestInstance() * @see StreamingXXHash32#asChecksum() + * @deprecated Use {@link #newBuilder()} instead. */ + @Deprecated public LZ4BlockInputStream(InputStream in, boolean stopOnEmptyBlock) { this(in, LZ4Factory.fastestInstance().fastDecompressor(), XXHashFactory.fastestInstance().newStreamingHash32(DEFAULT_SEED).asChecksum(), stopOnEmptyBlock); } @@ -125,11 +127,54 @@ public LZ4BlockInputStream(InputStream in, boolean stopOnEmptyBlock) { * * @see #LZ4BlockInputStream(InputStream, LZ4FastDecompressor) * @see LZ4Factory#fastestInstance() + * @deprecated Use {@link #newBuilder()} instead. */ + @Deprecated public LZ4BlockInputStream(InputStream in) { this(in, LZ4Factory.fastestInstance().fastDecompressor()); } + /** + * Creates a new LZ4 input stream to read from the specified underlying InputStream. + * + * @param in the {@link InputStream} to poll + * @param fastDecompressor the {@link LZ4FastDecompressor} instance to + * use + * @param safeDecompressor the {@link LZ4SafeDecompressor} instance to + * use (if both fastDecompressor and safeDecompressor are + * specified then the fastDecompressor gets used) + * @param checksum the {@link Checksum} instance to use, must be + * equivalent to the instance which has been used to + * write the stream + * @param stopOnEmptyBlock whether read is stopped on an empty block + */ + private LZ4BlockInputStream(InputStream in, LZ4FastDecompressor fastDecompressor, LZ4SafeDecompressor safeDecompressor, + Checksum checksum, boolean stopOnEmptyBlock) { + super(in); + + this.fastDecompressor = fastDecompressor; + this.safeDecompressor = safeDecompressor; + this.checksum = checksum; + this.stopOnEmptyBlock = stopOnEmptyBlock; + this.buffer = new byte[0]; + this.compressedBuffer = new byte[HEADER_LENGTH]; + o = originalLen = 0; + finished = false; + } + + /** + * Creates a new LZ4 block input stream builder. The following are defaults: + * + * @return new instance of {@link Builder} to be used to configure and build new LZ4 input stream + */ + public static Builder newBuilder() { + return new Builder(); + } + @Override public int available() throws IOException { return originalLen - o; @@ -213,11 +258,11 @@ private void refill() throws IOException { final int check = SafeUtils.readIntLE(compressedBuffer, MAGIC_LENGTH + 9); assert HEADER_LENGTH == MAGIC_LENGTH + 13; if (originalLen > 1 << compressionLevel - || originalLen < 0 - || compressedLen < 0 - || (originalLen == 0 && compressedLen != 0) - || (originalLen != 0 && compressedLen == 0) - || (compressionMethod == COMPRESSION_METHOD_RAW && originalLen != compressedLen)) { + || originalLen < 0 + || compressedLen < 0 + || (originalLen == 0 && compressedLen != 0) + || (originalLen != 0 && compressedLen == 0) + || (compressionMethod == COMPRESSION_METHOD_RAW && originalLen != compressedLen)) { throw new IOException("Stream is corrupted"); } if (originalLen == 0 && compressedLen == 0) { @@ -235,25 +280,32 @@ private void refill() throws IOException { buffer = new byte[Math.max(originalLen, buffer.length * 3 / 2)]; } switch (compressionMethod) { - case COMPRESSION_METHOD_RAW: - readFully(buffer, originalLen); - break; - case COMPRESSION_METHOD_LZ4: - if (compressedBuffer.length < compressedLen) { - compressedBuffer = new byte[Math.max(compressedLen, compressedBuffer.length * 3 / 2)]; - } - readFully(compressedBuffer, compressedLen); - try { - final int compressedLen2 = decompressor.decompress(compressedBuffer, 0, buffer, 0, originalLen); - if (compressedLen != compressedLen2) { - throw new IOException("Stream is corrupted"); + case COMPRESSION_METHOD_RAW: + readFully(buffer, originalLen); + break; + case COMPRESSION_METHOD_LZ4: + if (compressedBuffer.length < compressedLen) { + compressedBuffer = new byte[Math.max(compressedLen, compressedBuffer.length * 3 / 2)]; } - } catch (LZ4Exception e) { - throw new IOException("Stream is corrupted", e); - } - break; - default: - throw new AssertionError(); + readFully(compressedBuffer, compressedLen); + try { + if (fastDecompressor == null) { + final int decompressedLen = safeDecompressor.decompress(compressedBuffer, 0, compressedLen, buffer, 0, originalLen); + if (decompressedLen != originalLen) { + throw new IOException("Stream is corrupted"); + } + } else { + final int compressedLen2 = fastDecompressor.decompress(compressedBuffer, 0, buffer, 0, originalLen); + if (compressedLen != compressedLen2) { + throw new IOException("Stream is corrupted"); + } + } + } catch (LZ4Exception e) { + throw new IOException("Stream is corrupted", e); + } + break; + default: + throw new AssertionError(); } checksum.reset(); checksum.update(buffer, 0, originalLen); @@ -304,7 +356,95 @@ public void reset() throws IOException { @Override public String toString() { return getClass().getSimpleName() + "(in=" + in - + ", decompressor=" + decompressor + ", checksum=" + checksum + ")"; + + ", decompressor=" + (fastDecompressor != null ? fastDecompressor : safeDecompressor) + + ", checksum=" + checksum + ")"; } + /** + * Builder for {@link LZ4BlockInputStream} + */ + public static final class Builder { + private boolean stopOnEmptyBlock = true; + private LZ4FastDecompressor fastDecompressor; + private LZ4SafeDecompressor safeDecompressor; + private Checksum checksum; + + private Builder() { + } + + /** + * Registers value of stopOnEmptyBlock to be used by the builder + * + * @param stopOnEmptyBlock whether read is stopped on an empty block + * @return current builder instance + */ + public Builder withStopOnEmptyBlock(boolean stopOnEmptyBlock) { + this.stopOnEmptyBlock = stopOnEmptyBlock; + return this; + } + + /** + * Registers {@link LZ4FastDecompressor} to be used by the builder as a decompressor. Overrides one set by + * {@link #withDecompressor(LZ4SafeDecompressor)} + * + * @param fastDecompressor the {@link LZ4FastDecompressor} instance to use + * @return current builder instance + */ + public Builder withDecompressor(LZ4FastDecompressor fastDecompressor) { + this.fastDecompressor = fastDecompressor; + this.safeDecompressor = null; + return this; + } + + /** + * Registers {@link LZ4SafeDecompressor} to be used by the builder as a decompressor. Overrides one set by + * {@link #withDecompressor(LZ4FastDecompressor)} + * + * @param safeDecompressor the {@link LZ4SafeDecompressor} instance to use. + * @return current builder instance + */ + public Builder withDecompressor(LZ4SafeDecompressor safeDecompressor) { + this.safeDecompressor = safeDecompressor; + this.fastDecompressor = null; + return this; + } + + /** + * Registers {@link Checksum} to be used by the builder + * + * @param checksum the {@link Checksum} instance to use, must be + * equivalent to the instance which has been used to + * write the stream + * @return current builder instance + */ + public Builder withChecksum(Checksum checksum) { + this.checksum = checksum; + return this; + } + + /** + * Creates a new LZ4 input stream to read from the specified InputStream with specified parameters + * + * @param in the {@link InputStream} to poll + * @return new instance of {@link LZ4BlockInputStream} using parameters set in the builder and provided InputStream + * + * @see #withChecksum(Checksum) + * @see #withDecompressor(LZ4FastDecompressor) + * @see #withDecompressor(LZ4SafeDecompressor) + * @see #withStopOnEmptyBlock(boolean) + */ + public LZ4BlockInputStream build(InputStream in) { + Checksum checksum = this.checksum; + LZ4FastDecompressor fastDecompressor = this.fastDecompressor; + LZ4SafeDecompressor safeDecompressor = this.safeDecompressor; + + if (checksum == null) { + checksum = XXHashFactory.fastestInstance().newStreamingHash32(DEFAULT_SEED).asChecksum(); + } + if (fastDecompressor == null && safeDecompressor == null) { + safeDecompressor = LZ4Factory.fastestInstance().safeDecompressor(); + } + return new LZ4BlockInputStream(in, fastDecompressor, safeDecompressor, checksum, stopOnEmptyBlock); + } + } } diff --git a/src/test/net/jpountz/lz4/LZ4BlockStreamingTest.java b/src/test/net/jpountz/lz4/LZ4BlockStreamingTest.java index a88c35ae..e96a76fd 100644 --- a/src/test/net/jpountz/lz4/LZ4BlockStreamingTest.java +++ b/src/test/net/jpountz/lz4/LZ4BlockStreamingTest.java @@ -29,6 +29,7 @@ import java.util.zip.CRC32; import java.util.zip.Checksum; +import com.carrotsearch.randomizedtesting.annotations.ParametersFactory; import net.jpountz.xxhash.XXHashFactory; import org.junit.Test; @@ -38,6 +39,20 @@ public class LZ4BlockStreamingTest extends AbstractLZ4Test { + private final LZ4BlockInputStream.Builder preInitializedBuilder; + + public LZ4BlockStreamingTest(LZ4BlockInputStream.Builder preInitializedBuilder) { + this.preInitializedBuilder = preInitializedBuilder; + } + + @ParametersFactory + public static Iterable preInitializedBuilders() { + return Arrays.asList(new Object[][] { + { LZ4BlockInputStream.newBuilder().withDecompressor(LZ4Factory.fastestInstance().fastDecompressor()) }, + { LZ4BlockInputStream.newBuilder().withDecompressor(LZ4Factory.fastestInstance().safeDecompressor()) } + }); + } + // An input stream that might read less data than it is able to class MockInputStream extends FilterInputStream { @@ -158,55 +173,57 @@ public void testRoundTrip(byte[] data) throws IOException { final ByteArrayOutputStream compressed = new ByteArrayOutputStream(); final int blockSize; switch (randomInt(2)) { - case 0: - blockSize = LZ4BlockOutputStream.MIN_BLOCK_SIZE; - break; - case 1: - blockSize = LZ4BlockOutputStream.MAX_BLOCK_SIZE; - break; - default: - blockSize = randomIntBetween(LZ4BlockOutputStream.MIN_BLOCK_SIZE, LZ4BlockOutputStream.MAX_BLOCK_SIZE); - break; + case 0: + blockSize = LZ4BlockOutputStream.MIN_BLOCK_SIZE; + break; + case 1: + blockSize = LZ4BlockOutputStream.MAX_BLOCK_SIZE; + break; + default: + blockSize = randomIntBetween(LZ4BlockOutputStream.MIN_BLOCK_SIZE, LZ4BlockOutputStream.MAX_BLOCK_SIZE); + break; } final LZ4Compressor compressor = randomBoolean() - ? LZ4Factory.fastestInstance().fastCompressor() - : LZ4Factory.fastestInstance().highCompressor(); + ? LZ4Factory.fastestInstance().fastCompressor() + : LZ4Factory.fastestInstance().highCompressor(); final Checksum checksum; switch (randomInt(2)) { - case 0: - checksum = new Adler32(); - break; - case 1: - checksum = new CRC32(); - break; - default: - checksum = XXHashFactory.fastestInstance().newStreamingHash32(randomInt()).asChecksum(); - break; + case 0: + checksum = new Adler32(); + break; + case 1: + checksum = new CRC32(); + break; + default: + checksum = XXHashFactory.fastestInstance().newStreamingHash32(randomInt()).asChecksum(); + break; } final boolean syncFlush = randomBoolean(); final LZ4BlockOutputStream os = new LZ4BlockOutputStream(wrap(compressed), blockSize, compressor, checksum, syncFlush); final int half = data.length / 2; switch (randomInt(2)) { - case 0: - os.write(data, 0, half); - for (int i = half; i < data.length; ++i) { - os.write(data[i]); - } - break; - case 1: - for (int i = 0; i < half; ++i) { - os.write(data[i]); - } - os.write(data, half, data.length - half); - break; - case 2: - os.write(data, 0, data.length); - break; + case 0: + os.write(data, 0, half); + for (int i = half; i < data.length; ++i) { + os.write(data[i]); + } + break; + case 1: + for (int i = 0; i < half; ++i) { + os.write(data[i]); + } + os.write(data, half, data.length - half); + break; + case 2: + os.write(data, 0, data.length); + break; } os.close(); - final LZ4FastDecompressor decompressor = LZ4Factory.fastestInstance().fastDecompressor(); - InputStream is = new LZ4BlockInputStream(open(compressed.toByteArray()), decompressor, checksum); + final LZ4BlockInputStream.Builder builder = preInitializedBuilder + .withStopOnEmptyBlock(true) + .withChecksum(checksum); + InputStream is = builder.build(open(compressed.toByteArray())); assertFalse(is.markSupported()); try { is.mark(1); @@ -241,7 +258,7 @@ public void testRoundTrip(byte[] data) throws IOException { // test skip final int offset = data.length <= 1 ? 0 : randomInt(data.length - 1); final int length = randomInt(data.length - offset); - is = new LZ4BlockInputStream(open(compressed.toByteArray()), decompressor, checksum); + is = preInitializedBuilder.build(open(compressed.toByteArray())); restored = new byte[length + 1000]; read = 0; while (read < offset) { @@ -284,7 +301,10 @@ public void testDoubleClose() throws IOException { out.close(); out.close(); - LZ4BlockInputStream in = new LZ4BlockInputStream(new ByteArrayInputStream(bytes.toByteArray())); + LZ4BlockInputStream in = preInitializedBuilder + .withChecksum(null) + .withStopOnEmptyBlock(true) + .build(new ByteArrayInputStream(bytes.toByteArray())); byte[] actual = new byte[testBytes.length]; in.read(actual); @@ -331,14 +351,19 @@ public void testConcatenationOfSerializedStreams() throws IOException { System.arraycopy(bytes2, 0, concatenatedBytes, bytes1.length, bytes2.length); // In a default behaviour, we can read the first block of the concatenated bytes only - LZ4BlockInputStream in1 = new LZ4BlockInputStream(new ByteArrayInputStream(concatenatedBytes)); + LZ4BlockInputStream in1 = preInitializedBuilder + .withChecksum(null) + .withStopOnEmptyBlock(true) + .build(new ByteArrayInputStream(concatenatedBytes)); byte[] actual1 = new byte[128]; assertEquals(64, readFully(in1, actual1)); assertEquals(-1, in1.read()); in1.close(); // Check if we can read concatenated byte stream - LZ4BlockInputStream in2 = new LZ4BlockInputStream(new ByteArrayInputStream(concatenatedBytes), false); + LZ4BlockInputStream in2 = preInitializedBuilder + .withStopOnEmptyBlock(false) + .build(new ByteArrayInputStream(concatenatedBytes)); byte[] actual2 = new byte[128]; assertEquals(128, readFully(in2, actual2)); assertEquals(-1, in2.read());