Skip to content

Commit 318a281

Browse files
aswinmprabhu (Aswin M Prabhu)
authored and committed
HDFS-16690. Automatically format unformatted JNs with JournalNodeSyncer
1 parent 134dcf1 commit 318a281

File tree

6 files changed

+104
-11
lines changed

6 files changed

+104
-11
lines changed

hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/protocol/InterQJournalProtocol.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020

2121
import org.apache.hadoop.classification.InterfaceAudience;
2222
import org.apache.hadoop.hdfs.DFSConfigKeys;
23+
import org.apache.hadoop.hdfs.protocol.proto.HdfsServerProtos.StorageInfoProto;
2324
import org.apache.hadoop.hdfs.qjournal.server.JournalNode;
2425
import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.GetEditLogManifestResponseProto;
2526
import org.apache.hadoop.security.KerberosInfo;
@@ -51,4 +52,13 @@ GetEditLogManifestResponseProto getEditLogManifestFromJournal(
5152
String jid, String nameServiceId, long sinceTxId, boolean inProgressOk)
5253
throws IOException;
5354

55+
/**
56+
* Get the storage info for the specified journal.
*
* Declared on the inter-JournalNode protocol so that one JournalNode can
* ask another for the storage info of a journal.
57+
* @param jid the journal identifier
58+
* @param nameServiceId the name service id; may be {@code null} when the
*        caller has no name service id to supply
59+
* @return the storage info object, in its protobuf representation
* @throws IOException if the storage info cannot be obtained
60+
*/
61+
StorageInfoProto getStorageInfo(String jid, String nameServiceId)
62+
throws IOException;
63+
5464
}

hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/protocolPB/InterQJournalProtocolServerSideTranslatorPB.java

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,8 @@
2424

2525
import org.apache.hadoop.classification.InterfaceAudience;
2626
import org.apache.hadoop.hdfs.qjournal.protocol.InterQJournalProtocol;
27+
import org.apache.hadoop.hdfs.protocol.proto.HdfsServerProtos.StorageInfoProto;
28+
import org.apache.hadoop.hdfs.qjournal.protocol.InterQJournalProtocolProtos.GetStorageInfoRequestProto;
2729
import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.GetEditLogManifestRequestProto;
2830
import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.GetEditLogManifestResponseProto;
2931

@@ -60,4 +62,18 @@ public GetEditLogManifestResponseProto getEditLogManifestFromJournal(
6062
throw new ServiceException(e);
6163
}
6264
}
65+
66+
@Override
67+
public StorageInfoProto getStorageInfo(
68+
RpcController controller, GetStorageInfoRequestProto request)
69+
throws ServiceException {
70+
try {
71+
return impl.getStorageInfo(
72+
request.getJid().getIdentifier(),
73+
request.hasNameServiceId() ? request.getNameServiceId() : null
74+
);
75+
} catch (IOException e) {
76+
throw new ServiceException(e);
77+
}
78+
}
6379
}

hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/protocolPB/InterQJournalProtocolTranslatorPB.java

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,8 @@
1919

2020
package org.apache.hadoop.hdfs.qjournal.protocolPB;
2121

22+
import org.apache.hadoop.hdfs.protocol.proto.HdfsServerProtos.StorageInfoProto;
23+
import org.apache.hadoop.hdfs.qjournal.protocol.InterQJournalProtocolProtos;
2224
import org.apache.hadoop.thirdparty.protobuf.RpcController;
2325
import org.apache.hadoop.classification.InterfaceAudience;
2426
import org.apache.hadoop.classification.InterfaceStability;
@@ -75,6 +77,18 @@ public GetEditLogManifestResponseProto getEditLogManifestFromJournal(
7577
req.build()));
7678
}
7779

