diff --git a/core/src/main/java/org/apache/druid/indexer/TaskStatus.java b/core/src/main/java/org/apache/druid/indexer/TaskStatus.java index 6bef27ee2143..d1050f9e8c54 100644 --- a/core/src/main/java/org/apache/druid/indexer/TaskStatus.java +++ b/core/src/main/java/org/apache/druid/indexer/TaskStatus.java @@ -22,7 +22,7 @@ import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonIgnore; import com.fasterxml.jackson.annotation.JsonProperty; -import com.google.common.base.Objects; +import com.google.common.base.MoreObjects; import com.google.common.base.Preconditions; import javax.annotation.Nullable; @@ -206,7 +206,7 @@ public TaskStatus withLocation(TaskLocation location) @Override public String toString() { - return Objects.toStringHelper(this) + return MoreObjects.toStringHelper(this) .add("id", id) .add("status", status) .add("duration", duration) diff --git a/core/src/test/java/org/apache/druid/math/expr/ParserTest.java b/core/src/test/java/org/apache/druid/math/expr/ParserTest.java index 1ebd71f1cc43..8e19210db068 100644 --- a/core/src/test/java/org/apache/druid/math/expr/ParserTest.java +++ b/core/src/test/java/org/apache/druid/math/expr/ParserTest.java @@ -288,7 +288,7 @@ public void testLiteralArraysExplicitDoubleParseException() public void testFunctions() { validateParser("sqrt(x)", "(sqrt [x])", ImmutableList.of("x")); - validateParser("if(cond,then,else)", "(if [cond, then, else])", ImmutableList.of("cond", "else", "then")); + validateParser("if(cond,then,else)", "(if [cond, then, else])", ImmutableList.of("then", "cond", "else")); validateParser("cast(x, 'STRING')", "(cast [x, STRING])", ImmutableList.of("x")); validateParser("cast(x, 'LONG')", "(cast [x, LONG])", ImmutableList.of("x")); validateParser("cast(x, 'DOUBLE')", "(cast [x, DOUBLE])", ImmutableList.of("x")); diff --git a/extensions-contrib/cloudfiles-extensions/pom.xml b/extensions-contrib/cloudfiles-extensions/pom.xml index 04d7de8c82cd..a9f56c91cc02 100644 --- a/extensions-contrib/cloudfiles-extensions/pom.xml +++ b/extensions-contrib/cloudfiles-extensions/pom.xml @@ -35,10 +35,10 @@ UTF-8 - 1.9.1 + 2.3.0-SNAPSHOT - 3.0 + 4.1.0 diff --git a/extensions-contrib/cloudfiles-extensions/src/main/java/org/apache/druid/storage/cloudfiles/CloudFilesStorageDruidModule.java b/extensions-contrib/cloudfiles-extensions/src/main/java/org/apache/druid/storage/cloudfiles/CloudFilesStorageDruidModule.java index c54342fd32de..224be81125f2 100644 --- a/extensions-contrib/cloudfiles-extensions/src/main/java/org/apache/druid/storage/cloudfiles/CloudFilesStorageDruidModule.java +++ b/extensions-contrib/cloudfiles-extensions/src/main/java/org/apache/druid/storage/cloudfiles/CloudFilesStorageDruidModule.java @@ -32,7 +32,7 @@ import org.apache.druid.java.util.common.logger.Logger; import org.jclouds.ContextBuilder; import org.jclouds.logging.slf4j.config.SLF4JLoggingModule; -import org.jclouds.openstack.v2_0.config.InternalUrlModule; +import org.jclouds.openstack.keystone.catalog.config.InternalUrlModule; import org.jclouds.osgi.ProviderRegistry; import org.jclouds.rackspace.cloudfiles.uk.CloudFilesUKProviderMetadata; import org.jclouds.rackspace.cloudfiles.us.CloudFilesUSProviderMetadata; diff --git a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java index fa86c49933a6..97307eb3ed7e 100644 --- a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java +++ b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java @@ -905,12 +905,13 @@ public void testIncrementalHandOffReadsThroughEndOffsets() throws Exception final ListenableFuture normalReplicaFuture = runTask(normalReplica); // Simulating one replica is slower than the other - final ListenableFuture staleReplicaFuture = Futures.transform( + final ListenableFuture staleReplicaFuture = Futures.transformAsync( taskExec.submit(() -> { Thread.sleep(1000); return staleReplica; }), - (AsyncFunction) this::runTask + (AsyncFunction) this::runTask, + MoreExecutors.directExecutor() ); while (normalReplica.getRunner().getStatus() != Status.PAUSED) { diff --git a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTest.java b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTest.java index 2c32ece55e77..3f8850b8ed7e 100644 --- a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTest.java +++ b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTest.java @@ -2425,12 +2425,13 @@ public void testIncrementalHandOffReadsThroughEndOffsets() throws Exception ((TestableKinesisIndexTask) staleReplica).setLocalSupplier(recordSupplier2); final ListenableFuture normalReplicaFuture = runTask(normalReplica); // Simulating one replica is slower than the other - final ListenableFuture staleReplicaFuture = Futures.transform( + final ListenableFuture staleReplicaFuture = Futures.transformAsync( taskExec.submit(() -> { Thread.sleep(1000); return staleReplica; }), - (AsyncFunction) this::runTask + (AsyncFunction) this::runTask, + MoreExecutors.directExecutor() ); while (normalReplica.getRunner().getStatus() != SeekableStreamIndexTaskRunner.Status.PAUSED) { diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTask.java index 064d2caa3350..74256b878a40 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTask.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTask.java @@ -30,6 +30,7 @@ import com.google.common.util.concurrent.AsyncFunction; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.MoreExecutors; import org.apache.druid.data.input.Committer; import org.apache.druid.data.input.Firehose; import org.apache.druid.data.input.FirehoseFactory; @@ -669,9 +670,10 @@ private void publishSegments( committerSupplier.get(), Collections.singletonList(sequenceName) ); - pendingHandoffs.add(Futures.transform( + pendingHandoffs.add(Futures.transformAsync( publishFuture, - (AsyncFunction) driver::registerHandoff + (AsyncFunction) driver::registerHandoff, + MoreExecutors.directExecutor() )); } diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexPhaseRunner.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexPhaseRunner.java index e34aee976f2b..018174aee208 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexPhaseRunner.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexPhaseRunner.java @@ -25,6 +25,7 @@ import com.google.common.util.concurrent.FutureCallback; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.MoreExecutors; import org.apache.druid.indexer.TaskState; import org.apache.druid.indexer.TaskStatusPlus; import org.apache.druid.indexing.common.TaskToolbox; @@ -262,7 +263,8 @@ public void onFailure(Throwable t) LOG.error(t, "Error while running a task for spec[%s]", spec.getId()); taskCompleteEvents.offer(SubTaskCompleteEvent.fail(spec, t)); } - } + }, + MoreExecutors.directExecutor() ); } diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/ForkingTaskRunner.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/ForkingTaskRunner.java index 2d0833868607..aa1be977d384 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/ForkingTaskRunner.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/ForkingTaskRunner.java @@ -780,7 +780,7 @@ public boolean matches(char c) if (inQuotes) { return false; } - return CharMatcher.BREAKING_WHITESPACE.matches(c); + return CharMatcher.breakingWhitespace().matches(c); } } ).omitEmptyStrings().split(string).iterator(); diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/RemoteTaskRunner.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/RemoteTaskRunner.java index 6fd3f7fa3281..799a3d29cac1 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/RemoteTaskRunner.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/RemoteTaskRunner.java @@ -257,7 +257,8 @@ public void onFailure(Throwable throwable) waitingForMonitor.notifyAll(); } } - } + }, + MoreExecutors.directExecutor() ); break; case CHILD_UPDATED: @@ -1229,7 +1230,8 @@ public void onFailure(Throwable t) { removedWorkerCleanups.remove(worker, cleanupTask); } - } + }, + MoreExecutors.directExecutor() ); } diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskLockbox.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskLockbox.java index 3dce5917fe9c..7f95b1952d6b 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskLockbox.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskLockbox.java @@ -21,7 +21,7 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Function; -import com.google.common.base.Objects; +import com.google.common.base.MoreObjects; import com.google.common.base.Preconditions; import com.google.common.collect.ComparisonChain; import com.google.common.collect.ImmutableList; @@ -1108,13 +1108,13 @@ public boolean equals(Object o) @Override public int hashCode() { - return Objects.hashCode(taskLock, taskIds); + return java.util.Objects.hash(taskLock, taskIds); } @Override public String toString() { - return Objects.toStringHelper(this) + return MoreObjects.toStringHelper(this) .add("taskLock", taskLock) .add("taskIds", taskIds) .toString(); diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueue.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueue.java index 6c7dd4e3bd5c..a4584b725ce9 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueue.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueue.java @@ -26,6 +26,7 @@ import com.google.common.util.concurrent.FutureCallback; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.MoreExecutors; import com.google.common.util.concurrent.ThreadFactoryBuilder; import org.apache.druid.indexer.TaskLocation; import org.apache.druid.indexer.TaskStatus; @@ -569,7 +570,8 @@ private void handleStatus(final TaskStatus status) .emit(); } } - } + }, + MoreExecutors.directExecutor() ); return statusFuture; } diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/hrtr/HttpRemoteTaskRunner.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/hrtr/HttpRemoteTaskRunner.java index 623458fc1cc7..97cb58b760b3 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/hrtr/HttpRemoteTaskRunner.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/hrtr/HttpRemoteTaskRunner.java @@ -685,7 +685,8 @@ public void onFailure(Throwable t) { removedWorkerCleanups.remove(workerHostAndPort, cleanupTask); } - } + }, + MoreExecutors.directExecutor() ); } diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java index 2a2bf2d7f05f..526dee051b1d 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java @@ -35,6 +35,7 @@ import com.google.common.util.concurrent.FutureCallback; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.MoreExecutors; import com.google.common.util.concurrent.SettableFuture; import org.apache.druid.data.input.Committer; import org.apache.druid.data.input.InputFormat; @@ -687,7 +688,8 @@ public void onFailure(Throwable t) log.error("Persist failed, dying"); backgroundThreadException = t; } - } + }, + MoreExecutors.directExecutor() ); } @@ -949,7 +951,8 @@ private void publishAndRegisterHandoff(SequenceMetadata> getCurrentTotalStats() groupId, taskId, currentStats - ) + ), + MoreExecutors.directExecutor() ) ); groupAndTaskIds.add(new Pair<>(groupId, taskId)); @@ -966,7 +967,8 @@ private Map> getCurrentTotalStats() groupId, taskId, currentStats - ) + ), + MoreExecutors.directExecutor() ) ); groupAndTaskIds.add(new Pair<>(groupId, taskId)); @@ -1805,7 +1807,8 @@ public Void apply(@Nullable Boolean result) } return null; } - } + }, + MoreExecutors.directExecutor() ); } @@ -2485,7 +2488,8 @@ public Map apply(@Nullable Object input) { return null; } - } + }, + MoreExecutors.directExecutor() ); } @@ -3188,7 +3192,8 @@ private void updateCurrentOffsets() throws InterruptedException, ExecutionExcept } return null; - } + }, + MoreExecutors.directExecutor() ) ).collect(Collectors.toList()); diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/worker/WorkerTaskManager.java b/indexing-service/src/main/java/org/apache/druid/indexing/worker/WorkerTaskManager.java index 19598d05dc03..e830efcc79f0 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/worker/WorkerTaskManager.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/worker/WorkerTaskManager.java @@ -28,6 +28,7 @@ import com.google.common.util.concurrent.FutureCallback; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.MoreExecutors; import com.google.common.util.concurrent.SettableFuture; import com.google.inject.Inject; import org.apache.druid.client.indexing.IndexingService; @@ -253,7 +254,8 @@ public void onFailure(Throwable t) { submitNoticeToExec(new StatusNotice(task, TaskStatus.failure(task.getId()))); } - } + }, + MoreExecutors.directExecutor() ); } diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/worker/executor/ExecutorLifecycle.java b/indexing-service/src/main/java/org/apache/druid/indexing/worker/executor/ExecutorLifecycle.java index 3568e79e4e03..757be62f5eb2 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/worker/executor/ExecutorLifecycle.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/worker/executor/ExecutorLifecycle.java @@ -24,6 +24,7 @@ import com.google.common.base.Preconditions; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.MoreExecutors; import com.google.inject.Inject; import org.apache.commons.io.FileUtils; import org.apache.druid.indexer.TaskStatus; @@ -202,7 +203,8 @@ public TaskStatus apply(TaskStatus taskStatus) throw new RuntimeException(e); } } - } + }, + MoreExecutors.directExecutor() ); } diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/worker/http/TaskManagementResource.java b/indexing-service/src/main/java/org/apache/druid/indexing/worker/http/TaskManagementResource.java index 57046ff80a03..fc6d6dc23815 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/worker/http/TaskManagementResource.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/worker/http/TaskManagementResource.java @@ -24,6 +24,7 @@ import com.google.common.util.concurrent.FutureCallback; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.MoreExecutors; import com.google.inject.Inject; import com.sun.jersey.spi.container.ResourceFilters; import org.apache.druid.guice.annotations.Json; @@ -191,7 +192,8 @@ public void onFailure(Throwable th) log.debug(ex, "Request timed out or closed already."); } } - } + }, + MoreExecutors.directExecutor() ); asyncContext.setTimeout(timeout); diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTaskTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTaskTest.java index bfe7f40c51c5..3f140a309a42 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTaskTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTaskTest.java @@ -323,7 +323,7 @@ public void testHandoffTimeout() throws Exception // handoff would timeout, resulting in exception TaskStatus status = statusFuture.get(); Assert.assertTrue(status.getErrorMsg() - .contains("java.util.concurrent.TimeoutException: Timeout waiting for task.")); + .contains("java.util.concurrent.TimeoutException: Waited 100 milliseconds")); } @Test(timeout = 60_000L) diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/AbstractParallelIndexSupervisorTaskTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/AbstractParallelIndexSupervisorTaskTest.java index 84c1c7280a14..9962718266cb 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/AbstractParallelIndexSupervisorTaskTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/AbstractParallelIndexSupervisorTaskTest.java @@ -371,7 +371,8 @@ private Future runTask(Task task) (Function) status -> { shutdownTask(task); return status; - } + }, + MoreExecutors.directExecutor() ); return cleanupFuture; } diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TestTaskRunner.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TestTaskRunner.java index 5fc2989ff030..bf94f43668ff 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TestTaskRunner.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TestTaskRunner.java @@ -228,7 +228,8 @@ public void onFailure(Throwable t) { runningItems.remove(taskRunnerWorkItem); } - } + }, + MoreExecutors.directExecutor() ); return statusFuture; diff --git a/pom.xml b/pom.xml index 023c90cdc155..2bef504ba57f 100644 --- a/pom.xml +++ b/pom.xml @@ -87,7 +87,7 @@ 1.2.0-incubating 10.14.2.0 4.0.0 - 16.0.1 + 27.0-jre 4.1.0 1.3 9.4.34.v20201102 diff --git a/server/src/main/java/org/apache/druid/client/HttpServerInventoryView.java b/server/src/main/java/org/apache/druid/client/HttpServerInventoryView.java index d5be096f79b9..866990e803a1 100644 --- a/server/src/main/java/org/apache/druid/client/HttpServerInventoryView.java +++ b/server/src/main/java/org/apache/druid/client/HttpServerInventoryView.java @@ -500,7 +500,7 @@ private class DruidServerHolder smileMapper, httpClient, executor, - new URL(druidServer.getScheme(), hostAndPort.getHostText(), hostAndPort.getPort(), "/"), + new URL(druidServer.getScheme(), hostAndPort.getHost(), hostAndPort.getPort(), "/"), "/druid-internal/v1/segments", SEGMENT_LIST_RESP_TYPE_REF, config.getServerTimeout(), diff --git a/server/src/main/java/org/apache/druid/client/indexing/QueryStatus.java b/server/src/main/java/org/apache/druid/client/indexing/QueryStatus.java index b9348731bdae..355c18566e47 100644 --- a/server/src/main/java/org/apache/druid/client/indexing/QueryStatus.java +++ b/server/src/main/java/org/apache/druid/client/indexing/QueryStatus.java @@ -22,7 +22,7 @@ import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonIgnore; import com.fasterxml.jackson.annotation.JsonProperty; -import com.google.common.base.Objects; +import com.google.common.base.MoreObjects; import com.google.common.base.Preconditions; /** @@ -80,7 +80,7 @@ public boolean isComplete() @Override public String toString() { - return Objects.toStringHelper(this) + return MoreObjects.toStringHelper(this) .add("id", id) .add("status", status) .add("duration", duration) diff --git a/server/src/main/java/org/apache/druid/client/indexing/TaskStatus.java b/server/src/main/java/org/apache/druid/client/indexing/TaskStatus.java index cb506876cd36..ff23878b5fe1 100644 --- a/server/src/main/java/org/apache/druid/client/indexing/TaskStatus.java +++ b/server/src/main/java/org/apache/druid/client/indexing/TaskStatus.java @@ -21,7 +21,7 @@ import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; -import com.google.common.base.Objects; +import com.google.common.base.MoreObjects; import com.google.common.base.Preconditions; import org.apache.druid.indexer.TaskState; @@ -92,7 +92,7 @@ public int hashCode() @Override public String toString() { - return Objects.toStringHelper(this) + return MoreObjects.toStringHelper(this) .add("id", id) .add("status", status) .add("duration", duration) diff --git a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/AppenderatorImpl.java b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/AppenderatorImpl.java index ec9200cf81da..b83704f84065 100644 --- a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/AppenderatorImpl.java +++ b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/AppenderatorImpl.java @@ -336,7 +336,8 @@ public void onFailure(Throwable t) { persistError = t; } - } + }, + MoreExecutors.directExecutor() ); } else { isPersistRequired = true; diff --git a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/AppenderatorPlumber.java b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/AppenderatorPlumber.java index 2aa5d43a2eaa..e6093a6fb48a 100644 --- a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/AppenderatorPlumber.java +++ b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/AppenderatorPlumber.java @@ -28,6 +28,7 @@ import com.google.common.collect.Lists; import com.google.common.util.concurrent.FutureCallback; import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.MoreExecutors; import org.apache.druid.common.guava.ThreadRenamingCallable; import org.apache.druid.data.input.Committer; import org.apache.druid.data.input.InputRow; @@ -330,7 +331,8 @@ public void onFailure(Throwable e) // TODO: Retry? log.warn(e, "Failed to drop segment: %s", identifier); } - } + }, + MoreExecutors.directExecutor() ); } @@ -483,7 +485,8 @@ public void onFailure(Throwable e) log.warn(e, "Failed to push [%,d] segments.", segmentsToPush.size()); errorHandler.apply(e); } - } + }, + MoreExecutors.directExecutor() ); } } diff --git a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/BaseAppenderatorDriver.java b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/BaseAppenderatorDriver.java index 92b9f3cdd00d..6f6e7b6c0538 100644 --- a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/BaseAppenderatorDriver.java +++ b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/BaseAppenderatorDriver.java @@ -543,7 +543,8 @@ ListenableFuture dropInBackground(SegmentsAndCommitMe segmentsAndCommitMetadata.getSegments(), metadata == null ? null : ((AppenderatorDriverMetadata) metadata).getCallerMetadata() ); - } + }, + MoreExecutors.directExecutor() ); } diff --git a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/BatchAppenderatorDriver.java b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/BatchAppenderatorDriver.java index 9993486bc578..b6a8afbd9cde 100644 --- a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/BatchAppenderatorDriver.java +++ b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/BatchAppenderatorDriver.java @@ -25,6 +25,7 @@ import com.google.common.util.concurrent.AsyncFunction; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.MoreExecutors; import org.apache.druid.data.input.InputRow; import org.apache.druid.java.util.common.ISE; import org.apache.druid.segment.loading.DataSegmentKiller; @@ -139,9 +140,10 @@ private SegmentsAndCommitMetadata pushAndClear( { final Set requestedSegmentIdsForSequences = getAppendingSegments(sequenceNames); - final ListenableFuture future = Futures.transform( + final ListenableFuture future = Futures.transformAsync( pushInBackground(null, requestedSegmentIdsForSequences, false), - (AsyncFunction) this::dropInBackground + (AsyncFunction) this::dropInBackground, + MoreExecutors.directExecutor() ); final SegmentsAndCommitMetadata segmentsAndCommitMetadata = diff --git a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderatorDriver.java b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderatorDriver.java index ea466f6de109..d90648ed6f02 100644 --- a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderatorDriver.java +++ b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderatorDriver.java @@ -27,6 +27,7 @@ import com.google.common.util.concurrent.FutureCallback; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.MoreExecutors; import com.google.common.util.concurrent.SettableFuture; import org.apache.druid.data.input.Committer; import org.apache.druid.data.input.InputRow; @@ -273,7 +274,7 @@ public ListenableFuture publish( { final List theSegments = getSegmentIdsWithShardSpecs(sequenceNames); - final ListenableFuture publishFuture = Futures.transform( + final ListenableFuture publishFuture = Futures.transformAsync( // useUniquePath=true prevents inconsistencies in segment data when task failures or replicas leads to a second // version of a segment with the same identifier containing different data; see DataSegmentPusher.push() docs pushInBackground(wrapCommitter(committer), theSegments, true), @@ -282,7 +283,8 @@ public ListenableFuture publish( sam, publisher, java.util.function.Function.identity() - ) + ), + MoreExecutors.directExecutor() ); return Futures.transform( publishFuture, @@ -291,7 +293,8 @@ public ListenableFuture publish( sequenceNames.forEach(segments::remove); } return sam; - } + }, + MoreExecutors.directExecutor() ); } @@ -371,7 +374,8 @@ public void onFailure(Throwable e) numRemainingHandoffSegments.decrementAndGet(); resultFuture.setException(e); } - } + }, + MoreExecutors.directExecutor() ); } ); @@ -387,9 +391,10 @@ public ListenableFuture publishAndRegisterHandoff( final Collection sequenceNames ) { - return Futures.transform( + return Futures.transformAsync( publish(publisher, committer, sequenceNames), - (AsyncFunction) this::registerHandoff + (AsyncFunction) this::registerHandoff, + MoreExecutors.directExecutor() ); } diff --git a/server/src/main/java/org/apache/druid/server/DruidNode.java b/server/src/main/java/org/apache/druid/server/DruidNode.java index 1a3603936490..1c077d0cd73c 100644 --- a/server/src/main/java/org/apache/druid/server/DruidNode.java +++ b/server/src/main/java/org/apache/druid/server/DruidNode.java @@ -156,7 +156,7 @@ private void init( Integer portFromHostConfig; if (host != null) { hostAndPort = HostAndPort.fromString(host); - host = hostAndPort.getHostText(); + host = hostAndPort.getHost(); portFromHostConfig = hostAndPort.hasPort() ? hostAndPort.getPort() : null; if (plainTextPort != null && portFromHostConfig != null && !plainTextPort.equals(portFromHostConfig)) { throw new IAE("Conflicting host:port [%s] and port [%d] settings", host, plainTextPort); diff --git a/server/src/main/java/org/apache/druid/server/http/HostAndPortWithScheme.java b/server/src/main/java/org/apache/druid/server/http/HostAndPortWithScheme.java index 0c584aab8d0a..297e86f38984 100644 --- a/server/src/main/java/org/apache/druid/server/http/HostAndPortWithScheme.java +++ b/server/src/main/java/org/apache/druid/server/http/HostAndPortWithScheme.java @@ -74,7 +74,7 @@ public String getScheme() public String getHostText() { - return hostAndPort.getHostText(); + return hostAndPort.getHost(); } public int getPort() diff --git a/server/src/main/java/org/apache/druid/server/http/SegmentListerResource.java b/server/src/main/java/org/apache/druid/server/http/SegmentListerResource.java index 45dae0af0aa9..5852962f6258 100644 --- a/server/src/main/java/org/apache/druid/server/http/SegmentListerResource.java +++ b/server/src/main/java/org/apache/druid/server/http/SegmentListerResource.java @@ -24,6 +24,7 @@ import com.google.common.util.concurrent.FutureCallback; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.MoreExecutors; import com.google.inject.Inject; import com.sun.jersey.spi.container.ResourceFilters; import org.apache.druid.client.HttpServerInventoryView; @@ -205,7 +206,8 @@ public void onFailure(Throwable th) log.debug(ex, "Request timed out or closed already."); } } - } + }, + MoreExecutors.directExecutor() ); asyncContext.setTimeout(timeout); @@ -317,7 +319,8 @@ public void onFailure(Throwable th) log.debug(ex, "Request timed out or closed already."); } } - } + }, + MoreExecutors.directExecutor() ); asyncContext.setTimeout(timeout); diff --git a/server/src/main/java/org/apache/druid/server/lookup/cache/LookupCoordinatorManager.java b/server/src/main/java/org/apache/druid/server/lookup/cache/LookupCoordinatorManager.java index 7526ecbcc10f..2662de6b981b 100644 --- a/server/src/main/java/org/apache/druid/server/lookup/cache/LookupCoordinatorManager.java +++ b/server/src/main/java/org/apache/druid/server/lookup/cache/LookupCoordinatorManager.java @@ -435,7 +435,8 @@ public void onFailure(Throwable t) LOG.makeAlert(t, "Background lookup manager exited with error!").emit(); } } - } + }, + MoreExecutors.directExecutor() ); LOG.debug("Started"); diff --git a/server/src/test/java/org/apache/druid/client/CachingClusteredClientTest.java b/server/src/test/java/org/apache/druid/client/CachingClusteredClientTest.java index 5b96d7ad873a..862e2beb1273 100644 --- a/server/src/test/java/org/apache/druid/client/CachingClusteredClientTest.java +++ b/server/src/test/java/org/apache/druid/client/CachingClusteredClientTest.java @@ -376,7 +376,8 @@ public void onFailure(Throwable t) { pair.lhs.setException(t); } - } + }, + MoreExecutors.directExecutor() ); } } diff --git a/server/src/test/java/org/apache/druid/metadata/SqlSegmentsMetadataManagerTest.java b/server/src/test/java/org/apache/druid/metadata/SqlSegmentsMetadataManagerTest.java index 57df44073460..fb7587525127 100644 --- a/server/src/test/java/org/apache/druid/metadata/SqlSegmentsMetadataManagerTest.java +++ b/server/src/test/java/org/apache/druid/metadata/SqlSegmentsMetadataManagerTest.java @@ -264,7 +264,7 @@ public void testPollPeriodicallyAndOnDemandInterleave() throws Exception Assert.assertTrue(sqlSegmentsMetadataManager.getLatestDatabasePoll() instanceof SqlSegmentsMetadataManager.PeriodicDatabasePoll); dataSourcesSnapshot = sqlSegmentsMetadataManager.getDataSourcesSnapshot(); Assert.assertEquals( - ImmutableList.of("wikipedia2", "wikipedia3", "wikipedia"), + ImmutableList.of("wikipedia3", "wikipedia", "wikipedia2"), dataSourcesSnapshot.getDataSourcesWithAllUsedSegments() .stream() .map(ImmutableDruidDataSource::getName) diff --git a/server/src/test/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderatorDriverFailTest.java b/server/src/test/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderatorDriverFailTest.java index 4f9cd3c34158..6d52c4435178 100644 --- a/server/src/test/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderatorDriverFailTest.java +++ b/server/src/test/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderatorDriverFailTest.java @@ -28,6 +28,7 @@ import com.google.common.util.concurrent.AbstractFuture; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.MoreExecutors; import org.apache.druid.data.input.Committer; import org.apache.druid.data.input.InputRow; import org.apache.druid.data.input.MapBasedInputRow; @@ -497,7 +498,8 @@ public ListenableFuture push( .collect(Collectors.toList()); return Futures.transform( persistAll(committer), - (Function) commitMetadata -> new SegmentsAndCommitMetadata(segments, commitMetadata) + (Function) commitMetadata -> new SegmentsAndCommitMetadata(segments, commitMetadata), + MoreExecutors.directExecutor() ); } else { if (interruptPush) { diff --git a/server/src/test/java/org/apache/druid/server/coordination/ChangeRequestHistoryTest.java b/server/src/test/java/org/apache/druid/server/coordination/ChangeRequestHistoryTest.java index 82a67b038ed6..9985c0ddc6ab 100644 --- a/server/src/test/java/org/apache/druid/server/coordination/ChangeRequestHistoryTest.java +++ b/server/src/test/java/org/apache/druid/server/coordination/ChangeRequestHistoryTest.java @@ -22,6 +22,7 @@ import com.google.common.util.concurrent.FutureCallback; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.MoreExecutors; import org.junit.Assert; import org.junit.Test; @@ -146,7 +147,8 @@ public void onFailure(Throwable t) { callbackExcecuted.set(true); } - } + }, + MoreExecutors.directExecutor() ); future.cancel(true);