-
Notifications
You must be signed in to change notification settings - Fork 70
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Added Score Normalization and Combination feature (#241)
* Added Score Normalization and Combination feature Signed-off-by: Martin Gaievski <gaievski@amazon.com>
- Loading branch information
1 parent
2ff58a9
commit 61e6e98
Showing
57 changed files
with
8,464 additions
and
26 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
121 changes: 121 additions & 0 deletions
121
src/main/java/org/opensearch/neuralsearch/processor/NormalizationProcessor.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,121 @@ | ||
/* | ||
* Copyright OpenSearch Contributors | ||
* SPDX-License-Identifier: Apache-2.0 | ||
*/ | ||
|
||
package org.opensearch.neuralsearch.processor; | ||
|
||
import java.util.List; | ||
import java.util.Objects; | ||
import java.util.Optional; | ||
import java.util.stream.Collectors; | ||
|
||
import lombok.AllArgsConstructor; | ||
import lombok.extern.log4j.Log4j2; | ||
|
||
import org.opensearch.action.search.QueryPhaseResultConsumer; | ||
import org.opensearch.action.search.SearchPhaseContext; | ||
import org.opensearch.action.search.SearchPhaseName; | ||
import org.opensearch.action.search.SearchPhaseResults; | ||
import org.opensearch.neuralsearch.processor.combination.ScoreCombinationTechnique; | ||
import org.opensearch.neuralsearch.processor.normalization.ScoreNormalizationTechnique; | ||
import org.opensearch.neuralsearch.search.CompoundTopDocs; | ||
import org.opensearch.search.SearchPhaseResult; | ||
import org.opensearch.search.internal.SearchContext; | ||
import org.opensearch.search.pipeline.SearchPhaseResultsProcessor; | ||
import org.opensearch.search.query.QuerySearchResult; | ||
|
||
/** | ||
* Processor for score normalization and combination on post query search results. Updates query results with | ||
* normalized and combined scores for next phase (typically it's FETCH) | ||
*/ | ||
@Log4j2 | ||
@AllArgsConstructor | ||
public class NormalizationProcessor implements SearchPhaseResultsProcessor { | ||
public static final String TYPE = "normalization-processor"; | ||
|
||
private final String tag; | ||
private final String description; | ||
private final ScoreNormalizationTechnique normalizationTechnique; | ||
private final ScoreCombinationTechnique combinationTechnique; | ||
private final NormalizationProcessorWorkflow normalizationWorkflow; | ||
|
||
/** | ||
* Method abstracts functional aspect of score normalization and score combination. Exact methods for each processing stage | ||
* are set as part of class constructor | ||
* @param searchPhaseResult {@link SearchPhaseResults} DTO that has query search results. Results will be mutated as part of this method execution | ||
* @param searchPhaseContext {@link SearchContext} | ||
*/ | ||
@Override | ||
public <Result extends SearchPhaseResult> void process( | ||
final SearchPhaseResults<Result> searchPhaseResult, | ||
final SearchPhaseContext searchPhaseContext | ||
) { | ||
if (shouldRunProcessor(searchPhaseResult)) { | ||
return; | ||
} | ||
List<QuerySearchResult> querySearchResults = getQueryPhaseSearchResults(searchPhaseResult); | ||
normalizationWorkflow.execute(querySearchResults, normalizationTechnique, combinationTechnique); | ||
} | ||
|
||
@Override | ||
public SearchPhaseName getBeforePhase() { | ||
return SearchPhaseName.QUERY; | ||
} | ||
|
||
@Override | ||
public SearchPhaseName getAfterPhase() { | ||
return SearchPhaseName.FETCH; | ||
} | ||
|
||
@Override | ||
public String getType() { | ||
return TYPE; | ||
} | ||
|
||
@Override | ||
public String getTag() { | ||
return tag; | ||
} | ||
|
||
@Override | ||
public String getDescription() { | ||
return description; | ||
} | ||
|
||
@Override | ||
public boolean isIgnoreFailure() { | ||
return false; | ||
} | ||
|
||
private <Result extends SearchPhaseResult> boolean shouldRunProcessor(SearchPhaseResults<Result> searchPhaseResult) { | ||
if (Objects.isNull(searchPhaseResult) || !(searchPhaseResult instanceof QueryPhaseResultConsumer)) { | ||
return true; | ||
} | ||
|
||
QueryPhaseResultConsumer queryPhaseResultConsumer = (QueryPhaseResultConsumer) searchPhaseResult; | ||
Optional<SearchPhaseResult> optionalSearchPhaseResult = queryPhaseResultConsumer.getAtomicArray() | ||
.asList() | ||
.stream() | ||
.filter(Objects::nonNull) | ||
.findFirst(); | ||
return isNotHybridQuery(optionalSearchPhaseResult); | ||
} | ||
|
||
private boolean isNotHybridQuery(final Optional<SearchPhaseResult> optionalSearchPhaseResult) { | ||
return optionalSearchPhaseResult.isEmpty() | ||
|| Objects.isNull(optionalSearchPhaseResult.get().queryResult()) | ||
|| Objects.isNull(optionalSearchPhaseResult.get().queryResult().topDocs()) | ||
|| !(optionalSearchPhaseResult.get().queryResult().topDocs().topDocs instanceof CompoundTopDocs); | ||
} | ||
|
||
private <Result extends SearchPhaseResult> List<QuerySearchResult> getQueryPhaseSearchResults( | ||
final SearchPhaseResults<Result> results | ||
) { | ||
return results.getAtomicArray() | ||
.asList() | ||
.stream() | ||
.map(result -> result == null ? null : result.queryResult()) | ||
.collect(Collectors.toList()); | ||
} | ||
} |
82 changes: 82 additions & 0 deletions
82
src/main/java/org/opensearch/neuralsearch/processor/NormalizationProcessorWorkflow.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,82 @@ | ||
/* | ||
* Copyright OpenSearch Contributors | ||
* SPDX-License-Identifier: Apache-2.0 | ||
*/ | ||
|
||
package org.opensearch.neuralsearch.processor; | ||
|
||
import java.util.List; | ||
import java.util.Objects; | ||
import java.util.stream.Collectors; | ||
|
||
import lombok.AllArgsConstructor; | ||
|
||
import org.opensearch.common.lucene.search.TopDocsAndMaxScore; | ||
import org.opensearch.neuralsearch.processor.combination.ScoreCombinationTechnique; | ||
import org.opensearch.neuralsearch.processor.combination.ScoreCombiner; | ||
import org.opensearch.neuralsearch.processor.normalization.ScoreNormalizationTechnique; | ||
import org.opensearch.neuralsearch.processor.normalization.ScoreNormalizer; | ||
import org.opensearch.neuralsearch.search.CompoundTopDocs; | ||
import org.opensearch.search.query.QuerySearchResult; | ||
|
||
/** | ||
* Class abstracts steps required for score normalization and combination, this includes pre-processing of incoming data | ||
* and post-processing of final results | ||
*/ | ||
@AllArgsConstructor | ||
public class NormalizationProcessorWorkflow { | ||
|
||
private final ScoreNormalizer scoreNormalizer; | ||
private final ScoreCombiner scoreCombiner; | ||
|
||
/** | ||
* Start execution of this workflow | ||
* @param querySearchResults input data with QuerySearchResult from multiple shards | ||
* @param normalizationTechnique technique for score normalization | ||
* @param combinationTechnique technique for score combination | ||
*/ | ||
public void execute( | ||
final List<QuerySearchResult> querySearchResults, | ||
final ScoreNormalizationTechnique normalizationTechnique, | ||
final ScoreCombinationTechnique combinationTechnique | ||
) { | ||
// pre-process data | ||
List<CompoundTopDocs> queryTopDocs = getQueryTopDocs(querySearchResults); | ||
|
||
// normalize | ||
scoreNormalizer.normalizeScores(queryTopDocs, normalizationTechnique); | ||
|
||
// combine | ||
scoreCombiner.combineScores(queryTopDocs, combinationTechnique); | ||
|
||
// post-process data | ||
updateOriginalQueryResults(querySearchResults, queryTopDocs); | ||
} | ||
|
||
/** | ||
* Getting list of CompoundTopDocs from list of QuerySearchResult. Each CompoundTopDocs is for individual shard | ||
* @param querySearchResults collection of QuerySearchResult for all shards | ||
* @return collection of CompoundTopDocs, one object for each shard | ||
*/ | ||
private List<CompoundTopDocs> getQueryTopDocs(final List<QuerySearchResult> querySearchResults) { | ||
List<CompoundTopDocs> queryTopDocs = querySearchResults.stream() | ||
.filter(searchResult -> Objects.nonNull(searchResult.topDocs())) | ||
.filter(searchResult -> searchResult.topDocs().topDocs instanceof CompoundTopDocs) | ||
.map(searchResult -> (CompoundTopDocs) searchResult.topDocs().topDocs) | ||
.collect(Collectors.toList()); | ||
return queryTopDocs; | ||
} | ||
|
||
private void updateOriginalQueryResults(final List<QuerySearchResult> querySearchResults, final List<CompoundTopDocs> queryTopDocs) { | ||
for (int i = 0; i < querySearchResults.size(); i++) { | ||
QuerySearchResult querySearchResult = querySearchResults.get(i); | ||
if (!(querySearchResult.topDocs().topDocs instanceof CompoundTopDocs) || Objects.isNull(queryTopDocs.get(i))) { | ||
continue; | ||
} | ||
CompoundTopDocs updatedTopDocs = queryTopDocs.get(i); | ||
float maxScore = updatedTopDocs.totalHits.value > 0 ? updatedTopDocs.scoreDocs[0].score : 0.0f; | ||
TopDocsAndMaxScore updatedTopDocsAndMaxScore = new TopDocsAndMaxScore(updatedTopDocs, maxScore); | ||
querySearchResult.topDocs(updatedTopDocsAndMaxScore, null); | ||
} | ||
} | ||
} |
54 changes: 54 additions & 0 deletions
54
...pensearch/neuralsearch/processor/combination/ArithmeticMeanScoreCombinationTechnique.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,54 @@ | ||
/* | ||
* Copyright OpenSearch Contributors | ||
* SPDX-License-Identifier: Apache-2.0 | ||
*/ | ||
|
||
package org.opensearch.neuralsearch.processor.combination; | ||
|
||
import java.util.List; | ||
import java.util.Map; | ||
import java.util.Set; | ||
|
||
/** | ||
* Abstracts combination of scores based on arithmetic mean method | ||
*/ | ||
public class ArithmeticMeanScoreCombinationTechnique implements ScoreCombinationTechnique { | ||
|
||
public static final String TECHNIQUE_NAME = "arithmetic_mean"; | ||
public static final String PARAM_NAME_WEIGHTS = "weights"; | ||
private static final Set<String> SUPPORTED_PARAMS = Set.of(PARAM_NAME_WEIGHTS); | ||
private static final Float ZERO_SCORE = 0.0f; | ||
private final List<Float> weights; | ||
private final ScoreCombinationUtil scoreCombinationUtil; | ||
|
||
public ArithmeticMeanScoreCombinationTechnique(final Map<String, Object> params, final ScoreCombinationUtil combinationUtil) { | ||
scoreCombinationUtil = combinationUtil; | ||
scoreCombinationUtil.validateParams(params, SUPPORTED_PARAMS); | ||
weights = scoreCombinationUtil.getWeights(params); | ||
} | ||
|
||
/** | ||
* Arithmetic mean method for combining scores. | ||
* score = (weight1*score1 + weight2*score2 +...+ weightN*scoreN)/(weight1 + weight2 + ... + weightN) | ||
* | ||
* Zero (0.0) scores are excluded from number of scores N | ||
*/ | ||
@Override | ||
public float combine(final float[] scores) { | ||
float combinedScore = 0.0f; | ||
float sumOfWeights = 0; | ||
for (int indexOfSubQuery = 0; indexOfSubQuery < scores.length; indexOfSubQuery++) { | ||
float score = scores[indexOfSubQuery]; | ||
if (score >= 0.0) { | ||
float weight = scoreCombinationUtil.getWeightForSubQuery(weights, indexOfSubQuery); | ||
score = score * weight; | ||
combinedScore += score; | ||
sumOfWeights += weight; | ||
} | ||
} | ||
if (sumOfWeights == 0.0f) { | ||
return ZERO_SCORE; | ||
} | ||
return combinedScore / sumOfWeights; | ||
} | ||
} |
Oops, something went wrong.