Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

DRILL-7223: Create an option to control timeout for REFRESH METADATA #1776

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -354,6 +354,10 @@ private ExecConstants() {
"enables statistics usage for varchar and decimal data types. Default is unset, i.e. empty string. " +
"Allowed values: 'true', 'false', '' (empty string)."), "true", "false", "");

public static final String PARQUET_REFRESH_TIMEOUT = "store.parquet.refresh_timeout_per_runnable_in_msec";

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should avoid the word 'refresh' here and other places for the timeout since this parameter is intended for any timed runnable task, not just the ones initiated by the REFRESH command. For instance, in normal query planning without using metadata cache, the FooterGatherer also creates multiple TimedCallable threads to read parquet footers directly.

public static final LongValidator PARQUET_REFRESH_TIMEOUT_VALIDATOR = new LongValidator(PARQUET_REFRESH_TIMEOUT,

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Make this PositiveLongValidator since timeout of 0 does not make sense. Maximum value can be Integer.MAX_VALUE

new OptionDescription("Sets a timeout (in msec) for REFRESH TABLE METADATA processing of a single subdirectory"));

public static final String PARQUET_PAGEREADER_ASYNC = "store.parquet.reader.pagereader.async";
public static final OptionValidator PARQUET_PAGEREADER_ASYNC_VALIDATOR = new BooleanValidator(PARQUET_PAGEREADER_ASYNC,
new OptionDescription("Enable the asynchronous page reader. This pipelines the reading of data from disk for high performance."));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,7 @@ public static CaseInsensitiveMap<OptionDefinition> createDefaultOptionDefinition
new OptionDefinition(ExecConstants.PARQUET_PAGEREADER_USE_FADVISE_VALIDATOR),
new OptionDefinition(ExecConstants.PARQUET_READER_INT96_AS_TIMESTAMP_VALIDATOR),
new OptionDefinition(ExecConstants.PARQUET_READER_STRINGS_SIGNED_MIN_MAX_VALIDATOR),
new OptionDefinition(ExecConstants.PARQUET_REFRESH_TIMEOUT_VALIDATOR),
new OptionDefinition(ExecConstants.PARQUET_FLAT_READER_BULK_VALIDATOR),
new OptionDefinition(ExecConstants.PARQUET_FLAT_BATCH_NUM_RECORDS_VALIDATOR, new OptionMetaData(OptionValue.AccessibleScopes.SYSTEM_AND_SESSION, true, true)),
new OptionDefinition(ExecConstants.PARQUET_FLAT_BATCH_MEMORY_SIZE_VALIDATOR, new OptionMetaData(OptionValue.AccessibleScopes.SYSTEM_AND_SESSION, true, true)),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -166,14 +166,13 @@ public final V call() throws Exception {
throw e;
} finally {
long time = System.nanoTime() - start;
if (logger.isWarnEnabled()) {
long timeMillis = TimeUnit.MILLISECONDS.convert(time, TimeUnit.NANOSECONDS);
if (timeMillis > TIMEOUT_PER_RUNNABLE_IN_MSECS) {
logger.warn("Task '{}' execution time {} ms exceeds timeout {} ms.", this, timeMillis, TIMEOUT_PER_RUNNABLE_IN_MSECS);
} else {
logger.debug("Task '{}' execution time is {} ms", this, timeMillis);
}
long timeMillis = TimeUnit.MILLISECONDS.convert(time, TimeUnit.NANOSECONDS);
if (timeMillis > TIMEOUT_PER_RUNNABLE_IN_MSECS) {
logger.warn("Task '{}' execution time {} ms exceeds timeout {} ms.", this, timeMillis, TIMEOUT_PER_RUNNABLE_IN_MSECS);
} else {
logger.debug("Task '{}' execution time is {} ms", this, timeMillis);
}

executionTime = time;
}
}
Expand All @@ -188,6 +187,23 @@ private long getExecutionTime(TimeUnit unit) {
return unit.convert(executionTime, TimeUnit.NANOSECONDS);
}

