Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Deprecate span collector #990

Original file line number Diff line number Diff line change
Expand Up @@ -7,16 +7,15 @@

import lombok.EqualsAndHashCode;
import lombok.Getter;
import lombok.RequiredArgsConstructor;
import lombok.ToString;
import org.opensearch.sql.ast.expression.SpanUnit;
import org.opensearch.sql.data.model.ExprValue;
import org.opensearch.sql.data.type.ExprType;
import org.opensearch.sql.expression.Expression;
import org.opensearch.sql.expression.ExpressionNodeVisitor;
import org.opensearch.sql.expression.env.Environment;
import org.opensearch.sql.planner.physical.collector.Rounding;

@RequiredArgsConstructor
@Getter
@ToString
@EqualsAndHashCode
Expand All @@ -25,9 +24,19 @@ public class SpanExpression implements Expression {
private final Expression value;
private final SpanUnit unit;

/**
* Construct a span expression by field and span interval expression.
*/
public SpanExpression(Expression field, Expression value, SpanUnit unit) {
this.field = field;
this.value = value;
this.unit = unit;
}

@Override
public ExprValue valueOf(Environment<Expression, ExprValue> valueEnv) {
return value.valueOf(valueEnv);
Rounding<?> rounding = Rounding.createRounding(this); //TODO: will integrate with WindowAssigner
return rounding.round(field.valueOf(valueEnv));
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,7 @@ public class AggregationOperator extends PhysicalPlan {
private final List<NamedAggregator> aggregatorList;
@Getter
private final List<NamedExpression> groupByExprList;
@Getter
private final NamedExpression span;

/**
* {@link BindingTuple} Collector.
*/
Expand All @@ -56,18 +55,7 @@ public AggregationOperator(PhysicalPlan input, List<NamedAggregator> aggregatorL
this.input = input;
this.aggregatorList = aggregatorList;
this.groupByExprList = groupByExprList;
if (hasSpan(groupByExprList)) {
// span expression is always the first expression in group list if exist.
this.span = groupByExprList.get(0);
this.collector =
Collector.Builder.build(
this.span, groupByExprList.subList(1, groupByExprList.size()), this.aggregatorList);

} else {
this.span = null;
this.collector =
Collector.Builder.build(this.span, this.groupByExprList, this.aggregatorList);
}
this.collector = Collector.Builder.build(groupByExprList, this.aggregatorList);
}

@Override
Expand Down Expand Up @@ -99,9 +87,4 @@ public void open() {
}
iterator = collector.results().iterator();
}

private boolean hasSpan(List<NamedExpression> namedExpressionList) {
return !namedExpressionList.isEmpty()
&& namedExpressionList.get(0).getDelegated() instanceof SpanExpression;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,11 @@

import com.google.common.collect.ImmutableList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.TreeMap;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import lombok.RequiredArgsConstructor;
Expand All @@ -38,9 +38,10 @@ public class BucketCollector implements Collector {
private final Supplier<Collector> supplier;

/**
* Map between bucketKey and collector in the bucket.
* Map from bucketKey to nested collector sorted by key to make sure
* final result is in order after traversal.
*/
private final Map<ExprValue, Collector> collectorMap = new HashMap<>();
private final Map<ExprValue, Collector> collectorMap = new TreeMap<>();

/**
* Bucket Index.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,17 +40,14 @@ class Builder {
/**
* build {@link Collector}.
*/
public static Collector build(
NamedExpression span, List<NamedExpression> buckets, List<NamedAggregator> aggregators) {
if (span == null && buckets.isEmpty()) {
public static Collector build(List<NamedExpression> buckets,
List<NamedAggregator> aggregators) {
if (buckets.isEmpty()) {
return new MetricCollector(aggregators);
} else if (span != null) {
return new SpanCollector(span, () -> build(null, buckets, aggregators));
} else {
return new BucketCollector(
buckets.get(0),
() ->
build(null, ImmutableList.copyOf(buckets.subList(1, buckets.size())), aggregators));
() -> build(ImmutableList.copyOf(buckets.subList(1, buckets.size())), aggregators));
}
}
}
Expand Down
Loading