From b6608fca0d156c31ca0689acc17c460032e21725 Mon Sep 17 00:00:00 2001 From: David Kyle Date: Tue, 26 May 2020 16:05:18 +0100 Subject: [PATCH 1/3] Only request required fields in expired data removers --- .../xpack/core/ml/utils/time/TimeUtils.java | 30 ++++++++ .../retention/ExpiredForecastsRemover.java | 70 ++++++++++++------- .../ExpiredModelSnapshotsRemover.java | 48 ++++++++++--- .../xpack/ml/job/retention/MlDataRemover.java | 20 ++++++ .../AbstractExpiredJobDataRemoverTests.java | 7 ++ .../ExpiredModelSnapshotsRemoverTests.java | 43 +++++++++--- .../ml/job/retention/MlDataRemoverTests.java | 30 ++++++++ .../xpack/ml/utils/time/TimeUtilsTests.java | 6 ++ 8 files changed, 209 insertions(+), 45 deletions(-) create mode 100644 x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/retention/MlDataRemoverTests.java diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/utils/time/TimeUtils.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/utils/time/TimeUtils.java index 6434b0ff2b98d..169a326402098 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/utils/time/TimeUtils.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/utils/time/TimeUtils.java @@ -29,6 +29,36 @@ public static Date parseTimeField(XContentParser parser, String fieldName) throw "unexpected token [" + parser.currentToken() + "] for [" + fieldName + "]"); } + /** + * Safely parses a string epoch representation to a Long + * + * Commonly this function is used for parsing Date fields from doc values + * requested with the format "epoch_millis". + * + * Since nanosecond support was added epoch_millis timestamps may have a fractional component. + * We discard this, taking just whole milliseconds. Arguably it would be better to retain the + * precision here and let the downstream component decide whether it wants the accuracy, but + * that makes it hard to pass around the value as a number. The double type doesn't have + * enough digits of accuracy, and obviously long cannot store the fraction. BigDecimal would + * work, but that isn't supported by the JSON parser if the number gets round-tripped through + * JSON. So String is really the only format that could be used, but the consumers of time + * are expecting a number. + * + * @param epoch The epoch value as a string. This may contain a fractional component. + * @return The epoch value. + */ + public static long parseToEpochMs(String epoch) { + int dotPos = epoch.indexOf('.'); + if (dotPos == -1) { + return Long.parseLong(epoch); + } else if (dotPos > 0) { + return Long.parseLong(epoch.substring(0, dotPos)); + } else { + // The first character is '.' so round down to 0 + return 0L; + } + } + /** * First tries to parse the date first as a Long and convert that to an * epoch time. If the long number has more than 10 digits it is considered a diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/retention/ExpiredForecastsRemover.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/retention/ExpiredForecastsRemover.java index bce61cb4c23eb..827ff38912079 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/retention/ExpiredForecastsRemover.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/retention/ExpiredForecastsRemover.java @@ -14,11 +14,6 @@ import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.action.support.ThreadedActionListener; import org.elasticsearch.client.Client; -import org.elasticsearch.common.xcontent.LoggingDeprecationHandler; -import org.elasticsearch.common.xcontent.NamedXContentRegistry; -import org.elasticsearch.common.xcontent.XContentFactory; -import org.elasticsearch.common.xcontent.XContentParser; -import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.index.query.BoolQueryBuilder; import org.elasticsearch.index.query.QueryBuilder; import org.elasticsearch.index.query.QueryBuilders; @@ -35,12 +30,11 @@ import org.elasticsearch.xpack.core.ml.job.results.Forecast; import org.elasticsearch.xpack.core.ml.job.results.ForecastRequestStats; import org.elasticsearch.xpack.core.ml.job.results.Result; +import org.elasticsearch.xpack.core.ml.utils.time.TimeUtils; import org.elasticsearch.xpack.ml.MachineLearning; import org.joda.time.DateTime; import org.joda.time.chrono.ISOChronology; -import java.io.IOException; -import java.io.InputStream; import java.util.ArrayList; import java.util.List; import java.util.Objects; @@ -83,6 +77,10 @@ public void remove(ActionListener listener, Supplier isTimedOu .filter(QueryBuilders.termQuery(Result.RESULT_TYPE.getPreferredName(), ForecastRequestStats.RESULT_TYPE_VALUE)) .filter(QueryBuilders.existsQuery(ForecastRequestStats.EXPIRY_TIME.getPreferredName()))); source.size(MAX_FORECASTS); + source.fetchSource(false); + source.docValueField(Job.ID.getPreferredName(), null); + source.docValueField(ForecastRequestStats.FORECAST_ID.getPreferredName(), null); + source.docValueField(ForecastRequestStats.EXPIRY_TIME.getPreferredName(), "epoch_millis"); // _doc is the most efficient sort order and will also disable scoring source.sort(ElasticsearchMappings.ES_DOC); @@ -94,11 +92,9 @@ public void remove(ActionListener listener, Supplier isTimedOu } private void deleteForecasts(SearchResponse searchResponse, ActionListener listener, Supplier isTimedOutSupplier) { - List forecastsToDelete; - try { - forecastsToDelete = findForecastsToDelete(searchResponse); - } catch (IOException e) { - listener.onFailure(e); + List forecastsToDelete = findForecastsToDelete(searchResponse); + if (forecastsToDelete.isEmpty()) { + listener.onResponse(true); return; } @@ -129,8 +125,8 @@ public void onFailure(Exception e) { }); } - private List findForecastsToDelete(SearchResponse searchResponse) throws IOException { - List forecastsToDelete = new ArrayList<>(); + private List findForecastsToDelete(SearchResponse searchResponse) { + List forecastsToDelete = new ArrayList<>(); SearchHits hits = searchResponse.getHits(); if (hits.getTotalHits() > MAX_FORECASTS) { @@ -138,19 +134,27 @@ private List findForecastsToDelete(SearchResponse searchRe } for (SearchHit hit : hits.getHits()) { - try (InputStream stream = hit.getSourceRef().streamInput(); - XContentParser parser = XContentFactory.xContent(XContentType.JSON).createParser( - NamedXContentRegistry.EMPTY, LoggingDeprecationHandler.INSTANCE, stream)) { - ForecastRequestStats forecastRequestStats = ForecastRequestStats.LENIENT_PARSER.apply(parser, null); - if (forecastRequestStats.getExpiryTime().toEpochMilli() < cutoffEpochMs) { - forecastsToDelete.add(forecastRequestStats); + String expiryTime = stringFieldValueOrNull(hit, ForecastRequestStats.EXPIRY_TIME.getPreferredName()); + if (expiryTime == null) { + LOGGER.warn("Forecast request stats document [{}] has a null [{}] field", hit.getId(), + ForecastRequestStats.EXPIRY_TIME.getPreferredName()); + continue; + } + long expiryMs = TimeUtils.parseToEpochMs(expiryTime); + if (expiryMs < cutoffEpochMs) { + JobForecastId idPair = new JobForecastId( + stringFieldValueOrNull(hit, Job.ID.getPreferredName()), + stringFieldValueOrNull(hit, Forecast.FORECAST_ID.getPreferredName())); + + if (idPair.hasNullValue() == false) { + forecastsToDelete.add(idPair); } } } return forecastsToDelete; } - private DeleteByQueryRequest buildDeleteByQuery(List forecastsToDelete) { + private DeleteByQueryRequest buildDeleteByQuery(List ids) { DeleteByQueryRequest request = new DeleteByQueryRequest(); request.setSlices(5); @@ -158,10 +162,12 @@ private DeleteByQueryRequest buildDeleteByQuery(List forec BoolQueryBuilder boolQuery = QueryBuilders.boolQuery().minimumShouldMatch(1); boolQuery.must(QueryBuilders.termsQuery(Result.RESULT_TYPE.getPreferredName(), ForecastRequestStats.RESULT_TYPE_VALUE, Forecast.RESULT_TYPE_VALUE)); - for (ForecastRequestStats forecastToDelete : forecastsToDelete) { - boolQuery.should(QueryBuilders.boolQuery() - .must(QueryBuilders.termQuery(Job.ID.getPreferredName(), forecastToDelete.getJobId())) - .must(QueryBuilders.termQuery(Forecast.FORECAST_ID.getPreferredName(), forecastToDelete.getForecastId()))); + for (JobForecastId jobForecastId : ids) { + if (jobForecastId.hasNullValue() == false) { + boolQuery.should(QueryBuilders.boolQuery() + .must(QueryBuilders.termQuery(Job.ID.getPreferredName(), jobForecastId.jobId)) + .must(QueryBuilders.termQuery(Forecast.FORECAST_ID.getPreferredName(), jobForecastId.forecastId))); + } } QueryBuilder query = QueryBuilders.boolQuery().filter(boolQuery); request.setQuery(query); @@ -171,4 +177,18 @@ private DeleteByQueryRequest buildDeleteByQuery(List forec return request; } + + private static class JobForecastId { + private final String jobId; + private final String forecastId; + + private JobForecastId(String jobId, String forecastId) { + this.jobId = jobId; + this.forecastId = forecastId; + } + + boolean hasNullValue() { + return jobId == null || forecastId == null; + } + } } diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/retention/ExpiredModelSnapshotsRemover.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/retention/ExpiredModelSnapshotsRemover.java index 7f46f63516d73..0ee1029e636a4 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/retention/ExpiredModelSnapshotsRemover.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/retention/ExpiredModelSnapshotsRemover.java @@ -88,7 +88,14 @@ protected void removeDataBefore(Job job, long cutoffEpochMs, ActionListener(LOGGER, threadPool, MachineLearning.UTILITY_THREAD_POOL_NAME, expiredSnapshotsListener(job.getId(), listener), false)); @@ -99,11 +106,18 @@ private ActionListener expiredSnapshotsListener(String jobId, Ac @Override public void onResponse(SearchResponse searchResponse) { try { - List modelSnapshots = new ArrayList<>(); + List snapshotIds = new ArrayList<>(); for (SearchHit hit : searchResponse.getHits()) { - modelSnapshots.add(ModelSnapshot.fromJson(hit.getSourceRef())); + JobSnapshotId idPair = new JobSnapshotId( + stringFieldValueOrNull(hit, Job.ID.getPreferredName()), + stringFieldValueOrNull(hit, ModelSnapshotField.SNAPSHOT_ID.getPreferredName())); + + if (idPair.hasNullValue() == false) { + snapshotIds.add(idPair); + } } - deleteModelSnapshots(new VolatileCursorIterator<>(modelSnapshots), listener); + + deleteModelSnapshots(new VolatileCursorIterator<>(snapshotIds), listener); } catch (Exception e) { onFailure(e); } @@ -116,14 +130,14 @@ public void onFailure(Exception e) { }; } - private void deleteModelSnapshots(Iterator modelSnapshotIterator, ActionListener listener) { + private void deleteModelSnapshots(Iterator modelSnapshotIterator, ActionListener listener) { if (modelSnapshotIterator.hasNext() == false) { listener.onResponse(true); return; } - ModelSnapshot modelSnapshot = modelSnapshotIterator.next(); - DeleteModelSnapshotAction.Request deleteSnapshotRequest = new DeleteModelSnapshotAction.Request( - modelSnapshot.getJobId(), modelSnapshot.getSnapshotId()); + JobSnapshotId idPair = modelSnapshotIterator.next(); + DeleteModelSnapshotAction.Request deleteSnapshotRequest = + new DeleteModelSnapshotAction.Request(idPair.jobId, idPair.snapshotId); getClient().execute(DeleteModelSnapshotAction.INSTANCE, deleteSnapshotRequest, new ActionListener() { @Override public void onResponse(AcknowledgedResponse response) { @@ -136,9 +150,23 @@ public void onResponse(AcknowledgedResponse response) { @Override public void onFailure(Exception e) { - listener.onFailure(new ElasticsearchException("[" + modelSnapshot.getJobId() + "] Failed to delete snapshot [" - + modelSnapshot.getSnapshotId() + "]", e)); + listener.onFailure(new ElasticsearchException("[" + idPair.jobId + "] Failed to delete snapshot [" + + idPair.snapshotId + "]", e)); } }); } + + static class JobSnapshotId { + private final String jobId; + private final String snapshotId; + + JobSnapshotId(String jobId, String snapshotId) { + this.jobId = jobId; + this.snapshotId = snapshotId; + } + + boolean hasNullValue() { + return jobId == null || snapshotId == null; + } + } } diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/retention/MlDataRemover.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/retention/MlDataRemover.java index 485d8e9bfa22d..0fa06262801f3 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/retention/MlDataRemover.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/retention/MlDataRemover.java @@ -6,9 +6,29 @@ package org.elasticsearch.xpack.ml.job.retention; import org.elasticsearch.action.ActionListener; +import org.elasticsearch.common.document.DocumentField; +import org.elasticsearch.search.SearchHit; import java.util.function.Supplier; public interface MlDataRemover { void remove(ActionListener listener, Supplier isTimedOutSupplier); + + /** + * Extract {@code fieldName} from {@code hit} and if it is a string + * return the string else {@code null}. + * @param hit The search hit + * @param fieldName Field to find + * @return value iff the docfield is present and it is a string. Otherwise {@code null} + */ + default String stringFieldValueOrNull(SearchHit hit, String fieldName) { + DocumentField docField = hit.field(fieldName); + if (docField != null) { + Object value = docField.getValue(); + if (value instanceof String) { + return (String)value; + } + } + return null; + } } diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/retention/AbstractExpiredJobDataRemoverTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/retention/AbstractExpiredJobDataRemoverTests.java index 5256cb9cc0e35..a6287d6f1a031 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/retention/AbstractExpiredJobDataRemoverTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/retention/AbstractExpiredJobDataRemoverTests.java @@ -92,6 +92,13 @@ private static SearchResponse createSearchResponse(List to return searchResponse; } + static SearchResponse createSearchResponseFromHits(List hits) { + SearchHits searchHits = new SearchHits(hits.toArray(new SearchHit[] {}), hits.size(), 1.0f); + SearchResponse searchResponse = mock(SearchResponse.class); + when(searchResponse.getHits()).thenReturn(searchHits); + return searchResponse; + } + public void testRemoveGivenNoJobs() throws IOException { SearchResponse response = createSearchResponse(Collections.emptyList()); diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/retention/ExpiredModelSnapshotsRemoverTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/retention/ExpiredModelSnapshotsRemoverTests.java index 42e97efbe3f13..6b66965829ac3 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/retention/ExpiredModelSnapshotsRemoverTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/retention/ExpiredModelSnapshotsRemoverTests.java @@ -17,6 +17,7 @@ import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.mock.orig.Mockito; +import org.elasticsearch.search.SearchHit; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.threadpool.FixedExecutorBuilder; import org.elasticsearch.threadpool.ThreadPool; @@ -25,7 +26,9 @@ import org.elasticsearch.xpack.core.ml.job.config.JobTests; import org.elasticsearch.xpack.core.ml.job.persistence.AnomalyDetectorsIndex; import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.ModelSnapshot; +import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.ModelSnapshotField; import org.elasticsearch.xpack.ml.MachineLearning; +import org.elasticsearch.xpack.ml.test.SearchHitBuilder; import org.junit.After; import org.junit.Before; import org.mockito.invocation.InvocationOnMock; @@ -118,11 +121,13 @@ public void testRemove_GivenJobsWithMixedRetentionPolicies() throws IOException JobTests.buildJobBuilder("snapshots-2").setModelSnapshotRetentionDays(17L).setModelSnapshotId("active").build() )); - List snapshots1JobSnapshots = Arrays.asList(createModelSnapshot("snapshots-1", "snapshots-1_1"), - createModelSnapshot("snapshots-1", "snapshots-1_2")); - List snapshots2JobSnapshots = Collections.singletonList(createModelSnapshot("snapshots-2", "snapshots-2_1")); - searchResponsesPerCall.add(AbstractExpiredJobDataRemoverTests.createSearchResponse(snapshots1JobSnapshots)); - searchResponsesPerCall.add(AbstractExpiredJobDataRemoverTests.createSearchResponse(snapshots2JobSnapshots)); + SearchHit snapshot1_1 = createModelSnapshotQueryHit("snapshots-1", "snapshots-1_1"); + SearchHit snapshot1_2 = createModelSnapshotQueryHit("snapshots-1", "snapshots-1_2"); + searchResponsesPerCall.add( + AbstractExpiredJobDataRemoverTests.createSearchResponseFromHits(Arrays.asList(snapshot1_1, snapshot1_2))); + + SearchHit snapshot2_1 = createModelSnapshotQueryHit("snapshots-2", "snapshots-2_1"); + searchResponsesPerCall.add(AbstractExpiredJobDataRemoverTests.createSearchResponseFromHits(Collections.singletonList(snapshot2_1))); createExpiredModelSnapshotsRemover().remove(listener, () -> false); @@ -203,12 +208,13 @@ public void testRemove_GivenClientDeleteSnapshotRequestsFail() throws IOExceptio JobTests.buildJobBuilder("snapshots-2").setModelSnapshotRetentionDays(17L).setModelSnapshotId("active").build() )); - List snapshots1JobSnapshots = Arrays.asList(createModelSnapshot("snapshots-1", "snapshots-1_1"), - createModelSnapshot("snapshots-1", "snapshots-1_2")); - List snapshots2JobSnapshots = Collections.singletonList(createModelSnapshot("snapshots-2", "snapshots-2_1")); - searchResponsesPerCall.add(AbstractExpiredJobDataRemoverTests.createSearchResponse(snapshots1JobSnapshots)); - searchResponsesPerCall.add(AbstractExpiredJobDataRemoverTests.createSearchResponse(snapshots2JobSnapshots)); + SearchHit snapshot1_1 = createModelSnapshotQueryHit("snapshots-1", "snapshots-1_1"); + SearchHit snapshot1_2 = createModelSnapshotQueryHit("snapshots-1", "snapshots-1_2"); + searchResponsesPerCall.add(AbstractExpiredJobDataRemoverTests.createSearchResponseFromHits( + Arrays.asList(snapshot1_1, snapshot1_2))); + SearchHit snapshot2_2 = createModelSnapshotQueryHit("snapshots-2", "snapshots-2_1"); + searchResponsesPerCall.add(AbstractExpiredJobDataRemoverTests.createSearchResponseFromHits(Collections.singletonList(snapshot2_2))); createExpiredModelSnapshotsRemover().remove(listener, () -> false); listener.waitToCompletion(); @@ -224,6 +230,17 @@ public void testRemove_GivenClientDeleteSnapshotRequestsFail() throws IOExceptio assertThat(deleteSnapshotRequest.getSnapshotId(), equalTo("snapshots-1_1")); } + public void testJobSnapshotId() { + ExpiredModelSnapshotsRemover.JobSnapshotId id = new ExpiredModelSnapshotsRemover.JobSnapshotId("a", "b"); + assertFalse(id.hasNullValue()); + id = new ExpiredModelSnapshotsRemover.JobSnapshotId(null, "b"); + assertTrue(id.hasNullValue()); + id = new ExpiredModelSnapshotsRemover.JobSnapshotId("a", null); + assertTrue(id.hasNullValue()); + id = new ExpiredModelSnapshotsRemover.JobSnapshotId(null, null); + assertTrue(id.hasNullValue()); + } + @SuppressWarnings("unchecked") private void givenJobs(List jobs) throws IOException { SearchResponse response = AbstractExpiredJobDataRemoverTests.createSearchResponse(jobs); @@ -287,4 +304,10 @@ public Void answer(InvocationOnMock invocationOnMock) { }).when(client).execute(same(DeleteModelSnapshotAction.INSTANCE), any(), any()); } + private static SearchHit createModelSnapshotQueryHit(String jobId, String snapshotId) { + SearchHitBuilder hitBuilder = new SearchHitBuilder(0); + hitBuilder.addField(Job.ID.getPreferredName(), Collections.singletonList(jobId)); + hitBuilder.addField(ModelSnapshotField.SNAPSHOT_ID.getPreferredName(), Collections.singletonList(snapshotId)); + return hitBuilder.build(); + } } diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/retention/MlDataRemoverTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/retention/MlDataRemoverTests.java new file mode 100644 index 0000000000000..5b5638a904a99 --- /dev/null +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/retention/MlDataRemoverTests.java @@ -0,0 +1,30 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +package org.elasticsearch.xpack.ml.job.retention; + +import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.xpack.ml.test.SearchHitBuilder; + +import java.util.Collections; +import java.util.Date; + +public class MlDataRemoverTests extends ESTestCase { + public void testStringOrNull() { + MlDataRemover remover = (listener, isTimedOutSupplier) -> { }; + + SearchHitBuilder hitBuilder = new SearchHitBuilder(0); + assertNull(remover.stringFieldValueOrNull(hitBuilder.build(), "missing")); + + hitBuilder = new SearchHitBuilder(0); + hitBuilder.addField("not_a_string", Collections.singletonList(new Date())); + assertNull(remover.stringFieldValueOrNull(hitBuilder.build(), "not_a_string")); + + hitBuilder = new SearchHitBuilder(0); + hitBuilder.addField("string_field", Collections.singletonList("actual_string_value")); + assertEquals("actual_string_value", remover.stringFieldValueOrNull(hitBuilder.build(), "string_field")); + } +} diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/utils/time/TimeUtilsTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/utils/time/TimeUtilsTests.java index d33968a37cfa7..b4f4d40cc53b3 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/utils/time/TimeUtilsTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/utils/time/TimeUtilsTests.java @@ -79,4 +79,10 @@ public void testCheckNonNegativeMultiple_GivenPositiveNonMultiple() { public void testCheckNonNegativeMultiple_GivenPositiveMultiple() { TimeUtils.checkNonNegativeMultiple(TimeValue.timeValueMillis(1), TimeUnit.MILLISECONDS, new ParseField("foo")); } + + public void testParseToEpochMs() { + assertEquals(1462096800000L, TimeUtils.parseToEpochMs("1462096800000")); + assertEquals(1462096800000L, TimeUtils.parseToEpochMs("1462096800000.005")); + assertEquals(0L, TimeUtils.parseToEpochMs(".005")); + } } From 19efa8cbcdf9a945b179b15b2241ccec997e23a7 Mon Sep 17 00:00:00 2001 From: David Kyle Date: Wed, 27 May 2020 10:16:52 +0100 Subject: [PATCH 2/3] Revert TimeUtils --- .../xpack/core/ml/utils/time/TimeUtils.java | 30 ------------------- .../xpack/ml/utils/time/TimeUtilsTests.java | 6 ---- 2 files changed, 36 deletions(-) diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/utils/time/TimeUtils.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/utils/time/TimeUtils.java index 169a326402098..6434b0ff2b98d 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/utils/time/TimeUtils.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/utils/time/TimeUtils.java @@ -29,36 +29,6 @@ public static Date parseTimeField(XContentParser parser, String fieldName) throw "unexpected token [" + parser.currentToken() + "] for [" + fieldName + "]"); } - /** - * Safely parses a string epoch representation to a Long - * - * Commonly this function is used for parsing Date fields from doc values - * requested with the format "epoch_millis". - * - * Since nanosecond support was added epoch_millis timestamps may have a fractional component. - * We discard this, taking just whole milliseconds. Arguably it would be better to retain the - * precision here and let the downstream component decide whether it wants the accuracy, but - * that makes it hard to pass around the value as a number. The double type doesn't have - * enough digits of accuracy, and obviously long cannot store the fraction. BigDecimal would - * work, but that isn't supported by the JSON parser if the number gets round-tripped through - * JSON. So String is really the only format that could be used, but the consumers of time - * are expecting a number. - * - * @param epoch The epoch value as a string. This may contain a fractional component. - * @return The epoch value. - */ - public static long parseToEpochMs(String epoch) { - int dotPos = epoch.indexOf('.'); - if (dotPos == -1) { - return Long.parseLong(epoch); - } else if (dotPos > 0) { - return Long.parseLong(epoch.substring(0, dotPos)); - } else { - // The first character is '.' so round down to 0 - return 0L; - } - } - /** * First tries to parse the date first as a Long and convert that to an * epoch time. If the long number has more than 10 digits it is considered a diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/utils/time/TimeUtilsTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/utils/time/TimeUtilsTests.java index b4f4d40cc53b3..d33968a37cfa7 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/utils/time/TimeUtilsTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/utils/time/TimeUtilsTests.java @@ -79,10 +79,4 @@ public void testCheckNonNegativeMultiple_GivenPositiveNonMultiple() { public void testCheckNonNegativeMultiple_GivenPositiveMultiple() { TimeUtils.checkNonNegativeMultiple(TimeValue.timeValueMillis(1), TimeUnit.MILLISECONDS, new ParseField("foo")); } - - public void testParseToEpochMs() { - assertEquals(1462096800000L, TimeUtils.parseToEpochMs("1462096800000")); - assertEquals(1462096800000L, TimeUtils.parseToEpochMs("1462096800000.005")); - assertEquals(0L, TimeUtils.parseToEpochMs(".005")); - } } From 8a32d639a106c31ff0fb37dbe05131163eecbd44 Mon Sep 17 00:00:00 2001 From: David Kyle Date: Wed, 27 May 2020 10:48:55 +0100 Subject: [PATCH 3/3] Parse date as long or string --- .../retention/ExpiredForecastsRemover.java | 25 ++++++++++++++++--- .../ExpiredForecastsRemoverTests.java | 20 +++++++++++++++ 2 files changed, 41 insertions(+), 4 deletions(-) create mode 100644 x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/retention/ExpiredForecastsRemoverTests.java diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/retention/ExpiredForecastsRemover.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/retention/ExpiredForecastsRemover.java index 827ff38912079..d82dd1a063dff 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/retention/ExpiredForecastsRemover.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/retention/ExpiredForecastsRemover.java @@ -14,6 +14,7 @@ import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.action.support.ThreadedActionListener; import org.elasticsearch.client.Client; +import org.elasticsearch.common.document.DocumentField; import org.elasticsearch.index.query.BoolQueryBuilder; import org.elasticsearch.index.query.QueryBuilder; import org.elasticsearch.index.query.QueryBuilders; @@ -30,7 +31,6 @@ import org.elasticsearch.xpack.core.ml.job.results.Forecast; import org.elasticsearch.xpack.core.ml.job.results.ForecastRequestStats; import org.elasticsearch.xpack.core.ml.job.results.Result; -import org.elasticsearch.xpack.core.ml.utils.time.TimeUtils; import org.elasticsearch.xpack.ml.MachineLearning; import org.joda.time.DateTime; import org.joda.time.chrono.ISOChronology; @@ -134,13 +134,20 @@ private List findForecastsToDelete(SearchResponse searchResponse) } for (SearchHit hit : hits.getHits()) { - String expiryTime = stringFieldValueOrNull(hit, ForecastRequestStats.EXPIRY_TIME.getPreferredName()); - if (expiryTime == null) { + DocumentField docField = hit.field(ForecastRequestStats.EXPIRY_TIME.getPreferredName()); + if (docField == null) { LOGGER.warn("Forecast request stats document [{}] has a null [{}] field", hit.getId(), ForecastRequestStats.EXPIRY_TIME.getPreferredName()); continue; } - long expiryMs = TimeUtils.parseToEpochMs(expiryTime); + + Long expiryMs = parseDateField(docField.getValue()); + if (expiryMs == null) { + LOGGER.warn("Forecast request stats document [{}] date field [{}] cannot be parsed", hit.getId(), + ForecastRequestStats.EXPIRY_TIME.getPreferredName()); + continue; + } + if (expiryMs < cutoffEpochMs) { JobForecastId idPair = new JobForecastId( stringFieldValueOrNull(hit, Job.ID.getPreferredName()), @@ -178,6 +185,16 @@ private DeleteByQueryRequest buildDeleteByQuery(List ids) { return request; } + static Long parseDateField(Object value) { + if (value instanceof String) { // doc_value field with the epoch_millis format + return Long.parseLong((String)value); + } else if (value instanceof Long) { // pre-6.0 field + return (Long)value; + } else { + return null; + } + } + private static class JobForecastId { private final String jobId; private final String forecastId; diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/retention/ExpiredForecastsRemoverTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/retention/ExpiredForecastsRemoverTests.java new file mode 100644 index 0000000000000..6ce7951374f34 --- /dev/null +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/retention/ExpiredForecastsRemoverTests.java @@ -0,0 +1,20 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +package org.elasticsearch.xpack.ml.job.retention; + +import org.elasticsearch.test.ESTestCase; + +import java.util.Date; + +public class ExpiredForecastsRemoverTests extends ESTestCase { + + public void testDateParsing() { + assertEquals(Long.valueOf(1462096800000L), ExpiredForecastsRemover.parseDateField("1462096800000")); + assertEquals(Long.valueOf(1462096800000L), ExpiredForecastsRemover.parseDateField(1462096800000L)); + assertNull(ExpiredForecastsRemover.parseDateField(new Date())); + } +}