Skip to content

Commit

Permalink
DRILL-7223: Create an option to control timeout for REFRESH METADATA
Browse files Browse the repository at this point in the history
  • Loading branch information
Ben-Zvi committed Apr 30, 2019
1 parent 887dee2 commit ef35bf5
Show file tree
Hide file tree
Showing 7 changed files with 51 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -354,6 +354,10 @@ private ExecConstants() {
"enables statistics usage for varchar and decimal data types. Default is unset, i.e. empty string. " +
"Allowed values: 'true', 'false', '' (empty string)."), "true", "false", "");

public static final String PARQUET_REFRESH_TIMEOUT = "store.parquet.refresh_timeout_per_runnable_in_msec";
public static final LongValidator PARQUET_REFRESH_TIMEOUT_VALIDATOR = new LongValidator(PARQUET_REFRESH_TIMEOUT,
new OptionDescription("Sets a timeout (in msec) for REFRESH TABLE METADATA processing of a single subdirectory"));

public static final String PARQUET_PAGEREADER_ASYNC = "store.parquet.reader.pagereader.async";
public static final OptionValidator PARQUET_PAGEREADER_ASYNC_VALIDATOR = new BooleanValidator(PARQUET_PAGEREADER_ASYNC,
new OptionDescription("Enable the asynchronous page reader. This pipelines the reading of data from disk for high performance."));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,7 @@ public static CaseInsensitiveMap<OptionDefinition> createDefaultOptionDefinition
new OptionDefinition(ExecConstants.PARQUET_PAGEREADER_USE_FADVISE_VALIDATOR),
new OptionDefinition(ExecConstants.PARQUET_READER_INT96_AS_TIMESTAMP_VALIDATOR),
new OptionDefinition(ExecConstants.PARQUET_READER_STRINGS_SIGNED_MIN_MAX_VALIDATOR),
new OptionDefinition(ExecConstants.PARQUET_REFRESH_TIMEOUT_VALIDATOR),
new OptionDefinition(ExecConstants.PARQUET_FLAT_READER_BULK_VALIDATOR),
new OptionDefinition(ExecConstants.PARQUET_FLAT_BATCH_NUM_RECORDS_VALIDATOR, new OptionMetaData(OptionValue.AccessibleScopes.SYSTEM_AND_SESSION, true, true)),
new OptionDefinition(ExecConstants.PARQUET_FLAT_BATCH_MEMORY_SIZE_VALIDATOR, new OptionMetaData(OptionValue.AccessibleScopes.SYSTEM_AND_SESSION, true, true)),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -166,14 +166,13 @@ public final V call() throws Exception {
throw e;
} finally {
long time = System.nanoTime() - start;
if (logger.isWarnEnabled()) {
long timeMillis = TimeUnit.MILLISECONDS.convert(time, TimeUnit.NANOSECONDS);
if (timeMillis > TIMEOUT_PER_RUNNABLE_IN_MSECS) {
logger.warn("Task '{}' execution time {} ms exceeds timeout {} ms.", this, timeMillis, TIMEOUT_PER_RUNNABLE_IN_MSECS);
} else {
logger.debug("Task '{}' execution time is {} ms", this, timeMillis);
}
long timeMillis = TimeUnit.MILLISECONDS.convert(time, TimeUnit.NANOSECONDS);
if (timeMillis > TIMEOUT_PER_RUNNABLE_IN_MSECS) {
logger.warn("Task '{}' execution time {} ms exceeds timeout {} ms.", this, timeMillis, TIMEOUT_PER_RUNNABLE_IN_MSECS);
} else {
logger.debug("Task '{}' execution time is {} ms", this, timeMillis);
}

executionTime = time;
}
}
Expand All @@ -188,6 +187,23 @@ private long getExecutionTime(TimeUnit unit) {
return unit.convert(executionTime, TimeUnit.NANOSECONDS);
}

