From 39d0220316ca0079f0be267b9d2b67975b325cb4 Mon Sep 17 00:00:00 2001 From: Wei-Chiu Chuang Date: Thu, 19 Sep 2024 10:47:16 -0700 Subject: [PATCH 1/6] HDDS-11220. Reproduction test case. Change-Id: I832307a171da0d8e59a7afcf1617577c4c657e43 --- .../org/apache/hadoop/fs/ozone/TestHSync.java | 38 +++++++++++++++++++ 1 file changed, 38 insertions(+) diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/fs/ozone/TestHSync.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/fs/ozone/TestHSync.java index c39e24571a8..63b1306f5d1 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/fs/ozone/TestHSync.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/fs/ozone/TestHSync.java @@ -417,6 +417,44 @@ private static String getChunkPathOnDataNode(FSDataOutputStream outputStream) return chunkPath; } + @Test + public void testHSyncSeek() throws Exception { + // Set the fs.defaultFS + final String rootPath = String.format("%s://%s.%s/", + OZONE_URI_SCHEME, bucket.getName(), bucket.getVolumeName()); + CONF.set(CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY, rootPath); + + final String dir = OZONE_ROOT + bucket.getVolumeName() + + OZONE_URI_DELIMITER + bucket.getName(); + final Path key1 = new Path(dir, "key-hsync-seek"); + + final byte[] data = new byte[1024]; + final byte[] buffer = new byte[1024]; + ThreadLocalRandom.current().nextBytes(data); + try (FileSystem fs = FileSystem.get(CONF)) { + // Create key1 + try (FSDataOutputStream os = fs.create(key1, true)) { + os.write(data, 0, 1); + // the first hsync will update the correct length in the key info at OM + os.hsync(); + os.write(data, 0, data.length); + os.hsync(); // the second hsync will not update the length at OM + try (FSDataInputStream in = fs.open(key1)) { + // the actual key length is 1025, but the length in OM is 1 + in.seek(2); + final int n = in.read(buffer, 1, buffer.length-1); + // expect to read 1023 bytes + 
assertEquals(buffer.length - 1, n); + for (int i = 1; i < buffer.length; i++) { + assertEquals(data[i], buffer[i], "expected at i=" + i); + } + } + } finally { + fs.delete(key1, false); + } + } + } + @ParameterizedTest @ValueSource(booleans = {false, true}) public void testO3fsHSync(boolean incrementalChunkList) throws Exception { From b0f41289b3d7be70b3651d4e066763c0a27294af Mon Sep 17 00:00:00 2001 From: Wei-Chiu Chuang Date: Thu, 19 Sep 2024 13:18:17 -0700 Subject: [PATCH 2/6] Initial fix. Change-Id: Ic255aa69f731974878c2ab2d9bb7831a7f9cabc0 --- .../scm/storage/MultipartInputStream.java | 29 ++++++++++++++++++- 1 file changed, 28 insertions(+), 1 deletion(-) diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/MultipartInputStream.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/MultipartInputStream.java index 4bc144f3bd7..d76de42fb58 100644 --- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/MultipartInputStream.java +++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/MultipartInputStream.java @@ -34,7 +34,7 @@ public class MultipartInputStream extends ExtendedInputStream { private final String key; - private final long length; + private long length; // List of PartInputStream, one for each part of the key private final List partStreams; @@ -56,6 +56,8 @@ public class MultipartInputStream extends ExtendedInputStream { // can be reset if a new position is seeked. 
private int prevPartIndex; + private boolean initialized = false; + public MultipartInputStream(String keyName, List inputStreams) { @@ -130,6 +132,9 @@ protected void checkPartBytesRead(int numBytesToRead, int numBytesRead, @Override public synchronized void seek(long pos) throws IOException { checkOpen(); + if (!initialized) { + initialize(); + } if (pos == 0 && length == 0) { // It is possible for length and pos to be zero in which case // seek should return instead of throwing exception @@ -173,6 +178,28 @@ public synchronized void seek(long pos) throws IOException { prevPartIndex = partIndex; } + public synchronized void initialize() throws IOException { + + // Pre-check that the stream has not been initialized already + if (initialized) { + return; + } + + for (PartInputStream partInputStream : partStreams) { + if (partInputStream instanceof BlockInputStream) { + ((BlockInputStream) partInputStream).initialize(); + } + } + + long streamLength = 0L; + for (PartInputStream partInputStream : partStreams) { + //this.partOffsets[i++] = streamLength; + streamLength += partInputStream.getLength(); + } + this.length = streamLength; + initialized = true; + } + @Override public synchronized long getPos() throws IOException { return length == 0 ? 
0 : From 9131a6e6434bb49af428bb6104116d896aef4ae4 Mon Sep 17 00:00:00 2001 From: Wei-Chiu Chuang Date: Fri, 20 Sep 2024 12:53:53 -0700 Subject: [PATCH 3/6] debug logs Change-Id: I80f3978d9afad982aafecaf2c3b90c1732d69d29 --- .../org/apache/hadoop/hdds/scm/storage/BlockInputStream.java | 4 ++++ .../src/test/java/org/apache/hadoop/fs/ozone/TestHSync.java | 5 +++-- 2 files changed, 7 insertions(+), 2 deletions(-) diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockInputStream.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockInputStream.java index 34fb728be95..43036ecb82b 100644 --- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockInputStream.java +++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockInputStream.java @@ -163,9 +163,13 @@ public synchronized void initialize() throws IOException { try { blockData = getBlockData(); chunks = blockData.getChunksList(); + LOG.debug("Block {} has block data {}", blockID, blockData); if (blockInfo != null && blockInfo.isUnderConstruction()) { // use the block length from DN if block is under construction. 
length = blockData.getSize(); + LOG.debug("Updated block length to {} for block {}", length, blockID); + } else { + LOG.debug("blockInfo = {}", blockInfo); } break; // If we get a StorageContainerException or an IOException due to diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/fs/ozone/TestHSync.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/fs/ozone/TestHSync.java index 63b1306f5d1..116f1cc2e46 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/fs/ozone/TestHSync.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/fs/ozone/TestHSync.java @@ -431,17 +431,18 @@ public void testHSyncSeek() throws Exception { final byte[] data = new byte[1024]; final byte[] buffer = new byte[1024]; ThreadLocalRandom.current().nextBytes(data); + final int WAL_HEADER_LEN = 83; try (FileSystem fs = FileSystem.get(CONF)) { // Create key1 try (FSDataOutputStream os = fs.create(key1, true)) { - os.write(data, 0, 1); + os.write(data, 0, WAL_HEADER_LEN); // the first hsync will update the correct length in the key info at OM os.hsync(); os.write(data, 0, data.length); os.hsync(); // the second hsync will not update the length at OM try (FSDataInputStream in = fs.open(key1)) { // the actual key length is 1025, but the length in OM is 1 - in.seek(2); + in.seek(WAL_HEADER_LEN+1); final int n = in.read(buffer, 1, buffer.length-1); // expect to read 1023 bytes assertEquals(buffer.length - 1, n); From 797ccdc95cb55c1319391295576ea5f0c024dd68 Mon Sep 17 00:00:00 2001 From: Wei-Chiu Chuang Date: Wed, 23 Oct 2024 16:31:03 -0700 Subject: [PATCH 4/6] Remove redundant code. 
Change-Id: Ifd7d8f113d53b53fdac035d7b8a78fd8ac44ebb8 --- .../apache/hadoop/hdds/scm/storage/MultipartInputStream.java | 2 -- 1 file changed, 2 deletions(-) diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/MultipartInputStream.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/MultipartInputStream.java index d76de42fb58..5f00e83e81b 100644 --- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/MultipartInputStream.java +++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/MultipartInputStream.java @@ -179,7 +179,6 @@ public synchronized void seek(long pos) throws IOException { } public synchronized void initialize() throws IOException { - // Pre-check that the stream has not been initialized already if (initialized) { return; @@ -193,7 +192,6 @@ public synchronized void initialize() throws IOException { long streamLength = 0L; for (PartInputStream partInputStream : partStreams) { - //this.partOffsets[i++] = streamLength; streamLength += partInputStream.getLength(); } this.length = streamLength; From 8ad3cbc0e4fb17a592d83ef64b1ba48c45786af9 Mon Sep 17 00:00:00 2001 From: Wei-Chiu Chuang Date: Wed, 23 Oct 2024 16:36:47 -0700 Subject: [PATCH 5/6] Fix checkstyle Change-Id: Ia52ad49c616bc85eaf9433c158d5eda62c47aecc --- .../test/java/org/apache/hadoop/fs/ozone/TestHSync.java | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/fs/ozone/TestHSync.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/fs/ozone/TestHSync.java index 116f1cc2e46..975b544772c 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/fs/ozone/TestHSync.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/fs/ozone/TestHSync.java @@ -172,6 +172,7 @@ public class TestHSync { private static final int BLOCK_SIZE = 2 * MAX_FLUSH_SIZE; private static final int SERVICE_INTERVAL = 
100; private static final int EXPIRE_THRESHOLD_MS = 140; + private static final int WAL_HEADER_LEN = 83; private static OpenKeyCleanupService openKeyCleanupService; @@ -431,7 +432,7 @@ public void testHSyncSeek() throws Exception { final byte[] data = new byte[1024]; final byte[] buffer = new byte[1024]; ThreadLocalRandom.current().nextBytes(data); - final int WAL_HEADER_LEN = 83; + try (FileSystem fs = FileSystem.get(CONF)) { // Create key1 try (FSDataOutputStream os = fs.create(key1, true)) { @@ -442,8 +443,8 @@ public void testHSyncSeek() throws Exception { os.hsync(); // the second hsync will not update the length at OM try (FSDataInputStream in = fs.open(key1)) { // the actual key length is 1025, but the length in OM is 1 - in.seek(WAL_HEADER_LEN+1); - final int n = in.read(buffer, 1, buffer.length-1); + in.seek(WAL_HEADER_LEN + 1); + final int n = in.read(buffer, 1, buffer.length - 1); // expect to read 1023 bytes assertEquals(buffer.length - 1, n); for (int i = 1; i < buffer.length; i++) { From e91a15527f0f371f6bfc4496ed94cabd0e73cb00 Mon Sep 17 00:00:00 2001 From: Wei-Chiu Chuang Date: Fri, 25 Oct 2024 10:41:37 -0700 Subject: [PATCH 6/6] Update per review comments. 
Change-Id: I3c7728a619d68541fa2a367b9ba79680fc559cfb --- .../org/apache/hadoop/hdds/scm/storage/BlockInputStream.java | 3 --- .../src/test/java/org/apache/hadoop/fs/ozone/TestHSync.java | 2 +- 2 files changed, 1 insertion(+), 4 deletions(-) diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockInputStream.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockInputStream.java index 43036ecb82b..d6353be9d22 100644 --- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockInputStream.java +++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockInputStream.java @@ -163,13 +163,10 @@ public synchronized void initialize() throws IOException { try { blockData = getBlockData(); chunks = blockData.getChunksList(); - LOG.debug("Block {} has block data {}", blockID, blockData); if (blockInfo != null && blockInfo.isUnderConstruction()) { // use the block length from DN if block is under construction. 
length = blockData.getSize(); LOG.debug("Updated block length to {} for block {}", length, blockID); - } else { - LOG.debug("blockInfo = {}", blockInfo); } break; // If we get a StorageContainerException or an IOException due to diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/fs/ozone/TestHSync.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/fs/ozone/TestHSync.java index 975b544772c..f3b4b66989a 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/fs/ozone/TestHSync.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/fs/ozone/TestHSync.java @@ -442,7 +442,7 @@ public void testHSyncSeek() throws Exception { os.write(data, 0, data.length); os.hsync(); // the second hsync will not update the length at OM try (FSDataInputStream in = fs.open(key1)) { - // the actual key length is 1025, but the length in OM is 1 + // the actual key length is WAL_HEADER_LEN + 1024, but the length in OM is WAL_HEADER_LEN (83) in.seek(WAL_HEADER_LEN + 1); final int n = in.read(buffer, 1, buffer.length - 1); // expect to read 1023 bytes