|
17 | 17 | */ |
18 | 18 | package org.apache.hadoop.hdfs; |
19 | 19 |
|
| 20 | +import org.apache.hadoop.fs.FSDataOutputStream; |
| 21 | +import org.apache.hadoop.fs.FileStatus; |
20 | 22 | import org.slf4j.Logger; |
21 | 23 | import org.slf4j.LoggerFactory; |
22 | 24 | import org.apache.hadoop.HadoopIllegalArgumentException; |
|
50 | 52 | import java.util.Arrays; |
51 | 53 | import java.util.Collections; |
52 | 54 | import java.util.List; |
| 55 | +import java.util.Random; |
53 | 56 |
|
54 | 57 | import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_DEFAULT; |
55 | 58 | import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_KEY; |
@@ -735,4 +738,63 @@ public void onCreateBlockReader(LocatedBlock block, int chunkIndex, |
735 | 738 | assertEquals(rangesExpected, ranges); |
736 | 739 | } |
737 | 740 |
|
| 741 | + @Test |
| 742 | + public void testStatefulReadRetryWhenMoreThanParityFailOnce() throws Exception { |
| 743 | + HdfsConfiguration hdfsConf = new HdfsConfiguration(); |
| 744 | + String testBaseDir = "/testECRead"; |
| 745 | + String testfileName = "testfile"; |
| 746 | + DFSClientFaultInjector old = DFSClientFaultInjector.get(); |
| 747 | + try (MiniDFSCluster cluster = new MiniDFSCluster.Builder(hdfsConf) |
| 748 | + .numDataNodes(9).build()) { |
| 749 | + cluster.waitActive(); |
| 750 | + final DistributedFileSystem dfs = cluster.getFileSystem(); |
| 751 | + Path dir = new Path(testBaseDir); |
| 752 | + assertTrue(dfs.mkdirs(dir)); |
| 753 | + dfs.enableErasureCodingPolicy("RS-6-3-1024k"); |
| 754 | + dfs.setErasureCodingPolicy(dir, "RS-6-3-1024k"); |
| 755 | + assertEquals("RS-6-3-1024k", dfs.getErasureCodingPolicy(dir).getName()); |
| 756 | + |
| 757 | +      int writeBufSize = 30 * 1024 * 1024 + 1; // 1 byte past 5 full RS-6-3 stripes (30 MB), forcing a partial final stripe
| 758 | + byte[] writeBuf = new byte[writeBufSize]; |
| 759 | + try (FSDataOutputStream fsdos = dfs.create( |
| 760 | + new Path(testBaseDir + Path.SEPARATOR + testfileName))) { |
| 761 | + Random random = new Random(); |
| 762 | + random.nextBytes(writeBuf); |
| 763 | + fsdos.write(writeBuf, 0, writeBuf.length); |
| 764 | + Thread.sleep(1000); |
| 765 | + } |
| 766 | + FileStatus fileStatus = dfs.getFileStatus( |
| 767 | + new Path(testBaseDir + Path.SEPARATOR + testfileName)); |
| 768 | + assertEquals(writeBufSize, fileStatus.getLen()); |
| 769 | + |
| 770 | + DFSClientFaultInjector.set(new DFSClientFaultInjector() { |
| 771 | + @Override |
| 772 | + public void failWhenReadWithStrategy(boolean isRetryRead) throws IOException { |
| 773 | + if (!isRetryRead) { |
| 774 | +            throw new IOException("Mock failure: more than the parity number of blocks fail in readOneStripe.");
| 775 | + } |
| 776 | + } |
| 777 | + }); |
| 778 | + |
| 779 | +      // We use an unaligned buffer size to trigger some corner cases.
| 780 | + byte[] readBuf = new byte[4095]; |
| 781 | + byte[] totalReadBuf = new byte[writeBufSize]; // Buffer to store all read data |
| 782 | + int ret = 0; |
| 783 | + int totalReadBytes = 0; |
| 784 | + try (FSDataInputStream fsdis = dfs.open( |
| 785 | + new Path(testBaseDir + Path.SEPARATOR + testfileName))) { |
| 786 | +        while ((ret = fsdis.read(readBuf)) > 0) {
| 787 | + System.arraycopy(readBuf, 0, totalReadBuf, totalReadBytes, ret); |
| 788 | + totalReadBytes += ret; |
| 789 | + } |
| 790 | + |
| 791 | + // Compare the read data with the original writeBuf. |
| 792 | + assertEquals(writeBufSize, totalReadBytes, "Total bytes read should match writeBuf size"); |
| 793 | + assertArrayEquals(writeBuf, totalReadBuf, "Read data should match original write data"); |
| 794 | + } |
| 795 | + assertTrue(dfs.delete(new Path(testBaseDir + Path.SEPARATOR + testfileName), true)); |
| 796 | + } finally { |
| 797 | + DFSClientFaultInjector.set(old); |
| 798 | + } |
| 799 | + } |
738 | 800 | } |
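
For context, the retry path exercised by this test relies on the HDFS client's fault-injection pattern: DFSClientFaultInjector holds a single globally installed instance whose hook methods are no-ops in production and are overridden by tests to simulate failures, which is why the test saves the old injector and restores it in the finally block. Below is a minimal standalone sketch of that pattern, assuming the hook semantics shown in the test (throw only when isRetryRead is false); the class and method names (SketchInjector, SketchReader, readOnce) are invented for illustration, and the real call site of failWhenReadWithStrategy inside the striped read path is not part of this diff.

import java.io.IOException;

// Illustrative sketch only: mirrors the injector pattern the test depends on.
class SketchInjector {
  private static SketchInjector instance = new SketchInjector();

  static SketchInjector get() { return instance; }
  static void set(SketchInjector injector) { instance = injector; }

  // No-op by default; a test override can throw when isRetryRead is false
  // so that only the first (non-retry) read attempt fails.
  void failWhenReadWithStrategy(boolean isRetryRead) throws IOException {
  }
}

class SketchReader {
  // Stand-in for a stateful read: the first attempt consults the injector
  // with isRetryRead=false; if that throws, the retry passes isRetryRead=true.
  static int readOnce(byte[] buf) throws IOException {
    try {
      SketchInjector.get().failWhenReadWithStrategy(false); // first attempt
      return buf.length;
    } catch (IOException e) {
      SketchInjector.get().failWhenReadWithStrategy(true);  // retry attempt
      return buf.length;
    }
  }
}

With the test's override installed, the first attempt throws and the retry succeeds, which is the behavior the assertions above verify end to end.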