Skip to content

Commit

Permalink
HBASE-28721 AsyncFSWAL is broken when running against hadoop 3.4.0 (#…
Browse files Browse the repository at this point in the history
…6270)

Signed-off-by: Duo Zhang <zhangduo@apache.org>
(cherry picked from commit c18921a)
  • Loading branch information
stoty committed Oct 9, 2024
1 parent 22f80ff commit e56a8fc
Show file tree
Hide file tree
Showing 2 changed files with 87 additions and 25 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@
import org.apache.hadoop.hdfs.protocol.ClientProtocol;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocol.datatransfer.PacketHeader;
import org.apache.hadoop.hdfs.protocol.datatransfer.PipelineAck;
Expand Down Expand Up @@ -121,7 +122,7 @@ public class FanOutOneBlockAsyncDFSOutput implements AsyncFSOutput {

private final String src;

private final long fileId;
private HdfsFileStatus stat;

private final ExtendedBlock block;

Expand Down Expand Up @@ -354,14 +355,14 @@ private void setupReceiver(int timeoutMs) {
}

FanOutOneBlockAsyncDFSOutput(Configuration conf, DistributedFileSystem dfs, DFSClient client,
ClientProtocol namenode, String clientName, String src, long fileId, LocatedBlock locatedBlock,
Encryptor encryptor, Map<Channel, DatanodeInfo> datanodeInfoMap, DataChecksum summer,
ByteBufAllocator alloc, StreamSlowMonitor streamSlowMonitor) {
ClientProtocol namenode, String clientName, String src, HdfsFileStatus stat,
LocatedBlock locatedBlock, Encryptor encryptor, Map<Channel, DatanodeInfo> datanodeInfoMap,
DataChecksum summer, ByteBufAllocator alloc, StreamSlowMonitor streamSlowMonitor) {
this.conf = conf;
this.dfs = dfs;
this.client = client;
this.namenode = namenode;
this.fileId = fileId;
this.stat = stat;
this.clientName = clientName;
this.src = src;
this.block = locatedBlock.getBlock();
Expand Down Expand Up @@ -592,7 +593,7 @@ public void recoverAndClose(CancelableProgressable reporter) throws IOException
buf = null;
}
closeDataNodeChannelsAndAwait();
endFileLease(client, fileId);
endFileLease(client, stat);
RecoverLeaseFSUtils.recoverFileLease(dfs, new Path(src), conf,
reporter == null ? new CancelOnClose(client) : reporter);
}
Expand All @@ -607,7 +608,7 @@ public void close() throws IOException {
state = State.CLOSED;
closeDataNodeChannelsAndAwait();
block.setNumBytes(ackedBlockLength);
completeFile(client, namenode, src, clientName, block, fileId);
completeFile(client, namenode, src, clientName, block, stat);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -141,9 +141,9 @@ private FanOutOneBlockAsyncDFSOutputHelper() {

private interface LeaseManager {

void begin(DFSClient client, long inodeId);
void begin(DFSClient client, HdfsFileStatus stat);

void end(DFSClient client, long inodeId);
void end(DFSClient client, HdfsFileStatus stat);
}

private static final LeaseManager LEASE_MANAGER;
Expand Down Expand Up @@ -202,7 +202,58 @@ public boolean isClientRunning(DFSClient client) {
};
}

private static LeaseManager createLeaseManager() throws NoSuchMethodException {
private static LeaseManager createLeaseManager3_4() throws NoSuchMethodException {
Method beginFileLeaseMethod =
DFSClient.class.getDeclaredMethod("beginFileLease", String.class, DFSOutputStream.class);
beginFileLeaseMethod.setAccessible(true);
Method endFileLeaseMethod = DFSClient.class.getDeclaredMethod("endFileLease", String.class);
endFileLeaseMethod.setAccessible(true);
Method getConfigurationMethod = DFSClient.class.getDeclaredMethod("getConfiguration");
getConfigurationMethod.setAccessible(true);
Method getNamespaceMehtod = HdfsFileStatus.class.getDeclaredMethod("getNamespace");

return new LeaseManager() {

private static final String DFS_OUTPUT_STREAM_UNIQ_DEFAULT_KEY =
"dfs.client.output.stream.uniq.default.key";
private static final String DFS_OUTPUT_STREAM_UNIQ_DEFAULT_KEY_DEFAULT = "DEFAULT";

private String getUniqId(DFSClient client, HdfsFileStatus stat)
throws IllegalAccessException, IllegalArgumentException, InvocationTargetException {
// Copied from DFSClient in Hadoop 3.4.0
long fileId = stat.getFileId();
String namespace = (String) getNamespaceMehtod.invoke(stat);
if (namespace == null) {
Configuration conf = (Configuration) getConfigurationMethod.invoke(client);
String defaultKey = conf.get(DFS_OUTPUT_STREAM_UNIQ_DEFAULT_KEY,
DFS_OUTPUT_STREAM_UNIQ_DEFAULT_KEY_DEFAULT);
return defaultKey + "_" + fileId;
} else {
return namespace + "_" + fileId;
}
}

@Override
public void begin(DFSClient client, HdfsFileStatus stat) {
try {
beginFileLeaseMethod.invoke(client, getUniqId(client, stat), null);
} catch (IllegalAccessException | InvocationTargetException e) {
throw new RuntimeException(e);
}
}

@Override
public void end(DFSClient client, HdfsFileStatus stat) {
try {
endFileLeaseMethod.invoke(client, getUniqId(client, stat));
} catch (IllegalAccessException | InvocationTargetException e) {
throw new RuntimeException(e);
}
}
};
}

private static LeaseManager createLeaseManager3() throws NoSuchMethodException {
Method beginFileLeaseMethod =
DFSClient.class.getDeclaredMethod("beginFileLease", long.class, DFSOutputStream.class);
beginFileLeaseMethod.setAccessible(true);
Expand All @@ -211,25 +262,35 @@ private static LeaseManager createLeaseManager() throws NoSuchMethodException {
return new LeaseManager() {

@Override
public void begin(DFSClient client, long inodeId) {
public void begin(DFSClient client, HdfsFileStatus stat) {
try {
beginFileLeaseMethod.invoke(client, inodeId, null);
beginFileLeaseMethod.invoke(client, stat.getFileId(), null);
} catch (IllegalAccessException | InvocationTargetException e) {
throw new RuntimeException(e);
}
}

@Override
public void end(DFSClient client, long inodeId) {
public void end(DFSClient client, HdfsFileStatus stat) {
try {
endFileLeaseMethod.invoke(client, inodeId);
endFileLeaseMethod.invoke(client, stat.getFileId());
} catch (IllegalAccessException | InvocationTargetException e) {
throw new RuntimeException(e);
}
}
};
}

private static LeaseManager createLeaseManager() throws NoSuchMethodException {
try {
return createLeaseManager3_4();
} catch (NoSuchMethodException e) {
LOG.debug("DFSClient::beginFileLease wrong arguments, should be hadoop 3.3 or below");
}

return createLeaseManager3();
}

private static FileCreator createFileCreator3_3() throws NoSuchMethodException {
Method createMethod = ClientProtocol.class.getMethod("create", String.class, FsPermission.class,
String.class, EnumSetWritable.class, boolean.class, short.class, long.class,
Expand Down Expand Up @@ -320,12 +381,12 @@ public boolean progress() {
}
}

static void beginFileLease(DFSClient client, long inodeId) {
LEASE_MANAGER.begin(client, inodeId);
static void beginFileLease(DFSClient client, HdfsFileStatus stat) {
LEASE_MANAGER.begin(client, stat);
}

static void endFileLease(DFSClient client, long inodeId) {
LEASE_MANAGER.end(client, inodeId);
static void endFileLease(DFSClient client, HdfsFileStatus stat) {
LEASE_MANAGER.end(client, stat);
}

static DataChecksum createChecksum(DFSClient client) {
Expand Down Expand Up @@ -552,7 +613,7 @@ private static FanOutOneBlockAsyncDFSOutput createOutput(DistributedFileSystem d
throw new NameNodeException(e);
}
}
beginFileLease(client, stat.getFileId());
beginFileLease(client, stat);
boolean succ = false;
LocatedBlock locatedBlock = null;
List<Future<Channel>> futureList = null;
Expand All @@ -576,8 +637,8 @@ private static FanOutOneBlockAsyncDFSOutput createOutput(DistributedFileSystem d
}
Encryptor encryptor = createEncryptor(conf, stat, client);
FanOutOneBlockAsyncDFSOutput output =
new FanOutOneBlockAsyncDFSOutput(conf, dfs, client, namenode, clientName, src,
stat.getFileId(), locatedBlock, encryptor, datanodes, summer, ALLOC, monitor);
new FanOutOneBlockAsyncDFSOutput(conf, dfs, client, namenode, clientName, src, stat,
locatedBlock, encryptor, datanodes, summer, ALLOC, monitor);
succ = true;
return output;
} catch (RemoteException e) {
Expand Down Expand Up @@ -616,7 +677,7 @@ public void operationComplete(Future<Channel> future) throws Exception {
});
}
}
endFileLease(client, stat.getFileId());
endFileLease(client, stat);
}
}
}
Expand Down Expand Up @@ -654,11 +715,11 @@ public static boolean shouldRetryCreate(RemoteException e) {
}

static void completeFile(DFSClient client, ClientProtocol namenode, String src, String clientName,
ExtendedBlock block, long fileId) {
ExtendedBlock block, HdfsFileStatus stat) {
for (int retry = 0;; retry++) {
try {
if (namenode.complete(src, clientName, block, fileId)) {
endFileLease(client, fileId);
if (namenode.complete(src, clientName, block, stat.getFileId())) {
endFileLease(client, stat);
return;
} else {
LOG.warn("complete file " + src + " not finished, retry = " + retry);
Expand Down

0 comments on commit e56a8fc

Please sign in to comment.