
Commit fe61ff2

bowenlan-amzn, rishabhmaurya, and harshavamsi authored and committed
Streaming Aggregation (opensearch-project#18874)
* Streaming Aggregation
  - Query param 'stream' in the REST search action, stored in the search request
  - On the coordinator, we use the stream search transport action, and the search async action uses the new stream callback
  - On the data node, the stream transport action passes the stream search flag to the search context for shard search and aggregation
  - Reduce context carries the stream flag from the search request
  - Sync onPhaseDone for both result consumption callbacks, shard and stream
  - Result consumer separation between stream and shard
  - Data node aggregation streams segment aggregation results back and completes the stream with the shard result
  - Optimize memory usage on the coordinator (see the sketch below):
    - `reduce size = shard_number * ((1.5 * size) + 10)` (needs improvement; big accuracy problem)
    - Remove the unnecessary memory allocation for handling sub-aggregations when no sub-aggregation exists
    - Only allocate doc counts per segment in the Terms Bucket Aggregator
    - Remove the priority queue from the Terms Bucket Aggregator and return all buckets in build aggregation
  - Stream search callback
  - Stream channel listener
  - Enable the C2 compiler for local gradlew runs
  - Disable filter optimization

* Add mock stream transport for testing
* innerOnResponse delegates to innerOnCompleteResponse for compatibility
* Refactor the streaming interface for streaming search
* Separate out stream from regular search
* Fix aggregator and split sendBatch
* Refactor and fix some bugs
* buildAggBatch returns a list of internal aggregations
* Batch reduce size for stream search
* Remove stream execution hint
* Clean up InternalTerms
* Clean up
* Refactor duplication in search service
* Update change log
* Clean up
* Add tests for StreamingStringTermsAggregator and sendBatch
* Clean up and address comments
* Experimental API annotation
* Change sendBatch to package private
* Add type

---------

Signed-off-by: Rishabh Maurya <rishabhmaurya05@gmail.com>
Signed-off-by: bowenlan-amzn <bowenlan23@gmail.com>
Signed-off-by: Harsha Vamsi Kalluri <harshavamsi096@gmail.com>
Co-authored-by: Rishabh Maurya <rishabhmaurya05@gmail.com>
Co-authored-by: Harsha Vamsi Kalluri <harshavamsi096@gmail.com>
1 parent 8ab7da7 commit fe61ff2
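The memory-sizing bullet in the commit message gives only the formula. Below is a minimal sketch of that coordinator-side computation, assuming a hypothetical helper; `streamReduceSize` is an illustrative name, not an actual OpenSearch symbol.

    /**
     * Sketch of the coordinator's batch reduce sizing described in the commit
     * message: reduce size = shard_number * ((1.5 * size) + 10).
     * Name and placement are illustrative, not OpenSearch internals.
     */
    static int streamReduceSize(int shardCount, int requestedSize) {
        // Each shard streams multiple per-segment batches, so the coordinator
        // over-reserves 1.5x the requested bucket count plus a slack of 10 per shard.
        return (int) (shardCount * ((1.5 * requestedSize) + 10));
    }

For a three-shard search with size=10 this reserves 3 * (15 + 10) = 75 bucket slots, which is the memory/accuracy trade-off the message flags as needing improvement.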


42 files changed: +4169 −114 lines changed


.gitignore

Lines changed: 4 additions & 1 deletion
@@ -1,3 +1,6 @@
+.claude
+CLAUDE.md
+.cursor*
 
 # intellij files
 .idea/
@@ -64,4 +67,4 @@ testfixtures_shared/
 .ci/jobs/
 
 # build files generated
-doc-tools/missing-doclet/bin/
+doc-tools/missing-doclet/bin/

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
@@ -48,6 +48,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
 - Prevent shard initialization failure due to streaming consumer errors ([#18877](https://github.com/opensearch-project/OpenSearch/pull/18877))
 - APIs for stream transport and new stream-based search api action ([#18722](https://github.com/opensearch-project/OpenSearch/pull/18722))
 - Added the core process for warming merged segments in remote-store enabled domains ([#18683](https://github.com/opensearch-project/OpenSearch/pull/18683))
+- Streaming aggregation ([#18874](https://github.com/opensearch-project/OpenSearch/pull/18874))
 - Optimize Composite Aggregations by removing unnecessary object allocations ([#18531](https://github.com/opensearch-project/OpenSearch/pull/18531))
 - [Star-Tree] Add search support for ip field type ([#18671](https://github.com/opensearch-project/OpenSearch/pull/18671))

SubAggregationIT.java (new file)

Lines changed: 241 additions & 0 deletions
/*
 * SPDX-License-Identifier: Apache-2.0
 *
 * The OpenSearch Contributors require contributions made to
 * this file be licensed under the Apache-2.0 license or a
 * compatible open source license.
 */

package org.opensearch.streaming.aggregation;

import com.carrotsearch.randomizedtesting.annotations.ParametersFactory;

import org.opensearch.action.admin.indices.create.CreateIndexRequest;
import org.opensearch.action.admin.indices.create.CreateIndexResponse;
import org.opensearch.action.admin.indices.flush.FlushRequest;
import org.opensearch.action.admin.indices.refresh.RefreshRequest;
import org.opensearch.action.admin.indices.segments.IndicesSegmentResponse;
import org.opensearch.action.admin.indices.segments.IndicesSegmentsRequest;
import org.opensearch.action.bulk.BulkRequest;
import org.opensearch.action.bulk.BulkResponse;
import org.opensearch.action.index.IndexRequest;
import org.opensearch.action.search.SearchResponse;
import org.opensearch.arrow.flight.transport.FlightStreamPlugin;
import org.opensearch.common.action.ActionFuture;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.common.xcontent.XContentType;
import org.opensearch.plugins.Plugin;
import org.opensearch.search.SearchHit;
import org.opensearch.search.aggregations.AggregationBuilders;
import org.opensearch.search.aggregations.bucket.terms.StringTerms;
import org.opensearch.search.aggregations.bucket.terms.TermsAggregationBuilder;
import org.opensearch.search.aggregations.metrics.Max;
import org.opensearch.test.OpenSearchIntegTestCase;
import org.opensearch.test.ParameterizedDynamicSettingsOpenSearchIntegTestCase;

import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;

import static org.opensearch.common.util.FeatureFlags.STREAM_TRANSPORT;
import static org.opensearch.search.SearchService.CLUSTER_CONCURRENT_SEGMENT_SEARCH_SETTING;
import static org.opensearch.search.aggregations.AggregationBuilders.terms;

@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.SUITE, minNumDataNodes = 3, maxNumDataNodes = 3)
public class SubAggregationIT extends ParameterizedDynamicSettingsOpenSearchIntegTestCase {

    public SubAggregationIT(Settings dynamicSettings) {
        super(dynamicSettings);
    }

    @ParametersFactory
    public static Collection<Object[]> parameters() {
        return Arrays.asList(
            new Object[] { Settings.builder().put(CLUSTER_CONCURRENT_SEGMENT_SEARCH_SETTING.getKey(), false).build() },
            new Object[] { Settings.builder().put(CLUSTER_CONCURRENT_SEGMENT_SEARCH_SETTING.getKey(), true).build() }
        );
    }

    static final int NUM_SHARDS = 3;
    static final int MIN_SEGMENTS_PER_SHARD = 3;

    @Override
    protected Collection<Class<? extends Plugin>> nodePlugins() {
        return Collections.singleton(FlightStreamPlugin.class);
    }

    @Override
    public void setUp() throws Exception {
        super.setUp();
        internalCluster().ensureAtLeastNumDataNodes(3);

        Settings indexSettings = Settings.builder()
            .put("index.number_of_shards", NUM_SHARDS) // Number of primary shards
            .put("index.number_of_replicas", 0) // Number of replica shards
            .put("index.search.concurrent_segment_search.mode", "none")
            // Disable segment merging to keep individual segments
            .put("index.merge.policy.max_merged_segment", "1kb") // Keep segments small
            .put("index.merge.policy.segments_per_tier", "20") // Allow many segments per tier
            .put("index.merge.scheduler.max_thread_count", "1") // Limit merge threads
            .build();

        CreateIndexRequest createIndexRequest = new CreateIndexRequest("index").settings(indexSettings);
        createIndexRequest.mapping(
            "{\n"
                + "  \"properties\": {\n"
                + "    \"field1\": { \"type\": \"keyword\" },\n"
                + "    \"field2\": { \"type\": \"integer\" }\n"
                + "  }\n"
                + "}",
            XContentType.JSON
        );
        CreateIndexResponse createIndexResponse = client().admin().indices().create(createIndexRequest).actionGet();
        assertTrue(createIndexResponse.isAcknowledged());
        client().admin().cluster().prepareHealth("index").setWaitForGreenStatus().setTimeout(TimeValue.timeValueSeconds(30)).get();
        BulkRequest bulkRequest = new BulkRequest();

        // We'll create 3 segments per shard by indexing docs into each segment and forcing a flush
        // Segment 1 - we'll add docs with field2 values in 1-3 range
        for (int i = 0; i < 10; i++) {
            bulkRequest.add(new IndexRequest("index").source(XContentType.JSON, "field1", "value1", "field2", 1));
            bulkRequest.add(new IndexRequest("index").source(XContentType.JSON, "field1", "value2", "field2", 2));
            bulkRequest.add(new IndexRequest("index").source(XContentType.JSON, "field1", "value3", "field2", 3));
        }
        BulkResponse bulkResponse = client().bulk(bulkRequest).actionGet();
        assertFalse(bulkResponse.hasFailures()); // Verify ingestion was successful
        client().admin().indices().flush(new FlushRequest("index").force(true)).actionGet();
        client().admin().indices().refresh(new RefreshRequest("index")).actionGet();

        // Segment 2 - we'll add docs with field2 values in 11-13 range
        bulkRequest = new BulkRequest();
        for (int i = 0; i < 10; i++) {
            bulkRequest.add(new IndexRequest("index").source(XContentType.JSON, "field1", "value1", "field2", 11));
            bulkRequest.add(new IndexRequest("index").source(XContentType.JSON, "field1", "value2", "field2", 12));
            bulkRequest.add(new IndexRequest("index").source(XContentType.JSON, "field1", "value3", "field2", 13));
        }
        bulkResponse = client().bulk(bulkRequest).actionGet();
        assertFalse(bulkResponse.hasFailures());
        client().admin().indices().flush(new FlushRequest("index").force(true)).actionGet();
        client().admin().indices().refresh(new RefreshRequest("index")).actionGet();

        // Segment 3 - we'll add docs with field2 values in 21-23 range
        bulkRequest = new BulkRequest();
        for (int i = 0; i < 10; i++) {
            bulkRequest.add(new IndexRequest("index").source(XContentType.JSON, "field1", "value1", "field2", 21));
            bulkRequest.add(new IndexRequest("index").source(XContentType.JSON, "field1", "value2", "field2", 22));
            bulkRequest.add(new IndexRequest("index").source(XContentType.JSON, "field1", "value3", "field2", 23));
        }
        bulkResponse = client().bulk(bulkRequest).actionGet();
        assertFalse(bulkResponse.hasFailures());
        client().admin().indices().flush(new FlushRequest("index").force(true)).actionGet();
        client().admin().indices().refresh(new RefreshRequest("index")).actionGet();

        client().admin().indices().refresh(new RefreshRequest("index")).actionGet();
        ensureSearchable("index");

        // Verify that we have the expected number of shards and segments
        IndicesSegmentResponse segmentResponse = client().admin().indices().segments(new IndicesSegmentsRequest("index")).actionGet();
        assertEquals(NUM_SHARDS, segmentResponse.getIndices().get("index").getShards().size());

        // Verify each shard has at least MIN_SEGMENTS_PER_SHARD segments
        segmentResponse.getIndices().get("index").getShards().values().forEach(indexShardSegments -> {
            assertTrue(
                "Expected at least "
                    + MIN_SEGMENTS_PER_SHARD
                    + " segments but found "
                    + indexShardSegments.getShards()[0].getSegments().size(),
                indexShardSegments.getShards()[0].getSegments().size() >= MIN_SEGMENTS_PER_SHARD
            );
        });
    }

    @LockFeatureFlag(STREAM_TRANSPORT)
    public void testStreamingAggregation() throws Exception {
        // This test validates streaming aggregation with 3 shards, each with at least 3 segments
        TermsAggregationBuilder agg = terms("agg1").field("field1").subAggregation(AggregationBuilders.max("agg2").field("field2"));
        ActionFuture<SearchResponse> future = client().prepareStreamSearch("index")
            .addAggregation(agg)
            .setSize(0)
            .setRequestCache(false)
            .execute();
        SearchResponse resp = future.actionGet();
        assertNotNull(resp);
        assertEquals(NUM_SHARDS, resp.getTotalShards());
        assertEquals(90, resp.getHits().getTotalHits().value());
        StringTerms agg1 = (StringTerms) resp.getAggregations().asMap().get("agg1");
        List<StringTerms.Bucket> buckets = agg1.getBuckets();
        assertEquals(3, buckets.size());

        // Validate all buckets - each should have 30 documents
        for (StringTerms.Bucket bucket : buckets) {
            assertEquals(30, bucket.getDocCount());
            assertNotNull(bucket.getAggregations().get("agg2"));
        }
        buckets.sort(Comparator.comparing(StringTerms.Bucket::getKeyAsString));

        StringTerms.Bucket bucket1 = buckets.get(0);
        assertEquals("value1", bucket1.getKeyAsString());
        assertEquals(30, bucket1.getDocCount());
        Max maxAgg1 = (Max) bucket1.getAggregations().get("agg2");
        assertEquals(21.0, maxAgg1.getValue(), 0.001);

        StringTerms.Bucket bucket2 = buckets.get(1);
        assertEquals("value2", bucket2.getKeyAsString());
        assertEquals(30, bucket2.getDocCount());
        Max maxAgg2 = (Max) bucket2.getAggregations().get("agg2");
        assertEquals(22.0, maxAgg2.getValue(), 0.001);

        StringTerms.Bucket bucket3 = buckets.get(2);
        assertEquals("value3", bucket3.getKeyAsString());
        assertEquals(30, bucket3.getDocCount());
        Max maxAgg3 = (Max) bucket3.getAggregations().get("agg2");
        assertEquals(23.0, maxAgg3.getValue(), 0.001);

        for (SearchHit hit : resp.getHits().getHits()) {
            assertNotNull(hit.getSourceAsString());
        }
    }

    @LockFeatureFlag(STREAM_TRANSPORT)
    public void testStreamingAggregationTerm() throws Exception {
        // This test validates streaming aggregation with 3 shards, each with at least 3 segments
        TermsAggregationBuilder agg = terms("agg1").field("field1");
        ActionFuture<SearchResponse> future = client().prepareStreamSearch("index")
            .addAggregation(agg)
            .setSize(0)
            .setRequestCache(false)
            .execute();
        SearchResponse resp = future.actionGet();
        assertNotNull(resp);
        assertEquals(NUM_SHARDS, resp.getTotalShards());
        assertEquals(90, resp.getHits().getTotalHits().value());
        StringTerms agg1 = (StringTerms) resp.getAggregations().asMap().get("agg1");
        List<StringTerms.Bucket> buckets = agg1.getBuckets();
        assertEquals(3, buckets.size());

        // Validate all buckets - each should have 30 documents
        for (StringTerms.Bucket bucket : buckets) {
            assertEquals(30, bucket.getDocCount());
        }
        buckets.sort(Comparator.comparing(StringTerms.Bucket::getKeyAsString));

        StringTerms.Bucket bucket1 = buckets.get(0);
        assertEquals("value1", bucket1.getKeyAsString());
        assertEquals(30, bucket1.getDocCount());

        StringTerms.Bucket bucket2 = buckets.get(1);
        assertEquals("value2", bucket2.getKeyAsString());
        assertEquals(30, bucket2.getDocCount());

        StringTerms.Bucket bucket3 = buckets.get(2);
        assertEquals("value3", bucket3.getKeyAsString());
        assertEquals(30, bucket3.getDocCount());

        for (SearchHit hit : resp.getHits().getHits()) {
            assertNotNull(hit.getSourceAsString());
        }
    }
}
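For reference, the minimal client-side pattern both tests exercise is sketched below; every call appears in the test above, and the note on bucket ordering is an inference from the tests' explicit sort (the commit message indicates the REST layer toggles the same path with a 'stream' query parameter).

    // Streaming counterpart of prepareSearch; requires the STREAM_TRANSPORT feature flag.
    SearchResponse response = client().prepareStreamSearch("index")
        .addAggregation(terms("agg1").field("field1"))
        .setSize(0)
        .setRequestCache(false)
        .execute()
        .actionGet();

    // With the priority queue removed from the terms aggregator, streamed buckets
    // are not guaranteed to arrive in key order, so sort before positional asserts.
    List<StringTerms.Bucket> buckets = ((StringTerms) response.getAggregations().asMap().get("agg1")).getBuckets();
    buckets.sort(Comparator.comparing(StringTerms.Bucket::getKeyAsString));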
