From 2f7f33970177394ca26117de54ffbcd8a54a4a49 Mon Sep 17 00:00:00 2001 From: Weicong Sun Date: Thu, 15 Oct 2020 07:52:25 -0700 Subject: [PATCH 1/7] Add checkpoint index retention for multi entity detector --- .../ad/cluster/DailyCron.java | 1 + .../ad/cluster/MasterEventListener.java | 27 ++-- .../ad/cluster/diskcleanup/IndexCleanup.java | 121 ++++++++++++++++++ .../ModelCheckpointIndexRetention.java | 102 +++++++++++++++ .../ad/cluster/MasterEventListenerTests.java | 18 +-- .../diskcleanup/IndexCleanupTests.java | 120 +++++++++++++++++ .../ModelCheckpointIndexRetentionTests.java | 98 ++++++++++++++ 7 files changed, 468 insertions(+), 19 deletions(-) create mode 100644 src/main/java/com/amazon/opendistroforelasticsearch/ad/cluster/diskcleanup/IndexCleanup.java create mode 100644 src/main/java/com/amazon/opendistroforelasticsearch/ad/cluster/diskcleanup/ModelCheckpointIndexRetention.java create mode 100644 src/test/java/com/amazon/opendistroforelasticsearch/ad/cluster/diskcleanup/IndexCleanupTests.java create mode 100644 src/test/java/com/amazon/opendistroforelasticsearch/ad/cluster/diskcleanup/ModelCheckpointIndexRetentionTests.java diff --git a/src/main/java/com/amazon/opendistroforelasticsearch/ad/cluster/DailyCron.java b/src/main/java/com/amazon/opendistroforelasticsearch/ad/cluster/DailyCron.java index 9b22a38e..646c6a0f 100644 --- a/src/main/java/com/amazon/opendistroforelasticsearch/ad/cluster/DailyCron.java +++ b/src/main/java/com/amazon/opendistroforelasticsearch/ad/cluster/DailyCron.java @@ -31,6 +31,7 @@ import com.amazon.opendistroforelasticsearch.ad.ml.CheckpointDao; import com.amazon.opendistroforelasticsearch.ad.util.ClientUtil; +@Deprecated public class DailyCron implements Runnable { private static final Logger LOG = LogManager.getLogger(DailyCron.class); protected static final String FIELD_MODEL = "queue"; diff --git a/src/main/java/com/amazon/opendistroforelasticsearch/ad/cluster/MasterEventListener.java b/src/main/java/com/amazon/opendistroforelasticsearch/ad/cluster/MasterEventListener.java index 3dbba85c..876679cd 100644 --- a/src/main/java/com/amazon/opendistroforelasticsearch/ad/cluster/MasterEventListener.java +++ b/src/main/java/com/amazon/opendistroforelasticsearch/ad/cluster/MasterEventListener.java @@ -17,6 +17,9 @@ import java.time.Clock; +import com.amazon.opendistroforelasticsearch.ad.cluster.diskcleanup.IndexCleanup; +import com.amazon.opendistroforelasticsearch.ad.cluster.diskcleanup.ModelCheckpointIndexRetention; +import com.google.common.annotations.VisibleForTesting; import org.elasticsearch.client.Client; import org.elasticsearch.cluster.LocalNodeMasterListener; import org.elasticsearch.cluster.service.ClusterService; @@ -31,7 +34,7 @@ public class MasterEventListener implements LocalNodeMasterListener { - private Cancellable dailyCron; + private Cancellable checkpointIndexRetentionCron; private Cancellable hourlyCron; private ClusterService clusterService; private ThreadPool threadPool; @@ -70,18 +73,19 @@ public void beforeStop() { }); } - if (dailyCron == null) { - dailyCron = threadPool + if (checkpointIndexRetentionCron == null) { + IndexCleanup indexCleanup = new IndexCleanup(client, clientUtil, clusterService); + checkpointIndexRetentionCron = threadPool .scheduleWithFixedDelay( - new DailyCron(clock, AnomalyDetectorSettings.CHECKPOINT_TTL, clientUtil), + new ModelCheckpointIndexRetention(AnomalyDetectorSettings.CHECKPOINT_TTL, clock, indexCleanup), TimeValue.timeValueHours(24), executorName() ); clusterService.addLifecycleListener(new 
LifecycleListener() { @Override public void beforeStop() { - cancel(dailyCron); - dailyCron = null; + cancel(checkpointIndexRetentionCron); + checkpointIndexRetentionCron = null; } }); } @@ -90,9 +94,9 @@ public void beforeStop() { @Override public void offMaster() { cancel(hourlyCron); - cancel(dailyCron); + cancel(checkpointIndexRetentionCron); hourlyCron = null; - dailyCron = null; + checkpointIndexRetentionCron = null; } private void cancel(Cancellable cron) { @@ -101,11 +105,12 @@ private void cancel(Cancellable cron) { } } - public Cancellable getDailyCron() { - return dailyCron; + @VisibleForTesting + protected Cancellable getCheckpointIndexRetentionCron() { + return checkpointIndexRetentionCron; } - public Cancellable getHourlyCron() { + protected Cancellable getHourlyCron() { return hourlyCron; } diff --git a/src/main/java/com/amazon/opendistroforelasticsearch/ad/cluster/diskcleanup/IndexCleanup.java b/src/main/java/com/amazon/opendistroforelasticsearch/ad/cluster/diskcleanup/IndexCleanup.java new file mode 100644 index 00000000..f57e7598 --- /dev/null +++ b/src/main/java/com/amazon/opendistroforelasticsearch/ad/cluster/diskcleanup/IndexCleanup.java @@ -0,0 +1,121 @@ +/* + * Copyright 2020 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package com.amazon.opendistroforelasticsearch.ad.cluster.diskcleanup; + +import com.amazon.opendistroforelasticsearch.ad.util.ClientUtil; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.admin.indices.stats.CommonStats; +import org.elasticsearch.action.admin.indices.stats.IndicesStatsRequest; +import org.elasticsearch.action.admin.indices.stats.IndicesStatsResponse; +import org.elasticsearch.action.admin.indices.stats.ShardStats; +import org.elasticsearch.action.support.IndicesOptions; +import org.elasticsearch.client.Client; +import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.index.query.QueryBuilder; +import org.elasticsearch.index.reindex.DeleteByQueryAction; +import org.elasticsearch.index.reindex.DeleteByQueryRequest; +import org.elasticsearch.index.store.StoreStats; + +import java.util.Arrays; +import java.util.Objects; + +/** + * Clean up the old docs for indices. + */ +public class IndexCleanup { + private static final Logger LOG = LogManager.getLogger(IndexCleanup.class); + + private final Client client; + private final ClientUtil clientUtil; + private final ClusterService clusterService; + + public IndexCleanup(Client client, ClientUtil clientUtil, ClusterService clusterService) { + this.client = client; + this.clientUtil = clientUtil; + this.clusterService = clusterService; + } + + /** + * delete docs when shard size is bigger than max limitation. 
+ * @param indexName index name + * @param maxShardSize max shard size + * @param queryForDeleteByQueryRequest query request + * @param listener action listener + */ + public void deleteDocsBasedOnShardSize(String indexName, long maxShardSize, QueryBuilder queryForDeleteByQueryRequest, + ActionListener listener) { + + if (!clusterService.state().getRoutingTable().hasIndex(indexName)) { + LOG.debug("skip as the index:{} doesn't exist", indexName); + return; + } + + ActionListener indicesStatsResponseListener = ActionListener.wrap(indicesStatsResponse -> { + //Check if any shard size is bigger than maxShardSize + boolean cleanupNeeded = Arrays.stream(indicesStatsResponse.getShards()) + .map(ShardStats::getStats) + .filter(Objects::nonNull) + .map(CommonStats::getStore) + .filter(Objects::nonNull) + .map(StoreStats::getSizeInBytes) + .anyMatch(size -> size > maxShardSize); + + if (cleanupNeeded) { + deleteDocsByQuery(indexName, queryForDeleteByQueryRequest, + ActionListener.wrap(r -> listener.onResponse(true), listener::onFailure)); + } else { + listener.onResponse(false); + } + }, listener::onFailure); + + getCheckpointShardStoreStats(indexName, indicesStatsResponseListener); + } + + private void getCheckpointShardStoreStats(String indexName, ActionListener listener) { + IndicesStatsRequest indicesStatsRequest = new IndicesStatsRequest(); + indicesStatsRequest.store(); + indicesStatsRequest.indices(indexName); + client.admin().indices().stats(indicesStatsRequest, listener); + } + + /** + * Delete docs based on query request + * @param indexName index name + * @param queryForDeleteByQueryRequest query request + * @param listener action listener + */ + public void deleteDocsByQuery(String indexName, QueryBuilder queryForDeleteByQueryRequest, ActionListener listener) { + DeleteByQueryRequest deleteRequest = new DeleteByQueryRequest(indexName) + .setQuery(queryForDeleteByQueryRequest) + .setIndicesOptions(IndicesOptions.LENIENT_EXPAND_OPEN); + clientUtil + .execute( + DeleteByQueryAction.INSTANCE, + deleteRequest, + ActionListener + .wrap( + response -> { + // if 0 docs get deleted, it means our query cannot find any matching doc + LOG.info("{} docs are deleted for index:{}", response.getDeleted(), indexName); + listener.onResponse(response.getDeleted()); + }, + listener::onFailure + ) + ); + } +} diff --git a/src/main/java/com/amazon/opendistroforelasticsearch/ad/cluster/diskcleanup/ModelCheckpointIndexRetention.java b/src/main/java/com/amazon/opendistroforelasticsearch/ad/cluster/diskcleanup/ModelCheckpointIndexRetention.java new file mode 100644 index 00000000..ef155bec --- /dev/null +++ b/src/main/java/com/amazon/opendistroforelasticsearch/ad/cluster/diskcleanup/ModelCheckpointIndexRetention.java @@ -0,0 +1,102 @@ +/* + * Copyright 2020 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. 
+ */
+
+package com.amazon.opendistroforelasticsearch.ad.cluster.diskcleanup;
+
+import com.amazon.opendistroforelasticsearch.ad.constant.CommonName;
+import com.amazon.opendistroforelasticsearch.ad.ml.CheckpointDao;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+import org.elasticsearch.action.ActionListener;
+import org.elasticsearch.index.query.QueryBuilders;
+
+import java.time.Clock;
+import java.time.Duration;
+
+/**
+ * Model checkpoints cleanup of multi-entity detectors.
+ * <p>Problem:
+ * In multi-entity detectors, we can have thousands, even millions, of entities whose model checkpoints will consume
+ * lots of disk resources. To protect our disk usage, the checkpoint index size will be limited to a specified threshold.
+ * Once its size exceeds the threshold, the model checkpoint cleanup process will be activated.
+ * </p>
+ * <p>Solution:
+ * Before multi-entity detectors, there was a daily cron job to clean up checkpoints inactive for longer than a configurable number of days.
+ * We will keep this logic and add a new cleanup mechanism based on shard size.
+ * </p>
+ */ +public class ModelCheckpointIndexRetention implements Runnable{ + private static final Logger LOG = LogManager.getLogger(ModelCheckpointIndexRetention.class); + + //The recommended max shard size is 50G, we don't wanna our index exceeds this number + private static final long MAX_SHARD_SIZE_IN_BYTE = 50 * 1024 * 1024 * 1024L; + //We can't clean up all of the checkpoints. At least keep models for 1 day + private static final Duration MINIMUM_CHECKPOINT_TTL = Duration.ofDays(1); + + private final Duration defaultCheckpointTtl; + private final Clock clock; + private final IndexCleanup indexCleanup; + + public ModelCheckpointIndexRetention(Duration defaultCheckpointTtl, Clock clock, + IndexCleanup indexCleanup) { + this.defaultCheckpointTtl = defaultCheckpointTtl; + this.clock = clock; + this.indexCleanup = indexCleanup; + } + + @Override + public void run() { + indexCleanup.deleteDocsByQuery(CommonName.CHECKPOINT_INDEX_NAME, + QueryBuilders + .boolQuery() + .filter( + QueryBuilders + .rangeQuery(CheckpointDao.TIMESTAMP) + .lte(clock.millis() - defaultCheckpointTtl.toMillis()) + .format(CommonName.EPOCH_MILLIS_FORMAT) + ), + ActionListener.wrap(response -> { + cleanupBasedOnShardSize(defaultCheckpointTtl.minusDays(1)); + }, exception -> LOG.error("delete docs by query fails for checkpoint index", exception))); + + } + + private void cleanupBasedOnShardSize(Duration cleanUpTtl) { + indexCleanup.deleteDocsBasedOnShardSize(CommonName.CHECKPOINT_INDEX_NAME, MAX_SHARD_SIZE_IN_BYTE, + QueryBuilders + .boolQuery() + .filter( + QueryBuilders + .rangeQuery(CheckpointDao.TIMESTAMP) + .lte(clock.millis() - cleanUpTtl.toMillis()) + .format(CommonName.EPOCH_MILLIS_FORMAT) + ), + ActionListener.wrap(cleanupNeeded -> { + if (cleanupNeeded) { + if (cleanUpTtl.equals(MINIMUM_CHECKPOINT_TTL)) { + return; + } + + Duration nextCleanupTtl = cleanUpTtl.minusDays(1); + if (nextCleanupTtl.compareTo(MINIMUM_CHECKPOINT_TTL) < 0) { + nextCleanupTtl = MINIMUM_CHECKPOINT_TTL; + } + cleanupBasedOnShardSize(nextCleanupTtl); + } else { + LOG.debug("clean up not needed anymore for checkpoint index"); + } + }, exception -> LOG.error("checkpoint index retention based on shard size fails", exception))); + } +} diff --git a/src/test/java/com/amazon/opendistroforelasticsearch/ad/cluster/MasterEventListenerTests.java b/src/test/java/com/amazon/opendistroforelasticsearch/ad/cluster/MasterEventListenerTests.java index 385b45ee..5aff9872 100644 --- a/src/test/java/com/amazon/opendistroforelasticsearch/ad/cluster/MasterEventListenerTests.java +++ b/src/test/java/com/amazon/opendistroforelasticsearch/ad/cluster/MasterEventListenerTests.java @@ -27,6 +27,7 @@ import java.util.Arrays; import java.util.HashMap; +import com.amazon.opendistroforelasticsearch.ad.cluster.diskcleanup.ModelCheckpointIndexRetention; import org.elasticsearch.client.Client; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.component.LifecycleListener; @@ -46,7 +47,7 @@ public class MasterEventListenerTests extends AbstractADTest { private Client client; private Clock clock; private Cancellable hourlyCancellable; - private Cancellable dailyCancellable; + private Cancellable checkpointIndexRetentionCancellable; private MasterEventListener masterService; private ClientUtil clientUtil; private DiscoveryNodeFilterer nodeFilter; @@ -58,10 +59,11 @@ public void setUp() throws Exception { clusterService = mock(ClusterService.class); threadPool = mock(ThreadPool.class); hourlyCancellable = mock(Cancellable.class); - 
dailyCancellable = mock(Cancellable.class); + checkpointIndexRetentionCancellable = mock(Cancellable.class); when(threadPool.scheduleWithFixedDelay(any(HourlyCron.class), any(TimeValue.class), any(String.class))) .thenReturn(hourlyCancellable); - when(threadPool.scheduleWithFixedDelay(any(DailyCron.class), any(TimeValue.class), any(String.class))).thenReturn(dailyCancellable); + when(threadPool.scheduleWithFixedDelay(any(ModelCheckpointIndexRetention.class), any(TimeValue.class), any(String.class))) + .thenReturn(checkpointIndexRetentionCancellable); client = mock(Client.class); clock = mock(Clock.class); clientUtil = mock(ClientUtil.class); @@ -75,11 +77,11 @@ public void setUp() throws Exception { public void testOnOffMaster() { masterService.onMaster(); assertThat(hourlyCancellable, is(notNullValue())); - assertThat(dailyCancellable, is(notNullValue())); + assertThat(checkpointIndexRetentionCancellable, is(notNullValue())); assertTrue(!masterService.getHourlyCron().isCancelled()); - assertTrue(!masterService.getDailyCron().isCancelled()); + assertTrue(!masterService.getCheckpointIndexRetentionCron().isCancelled()); masterService.offMaster(); - assertThat(masterService.getDailyCron(), is(nullValue())); + assertThat(masterService.getCheckpointIndexRetentionCron(), is(nullValue())); assertThat(masterService.getHourlyCron(), is(nullValue())); } @@ -100,10 +102,10 @@ public void testBeforeStop() { }).when(clusterService).addLifecycleListener(any()); masterService.onMaster(); - assertThat(masterService.getDailyCron(), is(nullValue())); + assertThat(masterService.getCheckpointIndexRetentionCron(), is(nullValue())); assertThat(masterService.getHourlyCron(), is(nullValue())); masterService.offMaster(); - assertThat(masterService.getDailyCron(), is(nullValue())); + assertThat(masterService.getCheckpointIndexRetentionCron(), is(nullValue())); assertThat(masterService.getHourlyCron(), is(nullValue())); } } diff --git a/src/test/java/com/amazon/opendistroforelasticsearch/ad/cluster/diskcleanup/IndexCleanupTests.java b/src/test/java/com/amazon/opendistroforelasticsearch/ad/cluster/diskcleanup/IndexCleanupTests.java new file mode 100644 index 00000000..cc9cca60 --- /dev/null +++ b/src/test/java/com/amazon/opendistroforelasticsearch/ad/cluster/diskcleanup/IndexCleanupTests.java @@ -0,0 +1,120 @@ +/* + * Copyright 2020 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. 
+ */ +package com.amazon.opendistroforelasticsearch.ad.cluster.diskcleanup; + +import com.amazon.opendistroforelasticsearch.ad.AbstractADTest; +import com.amazon.opendistroforelasticsearch.ad.util.ClientUtil; +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.admin.indices.stats.CommonStats; +import org.elasticsearch.action.admin.indices.stats.IndicesStatsResponse; +import org.elasticsearch.action.admin.indices.stats.ShardStats; +import org.elasticsearch.client.Client; +import org.elasticsearch.client.IndicesAdminClient; +import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.index.reindex.DeleteByQueryAction; +import org.elasticsearch.index.store.StoreStats; +import org.junit.Assert; +import org.mockito.Answers; +import org.mockito.InjectMocks; +import org.mockito.Mock; +import org.mockito.MockitoAnnotations; + +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.eq; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +public class IndexCleanupTests extends AbstractADTest { + + @Mock(answer = Answers.RETURNS_DEEP_STUBS) + Client client; + + @Mock(answer = Answers.RETURNS_DEEP_STUBS) + ClusterService clusterService; + + @Mock(answer = Answers.RETURNS_DEEP_STUBS) + ClientUtil clientUtil; + + @InjectMocks + IndexCleanup indexCleanup; + + @Mock + IndicesStatsResponse indicesStatsResponse; + + @Mock + ShardStats shardStats; + + @Mock + CommonStats commonStats; + + @Mock + StoreStats storeStats; + + @Mock + IndicesAdminClient indicesAdminClient; + + @SuppressWarnings("unchecked") + @Override + public void setUp() throws Exception { + super.setUp(); + super.setUpLog4jForJUnit(IndexCleanup.class); + MockitoAnnotations.initMocks(this); + when(clusterService.state().getRoutingTable().hasIndex(anyString())).thenReturn(true); + indexCleanup = new IndexCleanup(client, clientUtil, clusterService); + when(indicesStatsResponse.getShards()).thenReturn(new ShardStats[]{ + shardStats + }); + when(shardStats.getStats()).thenReturn(commonStats); + when(commonStats.getStore()).thenReturn(storeStats); + when(client.admin().indices()).thenReturn(indicesAdminClient); + doAnswer(invocation -> { + Object[] args = invocation.getArguments(); + ActionListener listener = (ActionListener) args[1]; + listener.onResponse(indicesStatsResponse); + return null; + }).when(indicesAdminClient).stats(any(), any()); + } + + @Override + public void tearDown() throws Exception { + super.tearDown(); + super.tearDownLog4jForJUnit(); + } + + public void testDeleteDocsBasedOnShardSizeWithCleanupNeededAsTrue() throws Exception { + long maxShardSize = 1000; + when(storeStats.getSizeInBytes()).thenReturn(maxShardSize +1 ); + indexCleanup.deleteDocsBasedOnShardSize("indexname", maxShardSize, null, + ActionListener.wrap(result -> { + assertTrue(result); + verify(clientUtil).execute(eq(DeleteByQueryAction.INSTANCE), any(), any()); + }, exception -> {throw new RuntimeException(exception);})); + } + + public void testDeleteDocsBasedOnShardSizeWithCleanupNeededAsFalse() throws Exception { + long maxShardSize = 1000; + when(storeStats.getSizeInBytes()).thenReturn(maxShardSize - 1); + indexCleanup.deleteDocsBasedOnShardSize("indexname", maxShardSize, null, + ActionListener.wrap(Assert::assertFalse, exception -> {throw new RuntimeException(exception);})); + } + + public void testDeleteDocsBasedOnShardSizeIndexNotExisted() 
throws Exception { + when(clusterService.state().getRoutingTable().hasIndex(anyString())).thenReturn(false); + indexCleanup.deleteDocsBasedOnShardSize("indexname", 1000, null, null); + assertTrue(testAppender.containsMessage("skip as the index:indexname doesn't exist")); + } +} \ No newline at end of file diff --git a/src/test/java/com/amazon/opendistroforelasticsearch/ad/cluster/diskcleanup/ModelCheckpointIndexRetentionTests.java b/src/test/java/com/amazon/opendistroforelasticsearch/ad/cluster/diskcleanup/ModelCheckpointIndexRetentionTests.java new file mode 100644 index 00000000..0c5e6539 --- /dev/null +++ b/src/test/java/com/amazon/opendistroforelasticsearch/ad/cluster/diskcleanup/ModelCheckpointIndexRetentionTests.java @@ -0,0 +1,98 @@ +/* + * Copyright 2020 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ +package com.amazon.opendistroforelasticsearch.ad.cluster.diskcleanup; + +import com.amazon.opendistroforelasticsearch.ad.AbstractADTest; +import com.amazon.opendistroforelasticsearch.ad.constant.CommonName; +import org.elasticsearch.action.ActionListener; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.mockito.Mock; +import org.mockito.MockitoAnnotations; + +import java.time.Clock; +import java.time.Duration; + +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; + +public class ModelCheckpointIndexRetentionTests extends AbstractADTest { + + Duration defaultCheckpointTtl = Duration.ofDays(3); + + Clock clock = Clock.systemUTC(); + + @Mock + IndexCleanup indexCleanup; + + ModelCheckpointIndexRetention modelCheckpointIndexRetention; + + @SuppressWarnings("unchecked") + @Before + public void setUp() throws Exception { + super.setUp(); + super.setUpLog4jForJUnit(IndexCleanup.class); + MockitoAnnotations.initMocks(this); + modelCheckpointIndexRetention = new ModelCheckpointIndexRetention(defaultCheckpointTtl, clock, indexCleanup); + doAnswer(invocation -> { + Object[] args = invocation.getArguments(); + ActionListener listener = (ActionListener) args[2]; + listener.onResponse(1L); + return null; + }).when(indexCleanup).deleteDocsByQuery(anyString(), any(), any()); + } + + @After + public void tearDown() throws Exception { + super.tearDown(); + super.tearDownLog4jForJUnit(); + } + + @SuppressWarnings("unchecked") + @Test + public void testRunWithCleanupAsNeeded() throws Exception { + doAnswer(invocation -> { + Object[] args = invocation.getArguments(); + ActionListener listener = (ActionListener) args[3]; + listener.onResponse(true); + return null; + }).when(indexCleanup).deleteDocsBasedOnShardSize(eq(CommonName.CHECKPOINT_INDEX_NAME), eq(50 * 1024 * 1024 * 1024L), any(), any()); + + modelCheckpointIndexRetention.run(); + verify(indexCleanup, 
times(2)).deleteDocsBasedOnShardSize(eq(CommonName.CHECKPOINT_INDEX_NAME), + eq(50 * 1024 * 1024 * 1024L), any(), any()); + verify(indexCleanup).deleteDocsByQuery(eq(CommonName.CHECKPOINT_INDEX_NAME), any(), any()); + } + + @SuppressWarnings("unchecked") + @Test + public void testRunWithCleanupAsFalse() throws Exception { + doAnswer(invocation -> { + Object[] args = invocation.getArguments(); + ActionListener listener = (ActionListener) args[3]; + listener.onResponse(false); + return null; + }).when(indexCleanup).deleteDocsBasedOnShardSize(eq(CommonName.CHECKPOINT_INDEX_NAME), eq(50 * 1024 * 1024 * 1024L), any(), any()); + + modelCheckpointIndexRetention.run(); + verify(indexCleanup).deleteDocsBasedOnShardSize(eq(CommonName.CHECKPOINT_INDEX_NAME), eq(50 * 1024 * 1024 * 1024L), any(), any()); + verify(indexCleanup).deleteDocsByQuery(eq(CommonName.CHECKPOINT_INDEX_NAME), any(), any()); + } +} \ No newline at end of file From 2773a92369f2f7cdb703efd5bd1d81afd0645438 Mon Sep 17 00:00:00 2001 From: Weicong Sun Date: Tue, 20 Oct 2020 10:31:05 -0700 Subject: [PATCH 2/7] run splotlessJavaApply to fix style issues --- .../ad/cluster/MasterEventListener.java | 6 +- .../ad/cluster/diskcleanup/IndexCleanup.java | 46 +++++----- .../ModelCheckpointIndexRetention.java | 89 ++++++++++--------- .../ad/cluster/MasterEventListenerTests.java | 4 +- .../diskcleanup/IndexCleanupTests.java | 44 ++++----- .../ModelCheckpointIndexRetentionTests.java | 30 ++++--- 6 files changed, 117 insertions(+), 102 deletions(-) diff --git a/src/main/java/com/amazon/opendistroforelasticsearch/ad/cluster/MasterEventListener.java b/src/main/java/com/amazon/opendistroforelasticsearch/ad/cluster/MasterEventListener.java index 876679cd..e35d53fa 100644 --- a/src/main/java/com/amazon/opendistroforelasticsearch/ad/cluster/MasterEventListener.java +++ b/src/main/java/com/amazon/opendistroforelasticsearch/ad/cluster/MasterEventListener.java @@ -17,9 +17,6 @@ import java.time.Clock; -import com.amazon.opendistroforelasticsearch.ad.cluster.diskcleanup.IndexCleanup; -import com.amazon.opendistroforelasticsearch.ad.cluster.diskcleanup.ModelCheckpointIndexRetention; -import com.google.common.annotations.VisibleForTesting; import org.elasticsearch.client.Client; import org.elasticsearch.cluster.LocalNodeMasterListener; import org.elasticsearch.cluster.service.ClusterService; @@ -28,9 +25,12 @@ import org.elasticsearch.threadpool.Scheduler.Cancellable; import org.elasticsearch.threadpool.ThreadPool; +import com.amazon.opendistroforelasticsearch.ad.cluster.diskcleanup.IndexCleanup; +import com.amazon.opendistroforelasticsearch.ad.cluster.diskcleanup.ModelCheckpointIndexRetention; import com.amazon.opendistroforelasticsearch.ad.settings.AnomalyDetectorSettings; import com.amazon.opendistroforelasticsearch.ad.util.ClientUtil; import com.amazon.opendistroforelasticsearch.ad.util.DiscoveryNodeFilterer; +import com.google.common.annotations.VisibleForTesting; public class MasterEventListener implements LocalNodeMasterListener { diff --git a/src/main/java/com/amazon/opendistroforelasticsearch/ad/cluster/diskcleanup/IndexCleanup.java b/src/main/java/com/amazon/opendistroforelasticsearch/ad/cluster/diskcleanup/IndexCleanup.java index f57e7598..8dfaf050 100644 --- a/src/main/java/com/amazon/opendistroforelasticsearch/ad/cluster/diskcleanup/IndexCleanup.java +++ b/src/main/java/com/amazon/opendistroforelasticsearch/ad/cluster/diskcleanup/IndexCleanup.java @@ -15,7 +15,9 @@ package com.amazon.opendistroforelasticsearch.ad.cluster.diskcleanup; -import 
com.amazon.opendistroforelasticsearch.ad.util.ClientUtil; +import java.util.Arrays; +import java.util.Objects; + import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.elasticsearch.action.ActionListener; @@ -31,8 +33,7 @@ import org.elasticsearch.index.reindex.DeleteByQueryRequest; import org.elasticsearch.index.store.StoreStats; -import java.util.Arrays; -import java.util.Objects; +import com.amazon.opendistroforelasticsearch.ad.util.ClientUtil; /** * Clean up the old docs for indices. @@ -57,8 +58,12 @@ public IndexCleanup(Client client, ClientUtil clientUtil, ClusterService cluster * @param queryForDeleteByQueryRequest query request * @param listener action listener */ - public void deleteDocsBasedOnShardSize(String indexName, long maxShardSize, QueryBuilder queryForDeleteByQueryRequest, - ActionListener listener) { + public void deleteDocsBasedOnShardSize( + String indexName, + long maxShardSize, + QueryBuilder queryForDeleteByQueryRequest, + ActionListener listener + ) { if (!clusterService.state().getRoutingTable().hasIndex(indexName)) { LOG.debug("skip as the index:{} doesn't exist", indexName); @@ -66,8 +71,9 @@ public void deleteDocsBasedOnShardSize(String indexName, long maxShardSize, Quer } ActionListener indicesStatsResponseListener = ActionListener.wrap(indicesStatsResponse -> { - //Check if any shard size is bigger than maxShardSize - boolean cleanupNeeded = Arrays.stream(indicesStatsResponse.getShards()) + // Check if any shard size is bigger than maxShardSize + boolean cleanupNeeded = Arrays + .stream(indicesStatsResponse.getShards()) .map(ShardStats::getStats) .filter(Objects::nonNull) .map(CommonStats::getStore) @@ -76,8 +82,11 @@ public void deleteDocsBasedOnShardSize(String indexName, long maxShardSize, Quer .anyMatch(size -> size > maxShardSize); if (cleanupNeeded) { - deleteDocsByQuery(indexName, queryForDeleteByQueryRequest, - ActionListener.wrap(r -> listener.onResponse(true), listener::onFailure)); + deleteDocsByQuery( + indexName, + queryForDeleteByQueryRequest, + ActionListener.wrap(r -> listener.onResponse(true), listener::onFailure) + ); } else { listener.onResponse(false); } @@ -103,19 +112,10 @@ public void deleteDocsByQuery(String indexName, QueryBuilder queryForDeleteByQue DeleteByQueryRequest deleteRequest = new DeleteByQueryRequest(indexName) .setQuery(queryForDeleteByQueryRequest) .setIndicesOptions(IndicesOptions.LENIENT_EXPAND_OPEN); - clientUtil - .execute( - DeleteByQueryAction.INSTANCE, - deleteRequest, - ActionListener - .wrap( - response -> { - // if 0 docs get deleted, it means our query cannot find any matching doc - LOG.info("{} docs are deleted for index:{}", response.getDeleted(), indexName); - listener.onResponse(response.getDeleted()); - }, - listener::onFailure - ) - ); + clientUtil.execute(DeleteByQueryAction.INSTANCE, deleteRequest, ActionListener.wrap(response -> { + // if 0 docs get deleted, it means our query cannot find any matching doc + LOG.info("{} docs are deleted for index:{}", response.getDeleted(), indexName); + listener.onResponse(response.getDeleted()); + }, listener::onFailure)); } } diff --git a/src/main/java/com/amazon/opendistroforelasticsearch/ad/cluster/diskcleanup/ModelCheckpointIndexRetention.java b/src/main/java/com/amazon/opendistroforelasticsearch/ad/cluster/diskcleanup/ModelCheckpointIndexRetention.java index ef155bec..cc3f1ff8 100644 --- a/src/main/java/com/amazon/opendistroforelasticsearch/ad/cluster/diskcleanup/ModelCheckpointIndexRetention.java +++ 
b/src/main/java/com/amazon/opendistroforelasticsearch/ad/cluster/diskcleanup/ModelCheckpointIndexRetention.java @@ -15,15 +15,16 @@ package com.amazon.opendistroforelasticsearch.ad.cluster.diskcleanup; -import com.amazon.opendistroforelasticsearch.ad.constant.CommonName; -import com.amazon.opendistroforelasticsearch.ad.ml.CheckpointDao; +import java.time.Clock; +import java.time.Duration; + import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.elasticsearch.action.ActionListener; import org.elasticsearch.index.query.QueryBuilders; -import java.time.Clock; -import java.time.Duration; +import com.amazon.opendistroforelasticsearch.ad.constant.CommonName; +import com.amazon.opendistroforelasticsearch.ad.ml.CheckpointDao; /** * Model checkpoints cleanup of multi-entity detectors. @@ -37,20 +38,19 @@ * We will keep this logic and add a new cleanup mechanism based on shard size. * </p>
*/ -public class ModelCheckpointIndexRetention implements Runnable{ +public class ModelCheckpointIndexRetention implements Runnable { private static final Logger LOG = LogManager.getLogger(ModelCheckpointIndexRetention.class); - //The recommended max shard size is 50G, we don't wanna our index exceeds this number + // The recommended max shard size is 50G, we don't wanna our index exceeds this number private static final long MAX_SHARD_SIZE_IN_BYTE = 50 * 1024 * 1024 * 1024L; - //We can't clean up all of the checkpoints. At least keep models for 1 day + // We can't clean up all of the checkpoints. At least keep models for 1 day private static final Duration MINIMUM_CHECKPOINT_TTL = Duration.ofDays(1); private final Duration defaultCheckpointTtl; private final Clock clock; private final IndexCleanup indexCleanup; - public ModelCheckpointIndexRetention(Duration defaultCheckpointTtl, Clock clock, - IndexCleanup indexCleanup) { + public ModelCheckpointIndexRetention(Duration defaultCheckpointTtl, Clock clock, IndexCleanup indexCleanup) { this.defaultCheckpointTtl = defaultCheckpointTtl; this.clock = clock; this.indexCleanup = indexCleanup; @@ -58,45 +58,54 @@ public ModelCheckpointIndexRetention(Duration defaultCheckpointTtl, Clock clock, @Override public void run() { - indexCleanup.deleteDocsByQuery(CommonName.CHECKPOINT_INDEX_NAME, - QueryBuilders - .boolQuery() - .filter( - QueryBuilders + indexCleanup + .deleteDocsByQuery( + CommonName.CHECKPOINT_INDEX_NAME, + QueryBuilders + .boolQuery() + .filter( + QueryBuilders .rangeQuery(CheckpointDao.TIMESTAMP) .lte(clock.millis() - defaultCheckpointTtl.toMillis()) .format(CommonName.EPOCH_MILLIS_FORMAT) - ), - ActionListener.wrap(response -> { - cleanupBasedOnShardSize(defaultCheckpointTtl.minusDays(1)); - }, exception -> LOG.error("delete docs by query fails for checkpoint index", exception))); + ), + ActionListener + .wrap( + response -> { cleanupBasedOnShardSize(defaultCheckpointTtl.minusDays(1)); }, + exception -> LOG.error("delete docs by query fails for checkpoint index", exception) + ) + ); } private void cleanupBasedOnShardSize(Duration cleanUpTtl) { - indexCleanup.deleteDocsBasedOnShardSize(CommonName.CHECKPOINT_INDEX_NAME, MAX_SHARD_SIZE_IN_BYTE, - QueryBuilders - .boolQuery() - .filter( - QueryBuilders - .rangeQuery(CheckpointDao.TIMESTAMP) - .lte(clock.millis() - cleanUpTtl.toMillis()) - .format(CommonName.EPOCH_MILLIS_FORMAT) - ), - ActionListener.wrap(cleanupNeeded -> { - if (cleanupNeeded) { - if (cleanUpTtl.equals(MINIMUM_CHECKPOINT_TTL)) { - return; - } + indexCleanup + .deleteDocsBasedOnShardSize( + CommonName.CHECKPOINT_INDEX_NAME, + MAX_SHARD_SIZE_IN_BYTE, + QueryBuilders + .boolQuery() + .filter( + QueryBuilders + .rangeQuery(CheckpointDao.TIMESTAMP) + .lte(clock.millis() - cleanUpTtl.toMillis()) + .format(CommonName.EPOCH_MILLIS_FORMAT) + ), + ActionListener.wrap(cleanupNeeded -> { + if (cleanupNeeded) { + if (cleanUpTtl.equals(MINIMUM_CHECKPOINT_TTL)) { + return; + } - Duration nextCleanupTtl = cleanUpTtl.minusDays(1); - if (nextCleanupTtl.compareTo(MINIMUM_CHECKPOINT_TTL) < 0) { - nextCleanupTtl = MINIMUM_CHECKPOINT_TTL; + Duration nextCleanupTtl = cleanUpTtl.minusDays(1); + if (nextCleanupTtl.compareTo(MINIMUM_CHECKPOINT_TTL) < 0) { + nextCleanupTtl = MINIMUM_CHECKPOINT_TTL; + } + cleanupBasedOnShardSize(nextCleanupTtl); + } else { + LOG.debug("clean up not needed anymore for checkpoint index"); } - cleanupBasedOnShardSize(nextCleanupTtl); - } else { - LOG.debug("clean up not needed anymore for checkpoint index"); - } - }, 
exception -> LOG.error("checkpoint index retention based on shard size fails", exception))); + }, exception -> LOG.error("checkpoint index retention based on shard size fails", exception)) + ); } } diff --git a/src/test/java/com/amazon/opendistroforelasticsearch/ad/cluster/MasterEventListenerTests.java b/src/test/java/com/amazon/opendistroforelasticsearch/ad/cluster/MasterEventListenerTests.java index 5aff9872..eb1f8dfb 100644 --- a/src/test/java/com/amazon/opendistroforelasticsearch/ad/cluster/MasterEventListenerTests.java +++ b/src/test/java/com/amazon/opendistroforelasticsearch/ad/cluster/MasterEventListenerTests.java @@ -27,7 +27,6 @@ import java.util.Arrays; import java.util.HashMap; -import com.amazon.opendistroforelasticsearch.ad.cluster.diskcleanup.ModelCheckpointIndexRetention; import org.elasticsearch.client.Client; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.component.LifecycleListener; @@ -37,6 +36,7 @@ import org.junit.Before; import com.amazon.opendistroforelasticsearch.ad.AbstractADTest; +import com.amazon.opendistroforelasticsearch.ad.cluster.diskcleanup.ModelCheckpointIndexRetention; import com.amazon.opendistroforelasticsearch.ad.constant.CommonName; import com.amazon.opendistroforelasticsearch.ad.util.ClientUtil; import com.amazon.opendistroforelasticsearch.ad.util.DiscoveryNodeFilterer; @@ -63,7 +63,7 @@ public void setUp() throws Exception { when(threadPool.scheduleWithFixedDelay(any(HourlyCron.class), any(TimeValue.class), any(String.class))) .thenReturn(hourlyCancellable); when(threadPool.scheduleWithFixedDelay(any(ModelCheckpointIndexRetention.class), any(TimeValue.class), any(String.class))) - .thenReturn(checkpointIndexRetentionCancellable); + .thenReturn(checkpointIndexRetentionCancellable); client = mock(Client.class); clock = mock(Clock.class); clientUtil = mock(ClientUtil.class); diff --git a/src/test/java/com/amazon/opendistroforelasticsearch/ad/cluster/diskcleanup/IndexCleanupTests.java b/src/test/java/com/amazon/opendistroforelasticsearch/ad/cluster/diskcleanup/IndexCleanupTests.java index cc9cca60..8d7239ff 100644 --- a/src/test/java/com/amazon/opendistroforelasticsearch/ad/cluster/diskcleanup/IndexCleanupTests.java +++ b/src/test/java/com/amazon/opendistroforelasticsearch/ad/cluster/diskcleanup/IndexCleanupTests.java @@ -12,10 +12,16 @@ * express or implied. See the License for the specific language governing * permissions and limitations under the License. 
*/ + package com.amazon.opendistroforelasticsearch.ad.cluster.diskcleanup; -import com.amazon.opendistroforelasticsearch.ad.AbstractADTest; -import com.amazon.opendistroforelasticsearch.ad.util.ClientUtil; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.eq; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.admin.indices.stats.CommonStats; import org.elasticsearch.action.admin.indices.stats.IndicesStatsResponse; @@ -31,12 +37,8 @@ import org.mockito.Mock; import org.mockito.MockitoAnnotations; -import static org.mockito.ArgumentMatchers.any; -import static org.mockito.ArgumentMatchers.anyString; -import static org.mockito.Mockito.doAnswer; -import static org.mockito.Mockito.eq; -import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.when; +import com.amazon.opendistroforelasticsearch.ad.AbstractADTest; +import com.amazon.opendistroforelasticsearch.ad.util.ClientUtil; public class IndexCleanupTests extends AbstractADTest { @@ -75,9 +77,7 @@ public void setUp() throws Exception { MockitoAnnotations.initMocks(this); when(clusterService.state().getRoutingTable().hasIndex(anyString())).thenReturn(true); indexCleanup = new IndexCleanup(client, clientUtil, clusterService); - when(indicesStatsResponse.getShards()).thenReturn(new ShardStats[]{ - shardStats - }); + when(indicesStatsResponse.getShards()).thenReturn(new ShardStats[] { shardStats }); when(shardStats.getStats()).thenReturn(commonStats); when(commonStats.getStore()).thenReturn(storeStats); when(client.admin().indices()).thenReturn(indicesAdminClient); @@ -97,19 +97,23 @@ public void tearDown() throws Exception { public void testDeleteDocsBasedOnShardSizeWithCleanupNeededAsTrue() throws Exception { long maxShardSize = 1000; - when(storeStats.getSizeInBytes()).thenReturn(maxShardSize +1 ); - indexCleanup.deleteDocsBasedOnShardSize("indexname", maxShardSize, null, - ActionListener.wrap(result -> { - assertTrue(result); - verify(clientUtil).execute(eq(DeleteByQueryAction.INSTANCE), any(), any()); - }, exception -> {throw new RuntimeException(exception);})); + when(storeStats.getSizeInBytes()).thenReturn(maxShardSize + 1); + indexCleanup.deleteDocsBasedOnShardSize("indexname", maxShardSize, null, ActionListener.wrap(result -> { + assertTrue(result); + verify(clientUtil).execute(eq(DeleteByQueryAction.INSTANCE), any(), any()); + }, exception -> { throw new RuntimeException(exception); })); } public void testDeleteDocsBasedOnShardSizeWithCleanupNeededAsFalse() throws Exception { long maxShardSize = 1000; when(storeStats.getSizeInBytes()).thenReturn(maxShardSize - 1); - indexCleanup.deleteDocsBasedOnShardSize("indexname", maxShardSize, null, - ActionListener.wrap(Assert::assertFalse, exception -> {throw new RuntimeException(exception);})); + indexCleanup + .deleteDocsBasedOnShardSize( + "indexname", + maxShardSize, + null, + ActionListener.wrap(Assert::assertFalse, exception -> { throw new RuntimeException(exception); }) + ); } public void testDeleteDocsBasedOnShardSizeIndexNotExisted() throws Exception { @@ -117,4 +121,4 @@ public void testDeleteDocsBasedOnShardSizeIndexNotExisted() throws Exception { indexCleanup.deleteDocsBasedOnShardSize("indexname", 1000, null, null); assertTrue(testAppender.containsMessage("skip as the index:indexname doesn't exist")); } -} \ No newline 
at end of file +} diff --git a/src/test/java/com/amazon/opendistroforelasticsearch/ad/cluster/diskcleanup/ModelCheckpointIndexRetentionTests.java b/src/test/java/com/amazon/opendistroforelasticsearch/ad/cluster/diskcleanup/ModelCheckpointIndexRetentionTests.java index 0c5e6539..24bd6848 100644 --- a/src/test/java/com/amazon/opendistroforelasticsearch/ad/cluster/diskcleanup/ModelCheckpointIndexRetentionTests.java +++ b/src/test/java/com/amazon/opendistroforelasticsearch/ad/cluster/diskcleanup/ModelCheckpointIndexRetentionTests.java @@ -12,10 +12,19 @@ * express or implied. See the License for the specific language governing * permissions and limitations under the License. */ + package com.amazon.opendistroforelasticsearch.ad.cluster.diskcleanup; -import com.amazon.opendistroforelasticsearch.ad.AbstractADTest; -import com.amazon.opendistroforelasticsearch.ad.constant.CommonName; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; + +import java.time.Clock; +import java.time.Duration; + import org.elasticsearch.action.ActionListener; import org.junit.After; import org.junit.Before; @@ -23,15 +32,8 @@ import org.mockito.Mock; import org.mockito.MockitoAnnotations; -import java.time.Clock; -import java.time.Duration; - -import static org.mockito.ArgumentMatchers.any; -import static org.mockito.ArgumentMatchers.anyString; -import static org.mockito.ArgumentMatchers.eq; -import static org.mockito.Mockito.doAnswer; -import static org.mockito.Mockito.times; -import static org.mockito.Mockito.verify; +import com.amazon.opendistroforelasticsearch.ad.AbstractADTest; +import com.amazon.opendistroforelasticsearch.ad.constant.CommonName; public class ModelCheckpointIndexRetentionTests extends AbstractADTest { @@ -76,8 +78,8 @@ public void testRunWithCleanupAsNeeded() throws Exception { }).when(indexCleanup).deleteDocsBasedOnShardSize(eq(CommonName.CHECKPOINT_INDEX_NAME), eq(50 * 1024 * 1024 * 1024L), any(), any()); modelCheckpointIndexRetention.run(); - verify(indexCleanup, times(2)).deleteDocsBasedOnShardSize(eq(CommonName.CHECKPOINT_INDEX_NAME), - eq(50 * 1024 * 1024 * 1024L), any(), any()); + verify(indexCleanup, times(2)) + .deleteDocsBasedOnShardSize(eq(CommonName.CHECKPOINT_INDEX_NAME), eq(50 * 1024 * 1024 * 1024L), any(), any()); verify(indexCleanup).deleteDocsByQuery(eq(CommonName.CHECKPOINT_INDEX_NAME), any(), any()); } @@ -95,4 +97,4 @@ public void testRunWithCleanupAsFalse() throws Exception { verify(indexCleanup).deleteDocsBasedOnShardSize(eq(CommonName.CHECKPOINT_INDEX_NAME), eq(50 * 1024 * 1024 * 1024L), any(), any()); verify(indexCleanup).deleteDocsByQuery(eq(CommonName.CHECKPOINT_INDEX_NAME), any(), any()); } -} \ No newline at end of file +} From a819d8d062a87fa897fcc819cf7360c8feb20760 Mon Sep 17 00:00:00 2001 From: Weicong Sun Date: Tue, 20 Oct 2020 19:45:53 -0700 Subject: [PATCH 3/7] set refresh as true --- .../ad/cluster/diskcleanup/IndexCleanup.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/main/java/com/amazon/opendistroforelasticsearch/ad/cluster/diskcleanup/IndexCleanup.java b/src/main/java/com/amazon/opendistroforelasticsearch/ad/cluster/diskcleanup/IndexCleanup.java index 8dfaf050..d95c528a 100644 --- a/src/main/java/com/amazon/opendistroforelasticsearch/ad/cluster/diskcleanup/IndexCleanup.java +++ 
b/src/main/java/com/amazon/opendistroforelasticsearch/ad/cluster/diskcleanup/IndexCleanup.java @@ -111,7 +111,8 @@ private void getCheckpointShardStoreStats(String indexName, ActionListener listener) { DeleteByQueryRequest deleteRequest = new DeleteByQueryRequest(indexName) .setQuery(queryForDeleteByQueryRequest) - .setIndicesOptions(IndicesOptions.LENIENT_EXPAND_OPEN); + .setIndicesOptions(IndicesOptions.LENIENT_EXPAND_OPEN) + .setRefresh(true); clientUtil.execute(DeleteByQueryAction.INSTANCE, deleteRequest, ActionListener.wrap(response -> { // if 0 docs get deleted, it means our query cannot find any matching doc LOG.info("{} docs are deleted for index:{}", response.getDeleted(), indexName); From c9e13f26f1473181d1c2a85c14e2eef0b5af40bc Mon Sep 17 00:00:00 2001 From: Weicong Sun Date: Fri, 6 Nov 2020 13:22:54 -0800 Subject: [PATCH 4/7] stash context for index clean up --- .../ad/cluster/diskcleanup/IndexCleanup.java | 15 ++++++++++----- .../ad/cluster/diskcleanup/IndexCleanupTests.java | 4 ++++ 2 files changed, 14 insertions(+), 5 deletions(-) diff --git a/src/main/java/com/amazon/opendistroforelasticsearch/ad/cluster/diskcleanup/IndexCleanup.java b/src/main/java/com/amazon/opendistroforelasticsearch/ad/cluster/diskcleanup/IndexCleanup.java index d95c528a..7044f459 100644 --- a/src/main/java/com/amazon/opendistroforelasticsearch/ad/cluster/diskcleanup/IndexCleanup.java +++ b/src/main/java/com/amazon/opendistroforelasticsearch/ad/cluster/diskcleanup/IndexCleanup.java @@ -28,6 +28,7 @@ import org.elasticsearch.action.support.IndicesOptions; import org.elasticsearch.client.Client; import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.util.concurrent.ThreadContext; import org.elasticsearch.index.query.QueryBuilder; import org.elasticsearch.index.reindex.DeleteByQueryAction; import org.elasticsearch.index.reindex.DeleteByQueryRequest; @@ -113,10 +114,14 @@ public void deleteDocsByQuery(String indexName, QueryBuilder queryForDeleteByQue .setQuery(queryForDeleteByQueryRequest) .setIndicesOptions(IndicesOptions.LENIENT_EXPAND_OPEN) .setRefresh(true); - clientUtil.execute(DeleteByQueryAction.INSTANCE, deleteRequest, ActionListener.wrap(response -> { - // if 0 docs get deleted, it means our query cannot find any matching doc - LOG.info("{} docs are deleted for index:{}", response.getDeleted(), indexName); - listener.onResponse(response.getDeleted()); - }, listener::onFailure)); + + try (ThreadContext.StoredContext context = client.threadPool().getThreadContext().stashContext()) { + clientUtil.execute(DeleteByQueryAction.INSTANCE, deleteRequest, ActionListener.wrap(response -> { + // if 0 docs get deleted, it means our query cannot find any matching doc + LOG.info("{} docs are deleted for index:{}", response.getDeleted(), indexName); + listener.onResponse(response.getDeleted()); + }, listener::onFailure)); + } + } } diff --git a/src/test/java/com/amazon/opendistroforelasticsearch/ad/cluster/diskcleanup/IndexCleanupTests.java b/src/test/java/com/amazon/opendistroforelasticsearch/ad/cluster/diskcleanup/IndexCleanupTests.java index 8d7239ff..a5803660 100644 --- a/src/test/java/com/amazon/opendistroforelasticsearch/ad/cluster/diskcleanup/IndexCleanupTests.java +++ b/src/test/java/com/amazon/opendistroforelasticsearch/ad/cluster/diskcleanup/IndexCleanupTests.java @@ -19,6 +19,7 @@ import static org.mockito.ArgumentMatchers.anyString; import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.eq; +import static org.mockito.Mockito.mock; import static 
org.mockito.Mockito.verify; import static org.mockito.Mockito.when; @@ -29,6 +30,8 @@ import org.elasticsearch.client.Client; import org.elasticsearch.client.IndicesAdminClient; import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.util.concurrent.ThreadContext; import org.elasticsearch.index.reindex.DeleteByQueryAction; import org.elasticsearch.index.store.StoreStats; import org.junit.Assert; @@ -81,6 +84,7 @@ public void setUp() throws Exception { when(shardStats.getStats()).thenReturn(commonStats); when(commonStats.getStore()).thenReturn(storeStats); when(client.admin().indices()).thenReturn(indicesAdminClient); + when(client.threadPool().getThreadContext()).thenReturn(new ThreadContext(Settings.EMPTY)); doAnswer(invocation -> { Object[] args = invocation.getArguments(); ActionListener listener = (ActionListener) args[1]; From facd5c5593e3ff3bfb86ef9bb5c66ae98560d77e Mon Sep 17 00:00:00 2001 From: Weicong Sun Date: Fri, 6 Nov 2020 13:28:30 -0800 Subject: [PATCH 5/7] remove useless import to fix checkstyle error --- .../ad/cluster/diskcleanup/IndexCleanupTests.java | 1 - 1 file changed, 1 deletion(-) diff --git a/src/test/java/com/amazon/opendistroforelasticsearch/ad/cluster/diskcleanup/IndexCleanupTests.java b/src/test/java/com/amazon/opendistroforelasticsearch/ad/cluster/diskcleanup/IndexCleanupTests.java index a5803660..fe181f1e 100644 --- a/src/test/java/com/amazon/opendistroforelasticsearch/ad/cluster/diskcleanup/IndexCleanupTests.java +++ b/src/test/java/com/amazon/opendistroforelasticsearch/ad/cluster/diskcleanup/IndexCleanupTests.java @@ -19,7 +19,6 @@ import static org.mockito.ArgumentMatchers.anyString; import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.eq; -import static org.mockito.Mockito.mock; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; From 4dbb6523b1cbad6511267c98cb8eaa5473ca00f4 Mon Sep 17 00:00:00 2001 From: Weicong Sun Date: Fri, 6 Nov 2020 14:22:25 -0800 Subject: [PATCH 6/7] Add comments for error handling --- .../cluster/diskcleanup/ModelCheckpointIndexRetention.java | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/src/main/java/com/amazon/opendistroforelasticsearch/ad/cluster/diskcleanup/ModelCheckpointIndexRetention.java b/src/main/java/com/amazon/opendistroforelasticsearch/ad/cluster/diskcleanup/ModelCheckpointIndexRetention.java index cc3f1ff8..83c8fbc9 100644 --- a/src/main/java/com/amazon/opendistroforelasticsearch/ad/cluster/diskcleanup/ModelCheckpointIndexRetention.java +++ b/src/main/java/com/amazon/opendistroforelasticsearch/ad/cluster/diskcleanup/ModelCheckpointIndexRetention.java @@ -72,6 +72,7 @@ public void run() { ActionListener .wrap( response -> { cleanupBasedOnShardSize(defaultCheckpointTtl.minusDays(1)); }, + //The docs will be deleted in next scheduled windows. No need for retrying. exception -> LOG.error("delete docs by query fails for checkpoint index", exception) ) ); @@ -105,7 +106,9 @@ private void cleanupBasedOnShardSize(Duration cleanUpTtl) { } else { LOG.debug("clean up not needed anymore for checkpoint index"); } - }, exception -> LOG.error("checkpoint index retention based on shard size fails", exception)) + }, + //The docs will be deleted in next scheduled windows. No need for retrying. 
+ exception -> LOG.error("checkpoint index retention based on shard size fails", exception)) ); } } From 7c24c8bcedb4654e97617a05f3bba448f793b362 Mon Sep 17 00:00:00 2001 From: Weicong Sun Date: Fri, 6 Nov 2020 14:29:20 -0800 Subject: [PATCH 7/7] Fix format error --- .../cluster/diskcleanup/ModelCheckpointIndexRetention.java | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/src/main/java/com/amazon/opendistroforelasticsearch/ad/cluster/diskcleanup/ModelCheckpointIndexRetention.java b/src/main/java/com/amazon/opendistroforelasticsearch/ad/cluster/diskcleanup/ModelCheckpointIndexRetention.java index 83c8fbc9..8db2ab82 100644 --- a/src/main/java/com/amazon/opendistroforelasticsearch/ad/cluster/diskcleanup/ModelCheckpointIndexRetention.java +++ b/src/main/java/com/amazon/opendistroforelasticsearch/ad/cluster/diskcleanup/ModelCheckpointIndexRetention.java @@ -72,7 +72,7 @@ public void run() { ActionListener .wrap( response -> { cleanupBasedOnShardSize(defaultCheckpointTtl.minusDays(1)); }, - //The docs will be deleted in next scheduled windows. No need for retrying. + // The docs will be deleted in next scheduled windows. No need for retrying. exception -> LOG.error("delete docs by query fails for checkpoint index", exception) ) ); @@ -107,8 +107,9 @@ private void cleanupBasedOnShardSize(Duration cleanUpTtl) { LOG.debug("clean up not needed anymore for checkpoint index"); } }, - //The docs will be deleted in next scheduled windows. No need for retrying. - exception -> LOG.error("checkpoint index retention based on shard size fails", exception)) + // The docs will be deleted in next scheduled windows. No need for retrying. + exception -> LOG.error("checkpoint index retention based on shard size fails", exception) + ) ); } }