Skip to content

Commit

Permalink
Merge branch '2.17' into backport/backport-15637-to-2.17
Browse files Browse the repository at this point in the history
Signed-off-by: Harsha Vamsi Kalluri <harshavamsi096@gmail.com>
  • Loading branch information
harshavamsi authored Sep 5, 2024
2 parents e93b9fc + 218266d commit eb41bda
Show file tree
Hide file tree
Showing 23 changed files with 299 additions and 161 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,7 @@ public Composite99DocValuesReader(DocValuesProducer producer, SegmentReadState r

// adding metric fields
for (Metric metric : starTreeMetadata.getMetrics()) {
for (MetricStat metricStat : metric.getMetrics()) {
for (MetricStat metricStat : metric.getBaseMetrics()) {
fields.add(
fullyQualifiedFieldNameForStarTreeMetricsDocValues(
compositeFieldName,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import org.opensearch.core.xcontent.XContentBuilder;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;

Expand All @@ -23,10 +24,18 @@
public class Metric implements ToXContent {
private final String field;
private final List<MetricStat> metrics;
private final List<MetricStat> baseMetrics;

public Metric(String field, List<MetricStat> metrics) {
this.field = field;
this.metrics = metrics;
this.baseMetrics = new ArrayList<>();
for (MetricStat metricStat : metrics) {
if (metricStat.isDerivedMetric()) {
continue;
}
baseMetrics.add(metricStat);
}
}

public String getField() {
Expand All @@ -37,6 +46,13 @@ public List<MetricStat> getMetrics() {
return metrics;
}

/**
* Returns only the base metrics
*/
public List<MetricStat> getBaseMetrics() {
return baseMetrics;
}

@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -150,10 +150,7 @@ public List<MetricAggregatorInfo> generateMetricAggregatorInfos(MapperService ma
metricAggregatorInfos.add(metricAggregatorInfo);
continue;
}
for (MetricStat metricStat : metric.getMetrics()) {
if (metricStat.isDerivedMetric()) {
continue;
}
for (MetricStat metricStat : metric.getBaseMetrics()) {
FieldValueConverter fieldValueConverter;
Mapper fieldMapper = mapperService.documentMapper().mappers().getMapper(metric.getField());
if (fieldMapper instanceof FieldMapper && ((FieldMapper) fieldMapper).fieldType() instanceof FieldValueConverter) {
Expand Down Expand Up @@ -185,7 +182,7 @@ public List<SequentialDocValuesIterator> getMetricReaders(SegmentWriteState stat

List<SequentialDocValuesIterator> metricReaders = new ArrayList<>();
for (Metric metric : this.starTreeField.getMetrics()) {
for (MetricStat metricStat : metric.getMetrics()) {
for (MetricStat metricStat : metric.getBaseMetrics()) {
SequentialDocValuesIterator metricReader;
FieldInfo metricFieldInfo = state.fieldInfos.fieldInfo(metric.getField());
if (metricStat.equals(MetricStat.DOC_COUNT)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,7 @@ Iterator<StarTreeDocument> mergeStarTrees(List<StarTreeValues> starTreeValuesSub
List<SequentialDocValuesIterator> metricReaders = new ArrayList<>();
// get doc id set iterators for metrics
for (Metric metric : starTreeValues.getStarTreeField().getMetrics()) {
for (MetricStat metricStat : metric.getMetrics()) {
for (MetricStat metricStat : metric.getBaseMetrics()) {
String metricFullName = fullyQualifiedFieldNameForStarTreeMetricsDocValues(
starTreeValues.getStarTreeField().getName(),
metric.getField(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ StarTreeDocument[] getSegmentsStarTreeDocuments(List<StarTreeValues> starTreeVal
List<SequentialDocValuesIterator> metricReaders = new ArrayList<>();
// get doc id set iterators for metrics
for (Metric metric : starTreeValues.getStarTreeField().getMetrics()) {
for (MetricStat metricStat : metric.getMetrics()) {
for (MetricStat metricStat : metric.getBaseMetrics()) {
String metricFullName = fullyQualifiedFieldNameForStarTreeMetricsDocValues(
starTreeValues.getStarTreeField().getName(),
metric.getField(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -220,16 +220,37 @@ private int readMetricsCount() throws IOException {
private List<Metric> readMetricEntries() throws IOException {
int metricCount = readMetricsCount();

Map<String, Metric> starTreeMetricMap = new LinkedHashMap<>();
Map<String, List<MetricStat>> starTreeMetricStatMap = new LinkedHashMap<>();
for (int i = 0; i < metricCount; i++) {
String metricName = meta.readString();
int metricStatOrdinal = meta.readVInt();
MetricStat metricStat = MetricStat.fromMetricOrdinal(metricStatOrdinal);
Metric metric = starTreeMetricMap.computeIfAbsent(metricName, field -> new Metric(field, new ArrayList<>()));
metric.getMetrics().add(metricStat);
List<MetricStat> metricStats = starTreeMetricStatMap.computeIfAbsent(metricName, field -> new ArrayList<>());
metricStats.add(metricStat);
}
List<Metric> starTreeMetricMap = new ArrayList<>();
for (Map.Entry<String, List<MetricStat>> metricStatsEntry : starTreeMetricStatMap.entrySet()) {
addEligibleDerivedMetrics(metricStatsEntry.getValue());
starTreeMetricMap.add(new Metric(metricStatsEntry.getKey(), metricStatsEntry.getValue()));

return new ArrayList<>(starTreeMetricMap.values());
}
return starTreeMetricMap;
}

/**
* Add derived metrics if all associated base metrics are present
*/
private void addEligibleDerivedMetrics(List<MetricStat> metricStatsList) {
Set<MetricStat> metricStatsSet = new HashSet<>(metricStatsList);
for (MetricStat metric : MetricStat.values()) {
if (metric.isDerivedMetric() && !metricStatsSet.contains(metric)) {
List<MetricStat> sourceMetrics = metric.getBaseMetrics();
if (metricStatsSet.containsAll(sourceMetrics)) {
metricStatsList.add(metric);
metricStatsSet.add(metric);
}
}
}
}

private int readSegmentAggregatedDocCount() throws IOException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,7 @@ public StarTreeValues(

// get doc id set iterators for metrics
for (Metric metric : starTreeMetadata.getMetrics()) {
for (MetricStat metricStat : metric.getMetrics()) {
for (MetricStat metricStat : metric.getBaseMetrics()) {
String metricFullName = fullyQualifiedFieldNameForStarTreeMetricsDocValues(
starTreeField.getName(),
metric.getField(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -653,9 +653,7 @@ public byte[] encodePoint(Number value) {

@Override
public double toDoubleValue(long value) {
byte[] bytes = new byte[8];
NumericUtils.longToSortableBytes(value, bytes, 0);
return NumericUtils.sortableLongToDouble(NumericUtils.sortableBytesToLong(bytes, 0));
return objectToDouble(value);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -287,11 +287,7 @@ private List<Metric> buildMetrics(String fieldName, Map<String, Object> map, Map
}
int numBaseMetrics = 0;
for (Metric metric : metrics) {
for (MetricStat metricStat : metric.getMetrics()) {
if (metricStat.isDerivedMetric() == false) {
numBaseMetrics++;
}
}
numBaseMetrics += metric.getBaseMetrics().size();
}
if (numBaseMetrics > context.getSettings()
.getAsInt(
Expand Down
8 changes: 4 additions & 4 deletions server/src/main/java/org/opensearch/node/Node.java
Original file line number Diff line number Diff line change
Expand Up @@ -233,7 +233,7 @@
import org.opensearch.search.aggregations.support.AggregationUsageService;
import org.opensearch.search.backpressure.SearchBackpressureService;
import org.opensearch.search.backpressure.settings.SearchBackpressureSettings;
import org.opensearch.search.deciders.ConcurrentSearchDecider;
import org.opensearch.search.deciders.ConcurrentSearchRequestDecider;
import org.opensearch.search.fetch.FetchPhase;
import org.opensearch.search.pipeline.SearchPipelineService;
import org.opensearch.search.query.QueryPhase;
Expand Down Expand Up @@ -1337,7 +1337,7 @@ protected Node(
circuitBreakerService,
searchModule.getIndexSearcherExecutor(threadPool),
taskResourceTrackingService,
searchModule.getConcurrentSearchDeciders()
searchModule.getConcurrentSearchRequestDeciderFactories()
);

final List<PersistentTasksExecutor<?>> tasksExecutors = pluginsService.filterPlugins(PersistentTaskPlugin.class)
Expand Down Expand Up @@ -1987,7 +1987,7 @@ protected SearchService newSearchService(
CircuitBreakerService circuitBreakerService,
Executor indexSearcherExecutor,
TaskResourceTrackingService taskResourceTrackingService,
Collection<ConcurrentSearchDecider> concurrentSearchDecidersList
Collection<ConcurrentSearchRequestDecider.Factory> concurrentSearchDeciderFactories
) {
return new SearchService(
clusterService,
Expand All @@ -2001,7 +2001,7 @@ protected SearchService newSearchService(
circuitBreakerService,
indexSearcherExecutor,
taskResourceTrackingService,
concurrentSearchDecidersList
concurrentSearchDeciderFactories
);
}

Expand Down
10 changes: 5 additions & 5 deletions server/src/main/java/org/opensearch/plugins/SearchPlugin.java
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@
import org.opensearch.search.aggregations.pipeline.MovAvgPipelineAggregator;
import org.opensearch.search.aggregations.pipeline.PipelineAggregator;
import org.opensearch.search.aggregations.support.ValuesSourceRegistry;
import org.opensearch.search.deciders.ConcurrentSearchDecider;
import org.opensearch.search.deciders.ConcurrentSearchRequestDecider;
import org.opensearch.search.fetch.FetchSubPhase;
import org.opensearch.search.fetch.subphase.highlight.Highlighter;
import org.opensearch.search.query.QueryPhaseSearcher;
Expand Down Expand Up @@ -141,12 +141,12 @@ default Map<String, Highlighter> getHighlighters() {
}

/**
* Allows plugins to register custom decider for concurrent search
* @return A {@link ConcurrentSearchDecider}
* Allows plugins to register a factory to create custom decider for concurrent search
* @return A {@link ConcurrentSearchRequestDecider.Factory}
*/
@ExperimentalApi
default ConcurrentSearchDecider getConcurrentSearchDecider() {
return null;
default Optional<ConcurrentSearchRequestDecider.Factory> getConcurrentSearchRequestDeciderFactory() {
return Optional.empty();
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,8 +72,8 @@
import org.opensearch.search.aggregations.SearchContextAggregations;
import org.opensearch.search.builder.SearchSourceBuilder;
import org.opensearch.search.collapse.CollapseContext;
import org.opensearch.search.deciders.ConcurrentSearchDecider;
import org.opensearch.search.deciders.ConcurrentSearchDecision;
import org.opensearch.search.deciders.ConcurrentSearchRequestDecider;
import org.opensearch.search.deciders.ConcurrentSearchVisitor;
import org.opensearch.search.dfs.DfsSearchResult;
import org.opensearch.search.fetch.FetchPhase;
Expand Down Expand Up @@ -106,13 +106,14 @@
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.Executor;
import java.util.function.Function;
import java.util.function.LongSupplier;
import java.util.stream.Collectors;

import static org.opensearch.search.SearchService.CARDINALITY_AGGREGATION_PRUNING_THRESHOLD;
import static org.opensearch.search.SearchService.CLUSTER_CONCURRENT_SEGMENT_SEARCH_MODE;
Expand All @@ -137,7 +138,7 @@ final class DefaultSearchContext extends SearchContext {
private final ShardSearchRequest request;
private final SearchShardTarget shardTarget;
private final LongSupplier relativeTimeSupplier;
private final Collection<ConcurrentSearchDecider> concurrentSearchDeciders;
private final Collection<ConcurrentSearchRequestDecider.Factory> concurrentSearchDeciderFactories;
private SearchType searchType;
private final BigArrays bigArrays;
private final IndexShard indexShard;
Expand Down Expand Up @@ -223,7 +224,7 @@ final class DefaultSearchContext extends SearchContext {
boolean validate,
Executor executor,
Function<SearchSourceBuilder, InternalAggregation.ReduceContextBuilder> requestToAggReduceContextBuilder,
Collection<ConcurrentSearchDecider> concurrentSearchDeciders
Collection<ConcurrentSearchRequestDecider.Factory> concurrentSearchDeciderFactories
) throws IOException {
this.readerContext = readerContext;
this.request = request;
Expand Down Expand Up @@ -267,8 +268,8 @@ final class DefaultSearchContext extends SearchContext {

this.maxAggRewriteFilters = evaluateFilterRewriteSetting();
this.cardinalityAggregationPruningThreshold = evaluateCardinalityAggregationPruningThreshold();
this.concurrentSearchDeciders = concurrentSearchDeciders;
this.keywordIndexOrDocValuesEnabled = evaluateKeywordIndexOrDocValuesEnabled();
this.concurrentSearchDeciderFactories = concurrentSearchDeciderFactories;
}

@Override
Expand Down Expand Up @@ -932,14 +933,21 @@ public boolean shouldUseConcurrentSearch() {

private boolean evaluateAutoMode() {

// filter out deciders that want to opt-out of decision-making
final Set<ConcurrentSearchDecider> filteredDeciders = concurrentSearchDeciders.stream()
.filter(concurrentSearchDecider -> concurrentSearchDecider.canEvaluateForIndex(indexService.getIndexSettings()))
.collect(Collectors.toSet());
final Set<ConcurrentSearchRequestDecider> concurrentSearchRequestDeciders = new HashSet<>();

// create the ConcurrentSearchRequestDeciders using registered factories
for (ConcurrentSearchRequestDecider.Factory deciderFactory : concurrentSearchDeciderFactories) {
final Optional<ConcurrentSearchRequestDecider> concurrentSearchRequestDecider = deciderFactory.create(
indexService.getIndexSettings()
);
concurrentSearchRequestDecider.ifPresent(concurrentSearchRequestDeciders::add);

}

// evaluate based on concurrent search query visitor
if (filteredDeciders.size() > 0) {
if (concurrentSearchRequestDeciders.size() > 0) {
ConcurrentSearchVisitor concurrentSearchVisitor = new ConcurrentSearchVisitor(
filteredDeciders,
concurrentSearchRequestDeciders,
indexService.getIndexSettings()
);
if (request().source() != null && request().source().query() != null) {
Expand All @@ -949,7 +957,7 @@ private boolean evaluateAutoMode() {
}

final List<ConcurrentSearchDecision> decisions = new ArrayList<>();
for (ConcurrentSearchDecider decider : filteredDeciders) {
for (ConcurrentSearchRequestDecider decider : concurrentSearchRequestDeciders) {
ConcurrentSearchDecision decision = decider.getConcurrentSearchDecision();
if (decision != null) {
if (logger.isDebugEnabled()) {
Expand Down
24 changes: 11 additions & 13 deletions server/src/main/java/org/opensearch/search/SearchModule.java
Original file line number Diff line number Diff line change
Expand Up @@ -255,7 +255,7 @@
import org.opensearch.search.aggregations.pipeline.SumBucketPipelineAggregationBuilder;
import org.opensearch.search.aggregations.pipeline.SumBucketPipelineAggregator;
import org.opensearch.search.aggregations.support.ValuesSourceRegistry;
import org.opensearch.search.deciders.ConcurrentSearchDecider;
import org.opensearch.search.deciders.ConcurrentSearchRequestDecider;
import org.opensearch.search.fetch.FetchPhase;
import org.opensearch.search.fetch.FetchSubPhase;
import org.opensearch.search.fetch.subphase.ExplainPhase;
Expand Down Expand Up @@ -334,7 +334,7 @@ public class SearchModule {
private final QueryPhaseSearcher queryPhaseSearcher;
private final SearchPlugin.ExecutorServiceProvider indexSearcherExecutorProvider;

private final Collection<ConcurrentSearchDecider> concurrentSearchDeciders;
private final Collection<ConcurrentSearchRequestDecider.Factory> concurrentSearchDeciderFactories;

/**
* Constructs a new SearchModule object
Expand Down Expand Up @@ -364,25 +364,23 @@ public SearchModule(Settings settings, List<SearchPlugin> plugins) {
queryPhaseSearcher = registerQueryPhaseSearcher(plugins);
indexSearcherExecutorProvider = registerIndexSearcherExecutorProvider(plugins);
namedWriteables.addAll(SortValue.namedWriteables());
concurrentSearchDeciders = registerConcurrentSearchDeciders(plugins);
concurrentSearchDeciderFactories = registerConcurrentSearchDeciderFactories(plugins);
}

private Collection<ConcurrentSearchDecider> registerConcurrentSearchDeciders(List<SearchPlugin> plugins) {
List<ConcurrentSearchDecider> concurrentSearchDeciders = new ArrayList<>();
private Collection<ConcurrentSearchRequestDecider.Factory> registerConcurrentSearchDeciderFactories(List<SearchPlugin> plugins) {
List<ConcurrentSearchRequestDecider.Factory> concurrentSearchDeciderFactories = new ArrayList<>();
for (SearchPlugin plugin : plugins) {
ConcurrentSearchDecider decider = plugin.getConcurrentSearchDecider();
if (decider != null) {
concurrentSearchDeciders.add(decider);
}
final Optional<ConcurrentSearchRequestDecider.Factory> deciderFactory = plugin.getConcurrentSearchRequestDeciderFactory();
deciderFactory.ifPresent(concurrentSearchDeciderFactories::add);
}
return concurrentSearchDeciders;
return concurrentSearchDeciderFactories;
}

/**
* Returns the concurrent search deciders that the plugins have registered
* Returns the concurrent search decider factories that the plugins have registered
*/
public Collection<ConcurrentSearchDecider> getConcurrentSearchDeciders() {
return concurrentSearchDeciders;
public Collection<ConcurrentSearchRequestDecider.Factory> getConcurrentSearchRequestDeciderFactories() {
return concurrentSearchDeciderFactories;
}

public List<NamedWriteableRegistry.Entry> getNamedWriteables() {
Expand Down
Loading

0 comments on commit eb41bda

Please sign in to comment.