Skip to content

Commit 492dfb8

Browse files
committed
Second batch.
1 parent 77ac50d commit 492dfb8

File tree

18 files changed

+72
-30
lines changed

18 files changed

+72
-30
lines changed

druid-handler/src/java/org/apache/hadoop/hive/druid/DruidStorageHandler.java

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -294,6 +294,11 @@ public String call() throws Exception {
294294
);
295295
}
296296
}, new Predicate<Throwable>() {
297+
@Override
298+
public boolean test(@Nullable Throwable input) {
299+
return input instanceof IOException;
300+
}
301+
297302
@Override
298303
public boolean apply(@Nullable Throwable input) {
299304
return input instanceof IOException;
@@ -341,6 +346,18 @@ public URL apply(DataSegment dataSegment) {
341346
int numRetries = 0;
342347
while (numRetries++ < maxTries && !setOfUrls.isEmpty()) {
343348
setOfUrls = ImmutableSet.copyOf(Sets.filter(setOfUrls, new Predicate<URL>() {
349+
@Override
350+
public boolean test(URL input) {
351+
try {
352+
String result = DruidStorageHandlerUtils.getURL(httpClient, input);
353+
LOG.debug(String.format("Checking segment [%s] response is [%s]", input, result));
354+
return Strings.isNullOrEmpty(result);
355+
} catch (IOException e) {
356+
LOG.error(String.format("Error while checking URL [%s]", input), e);
357+
return true;
358+
}
359+
}
360+
344361
@Override
345362
public boolean apply(URL input) {
346363
try {

druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidQueryRecordReader.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,6 @@
1717
*/
1818
package org.apache.hadoop.hive.druid.serde;
1919

20-
import com.google.common.collect.Iterators;
2120
import com.metamx.common.lifecycle.Lifecycle;
2221
import com.metamx.http.client.HttpClient;
2322
import com.metamx.http.client.HttpClientConfig;
@@ -37,6 +36,7 @@
3736

3837
import java.io.IOException;
3938
import java.io.InputStream;
39+
import java.util.Collections;
4040
import java.util.Iterator;
4141
import java.util.List;
4242

@@ -63,7 +63,8 @@ public abstract class DruidQueryRecordReader<T extends BaseQuery<R>, R extends C
6363
/**
6464
* Query results.
6565
*/
66-
protected Iterator<R> results = Iterators.emptyIterator();
66+
67+
protected Iterator<R> results = Collections.emptyIterator();
6768

6869
@Override
6970
public void initialize(InputSplit split, TaskAttemptContext context) throws IOException {

druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidSelectQueryRecordReader.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919

2020
import java.io.IOException;
2121
import java.io.InputStream;
22+
import java.util.Collections;
2223
import java.util.Iterator;
2324
import java.util.List;
2425

@@ -27,7 +28,6 @@
2728
import org.apache.hadoop.io.NullWritable;
2829

2930
import com.fasterxml.jackson.core.type.TypeReference;
30-
import com.google.common.collect.Iterators;
3131

3232
import io.druid.query.Result;
3333
import io.druid.query.select.EventHolder;
@@ -42,7 +42,7 @@ public class DruidSelectQueryRecordReader
4242

4343
private Result<SelectResultValue> current;
4444

45-
private Iterator<EventHolder> values = Iterators.emptyIterator();
45+
private Iterator<EventHolder> values = Collections.emptyIterator();
4646

4747
@Override
4848
protected SelectQuery createQuery(String content) throws IOException {

druid-handler/src/java/org/apache/hadoop/hive/druid/serde/DruidTopNQueryRecordReader.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919

2020
import java.io.IOException;
2121
import java.io.InputStream;
22+
import java.util.Collections;
2223
import java.util.Iterator;
2324
import java.util.List;
2425

@@ -27,7 +28,6 @@
2728
import org.apache.hadoop.io.NullWritable;
2829

2930
import com.fasterxml.jackson.core.type.TypeReference;
30-
import com.google.common.collect.Iterators;
3131

3232
import io.druid.query.Result;
3333
import io.druid.query.topn.DimensionAndMetricValueExtractor;
@@ -42,7 +42,7 @@ public class DruidTopNQueryRecordReader
4242

4343
private Result<TopNResultValue> current;
4444

45-
private Iterator<DimensionAndMetricValueExtractor> values = Iterators.emptyIterator();
45+
private Iterator<DimensionAndMetricValueExtractor> values = Collections.emptyIterator();
4646

4747
@Override
4848
protected TopNQuery createQuery(String content) throws IOException {

itests/hive-unit/src/test/java/org/apache/hadoop/hive/io/TestHadoopFileStatus.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -79,6 +79,14 @@ public void testHadoopFileStatusAclEntries() throws IOException {
7979
Assert.assertNotNull(sourceStatus.getAclEntries());
8080
Assert.assertTrue(sourceStatus.getAclEntries().size() == 3);
8181
Iterables.removeIf(sourceStatus.getAclEntries(), new Predicate<AclEntry>() {
82+
@Override
83+
public boolean test(AclEntry input) {
84+
if (input.getName() == null) {
85+
return true;
86+
}
87+
return false;
88+
}
89+
8290
@Override
8391
public boolean apply(AclEntry input) {
8492
if (input.getName() == null) {

itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestServiceDiscovery.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -143,6 +143,12 @@ private class ConnParamInfoPred implements Predicate<ConnParamInfo> {
143143
this.pathPrefix = pathPrefix;
144144
}
145145

146+
@Override
147+
public boolean test(ConnParamInfo inputParam) {
148+
return inputParam.host.equals(host) && inputParam.port == port &&
149+
inputParam.path.startsWith(pathPrefix);
150+
}
151+
146152
@Override
147153
public boolean apply(ConnParamInfo inputParam) {
148154
return inputParam.host.equals(host) && inputParam.port == port &&

itests/util/src/main/java/org/apache/hadoop/hive/cli/control/CoreCliDriver.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -153,7 +153,7 @@ public Void invokeInternal() throws Exception {
153153

154154
@Override
155155
public void runTest(String tname, String fname, String fpath) throws Exception {
156-
Stopwatch sw = new Stopwatch().start();
156+
Stopwatch sw = Stopwatch.createStarted();
157157
boolean skipped = false;
158158
boolean failed = false;
159159
try {

itests/util/src/main/java/org/apache/hadoop/hive/ql/security/authorization/plugin/sqlstd/SQLStdHiveAuthorizationValidatorForTest.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -79,6 +79,15 @@ List<HivePrivilegeObject> filterForBypass(List<HivePrivilegeObject> privilegeObj
7979
return null;
8080
} else {
8181
return Lists.newArrayList(Iterables.filter(privilegeObjects,new Predicate<HivePrivilegeObject>() {
82+
@Override
83+
public boolean test(@Nullable HivePrivilegeObject hivePrivilegeObject) {
84+
// Return true to retain an item, and false to filter it out.
85+
if (hivePrivilegeObject == null){
86+
return true;
87+
}
88+
return !bypassObjectTypes.contains(hivePrivilegeObject.getType());
89+
}
90+
8291
@Override
8392
public boolean apply(@Nullable HivePrivilegeObject hivePrivilegeObject) {
8493
// Return true to retain an item, and false to filter it out.

itests/util/src/main/java/org/apache/hadoop/hive/util/ElapsedTimeLoggingWrapper.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ public abstract class ElapsedTimeLoggingWrapper<T> {
2828
public abstract T invokeInternal() throws Exception;
2929

3030
public T invoke(String message, Logger LOG, boolean toStdErr) throws Exception {
31-
Stopwatch sw = new Stopwatch().start();
31+
Stopwatch sw = Stopwatch.createStarted();
3232
try {
3333
T retVal = invokeInternal();
3434
return retVal;

llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/AMReporter.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -176,7 +176,7 @@ public void onFailure(Throwable t) {
176176
Thread.getDefaultUncaughtExceptionHandler().uncaughtException(Thread.currentThread(), t);
177177
}
178178
}
179-
});
179+
}, queueLookupExecutor);
180180
// TODO: why is this needed? we could just save the host and port?
181181
nodeId = LlapNodeId.getInstance(localAddress.get().getHostName(), localAddress.get().getPort());
182182
LOG.info("AMReporter running with DaemonId: {}, NodeId: {}", daemonId, nodeId);
@@ -271,7 +271,7 @@ public void onFailure(Throwable t) {
271271
LOG.warn("Failed to send taskKilled for {}. The attempt will likely time out.",
272272
taskAttemptId);
273273
}
274-
});
274+
}, executor);
275275
}
276276

277277
public void queryComplete(QueryIdentifier queryIdentifier) {
@@ -337,7 +337,7 @@ public void onFailure(Throwable t) {
337337
amNodeInfo.amNodeId, currentQueryIdentifier, t);
338338
queryFailedHandler.queryFailed(currentQueryIdentifier);
339339
}
340-
});
340+
}, executor);
341341
}
342342
}
343343
} catch (InterruptedException e) {

0 commit comments

Comments
 (0)