From e4b2b20b9a1e2b79e22d084a2e9fd26620112dbb Mon Sep 17 00:00:00 2001 From: "P. J. Reed" Date: Mon, 20 Jun 2016 13:11:07 -0500 Subject: [PATCH] Making some changes to fix a bug in the bag database - Acquiring shared locks when reading bag files - Logging warnings and throwing an exception if it fails to read the expected number of chunks and connections --- CHANGELOG.md | 5 ++ pom.xml | 2 +- .../swrirobotics/bags/reader/BagFile.java | 52 +++++++++++++++---- 3 files changed, 48 insertions(+), 11 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 1a69f61..d3bd979 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,10 @@ # Java Bag Reader changelog +1.5 + +- Acquiring shared locks when reading bag files +- Logging warnings and throwing an exception if it fails to read the expected number of chunks and connections + 1.4 - Added a custom fork of jpountz/lz4-java that is interoperable with the C++ library diff --git a/pom.xml b/pom.xml index e85f878..210bb68 100644 --- a/pom.xml +++ b/pom.xml @@ -9,7 +9,7 @@ com.github.swri-robotics bag-reader-java jar - 1.4 + 1.5 bag-reader-java https://github.com/swri-robotics/bag-reader-java diff --git a/src/main/java/com/github/swrirobotics/bags/reader/BagFile.java b/src/main/java/com/github/swrirobotics/bags/reader/BagFile.java index 87c5815..4451ef9 100644 --- a/src/main/java/com/github/swrirobotics/bags/reader/BagFile.java +++ b/src/main/java/com/github/swrirobotics/bags/reader/BagFile.java @@ -50,6 +50,7 @@ import java.nio.ByteBuffer; import java.nio.ByteOrder; import java.nio.channels.FileChannel; +import java.nio.channels.FileLock; import java.nio.channels.ReadableByteChannel; import java.nio.channels.SeekableByteChannel; import java.nio.file.FileSystems; @@ -123,7 +124,7 @@ public BagFile(String filePath) { * @return An open SeekableByteChannel. * @throws IOException If there is an error opening the file. */ - public SeekableByteChannel getChannel() throws IOException { + public FileChannel getChannel() throws IOException { return FileChannel.open(getPath(), StandardOpenOption.READ); } @@ -273,7 +274,8 @@ public Connection findFirstConnectionByMd5Sum(String msgMd5Sum) { */ public void forFirstTopicWithMessagesOfType(String messageType, MessageHandler handler) throws BagReaderException { for (Connection conn : myConnectionsByType.get(messageType)) { - try (SeekableByteChannel channel = getChannel()) { + try (FileChannel channel = getChannel()) { + FileLock lock = channel.lock(0, Long.MAX_VALUE, true); MsgIterator iter = new MsgIterator(myChunkInfos, conn, channel); while (iter.hasNext()) { @@ -304,7 +306,8 @@ public void forFirstTopicWithMessagesOfType(String messageType, MessageHandler h */ public void forMessagesOfType(String messageType, MessageHandler handler) throws BagReaderException { for (Connection conn : myConnectionsByType.get(messageType)) { - try (SeekableByteChannel channel = getChannel()) { + try (FileChannel channel = getChannel()) { + FileLock lock = channel.lock(0, Long.MAX_VALUE, true); MsgIterator iter = new MsgIterator(myChunkInfos, conn, channel); while (iter.hasNext()) { @@ -332,7 +335,8 @@ public void forMessagesOfType(String messageType, MessageHandler handler) throws */ public void forMessagesOnTopic(String topic, MessageHandler handler) throws BagReaderException { for (Connection conn : myConnectionsByTopic.get(topic)) { - try (SeekableByteChannel channel = getChannel()) { + try (FileChannel channel = getChannel()) { + FileLock lock = channel.lock(0, Long.MAX_VALUE, true); MsgIterator iter = new MsgIterator(myChunkInfos, conn, channel); while (iter.hasNext()) { @@ -356,7 +360,8 @@ public void forMessagesOnTopic(String topic, MessageHandler handler) throws BagR */ public MessageType getFirstMessageOfType(String messageType) throws BagReaderException { for (Connection conn : myConnectionsByType.get(messageType)) { - try (SeekableByteChannel channel = getChannel()) { + try (FileChannel channel = getChannel()) { + FileLock lock = channel.lock(0, Long.MAX_VALUE, true); MsgIterator iter = new MsgIterator(myChunkInfos, conn, channel); if (iter.hasNext()) { return iter.next(); @@ -382,7 +387,8 @@ public MessageType getFirstMessageOfType(String messageType) throws BagReaderExc */ public MessageType getFirstMessageOnTopic(String topic) throws BagReaderException { for (Connection conn : myConnectionsByTopic.get(topic)) { - try (SeekableByteChannel channel = getChannel()) { + try (FileChannel channel = getChannel()) { + FileLock lock = channel.lock(0, Long.MAX_VALUE, true); MsgIterator iter = new MsgIterator(myChunkInfos, conn, channel); if (iter.hasNext()) { return iter.next(); @@ -500,7 +506,8 @@ else if (compression.equals("lz4")) { for (ChunkInfo info : myChunkInfos) { chunkPositions.add(info.getChunkPos()); } - try (SeekableByteChannel channel = getChannel()) { + try (FileChannel channel = getChannel()) { + FileLock lock = channel.lock(0, Long.MAX_VALUE, true); for (Long chunkPos : chunkPositions) { Chunk chunk = new Chunk(recordAt(channel, chunkPos)); String compression = chunk.getCompression(); @@ -590,7 +597,8 @@ public MessageType getMessageOnTopicAtIndex(String topic, throw new BagReaderException(e); } - try (SeekableByteChannel channel = getChannel()) { + try (FileChannel channel = getChannel()) { + FileLock lock = channel.lock(0, Long.MAX_VALUE, true); MessageIndex msgIndex = indexes.get(index); Record record = BagFile.recordAt(channel, msgIndex.fileIndex); ByteBufferChannel chunkChannel = new ByteBufferChannel(record.getData()); @@ -617,7 +625,8 @@ public MessageType getMessageOnTopicAtIndex(String topic, */ private void generateIndexesForTopic(String topic) throws BagReaderException { List msgIndexes = Lists.newArrayList(); - try (SeekableByteChannel channel = getChannel()) { + try (FileChannel channel = getChannel()) { + FileLock lock = channel.lock(0, Long.MAX_VALUE, true); for (Connection conn : myConnectionsByTopic.get(topic)) { for (ChunkInfo chunkInfo : myChunkInfosByConnectionId.get(conn.getConnectionId())) { long chunkPos = chunkInfo.getChunkPos(); @@ -797,7 +806,8 @@ public void read() throws BagReaderException { return; } - try (SeekableByteChannel input = getChannel()){ + try (FileChannel input = getChannel()){ + FileLock lock = input.lock(0, Long.MAX_VALUE, true); verifyBagFile(input); while (hasNext(input)) { @@ -857,6 +867,28 @@ public void read() throws BagReaderException { } else { myLogger.warn("No chunk info records found; start and end time are unknown."); + + myLogger.warn("Record type counts:" + + "\n Header: " + (this.getBagHeader() == null ? 0 : 1) + + "\n Chunk: " + this.getChunks().size() + + "\n Connection: " + this.getConnections().size() + + "\n Message Data: " + this.getMessages().size() + + "\n Index Data: " + this.getIndexes().size() + + "\n Chunk Info: " + this.getChunkInfos().size()); + } + + if (this.getBagHeader().getChunkCount() != + (this.getChunks().size() + this.getChunkInfos().size()) || + this.getBagHeader().getConnCount() != this.getConnections().size()) { + String errorMsg = "Expected " + this.getBagHeader().getChunkCount() + + " chunks and " + this.getBagHeader().getConnCount() + + " connections, but got " + + (this.getChunks().size() + this.getChunkInfos().size()) + " and " + + this.getConnections().size() + ". This is ok if the file " + + "is in the middle of being written to disk, otherwise the bag " + + "may be corrupt."; + myLogger.warn(errorMsg); + throw new BagReaderException(errorMsg); } }