[paypal#290] Add admin API to kill running queries
pjeli committed Jan 2, 2020
1 parent 0746448 commit 7040a9b
Showing 7 changed files with 131 additions and 9 deletions.
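For a quick sense of the new admin API before reading the diff: each GET to /queryGuard toggles the guard, so one call turns it on and a second call turns it off. Below is a minimal client sketch; the localhost:4567 address and the Apache HttpClient usage mirror the tests at the end of this diff, and the QueryGuardToggle class name is made up for illustration.

import org.apache.http.HttpResponse;
import org.apache.http.client.HttpClient;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.impl.client.DefaultHttpClient;
import org.apache.http.util.EntityUtils;

public class QueryGuardToggle {
  public static void main(String[] args) throws Exception {
    // Each GET to /queryGuard flips the guard. While the guard is on,
    // queries queued behind the write lock fail with "Query cancelled."
    // until the guard is lifted by a second call.
    HttpClient client = new DefaultHttpClient();
    HttpGet toggle = new HttpGet("http://localhost:4567/queryGuard");
    HttpResponse res = client.execute(toggle);
    System.out.println(EntityUtils.toString(res.getEntity()));
  }
}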
File 1 of 7
@@ -288,7 +288,8 @@ enum Endpoint {
removeCachedQuery,
cachedMaps,
sql,
fileTypes
fileTypes,
queryGuard
}

EnumSet<Endpoint> UNSECURED_ENDPOINTS =
@@ -353,7 +354,8 @@ enum Endpoint {
Endpoint.removeDirectory,
Endpoint.metrics,
Endpoint.setCachedQuery,
Endpoint.removeCachedQuery);
Endpoint.removeCachedQuery,
Endpoint.queryGuard);

EnumSet<Filter> FILTER_FILE =
EnumSet.of(
File 2 of 7
@@ -81,6 +81,7 @@ public class HadoopWebServerMain implements ApplicationMain {
private final List<BaseQuery> runningQueries = Collections.synchronizedList(new LinkedList<>());
private final ReentrantReadWriteLock queryLock = new ReentrantReadWriteLock();
private final AtomicBoolean savingNamespace = new AtomicBoolean(false);
private final AtomicBoolean cancelRequest = new AtomicBoolean(false);

private final NameNodeLoader nameNodeLoader = new NameNodeLoader();
private final HsqlDriver hsqlDriver = new HsqlDriver();
@@ -150,7 +151,8 @@ public void init(
internalService,
operationService,
queryLock,
savingNamespace);
savingNamespace,
cancelRequest);
nnaHttpServer.start();

// Bootstrap classes.
File 3 of 7
@@ -69,6 +69,7 @@ public class NameNodeAnalyticsHttpServer {
public static final String NNA_OPERATION_SERVICE = "nna.operation.service";
public static final String NNA_QUERY_LOCK = "nna.query.lock";
public static final String NNA_SAVING_NAMESPACE = "nna.saving.namespace";
public static final String NNA_CANCEL_REQUEST = "nna.cancel.request";
public static final String NNA_APP_CONF = "nna.app.conf";
public static final String NNA_HADOOP_CONF = "nna.hadoop.conf";

@@ -83,6 +84,7 @@ public class NameNodeAnalyticsHttpServer {
private final ExecutorService operationService;
private final ReentrantReadWriteLock queryLock;
private final AtomicBoolean savingNamespace;
private final AtomicBoolean cancelRequest;

// Jetty Http server.
private HttpServer2 httpServer;
@@ -112,6 +114,7 @@ public class NameNodeAnalyticsHttpServer {
* @param operationService executor for operation service threads
* @param queryLock lock around queries
* @param savingNamespace lock around namespace operations
* @param cancelRequest flag that, while set, cancels pending queries before they run
*/
public NameNodeAnalyticsHttpServer(
Configuration conf,
@@ -126,7 +129,8 @@ public NameNodeAnalyticsHttpServer(
ExecutorService internalService,
ExecutorService operationService,
ReentrantReadWriteLock queryLock,
AtomicBoolean savingNamespace) {
AtomicBoolean savingNamespace,
AtomicBoolean cancelRequest) {
this.conf = conf;
this.nnaConf = nnaConf;
this.bindAddress = bindAddress;
@@ -140,6 +144,7 @@ public NameNodeAnalyticsHttpServer(
this.operationService = operationService;
this.queryLock = queryLock;
this.savingNamespace = savingNamespace;
this.cancelRequest = cancelRequest;
}

/**
@@ -191,6 +196,7 @@ public void start() throws IOException {
httpServer.getWebAppContext().setAttribute(NNA_OPERATION_SERVICE, operationService);
httpServer.getWebAppContext().setAttribute(NNA_QUERY_LOCK, queryLock);
httpServer.getWebAppContext().setAttribute(NNA_SAVING_NAMESPACE, savingNamespace);
httpServer.getWebAppContext().setAttribute(NNA_CANCEL_REQUEST, cancelRequest);
httpServer.getWebAppContext().setAttribute(NNA_APP_CONF, nnaConf);
httpServer.getWebAppContext().setAttribute(NNA_HADOOP_CONF, conf);

File 4 of 7
@@ -155,6 +155,7 @@ public class WebServerMain implements ApplicationMain {
Collections.synchronizedMap(new HashMap<>());

private final AtomicBoolean savingNamespace = new AtomicBoolean(false);
private final AtomicBoolean cancelRequest = new AtomicBoolean(false);

/**
* This is the main launching call for use in production. Should not accept any arguments. Service
@@ -401,6 +402,7 @@ public void init(
boolean isProvidingSuggestions = nameNodeLoader.getSuggestionsEngine().isLoaded();
sb.append("Current system time (ms): ").append(Time.now()).append("\n");
sb.append("Ready to service queries: ").append(isInit).append("\n");
sb.append("Guarding against queries: ").append(cancelRequest.get()).append("\n");
sb.append("Ready to service history: ").append(isHistorical).append("\n");
sb.append("Ready to service suggestions: ").append(isProvidingSuggestions).append("\n\n");
AnalysisState currentAnalysisState =
@@ -567,8 +569,23 @@ public void init(
}
});

/* QUERYGUARD endpoint is an admin-level endpoint meant to kill all running analysis queries. */
get(
"/queryGuard",
(req, res) -> {
res.header("Access-Control-Allow-Origin", "*");
res.header("Content-Type", "text/plain");
boolean guarding = !cancelRequest.get();
cancelRequest.set(guarding);
if (guarding) {
return "Guarding against queries. All queries after current processing will be killed.";
} else {
return "All queries allowed.";
}
});
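One observation on the toggle above: the get-then-set pair is not atomic, so two concurrent /queryGuard requests could race and both land on the same state. If that ever matters, a compareAndSet loop makes the flip atomic. A sketch only, not part of this commit:

boolean guarding;
do {
  guarding = !cancelRequest.get();
  // Retry if another thread flipped the flag between the read and the CAS.
} while (!cancelRequest.compareAndSet(!guarding, guarding));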

/* METRICS endpoint is meant to return information on the users and the
amount of queries they are making */
number of queries they are making. */
get(
"/metrics",
(req, res) -> {
@@ -716,6 +733,9 @@ public void init(

lock.writeLock().lock();
try {
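// An admin may have enabled /queryGuard while this request waited on the write lock; bail out before doing any work.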
if (cancelRequest.get()) {
throw new IOException("Query cancelled.");
}
String filterStr1 = req.queryMap("filters1").value();
String filterStr2 = req.queryMap("filters2").value();
String emailsToStr = req.queryMap("emailTo").value();
@@ -794,6 +814,9 @@ public void init(

lock.writeLock().lock();
try {
if (cancelRequest.get()) {
throw new IOException("Query cancelled.");
}
String fullFilterStr = req.queryMap("filters").value();
String emailsToStr = req.queryMap("emailTo").value();
String emailsCcStr = req.queryMap("emailCC").value();
@@ -882,6 +905,9 @@ public void init(

lock.writeLock().lock();
try {
if (cancelRequest.get()) {
throw new IOException("Query cancelled.");
}
final String fullFilterStr = req.queryMap("filters").value();
final String histogramConditionsStr = req.queryMap("histogramConditions").value();
final String emailsToStr = req.queryMap("emailTo").value();
@@ -1025,6 +1051,9 @@ public void init(

lock.writeLock().lock();
try {
if (cancelRequest.get()) {
throw new IOException("Query cancelled.");
}
final String fullFilterStr = req.queryMap("filters").value();
final String[] filters = Helper.parseFilters(fullFilterStr);
final String[] filterOps = Helper.parseFilterOps(fullFilterStr);
@@ -1101,6 +1130,9 @@ public void init(

lock.writeLock().lock();
try {
if (cancelRequest.get()) {
throw new IOException("Query cancelled.");
}
final String fullFilterStr = req.queryMap("filters").value();
final String[] filters = Helper.parseFilters(fullFilterStr);
final String[] filterOps = Helper.parseFilterOps(fullFilterStr);
File 5 of 7
@@ -20,6 +20,7 @@
package org.apache.hadoop.hdfs.server.namenode.analytics.web;

import static org.apache.hadoop.hdfs.server.namenode.analytics.NameNodeAnalyticsHttpServer.NNA_APP_CONF;
import static org.apache.hadoop.hdfs.server.namenode.analytics.NameNodeAnalyticsHttpServer.NNA_CANCEL_REQUEST;
import static org.apache.hadoop.hdfs.server.namenode.analytics.NameNodeAnalyticsHttpServer.NNA_HSQL_DRIVER;
import static org.apache.hadoop.hdfs.server.namenode.analytics.NameNodeAnalyticsHttpServer.NNA_NN_LOADER;
import static org.apache.hadoop.hdfs.server.namenode.analytics.NameNodeAnalyticsHttpServer.NNA_OPERATION_SERVICE;
@@ -409,6 +410,31 @@ public Response operations() {
}
}

/** QUERYGUARD endpoint is an admin-level endpoint meant to kill all running analysis queries. */
@GET
@Path("/queryGaurd")
@Produces({MediaType.TEXT_PLAIN})
public Response queryGuard() {
try {
before();
final AtomicBoolean cancelRequest = (AtomicBoolean) context.getAttribute(NNA_CANCEL_REQUEST);
boolean guarding = !cancelRequest.get();
cancelRequest.set(guarding);
if (guarding) {
return Response.ok(
"Guarding against queries. All queries after current processing will be killed.",
MediaType.TEXT_PLAIN)
.build();
} else {
return Response.ok("All queries allowed.", MediaType.TEXT_PLAIN).build();
}
} catch (Exception ex) {
return handleException(ex);
} finally {
after();
}
}

/**
* METRICS endpoint is meant to return information on the users and the amount of queries they are
* making.
@@ -585,6 +611,7 @@ public Response info() {
@SuppressWarnings("unchecked")
final List<BaseQuery> runningQueries =
(List<BaseQuery>) context.getAttribute(NNA_RUNNING_QUERIES);
final AtomicBoolean cancelRequest = (AtomicBoolean) context.getAttribute(NNA_CANCEL_REQUEST);
try {
before();

@@ -602,6 +629,7 @@ public Response info() {
boolean isProvidingSuggestions = nnLoader.getSuggestionsEngine().isLoaded();
sb.append("Current system time (ms): ").append(Time.now()).append("\n");
sb.append("Ready to service queries: ").append(isInit).append("\n");
sb.append("Guarding against queries: ").append(cancelRequest.get()).append("\n");
sb.append("Ready to service history: ").append(isHistorical).append("\n");
sb.append("Ready to service suggestions: ").append(isProvidingSuggestions).append("\n\n");
if (isInit) {
File 6 of 7
@@ -19,6 +19,8 @@

package org.apache.hadoop.hdfs.server.namenode.queries;

import java.util.Objects;

public class BaseQuery {

private final String trackingUrl;
@@ -52,9 +54,7 @@ public boolean equals(Object o) {

BaseQuery baseQuery = (BaseQuery) o;

return (trackingUrl != null
? trackingUrl.equals(baseQuery.trackingUrl)
: baseQuery.trackingUrl == null)
&& (userName != null ? userName.equals(baseQuery.userName) : baseQuery.userName == null);
return (Objects.equals(trackingUrl, baseQuery.trackingUrl))
&& (Objects.equals(userName, baseQuery.userName));
}
}
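The Objects.equals rewrite above is behavior-preserving: Objects.equals(a, b) is true when both arguments are null and false when exactly one is, which is what the removed ternary chain spelled out. Since equals now compares exactly trackingUrl and userName, the matching hashCode idiom is Objects.hash over the same two fields. A sketch, assuming those are the only identity fields; hashCode itself is outside this hunk:

@Override
public int hashCode() {
  // Null-safe and consistent with the equals above.
  return Objects.hash(trackingUrl, userName);
}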
File 7 of 7
@@ -24,6 +24,7 @@
import static org.hamcrest.CoreMatchers.notNullValue;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.lessThan;
import static org.hamcrest.core.Is.is;
import static org.hamcrest.core.IsNot.not;
import static org.hamcrest.core.StringContains.containsString;
@@ -47,6 +48,10 @@
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.function.Function;
import java.util.stream.Stream;
import org.apache.commons.io.IOUtils;
@@ -63,16 +68,19 @@
import org.apache.hadoop.hdfs.server.namenode.NameNodeLoader;
import org.apache.hadoop.hdfs.server.namenode.QueryEngine;
import org.apache.hadoop.hdfs.server.namenode.queries.Transforms;
import org.apache.http.HttpEntity;
import org.apache.http.HttpHost;
import org.apache.http.HttpResponse;
import org.apache.http.HttpStatus;
import org.apache.http.NameValuePair;
import org.apache.http.client.HttpClient;
import org.apache.http.client.ResponseHandler;
import org.apache.http.client.entity.UrlEncodedFormEntity;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.impl.client.DefaultHttpClient;
import org.apache.http.message.BasicNameValuePair;
import org.apache.http.util.EntityUtils;
import org.hamcrest.CoreMatchers;
import org.junit.AfterClass;
import org.junit.Before;
@@ -879,6 +887,50 @@ public void testMemoryConsumedHistogram() throws IOException {
assertThat(res.getStatusLine().getStatusCode(), is(200));
}

@Test
public void testBlockQueries() throws IOException, ExecutionException, InterruptedException {
ExecutorService executorService = Executors.newCachedThreadPool();
ResponseHandler<String> responseHandler =
response -> {
HttpEntity entity = response.getEntity();
return entity == null ? null : EntityUtils.toString(entity);
};
HttpGet get = new HttpGet("http://localhost:4567/histogram?set=all&type=memoryConsumed");
List<Future<String>> futures = new ArrayList<>(100);
for (int i = 0; i < 100; i++) {
Future<String> result =
executorService.submit(
() -> new DefaultHttpClient().execute(hostPort, get, responseHandler));
futures.add(result);
}

HttpGet killQuery = new HttpGet("http://localhost:4567/queryGuard");
HttpResponse res = client.execute(hostPort, killQuery);

assertThat(res.getStatusLine().getStatusCode(), is(200));
List<String> result = IOUtils.readLines(res.getEntity().getContent());
System.out.println(result);

int queryCancelled = 0;
for (Future<String> future : futures) {
String output = future.get();
if (output.contains("Query cancelled.")) {
queryCancelled++;
}
}
assertThat(queryCancelled, is(not(0))); // Some queries cancelled.
assertThat(queryCancelled, is(lessThan(100))); // Not all got cancelled.

HttpGet allowQueries = new HttpGet("http://localhost:4567/queryGuard");
HttpResponse allowRes = client.execute(hostPort, allowQueries);

assertThat(allowRes.getStatusLine().getStatusCode(), is(200));
List<String> allowStr = IOUtils.readLines(allowRes.getEntity().getContent());
System.out.println(allowStr);
assertThat(allowStr.size(), is(1));
assertThat(allowStr.get(0), is("All queries allowed."));
executorService.shutdown(); // Release the pool threads created for this test.
}

@Test
public void testStorageTypeHistogram() throws IOException {
HttpGet get = new HttpGet("http://localhost:4567/histogram?set=files&type=storageType");
