Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions core/src/main/java/org/apache/druid/indexer/TaskStatus.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Objects;
import com.google.common.base.MoreObjects;
import com.google.common.base.Preconditions;

import javax.annotation.Nullable;
Expand Down Expand Up @@ -206,7 +206,7 @@ public TaskStatus withLocation(TaskLocation location)
@Override
public String toString()
{
return Objects.toStringHelper(this)
return MoreObjects.toStringHelper(this)
.add("id", id)
.add("status", status)
.add("duration", duration)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -288,7 +288,7 @@ public void testLiteralArraysExplicitDoubleParseException()
public void testFunctions()
{
validateParser("sqrt(x)", "(sqrt [x])", ImmutableList.of("x"));
validateParser("if(cond,then,else)", "(if [cond, then, else])", ImmutableList.of("cond", "else", "then"));
validateParser("if(cond,then,else)", "(if [cond, then, else])", ImmutableList.of("then", "cond", "else"));
validateParser("cast(x, 'STRING')", "(cast [x, STRING])", ImmutableList.of("x"));
validateParser("cast(x, 'LONG')", "(cast [x, LONG])", ImmutableList.of("x"));
validateParser("cast(x, 'DOUBLE')", "(cast [x, DOUBLE])", ImmutableList.of("x"));
Expand Down
4 changes: 2 additions & 2 deletions extensions-contrib/cloudfiles-extensions/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -35,10 +35,10 @@

<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<jclouds.version>1.9.1</jclouds.version>
<jclouds.version>2.3.0-SNAPSHOT</jclouds.version>
<!-- The version of guice is forced to 3.0 since JClouds 1.9.1 does not
work with guice 4.0-beta -->
<guice.version>3.0</guice.version>
<guice.version>4.1.0</guice.version>
</properties>

<dependencies>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@
import org.apache.druid.java.util.common.logger.Logger;
import org.jclouds.ContextBuilder;
import org.jclouds.logging.slf4j.config.SLF4JLoggingModule;
import org.jclouds.openstack.v2_0.config.InternalUrlModule;
import org.jclouds.openstack.keystone.catalog.config.InternalUrlModule;
import org.jclouds.osgi.ProviderRegistry;
import org.jclouds.rackspace.cloudfiles.uk.CloudFilesUKProviderMetadata;
import org.jclouds.rackspace.cloudfiles.us.CloudFilesUSProviderMetadata;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -905,12 +905,13 @@ public void testIncrementalHandOffReadsThroughEndOffsets() throws Exception

final ListenableFuture<TaskStatus> normalReplicaFuture = runTask(normalReplica);
// Simulating one replica is slower than the other
final ListenableFuture<TaskStatus> staleReplicaFuture = Futures.transform(
final ListenableFuture<TaskStatus> staleReplicaFuture = Futures.transformAsync(
taskExec.submit(() -> {
Thread.sleep(1000);
return staleReplica;
}),
(AsyncFunction<Task, TaskStatus>) this::runTask
(AsyncFunction<Task, TaskStatus>) this::runTask,
MoreExecutors.directExecutor()
);

while (normalReplica.getRunner().getStatus() != Status.PAUSED) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2425,12 +2425,13 @@ public void testIncrementalHandOffReadsThroughEndOffsets() throws Exception
((TestableKinesisIndexTask) staleReplica).setLocalSupplier(recordSupplier2);
final ListenableFuture<TaskStatus> normalReplicaFuture = runTask(normalReplica);
// Simulating one replica is slower than the other
final ListenableFuture<TaskStatus> staleReplicaFuture = Futures.transform(
final ListenableFuture<TaskStatus> staleReplicaFuture = Futures.transformAsync(
taskExec.submit(() -> {
Thread.sleep(1000);
return staleReplica;
}),
(AsyncFunction<Task, TaskStatus>) this::runTask
(AsyncFunction<Task, TaskStatus>) this::runTask,
MoreExecutors.directExecutor()
);

while (normalReplica.getRunner().getStatus() != SeekableStreamIndexTaskRunner.Status.PAUSED) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import com.google.common.util.concurrent.AsyncFunction;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;
import org.apache.druid.data.input.Committer;
import org.apache.druid.data.input.Firehose;
import org.apache.druid.data.input.FirehoseFactory;
Expand Down Expand Up @@ -669,9 +670,10 @@ private void publishSegments(
committerSupplier.get(),
Collections.singletonList(sequenceName)
);
pendingHandoffs.add(Futures.transform(
pendingHandoffs.add(Futures.transformAsync(
publishFuture,
(AsyncFunction<SegmentsAndCommitMetadata, SegmentsAndCommitMetadata>) driver::registerHandoff
(AsyncFunction<SegmentsAndCommitMetadata, SegmentsAndCommitMetadata>) driver::registerHandoff,
MoreExecutors.directExecutor()
));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;
import org.apache.druid.indexer.TaskState;
import org.apache.druid.indexer.TaskStatusPlus;
import org.apache.druid.indexing.common.TaskToolbox;
Expand Down Expand Up @@ -262,7 +263,8 @@ public void onFailure(Throwable t)
LOG.error(t, "Error while running a task for spec[%s]", spec.getId());
taskCompleteEvents.offer(SubTaskCompleteEvent.fail(spec, t));
}
}
},
MoreExecutors.directExecutor()
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -780,7 +780,7 @@ public boolean matches(char c)
if (inQuotes) {
return false;
}
return CharMatcher.BREAKING_WHITESPACE.matches(c);
return CharMatcher.breakingWhitespace().matches(c);
}
}
).omitEmptyStrings().split(string).iterator();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -257,7 +257,8 @@ public void onFailure(Throwable throwable)
waitingForMonitor.notifyAll();
}
}
}
},
MoreExecutors.directExecutor()
);
break;
case CHILD_UPDATED:
Expand Down Expand Up @@ -1229,7 +1230,8 @@ public void onFailure(Throwable t)
{
removedWorkerCleanups.remove(worker, cleanupTask);
}
}
},
MoreExecutors.directExecutor()
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Function;
import com.google.common.base.Objects;
import com.google.common.base.MoreObjects;
import com.google.common.base.Preconditions;
import com.google.common.collect.ComparisonChain;
import com.google.common.collect.ImmutableList;
Expand Down Expand Up @@ -1108,13 +1108,13 @@ public boolean equals(Object o)
@Override
public int hashCode()
{
return Objects.hashCode(taskLock, taskIds);
return java.util.Objects.hash(taskLock, taskIds);
}

