diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java index 24372ef5e40..365dddba234 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java @@ -354,6 +354,10 @@ private ExecConstants() { "enables statistics usage for varchar and decimal data types. Default is unset, i.e. empty string. " + "Allowed values: 'true', 'false', '' (empty string)."), "true", "false", ""); + public static final String PARQUET_REFRESH_TIMEOUT = "store.parquet.refresh_timeout_per_runnable_in_msec"; + public static final LongValidator PARQUET_REFRESH_TIMEOUT_VALIDATOR = new LongValidator(PARQUET_REFRESH_TIMEOUT, + new OptionDescription("Sets a timeout (in msec) for REFRESH TABLE METADATA processing of a single subdirectory")); + public static final String PARQUET_PAGEREADER_ASYNC = "store.parquet.reader.pagereader.async"; public static final OptionValidator PARQUET_PAGEREADER_ASYNC_VALIDATOR = new BooleanValidator(PARQUET_PAGEREADER_ASYNC, new OptionDescription("Enable the asynchronous page reader. This pipelines the reading of data from disk for high performance.")); diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java index 6719db70b7e..65ec2e49c7a 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java @@ -174,6 +174,7 @@ public static CaseInsensitiveMap createDefaultOptionDefinition new OptionDefinition(ExecConstants.PARQUET_PAGEREADER_USE_FADVISE_VALIDATOR), new OptionDefinition(ExecConstants.PARQUET_READER_INT96_AS_TIMESTAMP_VALIDATOR), new OptionDefinition(ExecConstants.PARQUET_READER_STRINGS_SIGNED_MIN_MAX_VALIDATOR), + new OptionDefinition(ExecConstants.PARQUET_REFRESH_TIMEOUT_VALIDATOR), new OptionDefinition(ExecConstants.PARQUET_FLAT_READER_BULK_VALIDATOR), new OptionDefinition(ExecConstants.PARQUET_FLAT_BATCH_NUM_RECORDS_VALIDATOR, new OptionMetaData(OptionValue.AccessibleScopes.SYSTEM_AND_SESSION, true, true)), new OptionDefinition(ExecConstants.PARQUET_FLAT_BATCH_MEMORY_SIZE_VALIDATOR, new OptionMetaData(OptionValue.AccessibleScopes.SYSTEM_AND_SESSION, true, true)), diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/TimedCallable.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/TimedCallable.java index 9c90f0894c0..254ebd61a36 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/TimedCallable.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/TimedCallable.java @@ -166,14 +166,13 @@ public final V call() throws Exception { throw e; } finally { long time = System.nanoTime() - start; - if (logger.isWarnEnabled()) { - long timeMillis = TimeUnit.MILLISECONDS.convert(time, TimeUnit.NANOSECONDS); - if (timeMillis > TIMEOUT_PER_RUNNABLE_IN_MSECS) { - logger.warn("Task '{}' execution time {} ms exceeds timeout {} ms.", this, timeMillis, TIMEOUT_PER_RUNNABLE_IN_MSECS); - } else { - logger.debug("Task '{}' execution time is {} ms", this, timeMillis); - } + long timeMillis = TimeUnit.MILLISECONDS.convert(time, TimeUnit.NANOSECONDS); + if (timeMillis > TIMEOUT_PER_RUNNABLE_IN_MSECS) { + logger.warn("Task '{}' execution time {} ms exceeds timeout {} ms.", this, timeMillis, TIMEOUT_PER_RUNNABLE_IN_MSECS); + } else { + logger.debug("Task '{}' execution time is {} ms", this, timeMillis); } + executionTime = time; } } @@ -188,6 +187,23 @@ private long getExecutionTime(TimeUnit unit) { return unit.convert(executionTime, TimeUnit.NANOSECONDS); } + /** + * Execute the list of runnables with the given parallelization. At end, return values and report completion time + * stats to provided logger. Each runnable is allowed a certain timeout. If the timeout exceeds, existing/pending + * tasks will be cancelled and a {@link UserException} is thrown. + * @param activity Name of activity for reporting in logger. + * @param logger The logger to use to report results. + * @param tasks List of callable that should be executed and timed. If this list has one item, task will be + * completed in-thread. Each callable must handle {@link InterruptedException}s. + * @param parallelism The number of threads that should be run to complete this task. + * @param timeout if bigger than zero, set the timeout per runnable (in msec) + * @return The list of outcome objects. + * @throws IOException All exceptions are coerced to IOException since this was build for storage system tasks initially. + */ + public static List run(final String activity, final Logger logger, final List> tasks, int parallelism, long timeout) throws IOException { + TIMEOUT_PER_RUNNABLE_IN_MSECS = timeout; + return run(activity, logger, tasks, parallelism); + } /** * Execute the list of runnables with the given parallelization. At end, return values and report completion time diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetReaderConfig.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetReaderConfig.java index 3b668c75712..182e62751ea 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetReaderConfig.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetReaderConfig.java @@ -50,6 +50,7 @@ public class ParquetReaderConfig { private boolean enableTimeReadCounter = false; private boolean autoCorrectCorruptedDates = true; private boolean enableStringsSignedMinMax = false; + private long timeoutPerRunnableInMsec = 15_000; public static ParquetReaderConfig.Builder builder() { return new ParquetReaderConfig.Builder(); @@ -64,12 +65,15 @@ public ParquetReaderConfig(@JsonProperty("enableBytesReadCounter") Boolean enabl @JsonProperty("enableBytesTotalCounter") Boolean enableBytesTotalCounter, @JsonProperty("enableTimeReadCounter") Boolean enableTimeReadCounter, @JsonProperty("autoCorrectCorruptedDates") Boolean autoCorrectCorruptedDates, - @JsonProperty("enableStringsSignedMinMax") Boolean enableStringsSignedMinMax) { + @JsonProperty("enableStringsSignedMinMax") Boolean enableStringsSignedMinMax, + @JsonProperty("timeoutPerRunnableInMsec") Long timeoutPerRunnableInMsec) { this.enableBytesReadCounter = enableBytesReadCounter == null ? this.enableBytesReadCounter : enableBytesReadCounter; this.enableBytesTotalCounter = enableBytesTotalCounter == null ? this.enableBytesTotalCounter : enableBytesTotalCounter; this.enableTimeReadCounter = enableTimeReadCounter == null ? this.enableTimeReadCounter : enableTimeReadCounter; this.autoCorrectCorruptedDates = autoCorrectCorruptedDates == null ? this.autoCorrectCorruptedDates : autoCorrectCorruptedDates; this.enableStringsSignedMinMax = enableStringsSignedMinMax == null ? this.enableStringsSignedMinMax : enableStringsSignedMinMax; + this.timeoutPerRunnableInMsec = timeoutPerRunnableInMsec == null || Long.valueOf(timeoutPerRunnableInMsec) <= 0 ? // zero means: use default + this.timeoutPerRunnableInMsec : timeoutPerRunnableInMsec; } private ParquetReaderConfig() { } @@ -99,6 +103,9 @@ public boolean enableStringsSignedMinMax() { return enableStringsSignedMinMax; } + @JsonProperty("timeoutPerRunnableInMsec") + public long timeoutPerRunnableInMsec() { return timeoutPerRunnableInMsec; } + public ParquetReadOptions toReadOptions() { return ParquetReadOptions.builder() .useSignedStringMinMax(enableStringsSignedMinMax) @@ -119,7 +126,8 @@ public int hashCode() { enableBytesTotalCounter, enableTimeReadCounter, autoCorrectCorruptedDates, - enableStringsSignedMinMax); + enableStringsSignedMinMax, + timeoutPerRunnableInMsec); } @Override @@ -135,7 +143,8 @@ public boolean equals(Object o) { && enableBytesTotalCounter == that.enableBytesTotalCounter && enableTimeReadCounter == that.enableTimeReadCounter && autoCorrectCorruptedDates == that.autoCorrectCorruptedDates - && enableStringsSignedMinMax == that.enableStringsSignedMinMax; + && enableStringsSignedMinMax == that.enableStringsSignedMinMax + && timeoutPerRunnableInMsec == that.timeoutPerRunnableInMsec; } @Override @@ -146,6 +155,7 @@ public String toString() { + ", enableTimeReadCounter=" + enableTimeReadCounter + ", autoCorrectCorruptedDates=" + autoCorrectCorruptedDates + ", enableStringsSignedMinMax=" + enableStringsSignedMinMax + + ", timeoutPerRunnableInMsec=" + timeoutPerRunnableInMsec + '}'; } @@ -188,10 +198,12 @@ public ParquetReaderConfig build() { // last assign values from session options, session options have higher priority than other configurations if (options != null) { - String option = options.getOption(ExecConstants.PARQUET_READER_STRINGS_SIGNED_MIN_MAX_VALIDATOR); - if (!option.isEmpty()) { - readerConfig.enableStringsSignedMinMax = Boolean.valueOf(option); + String optionSignedMinMax = options.getOption(ExecConstants.PARQUET_READER_STRINGS_SIGNED_MIN_MAX_VALIDATOR); + if (!optionSignedMinMax.isEmpty()) { + readerConfig.enableStringsSignedMinMax = Boolean.valueOf(optionSignedMinMax); } + Long optionTimeout = options.getOption(ExecConstants.PARQUET_REFRESH_TIMEOUT_VALIDATOR); + readerConfig.timeoutPerRunnableInMsec = Long.valueOf(optionTimeout); } return readerConfig; diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/metadata/Metadata.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/metadata/Metadata.java index 2b0581c3951..4f72e603e19 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/metadata/Metadata.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/metadata/Metadata.java @@ -444,8 +444,7 @@ private ParquetTableMetadata_v4 getParquetTableMetadata(Map getParquetFileMetadata_v4(ParquetTableMetadata_v4 parquetTableMetadata_v4, Map fileStatusMap, boolean allColumnsInteresting, Set columnSet) throws IOException { return TimedCallable.run("Fetch parquet metadata", logger, Collectors.toList(fileStatusMap, - (fileStatus, fileSystem) -> new MetadataGatherer(parquetTableMetadata_v4, fileStatus, fileSystem, allColumnsInteresting, columnSet)), - 16 + (fileStatus, fileSystem) -> new MetadataGatherer(parquetTableMetadata_v4, fileStatus, fileSystem, allColumnsInteresting, columnSet)), 16, readerConfig.timeoutPerRunnableInMsec() ); } diff --git a/exec/java-exec/src/main/resources/drill-module.conf b/exec/java-exec/src/main/resources/drill-module.conf index b2ff4a594cb..ac862841a76 100644 --- a/exec/java-exec/src/main/resources/drill-module.conf +++ b/exec/java-exec/src/main/resources/drill-module.conf @@ -629,6 +629,7 @@ drill.exec.options: { store.parquet.reader.columnreader.async: false, store.parquet.reader.int96_as_timestamp: false, store.parquet.reader.strings_signed_min_max: "", + store.parquet.refresh_timeout_per_runnable_in_msec: 15000, store.parquet.reader.pagereader.async: true, store.parquet.reader.pagereader.bufferedread: true, store.parquet.reader.pagereader.buffersize: 1048576, diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/TestParquetReaderConfig.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/TestParquetReaderConfig.java index 7bbae415b1d..e4d645ed803 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/TestParquetReaderConfig.java +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/TestParquetReaderConfig.java @@ -45,7 +45,7 @@ public void testDefaultsDeserialization() throws Exception { // change the default: set autoCorrectCorruptedDates to false // keep the default: set enableStringsSignedMinMax to false - readerConfig = new ParquetReaderConfig(false, false, false, false, false); + readerConfig = new ParquetReaderConfig(false, false, false, false, false, 0L); value = mapper.writeValueAsString(readerConfig); assertEquals("{\"autoCorrectCorruptedDates\":false}", value); @@ -79,12 +79,12 @@ public void testAddConfigToConf() { @Test public void testReadOptions() { // set enableStringsSignedMinMax to true - ParquetReaderConfig readerConfig = new ParquetReaderConfig(false, false, false, true, true); + ParquetReaderConfig readerConfig = new ParquetReaderConfig(false, false, false, true, true, 12345L); ParquetReadOptions readOptions = readerConfig.toReadOptions(); assertTrue(readOptions.useSignedStringMinMax()); // set enableStringsSignedMinMax to false - readerConfig = new ParquetReaderConfig(false, false, false, true, false); + readerConfig = new ParquetReaderConfig(false, false, false, true, false, 12345L); readOptions = readerConfig.toReadOptions(); assertFalse(readOptions.useSignedStringMinMax()); }