Skip to content

Commit dab37c9

Browse files
committed
Make DataTransfer implements Callable rather than Runnable
1 parent efa1bc1 commit dab37c9

File tree

2 files changed

+11
-7
lines changed

2 files changed

+11
-7
lines changed

hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java

Lines changed: 10 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -146,6 +146,7 @@
146146
import java.util.concurrent.Future;
147147
import java.util.concurrent.ScheduledThreadPoolExecutor;
148148
import java.util.concurrent.TimeUnit;
149+
import java.util.concurrent.TimeoutException;
149150
import java.util.concurrent.atomic.AtomicInteger;
150151

151152
import javax.annotation.Nullable;
@@ -2895,7 +2896,7 @@ void transferBlock(ExtendedBlock block, DatanodeInfo[] xferTargets,
28952896
xferTargetStorageTypes, xferTargetStorageIDs, block,
28962897
BlockConstructionStage.PIPELINE_SETUP_CREATE, "");
28972898

2898-
this.xferService.execute(dataTransferTask);
2899+
this.xferService.submit(dataTransferTask);
28992900
}
29002901
}
29012902

@@ -3006,7 +3007,7 @@ CHECKSUM_SIZE depends on CHECKSUM_TYPE (usually, 4 for CRC32)
30063007
* Used for transferring a block of data. This class
30073008
* sends a piece of data to another DataNode.
30083009
*/
3009-
private class DataTransfer implements Runnable {
3010+
private class DataTransfer implements Callable<Void> {
30103011
final DatanodeInfo[] targets;
30113012
final StorageType[] targetStorageTypes;
30123013
final private String[] targetStorageIds;
@@ -3064,9 +3065,11 @@ private class DataTransfer implements Runnable {
30643065

30653066
/**
30663067
* Do the deed, write the bytes
3068+
*
3069+
* @return
30673070
*/
30683071
@Override
3069-
public void run() {
3072+
public Void call() throws IOException {
30703073
incrementXmitsInProgress();
30713074
Socket sock = null;
30723075
DataOutputStream out = null;
@@ -3146,15 +3149,15 @@ public void run() {
31463149
}
31473150
} catch (IOException ie) {
31483151
if (copyBlockCrossNamespace) {
3149-
throw new RuntimeException(ie);
3152+
throw ie;
31503153
}
31513154
handleBadBlock(source, ie, false);
31523155
LOG.warn("{}:Failed to transfer {} to {} got",
31533156
bpReg, source, targets[0], ie);
31543157
} catch (Throwable t) {
31553158
LOG.error("Failed to transfer block {}", source, t);
31563159
if (copyBlockCrossNamespace) {
3157-
throw new RuntimeException(t);
3160+
throw t;
31583161
}
31593162
} finally {
31603163
decrementXmitsInProgress();
@@ -3163,6 +3166,7 @@ public void run() {
31633166
IOUtils.closeStream(in);
31643167
IOUtils.closeSocket(sock);
31653168
}
3169+
return null;
31663170
}
31673171

31683172
@Override
@@ -4444,7 +4448,7 @@ public void copyBlockCrossNamespace(ExtendedBlock sourceBlk, ExtendedBlock targe
44444448
}
44454449
try {
44464450
result.get(getDnConf().getCopyBlockCrossNamespaceSocketTimeout(), TimeUnit.MILLISECONDS);
4447-
} catch (Exception e) {
4451+
} catch (ExecutionException | InterruptedException | TimeoutException e) {
44484452
LOG.error(e.getMessage());
44494453
throw new IOException(e);
44504454
}

hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDataTransferProtocol.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -666,7 +666,7 @@ public void testReleaseVolumeRefIfExceptionThrown()
666666
}
667667
}
668668

669-
@Test(timeout = 90000)
669+
@Test
670670
public void testCopyBlockCrossNamespace()
671671
throws IOException, InterruptedException, TimeoutException {
672672
Configuration conf = new HdfsConfiguration();

0 commit comments

Comments
 (0)