Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Remove] Deprecated serialization logic from pipeline aggs #4847

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,7 @@ Inspired from [Keep a Changelog](https://keepachangelog.com/en/1.0.0/)
- Always auto release the flood stage block ([#4703](https://github.com/opensearch-project/OpenSearch/pull/4703))
- Remove LegacyESVersion.V_7_4_ and V_7_5_ Constants ([#4704](https://github.com/opensearch-project/OpenSearch/pull/4704))
- Remove Legacy Version support from Snapshot/Restore Service ([#4728](https://github.com/opensearch-project/OpenSearch/pull/4728))
- Remove deprecated serialization logic from pipeline aggs ([#4847](https://github.com/opensearch-project/OpenSearch/pull/4847))

### Fixed
- `opensearch-service.bat start` and `opensearch-service.bat manager` failing to run ([#4289](https://github.com/opensearch-project/OpenSearch/pull/4289))
Expand Down
74 changes: 1 addition & 73 deletions server/src/main/java/org/opensearch/plugins/SearchPlugin.java
Original file line number Diff line number Diff line change
Expand Up @@ -621,52 +621,6 @@ class PipelineAggregationSpec extends SearchExtensionSpec<
PipelineAggregationBuilder,
ContextParser<String, ? extends PipelineAggregationBuilder>> {
private final Map<String, Writeable.Reader<? extends InternalAggregation>> resultReaders = new TreeMap<>();
/**
* Read the aggregator from a stream.
* @deprecated Pipelines implemented after 7.8.0 do not need to be sent across the wire
*/
@Deprecated
private final Writeable.Reader<? extends PipelineAggregator> aggregatorReader;

/**
* Specification of a {@link PipelineAggregator}.
*
* @param name holds the names by which this aggregation might be parsed. The {@link ParseField#getPreferredName()} is special as it
* is the name by under which the readers are registered. So it is the name that the {@link PipelineAggregationBuilder} and
* {@link PipelineAggregator} should return from {@link NamedWriteable#getWriteableName()}. It is an error if
* {@link ParseField#getPreferredName()} conflicts with another registered name, including names from other plugins.
* @param builderReader the reader registered for this aggregation's builder. Typically, a reference to a constructor that takes a
* {@link StreamInput}
* @param parser reads the aggregation builder from XContent
*/
public PipelineAggregationSpec(
ParseField name,
Writeable.Reader<? extends PipelineAggregationBuilder> builderReader,
ContextParser<String, ? extends PipelineAggregationBuilder> parser
) {
super(name, builderReader, parser);
this.aggregatorReader = null;
}

/**
* Specification of a {@link PipelineAggregator}.
*
* @param name name by which this aggregation might be parsed or deserialized. Make sure it is the name that the
* {@link PipelineAggregationBuilder} and {@link PipelineAggregator} should return from
* {@link NamedWriteable#getWriteableName()}. It is an error if this name conflicts with another registered name, including
* names from other plugins.
* @param builderReader the reader registered for this aggregation's builder. Typically, a reference to a constructor that takes a
* {@link StreamInput}
* @param parser reads the aggregation builder from XContent
*/
public PipelineAggregationSpec(
String name,
Writeable.Reader<? extends PipelineAggregationBuilder> builderReader,
ContextParser<String, ? extends PipelineAggregationBuilder> parser
) {
super(name, builderReader, parser);
this.aggregatorReader = null;
}

/**
* Specification of a {@link PipelineAggregator}.
Expand All @@ -677,20 +631,14 @@ public PipelineAggregationSpec(
* {@link ParseField#getPreferredName()} conflicts with another registered name, including names from other plugins.
* @param builderReader the reader registered for this aggregation's builder. Typically, a reference to a constructor that takes a
* {@link StreamInput}
* @param aggregatorReader reads the {@link PipelineAggregator} from a stream
* @param parser reads the aggregation builder from XContent
* @deprecated Use {@link PipelineAggregationSpec#PipelineAggregationSpec(ParseField, Writeable.Reader, ContextParser)} for
* pipelines implemented after 7.8.0
*/
@Deprecated
public PipelineAggregationSpec(
ParseField name,
Writeable.Reader<? extends PipelineAggregationBuilder> builderReader,
Writeable.Reader<? extends PipelineAggregator> aggregatorReader,
ContextParser<String, ? extends PipelineAggregationBuilder> parser
) {
super(name, builderReader, parser);
this.aggregatorReader = aggregatorReader;
}

/**
Expand All @@ -702,20 +650,15 @@ public PipelineAggregationSpec(
* names from other plugins.
* @param builderReader the reader registered for this aggregation's builder. Typically, a reference to a constructor that takes a
* {@link StreamInput}
* @param aggregatorReader reads the {@link PipelineAggregator} from a stream
* @param parser reads the aggregation builder from XContent
* @deprecated Use {@link PipelineAggregationSpec#PipelineAggregationSpec(String, Writeable.Reader, ContextParser)} for pipelines
* implemented after 7.8.0

*/
@Deprecated
public PipelineAggregationSpec(
String name,
Writeable.Reader<? extends PipelineAggregationBuilder> builderReader,
Writeable.Reader<? extends PipelineAggregator> aggregatorReader,
ContextParser<String, ? extends PipelineAggregationBuilder> parser
) {
super(name, builderReader, parser);
this.aggregatorReader = aggregatorReader;
}

/**
Expand All @@ -727,19 +670,16 @@ public PipelineAggregationSpec(
* {@link ParseField#getPreferredName()} conflicts with another registered name, including names from other plugins.
* @param builderReader the reader registered for this aggregation's builder. Typically, a reference to a constructor that takes a
* {@link StreamInput}
* @param aggregatorReader reads the {@link PipelineAggregator} from a stream
* @param parser reads the aggregation builder from XContent
* @deprecated prefer the ctor that takes a {@link ContextParser}
*/
@Deprecated
public PipelineAggregationSpec(
ParseField name,
Writeable.Reader<? extends PipelineAggregationBuilder> builderReader,
Writeable.Reader<? extends PipelineAggregator> aggregatorReader,
PipelineAggregator.Parser parser
) {
super(name, builderReader, (p, n) -> parser.parse(n, p));
this.aggregatorReader = aggregatorReader;
}

/**
Expand All @@ -751,18 +691,15 @@ public PipelineAggregationSpec(
* names from other plugins.
* @param builderReader the reader registered for this aggregation's builder. Typically, a reference to a constructor that takes a
* {@link StreamInput}
* @param aggregatorReader reads the {@link PipelineAggregator} from a stream
* @deprecated prefer the ctor that takes a {@link ContextParser}
*/
@Deprecated
public PipelineAggregationSpec(
String name,
Writeable.Reader<? extends PipelineAggregationBuilder> builderReader,
Writeable.Reader<? extends PipelineAggregator> aggregatorReader,
PipelineAggregator.Parser parser
) {
super(name, builderReader, (p, n) -> parser.parse(n, p));
this.aggregatorReader = aggregatorReader;
}

/**
Expand All @@ -781,15 +718,6 @@ public PipelineAggregationSpec addResultReader(String writeableName, Writeable.R
return this;
}

/**
* Read the aggregator from a stream.
* @deprecated Pipelines implemented after 7.8.0 do not need to be sent across the wire
*/
@Deprecated
public Writeable.Reader<? extends PipelineAggregator> getAggregatorReader() {
return aggregatorReader;
}

/**
* Get the readers that must be registered for this aggregation's results.
*/
Expand Down
36 changes: 0 additions & 36 deletions server/src/main/java/org/opensearch/search/SearchModule.java
Original file line number Diff line number Diff line change
Expand Up @@ -211,21 +211,14 @@
import org.opensearch.search.aggregations.metrics.ValueCountAggregationBuilder;
import org.opensearch.search.aggregations.metrics.WeightedAvgAggregationBuilder;
import org.opensearch.search.aggregations.pipeline.AvgBucketPipelineAggregationBuilder;
import org.opensearch.search.aggregations.pipeline.AvgBucketPipelineAggregator;
import org.opensearch.search.aggregations.pipeline.BucketScriptPipelineAggregationBuilder;
import org.opensearch.search.aggregations.pipeline.BucketScriptPipelineAggregator;
import org.opensearch.search.aggregations.pipeline.BucketSelectorPipelineAggregationBuilder;
import org.opensearch.search.aggregations.pipeline.BucketSelectorPipelineAggregator;
import org.opensearch.search.aggregations.pipeline.BucketSortPipelineAggregationBuilder;
import org.opensearch.search.aggregations.pipeline.BucketSortPipelineAggregator;
import org.opensearch.search.aggregations.pipeline.CumulativeSumPipelineAggregationBuilder;
import org.opensearch.search.aggregations.pipeline.CumulativeSumPipelineAggregator;
import org.opensearch.search.aggregations.pipeline.DerivativePipelineAggregationBuilder;
import org.opensearch.search.aggregations.pipeline.DerivativePipelineAggregator;
import org.opensearch.search.aggregations.pipeline.EwmaModel;
import org.opensearch.search.aggregations.pipeline.ExtendedStatsBucketParser;
import org.opensearch.search.aggregations.pipeline.ExtendedStatsBucketPipelineAggregationBuilder;
import org.opensearch.search.aggregations.pipeline.ExtendedStatsBucketPipelineAggregator;
import org.opensearch.search.aggregations.pipeline.HoltLinearModel;
import org.opensearch.search.aggregations.pipeline.HoltWintersModel;
import org.opensearch.search.aggregations.pipeline.InternalBucketMetricValue;
Expand All @@ -236,24 +229,15 @@
import org.opensearch.search.aggregations.pipeline.InternalStatsBucket;
import org.opensearch.search.aggregations.pipeline.LinearModel;
import org.opensearch.search.aggregations.pipeline.MaxBucketPipelineAggregationBuilder;
import org.opensearch.search.aggregations.pipeline.MaxBucketPipelineAggregator;
import org.opensearch.search.aggregations.pipeline.MinBucketPipelineAggregationBuilder;
import org.opensearch.search.aggregations.pipeline.MinBucketPipelineAggregator;
import org.opensearch.search.aggregations.pipeline.MovAvgModel;
import org.opensearch.search.aggregations.pipeline.MovAvgPipelineAggregationBuilder;
import org.opensearch.search.aggregations.pipeline.MovAvgPipelineAggregator;
import org.opensearch.search.aggregations.pipeline.MovFnPipelineAggregationBuilder;
import org.opensearch.search.aggregations.pipeline.MovFnPipelineAggregator;
import org.opensearch.search.aggregations.pipeline.PercentilesBucketPipelineAggregationBuilder;
import org.opensearch.search.aggregations.pipeline.PercentilesBucketPipelineAggregator;
import org.opensearch.search.aggregations.pipeline.PipelineAggregator;
import org.opensearch.search.aggregations.pipeline.SerialDiffPipelineAggregationBuilder;
import org.opensearch.search.aggregations.pipeline.SerialDiffPipelineAggregator;
import org.opensearch.search.aggregations.pipeline.SimpleModel;
import org.opensearch.search.aggregations.pipeline.StatsBucketPipelineAggregationBuilder;
import org.opensearch.search.aggregations.pipeline.StatsBucketPipelineAggregator;
import org.opensearch.search.aggregations.pipeline.SumBucketPipelineAggregationBuilder;
import org.opensearch.search.aggregations.pipeline.SumBucketPipelineAggregator;
import org.opensearch.search.aggregations.support.ValuesSourceRegistry;
import org.opensearch.search.fetch.FetchPhase;
import org.opensearch.search.fetch.FetchSubPhase;
Expand Down Expand Up @@ -710,15 +694,13 @@ private void registerPipelineAggregations(List<SearchPlugin> plugins) {
new PipelineAggregationSpec(
DerivativePipelineAggregationBuilder.NAME,
DerivativePipelineAggregationBuilder::new,
DerivativePipelineAggregator::new,
DerivativePipelineAggregationBuilder::parse
).addResultReader(InternalDerivative::new)
);
registerPipelineAggregation(
new PipelineAggregationSpec(
MaxBucketPipelineAggregationBuilder.NAME,
MaxBucketPipelineAggregationBuilder::new,
MaxBucketPipelineAggregator::new,
MaxBucketPipelineAggregationBuilder.PARSER
)
// This bucket is used by many pipeline aggreations.
Expand All @@ -728,7 +710,6 @@ private void registerPipelineAggregations(List<SearchPlugin> plugins) {
new PipelineAggregationSpec(
MinBucketPipelineAggregationBuilder.NAME,
MinBucketPipelineAggregationBuilder::new,
MinBucketPipelineAggregator::new,
MinBucketPipelineAggregationBuilder.PARSER
)
/* Uses InternalBucketMetricValue */
Expand All @@ -737,7 +718,6 @@ private void registerPipelineAggregations(List<SearchPlugin> plugins) {
new PipelineAggregationSpec(
AvgBucketPipelineAggregationBuilder.NAME,
AvgBucketPipelineAggregationBuilder::new,
AvgBucketPipelineAggregator::new,
AvgBucketPipelineAggregationBuilder.PARSER
)
// This bucket is used by many pipeline aggreations.
Expand All @@ -747,7 +727,6 @@ private void registerPipelineAggregations(List<SearchPlugin> plugins) {
new PipelineAggregationSpec(
SumBucketPipelineAggregationBuilder.NAME,
SumBucketPipelineAggregationBuilder::new,
SumBucketPipelineAggregator::new,
SumBucketPipelineAggregationBuilder.PARSER
)
/* Uses InternalSimpleValue */
Expand All @@ -756,31 +735,27 @@ private void registerPipelineAggregations(List<SearchPlugin> plugins) {
new PipelineAggregationSpec(
StatsBucketPipelineAggregationBuilder.NAME,
StatsBucketPipelineAggregationBuilder::new,
StatsBucketPipelineAggregator::new,
StatsBucketPipelineAggregationBuilder.PARSER
).addResultReader(InternalStatsBucket::new)
);
registerPipelineAggregation(
new PipelineAggregationSpec(
ExtendedStatsBucketPipelineAggregationBuilder.NAME,
ExtendedStatsBucketPipelineAggregationBuilder::new,
ExtendedStatsBucketPipelineAggregator::new,
new ExtendedStatsBucketParser()
).addResultReader(InternalExtendedStatsBucket::new)
);
registerPipelineAggregation(
new PipelineAggregationSpec(
PercentilesBucketPipelineAggregationBuilder.NAME,
PercentilesBucketPipelineAggregationBuilder::new,
PercentilesBucketPipelineAggregator::new,
PercentilesBucketPipelineAggregationBuilder.PARSER
).addResultReader(InternalPercentilesBucket::new)
);
registerPipelineAggregation(
new PipelineAggregationSpec(
MovAvgPipelineAggregationBuilder.NAME,
MovAvgPipelineAggregationBuilder::new,
MovAvgPipelineAggregator::new,
(XContentParser parser, String name) -> MovAvgPipelineAggregationBuilder.parse(
movingAverageModelParserRegistry,
name,
Expand All @@ -792,47 +767,41 @@ private void registerPipelineAggregations(List<SearchPlugin> plugins) {
new PipelineAggregationSpec(
CumulativeSumPipelineAggregationBuilder.NAME,
CumulativeSumPipelineAggregationBuilder::new,
CumulativeSumPipelineAggregator::new,
CumulativeSumPipelineAggregationBuilder.PARSER
)
);
registerPipelineAggregation(
new PipelineAggregationSpec(
BucketScriptPipelineAggregationBuilder.NAME,
BucketScriptPipelineAggregationBuilder::new,
BucketScriptPipelineAggregator::new,
BucketScriptPipelineAggregationBuilder.PARSER
)
);
registerPipelineAggregation(
new PipelineAggregationSpec(
BucketSelectorPipelineAggregationBuilder.NAME,
BucketSelectorPipelineAggregationBuilder::new,
BucketSelectorPipelineAggregator::new,
BucketSelectorPipelineAggregationBuilder::parse
)
);
registerPipelineAggregation(
new PipelineAggregationSpec(
BucketSortPipelineAggregationBuilder.NAME,
BucketSortPipelineAggregationBuilder::new,
BucketSortPipelineAggregator::new,
BucketSortPipelineAggregationBuilder::parse
)
);
registerPipelineAggregation(
new PipelineAggregationSpec(
SerialDiffPipelineAggregationBuilder.NAME,
SerialDiffPipelineAggregationBuilder::new,
SerialDiffPipelineAggregator::new,
SerialDiffPipelineAggregationBuilder::parse
)
);
registerPipelineAggregation(
new PipelineAggregationSpec(
MovFnPipelineAggregationBuilder.NAME,
MovFnPipelineAggregationBuilder::new,
MovFnPipelineAggregator::new,
MovFnPipelineAggregationBuilder.PARSER
)
);
Expand All @@ -847,11 +816,6 @@ private void registerPipelineAggregation(PipelineAggregationSpec spec) {
namedWriteables.add(
new NamedWriteableRegistry.Entry(PipelineAggregationBuilder.class, spec.getName().getPreferredName(), spec.getReader())
);
if (spec.getAggregatorReader() != null) {
namedWriteables.add(
new NamedWriteableRegistry.Entry(PipelineAggregator.class, spec.getName().getPreferredName(), spec.getAggregatorReader())
);
}
for (Map.Entry<String, Writeable.Reader<? extends InternalAggregation>> resultReader : spec.getResultReaders().entrySet()) {
namedWriteables.add(
new NamedWriteableRegistry.Entry(InternalAggregation.class, resultReader.getKey(), resultReader.getValue())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -148,8 +148,7 @@ public void execute(SearchContext context) {
throw new AggregationExecutionException("Failed to build aggregation [" + aggregator.name() + "]", e);
}
}
context.queryResult()
.aggregations(new InternalAggregations(aggregations, context.request().source().aggregations()::buildPipelineTree));
context.queryResult().aggregations(new InternalAggregations(aggregations));

// disable aggregations so that they don't run on next pages in case of scrolling
context.aggregations(null);
Expand Down
Loading