From 3d1c6bdd4db3daf0d3a6fa0f00bcf76ba4d2e35f Mon Sep 17 00:00:00 2001
From: Charles Connell
Date: Thu, 29 Feb 2024 16:55:17 -0500
Subject: [PATCH 1/5] HBASE-28260: Add NO_WRITE_LOCAL flag to WAL file creation

---
 .../hadoop/hbase/regionserver/wal/ProtobufLogWriter.java | 9 +++++++--
 1 file changed, 7 insertions(+), 2 deletions(-)

diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogWriter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogWriter.java
index 212788c940ed..5654b4ca3f22 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogWriter.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogWriter.java
@@ -105,8 +105,13 @@ protected void initOutput(FileSystem fs, Path path, boolean overwritable, int bu
     FSDataOutputStreamBuilder<?, ?> builder = fs.createFile(path).overwrite(overwritable)
       .bufferSize(bufferSize).replication(replication).blockSize(blockSize);
     if (builder instanceof DistributedFileSystem.HdfsDataOutputStreamBuilder) {
-      this.output =
-        ((DistributedFileSystem.HdfsDataOutputStreamBuilder) builder).replicate().build();
+      DistributedFileSystem.HdfsDataOutputStreamBuilder dfsBuilder =
+        (DistributedFileSystem.HdfsDataOutputStreamBuilder) builder;
+      if (fs.getConf().getBoolean("hbase.regionserver.wal.avoid-local-writes", false)) {
+        dfsBuilder.replicate();
+      }
+      dfsBuilder.noLocalWrite();
+      this.output = dfsBuilder.build();
     } else {
       this.output = builder.build();
     }

From 0250652b728e7732240e48561fb1a000930bbaf0 Mon Sep 17 00:00:00 2001
From: Charles Connell
Date: Thu, 29 Feb 2024 21:02:45 -0500
Subject: [PATCH 2/5] fix logic

---
 .../hadoop/hbase/regionserver/wal/ProtobufLogWriter.java | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogWriter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogWriter.java
index 5654b4ca3f22..a5af2457d7ef 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogWriter.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogWriter.java
@@ -107,10 +107,10 @@ protected void initOutput(FileSystem fs, Path path, boolean overwritable, int bu
     if (builder instanceof DistributedFileSystem.HdfsDataOutputStreamBuilder) {
       DistributedFileSystem.HdfsDataOutputStreamBuilder dfsBuilder =
         (DistributedFileSystem.HdfsDataOutputStreamBuilder) builder;
+      dfsBuilder.replicate();
       if (fs.getConf().getBoolean("hbase.regionserver.wal.avoid-local-writes", false)) {
-        dfsBuilder.replicate();
+        dfsBuilder.noLocalWrite();
       }
-      dfsBuilder.noLocalWrite();
       this.output = dfsBuilder.build();
     } else {
       this.output = builder.build();

From 9b8c1d84da4acedd6b8b5fdbf3efeb39e9514ee8 Mon Sep 17 00:00:00 2001
From: Charles Connell
Date: Tue, 5 Mar 2024 20:56:37 -0500
Subject: [PATCH 3/5] thread avoid-local-writes into async-fs

---
 .../hbase/io/asyncfs/AsyncFSOutputHelper.java  |  5 +++--
 .../FanOutOneBlockAsyncDFSOutputHelper.java    | 16 ++++++++++------
 .../TestFanOutOneBlockAsyncDFSOutput.java      | 17 +++++++++--------
 .../TestFanOutOneBlockAsyncDFSOutputHang.java  |  2 +-
 .../hbase/io/asyncfs/TestLocalAsyncOutput.java |  2 +-
 .../TestSaslFanOutOneBlockAsyncDFSOutput.java  |  2 +-
 .../hbase/regionserver/wal/AbstractFSWAL.java  |  4 ++++
 .../wal/AsyncProtobufLogWriter.java            |  6 +++++-
 .../regionserver/wal/ProtobufLogWriter.java    |  5 ++++-
 9 files changed, 38 insertions(+), 21 deletions(-)

diff --git a/hbase-asyncfs/src/main/java/org/apache/hadoop/hbase/io/asyncfs/AsyncFSOutputHelper.java b/hbase-asyncfs/src/main/java/org/apache/hadoop/hbase/io/asyncfs/AsyncFSOutputHelper.java
index a530ca4a2a0d..2a58e10ecfc3 100644
--- a/hbase-asyncfs/src/main/java/org/apache/hadoop/hbase/io/asyncfs/AsyncFSOutputHelper.java
+++ b/hbase-asyncfs/src/main/java/org/apache/hadoop/hbase/io/asyncfs/AsyncFSOutputHelper.java
@@ -48,11 +48,12 @@ private AsyncFSOutputHelper() {
    */
   public static AsyncFSOutput createOutput(FileSystem fs, Path f, boolean overwrite,
     boolean createParent, short replication, long blockSize, EventLoopGroup eventLoopGroup,
-    Class<? extends Channel> channelClass, StreamSlowMonitor monitor)
+    Class<? extends Channel> channelClass, StreamSlowMonitor monitor, boolean forWAL)
     throws IOException, CommonFSUtils.StreamLacksCapabilityException {
     if (fs instanceof DistributedFileSystem) {
       return FanOutOneBlockAsyncDFSOutputHelper.createOutput((DistributedFileSystem) fs, f,
-        overwrite, createParent, replication, blockSize, eventLoopGroup, channelClass, monitor);
+        overwrite, createParent, replication, blockSize, eventLoopGroup, channelClass, monitor,
+        forWAL);
     }
     final FSDataOutputStream out;
     int bufferSize = fs.getConf().getInt(CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_KEY,
diff --git a/hbase-asyncfs/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutputHelper.java b/hbase-asyncfs/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutputHelper.java
index 98590173ed2a..d4a71a77a79d 100644
--- a/hbase-asyncfs/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutputHelper.java
+++ b/hbase-asyncfs/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutputHelper.java
@@ -445,20 +445,24 @@ public NameNodeException(Throwable cause) {
     }
   }

-  private static EnumSetWritable<CreateFlag> getCreateFlags(boolean overwrite) {
+  private static EnumSetWritable<CreateFlag> getCreateFlags(boolean overwrite,
+    boolean noLocalWrite) {
     List<CreateFlag> flags = new ArrayList<>();
     flags.add(CreateFlag.CREATE);
     if (overwrite) {
       flags.add(CreateFlag.OVERWRITE);
     }
+    if (noLocalWrite) {
+      flags.add(CreateFlag.NO_LOCAL_WRITE);
+    }
     flags.add(CreateFlag.SHOULD_REPLICATE);
     return new EnumSetWritable<>(EnumSet.copyOf(flags));
   }

   private static FanOutOneBlockAsyncDFSOutput createOutput(DistributedFileSystem dfs, String src,
     boolean overwrite, boolean createParent, short replication, long blockSize,
-    EventLoopGroup eventLoopGroup, Class<? extends Channel> channelClass, StreamSlowMonitor monitor)
-    throws IOException {
+    EventLoopGroup eventLoopGroup, Class<? extends Channel> channelClass, StreamSlowMonitor monitor,
+    boolean noLocalWrite) throws IOException {
     Configuration conf = dfs.getConf();
     DFSClient client = dfs.getClient();
     String clientName = client.getClientName();
@@ -475,7 +479,7 @@ private static FanOutOneBlockAsyncDFSOutput createOutput(DistributedFileSystem d
     try {
       stat = FILE_CREATOR.create(namenode, src,
         FsPermission.getFileDefault().applyUMask(FsPermission.getUMask(conf)), clientName,
-        getCreateFlags(overwrite), createParent, replication, blockSize,
+        getCreateFlags(overwrite, noLocalWrite), createParent, replication, blockSize,
         CryptoProtocolVersion.supported());
     } catch (Exception e) {
       if (e instanceof RemoteException) {
@@ -561,14 +565,14 @@ public void operationComplete(Future future) throws Exception {
   public static FanOutOneBlockAsyncDFSOutput createOutput(DistributedFileSystem dfs, Path f,
     boolean overwrite, boolean createParent, short replication, long blockSize,
     EventLoopGroup eventLoopGroup, Class<? extends Channel> channelClass,
-    final StreamSlowMonitor monitor) throws IOException {
+    final StreamSlowMonitor monitor, boolean noLocalWrite) throws IOException {
     return new FileSystemLinkResolver<FanOutOneBlockAsyncDFSOutput>() {

       @Override
       public FanOutOneBlockAsyncDFSOutput doCall(Path p)
         throws IOException, UnresolvedLinkException {
         return createOutput(dfs, p.toUri().getPath(), overwrite, createParent, replication,
-          blockSize, eventLoopGroup, channelClass, monitor);
+          blockSize, eventLoopGroup, channelClass, monitor, noLocalWrite);
       }

       @Override
diff --git a/hbase-asyncfs/src/test/java/org/apache/hadoop/hbase/io/asyncfs/TestFanOutOneBlockAsyncDFSOutput.java b/hbase-asyncfs/src/test/java/org/apache/hadoop/hbase/io/asyncfs/TestFanOutOneBlockAsyncDFSOutput.java
index 68b8bfa3d9f3..f0910684eddf 100644
--- a/hbase-asyncfs/src/test/java/org/apache/hadoop/hbase/io/asyncfs/TestFanOutOneBlockAsyncDFSOutput.java
+++ b/hbase-asyncfs/src/test/java/org/apache/hadoop/hbase/io/asyncfs/TestFanOutOneBlockAsyncDFSOutput.java
@@ -141,7 +141,7 @@ public void test() throws IOException, InterruptedException, ExecutionException
     Path f = new Path("/" + name.getMethodName());
     EventLoop eventLoop = EVENT_LOOP_GROUP.next();
     FanOutOneBlockAsyncDFSOutput out = FanOutOneBlockAsyncDFSOutputHelper.createOutput(FS, f, true,
-      false, (short) 3, FS.getDefaultBlockSize(), eventLoop, CHANNEL_CLASS, MONITOR);
+      false, (short) 3, FS.getDefaultBlockSize(), eventLoop, CHANNEL_CLASS, MONITOR, true);
     writeAndVerify(FS, f, out);
   }

@@ -154,7 +154,7 @@ public void test0Recover() throws IOException, InterruptedException, ExecutionEx
     Path f = new Path("/" + name.getMethodName());
     EventLoop eventLoop = EVENT_LOOP_GROUP.next();
     FanOutOneBlockAsyncDFSOutput out = FanOutOneBlockAsyncDFSOutputHelper.createOutput(FS, f, true,
-      false, (short) 3, FS.getDefaultBlockSize(), eventLoop, CHANNEL_CLASS, MONITOR);
+      false, (short) 3, FS.getDefaultBlockSize(), eventLoop, CHANNEL_CLASS, MONITOR, true);
     byte[] b = new byte[10];
     Bytes.random(b);
     out.write(b, 0, b.length);
@@ -183,7 +183,7 @@ public void testHeartbeat() throws IOException, InterruptedException, ExecutionE
     Path f = new Path("/" + name.getMethodName());
     EventLoop eventLoop = EVENT_LOOP_GROUP.next();
     FanOutOneBlockAsyncDFSOutput out = FanOutOneBlockAsyncDFSOutputHelper.createOutput(FS, f, true,
-      false, (short) 3, FS.getDefaultBlockSize(), eventLoop, CHANNEL_CLASS, MONITOR);
+      false, (short) 3, FS.getDefaultBlockSize(), eventLoop, CHANNEL_CLASS, MONITOR, true);
     Thread.sleep(READ_TIMEOUT_MS * 2);
     // the connection to datanode should still alive.
     writeAndVerify(FS, f, out);
@@ -198,7 +198,7 @@ public void testCreateParentFailed() throws IOException {
     EventLoop eventLoop = EVENT_LOOP_GROUP.next();
     try {
       FanOutOneBlockAsyncDFSOutputHelper.createOutput(FS, f, true, false, (short) 3,
-        FS.getDefaultBlockSize(), eventLoop, CHANNEL_CLASS, MONITOR);
+        FS.getDefaultBlockSize(), eventLoop, CHANNEL_CLASS, MONITOR, true);
       fail("should fail with parent does not exist");
     } catch (RemoteException e) {
       LOG.info("expected exception caught", e);
@@ -220,8 +220,9 @@ public void testConnectToDatanodeFailed()
     DataNodeProperties dnProp = CLUSTER.stopDataNode(0);
     Path f = new Path("/test");
     EventLoop eventLoop = EVENT_LOOP_GROUP.next();
-    try (FanOutOneBlockAsyncDFSOutput output = FanOutOneBlockAsyncDFSOutputHelper.createOutput(FS,
-      f, true, false, (short) 3, FS.getDefaultBlockSize(), eventLoop, CHANNEL_CLASS, MONITOR)) {
+    try (FanOutOneBlockAsyncDFSOutput output =
+      FanOutOneBlockAsyncDFSOutputHelper.createOutput(FS, f, true, false, (short) 3,
+        FS.getDefaultBlockSize(), eventLoop, CHANNEL_CLASS, MONITOR, true)) {
       // should exclude the dead dn when retry so here we only have 2 DNs in pipeline
       assertEquals(2, output.getPipeline().length);
     } finally {
@@ -251,7 +252,7 @@ public void testExcludeFailedConnectToDatanode()
     assertEquals(0, excludeDatanodeManager.getExcludeDNs().size());
     try (FanOutOneBlockAsyncDFSOutput output =
       FanOutOneBlockAsyncDFSOutputHelper.createOutput(FS, f, true, false, (short) 3,
-        FS.getDefaultBlockSize(), eventLoop, CHANNEL_CLASS, streamSlowDNsMonitor)) {
+        FS.getDefaultBlockSize(), eventLoop, CHANNEL_CLASS, streamSlowDNsMonitor, true)) {
       // should exclude the dead dn when retry so here we only have 2 DNs in pipeline
       assertEquals(2, output.getPipeline().length);
       assertEquals(1, excludeDatanodeManager.getExcludeDNs().size());
@@ -266,7 +267,7 @@ public void testWriteLargeChunk() throws IOException, InterruptedException, Exec
     Path f = new Path("/" + name.getMethodName());
     EventLoop eventLoop = EVENT_LOOP_GROUP.next();
     FanOutOneBlockAsyncDFSOutput out = FanOutOneBlockAsyncDFSOutputHelper.createOutput(FS, f, true,
-      false, (short) 3, 1024 * 1024 * 1024, eventLoop, CHANNEL_CLASS, MONITOR);
+      false, (short) 3, 1024 * 1024 * 1024, eventLoop, CHANNEL_CLASS, MONITOR, true);
     byte[] b = new byte[50 * 1024 * 1024];
     Bytes.random(b);
     out.write(b);
diff --git a/hbase-asyncfs/src/test/java/org/apache/hadoop/hbase/io/asyncfs/TestFanOutOneBlockAsyncDFSOutputHang.java b/hbase-asyncfs/src/test/java/org/apache/hadoop/hbase/io/asyncfs/TestFanOutOneBlockAsyncDFSOutputHang.java
index 77752789dbb3..7f6535a93a93 100644
--- a/hbase-asyncfs/src/test/java/org/apache/hadoop/hbase/io/asyncfs/TestFanOutOneBlockAsyncDFSOutputHang.java
+++ b/hbase-asyncfs/src/test/java/org/apache/hadoop/hbase/io/asyncfs/TestFanOutOneBlockAsyncDFSOutputHang.java
@@ -98,7 +98,7 @@ public static void setUp() throws Exception {
     Path f = new Path("/testHang");
     EventLoop eventLoop = EVENT_LOOP_GROUP.next();
     OUT = FanOutOneBlockAsyncDFSOutputHelper.createOutput(FS, f, true, false, (short) 2,
-      FS.getDefaultBlockSize(), eventLoop, CHANNEL_CLASS, MONITOR);
+      FS.getDefaultBlockSize(), eventLoop, CHANNEL_CLASS, MONITOR, true);
   }

   @AfterClass
diff --git a/hbase-asyncfs/src/test/java/org/apache/hadoop/hbase/io/asyncfs/TestLocalAsyncOutput.java b/hbase-asyncfs/src/test/java/org/apache/hadoop/hbase/io/asyncfs/TestLocalAsyncOutput.java
index d1ce128b118d..4171b60c5b82 100644
--- a/hbase-asyncfs/src/test/java/org/apache/hadoop/hbase/io/asyncfs/TestLocalAsyncOutput.java
+++ b/hbase-asyncfs/src/test/java/org/apache/hadoop/hbase/io/asyncfs/TestLocalAsyncOutput.java
@@ -65,7 +65,7 @@ public void test() throws IOException, InterruptedException, ExecutionException,
     Path f = new Path(TEST_UTIL.getDataTestDir(), "test");
     FileSystem fs = FileSystem.getLocal(TEST_UTIL.getConfiguration());
     AsyncFSOutput out = AsyncFSOutputHelper.createOutput(fs, f, false, true,
-      fs.getDefaultReplication(f), fs.getDefaultBlockSize(f), GROUP, CHANNEL_CLASS, MONITOR);
+      fs.getDefaultReplication(f), fs.getDefaultBlockSize(f), GROUP, CHANNEL_CLASS, MONITOR, true);
     TestFanOutOneBlockAsyncDFSOutput.writeAndVerify(fs, f, out);
   }
 }
diff --git a/hbase-asyncfs/src/test/java/org/apache/hadoop/hbase/io/asyncfs/TestSaslFanOutOneBlockAsyncDFSOutput.java b/hbase-asyncfs/src/test/java/org/apache/hadoop/hbase/io/asyncfs/TestSaslFanOutOneBlockAsyncDFSOutput.java
index 479b8f4e6034..99048ff2bed1 100644
--- a/hbase-asyncfs/src/test/java/org/apache/hadoop/hbase/io/asyncfs/TestSaslFanOutOneBlockAsyncDFSOutput.java
+++ b/hbase-asyncfs/src/test/java/org/apache/hadoop/hbase/io/asyncfs/TestSaslFanOutOneBlockAsyncDFSOutput.java
@@ -255,7 +255,7 @@ private Path getEncryptionTestFile() {
   private void test(Path file) throws IOException, InterruptedException, ExecutionException {
     EventLoop eventLoop = EVENT_LOOP_GROUP.next();
     FanOutOneBlockAsyncDFSOutput out = FanOutOneBlockAsyncDFSOutputHelper.createOutput(FS, file,
-      true, false, (short) 3, FS.getDefaultBlockSize(), eventLoop, CHANNEL_CLASS, MONITOR);
+      true, false, (short) 3, FS.getDefaultBlockSize(), eventLoop, CHANNEL_CLASS, MONITOR, true);
     TestFanOutOneBlockAsyncDFSOutput.writeAndVerify(FS, file, out);
   }

diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractFSWAL.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractFSWAL.java
index ef25068512f0..a94d827e8e2d 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractFSWAL.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractFSWAL.java
@@ -179,6 +179,10 @@ public abstract class AbstractFSWAL implements WAL {
   public static final String WAL_BATCH_SIZE = "hbase.wal.batch.size";
   public static final long DEFAULT_WAL_BATCH_SIZE = 64L * 1024;

+  public static final String WAL_AVOID_LOCAL_WRITES_KEY =
+    "hbase.regionserver.wal.avoid-local-writes";
+  public static final boolean WAL_AVOID_LOCAL_WRITES_DEFAULT = false;
+
   /**
    * file system instance
    */
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncProtobufLogWriter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncProtobufLogWriter.java
index e50a02f6f80d..3816008b8cae 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncProtobufLogWriter.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncProtobufLogWriter.java
@@ -17,6 +17,8 @@
  */
 package org.apache.hadoop.hbase.regionserver.wal;

+import static org.apache.hadoop.hbase.regionserver.wal.AbstractFSWAL.WAL_AVOID_LOCAL_WRITES_DEFAULT;
+import static org.apache.hadoop.hbase.regionserver.wal.AbstractFSWAL.WAL_AVOID_LOCAL_WRITES_KEY;
 import static org.apache.hadoop.hbase.util.FutureUtils.addListener;

 import java.io.IOException;
@@ -180,8 +182,10 @@ public AsyncFSOutput getOutput() {
   protected void initOutput(FileSystem fs, Path path, boolean overwritable, int bufferSize,
     short replication, long blockSize, StreamSlowMonitor monitor)
     throws IOException, StreamLacksCapabilityException {
+    boolean noLocalWrite =
+      fs.getConf().getBoolean(WAL_AVOID_LOCAL_WRITES_KEY, WAL_AVOID_LOCAL_WRITES_DEFAULT);
     this.output = AsyncFSOutputHelper.createOutput(fs, path, overwritable, false, replication,
-      blockSize, eventLoopGroup, channelClass, monitor);
+      blockSize, eventLoopGroup, channelClass, monitor, noLocalWrite);
     this.asyncOutputWrapper = new OutputStreamWrapper(output);
   }

diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogWriter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogWriter.java
index a5af2457d7ef..9a4c2c6397d4 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogWriter.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogWriter.java
@@ -17,6 +17,9 @@
  */
 package org.apache.hadoop.hbase.regionserver.wal;

+import static org.apache.hadoop.hbase.regionserver.wal.AbstractFSWAL.WAL_AVOID_LOCAL_WRITES_DEFAULT;
+import static org.apache.hadoop.hbase.regionserver.wal.AbstractFSWAL.WAL_AVOID_LOCAL_WRITES_KEY;
+
 import java.io.IOException;
 import java.io.OutputStream;
 import java.util.concurrent.atomic.AtomicLong;
@@ -108,7 +111,7 @@ protected void initOutput(FileSystem fs, Path path, boolean overwritable, int bu
       DistributedFileSystem.HdfsDataOutputStreamBuilder dfsBuilder =
         (DistributedFileSystem.HdfsDataOutputStreamBuilder) builder;
       dfsBuilder.replicate();
-      if (fs.getConf().getBoolean("hbase.regionserver.wal.avoid-local-writes", false)) {
+      if (fs.getConf().getBoolean(WAL_AVOID_LOCAL_WRITES_KEY, WAL_AVOID_LOCAL_WRITES_DEFAULT)) {
         dfsBuilder.noLocalWrite();
       }
       this.output = dfsBuilder.build();

From a2bedafa28ab63db13abf32a84c9fc4c99b31880 Mon Sep 17 00:00:00 2001
From: Charles Connell
Date: Wed, 6 Mar 2024 08:54:04 -0500
Subject: [PATCH 4/5] variable name

---
 .../apache/hadoop/hbase/io/asyncfs/AsyncFSOutputHelper.java | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/hbase-asyncfs/src/main/java/org/apache/hadoop/hbase/io/asyncfs/AsyncFSOutputHelper.java b/hbase-asyncfs/src/main/java/org/apache/hadoop/hbase/io/asyncfs/AsyncFSOutputHelper.java
index 2a58e10ecfc3..cbb0648f3afb 100644
--- a/hbase-asyncfs/src/main/java/org/apache/hadoop/hbase/io/asyncfs/AsyncFSOutputHelper.java
+++ b/hbase-asyncfs/src/main/java/org/apache/hadoop/hbase/io/asyncfs/AsyncFSOutputHelper.java
@@ -48,12 +48,12 @@ private AsyncFSOutputHelper() {
    */
   public static AsyncFSOutput createOutput(FileSystem fs, Path f, boolean overwrite,
     boolean createParent, short replication, long blockSize, EventLoopGroup eventLoopGroup,
-    Class<? extends Channel> channelClass, StreamSlowMonitor monitor, boolean forWAL)
+    Class<? extends Channel> channelClass, StreamSlowMonitor monitor, boolean noLocalWrite)
     throws IOException, CommonFSUtils.StreamLacksCapabilityException {
     if (fs instanceof DistributedFileSystem) {
       return FanOutOneBlockAsyncDFSOutputHelper.createOutput((DistributedFileSystem) fs, f,
         overwrite, createParent, replication, blockSize, eventLoopGroup, channelClass, monitor,
-        forWAL);
+        noLocalWrite);
     }
     final FSDataOutputStream out;
     int bufferSize = fs.getConf().getInt(CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_KEY,

From edce0ff08e141a9834403f16b6a68f23efb04214 Mon Sep 17 00:00:00 2001
From: Charles Connell
Date: Wed, 6 Mar 2024 09:38:19 -0500
Subject: [PATCH 5/5] move noLocalWrite lookup

---
 .../hbase/regionserver/wal/AbstractProtobufLogWriter.java | 8 ++++++--
 .../hbase/regionserver/wal/AsyncProtobufLogWriter.java    | 6 +-----
 .../hadoop/hbase/regionserver/wal/ProtobufLogWriter.java  | 7 ++-----
 3 files changed, 9 insertions(+), 12 deletions(-)

diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractProtobufLogWriter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractProtobufLogWriter.java
index 890fb4e444c7..e6463c563a05 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractProtobufLogWriter.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractProtobufLogWriter.java
@@ -17,6 +17,8 @@
  */
 package org.apache.hadoop.hbase.regionserver.wal;

+import static org.apache.hadoop.hbase.regionserver.wal.AbstractFSWAL.WAL_AVOID_LOCAL_WRITES_DEFAULT;
+import static org.apache.hadoop.hbase.regionserver.wal.AbstractFSWAL.WAL_AVOID_LOCAL_WRITES_KEY;
 import static org.apache.hadoop.hbase.regionserver.wal.AbstractProtobufWALReader.DEFAULT_WAL_TRAILER_WARN_SIZE;
 import static org.apache.hadoop.hbase.regionserver.wal.AbstractProtobufWALReader.PB_WAL_COMPLETE_MAGIC;
 import static org.apache.hadoop.hbase.regionserver.wal.AbstractProtobufWALReader.PB_WAL_MAGIC;
@@ -163,8 +165,10 @@ public void init(FileSystem fs, Path path, Configuration conf, boolean overwrita
     int bufferSize = CommonFSUtils.getDefaultBufferSize(fs);
     short replication = (short) conf.getInt("hbase.regionserver.hlog.replication",
       CommonFSUtils.getDefaultReplication(fs, path));
+    boolean noLocalWrite =
+      conf.getBoolean(WAL_AVOID_LOCAL_WRITES_KEY, WAL_AVOID_LOCAL_WRITES_DEFAULT);

-    initOutput(fs, path, overwritable, bufferSize, replication, blocksize, monitor);
+    initOutput(fs, path, overwritable, bufferSize, replication, blocksize, monitor, noLocalWrite);

     boolean doTagCompress =
       doCompress && conf.getBoolean(CompressionContext.ENABLE_WAL_TAGS_COMPRESSION, true);
@@ -253,7 +257,7 @@ protected final void writeWALTrailer() {
   }

   protected abstract void initOutput(FileSystem fs, Path path, boolean overwritable, int bufferSize,
-    short replication, long blockSize, StreamSlowMonitor monitor)
+    short replication, long blockSize, StreamSlowMonitor monitor, boolean noLocalWrite)
     throws IOException, StreamLacksCapabilityException;

   /**
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncProtobufLogWriter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncProtobufLogWriter.java
index 3816008b8cae..f10f39222722 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncProtobufLogWriter.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncProtobufLogWriter.java
@@ -17,8 +17,6 @@
  */
 package org.apache.hadoop.hbase.regionserver.wal;

-import static org.apache.hadoop.hbase.regionserver.wal.AbstractFSWAL.WAL_AVOID_LOCAL_WRITES_DEFAULT;
-import static org.apache.hadoop.hbase.regionserver.wal.AbstractFSWAL.WAL_AVOID_LOCAL_WRITES_KEY;
 import static org.apache.hadoop.hbase.util.FutureUtils.addListener;

 import java.io.IOException;
@@ -180,10 +178,8 @@ public AsyncFSOutput getOutput() {

   @Override
   protected void initOutput(FileSystem fs, Path path, boolean overwritable, int bufferSize,
-    short replication, long blockSize, StreamSlowMonitor monitor)
+    short replication, long blockSize, StreamSlowMonitor monitor, boolean noLocalWrite)
     throws IOException, StreamLacksCapabilityException {
-    boolean noLocalWrite =
-      fs.getConf().getBoolean(WAL_AVOID_LOCAL_WRITES_KEY, WAL_AVOID_LOCAL_WRITES_DEFAULT);
     this.output = AsyncFSOutputHelper.createOutput(fs, path, overwritable, false, replication,
       blockSize, eventLoopGroup, channelClass, monitor, noLocalWrite);
     this.asyncOutputWrapper = new OutputStreamWrapper(output);
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogWriter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogWriter.java
index 9a4c2c6397d4..52317949cc83 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogWriter.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/ProtobufLogWriter.java
@@ -17,9 +17,6 @@
  */
 package org.apache.hadoop.hbase.regionserver.wal;

-import static org.apache.hadoop.hbase.regionserver.wal.AbstractFSWAL.WAL_AVOID_LOCAL_WRITES_DEFAULT;
-import static org.apache.hadoop.hbase.regionserver.wal.AbstractFSWAL.WAL_AVOID_LOCAL_WRITES_KEY;
-
 import java.io.IOException;
 import java.io.OutputStream;
 import java.util.concurrent.atomic.AtomicLong;
@@ -103,7 +100,7 @@ public FSDataOutputStream getStream() {

   @Override
   protected void initOutput(FileSystem fs, Path path, boolean overwritable, int bufferSize,
-    short replication, long blockSize, StreamSlowMonitor monitor)
+    short replication, long blockSize, StreamSlowMonitor monitor, boolean noLocalWrite)
     throws IOException, StreamLacksCapabilityException {
     FSDataOutputStreamBuilder<?, ?> builder = fs.createFile(path).overwrite(overwritable)
       .bufferSize(bufferSize).replication(replication).blockSize(blockSize);
@@ -111,7 +108,7 @@ protected void initOutput(FileSystem fs, Path path, boolean overwritable, int bu
       DistributedFileSystem.HdfsDataOutputStreamBuilder dfsBuilder =
         (DistributedFileSystem.HdfsDataOutputStreamBuilder) builder;
       dfsBuilder.replicate();
-      if (fs.getConf().getBoolean(WAL_AVOID_LOCAL_WRITES_KEY, WAL_AVOID_LOCAL_WRITES_DEFAULT)) {
+      if (noLocalWrite) {
         dfsBuilder.noLocalWrite();
       }
       this.output = dfsBuilder.build();
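
For reference, the end state the series arrives at for the synchronous writer can be summarized in a small standalone sketch. This is an illustration only, not code from the patches: the class name, file path and replication factor below are invented, and error handling is minimal. It reads the new hbase.regionserver.wal.avoid-local-writes setting (normally supplied through the server Configuration, e.g. hbase-site.xml) and, when the setting is true, asks HDFS not to place a block replica on the local DataNode via HdfsDataOutputStreamBuilder.noLocalWrite(); the async WAL path reaches the same behavior by adding CreateFlag.NO_LOCAL_WRITE in FanOutOneBlockAsyncDFSOutputHelper.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FSDataOutputStreamBuilder;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DistributedFileSystem;

public class NoLocalWriteSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    // Same key the series introduces as AbstractFSWAL.WAL_AVOID_LOCAL_WRITES_KEY (default false).
    boolean noLocalWrite = conf.getBoolean("hbase.regionserver.wal.avoid-local-writes", false);

    FileSystem fs = FileSystem.get(conf);
    // Path and replication are made up for the example.
    FSDataOutputStreamBuilder<?, ?> builder =
      fs.createFile(new Path("/tmp/example-wal")).overwrite(true).replication((short) 3);
    if (builder instanceof DistributedFileSystem.HdfsDataOutputStreamBuilder) {
      DistributedFileSystem.HdfsDataOutputStreamBuilder dfsBuilder =
        (DistributedFileSystem.HdfsDataOutputStreamBuilder) builder;
      dfsBuilder.replicate(); // plain replication, not erasure coding
      if (noLocalWrite) {
        dfsBuilder.noLocalWrite(); // hint the NameNode to skip the local DataNode when placing replicas
      }
      try (FSDataOutputStream out = dfsBuilder.build()) {
        out.write(new byte[] { 1, 2, 3 });
      }
    } else {
      try (FSDataOutputStream out = builder.build()) {
        out.write(new byte[] { 1, 2, 3 });
      }
    }
  }
}

As the configuration name suggests, the intent of the flag is simply to let operators keep WAL blocks off the RegionServer's own DataNode; with it unset, the default placement behavior is unchanged.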