Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adding search processor for score normalization and combination #227

Merged
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -253,8 +253,12 @@ testClusters.integTest {
// Increase heap size from default of 512mb to 1gb. When heap size is 512mb, our integ tests sporadically fail due
// to ml-commons memory circuit breaker exception
jvmArgs("-Xms1g", "-Xmx1g")
// enable hybrid search for testing

// enable features for testing
// hybrid search
systemProperty('neural_search_hybrid_search_enabled', 'true')
// search pipelines
systemProperty('opensearch.experimental.feature.search_pipeline.enabled', 'true')
}

// Remote Integration Tests
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@
import java.util.Optional;
import java.util.function.Supplier;

import lombok.extern.log4j.Log4j2;

import org.opensearch.client.Client;
import org.opensearch.cluster.metadata.IndexNameExpressionResolver;
import org.opensearch.cluster.service.ClusterService;
Expand All @@ -24,18 +26,25 @@
import org.opensearch.ingest.Processor;
import org.opensearch.ml.client.MachineLearningNodeClient;
import org.opensearch.neuralsearch.ml.MLCommonsClientAccessor;
import org.opensearch.neuralsearch.processor.NormalizationProcessor;
import org.opensearch.neuralsearch.processor.NormalizationProcessorWorkflow;
import org.opensearch.neuralsearch.processor.TextEmbeddingProcessor;
import org.opensearch.neuralsearch.processor.combination.ScoreCombiner;
import org.opensearch.neuralsearch.processor.factory.NormalizationProcessorFactory;
import org.opensearch.neuralsearch.processor.factory.TextEmbeddingProcessorFactory;
import org.opensearch.neuralsearch.processor.normalization.ScoreNormalizer;
import org.opensearch.neuralsearch.query.HybridQueryBuilder;
import org.opensearch.neuralsearch.query.NeuralQueryBuilder;
import org.opensearch.neuralsearch.search.query.HybridQueryPhaseSearcher;
import org.opensearch.plugins.ActionPlugin;
import org.opensearch.plugins.ExtensiblePlugin;
import org.opensearch.plugins.IngestPlugin;
import org.opensearch.plugins.Plugin;
import org.opensearch.plugins.SearchPipelinePlugin;
import org.opensearch.plugins.SearchPlugin;
import org.opensearch.repositories.RepositoriesService;
import org.opensearch.script.ScriptService;
import org.opensearch.search.pipeline.SearchPhaseResultsProcessor;
import org.opensearch.search.query.QueryPhaseSearcher;
import org.opensearch.threadpool.ThreadPool;
import org.opensearch.watcher.ResourceWatcherService;
Expand All @@ -45,7 +54,8 @@
/**
* Neural Search plugin class
*/
public class NeuralSearch extends Plugin implements ActionPlugin, SearchPlugin, IngestPlugin, ExtensiblePlugin {
@Log4j2
public class NeuralSearch extends Plugin implements ActionPlugin, SearchPlugin, IngestPlugin, ExtensiblePlugin, SearchPipelinePlugin {
/**
* Gates the functionality of hybrid search
* Currently query phase searcher added with hybrid search will conflict with concurrent search in core.
Expand All @@ -54,6 +64,7 @@ public class NeuralSearch extends Plugin implements ActionPlugin, SearchPlugin,
@VisibleForTesting
public static final String NEURAL_SEARCH_HYBRID_SEARCH_ENABLED = "neural_search_hybrid_search_enabled";
private MLCommonsClientAccessor clientAccessor;
private NormalizationProcessorWorkflow normalizationProcessorWorkflow;

@Override
public Collection<Object> createComponents(
Expand All @@ -70,6 +81,7 @@ public Collection<Object> createComponents(
final Supplier<RepositoriesService> repositoriesServiceSupplier
) {
NeuralQueryBuilder.initialize(clientAccessor);
normalizationProcessorWorkflow = new NormalizationProcessorWorkflow(new ScoreNormalizer(), new ScoreCombiner());
return List.of(clientAccessor);
}

Expand All @@ -90,9 +102,18 @@ public Map<String, Processor.Factory> getProcessors(Processor.Parameters paramet
@Override
public Optional<QueryPhaseSearcher> getQueryPhaseSearcher() {
if (FeatureFlags.isEnabled(NEURAL_SEARCH_HYBRID_SEARCH_ENABLED)) {
log.info("Registering hybrid query phase searcher with feature flag [%]", NEURAL_SEARCH_HYBRID_SEARCH_ENABLED);
return Optional.of(new HybridQueryPhaseSearcher());
}
log.info("Not registering hybrid query phase searcher because feature flag [%] is disabled", NEURAL_SEARCH_HYBRID_SEARCH_ENABLED);
// we want feature be disabled by default due to risk of colliding and breaking concurrent search in core
return Optional.empty();
}

@Override
public Map<String, org.opensearch.search.pipeline.Processor.Factory<SearchPhaseResultsProcessor>> getSearchPhaseResultsProcessors(
Parameters parameters
) {
return Map.of(NormalizationProcessor.TYPE, new NormalizationProcessorFactory(normalizationProcessorWorkflow));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,129 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.neuralsearch.processor;

import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.stream.Collectors;

import lombok.AccessLevel;
import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.extern.log4j.Log4j2;

import org.opensearch.action.search.QueryPhaseResultConsumer;
import org.opensearch.action.search.SearchPhaseContext;
import org.opensearch.action.search.SearchPhaseName;
import org.opensearch.action.search.SearchPhaseResults;
import org.opensearch.neuralsearch.processor.combination.ScoreCombinationTechnique;
import org.opensearch.neuralsearch.processor.normalization.ScoreNormalizationTechnique;
import org.opensearch.neuralsearch.search.CompoundTopDocs;
import org.opensearch.search.SearchPhaseResult;
import org.opensearch.search.internal.SearchContext;
import org.opensearch.search.pipeline.SearchPhaseResultsProcessor;
import org.opensearch.search.query.QuerySearchResult;

import com.google.common.annotations.VisibleForTesting;

/**
* Processor for score normalization and combination on post query search results. Updates query results with
* normalized and combined scores for next phase (typically it's FETCH)
*/
@Log4j2
@AllArgsConstructor
public class NormalizationProcessor implements SearchPhaseResultsProcessor {
public static final String TYPE = "normalization-processor";
public static final String NORMALIZATION_CLAUSE = "normalization";
public static final String COMBINATION_CLAUSE = "combination";
public static final String TECHNIQUE = "technique";
martin-gaievski marked this conversation as resolved.
Show resolved Hide resolved
martin-gaievski marked this conversation as resolved.
Show resolved Hide resolved

private final String tag;
private final String description;
@VisibleForTesting
@Getter(AccessLevel.PACKAGE)
final ScoreNormalizationTechnique normalizationTechnique;
martin-gaievski marked this conversation as resolved.
Show resolved Hide resolved
@Getter(AccessLevel.PACKAGE)
final ScoreCombinationTechnique combinationTechnique;
final NormalizationProcessorWorkflow normalizationWorkflow;

/**
* Method abstracts functional aspect of score normalization and score combination. Exact methods for each processing stage
* are set as part of class constructor
* @param searchPhaseResult {@link SearchPhaseResults} DTO that has query search results. Results will be mutated as part of this method execution
* @param searchPhaseContext {@link SearchContext}
*/
@Override
public <Result extends SearchPhaseResult> void process(
final SearchPhaseResults<Result> searchPhaseResult,
final SearchPhaseContext searchPhaseContext
) {
if (shouldSearchResultsBeIgnored(searchPhaseResult)) {
martin-gaievski marked this conversation as resolved.
Show resolved Hide resolved
return;
}
List<QuerySearchResult> querySearchResults = getQuerySearchResults(searchPhaseResult);
normalizationWorkflow.execute(querySearchResults, normalizationTechnique, combinationTechnique);
}

@Override
public SearchPhaseName getBeforePhase() {
return SearchPhaseName.QUERY;
}

@Override
public SearchPhaseName getAfterPhase() {
return SearchPhaseName.FETCH;
}

@Override
public String getType() {
return TYPE;
}

@Override
public String getTag() {
return tag;
}

@Override
public String getDescription() {
return description;
}

@Override
public boolean isIgnoreFailure() {
return true;
}
martin-gaievski marked this conversation as resolved.
Show resolved Hide resolved

private <Result extends SearchPhaseResult> boolean shouldSearchResultsBeIgnored(SearchPhaseResults<Result> searchPhaseResult) {
if (Objects.isNull(searchPhaseResult) || !(searchPhaseResult instanceof QueryPhaseResultConsumer)) {
return true;
}

QueryPhaseResultConsumer queryPhaseResultConsumer = (QueryPhaseResultConsumer) searchPhaseResult;
Optional<SearchPhaseResult> maybeResult = queryPhaseResultConsumer.getAtomicArray()
.asList()
.stream()
.filter(Objects::nonNull)
.findFirst();
return isNotHybridQuery(maybeResult);
}

private boolean isNotHybridQuery(final Optional<SearchPhaseResult> maybeResult) {
martin-gaievski marked this conversation as resolved.
Show resolved Hide resolved
return maybeResult.isEmpty()
|| Objects.isNull(maybeResult.get().queryResult())
|| Objects.isNull(maybeResult.get().queryResult().topDocs())
|| !(maybeResult.get().queryResult().topDocs().topDocs instanceof CompoundTopDocs);
heemin32 marked this conversation as resolved.
Show resolved Hide resolved
}

private <Result extends SearchPhaseResult> List<QuerySearchResult> getQuerySearchResults(final SearchPhaseResults<Result> results) {
martin-gaievski marked this conversation as resolved.
Show resolved Hide resolved
return results.getAtomicArray()
.asList()
.stream()
.map(result -> result == null ? null : result.queryResult())
.collect(Collectors.toList());
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.neuralsearch.processor;

import java.util.List;
import java.util.Objects;
import java.util.stream.Collectors;

import lombok.AllArgsConstructor;

import org.opensearch.common.lucene.search.TopDocsAndMaxScore;
import org.opensearch.neuralsearch.processor.combination.ScoreCombinationTechnique;
import org.opensearch.neuralsearch.processor.combination.ScoreCombiner;
import org.opensearch.neuralsearch.processor.normalization.ScoreNormalizationTechnique;
import org.opensearch.neuralsearch.processor.normalization.ScoreNormalizer;
import org.opensearch.neuralsearch.search.CompoundTopDocs;
import org.opensearch.search.query.QuerySearchResult;

import com.google.common.annotations.VisibleForTesting;

/**
* Class abstracts steps required for score normalization and combination, this includes pre-processing of income data
martin-gaievski marked this conversation as resolved.
Show resolved Hide resolved
* and post-processing for final results
martin-gaievski marked this conversation as resolved.
Show resolved Hide resolved
*/
@AllArgsConstructor
public class NormalizationProcessorWorkflow {

private final ScoreNormalizer scoreNormalizer;
private final ScoreCombiner scoreCombiner;

/**
* Start execution of this workflow
* @param querySearchResults input data with QuerySearchResult from multiple shards
* @param normalizationTechnique technique for score normalization
* @param combinationTechnique technique for score combination
*/
public void execute(
final List<QuerySearchResult> querySearchResults,
final ScoreNormalizationTechnique normalizationTechnique,
final ScoreCombinationTechnique combinationTechnique
) {
// pre-process data
List<CompoundTopDocs> queryTopDocs = getQueryTopDocs(querySearchResults);

// normalize
scoreNormalizer.normalizeScores(queryTopDocs, normalizationTechnique);

// combine
List<Float> combinedMaxScores = scoreCombiner.combineScores(queryTopDocs, combinationTechnique);

// post-process data
updateOriginalQueryResults(querySearchResults, queryTopDocs, combinedMaxScores);
martin-gaievski marked this conversation as resolved.
Show resolved Hide resolved
}

private List<CompoundTopDocs> getQueryTopDocs(final List<QuerySearchResult> querySearchResults) {
martin-gaievski marked this conversation as resolved.
Show resolved Hide resolved
List<CompoundTopDocs> queryTopDocs = querySearchResults.stream()
.filter(searchResult -> Objects.nonNull(searchResult.topDocs()))
.filter(searchResult -> searchResult.topDocs().topDocs instanceof CompoundTopDocs)
heemin32 marked this conversation as resolved.
Show resolved Hide resolved
martin-gaievski marked this conversation as resolved.
Show resolved Hide resolved
.map(searchResult -> (CompoundTopDocs) searchResult.topDocs().topDocs)
.collect(Collectors.toList());
return queryTopDocs;
}

@VisibleForTesting
martin-gaievski marked this conversation as resolved.
Show resolved Hide resolved
protected void updateOriginalQueryResults(
final List<QuerySearchResult> querySearchResults,
final List<CompoundTopDocs> queryTopDocs,
final List<Float> combinedMaxScores
) {
for (int i = 0; i < querySearchResults.size(); i++) {
QuerySearchResult querySearchResult = querySearchResults.get(i);
if (!(querySearchResult.topDocs().topDocs instanceof CompoundTopDocs) || Objects.isNull(queryTopDocs.get(i))) {
continue;
}
CompoundTopDocs updatedTopDocs = queryTopDocs.get(i);
float maxScore = updatedTopDocs.totalHits.value > 0 ? updatedTopDocs.scoreDocs[0].score : 0.0f;
TopDocsAndMaxScore updatedTopDocsAndMaxScore = new TopDocsAndMaxScore(updatedTopDocs, maxScore);
querySearchResult.topDocs(updatedTopDocsAndMaxScore, null);
querySearchResults.get(i).topDocs().maxScore = combinedMaxScores.get(i);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.neuralsearch.processor.combination;

import lombok.AccessLevel;
import lombok.NoArgsConstructor;

/**
* Abstracts combination of scores based on arithmetic mean method
*/
@NoArgsConstructor(access = AccessLevel.PRIVATE)
public class ArithmeticMeanScoreCombinationMethod implements ScoreCombinationMethod {

private static final ArithmeticMeanScoreCombinationMethod INSTANCE = new ArithmeticMeanScoreCombinationMethod();
private static final Float ZERO_SCORE = 0.0f;

public static ArithmeticMeanScoreCombinationMethod getInstance() {
return INSTANCE;
}

/**
* Arithmetic mean method for combining scores.
* cscore = (score1 + score2 +...+ scoreN)/N
*
* Zero (0.0) scores are excluded from number of scores N
*/
@Override
public float combine(final float[] scores) {
float combinedScore = 0.0f;
int count = 0;
for (float score : scores) {
if (score >= 0.0) {
combinedScore += score;
count++;
}
}
if (count == 0) {
return ZERO_SCORE;
}
return combinedScore / count;
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please add a unit test where count is zero.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ack, I'll add more tests in next PRs

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Umm. I thought that divide by zero will throw an exception but I was wrong. For floating point number, it will be Infinity, -Infinity, or NaN. If those values are okay, I am fine.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In such case we need to return 0.0, as this is the case when, if there are scores, we're skipping some of them and not increasing count. Let me update the method.

}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.neuralsearch.processor.combination;

public interface ScoreCombinationMethod {
/**
* Defines combination function specific to this technique
* @param scores array of collected original scores
* @return combined score
*/
float combine(final float[] scores);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.neuralsearch.processor.combination;

import lombok.AllArgsConstructor;

/**
* Collection of techniques for score combination
*/
@AllArgsConstructor
public enum ScoreCombinationTechnique {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this enum really necessary? Why not just pass the ArithmeticMeanScoreCombinationMethod instance itself and call method.combine directly instead of wrapping it with enum?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

String for technique passed by user at factory level, we can pass enum variable safer between that many layers. Plus it's a single place that encapsulates technique and method abstractions. I switched to this approach after trying technique and method or string and then method.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Doesn't all you need is just calling combine method in the end?
Then, can we just pass ArithmeticMeanScoreCombinationMethod instance at factory level? I don't think passing instance is not safer than passing enum.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

my concern is more to keep factory light. We're going to add more methods and other configurations (like weights for sub-queries) later, and in such case factory became more responsible for kicking off calculations, while it's intended design was to parse user input and construct processor. Enum layer is taking that preparation for calculations responsibility so factory can stay light

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You need to convert string to enum in factory anyway. The suggested method is same, you just need to convert string to instance in factory.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

My point is that now it's a simple conversion String -> enum, and it's same as String -> method. But with planned next PR changes (and we know we'll be doing this as per HLD RFC) it will be string + map_of_params -> method, and I'd like to keep it String -> enum. And also to keep knowledge of the actual computation method away from the factory and keep it at lower levels, factory knows just the technique, technique knows about method specifics .


/**
* Arithmetic mean method for combining scores.
*/
ARITHMETIC_MEAN(ArithmeticMeanScoreCombinationMethod.getInstance());

public static final ScoreCombinationTechnique DEFAULT = ARITHMETIC_MEAN;
private final ScoreCombinationMethod method;

public float combine(final float[] scores) {
return method.combine(scores);
}
}
Loading