From 61b6cd383cd75ca478101bcb58e576ea534648ca Mon Sep 17 00:00:00 2001 From: Andriy Redko Date: Thu, 11 Jan 2024 14:53:25 -0500 Subject: [PATCH] Enhance Point In Time support with APIs to list active point-in-time searches Signed-off-by: Andriy Redko --- .../client/orhlc/OpenSearchRestTemplate.java | 9 ++++++- .../data/core/OpenSearchOperations.java | 16 +++++++++++++ .../core/ElasticsearchIntegrationTests.java | 24 +++++++++++++------ 3 files changed, 41 insertions(+), 8 deletions(-) create mode 100644 spring-data-opensearch/src/main/java/org/opensearch/data/core/OpenSearchOperations.java diff --git a/spring-data-opensearch/src/main/java/org/opensearch/data/client/orhlc/OpenSearchRestTemplate.java b/spring-data-opensearch/src/main/java/org/opensearch/data/client/orhlc/OpenSearchRestTemplate.java index f0ce796..3ec90be 100644 --- a/spring-data-opensearch/src/main/java/org/opensearch/data/client/orhlc/OpenSearchRestTemplate.java +++ b/spring-data-opensearch/src/main/java/org/opensearch/data/client/orhlc/OpenSearchRestTemplate.java @@ -35,6 +35,7 @@ import org.opensearch.action.search.ClearScrollRequest; import org.opensearch.action.search.CreatePitRequest; import org.opensearch.action.search.DeletePitRequest; +import org.opensearch.action.search.ListPitInfo; import org.opensearch.action.search.MultiSearchRequest; import org.opensearch.action.search.MultiSearchResponse; import org.opensearch.action.search.SearchRequest; @@ -45,6 +46,7 @@ import org.opensearch.client.RequestOptions; import org.opensearch.client.RestHighLevelClient; import org.opensearch.common.unit.TimeValue; +import org.opensearch.data.core.OpenSearchOperations; import org.opensearch.index.query.MoreLikeThisQueryBuilder; import org.opensearch.index.query.QueryBuilders; import org.opensearch.index.reindex.BulkByScrollResponse; @@ -80,7 +82,7 @@ * OpenSearchRestTemplate * @since 0.1 */ -public class OpenSearchRestTemplate extends AbstractElasticsearchTemplate { +public class OpenSearchRestTemplate extends AbstractElasticsearchTemplate implements OpenSearchOperations { private static final Log LOGGER = LogFactory.getLog(OpenSearchRestTemplate.class); @@ -468,6 +470,11 @@ public Boolean closePointInTime(String pit) { return false; } + @Override + public List listPointInTime() { + return execute(client -> client.getAllPits(RequestOptions.DEFAULT)).getPitInfos(); + } + public SearchResponse suggest(SuggestBuilder suggestion, IndexCoordinates index) { SearchRequest searchRequest = requestFactory.searchRequest(suggestion, index); return execute(client -> client.search(searchRequest, RequestOptions.DEFAULT)); diff --git a/spring-data-opensearch/src/main/java/org/opensearch/data/core/OpenSearchOperations.java b/spring-data-opensearch/src/main/java/org/opensearch/data/core/OpenSearchOperations.java new file mode 100644 index 0000000..0688b14 --- /dev/null +++ b/spring-data-opensearch/src/main/java/org/opensearch/data/core/OpenSearchOperations.java @@ -0,0 +1,16 @@ +package org.opensearch.data.core; + +import java.util.List; +import org.opensearch.action.search.ListPitInfo; +import org.springframework.data.elasticsearch.core.ElasticsearchOperations; + +/** + * The extension over {@link ElasticsearchOperations} with OpenSearch specific operations. + */ +public interface OpenSearchOperations extends ElasticsearchOperations { + /** + * Return all active point in time searches + * @return all active point in time searches + */ + List listPointInTime(); +} diff --git a/spring-data-opensearch/src/test/java/org/springframework/data/elasticsearch/core/ElasticsearchIntegrationTests.java b/spring-data-opensearch/src/test/java/org/springframework/data/elasticsearch/core/ElasticsearchIntegrationTests.java index 90ce42a..bcebd1e 100755 --- a/spring-data-opensearch/src/test/java/org/springframework/data/elasticsearch/core/ElasticsearchIntegrationTests.java +++ b/spring-data-opensearch/src/test/java/org/springframework/data/elasticsearch/core/ElasticsearchIntegrationTests.java @@ -5,7 +5,7 @@ * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * - * https://www.apache.org/licenses/LICENSE-2.0 + * https://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -40,6 +40,8 @@ import java.util.Map; import java.util.Objects; import java.util.UUID; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.LockSupport; import java.util.stream.Collectors; import java.util.stream.IntStream; @@ -52,6 +54,7 @@ import org.junit.jupiter.api.Test; import org.opensearch.data.client.EnabledIfOpenSearchVersion; import org.opensearch.data.client.orhlc.NativeSearchQueryBuilder; +import org.opensearch.data.core.OpenSearchOperations; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.dao.DataAccessException; import org.springframework.dao.InvalidDataAccessApiUsageException; @@ -129,7 +132,7 @@ public abstract class ElasticsearchIntegrationTests { private static final String MULTI_INDEX_2_NAME = MULTI_INDEX_PREFIX + "-2"; private static final String MULTI_INDEX_3_NAME = MULTI_INDEX_PREFIX + "-3"; - @Autowired protected ElasticsearchOperations operations; + @Autowired protected OpenSearchOperations operations; private IndexOperations indexOperations; @Autowired protected IndexNameProvider indexNameProvider; @@ -2746,16 +2749,23 @@ public void testPointInTimeKeepAliveExpired() throws InterruptedException { SearchHits results = operations.search(query,SampleEntity.class); assertThat(results.getSearchHits().size()).isEqualTo(2); - // There may be a better way to do it, but Opensearch by default waits for up-to a minute to clear expired pits - Thread.sleep(120000); final Query searchAfterQuery = getBuilderWithMatchAllQuery() // .withSort(Sort.by(Sort.Order.desc("message"))) // .withPointInTime(qpit) .withSearchAfter(List.of(Objects.requireNonNull(results.getSearchHit(1).getContent().getMessage()))) .build(); - assertThatExceptionOfType(UncategorizedElasticsearchException.class).isThrownBy( - ()-> operations.search(searchAfterQuery,SampleEntity.class) - ); + + final long started = System.nanoTime(); + while ((System.nanoTime() - started) < TimeUnit.SECONDS.toNanos(120)) { + LockSupport.parkNanos(TimeUnit.SECONDS.toNanos(1)); + if (operations.listPointInTime().isEmpty()) { + break; + } + } + + assertThatExceptionOfType(UncategorizedElasticsearchException.class) + .isThrownBy(()-> operations.search(searchAfterQuery,SampleEntity.class)); + Boolean pitResult = operations.closePointInTime(pit); Assertions.assertTrue(pitResult); }