Skip to content

Commit

Permalink
Merge pull request #14 from pjreed/reader-locks
Browse files Browse the repository at this point in the history
Making some changes to fix a bug in the bag database
  • Loading branch information
pjreed authored Jun 20, 2016
2 parents d6a1c28 + e4b2b20 commit 2735de3
Show file tree
Hide file tree
Showing 3 changed files with 48 additions and 11 deletions.
5 changes: 5 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,10 @@
# Java Bag Reader changelog

1.5

- Acquiring shared locks when reading bag files
- Logging warnings and throwing an exception if it fails to read the expected number of chunks and connections

1.4

- Added a custom fork of jpountz/lz4-java that is interoperable with the C++ library
Expand Down
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
<groupId>com.github.swri-robotics</groupId>
<artifactId>bag-reader-java</artifactId>
<packaging>jar</packaging>
<version>1.4</version>
<version>1.5</version>
<name>bag-reader-java</name>
<url>https://github.com/swri-robotics/bag-reader-java</url>
<properties>
Expand Down
52 changes: 42 additions & 10 deletions src/main/java/com/github/swrirobotics/bags/reader/BagFile.java
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.nio.channels.FileChannel;
import java.nio.channels.FileLock;
import java.nio.channels.ReadableByteChannel;
import java.nio.channels.SeekableByteChannel;
import java.nio.file.FileSystems;
Expand Down Expand Up @@ -123,7 +124,7 @@ public BagFile(String filePath) {
* @return An open SeekableByteChannel.
* @throws IOException If there is an error opening the file.
*/
public SeekableByteChannel getChannel() throws IOException {
public FileChannel getChannel() throws IOException {
return FileChannel.open(getPath(), StandardOpenOption.READ);
}

Expand Down Expand Up @@ -273,7 +274,8 @@ public Connection findFirstConnectionByMd5Sum(String msgMd5Sum) {
*/
public void forFirstTopicWithMessagesOfType(String messageType, MessageHandler handler) throws BagReaderException {
for (Connection conn : myConnectionsByType.get(messageType)) {
try (SeekableByteChannel channel = getChannel()) {
try (FileChannel channel = getChannel()) {
FileLock lock = channel.lock(0, Long.MAX_VALUE, true);
MsgIterator iter = new MsgIterator(myChunkInfos, conn, channel);

while (iter.hasNext()) {
Expand Down Expand Up @@ -304,7 +306,8 @@ public void forFirstTopicWithMessagesOfType(String messageType, MessageHandler h
*/
public void forMessagesOfType(String messageType, MessageHandler handler) throws BagReaderException {
for (Connection conn : myConnectionsByType.get(messageType)) {
try (SeekableByteChannel channel = getChannel()) {
try (FileChannel channel = getChannel()) {
FileLock lock = channel.lock(0, Long.MAX_VALUE, true);
MsgIterator iter = new MsgIterator(myChunkInfos, conn, channel);

while (iter.hasNext()) {
Expand Down Expand Up @@ -332,7 +335,8 @@ public void forMessagesOfType(String messageType, MessageHandler handler) throws
*/
public void forMessagesOnTopic(String topic, MessageHandler handler) throws BagReaderException {
for (Connection conn : myConnectionsByTopic.get(topic)) {
try (SeekableByteChannel channel = getChannel()) {
try (FileChannel channel = getChannel()) {
FileLock lock = channel.lock(0, Long.MAX_VALUE, true);
MsgIterator iter = new MsgIterator(myChunkInfos, conn, channel);

while (iter.hasNext()) {
Expand All @@ -356,7 +360,8 @@ public void forMessagesOnTopic(String topic, MessageHandler handler) throws BagR
*/
public MessageType getFirstMessageOfType(String messageType) throws BagReaderException {
for (Connection conn : myConnectionsByType.get(messageType)) {
try (SeekableByteChannel channel = getChannel()) {
try (FileChannel channel = getChannel()) {
FileLock lock = channel.lock(0, Long.MAX_VALUE, true);
MsgIterator iter = new MsgIterator(myChunkInfos, conn, channel);
if (iter.hasNext()) {
return iter.next();
Expand All @@ -382,7 +387,8 @@ public MessageType getFirstMessageOfType(String messageType) throws BagReaderExc
*/
public MessageType getFirstMessageOnTopic(String topic) throws BagReaderException {
for (Connection conn : myConnectionsByTopic.get(topic)) {
try (SeekableByteChannel channel = getChannel()) {
try (FileChannel channel = getChannel()) {
FileLock lock = channel.lock(0, Long.MAX_VALUE, true);
MsgIterator iter = new MsgIterator(myChunkInfos, conn, channel);
if (iter.hasNext()) {
return iter.next();
Expand Down Expand Up @@ -500,7 +506,8 @@ else if (compression.equals("lz4")) {
for (ChunkInfo info : myChunkInfos) {
chunkPositions.add(info.getChunkPos());
}
try (SeekableByteChannel channel = getChannel()) {
try (FileChannel channel = getChannel()) {
FileLock lock = channel.lock(0, Long.MAX_VALUE, true);
for (Long chunkPos : chunkPositions) {
Chunk chunk = new Chunk(recordAt(channel, chunkPos));
String compression = chunk.getCompression();
Expand Down Expand Up @@ -590,7 +597,8 @@ public MessageType getMessageOnTopicAtIndex(String topic,
throw new BagReaderException(e);
}

try (SeekableByteChannel channel = getChannel()) {
try (FileChannel channel = getChannel()) {
FileLock lock = channel.lock(0, Long.MAX_VALUE, true);
MessageIndex msgIndex = indexes.get(index);
Record record = BagFile.recordAt(channel, msgIndex.fileIndex);
ByteBufferChannel chunkChannel = new ByteBufferChannel(record.getData());
Expand All @@ -617,7 +625,8 @@ public MessageType getMessageOnTopicAtIndex(String topic,
*/
private void generateIndexesForTopic(String topic) throws BagReaderException {
List<MessageIndex> msgIndexes = Lists.newArrayList();
try (SeekableByteChannel channel = getChannel()) {
try (FileChannel channel = getChannel()) {
FileLock lock = channel.lock(0, Long.MAX_VALUE, true);
for (Connection conn : myConnectionsByTopic.get(topic)) {
for (ChunkInfo chunkInfo : myChunkInfosByConnectionId.get(conn.getConnectionId())) {
long chunkPos = chunkInfo.getChunkPos();
Expand Down Expand Up @@ -797,7 +806,8 @@ public void read() throws BagReaderException {
return;
}

try (SeekableByteChannel input = getChannel()){
try (FileChannel input = getChannel()){
FileLock lock = input.lock(0, Long.MAX_VALUE, true);
verifyBagFile(input);

while (hasNext(input)) {
Expand Down Expand Up @@ -857,6 +867,28 @@ public void read() throws BagReaderException {
}
else {
myLogger.warn("No chunk info records found; start and end time are unknown.");

myLogger.warn("Record type counts:" +
"\n Header: " + (this.getBagHeader() == null ? 0 : 1) +
"\n Chunk: " + this.getChunks().size() +
"\n Connection: " + this.getConnections().size() +
"\n Message Data: " + this.getMessages().size() +
"\n Index Data: " + this.getIndexes().size() +
"\n Chunk Info: " + this.getChunkInfos().size());
}

if (this.getBagHeader().getChunkCount() !=
(this.getChunks().size() + this.getChunkInfos().size()) ||
this.getBagHeader().getConnCount() != this.getConnections().size()) {
String errorMsg = "Expected " + this.getBagHeader().getChunkCount() +
" chunks and " + this.getBagHeader().getConnCount() +
" connections, but got " +
(this.getChunks().size() + this.getChunkInfos().size()) + " and " +
this.getConnections().size() + ". This is ok if the file " +
"is in the middle of being written to disk, otherwise the bag " +
"may be corrupt.";
myLogger.warn(errorMsg);
throw new BagReaderException(errorMsg);
}
}

Expand Down

0 comments on commit 2735de3

Please sign in to comment.