Skip to content

Commit 3b10cb5

Browse files
HADOOP-18507. VectorIO FileRange type to support a "reference" field (#5076)
Contributed by Steve Loughran
1 parent b1f418f commit 3b10cb5

File tree

5 files changed

+141
-40
lines changed

5 files changed

+141
-40
lines changed

hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileRange.java

Lines changed: 21 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -55,13 +55,33 @@ public interface FileRange {
5555
*/
5656
void setData(CompletableFuture<ByteBuffer> data);
5757

58+
/**
59+
* Get any reference passed in to the file range constructor.
60+
* This is not used by any implementation code; it is to help
61+
* bind this API to libraries retrieving multiple stripes of
62+
* data in parallel.
63+
* @return a reference or null.
64+
*/
65+
Object getReference();
66+
5867
/**
5968
* Factory method to create a FileRange object.
6069
* @param offset starting offset of the range.
6170
* @param length length of the range.
6271
* @return a new instance of FileRangeImpl.
6372
*/
6473
static FileRange createFileRange(long offset, int length) {
65-
return new FileRangeImpl(offset, length);
74+
return new FileRangeImpl(offset, length, null);
75+
}
76+
77+
/**
78+
* Factory method to create a FileRange object.
79+
* @param offset starting offset of the range.
80+
* @param length length of the range.
81+
* @param reference nullable reference to store in the range.
82+
* @return a new instance of FileRangeImpl.
83+
*/
84+
static FileRange createFileRange(long offset, int length, Object reference) {
85+
return new FileRangeImpl(offset, length, reference);
6686
}
6787
}

hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/CombinedFileRange.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -29,10 +29,10 @@
2929
* together into a single read for efficiency.
3030
*/
3131
public class CombinedFileRange extends FileRangeImpl {
32-
private ArrayList<FileRange> underlying = new ArrayList<>();
32+
private List<FileRange> underlying = new ArrayList<>();
3333

3434
public CombinedFileRange(long offset, long end, FileRange original) {
35-
super(offset, (int) (end - offset));
35+
super(offset, (int) (end - offset), null);
3636
this.underlying.add(original);
3737
}
3838

hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/FileRangeImpl.java

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,9 +34,21 @@ public class FileRangeImpl implements FileRange {
3434
private int length;
3535
private CompletableFuture<ByteBuffer> reader;
3636

37-
public FileRangeImpl(long offset, int length) {
37+
/**
38+
* nullable reference to store in the range.
39+
*/
40+
private final Object reference;
41+
42+
/**
43+
* Create.
44+
* @param offset offset in file
45+
* @param length length of data to read.
46+
* @param reference nullable reference to store in the range.
47+
*/
48+
public FileRangeImpl(long offset, int length, Object reference) {
3849
this.offset = offset;
3950
this.length = length;
51+
this.reference = reference;
4052
}
4153

4254
@Override
@@ -71,4 +83,9 @@ public void setData(CompletableFuture<ByteBuffer> pReader) {
7183
public CompletableFuture<ByteBuffer> getData() {
7284
return reader;
7385
}
86+
87+
@Override
88+
public Object getReference() {
89+
return reference;
90+
}
7491
}

hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/TestVectoredReadUtils.java

Lines changed: 99 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -96,52 +96,59 @@ public void testRounding() {
9696

9797
@Test
9898
public void testMerge() {
99-
FileRange base = FileRange.createFileRange(2000, 1000);
99+
// a reference to use for tracking
100+
Object tracker1 = "one";
101+
Object tracker2 = "two";
102+
FileRange base = FileRange.createFileRange(2000, 1000, tracker1);
100103
CombinedFileRange mergeBase = new CombinedFileRange(2000, 3000, base);
101104

102105
// test when the gap between is too big
103106
assertFalse("Large gap ranges shouldn't get merged", mergeBase.merge(5000, 6000,
104107
FileRange.createFileRange(5000, 1000), 2000, 4000));
105108
assertEquals("Number of ranges in merged range shouldn't increase",
106109
1, mergeBase.getUnderlying().size());
107-
assertEquals("post merge offset", 2000, mergeBase.getOffset());
108-
assertEquals("post merge length", 1000, mergeBase.getLength());
110+
assertFileRange(mergeBase, 2000, 1000);
109111

110112
// test when the total size gets exceeded
111113
assertFalse("Large size ranges shouldn't get merged", mergeBase.merge(5000, 6000,
112114
FileRange.createFileRange(5000, 1000), 2001, 3999));
113115
assertEquals("Number of ranges in merged range shouldn't increase",
114116
1, mergeBase.getUnderlying().size());
115-
assertEquals("post merge offset", 2000, mergeBase.getOffset());
116-
assertEquals("post merge length", 1000, mergeBase.getLength());
117+
assertFileRange(mergeBase, 2000, 1000);
117118

118119
// test when the merge works
119120
assertTrue("ranges should get merged ", mergeBase.merge(5000, 6000,
120-
FileRange.createFileRange(5000, 1000), 2001, 4000));
121+
FileRange.createFileRange(5000, 1000, tracker2),
122+
2001, 4000));
121123
assertEquals("post merge size", 2, mergeBase.getUnderlying().size());
122-
assertEquals("post merge offset", 2000, mergeBase.getOffset());
123-
assertEquals("post merge length", 4000, mergeBase.getLength());
124+
assertFileRange(mergeBase, 2000, 4000);
125+
126+
Assertions.assertThat(mergeBase.getUnderlying().get(0).getReference())
127+
.describedAs("reference of range %s", mergeBase.getUnderlying().get(0))
128+
.isSameAs(tracker1);
129+
Assertions.assertThat(mergeBase.getUnderlying().get(1).getReference())
130+
.describedAs("reference of range %s", mergeBase.getUnderlying().get(1))
131+
.isSameAs(tracker2);
124132

125133
// reset the mergeBase and test with a 10:1 reduction
126134
mergeBase = new CombinedFileRange(200, 300, base);
127-
assertEquals(200, mergeBase.getOffset());
128-
assertEquals(100, mergeBase.getLength());
135+
assertFileRange(mergeBase, 200, 100);
136+
129137
assertTrue("ranges should get merged ", mergeBase.merge(500, 600,
130138
FileRange.createFileRange(5000, 1000), 201, 400));
131139
assertEquals("post merge size", 2, mergeBase.getUnderlying().size());
132-
assertEquals("post merge offset", 200, mergeBase.getOffset());
133-
assertEquals("post merge length", 400, mergeBase.getLength());
140+
assertFileRange(mergeBase, 200, 400);
134141
}
135142

136143
@Test
137144
public void testSortAndMerge() {
138145
List<FileRange> input = Arrays.asList(
139-
FileRange.createFileRange(3000, 100),
140-
FileRange.createFileRange(2100, 100),
141-
FileRange.createFileRange(1000, 100)
146+
FileRange.createFileRange(3000, 100, "1"),
147+
FileRange.createFileRange(2100, 100, null),
148+
FileRange.createFileRange(1000, 100, "3")
142149
);
143150
assertFalse("Ranges are non disjoint", VectoredReadUtils.isOrderedDisjoint(input, 100, 800));
144-
List<CombinedFileRange> outputList = VectoredReadUtils.mergeSortedRanges(
151+
final List<CombinedFileRange> outputList = VectoredReadUtils.mergeSortedRanges(
145152
Arrays.asList(sortRanges(input)), 100, 1001, 2500);
146153
Assertions.assertThat(outputList)
147154
.describedAs("merged range size")
@@ -150,51 +157,105 @@ public void testSortAndMerge() {
150157
Assertions.assertThat(output.getUnderlying())
151158
.describedAs("merged range underlying size")
152159
.hasSize(3);
153-
assertEquals("range[1000,3100)", output.toString());
160+
// range[1000,3100)
161+
assertFileRange(output, 1000, 2100);
154162
assertTrue("merged output ranges are disjoint",
155163
VectoredReadUtils.isOrderedDisjoint(outputList, 100, 800));
156164

157165
// the minSeek doesn't allow the first two to merge
158166
assertFalse("Ranges are non disjoint",
159167
VectoredReadUtils.isOrderedDisjoint(input, 100, 1000));
160-
outputList = VectoredReadUtils.mergeSortedRanges(Arrays.asList(sortRanges(input)),
168+
final List<CombinedFileRange> list2 = VectoredReadUtils.mergeSortedRanges(
169+
Arrays.asList(sortRanges(input)),
161170
100, 1000, 2100);
162-
Assertions.assertThat(outputList)
171+
Assertions.assertThat(list2)
163172
.describedAs("merged range size")
164173
.hasSize(2);
165-
assertEquals("range[1000,1100)", outputList.get(0).toString());
166-
assertEquals("range[2100,3100)", outputList.get(1).toString());
174+
assertFileRange(list2.get(0), 1000, 100);
175+
176+
// range[2100,3100)
177+
assertFileRange(list2.get(1), 2100, 1000);
178+
167179
assertTrue("merged output ranges are disjoint",
168-
VectoredReadUtils.isOrderedDisjoint(outputList, 100, 1000));
180+
VectoredReadUtils.isOrderedDisjoint(list2, 100, 1000));
169181

170182
// the maxSize doesn't allow the third range to merge
171183
assertFalse("Ranges are non disjoint",
172184
VectoredReadUtils.isOrderedDisjoint(input, 100, 800));
173-
outputList = VectoredReadUtils.mergeSortedRanges(Arrays.asList(sortRanges(input)),
185+
final List<CombinedFileRange> list3 = VectoredReadUtils.mergeSortedRanges(
186+
Arrays.asList(sortRanges(input)),
174187
100, 1001, 2099);
175-
Assertions.assertThat(outputList)
188+
Assertions.assertThat(list3)
176189
.describedAs("merged range size")
177190
.hasSize(2);
178-
assertEquals("range[1000,2200)", outputList.get(0).toString());
179-
assertEquals("range[3000,3100)", outputList.get(1).toString());
191+
// range[1000,2200)
192+
CombinedFileRange range0 = list3.get(0);
193+
assertFileRange(range0, 1000, 1200);
194+
assertFileRange(range0.getUnderlying().get(0),
195+
1000, 100, "3");
196+
assertFileRange(range0.getUnderlying().get(1),
197+
2100, 100, null);
198+
CombinedFileRange range1 = list3.get(1);
199+
// range[3000,3100)
200+
assertFileRange(range1, 3000, 100);
201+
assertFileRange(range1.getUnderlying().get(0),
202+
3000, 100, "1");
203+
180204
assertTrue("merged output ranges are disjoint",
181-
VectoredReadUtils.isOrderedDisjoint(outputList, 100, 800));
205+
VectoredReadUtils.isOrderedDisjoint(list3, 100, 800));
182206

183207
// test the round up and round down (the maxSize doesn't allow any merges)
184208
assertFalse("Ranges are non disjoint",
185209
VectoredReadUtils.isOrderedDisjoint(input, 16, 700));
186-
outputList = VectoredReadUtils.mergeSortedRanges(Arrays.asList(sortRanges(input)),
210+
final List<CombinedFileRange> list4 = VectoredReadUtils.mergeSortedRanges(
211+
Arrays.asList(sortRanges(input)),
187212
16, 1001, 100);
188-
Assertions.assertThat(outputList)
213+
Assertions.assertThat(list4)
189214
.describedAs("merged range size")
190215
.hasSize(3);
191-
assertEquals("range[992,1104)", outputList.get(0).toString());
192-
assertEquals("range[2096,2208)", outputList.get(1).toString());
193-
assertEquals("range[2992,3104)", outputList.get(2).toString());
216+
// range[992,1104)
217+
assertFileRange(list4.get(0), 992, 112);
218+
// range[2096,2208)
219+
assertFileRange(list4.get(1), 2096, 112);
220+
// range[2992,3104)
221+
assertFileRange(list4.get(2), 2992, 112);
194222
assertTrue("merged output ranges are disjoint",
195-
VectoredReadUtils.isOrderedDisjoint(outputList, 16, 700));
223+
VectoredReadUtils.isOrderedDisjoint(list4, 16, 700));
224+
}
225+
226+
/**
227+
* Assert that a file range satisfies the conditions.
228+
* @param range range to validate
229+
* @param offset offset of range
230+
* @param length range length
231+
*/
232+
private void assertFileRange(FileRange range, long offset, int length) {
233+
Assertions.assertThat(range)
234+
.describedAs("file range %s", range)
235+
.isNotNull();
236+
Assertions.assertThat(range.getOffset())
237+
.describedAs("offset of %s", range)
238+
.isEqualTo(offset);
239+
Assertions.assertThat(range.getLength())
240+
.describedAs("length of %s", range)
241+
.isEqualTo(length);
242+
}
243+
244+
/**
245+
* Assert that a file range satisfies the conditions.
246+
* @param range range to validate
247+
* @param offset offset of range
248+
* @param length range length
249+
* @param reference reference; may be null.
250+
*/
251+
private void assertFileRange(FileRange range, long offset, int length, Object reference) {
252+
assertFileRange(range, offset, length);
253+
Assertions.assertThat(range.getReference())
254+
.describedAs("reference field of file range %s", range)
255+
.isEqualTo(reference);
196256
}
197257

258+
198259
@Test
199260
public void testSortAndMergeMoreCases() throws Exception {
200261
List<FileRange> input = Arrays.asList(
@@ -214,7 +275,9 @@ public void testSortAndMergeMoreCases() throws Exception {
214275
Assertions.assertThat(output.getUnderlying())
215276
.describedAs("merged range underlying size")
216277
.hasSize(4);
217-
assertEquals("range[1000,3110)", output.toString());
278+
279+
assertFileRange(output, 1000, 2110);
280+
218281
assertTrue("merged output ranges are disjoint",
219282
VectoredReadUtils.isOrderedDisjoint(outputList, 1, 800));
220283

@@ -227,7 +290,8 @@ public void testSortAndMergeMoreCases() throws Exception {
227290
Assertions.assertThat(output.getUnderlying())
228291
.describedAs("merged range underlying size")
229292
.hasSize(4);
230-
assertEquals("range[1000,3200)", output.toString());
293+
assertFileRange(output, 1000, 2200);
294+
231295
assertTrue("merged output ranges are disjoint",
232296
VectoredReadUtils.isOrderedDisjoint(outputList, 1, 800));
233297

hadoop-tools/hadoop-benchmark/src/main/java/org/apache/hadoop/benchmark/VectoredReadBenchmark.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -169,7 +169,7 @@ static class FileRangeCallback extends FileRangeImpl implements
169169

170170
FileRangeCallback(AsynchronousFileChannel channel, long offset,
171171
int length, Joiner joiner, ByteBuffer buffer) {
172-
super(offset, length);
172+
super(offset, length, null);
173173
this.channel = channel;
174174
this.joiner = joiner;
175175
this.buffer = buffer;

0 commit comments

Comments
 (0)