Skip to content

Commit ff4b113

Browse files
author
xiezhineng
committed
add unit test
1 parent e471564 commit ff4b113

File tree

1 file changed

+168
-0
lines changed

1 file changed

+168
-0
lines changed
Lines changed: 168 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,168 @@
1+
/**
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
* <p>
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
* <p>
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
package org.apache.hadoop.hdfs;
19+
20+
import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
21+
import org.slf4j.Logger;
22+
import org.slf4j.LoggerFactory;
23+
import org.apache.hadoop.conf.Configuration;
24+
import org.apache.hadoop.fs.Path;
25+
import org.apache.hadoop.hdfs.protocol.Block;
26+
import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy;
27+
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
28+
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
29+
import org.apache.hadoop.hdfs.protocol.LocatedStripedBlock;
30+
import org.apache.hadoop.hdfs.server.datanode.DataNode;
31+
import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
32+
import org.apache.hadoop.hdfs.server.datanode.SimulatedFSDataset;
33+
import org.apache.hadoop.io.erasurecode.CodecUtil;
34+
import org.apache.hadoop.io.erasurecode.ErasureCodeNative;
35+
import org.apache.hadoop.io.erasurecode.rawcoder.NativeRSRawErasureCoderFactory;
36+
import org.apache.hadoop.test.GenericTestUtils;
37+
import org.junit.After;
38+
import org.junit.Assert;
39+
import org.junit.Before;
40+
import org.junit.Rule;
41+
import org.junit.Test;
42+
import org.junit.rules.Timeout;
43+
44+
import java.io.IOException;
45+
import java.util.Arrays;
46+
47+
public class TestDFSStripedInputStreamWithTimeout {
48+
49+
public static final Logger LOG =
50+
LoggerFactory.getLogger(TestDFSStripedInputStreamWithTimeout.class);
51+
52+
private MiniDFSCluster cluster;
53+
private Configuration conf = new Configuration();
54+
private DistributedFileSystem fs;
55+
private final Path dirPath = new Path("/striped");
56+
private Path filePath = new Path(dirPath, "file");
57+
private ErasureCodingPolicy ecPolicy;
58+
private short dataBlocks;
59+
private short parityBlocks;
60+
private int cellSize;
61+
private final int stripesPerBlock = 2;
62+
private int blockSize;
63+
private int blockGroupSize;
64+
65+
@Rule
66+
public Timeout globalTimeout = new Timeout(300000);
67+
68+
public ErasureCodingPolicy getEcPolicy() {
69+
return StripedFileTestUtil.getDefaultECPolicy();
70+
}
71+
72+
@Before
73+
public void setup() throws IOException {
74+
/*
75+
* Initialize erasure coding policy.
76+
*/
77+
ecPolicy = getEcPolicy();
78+
dataBlocks = (short) ecPolicy.getNumDataUnits();
79+
parityBlocks = (short) ecPolicy.getNumParityUnits();
80+
cellSize = ecPolicy.getCellSize();
81+
blockSize = stripesPerBlock * cellSize;
82+
blockGroupSize = dataBlocks * blockSize;
83+
System.out.println("EC policy = " + ecPolicy);
84+
85+
conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, blockSize);
86+
conf.setInt(DFSConfigKeys.DFS_NAMENODE_REPLICATION_MAX_STREAMS_KEY, 0);
87+
88+
conf.setInt(DFSConfigKeys.DFS_DATANODE_SOCKET_WRITE_TIMEOUT_KEY, 1000);
89+
// SET CONFIG FOR HDFS CLIENT
90+
conf.setInt(DFSConfigKeys.DFS_CLIENT_SOCKET_TIMEOUT_KEY, 1000);
91+
conf.setInt(HdfsClientConfigKeys.StripedRead.DATANODE_MAX_ATTEMPTS, 3);
92+
93+
if (ErasureCodeNative.isNativeCodeLoaded()) {
94+
conf.set(
95+
CodecUtil.IO_ERASURECODE_CODEC_RS_RAWCODERS_KEY,
96+
NativeRSRawErasureCoderFactory.CODER_NAME);
97+
}
98+
conf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR,
99+
GenericTestUtils.getRandomizedTempPath());
100+
SimulatedFSDataset.setFactory(conf);
101+
startUp();
102+
}
103+
104+
private void startUp() throws IOException {
105+
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(
106+
dataBlocks + parityBlocks).build();
107+
cluster.waitActive();
108+
for (DataNode dn : cluster.getDataNodes()) {
109+
DataNodeTestUtils.setHeartbeatsDisabledForTests(dn, true);
110+
}
111+
fs = cluster.getFileSystem();
112+
fs.enableErasureCodingPolicy(getEcPolicy().getName());
113+
fs.mkdirs(dirPath);
114+
fs.getClient()
115+
.setErasureCodingPolicy(dirPath.toString(), ecPolicy.getName());
116+
}
117+
118+
@After
119+
public void tearDown() {
120+
if (cluster != null) {
121+
cluster.shutdown();
122+
cluster = null;
123+
}
124+
}
125+
126+
@Test
127+
public void testPreadTimeout() throws Exception {
128+
final int numBlocks = 2;
129+
DFSTestUtil.createStripedFile(cluster, filePath, null, numBlocks,
130+
stripesPerBlock, false, ecPolicy);
131+
final int fileSize = numBlocks * blockGroupSize;
132+
133+
LocatedBlocks lbs = fs.getClient().namenode.
134+
getBlockLocations(filePath.toString(), 0, fileSize);
135+
136+
for (LocatedBlock lb : lbs.getLocatedBlocks()) {
137+
assert lb instanceof LocatedStripedBlock;
138+
LocatedStripedBlock bg = (LocatedStripedBlock) (lb);
139+
for (int i = 0; i < dataBlocks; i++) {
140+
Block blk = new Block(bg.getBlock().getBlockId() + i,
141+
stripesPerBlock * cellSize,
142+
bg.getBlock().getGenerationStamp());
143+
blk.setGenerationStamp(bg.getBlock().getGenerationStamp());
144+
cluster.injectBlocks(i, Arrays.asList(blk),
145+
bg.getBlock().getBlockPoolId());
146+
}
147+
}
148+
149+
DFSStripedInputStream in = new DFSStripedInputStream(fs.getClient(),
150+
filePath.toString(), false, ecPolicy, null);
151+
int bufLen = 1024 * 100;
152+
byte[] buf = new byte[bufLen];
153+
int readTotal = 0;
154+
try {
155+
while (readTotal < fileSize) {
156+
in.seek(readTotal);
157+
int nread = in.read(buf, 0, bufLen);
158+
readTotal += nread;
159+
// Simulated time-consuming processing operations, such as UDF.
160+
Thread.sleep(10);
161+
}
162+
Assert.assertEquals("Success to read striped file.", fileSize, readTotal);
163+
} catch (Exception e) {
164+
Assert.fail("Fail to read striped time out. ");
165+
}
166+
in.close();
167+
}
168+
}

0 commit comments

Comments
 (0)