80+
@Override
81+
public StorageInfoProto getStorageInfo(String jid, String nameServiceId)
82+
throws IOException {
83+
InterQJournalProtocolProtos.GetStorageInfoRequestProto.Builder req =
84+
InterQJournalProtocolProtos.GetStorageInfoRequestProto.newBuilder()
85+
.setJid(convertJournalId(jid));
86+
if (nameServiceId != null) {
87+
req.setNameServiceId(nameServiceId);
88+
}
89+
return ipc(() -> rpcProxy.getStorageInfo(NULL_CONTROLLER, req.build()));
90+
}
91+
7892
private QJournalProtocolProtos.JournalIdProto convertJournalId(String jid) {
7993
return QJournalProtocolProtos.JournalIdProto.newBuilder()
8094
.setIdentifier(jid)

hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JournalNodeRpcServer.java

Lines changed: 18 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
package org.apache.hadoop.hdfs.qjournal.server;
1919

2020
import org.apache.hadoop.classification.VisibleForTesting;
21+
import org.apache.hadoop.hdfs.protocol.proto.HdfsServerProtos.StorageInfoProto;
2122
import org.apache.hadoop.thirdparty.protobuf.BlockingService;
2223
import org.slf4j.Logger;
2324
import org.apache.hadoop.classification.InterfaceAudience;
@@ -71,14 +72,14 @@ public class JournalNodeRpcServer implements QJournalProtocol,
7172

7273
JournalNodeRpcServer(Configuration conf, JournalNode jn) throws IOException {
7374
this.jn = jn;
74-
75+
7576
Configuration confCopy = new Configuration(conf);
76-
77+
7778
// Ensure that nagling doesn't kick in, which could cause latency issues.
7879
confCopy.setBoolean(
7980
CommonConfigurationKeysPublic.IPC_SERVER_TCPNODELAY_KEY,
8081
true);
81-
82+
8283
InetSocketAddress addr = getAddress(confCopy);
8384
String bindHost = conf.getTrimmed(DFS_JOURNALNODE_RPC_BIND_HOST_KEY, null);
8485
if (bindHost == null) {
@@ -104,7 +105,7 @@ public class JournalNodeRpcServer implements QJournalProtocol,
104105
this.handlerCount = confHandlerCount;
105106
LOG.info("The number of JournalNodeRpcServer handlers is {}.",
106107
this.handlerCount);
107-
108+
108109
this.server = new RPC.Builder(confCopy)
109110
.setProtocol(QJournalProtocolPB.class)
110111
.setInstance(service)
@@ -149,15 +150,15 @@ void start() {
149150
public InetSocketAddress getAddress() {
150151
return server.getListenerAddress();
151152
}
152-
153+
153154
void join() throws InterruptedException {
154155
this.server.join();
155156
}
156-
157+
157158
void stop() {
158159
this.server.stop();
159160
}
160-
161+
161162
static InetSocketAddress getAddress(Configuration conf) {
162163
String addr = conf.get(
163164
DFSConfigKeys.DFS_JOURNALNODE_RPC_ADDRESS_KEY,
@@ -211,7 +212,7 @@ public void journal(RequestInfo reqInfo,
211212
jn.getOrCreateJournal(reqInfo.getJournalId(), reqInfo.getNameServiceId())
212213
.journal(reqInfo, segmentTxId, firstTxnId, numTxns, records);
213214
}
214-
215+
215216
@Override
216217
public void heartbeat(RequestInfo reqInfo) throws IOException {
217218
jn.getOrCreateJournal(reqInfo.getJournalId(), reqInfo.getNameServiceId())
@@ -245,17 +246,24 @@ public GetEditLogManifestResponseProto getEditLogManifest(
245246
String jid, String nameServiceId,
246247
long sinceTxId, boolean inProgressOk)
247248
throws IOException {
248-
249+
249250
RemoteEditLogManifest manifest = jn.getOrCreateJournal(jid, nameServiceId)
250251
.getEditLogManifest(sinceTxId, inProgressOk);
251-
252+
252253
return GetEditLogManifestResponseProto.newBuilder()
253254
.setManifest(PBHelper.convert(manifest))
254255
.setHttpPort(jn.getBoundHttpAddress().getPort())
255256
.setFromURL(jn.getHttpServerURI())
256257
.build();
257258
}
258259

260+
@Override
261+
public StorageInfoProto getStorageInfo(String jid,
262+
String nameServiceId) throws IOException {
263+
StorageInfo storage = jn.getOrCreateJournal(jid).getStorage();
264+
return PBHelper.convert(storage);
265+
}
266+
259267
@Override
260268
public GetJournaledEditsResponseProto getJournaledEdits(String jid,
261269
String nameServiceId, long sinceTxId, int maxTxns) throws IOException {

hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JournalNodeSyncer.java

Lines changed: 39 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,9 @@
1818
package org.apache.hadoop.hdfs.qjournal.server;
1919

2020
import org.apache.hadoop.classification.VisibleForTesting;
21+
import org.apache.hadoop.hdfs.protocol.proto.HdfsServerProtos;
22+
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
23+
import org.apache.hadoop.hdfs.server.common.StorageInfo;
2124
import org.apache.hadoop.thirdparty.com.google.common.collect.ImmutableList;
2225
import org.apache.hadoop.classification.InterfaceAudience;
2326
import org.apache.hadoop.conf.Configuration;
@@ -168,6 +171,10 @@ private boolean getOtherJournalNodeProxies() {
168171

169172
private void startSyncJournalsDaemon() {
170173
syncJournalDaemon = new Daemon(() -> {
174+
// Format the journal with namespace info from the other JNs if it is not formatted
175+
if (!journal.isFormatted()) {
176+
formatWithSyncer();
177+
}
171178
// Wait for journal to be formatted to create edits.sync directory
172179
while(!journal.isFormatted()) {
173180
try {
@@ -187,7 +194,8 @@ private void startSyncJournalsDaemon() {
187194
while(shouldSync) {
188195
try {
189196
if (!journal.isFormatted()) {
190-
LOG.warn("Journal cannot sync. Not formatted.");
197+
LOG.warn("Journal cannot sync. Not formatted. Trying to format with the syncer");
198+
formatWithSyncer();
191199
} else {
192200
syncJournals();
193201
}
@@ -233,6 +241,36 @@ private void syncJournals() {
233241
journalNodeIndexForSync = (journalNodeIndexForSync + 1) % numOtherJNs;
234242
}
235243

244+
private void formatWithSyncer() {
245+
LOG.info("Trying to format the journal with the syncer");
246+
try {
247+
StorageInfo storage = null;
248+
for (JournalNodeProxy jnProxy : otherJNProxies) {
249+
try {
250+
HdfsServerProtos.StorageInfoProto storageInfoResponse = jnProxy.jnProxy.getStorageInfo(jid, null);
251+
storage = PBHelper.convert(storageInfoResponse, HdfsServerConstants.NodeType.JOURNAL_NODE);
252+
if (storage.getNamespaceID() == 0) {
253+
LOG.error("Got invalid StorageInfo from " + jnProxy);
254+
continue;
255+
}
256+
LOG.info("Got StorageInfo " + storage + " from " + jnProxy);
257+
break;
258+
} catch (IOException e) {
259+
LOG.error("Could not get StorageInfo from " + jnProxy, e);
260+
}
261+
}
262+
if (storage == null) {
263+
LOG.error("Could not get StorageInfo from any JournalNode. " +
264+
"JournalNodeSyncer cannot format the journal.");
265+
return;
266+
}
267+
NamespaceInfo nsInfo = new NamespaceInfo(storage);
268+
journal.format(nsInfo, true);
269+
} catch (IOException e) {
270+
LOG.error("Exception in formatting the journal with the syncer", e);
271+
}
272+
}
273+
236274
private void syncWithJournalAtIndex(int index) {
237275
LOG.info("Syncing Journal " + jn.getBoundIpcAddress().getAddress() + ":"
238276
+ jn.getBoundIpcAddress().getPort() + " with "

hadoop-hdfs-project/hadoop-hdfs/src/main/proto/InterQJournalProtocol.proto

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,8 +31,15 @@ package hadoop.hdfs.qjournal;
3131
import "HdfsServer.proto";
3232
import "QJournalProtocol.proto";
3333

34+
// Request message for the getStorageInfo rpc: identifies the journal by
// jid; nameServiceId is optional and may be left unset.
message GetStorageInfoRequestProto {
35+
required JournalIdProto jid = 1;
36+
optional string nameServiceId = 2;
37+
}
3438

3539
// RPC service of the InterQJournal protocol. It exposes the edit log
// manifest rpc and the getStorageInfo rpc, which answers with a
// StorageInfoProto.
service InterQJournalProtocolService {
3640
rpc getEditLogManifestFromJournal(GetEditLogManifestRequestProto)
3741
returns (GetEditLogManifestResponseProto);
42+
43+
rpc getStorageInfo(GetStorageInfoRequestProto)
44+
returns (StorageInfoProto);
3845
}

0 commit comments

Comments
 (0)