Skip to content

Commit c906ec8

Browse files
committed
apache#3. datanode的启动流程
1 parent 61d9c65 commit c906ec8

File tree

5 files changed: +22 additions, -0 deletions (lines changed)

5 files changed: +22 additions, -0 deletions (lines changed)

hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -453,9 +453,12 @@ public void updateHeartbeatState(StorageReport[] reports, long cacheCapacity,
453453
storageMap.values());
454454
}
455455

456+
// TODO 更改存储信息
456457
setCacheCapacity(cacheCapacity);
457458
setCacheUsed(cacheUsed);
458459
setXceiverCount(xceiverCount);
460+
461+
// TODO 修改上一次心跳时间
459462
setLastUpdate(Time.now());
460463
setLastUpdateMonotonic(Time.monotonicNow());
461464
this.volumeFailures = volFailures;

hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -582,6 +582,8 @@ void removeDeadDatanode(final DatanodeID nodeID) {
582582

583583
/** Is the datanode dead? */
584584
boolean isDatanodeDead(DatanodeDescriptor node) {
585+
// 如果超过10分钟30秒没有心跳,就认为是dead
586+
// 超时后会复制副本到新的机器
585587
return (node.getLastUpdateMonotonic() <
586588
(monotonicNow() - heartbeatExpireInterval));
587589
}
@@ -1349,6 +1351,7 @@ public DatanodeCommand[] handleHeartbeat(DatanodeRegistration nodeReg,
13491351
synchronized (datanodeMap) {
13501352
DatanodeDescriptor nodeinfo = null;
13511353
try {
1354+
// 获取node的信息
13521355
nodeinfo = getDatanode(nodeReg);
13531356
} catch(UnregisteredNodeException e) {
13541357
return new DatanodeCommand[]{RegisterCommand.REGISTER};
@@ -1360,10 +1363,12 @@ public DatanodeCommand[] handleHeartbeat(DatanodeRegistration nodeReg,
13601363
throw new DisallowedDatanodeException(nodeinfo);
13611364
}
13621365

1366+
// 如果没有则注册node的信息
13631367
if (nodeinfo == null || !nodeinfo.isAlive) {
13641368
return new DatanodeCommand[]{RegisterCommand.REGISTER};
13651369
}
13661370

1371+
// TODO 更新心跳的重要的信息
13671372
heartbeatManager.updateHeartbeat(nodeinfo, reports,
13681373
cacheCapacity, cacheUsed,
13691374
xceiverCount, failedVolumes,

hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/HeartbeatManager.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -221,6 +221,7 @@ synchronized void updateHeartbeat(final DatanodeDescriptor node,
221221
int xceiverCount, int failedVolumes,
222222
VolumeFailureSummary volumeFailureSummary) {
223223
stats.subtract(node);
224+
// TODO 重要代码
224225
node.updateHeartbeat(reports, cacheCapacity, cacheUsed,
225226
xceiverCount, failedVolumes, volumeFailureSummary);
226227
stats.add(node);
@@ -283,6 +284,7 @@ void heartbeatCheck() {
283284
int numOfStaleNodes = 0;
284285
int numOfStaleStorages = 0;
285286
synchronized(this) {
287+
// TODO 遍历每个datanode
286288
for (DatanodeDescriptor d : datanodes) {
287289
if (dead == null && dm.isDatanodeDead(d)) {
288290
stats.incrExpiredHeartbeats();
@@ -355,6 +357,7 @@ public void run() {
355357
try {
356358
final long now = Time.monotonicNow();
357359
if (lastHeartbeatCheck + heartbeatRecheckInterval < now) {
360+
// TODO 心跳检查
358361
heartbeatCheck();
359362
lastHeartbeatCheck = now;
360363
}

hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -692,6 +692,7 @@ private void offerService() throws Exception {
692692
//
693693
// Every so often, send heartbeat or block-report
694694
//
695+
// TODO 心跳是每三秒发送一次
695696
if (startTime - lastHeartbeat >= dnConf.heartBeatInterval) {
696697
//
697698
// All heartbeat messages include following info:
@@ -702,6 +703,11 @@ private void offerService() throws Exception {
702703
//
703704
lastHeartbeat = startTime;
704705
if (!dn.areHeartbeatsDisabledForTests()) {
706+
// Namenode 是不直接跟Datanode进行链接的
707+
// DataNode 发送心跳给Namenode
708+
// NameNode接收到心跳后,会返回一些命令
709+
// Datanode接收到这些命令后,根据这些命令做对应的操作
710+
// TODO 发送心跳,返回的是NameNode的响应命令
705711
HeartbeatResponse resp = sendHeartBeat();
706712
assert resp != null;
707713
dn.getMetrics().addHeartbeat(monotonicNow() - startTime);
@@ -721,6 +727,8 @@ private void offerService() throws Exception {
721727
}
722728

723729
long startProcessCommands = monotonicNow();
730+
// TODO 处理namenode返回的指令
731+
// 比如删除block,复制block
724732
if (!processCommand(resp.getCommands()))
725733
continue;
726734
long endProcessCommands = monotonicNow();
@@ -877,6 +885,7 @@ public void run() {
877885

878886
while (shouldRun()) {
879887
try {
888+
// TODO 发送心跳
880889
offerService();
881890
} catch (Exception ex) {
882891
LOG.error("Exception in BPOfferService for " + this, ex);

hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4529,6 +4529,7 @@ HeartbeatResponse handleHeartbeat(DatanodeRegistration nodeReg,
45294529
//get datanode commands
45304530
final int maxTransfer = blockManager.getMaxReplicationStreams()
45314531
- xmitsInProgress;
4532+
// TODO NameNode处理Datanode发送过来的心跳
45324533
DatanodeCommand[] cmds = blockManager.getDatanodeManager().handleHeartbeat(
45334534
nodeReg, reports, blockPoolId, cacheCapacity, cacheUsed,
45344535
xceiverCount, maxTransfer, failedVolumes, volumeFailureSummary);
@@ -4538,6 +4539,7 @@ HeartbeatResponse handleHeartbeat(DatanodeRegistration nodeReg,
45384539
haContext.getState().getServiceState(),
45394540
getFSImage().getLastAppliedOrWrittenTxId());
45404541

4542+
// TODO 给DataNode返回响应
45414543
return new HeartbeatResponse(cmds, haState, rollingUpgradeInfo);
45424544
} finally {
45434545
readUnlock();

0 commit comments

Comments (0)