Skip to content

Commit

Permalink
Merge pull request #13 from pjreed/9/lz4-compression
Browse files Browse the repository at this point in the history
Adding support for LZ4 compression
  • Loading branch information
pjreed authored Jun 14, 2016
2 parents c273e89 + 1fea8ee commit d6a1c28
Show file tree
Hide file tree
Showing 6 changed files with 139 additions and 26 deletions.
6 changes: 6 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,11 @@
# Java Bag Reader changelog

1.4

- Added a custom fork of jpountz/lz4-java that is interoperable with the C++ library
- Added the ability to deserialize LZ4-compressed chunks
- Added an API method on BagFile to determine a bag's dominant compression method

1.3

- Implementing bulk deserialization of arrays
Expand Down
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ Add the following dependency to your Maven pom.xml:
<dependency>
<groupId>com.github.swri-robotics</groupId>
<artifactId>bag-reader-java</artifactId>
<version>1.3</version>
<version>1.4</version>
</dependency>
```

Expand Down
7 changes: 6 additions & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
<groupId>com.github.swri-robotics</groupId>
<artifactId>bag-reader-java</artifactId>
<packaging>jar</packaging>
<version>1.3</version>
<version>1.4</version>
<name>bag-reader-java</name>
<url>https://github.com/swri-robotics/bag-reader-java</url>
<properties>
Expand Down Expand Up @@ -54,6 +54,11 @@
<version>1.1.5</version>
<scope>runtime</scope>
</dependency>
<dependency>
<groupId>com.github.swri-robotics</groupId>
<artifactId>lz4</artifactId>
<version>1.4.0</version>
</dependency>
</dependencies>
<build>
<plugins>
Expand Down
72 changes: 65 additions & 7 deletions src/main/java/com/github/swrirobotics/bags/reader/BagFile.java
Original file line number Diff line number Diff line change
Expand Up @@ -470,6 +470,63 @@ public List<TopicInfo> getTopics() throws BagReaderException {
return list;
}

/**
 * Determines the dominant compression type used by the chunks in this bag.
 * If any chunks are compressed, this will return the most common type of
 * compression among them (either "lz4" or "bz2"); if both types occur
 * equally often, "lz4" is returned.  If no chunks are compressed, this
 * will return "none".
 * This will iterate through all of the Chunk and ChunkInfo records in a
 * bag, so it might be a little slow.
 * If an error occurs while reading chunks from the file, a warning is
 * logged and the result is based on the chunks that were counted before
 * the error.
 * @return The dominant compression type, "bz2" or "lz4", or "none" if there is none.
 */
public String getCompressionType() {
    long lz4Count = 0;
    long bz2Count = 0;

    // First, check all of the Chunk records we read.
    for (Chunk chunk : myChunks) {
        String compression = chunk.getCompression();
        // Constant-first comparison so that a missing compression value is
        // simply not counted rather than causing a NullPointerException.
        if ("bz2".equals(compression)) {
            bz2Count++;
        }
        else if ("lz4".equals(compression)) {
            lz4Count++;
        }
    }

    // It's possible we may not have actually read any Chunk records, just
    // ChunkInfo records that contain the position of Chunks.  In that
    // case, we have to go look up the chunks and read them.
    // NOTE(review): a chunk that appears in both myChunks and myChunkInfos
    // would be counted twice -- confirm whether that can happen.
    List<Long> chunkPositions = Lists.newArrayListWithExpectedSize(myChunkInfos.size());
    for (ChunkInfo info : myChunkInfos) {
        chunkPositions.add(info.getChunkPos());
    }
    try (SeekableByteChannel channel = getChannel()) {
        for (Long chunkPos : chunkPositions) {
            Chunk chunk = new Chunk(recordAt(channel, chunkPos));
            String compression = chunk.getCompression();
            if ("bz2".equals(compression)) {
                bz2Count++;
            }
            else if ("lz4".equals(compression)) {
                lz4Count++;
            }
        }
    }
    catch (IOException | BagReaderException e) {
        // Best effort: report based on whatever was counted before the failure.
        myLogger.warn("Error reading data chunk", e);
    }

    // Only report "none" when nothing was compressed at all; a tie between
    // two non-zero counts must still report a compression type.
    if (lz4Count == 0 && bz2Count == 0) {
        return "none";
    }
    return lz4Count >= bz2Count ? "lz4" : "bz2";
}

/**
* Counts how many messages are in this bag file. If this bag file is indexed,
* it counts how many are listed in the indices; otherwise, it iterates through
Expand Down Expand Up @@ -812,16 +869,17 @@ public void read() throws BagReaderException {
* @throws BagReaderException
*/
public void printInfo() throws BagReaderException {
myLogger.info("Version: " + this.getVersion());
myLogger.info("Duration: " + this.getDurationS() + "s");
myLogger.info("Start: " + (this.getStartTime() == null ?
myLogger.info("Version: " + this.getVersion());
myLogger.info("Compression: " + this.getCompressionType());
myLogger.info("Duration: " + this.getDurationS() + "s");
myLogger.info("Start: " + (this.getStartTime() == null ?
"Unknown" : (this.getStartTime().toString() + " (" + this.getStartTime().getTime() + ")")));
myLogger.info("End: " + (this.getEndTime() == null ?
myLogger.info("End: " + (this.getEndTime() == null ?
"Unknown" : (this.getEndTime().toString() + " (" + this.getEndTime().getTime() + ")")));
myLogger.info("Size: " +
myLogger.info("Size: " +
(((double) this.getPath().toFile().length()) / 1024.0) + " MB");
myLogger.info("Messages: " + this.getMessageCount());
myLogger.info("Types: ");
myLogger.info("Messages: " + this.getMessageCount());
myLogger.info("Types:");
for (Map.Entry<String, String> entry : this.getMessageTypes().entries()) {
myLogger.info(" " + entry.getKey() + " \t\t[" + entry.getValue() + "]");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@
import com.github.swrirobotics.bags.reader.exceptions.BagReaderException;
import com.github.swrirobotics.bags.reader.records.ChunkInfo;
import com.github.swrirobotics.bags.reader.records.Record;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.nio.channels.SeekableByteChannel;
import java.util.Iterator;
Expand All @@ -51,6 +53,8 @@ public class ChunkRecordIterator implements Iterator<Record> {
private final int myConnId;
private Record myNextRecord = null;

private static final Logger myLogger = LoggerFactory.getLogger(ChunkRecordIterator.class);

/**
* Creates the iterator.
* @param connectionId The ID of the connection to get chunks for.
Expand Down Expand Up @@ -115,6 +119,7 @@ private Record findNext() {
return chunk;
}
catch (BagReaderException e) {
myLogger.warn("Error reading data chunk", e);
return null;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,11 +32,13 @@

import com.fasterxml.jackson.databind.util.ByteBufferBackedInputStream;
import com.github.swrirobotics.bags.reader.exceptions.BagReaderException;
import net.jpountz.lz4.LZ4FrameInputStream;
import org.apache.commons.compress.compressors.bzip2.BZip2CompressorInputStream;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.nio.channels.SeekableByteChannel;
Expand Down Expand Up @@ -139,7 +141,8 @@ else if (this.headerLength == 0) {

/**
* Reads the data field of the record. If the record is compressed, this
* will involve decompressing it.
* will involve decompressing it. Currently both bz2 and lz4 compression
* are supported.
* Do not call this method more than once, and do not call it if the channel
* used to create this record has been closed.
* @throws BagReaderException
Expand All @@ -150,32 +153,68 @@ public void readData() throws BagReaderException {
myChannel.position(myDataOffset);
myChannel.read(myData);

// Chunks can have bz2-compressed myData in them, which we need to decompress in order
// to do anything useful with.
if (myHeader.getType() == RecordType.CHUNK && myHeader.getValue("compression").equals("bz2")) {
int decompressedSize = myHeader.getInt("size");
myData.flip();
try (ByteBufferBackedInputStream inStream = new ByteBufferBackedInputStream(myData);
BZip2CompressorInputStream bz2Stream = new BZip2CompressorInputStream(inStream)) {
final byte[] buffer = new byte[decompressedSize];
int n = bz2Stream.read(buffer);
if (n != decompressedSize) {
throw new BagReaderException("Read " + n + " bytes from a " +
"compressed chunk but expected " +
decompressedSize + ".");
}
// Chunks can have bz2 or lz4-compressed myData in them, which we need to
// decompress in order to do anything useful with.
if (myHeader.getType() == RecordType.CHUNK) {
String compression = myHeader.getValue("compression");
switch (compression) {
case "none":
// Do nothing here if not compressed
break;
case "bz2":
case "lz4":
{
int decompressedSize = myHeader.getInt("size");
myData.flip();
try (ByteBufferBackedInputStream inStream = new ByteBufferBackedInputStream(myData);
InputStream compressedStream = openCompressedStream(compression, inStream)) {
final byte[] buffer = new byte[decompressedSize];
int n = compressedStream.read(buffer);
if (n != decompressedSize) {
throw new BagReaderException("Read " + n + " bytes from a " +
"compressed chunk but expected " +
decompressedSize + ".");
}

myData = ByteBuffer.wrap(buffer);
myData = ByteBuffer.wrap(buffer);
}
break;
}
default:
myLogger.warn("Unknown compression format: " + compression);
break;
}
}

myData.order(ByteOrder.LITTLE_ENDIAN);
}
catch (IOException e) {
throw new BagReaderException(e);
}
}

/**
 * Wraps a stream of compressed bytes in a decompressing stream selected by
 * the name of the compression format.
 * @param compressionType Name of the compression format; "bz2" and "lz4"
 *                        are the values recognized here.
 * @param inStream The stream that supplies the compressed bytes.
 * @return A stream that yields the decompressed contents of {@code inStream}.
 * @throws IOException If constructing the decompressing stream fails.
 * @throws BagReaderException If {@code compressionType} is not recognized.
 */
private InputStream openCompressedStream(String compressionType, InputStream inStream)
        throws IOException, BagReaderException {
    // Note: a null compressionType raises a NullPointerException here.
    if (compressionType.equals("bz2")) {
        return new BZip2CompressorInputStream(inStream);
    }
    if (compressionType.equals("lz4")) {
        return new LZ4FrameInputStream(inStream);
    }
    throw new BagReaderException("Unknown compression type: " + compressionType);
}

/**
* If this record represents a connection and {@link #setConnectionHeader(Header)}
* was previously called, returns this connection's header.
Expand Down

0 comments on commit d6a1c28

Please sign in to comment.