diff --git a/build.gradle b/build.gradle index 336ceab5..22060d4c 100644 --- a/build.gradle +++ b/build.gradle @@ -15,7 +15,7 @@ buildscript { ext { - es_mv = '7.1' + es_mv = '7.2' es_group = "org.elasticsearch" es_version = '7.2.1' es_distribution = 'oss-zip' @@ -311,6 +311,8 @@ dependencies { checkstyle "com.puppycrawl.tools:checkstyle:${project.checkstyle.toolVersion}" } +compileJava.options.compilerArgs << "-Xlint:-deprecation,-rawtypes,-serial,-try,-unchecked" + apply plugin: 'nebula.ospackage' // This is afterEvaluate because the bundlePlugin ZIP task is updated afterEvaluate and changes the ZIP name to match the plugin name diff --git a/src/main/java/com/amazon/opendistroforelasticsearch/ad/AnomalyDetectorPlugin.java b/src/main/java/com/amazon/opendistroforelasticsearch/ad/AnomalyDetectorPlugin.java index 843b7d82..0f2b4457 100644 --- a/src/main/java/com/amazon/opendistroforelasticsearch/ad/AnomalyDetectorPlugin.java +++ b/src/main/java/com/amazon/opendistroforelasticsearch/ad/AnomalyDetectorPlugin.java @@ -19,10 +19,8 @@ import com.amazon.opendistroforelasticsearch.ad.cluster.ADClusterEventListener; import com.amazon.opendistroforelasticsearch.ad.cluster.ADMetaData; import com.amazon.opendistroforelasticsearch.ad.cluster.ADMetaData.ADMetaDataDiff; -import com.amazon.opendistroforelasticsearch.ad.cluster.DailyCron; import com.amazon.opendistroforelasticsearch.ad.cluster.DeleteDetector; import com.amazon.opendistroforelasticsearch.ad.cluster.HashRing; -import com.amazon.opendistroforelasticsearch.ad.cluster.HourlyCron; import com.amazon.opendistroforelasticsearch.ad.cluster.MasterEventListener; import com.amazon.opendistroforelasticsearch.ad.constant.CommonName; import com.amazon.opendistroforelasticsearch.ad.dataprocessor.Interpolator; @@ -202,7 +200,7 @@ public Collection createComponents( NamedWriteableRegistry namedWriteableRegistry ) { Settings settings = environment.settings(); - ClientUtil clientUtil = new ClientUtil(settings); + ClientUtil clientUtil = new ClientUtil(settings, client); IndexUtils indexUtils = new IndexUtils(client, clientUtil, clusterService); anomalyDetectionIndices = new AnomalyDetectionIndices(client, clusterService, threadPool, settings, clientUtil); this.clusterService = clusterService; @@ -267,8 +265,6 @@ public Collection createComponents( anomalyDetectorRunner = new AnomalyDetectorRunner(modelManager, featureManager); DeleteDetector deleteUtil = new DeleteDetector(clusterService, clock); - DailyCron dailyCron = new DailyCron(deleteUtil, clock, client, AnomalyDetectorSettings.CHECKPOINT_TTL); - HourlyCron hourlyCron = new HourlyCron(clusterService, client); Map> stats = ImmutableMap .>builder() @@ -313,11 +309,9 @@ public Collection createComponents( runner, new ADClusterEventListener(clusterService, hashRing, modelManager), deleteUtil, - dailyCron, - hourlyCron, adCircuitBreakerService, adStats, - new MasterEventListener(clusterService, threadPool, deleteUtil, client, clock) + new MasterEventListener(clusterService, threadPool, deleteUtil, client, clock, clientUtil) ); } diff --git a/src/main/java/com/amazon/opendistroforelasticsearch/ad/cluster/DailyCron.java b/src/main/java/com/amazon/opendistroforelasticsearch/ad/cluster/DailyCron.java index f9ce8382..707daa49 100644 --- a/src/main/java/com/amazon/opendistroforelasticsearch/ad/cluster/DailyCron.java +++ b/src/main/java/com/amazon/opendistroforelasticsearch/ad/cluster/DailyCron.java @@ -20,6 +20,8 @@ import com.amazon.opendistroforelasticsearch.ad.constant.CommonName; import com.amazon.opendistroforelasticsearch.ad.ml.CheckpointDao; +import com.amazon.opendistroforelasticsearch.ad.util.ClientUtil; + import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.elasticsearch.action.ActionListener; @@ -41,11 +43,13 @@ public class DailyCron implements Runnable { private final Clock clock; private final Client client; private final Duration checkpointTtl; + private final ClientUtil clientUtil; - public DailyCron(DeleteDetector deleteUtil, Clock clock, Client client, Duration checkpointTtl) { + public DailyCron(DeleteDetector deleteUtil, Clock clock, Client client, Duration checkpointTtl, ClientUtil clientUtil) { this.deleteUtil = deleteUtil; this.clock = clock; this.client = client; + this.clientUtil = clientUtil; this.checkpointTtl = checkpointTtl; } @@ -63,7 +67,7 @@ public void run() { ) ) .setIndicesOptions(IndicesOptions.LENIENT_EXPAND_OPEN); - client + clientUtil .execute( DeleteByQueryAction.INSTANCE, deleteRequest, diff --git a/src/main/java/com/amazon/opendistroforelasticsearch/ad/cluster/MasterEventListener.java b/src/main/java/com/amazon/opendistroforelasticsearch/ad/cluster/MasterEventListener.java index c1dc4169..a3735579 100644 --- a/src/main/java/com/amazon/opendistroforelasticsearch/ad/cluster/MasterEventListener.java +++ b/src/main/java/com/amazon/opendistroforelasticsearch/ad/cluster/MasterEventListener.java @@ -25,6 +25,7 @@ import org.elasticsearch.threadpool.Scheduler.Cancellable; import com.amazon.opendistroforelasticsearch.ad.settings.AnomalyDetectorSettings; +import com.amazon.opendistroforelasticsearch.ad.util.ClientUtil; import org.elasticsearch.threadpool.ThreadPool; @@ -37,13 +38,15 @@ public class MasterEventListener implements LocalNodeMasterListener { private DeleteDetector deleteUtil; private Client client; private Clock clock; + private ClientUtil clientUtil; public MasterEventListener( ClusterService clusterService, ThreadPool threadPool, DeleteDetector deleteUtil, Client client, - Clock clock + Clock clock, + ClientUtil clientUtil ) { this.clusterService = clusterService; this.threadPool = threadPool; @@ -51,6 +54,7 @@ public MasterEventListener( this.client = client; this.clusterService.addLocalNodeMasterListener(this); this.clock = clock; + this.clientUtil = clientUtil; } @Override @@ -70,7 +74,7 @@ public void beforeStop() { if (dailyCron == null) { dailyCron = threadPool .scheduleWithFixedDelay( - new DailyCron(deleteUtil, clock, client, AnomalyDetectorSettings.CHECKPOINT_TTL), + new DailyCron(deleteUtil, clock, client, AnomalyDetectorSettings.CHECKPOINT_TTL, clientUtil), TimeValue.timeValueHours(24), executorName() ); diff --git a/src/main/java/com/amazon/opendistroforelasticsearch/ad/util/ClientUtil.java b/src/main/java/com/amazon/opendistroforelasticsearch/ad/util/ClientUtil.java index 53126609..37431226 100644 --- a/src/main/java/com/amazon/opendistroforelasticsearch/ad/util/ClientUtil.java +++ b/src/main/java/com/amazon/opendistroforelasticsearch/ad/util/ClientUtil.java @@ -22,13 +22,17 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; import java.util.function.BiConsumer; +import java.util.function.Function; import org.apache.logging.log4j.Logger; import org.elasticsearch.ElasticsearchTimeoutException; +import org.elasticsearch.action.Action; +import org.elasticsearch.action.ActionFuture; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.ActionRequest; import org.elasticsearch.action.ActionResponse; import org.elasticsearch.action.LatchedActionListener; +import org.elasticsearch.client.Client; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; @@ -37,14 +41,16 @@ public class ClientUtil { private volatile TimeValue requestTimeout; + private Client client; @Inject - public ClientUtil(Settings setting) { + public ClientUtil(Settings setting, Client client) { this.requestTimeout = REQUEST_TIMEOUT.get(setting); + this.client = client; } /** - * Generates a nonblocking request with a timeout. Blocking is not allowed in a + * Send a nonblocking request with a timeout and return response. Blocking is not allowed in a * transport call context. See BaseFuture.blockingAllowed * @param request request like index/search/get * @param LOG log @@ -86,4 +92,64 @@ public Optional throw new IllegalStateException(e1); } } + + /** + * Send an asynchronous request and handle response with the provided listener. + * @param ActionRequest + * @param ActionResponse + * @param request request body + * @param consumer request method, functional interface to operate as a client request like client::get + * @param listener needed to handle response + */ + public void asyncRequest( + Request request, + BiConsumer> consumer, + ActionListener listener + ) { + consumer + .accept( + request, + ActionListener.wrap(response -> { listener.onResponse(response); }, exception -> { listener.onFailure(exception); }) + ); + } + + /** + * Execute a transport action and handle response with the provided listener. + * @param ActionRequest + * @param ActionResponse + * @param action transport action + * @param request request body + * @param listener needed to handle response + */ + public void execute( + Action action, + Request request, + ActionListener listener + ) { + client + .execute( + action, + request, + ActionListener.wrap(response -> { listener.onResponse(response); }, exception -> { listener.onFailure(exception); }) + ); + } + + /** + * Send an synchronous request and handle response with the provided listener. + * + * @deprecated use asyncRequest with listener instead. + * + * @param ActionRequest + * @param ActionResponse + * @param request request body + * @param function request method, functional interface to operate as a client request like client::get + * @return the response + */ + @Deprecated + public Response syncRequest( + Request request, + Function> function + ) { + return function.apply(request).actionGet(requestTimeout); + } } diff --git a/src/test/java/com/amazon/opendistroforelasticsearch/ad/cluster/DailyCronTests.java b/src/test/java/com/amazon/opendistroforelasticsearch/ad/cluster/DailyCronTests.java index 8deab795..95084c20 100644 --- a/src/test/java/com/amazon/opendistroforelasticsearch/ad/cluster/DailyCronTests.java +++ b/src/test/java/com/amazon/opendistroforelasticsearch/ad/cluster/DailyCronTests.java @@ -27,6 +27,7 @@ import java.util.Arrays; import com.amazon.opendistroforelasticsearch.ad.AbstractADTest; +import com.amazon.opendistroforelasticsearch.ad.util.ClientUtil; import org.elasticsearch.ElasticsearchException; import org.elasticsearch.action.ActionListener; import org.elasticsearch.client.Client; @@ -59,7 +60,8 @@ private void templateDailyCron(DailyCronTestExecutionMode mode) { DeleteDetector deleteUtil = mock(DeleteDetector.class); Clock clock = mock(Clock.class); Client client = mock(Client.class); - DailyCron cron = new DailyCron(deleteUtil, clock, client, Duration.ofHours(24)); + ClientUtil clientUtil = mock(ClientUtil.class); + DailyCron cron = new DailyCron(deleteUtil, clock, client, Duration.ofHours(24), clientUtil); doAnswer(invocation -> { Object[] args = invocation.getArguments(); @@ -79,7 +81,7 @@ private void templateDailyCron(DailyCronTestExecutionMode mode) { } return null; - }).when(client).execute(eq(DeleteByQueryAction.INSTANCE), any(), any()); + }).when(clientUtil).execute(eq(DeleteByQueryAction.INSTANCE), any(), any()); doNothing().when(deleteUtil).deleteDetectorResult(eq(client)); diff --git a/src/test/java/com/amazon/opendistroforelasticsearch/ad/cluster/MasterEventListenerTests.java b/src/test/java/com/amazon/opendistroforelasticsearch/ad/cluster/MasterEventListenerTests.java index 31be4f61..0a0a4a17 100644 --- a/src/test/java/com/amazon/opendistroforelasticsearch/ad/cluster/MasterEventListenerTests.java +++ b/src/test/java/com/amazon/opendistroforelasticsearch/ad/cluster/MasterEventListenerTests.java @@ -27,6 +27,7 @@ import java.util.Arrays; import com.amazon.opendistroforelasticsearch.ad.AbstractADTest; +import com.amazon.opendistroforelasticsearch.ad.util.ClientUtil; import org.elasticsearch.client.Client; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.component.LifecycleListener; @@ -44,6 +45,7 @@ public class MasterEventListenerTests extends AbstractADTest { private Cancellable hourlyCancellable; private Cancellable dailyCancellable; private MasterEventListener masterService; + private ClientUtil clientUtil; @Override @Before @@ -59,7 +61,8 @@ public void setUp() throws Exception { deleteUtil = mock(DeleteDetector.class); client = mock(Client.class); clock = mock(Clock.class); - masterService = new MasterEventListener(clusterService, threadPool, deleteUtil, client, clock); + clientUtil = mock(ClientUtil.class); + masterService = new MasterEventListener(clusterService, threadPool, deleteUtil, client, clock, clientUtil); } public void testOnOffMaster() { diff --git a/src/test/java/com/amazon/opendistroforelasticsearch/ad/indices/AnomalyDetectionIndicesTests.java b/src/test/java/com/amazon/opendistroforelasticsearch/ad/indices/AnomalyDetectionIndicesTests.java index 6fe8fde9..1ddf0396 100644 --- a/src/test/java/com/amazon/opendistroforelasticsearch/ad/indices/AnomalyDetectionIndicesTests.java +++ b/src/test/java/com/amazon/opendistroforelasticsearch/ad/indices/AnomalyDetectionIndicesTests.java @@ -15,6 +15,8 @@ package com.amazon.opendistroforelasticsearch.ad.indices; +import static org.mockito.Mockito.mock; + import com.amazon.opendistroforelasticsearch.ad.TestHelpers; import com.amazon.opendistroforelasticsearch.ad.model.AnomalyDetector; import com.amazon.opendistroforelasticsearch.ad.settings.AnomalyDetectorSettings; @@ -24,6 +26,7 @@ import com.amazon.opendistroforelasticsearch.ad.util.RestHandlerUtils; import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.action.index.IndexResponse; +import org.elasticsearch.client.Client; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.settings.ClusterSettings; import org.elasticsearch.common.settings.Setting; @@ -46,6 +49,7 @@ public class AnomalyDetectionIndicesTests extends ESIntegTestCase { private ClientUtil requestUtil; private Settings settings; private ClusterService clusterService; + private Client client; @Before public void setup() { @@ -65,7 +69,8 @@ public void setup() { clusterSettings.add(AnomalyDetectorSettings.REQUEST_TIMEOUT); clusterSetting = new ClusterSettings(settings, clusterSettings); clusterService = TestHelpers.createClusterService(client().threadPool(), clusterSetting); - requestUtil = new ClientUtil(settings); + client = mock(Client.class); + requestUtil = new ClientUtil(settings, client); indices = new AnomalyDetectionIndices(client(), clusterService, client().threadPool(), settings, requestUtil); } diff --git a/src/test/java/com/amazon/opendistroforelasticsearch/ad/transport/ADStateManagerTests.java b/src/test/java/com/amazon/opendistroforelasticsearch/ad/transport/ADStateManagerTests.java index 0623a09b..03267257 100644 --- a/src/test/java/com/amazon/opendistroforelasticsearch/ad/transport/ADStateManagerTests.java +++ b/src/test/java/com/amazon/opendistroforelasticsearch/ad/transport/ADStateManagerTests.java @@ -86,7 +86,15 @@ public void setUp() throws Exception { .build(); clock = mock(Clock.class); duration = Duration.ofHours(1); - stateManager = new ADStateManager(client, xContentRegistry(), modelManager, settings, new ClientUtil(settings), clock, duration); + stateManager = new ADStateManager( + client, + xContentRegistry(), + modelManager, + settings, + new ClientUtil(settings, client), + clock, + duration + ); } diff --git a/src/test/java/com/amazon/opendistroforelasticsearch/ad/transport/ADStatsTransportActionTests.java b/src/test/java/com/amazon/opendistroforelasticsearch/ad/transport/ADStatsTransportActionTests.java index 1514949a..0388da77 100644 --- a/src/test/java/com/amazon/opendistroforelasticsearch/ad/transport/ADStatsTransportActionTests.java +++ b/src/test/java/com/amazon/opendistroforelasticsearch/ad/transport/ADStatsTransportActionTests.java @@ -25,6 +25,7 @@ import com.amazon.opendistroforelasticsearch.ad.util.IndexUtils; import org.elasticsearch.action.FailedNodeException; import org.elasticsearch.action.support.ActionFilters; +import org.elasticsearch.client.Client; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.test.ESIntegTestCase; import org.elasticsearch.transport.TransportService; @@ -53,7 +54,8 @@ public class ADStatsTransportActionTests extends ESIntegTestCase { public void setUp() throws Exception { super.setUp(); - IndexUtils indexUtils = new IndexUtils(client(), new ClientUtil(Settings.EMPTY), clusterService()); + Client client = client(); + IndexUtils indexUtils = new IndexUtils(client, new ClientUtil(Settings.EMPTY, client), clusterService()); ModelManager modelManager = mock(ModelManager.class); clusterStatName1 = "clusterStat1"; diff --git a/src/test/java/com/amazon/opendistroforelasticsearch/ad/transport/AnomalyResultTests.java b/src/test/java/com/amazon/opendistroforelasticsearch/ad/transport/AnomalyResultTests.java index eced901c..e49973bc 100644 --- a/src/test/java/com/amazon/opendistroforelasticsearch/ad/transport/AnomalyResultTests.java +++ b/src/test/java/com/amazon/opendistroforelasticsearch/ad/transport/AnomalyResultTests.java @@ -233,7 +233,7 @@ public void setUp() throws Exception { indexNameResolver = new IndexNameExpressionResolver(); - ClientUtil clientUtil = new ClientUtil(Settings.EMPTY); + ClientUtil clientUtil = new ClientUtil(Settings.EMPTY, client); IndexUtils indexUtils = new IndexUtils(client, clientUtil, clusterService); Map> statsMap = new HashMap>() { diff --git a/src/test/java/com/amazon/opendistroforelasticsearch/ad/util/IndexUtilsTests.java b/src/test/java/com/amazon/opendistroforelasticsearch/ad/util/IndexUtilsTests.java index 96f71ee7..896a1a8a 100644 --- a/src/test/java/com/amazon/opendistroforelasticsearch/ad/util/IndexUtilsTests.java +++ b/src/test/java/com/amazon/opendistroforelasticsearch/ad/util/IndexUtilsTests.java @@ -16,6 +16,7 @@ package com.amazon.opendistroforelasticsearch.ad.util; import org.elasticsearch.action.support.master.AcknowledgedResponse; +import org.elasticsearch.client.Client; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.test.ESIntegTestCase; import org.junit.Before; @@ -27,7 +28,8 @@ public class IndexUtilsTests extends ESIntegTestCase { @Before public void setup() { - clientUtil = new ClientUtil(Settings.EMPTY); + Client client = client(); + clientUtil = new ClientUtil(Settings.EMPTY, client); } @Test