/**
* Execute the list of runnables with the given parallelization. At end, return values and report completion time
* stats to provided logger. Each runnable is allowed a certain timeout. If the timeout exceeds, existing/pending
* tasks will be cancelled and a {@link UserException} is thrown.
* @param activity Name of activity for reporting in logger.
* @param logger The logger to use to report results.
* @param tasks List of callable that should be executed and timed. If this list has one item, task will be
* completed in-thread. Each callable must handle {@link InterruptedException}s.
* @param parallelism The number of threads that should be run to complete this task.
* @param timeout if bigger than zero, set the timeout per runnable (in msec)
* @return The list of outcome objects.
* @throws IOException All exceptions are coerced to IOException since this was build for storage system tasks initially.
*/
public static <V> List<V> run(final String activity, final Logger logger, final List<TimedCallable<V>> tasks, int parallelism, long timeout) throws IOException {
TIMEOUT_PER_RUNNABLE_IN_MSECS = timeout;

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

By convention we treat these as static final constants (although I see that the final is missing). Also assigning to this static variable is not thread safe. Actually, now that this is a config option, why do we need a static constant ? Why not just pass it as a parameter wherever needed including passing it to the constructor of classes that are extending the abstract class and then we can set the timeout as a local class variable ?

return run(activity, logger, tasks, parallelism);
}

