Skip to content

Commit 0c1a170

Browse files
committed
Modifying derived source related ITs in RelocationIT to make it more deterministic
Signed-off-by: Tanik Pansuriya <panbhai@amazon.com>
1 parent 0257190 commit 0c1a170

File tree

2 files changed

+29
-78
lines changed

2 files changed

+29
-78
lines changed

CHANGELOG.md

Lines changed: 0 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -45,43 +45,6 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
4545
- Added approximation support for range queries with now in date field ([#18511](https://github.com/opensearch-project/OpenSearch/pull/18511))
4646
- Upgrade to protobufs 0.6.0 and clean up deprecated TermQueryProtoUtils code ([#18880](https://github.com/opensearch-project/OpenSearch/pull/18880))
4747
- Prevent shard initialization failure due to streaming consumer errors ([#18877](https://github.com/opensearch-project/OpenSearch/pull/18877))
48-
- Add support for linux riscv64 platform ([#18156](https://github.com/opensearch-project/OpenSearch/pull/18156))
49-
- [Rule based auto-tagging] Add get rule API ([#17336](https://github.com/opensearch-project/OpenSearch/pull/17336))
50-
- [Rule based auto-tagging] Add Delete Rule API ([#18184](https://github.com/opensearch-project/OpenSearch/pull/18184))
51-
- Add paginated wlm/stats API ([#17638](https://github.com/opensearch-project/OpenSearch/pull/17638))
52-
- [Rule based auto-tagging] Add Create rule API ([#17792](https://github.com/opensearch-project/OpenSearch/pull/17792))
53-
- Implement parallel shard refresh behind cluster settings ([#17782](https://github.com/opensearch-project/OpenSearch/pull/17782))
54-
- Bump OpenSearch Core main branch to 3.0.0 ([#18039](https://github.com/opensearch-project/OpenSearch/pull/18039))
55-
- [Rule based Auto-tagging] Add wlm `ActionFilter` ([#17791](https://github.com/opensearch-project/OpenSearch/pull/17791))
56-
- Update API of Message in index to add the timestamp for lag calculation in ingestion polling ([#17977](https://github.com/opensearch-project/OpenSearch/pull/17977/))
57-
- Add Warm Disk Threshold Allocation Decider for Warm shards ([#18082](https://github.com/opensearch-project/OpenSearch/pull/18082))
58-
- Add composite directory factory ([#17988](https://github.com/opensearch-project/OpenSearch/pull/17988))
59-
- [Rule based auto-tagging] Add refresh based synchronization service for `Rule`s ([#18128](https://github.com/opensearch-project/OpenSearch/pull/18128))
60-
- Add pull-based ingestion error metrics and make internal queue size configurable ([#18088](https://github.com/opensearch-project/OpenSearch/pull/18088))
61-
- [Derive Source] Adding support for derive source feature and implementing it for various type of field mappers ([#17759](https://github.com/opensearch-project/OpenSearch/pull/17759))
62-
- [Derive Source] Adding integration of derived source feature across diff paths ([#18054](https://github.com/opensearch-project/OpenSearch/pull/18054))
63-
- [Security Manager Replacement] Enhance Java Agent to intercept newByteChannel ([#17989](https://github.com/opensearch-project/OpenSearch/pull/17989))
64-
- Enabled Async Shard Batch Fetch by default ([#18139](https://github.com/opensearch-project/OpenSearch/pull/18139))
65-
- Allow to get the search request from the QueryCoordinatorContext ([#17818](https://github.com/opensearch-project/OpenSearch/pull/17818))
66-
- Reject close index requests, while remote store migration is in progress. ([#18327](https://github.com/opensearch-project/OpenSearch/pull/18327))
67-
- Improve sort-query performance by retaining the default `totalHitsThreshold` for approximated `match_all` queries ([#18189](https://github.com/opensearch-project/OpenSearch/pull/18189))
68-
- Enable testing for ExtensiblePlugins using classpath plugins ([#16908](https://github.com/opensearch-project/OpenSearch/pull/16908))
69-
- Introduce system generated ingest pipeline ([#17817](https://github.com/opensearch-project/OpenSearch/pull/17817))
70-
- Apply cluster state metadata and routing table diff when building cluster state from remote([#18256](https://github.com/opensearch-project/OpenSearch/pull/18256))
71-
- Support create mode in pull-based ingestion and add retries for transient failures ([#18250](https://github.com/opensearch-project/OpenSearch/pull/18250))
72-
- Decouple the init of Crypto Plugin and KeyProvider in CryptoRegistry ([#18270](https://github.com/opensearch-project/OpenSearch/pull/18270))
73-
- Support cluster write block in pull-based ingestion ([#18280](https://github.com/opensearch-project/OpenSearch/pull/18280))
74-
- Use QueryCoordinatorContext for the rewrite in validate API. ([#18272](https://github.com/opensearch-project/OpenSearch/pull/18272))
75-
- Upgrade crypto kms plugin dependencies for AWS SDK v2.x. ([#18268](https://github.com/opensearch-project/OpenSearch/pull/18268))
76-
- Add support for `matched_fields` with the unified highlighter ([#18164](https://github.com/opensearch-project/OpenSearch/issues/18164))
77-
- Add BooleanQuery rewrite for must_not RangeQuery clauses ([#17655](https://github.com/opensearch-project/OpenSearch/pull/17655))
78-
- [repository-s3] Add support for SSE-KMS and S3 bucket owner verification ([#18312](https://github.com/opensearch-project/OpenSearch/pull/18312))
79-
- Optimize gRPC perf by passing by reference ([#18303](https://github.com/opensearch-project/OpenSearch/pull/18303))
80-
- Added File Cache Stats - Involves Block level as well as full file level stats ([#17538](https://github.com/opensearch-project/OpenSearch/issues/17479))
81-
- Added time_in_execution attribute to /_cluster/pending_tasks response ([#17780](https://github.com/opensearch-project/OpenSearch/pull/17780))
82-
- Added File Cache Pinning ([#17617](https://github.com/opensearch-project/OpenSearch/issues/13648))
83-
- Support consumer reset in Resume API for pull-based ingestion. This PR includes a breaking change for the experimental pull-based ingestion feature. ([#18332](https://github.com/opensearch-project/OpenSearch/pull/18332))
84-
- Add FIPS build tooling ([#4254](https://github.com/opensearch-project/security/issues/4254))
8548
- [Derived Source] Add integration of derived source feature across various paths like get/search/recovery ([#18565](https://github.com/opensearch-project/OpenSearch/pull/18565))
8649

8750
### Changed

server/src/internalClusterTest/java/org/opensearch/recovery/RelocationIT.java

Lines changed: 29 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -97,14 +97,14 @@
9797
import java.util.ArrayList;
9898
import java.util.Arrays;
9999
import java.util.Collection;
100+
import java.util.HashSet;
100101
import java.util.List;
101102
import java.util.Map;
102103
import java.util.Set;
103104
import java.util.concurrent.CountDownLatch;
104105
import java.util.concurrent.Semaphore;
105106
import java.util.concurrent.TimeUnit;
106107
import java.util.concurrent.atomic.AtomicBoolean;
107-
import java.util.concurrent.atomic.AtomicInteger;
108108
import java.util.stream.Collectors;
109109
import java.util.stream.Stream;
110110

@@ -916,24 +916,23 @@ public void testRelocationWithDerivedSourceAndConcurrentIndexing() throws Except
916916
).setMapping(mapping)
917917
);
918918

919-
// Start background indexing
920-
AtomicBoolean stopIndexing = new AtomicBoolean(false);
921-
AtomicInteger docCount = new AtomicInteger(0);
919+
// Start indexing
920+
int docCount = randomIntBetween(10, 100);
921+
CountDownLatch docCountLatch = new CountDownLatch(docCount);
922922
Thread indexingThread = new Thread(() -> {
923-
while (stopIndexing.get() == false) {
923+
while (docCountLatch.getCount() > 0) {
924924
try {
925-
int id = docCount.incrementAndGet();
925+
long id = docCountLatch.getCount();
926926
client().prepareIndex("test").setId(String.valueOf(id)).setSource("name", "test" + id, "value", id).get();
927+
docCountLatch.countDown();
927928
Thread.sleep(10); // Small delay to prevent overwhelming
928929
} catch (Exception e) {
929930
logger.error("Error in background indexing", e);
930931
}
931932
}
932933
});
933-
indexingThread.start();
934934

935-
// Let it index some documents
936-
Thread.sleep(2000);
935+
indexingThread.start();
937936

938937
// Start relocation
939938
String node2 = internalCluster().startNode();
@@ -943,20 +942,18 @@ public void testRelocationWithDerivedSourceAndConcurrentIndexing() throws Except
943942
client().admin().cluster().prepareReroute().add(new MoveAllocationCommand("test", 0, node1, node2)).get();
944943
ensureGreen(TimeValue.timeValueMinutes(2));
945944

946-
// Stop indexing
947-
stopIndexing.set(true);
945+
// Let it index docCount documents
946+
docCountLatch.await();
948947
indexingThread.join();
949948

950-
// Verify all documents
951-
int finalDocCount = docCount.get();
952949
assertBusy(() -> {
953950
refresh();
954951
SearchResponse response = client().prepareSearch("test")
955952
.setQuery(matchAllQuery())
956-
.setSize(finalDocCount)
953+
.setSize(docCount)
957954
.addSort("value", SortOrder.ASC)
958955
.get();
959-
assertHitCount(response, finalDocCount);
956+
assertHitCount(response, docCount);
960957

961958
int expectedId = 1;
962959
for (SearchHit hit : response.getHits()) {
@@ -1006,37 +1003,28 @@ public void testRelocationWithDerivedSourceWithUpdates() throws Exception {
10061003
logger.info("--> relocate the shard from node1 to node2");
10071004
client().admin().cluster().prepareReroute().add(new MoveAllocationCommand("test", 0, node1, node2)).get();
10081005

1009-
AtomicBoolean stop = new AtomicBoolean(false);
1006+
final Set<Integer> docsToUpdate = new HashSet<>();
10101007
final Set<Integer> updatedDocs = ConcurrentCollections.newConcurrentSet();
1008+
int updateCount = randomIntBetween(10, numDocs / 2);
1009+
for (int i = 0; i < updateCount; i++) {
1010+
docsToUpdate.add(randomIntBetween(0, numDocs - 1));
1011+
}
10111012

1012-
// Start doc update thread
1013-
Thread updateThread = new Thread(() -> {
1014-
while (stop.get() == false) {
1015-
try {
1016-
int docId = randomIntBetween(0, numDocs - 1);
1017-
client().prepareUpdate("test", String.valueOf(docId))
1018-
.setRetryOnConflict(3)
1019-
.setDoc("value", docId * 2)
1020-
.execute()
1021-
.actionGet();
1022-
updatedDocs.add(docId);
1023-
Thread.sleep(10);
1024-
} catch (Exception e) {
1025-
if (e instanceof InterruptedException) {
1026-
break;
1027-
}
1028-
logger.warn("Error in update thread", e);
1029-
}
1013+
docsToUpdate.stream().forEach(docId -> {
1014+
try {
1015+
client().prepareUpdate("test", String.valueOf(docId))
1016+
.setRetryOnConflict(3)
1017+
.setDoc("value", docId * 2)
1018+
.execute()
1019+
.actionGet();
1020+
updatedDocs.add(docId);
1021+
Thread.sleep(10);
1022+
} catch (Exception e) {
1023+
logger.warn("Error while updating doc with id = {}", docId, e);
10301024
}
10311025
});
1032-
updateThread.start();
1033-
1034-
// Wait for some updates to occur
1035-
Thread.sleep(2000);
1036-
1037-
stop.set(true);
1038-
updateThread.join();
10391026

1027+
assertEquals(docsToUpdate.size(), updatedDocs.size());
10401028
refresh("test");
10411029
ensureGreen(TimeValue.timeValueMinutes(2));
10421030

0 commit comments

Comments
 (0)