Commit bc86e80
Streaming Aggregation

## Search Flow Separation
- Query param `stream` on the REST search action, stored in the search request (see the sketch below)
- On the coordinator, we use the stream search transport action, and the search async action uses the new stream callback
- On the data node, the stream transport action passes the stream search flag through the search context to shard search and aggregation
- The reduce context carries the stream flag from the search request

## Coordinator
- Sync `onPhaseDone` for both result consumption callbacks, shard and stream
- Result consumer (needs improvement): expand the array of search phase results to hold all batches. The max size is hard-coded to shard count * max batches per shard (100)

## Data Node
- Data node aggregation streams per-segment aggregation results back and completes the stream with the shard-level result

## Memory
- Optimize memory usage on the coordinator
  - `reduce size = shard_number * ((1.5 * size) + 10)` (needs improvement, big accuracy problem; see the sizing sketch below)
- Remove the unnecessary memory allocation for sub-aggregation handling when no sub-aggregation exists
- Only allocate doc counts per segment in the Terms Bucket Aggregator
- Remove the priority queue from the Terms Bucket Aggregator; return all buckets in build aggregation

## Stream Listener API
- Stream search callback
- Stream channel listener

## Dev
- Enable the C2 compiler for local `gradlew run` (raise `-XX:TieredStopAtLevel` from 1 to 4)
- Disable filter optimization

## TODO
- Serde at transport currently copies from Arrow to a native byte buffer

Signed-off-by: bowenlan-amzn <bowenlan23@gmail.com>
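For illustration only, a minimal sketch of driving the new `stream` query param through the low-level REST client. The index name, request body, and param value are assumptions; the commit itself only establishes that the REST search action accepts `stream` and stores it on the search request:

```java
import java.io.IOException;

import org.opensearch.client.Request;
import org.opensearch.client.Response;
import org.opensearch.client.RestClient;

public final class StreamSearchRestSketch {
    // Assumes an already-built low-level RestClient; only the request wiring is the point here.
    static Response streamSearch(RestClient client) throws IOException {
        Request request = new Request("POST", "/index/_search");
        request.addParameter("stream", "true"); // the query param this commit wires into the REST search action
        request.setJsonEntity("{ \"size\": 0, \"aggs\": { \"agg1\": { \"terms\": { \"field\": \"field1\" } } } }");
        return client.performRequest(request);
    }
}
```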
1 parent 4434068 · commit bc86e80
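To make the Memory estimate above concrete, a sketch under stated assumptions (class and method names are illustrative, not the actual coordinator code). With 3 shards and a terms size of 10, the estimate is 3 * ((1.5 * 10) + 10) = 75 buckets, and the expanded results array tops out at 3 * 100 = 300 slots:

```java
/**
 * Illustrative sketch only: mirrors the sizing formulas from the commit
 * message, not the actual coordinator implementation.
 */
public final class StreamReduceSizingSketch {
    /** Hard-coded maximum number of batches each shard may stream (per the commit message). */
    static final int MAX_BATCHES_PER_SHARD = 100;

    /** reduce size = shard_number * ((1.5 * size) + 10) */
    static int estimatedReduceSize(int shardCount, int requestedSize) {
        return (int) (shardCount * ((1.5 * requestedSize) + 10));
    }

    /** Capacity of the expanded array of search phase results: one slot per possible batch. */
    static int maxBufferedResults(int shardCount) {
        return shardCount * MAX_BATCHES_PER_SHARD;
    }

    public static void main(String[] args) {
        System.out.println(estimatedReduceSize(3, 10)); // 75
        System.out.println(maxBufferedResults(3));      // 300
    }
}
```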

File tree

43 files changed: +2207 −130 lines changed

.gitignore

Lines changed: 4 additions & 1 deletion

```diff
@@ -1,3 +1,6 @@
+streaming-aggregation
+CLAUDE.md
+.cursor*
 
 # intellij files
 .idea/
@@ -64,4 +67,4 @@ testfixtures_shared/
 .ci/jobs/
 
 # build files generated
-doc-tools/missing-doclet/bin/
+doc-tools/missing-doclet/bin/
```

gradle.properties

Lines changed: 1 addition & 1 deletion

```diff
@@ -31,4 +31,4 @@ systemProp.org.gradle.warning.mode=fail
 systemProp.jdk.tls.client.protocols=TLSv1.2,TLSv1.3
 
 # jvm args for faster test execution by default
-systemProp.tests.jvm.argline=-XX:TieredStopAtLevel=1 -XX:ReservedCodeCacheSize=64m
+systemProp.tests.jvm.argline=-XX:TieredStopAtLevel=4 -XX:ReservedCodeCacheSize=64m
```
StreamActionListener.java (new file)

Lines changed: 46 additions & 0 deletions

```java
/*
 * SPDX-License-Identifier: Apache-2.0
 *
 * The OpenSearch Contributors require contributions made to
 * this file be licensed under the Apache-2.0 license or a
 * compatible open source license.
 */

package org.opensearch.core.action;

import org.opensearch.common.annotation.ExperimentalApi;

/**
 * A listener for action responses that can handle streaming responses.
 * This interface extends ActionListener to add functionality for handling
 * responses that arrive in multiple batches as part of a stream.
 */
@ExperimentalApi
public interface StreamActionListener<Response> extends ActionListener<Response> {
    /**
     * Handle an intermediate streaming response. This is called for all responses
     * that are not the final response in the stream.
     *
     * @param response An intermediate response in the stream
     */
    void onStreamResponse(Response response);

    /**
     * Handle the final response in the stream and complete the stream.
     * This is called exactly once when the stream is complete.
     *
     * @param response The final response in the stream
     */
    void onCompleteResponse(Response response);

    /**
     * Not supported for streaming listeners. Use onStreamResponse or onCompleteResponse instead.
     * @throws UnsupportedOperationException always
     */
    @Override
    default void onResponse(Response response) {
        throw new UnsupportedOperationException(
            "StreamActionListener does not support onResponse. Use onStreamResponse or onCompleteResponse instead."
        );
    }
}
```
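As a usage illustration (not part of this commit's diff), a hedged sketch of how a caller might implement this interface to accumulate streamed batches. The `SearchResponse` type is the one exercised by the integration test below; the class name and the collect-then-report behavior are assumptions, since the commit does not show client-side consumption:

```java
import java.util.ArrayList;
import java.util.List;

import org.opensearch.action.search.SearchResponse;
import org.opensearch.core.action.StreamActionListener;

// Hypothetical caller-side sketch: collect intermediate batches, then act on the final one.
public class CollectingStreamListener implements StreamActionListener<SearchResponse> {
    private final List<SearchResponse> batches = new ArrayList<>();

    @Override
    public void onStreamResponse(SearchResponse response) {
        batches.add(response); // intermediate batch; more are expected
    }

    @Override
    public void onCompleteResponse(SearchResponse response) {
        batches.add(response); // final batch: the stream is now complete
        System.out.println("received " + batches.size() + " batches");
    }

    @Override
    public void onFailure(Exception e) {
        e.printStackTrace(); // ActionListener's failure path still applies
    }
}
```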
SubAggregationIT.java (new file)

Lines changed: 224 additions & 0 deletions

```java
/*
 * SPDX-License-Identifier: Apache-2.0
 *
 * The OpenSearch Contributors require contributions made to
 * this file be licensed under the Apache-2.0 license or a
 * compatible open source license.
 */

package org.opensearch.streaming.aggregation;

import org.opensearch.action.admin.indices.create.CreateIndexRequest;
import org.opensearch.action.admin.indices.create.CreateIndexResponse;
import org.opensearch.action.admin.indices.flush.FlushRequest;
import org.opensearch.action.admin.indices.refresh.RefreshRequest;
import org.opensearch.action.admin.indices.segments.IndicesSegmentResponse;
import org.opensearch.action.admin.indices.segments.IndicesSegmentsRequest;
import org.opensearch.action.bulk.BulkRequest;
import org.opensearch.action.bulk.BulkResponse;
import org.opensearch.action.index.IndexRequest;
import org.opensearch.action.search.SearchResponse;
import org.opensearch.arrow.flight.transport.FlightStreamPlugin;
import org.opensearch.common.action.ActionFuture;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.common.xcontent.XContentType;
import org.opensearch.plugins.Plugin;
import org.opensearch.search.SearchHit;
import org.opensearch.search.aggregations.AggregationBuilders;
import org.opensearch.search.aggregations.bucket.terms.StringTerms;
import org.opensearch.search.aggregations.bucket.terms.TermsAggregationBuilder;
import org.opensearch.search.aggregations.metrics.Max;
import org.opensearch.test.OpenSearchIntegTestCase;

import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;

import static org.opensearch.common.util.FeatureFlags.STREAM_TRANSPORT;
import static org.opensearch.search.aggregations.AggregationBuilders.terms;

@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.SUITE, minNumDataNodes = 3, maxNumDataNodes = 3)
public class SubAggregationIT extends OpenSearchIntegTestCase {

    static final int NUM_SHARDS = 3;
    static final int MIN_SEGMENTS_PER_SHARD = 3;

    @Override
    protected Collection<Class<? extends Plugin>> nodePlugins() {
        return Collections.singleton(FlightStreamPlugin.class);
    }

    @Override
    public void setUp() throws Exception {
        super.setUp();
        internalCluster().ensureAtLeastNumDataNodes(3);

        Settings indexSettings = Settings.builder()
            .put("index.number_of_shards", NUM_SHARDS) // Number of primary shards
            .put("index.number_of_replicas", 0) // Number of replica shards
            .put("index.search.concurrent_segment_search.mode", "none")
            // Disable segment merging to keep individual segments
            .put("index.merge.policy.max_merged_segment", "1kb") // Keep segments small
            .put("index.merge.policy.segments_per_tier", "20") // Allow many segments per tier
            .put("index.merge.scheduler.max_thread_count", "1") // Limit merge threads
            .build();

        CreateIndexRequest createIndexRequest = new CreateIndexRequest("index").settings(indexSettings);
        createIndexRequest.mapping(
            "{\n"
                + " \"properties\": {\n"
                + " \"field1\": { \"type\": \"keyword\" },\n"
                + " \"field2\": { \"type\": \"integer\" }\n"
                + " }\n"
                + "}",
            XContentType.JSON
        );
        CreateIndexResponse createIndexResponse = client().admin().indices().create(createIndexRequest).actionGet();
        assertTrue(createIndexResponse.isAcknowledged());
        client().admin().cluster().prepareHealth("index").setWaitForGreenStatus().setTimeout(TimeValue.timeValueSeconds(30)).get();
        BulkRequest bulkRequest = new BulkRequest();

        // We'll create 3 segments per shard by indexing docs into each segment and forcing a flush
        // Segment 1 - we'll add docs with field2 values in 1-3 range
        for (int i = 0; i < 10; i++) {
            bulkRequest.add(new IndexRequest("index").source(XContentType.JSON, "field1", "value1", "field2", 1));
            bulkRequest.add(new IndexRequest("index").source(XContentType.JSON, "field1", "value2", "field2", 2));
            bulkRequest.add(new IndexRequest("index").source(XContentType.JSON, "field1", "value3", "field2", 3));
        }
        BulkResponse bulkResponse = client().bulk(bulkRequest).actionGet();
        assertFalse(bulkResponse.hasFailures()); // Verify ingestion was successful
        client().admin().indices().flush(new FlushRequest("index").force(true)).actionGet();
        client().admin().indices().refresh(new RefreshRequest("index")).actionGet();

        // Segment 2 - we'll add docs with field2 values in 11-13 range
        bulkRequest = new BulkRequest();
        for (int i = 0; i < 10; i++) {
            bulkRequest.add(new IndexRequest("index").source(XContentType.JSON, "field1", "value1", "field2", 11));
            bulkRequest.add(new IndexRequest("index").source(XContentType.JSON, "field1", "value2", "field2", 12));
            bulkRequest.add(new IndexRequest("index").source(XContentType.JSON, "field1", "value3", "field2", 13));
        }
        bulkResponse = client().bulk(bulkRequest).actionGet();
        assertFalse(bulkResponse.hasFailures());
        client().admin().indices().flush(new FlushRequest("index").force(true)).actionGet();
        client().admin().indices().refresh(new RefreshRequest("index")).actionGet();

        // Segment 3 - we'll add docs with field2 values in 21-23 range
        bulkRequest = new BulkRequest();
        for (int i = 0; i < 10; i++) {
            bulkRequest.add(new IndexRequest("index").source(XContentType.JSON, "field1", "value1", "field2", 21));
            bulkRequest.add(new IndexRequest("index").source(XContentType.JSON, "field1", "value2", "field2", 22));
            bulkRequest.add(new IndexRequest("index").source(XContentType.JSON, "field1", "value3", "field2", 23));
        }
        bulkResponse = client().bulk(bulkRequest).actionGet();
        assertFalse(bulkResponse.hasFailures());
        client().admin().indices().flush(new FlushRequest("index").force(true)).actionGet();
        client().admin().indices().refresh(new RefreshRequest("index")).actionGet();

        client().admin().indices().refresh(new RefreshRequest("index")).actionGet();
        ensureSearchable("index");

        // Verify that we have the expected number of shards and segments
        IndicesSegmentResponse segmentResponse = client().admin().indices().segments(new IndicesSegmentsRequest("index")).actionGet();
        assertEquals(NUM_SHARDS, segmentResponse.getIndices().get("index").getShards().size());

        // Verify each shard has at least MIN_SEGMENTS_PER_SHARD segments
        segmentResponse.getIndices().get("index").getShards().values().forEach(indexShardSegments -> {
            assertTrue(
                "Expected at least "
                    + MIN_SEGMENTS_PER_SHARD
                    + " segments but found "
                    + indexShardSegments.getShards()[0].getSegments().size(),
                indexShardSegments.getShards()[0].getSegments().size() >= MIN_SEGMENTS_PER_SHARD
            );
        });
    }

    @LockFeatureFlag(STREAM_TRANSPORT)
    public void testStreamingAggregation() throws Exception {
        // This test validates streaming aggregation with 3 shards, each with at least 3 segments
        TermsAggregationBuilder agg = terms("agg1").field("field1").subAggregation(AggregationBuilders.max("agg2").field("field2"));
        ActionFuture<SearchResponse> future = client().prepareStreamSearch("index")
            .addAggregation(agg)
            .setSize(0)
            .setRequestCache(false)
            .execute();
        SearchResponse resp = future.actionGet();
        assertNotNull(resp);
        assertEquals(NUM_SHARDS, resp.getTotalShards());
        assertEquals(90, resp.getHits().getTotalHits().value());
        StringTerms agg1 = (StringTerms) resp.getAggregations().asMap().get("agg1");
        List<StringTerms.Bucket> buckets = agg1.getBuckets();
        assertEquals(3, buckets.size());

        // Validate all buckets - each should have 30 documents
        for (StringTerms.Bucket bucket : buckets) {
            assertEquals(30, bucket.getDocCount());
            assertNotNull(bucket.getAggregations().get("agg2"));
        }
        buckets.sort(Comparator.comparing(StringTerms.Bucket::getKeyAsString));

        StringTerms.Bucket bucket1 = buckets.get(0);
        assertEquals("value1", bucket1.getKeyAsString());
        assertEquals(30, bucket1.getDocCount());
        Max maxAgg1 = (Max) bucket1.getAggregations().get("agg2");
        assertEquals(21.0, maxAgg1.getValue(), 0.001);

        StringTerms.Bucket bucket2 = buckets.get(1);
        assertEquals("value2", bucket2.getKeyAsString());
        assertEquals(30, bucket2.getDocCount());
        Max maxAgg2 = (Max) bucket2.getAggregations().get("agg2");
        assertEquals(22.0, maxAgg2.getValue(), 0.001);

        StringTerms.Bucket bucket3 = buckets.get(2);
        assertEquals("value3", bucket3.getKeyAsString());
        assertEquals(30, bucket3.getDocCount());
        Max maxAgg3 = (Max) bucket3.getAggregations().get("agg2");
        assertEquals(23.0, maxAgg3.getValue(), 0.001);

        for (SearchHit hit : resp.getHits().getHits()) {
            assertNotNull(hit.getSourceAsString());
        }
    }

    @LockFeatureFlag(STREAM_TRANSPORT)
    public void testStreamingAggregationTerm() throws Exception {
        // This test validates streaming aggregation with 3 shards, each with at least 3 segments
        TermsAggregationBuilder agg = terms("agg1").field("field1");
        ActionFuture<SearchResponse> future = client().prepareStreamSearch("index")
            .addAggregation(agg)
            .setSize(0)
            .setRequestCache(false)
            .execute();
        SearchResponse resp = future.actionGet();
        assertNotNull(resp);
        assertEquals(NUM_SHARDS, resp.getTotalShards());
        assertEquals(90, resp.getHits().getTotalHits().value());
        StringTerms agg1 = (StringTerms) resp.getAggregations().asMap().get("agg1");
        List<StringTerms.Bucket> buckets = agg1.getBuckets();
        assertEquals(3, buckets.size());

        // Validate all buckets - each should have 30 documents
        for (StringTerms.Bucket bucket : buckets) {
            assertEquals(30, bucket.getDocCount());
        }
        buckets.sort(Comparator.comparing(StringTerms.Bucket::getKeyAsString));

        StringTerms.Bucket bucket1 = buckets.get(0);
        assertEquals("value1", bucket1.getKeyAsString());
        assertEquals(30, bucket1.getDocCount());

        StringTerms.Bucket bucket2 = buckets.get(1);
        assertEquals("value2", bucket2.getKeyAsString());
        assertEquals(30, bucket2.getDocCount());

        StringTerms.Bucket bucket3 = buckets.get(2);
        assertEquals("value3", bucket3.getKeyAsString());
        assertEquals(30, bucket3.getDocCount());

        for (SearchHit hit : resp.getHits().getHits()) {
            assertNotNull(hit.getSourceAsString());
        }
    }
}
```
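The Memory section above notes that the Terms Bucket Aggregator no longer keeps a priority queue and instead returns all buckets in build aggregation. A hypothetical before/after sketch of that shape (names and types here are illustrative, not the actual aggregator code): with streaming, top-N selection can move to the coordinator's reduce, so each per-segment pass simply emits everything it collected.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.PriorityQueue;

// Illustrative only: not the actual TermsAggregator. Shows the shape of the
// change described in the commit message.
class TermsBucketsSketch {
    record Bucket(String term, long docCount) {}

    // Before: keep only the top `size` buckets in a min-heap at the shard level.
    static List<Bucket> topNBuckets(Map<String, Long> docCounts, int size) {
        PriorityQueue<Bucket> queue = new PriorityQueue<>((a, b) -> Long.compare(a.docCount(), b.docCount()));
        for (Map.Entry<String, Long> e : docCounts.entrySet()) {
            queue.offer(new Bucket(e.getKey(), e.getValue()));
            if (queue.size() > size) {
                queue.poll(); // evict the smallest bucket
            }
        }
        return new ArrayList<>(queue);
    }

    // After: emit every bucket; the coordinator's reduce handles ordering and truncation.
    static List<Bucket> allBuckets(Map<String, Long> docCounts) {
        List<Bucket> buckets = new ArrayList<>(docCounts.size());
        docCounts.forEach((term, count) -> buckets.add(new Bucket(term, count)));
        return buckets;
    }
}
```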
