Skip to content

Commit 3fe9cdb

Browse files
committed
HADOOP-19131. add "hbase" as a format
declare that hbase is an hbase table; s3a maps to random IO. abfs recommends disabling prefetch for these files...it should do it automatically when support for read policies is wired up. Change-Id: Ia072c0f724f7d7cb538ef9d0a047e74c0191af73
1 parent 66acab9 commit 3fe9cdb

File tree

5 files changed

+66
-21
lines changed

5 files changed

+66
-21
lines changed

hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/Options.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -631,6 +631,13 @@ private OpenFileOptions() {
631631
public static final String FS_OPTION_OPENFILE_READ_POLICY_DEFAULT =
632632
"default";
633633

634+
/**
635+
* This is a table file for Apache HBase.
636+
* Do whatever is needed to optimize for it: {@value}.
637+
*/
638+
public static final String FS_OPTION_OPENFILE_READ_POLICY_HBASE =
639+
"hbase";
640+
634641
/**
635642
* This is a JSON file of UTF-8 text, including a
636643
* JSON line file where each line is a JSON entity.

hadoop-common-project/hadoop-common/src/site/markdown/filesystem/fsdatainputstreambuilder.md

Lines changed: 16 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -382,22 +382,26 @@ performance -and vice versa.
382382
| `adaptive` | Any adaptive policy implemented by the store. |
383383
| `avro` | This is an avro format which will be read sequentially |
384384
| `csv` | This is CSV data which will be read sequentially |
385-
| `json` | This is a UTF-8 JSON/JSON lines format which will be read sequentially |
386385
| `default` | The default policy for this store. Generally "adaptive". |
387386
| `columnar` | This is any columnar format other than ORC/parquet. |
387+
| `hbase` | This is an HBase Table |
388+
| `json` | This is a UTF-8 JSON/JSON lines format which will be read sequentially |
388389
| `orc` | This is an ORC file. Optimize for it. |
389390
| `parquet` | This is a Parquet file. Optimize for it. |
390391
| `random` | Optimize for random access. |
391392
| `sequential` | Optimize for sequential access. |
392393
| `vector` | The Vectored IO API is intended to be used. |
393394
| `whole-file` | The whole file will be read. |
394395

395-
Choosing the wrong read policy for an input source may be inefficient.
396+
Choosing the wrong read policy for an input source may be inefficient but never fatal.
396397

397398
A list of read policies MAY be supplied; the first one recognized/supported by
398-
the filesystem SHALL be the one used. This allows for custom policies to be
399-
supported, for example an `hbase-hfile` policy optimized for HBase HFiles.
400-
399+
the filesystem SHALL be the one used. This allows for configurations which are compatible
400+
across versions. A policy `parquet, columnar, vector, random, adaptive` will use the parquet policy for
401+
any filesystem aware of it, falling back to `columnar`, `vector`, `random` and finally `adaptive`.
402+
The S3A connector has recognized `random` since Hadoop 3.3.5 (i.e. since the `openFile()` API
403+
was added), and `vector` from Hadoop 3.4.0.
404+
401405
The S3A and ABFS input streams both implement
402406
the [IOStatisticsSource](iostatistics.html) API, and can be queried for their IO
403407
Performance.
@@ -479,7 +483,6 @@ Strategies can include:
479483
Applications which know that the entire file is to be read from an opened stream SHOULD declare this
480484
read policy.
481485

482-
483486
#### <a name="read.policy.columnar"></a> Read Policy `columnar`
484487

485488
Declare that the data is some (unspecified) columnar format and that read sequences
@@ -511,6 +514,13 @@ libraries targeting multiple versions, SHOULD list their fallback
511514
policies if these are not recognized, e.g. request a policy such as `avro, sequential`.
512515

513516

517+
#### <a name="read.policy.fileformat.hbase"></a> File Format Read Policy `hbase`
518+
519+
The file is an HBase table.
520+
Use whatever policy is appropriate for these files, where `random` is
521+
what should be used unless there are specific optimizations related to HBase.
522+
523+
514524
### <a name="openfile.length"></a> Option: `fs.option.openfile.length`: `Long`
515525

516526
Declare the length of a file.

hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/functional/TestFunctionalIO.java

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -99,9 +99,13 @@ public void testUncheckAndExtract() throws Throwable {
9999

100100
@Test
101101
public void testUncheckedFunction() throws Throwable {
102+
// java function which should raise a FileNotFoundException
103+
// wrapped into an unchecked exception
102104
final Function<String, Object> fn =
103-
toUncheckedFunction((String a) -> {throw new FileNotFoundException(a);});
104-
intercept(UncheckedIOException.class, "404", () ->
105+
toUncheckedFunction((String a) -> {
106+
throw new FileNotFoundException(a);
107+
});
108+
intercept(UncheckedIOException.class, "missing", () ->
105109
fn.apply("missing"));
106110
}
107111
}

hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInputPolicy.java

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY_COLUMNAR;
3131
import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY_CSV;
3232
import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY_DEFAULT;
33+
import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY_HBASE;
3334
import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY_JSON;
3435
import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY_ORC;
3536
import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY_PARQUET;
@@ -87,7 +88,8 @@ boolean isAdaptive() {
8788
* Choose an access policy.
8889
* @param name strategy name from a configuration option, etc.
8990
* @param defaultPolicy default policy to fall back to.
90-
* @return the chosen strategy
91+
* @return the chosen strategy or null if there was no match and
92+
* the value of {@code defaultPolicy} was "null".
9193
*/
9294
public static S3AInputPolicy getPolicy(
9395
String name,
@@ -100,6 +102,7 @@ public static S3AInputPolicy getPolicy(
100102
return Normal;
101103

102104
// all these options currently map to random IO.
105+
case FS_OPTION_OPENFILE_READ_POLICY_HBASE:
103106
case FS_OPTION_OPENFILE_READ_POLICY_RANDOM:
104107
case FS_OPTION_OPENFILE_READ_POLICY_VECTOR:
105108
return Random;
@@ -111,7 +114,7 @@ public static S3AInputPolicy getPolicy(
111114
case FS_OPTION_OPENFILE_READ_POLICY_PARQUET:
112115
return Random;
113116

114-
// hadle the sequential formats.
117+
// handle the sequential formats.
115118
case FS_OPTION_OPENFILE_READ_POLICY_AVRO:
116119
case FS_OPTION_OPENFILE_READ_POLICY_CSV:
117120
case FS_OPTION_OPENFILE_READ_POLICY_JSON:

hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/impl/TestOpenFileSupport.java

Lines changed: 32 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -43,14 +43,26 @@
4343
import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_LENGTH;
4444
import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY;
4545
import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY_ADAPTIVE;
46+
import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY_AVRO;
47+
import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY_COLUMNAR;
48+
import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY_CSV;
4649
import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY_DEFAULT;
50+
import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY_HBASE;
51+
import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY_JSON;
52+
import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY_ORC;
53+
import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY_PARQUET;
4754
import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY_RANDOM;
4855
import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY_SEQUENTIAL;
56+
import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY_VECTOR;
57+
import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY_WHOLE_FILE;
4958
import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_SPLIT_END;
5059
import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_SPLIT_START;
5160
import static org.apache.hadoop.fs.s3a.Constants.DEFAULT_ASYNC_DRAIN_THRESHOLD;
5261
import static org.apache.hadoop.fs.s3a.Constants.INPUT_FADVISE;
5362
import static org.apache.hadoop.fs.s3a.Constants.READAHEAD_RANGE;
63+
import static org.apache.hadoop.fs.s3a.S3AInputPolicy.Normal;
64+
import static org.apache.hadoop.fs.s3a.S3AInputPolicy.Random;
65+
import static org.apache.hadoop.fs.s3a.S3AInputPolicy.Sequential;
5466
import static org.apache.hadoop.test.LambdaTestUtils.intercept;
5567

5668
/**
@@ -69,7 +81,7 @@ public class TestOpenFileSupport extends HadoopTestBase {
6981

7082
private static final String USERNAME = "hadoop";
7183

72-
public static final S3AInputPolicy INPUT_POLICY = S3AInputPolicy.Sequential;
84+
public static final S3AInputPolicy INPUT_POLICY = Sequential;
7385

7486
public static final String TESTFILE = "s3a://bucket/name";
7587

@@ -142,7 +154,7 @@ public void testSeekRandomIOPolicy() throws Throwable {
142154
// is picked up
143155
assertOpenFile(INPUT_FADVISE, option)
144156
.extracting(f -> f.getInputPolicy())
145-
.isEqualTo(S3AInputPolicy.Random);
157+
.isEqualTo(Random);
146158
// and as neither status nor length was set: no file status
147159
assertOpenFile(INPUT_FADVISE, option)
148160
.extracting(f -> f.getStatus())
@@ -161,7 +173,7 @@ public void testSeekPolicyAdaptive() throws Throwable {
161173
assertOpenFile(FS_OPTION_OPENFILE_READ_POLICY,
162174
FS_OPTION_OPENFILE_READ_POLICY_ADAPTIVE)
163175
.extracting(f -> f.getInputPolicy())
164-
.isEqualTo(S3AInputPolicy.Normal);
176+
.isEqualTo(Normal);
165177
}
166178

167179
/**
@@ -184,7 +196,7 @@ public void testSeekPolicyListS3AOption() throws Throwable {
184196
// fall back to the second seek policy if the first is unknown
185197
assertOpenFile(INPUT_FADVISE, "hbase, random")
186198
.extracting(f -> f.getInputPolicy())
187-
.isEqualTo(S3AInputPolicy.Random);
199+
.isEqualTo(Random);
188200
}
189201

190202
/**
@@ -199,14 +211,14 @@ public void testSeekPolicyExtractionFromList() throws Throwable {
199211
FS_OPTION_OPENFILE_READ_POLICY);
200212
Assertions.assertThat(S3AInputPolicy.getFirstSupportedPolicy(options, null))
201213
.describedAs("Policy from " + plist)
202-
.isEqualTo(S3AInputPolicy.Random);
214+
.isEqualTo(Random);
203215
}
204216

205217
@Test
206218
public void testAdaptiveSeekPolicyRecognized() throws Throwable {
207219
Assertions.assertThat(S3AInputPolicy.getPolicy("adaptive", null))
208220
.describedAs("adaptive")
209-
.isEqualTo(S3AInputPolicy.Normal);
221+
.isEqualTo(Normal);
210222
}
211223

212224
@Test
@@ -222,11 +234,20 @@ public void testUnknownSeekPolicyFallback() throws Throwable {
222234
@Test
223235
public void testInputPolicyMapping() throws Throwable {
224236
Object[][] policyMapping = {
225-
{"normal", S3AInputPolicy.Normal},
226-
{FS_OPTION_OPENFILE_READ_POLICY_ADAPTIVE, S3AInputPolicy.Normal},
227-
{FS_OPTION_OPENFILE_READ_POLICY_DEFAULT, S3AInputPolicy.Normal},
228-
{FS_OPTION_OPENFILE_READ_POLICY_RANDOM, S3AInputPolicy.Random},
229-
{FS_OPTION_OPENFILE_READ_POLICY_SEQUENTIAL, S3AInputPolicy.Sequential},
237+
{"normal", Normal},
238+
{FS_OPTION_OPENFILE_READ_POLICY_ADAPTIVE, Normal},
239+
{FS_OPTION_OPENFILE_READ_POLICY_AVRO, Sequential},
240+
{FS_OPTION_OPENFILE_READ_POLICY_COLUMNAR, Random},
241+
{FS_OPTION_OPENFILE_READ_POLICY_CSV, Sequential},
242+
{FS_OPTION_OPENFILE_READ_POLICY_DEFAULT, Normal},
243+
{FS_OPTION_OPENFILE_READ_POLICY_HBASE, Random},
244+
{FS_OPTION_OPENFILE_READ_POLICY_JSON, Sequential},
245+
{FS_OPTION_OPENFILE_READ_POLICY_ORC, Random},
246+
{FS_OPTION_OPENFILE_READ_POLICY_PARQUET, Random},
247+
{FS_OPTION_OPENFILE_READ_POLICY_RANDOM, Random},
248+
{FS_OPTION_OPENFILE_READ_POLICY_SEQUENTIAL, Sequential},
249+
{FS_OPTION_OPENFILE_READ_POLICY_VECTOR, Random},
250+
{FS_OPTION_OPENFILE_READ_POLICY_WHOLE_FILE, Sequential},
230251
};
231252
for (Object[] mapping : policyMapping) {
232253
String name = (String) mapping[0];

0 commit comments

Comments
 (0)