Skip to content

Commit

Permalink
[paypal#290] Add admin API to kill running queries
Browse files Browse the repository at this point in the history
  • Loading branch information
pjeli committed Jan 2, 2020
1 parent 0746448 commit 74aed53
Show file tree
Hide file tree
Showing 7 changed files with 151 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -288,7 +288,8 @@ enum Endpoint {
removeCachedQuery,
cachedMaps,
sql,
fileTypes
fileTypes,
queryGuard
}

EnumSet<Endpoint> UNSECURED_ENDPOINTS =
Expand Down Expand Up @@ -353,7 +354,8 @@ enum Endpoint {
Endpoint.removeDirectory,
Endpoint.metrics,
Endpoint.setCachedQuery,
Endpoint.removeCachedQuery);
Endpoint.removeCachedQuery,
Endpoint.queryGuard);

EnumSet<Filter> FILTER_FILE =
EnumSet.of(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ public class HadoopWebServerMain implements ApplicationMain {
private final List<BaseQuery> runningQueries = Collections.synchronizedList(new LinkedList<>());
private final ReentrantReadWriteLock queryLock = new ReentrantReadWriteLock();
private final AtomicBoolean savingNamespace = new AtomicBoolean(false);
private final AtomicBoolean cancelRequest = new AtomicBoolean(false);

private final NameNodeLoader nameNodeLoader = new NameNodeLoader();
private final HsqlDriver hsqlDriver = new HsqlDriver();
Expand Down Expand Up @@ -150,7 +151,8 @@ public void init(
internalService,
operationService,
queryLock,
savingNamespace);
savingNamespace,
cancelRequest);
nnaHttpServer.start();

// Bootstrap classes.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ public class NameNodeAnalyticsHttpServer {
public static final String NNA_OPERATION_SERVICE = "nna.operation.service";
public static final String NNA_QUERY_LOCK = "nna.query.lock";
public static final String NNA_SAVING_NAMESPACE = "nna.saving.namespace";
public static final String NNA_CANCEL_REQUEST = "nna.cancel.request";
public static final String NNA_APP_CONF = "nna.app.conf";
public static final String NNA_HADOOP_CONF = "nna.hadoop.conf";

Expand All @@ -83,6 +84,7 @@ public class NameNodeAnalyticsHttpServer {
private final ExecutorService operationService;
private final ReentrantReadWriteLock queryLock;
private final AtomicBoolean savingNamespace;
private final AtomicBoolean cancelRequest;

// Jetty Http server.
private HttpServer2 httpServer;
Expand Down Expand Up @@ -112,6 +114,7 @@ public class NameNodeAnalyticsHttpServer {
* @param operationService executor for operation service threads
* @param queryLock lock around queries
* @param savingNamespace lock around namespace operations
* @param cancelRequest lock around allowing next queries to process or not
*/
public NameNodeAnalyticsHttpServer(
Configuration conf,
Expand All @@ -126,7 +129,8 @@ public NameNodeAnalyticsHttpServer(
ExecutorService internalService,
ExecutorService operationService,
ReentrantReadWriteLock queryLock,
AtomicBoolean savingNamespace) {
AtomicBoolean savingNamespace,
AtomicBoolean cancelRequest) {
this.conf = conf;
this.nnaConf = nnaConf;
this.bindAddress = bindAddress;
Expand All @@ -140,6 +144,7 @@ public NameNodeAnalyticsHttpServer(
this.operationService = operationService;
this.queryLock = queryLock;
this.savingNamespace = savingNamespace;
this.cancelRequest = cancelRequest;
}

/**
Expand Down Expand Up @@ -191,6 +196,7 @@ public void start() throws IOException {
httpServer.getWebAppContext().setAttribute(NNA_OPERATION_SERVICE, operationService);
httpServer.getWebAppContext().setAttribute(NNA_QUERY_LOCK, queryLock);
httpServer.getWebAppContext().setAttribute(NNA_SAVING_NAMESPACE, savingNamespace);
httpServer.getWebAppContext().setAttribute(NNA_CANCEL_REQUEST, cancelRequest);
httpServer.getWebAppContext().setAttribute(NNA_APP_CONF, nnaConf);
httpServer.getWebAppContext().setAttribute(NNA_HADOOP_CONF, conf);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,7 @@ public class WebServerMain implements ApplicationMain {
Collections.synchronizedMap(new HashMap<>());

private final AtomicBoolean savingNamespace = new AtomicBoolean(false);
private final AtomicBoolean cancelRequest = new AtomicBoolean(false);

/**
* This is the main launching call for use in production. Should not accept any arguments. Service
Expand Down Expand Up @@ -401,6 +402,7 @@ public void init(
boolean isProvidingSuggestions = nameNodeLoader.getSuggestionsEngine().isLoaded();
sb.append("Current system time (ms): ").append(Time.now()).append("\n");
sb.append("Ready to service queries: ").append(isInit).append("\n");
sb.append("Guarding against queries: ").append(cancelRequest.get()).append("\n");
sb.append("Ready to service history: ").append(isHistorical).append("\n");
sb.append("Ready to service suggestions: ").append(isProvidingSuggestions).append("\n\n");
AnalysisState currentAnalysisState =
Expand Down Expand Up @@ -567,8 +569,23 @@ public void init(
}
});

/* QUERYGUARD endpoint is an admin-level endpoint meant to kill all running analysis queries. */
get(
    "/queryGuard",
    (req, res) -> {
      res.header("Access-Control-Allow-Origin", "*");
      res.header("Content-Type", "text/plain");
      // Toggle the guard atomically. A plain get()-then-set() is a
      // check-then-act race: two concurrent admin calls could both read the
      // same old value and one toggle would be silently lost. The CAS loop
      // guarantees each request flips the flag exactly once.
      boolean guarding;
      do {
        guarding = !cancelRequest.get();
      } while (!cancelRequest.compareAndSet(!guarding, guarding));
      if (guarding) {
        return "Guarding against queries. All queries after current processing will be killed.";
      } else {
        return "All queries allowed.";
      }
    });

/* METRICS endpoint is meant to return information on the users and the
amount of queries they are making */
amount of queries they are making. */
get(
"/metrics",
(req, res) -> {
Expand Down Expand Up @@ -716,6 +733,9 @@ public void init(

lock.writeLock().lock();
try {
if (cancelRequest.get()) {
throw new IOException("Query cancelled.");
}
String filterStr1 = req.queryMap("filters1").value();
String filterStr2 = req.queryMap("filters2").value();
String emailsToStr = req.queryMap("emailTo").value();
Expand Down Expand Up @@ -794,6 +814,9 @@ public void init(

lock.writeLock().lock();
try {
if (cancelRequest.get()) {
throw new IOException("Query cancelled.");
}
String fullFilterStr = req.queryMap("filters").value();
String emailsToStr = req.queryMap("emailTo").value();
String emailsCcStr = req.queryMap("emailCC").value();
Expand Down Expand Up @@ -882,6 +905,9 @@ public void init(

lock.writeLock().lock();
try {
if (cancelRequest.get()) {
throw new IOException("Query cancelled.");
}
final String fullFilterStr = req.queryMap("filters").value();
final String histogramConditionsStr = req.queryMap("histogramConditions").value();
final String emailsToStr = req.queryMap("emailTo").value();
Expand Down Expand Up @@ -1025,6 +1051,9 @@ public void init(

lock.writeLock().lock();
try {
if (cancelRequest.get()) {
throw new IOException("Query cancelled.");
}
final String fullFilterStr = req.queryMap("filters").value();
final String[] filters = Helper.parseFilters(fullFilterStr);
final String[] filterOps = Helper.parseFilterOps(fullFilterStr);
Expand Down Expand Up @@ -1101,6 +1130,9 @@ public void init(

lock.writeLock().lock();
try {
if (cancelRequest.get()) {
throw new IOException("Query cancelled.");
}
final String fullFilterStr = req.queryMap("filters").value();
final String[] filters = Helper.parseFilters(fullFilterStr);
final String[] filterOps = Helper.parseFilterOps(fullFilterStr);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
package org.apache.hadoop.hdfs.server.namenode.analytics.web;

import static org.apache.hadoop.hdfs.server.namenode.analytics.NameNodeAnalyticsHttpServer.NNA_APP_CONF;
import static org.apache.hadoop.hdfs.server.namenode.analytics.NameNodeAnalyticsHttpServer.NNA_CANCEL_REQUEST;
import static org.apache.hadoop.hdfs.server.namenode.analytics.NameNodeAnalyticsHttpServer.NNA_HSQL_DRIVER;
import static org.apache.hadoop.hdfs.server.namenode.analytics.NameNodeAnalyticsHttpServer.NNA_NN_LOADER;
import static org.apache.hadoop.hdfs.server.namenode.analytics.NameNodeAnalyticsHttpServer.NNA_OPERATION_SERVICE;
Expand Down Expand Up @@ -409,6 +410,31 @@ public Response operations() {
}
}

/** QUERYGUARD endpoint is an admin-level endpoint meant to kill all running analysis queries. */
@GET
@Path("/queryGuard")
@Produces({MediaType.TEXT_PLAIN})
public Response queryGuard() {
  try {
    before();
    final AtomicBoolean cancelRequest = (AtomicBoolean) context.getAttribute(NNA_CANCEL_REQUEST);
    // Toggle the guard atomically. A plain get()-then-set() is a
    // check-then-act race: two concurrent admin calls could both read the
    // same old value and one toggle would be silently lost. The CAS loop
    // guarantees each request flips the flag exactly once.
    boolean guarding;
    do {
      guarding = !cancelRequest.get();
    } while (!cancelRequest.compareAndSet(!guarding, guarding));
    if (guarding) {
      return Response.ok(
              "Guarding against queries. All queries after current processing will be killed.",
              MediaType.TEXT_PLAIN)
          .build();
    } else {
      return Response.ok("All queries allowed.", MediaType.TEXT_PLAIN).build();
    }
  } catch (Exception ex) {
    return handleException(ex);
  } finally {
    after();
  }
}

/**
* METRICS endpoint is meant to return information on the users and the amount of queries they are
* making.
Expand Down Expand Up @@ -585,6 +611,7 @@ public Response info() {
@SuppressWarnings("unchecked")
final List<BaseQuery> runningQueries =
(List<BaseQuery>) context.getAttribute(NNA_RUNNING_QUERIES);
final AtomicBoolean cancelRequest = (AtomicBoolean) context.getAttribute(NNA_CANCEL_REQUEST);
try {
before();

Expand All @@ -602,6 +629,7 @@ public Response info() {
boolean isProvidingSuggestions = nnLoader.getSuggestionsEngine().isLoaded();
sb.append("Current system time (ms): ").append(Time.now()).append("\n");
sb.append("Ready to service queries: ").append(isInit).append("\n");
sb.append("Guarding against queries: ").append(cancelRequest.get()).append("\n");
sb.append("Ready to service history: ").append(isHistorical).append("\n");
sb.append("Ready to service suggestions: ").append(isProvidingSuggestions).append("\n\n");
if (isInit) {
Expand Down Expand Up @@ -1282,6 +1310,7 @@ public Response divide() {
final NameNodeLoader nnLoader = (NameNodeLoader) context.getAttribute(NNA_NN_LOADER);
final ReentrantReadWriteLock queryLock =
(ReentrantReadWriteLock) context.getAttribute(NNA_QUERY_LOCK);
final AtomicBoolean cancelRequest = (AtomicBoolean) context.getAttribute(NNA_CANCEL_REQUEST);
try {
before();

Expand All @@ -1295,6 +1324,9 @@ public Response divide() {

queryLock.writeLock().lock();
try {
if (cancelRequest.get()) {
throw new IOException("Query cancelled.");
}
String filterStr1 = request.getParameter("filters1");
String filterStr2 = request.getParameter("filters2");
String emailsToStr = request.getParameter("emailTo");
Expand Down Expand Up @@ -1373,6 +1405,7 @@ public Response filter() {
final NameNodeLoader nnLoader = (NameNodeLoader) context.getAttribute(NNA_NN_LOADER);
final ReentrantReadWriteLock queryLock =
(ReentrantReadWriteLock) context.getAttribute(NNA_QUERY_LOCK);
final AtomicBoolean cancelRequest = (AtomicBoolean) context.getAttribute(NNA_CANCEL_REQUEST);
try {
before();

Expand All @@ -1386,6 +1419,9 @@ public Response filter() {

queryLock.writeLock().lock();
try {
if (cancelRequest.get()) {
throw new IOException("Query cancelled.");
}
String fullFilterStr = request.getParameter("filters");
String emailsToStr = request.getParameter("emailTo");
String emailsCcStr = request.getParameter("emailCC");
Expand Down Expand Up @@ -1481,6 +1517,7 @@ public Response histogram() {
final NameNodeLoader nnLoader = (NameNodeLoader) context.getAttribute(NNA_NN_LOADER);
final ReentrantReadWriteLock queryLock =
(ReentrantReadWriteLock) context.getAttribute(NNA_QUERY_LOCK);
final AtomicBoolean cancelRequest = (AtomicBoolean) context.getAttribute(NNA_CANCEL_REQUEST);
try {
before();

Expand All @@ -1496,6 +1533,9 @@ public Response histogram() {

queryLock.writeLock().lock();
try {
if (cancelRequest.get()) {
throw new IOException("Query cancelled.");
}
final String fullFilterStr = request.getParameter("filters");
final String histogramConditionsStr = request.getParameter("histogramConditions");
final String emailsToStr = request.getParameter("emailTo");
Expand Down Expand Up @@ -1683,6 +1723,7 @@ public Response histogram2() {
final NameNodeLoader nnLoader = (NameNodeLoader) context.getAttribute(NNA_NN_LOADER);
final ReentrantReadWriteLock queryLock =
(ReentrantReadWriteLock) context.getAttribute(NNA_QUERY_LOCK);
final AtomicBoolean cancelRequest = (AtomicBoolean) context.getAttribute(NNA_CANCEL_REQUEST);
try {
before();

Expand All @@ -1698,6 +1739,9 @@ public Response histogram2() {

queryLock.writeLock().lock();
try {
if (cancelRequest.get()) {
throw new IOException("Query cancelled.");
}
final String fullFilterStr = request.getParameter("filters");
final String[] filters = Helper.parseFilters(fullFilterStr);
final String[] filterOps = Helper.parseFilterOps(fullFilterStr);
Expand Down Expand Up @@ -1791,6 +1835,7 @@ public Response histogram3() {
final NameNodeLoader nnLoader = (NameNodeLoader) context.getAttribute(NNA_NN_LOADER);
final ReentrantReadWriteLock queryLock =
(ReentrantReadWriteLock) context.getAttribute(NNA_QUERY_LOCK);
final AtomicBoolean cancelRequest = (AtomicBoolean) context.getAttribute(NNA_CANCEL_REQUEST);
try {
before();

Expand All @@ -1806,6 +1851,9 @@ public Response histogram3() {

queryLock.writeLock().lock();
try {
if (cancelRequest.get()) {
throw new IOException("Query cancelled.");
}
final String fullFilterStr = request.getParameter("filters");
final String[] filters = Helper.parseFilters(fullFilterStr);
final String[] filterOps = Helper.parseFilterOps(fullFilterStr);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@

package org.apache.hadoop.hdfs.server.namenode.queries;

import java.util.Objects;

public class BaseQuery {

private final String trackingUrl;
Expand Down Expand Up @@ -52,9 +54,7 @@ public boolean equals(Object o) {

BaseQuery baseQuery = (BaseQuery) o;

return (trackingUrl != null
? trackingUrl.equals(baseQuery.trackingUrl)
: baseQuery.trackingUrl == null)
&& (userName != null ? userName.equals(baseQuery.userName) : baseQuery.userName == null);
return (Objects.equals(trackingUrl, baseQuery.trackingUrl))
&& (Objects.equals(userName, baseQuery.userName));
}
}
Loading

0 comments on commit 74aed53

Please sign in to comment.