From 0c58f88dedd3384b9bdb5b837f6c1d6b48147524 Mon Sep 17 00:00:00 2001 From: Sree Charan Manamala Date: Fri, 20 Sep 2024 15:04:35 +0530 Subject: [PATCH] Add serde for ColumnBasedRowsAndColumns to fix window queries without group by (#16658) (#17111) Register a Ser-De for RowsAndColumns so that the window operator query running on leaf operators would be transferred properly on the wire. Would fix the empty response given by window queries without group by on the native engine. (cherry picked from commit bb1c3c174944460c22c6dd153579dd18994b1f60) --- .../DruidDefaultSerializersModule.java | 18 +-- ...WindowOperatorQueryQueryRunnerFactory.java | 18 +-- .../rowsandcols/AppendableMapOfColumns.java | 10 -- .../rowsandcols/ConcatRowsAndColumns.java | 7 -- .../CursorFactoryRowsAndColumns.java | 2 +- .../rowsandcols/EmptyRowsAndColumns.java | 8 -- .../LazilyDecoratedRowsAndColumns.java | 14 +-- .../rowsandcols/LimitedRowsAndColumns.java | 9 -- .../MapOfColumnsRowsAndColumns.java | 2 +- .../rowsandcols/RearrangedRowsAndColumns.java | 7 -- .../query/rowsandcols/RowsAndColumns.java | 86 +++++++++++++- .../concrete/AbstractFrameRowsAndColumns.java | 106 ++++++++++++++++++ .../ColumnBasedFrameRowsAndColumns.java | 54 ++------- .../FrameRowsAndColumns.java} | 18 +-- .../concrete/RowBasedFrameRowsAndColumns.java | 44 +------- .../jackson/DefaultObjectMapperTest.java | 26 +++++ .../query/rowsandcols/NoAsRowsAndColumns.java | 9 -- .../ColumnBasedFrameRowsAndColumnsTest.java | 10 +- .../semantic/RowsAndColumnsDecoratorTest.java | 37 ++++++ .../sql/calcite/run/NativeSqlEngine.java | 2 +- .../druid/sql/calcite/CalciteQueryTest.java | 1 + .../sql/calcite/CalciteWindowQueryTest.java | 2 +- .../druid/sql/calcite/NotYetSupported.java | 1 + .../wikipediaFramedAggregations.sqlTest | 2 +- 24 files changed, 296 insertions(+), 197 deletions(-) create mode 100644 processing/src/main/java/org/apache/druid/query/rowsandcols/concrete/AbstractFrameRowsAndColumns.java rename processing/src/main/java/org/apache/druid/query/rowsandcols/{semantic/WireTransferable.java => concrete/FrameRowsAndColumns.java} (67%) diff --git a/processing/src/main/java/org/apache/druid/jackson/DruidDefaultSerializersModule.java b/processing/src/main/java/org/apache/druid/jackson/DruidDefaultSerializersModule.java index 30cc388f1d9d..cad8fdfd8315 100644 --- a/processing/src/main/java/org/apache/druid/jackson/DruidDefaultSerializersModule.java +++ b/processing/src/main/java/org/apache/druid/jackson/DruidDefaultSerializersModule.java @@ -37,7 +37,6 @@ import org.apache.druid.query.context.ResponseContext; import org.apache.druid.query.context.ResponseContextDeserializer; import org.apache.druid.query.rowsandcols.RowsAndColumns; -import org.apache.druid.query.rowsandcols.semantic.WireTransferable; import org.joda.time.DateTimeZone; import java.io.IOException; @@ -189,20 +188,7 @@ public ByteOrder deserialize(JsonParser jp, DeserializationContext ctxt) throws ); addDeserializer(ResponseContext.class, new ResponseContextDeserializer()); - addSerializer(RowsAndColumns.class, new JsonSerializer() - { - @Override - public void serialize( - RowsAndColumns value, - JsonGenerator gen, - SerializerProvider serializers - ) throws IOException - { - // It would be really cool if jackson offered an output stream that would allow us to push bytes - // through, but it doesn't right now, so we have to build a byte[] instead. Maybe something to contribute - // back to Jackson at some point. - gen.writeBinary(WireTransferable.fromRAC(value).bytesToTransfer()); - } - }); + addSerializer(RowsAndColumns.class, new RowsAndColumns.RowsAndColumnsSerializer()); + addDeserializer(RowsAndColumns.class, new RowsAndColumns.RowsAndColumnsDeserializer()); } } diff --git a/processing/src/main/java/org/apache/druid/query/operator/WindowOperatorQueryQueryRunnerFactory.java b/processing/src/main/java/org/apache/druid/query/operator/WindowOperatorQueryQueryRunnerFactory.java index d18f6c252c1a..f86f91be18be 100644 --- a/processing/src/main/java/org/apache/druid/query/operator/WindowOperatorQueryQueryRunnerFactory.java +++ b/processing/src/main/java/org/apache/druid/query/operator/WindowOperatorQueryQueryRunnerFactory.java @@ -21,7 +21,6 @@ import com.google.common.base.Function; import org.apache.druid.error.DruidException; -import org.apache.druid.frame.Frame; import org.apache.druid.java.util.common.Intervals; import org.apache.druid.java.util.common.guava.Sequence; import org.apache.druid.java.util.common.guava.Sequences; @@ -31,10 +30,8 @@ import org.apache.druid.query.QueryToolChest; import org.apache.druid.query.rowsandcols.LazilyDecoratedRowsAndColumns; import org.apache.druid.query.rowsandcols.RowsAndColumns; -import org.apache.druid.query.rowsandcols.concrete.ColumnBasedFrameRowsAndColumns; -import org.apache.druid.query.rowsandcols.semantic.WireTransferable; +import org.apache.druid.query.rowsandcols.concrete.FrameRowsAndColumns; import org.apache.druid.segment.Segment; -import org.apache.druid.segment.column.RowSignature; import org.joda.time.Interval; import javax.annotation.Nullable; @@ -100,19 +97,8 @@ public Sequence apply( @Override public RowsAndColumns apply(@Nullable RowsAndColumns input) { - // This is interim code to force a materialization by synthesizing the wire transfer - // that will need to naturally happen as we flesh out this code more. For now, we - // materialize the bytes on-heap and then read them back in as a frame. if (input instanceof LazilyDecoratedRowsAndColumns) { - final WireTransferable wire = WireTransferable.fromRAC(input); - final byte[] frameBytes = wire.bytesToTransfer(); - - RowSignature.Builder sigBob = RowSignature.builder(); - for (String column : input.getColumnNames()) { - sigBob.add(column, input.findColumn(column).toAccessor().getType()); - } - - return new ColumnBasedFrameRowsAndColumns(Frame.wrap(frameBytes), sigBob.build()); + return input.as(FrameRowsAndColumns.class); } return input; } diff --git a/processing/src/main/java/org/apache/druid/query/rowsandcols/AppendableMapOfColumns.java b/processing/src/main/java/org/apache/druid/query/rowsandcols/AppendableMapOfColumns.java index 61f6855cd01c..d83f56c7ba5f 100644 --- a/processing/src/main/java/org/apache/druid/query/rowsandcols/AppendableMapOfColumns.java +++ b/processing/src/main/java/org/apache/druid/query/rowsandcols/AppendableMapOfColumns.java @@ -79,14 +79,4 @@ public Column findColumn(String name) } return retVal; } - - @Override - @SuppressWarnings("unchecked") - public T as(Class clazz) - { - if (AppendableRowsAndColumns.class.equals(clazz)) { - return (T) this; - } - return null; - } } diff --git a/processing/src/main/java/org/apache/druid/query/rowsandcols/ConcatRowsAndColumns.java b/processing/src/main/java/org/apache/druid/query/rowsandcols/ConcatRowsAndColumns.java index c6ced60849d4..3f70f82a2537 100644 --- a/processing/src/main/java/org/apache/druid/query/rowsandcols/ConcatRowsAndColumns.java +++ b/processing/src/main/java/org/apache/druid/query/rowsandcols/ConcatRowsAndColumns.java @@ -141,13 +141,6 @@ public Column findColumn(String name) } } - @Nullable - @Override - public T as(Class clazz) - { - return null; - } - private class ConcatedidColumn implements Column { diff --git a/processing/src/main/java/org/apache/druid/query/rowsandcols/CursorFactoryRowsAndColumns.java b/processing/src/main/java/org/apache/druid/query/rowsandcols/CursorFactoryRowsAndColumns.java index 6fa74660f7df..46fda857516f 100644 --- a/processing/src/main/java/org/apache/druid/query/rowsandcols/CursorFactoryRowsAndColumns.java +++ b/processing/src/main/java/org/apache/druid/query/rowsandcols/CursorFactoryRowsAndColumns.java @@ -61,7 +61,7 @@ public T as(Class clazz) if (CursorFactory.class == clazz) { return (T) cursorFactory; } - return null; + return RowsAndColumns.super.as(clazz); } @Override diff --git a/processing/src/main/java/org/apache/druid/query/rowsandcols/EmptyRowsAndColumns.java b/processing/src/main/java/org/apache/druid/query/rowsandcols/EmptyRowsAndColumns.java index dd0c7dab1cda..56647e0f5687 100644 --- a/processing/src/main/java/org/apache/druid/query/rowsandcols/EmptyRowsAndColumns.java +++ b/processing/src/main/java/org/apache/druid/query/rowsandcols/EmptyRowsAndColumns.java @@ -21,7 +21,6 @@ import org.apache.druid.query.rowsandcols.column.Column; -import javax.annotation.Nullable; import java.util.ArrayList; import java.util.Collection; @@ -44,11 +43,4 @@ public Column findColumn(String name) { return null; } - - @Nullable - @Override - public T as(Class clazz) - { - return null; - } } diff --git a/processing/src/main/java/org/apache/druid/query/rowsandcols/LazilyDecoratedRowsAndColumns.java b/processing/src/main/java/org/apache/druid/query/rowsandcols/LazilyDecoratedRowsAndColumns.java index a05b31dc2cb4..bb35f6837976 100644 --- a/processing/src/main/java/org/apache/druid/query/rowsandcols/LazilyDecoratedRowsAndColumns.java +++ b/processing/src/main/java/org/apache/druid/query/rowsandcols/LazilyDecoratedRowsAndColumns.java @@ -39,10 +39,10 @@ import org.apache.druid.query.rowsandcols.column.Column; import org.apache.druid.query.rowsandcols.column.ColumnAccessor; import org.apache.druid.query.rowsandcols.concrete.ColumnBasedFrameRowsAndColumns; +import org.apache.druid.query.rowsandcols.concrete.FrameRowsAndColumns; import org.apache.druid.query.rowsandcols.semantic.ColumnSelectorFactoryMaker; import org.apache.druid.query.rowsandcols.semantic.DefaultRowsAndColumnsDecorator; import org.apache.druid.query.rowsandcols.semantic.RowsAndColumnsDecorator; -import org.apache.druid.query.rowsandcols.semantic.WireTransferable; import org.apache.druid.segment.ColumnSelectorFactory; import org.apache.druid.segment.Cursor; import org.apache.druid.segment.CursorBuildSpec; @@ -150,16 +150,10 @@ public RowsAndColumnsDecorator toRowsAndColumnsDecorator() @SuppressWarnings("unused") @SemanticCreator - public WireTransferable toWireTransferable() + public FrameRowsAndColumns toFrameRowsAndColumns() { - return () -> { - final Pair materialized = materialize(); - if (materialized == null) { - return new byte[]{}; - } else { - return materialized.lhs; - } - }; + maybeMaterialize(); + return base.as(FrameRowsAndColumns.class); } private void maybeMaterialize() diff --git a/processing/src/main/java/org/apache/druid/query/rowsandcols/LimitedRowsAndColumns.java b/processing/src/main/java/org/apache/druid/query/rowsandcols/LimitedRowsAndColumns.java index abb3d4649b1a..8cfadecb4dd2 100644 --- a/processing/src/main/java/org/apache/druid/query/rowsandcols/LimitedRowsAndColumns.java +++ b/processing/src/main/java/org/apache/druid/query/rowsandcols/LimitedRowsAndColumns.java @@ -23,7 +23,6 @@ import org.apache.druid.query.rowsandcols.column.Column; import org.apache.druid.query.rowsandcols.column.LimitedColumn; -import javax.annotation.Nullable; import java.util.Collection; public class LimitedRowsAndColumns implements RowsAndColumns @@ -66,12 +65,4 @@ public Column findColumn(String name) return new LimitedColumn(column, start, end); } - - @Nullable - @Override - public T as(Class clazz) - { - return null; - } - } diff --git a/processing/src/main/java/org/apache/druid/query/rowsandcols/MapOfColumnsRowsAndColumns.java b/processing/src/main/java/org/apache/druid/query/rowsandcols/MapOfColumnsRowsAndColumns.java index 29f092f67440..d6bc1026a98d 100644 --- a/processing/src/main/java/org/apache/druid/query/rowsandcols/MapOfColumnsRowsAndColumns.java +++ b/processing/src/main/java/org/apache/druid/query/rowsandcols/MapOfColumnsRowsAndColumns.java @@ -164,7 +164,7 @@ public T as(Class clazz) if (AppendableRowsAndColumns.class.equals(clazz)) { return (T) new AppendableMapOfColumns(this); } - return null; + return RowsAndColumns.super.as(clazz); } public static class Builder diff --git a/processing/src/main/java/org/apache/druid/query/rowsandcols/RearrangedRowsAndColumns.java b/processing/src/main/java/org/apache/druid/query/rowsandcols/RearrangedRowsAndColumns.java index f1793f8fd0e4..e64f086edd7f 100644 --- a/processing/src/main/java/org/apache/druid/query/rowsandcols/RearrangedRowsAndColumns.java +++ b/processing/src/main/java/org/apache/druid/query/rowsandcols/RearrangedRowsAndColumns.java @@ -164,11 +164,4 @@ public int compareRows(int lhsRowNum, int rhsRowNum) ); } } - - @Nullable - @Override - public T as(Class clazz) - { - return null; - } } diff --git a/processing/src/main/java/org/apache/druid/query/rowsandcols/RowsAndColumns.java b/processing/src/main/java/org/apache/druid/query/rowsandcols/RowsAndColumns.java index 7b6a1f6215d3..a34d0e463c07 100644 --- a/processing/src/main/java/org/apache/druid/query/rowsandcols/RowsAndColumns.java +++ b/processing/src/main/java/org/apache/druid/query/rowsandcols/RowsAndColumns.java @@ -19,12 +19,30 @@ package org.apache.druid.query.rowsandcols; +import com.fasterxml.jackson.core.JsonGenerator; +import com.fasterxml.jackson.core.JsonParser; +import com.fasterxml.jackson.databind.DeserializationContext; +import com.fasterxml.jackson.databind.SerializerProvider; +import com.fasterxml.jackson.databind.deser.std.StdDeserializer; +import com.fasterxml.jackson.databind.ser.std.StdSerializer; +import org.apache.druid.error.DruidException; +import org.apache.druid.frame.Frame; +import org.apache.druid.frame.FrameType; +import org.apache.druid.frame.channel.ByteTracker; +import org.apache.druid.java.util.common.jackson.JacksonUtils; import org.apache.druid.query.rowsandcols.column.Column; +import org.apache.druid.query.rowsandcols.concrete.ColumnBasedFrameRowsAndColumns; +import org.apache.druid.query.rowsandcols.concrete.FrameRowsAndColumns; +import org.apache.druid.query.rowsandcols.concrete.RowBasedFrameRowsAndColumns; import org.apache.druid.query.rowsandcols.semantic.AppendableRowsAndColumns; import org.apache.druid.query.rowsandcols.semantic.FramedOnHeapAggregatable; +import org.apache.druid.segment.column.RowSignature; import javax.annotation.Nonnull; import javax.annotation.Nullable; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.nio.channels.Channels; import java.util.Collection; /** @@ -110,6 +128,72 @@ static AppendableRowsAndColumns expectAppendable(RowsAndColumns input) * @return A concrete implementation of the interface, or null if there is no meaningful optimization to be had * through a local implementation of the interface. */ + @SuppressWarnings("unchecked") @Nullable - T as(Class clazz); + default T as(Class clazz) + { + if (clazz.isInstance(this)) { + return (T) this; + } + return null; + } + + /** + * Serializer for {@link RowsAndColumns} by converting the instance to {@link FrameRowsAndColumns} + */ + class RowsAndColumnsSerializer extends StdSerializer + { + public RowsAndColumnsSerializer() + { + super(RowsAndColumns.class); + } + + @Override + public void serialize( + RowsAndColumns rac, + JsonGenerator jsonGenerator, + SerializerProvider serializerProvider + ) throws IOException + { + FrameRowsAndColumns frameRAC = rac.as(FrameRowsAndColumns.class); + if (frameRAC == null) { + throw DruidException.defensive("Unable to serialize RAC"); + } + JacksonUtils.writeObjectUsingSerializerProvider(jsonGenerator, serializerProvider, frameRAC.getSignature()); + + Frame frame = frameRAC.getFrame(); + final ByteArrayOutputStream baos = new ByteArrayOutputStream(); + frame.writeTo(Channels.newChannel(baos), false, null, ByteTracker.unboundedTracker()); + + jsonGenerator.writeBinary(baos.toByteArray()); + } + } + + /** + * Deserializer for {@link RowsAndColumns} returning as an instance of {@link FrameRowsAndColumns} + */ + class RowsAndColumnsDeserializer extends StdDeserializer + { + public RowsAndColumnsDeserializer() + { + super(RowsAndColumns.class); + } + + @Override + public FrameRowsAndColumns deserialize(JsonParser jsonParser, DeserializationContext deserializationContext) + throws IOException + { + RowSignature sig = jsonParser.readValueAs(RowSignature.class); + jsonParser.nextValue(); + + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + jsonParser.readBinaryValue(baos); + Frame frame = Frame.wrap(baos.toByteArray()); + if (frame.type() == FrameType.COLUMNAR) { + return new ColumnBasedFrameRowsAndColumns(frame, sig); + } else { + return new RowBasedFrameRowsAndColumns(frame, sig); + } + } + } } diff --git a/processing/src/main/java/org/apache/druid/query/rowsandcols/concrete/AbstractFrameRowsAndColumns.java b/processing/src/main/java/org/apache/druid/query/rowsandcols/concrete/AbstractFrameRowsAndColumns.java new file mode 100644 index 000000000000..5295326c8622 --- /dev/null +++ b/processing/src/main/java/org/apache/druid/query/rowsandcols/concrete/AbstractFrameRowsAndColumns.java @@ -0,0 +1,106 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.query.rowsandcols.concrete; + +import com.google.common.base.Objects; +import org.apache.druid.frame.Frame; +import org.apache.druid.frame.read.FrameReader; +import org.apache.druid.query.rowsandcols.column.Column; +import org.apache.druid.segment.CloseableShapeshifter; +import org.apache.druid.segment.CursorFactory; +import org.apache.druid.segment.column.RowSignature; + +import javax.annotation.Nullable; +import java.util.Collection; +import java.util.LinkedHashMap; + +public abstract class AbstractFrameRowsAndColumns implements FrameRowsAndColumns, AutoCloseable, CloseableShapeshifter +{ + final Frame frame; + final RowSignature signature; + final LinkedHashMap colCache = new LinkedHashMap<>(); + + public AbstractFrameRowsAndColumns(Frame frame, RowSignature signature) + { + this.frame = frame; + this.signature = signature; + } + + @Override + public Frame getFrame() + { + return frame; + } + + @Override + public RowSignature getSignature() + { + return signature; + } + + @Override + public Collection getColumnNames() + { + return signature.getColumnNames(); + } + + @Override + public int numRows() + { + return frame.numRows(); + } + + @SuppressWarnings("unchecked") + @Nullable + @Override + public T as(Class clazz) + { + if (CursorFactory.class.equals(clazz)) { + return (T) FrameReader.create(signature).makeCursorFactory(frame); + } + return FrameRowsAndColumns.super.as(clazz); + } + + @Override + public void close() + { + // nothing to close + } + + @Override + public int hashCode() + { + return Objects.hashCode(frame, signature); + } + + @Override + public boolean equals(Object o) + { + if (this == o) { + return true; + } + if (!(o instanceof AbstractFrameRowsAndColumns)) { + return false; + } + AbstractFrameRowsAndColumns otherFrame = (AbstractFrameRowsAndColumns) o; + + return frame.writableMemory().equals(otherFrame.frame.writableMemory()) && signature.equals(otherFrame.signature); + } +} diff --git a/processing/src/main/java/org/apache/druid/query/rowsandcols/concrete/ColumnBasedFrameRowsAndColumns.java b/processing/src/main/java/org/apache/druid/query/rowsandcols/concrete/ColumnBasedFrameRowsAndColumns.java index e99a3f7f3139..c4a4577dc1af 100644 --- a/processing/src/main/java/org/apache/druid/query/rowsandcols/concrete/ColumnBasedFrameRowsAndColumns.java +++ b/processing/src/main/java/org/apache/druid/query/rowsandcols/concrete/ColumnBasedFrameRowsAndColumns.java @@ -19,44 +19,21 @@ package org.apache.druid.query.rowsandcols.concrete; +import org.apache.druid.error.DruidException; import org.apache.druid.frame.Frame; import org.apache.druid.frame.FrameType; -import org.apache.druid.frame.read.FrameReader; import org.apache.druid.frame.read.columnar.FrameColumnReaders; -import org.apache.druid.java.util.common.ISE; -import org.apache.druid.query.rowsandcols.RowsAndColumns; import org.apache.druid.query.rowsandcols.column.Column; -import org.apache.druid.segment.CloseableShapeshifter; -import org.apache.druid.segment.CursorFactory; import org.apache.druid.segment.column.ColumnType; import org.apache.druid.segment.column.RowSignature; import javax.annotation.Nullable; -import java.util.Collection; -import java.util.LinkedHashMap; -public class ColumnBasedFrameRowsAndColumns implements RowsAndColumns, AutoCloseable, CloseableShapeshifter +public class ColumnBasedFrameRowsAndColumns extends AbstractFrameRowsAndColumns { - private final Frame frame; - private final RowSignature signature; - private final LinkedHashMap colCache = new LinkedHashMap<>(); - public ColumnBasedFrameRowsAndColumns(Frame frame, RowSignature signature) { - this.frame = FrameType.COLUMNAR.ensureType(frame); - this.signature = signature; - } - - @Override - public Collection getColumnNames() - { - return signature.getColumnNames(); - } - - @Override - public int numRows() - { - return frame.numRows(); + super(FrameType.COLUMNAR.ensureType(frame), signature); } @Nullable @@ -71,28 +48,17 @@ public Column findColumn(String name) } else { final ColumnType columnType = signature .getColumnType(columnIndex) - .orElseThrow(() -> new ISE("just got the id, why is columnType not there?")); + .orElseThrow( + () -> DruidException.defensive( + "just got the id [%s][%s], why is columnType not there?", + columnIndex, + name + ) + ); colCache.put(name, FrameColumnReaders.create(name, columnIndex, columnType).readRACColumn(frame)); } } return colCache.get(name); } - - @SuppressWarnings("unchecked") - @Nullable - @Override - public T as(Class clazz) - { - if (CursorFactory.class.equals(clazz)) { - return (T) FrameReader.create(signature).makeCursorFactory(frame); - } - return null; - } - - @Override - public void close() - { - // nothing to close - } } diff --git a/processing/src/main/java/org/apache/druid/query/rowsandcols/semantic/WireTransferable.java b/processing/src/main/java/org/apache/druid/query/rowsandcols/concrete/FrameRowsAndColumns.java similarity index 67% rename from processing/src/main/java/org/apache/druid/query/rowsandcols/semantic/WireTransferable.java rename to processing/src/main/java/org/apache/druid/query/rowsandcols/concrete/FrameRowsAndColumns.java index a7d55f599293..022a0f91ac16 100644 --- a/processing/src/main/java/org/apache/druid/query/rowsandcols/semantic/WireTransferable.java +++ b/processing/src/main/java/org/apache/druid/query/rowsandcols/concrete/FrameRowsAndColumns.java @@ -17,21 +17,15 @@ * under the License. */ -package org.apache.druid.query.rowsandcols.semantic; +package org.apache.druid.query.rowsandcols.concrete; -import org.apache.druid.java.util.common.ISE; +import org.apache.druid.frame.Frame; import org.apache.druid.query.rowsandcols.RowsAndColumns; +import org.apache.druid.segment.column.RowSignature; -public interface WireTransferable +public interface FrameRowsAndColumns extends RowsAndColumns { - static WireTransferable fromRAC(RowsAndColumns rac) - { - WireTransferable retVal = rac.as(WireTransferable.class); - if (retVal == null) { - throw new ISE("Rac[%s] cannot be transferred over the wire", rac.getClass()); - } - return retVal; - } + Frame getFrame(); - byte[] bytesToTransfer(); + RowSignature getSignature(); } diff --git a/processing/src/main/java/org/apache/druid/query/rowsandcols/concrete/RowBasedFrameRowsAndColumns.java b/processing/src/main/java/org/apache/druid/query/rowsandcols/concrete/RowBasedFrameRowsAndColumns.java index 865a24e5d6da..c702c210775c 100644 --- a/processing/src/main/java/org/apache/druid/query/rowsandcols/concrete/RowBasedFrameRowsAndColumns.java +++ b/processing/src/main/java/org/apache/druid/query/rowsandcols/concrete/RowBasedFrameRowsAndColumns.java @@ -24,40 +24,17 @@ import org.apache.druid.frame.FrameType; import org.apache.druid.frame.field.FieldReader; import org.apache.druid.frame.field.FieldReaders; -import org.apache.druid.frame.read.FrameReader; -import org.apache.druid.query.rowsandcols.RowsAndColumns; import org.apache.druid.query.rowsandcols.column.Column; -import org.apache.druid.segment.CloseableShapeshifter; -import org.apache.druid.segment.CursorFactory; import org.apache.druid.segment.column.ColumnType; import org.apache.druid.segment.column.RowSignature; import javax.annotation.Nullable; -import java.util.Collection; -import java.util.LinkedHashMap; -public class RowBasedFrameRowsAndColumns implements RowsAndColumns, AutoCloseable, CloseableShapeshifter +public class RowBasedFrameRowsAndColumns extends AbstractFrameRowsAndColumns { - private final Frame frame; - private final RowSignature signature; - private final LinkedHashMap colCache = new LinkedHashMap<>(); - public RowBasedFrameRowsAndColumns(Frame frame, RowSignature signature) { - this.frame = FrameType.ROW_BASED.ensureType(frame); - this.signature = signature; - } - - @Override - public Collection getColumnNames() - { - return signature.getColumnNames(); - } - - @Override - public int numRows() - { - return frame.numRows(); + super(FrameType.ROW_BASED.ensureType(frame), signature); } @Nullable @@ -86,21 +63,4 @@ public Column findColumn(String name) } return colCache.get(name); } - - @SuppressWarnings("unchecked") - @Nullable - @Override - public T as(Class clazz) - { - if (CursorFactory.class.equals(clazz)) { - return (T) FrameReader.create(signature).makeCursorFactory(frame); - } - return null; - } - - @Override - public void close() - { - // nothing to close - } } diff --git a/processing/src/test/java/org/apache/druid/jackson/DefaultObjectMapperTest.java b/processing/src/test/java/org/apache/druid/jackson/DefaultObjectMapperTest.java index 989d137770ee..92c8a2cb2989 100644 --- a/processing/src/test/java/org/apache/druid/jackson/DefaultObjectMapperTest.java +++ b/processing/src/test/java/org/apache/druid/jackson/DefaultObjectMapperTest.java @@ -22,12 +22,18 @@ import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.exc.InvalidTypeIdException; +import com.google.common.collect.ImmutableMap; import org.apache.druid.java.util.common.DateTimes; import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.java.util.common.guava.Sequence; import org.apache.druid.java.util.common.guava.Sequences; import org.apache.druid.java.util.common.guava.Yielders; import org.apache.druid.query.Query; +import org.apache.druid.query.rowsandcols.MapOfColumnsRowsAndColumns; +import org.apache.druid.query.rowsandcols.RowsAndColumns; +import org.apache.druid.query.rowsandcols.column.IntArrayColumn; +import org.apache.druid.query.rowsandcols.concrete.ColumnBasedFrameRowsAndColumns; +import org.apache.druid.query.rowsandcols.concrete.ColumnBasedFrameRowsAndColumnsTest; import org.joda.time.DateTime; import org.joda.time.DateTimeZone; import org.junit.Assert; @@ -35,6 +41,8 @@ import java.util.Arrays; +import static org.junit.Assert.assertEquals; + /** * */ @@ -102,4 +110,22 @@ public void testUnknownTypeWithUnknownService() throws JsonProcessingException } Assert.fail("We expect InvalidTypeIdException to be thrown"); } + + @Test + public void testColumnBasedFrameRowsAndColumns() throws Exception + { + DefaultObjectMapper om = new DefaultObjectMapper("test"); + + MapOfColumnsRowsAndColumns input = (MapOfColumnsRowsAndColumns.fromMap( + ImmutableMap.of( + "colA", new IntArrayColumn(new int[]{1, 2, 3, 4, 5, 6, 7, 8, 9, 10}), + "colB", new IntArrayColumn(new int[]{4, -4, 3, -3, 4, 82, -90, 4, 0, 0}) + ))); + + ColumnBasedFrameRowsAndColumns frc = ColumnBasedFrameRowsAndColumnsTest.buildFrame(input); + byte[] bytes = om.writeValueAsBytes(frc); + + ColumnBasedFrameRowsAndColumns frc2 = (ColumnBasedFrameRowsAndColumns) om.readValue(bytes, RowsAndColumns.class); + assertEquals(frc, frc2); + } } diff --git a/processing/src/test/java/org/apache/druid/query/rowsandcols/NoAsRowsAndColumns.java b/processing/src/test/java/org/apache/druid/query/rowsandcols/NoAsRowsAndColumns.java index 422c87c8b7c6..16cd44e870ba 100644 --- a/processing/src/test/java/org/apache/druid/query/rowsandcols/NoAsRowsAndColumns.java +++ b/processing/src/test/java/org/apache/druid/query/rowsandcols/NoAsRowsAndColumns.java @@ -21,7 +21,6 @@ import org.apache.druid.query.rowsandcols.column.Column; -import javax.annotation.Nullable; import java.util.Collection; public class NoAsRowsAndColumns implements RowsAndColumns @@ -50,12 +49,4 @@ public Column findColumn(String name) { return rac.findColumn(name); } - - @Nullable - @Override - public T as(Class clazz) - { - // Pretend like this doesn't implement any semantic interfaces - return null; - } } diff --git a/processing/src/test/java/org/apache/druid/query/rowsandcols/concrete/ColumnBasedFrameRowsAndColumnsTest.java b/processing/src/test/java/org/apache/druid/query/rowsandcols/concrete/ColumnBasedFrameRowsAndColumnsTest.java index acfcbe6f83ed..f6a10e011464 100644 --- a/processing/src/test/java/org/apache/druid/query/rowsandcols/concrete/ColumnBasedFrameRowsAndColumnsTest.java +++ b/processing/src/test/java/org/apache/druid/query/rowsandcols/concrete/ColumnBasedFrameRowsAndColumnsTest.java @@ -37,7 +37,15 @@ public ColumnBasedFrameRowsAndColumnsTest() public static ColumnBasedFrameRowsAndColumns buildFrame(MapOfColumnsRowsAndColumns input) { - LazilyDecoratedRowsAndColumns rac = new LazilyDecoratedRowsAndColumns(input, null, null, null, OffsetLimit.limit(Integer.MAX_VALUE), null, null); + LazilyDecoratedRowsAndColumns rac = new LazilyDecoratedRowsAndColumns( + input, + null, + null, + null, + OffsetLimit.limit(Integer.MAX_VALUE), + null, + null + ); rac.numRows(); // materialize return (ColumnBasedFrameRowsAndColumns) rac.getBase(); diff --git a/processing/src/test/java/org/apache/druid/query/rowsandcols/semantic/RowsAndColumnsDecoratorTest.java b/processing/src/test/java/org/apache/druid/query/rowsandcols/semantic/RowsAndColumnsDecoratorTest.java index f90c2ea19172..41295f480176 100644 --- a/processing/src/test/java/org/apache/druid/query/rowsandcols/semantic/RowsAndColumnsDecoratorTest.java +++ b/processing/src/test/java/org/apache/druid/query/rowsandcols/semantic/RowsAndColumnsDecoratorTest.java @@ -19,6 +19,7 @@ package org.apache.druid.query.rowsandcols.semantic; +import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; import org.apache.druid.java.util.common.ISE; import org.apache.druid.java.util.common.Intervals; @@ -32,6 +33,9 @@ import org.apache.druid.query.rowsandcols.MapOfColumnsRowsAndColumns; import org.apache.druid.query.rowsandcols.RowsAndColumns; import org.apache.druid.query.rowsandcols.column.ColumnAccessor; +import org.apache.druid.query.rowsandcols.column.IntArrayColumn; +import org.apache.druid.query.rowsandcols.concrete.ColumnBasedFrameRowsAndColumns; +import org.apache.druid.query.rowsandcols.concrete.ColumnBasedFrameRowsAndColumnsTest; import org.apache.druid.segment.ArrayListSegment; import org.apache.druid.segment.ColumnValueSelector; import org.apache.druid.segment.Cursor; @@ -214,6 +218,39 @@ public void testDecorationWithListOfResultRows() } } + @Test + public void testDecoratorWithColumnBasedFrameRAC() + { + RowSignature siggy = RowSignature.builder() + .add("colA", ColumnType.LONG) + .add("colB", ColumnType.LONG) + .build(); + + Object[][] vals = new Object[][]{ + {1L, 4L}, + {2L, -4L}, + {3L, 3L}, + {4L, -3L}, + {5L, 4L}, + {6L, 82L}, + {7L, -90L}, + {8L, 4L}, + {9L, 0L}, + {10L, 0L} + }; + + MapOfColumnsRowsAndColumns input = MapOfColumnsRowsAndColumns.fromMap( + ImmutableMap.of( + "colA", new IntArrayColumn(new int[]{1, 2, 3, 4, 5, 6, 7, 8, 9, 10}), + "colB", new IntArrayColumn(new int[]{4, -4, 3, -3, 4, 82, -90, 4, 0, 0}) + ) + ); + + ColumnBasedFrameRowsAndColumns frc = ColumnBasedFrameRowsAndColumnsTest.buildFrame(input); + + validateDecorated(frc, siggy, vals, null, null, OffsetLimit.NONE, null); + } + private void validateDecorated( RowsAndColumns base, RowSignature siggy, diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/run/NativeSqlEngine.java b/sql/src/main/java/org/apache/druid/sql/calcite/run/NativeSqlEngine.java index 2477ac38dec1..d02d302437b8 100644 --- a/sql/src/main/java/org/apache/druid/sql/calcite/run/NativeSqlEngine.java +++ b/sql/src/main/java/org/apache/druid/sql/calcite/run/NativeSqlEngine.java @@ -109,7 +109,6 @@ public boolean featureAvailable(EngineFeature feature) case ALLOW_TOP_LEVEL_UNION_ALL: case TIME_BOUNDARY_QUERY: case GROUPBY_IMPLICITLY_SORTS: - case WINDOW_LEAF_OPERATOR: return true; case CAN_INSERT: case CAN_REPLACE: @@ -117,6 +116,7 @@ public boolean featureAvailable(EngineFeature feature) case WRITE_EXTERNAL_DATA: case SCAN_ORDER_BY_NON_TIME: case SCAN_NEEDS_SIGNATURE: + case WINDOW_LEAF_OPERATOR: return false; default: throw SqlEngines.generateUnrecognizedFeatureException(NativeSqlEngine.class.getSimpleName(), feature); diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/CalciteQueryTest.java b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteQueryTest.java index a8dcc35ea7ad..732681de238a 100644 --- a/sql/src/test/java/org/apache/druid/sql/calcite/CalciteQueryTest.java +++ b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteQueryTest.java @@ -16086,6 +16086,7 @@ public void testScanAndSortOnJoin() .run(); } + @NotYetSupported(Modes.UNSUPPORTED_DATASOURCE) @Test public void testWindowingOverJoin() { diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/CalciteWindowQueryTest.java b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteWindowQueryTest.java index 5850be0bd1c5..cd6aa514675f 100644 --- a/sql/src/test/java/org/apache/druid/sql/calcite/CalciteWindowQueryTest.java +++ b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteWindowQueryTest.java @@ -298,7 +298,7 @@ public void testFailure_partitionByMVD() ); assertEquals( - "Encountered a multi value column [v0]. Window processing does not support MVDs. " + "Encountered a multi value column. Window processing does not support MVDs. " + "Consider using UNNEST or MV_TO_ARRAY.", e.getMessage() ); diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/NotYetSupported.java b/sql/src/test/java/org/apache/druid/sql/calcite/NotYetSupported.java index da1431f433d4..f0c48ff44f2f 100644 --- a/sql/src/test/java/org/apache/druid/sql/calcite/NotYetSupported.java +++ b/sql/src/test/java/org/apache/druid/sql/calcite/NotYetSupported.java @@ -89,6 +89,7 @@ enum Modes RESULT_MISMATCH(AssertionError.class, "(assertResulEquals|AssertionError: column content mismatch)"), LONG_CASTING(AssertionError.class, "expected: java.lang.Long"), UNSUPPORTED_NULL_ORDERING(DruidException.class, "(A|DE)SCENDING ordering with NULLS (LAST|FIRST)"), + UNSUPPORTED_DATASOURCE(DruidException.class, "WindowOperatorQuery must run on top of a query or inline data source"), UNION_WITH_COMPLEX_OPERAND(DruidException.class, "Only Table and Values are supported as inputs for Union"), UNION_MORE_STRICT_ROWTYPE_CHECK(DruidException.class, "Row signature mismatch in Union inputs"), JOIN_CONDITION_NOT_PUSHED_CONDITION(DruidException.class, "SQL requires a join with '.*' condition"), diff --git a/sql/src/test/resources/calcite/tests/window/wikipediaFramedAggregations.sqlTest b/sql/src/test/resources/calcite/tests/window/wikipediaFramedAggregations.sqlTest index 87873d44c485..104cb0d2422d 100644 --- a/sql/src/test/resources/calcite/tests/window/wikipediaFramedAggregations.sqlTest +++ b/sql/src/test/resources/calcite/tests/window/wikipediaFramedAggregations.sqlTest @@ -2,7 +2,7 @@ type: "operatorValidation" sql: | SELECT - countryIsoCode, + countryIsoCode, CAST (FLOOR(__time TO HOUR) AS BIGINT) t, SUM(delta) delta, SUM(SUM(delta)) OVER (PARTITION BY countryIsoCode ORDER BY CAST (FLOOR(__time TO HOUR) AS BIGINT) ROWS BETWEEN 3 PRECEDING AND 2 FOLLOWING) windowedDelta