@Override
public String toString()
{
return Objects.toStringHelper(this)
return MoreObjects.toStringHelper(this)
.add("taskLock", taskLock)
.add("taskIds", taskIds)
.toString();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.druid.indexer.TaskLocation;
import org.apache.druid.indexer.TaskStatus;
Expand Down Expand Up @@ -569,7 +570,8 @@ private void handleStatus(final TaskStatus status)
.emit();
}
}
}
},
MoreExecutors.directExecutor()
);
return statusFuture;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -685,7 +685,8 @@ public void onFailure(Throwable t)
{
removedWorkerCleanups.remove(workerHostAndPort, cleanupTask);
}
}
},
MoreExecutors.directExecutor()
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.SettableFuture;
import org.apache.druid.data.input.Committer;
import org.apache.druid.data.input.InputFormat;
Expand Down Expand Up @@ -687,7 +688,8 @@ public void onFailure(Throwable t)
log.error("Persist failed, dying");
backgroundThreadException = t;
}
}
},
MoreExecutors.directExecutor()
);
}

Expand Down Expand Up @@ -949,7 +951,8 @@ private void publishAndRegisterHandoff(SequenceMetadata<PartitionIdType, Sequenc
} else {
return publishedSegmentsAndMetadata;
}
}
},
MoreExecutors.directExecutor()
);
publishWaitList.add(publishFuture);

Expand Down Expand Up @@ -1005,7 +1008,8 @@ public Void apply(@Nullable SegmentsAndCommitMetadata handoffSegmentsAndCommitMe
handoffFuture.set(handoffSegmentsAndCommitMetadata);
return null;
}
}
},
MoreExecutors.directExecutor()
);
}