/**
* Execute the list of runnables with the given parallelization. At end, return values and report completion time
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ public class ParquetReaderConfig {
private boolean enableTimeReadCounter = false;
private boolean autoCorrectCorruptedDates = true;
private boolean enableStringsSignedMinMax = false;
private long timeoutPerRunnableInMsec = 15_000;

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since the 'owner' of the default should be ExecConstants, we should avoid this default value.


public static ParquetReaderConfig.Builder builder() {
return new ParquetReaderConfig.Builder();
Expand All @@ -64,12 +65,15 @@ public ParquetReaderConfig(@JsonProperty("enableBytesReadCounter") Boolean enabl
@JsonProperty("enableBytesTotalCounter") Boolean enableBytesTotalCounter,
@JsonProperty("enableTimeReadCounter") Boolean enableTimeReadCounter,
@JsonProperty("autoCorrectCorruptedDates") Boolean autoCorrectCorruptedDates,
@JsonProperty("enableStringsSignedMinMax") Boolean enableStringsSignedMinMax) {
@JsonProperty("enableStringsSignedMinMax") Boolean enableStringsSignedMinMax,
@JsonProperty("timeoutPerRunnableInMsec") Long timeoutPerRunnableInMsec) {
this.enableBytesReadCounter = enableBytesReadCounter == null ? this.enableBytesReadCounter : enableBytesReadCounter;
this.enableBytesTotalCounter = enableBytesTotalCounter == null ? this.enableBytesTotalCounter : enableBytesTotalCounter;
this.enableTimeReadCounter = enableTimeReadCounter == null ? this.enableTimeReadCounter : enableTimeReadCounter;
this.autoCorrectCorruptedDates = autoCorrectCorruptedDates == null ? this.autoCorrectCorruptedDates : autoCorrectCorruptedDates;
this.enableStringsSignedMinMax = enableStringsSignedMinMax == null ? this.enableStringsSignedMinMax : enableStringsSignedMinMax;
this.timeoutPerRunnableInMsec = timeoutPerRunnableInMsec == null || Long.valueOf(timeoutPerRunnableInMsec) <= 0 ? // zero means: use default
this.timeoutPerRunnableInMsec : timeoutPerRunnableInMsec;
}

private ParquetReaderConfig() { }
Expand Down Expand Up @@ -99,6 +103,9 @@ public boolean enableStringsSignedMinMax() {
return enableStringsSignedMinMax;
}

@JsonProperty("timeoutPerRunnableInMsec")
public long timeoutPerRunnableInMsec() { return timeoutPerRunnableInMsec; }

public ParquetReadOptions toReadOptions() {
return ParquetReadOptions.builder()
.useSignedStringMinMax(enableStringsSignedMinMax)
Expand All @@ -119,7 +126,8 @@ public int hashCode() {
enableBytesTotalCounter,
enableTimeReadCounter,
autoCorrectCorruptedDates,
enableStringsSignedMinMax);
enableStringsSignedMinMax,
timeoutPerRunnableInMsec);
}

@Override
Expand All @@ -135,7 +143,8 @@ public boolean equals(Object o) {
&& enableBytesTotalCounter == that.enableBytesTotalCounter
&& enableTimeReadCounter == that.enableTimeReadCounter
&& autoCorrectCorruptedDates == that.autoCorrectCorruptedDates
&& enableStringsSignedMinMax == that.enableStringsSignedMinMax;
&& enableStringsSignedMinMax == that.enableStringsSignedMinMax
&& timeoutPerRunnableInMsec == that.timeoutPerRunnableInMsec;
}

@Override
Expand All @@ -146,6 +155,7 @@ public String toString() {
+ ", enableTimeReadCounter=" + enableTimeReadCounter
+ ", autoCorrectCorruptedDates=" + autoCorrectCorruptedDates
+ ", enableStringsSignedMinMax=" + enableStringsSignedMinMax
+ ", timeoutPerRunnableInMsec=" + timeoutPerRunnableInMsec
+ '}';
}

Expand Down Expand Up @@ -188,10 +198,12 @@ public ParquetReaderConfig build() {

// last assign values from session options, session options have higher priority than other configurations
if (options != null) {
String option = options.getOption(ExecConstants.PARQUET_READER_STRINGS_SIGNED_MIN_MAX_VALIDATOR);
if (!option.isEmpty()) {
readerConfig.enableStringsSignedMinMax = Boolean.valueOf(option);
String optionSignedMinMax = options.getOption(ExecConstants.PARQUET_READER_STRINGS_SIGNED_MIN_MAX_VALIDATOR);
if (!optionSignedMinMax.isEmpty()) {
readerConfig.enableStringsSignedMinMax = Boolean.valueOf(optionSignedMinMax);
}
Long optionTimeout = options.getOption(ExecConstants.PARQUET_REFRESH_TIMEOUT_VALIDATOR);
readerConfig.timeoutPerRunnableInMsec = Long.valueOf(optionTimeout);
}

return readerConfig;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -440,8 +440,7 @@ private ParquetTableMetadata_v4 getParquetTableMetadata(Map<FileStatus, FileSyst
private List<ParquetFileAndRowCountMetadata> getParquetFileMetadata_v4(ParquetTableMetadata_v4 parquetTableMetadata_v4, Map<FileStatus, FileSystem> fileStatusMap, boolean allColumnsInteresting, Set<String> columnSet) throws IOException {
return TimedCallable.run("Fetch parquet metadata", logger,
Collectors.toList(fileStatusMap,
(fileStatus, fileSystem) -> new MetadataGatherer(parquetTableMetadata_v4, fileStatus, fileSystem, allColumnsInteresting, columnSet)),
16
(fileStatus, fileSystem) -> new MetadataGatherer(parquetTableMetadata_v4, fileStatus, fileSystem, allColumnsInteresting, columnSet)), 16, readerConfig.timeoutPerRunnableInMsec()
);
}

Expand Down
1 change: 1 addition & 0 deletions exec/java-exec/src/main/resources/drill-module.conf
Original file line number Diff line number Diff line change
Expand Up @@ -630,6 +630,7 @@ drill.exec.options: {
store.parquet.reader.columnreader.async: false,
store.parquet.reader.int96_as_timestamp: false,
store.parquet.reader.strings_signed_min_max: "",
store.parquet.refresh_timeout_per_runnable_in_msec: 15000,

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

See prior comment about omitting 'refresh' . Also I think that we should not put this in the 'parquet' namespace even though currently it is mainly used for Parquet metadata read. The reason is the runnable timeout could in theory be used for other formats also. How about store.timeout_per_runnable_in_msec ?

store.parquet.reader.pagereader.async: true,
store.parquet.reader.pagereader.bufferedread: true,
store.parquet.reader.pagereader.buffersize: 1048576,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ public void testDefaultsDeserialization() throws Exception {

// change the default: set autoCorrectCorruptedDates to false
// keep the default: set enableStringsSignedMinMax to false
readerConfig = new ParquetReaderConfig(false, false, false, false, false);
readerConfig = new ParquetReaderConfig(false, false, false, false, false, 0L);

value = mapper.writeValueAsString(readerConfig);
assertEquals("{\"autoCorrectCorruptedDates\":false}", value);
Expand Down Expand Up @@ -79,12 +79,12 @@ public void testAddConfigToConf() {
@Test
public void testReadOptions() {
// set enableStringsSignedMinMax to true
ParquetReaderConfig readerConfig = new ParquetReaderConfig(false, false, false, true, true);
ParquetReaderConfig readerConfig = new ParquetReaderConfig(false, false, false, true, true, 12345L);
ParquetReadOptions readOptions = readerConfig.toReadOptions();
assertTrue(readOptions.useSignedStringMinMax());

// set enableStringsSignedMinMax to false
readerConfig = new ParquetReaderConfig(false, false, false, true, false);
readerConfig = new ParquetReaderConfig(false, false, false, true, false, 12345L);
readOptions = readerConfig.toReadOptions();
assertFalse(readOptions.useSignedStringMinMax());
}
Expand Down