1414import org .elasticsearch .action .search .SearchResponse ;
1515import org .elasticsearch .action .support .ThreadedActionListener ;
1616import org .elasticsearch .client .OriginSettingClient ;
17- import org .elasticsearch .common .xcontent .LoggingDeprecationHandler ;
18- import org .elasticsearch .common .xcontent .NamedXContentRegistry ;
19- import org .elasticsearch .common .xcontent .XContentFactory ;
20- import org .elasticsearch .common .xcontent .XContentParser ;
21- import org .elasticsearch .common .xcontent .XContentType ;
2217import org .elasticsearch .index .query .BoolQueryBuilder ;
2318import org .elasticsearch .index .query .QueryBuilder ;
2419import org .elasticsearch .index .query .QueryBuilders ;
3025import org .elasticsearch .search .SearchHits ;
3126import org .elasticsearch .search .builder .SearchSourceBuilder ;
3227import org .elasticsearch .threadpool .ThreadPool ;
28+ import org .elasticsearch .xpack .core .common .time .TimeUtils ;
3329import org .elasticsearch .xpack .core .ml .job .config .Job ;
3430import org .elasticsearch .xpack .core .ml .job .persistence .AnomalyDetectorsIndex ;
3531import org .elasticsearch .xpack .core .ml .job .persistence .ElasticsearchMappings ;
3834import org .elasticsearch .xpack .core .ml .job .results .Result ;
3935import org .elasticsearch .xpack .ml .MachineLearning ;
4036
41- import java .io .IOException ;
42- import java .io .InputStream ;
4337import java .time .Clock ;
4438import java .time .Instant ;
4539import java .util .ArrayList ;
@@ -85,6 +79,11 @@ public void remove(ActionListener<Boolean> listener, Supplier<Boolean> isTimedOu
8579 .filter (QueryBuilders .existsQuery (ForecastRequestStats .EXPIRY_TIME .getPreferredName ())));
8680 source .size (MAX_FORECASTS );
8781 source .trackTotalHits (true );
82+ source .fetchSource (false );
83+ source .docValueField (Job .ID .getPreferredName (), null );
84+ source .docValueField (ForecastRequestStats .FORECAST_ID .getPreferredName (), null );
85+ source .docValueField (ForecastRequestStats .EXPIRY_TIME .getPreferredName (), "epoch_millis" );
86+
8887
8988 // _doc is the most efficient sort order and will also disable scoring
9089 source .sort (ElasticsearchMappings .ES_DOC );
@@ -96,11 +95,9 @@ public void remove(ActionListener<Boolean> listener, Supplier<Boolean> isTimedOu
9695 }
9796
9897 private void deleteForecasts (SearchResponse searchResponse , ActionListener <Boolean > listener , Supplier <Boolean > isTimedOutSupplier ) {
99- List <ForecastRequestStats > forecastsToDelete ;
100- try {
101- forecastsToDelete = findForecastsToDelete (searchResponse );
102- } catch (IOException e ) {
103- listener .onFailure (e );
98+ List <JobForecastId > forecastsToDelete = findForecastsToDelete (searchResponse );
99+ if (forecastsToDelete .isEmpty ()) {
100+ listener .onResponse (true );
104101 return ;
105102 }
106103
@@ -131,39 +128,51 @@ public void onFailure(Exception e) {
131128 });
132129 }
133130
134- private List <ForecastRequestStats > findForecastsToDelete (SearchResponse searchResponse ) throws IOException {
135- List <ForecastRequestStats > forecastsToDelete = new ArrayList <>();
131+ private List <JobForecastId > findForecastsToDelete (SearchResponse searchResponse ) {
132+ List <JobForecastId > forecastsToDelete = new ArrayList <>();
136133
137134 SearchHits hits = searchResponse .getHits ();
138135 if (hits .getTotalHits ().value > MAX_FORECASTS ) {
139136 LOGGER .info ("More than [{}] forecasts were found. This run will only delete [{}] of them" , MAX_FORECASTS , MAX_FORECASTS );
140137 }
141138
142139 for (SearchHit hit : hits .getHits ()) {
143- try (InputStream stream = hit .getSourceRef ().streamInput ();
144- XContentParser parser = XContentFactory .xContent (XContentType .JSON ).createParser (
145- NamedXContentRegistry .EMPTY , LoggingDeprecationHandler .INSTANCE , stream )) {
146- ForecastRequestStats forecastRequestStats = ForecastRequestStats .LENIENT_PARSER .apply (parser , null );
147- if (forecastRequestStats .getExpiryTime ().toEpochMilli () < cutoffEpochMs ) {
148- forecastsToDelete .add (forecastRequestStats );
140+ String expiryTime = stringFieldValueOrNull (hit , ForecastRequestStats .EXPIRY_TIME .getPreferredName ());
141+ if (expiryTime == null ) {
142+ LOGGER .warn ("Forecast request stats document [{}] has a null [{}] field" , hit .getId (),
143+ ForecastRequestStats .EXPIRY_TIME .getPreferredName ());
144+ continue ;
145+ }
146+ long expiryMs = TimeUtils .parseToEpochMs (expiryTime );
147+ if (expiryMs < cutoffEpochMs ) {
148+ JobForecastId idPair = new JobForecastId (
149+ stringFieldValueOrNull (hit , Job .ID .getPreferredName ()),
150+ stringFieldValueOrNull (hit , Forecast .FORECAST_ID .getPreferredName ()));
151+
152+ if (idPair .hasNullValue () == false ) {
153+ forecastsToDelete .add (idPair );
149154 }
155+
150156 }
157+
151158 }
152159 return forecastsToDelete ;
153160 }
154161
155- private DeleteByQueryRequest buildDeleteByQuery (List <ForecastRequestStats > forecastsToDelete ) {
162+ private DeleteByQueryRequest buildDeleteByQuery (List <JobForecastId > ids ) {
156163 DeleteByQueryRequest request = new DeleteByQueryRequest ();
157164 request .setSlices (AbstractBulkByScrollRequest .AUTO_SLICES );
158165
159166 request .indices (RESULTS_INDEX_PATTERN );
160167 BoolQueryBuilder boolQuery = QueryBuilders .boolQuery ().minimumShouldMatch (1 );
161168 boolQuery .must (QueryBuilders .termsQuery (Result .RESULT_TYPE .getPreferredName (),
162169 ForecastRequestStats .RESULT_TYPE_VALUE , Forecast .RESULT_TYPE_VALUE ));
163- for (ForecastRequestStats forecastToDelete : forecastsToDelete ) {
164- boolQuery .should (QueryBuilders .boolQuery ()
165- .must (QueryBuilders .termQuery (Job .ID .getPreferredName (), forecastToDelete .getJobId ()))
166- .must (QueryBuilders .termQuery (Forecast .FORECAST_ID .getPreferredName (), forecastToDelete .getForecastId ())));
170+ for (JobForecastId jobForecastId : ids ) {
171+ if (jobForecastId .hasNullValue () == false ) {
172+ boolQuery .should (QueryBuilders .boolQuery ()
173+ .must (QueryBuilders .termQuery (Job .ID .getPreferredName (), jobForecastId .jobId ))
174+ .must (QueryBuilders .termQuery (Forecast .FORECAST_ID .getPreferredName (), jobForecastId .forecastId )));
175+ }
167176 }
168177 QueryBuilder query = QueryBuilders .boolQuery ().filter (boolQuery );
169178 request .setQuery (query );
@@ -173,4 +182,18 @@ private DeleteByQueryRequest buildDeleteByQuery(List<ForecastRequestStats> forec
173182
174183 return request ;
175184 }
185+
186+ private static class JobForecastId {
187+ private final String jobId ;
188+ private final String forecastId ;
189+
190+ private JobForecastId (String jobId , String forecastId ) {
191+ this .jobId = jobId ;
192+ this .forecastId = forecastId ;
193+ }
194+
195+ boolean hasNullValue () {
196+ return jobId == null || forecastId == null ;
197+ }
198+ }
176199}
0 commit comments