Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

HBASE-28260: Add NO_WRITE_LOCAL flag to WAL file creation #5733

Merged
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -48,11 +48,12 @@ private AsyncFSOutputHelper() {
*/
public static AsyncFSOutput createOutput(FileSystem fs, Path f, boolean overwrite,
boolean createParent, short replication, long blockSize, EventLoopGroup eventLoopGroup,
Class<? extends Channel> channelClass, StreamSlowMonitor monitor)
Class<? extends Channel> channelClass, StreamSlowMonitor monitor, boolean noLocalWrite)
throws IOException, CommonFSUtils.StreamLacksCapabilityException {
if (fs instanceof DistributedFileSystem) {
return FanOutOneBlockAsyncDFSOutputHelper.createOutput((DistributedFileSystem) fs, f,
overwrite, createParent, replication, blockSize, eventLoopGroup, channelClass, monitor);
overwrite, createParent, replication, blockSize, eventLoopGroup, channelClass, monitor,
noLocalWrite);
}
final FSDataOutputStream out;
int bufferSize = fs.getConf().getInt(CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_KEY,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -445,20 +445,24 @@ public NameNodeException(Throwable cause) {
}
}

private static EnumSetWritable<CreateFlag> getCreateFlags(boolean overwrite) {
private static EnumSetWritable<CreateFlag> getCreateFlags(boolean overwrite,
boolean noLocalWrite) {
List<CreateFlag> flags = new ArrayList<>();
flags.add(CreateFlag.CREATE);
if (overwrite) {
flags.add(CreateFlag.OVERWRITE);
}
if (noLocalWrite) {
flags.add(CreateFlag.NO_LOCAL_WRITE);
}
flags.add(CreateFlag.SHOULD_REPLICATE);
return new EnumSetWritable<>(EnumSet.copyOf(flags));
}

private static FanOutOneBlockAsyncDFSOutput createOutput(DistributedFileSystem dfs, String src,
boolean overwrite, boolean createParent, short replication, long blockSize,
EventLoopGroup eventLoopGroup, Class<? extends Channel> channelClass, StreamSlowMonitor monitor)
throws IOException {
EventLoopGroup eventLoopGroup, Class<? extends Channel> channelClass, StreamSlowMonitor monitor,
boolean noLocalWrite) throws IOException {
Configuration conf = dfs.getConf();
DFSClient client = dfs.getClient();
String clientName = client.getClientName();
Expand All @@ -475,7 +479,7 @@ private static FanOutOneBlockAsyncDFSOutput createOutput(DistributedFileSystem d
try {
stat = FILE_CREATOR.create(namenode, src,
FsPermission.getFileDefault().applyUMask(FsPermission.getUMask(conf)), clientName,
getCreateFlags(overwrite), createParent, replication, blockSize,
getCreateFlags(overwrite, noLocalWrite), createParent, replication, blockSize,
CryptoProtocolVersion.supported());
} catch (Exception e) {
if (e instanceof RemoteException) {
Expand Down Expand Up @@ -561,14 +565,14 @@ public void operationComplete(Future<Channel> future) throws Exception {
public static FanOutOneBlockAsyncDFSOutput createOutput(DistributedFileSystem dfs, Path f,
boolean overwrite, boolean createParent, short replication, long blockSize,
EventLoopGroup eventLoopGroup, Class<? extends Channel> channelClass,
final StreamSlowMonitor monitor) throws IOException {
final StreamSlowMonitor monitor, boolean noLocalWrite) throws IOException {
return new FileSystemLinkResolver<FanOutOneBlockAsyncDFSOutput>() {

@Override
public FanOutOneBlockAsyncDFSOutput doCall(Path p)
throws IOException, UnresolvedLinkException {
return createOutput(dfs, p.toUri().getPath(), overwrite, createParent, replication,
blockSize, eventLoopGroup, channelClass, monitor);
blockSize, eventLoopGroup, channelClass, monitor, noLocalWrite);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,7 @@ public void test() throws IOException, InterruptedException, ExecutionException
Path f = new Path("/" + name.getMethodName());
EventLoop eventLoop = EVENT_LOOP_GROUP.next();
FanOutOneBlockAsyncDFSOutput out = FanOutOneBlockAsyncDFSOutputHelper.createOutput(FS, f, true,
false, (short) 3, FS.getDefaultBlockSize(), eventLoop, CHANNEL_CLASS, MONITOR);
false, (short) 3, FS.getDefaultBlockSize(), eventLoop, CHANNEL_CLASS, MONITOR, true);
writeAndVerify(FS, f, out);
}

Expand All @@ -154,7 +154,7 @@ public void test0Recover() throws IOException, InterruptedException, ExecutionEx
Path f = new Path("/" + name.getMethodName());
EventLoop eventLoop = EVENT_LOOP_GROUP.next();
FanOutOneBlockAsyncDFSOutput out = FanOutOneBlockAsyncDFSOutputHelper.createOutput(FS, f, true,
false, (short) 3, FS.getDefaultBlockSize(), eventLoop, CHANNEL_CLASS, MONITOR);
false, (short) 3, FS.getDefaultBlockSize(), eventLoop, CHANNEL_CLASS, MONITOR, true);
byte[] b = new byte[10];
Bytes.random(b);
out.write(b, 0, b.length);
Expand Down Expand Up @@ -183,7 +183,7 @@ public void testHeartbeat() throws IOException, InterruptedException, ExecutionE
Path f = new Path("/" + name.getMethodName());
EventLoop eventLoop = EVENT_LOOP_GROUP.next();
FanOutOneBlockAsyncDFSOutput out = FanOutOneBlockAsyncDFSOutputHelper.createOutput(FS, f, true,
false, (short) 3, FS.getDefaultBlockSize(), eventLoop, CHANNEL_CLASS, MONITOR);
false, (short) 3, FS.getDefaultBlockSize(), eventLoop, CHANNEL_CLASS, MONITOR, true);
Thread.sleep(READ_TIMEOUT_MS * 2);
// the connection to datanode should still alive.
writeAndVerify(FS, f, out);
Expand All @@ -198,7 +198,7 @@ public void testCreateParentFailed() throws IOException {
EventLoop eventLoop = EVENT_LOOP_GROUP.next();
try {
FanOutOneBlockAsyncDFSOutputHelper.createOutput(FS, f, true, false, (short) 3,
FS.getDefaultBlockSize(), eventLoop, CHANNEL_CLASS, MONITOR);
FS.getDefaultBlockSize(), eventLoop, CHANNEL_CLASS, MONITOR, true);
fail("should fail with parent does not exist");
} catch (RemoteException e) {
LOG.info("expected exception caught", e);
Expand All @@ -220,8 +220,9 @@ public void testConnectToDatanodeFailed()
DataNodeProperties dnProp = CLUSTER.stopDataNode(0);
Path f = new Path("/test");
EventLoop eventLoop = EVENT_LOOP_GROUP.next();
try (FanOutOneBlockAsyncDFSOutput output = FanOutOneBlockAsyncDFSOutputHelper.createOutput(FS,
f, true, false, (short) 3, FS.getDefaultBlockSize(), eventLoop, CHANNEL_CLASS, MONITOR)) {
try (FanOutOneBlockAsyncDFSOutput output =
FanOutOneBlockAsyncDFSOutputHelper.createOutput(FS, f, true, false, (short) 3,
FS.getDefaultBlockSize(), eventLoop, CHANNEL_CLASS, MONITOR, true)) {
// should exclude the dead dn when retry so here we only have 2 DNs in pipeline
assertEquals(2, output.getPipeline().length);
} finally {
Expand Down Expand Up @@ -251,7 +252,7 @@ public void testExcludeFailedConnectToDatanode()
assertEquals(0, excludeDatanodeManager.getExcludeDNs().size());
try (FanOutOneBlockAsyncDFSOutput output =
FanOutOneBlockAsyncDFSOutputHelper.createOutput(FS, f, true, false, (short) 3,
FS.getDefaultBlockSize(), eventLoop, CHANNEL_CLASS, streamSlowDNsMonitor)) {
FS.getDefaultBlockSize(), eventLoop, CHANNEL_CLASS, streamSlowDNsMonitor, true)) {
// should exclude the dead dn when retry so here we only have 2 DNs in pipeline
assertEquals(2, output.getPipeline().length);
assertEquals(1, excludeDatanodeManager.getExcludeDNs().size());
Expand All @@ -266,7 +267,7 @@ public void testWriteLargeChunk() throws IOException, InterruptedException, Exec
Path f = new Path("/" + name.getMethodName());
EventLoop eventLoop = EVENT_LOOP_GROUP.next();
FanOutOneBlockAsyncDFSOutput out = FanOutOneBlockAsyncDFSOutputHelper.createOutput(FS, f, true,
false, (short) 3, 1024 * 1024 * 1024, eventLoop, CHANNEL_CLASS, MONITOR);
false, (short) 3, 1024 * 1024 * 1024, eventLoop, CHANNEL_CLASS, MONITOR, true);
byte[] b = new byte[50 * 1024 * 1024];
Bytes.random(b);
out.write(b);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ public static void setUp() throws Exception {
Path f = new Path("/testHang");
EventLoop eventLoop = EVENT_LOOP_GROUP.next();
OUT = FanOutOneBlockAsyncDFSOutputHelper.createOutput(FS, f, true, false, (short) 2,
FS.getDefaultBlockSize(), eventLoop, CHANNEL_CLASS, MONITOR);
FS.getDefaultBlockSize(), eventLoop, CHANNEL_CLASS, MONITOR, true);
}

@AfterClass
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ public void test() throws IOException, InterruptedException, ExecutionException,
Path f = new Path(TEST_UTIL.getDataTestDir(), "test");
FileSystem fs = FileSystem.getLocal(TEST_UTIL.getConfiguration());
AsyncFSOutput out = AsyncFSOutputHelper.createOutput(fs, f, false, true,
fs.getDefaultReplication(f), fs.getDefaultBlockSize(f), GROUP, CHANNEL_CLASS, MONITOR);
fs.getDefaultReplication(f), fs.getDefaultBlockSize(f), GROUP, CHANNEL_CLASS, MONITOR, true);
TestFanOutOneBlockAsyncDFSOutput.writeAndVerify(fs, f, out);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -255,7 +255,7 @@ private Path getEncryptionTestFile() {
private void test(Path file) throws IOException, InterruptedException, ExecutionException {
EventLoop eventLoop = EVENT_LOOP_GROUP.next();
FanOutOneBlockAsyncDFSOutput out = FanOutOneBlockAsyncDFSOutputHelper.createOutput(FS, file,
true, false, (short) 3, FS.getDefaultBlockSize(), eventLoop, CHANNEL_CLASS, MONITOR);
true, false, (short) 3, FS.getDefaultBlockSize(), eventLoop, CHANNEL_CLASS, MONITOR, true);
TestFanOutOneBlockAsyncDFSOutput.writeAndVerify(FS, file, out);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -179,6 +179,10 @@ public abstract class AbstractFSWAL<W extends WriterBase> implements WAL {
public static final String WAL_BATCH_SIZE = "hbase.wal.batch.size";
public static final long DEFAULT_WAL_BATCH_SIZE = 64L * 1024;

public static final String WAL_AVOID_LOCAL_WRITES_KEY =
"hbase.regionserver.wal.avoid-local-writes";
public static final boolean WAL_AVOID_LOCAL_WRITES_DEFAULT = false;

/**
* file system instance
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@
*/
package org.apache.hadoop.hbase.regionserver.wal;

import static org.apache.hadoop.hbase.regionserver.wal.AbstractFSWAL.WAL_AVOID_LOCAL_WRITES_DEFAULT;
import static org.apache.hadoop.hbase.regionserver.wal.AbstractFSWAL.WAL_AVOID_LOCAL_WRITES_KEY;
import static org.apache.hadoop.hbase.util.FutureUtils.addListener;

import java.io.IOException;
Expand Down Expand Up @@ -180,8 +182,10 @@ public AsyncFSOutput getOutput() {
protected void initOutput(FileSystem fs, Path path, boolean overwritable, int bufferSize,
short replication, long blockSize, StreamSlowMonitor monitor)
throws IOException, StreamLacksCapabilityException {
boolean noLocalWrite =
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Better add this as a parameter of the initOutput method? So we do not need to write the similar code twice in AsyncProtobufLogWriter and ProtobufLogWriter.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done!

fs.getConf().getBoolean(WAL_AVOID_LOCAL_WRITES_KEY, WAL_AVOID_LOCAL_WRITES_DEFAULT);
this.output = AsyncFSOutputHelper.createOutput(fs, path, overwritable, false, replication,
blockSize, eventLoopGroup, channelClass, monitor);
blockSize, eventLoopGroup, channelClass, monitor, noLocalWrite);
this.asyncOutputWrapper = new OutputStreamWrapper(output);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,9 @@
*/
package org.apache.hadoop.hbase.regionserver.wal;

import static org.apache.hadoop.hbase.regionserver.wal.AbstractFSWAL.WAL_AVOID_LOCAL_WRITES_DEFAULT;
import static org.apache.hadoop.hbase.regionserver.wal.AbstractFSWAL.WAL_AVOID_LOCAL_WRITES_KEY;

import java.io.IOException;
import java.io.OutputStream;
import java.util.concurrent.atomic.AtomicLong;
Expand Down Expand Up @@ -105,8 +108,13 @@ protected void initOutput(FileSystem fs, Path path, boolean overwritable, int bu
FSDataOutputStreamBuilder<?, ?> builder = fs.createFile(path).overwrite(overwritable)
.bufferSize(bufferSize).replication(replication).blockSize(blockSize);
if (builder instanceof DistributedFileSystem.HdfsDataOutputStreamBuilder) {
this.output =
((DistributedFileSystem.HdfsDataOutputStreamBuilder) builder).replicate().build();
DistributedFileSystem.HdfsDataOutputStreamBuilder dfsBuilder =
(DistributedFileSystem.HdfsDataOutputStreamBuilder) builder;
dfsBuilder.replicate();
if (fs.getConf().getBoolean(WAL_AVOID_LOCAL_WRITES_KEY, WAL_AVOID_LOCAL_WRITES_DEFAULT)) {
dfsBuilder.noLocalWrite();
}
this.output = dfsBuilder.build();
} else {
this.output = builder.build();
}
Expand Down