Expand All @@ -1015,7 +1019,8 @@ public void onFailure(Throwable t)
log.error(t, "Error while publishing segments for sequenceNumber[%s]", sequenceMetadata);
handoffFuture.setException(t);
}
}
},
MoreExecutors.directExecutor()
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -948,7 +948,8 @@ private Map<String, Map<String, Object>> getCurrentTotalStats()
groupId,
taskId,
currentStats
)
),
MoreExecutors.directExecutor()
)
);
groupAndTaskIds.add(new Pair<>(groupId, taskId));
Expand All @@ -966,7 +967,8 @@ private Map<String, Map<String, Object>> getCurrentTotalStats()
groupId,
taskId,
currentStats
)
),
MoreExecutors.directExecutor()
)
);
groupAndTaskIds.add(new Pair<>(groupId, taskId));
Expand Down Expand Up @@ -1805,7 +1807,8 @@ public Void apply(@Nullable Boolean result)
}
return null;
}
}
},
MoreExecutors.directExecutor()
);
}

Expand Down Expand Up @@ -2485,7 +2488,8 @@ public Map<PartitionIdType, SequenceOffsetType> apply(@Nullable Object input)
{
return null;
}
}
},
MoreExecutors.directExecutor()
);
}

Expand Down Expand Up @@ -3188,7 +3192,8 @@ private void updateCurrentOffsets() throws InterruptedException, ExecutionExcept
}

return null;
}
},
MoreExecutors.directExecutor()
)
).collect(Collectors.toList());

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.SettableFuture;
import com.google.inject.Inject;
import org.apache.druid.client.indexing.IndexingService;
Expand Down Expand Up @@ -253,7 +254,8 @@ public void onFailure(Throwable t)
{
submitNoticeToExec(new StatusNotice(task, TaskStatus.failure(task.getId())));
}
}
},
MoreExecutors.directExecutor()
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import com.google.common.base.Preconditions;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.inject.Inject;
import org.apache.commons.io.FileUtils;
import org.apache.druid.indexer.TaskStatus;
Expand Down Expand Up @@ -202,7 +203,8 @@ public TaskStatus apply(TaskStatus taskStatus)
throw new RuntimeException(e);
}
}
}
},
MoreExecutors.directExecutor()
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.inject.Inject;
import com.sun.jersey.spi.container.ResourceFilters;
import org.apache.druid.guice.annotations.Json;
Expand Down Expand Up @@ -191,7 +192,8 @@ public void onFailure(Throwable th)
log.debug(ex, "Request timed out or closed already.");
}
}
}
},
MoreExecutors.directExecutor()
);

asyncContext.setTimeout(timeout);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -323,7 +323,7 @@ public void testHandoffTimeout() throws Exception
// handoff would timeout, resulting in exception
TaskStatus status = statusFuture.get();
Assert.assertTrue(status.getErrorMsg()
.contains("java.util.concurrent.TimeoutException: Timeout waiting for task."));
.contains("java.util.concurrent.TimeoutException: Waited 100 milliseconds"));
}

@Test(timeout = 60_000L)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -371,7 +371,8 @@ private Future<TaskStatus> runTask(Task task)
(Function<TaskStatus, TaskStatus>) status -> {
shutdownTask(task);
return status;
}
},
MoreExecutors.directExecutor()
);
return cleanupFuture;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -228,7 +228,8 @@ public void onFailure(Throwable t)
{
runningItems.remove(taskRunnerWorkItem);
}
}
},
MoreExecutors.directExecutor()
);

return statusFuture;
Expand Down
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@
<datasketches.memory.version>1.2.0-incubating</datasketches.memory.version>
<derby.version>10.14.2.0</derby.version>
<dropwizard.metrics.version>4.0.0</dropwizard.metrics.version>
<guava.version>16.0.1</guava.version>
<guava.version>27.0-jre</guava.version>
<guice.version>4.1.0</guice.version>
<hamcrest.version>1.3</hamcrest.version>
<jetty.version>9.4.34.v20201102</jetty.version>
Expand Down
Loading