From 318a28121a52cd8ba934c2cc57e425cac89c32a0 Mon Sep 17 00:00:00 2001 From: Aswin M Prabhu Date: Mon, 1 Jul 2024 16:28:06 +0530 Subject: [PATCH 1/8] HDFS-16690. Automatically format unformatted JNs with JournalNodeSyncer --- .../protocol/InterQJournalProtocol.java | 10 +++++ ...JournalProtocolServerSideTranslatorPB.java | 16 ++++++++ .../InterQJournalProtocolTranslatorPB.java | 14 +++++++ .../qjournal/server/JournalNodeRpcServer.java | 28 ++++++++----- .../qjournal/server/JournalNodeSyncer.java | 40 ++++++++++++++++++- .../main/proto/InterQJournalProtocol.proto | 7 ++++ 6 files changed, 104 insertions(+), 11 deletions(-) diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/protocol/InterQJournalProtocol.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/protocol/InterQJournalProtocol.java index f1f7e9ce1ff47..c3eed14c3b662 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/protocol/InterQJournalProtocol.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/protocol/InterQJournalProtocol.java @@ -20,6 +20,7 @@ import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.hdfs.DFSConfigKeys; +import org.apache.hadoop.hdfs.protocol.proto.HdfsServerProtos.StorageInfoProto; import org.apache.hadoop.hdfs.qjournal.server.JournalNode; import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.GetEditLogManifestResponseProto; import org.apache.hadoop.security.KerberosInfo; @@ -51,4 +52,13 @@ GetEditLogManifestResponseProto getEditLogManifestFromJournal( String jid, String nameServiceId, long sinceTxId, boolean inProgressOk) throws IOException; + /** + * Get the storage info for the specified journal. + * @param jid the journal identifier + * @param nameServiceId the name service id + * @return the storage info object + */ + StorageInfoProto getStorageInfo(String jid, String nameServiceId) + throws IOException; + } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/protocolPB/InterQJournalProtocolServerSideTranslatorPB.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/protocolPB/InterQJournalProtocolServerSideTranslatorPB.java index ba5ddb1ab6770..ac67bcb0cbd17 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/protocolPB/InterQJournalProtocolServerSideTranslatorPB.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/protocolPB/InterQJournalProtocolServerSideTranslatorPB.java @@ -24,6 +24,8 @@ import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.hdfs.qjournal.protocol.InterQJournalProtocol; +import org.apache.hadoop.hdfs.protocol.proto.HdfsServerProtos.StorageInfoProto; +import org.apache.hadoop.hdfs.qjournal.protocol.InterQJournalProtocolProtos.GetStorageInfoRequestProto; import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.GetEditLogManifestRequestProto; import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.GetEditLogManifestResponseProto; @@ -60,4 +62,18 @@ public GetEditLogManifestResponseProto getEditLogManifestFromJournal( throw new ServiceException(e); } } + + @Override + public StorageInfoProto getStorageInfo( + RpcController controller, GetStorageInfoRequestProto request) + throws ServiceException { + try { + return impl.getStorageInfo( + request.getJid().getIdentifier(), + request.hasNameServiceId() ? request.getNameServiceId() : null + ); + } catch (IOException e) { + throw new ServiceException(e); + } + } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/protocolPB/InterQJournalProtocolTranslatorPB.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/protocolPB/InterQJournalProtocolTranslatorPB.java index 4544308fff2fc..49ae53fceebc6 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/protocolPB/InterQJournalProtocolTranslatorPB.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/protocolPB/InterQJournalProtocolTranslatorPB.java @@ -19,6 +19,8 @@ package org.apache.hadoop.hdfs.qjournal.protocolPB; +import org.apache.hadoop.hdfs.protocol.proto.HdfsServerProtos.StorageInfoProto; +import org.apache.hadoop.hdfs.qjournal.protocol.InterQJournalProtocolProtos; import org.apache.hadoop.thirdparty.protobuf.RpcController; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; @@ -75,6 +77,18 @@ public GetEditLogManifestResponseProto getEditLogManifestFromJournal( req.build())); } + @Override + public StorageInfoProto getStorageInfo(String jid, String nameServiceId) + throws IOException { + InterQJournalProtocolProtos.GetStorageInfoRequestProto.Builder req = + InterQJournalProtocolProtos.GetStorageInfoRequestProto.newBuilder() + .setJid(convertJournalId(jid)); + if (nameServiceId != null) { + req.setNameServiceId(nameServiceId); + } + return ipc(() -> rpcProxy.getStorageInfo(NULL_CONTROLLER, req.build())); + } + private QJournalProtocolProtos.JournalIdProto convertJournalId(String jid) { return QJournalProtocolProtos.JournalIdProto.newBuilder() .setIdentifier(jid) diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JournalNodeRpcServer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JournalNodeRpcServer.java index 7e33ab5c759f5..92e53880a7769 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JournalNodeRpcServer.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JournalNodeRpcServer.java @@ -18,6 +18,7 @@ package org.apache.hadoop.hdfs.qjournal.server; import org.apache.hadoop.classification.VisibleForTesting; +import org.apache.hadoop.hdfs.protocol.proto.HdfsServerProtos.StorageInfoProto; import org.apache.hadoop.thirdparty.protobuf.BlockingService; import org.slf4j.Logger; import org.apache.hadoop.classification.InterfaceAudience; @@ -71,14 +72,14 @@ public class JournalNodeRpcServer implements QJournalProtocol, JournalNodeRpcServer(Configuration conf, JournalNode jn) throws IOException { this.jn = jn; - + Configuration confCopy = new Configuration(conf); - + // Ensure that nagling doesn't kick in, which could cause latency issues. confCopy.setBoolean( CommonConfigurationKeysPublic.IPC_SERVER_TCPNODELAY_KEY, true); - + InetSocketAddress addr = getAddress(confCopy); String bindHost = conf.getTrimmed(DFS_JOURNALNODE_RPC_BIND_HOST_KEY, null); if (bindHost == null) { @@ -104,7 +105,7 @@ public class JournalNodeRpcServer implements QJournalProtocol, this.handlerCount = confHandlerCount; LOG.info("The number of JournalNodeRpcServer handlers is {}.", this.handlerCount); - + this.server = new RPC.Builder(confCopy) .setProtocol(QJournalProtocolPB.class) .setInstance(service) @@ -149,15 +150,15 @@ void start() { public InetSocketAddress getAddress() { return server.getListenerAddress(); } - + void join() throws InterruptedException { this.server.join(); } - + void stop() { this.server.stop(); } - + static InetSocketAddress getAddress(Configuration conf) { String addr = conf.get( DFSConfigKeys.DFS_JOURNALNODE_RPC_ADDRESS_KEY, @@ -211,7 +212,7 @@ public void journal(RequestInfo reqInfo, jn.getOrCreateJournal(reqInfo.getJournalId(), reqInfo.getNameServiceId()) .journal(reqInfo, segmentTxId, firstTxnId, numTxns, records); } - + @Override public void heartbeat(RequestInfo reqInfo) throws IOException { jn.getOrCreateJournal(reqInfo.getJournalId(), reqInfo.getNameServiceId()) @@ -245,10 +246,10 @@ public GetEditLogManifestResponseProto getEditLogManifest( String jid, String nameServiceId, long sinceTxId, boolean inProgressOk) throws IOException { - + RemoteEditLogManifest manifest = jn.getOrCreateJournal(jid, nameServiceId) .getEditLogManifest(sinceTxId, inProgressOk); - + return GetEditLogManifestResponseProto.newBuilder() .setManifest(PBHelper.convert(manifest)) .setHttpPort(jn.getBoundHttpAddress().getPort()) @@ -256,6 +257,13 @@ public GetEditLogManifestResponseProto getEditLogManifest( .build(); } + @Override + public StorageInfoProto getStorageInfo(String jid, + String nameServiceId) throws IOException { + StorageInfo storage = jn.getOrCreateJournal(jid).getStorage(); + return PBHelper.convert(storage); + } + @Override public GetJournaledEditsResponseProto getJournaledEdits(String jid, String nameServiceId, long sinceTxId, int maxTxns) throws IOException { diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JournalNodeSyncer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JournalNodeSyncer.java index f451b46de7b37..bd7dd086085aa 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JournalNodeSyncer.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JournalNodeSyncer.java @@ -18,6 +18,9 @@ package org.apache.hadoop.hdfs.qjournal.server; import org.apache.hadoop.classification.VisibleForTesting; +import org.apache.hadoop.hdfs.protocol.proto.HdfsServerProtos; +import org.apache.hadoop.hdfs.server.common.HdfsServerConstants; +import org.apache.hadoop.hdfs.server.common.StorageInfo; import org.apache.hadoop.thirdparty.com.google.common.collect.ImmutableList; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.conf.Configuration; @@ -168,6 +171,10 @@ private boolean getOtherJournalNodeProxies() { private void startSyncJournalsDaemon() { syncJournalDaemon = new Daemon(() -> { + // Format the journal with namespace info from the other JNs if it is not formatted + if (!journal.isFormatted()) { + formatWithSyncer(); + } // Wait for journal to be formatted to create edits.sync directory while(!journal.isFormatted()) { try { @@ -187,7 +194,8 @@ private void startSyncJournalsDaemon() { while(shouldSync) { try { if (!journal.isFormatted()) { - LOG.warn("Journal cannot sync. Not formatted."); + LOG.warn("Journal cannot sync. Not formatted. Trying to format with the syncer"); + formatWithSyncer(); } else { syncJournals(); } @@ -233,6 +241,36 @@ private void syncJournals() { journalNodeIndexForSync = (journalNodeIndexForSync + 1) % numOtherJNs; } + private void formatWithSyncer() { + LOG.info("Trying to format the journal with the syncer"); + try { + StorageInfo storage = null; + for (JournalNodeProxy jnProxy : otherJNProxies) { + try { + HdfsServerProtos.StorageInfoProto storageInfoResponse = jnProxy.jnProxy.getStorageInfo(jid, null); + storage = PBHelper.convert(storageInfoResponse, HdfsServerConstants.NodeType.JOURNAL_NODE); + if (storage.getNamespaceID() == 0) { + LOG.error("Got invalid StorageInfo from " + jnProxy); + continue; + } + LOG.info("Got StorageInfo " + storage + " from " + jnProxy); + break; + } catch (IOException e) { + LOG.error("Could not get StorageInfo from " + jnProxy, e); + } + } + if (storage == null) { + LOG.error("Could not get StorageInfo from any JournalNode. " + + "JournalNodeSyncer cannot format the journal."); + return; + } + NamespaceInfo nsInfo = new NamespaceInfo(storage); + journal.format(nsInfo, true); + } catch (IOException e) { + LOG.error("Exception in formatting the journal with the syncer", e); + } + } + private void syncWithJournalAtIndex(int index) { LOG.info("Syncing Journal " + jn.getBoundIpcAddress().getAddress() + ":" + jn.getBoundIpcAddress().getPort() + " with " diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/InterQJournalProtocol.proto b/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/InterQJournalProtocol.proto index 1c78423b40990..5510eeb7c4239 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/InterQJournalProtocol.proto +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/InterQJournalProtocol.proto @@ -31,8 +31,15 @@ package hadoop.hdfs.qjournal; import "HdfsServer.proto"; import "QJournalProtocol.proto"; +message GetStorageInfoRequestProto { + required JournalIdProto jid = 1; + optional string nameServiceId = 2; +} service InterQJournalProtocolService { rpc getEditLogManifestFromJournal(GetEditLogManifestRequestProto) returns (GetEditLogManifestResponseProto); + + rpc getStorageInfo(GetStorageInfoRequestProto) + returns (StorageInfoProto); } \ No newline at end of file From 1b0a910d0476d1da7076410abd5a8ecedda30f22 Mon Sep 17 00:00:00 2001 From: Aswin M Prabhu Date: Wed, 10 Jul 2024 16:30:15 +0530 Subject: [PATCH 2/8] Fix unit tests --- .../apache/hadoop/hdfs/qjournal/server/JournalNodeSyncer.java | 1 + 1 file changed, 1 insertion(+) diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JournalNodeSyncer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JournalNodeSyncer.java index bd7dd086085aa..95dc7493827f2 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JournalNodeSyncer.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JournalNodeSyncer.java @@ -251,6 +251,7 @@ private void formatWithSyncer() { storage = PBHelper.convert(storageInfoResponse, HdfsServerConstants.NodeType.JOURNAL_NODE); if (storage.getNamespaceID() == 0) { LOG.error("Got invalid StorageInfo from " + jnProxy); + storage = null; continue; } LOG.info("Got StorageInfo " + storage + " from " + jnProxy); From 809e51adc4dce0ef35485ca2d3eb397dbf33af7e Mon Sep 17 00:00:00 2001 From: Aswin M Prabhu Date: Sat, 13 Jul 2024 01:10:02 +0530 Subject: [PATCH 3/8] Add unit tests and configs --- .../org/apache/hadoop/hdfs/DFSConfigKeys.java | 3 ++ .../qjournal/server/JournalNodeSyncer.java | 21 ++++++++- .../src/main/resources/hdfs-default.xml | 10 +++++ .../qjournal/server/TestJournalNodeSync.java | 45 +++++++++++++++++++ 4 files changed, 77 insertions(+), 2 deletions(-) diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java index b9f8e07f67a5f..dd3193fdadff2 100755 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java @@ -1471,6 +1471,9 @@ public class DFSConfigKeys extends CommonConfigurationKeys { public static final String DFS_JOURNALNODE_SYNC_INTERVAL_KEY = "dfs.journalnode.sync.interval"; public static final long DFS_JOURNALNODE_SYNC_INTERVAL_DEFAULT = 2*60*1000L; + public static final String DFS_JOURNALNODE_ENABLE_SYNC_FORMAT_KEY = + "dfs.journalnode.enable.sync.format"; + public static final boolean DFS_JOURNALNODE_ENABLE_SYNC_FORMAT_DEFAULT = false; public static final String DFS_JOURNALNODE_EDIT_CACHE_SIZE_KEY = "dfs.journalnode.edit-cache-size.bytes"; diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JournalNodeSyncer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JournalNodeSyncer.java index 95dc7493827f2..e06eeddf9cce0 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JournalNodeSyncer.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JournalNodeSyncer.java @@ -82,6 +82,7 @@ public class JournalNodeSyncer { private int numOtherJNs; private int journalNodeIndexForSync = 0; private final long journalSyncInterval; + private final boolean tryFormatting; private final int logSegmentTransferTimeout; private final DataTransferThrottler throttler; private final JournalMetrics metrics; @@ -101,6 +102,9 @@ public class JournalNodeSyncer { logSegmentTransferTimeout = conf.getInt( DFSConfigKeys.DFS_EDIT_LOG_TRANSFER_TIMEOUT_KEY, DFSConfigKeys.DFS_EDIT_LOG_TRANSFER_TIMEOUT_DEFAULT); + tryFormatting = conf.getBoolean( + DFSConfigKeys.DFS_JOURNALNODE_ENABLE_SYNC_FORMAT_KEY, + DFSConfigKeys.DFS_JOURNALNODE_ENABLE_SYNC_FORMAT_DEFAULT); throttler = getThrottler(conf); metrics = journal.getMetrics(); journalSyncerStarted = false; @@ -196,6 +200,13 @@ private void startSyncJournalsDaemon() { if (!journal.isFormatted()) { LOG.warn("Journal cannot sync. Not formatted. Trying to format with the syncer"); formatWithSyncer(); + if (journal.isFormatted() && !createEditsSyncDir()) { + LOG.error("Failed to create directory for downloading log " + + "segments: {}. Stopping Journal Node Sync.", + journal.getStorage().getEditsSyncDir()); + return; + } + continue; } else { syncJournals(); } @@ -242,13 +253,19 @@ private void syncJournals() { } private void formatWithSyncer() { + if (!tryFormatting) { + return; + } LOG.info("Trying to format the journal with the syncer"); try { StorageInfo storage = null; for (JournalNodeProxy jnProxy : otherJNProxies) { try { - HdfsServerProtos.StorageInfoProto storageInfoResponse = jnProxy.jnProxy.getStorageInfo(jid, null); - storage = PBHelper.convert(storageInfoResponse, HdfsServerConstants.NodeType.JOURNAL_NODE); + HdfsServerProtos.StorageInfoProto storageInfoResponse = + jnProxy.jnProxy.getStorageInfo(jid, null); + storage = PBHelper.convert( + storageInfoResponse, HdfsServerConstants.NodeType.JOURNAL_NODE + ); if (storage.getNamespaceID() == 0) { LOG.error("Got invalid StorageInfo from " + jnProxy); storage = null; diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml index 94c3ea0cc9b0c..2ab25f8329ce6 100755 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml @@ -5062,6 +5062,16 @@ + + dfs.journalnode.enable.sync.format + false + + If true, the journal node syncer daemon that tries to sync edit + logs between journal nodes will try to format its journal if it is not. + It will query the other journal nodes for the storage info required to format. + + + dfs.journalnode.edit-cache-size.bytes diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/server/TestJournalNodeSync.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/server/TestJournalNodeSync.java index 28e36e03bfaa5..5feba5eb72ec9 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/server/TestJournalNodeSync.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/server/TestJournalNodeSync.java @@ -20,6 +20,7 @@ import java.net.InetSocketAddress; import java.net.URISyntaxException; import java.util.function.Supplier; +import org.apache.commons.io.FileUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hdfs.DFSConfigKeys; @@ -83,6 +84,9 @@ public void setUpMiniCluster() throws IOException { conf.setInt(DFSConfigKeys.DFS_HA_TAILEDITS_PERIOD_KEY, DFS_HA_TAILEDITS_PERIOD_SECONDS); } + if (testName.getMethodName().equals("testFormatWithSyncer")) { + conf.setBoolean(DFSConfigKeys.DFS_JOURNALNODE_ENABLE_SYNC_FORMAT_KEY, true); + } qjmhaCluster = new MiniQJMHACluster.Builder(conf).setNumNameNodes(2) .build(); dfsCluster = qjmhaCluster.getDfsCluster(); @@ -478,6 +482,32 @@ public void testSyncDuringRollingUpgrade() throws Exception { } } + @Test(timeout=300_000) + public void testFormatWithSyncer() throws Exception { + File firstJournalDir = jCluster.getJournalDir(0, jid); + File firstJournalCurrentDir = new StorageDirectory(firstJournalDir) + .getCurrentDir(); + + // Generate some edit logs + long firstTxId = generateEditLog(); + + // Delete them from the JN01 + List missingLogs = Lists.newArrayList(); + missingLogs.add(deleteEditLog(firstJournalCurrentDir, firstTxId)); + + // Delete the storage directory itself to simulate a disk wipe + // and ensure that the in-memory formatting state of JNStorage gets updated + FileUtils.deleteDirectory(firstJournalDir); + jCluster.getJournalNode(0).getOrCreateJournal(jid).getStorage().analyzeStorage(); + + // Wait for JN formatting with Syncer + GenericTestUtils.waitFor(jnFormatted(0), 500, 30000); + // Generate some more edit log so that the JN updates its committed tx id + generateEditLog(); + // Check that the missing edit logs have been synced + GenericTestUtils.waitFor(editLogExists(missingLogs), 500, 30000); + } + private File deleteEditLog(File currentDir, long startTxId) throws IOException { EditLogFile logFile = getLogFile(currentDir, startTxId); @@ -581,4 +611,19 @@ public Boolean get() { }; return supplier; } + + private Supplier jnFormatted(int jnIndex) throws Exception { + Supplier supplier = new Supplier() { + @Override + public Boolean get() { + try { + return jCluster.getJournalNode(jnIndex).getOrCreateJournal(jid) + .isFormatted(); + } catch (Exception e) { + return false; + } + } + }; + return supplier; + } } From 0c61a9fc37db6d574cd0199b0c5e5e230cfb0d71 Mon Sep 17 00:00:00 2001 From: Aswin M Prabhu Date: Mon, 15 Jul 2024 00:56:31 +0530 Subject: [PATCH 4/8] Fix race condition with namenode format command --- .../qjournal/server/JournalNodeSyncer.java | 25 +++++++++++++++++++ .../qjournal/server/TestJournalNodeSync.java | 4 +-- 2 files changed, 26 insertions(+), 3 deletions(-) diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JournalNodeSyncer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JournalNodeSyncer.java index e06eeddf9cce0..09c34a50455b9 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JournalNodeSyncer.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JournalNodeSyncer.java @@ -260,6 +260,11 @@ private void formatWithSyncer() { try { StorageInfo storage = null; for (JournalNodeProxy jnProxy : otherJNProxies) { + if (!hasEditLogs(jnProxy)) { + // This avoids a race condition between `hdfs namenode -format` and + // JN syncer by checking if the other JN is not newly formatted. + continue; + } try { HdfsServerProtos.StorageInfoProto storageInfoResponse = jnProxy.jnProxy.getStorageInfo(jid, null); @@ -289,6 +294,26 @@ private void formatWithSyncer() { } } + private boolean hasEditLogs(JournalNodeProxy journalProxy) { + GetEditLogManifestResponseProto editLogManifest; + try { + editLogManifest = journalProxy.jnProxy.getEditLogManifestFromJournal( + jid, nameServiceId, 0, false); + } catch (IOException e) { + LOG.error("Could not get edit log manifest from " + journalProxy, e); + return false; + } + + List otherJournalEditLogs = PBHelper.convert( + editLogManifest.getManifest()).getLogs(); + if (otherJournalEditLogs == null || otherJournalEditLogs.isEmpty()) { + LOG.warn("Journal at " + journalProxy.jnAddr + " has no edit logs"); + return false; + } + + return true; + } + private void syncWithJournalAtIndex(int index) { LOG.info("Syncing Journal " + jn.getBoundIpcAddress().getAddress() + ":" + jn.getBoundIpcAddress().getPort() + " with " diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/server/TestJournalNodeSync.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/server/TestJournalNodeSync.java index 5feba5eb72ec9..27f7dcb5a73b8 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/server/TestJournalNodeSync.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/server/TestJournalNodeSync.java @@ -76,6 +76,7 @@ public void setUpMiniCluster() throws IOException { conf = new HdfsConfiguration(); conf.setBoolean(DFSConfigKeys.DFS_JOURNALNODE_ENABLE_SYNC_KEY, true); conf.setLong(DFSConfigKeys.DFS_JOURNALNODE_SYNC_INTERVAL_KEY, 1000L); + conf.setBoolean(DFSConfigKeys.DFS_JOURNALNODE_ENABLE_SYNC_FORMAT_KEY, true); if (testName.getMethodName().equals( "testSyncAfterJNdowntimeWithoutQJournalQueue")) { conf.setInt(DFSConfigKeys.DFS_QJOURNAL_QUEUE_SIZE_LIMIT_KEY, 0); @@ -84,9 +85,6 @@ public void setUpMiniCluster() throws IOException { conf.setInt(DFSConfigKeys.DFS_HA_TAILEDITS_PERIOD_KEY, DFS_HA_TAILEDITS_PERIOD_SECONDS); } - if (testName.getMethodName().equals("testFormatWithSyncer")) { - conf.setBoolean(DFSConfigKeys.DFS_JOURNALNODE_ENABLE_SYNC_FORMAT_KEY, true); - } qjmhaCluster = new MiniQJMHACluster.Builder(conf).setNumNameNodes(2) .build(); dfsCluster = qjmhaCluster.getDfsCluster(); From 08ce676b3e8a52cb070cea86e3f0af2203722556 Mon Sep 17 00:00:00 2001 From: Aswin M Prabhu Date: Wed, 17 Jul 2024 11:47:18 +0530 Subject: [PATCH 5/8] Address review comments --- .../hadoop/hdfs/qjournal/server/JournalNodeRpcServer.java | 2 +- .../apache/hadoop/hdfs/qjournal/server/JournalNodeSyncer.java | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JournalNodeRpcServer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JournalNodeRpcServer.java index 92e53880a7769..b09d09aed0379 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JournalNodeRpcServer.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JournalNodeRpcServer.java @@ -260,7 +260,7 @@ public GetEditLogManifestResponseProto getEditLogManifest( @Override public StorageInfoProto getStorageInfo(String jid, String nameServiceId) throws IOException { - StorageInfo storage = jn.getOrCreateJournal(jid).getStorage(); + StorageInfo storage = jn.getOrCreateJournal(jid, nameServiceId).getStorage(); return PBHelper.convert(storage); } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JournalNodeSyncer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JournalNodeSyncer.java index 09c34a50455b9..8bd9bae22b140 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JournalNodeSyncer.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JournalNodeSyncer.java @@ -267,7 +267,7 @@ private void formatWithSyncer() { } try { HdfsServerProtos.StorageInfoProto storageInfoResponse = - jnProxy.jnProxy.getStorageInfo(jid, null); + jnProxy.jnProxy.getStorageInfo(jid, nameServiceId); storage = PBHelper.convert( storageInfoResponse, HdfsServerConstants.NodeType.JOURNAL_NODE ); From 0636bc4790895d0abdbb6ebd83b2479c804bf8f6 Mon Sep 17 00:00:00 2001 From: Aswin M Prabhu Date: Thu, 18 Jul 2024 12:26:21 +0530 Subject: [PATCH 6/8] Bump timeout for unit test to figure out diff b/w local and CI envs --- .../hadoop/hdfs/qjournal/server/TestJournalNodeSync.java | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/server/TestJournalNodeSync.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/server/TestJournalNodeSync.java index 27f7dcb5a73b8..7a39ea75c4114 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/server/TestJournalNodeSync.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/server/TestJournalNodeSync.java @@ -480,7 +480,7 @@ public void testSyncDuringRollingUpgrade() throws Exception { } } - @Test(timeout=300_000) + @Test(timeout=900_000) public void testFormatWithSyncer() throws Exception { File firstJournalDir = jCluster.getJournalDir(0, jid); File firstJournalCurrentDir = new StorageDirectory(firstJournalDir) @@ -499,11 +499,11 @@ public void testFormatWithSyncer() throws Exception { jCluster.getJournalNode(0).getOrCreateJournal(jid).getStorage().analyzeStorage(); // Wait for JN formatting with Syncer - GenericTestUtils.waitFor(jnFormatted(0), 500, 30000); + GenericTestUtils.waitFor(jnFormatted(0), 500, 90000); // Generate some more edit log so that the JN updates its committed tx id generateEditLog(); // Check that the missing edit logs have been synced - GenericTestUtils.waitFor(editLogExists(missingLogs), 500, 30000); + GenericTestUtils.waitFor(editLogExists(missingLogs), 500, 90000); } private File deleteEditLog(File currentDir, long startTxId) From d1fd1c1553a6a221d383dc4fbb85ceb14cb13bbe Mon Sep 17 00:00:00 2001 From: Aswin M Prabhu Date: Fri, 19 Jul 2024 11:54:14 +0530 Subject: [PATCH 7/8] Fix unit tests --- .../hadoop/hdfs/qjournal/server/TestJournalNodeSync.java | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/server/TestJournalNodeSync.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/server/TestJournalNodeSync.java index 7a39ea75c4114..ac250ffc4f2c0 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/server/TestJournalNodeSync.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/server/TestJournalNodeSync.java @@ -480,7 +480,7 @@ public void testSyncDuringRollingUpgrade() throws Exception { } } - @Test(timeout=900_000) + @Test(timeout=300_000) public void testFormatWithSyncer() throws Exception { File firstJournalDir = jCluster.getJournalDir(0, jid); File firstJournalCurrentDir = new StorageDirectory(firstJournalDir) @@ -493,17 +493,18 @@ public void testFormatWithSyncer() throws Exception { List missingLogs = Lists.newArrayList(); missingLogs.add(deleteEditLog(firstJournalCurrentDir, firstTxId)); - // Delete the storage directory itself to simulate a disk wipe + // Wait to ensure sync starts, delete the storage directory itself to simulate a disk wipe // and ensure that the in-memory formatting state of JNStorage gets updated + Thread.sleep(2000); FileUtils.deleteDirectory(firstJournalDir); jCluster.getJournalNode(0).getOrCreateJournal(jid).getStorage().analyzeStorage(); // Wait for JN formatting with Syncer - GenericTestUtils.waitFor(jnFormatted(0), 500, 90000); + GenericTestUtils.waitFor(jnFormatted(0), 500, 30000); // Generate some more edit log so that the JN updates its committed tx id generateEditLog(); // Check that the missing edit logs have been synced - GenericTestUtils.waitFor(editLogExists(missingLogs), 500, 90000); + GenericTestUtils.waitFor(editLogExists(missingLogs), 500, 30000); } private File deleteEditLog(File currentDir, long startTxId) From 4ad404516c74549c2b93fb82a80e0fe8e252e72c Mon Sep 17 00:00:00 2001 From: Aswin M Prabhu Date: Fri, 19 Jul 2024 19:02:46 +0530 Subject: [PATCH 8/8] Retry formatting on startup as well --- .../hadoop/hdfs/qjournal/server/JournalNodeSyncer.java | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JournalNodeSyncer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JournalNodeSyncer.java index 8bd9bae22b140..75010596b1ae6 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JournalNodeSyncer.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JournalNodeSyncer.java @@ -175,13 +175,11 @@ private boolean getOtherJournalNodeProxies() { private void startSyncJournalsDaemon() { syncJournalDaemon = new Daemon(() -> { - // Format the journal with namespace info from the other JNs if it is not formatted - if (!journal.isFormatted()) { - formatWithSyncer(); - } // Wait for journal to be formatted to create edits.sync directory while(!journal.isFormatted()) { try { + // Format the journal with namespace info from the other JNs if it is not formatted + formatWithSyncer(); Thread.sleep(journalSyncInterval); } catch (InterruptedException e) { LOG.error("JournalNodeSyncer daemon received Runtime exception.", e);