Skip to content

Commit

Permalink
IGNITE-24291 Calcite reuse Row for non-storing accumulators (#11827)
Browse files Browse the repository at this point in the history
  • Loading branch information
timoninmaxim authored Feb 6, 2025
1 parent 32650bf commit fda9dbd
Show file tree
Hide file tree
Showing 4 changed files with 37 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1425,7 +1425,7 @@ public ArrayConcatAggregateAccumulator(AggregateCall aggCall, RowHandler<Row> hn
}

/** */
private static class DistinctAccumulator<Row> extends AbstractAccumulator<Row> {
private static class DistinctAccumulator<Row> extends AbstractAccumulator<Row> implements StoringAccumulator {
/** */
private final Accumulator<Row> acc;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -236,13 +236,17 @@ private WrapperPrototype(AggregateCall call) {
for (int i = 0; i < call.getArgList().size(); ++i)
argMapping[call.getArgList().get(i)] = i;

boolean createRow = StoringAccumulator.class.isAssignableFrom(accumulator.getClass());

return new Function<Row, Row>() {
final RowHandler<Row> hnd = ctx.rowHandler();

final RowHandler.RowFactory<Row> rowFac = hnd.factory(ctx.getTypeFactory(), inputRowType);

final Row reuseRow = createRow ? null : rowFac.create();

@Override public Row apply(Row in) {
Row out = rowFac.create();
Row out = createRow ? rowFac.create() : reuseRow;

for (int i = 0; i < hnd.columnCount(in); ++i) {
Object val = hnd.get(i, in);
Expand Down Expand Up @@ -319,11 +323,12 @@ private final class AccumulatorWrapperImpl implements AccumulatorWrapper<Row> {
if (filterArg >= 0 && Boolean.TRUE != handler.get(filterArg, row))
return;

Row newRow = inAdapter.apply(row);
if (newRow == null)
Row accRow = inAdapter.apply(row);

if (accRow == null)
return;

accumulator.add(newRow);
accumulator.add(accRow);
}

/** {@inheritDoc} */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,7 @@

package org.apache.ignite.internal.processors.query.calcite.exec.exp.agg;

/**
* Interface for row storing accumulator.
*/
public interface IterableAccumulator<Row> extends Accumulator<Row>, Iterable<Row> {
/** */
public interface IterableAccumulator<Row> extends Accumulator<Row>, Iterable<Row>, StoringAccumulator {
// No-op.
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.ignite.internal.processors.query.calcite.exec.exp.agg;

/**
* Interface for row storing accumulator.
*/
public interface StoringAccumulator {
// No-op.
}

0 comments on commit fda9dbd

Please sign in to comment.