From 7040a9b53b008c99d05f6942f4f9261a74d43c71 Mon Sep 17 00:00:00 2001
From: pljeliazkov
Date: Thu, 2 Jan 2020 12:02:49 -0800
Subject: [PATCH] [#290] Add admin API to kill running queries

---
 .../hdfs/server/namenode/Constants.java       |  6 ++-
 .../analytics/HadoopWebServerMain.java        |  4 +-
 .../NameNodeAnalyticsHttpServer.java          |  8 ++-
 .../namenode/analytics/WebServerMain.java     | 34 +++++++++++-
 .../web/NamenodeAnalyticsMethods.java         | 28 ++++++++++
 .../server/namenode/queries/BaseQuery.java    |  8 +--
 .../analytics/TestNNAnalyticsBase.java        | 52 +++++++++++++++++++
 7 files changed, 131 insertions(+), 9 deletions(-)

diff --git a/src/main/java/org/apache/hadoop/hdfs/server/namenode/Constants.java b/src/main/java/org/apache/hadoop/hdfs/server/namenode/Constants.java
index a7d8ee45..e356f7cb 100644
--- a/src/main/java/org/apache/hadoop/hdfs/server/namenode/Constants.java
+++ b/src/main/java/org/apache/hadoop/hdfs/server/namenode/Constants.java
@@ -288,7 +288,8 @@ enum Endpoint {
     removeCachedQuery,
     cachedMaps,
     sql,
-    fileTypes
+    fileTypes,
+    queryGuard
   }
 
   EnumSet<Endpoint> UNSECURED_ENDPOINTS =
@@ -353,7 +354,8 @@ enum Endpoint {
           Endpoint.removeDirectory,
           Endpoint.metrics,
           Endpoint.setCachedQuery,
-          Endpoint.removeCachedQuery);
+          Endpoint.removeCachedQuery,
+          Endpoint.queryGuard);
 
   EnumSet<Endpoint> FILTER_FILE =
       EnumSet.of(
diff --git a/src/main/java/org/apache/hadoop/hdfs/server/namenode/analytics/HadoopWebServerMain.java b/src/main/java/org/apache/hadoop/hdfs/server/namenode/analytics/HadoopWebServerMain.java
index d5301712..afde1a3c 100644
--- a/src/main/java/org/apache/hadoop/hdfs/server/namenode/analytics/HadoopWebServerMain.java
+++ b/src/main/java/org/apache/hadoop/hdfs/server/namenode/analytics/HadoopWebServerMain.java
@@ -81,6 +81,7 @@ public class HadoopWebServerMain implements ApplicationMain {
   private final List<BaseQuery> runningQueries = Collections.synchronizedList(new LinkedList<>());
   private final ReentrantReadWriteLock queryLock = new ReentrantReadWriteLock();
   private final AtomicBoolean savingNamespace = new AtomicBoolean(false);
+  private final AtomicBoolean cancelRequest = new AtomicBoolean(false);
 
   private final NameNodeLoader nameNodeLoader = new NameNodeLoader();
   private final HsqlDriver hsqlDriver = new HsqlDriver();
@@ -150,7 +151,8 @@ public void init(
             internalService,
             operationService,
             queryLock,
-            savingNamespace);
+            savingNamespace,
+            cancelRequest);
     nnaHttpServer.start();
 
     // Bootstrap classes.
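The core mechanism of this patch is already visible above: one shared AtomicBoolean, cancelRequest, is created next to the other cross-cutting state (queryLock, savingNamespace) and handed to the HTTP server, so every request handler can observe it. The standalone sketch below illustrates that cooperative-cancellation pattern; the names GuardDemo and runQuery are invented for illustration and do not appear in the patch:

    // Minimal sketch of cooperative cancellation via a shared AtomicBoolean.
    // GuardDemo and runQuery are hypothetical; only the pattern mirrors the patch.
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.TimeUnit;
    import java.util.concurrent.atomic.AtomicBoolean;

    public class GuardDemo {

      private static final AtomicBoolean cancelRequest = new AtomicBoolean(false);

      // Stands in for an NNA analysis query: it checks the flag before doing any work.
      private static String runQuery(int id) {
        if (cancelRequest.get()) {
          throw new IllegalStateException("Query cancelled.");
        }
        return "query " + id + " ok";
      }

      public static void main(String[] args) throws InterruptedException {
        ExecutorService pool = Executors.newFixedThreadPool(4);
        for (int i = 0; i < 8; i++) {
          final int id = i;
          pool.submit(
              () -> {
                try {
                  System.out.println(runQuery(id));
                } catch (IllegalStateException e) {
                  System.out.println("query " + id + ": " + e.getMessage());
                }
              });
        }
        // Flipping the flag is all the admin endpoint does; any query that
        // reaches its check afterwards is refused.
        cancelRequest.set(true);
        pool.shutdown();
        pool.awaitTermination(5, TimeUnit.SECONDS);
      }
    }

Because workers only observe the flag at their next check, a flip mid-stream cancels some tasks and not others; the new test at the end of this patch asserts exactly that mix.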
diff --git a/src/main/java/org/apache/hadoop/hdfs/server/namenode/analytics/NameNodeAnalyticsHttpServer.java b/src/main/java/org/apache/hadoop/hdfs/server/namenode/analytics/NameNodeAnalyticsHttpServer.java
index 9188e212..ef0e7d73 100644
--- a/src/main/java/org/apache/hadoop/hdfs/server/namenode/analytics/NameNodeAnalyticsHttpServer.java
+++ b/src/main/java/org/apache/hadoop/hdfs/server/namenode/analytics/NameNodeAnalyticsHttpServer.java
@@ -69,6 +69,7 @@ public class NameNodeAnalyticsHttpServer {
   public static final String NNA_OPERATION_SERVICE = "nna.operation.service";
   public static final String NNA_QUERY_LOCK = "nna.query.lock";
   public static final String NNA_SAVING_NAMESPACE = "nna.saving.namespace";
+  public static final String NNA_CANCEL_REQUEST = "nna.cancel.request";
   public static final String NNA_APP_CONF = "nna.app.conf";
   public static final String NNA_HADOOP_CONF = "nna.hadoop.conf";
 
@@ -83,6 +84,7 @@ public class NameNodeAnalyticsHttpServer {
   private final ExecutorService operationService;
   private final ReentrantReadWriteLock queryLock;
   private final AtomicBoolean savingNamespace;
+  private final AtomicBoolean cancelRequest;
 
   // Jetty Http server.
   private HttpServer2 httpServer;
@@ -112,6 +114,7 @@ public class NameNodeAnalyticsHttpServer {
    * @param operationService executor for operation service threads
    * @param queryLock lock around queries
    * @param savingNamespace lock around namespace operations
+   * @param cancelRequest flag used to cancel running queries and reject new ones
    */
   public NameNodeAnalyticsHttpServer(
       Configuration conf,
@@ -126,7 +129,8 @@ public NameNodeAnalyticsHttpServer(
       ExecutorService internalService,
       ExecutorService operationService,
       ReentrantReadWriteLock queryLock,
-      AtomicBoolean savingNamespace) {
+      AtomicBoolean savingNamespace,
+      AtomicBoolean cancelRequest) {
     this.conf = conf;
     this.nnaConf = nnaConf;
     this.bindAddress = bindAddress;
@@ -140,6 +144,7 @@ public NameNodeAnalyticsHttpServer(
     this.operationService = operationService;
     this.queryLock = queryLock;
     this.savingNamespace = savingNamespace;
+    this.cancelRequest = cancelRequest;
   }
 
   /**
@@ -191,6 +196,7 @@ public void start() throws IOException {
     httpServer.getWebAppContext().setAttribute(NNA_OPERATION_SERVICE, operationService);
     httpServer.getWebAppContext().setAttribute(NNA_QUERY_LOCK, queryLock);
     httpServer.getWebAppContext().setAttribute(NNA_SAVING_NAMESPACE, savingNamespace);
+    httpServer.getWebAppContext().setAttribute(NNA_CANCEL_REQUEST, cancelRequest);
     httpServer.getWebAppContext().setAttribute(NNA_APP_CONF, nnaConf);
    httpServer.getWebAppContext().setAttribute(NNA_HADOOP_CONF, conf);
diff --git a/src/main/java/org/apache/hadoop/hdfs/server/namenode/analytics/WebServerMain.java b/src/main/java/org/apache/hadoop/hdfs/server/namenode/analytics/WebServerMain.java
index 1fd314ea..bff17fd2 100644
--- a/src/main/java/org/apache/hadoop/hdfs/server/namenode/analytics/WebServerMain.java
+++ b/src/main/java/org/apache/hadoop/hdfs/server/namenode/analytics/WebServerMain.java
@@ -155,6 +155,7 @@ public class WebServerMain implements ApplicationMain {
       Collections.synchronizedMap(new HashMap<>());
 
   private final AtomicBoolean savingNamespace = new AtomicBoolean(false);
+  private final AtomicBoolean cancelRequest = new AtomicBoolean(false);
 
   /**
    * This is the main launching call for use in production. Should not accept any arguments. Service
@@ -401,6 +402,7 @@ public void init(
     boolean isProvidingSuggestions = nameNodeLoader.getSuggestionsEngine().isLoaded();
     sb.append("Current system time (ms): ").append(Time.now()).append("\n");
     sb.append("Ready to service queries: ").append(isInit).append("\n");
+    sb.append("Guarding against queries: ").append(cancelRequest.get()).append("\n");
     sb.append("Ready to service history: ").append(isHistorical).append("\n");
     sb.append("Ready to service suggestions: ").append(isProvidingSuggestions).append("\n\n");
     AnalysisState currentAnalysisState =
@@ -567,8 +569,23 @@ public void init(
           }
         });
 
+    /* QUERYGUARD endpoint is an admin-level endpoint meant to kill all running analysis queries. */
+    get(
+        "/queryGuard",
+        (req, res) -> {
+          res.header("Access-Control-Allow-Origin", "*");
+          res.header("Content-Type", "text/plain");
+          boolean guarding = !cancelRequest.get();
+          cancelRequest.set(guarding);
+          if (guarding) {
+            return "Guarding against queries. All queries after current processing will be killed.";
+          } else {
+            return "All queries allowed.";
+          }
+        });
+
     /* METRICS endpoint is meant to return information on the users and the
-    amount of queries they are making */
+    amount of queries they are making. */
     get(
         "/metrics",
         (req, res) -> {
@@ -716,6 +733,9 @@ public void init(
 
           lock.writeLock().lock();
           try {
+            if (cancelRequest.get()) {
+              throw new IOException("Query cancelled.");
+            }
             String filterStr1 = req.queryMap("filters1").value();
             String filterStr2 = req.queryMap("filters2").value();
             String emailsToStr = req.queryMap("emailTo").value();
@@ -794,6 +814,9 @@ public void init(
 
           lock.writeLock().lock();
           try {
+            if (cancelRequest.get()) {
+              throw new IOException("Query cancelled.");
+            }
             String fullFilterStr = req.queryMap("filters").value();
             String emailsToStr = req.queryMap("emailTo").value();
             String emailsCcStr = req.queryMap("emailCC").value();
@@ -882,6 +905,9 @@ public void init(
 
           lock.writeLock().lock();
           try {
+            if (cancelRequest.get()) {
+              throw new IOException("Query cancelled.");
+            }
             final String fullFilterStr = req.queryMap("filters").value();
             final String histogramConditionsStr = req.queryMap("histogramConditions").value();
             final String emailsToStr = req.queryMap("emailTo").value();
@@ -1025,6 +1051,9 @@ public void init(
 
           lock.writeLock().lock();
           try {
+            if (cancelRequest.get()) {
+              throw new IOException("Query cancelled.");
+            }
             final String fullFilterStr = req.queryMap("filters").value();
             final String[] filters = Helper.parseFilters(fullFilterStr);
             final String[] filterOps = Helper.parseFilterOps(fullFilterStr);
@@ -1101,6 +1130,9 @@ public void init(
 
           lock.writeLock().lock();
           try {
+            if (cancelRequest.get()) {
+              throw new IOException("Query cancelled.");
+            }
             final String fullFilterStr = req.queryMap("filters").value();
             final String[] filters = Helper.parseFilters(fullFilterStr);
             final String[] filterOps = Helper.parseFilterOps(fullFilterStr);
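Two details of the WebServerMain changes are worth noting. The guard is a toggle: each GET of /queryGuard inverts the flag, so the same endpoint both engages and releases the guard. And because the cancelRequest check sits at the top of every query handler's write-locked section, queries still queued behind the lock are rejected the moment they acquire it, while a query already past its check runs to completion. One caveat: the read-then-set pair in the handler is not an atomic toggle, so two concurrent calls could race; that is likely harmless for a rarely-used admin endpoint, but a compare-and-set loop would close the gap. A sketch of that alternative, not part of the patch (the helper name AtomicToggle is invented):

    // Sketch of an atomic alternative to the endpoint's read-then-set toggle.
    import java.util.concurrent.atomic.AtomicBoolean;

    final class AtomicToggle {

      private AtomicToggle() {}

      // Atomically flips the flag and returns the value it now holds.
      static boolean toggle(AtomicBoolean flag) {
        boolean prev;
        do {
          prev = flag.get();
        } while (!flag.compareAndSet(prev, !prev));
        return !prev;
      }
    }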
diff --git a/src/main/java/org/apache/hadoop/hdfs/server/namenode/analytics/web/NamenodeAnalyticsMethods.java b/src/main/java/org/apache/hadoop/hdfs/server/namenode/analytics/web/NamenodeAnalyticsMethods.java
index 04ce09aa..924a670e 100644
--- a/src/main/java/org/apache/hadoop/hdfs/server/namenode/analytics/web/NamenodeAnalyticsMethods.java
+++ b/src/main/java/org/apache/hadoop/hdfs/server/namenode/analytics/web/NamenodeAnalyticsMethods.java
@@ -20,6 +20,7 @@
 package org.apache.hadoop.hdfs.server.namenode.analytics.web;
 
 import static org.apache.hadoop.hdfs.server.namenode.analytics.NameNodeAnalyticsHttpServer.NNA_APP_CONF;
+import static org.apache.hadoop.hdfs.server.namenode.analytics.NameNodeAnalyticsHttpServer.NNA_CANCEL_REQUEST;
 import static org.apache.hadoop.hdfs.server.namenode.analytics.NameNodeAnalyticsHttpServer.NNA_HSQL_DRIVER;
 import static org.apache.hadoop.hdfs.server.namenode.analytics.NameNodeAnalyticsHttpServer.NNA_NN_LOADER;
 import static org.apache.hadoop.hdfs.server.namenode.analytics.NameNodeAnalyticsHttpServer.NNA_OPERATION_SERVICE;
@@ -409,6 +410,31 @@ public Response operations() {
     }
   }
 
+  /** QUERYGUARD endpoint is an admin-level endpoint meant to kill all running analysis queries. */
+  @GET
+  @Path("/queryGuard")
+  @Produces({MediaType.TEXT_PLAIN})
+  public Response queryGuard() {
+    try {
+      before();
+      final AtomicBoolean cancelRequest = (AtomicBoolean) context.getAttribute(NNA_CANCEL_REQUEST);
+      boolean guarding = !cancelRequest.get();
+      cancelRequest.set(guarding);
+      if (guarding) {
+        return Response.ok(
+                "Guarding against queries. All queries after current processing will be killed.",
+                MediaType.TEXT_PLAIN)
+            .build();
+      } else {
+        return Response.ok("All queries allowed.", MediaType.TEXT_PLAIN).build();
+      }
+    } catch (Exception ex) {
+      return handleException(ex);
+    } finally {
+      after();
+    }
+  }
+
 /**
  * METRICS endpoint is meant to return information on the users and the amount of queries they are
  * making.
 */
@@ -585,6 +611,7 @@ public Response info() {
     @SuppressWarnings("unchecked")
     final List<BaseQuery> runningQueries =
         (List<BaseQuery>) context.getAttribute(NNA_RUNNING_QUERIES);
+    final AtomicBoolean cancelRequest = (AtomicBoolean) context.getAttribute(NNA_CANCEL_REQUEST);
 
     try {
       before();
@@ -602,6 +629,7 @@ public Response info() {
       boolean isProvidingSuggestions = nnLoader.getSuggestionsEngine().isLoaded();
       sb.append("Current system time (ms): ").append(Time.now()).append("\n");
       sb.append("Ready to service queries: ").append(isInit).append("\n");
+      sb.append("Guarding against queries: ").append(cancelRequest.get()).append("\n");
       sb.append("Ready to service history: ").append(isHistorical).append("\n");
       sb.append("Ready to service suggestions: ").append(isProvidingSuggestions).append("\n\n");
       if (isInit) {
diff --git a/src/main/java/org/apache/hadoop/hdfs/server/namenode/queries/BaseQuery.java b/src/main/java/org/apache/hadoop/hdfs/server/namenode/queries/BaseQuery.java
index c02b2d09..8d8f00b8 100644
--- a/src/main/java/org/apache/hadoop/hdfs/server/namenode/queries/BaseQuery.java
+++ b/src/main/java/org/apache/hadoop/hdfs/server/namenode/queries/BaseQuery.java
@@ -19,6 +19,8 @@
 
 package org.apache.hadoop.hdfs.server.namenode.queries;
 
+import java.util.Objects;
+
 public class BaseQuery {
 
   private final String trackingUrl;
@@ -52,9 +54,7 @@ public boolean equals(Object o) {
 
     BaseQuery baseQuery = (BaseQuery) o;
 
-    return (trackingUrl != null
-            ? trackingUrl.equals(baseQuery.trackingUrl)
-            : baseQuery.trackingUrl == null)
-        && (userName != null ? userName.equals(baseQuery.userName) : baseQuery.userName == null);
+    return (Objects.equals(trackingUrl, baseQuery.trackingUrl))
+        && (Objects.equals(userName, baseQuery.userName));
   }
 }
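The BaseQuery change above is a behavior-preserving cleanup: java.util.Objects.equals returns true when both arguments are null, false when exactly one is, and otherwise delegates to equals, which is precisely what the hand-rolled ternaries computed. A minimal check (the class name ObjectsEqualsDemo is illustrative only):

    // Objects.equals reproduces the null-safe comparison it replaces.
    import java.util.Objects;

    public class ObjectsEqualsDemo {
      public static void main(String[] args) {
        System.out.println(Objects.equals(null, null)); // true: both null
        System.out.println(Objects.equals("u1", null)); // false: exactly one null
        System.out.println(Objects.equals("u1", "u1")); // true: delegates to equals()
      }
    }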
diff --git a/src/test/java/org/apache/hadoop/hdfs/server/namenode/analytics/TestNNAnalyticsBase.java b/src/test/java/org/apache/hadoop/hdfs/server/namenode/analytics/TestNNAnalyticsBase.java
index f8110e58..6143c829 100644
--- a/src/test/java/org/apache/hadoop/hdfs/server/namenode/analytics/TestNNAnalyticsBase.java
+++ b/src/test/java/org/apache/hadoop/hdfs/server/namenode/analytics/TestNNAnalyticsBase.java
@@ -24,6 +24,7 @@
 import static org.hamcrest.CoreMatchers.notNullValue;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.greaterThan;
+import static org.hamcrest.Matchers.lessThan;
 import static org.hamcrest.core.Is.is;
 import static org.hamcrest.core.IsNot.not;
 import static org.hamcrest.core.StringContains.containsString;
@@ -47,6 +48,10 @@
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
 import java.util.function.Function;
 import java.util.stream.Stream;
 import org.apache.commons.io.IOUtils;
@@ -63,16 +68,19 @@
 import org.apache.hadoop.hdfs.server.namenode.NameNodeLoader;
 import org.apache.hadoop.hdfs.server.namenode.QueryEngine;
 import org.apache.hadoop.hdfs.server.namenode.queries.Transforms;
+import org.apache.http.HttpEntity;
 import org.apache.http.HttpHost;
 import org.apache.http.HttpResponse;
 import org.apache.http.HttpStatus;
 import org.apache.http.NameValuePair;
 import org.apache.http.client.HttpClient;
+import org.apache.http.client.ResponseHandler;
 import org.apache.http.client.entity.UrlEncodedFormEntity;
 import org.apache.http.client.methods.HttpGet;
 import org.apache.http.client.methods.HttpPost;
 import org.apache.http.impl.client.DefaultHttpClient;
 import org.apache.http.message.BasicNameValuePair;
+import org.apache.http.util.EntityUtils;
 import org.hamcrest.CoreMatchers;
 import org.junit.AfterClass;
 import org.junit.Before;
@@ -879,6 +887,50 @@ public void testMemoryConsumedHistogram() throws IOException {
     assertThat(res.getStatusLine().getStatusCode(), is(200));
   }
 
+  @Test
+  public void testBlockQueries() throws IOException, ExecutionException, InterruptedException {
+    ExecutorService executorService = Executors.newCachedThreadPool();
+    ResponseHandler<String> responseHandler =
+        response -> {
+          HttpEntity entity = response.getEntity();
+          return entity == null ? null : EntityUtils.toString(entity);
+        };
+    HttpGet get = new HttpGet("http://localhost:4567/histogram?set=all&type=memoryConsumed");
+    List<Future<String>> futures = new ArrayList<>(100);
+    for (int i = 0; i < 100; i++) {
+      Future<String> result =
+          executorService.submit(
+              () -> new DefaultHttpClient().execute(hostPort, get, responseHandler));
+      futures.add(result);
+    }
+
+    HttpGet killQuery = new HttpGet("http://localhost:4567/queryGuard");
+    HttpResponse res = client.execute(hostPort, killQuery);
+
+    assertThat(res.getStatusLine().getStatusCode(), is(200));
+    List<String> result = IOUtils.readLines(res.getEntity().getContent());
+    System.out.println(result);
+
+    int queryCancelled = 0;
+    for (Future<String> future : futures) {
+      String output = future.get();
+      if (output.contains("Query cancelled.")) {
+        queryCancelled++;
+      }
+    }
+    assertThat(queryCancelled, is(not(0))); // Some queries were cancelled.
+    assertThat(queryCancelled, is(lessThan(100))); // Not all were cancelled.
+
+    HttpGet allowQueries = new HttpGet("http://localhost:4567/queryGuard");
+    HttpResponse allowRes = client.execute(hostPort, allowQueries);
+
+    assertThat(allowRes.getStatusLine().getStatusCode(), is(200));
+    List<String> allowStr = IOUtils.readLines(allowRes.getEntity().getContent());
+    System.out.println(allowStr);
+    assertThat(allowStr.size(), is(1));
+    assertThat(allowStr.get(0), is("All queries allowed."));
+  }
+
   @Test
   public void testStorageTypeHistogram() throws IOException {
     HttpGet get = new HttpGet("http://localhost:4567/histogram?set=files&type=storageType");
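Finally, usage. Assuming an NNA instance on localhost:4567, as the test above uses, an administrator toggles the guard with a plain GET; the commit subject bills this as an admin API, so a secured deployment would also require admin credentials. A minimal JDK-only client sketch (the class name QueryGuardClient is invented for illustration):

    // Toggle the query guard and print the server's one-line status response.
    import java.io.BufferedReader;
    import java.io.InputStreamReader;
    import java.net.HttpURLConnection;
    import java.net.URL;
    import java.nio.charset.StandardCharsets;

    public class QueryGuardClient {
      public static void main(String[] args) throws Exception {
        URL url = new URL("http://localhost:4567/queryGuard");
        HttpURLConnection conn = (HttpURLConnection) url.openConnection();
        conn.setRequestMethod("GET");
        try (BufferedReader in =
            new BufferedReader(
                new InputStreamReader(conn.getInputStream(), StandardCharsets.UTF_8))) {
          // Expected: "Guarding against queries. ..." on the first call,
          // "All queries allowed." on the next.
          System.out.println(conn.getResponseCode() + ": " + in.readLine());
        }
      }
    }

Running it twice restores normal service, which is exactly what the final assertions of testBlockQueries confirm.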