Skip to content

Commit

Permalink
Fix DerivedFieldQuery to support concurrent search. (#15326) (#15454)
Browse files Browse the repository at this point in the history
* Fix DerivedFieldQuery to support concurrent search.

This change updates DerivedFieldQuery to create a separate ValueFetcher instance per thread.
The DerivedFieldValueFetcher is not thread safe in that it holds a ref to a compiled DerivedFieldScript that is
created per thread.  Each script also holds a SourceLookup object that is not thread safe.



* Fix broken cases relying on ObjectDerivedFieldValueFetcher.

DerivedFieldQuery will accept a supplier for a valueFetcher rather than constructing it.
This ensures that the DerivedFieldType creating the query (obj or regular) passes the correct supplier func.



* remove unused clone method



* Add changelog entry



* add an extra test for DerivedFieldType multiPhraseQuery



* more coverage



* add tests for normalizedWildcard and phrase prefix



---------


(cherry picked from commit c771bdd)

Signed-off-by: Marc Handalian <marc.handalian@gmail.com>
Signed-off-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com>
Co-authored-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com>
  • Loading branch information
1 parent e80c06e commit d0d7999
Show file tree
Hide file tree
Showing 7 changed files with 77 additions and 48 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Adding access to noSubMatches and noOverlappingMatches in Hyphenation ([#13895](https://github.com/opensearch-project/OpenSearch/pull/13895))
- Star tree mapping changes ([#14605](https://github.com/opensearch-project/OpenSearch/pull/14605))
- Add support for index level max slice count setting for concurrent segment search ([#15336](https://github.com/opensearch-project/OpenSearch/pull/15336))
- Add concurrent search support for Derived Fields ([#15326](https://github.com/opensearch-project/OpenSearch/pull/15326))

### Dependencies
- Bump `netty` from 4.1.111.Final to 4.1.112.Final ([#15081](https://github.com/opensearch-project/OpenSearch/pull/15081))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -188,10 +188,6 @@ public void testTermsValuesSource() throws Exception {
}

public void testSimpleDerivedFieldsQuery() {
assumeFalse(
"Derived fields do not support concurrent search https://github.com/opensearch-project/OpenSearch/issues/15007",
internalCluster().clusterService().getClusterSettings().get(CLUSTER_CONCURRENT_SEGMENT_SEARCH_SETTING)
);
SearchRequest searchRequest = new SearchRequest("test-df").source(
SearchSourceBuilder.searchSource()
.derivedField("result", "keyword", new Script("emit(params._source[\"field\"])"))
Expand All @@ -204,10 +200,6 @@ public void testSimpleDerivedFieldsQuery() {
}

public void testSimpleDerivedFieldsAgg() {
assumeFalse(
"Derived fields do not support concurrent search https://github.com/opensearch-project/OpenSearch/issues/15007",
internalCluster().clusterService().getClusterSettings().get(CLUSTER_CONCURRENT_SEGMENT_SEARCH_SETTING)
);
SearchRequest searchRequest = new SearchRequest("test-df").source(
SearchSourceBuilder.searchSource()
.derivedField("result", "keyword", new Script("emit(params._source[\"field\"])"))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -159,10 +159,9 @@ public IndexFieldData.Builder fielddataBuilder(String fullyQualifiedIndexName, S
@Override
public Query termQuery(Object value, QueryShardContext context) {
Query query = typeFieldMapper.mappedFieldType.termQuery(value, context);
DerivedFieldValueFetcher valueFetcher = valueFetcher(context, context.lookup(), null);
DerivedFieldQuery derivedFieldQuery = new DerivedFieldQuery(
query,
valueFetcher,
() -> valueFetcher(context, context.lookup(), null),
context.lookup(),
getIndexAnalyzer(),
indexableFieldGenerator,
Expand All @@ -176,10 +175,9 @@ public Query termQuery(Object value, QueryShardContext context) {
@Override
public Query termQueryCaseInsensitive(Object value, @Nullable QueryShardContext context) {
Query query = typeFieldMapper.mappedFieldType.termQueryCaseInsensitive(value, context);
DerivedFieldValueFetcher valueFetcher = valueFetcher(context, context.lookup(), null);
DerivedFieldQuery derivedFieldQuery = new DerivedFieldQuery(
query,
valueFetcher,
() -> valueFetcher(context, context.lookup(), null),
context.lookup(),
getIndexAnalyzer(),
indexableFieldGenerator,
Expand All @@ -195,10 +193,9 @@ public Query termQueryCaseInsensitive(Object value, @Nullable QueryShardContext
@Override
public Query termsQuery(List<?> values, @Nullable QueryShardContext context) {
Query query = typeFieldMapper.mappedFieldType.termsQuery(values, context);
DerivedFieldValueFetcher valueFetcher = valueFetcher(context, context.lookup(), null);
DerivedFieldQuery derivedFieldQuery = new DerivedFieldQuery(
query,
valueFetcher,
() -> valueFetcher(context, context.lookup(), null),
context.lookup(),
getIndexAnalyzer(),
indexableFieldGenerator,
Expand Down Expand Up @@ -230,10 +227,9 @@ public Query rangeQuery(
parser,
context
);
DerivedFieldValueFetcher valueFetcher = valueFetcher(context, context.lookup(), null);
return new DerivedFieldQuery(
query,
valueFetcher,
() -> valueFetcher(context, context.lookup(), null),
context.lookup(),
getIndexAnalyzer(),
indexableFieldGenerator,
Expand All @@ -251,10 +247,9 @@ public Query fuzzyQuery(
QueryShardContext context
) {
Query query = typeFieldMapper.mappedFieldType.fuzzyQuery(value, fuzziness, prefixLength, maxExpansions, transpositions, context);
DerivedFieldValueFetcher valueFetcher = valueFetcher(context, context.lookup(), null);
DerivedFieldQuery derivedFieldQuery = new DerivedFieldQuery(
query,
valueFetcher,
() -> valueFetcher(context, context.lookup(), null),
context.lookup(),
getIndexAnalyzer(),
indexableFieldGenerator,
Expand Down Expand Up @@ -289,10 +284,9 @@ public Query fuzzyQuery(
method,
context
);
DerivedFieldValueFetcher valueFetcher = valueFetcher(context, context.lookup(), null);
DerivedFieldQuery derivedFieldQuery = new DerivedFieldQuery(
query,
valueFetcher,
() -> valueFetcher(context, context.lookup(), null),
context.lookup(),
getIndexAnalyzer(),
indexableFieldGenerator,
Expand All @@ -316,10 +310,9 @@ public Query prefixQuery(
QueryShardContext context
) {
Query query = typeFieldMapper.mappedFieldType.prefixQuery(value, method, caseInsensitive, context);
DerivedFieldValueFetcher valueFetcher = valueFetcher(context, context.lookup(), null);
DerivedFieldQuery derivedFieldQuery = new DerivedFieldQuery(
query,
valueFetcher,
() -> valueFetcher(context, context.lookup(), null),
context.lookup(),
getIndexAnalyzer(),
indexableFieldGenerator,
Expand All @@ -343,10 +336,9 @@ public Query wildcardQuery(
QueryShardContext context
) {
Query query = typeFieldMapper.mappedFieldType.wildcardQuery(value, method, caseInsensitive, context);
DerivedFieldValueFetcher valueFetcher = valueFetcher(context, context.lookup(), null);
DerivedFieldQuery derivedFieldQuery = new DerivedFieldQuery(
query,
valueFetcher,
() -> valueFetcher(context, context.lookup(), null),
context.lookup(),
getIndexAnalyzer(),
indexableFieldGenerator,
Expand All @@ -365,10 +357,9 @@ public Query wildcardQuery(
@Override
public Query normalizedWildcardQuery(String value, @Nullable MultiTermQuery.RewriteMethod method, QueryShardContext context) {
Query query = typeFieldMapper.mappedFieldType.normalizedWildcardQuery(value, method, context);
DerivedFieldValueFetcher valueFetcher = valueFetcher(context, context.lookup(), null);
DerivedFieldQuery derivedFieldQuery = new DerivedFieldQuery(
query,
valueFetcher,
() -> valueFetcher(context, context.lookup(), null),
context.lookup(),
getIndexAnalyzer(),
indexableFieldGenerator,
Expand All @@ -394,10 +385,9 @@ public Query regexpQuery(
QueryShardContext context
) {
Query query = typeFieldMapper.mappedFieldType.regexpQuery(value, syntaxFlags, matchFlags, maxDeterminizedStates, method, context);
DerivedFieldValueFetcher valueFetcher = valueFetcher(context, context.lookup(), null);
DerivedFieldQuery derivedFieldQuery = new DerivedFieldQuery(
query,
valueFetcher,
() -> valueFetcher(context, context.lookup(), null),
context.lookup(),
getIndexAnalyzer(),
indexableFieldGenerator,
Expand All @@ -416,10 +406,9 @@ public Query regexpQuery(
@Override
public Query phraseQuery(TokenStream stream, int slop, boolean enablePositionIncrements, QueryShardContext context) throws IOException {
Query query = typeFieldMapper.mappedFieldType.phraseQuery(stream, slop, enablePositionIncrements, context);
DerivedFieldValueFetcher valueFetcher = valueFetcher(context, context.lookup(), null);
DerivedFieldQuery derivedFieldQuery = new DerivedFieldQuery(
query,
valueFetcher,
() -> valueFetcher(context, context.lookup(), null),
context.lookup(),
getIndexAnalyzer(),
indexableFieldGenerator,
Expand All @@ -441,10 +430,9 @@ public Query phraseQuery(TokenStream stream, int slop, boolean enablePositionInc
public Query multiPhraseQuery(TokenStream stream, int slop, boolean enablePositionIncrements, QueryShardContext context)
throws IOException {
Query query = typeFieldMapper.mappedFieldType.multiPhraseQuery(stream, slop, enablePositionIncrements, context);
DerivedFieldValueFetcher valueFetcher = valueFetcher(context, context.lookup(), null);
DerivedFieldQuery derivedFieldQuery = new DerivedFieldQuery(
query,
valueFetcher,
() -> valueFetcher(context, context.lookup(), null),
context.lookup(),
getIndexAnalyzer(),
indexableFieldGenerator,
Expand All @@ -465,10 +453,9 @@ public Query multiPhraseQuery(TokenStream stream, int slop, boolean enablePositi
@Override
public Query phrasePrefixQuery(TokenStream stream, int slop, int maxExpansions, QueryShardContext context) throws IOException {
Query query = typeFieldMapper.mappedFieldType.phrasePrefixQuery(stream, slop, maxExpansions, context);
DerivedFieldValueFetcher valueFetcher = valueFetcher(context, context.lookup(), null);
DerivedFieldQuery derivedFieldQuery = new DerivedFieldQuery(
query,
valueFetcher,
() -> valueFetcher(context, context.lookup(), null),
context.lookup(),
getIndexAnalyzer(),
indexableFieldGenerator,
Expand All @@ -493,10 +480,9 @@ public SpanQuery spanPrefixQuery(String value, SpanMultiTermQueryWrapper.SpanRew
@Override
public Query distanceFeatureQuery(Object origin, String pivot, float boost, QueryShardContext context) {
Query query = typeFieldMapper.mappedFieldType.distanceFeatureQuery(origin, pivot, boost, context);
DerivedFieldValueFetcher valueFetcher = valueFetcher(context, context.lookup(), null);
return new DerivedFieldQuery(
query,
valueFetcher,
() -> valueFetcher(context, context.lookup(), null),
context.lookup(),
getIndexAnalyzer(),
indexableFieldGenerator,
Expand All @@ -507,10 +493,9 @@ public Query distanceFeatureQuery(Object origin, String pivot, float boost, Quer
@Override
public Query geoShapeQuery(Geometry shape, String fieldName, ShapeRelation relation, QueryShardContext context) {
Query query = ((GeoShapeQueryable) (typeFieldMapper.mappedFieldType)).geoShapeQuery(shape, fieldName, relation, context);
DerivedFieldValueFetcher valueFetcher = valueFetcher(context, context.lookup(), null);
return new DerivedFieldQuery(
query,
valueFetcher,
() -> valueFetcher(context, context.lookup(), null),
context.lookup(),
getIndexAnalyzer(),
indexableFieldGenerator,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,14 +30,15 @@
import java.util.List;
import java.util.Objects;
import java.util.function.Function;
import java.util.function.Supplier;

/**
* DerivedFieldQuery used for querying derived fields. It contains the logic to execute an input lucene query against
* DerivedField. It also accepts DerivedFieldValueFetcher and SearchLookup as an input.
*/
public final class DerivedFieldQuery extends Query {
private final Query query;
private final DerivedFieldValueFetcher valueFetcher;
private final Supplier<DerivedFieldValueFetcher> valueFetcherSupplier;
private final SearchLookup searchLookup;
private final Analyzer indexAnalyzer;
private final boolean ignoreMalformed;
Expand All @@ -46,20 +47,19 @@ public final class DerivedFieldQuery extends Query {

/**
* @param query lucene query to be executed against the derived field
* @param valueFetcher DerivedFieldValueFetcher ValueFetcher to fetch the value of a derived field from _source
* using LeafSearchLookup
* @param valueFetcherSupplier Supplier of a DerivedFieldValueFetcher that will be reconstructed per leaf
* @param searchLookup SearchLookup to get the LeafSearchLookup look used by valueFetcher to fetch the _source
*/
public DerivedFieldQuery(
Query query,
DerivedFieldValueFetcher valueFetcher,
Supplier<DerivedFieldValueFetcher> valueFetcherSupplier,
SearchLookup searchLookup,
Analyzer indexAnalyzer,
Function<Object, IndexableField> indexableFieldGenerator,
boolean ignoreMalformed
) {
this.query = query;
this.valueFetcher = valueFetcher;
this.valueFetcherSupplier = valueFetcherSupplier;
this.searchLookup = searchLookup;
this.indexAnalyzer = indexAnalyzer;
this.indexableFieldGenerator = indexableFieldGenerator;
Expand All @@ -77,7 +77,15 @@ public Query rewrite(IndexSearcher indexSearcher) throws IOException {
if (rewritten == query) {
return this;
}
return new DerivedFieldQuery(rewritten, valueFetcher, searchLookup, indexAnalyzer, indexableFieldGenerator, ignoreMalformed);
;
return new DerivedFieldQuery(
rewritten,
valueFetcherSupplier,
searchLookup,
indexAnalyzer,
indexableFieldGenerator,
ignoreMalformed
);
}

@Override
Expand All @@ -88,6 +96,11 @@ public Weight createWeight(IndexSearcher searcher, ScoreMode scoreMode, float bo
public Scorer scorer(LeafReaderContext context) {
DocIdSetIterator approximation;
approximation = DocIdSetIterator.all(context.reader().maxDoc());

// Create a new ValueFetcher per thread.
// ValueFetcher.setNextReader creates a DerivedFieldScript and internally SourceLookup and these objects are not
// thread safe.
final DerivedFieldValueFetcher valueFetcher = valueFetcherSupplier.get();
valueFetcher.setNextReader(context);
LeafSearchLookup leafSearchLookup = searchLookup.getLeafSearchLookup(context);
TwoPhaseIterator twoPhase = new TwoPhaseIterator(approximation) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import org.apache.lucene.index.IndexReader;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.IndexWriterConfig;
import org.apache.lucene.queryparser.classic.ParseException;
import org.apache.lucene.search.IndexSearcher;
import org.apache.lucene.search.Query;
import org.apache.lucene.search.TopDocs;
Expand All @@ -24,9 +25,12 @@
import org.opensearch.common.lucene.Lucene;
import org.opensearch.core.index.Index;
import org.opensearch.geometry.Rectangle;
import org.opensearch.index.query.MatchPhrasePrefixQueryBuilder;
import org.opensearch.index.query.MultiMatchQueryBuilder;
import org.opensearch.index.query.QueryBuilders;
import org.opensearch.index.query.QueryShardContext;
import org.opensearch.index.query.TermQueryBuilder;
import org.opensearch.index.search.QueryStringQueryParser;
import org.opensearch.script.DerivedFieldScript;

import java.io.IOException;
Expand Down Expand Up @@ -435,7 +439,7 @@ public void execute() {
}
}

public void testObjectDerivedFields() throws IOException {
public void testObjectDerivedFields() throws IOException, ParseException {
MapperService mapperService = createMapperService(topMapping(b -> {
b.startObject("properties");
{
Expand Down Expand Up @@ -545,6 +549,17 @@ public void execute() {
topDocs = searcher.search(query, 10);
assertEquals(0, topDocs.totalHits.value);

query = new MatchPhrasePrefixQueryBuilder("object_field.text_field", "document number").toQuery(queryShardContext);
topDocs = searcher.search(query, 10);
assertEquals(0, topDocs.totalHits.value);

// Multi Phrase Query
query = QueryBuilders.multiMatchQuery("GET", "object_field.nested_field.sub_field_1", "object_field.keyword_field")
.type(MultiMatchQueryBuilder.Type.PHRASE)
.toQuery(queryShardContext);
topDocs = searcher.search(query, 10);
assertEquals(7, topDocs.totalHits.value);

// Range queries of types - date, long and double
query = QueryBuilders.rangeQuery("object_field.date_field").from("2024-03-20T14:20:50").toQuery(queryShardContext);
topDocs = searcher.search(query, 10);
Expand All @@ -567,6 +582,11 @@ public void execute() {
topDocs = searcher.search(query, 10);
assertEquals(7, topDocs.totalHits.value);

QueryStringQueryParser queryParser = new QueryStringQueryParser(queryShardContext, "object_field.keyword_field");
queryParser.parse("GE?");
topDocs = searcher.search(query, 10);
assertEquals(7, topDocs.totalHits.value);

// Regexp Query
query = QueryBuilders.regexpQuery("object_field.keyword_field", ".*let.*").toQuery(queryShardContext);
topDocs = searcher.search(query, 10);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import org.apache.lucene.document.LongPoint;
import org.apache.lucene.index.LeafReaderContext;
import org.apache.lucene.index.memory.MemoryIndex;
import org.apache.lucene.queries.spans.SpanMultiTermQueryWrapper;
import org.apache.lucene.util.BytesRef;
import org.opensearch.OpenSearchException;
import org.opensearch.common.collect.Tuple;
Expand Down Expand Up @@ -59,6 +60,7 @@ public void testBooleanType() {
assertTrue(dft.getFieldMapper() instanceof BooleanFieldMapper);
assertTrue(dft.getIndexableFieldGenerator().apply(true) instanceof Field);
assertTrue(dft.getIndexableFieldGenerator().apply(false) instanceof Field);
assertEquals("derived", dft.typeName());
}

public void testDateType() {
Expand Down Expand Up @@ -159,6 +161,22 @@ public void testGetAggregationScript_ip() throws IOException {
assertEquals(new BytesRef(InetAddressPoint.encode(InetAddresses.forString((String) expected.get(0)))), result.get(0));
}

public void testDerivedFieldValueFetcherDoesNotSupportCustomFormats() {
DerivedFieldType dft = createDerivedFieldType("boolean");
expectThrows(
IllegalArgumentException.class,
() -> dft.valueFetcher(mock(QueryShardContext.class), mock(SearchLookup.class), "yyyy-MM-dd")
);
}

public void testSpanPrefixQueryNotSupported() {
DerivedFieldType dft = createDerivedFieldType("boolean");
expectThrows(
IllegalArgumentException.class,
() -> dft.spanPrefixQuery("value", mock(SpanMultiTermQueryWrapper.SpanRewriteMethod.class), mock(QueryShardContext.class))
);
}

private static LeafSearchLookup mockValueFetcherForAggs(QueryShardContext mockContext, DerivedFieldType dft, List<Object> expected) {
SearchLookup searchLookup = mock(SearchLookup.class);
LeafSearchLookup leafLookup = mock(LeafSearchLookup.class);
Expand Down
Loading

0 comments on commit d0d7999

Please sign in to comment.