From b7550ab214a35e993f7cbb63829ca69d4116db26 Mon Sep 17 00:00:00 2001 From: David Roberts Date: Wed, 4 Jul 2018 13:56:32 +0100 Subject: [PATCH] [ML] Rate limit established model memory updates (#31768) There is at most one model size stats document per bucket, but during lookback a job can churn through many buckets very quickly. This can lead to many cluster state updates if established model memory needs to be updated for a given model size stats document. This change rate limits established model memory updates to one per job per 5 seconds. This is done by scheduling the updates 5 seconds in the future, but replacing the value to be written if another model size stats document is received during the waiting period. Updating the values in arrears like this means that the last value received will be the one associated with the job in the long term, whereas alternative approaches such as not updating the value if a new value was close to the old value would not. --- .../output/AutoDetectResultProcessor.java | 131 ++++++++++++++---- .../AutoDetectResultProcessorTests.java | 91 ++++++++++-- 2 files changed, 186 insertions(+), 36 deletions(-) diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/AutoDetectResultProcessor.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/AutoDetectResultProcessor.java index c9a79ecaa4da8..470fba1b6cc06 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/AutoDetectResultProcessor.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/AutoDetectResultProcessor.java @@ -14,6 +14,9 @@ import org.elasticsearch.common.logging.Loggers; import org.elasticsearch.common.unit.ByteSizeUnit; import org.elasticsearch.common.unit.ByteSizeValue; +import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException; +import org.elasticsearch.common.util.concurrent.FutureUtils; import org.elasticsearch.xpack.core.ml.MachineLearningField; import org.elasticsearch.xpack.core.ml.action.PutJobAction; import org.elasticsearch.xpack.core.ml.action.UpdateJobAction; @@ -30,6 +33,7 @@ import org.elasticsearch.xpack.core.ml.job.results.ForecastRequestStats; import org.elasticsearch.xpack.core.ml.job.results.Influencer; import org.elasticsearch.xpack.core.ml.job.results.ModelPlot; +import org.elasticsearch.xpack.ml.MachineLearning; import org.elasticsearch.xpack.ml.job.persistence.JobProvider; import org.elasticsearch.xpack.ml.job.persistence.JobResultsPersister; import org.elasticsearch.xpack.ml.job.process.autodetect.AutodetectProcess; @@ -43,6 +47,7 @@ import java.util.List; import java.util.Objects; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.Future; import java.util.concurrent.Semaphore; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; @@ -71,6 +76,13 @@ public class AutoDetectResultProcessor { private static final Logger LOGGER = Loggers.getLogger(AutoDetectResultProcessor.class); + /** + * This is how far behind real-time we'll update the job with the latest established model memory. + * If more updates are received during the delay period then they'll take precedence. + * As a result there will be at most one update of established model memory per delay period. + */ + private static final TimeValue ESTABLISHED_MODEL_MEMORY_UPDATE_DELAY = TimeValue.timeValueSeconds(5); + private final Client client; private final Auditor auditor; private final String jobId; @@ -90,8 +102,10 @@ public class AutoDetectResultProcessor { * New model size stats are read as the process is running */ private volatile ModelSizeStats latestModelSizeStats; + private volatile Date latestDateForEstablishedModelMemoryCalc; private volatile long latestEstablishedModelMemory; private volatile boolean haveNewLatestModelSizeStats; + private Future scheduledEstablishedModelMemoryUpdate; // only accessed in synchronized methods public AutoDetectResultProcessor(Client client, Auditor auditor, String jobId, Renormalizer renormalizer, JobResultsPersister persister, JobProvider jobProvider, ModelSizeStats latestModelSizeStats, boolean restoredSnapshot) { @@ -148,6 +162,7 @@ public void process(AutodetectProcess process) { } LOGGER.info("[{}] {} buckets parsed from autodetect output", jobId, bucketCount); + runEstablishedModelMemoryUpdate(true); } catch (Exception e) { failed = true; @@ -194,15 +209,15 @@ void processResult(Context context, AutodetectResult result) { // persist after deleting interim results in case the new // results are also interim context.bulkResultsPersister.persistBucket(bucket).executeRequest(); + latestDateForEstablishedModelMemoryCalc = bucket.getTimestamp(); ++bucketCount; // if we haven't previously set established model memory, consider trying again after - // a reasonable amount of time has elapsed since the last model size stats update + // a reasonable number of buckets have elapsed since the last model size stats update long minEstablishedTimespanMs = JobProvider.BUCKETS_FOR_ESTABLISHED_MEMORY_SIZE * bucket.getBucketSpan() * 1000L; - if (haveNewLatestModelSizeStats && latestEstablishedModelMemory == 0 - && bucket.getTimestamp().getTime() > latestModelSizeStats.getTimestamp().getTime() + minEstablishedTimespanMs) { - persister.commitResultWrites(context.jobId); - updateEstablishedModelMemoryOnJob(bucket.getTimestamp(), latestModelSizeStats); + if (haveNewLatestModelSizeStats && latestEstablishedModelMemory == 0 && latestDateForEstablishedModelMemoryCalc.getTime() + > latestModelSizeStats.getTimestamp().getTime() + minEstablishedTimespanMs) { + scheduleEstablishedModelMemoryUpdate(ESTABLISHED_MODEL_MEMORY_UPDATE_DELAY); haveNewLatestModelSizeStats = false; } } @@ -293,15 +308,14 @@ private void processModelSizeStats(Context context, ModelSizeStats modelSizeStat persister.persistModelSizeStats(modelSizeStats); notifyModelMemoryStatusChange(context, modelSizeStats); latestModelSizeStats = modelSizeStats; + latestDateForEstablishedModelMemoryCalc = modelSizeStats.getTimestamp(); haveNewLatestModelSizeStats = true; // This is a crude way to NOT refresh the index and NOT attempt to update established model memory during the first 20 buckets // because this is when the model size stats are likely to be least stable and lots of updates will be coming through, and // we'll NEVER consider memory usage to be established during this period if (restoredSnapshot || bucketCount >= JobProvider.BUCKETS_FOR_ESTABLISHED_MEMORY_SIZE) { - // We need to make all results written up to and including these stats available for the established memory calculation - persister.commitResultWrites(context.jobId); - updateEstablishedModelMemoryOnJob(modelSizeStats.getTimestamp(), modelSizeStats); + scheduleEstablishedModelMemoryUpdate(ESTABLISHED_MODEL_MEMORY_UPDATE_DELAY); } } @@ -348,26 +362,91 @@ public void onFailure(Exception e) { }); } - private void updateEstablishedModelMemoryOnJob(Date latestBucketTimestamp, ModelSizeStats modelSizeStats) { - jobProvider.getEstablishedMemoryUsage(jobId, latestBucketTimestamp, modelSizeStats, establishedModelMemory -> { - JobUpdate update = new JobUpdate.Builder(jobId) - .setEstablishedModelMemory(establishedModelMemory).build(); - UpdateJobAction.Request updateRequest = UpdateJobAction.Request.internal(jobId, update); - updateRequest.setWaitForAck(false); - - executeAsyncWithOrigin(client, ML_ORIGIN, UpdateJobAction.INSTANCE, updateRequest, new ActionListener() { - @Override - public void onResponse(PutJobAction.Response response) { - latestEstablishedModelMemory = establishedModelMemory; - LOGGER.debug("[{}] Updated job with established model memory [{}]", jobId, establishedModelMemory); - } + /** + * The purpose of this method is to avoid saturating the cluster state update thread + * when a lookback job is churning through buckets very fast and the memory usage of + * the job is changing regularly. The idea is to only update the established model + * memory associated with the job a few seconds after the new value has been received. + * If more updates are received during the delay period then they simply replace the + * value that originally caused the update to be scheduled. This rate limits cluster + * state updates due to established model memory changing to one per job per delay period. + * (In reality updates will only occur this rapidly during lookback. During real-time + * operation the limit of one model size stats document per bucket will mean there is a + * maximum of one cluster state update per job per bucket, and usually the bucket span + * is 5 minutes or more.) + * @param delay The delay before updating established model memory. + */ + synchronized void scheduleEstablishedModelMemoryUpdate(TimeValue delay) { - @Override - public void onFailure(Exception e) { - LOGGER.error("[" + jobId + "] Failed to update job with new established model memory [" + establishedModelMemory + "]", - e); + if (scheduledEstablishedModelMemoryUpdate == null) { + try { + scheduledEstablishedModelMemoryUpdate = client.threadPool().schedule(delay, MachineLearning.UTILITY_THREAD_POOL_NAME, + () -> runEstablishedModelMemoryUpdate(false)); + LOGGER.trace("[{}] Scheduled established model memory update to run in [{}]", jobId, delay); + } catch (EsRejectedExecutionException e) { + if (e.isExecutorShutdown()) { + LOGGER.debug("failed to schedule established model memory update; shutting down", e); + } else { + throw e; } - }); + } + } + } + + /** + * This method is called from two places: + * - From the {@link Future} used for delayed updates + * - When shutting down this result processor + * When shutting down the result processor it's only necessary to do anything + * if an update has been scheduled, but we want to do the update immediately. + * Despite cancelling the scheduled update in this case, it's possible that + * it's already started running, in which case this method will get called + * twice in quick succession. But the second call will do nothing, as + * scheduledEstablishedModelMemoryUpdate will have been reset + * to null by the first call. + */ + private synchronized void runEstablishedModelMemoryUpdate(boolean cancelExisting) { + + if (scheduledEstablishedModelMemoryUpdate != null) { + if (cancelExisting) { + LOGGER.debug("[{}] Bringing forward previously scheduled established model memory update", jobId); + FutureUtils.cancel(scheduledEstablishedModelMemoryUpdate); + } + scheduledEstablishedModelMemoryUpdate = null; + updateEstablishedModelMemoryOnJob(); + } + } + + private void updateEstablishedModelMemoryOnJob() { + + // Copy these before committing writes, so the calculation is done based on committed documents + Date latestBucketTimestamp = latestDateForEstablishedModelMemoryCalc; + ModelSizeStats modelSizeStatsForCalc = latestModelSizeStats; + + // We need to make all results written up to and including these stats available for the established memory calculation + persister.commitResultWrites(jobId); + + jobProvider.getEstablishedMemoryUsage(jobId, latestBucketTimestamp, modelSizeStatsForCalc, establishedModelMemory -> { + if (latestEstablishedModelMemory != establishedModelMemory) { + JobUpdate update = new JobUpdate.Builder(jobId).setEstablishedModelMemory(establishedModelMemory).build(); + UpdateJobAction.Request updateRequest = UpdateJobAction.Request.internal(jobId, update); + updateRequest.setWaitForAck(false); + + executeAsyncWithOrigin(client, ML_ORIGIN, UpdateJobAction.INSTANCE, updateRequest, + new ActionListener() { + @Override + public void onResponse(PutJobAction.Response response) { + latestEstablishedModelMemory = establishedModelMemory; + LOGGER.debug("[{}] Updated job with established model memory [{}]", jobId, establishedModelMemory); + } + + @Override + public void onFailure(Exception e) { + LOGGER.error("[" + jobId + "] Failed to update job with new established model memory [" + + establishedModelMemory + "]", e); + } + }); + } }, e -> LOGGER.error("[" + jobId + "] Failed to calculate established model memory", e)); } diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/AutoDetectResultProcessorTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/AutoDetectResultProcessorTests.java index bd11ecb3128c6..f591f5cd8c778 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/AutoDetectResultProcessorTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/AutoDetectResultProcessorTests.java @@ -12,6 +12,7 @@ import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.ByteSizeUnit; import org.elasticsearch.common.unit.ByteSizeValue; +import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.util.concurrent.ThreadContext; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.threadpool.ThreadPool; @@ -33,6 +34,7 @@ import org.elasticsearch.xpack.ml.job.process.normalizer.Renormalizer; import org.elasticsearch.xpack.ml.job.results.AutodetectResult; import org.elasticsearch.xpack.ml.notifications.Auditor; +import org.junit.After; import org.junit.Before; import org.mockito.InOrder; @@ -42,14 +44,16 @@ import java.util.Date; import java.util.Iterator; import java.util.List; +import java.util.concurrent.ScheduledThreadPoolExecutor; +import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.function.Consumer; import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.nullValue; import static org.mockito.Matchers.any; +import static org.mockito.Matchers.anyString; import static org.mockito.Matchers.eq; -import static org.mockito.Matchers.isNull; import static org.mockito.Matchers.same; import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.inOrder; @@ -63,7 +67,9 @@ public class AutoDetectResultProcessorTests extends ESTestCase { private static final String JOB_ID = "_id"; + private static final long BUCKET_SPAN_MS = 1000; + private ThreadPool threadPool; private Client client; private Auditor auditor; private Renormalizer renormalizer; @@ -71,12 +77,14 @@ public class AutoDetectResultProcessorTests extends ESTestCase { private JobProvider jobProvider; private FlushListener flushListener; private AutoDetectResultProcessor processorUnderTest; + private ScheduledThreadPoolExecutor executor; @Before public void setUpMocks() { + executor = new ScheduledThreadPoolExecutor(1); client = mock(Client.class); auditor = mock(Auditor.class); - ThreadPool threadPool = mock(ThreadPool.class); + threadPool = mock(ThreadPool.class); when(client.threadPool()).thenReturn(threadPool); when(threadPool.getThreadContext()).thenReturn(new ThreadContext(Settings.EMPTY)); renormalizer = mock(Renormalizer.class); @@ -84,7 +92,12 @@ public void setUpMocks() { jobProvider = mock(JobProvider.class); flushListener = mock(FlushListener.class); processorUnderTest = new AutoDetectResultProcessor(client, auditor, JOB_ID, renormalizer, persister, jobProvider, - new ModelSizeStats.Builder(JOB_ID).build(), false, flushListener); + new ModelSizeStats.Builder(JOB_ID).setTimestamp(new Date(BUCKET_SPAN_MS)).build(), false, flushListener); + } + + @After + public void cleanup() { + executor.shutdown(); } public void testProcess() throws TimeoutException { @@ -288,6 +301,8 @@ public void testProcessResult_modelSizeStats() { public void testProcessResult_modelSizeStatsWithMemoryStatusChanges() { JobResultsPersister.Builder bulkBuilder = mock(JobResultsPersister.Builder.class); + setupScheduleDelayTime(TimeValue.timeValueSeconds(5)); + AutoDetectResultProcessor.Context context = new AutoDetectResultProcessor.Context(JOB_ID, bulkBuilder); context.deleteInterimRequired = false; AutodetectResult result = mock(AutodetectResult.class); @@ -321,11 +336,14 @@ public void testProcessResult_modelSizeStatsWithMemoryStatusChanges() { verifyNoMoreInteractions(auditor); } - public void testProcessResult_modelSizeStatsAfterManyBuckets() { + public void testProcessResult_modelSizeStatsAfterManyBuckets() throws Exception { JobResultsPersister.Builder bulkBuilder = mock(JobResultsPersister.Builder.class); when(persister.bulkPersisterBuilder(JOB_ID)).thenReturn(bulkBuilder); when(bulkBuilder.persistBucket(any(Bucket.class))).thenReturn(bulkBuilder); + // To avoid slowing down the test this is using a delay of 1 nanosecond rather than the 5 seconds used in production + setupScheduleDelayTime(TimeValue.timeValueNanos(1)); + AutoDetectResultProcessor.Context context = new AutoDetectResultProcessor.Context(JOB_ID, bulkBuilder); context.deleteInterimRequired = false; for (int i = 0; i < JobProvider.BUCKETS_FOR_ESTABLISHED_MEMORY_SIZE; ++i) { @@ -337,16 +355,64 @@ public void testProcessResult_modelSizeStatsAfterManyBuckets() { AutodetectResult result = mock(AutodetectResult.class); ModelSizeStats modelSizeStats = mock(ModelSizeStats.class); + Date timestamp = new Date(BUCKET_SPAN_MS); + when(modelSizeStats.getTimestamp()).thenReturn(timestamp); when(result.getModelSizeStats()).thenReturn(modelSizeStats); processorUnderTest.processResult(context, result); - verify(persister, times(1)).persistModelSizeStats(modelSizeStats); - verify(persister, times(1)).commitResultWrites(JOB_ID); - verifyNoMoreInteractions(persister); - verify(jobProvider, times(1)).getEstablishedMemoryUsage(eq(JOB_ID), isNull(Date.class), eq(modelSizeStats), + // Some calls will be made 1 nanosecond later in a different thread, hence the assertBusy() + assertBusy(() -> { + verify(persister, times(1)).persistModelSizeStats(modelSizeStats); + verify(persister, times(1)).commitResultWrites(JOB_ID); + verifyNoMoreInteractions(persister); + verify(jobProvider, times(1)).getEstablishedMemoryUsage(eq(JOB_ID), eq(timestamp), eq(modelSizeStats), any(Consumer.class), + any(Consumer.class)); + verifyNoMoreInteractions(jobProvider); + assertEquals(modelSizeStats, processorUnderTest.modelSizeStats()); + }); + } + + public void testProcessResult_manyModelSizeStatsInQuickSuccession() throws Exception { + JobResultsPersister.Builder bulkBuilder = mock(JobResultsPersister.Builder.class); + when(persister.bulkPersisterBuilder(JOB_ID)).thenReturn(bulkBuilder); + when(bulkBuilder.persistBucket(any(Bucket.class))).thenReturn(bulkBuilder); + + setupScheduleDelayTime(TimeValue.timeValueSeconds(1)); + + AutoDetectResultProcessor.Context context = new AutoDetectResultProcessor.Context(JOB_ID, bulkBuilder); + context.deleteInterimRequired = false; + ModelSizeStats modelSizeStats = null; + for (int i = 1; i <= JobProvider.BUCKETS_FOR_ESTABLISHED_MEMORY_SIZE + 5; ++i) { + AutodetectResult result = mock(AutodetectResult.class); + Bucket bucket = mock(Bucket.class); + when(bucket.getTimestamp()).thenReturn(new Date(BUCKET_SPAN_MS * i)); + when(result.getBucket()).thenReturn(bucket); + processorUnderTest.processResult(context, result); + if (i > JobProvider.BUCKETS_FOR_ESTABLISHED_MEMORY_SIZE) { + result = mock(AutodetectResult.class); + modelSizeStats = mock(ModelSizeStats.class); + when(modelSizeStats.getTimestamp()).thenReturn(new Date(BUCKET_SPAN_MS * i)); + when(result.getModelSizeStats()).thenReturn(modelSizeStats); + processorUnderTest.processResult(context, result); + } + } + + ModelSizeStats lastModelSizeStats = modelSizeStats; + assertNotNull(lastModelSizeStats); + Date lastTimestamp = lastModelSizeStats.getTimestamp(); + + // Some calls will be made 1 second later in a different thread, hence the assertBusy() + assertBusy(() -> { + // All the model size stats should be persisted to the index... + verify(persister, times(5)).persistModelSizeStats(any(ModelSizeStats.class)); + // ...but only the last should trigger an established model memory update + verify(persister, times(1)).commitResultWrites(JOB_ID); + verifyNoMoreInteractions(persister); + verify(jobProvider, times(1)).getEstablishedMemoryUsage(eq(JOB_ID), eq(lastTimestamp), eq(lastModelSizeStats), any(Consumer.class), any(Consumer.class)); - verifyNoMoreInteractions(jobProvider); - assertEquals(modelSizeStats, processorUnderTest.modelSizeStats()); + verifyNoMoreInteractions(jobProvider); + assertEquals(lastModelSizeStats, processorUnderTest.modelSizeStats()); + }); } public void testProcessResult_modelSnapshot() { @@ -484,4 +550,9 @@ public void testKill() throws TimeoutException { verify(renormalizer, times(1)).waitUntilIdle(); verify(flushListener, times(1)).clear(); } + + private void setupScheduleDelayTime(TimeValue delay) { + when(threadPool.schedule(any(TimeValue.class), anyString(), any(Runnable.class))) + .thenAnswer(i -> executor.schedule((Runnable) i.getArguments()[2], delay.nanos(), TimeUnit.NANOSECONDS)); + } }