Skip to content

Commit fa5481c

Browse files
committed
Merge remote-tracking branch 'origin/main' into dev/support_merged_segment_warmer
# Conflicts: # CHANGELOG.md # server/src/main/java/org/opensearch/common/util/FeatureFlags.java
2 parents a6157f0 + 1be3e46 commit fa5481c

File tree

44 files changed

+355
-221
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

44 files changed

+355
-221
lines changed

.github/workflows/gradle-check.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ jobs:
2424
- uses: actions/checkout@v4
2525
- name: Get changed files
2626
id: changed-files-specific
27-
uses: tj-actions/changed-files@v46.0.4
27+
uses: tj-actions/changed-files@v46.0.5
2828
with:
2929
files_ignore: |
3030
release-notes/*.md

CHANGELOG.md

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
4040
- Add TermsQuery support to Search GRPC endpoint ([#17888](https://github.com/opensearch-project/OpenSearch/pull/17888))
4141
- Support sub agg in filter rewrite optimization ([#17447](https://github.com/opensearch-project/OpenSearch/pull/17447)
4242
- Disable scoring of keyword term search by default, fallback logic with new use_similarity:true parameter ([#17889](https://github.com/opensearch-project/OpenSearch/pull/17889))
43+
- Add versioning support in pull-based ingestion ([#17918](https://github.com/opensearch-project/OpenSearch/pull/17918))
4344
- Introducing MergedSegmentWarmerFactory to support the extension of IndexWriter.IndexReaderWarmer ([#17881](https://github.com/opensearch-project/OpenSearch/pull/17881))
4445

4546
### Changed
@@ -51,6 +52,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
5152
- Avoid skewed segment replication lag metric ([#17831](https://github.com/opensearch-project/OpenSearch/pull/17831))
5253
- Increase the default segment counter step size when replica promoting ([#17568](https://github.com/opensearch-project/OpenSearch/pull/17568))
5354
- [WLM] Rename QueryGroup to WorkloadGroup ([#17901](https://github.com/opensearch-project/OpenSearch/pull/17901))
55+
- Relaxes jarHell check for optionally extended plugins([#17893](https://github.com/opensearch-project/OpenSearch/pull/17893)))
5456

5557
### Dependencies
5658
- Bump `com.nimbusds:nimbus-jose-jwt` from 9.41.1 to 10.0.2 ([#17607](https://github.com/opensearch-project/OpenSearch/pull/17607), [#17669](https://github.com/opensearch-project/OpenSearch/pull/17669))
@@ -60,8 +62,8 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
6062
- Bump `dangoslen/dependabot-changelog-helper` from 3 to 4 ([#17498](https://github.com/opensearch-project/OpenSearch/pull/17498))
6163
- Bump `com.google.api:gax` from 2.35.0 to 2.63.1 ([#17465](https://github.com/opensearch-project/OpenSearch/pull/17465))
6264
- Bump `com.azure:azure-storage-blob` from 12.29.1 to 12.30.0 ([#17667](https://github.com/opensearch-project/OpenSearch/pull/17667))
63-
- Bump `tj-actions/changed-files` from 46.0.1 to 46.0.4 ([#17666](https://github.com/opensearch-project/OpenSearch/pull/17666), [#17813](https://github.com/opensearch-project/OpenSearch/pull/17813))
64-
- Bump `com.google.code.gson:gson` from 2.11.0 to 2.12.1 ([#17668](https://github.com/opensearch-project/OpenSearch/pull/17668))
65+
- Bump `tj-actions/changed-files` from 46.0.1 to 46.0.5 ([#17666](https://github.com/opensearch-project/OpenSearch/pull/17666), [#17813](https://github.com/opensearch-project/OpenSearch/pull/17813), [#17920](https://github.com/opensearch-project/OpenSearch/pull/17920))
66+
- Bump `com.google.code.gson:gson` from 2.11.0 to 2.13.0 ([#17668](https://github.com/opensearch-project/OpenSearch/pull/17668), [#17921](https://github.com/opensearch-project/OpenSearch/pull/17921)), [#17926](https://github.com/opensearch-project/OpenSearch/pull/17926))
6567
- Bump `com.github.luben:zstd-jni` from 1.5.5-1 to 1.5.6-1 ([#17674](https://github.com/opensearch-project/OpenSearch/pull/17674))
6668
- Bump `lycheeverse/lychee-action` from 2.3.0 to 2.4.0 ([#17731](https://github.com/opensearch-project/OpenSearch/pull/17731))
6769
- Bump `com.netflix.nebula.ospackage-base` from 11.11.1 to 11.11.2 ([#17734](https://github.com/opensearch-project/OpenSearch/pull/17734))

plugins/ingestion-kafka/src/internalClusterTest/java/org/opensearch/plugin/kafka/KafkaIngestionBaseIT.java

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -108,6 +108,19 @@ protected void produceData(String id, String name, String age, long timestamp, S
108108
producer.send(new ProducerRecord<>(topicName, null, timestamp, "null", payload));
109109
}
110110

111+
protected void produceDataWithExternalVersion(String id, long version, String name, String age, long timestamp, String opType) {
112+
String payload = String.format(
113+
Locale.ROOT,
114+
"{\"_id\":\"%s\", \"_version\":\"%d\", \"_op_type\":\"%s\",\"_source\":{\"name\":\"%s\", \"age\": %s}}",
115+
id,
116+
version,
117+
opType,
118+
name,
119+
age
120+
);
121+
producer.send(new ProducerRecord<>(topicName, null, timestamp, "null", payload));
122+
}
123+
111124
protected void produceData(String payload) {
112125
producer.send(new ProducerRecord<>(topicName, null, defaultMessageTimestamp, "null", payload));
113126
}

plugins/ingestion-kafka/src/internalClusterTest/java/org/opensearch/plugin/kafka/RemoteStoreKafkaIT.java

Lines changed: 118 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,9 @@
1919
import org.opensearch.cluster.metadata.IndexMetadata;
2020
import org.opensearch.cluster.routing.allocation.command.AllocateReplicaAllocationCommand;
2121
import org.opensearch.common.settings.Settings;
22+
import org.opensearch.index.query.BoolQueryBuilder;
2223
import org.opensearch.index.query.RangeQueryBuilder;
24+
import org.opensearch.index.query.TermQueryBuilder;
2325
import org.opensearch.test.InternalTestCluster;
2426
import org.opensearch.test.OpenSearchIntegTestCase;
2527
import org.opensearch.transport.client.Requests;
@@ -310,6 +312,122 @@ public void testPaginatedGetIngestionState() throws ExecutionException, Interrup
310312
}));
311313
}
312314

315+
public void testExternalVersioning() throws Exception {
316+
// setup nodes and index
317+
produceDataWithExternalVersion("1", 1, "name1", "25", defaultMessageTimestamp, "index");
318+
produceDataWithExternalVersion("2", 1, "name2", "25", defaultMessageTimestamp, "index");
319+
internalCluster().startClusterManagerOnlyNode();
320+
final String nodeA = internalCluster().startDataOnlyNode();
321+
final String nodeB = internalCluster().startDataOnlyNode();
322+
323+
createIndexWithDefaultSettings(1, 1);
324+
ensureGreen(indexName);
325+
waitForSearchableDocs(2, Arrays.asList(nodeA, nodeB));
326+
327+
// validate next version docs get indexed
328+
produceDataWithExternalVersion("1", 2, "name1", "30", defaultMessageTimestamp, "index");
329+
produceDataWithExternalVersion("2", 2, "name2", "30", defaultMessageTimestamp, "index");
330+
waitForState(() -> {
331+
BoolQueryBuilder query1 = new BoolQueryBuilder().must(new TermQueryBuilder("_id", 1));
332+
SearchResponse response1 = client().prepareSearch(indexName).setQuery(query1).get();
333+
assertThat(response1.getHits().getTotalHits().value(), is(1L));
334+
BoolQueryBuilder query2 = new BoolQueryBuilder().must(new TermQueryBuilder("_id", 2));
335+
SearchResponse response2 = client().prepareSearch(indexName).setQuery(query2).get();
336+
assertThat(response2.getHits().getTotalHits().value(), is(1L));
337+
return 30 == (Integer) response1.getHits().getHits()[0].getSourceAsMap().get("age")
338+
&& 30 == (Integer) response2.getHits().getHits()[0].getSourceAsMap().get("age");
339+
});
340+
341+
// test out-of-order updates
342+
produceDataWithExternalVersion("1", 1, "name1", "25", defaultMessageTimestamp, "index");
343+
produceDataWithExternalVersion("2", 1, "name2", "25", defaultMessageTimestamp, "index");
344+
produceDataWithExternalVersion("3", 1, "name3", "25", defaultMessageTimestamp, "index");
345+
waitForSearchableDocs(3, Arrays.asList(nodeA, nodeB));
346+
347+
BoolQueryBuilder query1 = new BoolQueryBuilder().must(new TermQueryBuilder("_id", 1));
348+
SearchResponse response1 = client().prepareSearch(indexName).setQuery(query1).get();
349+
assertThat(response1.getHits().getTotalHits().value(), is(1L));
350+
assertEquals(30, response1.getHits().getHits()[0].getSourceAsMap().get("age"));
351+
352+
BoolQueryBuilder query2 = new BoolQueryBuilder().must(new TermQueryBuilder("_id", 2));
353+
SearchResponse response2 = client().prepareSearch(indexName).setQuery(query2).get();
354+
assertThat(response2.getHits().getTotalHits().value(), is(1L));
355+
assertEquals(30, response2.getHits().getHits()[0].getSourceAsMap().get("age"));
356+
357+
// test deletes with smaller version
358+
produceDataWithExternalVersion("1", 1, "name1", "25", defaultMessageTimestamp, "delete");
359+
produceDataWithExternalVersion("4", 1, "name4", "25", defaultMessageTimestamp, "index");
360+
waitForSearchableDocs(4, Arrays.asList(nodeA, nodeB));
361+
RangeQueryBuilder query = new RangeQueryBuilder("age").gte(23);
362+
SearchResponse response = client().prepareSearch(indexName).setQuery(query).get();
363+
assertThat(response.getHits().getTotalHits().value(), is(4L));
364+
365+
// test deletes with correct version
366+
produceDataWithExternalVersion("1", 3, "name1", "30", defaultMessageTimestamp, "delete");
367+
produceDataWithExternalVersion("2", 3, "name2", "30", defaultMessageTimestamp, "delete");
368+
waitForState(() -> {
369+
RangeQueryBuilder rangeQuery = new RangeQueryBuilder("age").gte(23);
370+
SearchResponse rangeQueryResponse = client().prepareSearch(indexName).setQuery(rangeQuery).get();
371+
assertThat(rangeQueryResponse.getHits().getTotalHits().value(), is(2L));
372+
return true;
373+
});
374+
}
375+
376+
public void testExternalVersioningWithDisabledGCDeletes() throws Exception {
377+
// setup nodes and index
378+
internalCluster().startClusterManagerOnlyNode();
379+
final String nodeA = internalCluster().startDataOnlyNode();
380+
final String nodeB = internalCluster().startDataOnlyNode();
381+
382+
createIndex(
383+
indexName,
384+
Settings.builder()
385+
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
386+
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 1)
387+
.put("ingestion_source.type", "kafka")
388+
.put("ingestion_source.pointer.init.reset", "earliest")
389+
.put("ingestion_source.param.topic", topicName)
390+
.put("ingestion_source.param.bootstrap_servers", kafka.getBootstrapServers())
391+
.put("index.replication.type", "SEGMENT")
392+
.put("index.gc_deletes", "0")
393+
.build(),
394+
"{\"properties\":{\"name\":{\"type\": \"text\"},\"age\":{\"type\": \"integer\"}}}}"
395+
);
396+
397+
// insert documents
398+
produceDataWithExternalVersion("1", 1, "name1", "25", defaultMessageTimestamp, "index");
399+
produceDataWithExternalVersion("2", 1, "name2", "25", defaultMessageTimestamp, "index");
400+
waitForState(() -> {
401+
RangeQueryBuilder rangeQuery = new RangeQueryBuilder("age").gte(23);
402+
SearchResponse rangeQueryResponse = client().prepareSearch(indexName).setQuery(rangeQuery).get();
403+
assertThat(rangeQueryResponse.getHits().getTotalHits().value(), is(2L));
404+
return true;
405+
});
406+
407+
// delete documents 1 and 2
408+
produceDataWithExternalVersion("1", 2, "name1", "25", defaultMessageTimestamp, "delete");
409+
produceDataWithExternalVersion("2", 2, "name2", "25", defaultMessageTimestamp, "delete");
410+
produceDataWithExternalVersion("3", 1, "name3", "25", defaultMessageTimestamp, "index");
411+
waitForState(() -> {
412+
BoolQueryBuilder query = new BoolQueryBuilder().must(new TermQueryBuilder("_id", 3));
413+
SearchResponse response = client().prepareSearch(indexName).setQuery(query).get();
414+
assertThat(response.getHits().getTotalHits().value(), is(1L));
415+
return 25 == (Integer) response.getHits().getHits()[0].getSourceAsMap().get("age");
416+
});
417+
waitForSearchableDocs(1, Arrays.asList(nodeA, nodeB));
418+
419+
// validate index operation with lower version creates new document
420+
produceDataWithExternalVersion("1", 1, "name1", "35", defaultMessageTimestamp, "index");
421+
produceDataWithExternalVersion("4", 1, "name4", "35", defaultMessageTimestamp, "index");
422+
waitForState(() -> {
423+
RangeQueryBuilder rangeQuery = new RangeQueryBuilder("age").gte(34);
424+
SearchResponse rangeQueryResponse = client().prepareSearch(indexName).setQuery(rangeQuery).get();
425+
assertThat(rangeQueryResponse.getHits().getTotalHits().value(), is(2L));
426+
return true;
427+
});
428+
429+
}
430+
313431
private void verifyRemoteStoreEnabled(String node) {
314432
GetSettingsResponse settingsResponse = client(node).admin().indices().prepareGetSettings(indexName).get();
315433
String remoteStoreEnabled = settingsResponse.getIndexToSettings().get(indexName).get("index.remote_store.enabled");

plugins/repository-gcs/build.gradle

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -66,7 +66,7 @@ dependencies {
6666
api 'com.google.cloud:google-cloud-core-http:2.47.0'
6767
api 'com.google.cloud:google-cloud-storage:1.113.1'
6868

69-
api 'com.google.code.gson:gson:2.12.1'
69+
api 'com.google.code.gson:gson:2.13.0'
7070

7171
runtimeOnly "com.google.guava:guava:${versions.guava}"
7272
api 'com.google.guava:failureaccess:1.0.1'

plugins/repository-gcs/licenses/gson-2.12.1.jar.sha1

Lines changed: 0 additions & 1 deletion
This file was deleted.
Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
111ac98ad3d2d099d81d53b0549748144a8d2659

plugins/repository-hdfs/build.gradle

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -67,7 +67,7 @@ dependencies {
6767
api 'org.apache.htrace:htrace-core4:4.2.0-incubating'
6868
api "org.apache.logging.log4j:log4j-core:${versions.log4j}"
6969
api 'org.apache.avro:avro:1.12.0'
70-
api 'com.google.code.gson:gson:2.12.1'
70+
api 'com.google.code.gson:gson:2.13.0'
7171
runtimeOnly "com.google.guava:guava:${versions.guava}"
7272
api "commons-logging:commons-logging:${versions.commonslogging}"
7373
api 'commons-cli:commons-cli:1.9.0'

plugins/repository-hdfs/licenses/gson-2.12.1.jar.sha1

Lines changed: 0 additions & 1 deletion
This file was deleted.
Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
111ac98ad3d2d099d81d53b0549748144a8d2659

0 commit comments

Comments
 (0)