Skip to content
This repository has been archived by the owner on Aug 2, 2022. It is now read-only.

Commit

Permalink
Add checkpoint index retention for multi entity detector (#283)
Browse files Browse the repository at this point in the history
  • Loading branch information
weicongs-amazon authored Nov 6, 2020
1 parent 75d564b commit f0105cc
Show file tree
Hide file tree
Showing 7 changed files with 496 additions and 19 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import com.amazon.opendistroforelasticsearch.ad.ml.CheckpointDao;
import com.amazon.opendistroforelasticsearch.ad.util.ClientUtil;

@Deprecated
public class DailyCron implements Runnable {
private static final Logger LOG = LogManager.getLogger(DailyCron.class);
protected static final String FIELD_MODEL = "queue";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,13 +25,16 @@
import org.elasticsearch.threadpool.Scheduler.Cancellable;
import org.elasticsearch.threadpool.ThreadPool;

import com.amazon.opendistroforelasticsearch.ad.cluster.diskcleanup.IndexCleanup;
import com.amazon.opendistroforelasticsearch.ad.cluster.diskcleanup.ModelCheckpointIndexRetention;
import com.amazon.opendistroforelasticsearch.ad.settings.AnomalyDetectorSettings;
import com.amazon.opendistroforelasticsearch.ad.util.ClientUtil;
import com.amazon.opendistroforelasticsearch.ad.util.DiscoveryNodeFilterer;
import com.google.common.annotations.VisibleForTesting;

public class MasterEventListener implements LocalNodeMasterListener {

private Cancellable dailyCron;
private Cancellable checkpointIndexRetentionCron;
private Cancellable hourlyCron;
private ClusterService clusterService;
private ThreadPool threadPool;
Expand Down Expand Up @@ -70,18 +73,19 @@ public void beforeStop() {
});
}

if (dailyCron == null) {
dailyCron = threadPool
if (checkpointIndexRetentionCron == null) {
IndexCleanup indexCleanup = new IndexCleanup(client, clientUtil, clusterService);
checkpointIndexRetentionCron = threadPool
.scheduleWithFixedDelay(
new DailyCron(clock, AnomalyDetectorSettings.CHECKPOINT_TTL, clientUtil),
new ModelCheckpointIndexRetention(AnomalyDetectorSettings.CHECKPOINT_TTL, clock, indexCleanup),
TimeValue.timeValueHours(24),
executorName()
);
clusterService.addLifecycleListener(new LifecycleListener() {
@Override
public void beforeStop() {
cancel(dailyCron);
dailyCron = null;
cancel(checkpointIndexRetentionCron);
checkpointIndexRetentionCron = null;
}
});
}
Expand All @@ -90,9 +94,9 @@ public void beforeStop() {
@Override
public void offMaster() {
cancel(hourlyCron);
cancel(dailyCron);
cancel(checkpointIndexRetentionCron);
hourlyCron = null;
dailyCron = null;
checkpointIndexRetentionCron = null;
}

private void cancel(Cancellable cron) {
Expand All @@ -101,11 +105,12 @@ private void cancel(Cancellable cron) {
}
}

public Cancellable getDailyCron() {
return dailyCron;
@VisibleForTesting
protected Cancellable getCheckpointIndexRetentionCron() {
return checkpointIndexRetentionCron;
}

public Cancellable getHourlyCron() {
protected Cancellable getHourlyCron() {
return hourlyCron;
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,127 @@
/*
* Copyright 2020 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License").
* You may not use this file except in compliance with the License.
* A copy of the License is located at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* or in the "license" file accompanying this file. This file is distributed
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/

package com.amazon.opendistroforelasticsearch.ad.cluster.diskcleanup;

import java.util.Arrays;
import java.util.Objects;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.indices.stats.CommonStats;
import org.elasticsearch.action.admin.indices.stats.IndicesStatsRequest;
import org.elasticsearch.action.admin.indices.stats.IndicesStatsResponse;
import org.elasticsearch.action.admin.indices.stats.ShardStats;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.index.reindex.DeleteByQueryAction;
import org.elasticsearch.index.reindex.DeleteByQueryRequest;
import org.elasticsearch.index.store.StoreStats;

import com.amazon.opendistroforelasticsearch.ad.util.ClientUtil;

/**
* Clean up the old docs for indices.
*/
public class IndexCleanup {
private static final Logger LOG = LogManager.getLogger(IndexCleanup.class);

private final Client client;
private final ClientUtil clientUtil;
private final ClusterService clusterService;

public IndexCleanup(Client client, ClientUtil clientUtil, ClusterService clusterService) {
this.client = client;
this.clientUtil = clientUtil;
this.clusterService = clusterService;
}

/**
* delete docs when shard size is bigger than max limitation.
* @param indexName index name
* @param maxShardSize max shard size
* @param queryForDeleteByQueryRequest query request
* @param listener action listener
*/
public void deleteDocsBasedOnShardSize(
String indexName,
long maxShardSize,
QueryBuilder queryForDeleteByQueryRequest,
ActionListener<Boolean> listener
) {

if (!clusterService.state().getRoutingTable().hasIndex(indexName)) {
LOG.debug("skip as the index:{} doesn't exist", indexName);
return;
}

ActionListener<IndicesStatsResponse> indicesStatsResponseListener = ActionListener.wrap(indicesStatsResponse -> {
// Check if any shard size is bigger than maxShardSize
boolean cleanupNeeded = Arrays
.stream(indicesStatsResponse.getShards())
.map(ShardStats::getStats)
.filter(Objects::nonNull)
.map(CommonStats::getStore)
.filter(Objects::nonNull)
.map(StoreStats::getSizeInBytes)
.anyMatch(size -> size > maxShardSize);

if (cleanupNeeded) {
deleteDocsByQuery(
indexName,
queryForDeleteByQueryRequest,
ActionListener.wrap(r -> listener.onResponse(true), listener::onFailure)
);
} else {
listener.onResponse(false);
}
}, listener::onFailure);

getCheckpointShardStoreStats(indexName, indicesStatsResponseListener);
}

private void getCheckpointShardStoreStats(String indexName, ActionListener<IndicesStatsResponse> listener) {
IndicesStatsRequest indicesStatsRequest = new IndicesStatsRequest();
indicesStatsRequest.store();
indicesStatsRequest.indices(indexName);
client.admin().indices().stats(indicesStatsRequest, listener);
}

/**
* Delete docs based on query request
* @param indexName index name
* @param queryForDeleteByQueryRequest query request
* @param listener action listener
*/
public void deleteDocsByQuery(String indexName, QueryBuilder queryForDeleteByQueryRequest, ActionListener<Long> listener) {
DeleteByQueryRequest deleteRequest = new DeleteByQueryRequest(indexName)
.setQuery(queryForDeleteByQueryRequest)
.setIndicesOptions(IndicesOptions.LENIENT_EXPAND_OPEN)
.setRefresh(true);

try (ThreadContext.StoredContext context = client.threadPool().getThreadContext().stashContext()) {
clientUtil.execute(DeleteByQueryAction.INSTANCE, deleteRequest, ActionListener.wrap(response -> {
// if 0 docs get deleted, it means our query cannot find any matching doc
LOG.info("{} docs are deleted for index:{}", response.getDeleted(), indexName);
listener.onResponse(response.getDeleted());
}, listener::onFailure));
}

}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
/*
* Copyright 2020 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License").
* You may not use this file except in compliance with the License.
* A copy of the License is located at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* or in the "license" file accompanying this file. This file is distributed
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/

package com.amazon.opendistroforelasticsearch.ad.cluster.diskcleanup;

import java.time.Clock;
import java.time.Duration;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.index.query.QueryBuilders;

import com.amazon.opendistroforelasticsearch.ad.constant.CommonName;
import com.amazon.opendistroforelasticsearch.ad.ml.CheckpointDao;

/**
* Model checkpoints cleanup of multi-entity detectors.
* <p> <b>Problem:</b>
* In multi-entity detectors, we can have thousands, even millions of entities, of which the model checkpoints will consume
* lots of disk resources. To protect the our disk usage, the checkpoint index size will be limited with specified threshold.
* Once its size exceeds the threshold, the model checkpoints cleanup process will be activated.
* </p>
* <p> <b>Solution:</b>
* Before multi-entity detectors, there is daily cron job to clean up the inactive checkpoints longer than some configurable days.
* We will keep the this logic, and add new clean up way based on shard size.
* </p>
*/
public class ModelCheckpointIndexRetention implements Runnable {
private static final Logger LOG = LogManager.getLogger(ModelCheckpointIndexRetention.class);

// The recommended max shard size is 50G, we don't wanna our index exceeds this number
private static final long MAX_SHARD_SIZE_IN_BYTE = 50 * 1024 * 1024 * 1024L;
// We can't clean up all of the checkpoints. At least keep models for 1 day
private static final Duration MINIMUM_CHECKPOINT_TTL = Duration.ofDays(1);

private final Duration defaultCheckpointTtl;
private final Clock clock;
private final IndexCleanup indexCleanup;

public ModelCheckpointIndexRetention(Duration defaultCheckpointTtl, Clock clock, IndexCleanup indexCleanup) {
this.defaultCheckpointTtl = defaultCheckpointTtl;
this.clock = clock;
this.indexCleanup = indexCleanup;
}

@Override
public void run() {
indexCleanup
.deleteDocsByQuery(
CommonName.CHECKPOINT_INDEX_NAME,
QueryBuilders
.boolQuery()
.filter(
QueryBuilders
.rangeQuery(CheckpointDao.TIMESTAMP)
.lte(clock.millis() - defaultCheckpointTtl.toMillis())
.format(CommonName.EPOCH_MILLIS_FORMAT)
),
ActionListener
.wrap(
response -> { cleanupBasedOnShardSize(defaultCheckpointTtl.minusDays(1)); },
// The docs will be deleted in next scheduled windows. No need for retrying.
exception -> LOG.error("delete docs by query fails for checkpoint index", exception)
)
);

}

private void cleanupBasedOnShardSize(Duration cleanUpTtl) {
indexCleanup
.deleteDocsBasedOnShardSize(
CommonName.CHECKPOINT_INDEX_NAME,
MAX_SHARD_SIZE_IN_BYTE,
QueryBuilders
.boolQuery()
.filter(
QueryBuilders
.rangeQuery(CheckpointDao.TIMESTAMP)
.lte(clock.millis() - cleanUpTtl.toMillis())
.format(CommonName.EPOCH_MILLIS_FORMAT)
),
ActionListener.wrap(cleanupNeeded -> {
if (cleanupNeeded) {
if (cleanUpTtl.equals(MINIMUM_CHECKPOINT_TTL)) {
return;
}

Duration nextCleanupTtl = cleanUpTtl.minusDays(1);
if (nextCleanupTtl.compareTo(MINIMUM_CHECKPOINT_TTL) < 0) {
nextCleanupTtl = MINIMUM_CHECKPOINT_TTL;
}
cleanupBasedOnShardSize(nextCleanupTtl);
} else {
LOG.debug("clean up not needed anymore for checkpoint index");
}
},
// The docs will be deleted in next scheduled windows. No need for retrying.
exception -> LOG.error("checkpoint index retention based on shard size fails", exception)
)
);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import org.junit.Before;

import com.amazon.opendistroforelasticsearch.ad.AbstractADTest;
import com.amazon.opendistroforelasticsearch.ad.cluster.diskcleanup.ModelCheckpointIndexRetention;
import com.amazon.opendistroforelasticsearch.ad.constant.CommonName;
import com.amazon.opendistroforelasticsearch.ad.util.ClientUtil;
import com.amazon.opendistroforelasticsearch.ad.util.DiscoveryNodeFilterer;
Expand All @@ -46,7 +47,7 @@ public class MasterEventListenerTests extends AbstractADTest {
private Client client;
private Clock clock;
private Cancellable hourlyCancellable;
private Cancellable dailyCancellable;
private Cancellable checkpointIndexRetentionCancellable;
private MasterEventListener masterService;
private ClientUtil clientUtil;
private DiscoveryNodeFilterer nodeFilter;
Expand All @@ -58,10 +59,11 @@ public void setUp() throws Exception {
clusterService = mock(ClusterService.class);
threadPool = mock(ThreadPool.class);
hourlyCancellable = mock(Cancellable.class);
dailyCancellable = mock(Cancellable.class);
checkpointIndexRetentionCancellable = mock(Cancellable.class);
when(threadPool.scheduleWithFixedDelay(any(HourlyCron.class), any(TimeValue.class), any(String.class)))
.thenReturn(hourlyCancellable);
when(threadPool.scheduleWithFixedDelay(any(DailyCron.class), any(TimeValue.class), any(String.class))).thenReturn(dailyCancellable);
when(threadPool.scheduleWithFixedDelay(any(ModelCheckpointIndexRetention.class), any(TimeValue.class), any(String.class)))
.thenReturn(checkpointIndexRetentionCancellable);
client = mock(Client.class);
clock = mock(Clock.class);
clientUtil = mock(ClientUtil.class);
Expand All @@ -75,11 +77,11 @@ public void setUp() throws Exception {
public void testOnOffMaster() {
masterService.onMaster();
assertThat(hourlyCancellable, is(notNullValue()));
assertThat(dailyCancellable, is(notNullValue()));
assertThat(checkpointIndexRetentionCancellable, is(notNullValue()));
assertTrue(!masterService.getHourlyCron().isCancelled());
assertTrue(!masterService.getDailyCron().isCancelled());
assertTrue(!masterService.getCheckpointIndexRetentionCron().isCancelled());
masterService.offMaster();
assertThat(masterService.getDailyCron(), is(nullValue()));
assertThat(masterService.getCheckpointIndexRetentionCron(), is(nullValue()));
assertThat(masterService.getHourlyCron(), is(nullValue()));
}

Expand All @@ -100,10 +102,10 @@ public void testBeforeStop() {
}).when(clusterService).addLifecycleListener(any());

masterService.onMaster();
assertThat(masterService.getDailyCron(), is(nullValue()));
assertThat(masterService.getCheckpointIndexRetentionCron(), is(nullValue()));
assertThat(masterService.getHourlyCron(), is(nullValue()));
masterService.offMaster();
assertThat(masterService.getDailyCron(), is(nullValue()));
assertThat(masterService.getCheckpointIndexRetentionCron(), is(nullValue()));
assertThat(masterService.getHourlyCron(), is(nullValue()));
}
}
Loading

0 comments on commit f0105cc

Please sign in to comment.