From 9a48035613989d95cb3f651c7a200d4be0889e0e Mon Sep 17 00:00:00 2001
From: Istvan Toth
Date: Thu, 19 Sep 2024 07:59:57 +0200
Subject: [PATCH] HBASE-28721 AsyncFSWAL is broken when running against hadoop 3.4.0

---
 .../asyncfs/FanOutOneBlockAsyncDFSOutput.java | 15 +--
 .../FanOutOneBlockAsyncDFSOutputHelper.java   | 97 +++++++++++++++----
 2 files changed, 87 insertions(+), 25 deletions(-)

diff --git a/hbase-asyncfs/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutput.java b/hbase-asyncfs/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutput.java
index 55a2f6c86ae7..2a67d6e27426 100644
--- a/hbase-asyncfs/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutput.java
+++ b/hbase-asyncfs/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutput.java
@@ -60,6 +60,7 @@
 import org.apache.hadoop.hdfs.protocol.ClientProtocol;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
+import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.hdfs.protocol.datatransfer.PacketHeader;
 import org.apache.hadoop.hdfs.protocol.datatransfer.PipelineAck;
@@ -121,7 +122,7 @@ public class FanOutOneBlockAsyncDFSOutput implements AsyncFSOutput {
 
   private final String src;
 
-  private final long fileId;
+  private HdfsFileStatus stat;
 
   private final ExtendedBlock block;
 
@@ -354,14 +355,14 @@ private void setupReceiver(int timeoutMs) {
   }
 
   FanOutOneBlockAsyncDFSOutput(Configuration conf, DistributedFileSystem dfs, DFSClient client,
-    ClientProtocol namenode, String clientName, String src, long fileId, LocatedBlock locatedBlock,
-    Encryptor encryptor, Map<Channel, DatanodeInfo> datanodeInfoMap, DataChecksum summer,
-    ByteBufAllocator alloc, StreamSlowMonitor streamSlowMonitor) {
+    ClientProtocol namenode, String clientName, String src, HdfsFileStatus stat,
+    LocatedBlock locatedBlock, Encryptor encryptor, Map<Channel, DatanodeInfo> datanodeInfoMap,
+    DataChecksum summer, ByteBufAllocator alloc, StreamSlowMonitor streamSlowMonitor) {
     this.conf = conf;
     this.dfs = dfs;
     this.client = client;
     this.namenode = namenode;
-    this.fileId = fileId;
+    this.stat = stat;
     this.clientName = clientName;
     this.src = src;
     this.block = locatedBlock.getBlock();
@@ -592,7 +593,7 @@ public void recoverAndClose(CancelableProgressable reporter) throws IOException
       buf = null;
     }
     closeDataNodeChannelsAndAwait();
-    endFileLease(client, fileId);
+    endFileLease(client, stat);
     RecoverLeaseFSUtils.recoverFileLease(dfs, new Path(src), conf,
       reporter == null ? new CancelOnClose(client) : reporter);
   }
@@ -607,7 +608,7 @@ public void close() throws IOException {
     state = State.CLOSED;
     closeDataNodeChannelsAndAwait();
     block.setNumBytes(ackedBlockLength);
-    completeFile(client, namenode, src, clientName, block, fileId);
+    completeFile(client, namenode, src, clientName, block, stat);
   }
 
   @Override
diff --git a/hbase-asyncfs/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutputHelper.java b/hbase-asyncfs/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutputHelper.java
index 6ff200c4ac3d..669727ea59e4 100644
--- a/hbase-asyncfs/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutputHelper.java
+++ b/hbase-asyncfs/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutputHelper.java
@@ -141,9 +141,9 @@ private FanOutOneBlockAsyncDFSOutputHelper() {
 
   private interface LeaseManager {
 
-    void begin(DFSClient client, long inodeId);
+    void begin(DFSClient client, HdfsFileStatus stat);
 
-    void end(DFSClient client, long inodeId);
+    void end(DFSClient client, HdfsFileStatus stat);
   }
 
   private static final LeaseManager LEASE_MANAGER;
@@ -202,7 +202,58 @@ public boolean isClientRunning(DFSClient client) {
     };
   }
 
-  private static LeaseManager createLeaseManager() throws NoSuchMethodException {
+  private static LeaseManager createLeaseManager3_4() throws NoSuchMethodException {
+    Method beginFileLeaseMethod =
+      DFSClient.class.getDeclaredMethod("beginFileLease", String.class, DFSOutputStream.class);
+    beginFileLeaseMethod.setAccessible(true);
+    Method endFileLeaseMethod = DFSClient.class.getDeclaredMethod("endFileLease", String.class);
+    endFileLeaseMethod.setAccessible(true);
+    Method getConfigurationMethod = DFSClient.class.getDeclaredMethod("getConfiguration");
+    getConfigurationMethod.setAccessible(true);
+    Method getNamespaceMethod = HdfsFileStatus.class.getDeclaredMethod("getNamespace");
+
+    return new LeaseManager() {
+
+      private static final String DFS_OUTPUT_STREAM_UNIQ_DEFAULT_KEY =
+        "dfs.client.output.stream.uniq.default.key";
+      private static final String DFS_OUTPUT_STREAM_UNIQ_DEFAULT_KEY_DEFAULT = "DEFAULT";
+
+      private String getUniqId(DFSClient client, HdfsFileStatus stat)
+        throws IllegalAccessException, IllegalArgumentException, InvocationTargetException {
+        // Copied from DFSClient in Hadoop 3.4.0
+        long fileId = stat.getFileId();
+        String namespace = (String) getNamespaceMethod.invoke(stat);
+        if (namespace == null) {
+          Configuration conf = (Configuration) getConfigurationMethod.invoke(client);
+          String defaultKey = conf.get(DFS_OUTPUT_STREAM_UNIQ_DEFAULT_KEY,
+            DFS_OUTPUT_STREAM_UNIQ_DEFAULT_KEY_DEFAULT);
+          return defaultKey + "_" + fileId;
+        } else {
+          return namespace + "_" + fileId;
+        }
+      }
+
+      @Override
+      public void begin(DFSClient client, HdfsFileStatus stat) {
+        try {
+          beginFileLeaseMethod.invoke(client, getUniqId(client, stat), null);
+        } catch (IllegalAccessException | InvocationTargetException e) {
+          throw new RuntimeException(e);
+        }
+      }
+
+      @Override
+      public void end(DFSClient client, HdfsFileStatus stat) {
+        try {
+          endFileLeaseMethod.invoke(client, getUniqId(client, stat));
+        } catch (IllegalAccessException | InvocationTargetException e) {
+          throw new RuntimeException(e);
+        }
+      }
+    };
+  }
+
+  private static LeaseManager createLeaseManager3() throws NoSuchMethodException {
     Method beginFileLeaseMethod =
       DFSClient.class.getDeclaredMethod("beginFileLease", long.class, DFSOutputStream.class);
     beginFileLeaseMethod.setAccessible(true);
@@ -211,18 +262,18 @@ private static LeaseManager createLeaseManager() throws NoSuchMethodException {
     return new LeaseManager() {
 
       @Override
-      public void begin(DFSClient client, long inodeId) {
+      public void begin(DFSClient client, HdfsFileStatus stat) {
         try {
-          beginFileLeaseMethod.invoke(client, inodeId, null);
+          beginFileLeaseMethod.invoke(client, stat.getFileId(), null);
         } catch (IllegalAccessException | InvocationTargetException e) {
           throw new RuntimeException(e);
         }
       }
 
       @Override
-      public void end(DFSClient client, long inodeId) {
+      public void end(DFSClient client, HdfsFileStatus stat) {
         try {
-          endFileLeaseMethod.invoke(client, inodeId);
+          endFileLeaseMethod.invoke(client, stat.getFileId());
         } catch (IllegalAccessException | InvocationTargetException e) {
           throw new RuntimeException(e);
         }
@@ -230,6 +281,16 @@ public void end(DFSClient client, long inodeId) {
     };
   }
 
+  private static LeaseManager createLeaseManager() throws NoSuchMethodException {
+    try {
+      return createLeaseManager3_4();
+    } catch (NoSuchMethodException e) {
+      LOG.debug("DFSClient::beginFileLease wrong arguments, should be hadoop 3.3 or below");
+    }
+
+    return createLeaseManager3();
+  }
+
   private static FileCreator createFileCreator3_3() throws NoSuchMethodException {
     Method createMethod = ClientProtocol.class.getMethod("create", String.class, FsPermission.class,
       String.class, EnumSetWritable.class, boolean.class, short.class, long.class,
@@ -320,12 +381,12 @@ public boolean progress() {
     }
   }
 
-  static void beginFileLease(DFSClient client, long inodeId) {
-    LEASE_MANAGER.begin(client, inodeId);
+  static void beginFileLease(DFSClient client, HdfsFileStatus stat) {
+    LEASE_MANAGER.begin(client, stat);
   }
 
-  static void endFileLease(DFSClient client, long inodeId) {
-    LEASE_MANAGER.end(client, inodeId);
+  static void endFileLease(DFSClient client, HdfsFileStatus stat) {
+    LEASE_MANAGER.end(client, stat);
   }
 
   static DataChecksum createChecksum(DFSClient client) {
@@ -552,7 +613,7 @@ private static FanOutOneBlockAsyncDFSOutput createOutput(DistributedFileSystem d
         throw new NameNodeException(e);
      }
     }
-    beginFileLease(client, stat.getFileId());
+    beginFileLease(client, stat);
     boolean succ = false;
     LocatedBlock locatedBlock = null;
     List<Future<Channel>> futureList = null;
@@ -576,8 +637,8 @@ private static FanOutOneBlockAsyncDFSOutput createOutput(DistributedFileSystem d
       }
       Encryptor encryptor = createEncryptor(conf, stat, client);
       FanOutOneBlockAsyncDFSOutput output =
-        new FanOutOneBlockAsyncDFSOutput(conf, dfs, client, namenode, clientName, src,
-          stat.getFileId(), locatedBlock, encryptor, datanodes, summer, ALLOC, monitor);
+        new FanOutOneBlockAsyncDFSOutput(conf, dfs, client, namenode, clientName, src, stat,
+          locatedBlock, encryptor, datanodes, summer, ALLOC, monitor);
       succ = true;
       return output;
     } catch (RemoteException e) {
@@ -616,7 +677,7 @@ public void operationComplete(Future<Channel> future) throws Exception {
            });
          }
        }
-        endFileLease(client, stat.getFileId());
+        endFileLease(client, stat);
      }
    }
  }
@@ -654,11 +715,11 @@ public static boolean shouldRetryCreate(RemoteException e) {
   }
 
   static void completeFile(DFSClient client, ClientProtocol namenode, String src, String clientName,
-    ExtendedBlock block, long fileId) {
+    ExtendedBlock block, HdfsFileStatus stat) {
     for (int retry = 0;; retry++) {
       try {
-        if (namenode.complete(src, clientName, block, fileId)) {
-          endFileLease(client, fileId);
+        if (namenode.complete(src, clientName, block, stat.getFileId())) {
+          endFileLease(client, stat);
           return;
         } else {
           LOG.warn("complete file " + src + " not finished, retry = " + retry);
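Note for reviewers (not part of the patch): the version split above hinges on a reflection probe, since Hadoop 3.4.0 keys file leases by a "<namespace>_<fileId>" string while 3.3 and earlier key them by the inode id. A minimal, hypothetical sketch of that probe is below; the class name LeaseApiProbe and the main() driver are illustrative only, but the DFSClient method signatures are the same ones the patch itself looks up via getDeclaredMethod.

// Illustrative sketch: detect whether the running hadoop-hdfs-client exposes the
// Hadoop 3.4+ String-keyed lease API, mirroring the fallback in createLeaseManager().
import java.lang.reflect.Method;

import org.apache.hadoop.hdfs.DFSClient;
import org.apache.hadoop.hdfs.DFSOutputStream;

public final class LeaseApiProbe {

  private LeaseApiProbe() {
  }

  /** Returns true when DFSClient has the String-keyed lease methods introduced in Hadoop 3.4.0. */
  public static boolean hasStringKeyedLeaseApi() {
    try {
      // Hadoop 3.4.0: beginFileLease(String, DFSOutputStream) / endFileLease(String)
      Method begin =
        DFSClient.class.getDeclaredMethod("beginFileLease", String.class, DFSOutputStream.class);
      Method end = DFSClient.class.getDeclaredMethod("endFileLease", String.class);
      return begin != null && end != null;
    } catch (NoSuchMethodException e) {
      // Hadoop 3.3 and earlier: beginFileLease(long, DFSOutputStream) / endFileLease(long)
      return false;
    }
  }

  public static void main(String[] args) {
    System.out.println(hasStringKeyedLeaseApi()
      ? "hadoop 3.4+ lease API (String key)"
      : "hadoop 3.3 or earlier lease API (long inode id)");
  }
}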