From d77d7cbe82e088e993ec7f8dc41fa5acce48d2f3 Mon Sep 17 00:00:00 2001 From: "P. J. Reed" Date: Tue, 14 Jun 2016 13:49:12 -0500 Subject: [PATCH 1/2] Adding support for LZ4 compression - Added a custom fork of jpountz/lz4-java that is interoperable with the C++ library - Added the ability to deserialize LZ4-compressed chunks - Added an API method on BagFile to determine a bag's dominant compression method - Releasing version 1.4 --- CHANGELOG.md | 6 ++ pom.xml | 7 +- .../swrirobotics/bags/reader/BagFile.java | 72 ++++++++++++++++-- .../reader/records/ChunkRecordIterator.java | 5 ++ .../bags/reader/records/Record.java | 73 ++++++++++++++----- 5 files changed, 138 insertions(+), 25 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 9b68729..1a69f61 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,11 @@ # Java Bag Reader changelog +1.4 + +- Added a custom fork of jpountz/lz4-java that is interoperable with the C++ library +- Added the ability to deserialize LZ4-compressed chunks +- Added an API method on BagFile to determine a bag's dominant compression method + 1.3 - Implementing bulk deserialization of arrays diff --git a/pom.xml b/pom.xml index 6fbdf27..e85f878 100644 --- a/pom.xml +++ b/pom.xml @@ -9,7 +9,7 @@ com.github.swri-robotics bag-reader-java jar - 1.3 + 1.4 bag-reader-java https://github.com/swri-robotics/bag-reader-java @@ -54,6 +54,11 @@ 1.1.5 runtime + + com.github.swri-robotics + lz4 + 1.4.0 + diff --git a/src/main/java/com/github/swrirobotics/bags/reader/BagFile.java b/src/main/java/com/github/swrirobotics/bags/reader/BagFile.java index 3b1f7a5..87c5815 100644 --- a/src/main/java/com/github/swrirobotics/bags/reader/BagFile.java +++ b/src/main/java/com/github/swrirobotics/bags/reader/BagFile.java @@ -470,6 +470,63 @@ public List getTopics() throws BagReaderException { return list; } + /** + * If any chunks are compressed, this will return the most common type of + * compression used in this bag file (either "lz4" or "bz2"). If no + * chunks are compressed, this will return "none". + * This will iterate through all of the Chunk and ChunkInfo records in a + * bag, so it might be a little slow. + * @return The dominant compression type, "bz2" or "lz4", or "none" if there is none. + */ + public String getCompressionType() { + long lz4Count = 0; + long bz2Count = 0; + + // First, check all of the Chunk records we read. + for (Chunk chunk : myChunks) { + String compression = chunk.getCompression(); + if (compression.equals("bz2")) { + bz2Count++; + } + else if (compression.equals("lz4")) { + lz4Count++; + } + } + + // It's possible we may not have actually read any Chunk records, just + // ChunkInfo records that contain the position of Chunks. In that + // case, we have to go look up the chunks and read them. + List chunkPositions = Lists.newArrayListWithExpectedSize(myChunkInfos.size() ); + for (ChunkInfo info : myChunkInfos) { + chunkPositions.add(info.getChunkPos()); + } + try (SeekableByteChannel channel = getChannel()) { + for (Long chunkPos : chunkPositions) { + Chunk chunk = new Chunk(recordAt(channel, chunkPos)); + String compression = chunk.getCompression(); + if (compression.equals("bz2")) { + bz2Count++; + } + else if (compression.equals("lz4")) { + lz4Count++; + } + } + } + catch ( IOException | BagReaderException e ) { + myLogger.warn("Error reading data chunk", e); + } + + if (lz4Count > bz2Count) { + return "lz4"; + } + else if (bz2Count > lz4Count && bz2Count != 0) { + return "bz2"; + } + else { + return "none"; + } + } + /** * Counts how many messages are in this bag file. If this bag file is indexed, * it counts how many are listed in the indices; otherwise, it iterates through @@ -812,16 +869,17 @@ public void read() throws BagReaderException { * @throws BagReaderException */ public void printInfo() throws BagReaderException { - myLogger.info("Version: " + this.getVersion()); - myLogger.info("Duration: " + this.getDurationS() + "s"); - myLogger.info("Start: " + (this.getStartTime() == null ? + myLogger.info("Version: " + this.getVersion()); + myLogger.info("Compression: " + this.getCompressionType()); + myLogger.info("Duration: " + this.getDurationS() + "s"); + myLogger.info("Start: " + (this.getStartTime() == null ? "Unknown" : (this.getStartTime().toString() + " (" + this.getStartTime().getTime() + ")"))); - myLogger.info("End: " + (this.getEndTime() == null ? + myLogger.info("End: " + (this.getEndTime() == null ? "Unknown" : (this.getEndTime().toString() + " (" + this.getEndTime().getTime() + ")"))); - myLogger.info("Size: " + + myLogger.info("Size: " + (((double) this.getPath().toFile().length()) / 1024.0) + " MB"); - myLogger.info("Messages: " + this.getMessageCount()); - myLogger.info("Types: "); + myLogger.info("Messages: " + this.getMessageCount()); + myLogger.info("Types:"); for (Map.Entry entry : this.getMessageTypes().entries()) { myLogger.info(" " + entry.getKey() + " \t\t[" + entry.getValue() + "]"); } diff --git a/src/main/java/com/github/swrirobotics/bags/reader/records/ChunkRecordIterator.java b/src/main/java/com/github/swrirobotics/bags/reader/records/ChunkRecordIterator.java index 925958f..7d3d181 100644 --- a/src/main/java/com/github/swrirobotics/bags/reader/records/ChunkRecordIterator.java +++ b/src/main/java/com/github/swrirobotics/bags/reader/records/ChunkRecordIterator.java @@ -34,6 +34,8 @@ import com.github.swrirobotics.bags.reader.exceptions.BagReaderException; import com.github.swrirobotics.bags.reader.records.ChunkInfo; import com.github.swrirobotics.bags.reader.records.Record; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.nio.channels.SeekableByteChannel; import java.util.Iterator; @@ -51,6 +53,8 @@ public class ChunkRecordIterator implements Iterator { private final int myConnId; private Record myNextRecord = null; + private static final Logger myLogger = LoggerFactory.getLogger(ChunkRecordIterator.class); + /** * Creates the iterator. * @param connectionId The ID of the connection to get chunks for. @@ -115,6 +119,7 @@ private Record findNext() { return chunk; } catch (BagReaderException e) { + myLogger.warn("Error reading data chunk", e); return null; } } diff --git a/src/main/java/com/github/swrirobotics/bags/reader/records/Record.java b/src/main/java/com/github/swrirobotics/bags/reader/records/Record.java index f3d36ac..eec303d 100644 --- a/src/main/java/com/github/swrirobotics/bags/reader/records/Record.java +++ b/src/main/java/com/github/swrirobotics/bags/reader/records/Record.java @@ -32,11 +32,13 @@ import com.fasterxml.jackson.databind.util.ByteBufferBackedInputStream; import com.github.swrirobotics.bags.reader.exceptions.BagReaderException; +import net.jpountz.lz4.LZ4FrameInputStream; import org.apache.commons.compress.compressors.bzip2.BZip2CompressorInputStream; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.IOException; +import java.io.InputStream; import java.nio.ByteBuffer; import java.nio.ByteOrder; import java.nio.channels.SeekableByteChannel; @@ -139,7 +141,8 @@ else if (this.headerLength == 0) { /** * Reads the data field of the record. If the record is compressed, this - * will involve decompressing it. + * will involve decompressing it. Currently both bz2 and lz4 compression + * are supported. * Do not call this method more than once, and do not call it if the channel * used to create this record has been closed. * @throws BagReaderException @@ -150,25 +153,38 @@ public void readData() throws BagReaderException { myChannel.position(myDataOffset); myChannel.read(myData); - // Chunks can have bz2-compressed myData in them, which we need to decompress in order - // to do anything useful with. - if (myHeader.getType() == RecordType.CHUNK && myHeader.getValue("compression").equals("bz2")) { - int decompressedSize = myHeader.getInt("size"); - myData.flip(); - try (ByteBufferBackedInputStream inStream = new ByteBufferBackedInputStream(myData); - BZip2CompressorInputStream bz2Stream = new BZip2CompressorInputStream(inStream)) { - final byte[] buffer = new byte[decompressedSize]; - int n = bz2Stream.read(buffer); - if (n != decompressedSize) { - throw new BagReaderException("Read " + n + " bytes from a " + - "compressed chunk but expected " + - decompressedSize + "."); - } + // Chunks can have bz2 or lz4-compressed myData in them, which we need to + // decompress in order to do anything useful with. + if (myHeader.getType() == RecordType.CHUNK) { + String compression = myHeader.getValue("compression"); + switch (compression) { + case "none": + // Do nothing here if not compressed + break; + case "bz2": + case "lz4": + { + int decompressedSize = myHeader.getInt("size"); + myData.flip(); + try (ByteBufferBackedInputStream inStream = new ByteBufferBackedInputStream(myData); + InputStream compressedStream = openCompressedStream(compression, inStream)) { + final byte[] buffer = new byte[decompressedSize]; + int n = compressedStream.read(buffer); + if (n != decompressedSize) { + throw new BagReaderException("Read " + n + " bytes from a " + + "compressed chunk but expected " + + decompressedSize + "."); + } - myData = ByteBuffer.wrap(buffer); + myData = ByteBuffer.wrap(buffer); + } + break; + } + default: + myLogger.warn("Unknown compression format: " + compression); + break; } } - myData.order(ByteOrder.LITTLE_ENDIAN); } catch (IOException e) { @@ -176,6 +192,29 @@ public void readData() throws BagReaderException { } } + /** + * Opens an input stream that will read compressed data of the specified type + * from a compressed input stream. + * @param compressionType The type of compression; currently supported values + * are "bz2" and "lz4". + * @param inStream An InputStream containing compressed data. + * @return An InputStream that will decompress data from that stream. + * @throws IOException If there was an error opening the stream. + * @throws BagReaderException If the compression type is not supported. + */ + private InputStream openCompressedStream(String compressionType, InputStream inStream) + throws IOException, BagReaderException { + switch (compressionType) { + case "bz2": + return new BZip2CompressorInputStream(inStream); + case "lz4": + return new LZ4FrameInputStream(inStream); + default: + String error = "Unknown compression type: " + compressionType; + throw new BagReaderException(error); + } + } + /** * If this record represents a connection and {@link #setConnectionHeader(Header)} * was previously called, returns this connection's header. From 1fea8ee50e6167dcaac14a3600ed4f0c70b13f1e Mon Sep 17 00:00:00 2001 From: "P. J. Reed" Date: Tue, 14 Jun 2016 13:58:24 -0500 Subject: [PATCH 2/2] Updating the version number in the README --- README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/README.md b/README.md index b61f128..526a7eb 100644 --- a/README.md +++ b/README.md @@ -18,7 +18,7 @@ Add the following dependency to your Maven pom.xml: com.github.swri-robotics bag-reader-java - 1.3 + 1.4 ```