diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockInputStream.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockInputStream.java index 34fb728be95..d6353be9d22 100644 --- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockInputStream.java +++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockInputStream.java @@ -166,6 +166,7 @@ public synchronized void initialize() throws IOException { if (blockInfo != null && blockInfo.isUnderConstruction()) { // use the block length from DN if block is under construction. length = blockData.getSize(); + LOG.debug("Updated block length to {} for block {}", length, blockID); } break; // If we get a StorageContainerException or an IOException due to diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/MultipartInputStream.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/MultipartInputStream.java index 4bc144f3bd7..5f00e83e81b 100644 --- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/MultipartInputStream.java +++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/MultipartInputStream.java @@ -34,7 +34,7 @@ public class MultipartInputStream extends ExtendedInputStream { private final String key; - private final long length; + private long length; // List of PartInputStream, one for each part of the key private final List partStreams; @@ -56,6 +56,8 @@ public class MultipartInputStream extends ExtendedInputStream { // can be reset if a new position is seeked. 
private int prevPartIndex; + private boolean initialized = false; + public MultipartInputStream(String keyName, List inputStreams) { @@ -130,6 +132,9 @@ protected void checkPartBytesRead(int numBytesToRead, int numBytesRead, @Override public synchronized void seek(long pos) throws IOException { checkOpen(); + if (!initialized) { + initialize(); + } if (pos == 0 && length == 0) { // It is possible for length and pos to be zero in which case // seek should return instead of throwing exception @@ -173,6 +178,26 @@ public synchronized void seek(long pos) throws IOException { prevPartIndex = partIndex; } + public synchronized void initialize() throws IOException { + // Pre-check that the stream has not been initialized already + if (initialized) { + return; + } + + for (PartInputStream partInputStream : partStreams) { + if (partInputStream instanceof BlockInputStream) { + ((BlockInputStream) partInputStream).initialize(); + } + } + + long streamLength = 0L; + for (PartInputStream partInputStream : partStreams) { + streamLength += partInputStream.getLength(); + } + this.length = streamLength; + initialized = true; + } + @Override public synchronized long getPos() throws IOException { return length == 0 ? 
0 : diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/fs/ozone/TestHSync.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/fs/ozone/TestHSync.java index c39e24571a8..f3b4b66989a 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/fs/ozone/TestHSync.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/fs/ozone/TestHSync.java @@ -172,6 +172,7 @@ public class TestHSync { private static final int BLOCK_SIZE = 2 * MAX_FLUSH_SIZE; private static final int SERVICE_INTERVAL = 100; private static final int EXPIRE_THRESHOLD_MS = 140; + private static final int WAL_HEADER_LEN = 83; private static OpenKeyCleanupService openKeyCleanupService; @@ -417,6 +418,45 @@ private static String getChunkPathOnDataNode(FSDataOutputStream outputStream) return chunkPath; } + @Test + public void testHSyncSeek() throws Exception { + // Set the fs.defaultFS + final String rootPath = String.format("%s://%s.%s/", + OZONE_URI_SCHEME, bucket.getName(), bucket.getVolumeName()); + CONF.set(CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY, rootPath); + + final String dir = OZONE_ROOT + bucket.getVolumeName() + + OZONE_URI_DELIMITER + bucket.getName(); + final Path key1 = new Path(dir, "key-hsync-seek"); + + final byte[] data = new byte[1024]; + final byte[] buffer = new byte[1024]; + ThreadLocalRandom.current().nextBytes(data); + + try (FileSystem fs = FileSystem.get(CONF)) { + // Create key1 + try (FSDataOutputStream os = fs.create(key1, true)) { + os.write(data, 0, WAL_HEADER_LEN); + // the first hsync will update the correct length in the key info at OM + os.hsync(); + os.write(data, 0, data.length); + os.hsync(); // the second hsync will not update the length at OM + try (FSDataInputStream in = fs.open(key1)) { + // the actual key length is WAL_HEADER_LEN + 1024, but the length in OM is WAL_HEADER_LEN (83) + in.seek(WAL_HEADER_LEN + 1); + final int n = in.read(buffer, 1, buffer.length - 1); + // expect to read 
1023 bytes + assertEquals(buffer.length - 1, n); + for (int i = 1; i < buffer.length; i++) { + assertEquals(data[i], buffer[i], "expected at i=" + i); + } + } + } finally { + fs.delete(key1, false); + } + } + } + @ParameterizedTest @ValueSource(booleans = {false, true}) public void testO3fsHSync(boolean incrementalChunkList) throws Exception {