/**
* Execute the list of runnables with the given parallelization. At end, return values and report completion time
* stats to provided logger. Each runnable is allowed a certain timeout. If the timeout exceeds, existing/pending
* tasks will be cancelled and a {@link UserException} is thrown.
* @param activity Name of activity for reporting in logger.
* @param logger The logger to use to report results.
* @param tasks List of callable that should be executed and timed. If this list has one item, task will be
* completed in-thread. Each callable must handle {@link InterruptedException}s.
* @param parallelism The number of threads that should be run to complete this task.
* @param timeout if bigger than zero, set the timeout per runnable (in msec)
* @return The list of outcome objects.
* @throws IOException All exceptions are coerced to IOException since this was build for storage system tasks initially.
*/
public static <V> List<V> run(final String activity, final Logger logger, final List<TimedCallable<V>> tasks, int parallelism, long timeout) throws IOException {
TIMEOUT_PER_RUNNABLE_IN_MSECS = timeout;
return run(activity, logger, tasks, parallelism);
}

/**
* Execute the list of runnables with the given parallelization. At end, return values and report completion time
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ public class ParquetReaderConfig {
private boolean enableTimeReadCounter = false;
private boolean autoCorrectCorruptedDates = true;
private boolean enableStringsSignedMinMax = false;
private long timeoutPerRunnableInMsec = 15_000;

public static ParquetReaderConfig.Builder builder() {
return new ParquetReaderConfig.Builder();
Expand All @@ -64,12 +65,15 @@ public ParquetReaderConfig(@JsonProperty("enableBytesReadCounter") Boolean enabl
@JsonProperty("enableBytesTotalCounter") Boolean enableBytesTotalCounter,
@JsonProperty("enableTimeReadCounter") Boolean enableTimeReadCounter,
@JsonProperty("autoCorrectCorruptedDates") Boolean autoCorrectCorruptedDates,
@JsonProperty("enableStringsSignedMinMax") Boolean enableStringsSignedMinMax) {
@JsonProperty("enableStringsSignedMinMax") Boolean enableStringsSignedMinMax,
@JsonProperty("timeoutPerRunnableInMsec") Long timeoutPerRunnableInMsec) {
this.enableBytesReadCounter = enableBytesReadCounter == null ? this.enableBytesReadCounter : enableBytesReadCounter;
this.enableBytesTotalCounter = enableBytesTotalCounter == null ? this.enableBytesTotalCounter : enableBytesTotalCounter;
this.enableTimeReadCounter = enableTimeReadCounter == null ? this.enableTimeReadCounter : enableTimeReadCounter;
this.autoCorrectCorruptedDates = autoCorrectCorruptedDates == null ? this.autoCorrectCorruptedDates : autoCorrectCorruptedDates;
this.enableStringsSignedMinMax = enableStringsSignedMinMax == null ? this.enableStringsSignedMinMax : enableStringsSignedMinMax;
this.timeoutPerRunnableInMsec = timeoutPerRunnableInMsec == null || Long.valueOf(timeoutPerRunnableInMsec) <= 0 ? // zero means: use default
this.timeoutPerRunnableInMsec : timeoutPerRunnableInMsec;
}

private ParquetReaderConfig() { }
Expand Down Expand Up @@ -99,6 +103,9 @@ public boolean enableStringsSignedMinMax() {
return enableStringsSignedMinMax;
}

@JsonProperty("timeoutPerRunnableInMsec")
public long timeoutPerRunnableInMsec() { return timeoutPerRunnableInMsec; }

public ParquetReadOptions toReadOptions() {
return ParquetReadOptions.builder()
.useSignedStringMinMax(enableStringsSignedMinMax)
Expand All @@ -119,7 +126,8 @@ public int hashCode() {
enableBytesTotalCounter,
enableTimeReadCounter,
autoCorrectCorruptedDates,
enableStringsSignedMinMax);
enableStringsSignedMinMax,
timeoutPerRunnableInMsec);
}

@Override
Expand All @@ -135,7 +143,8 @@ public boolean equals(Object o) {
&& enableBytesTotalCounter == that.enableBytesTotalCounter
&& enableTimeReadCounter == that.enableTimeReadCounter
&& autoCorrectCorruptedDates == that.autoCorrectCorruptedDates
&& enableStringsSignedMinMax == that.enableStringsSignedMinMax;
&& enableStringsSignedMinMax == that.enableStringsSignedMinMax
&& timeoutPerRunnableInMsec == that.timeoutPerRunnableInMsec;
}

@Override
Expand All @@ -146,6 +155,7 @@ public String toString() {
+ ", enableTimeReadCounter=" + enableTimeReadCounter
+ ", autoCorrectCorruptedDates=" + autoCorrectCorruptedDates
+ ", enableStringsSignedMinMax=" + enableStringsSignedMinMax
+ ", timeoutPerRunnableInMsec=" + timeoutPerRunnableInMsec
+ '}';
}

Expand Down Expand Up @@ -188,10 +198,12 @@ public ParquetReaderConfig build() {

// last assign values from session options, session options have higher priority than other configurations
if (options != null) {
String option = options.getOption(ExecConstants.PARQUET_READER_STRINGS_SIGNED_MIN_MAX_VALIDATOR);
if (!option.isEmpty()) {
readerConfig.enableStringsSignedMinMax = Boolean.valueOf(option);
String optionSignedMinMax = options.getOption(ExecConstants.PARQUET_READER_STRINGS_SIGNED_MIN_MAX_VALIDATOR);
if (!optionSignedMinMax.isEmpty()) {
readerConfig.enableStringsSignedMinMax = Boolean.valueOf(optionSignedMinMax);
}
Long optionTimeout = options.getOption(ExecConstants.PARQUET_REFRESH_TIMEOUT_VALIDATOR);
readerConfig.timeoutPerRunnableInMsec = Long.valueOf(optionTimeout);
}

return readerConfig;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -444,8 +444,7 @@ private ParquetTableMetadata_v4 getParquetTableMetadata(Map<FileStatus, FileSyst
private List<ParquetFileAndRowCountMetadata> getParquetFileMetadata_v4(ParquetTableMetadata_v4 parquetTableMetadata_v4, Map<FileStatus, FileSystem> fileStatusMap, boolean allColumnsInteresting, Set<String> columnSet) throws IOException {
return TimedCallable.run("Fetch parquet metadata", logger,
Collectors.toList(fileStatusMap,
(fileStatus, fileSystem) -> new MetadataGatherer(parquetTableMetadata_v4, fileStatus, fileSystem, allColumnsInteresting, columnSet)),
16
(fileStatus, fileSystem) -> new MetadataGatherer(parquetTableMetadata_v4, fileStatus, fileSystem, allColumnsInteresting, columnSet)), 16, readerConfig.timeoutPerRunnableInMsec()
);
}

Expand Down
1 change: 1 addition & 0 deletions exec/java-exec/src/main/resources/drill-module.conf
Original file line number Diff line number Diff line change
Expand Up @@ -629,6 +629,7 @@ drill.exec.options: {
store.parquet.reader.columnreader.async: false,
store.parquet.reader.int96_as_timestamp: false,
store.parquet.reader.strings_signed_min_max: "",
store.parquet.refresh_timeout_per_runnable_in_msec: 15000,
store.parquet.reader.pagereader.async: true,
store.parquet.reader.pagereader.bufferedread: true,
store.parquet.reader.pagereader.buffersize: 1048576,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ public void testDefaultsDeserialization() throws Exception {

// change the default: set autoCorrectCorruptedDates to false
// keep the default: set enableStringsSignedMinMax to false
readerConfig = new ParquetReaderConfig(false, false, false, false, false);
readerConfig = new ParquetReaderConfig(false, false, false, false, false, 0L);

value = mapper.writeValueAsString(readerConfig);
assertEquals("{\"autoCorrectCorruptedDates\":false}", value);
Expand Down Expand Up @@ -79,12 +79,12 @@ public void testAddConfigToConf() {
@Test
public void testReadOptions() {
// set enableStringsSignedMinMax to true
ParquetReaderConfig readerConfig = new ParquetReaderConfig(false, false, false, true, true);
ParquetReaderConfig readerConfig = new ParquetReaderConfig(false, false, false, true, true, 12345L);
ParquetReadOptions readOptions = readerConfig.toReadOptions();
assertTrue(readOptions.useSignedStringMinMax());

// set enableStringsSignedMinMax to false
readerConfig = new ParquetReaderConfig(false, false, false, true, false);
readerConfig = new ParquetReaderConfig(false, false, false, true, false, 12345L);
readOptions = readerConfig.toReadOptions();
assertFalse(readOptions.useSignedStringMinMax());
}
Expand Down

0 comments on commit ef35bf5

Please sign in to comment.