Skip to content

Commit

Permalink
Support scripting for composite aggs in concurrent segment search (op…
Browse files Browse the repository at this point in the history
…ensearch-project#15072)

Signed-off-by: Jay Deng <jayd0104@gmail.com>
  • Loading branch information
jed326 authored and wangdongyu.danny committed Aug 22, 2024
1 parent 237d530 commit f2a9b0a
Show file tree
Hide file tree
Showing 6 changed files with 247 additions and 5 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Add basic aggregation support for derived fields ([#14618](https://github.com/opensearch-project/OpenSearch/pull/14618))
- Add ThreadContextPermission for markAsSystemContext and allow core to perform the method ([#15016](https://github.com/opensearch-project/OpenSearch/pull/15016))
- Add ThreadContextPermission for stashAndMergeHeaders and stashWithOrigin ([#15039](https://github.com/opensearch-project/OpenSearch/pull/15039))
- [Concurrent Segment Search] Support composite aggregations with scripting ([#15072](https://github.com/opensearch-project/OpenSearch/pull/15072))
- Add `rangeQuery` and `regexpQuery` for `constant_keyword` field type ([#14711](https://github.com/opensearch-project/OpenSearch/pull/14711))

### Dependencies
Expand Down
1 change: 1 addition & 0 deletions modules/lang-painless/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import com.github.jengelman.gradle.plugins.shadow.ShadowBasePlugin

apply plugin: 'opensearch.validate-rest-spec'
apply plugin: 'opensearch.yaml-rest-test'
apply plugin: 'opensearch.internal-cluster-test'

opensearchplugin {
description 'An easy, safe and fast scripting language for OpenSearch'
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,231 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.painless;

import com.carrotsearch.randomizedtesting.annotations.ParametersFactory;

import org.opensearch.action.search.SearchRequest;
import org.opensearch.action.search.SearchResponse;
import org.opensearch.action.support.WriteRequest;
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.xcontent.XContentFactory;
import org.opensearch.core.xcontent.MediaTypeRegistry;
import org.opensearch.core.xcontent.XContentBuilder;
import org.opensearch.index.query.TermsQueryBuilder;
import org.opensearch.plugins.Plugin;
import org.opensearch.script.Script;
import org.opensearch.script.ScriptType;
import org.opensearch.search.aggregations.AggregationBuilder;
import org.opensearch.search.aggregations.AggregationBuilders;
import org.opensearch.search.aggregations.bucket.composite.InternalComposite;
import org.opensearch.search.aggregations.bucket.composite.TermsValuesSourceBuilder;
import org.opensearch.search.aggregations.bucket.terms.Terms;
import org.opensearch.search.aggregations.bucket.terms.TermsAggregationBuilder;
import org.opensearch.search.builder.SearchSourceBuilder;
import org.opensearch.test.OpenSearchIntegTestCase;
import org.opensearch.test.ParameterizedStaticSettingsOpenSearchIntegTestCase;

import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Objects;

import static org.opensearch.index.query.QueryBuilders.matchAllQuery;
import static org.opensearch.search.SearchService.CLUSTER_CONCURRENT_SEGMENT_SEARCH_SETTING;
import static org.opensearch.search.SearchService.CONCURRENT_SEGMENT_SEARCH_TARGET_MAX_SLICE_COUNT_SETTING;
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked;
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertSearchResponse;

@OpenSearchIntegTestCase.SuiteScopeTestCase
public class SimplePainlessIT extends ParameterizedStaticSettingsOpenSearchIntegTestCase {

public SimplePainlessIT(Settings nodeSettings) {
super(nodeSettings);
}

@ParametersFactory
public static Collection<Object[]> parameters() {
return Arrays.asList(
new Object[] { Settings.builder().put(CLUSTER_CONCURRENT_SEGMENT_SEARCH_SETTING.getKey(), true).build() },
new Object[] { Settings.builder().put(CLUSTER_CONCURRENT_SEGMENT_SEARCH_SETTING.getKey(), false).build() }
);
}

@Override
protected Collection<Class<? extends Plugin>> nodePlugins() {
return List.of(PainlessModulePlugin.class);
}

@Override
protected Settings nodeSettings(int nodeOrdinal) {
return Settings.builder()
.put(super.nodeSettings(nodeOrdinal))
.put(CONCURRENT_SEGMENT_SEARCH_TARGET_MAX_SLICE_COUNT_SETTING.getKey(), "4")
.build();
}

@Override
public void setupSuiteScopeCluster() throws Exception {
XContentBuilder xContentBuilder = XContentFactory.jsonBuilder()
.startObject()
.field("dynamic", "false")
.startObject("_meta")
.field("schema_version", 5)
.endObject()
.startObject("properties")
.startObject("entity")
.field("type", "nested")
.endObject()
.endObject()
.endObject();

assertAcked(
prepareCreate("test").setMapping(xContentBuilder)
.setSettings(
Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1).put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0)
)
);

assertAcked(
prepareCreate("test-df").setSettings(
Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1).put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0)
)
);

client().prepareIndex("test")
.setId("a")
.setSource(
"{\"entity\":[{\"name\":\"ip-field\",\"value\":\"1.2.3.4\"},{\"name\":\"keyword-field\",\"value\":\"field-1\"}]}",
MediaTypeRegistry.JSON
)
.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE)
.get();
client().prepareIndex("test")
.setId("b")
.setSource(
"{\"entity\":[{\"name\":\"ip-field\",\"value\":\"5.6.7.8\"},{\"name\":\"keyword-field\",\"value\":\"field-2\"}]}",
MediaTypeRegistry.JSON
)
.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE)
.get();
client().prepareIndex("test")
.setId("c")
.setSource(
"{\"entity\":[{\"name\":\"ip-field\",\"value\":\"1.6.3.8\"},{\"name\":\"keyword-field\",\"value\":\"field-2\"}]}",
MediaTypeRegistry.JSON
)
.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE)
.get();
client().prepareIndex("test")
.setId("d")
.setSource(
"{\"entity\":[{\"name\":\"ip-field\",\"value\":\"2.6.4.8\"},{\"name\":\"keyword-field\",\"value\":\"field-2\"}]}",
MediaTypeRegistry.JSON
)
.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE)
.get();
ensureSearchable("test");

client().prepareIndex("test-df")
.setId("a")
.setSource("{\"field\":\"value1\"}", MediaTypeRegistry.JSON)
.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE)
.get();
client().prepareIndex("test-df")
.setId("b")
.setSource("{\"field\":\"value2\"}", MediaTypeRegistry.JSON)
.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE)
.get();
client().prepareIndex("test-df")
.setId("c")
.setSource("{\"field\":\"value3\"}", MediaTypeRegistry.JSON)
.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE)
.get();
client().prepareIndex("test-df")
.setId("d")
.setSource("{\"field\":\"value1\"}", MediaTypeRegistry.JSON)
.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE)
.get();
ensureSearchable("test");
}

public void testTermsValuesSource() throws Exception {
AggregationBuilder agg = AggregationBuilders.composite(
"multi_buckets",
Collections.singletonList(
new TermsValuesSourceBuilder("keyword-field").script(
new Script(
ScriptType.INLINE,
"painless",
"String value = null; if (params == null || params._source == null || params._source.entity == null) { return \"\"; } for (item in params._source.entity) { if (item[\"name\"] == \"keyword-field\") { value = item['value']; break; } } return value;",
Collections.emptyMap()
)
)
)
);
SearchResponse response = client().prepareSearch("test").setQuery(matchAllQuery()).addAggregation(agg).get();

assertSearchResponse(response);
assertEquals(2, ((InternalComposite) response.getAggregations().get("multi_buckets")).getBuckets().size());
assertEquals(
"field-1",
((InternalComposite) response.getAggregations().get("multi_buckets")).getBuckets().get(0).getKey().get("keyword-field")
);
assertEquals(1, ((InternalComposite) response.getAggregations().get("multi_buckets")).getBuckets().get(0).getDocCount());
assertEquals(
"field-2",
((InternalComposite) response.getAggregations().get("multi_buckets")).getBuckets().get(1).getKey().get("keyword-field")
);
assertEquals(3, ((InternalComposite) response.getAggregations().get("multi_buckets")).getBuckets().get(1).getDocCount());
}

public void testSimpleDerivedFieldsQuery() {
assumeFalse(
"Derived fields do not support concurrent search https://github.com/opensearch-project/OpenSearch/issues/15007",
internalCluster().clusterService().getClusterSettings().get(CLUSTER_CONCURRENT_SEGMENT_SEARCH_SETTING)
);
SearchRequest searchRequest = new SearchRequest("test-df").source(
SearchSourceBuilder.searchSource()
.derivedField("result", "keyword", new Script("emit(params._source[\"field\"])"))
.fetchField("result")
.query(new TermsQueryBuilder("result", "value1"))
);
SearchResponse response = client().search(searchRequest).actionGet();
assertSearchResponse(response);
assertEquals(2, Objects.requireNonNull(response.getHits().getTotalHits()).value);
}

public void testSimpleDerivedFieldsAgg() {
assumeFalse(
"Derived fields do not support concurrent search https://github.com/opensearch-project/OpenSearch/issues/15007",
internalCluster().clusterService().getClusterSettings().get(CLUSTER_CONCURRENT_SEGMENT_SEARCH_SETTING)
);
SearchRequest searchRequest = new SearchRequest("test-df").source(
SearchSourceBuilder.searchSource()
.derivedField("result", "keyword", new Script("emit(params._source[\"field\"])"))
.fetchField("result")
.aggregation(new TermsAggregationBuilder("derived-agg").field("result"))
);
SearchResponse response = client().search(searchRequest).actionGet();
assertSearchResponse(response);
Terms aggResponse = response.getAggregations().get("derived-agg");
assertEquals(3, aggResponse.getBuckets().size());
Terms.Bucket bucket = aggResponse.getBuckets().get(0);
assertEquals("value1", bucket.getKey());
assertEquals(2, bucket.getDocCount());
bucket = aggResponse.getBuckets().get(1);
assertEquals("value2", bucket.getKey());
assertEquals(1, bucket.getDocCount());
bucket = aggResponse.getBuckets().get(2);
assertEquals("value3", bucket.getKey());
assertEquals(1, bucket.getDocCount());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,6 @@
import org.opensearch.search.internal.SearchContext;

import java.io.IOException;
import java.util.Arrays;
import java.util.Map;

/**
Expand Down Expand Up @@ -81,7 +80,6 @@ protected Aggregator createInternal(

@Override
protected boolean supportsConcurrentSegmentSearch() {
// Disable concurrent search if any scripting is used. See https://github.com/opensearch-project/OpenSearch/issues/12331 for details
return Arrays.stream(sources).noneMatch(CompositeValuesSourceConfig::hasScript);
return true;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -153,14 +153,25 @@ public final SearchLookup forkAndTrackFieldReferences(String field) {
return new SearchLookup(this, newFieldChain);
}

/**
* SourceLookup is not thread safe, so we create a new instance for each leaf to support concurrent segment search
*/
public LeafSearchLookup getLeafSearchLookup(LeafReaderContext context) {
return new LeafSearchLookup(context, docMap.getLeafDocLookup(context), sourceLookup, fieldsLookup.getLeafFieldsLookup(context));
return new LeafSearchLookup(
context,
docMap.getLeafDocLookup(context),
new SourceLookup(),
fieldsLookup.getLeafFieldsLookup(context)
);
}

public DocLookup doc() {
return docMap;
}

/**
* Returned SourceLookup will be unrelated to any created LeafSearchLookups. Instead, use {@link LeafSearchLookup#source()} to access the related {@link SearchLookup}.
*/
public SourceLookup source() {
return sourceLookup;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@
import static java.util.Collections.emptyMap;

/**
* Orchestrator class for source lookups
* Orchestrator class for source lookups. Not thread safe.
*
* @opensearch.api
*/
Expand Down

0 comments on commit f2a9b0a

Please sign in to comment.