Skip to content

Commit

Permalink
Enhance Point In Time support with APIs to list active point-in-time …
Browse files Browse the repository at this point in the history
…searches

Signed-off-by: Andriy Redko <andriy.redko@aiven.io>
  • Loading branch information
reta committed Jan 11, 2024
1 parent fc05e07 commit 61b6cd3
Show file tree
Hide file tree
Showing 3 changed files with 41 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import org.opensearch.action.search.ClearScrollRequest;
import org.opensearch.action.search.CreatePitRequest;
import org.opensearch.action.search.DeletePitRequest;
import org.opensearch.action.search.ListPitInfo;
import org.opensearch.action.search.MultiSearchRequest;
import org.opensearch.action.search.MultiSearchResponse;
import org.opensearch.action.search.SearchRequest;
Expand All @@ -45,6 +46,7 @@
import org.opensearch.client.RequestOptions;
import org.opensearch.client.RestHighLevelClient;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.data.core.OpenSearchOperations;
import org.opensearch.index.query.MoreLikeThisQueryBuilder;
import org.opensearch.index.query.QueryBuilders;
import org.opensearch.index.reindex.BulkByScrollResponse;
Expand Down Expand Up @@ -80,7 +82,7 @@
* OpenSearchRestTemplate
* @since 0.1
*/
public class OpenSearchRestTemplate extends AbstractElasticsearchTemplate {
public class OpenSearchRestTemplate extends AbstractElasticsearchTemplate implements OpenSearchOperations {

private static final Log LOGGER = LogFactory.getLog(OpenSearchRestTemplate.class);

Expand Down Expand Up @@ -468,6 +470,11 @@ public Boolean closePointInTime(String pit) {
return false;
}

@Override
public List<ListPitInfo> listPointInTime() {
return execute(client -> client.getAllPits(RequestOptions.DEFAULT)).getPitInfos();
}

public SearchResponse suggest(SuggestBuilder suggestion, IndexCoordinates index) {
SearchRequest searchRequest = requestFactory.searchRequest(suggestion, index);
return execute(client -> client.search(searchRequest, RequestOptions.DEFAULT));
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
package org.opensearch.data.core;

import java.util.List;
import org.opensearch.action.search.ListPitInfo;
import org.springframework.data.elasticsearch.core.ElasticsearchOperations;

/**
* The extension over {@link ElasticsearchOperations} with OpenSearch specific operations.
*/
public interface OpenSearchOperations extends ElasticsearchOperations {
/**
* Return all active point in time searches
* @return all active point in time searches
*/
List<ListPitInfo> listPointInTime();
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
Expand Down Expand Up @@ -40,6 +40,8 @@
import java.util.Map;
import java.util.Objects;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.LockSupport;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

Expand All @@ -52,6 +54,7 @@
import org.junit.jupiter.api.Test;
import org.opensearch.data.client.EnabledIfOpenSearchVersion;
import org.opensearch.data.client.orhlc.NativeSearchQueryBuilder;
import org.opensearch.data.core.OpenSearchOperations;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.dao.DataAccessException;
import org.springframework.dao.InvalidDataAccessApiUsageException;
Expand Down Expand Up @@ -129,7 +132,7 @@ public abstract class ElasticsearchIntegrationTests {
private static final String MULTI_INDEX_2_NAME = MULTI_INDEX_PREFIX + "-2";
private static final String MULTI_INDEX_3_NAME = MULTI_INDEX_PREFIX + "-3";

@Autowired protected ElasticsearchOperations operations;
@Autowired protected OpenSearchOperations operations;
private IndexOperations indexOperations;

@Autowired protected IndexNameProvider indexNameProvider;
Expand Down Expand Up @@ -2746,16 +2749,23 @@ public void testPointInTimeKeepAliveExpired() throws InterruptedException {
SearchHits<SampleEntity> results = operations.search(query,SampleEntity.class);
assertThat(results.getSearchHits().size()).isEqualTo(2);

// There may be a better way to do it, but Opensearch by default waits for up-to a minute to clear expired pits
Thread.sleep(120000);
final Query searchAfterQuery = getBuilderWithMatchAllQuery() //
.withSort(Sort.by(Sort.Order.desc("message"))) //
.withPointInTime(qpit)
.withSearchAfter(List.of(Objects.requireNonNull(results.getSearchHit(1).getContent().getMessage())))
.build();
assertThatExceptionOfType(UncategorizedElasticsearchException.class).isThrownBy(
()-> operations.search(searchAfterQuery,SampleEntity.class)
);

final long started = System.nanoTime();
while ((System.nanoTime() - started) < TimeUnit.SECONDS.toNanos(120)) {
LockSupport.parkNanos(TimeUnit.SECONDS.toNanos(1));
if (operations.listPointInTime().isEmpty()) {
break;
}
}

assertThatExceptionOfType(UncategorizedElasticsearchException.class)
.isThrownBy(()-> operations.search(searchAfterQuery,SampleEntity.class));

Boolean pitResult = operations.closePointInTime(pit);
Assertions.assertTrue(pitResult);
}
Expand Down

0 comments on commit 61b6cd3

Please sign in to comment.