Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@

package org.apache.hadoop.hive.common;

import com.google.common.base.Objects;
import com.google.common.base.MoreObjects;

import org.apache.hadoop.metrics2.MetricsInfo;

Expand Down Expand Up @@ -58,7 +58,7 @@ public enum JvmMetricsInfo implements MetricsInfo {
@Override public String description() { return desc; }

@Override public String toString() {
return Objects.toStringHelper(this)
return MoreObjects.toStringHelper(this)
.add("name", name()).add("description", desc)
.toString();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,7 @@ public String toString() {
private class Monitor implements Runnable {
@Override
public void run() {
Stopwatch sw = new Stopwatch();
Stopwatch sw = Stopwatch.createUnstarted();
Map<String, GcTimes> gcTimesBeforeSleep = getGcTimes();
while (shouldRun) {
sw.reset().start();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -294,6 +294,11 @@ public String call() throws Exception {
);
}
}, new Predicate<Throwable>() {
@Override
public boolean test(@Nullable Throwable input) {
return input instanceof IOException;
}

@Override
public boolean apply(@Nullable Throwable input) {
return input instanceof IOException;
Expand Down Expand Up @@ -341,6 +346,18 @@ public URL apply(DataSegment dataSegment) {
int numRetries = 0;
while (numRetries++ < maxTries && !setOfUrls.isEmpty()) {
setOfUrls = ImmutableSet.copyOf(Sets.filter(setOfUrls, new Predicate<URL>() {
@Override
public boolean test(URL input) {
try {
String result = DruidStorageHandlerUtils.getURL(httpClient, input);
LOG.debug(String.format("Checking segment [%s] response is [%s]", input, result));
return Strings.isNullOrEmpty(result);
} catch (IOException e) {
LOG.error(String.format("Error while checking URL [%s]", input), e);
return true;
}
}

@Override
public boolean apply(URL input) {
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
*/
package org.apache.hadoop.hive.druid.serde;

import com.google.common.collect.Iterators;
import com.metamx.common.lifecycle.Lifecycle;
import com.metamx.http.client.HttpClient;
import com.metamx.http.client.HttpClientConfig;
Expand All @@ -37,6 +36,7 @@

import java.io.IOException;
import java.io.InputStream;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;

Expand All @@ -63,7 +63,8 @@ public abstract class DruidQueryRecordReader<T extends BaseQuery<R>, R extends C
/**
* Query results.
*/
protected Iterator<R> results = Iterators.emptyIterator();

protected Iterator<R> results = Collections.emptyIterator();

@Override
public void initialize(InputSplit split, TaskAttemptContext context) throws IOException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import java.io.IOException;
import java.io.InputStream;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;

Expand All @@ -27,7 +28,6 @@
import org.apache.hadoop.io.NullWritable;

import com.fasterxml.jackson.core.type.TypeReference;
import com.google.common.collect.Iterators;

import io.druid.query.Result;
import io.druid.query.select.EventHolder;
Expand All @@ -42,7 +42,7 @@ public class DruidSelectQueryRecordReader

private Result<SelectResultValue> current;

private Iterator<EventHolder> values = Iterators.emptyIterator();
private Iterator<EventHolder> values = Collections.emptyIterator();

@Override
protected SelectQuery createQuery(String content) throws IOException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import java.io.IOException;
import java.io.InputStream;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;

Expand All @@ -27,7 +28,6 @@
import org.apache.hadoop.io.NullWritable;

import com.fasterxml.jackson.core.type.TypeReference;
import com.google.common.collect.Iterators;

import io.druid.query.Result;
import io.druid.query.topn.DimensionAndMetricValueExtractor;
Expand All @@ -42,7 +42,7 @@ public class DruidTopNQueryRecordReader

private Result<TopNResultValue> current;

private Iterator<DimensionAndMetricValueExtractor> values = Iterators.emptyIterator();
private Iterator<DimensionAndMetricValueExtractor> values = Collections.emptyIterator();

@Override
protected TopNQuery createQuery(String content) throws IOException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,14 @@ public void testHadoopFileStatusAclEntries() throws IOException {
Assert.assertNotNull(sourceStatus.getAclEntries());
Assert.assertTrue(sourceStatus.getAclEntries().size() == 3);
Iterables.removeIf(sourceStatus.getAclEntries(), new Predicate<AclEntry>() {
@Override
public boolean test(AclEntry input) {
if (input.getName() == null) {
return true;
}
return false;
}

@Override
public boolean apply(AclEntry input) {
if (input.getName() == null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,12 @@ private class ConnParamInfoPred implements Predicate<ConnParamInfo> {
this.pathPrefix = pathPrefix;
}

@Override
public boolean test(ConnParamInfo inputParam) {
return inputParam.host.equals(host) && inputParam.port == port &&
inputParam.path.startsWith(pathPrefix);
}

@Override
public boolean apply(ConnParamInfo inputParam) {
return inputParam.host.equals(host) && inputParam.port == port &&
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,7 @@ public Void invokeInternal() throws Exception {

@Override
public void runTest(String tname, String fname, String fpath) throws Exception {
Stopwatch sw = new Stopwatch().start();
Stopwatch sw = Stopwatch.createStarted();
boolean skipped = false;
boolean failed = false;
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,15 @@ List<HivePrivilegeObject> filterForBypass(List<HivePrivilegeObject> privilegeObj
return null;
} else {
return Lists.newArrayList(Iterables.filter(privilegeObjects,new Predicate<HivePrivilegeObject>() {
@Override
public boolean test(@Nullable HivePrivilegeObject hivePrivilegeObject) {
// Return true to retain an item, and false to filter it out.
if (hivePrivilegeObject == null){
return true;
}
return !bypassObjectTypes.contains(hivePrivilegeObject.getType());
}

@Override
public boolean apply(@Nullable HivePrivilegeObject hivePrivilegeObject) {
// Return true to retain an item, and false to filter it out.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ public abstract class ElapsedTimeLoggingWrapper<T> {
public abstract T invokeInternal() throws Exception;

public T invoke(String message, Logger LOG, boolean toStdErr) throws Exception {
Stopwatch sw = new Stopwatch().start();
Stopwatch sw = Stopwatch.createStarted();
try {
T retVal = invokeInternal();
return retVal;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ public void onSuccess(Void result) {
public void onFailure(Throwable t) {
LOG.warn("RequestManager shutdown with error", t);
}
});
}, requestManagerExecutor);
}

@Override
Expand Down Expand Up @@ -263,7 +263,7 @@ public void shutdown() {
void submitToExecutor(CallableRequest request, LlapNodeId nodeId) {
ListenableFuture<SourceStateUpdatedResponseProto> future =
executor.submit(request);
Futures.addCallback(future, new ResponseCallback(request.getCallback(), nodeId, this));
Futures.addCallback(future, new ResponseCallback(request.getCallback(), nodeId, this), executor);
}

@VisibleForTesting
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,7 @@ public void onFailure(Throwable t) {
Thread.getDefaultUncaughtExceptionHandler().uncaughtException(Thread.currentThread(), t);
}
}
});
}, queueLookupExecutor);
// TODO: why is this needed? we could just save the host and port?
nodeId = LlapNodeId.getInstance(localAddress.get().getHostName(), localAddress.get().getPort());
LOG.info("AMReporter running with DaemonId: {}, NodeId: {}", daemonId, nodeId);
Expand Down Expand Up @@ -271,7 +271,7 @@ public void onFailure(Throwable t) {
LOG.warn("Failed to send taskKilled for {}. The attempt will likely time out.",
taskAttemptId);
}
});
}, executor);
}

public void queryComplete(QueryIdentifier queryIdentifier) {
Expand Down Expand Up @@ -337,7 +337,7 @@ public void onFailure(Throwable t) {
amNodeInfo.amNodeId, currentQueryIdentifier, t);
queryFailedHandler.queryFailed(currentQueryIdentifier);
}
});
}, executor);
}
}
} catch (InterruptedException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ public synchronized void registerTask(RuntimeTask task,
currentCallable = new HeartbeatCallable(completionListener, task, umbilical, pollInterval, sendCounterInterval,
maxEventsToGet, requestCounter, containerIdStr, initialEvent, fragmentRequestId);
ListenableFuture<Boolean> future = heartbeatExecutor.submit(currentCallable);
Futures.addCallback(future, new HeartbeatCallback(errorReporter));
Futures.addCallback(future, new HeartbeatCallback(errorReporter), heartbeatExecutor);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,7 @@ public TaskExecutorService(int numExecutors, int waitQueueSize,
executionCompletionExecutorService = MoreExecutors.listeningDecorator(
executionCompletionExecutorServiceRaw);
ListenableFuture<?> future = waitQueueExecutorService.submit(new WaitQueueWorker());
Futures.addCallback(future, new WaitQueueWorkerCallback());
Futures.addCallback(future, new WaitQueueWorkerCallback(), waitQueueExecutorService);
}

private Comparator<TaskWrapper> createComparator(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -113,8 +113,8 @@ public class TaskRunnerCallable extends CallableWithNdc<TaskRunner2Result> {
private final String queryId;
private final HadoopShim tezHadoopShim;
private boolean shouldRunTask = true;
final Stopwatch runtimeWatch = new Stopwatch();
final Stopwatch killtimerWatch = new Stopwatch();
final Stopwatch runtimeWatch = Stopwatch.createUnstarted();
final Stopwatch killtimerWatch = Stopwatch.createUnstarted();
private final AtomicBoolean isStarted = new AtomicBoolean(false);
private final AtomicBoolean isCompleted = new AtomicBoolean(false);
private final AtomicBoolean killInvoked = new AtomicBoolean(false);
Expand Down Expand Up @@ -275,7 +275,7 @@ public LlapTaskUmbilicalProtocol run() throws Exception {
} finally {
FileSystem.closeAllForUGI(fsTaskUgi);
LOG.info("ExecutionTime for Container: " + request.getContainerIdString() + "=" +
runtimeWatch.stop().elapsedMillis());
runtimeWatch.stop().elapsed(TimeUnit.MILLISECONDS));
if (LOG.isDebugEnabled()) {
LOG.debug(
"canFinish post completion: " + taskSpec.getTaskAttemptID() + ": " + canFinish());
Expand Down Expand Up @@ -501,14 +501,14 @@ public void onSuccess(TaskRunner2Result result) {
LOG.info("Killed task {}", requestId);
if (killtimerWatch.isRunning()) {
killtimerWatch.stop();
long elapsed = killtimerWatch.elapsedMillis();
long elapsed = killtimerWatch.elapsed(TimeUnit.MILLISECONDS);
LOG.info("Time to die for task {}", elapsed);
if (metrics != null) {
metrics.addMetricsPreemptionTimeToKill(elapsed);
}
}
if (metrics != null) {
metrics.addMetricsPreemptionTimeLost(runtimeWatch.elapsedMillis());
metrics.addMetricsPreemptionTimeLost(runtimeWatch.elapsed(TimeUnit.MILLISECONDS));
metrics.incrExecutorTotalKilled();
}
break;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,9 @@
*/
package org.apache.hadoop.hive.llap.metrics;

import com.google.common.base.MoreObjects;
import org.apache.hadoop.metrics2.MetricsInfo;

import com.google.common.base.Objects;

/**
* Metrics information for llap cache.
*/
Expand Down Expand Up @@ -50,7 +49,7 @@ public String description() {

@Override
public String toString() {
return Objects.toStringHelper(this)
return MoreObjects.toStringHelper(this)
.add("name", name()).add("description", desc)
.toString();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,9 @@
*/
package org.apache.hadoop.hive.llap.metrics;

import com.google.common.base.MoreObjects;
import org.apache.hadoop.metrics2.MetricsInfo;

import com.google.common.base.Objects;

/**
* Metrics information for llap daemon container.
*/
Expand Down Expand Up @@ -74,7 +73,7 @@ public String description() {

@Override
public String toString() {
return Objects.toStringHelper(this)
return MoreObjects.toStringHelper(this)
.add("name", name()).add("description", desc)
.toString();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,9 @@
*/
package org.apache.hadoop.hive.llap.metrics;

import com.google.common.base.MoreObjects;
import org.apache.hadoop.metrics2.MetricsInfo;

import com.google.common.base.Objects;

/**
* Llap daemon I/O elevator metrics
*/
Expand All @@ -42,7 +41,7 @@ public String description() {

@Override
public String toString() {
return Objects.toStringHelper(this)
return MoreObjects.toStringHelper(this)
.add("name", name()).add("description", desc)
.toString();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,9 @@
*/
package org.apache.hadoop.hive.llap.metrics;

import com.google.common.base.MoreObjects;
import org.apache.hadoop.metrics2.MetricsInfo;

import com.google.common.base.Objects;

/**
* Llap daemon JVM info. These are some additional metrics that are not exposed via
* {@link org.apache.hadoop.metrics.jvm.JvmMetrics}
Expand Down Expand Up @@ -53,7 +52,7 @@ public String description() {

@Override
public String toString() {
return Objects.toStringHelper(this)
return MoreObjects.toStringHelper(this)
.add("name", name()).add("description", desc)
.toString();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -317,15 +317,15 @@ public Void call() throws Exception {
}, 10000L, TimeUnit.MILLISECONDS);

nodeEnablerFuture = nodeEnabledExecutor.submit(nodeEnablerCallable);
Futures.addCallback(nodeEnablerFuture, new LoggingFutureCallback("NodeEnablerThread", LOG));
Futures.addCallback(nodeEnablerFuture, new LoggingFutureCallback("NodeEnablerThread", LOG), nodeEnabledExecutor);

delayedTaskSchedulerFuture =
delayedTaskSchedulerExecutor.submit(delayedTaskSchedulerCallable);
Futures.addCallback(delayedTaskSchedulerFuture,
new LoggingFutureCallback("DelayedTaskSchedulerThread", LOG));
new LoggingFutureCallback("DelayedTaskSchedulerThread", LOG), delayedTaskSchedulerExecutor);

schedulerFuture = schedulerExecutor.submit(schedulerCallable);
Futures.addCallback(schedulerFuture, new LoggingFutureCallback("SchedulerThread", LOG));
Futures.addCallback(schedulerFuture, new LoggingFutureCallback("SchedulerThread", LOG), schedulerExecutor);

registry.start();
registry.registerStateChangeListener(new NodeStateChangeListener());
Expand Down
Loading