|
49 | 49 | import org.apache.hadoop.hdfs.protocol.HdfsConstants; |
50 | 50 | import org.apache.hadoop.hdfs.protocol.HdfsFileStatus; |
51 | 51 | import org.apache.hadoop.hdfs.protocol.datatransfer.BlockConstructionStage; |
| 52 | +import org.apache.hadoop.hdfs.protocol.datatransfer.PacketHeader; |
52 | 53 | import org.apache.hadoop.hdfs.protocol.datatransfer.PacketReceiver; |
53 | 54 | import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager; |
54 | 55 | import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor; |
|
64 | 65 | import org.junit.BeforeClass; |
65 | 66 | import org.junit.Test; |
66 | 67 |
|
67 | | -import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_BLOCK_SIZE_KEY; |
68 | 68 | import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.Write.RECOVER_LEASE_ON_CLOSE_EXCEPTION_KEY; |
69 | 69 | import static org.junit.Assert.assertFalse; |
70 | 70 | import static org.junit.Assert.assertTrue; |
@@ -186,40 +186,6 @@ public void testPreventOverflow() throws IOException, NoSuchFieldException, |
186 | 186 | runAdjustChunkBoundary(configuredWritePacketSize, finalWritePacketSize); |
187 | 187 | } |
188 | 188 |
|
189 | | - @Test(timeout=60000) |
190 | | - public void testFirstPacketSizeInNewBlocks() throws IOException { |
191 | | - final long blockSize = 1L * 1024 * 1024; |
192 | | - final int numDataNodes = 3; |
193 | | - final Configuration dfsConf = new Configuration(); |
194 | | - dfsConf.setLong(DFS_BLOCK_SIZE_KEY, blockSize); |
195 | | - MiniDFSCluster dfsCluster = null; |
196 | | - dfsCluster = new MiniDFSCluster.Builder(dfsConf).numDataNodes(numDataNodes).build(); |
197 | | - dfsCluster.waitActive(); |
198 | | - |
199 | | - DistributedFileSystem fs = dfsCluster.getFileSystem(); |
200 | | - Path fileName = new Path("/testfile.dat"); |
201 | | - FSDataOutputStream fos = fs.create(fileName); |
202 | | - DataChecksum crc32c = DataChecksum.newDataChecksum(DataChecksum.Type.CRC32C, 512); |
203 | | - |
204 | | - long loop = 0; |
205 | | - Random r = new Random(); |
206 | | - byte[] buf = new byte[1 * 1024 * 1024]; |
207 | | - r.nextBytes(buf); |
208 | | - fos.write(buf); |
209 | | - fos.hflush(); |
210 | | - |
211 | | - while (loop < 20) { |
212 | | - r.nextBytes(buf); |
213 | | - fos.write(buf); |
214 | | - fos.hflush(); |
215 | | - loop++; |
216 | | - Assert.assertNotEquals(crc32c.getBytesPerChecksum() + crc32c.getChecksumSize(), |
217 | | - ((DFSOutputStream)fos.getWrappedStream()).packetSize); |
218 | | - } |
219 | | - |
220 | | - fos.close(); |
221 | | - } |
222 | | - |
223 | 189 | /** |
224 | 190 | * @configuredWritePacketSize the configured WritePacketSize. |
225 | 191 | * @finalWritePacketSize the final WritePacketSize picked by |
@@ -544,6 +510,45 @@ public void testExceptionInCloseWithoutRecoverLease() throws Exception { |
544 | 510 | } |
545 | 511 | } |
546 | 512 |
|
| 513 | + @Test(timeout=60000) |
| 514 | + public void testFirstPacketSizeInNewBlocks() throws IOException { |
| 515 | + final long blockSize = (long) 1024 * 1024; |
| 516 | + MiniDFSCluster dfsCluster = cluster; |
| 517 | + DistributedFileSystem fs = dfsCluster.getFileSystem(); |
| 518 | + Configuration dfsConf = fs.getConf(); |
| 519 | + |
| 520 | + EnumSet<CreateFlag> flags = EnumSet.of(CreateFlag.CREATE); |
| 521 | + try (FSDataOutputStream fos = fs.create(new Path("/testfile.dat"), |
| 522 | + FsPermission.getDefault(), |
| 523 | + flags, 512, (short)3, blockSize, null)) { |
| 524 | + |
| 525 | + DataChecksum crc32c = DataChecksum.newDataChecksum( |
| 526 | + DataChecksum.Type.CRC32C, 512); |
| 527 | + |
| 528 | + long loop = 0; |
| 529 | + Random r = new Random(); |
| 530 | + byte[] buf = new byte[(int) blockSize]; |
| 531 | + r.nextBytes(buf); |
| 532 | + fos.write(buf); |
| 533 | + fos.hflush(); |
| 534 | + |
| 535 | + int chunkSize = crc32c.getBytesPerChecksum() + crc32c.getChecksumSize(); |
| 536 | + int packetContentSize = (dfsConf.getInt(DFS_CLIENT_WRITE_PACKET_SIZE_KEY, |
| 537 | + DFS_CLIENT_WRITE_PACKET_SIZE_DEFAULT) - |
| 538 | + PacketHeader.PKT_MAX_HEADER_LEN) / chunkSize * chunkSize; |
| 539 | + |
| 540 | + while (loop < 20) { |
| 541 | + r.nextBytes(buf); |
| 542 | + fos.write(buf); |
| 543 | + fos.hflush(); |
| 544 | + loop++; |
| 545 | + Assert.assertEquals(packetContentSize, |
| 546 | + ((DFSOutputStream) fos.getWrappedStream()).packetSize); |
| 547 | + } |
| 548 | + } |
| 549 | + fs.delete(new Path("/testfile.dat"), true); |
| 550 | + } |
| 551 | + |
547 | 552 | @AfterClass |
548 | 553 | public static void tearDown() { |
549 | 554 | if (cluster != null) { |
|
0 commit comments