From 4e61156bcd1a049962dabaacb7b3f25271a9d049 Mon Sep 17 00:00:00 2001 From: Zoltan Haindrich Date: Mon, 23 Oct 2023 08:53:37 +0000 Subject: [PATCH 01/22] existing stuff --- .../DruidDefaultSerializersModule.java | 12 +++++++ .../WindowOperatorQueryQueryToolChest.java | 14 +++++++- .../concrete/FrameRowsAndColumns.java | 32 ++++++++++++++++++- .../concrete/FrameRowsAndColumnsTest.java | 11 +++++++ .../semantic/RowsAndColumnsDecoratorTest.java | 31 ++++++++++++++++++ .../calcite/tests/window/simpleSum.sqlTest | 4 ++- 6 files changed, 101 insertions(+), 3 deletions(-) diff --git a/processing/src/main/java/org/apache/druid/jackson/DruidDefaultSerializersModule.java b/processing/src/main/java/org/apache/druid/jackson/DruidDefaultSerializersModule.java index 30cc388f1d9d..6fa0c3056652 100644 --- a/processing/src/main/java/org/apache/druid/jackson/DruidDefaultSerializersModule.java +++ b/processing/src/main/java/org/apache/druid/jackson/DruidDefaultSerializersModule.java @@ -204,5 +204,17 @@ public void serialize( gen.writeBinary(WireTransferable.fromRAC(value).bytesToTransfer()); } }); + + addDeserializer(RowsAndColumns.class, new JsonDeserializer() + { + @Override + public RowsAndColumns deserialize( + JsonParser p, + DeserializationContext ctxt + ) throws IOException + { + throw new RuntimeException("Unimplemented!"); + } + }); } } diff --git a/processing/src/main/java/org/apache/druid/query/operator/WindowOperatorQueryQueryToolChest.java b/processing/src/main/java/org/apache/druid/query/operator/WindowOperatorQueryQueryToolChest.java index bec529eedefa..849127dbe8a3 100644 --- a/processing/src/main/java/org/apache/druid/query/operator/WindowOperatorQueryQueryToolChest.java +++ b/processing/src/main/java/org/apache/druid/query/operator/WindowOperatorQueryQueryToolChest.java @@ -20,6 +20,7 @@ package org.apache.druid.query.operator; import com.fasterxml.jackson.core.type.TypeReference; +import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.base.Function; import com.google.common.base.Functions; import com.google.common.collect.ImmutableMap; @@ -36,6 +37,7 @@ import org.apache.druid.query.rowsandcols.column.Column; import org.apache.druid.query.rowsandcols.column.ColumnAccessor; import org.apache.druid.query.rowsandcols.column.NullColumn; +import org.apache.druid.query.rowsandcols.concrete.FrameRowsAndColumns; import org.apache.druid.segment.column.ColumnType; import org.apache.druid.segment.column.RowSignature; @@ -45,7 +47,6 @@ public class WindowOperatorQueryQueryToolChest extends QueryToolChest { - @Override @SuppressWarnings("unchecked") public QueryRunner mergeResults(QueryRunner runner) @@ -196,4 +197,15 @@ public Sequence run( return baseQueryRunner.run(queryPlus, responseContext); } } + + @Override + public ObjectMapper decorateObjectMapper(ObjectMapper objectMapper, WindowOperatorQuery query) + { + ObjectMapper om = super.decorateObjectMapper(objectMapper, query).copy(); + + om.registerSubtypes(FrameRowsAndColumns.class); + + return om; + } + } diff --git a/processing/src/main/java/org/apache/druid/query/rowsandcols/concrete/FrameRowsAndColumns.java b/processing/src/main/java/org/apache/druid/query/rowsandcols/concrete/FrameRowsAndColumns.java index 59a97f12bf51..9bb1fe5cc0a3 100644 --- a/processing/src/main/java/org/apache/druid/query/rowsandcols/concrete/FrameRowsAndColumns.java +++ b/processing/src/main/java/org/apache/druid/query/rowsandcols/concrete/FrameRowsAndColumns.java @@ -19,6 +19,8 @@ package org.apache.druid.query.rowsandcols.concrete; +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; import org.apache.druid.frame.Frame; import org.apache.druid.frame.FrameType; import org.apache.druid.frame.read.FrameReader; @@ -28,6 +30,7 @@ import org.apache.druid.java.util.common.Intervals; import org.apache.druid.query.rowsandcols.RowsAndColumns; import org.apache.druid.query.rowsandcols.column.Column; +import org.apache.druid.query.rowsandcols.semantic.WireTransferable; import org.apache.druid.segment.StorageAdapter; import org.apache.druid.segment.column.ColumnType; import org.apache.druid.segment.column.RowSignature; @@ -42,7 +45,10 @@ public class FrameRowsAndColumns implements RowsAndColumns private final RowSignature signature; private final LinkedHashMap colCache = new LinkedHashMap<>(); - public FrameRowsAndColumns(Frame frame, RowSignature signature) + @JsonCreator + public FrameRowsAndColumns( + @JsonProperty("frame") Frame frame, + @JsonProperty("signature") RowSignature signature) { this.frame = FrameType.COLUMNAR.ensureType(frame); this.signature = signature; @@ -54,6 +60,18 @@ public Collection getColumnNames() return signature.getColumnNames(); } + @JsonProperty("frame") + public Frame getFrame() + { + return frame; + } + + @JsonProperty("signature") + public RowSignature getSignature() + { + return signature; + } + @Override public int numRows() { @@ -89,6 +107,18 @@ public T as(Class clazz) if (StorageAdapter.class.equals(clazz)) { return (T) new FrameStorageAdapter(frame, FrameReader.create(signature), Intervals.ETERNITY); } + if (WireTransferable.class.equals(clazz)) { + return (T) new WireTransferable() { + + @Override + public byte[] bytesToTransfer() + { + throw new RuntimeException(); + //return frame; + } + + }; + } return null; } } diff --git a/processing/src/test/java/org/apache/druid/query/rowsandcols/concrete/FrameRowsAndColumnsTest.java b/processing/src/test/java/org/apache/druid/query/rowsandcols/concrete/FrameRowsAndColumnsTest.java index 837e30185e2e..e590dcef9789 100644 --- a/processing/src/test/java/org/apache/druid/query/rowsandcols/concrete/FrameRowsAndColumnsTest.java +++ b/processing/src/test/java/org/apache/druid/query/rowsandcols/concrete/FrameRowsAndColumnsTest.java @@ -44,4 +44,15 @@ private static FrameRowsAndColumns buildFrame(MapOfColumnsRowsAndColumns input) return (FrameRowsAndColumns) rac.getBase(); } + +// @Test +// public void testSerde() throws Exception +// { +// final FrameRowsAndColumns f = +// final ObjectMapper objectMapper = TestHelper.makeJsonMapper(); +// Assert.assertEquals(row, objectMapper.readValue("[1, 2, 3]", ResultRow.class)); +// Assert.assertEquals(row, objectMapper.readValue(objectMapper.writeValueAsBytes(row), ResultRow.class)); +// } +// +// } diff --git a/processing/src/test/java/org/apache/druid/query/rowsandcols/semantic/RowsAndColumnsDecoratorTest.java b/processing/src/test/java/org/apache/druid/query/rowsandcols/semantic/RowsAndColumnsDecoratorTest.java index 3426dd009463..8a4c466e250b 100644 --- a/processing/src/test/java/org/apache/druid/query/rowsandcols/semantic/RowsAndColumnsDecoratorTest.java +++ b/processing/src/test/java/org/apache/druid/query/rowsandcols/semantic/RowsAndColumnsDecoratorTest.java @@ -19,6 +19,8 @@ package org.apache.druid.query.rowsandcols.semantic; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; import org.apache.druid.java.util.common.ISE; import org.apache.druid.java.util.common.Intervals; @@ -32,9 +34,11 @@ import org.apache.druid.query.rowsandcols.MapOfColumnsRowsAndColumns; import org.apache.druid.query.rowsandcols.RowsAndColumns; import org.apache.druid.query.rowsandcols.column.ColumnAccessor; +import org.apache.druid.query.rowsandcols.column.IntArrayColumn; import org.apache.druid.segment.ArrayListSegment; import org.apache.druid.segment.ColumnValueSelector; import org.apache.druid.segment.Cursor; +import org.apache.druid.segment.TestHelper; import org.apache.druid.segment.VirtualColumns; import org.apache.druid.segment.column.ColumnType; import org.apache.druid.segment.column.RowSignature; @@ -240,4 +244,31 @@ private void validateDecorated( } } } + + @Test + public void asd() throws Exception { + RowsAndColumns rac = make(MapOfColumnsRowsAndColumns.fromMap( + ImmutableMap.of( + "colA", new IntArrayColumn(new int[]{1, 2, 3, 4, 5, 6, 7, 8, 9, 10}), + "colB", new IntArrayColumn(new int[]{4, -4, 3, -3, 4, 82, -90, 4, 0, 0}) + ) + )); + +// @Test +// public void testSerde() throws Exception +// { +// FrameRowsAndColumns f = + ObjectMapper objectMapper = TestHelper.makeJsonMapper(); + byte[] arr = objectMapper.writeValueAsBytes(rac); + +// Assert.assertEquals(rac, objectMapper.readValue(arr, FrameRowsAndColumns.class)); +// Assert.assertEquals(rac, objectMapper.readValue(objectMapper.writeValueAsBytes(rac), ResultRow.class)); +// } + + + + + + } + } diff --git a/sql/src/test/resources/calcite/tests/window/simpleSum.sqlTest b/sql/src/test/resources/calcite/tests/window/simpleSum.sqlTest index d4affc6ec563..0f2d22b63df0 100644 --- a/sql/src/test/resources/calcite/tests/window/simpleSum.sqlTest +++ b/sql/src/test/resources/calcite/tests/window/simpleSum.sqlTest @@ -4,7 +4,9 @@ sql: | SELECT FLOOR(__time TO DAY) t, SUM(cnt) c, - SUM(SUM(cnt)) OVER (ORDER BY FLOOR(__time TO DAY)) cc + SUM(SUM(cnt)) OVER (ORDER BY FLOOR(__time TO DAY)) cc, + APPROX_COUNT_DISTINCT_DS_THETA(DS_THETA(cnt)) OVER (ORDER BY FLOOR(__time TO DAY)) cc2, + HLL_SKETCH_ESTIMATE(DS_HLL(dim1)) OVER (ORDER BY FLOOR(__time TO DAY)) cc3 FROM foo GROUP BY FLOOR(__time TO DAY) From 0a07180f6cbe1c1d69c6fc4261d86897c85ffbfe Mon Sep 17 00:00:00 2001 From: Zoltan Haindrich Date: Tue, 24 Oct 2023 12:50:47 +0000 Subject: [PATCH 02/22] hint --- .../org/apache/druid/jackson/DruidDefaultSerializersModule.java | 1 + 1 file changed, 1 insertion(+) diff --git a/processing/src/main/java/org/apache/druid/jackson/DruidDefaultSerializersModule.java b/processing/src/main/java/org/apache/druid/jackson/DruidDefaultSerializersModule.java index 6fa0c3056652..6f0c84650767 100644 --- a/processing/src/main/java/org/apache/druid/jackson/DruidDefaultSerializersModule.java +++ b/processing/src/main/java/org/apache/druid/jackson/DruidDefaultSerializersModule.java @@ -213,6 +213,7 @@ public RowsAndColumns deserialize( DeserializationContext ctxt ) throws IOException { + // FIXME use SerializerUtils throw new RuntimeException("Unimplemented!"); } }); From 05b0d4d0dd4e116c074f612a453f9a819006e4aa Mon Sep 17 00:00:00 2001 From: Zoltan Haindrich Date: Wed, 25 Oct 2023 16:31:52 +0000 Subject: [PATCH 03/22] yesterday stuff --- .../DruidDefaultSerializersModule.java | 42 ++++++++++++++++-- .../concrete/FrameRowsAndColumns.java | 43 +++++++++++++++---- .../semantic/WireTransferable.java | 4 ++ .../jackson/DefaultObjectMapperTest.java | 28 ++++++++++++ .../concrete/FrameRowsAndColumnsTest.java | 2 +- .../druid/server/QueryResultPusher.java | 6 ++- .../calcite/tests/window/simpleSum.sqlTest | 8 +--- 7 files changed, 112 insertions(+), 21 deletions(-) diff --git a/processing/src/main/java/org/apache/druid/jackson/DruidDefaultSerializersModule.java b/processing/src/main/java/org/apache/druid/jackson/DruidDefaultSerializersModule.java index 6f0c84650767..8aae7f905aa6 100644 --- a/processing/src/main/java/org/apache/druid/jackson/DruidDefaultSerializersModule.java +++ b/processing/src/main/java/org/apache/druid/jackson/DruidDefaultSerializersModule.java @@ -27,6 +27,9 @@ import com.fasterxml.jackson.databind.SerializerProvider; import com.fasterxml.jackson.databind.module.SimpleModule; import com.fasterxml.jackson.databind.ser.std.ToStringSerializer; +import org.apache.druid.error.DruidException; +import org.apache.druid.frame.Frame; +import org.apache.druid.frame.channel.ByteTracker; import org.apache.druid.java.util.common.DateTimes; import org.apache.druid.java.util.common.guava.Accumulator; import org.apache.druid.java.util.common.guava.Sequence; @@ -37,11 +40,15 @@ import org.apache.druid.query.context.ResponseContext; import org.apache.druid.query.context.ResponseContextDeserializer; import org.apache.druid.query.rowsandcols.RowsAndColumns; -import org.apache.druid.query.rowsandcols.semantic.WireTransferable; +import org.apache.druid.query.rowsandcols.concrete.FrameRowsAndColumns; +import org.apache.druid.segment.column.RowSignature; import org.joda.time.DateTimeZone; +import java.io.ByteArrayOutputStream; import java.io.IOException; +import java.nio.ByteBuffer; import java.nio.ByteOrder; +import java.nio.channels.Channels; /** * @@ -201,7 +208,29 @@ public void serialize( // It would be really cool if jackson offered an output stream that would allow us to push bytes // through, but it doesn't right now, so we have to build a byte[] instead. Maybe something to contribute // back to Jackson at some point. - gen.writeBinary(WireTransferable.fromRAC(value).bytesToTransfer()); + + if(value instanceof FrameRowsAndColumns) { + + FrameRowsAndColumns frc = (FrameRowsAndColumns) value; + + gen.writeObject(frc.getSignature()); + + Frame frame = frc.getFrame(); + final ByteArrayOutputStream baos = new ByteArrayOutputStream(); + frame.writeTo( + Channels.newChannel(baos), + false, + ByteBuffer.allocate(Frame.compressionBufferSize((int) frame.numBytes())), + ByteTracker.unboundedTracker() + ); + gen.writeBinary(baos.toByteArray()); +// byte[] b = new byte[] {1, 2, 3}; +// gen.writeBinary(b); + + } else { + throw DruidException.defensive("expected frame"); + } + } }); @@ -213,8 +242,13 @@ public RowsAndColumns deserialize( DeserializationContext ctxt ) throws IOException { - // FIXME use SerializerUtils - throw new RuntimeException("Unimplemented!"); + RowSignature sig = p.readValueAs(RowSignature.class); + p.nextValue(); + ByteArrayOutputStream baos = new ByteArrayOutputStream(); +// ByteArrayOutputStream baos2 = new ByteArrayOutputStream(); + p.readBinaryValue(baos); +// p.readBinaryValue(baos2); + return new FrameRowsAndColumns(Frame.wrap(baos.toByteArray()), sig); } }); } diff --git a/processing/src/main/java/org/apache/druid/query/rowsandcols/concrete/FrameRowsAndColumns.java b/processing/src/main/java/org/apache/druid/query/rowsandcols/concrete/FrameRowsAndColumns.java index 9bb1fe5cc0a3..05ef870bd59c 100644 --- a/processing/src/main/java/org/apache/druid/query/rowsandcols/concrete/FrameRowsAndColumns.java +++ b/processing/src/main/java/org/apache/druid/query/rowsandcols/concrete/FrameRowsAndColumns.java @@ -21,6 +21,7 @@ import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; +import com.google.common.base.Objects; import org.apache.druid.frame.Frame; import org.apache.druid.frame.FrameType; import org.apache.druid.frame.read.FrameReader; @@ -36,6 +37,9 @@ import org.apache.druid.segment.column.RowSignature; import javax.annotation.Nullable; + +import java.io.InputStream; +import java.io.OutputStream; import java.util.Collection; import java.util.LinkedHashMap; @@ -108,17 +112,38 @@ public T as(Class clazz) return (T) new FrameStorageAdapter(frame, FrameReader.create(signature), Intervals.ETERNITY); } if (WireTransferable.class.equals(clazz)) { - return (T) new WireTransferable() { + return (T) this; + } + return null; + } + + public void writeTo(OutputStream os) + { + throw new RuntimeException("Unimplemented!"); + } + + public static FrameRowsAndColumns readFrom(InputStream is) + { + throw new RuntimeException("Unimplemented!"); + } - @Override - public byte[] bytesToTransfer() - { - throw new RuntimeException(); - //return frame; - } + @Override + public int hashCode() + { + return Objects.hashCode(frame, signature); + } - }; + @Override + public boolean equals(Object o) + { + if(this == o ) { + return true; } - return null; + if (!(o instanceof FrameRowsAndColumns) || o == null) { + return false; + } + FrameRowsAndColumns otherFrame = (FrameRowsAndColumns) o; + + return frame.writableMemory().equals(otherFrame.frame.writableMemory()) && signature.equals(otherFrame.signature); } } diff --git a/processing/src/main/java/org/apache/druid/query/rowsandcols/semantic/WireTransferable.java b/processing/src/main/java/org/apache/druid/query/rowsandcols/semantic/WireTransferable.java index a7d55f599293..2b1a099f03c9 100644 --- a/processing/src/main/java/org/apache/druid/query/rowsandcols/semantic/WireTransferable.java +++ b/processing/src/main/java/org/apache/druid/query/rowsandcols/semantic/WireTransferable.java @@ -34,4 +34,8 @@ static WireTransferable fromRAC(RowsAndColumns rac) } byte[] bytesToTransfer(); + +// +// void writeTo(OutputStream os); +// Object readFrom(InputStream is); } diff --git a/processing/src/test/java/org/apache/druid/jackson/DefaultObjectMapperTest.java b/processing/src/test/java/org/apache/druid/jackson/DefaultObjectMapperTest.java index 989d137770ee..c2e1172afe70 100644 --- a/processing/src/test/java/org/apache/druid/jackson/DefaultObjectMapperTest.java +++ b/processing/src/test/java/org/apache/druid/jackson/DefaultObjectMapperTest.java @@ -22,12 +22,18 @@ import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.exc.InvalidTypeIdException; +import com.google.common.collect.ImmutableMap; import org.apache.druid.java.util.common.DateTimes; import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.java.util.common.guava.Sequence; import org.apache.druid.java.util.common.guava.Sequences; import org.apache.druid.java.util.common.guava.Yielders; import org.apache.druid.query.Query; +import org.apache.druid.query.rowsandcols.MapOfColumnsRowsAndColumns; +import org.apache.druid.query.rowsandcols.RowsAndColumns; +import org.apache.druid.query.rowsandcols.column.IntArrayColumn; +import org.apache.druid.query.rowsandcols.concrete.FrameRowsAndColumns; +import org.apache.druid.query.rowsandcols.concrete.FrameRowsAndColumnsTest; import org.joda.time.DateTime; import org.joda.time.DateTimeZone; import org.junit.Assert; @@ -35,6 +41,8 @@ import java.util.Arrays; +import static org.junit.Assert.assertEquals; + /** * */ @@ -102,4 +110,24 @@ public void testUnknownTypeWithUnknownService() throws JsonProcessingException } Assert.fail("We expect InvalidTypeIdException to be thrown"); } + + @Test + public void testRowsAndColumns() throws Exception + { + DefaultObjectMapper om = new DefaultObjectMapper("test"); + + MapOfColumnsRowsAndColumns input = (MapOfColumnsRowsAndColumns.fromMap( + ImmutableMap.of( + "colA", new IntArrayColumn(new int[] {1, 2, 3, 4, 5, 6, 7, 8, 9, 10}), + "colB", new IntArrayColumn(new int[] {4, -4, 3, -3, 4, 82, -90, 4, 0, 0})))); + + FrameRowsAndColumns frame = FrameRowsAndColumnsTest.buildFrame(input); + byte[] bytes = om.writeValueAsBytes(frame); + + System.out.println(new String(bytes)); + FrameRowsAndColumns frame2 = (FrameRowsAndColumns) om.readValue(bytes, RowsAndColumns.class); + + assertEquals(frame, frame2); + } + } diff --git a/processing/src/test/java/org/apache/druid/query/rowsandcols/concrete/FrameRowsAndColumnsTest.java b/processing/src/test/java/org/apache/druid/query/rowsandcols/concrete/FrameRowsAndColumnsTest.java index e590dcef9789..19aa67321bc4 100644 --- a/processing/src/test/java/org/apache/druid/query/rowsandcols/concrete/FrameRowsAndColumnsTest.java +++ b/processing/src/test/java/org/apache/druid/query/rowsandcols/concrete/FrameRowsAndColumnsTest.java @@ -36,7 +36,7 @@ public FrameRowsAndColumnsTest() return buildFrame(input); }; - private static FrameRowsAndColumns buildFrame(MapOfColumnsRowsAndColumns input) + public static FrameRowsAndColumns buildFrame(MapOfColumnsRowsAndColumns input) { LazilyDecoratedRowsAndColumns rac = new LazilyDecoratedRowsAndColumns(input, null, null, null, Integer.MAX_VALUE, null, null); diff --git a/server/src/main/java/org/apache/druid/server/QueryResultPusher.java b/server/src/main/java/org/apache/druid/server/QueryResultPusher.java index 074beb545b43..df319aa57c2c 100644 --- a/server/src/main/java/org/apache/druid/server/QueryResultPusher.java +++ b/server/src/main/java/org/apache/druid/server/QueryResultPusher.java @@ -24,6 +24,8 @@ import com.google.common.io.CountingOutputStream; import org.apache.druid.client.DirectDruidClient; import org.apache.druid.error.DruidException; +import org.apache.druid.error.DruidException.Category; +import org.apache.druid.error.DruidException.Persona; import org.apache.druid.error.ErrorResponse; import org.apache.druid.error.QueryExceptionCompat; import org.apache.druid.java.util.common.ISE; @@ -450,7 +452,9 @@ public Response accumulate(Response retVal, Object in) } catch (IOException ex) { QueryResource.NO_STACK_LOGGER.warn(ex, "Unable to write query response."); - throw new RuntimeException(ex); + throw DruidException.forPersona(Persona.DEVELOPER) + .ofCategory(Category.UNCATEGORIZED) + .build(ex, "Unable to write query response."); } return null; } diff --git a/sql/src/test/resources/calcite/tests/window/simpleSum.sqlTest b/sql/src/test/resources/calcite/tests/window/simpleSum.sqlTest index 0f2d22b63df0..4a860d997336 100644 --- a/sql/src/test/resources/calcite/tests/window/simpleSum.sqlTest +++ b/sql/src/test/resources/calcite/tests/window/simpleSum.sqlTest @@ -3,12 +3,8 @@ type: "operatorValidation" sql: | SELECT FLOOR(__time TO DAY) t, - SUM(cnt) c, - SUM(SUM(cnt)) OVER (ORDER BY FLOOR(__time TO DAY)) cc, - APPROX_COUNT_DISTINCT_DS_THETA(DS_THETA(cnt)) OVER (ORDER BY FLOOR(__time TO DAY)) cc2, - HLL_SKETCH_ESTIMATE(DS_HLL(dim1)) OVER (ORDER BY FLOOR(__time TO DAY)) cc3 - FROM foo - GROUP BY FLOOR(__time TO DAY) + COUNT(1) OVER () + FROM ( select * from foo offset 3 ) f expectedOperators: - { type: "naivePartition", partitionColumns: [ ] } From e9342cb3e9ae2b3be4972ec159bcc1b0ae0a752d Mon Sep 17 00:00:00 2001 From: Zoltan Haindrich Date: Tue, 7 Nov 2023 12:49:22 +0000 Subject: [PATCH 04/22] some remaining changes --- .../window/value/WindowOffsetProcessor.java | 3 ++- .../druid/sql/calcite/rel/Windowing.java | 17 ++++++++++-- .../calcite/tests/window/simpleSum.sqlTest | 26 +++++++++++++++++-- .../wikipediaFramedAggregations.sqlTest | 2 +- 4 files changed, 42 insertions(+), 6 deletions(-) diff --git a/processing/src/main/java/org/apache/druid/query/operator/window/value/WindowOffsetProcessor.java b/processing/src/main/java/org/apache/druid/query/operator/window/value/WindowOffsetProcessor.java index ffd44e33fe5a..39301f15e5ce 100644 --- a/processing/src/main/java/org/apache/druid/query/operator/window/value/WindowOffsetProcessor.java +++ b/processing/src/main/java/org/apache/druid/query/operator/window/value/WindowOffsetProcessor.java @@ -33,7 +33,8 @@ public class WindowOffsetProcessor extends WindowValueProcessorBase public WindowOffsetProcessor( @JsonProperty("inputColumn") String inputColumn, @JsonProperty("outputColumn") String outputColumn, - @JsonProperty("offset") int offset + @JsonProperty("offset") int offset, + @JsonProperty("offset") DruidExpression defaultExpression ) { super(inputColumn, outputColumn); diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/rel/Windowing.java b/sql/src/main/java/org/apache/druid/sql/calcite/rel/Windowing.java index 9f8d3d091b5d..ec6d2270ea15 100644 --- a/sql/src/main/java/org/apache/druid/sql/calcite/rel/Windowing.java +++ b/sql/src/main/java/org/apache/druid/sql/calcite/rel/Windowing.java @@ -32,6 +32,7 @@ import org.apache.calcite.rex.RexInputRef; import org.apache.calcite.rex.RexLiteral; import org.apache.calcite.rex.RexNode; +import org.apache.calcite.rex.RexUtil; import org.apache.calcite.rex.RexWindowBound; import org.apache.calcite.util.mapping.Mappings; import org.apache.druid.java.util.common.ISE; @@ -88,9 +89,9 @@ public class Windowing private static final ImmutableMap KNOWN_WINDOW_FNS = ImmutableMap .builder() .put("LAG", (agg) -> - new WindowOffsetProcessor(agg.getColumn(0), agg.getOutputName(), -agg.getConstantInt(1, 1))) + new WindowOffsetProcessor(agg.getColumn(0), agg.getOutputName(), -agg.getConstantInt(1, 1), agg.getConstantArgument(2, null))) .put("LEAD", (agg) -> - new WindowOffsetProcessor(agg.getColumn(0), agg.getOutputName(), agg.getConstantInt(1, 1))) + new WindowOffsetProcessor(agg.getColumn(0), agg.getOutputName(), agg.getConstantInt(1, 1), agg.getConstantArgument(2, null))) .put("FIRST_VALUE", (agg) -> new WindowFirstProcessor(agg.getColumn(0), agg.getOutputName())) .put("LAST_VALUE", (agg) -> @@ -440,6 +441,18 @@ public RexLiteral getConstantArgument(int argPosition) return constants.get(call.getArgList().get(argPosition) - sig.size()); } + public DruidExpression getConstantArgument(int argPosition, RexLiteral defaultValue) + { + final RexLiteral arg; + if (argPosition >= call.getArgList().size()) { + arg = defaultValue; + } else { + arg = getConstantArgument(argPosition); + } + DruidExpression expr = Expressions.toDruidExpression(context, sig, arg); + return expr; + } + public int getConstantInt(int argPosition) { return ((Number) getConstantArgument(argPosition).getValue()).intValue(); diff --git a/sql/src/test/resources/calcite/tests/window/simpleSum.sqlTest b/sql/src/test/resources/calcite/tests/window/simpleSum.sqlTest index 4a860d997336..f077855eb268 100644 --- a/sql/src/test/resources/calcite/tests/window/simpleSum.sqlTest +++ b/sql/src/test/resources/calcite/tests/window/simpleSum.sqlTest @@ -3,8 +3,14 @@ type: "operatorValidation" sql: | SELECT FLOOR(__time TO DAY) t, - COUNT(1) OVER () - FROM ( select * from foo offset 3 ) f + cnt, + FLOOR(m1/3), + COUNT(1) OVER (ORDER BY FLOOR(m1/3) ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW), --ok + COUNT(1) OVER (ORDER BY FLOOR(m1/3) RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW), -- bad + LAST_VALUE(cnt) OVER (ORDER BY __time), -- last value of the window + LAG(cnt) OVER (), -- ok + LAG(cnt,1,999) OVER () -- default value ignored + FROM foo expectedOperators: - { type: "naivePartition", partitionColumns: [ ] } @@ -22,3 +28,19 @@ expectedResults: - [ 978307200000, 1, 4 ] - [ 978393600000, 1, 5 ] - [ 978480000000, 1, 6 ] + + + '{"type":"inline","data":" + channel,time,delta\n + \"a\",\"2023-10-01 00:00:00\",5\n + \"a\",\"2023-10-01 00:00:00\",5\n + \"a\",\"2023-10-01 00:00:00\",5\n + \"a\",\"2023-10-01 00:00:01\",10\n + \"a\",\"2023-10-01 00:00:02\",10\n + \"b\",\"2023-10-01 00:00:00\",5\n + \"b\",\"2023-10-01 00:00:01\",5\n + \"b\",\"2023-10-01 00:00:02\",5\n + \"b\",\"2023-10-01 00:00:03\",10\n + \"b\",\"2023-10-01 00:00:04\",10\n + "}', + '{"type":"csv","findColumnsFromHeader":true}' \ No newline at end of file diff --git a/sql/src/test/resources/calcite/tests/window/wikipediaFramedAggregations.sqlTest b/sql/src/test/resources/calcite/tests/window/wikipediaFramedAggregations.sqlTest index 0644c9a354d7..941626eaf991 100644 --- a/sql/src/test/resources/calcite/tests/window/wikipediaFramedAggregations.sqlTest +++ b/sql/src/test/resources/calcite/tests/window/wikipediaFramedAggregations.sqlTest @@ -2,7 +2,7 @@ type: "operatorValidation" sql: | SELECT - countryIsoCode, + countryIsoCode, CAST (FLOOR(__time TO HOUR) AS BIGINT) t, SUM(delta) delta, SUM(SUM(delta)) OVER (PARTITION BY countryIsoCode ORDER BY CAST (FLOOR(__time TO HOUR) AS BIGINT) RANGE BETWEEN 3 PRECEDING AND 2 FOLLOWING) windowedDelta From b9156756eb88e4e6ef90a270c27621f2df45a8df Mon Sep 17 00:00:00 2001 From: sreemanamala Date: Wed, 26 Jun 2024 11:00:15 +0530 Subject: [PATCH 05/22] refactor --- .../calcite/tests/window/simpleSum.sqlTest | 28 +++---------------- 1 file changed, 4 insertions(+), 24 deletions(-) diff --git a/sql/src/test/resources/calcite/tests/window/simpleSum.sqlTest b/sql/src/test/resources/calcite/tests/window/simpleSum.sqlTest index 79f6310d09d3..150c9225ee6e 100644 --- a/sql/src/test/resources/calcite/tests/window/simpleSum.sqlTest +++ b/sql/src/test/resources/calcite/tests/window/simpleSum.sqlTest @@ -3,14 +3,10 @@ type: "operatorValidation" sql: | SELECT FLOOR(__time TO DAY) t, - cnt, - FLOOR(m1/3), - COUNT(1) OVER (ORDER BY FLOOR(m1/3) ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW), --ok - COUNT(1) OVER (ORDER BY FLOOR(m1/3) RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW), -- bad - LAST_VALUE(cnt) OVER (ORDER BY __time), -- last value of the window - LAG(cnt) OVER (), -- ok - LAG(cnt,1,999) OVER () -- default value ignored + SUM(cnt) c, + SUM(SUM(cnt)) OVER (ORDER BY FLOOR(__time TO DAY)) cc FROM foo + GROUP BY FLOOR(__time TO DAY) expectedOperators: - { type: "naivePartition", partitionColumns: [ ] } @@ -33,20 +29,4 @@ expectedResults: - [ 946857600000, 1, 3 ] - [ 978307200000, 1, 4 ] - [ 978393600000, 1, 5 ] - - [ 978480000000, 1, 6 ] - - - '{"type":"inline","data":" - channel,time,delta\n - \"a\",\"2023-10-01 00:00:00\",5\n - \"a\",\"2023-10-01 00:00:00\",5\n - \"a\",\"2023-10-01 00:00:00\",5\n - \"a\",\"2023-10-01 00:00:01\",10\n - \"a\",\"2023-10-01 00:00:02\",10\n - \"b\",\"2023-10-01 00:00:00\",5\n - \"b\",\"2023-10-01 00:00:01\",5\n - \"b\",\"2023-10-01 00:00:02\",5\n - \"b\",\"2023-10-01 00:00:03\",10\n - \"b\",\"2023-10-01 00:00:04\",10\n - "}', - '{"type":"csv","findColumnsFromHeader":true}' \ No newline at end of file + - [ 978480000000, 1, 6 ] \ No newline at end of file From 67b2e44ec38dbb9f7a9b8e2980d29ff83ec0d0b9 Mon Sep 17 00:00:00 2001 From: sreemanamala Date: Wed, 26 Jun 2024 11:16:20 +0530 Subject: [PATCH 06/22] checkstyle --- .../DruidDefaultSerializersModule.java | 3 +-- .../ColumnBasedFrameRowsAndColumns.java | 2 -- .../concrete/RowBasedFrameRowsAndColumns.java | 2 -- .../semantic/RowsAndColumnsDecoratorTest.java | 26 ------------------- 4 files changed, 1 insertion(+), 32 deletions(-) diff --git a/processing/src/main/java/org/apache/druid/jackson/DruidDefaultSerializersModule.java b/processing/src/main/java/org/apache/druid/jackson/DruidDefaultSerializersModule.java index e9c99e8c8c74..519cd88134dc 100644 --- a/processing/src/main/java/org/apache/druid/jackson/DruidDefaultSerializersModule.java +++ b/processing/src/main/java/org/apache/druid/jackson/DruidDefaultSerializersModule.java @@ -41,7 +41,6 @@ import org.apache.druid.query.context.ResponseContextDeserializer; import org.apache.druid.query.rowsandcols.RowsAndColumns; import org.apache.druid.query.rowsandcols.concrete.ColumnBasedFrameRowsAndColumns; -import org.apache.druid.query.rowsandcols.concrete.RowBasedFrameRowsAndColumns; import org.apache.druid.segment.column.RowSignature; import org.joda.time.DateTimeZone; @@ -210,7 +209,7 @@ public void serialize( // through, but it doesn't right now, so we have to build a byte[] instead. Maybe something to contribute // back to Jackson at some point. - if(value instanceof ColumnBasedFrameRowsAndColumns) { + if (value instanceof ColumnBasedFrameRowsAndColumns) { ColumnBasedFrameRowsAndColumns frc = (ColumnBasedFrameRowsAndColumns) value; diff --git a/processing/src/main/java/org/apache/druid/query/rowsandcols/concrete/ColumnBasedFrameRowsAndColumns.java b/processing/src/main/java/org/apache/druid/query/rowsandcols/concrete/ColumnBasedFrameRowsAndColumns.java index b145adf6e157..32056051cb9d 100644 --- a/processing/src/main/java/org/apache/druid/query/rowsandcols/concrete/ColumnBasedFrameRowsAndColumns.java +++ b/processing/src/main/java/org/apache/druid/query/rowsandcols/concrete/ColumnBasedFrameRowsAndColumns.java @@ -38,8 +38,6 @@ import org.apache.druid.segment.column.RowSignature; import javax.annotation.Nullable; -import java.io.InputStream; -import java.io.OutputStream; import java.util.Collection; import java.util.LinkedHashMap; diff --git a/processing/src/main/java/org/apache/druid/query/rowsandcols/concrete/RowBasedFrameRowsAndColumns.java b/processing/src/main/java/org/apache/druid/query/rowsandcols/concrete/RowBasedFrameRowsAndColumns.java index e3c45d443dfa..1f4644730ea4 100644 --- a/processing/src/main/java/org/apache/druid/query/rowsandcols/concrete/RowBasedFrameRowsAndColumns.java +++ b/processing/src/main/java/org/apache/druid/query/rowsandcols/concrete/RowBasedFrameRowsAndColumns.java @@ -39,8 +39,6 @@ import javax.annotation.Nullable; -import java.io.InputStream; -import java.io.OutputStream; import java.util.Collection; import java.util.LinkedHashMap; diff --git a/processing/src/test/java/org/apache/druid/query/rowsandcols/semantic/RowsAndColumnsDecoratorTest.java b/processing/src/test/java/org/apache/druid/query/rowsandcols/semantic/RowsAndColumnsDecoratorTest.java index 7972f73386be..65bfa7eddfcd 100644 --- a/processing/src/test/java/org/apache/druid/query/rowsandcols/semantic/RowsAndColumnsDecoratorTest.java +++ b/processing/src/test/java/org/apache/druid/query/rowsandcols/semantic/RowsAndColumnsDecoratorTest.java @@ -19,8 +19,6 @@ package org.apache.druid.query.rowsandcols.semantic; -import com.fasterxml.jackson.databind.ObjectMapper; -import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; import org.apache.druid.java.util.common.ISE; import org.apache.druid.java.util.common.Intervals; @@ -36,11 +34,9 @@ import org.apache.druid.query.rowsandcols.MapOfColumnsRowsAndColumns; import org.apache.druid.query.rowsandcols.RowsAndColumns; import org.apache.druid.query.rowsandcols.column.ColumnAccessor; -import org.apache.druid.query.rowsandcols.column.IntArrayColumn; import org.apache.druid.segment.ArrayListSegment; import org.apache.druid.segment.ColumnValueSelector; import org.apache.druid.segment.Cursor; -import org.apache.druid.segment.TestHelper; import org.apache.druid.segment.VirtualColumns; import org.apache.druid.segment.column.ColumnType; import org.apache.druid.segment.column.RowSignature; @@ -332,26 +328,4 @@ private void validateDecorated( } } - @Test - public void asd() throws Exception { - RowsAndColumns rac = make(MapOfColumnsRowsAndColumns.fromMap( - ImmutableMap.of( - "colA", new IntArrayColumn(new int[]{1, 2, 3, 4, 5, 6, 7, 8, 9, 10}), - "colB", new IntArrayColumn(new int[]{4, -4, 3, -3, 4, 82, -90, 4, 0, 0}) - ) - )); - -// @Test -// public void testSerde() throws Exception -// { -// FrameRowsAndColumns f = - ObjectMapper objectMapper = TestHelper.makeJsonMapper(); - byte[] arr = objectMapper.writeValueAsBytes(rac); - -// Assert.assertEquals(rac, objectMapper.readValue(arr, FrameRowsAndColumns.class)); -// Assert.assertEquals(rac, objectMapper.readValue(objectMapper.writeValueAsBytes(rac), ResultRow.class)); -// } - - } - } From 94aa07e1ab9e8cf2044924651f06ff4655b6a4d8 Mon Sep 17 00:00:00 2001 From: sreemanamala Date: Wed, 26 Jun 2024 11:38:23 +0530 Subject: [PATCH 07/22] refactor --- .../org/apache/druid/jackson/DruidDefaultSerializersModule.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/processing/src/main/java/org/apache/druid/jackson/DruidDefaultSerializersModule.java b/processing/src/main/java/org/apache/druid/jackson/DruidDefaultSerializersModule.java index 519cd88134dc..816fe77761cb 100644 --- a/processing/src/main/java/org/apache/druid/jackson/DruidDefaultSerializersModule.java +++ b/processing/src/main/java/org/apache/druid/jackson/DruidDefaultSerializersModule.java @@ -213,7 +213,7 @@ public void serialize( ColumnBasedFrameRowsAndColumns frc = (ColumnBasedFrameRowsAndColumns) value; - gen.writeObject(frc.getSignature()); + JacksonUtils.writeObjectUsingSerializerProvider(gen, serializers, frc.getSignature()); Frame frame = frc.getFrame(); final ByteArrayOutputStream baos = new ByteArrayOutputStream(); From f6863a8f63a2db7457fd3b057e08511fa01626d4 Mon Sep 17 00:00:00 2001 From: sreemanamala Date: Wed, 26 Jun 2024 13:55:41 +0530 Subject: [PATCH 08/22] test & refactor --- .../DruidDefaultSerializersModule.java | 38 ++++++++++++------- .../WindowOperatorQueryQueryToolChest.java | 2 + .../jackson/DefaultObjectMapperTest.java | 12 +++--- .../semantic/RowsAndColumnsDecoratorTest.java | 37 ++++++++++++++++++ .../druid/server/QueryResultPusher.java | 6 +-- 5 files changed, 70 insertions(+), 25 deletions(-) diff --git a/processing/src/main/java/org/apache/druid/jackson/DruidDefaultSerializersModule.java b/processing/src/main/java/org/apache/druid/jackson/DruidDefaultSerializersModule.java index 816fe77761cb..89e6da14295b 100644 --- a/processing/src/main/java/org/apache/druid/jackson/DruidDefaultSerializersModule.java +++ b/processing/src/main/java/org/apache/druid/jackson/DruidDefaultSerializersModule.java @@ -29,6 +29,7 @@ import com.fasterxml.jackson.databind.ser.std.ToStringSerializer; import org.apache.druid.error.DruidException; import org.apache.druid.frame.Frame; +import org.apache.druid.frame.FrameType; import org.apache.druid.frame.channel.ByteTracker; import org.apache.druid.java.util.common.DateTimes; import org.apache.druid.java.util.common.guava.Accumulator; @@ -41,6 +42,7 @@ import org.apache.druid.query.context.ResponseContextDeserializer; import org.apache.druid.query.rowsandcols.RowsAndColumns; import org.apache.druid.query.rowsandcols.concrete.ColumnBasedFrameRowsAndColumns; +import org.apache.druid.query.rowsandcols.concrete.RowBasedFrameRowsAndColumns; import org.apache.druid.segment.column.RowSignature; import org.joda.time.DateTimeZone; @@ -212,25 +214,32 @@ public void serialize( if (value instanceof ColumnBasedFrameRowsAndColumns) { ColumnBasedFrameRowsAndColumns frc = (ColumnBasedFrameRowsAndColumns) value; - JacksonUtils.writeObjectUsingSerializerProvider(gen, serializers, frc.getSignature()); - Frame frame = frc.getFrame(); - final ByteArrayOutputStream baos = new ByteArrayOutputStream(); - frame.writeTo( - Channels.newChannel(baos), - false, - ByteBuffer.allocate(Frame.compressionBufferSize((int) frame.numBytes())), - ByteTracker.unboundedTracker() - ); - - gen.writeBinary(baos.toByteArray()); + this.writeFrameToGenerator(frc.getFrame(), gen); + } else if (value instanceof RowBasedFrameRowsAndColumns) { + RowBasedFrameRowsAndColumns frc = (RowBasedFrameRowsAndColumns) value; + JacksonUtils.writeObjectUsingSerializerProvider(gen, serializers, frc.getSignature()); + this.writeFrameToGenerator(frc.getFrame(), gen); } else { - throw DruidException.defensive("expected frame"); + throw DruidException.defensive("expected frame RowsAndColumns"); } } + + private void writeFrameToGenerator(Frame frame, JsonGenerator generator) throws IOException + { + final ByteArrayOutputStream baos = new ByteArrayOutputStream(); + frame.writeTo( + Channels.newChannel(baos), + false, + ByteBuffer.allocate(Frame.compressionBufferSize((int) frame.numBytes())), + ByteTracker.unboundedTracker() + ); + + generator.writeBinary(baos.toByteArray()); + } }); addDeserializer(RowsAndColumns.class, new JsonDeserializer() @@ -245,7 +254,10 @@ public RowsAndColumns deserialize( p.nextValue(); ByteArrayOutputStream baos = new ByteArrayOutputStream(); p.readBinaryValue(baos); - return new ColumnBasedFrameRowsAndColumns(Frame.wrap(baos.toByteArray()), sig); + Frame frame = Frame.wrap(baos.toByteArray()); + return (frame.type() == FrameType.COLUMNAR) + ? new ColumnBasedFrameRowsAndColumns(Frame.wrap(baos.toByteArray()), sig) + : new RowBasedFrameRowsAndColumns(Frame.wrap(baos.toByteArray()), sig); } }); } diff --git a/processing/src/main/java/org/apache/druid/query/operator/WindowOperatorQueryQueryToolChest.java b/processing/src/main/java/org/apache/druid/query/operator/WindowOperatorQueryQueryToolChest.java index 883ea07ffdec..49b2f3453f71 100644 --- a/processing/src/main/java/org/apache/druid/query/operator/WindowOperatorQueryQueryToolChest.java +++ b/processing/src/main/java/org/apache/druid/query/operator/WindowOperatorQueryQueryToolChest.java @@ -38,6 +38,7 @@ import org.apache.druid.query.rowsandcols.column.ColumnAccessor; import org.apache.druid.query.rowsandcols.column.NullColumn; import org.apache.druid.query.rowsandcols.concrete.ColumnBasedFrameRowsAndColumns; +import org.apache.druid.query.rowsandcols.concrete.RowBasedFrameRowsAndColumns; import org.apache.druid.segment.column.ColumnType; import org.apache.druid.segment.column.RowSignature; @@ -204,6 +205,7 @@ public ObjectMapper decorateObjectMapper(ObjectMapper objectMapper, WindowOperat ObjectMapper om = super.decorateObjectMapper(objectMapper, query).copy(); om.registerSubtypes(ColumnBasedFrameRowsAndColumns.class); + om.registerSubtypes(RowBasedFrameRowsAndColumns.class); return om; } diff --git a/processing/src/test/java/org/apache/druid/jackson/DefaultObjectMapperTest.java b/processing/src/test/java/org/apache/druid/jackson/DefaultObjectMapperTest.java index 51079cf1b4b3..4f8f7b3ddbdd 100644 --- a/processing/src/test/java/org/apache/druid/jackson/DefaultObjectMapperTest.java +++ b/processing/src/test/java/org/apache/druid/jackson/DefaultObjectMapperTest.java @@ -112,7 +112,7 @@ public void testUnknownTypeWithUnknownService() throws JsonProcessingException } @Test - public void testRowsAndColumns() throws Exception + public void testColumnBasedFrameRowsAndColumns() throws Exception { DefaultObjectMapper om = new DefaultObjectMapper("test"); @@ -122,13 +122,11 @@ public void testRowsAndColumns() throws Exception "colB", new IntArrayColumn(new int[]{4, -4, 3, -3, 4, 82, -90, 4, 0, 0}) ))); - ColumnBasedFrameRowsAndColumns frame = ColumnBasedFrameRowsAndColumnsTest.buildFrame(input); - byte[] bytes = om.writeValueAsBytes(frame); + ColumnBasedFrameRowsAndColumns frc = ColumnBasedFrameRowsAndColumnsTest.buildFrame(input); + byte[] bytes = om.writeValueAsBytes(frc); - System.out.println(new String(bytes)); - ColumnBasedFrameRowsAndColumns frame2 = (ColumnBasedFrameRowsAndColumns) om.readValue(bytes, RowsAndColumns.class); - - assertEquals(frame, frame2); + ColumnBasedFrameRowsAndColumns frc2 = (ColumnBasedFrameRowsAndColumns) om.readValue(bytes, RowsAndColumns.class); + assertEquals(frc, frc2); } } diff --git a/processing/src/test/java/org/apache/druid/query/rowsandcols/semantic/RowsAndColumnsDecoratorTest.java b/processing/src/test/java/org/apache/druid/query/rowsandcols/semantic/RowsAndColumnsDecoratorTest.java index 65bfa7eddfcd..25ba69856689 100644 --- a/processing/src/test/java/org/apache/druid/query/rowsandcols/semantic/RowsAndColumnsDecoratorTest.java +++ b/processing/src/test/java/org/apache/druid/query/rowsandcols/semantic/RowsAndColumnsDecoratorTest.java @@ -19,6 +19,7 @@ package org.apache.druid.query.rowsandcols.semantic; +import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; import org.apache.druid.java.util.common.ISE; import org.apache.druid.java.util.common.Intervals; @@ -34,6 +35,9 @@ import org.apache.druid.query.rowsandcols.MapOfColumnsRowsAndColumns; import org.apache.druid.query.rowsandcols.RowsAndColumns; import org.apache.druid.query.rowsandcols.column.ColumnAccessor; +import org.apache.druid.query.rowsandcols.column.IntArrayColumn; +import org.apache.druid.query.rowsandcols.concrete.ColumnBasedFrameRowsAndColumns; +import org.apache.druid.query.rowsandcols.concrete.ColumnBasedFrameRowsAndColumnsTest; import org.apache.druid.segment.ArrayListSegment; import org.apache.druid.segment.ColumnValueSelector; import org.apache.druid.segment.Cursor; @@ -215,6 +219,39 @@ public void testDecorationWithListOfResultRows() } } + @Test + public void asd() throws Exception + { + RowSignature siggy = RowSignature.builder() + .add("colA", ColumnType.LONG) + .add("colB", ColumnType.LONG) + .build(); + + Object[][] vals = new Object[][]{ + {1L, 4L}, + {2L, -4L}, + {3L, 3L}, + {4L, -3L}, + {5L, 4L}, + {6L, 82L}, + {7L, -90L}, + {8L, 4L}, + {9L, 0L}, + {10L, 0L} + }; + + MapOfColumnsRowsAndColumns input = MapOfColumnsRowsAndColumns.fromMap( + ImmutableMap.of( + "colA", new IntArrayColumn(new int[]{1, 2, 3, 4, 5, 6, 7, 8, 9, 10}), + "colB", new IntArrayColumn(new int[]{4, -4, 3, -3, 4, 82, -90, 4, 0, 0}) + ) + ); + + ColumnBasedFrameRowsAndColumns frc = ColumnBasedFrameRowsAndColumnsTest.buildFrame(input); + + validateDecorated(frc, siggy, vals, null, null, OffsetLimit.NONE, null); + } + private void validateDecorated( RowsAndColumns base, RowSignature siggy, diff --git a/server/src/main/java/org/apache/druid/server/QueryResultPusher.java b/server/src/main/java/org/apache/druid/server/QueryResultPusher.java index df319aa57c2c..074beb545b43 100644 --- a/server/src/main/java/org/apache/druid/server/QueryResultPusher.java +++ b/server/src/main/java/org/apache/druid/server/QueryResultPusher.java @@ -24,8 +24,6 @@ import com.google.common.io.CountingOutputStream; import org.apache.druid.client.DirectDruidClient; import org.apache.druid.error.DruidException; -import org.apache.druid.error.DruidException.Category; -import org.apache.druid.error.DruidException.Persona; import org.apache.druid.error.ErrorResponse; import org.apache.druid.error.QueryExceptionCompat; import org.apache.druid.java.util.common.ISE; @@ -452,9 +450,7 @@ public Response accumulate(Response retVal, Object in) } catch (IOException ex) { QueryResource.NO_STACK_LOGGER.warn(ex, "Unable to write query response."); - throw DruidException.forPersona(Persona.DEVELOPER) - .ofCategory(Category.UNCATEGORIZED) - .build(ex, "Unable to write query response."); + throw new RuntimeException(ex); } return null; } From c70b7e400e12ab924c69e1a0e35f38fd8d6d5830 Mon Sep 17 00:00:00 2001 From: sreemanamala Date: Wed, 26 Jun 2024 15:15:02 +0530 Subject: [PATCH 09/22] fix inspection --- .../query/rowsandcols/semantic/RowsAndColumnsDecoratorTest.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/processing/src/test/java/org/apache/druid/query/rowsandcols/semantic/RowsAndColumnsDecoratorTest.java b/processing/src/test/java/org/apache/druid/query/rowsandcols/semantic/RowsAndColumnsDecoratorTest.java index 25ba69856689..44ffe928997a 100644 --- a/processing/src/test/java/org/apache/druid/query/rowsandcols/semantic/RowsAndColumnsDecoratorTest.java +++ b/processing/src/test/java/org/apache/druid/query/rowsandcols/semantic/RowsAndColumnsDecoratorTest.java @@ -220,7 +220,7 @@ public void testDecorationWithListOfResultRows() } @Test - public void asd() throws Exception + public void testDecoratorWithColumnBasedFrameRAC() { RowSignature siggy = RowSignature.builder() .add("colA", ColumnType.LONG) From 0cd28bf87e440084e8a5c65cb4c01e7484369bda Mon Sep 17 00:00:00 2001 From: sreemanamala Date: Fri, 5 Jul 2024 11:51:31 +0530 Subject: [PATCH 10/22] FramedRowsAndColumns --- .../DruidDefaultSerializersModule.java | 52 +++---- .../ColumnBasedFrameRowsAndColumns.java | 94 +----------- .../concrete/FrameRowsAndColumns.java | 142 ++++++++++++++++++ .../concrete/RowBasedFrameRowsAndColumns.java | 100 +----------- 4 files changed, 174 insertions(+), 214 deletions(-) create mode 100644 processing/src/main/java/org/apache/druid/query/rowsandcols/concrete/FrameRowsAndColumns.java diff --git a/processing/src/main/java/org/apache/druid/jackson/DruidDefaultSerializersModule.java b/processing/src/main/java/org/apache/druid/jackson/DruidDefaultSerializersModule.java index 89e6da14295b..5767a269a7ed 100644 --- a/processing/src/main/java/org/apache/druid/jackson/DruidDefaultSerializersModule.java +++ b/processing/src/main/java/org/apache/druid/jackson/DruidDefaultSerializersModule.java @@ -42,7 +42,9 @@ import org.apache.druid.query.context.ResponseContextDeserializer; import org.apache.druid.query.rowsandcols.RowsAndColumns; import org.apache.druid.query.rowsandcols.concrete.ColumnBasedFrameRowsAndColumns; +import org.apache.druid.query.rowsandcols.concrete.FrameRowsAndColumns; import org.apache.druid.query.rowsandcols.concrete.RowBasedFrameRowsAndColumns; +import org.apache.druid.query.rowsandcols.semantic.WireTransferable; import org.apache.druid.segment.column.RowSignature; import org.joda.time.DateTimeZone; @@ -210,35 +212,27 @@ public void serialize( // It would be really cool if jackson offered an output stream that would allow us to push bytes // through, but it doesn't right now, so we have to build a byte[] instead. Maybe something to contribute // back to Jackson at some point. - - if (value instanceof ColumnBasedFrameRowsAndColumns) { - - ColumnBasedFrameRowsAndColumns frc = (ColumnBasedFrameRowsAndColumns) value; - JacksonUtils.writeObjectUsingSerializerProvider(gen, serializers, frc.getSignature()); - - this.writeFrameToGenerator(frc.getFrame(), gen); - } else if (value instanceof RowBasedFrameRowsAndColumns) { - RowBasedFrameRowsAndColumns frc = (RowBasedFrameRowsAndColumns) value; - JacksonUtils.writeObjectUsingSerializerProvider(gen, serializers, frc.getSignature()); - - this.writeFrameToGenerator(frc.getFrame(), gen); - } else { - throw DruidException.defensive("expected frame RowsAndColumns"); + if (value instanceof FrameRowsAndColumns) { + gen.writeBinary(WireTransferable.fromRAC(value).bytesToTransfer()); } - - } - - private void writeFrameToGenerator(Frame frame, JsonGenerator generator) throws IOException - { - final ByteArrayOutputStream baos = new ByteArrayOutputStream(); - frame.writeTo( - Channels.newChannel(baos), - false, - ByteBuffer.allocate(Frame.compressionBufferSize((int) frame.numBytes())), - ByteTracker.unboundedTracker() - ); - - generator.writeBinary(baos.toByteArray()); +// if (value instanceof FrameRowsAndColumns) { +// +// FrameRowsAndColumns frc = (FrameRowsAndColumns) value; +// JacksonUtils.writeObjectUsingSerializerProvider(gen, serializers, frc.getSignature()); +// +// Frame frame = frc.getFrame(); +// final ByteArrayOutputStream baos = new ByteArrayOutputStream(); +// frame.writeTo( +// Channels.newChannel(baos), +// false, +// ByteBuffer.allocate(Frame.compressionBufferSize((int) frame.numBytes())), +// ByteTracker.unboundedTracker() +// ); +// +// gen.writeBinary(baos.toByteArray()); +// } else { +// throw DruidException.defensive("expected frame RowsAndColumns"); +// } } }); @@ -250,6 +244,8 @@ public RowsAndColumns deserialize( DeserializationContext ctxt ) throws IOException { + // WireTransferable.deserialzied(RowsAndColumns.class, p, ctxt); + RowSignature sig = p.readValueAs(RowSignature.class); p.nextValue(); ByteArrayOutputStream baos = new ByteArrayOutputStream(); diff --git a/processing/src/main/java/org/apache/druid/query/rowsandcols/concrete/ColumnBasedFrameRowsAndColumns.java b/processing/src/main/java/org/apache/druid/query/rowsandcols/concrete/ColumnBasedFrameRowsAndColumns.java index 32056051cb9d..350a8fa80610 100644 --- a/processing/src/main/java/org/apache/druid/query/rowsandcols/concrete/ColumnBasedFrameRowsAndColumns.java +++ b/processing/src/main/java/org/apache/druid/query/rowsandcols/concrete/ColumnBasedFrameRowsAndColumns.java @@ -41,103 +41,13 @@ import java.util.Collection; import java.util.LinkedHashMap; -public class ColumnBasedFrameRowsAndColumns implements RowsAndColumns, AutoCloseable, CloseableShapeshifter +public class ColumnBasedFrameRowsAndColumns extends FrameRowsAndColumns { - private final Frame frame; - private final RowSignature signature; - private final LinkedHashMap colCache = new LinkedHashMap<>(); - @JsonCreator public ColumnBasedFrameRowsAndColumns( @JsonProperty("frame") Frame frame, @JsonProperty("signature") RowSignature signature) { - this.frame = FrameType.COLUMNAR.ensureType(frame); - this.signature = signature; - } - - @JsonProperty("frame") - public Frame getFrame() - { - return frame; - } - - @JsonProperty("signature") - public RowSignature getSignature() - { - return signature; - } - - @Override - public Collection getColumnNames() - { - return signature.getColumnNames(); - } - - @Override - public int numRows() - { - return frame.numRows(); - } - - @Nullable - @Override - public Column findColumn(String name) - { - // Use contains so that we can negative cache. - if (!colCache.containsKey(name)) { - final int columnIndex = signature.indexOf(name); - if (columnIndex < 0) { - colCache.put(name, null); - } else { - final ColumnType columnType = signature - .getColumnType(columnIndex) - .orElseThrow(() -> new ISE("just got the id, why is columnType not there?")); - - colCache.put(name, FrameColumnReaders.create(name, columnIndex, columnType).readRACColumn(frame)); - } - } - return colCache.get(name); - - } - - @SuppressWarnings("unchecked") - @Nullable - @Override - public T as(Class clazz) - { - if (StorageAdapter.class.equals(clazz)) { - return (T) new FrameStorageAdapter(frame, FrameReader.create(signature), Intervals.ETERNITY); - } - if (WireTransferable.class.equals(clazz)) { - return (T) this; - } - return null; - } - - @Override - public void close() - { - // nothing to close - } - - @Override - public int hashCode() - { - return Objects.hashCode(frame, signature); - } - - @Override - public boolean equals(Object o) - { - if (this == o) { - return true; - } - if (!(o instanceof ColumnBasedFrameRowsAndColumns)) { - return false; - } - ColumnBasedFrameRowsAndColumns otherFrame = (ColumnBasedFrameRowsAndColumns) o; - - return frame.writableMemory().equals(otherFrame.frame.writableMemory()) && signature.equals(otherFrame.signature); + super(FrameType.COLUMNAR.ensureType(frame), signature); } } diff --git a/processing/src/main/java/org/apache/druid/query/rowsandcols/concrete/FrameRowsAndColumns.java b/processing/src/main/java/org/apache/druid/query/rowsandcols/concrete/FrameRowsAndColumns.java new file mode 100644 index 000000000000..7046527e1c37 --- /dev/null +++ b/processing/src/main/java/org/apache/druid/query/rowsandcols/concrete/FrameRowsAndColumns.java @@ -0,0 +1,142 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.query.rowsandcols.concrete; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.google.common.base.Objects; +import org.apache.druid.frame.Frame; +import org.apache.druid.frame.FrameType; +import org.apache.druid.frame.read.FrameReader; +import org.apache.druid.frame.read.columnar.FrameColumnReaders; +import org.apache.druid.frame.segment.FrameStorageAdapter; +import org.apache.druid.java.util.common.ISE; +import org.apache.druid.java.util.common.Intervals; +import org.apache.druid.query.rowsandcols.RowsAndColumns; +import org.apache.druid.query.rowsandcols.column.Column; +import org.apache.druid.query.rowsandcols.semantic.WireTransferable; +import org.apache.druid.segment.CloseableShapeshifter; +import org.apache.druid.segment.StorageAdapter; +import org.apache.druid.segment.column.ColumnType; +import org.apache.druid.segment.column.RowSignature; + +import javax.annotation.Nullable; +import java.util.Collection; +import java.util.LinkedHashMap; + +public abstract class FrameRowsAndColumns implements RowsAndColumns, AutoCloseable, CloseableShapeshifter +{ + final Frame frame; + final RowSignature signature; + final LinkedHashMap colCache = new LinkedHashMap<>(); + + public FrameRowsAndColumns(Frame frame, RowSignature signature) + { + this.frame = frame; + this.signature = signature; + } + + public Frame getFrame() + { + return frame; + } + + public RowSignature getSignature() + { + return signature; + } + + @Override + public Collection getColumnNames() + { + return signature.getColumnNames(); + } + + @Override + public int numRows() + { + return frame.numRows(); + } + + @Nullable + @Override + public Column findColumn(String name) + { + // Use contains so that we can negative cache. + if (!colCache.containsKey(name)) { + final int columnIndex = signature.indexOf(name); + if (columnIndex < 0) { + colCache.put(name, null); + } else { + final ColumnType columnType = signature + .getColumnType(columnIndex) + .orElseThrow(() -> new ISE("just got the id, why is columnType not there?")); + + colCache.put(name, FrameColumnReaders.create(name, columnIndex, columnType).readRACColumn(frame)); + } + } + return colCache.get(name); + } + + public boolean isColumnBasedFrame() + { + return frame.type() == FrameType.COLUMNAR; + } + + @SuppressWarnings("unchecked") + @Nullable + @Override + public T as(Class clazz) + { + if (StorageAdapter.class.equals(clazz)) { + return (T) new FrameStorageAdapter(frame, FrameReader.create(signature), Intervals.ETERNITY); + } + if (WireTransferable.class.equals(clazz)) { + return (T) this; + } + return null; + } + + @Override + public void close() + { + // nothing to close + } + + @Override + public int hashCode() + { + return Objects.hashCode(frame, signature); + } + + @Override + public boolean equals(Object o) + { + if (this == o) { + return true; + } + if (!(o instanceof FrameRowsAndColumns)) { + return false; + } + FrameRowsAndColumns otherFrame = (FrameRowsAndColumns) o; + + return frame.writableMemory().equals(otherFrame.frame.writableMemory()) && signature.equals(otherFrame.signature); + } +} diff --git a/processing/src/main/java/org/apache/druid/query/rowsandcols/concrete/RowBasedFrameRowsAndColumns.java b/processing/src/main/java/org/apache/druid/query/rowsandcols/concrete/RowBasedFrameRowsAndColumns.java index 1f4644730ea4..03fe932325c2 100644 --- a/processing/src/main/java/org/apache/druid/query/rowsandcols/concrete/RowBasedFrameRowsAndColumns.java +++ b/processing/src/main/java/org/apache/druid/query/rowsandcols/concrete/RowBasedFrameRowsAndColumns.java @@ -42,101 +42,13 @@ import java.util.Collection; import java.util.LinkedHashMap; -public class RowBasedFrameRowsAndColumns implements RowsAndColumns, AutoCloseable, CloseableShapeshifter +public class RowBasedFrameRowsAndColumns extends FrameRowsAndColumns { - private final Frame frame; - private final RowSignature signature; - private final LinkedHashMap colCache = new LinkedHashMap<>(); - - @JsonCreator - public RowBasedFrameRowsAndColumns( - @JsonProperty("frame") Frame frame, - @JsonProperty("signature") RowSignature signature) - { - this.frame = FrameType.ROW_BASED.ensureType(frame); - this.signature = signature; - } - - @Override - public Collection getColumnNames() - { - return signature.getColumnNames(); - } - - @JsonProperty("frame") - public Frame getFrame() - { - return frame; - } - - @JsonProperty("signature") - public RowSignature getSignature() - { - return signature; - } - - @Override - public int numRows() - { - return frame.numRows(); - } - - @Nullable - @Override - public Column findColumn(String name) - { - if (!colCache.containsKey(name)) { - final int columnIndex = signature.indexOf(name); - if (columnIndex < 0) { - colCache.put(name, null); - } else { - final ColumnType columnType = signature - .getColumnType(columnIndex) - .orElseThrow(() -> new ISE("just got the id, why is columnType not there?")); - - colCache.put(name, FrameColumnReaders.create(name, columnIndex, columnType).readRACColumn(frame)); - } - } - return colCache.get(name); - } - - @SuppressWarnings("unchecked") - @Nullable - @Override - public T as(Class clazz) - { - if (StorageAdapter.class.equals(clazz)) { - return (T) new FrameStorageAdapter(frame, FrameReader.create(signature), Intervals.ETERNITY); - } - if (WireTransferable.class.equals(clazz)) { - return (T) this; - } - return null; - } - - @Override - public void close() + //@JsonCreator + public RowBasedFrameRowsAndColumns( Frame frame, RowSignature signature) + //@JsonProperty("frame") Frame frame, + //@JsonProperty("signature") RowSignature signature) { - // nothing to close - } - - @Override - public int hashCode() - { - return Objects.hashCode(frame, signature); - } - - @Override - public boolean equals(Object o) - { - if (this == o) { - return true; - } - if (!(o instanceof RowBasedFrameRowsAndColumns)) { - return false; - } - RowBasedFrameRowsAndColumns otherFrame = (RowBasedFrameRowsAndColumns) o; - - return frame.writableMemory().equals(otherFrame.frame.writableMemory()) && signature.equals(otherFrame.signature); + super(FrameType.ROW_BASED.ensureType(frame), signature); } } From 6c42fd03cf6a3d82afddc559f71848a8cab03167 Mon Sep 17 00:00:00 2001 From: sreemanamala Date: Wed, 24 Jul 2024 19:48:58 +0530 Subject: [PATCH 11/22] Frame RAC serializers --- .../DruidDefaultSerializersModule.java | 58 +------------- .../concrete/FrameRowsAndColumns.java | 75 ++++++++++++++++--- 2 files changed, 66 insertions(+), 67 deletions(-) diff --git a/processing/src/main/java/org/apache/druid/jackson/DruidDefaultSerializersModule.java b/processing/src/main/java/org/apache/druid/jackson/DruidDefaultSerializersModule.java index 5767a269a7ed..a0e2406b1c33 100644 --- a/processing/src/main/java/org/apache/druid/jackson/DruidDefaultSerializersModule.java +++ b/processing/src/main/java/org/apache/druid/jackson/DruidDefaultSerializersModule.java @@ -200,61 +200,7 @@ public ByteOrder deserialize(JsonParser jp, DeserializationContext ctxt) throws ); addDeserializer(ResponseContext.class, new ResponseContextDeserializer()); - addSerializer(RowsAndColumns.class, new JsonSerializer() - { - @Override - public void serialize( - RowsAndColumns value, - JsonGenerator gen, - SerializerProvider serializers - ) throws IOException - { - // It would be really cool if jackson offered an output stream that would allow us to push bytes - // through, but it doesn't right now, so we have to build a byte[] instead. Maybe something to contribute - // back to Jackson at some point. - if (value instanceof FrameRowsAndColumns) { - gen.writeBinary(WireTransferable.fromRAC(value).bytesToTransfer()); - } -// if (value instanceof FrameRowsAndColumns) { -// -// FrameRowsAndColumns frc = (FrameRowsAndColumns) value; -// JacksonUtils.writeObjectUsingSerializerProvider(gen, serializers, frc.getSignature()); -// -// Frame frame = frc.getFrame(); -// final ByteArrayOutputStream baos = new ByteArrayOutputStream(); -// frame.writeTo( -// Channels.newChannel(baos), -// false, -// ByteBuffer.allocate(Frame.compressionBufferSize((int) frame.numBytes())), -// ByteTracker.unboundedTracker() -// ); -// -// gen.writeBinary(baos.toByteArray()); -// } else { -// throw DruidException.defensive("expected frame RowsAndColumns"); -// } - } - }); - - addDeserializer(RowsAndColumns.class, new JsonDeserializer() - { - @Override - public RowsAndColumns deserialize( - JsonParser p, - DeserializationContext ctxt - ) throws IOException - { - // WireTransferable.deserialzied(RowsAndColumns.class, p, ctxt); - - RowSignature sig = p.readValueAs(RowSignature.class); - p.nextValue(); - ByteArrayOutputStream baos = new ByteArrayOutputStream(); - p.readBinaryValue(baos); - Frame frame = Frame.wrap(baos.toByteArray()); - return (frame.type() == FrameType.COLUMNAR) - ? new ColumnBasedFrameRowsAndColumns(Frame.wrap(baos.toByteArray()), sig) - : new RowBasedFrameRowsAndColumns(Frame.wrap(baos.toByteArray()), sig); - } - }); + addSerializer(FrameRowsAndColumns.class, new FrameRowsAndColumns.FrameRACSerializer()); + addDeserializer(FrameRowsAndColumns.class, new FrameRowsAndColumns.FrameRACDeserializer()); } } diff --git a/processing/src/main/java/org/apache/druid/query/rowsandcols/concrete/FrameRowsAndColumns.java b/processing/src/main/java/org/apache/druid/query/rowsandcols/concrete/FrameRowsAndColumns.java index 7046527e1c37..68545f3ab5f0 100644 --- a/processing/src/main/java/org/apache/druid/query/rowsandcols/concrete/FrameRowsAndColumns.java +++ b/processing/src/main/java/org/apache/druid/query/rowsandcols/concrete/FrameRowsAndColumns.java @@ -19,25 +19,34 @@ package org.apache.druid.query.rowsandcols.concrete; -import com.fasterxml.jackson.annotation.JsonCreator; -import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.core.JsonGenerator; +import com.fasterxml.jackson.core.JsonParser; +import com.fasterxml.jackson.databind.DeserializationContext; +import com.fasterxml.jackson.databind.SerializerProvider; +import com.fasterxml.jackson.databind.deser.std.StdDeserializer; +import com.fasterxml.jackson.databind.ser.std.StdSerializer; import com.google.common.base.Objects; import org.apache.druid.frame.Frame; import org.apache.druid.frame.FrameType; +import org.apache.druid.frame.channel.ByteTracker; import org.apache.druid.frame.read.FrameReader; import org.apache.druid.frame.read.columnar.FrameColumnReaders; import org.apache.druid.frame.segment.FrameStorageAdapter; import org.apache.druid.java.util.common.ISE; import org.apache.druid.java.util.common.Intervals; +import org.apache.druid.java.util.common.jackson.JacksonUtils; import org.apache.druid.query.rowsandcols.RowsAndColumns; import org.apache.druid.query.rowsandcols.column.Column; -import org.apache.druid.query.rowsandcols.semantic.WireTransferable; import org.apache.druid.segment.CloseableShapeshifter; import org.apache.druid.segment.StorageAdapter; import org.apache.druid.segment.column.ColumnType; import org.apache.druid.segment.column.RowSignature; import javax.annotation.Nullable; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.channels.Channels; import java.util.Collection; import java.util.LinkedHashMap; @@ -95,11 +104,6 @@ public Column findColumn(String name) return colCache.get(name); } - public boolean isColumnBasedFrame() - { - return frame.type() == FrameType.COLUMNAR; - } - @SuppressWarnings("unchecked") @Nullable @Override @@ -108,9 +112,6 @@ public T as(Class clazz) if (StorageAdapter.class.equals(clazz)) { return (T) new FrameStorageAdapter(frame, FrameReader.create(signature), Intervals.ETERNITY); } - if (WireTransferable.class.equals(clazz)) { - return (T) this; - } return null; } @@ -139,4 +140,56 @@ public boolean equals(Object o) return frame.writableMemory().equals(otherFrame.frame.writableMemory()) && signature.equals(otherFrame.signature); } + + public static class FrameRACSerializer extends StdSerializer + { + public FrameRACSerializer() + { + super(FrameRowsAndColumns.class); + } + + @Override + public void serialize( + FrameRowsAndColumns frameRAC, + JsonGenerator jsonGenerator, + SerializerProvider serializerProvider + ) throws IOException + { + JacksonUtils.writeObjectUsingSerializerProvider(jsonGenerator, serializerProvider, frameRAC.getSignature()); + + Frame frame = frameRAC.getFrame(); + final ByteArrayOutputStream baos = new ByteArrayOutputStream(); + frame.writeTo( + Channels.newChannel(baos), + false, + ByteBuffer.allocate(Frame.compressionBufferSize((int) frame.numBytes())), + ByteTracker.unboundedTracker() + ); + + jsonGenerator.writeBinary(baos.toByteArray()); + } + } + + public static class FrameRACDeserializer extends StdDeserializer + { + public FrameRACDeserializer() + { + super(FrameRowsAndColumns.class); + } + + @Override + public FrameRowsAndColumns deserialize(JsonParser jsonParser, DeserializationContext deserializationContext) + throws IOException + { + RowSignature sig = jsonParser.readValueAs(RowSignature.class); + jsonParser.nextValue(); + + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + jsonParser.readBinaryValue(baos); + Frame frame = Frame.wrap(baos.toByteArray()); + return (frame.type() == FrameType.COLUMNAR) + ? new ColumnBasedFrameRowsAndColumns(Frame.wrap(baos.toByteArray()), sig) + : new RowBasedFrameRowsAndColumns(Frame.wrap(baos.toByteArray()), sig); + } + } } From 9c6cecb629aca1fff1ab484f654f3f4012842868 Mon Sep 17 00:00:00 2001 From: sreemanamala Date: Wed, 24 Jul 2024 23:10:01 +0530 Subject: [PATCH 12/22] refactor --- .../DruidDefaultSerializersModule.java | 16 +--- .../WindowOperatorQueryQueryToolChest.java | 15 ---- .../query/rowsandcols/RowsAndColumns.java | 80 +++++++++++++++++++ .../ColumnBasedFrameRowsAndColumns.java | 23 +----- .../concrete/FrameRowsAndColumns.java | 68 +--------------- .../concrete/RowBasedFrameRowsAndColumns.java | 24 +----- 6 files changed, 87 insertions(+), 139 deletions(-) diff --git a/processing/src/main/java/org/apache/druid/jackson/DruidDefaultSerializersModule.java b/processing/src/main/java/org/apache/druid/jackson/DruidDefaultSerializersModule.java index a0e2406b1c33..cad8fdfd8315 100644 --- a/processing/src/main/java/org/apache/druid/jackson/DruidDefaultSerializersModule.java +++ b/processing/src/main/java/org/apache/druid/jackson/DruidDefaultSerializersModule.java @@ -27,10 +27,6 @@ import com.fasterxml.jackson.databind.SerializerProvider; import com.fasterxml.jackson.databind.module.SimpleModule; import com.fasterxml.jackson.databind.ser.std.ToStringSerializer; -import org.apache.druid.error.DruidException; -import org.apache.druid.frame.Frame; -import org.apache.druid.frame.FrameType; -import org.apache.druid.frame.channel.ByteTracker; import org.apache.druid.java.util.common.DateTimes; import org.apache.druid.java.util.common.guava.Accumulator; import org.apache.druid.java.util.common.guava.Sequence; @@ -41,18 +37,10 @@ import org.apache.druid.query.context.ResponseContext; import org.apache.druid.query.context.ResponseContextDeserializer; import org.apache.druid.query.rowsandcols.RowsAndColumns; -import org.apache.druid.query.rowsandcols.concrete.ColumnBasedFrameRowsAndColumns; -import org.apache.druid.query.rowsandcols.concrete.FrameRowsAndColumns; -import org.apache.druid.query.rowsandcols.concrete.RowBasedFrameRowsAndColumns; -import org.apache.druid.query.rowsandcols.semantic.WireTransferable; -import org.apache.druid.segment.column.RowSignature; import org.joda.time.DateTimeZone; -import java.io.ByteArrayOutputStream; import java.io.IOException; -import java.nio.ByteBuffer; import java.nio.ByteOrder; -import java.nio.channels.Channels; /** * @@ -200,7 +188,7 @@ public ByteOrder deserialize(JsonParser jp, DeserializationContext ctxt) throws ); addDeserializer(ResponseContext.class, new ResponseContextDeserializer()); - addSerializer(FrameRowsAndColumns.class, new FrameRowsAndColumns.FrameRACSerializer()); - addDeserializer(FrameRowsAndColumns.class, new FrameRowsAndColumns.FrameRACDeserializer()); + addSerializer(RowsAndColumns.class, new RowsAndColumns.RowsAndColumnsSerializer()); + addDeserializer(RowsAndColumns.class, new RowsAndColumns.RowsAndColumnsDeserializer()); } } diff --git a/processing/src/main/java/org/apache/druid/query/operator/WindowOperatorQueryQueryToolChest.java b/processing/src/main/java/org/apache/druid/query/operator/WindowOperatorQueryQueryToolChest.java index 49b2f3453f71..60c618fa7285 100644 --- a/processing/src/main/java/org/apache/druid/query/operator/WindowOperatorQueryQueryToolChest.java +++ b/processing/src/main/java/org/apache/druid/query/operator/WindowOperatorQueryQueryToolChest.java @@ -20,7 +20,6 @@ package org.apache.druid.query.operator; import com.fasterxml.jackson.core.type.TypeReference; -import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.base.Function; import com.google.common.base.Functions; import com.google.common.collect.ImmutableMap; @@ -37,8 +36,6 @@ import org.apache.druid.query.rowsandcols.column.Column; import org.apache.druid.query.rowsandcols.column.ColumnAccessor; import org.apache.druid.query.rowsandcols.column.NullColumn; -import org.apache.druid.query.rowsandcols.concrete.ColumnBasedFrameRowsAndColumns; -import org.apache.druid.query.rowsandcols.concrete.RowBasedFrameRowsAndColumns; import org.apache.druid.segment.column.ColumnType; import org.apache.druid.segment.column.RowSignature; @@ -198,16 +195,4 @@ public Sequence run( return baseQueryRunner.run(queryPlus, responseContext); } } - - @Override - public ObjectMapper decorateObjectMapper(ObjectMapper objectMapper, WindowOperatorQuery query) - { - ObjectMapper om = super.decorateObjectMapper(objectMapper, query).copy(); - - om.registerSubtypes(ColumnBasedFrameRowsAndColumns.class); - om.registerSubtypes(RowBasedFrameRowsAndColumns.class); - - return om; - } - } diff --git a/processing/src/main/java/org/apache/druid/query/rowsandcols/RowsAndColumns.java b/processing/src/main/java/org/apache/druid/query/rowsandcols/RowsAndColumns.java index d139265d147d..5981ae7d37eb 100644 --- a/processing/src/main/java/org/apache/druid/query/rowsandcols/RowsAndColumns.java +++ b/processing/src/main/java/org/apache/druid/query/rowsandcols/RowsAndColumns.java @@ -19,15 +19,33 @@ package org.apache.druid.query.rowsandcols; +import com.fasterxml.jackson.core.JsonGenerator; +import com.fasterxml.jackson.core.JsonParser; +import com.fasterxml.jackson.databind.DeserializationContext; +import com.fasterxml.jackson.databind.SerializerProvider; +import com.fasterxml.jackson.databind.deser.std.StdDeserializer; +import com.fasterxml.jackson.databind.ser.std.StdSerializer; import org.apache.druid.error.DruidException; +import org.apache.druid.frame.Frame; +import org.apache.druid.frame.FrameType; +import org.apache.druid.frame.channel.ByteTracker; +import org.apache.druid.java.util.common.jackson.JacksonUtils; import org.apache.druid.query.rowsandcols.column.Column; +import org.apache.druid.query.rowsandcols.concrete.ColumnBasedFrameRowsAndColumns; +import org.apache.druid.query.rowsandcols.concrete.FrameRowsAndColumns; +import org.apache.druid.query.rowsandcols.concrete.RowBasedFrameRowsAndColumns; import org.apache.druid.query.rowsandcols.semantic.AppendableRowsAndColumns; import org.apache.druid.query.rowsandcols.semantic.FramedOnHeapAggregatable; +import org.apache.druid.segment.column.RowSignature; import javax.annotation.Nonnull; import javax.annotation.Nullable; +import java.io.ByteArrayOutputStream; +import java.io.IOException; import java.lang.reflect.InvocationTargetException; import java.lang.reflect.Method; +import java.nio.ByteBuffer; +import java.nio.channels.Channels; import java.util.Collection; import java.util.HashMap; import java.util.Map; @@ -143,4 +161,66 @@ static AppendableRowsAndColumns expectAppendable(RowsAndColumns input) */ @Nullable T as(Class clazz); + + /** + * Serializer for {@link RowsAndColumns} by converting the instance to {@link FrameRowsAndColumns} + */ + class RowsAndColumnsSerializer extends StdSerializer + { + public RowsAndColumnsSerializer() + { + super(RowsAndColumns.class); + } + + @Override + public void serialize( + RowsAndColumns rac, + JsonGenerator jsonGenerator, + SerializerProvider serializerProvider + ) throws IOException + { + FrameRowsAndColumns frameRAC = rac.as(FrameRowsAndColumns.class); + if (frameRAC == null) { + throw DruidException.defensive("Unable to serialize RAC"); + } + JacksonUtils.writeObjectUsingSerializerProvider(jsonGenerator, serializerProvider, frameRAC.getSignature()); + + Frame frame = frameRAC.getFrame(); + final ByteArrayOutputStream baos = new ByteArrayOutputStream(); + frame.writeTo( + Channels.newChannel(baos), + false, + ByteBuffer.allocate(Frame.compressionBufferSize((int) frame.numBytes())), + ByteTracker.unboundedTracker() + ); + + jsonGenerator.writeBinary(baos.toByteArray()); + } + } + + /** + * Deserializer for {@link RowsAndColumns} returning as an instance of {@link FrameRowsAndColumns} + */ + class RowsAndColumnsDeserializer extends StdDeserializer + { + public RowsAndColumnsDeserializer() + { + super(RowsAndColumns.class); + } + + @Override + public FrameRowsAndColumns deserialize(JsonParser jsonParser, DeserializationContext deserializationContext) + throws IOException + { + RowSignature sig = jsonParser.readValueAs(RowSignature.class); + jsonParser.nextValue(); + + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + jsonParser.readBinaryValue(baos); + Frame frame = Frame.wrap(baos.toByteArray()); + return (frame.type() == FrameType.COLUMNAR) + ? new ColumnBasedFrameRowsAndColumns(Frame.wrap(baos.toByteArray()), sig) + : new RowBasedFrameRowsAndColumns(Frame.wrap(baos.toByteArray()), sig); + } + } } diff --git a/processing/src/main/java/org/apache/druid/query/rowsandcols/concrete/ColumnBasedFrameRowsAndColumns.java b/processing/src/main/java/org/apache/druid/query/rowsandcols/concrete/ColumnBasedFrameRowsAndColumns.java index 350a8fa80610..79039dc291fd 100644 --- a/processing/src/main/java/org/apache/druid/query/rowsandcols/concrete/ColumnBasedFrameRowsAndColumns.java +++ b/processing/src/main/java/org/apache/druid/query/rowsandcols/concrete/ColumnBasedFrameRowsAndColumns.java @@ -19,34 +19,13 @@ package org.apache.druid.query.rowsandcols.concrete; -import com.fasterxml.jackson.annotation.JsonCreator; -import com.fasterxml.jackson.annotation.JsonProperty; -import com.google.common.base.Objects; import org.apache.druid.frame.Frame; import org.apache.druid.frame.FrameType; -import org.apache.druid.frame.read.FrameReader; -import org.apache.druid.frame.read.columnar.FrameColumnReaders; -import org.apache.druid.frame.segment.FrameStorageAdapter; -import org.apache.druid.java.util.common.ISE; -import org.apache.druid.java.util.common.Intervals; -import org.apache.druid.query.rowsandcols.RowsAndColumns; -import org.apache.druid.query.rowsandcols.column.Column; -import org.apache.druid.query.rowsandcols.semantic.WireTransferable; -import org.apache.druid.segment.CloseableShapeshifter; -import org.apache.druid.segment.StorageAdapter; -import org.apache.druid.segment.column.ColumnType; import org.apache.druid.segment.column.RowSignature; -import javax.annotation.Nullable; -import java.util.Collection; -import java.util.LinkedHashMap; - public class ColumnBasedFrameRowsAndColumns extends FrameRowsAndColumns { - @JsonCreator - public ColumnBasedFrameRowsAndColumns( - @JsonProperty("frame") Frame frame, - @JsonProperty("signature") RowSignature signature) + public ColumnBasedFrameRowsAndColumns(Frame frame, RowSignature signature) { super(FrameType.COLUMNAR.ensureType(frame), signature); } diff --git a/processing/src/main/java/org/apache/druid/query/rowsandcols/concrete/FrameRowsAndColumns.java b/processing/src/main/java/org/apache/druid/query/rowsandcols/concrete/FrameRowsAndColumns.java index 68545f3ab5f0..57ae6d05106c 100644 --- a/processing/src/main/java/org/apache/druid/query/rowsandcols/concrete/FrameRowsAndColumns.java +++ b/processing/src/main/java/org/apache/druid/query/rowsandcols/concrete/FrameRowsAndColumns.java @@ -19,22 +19,13 @@ package org.apache.druid.query.rowsandcols.concrete; -import com.fasterxml.jackson.core.JsonGenerator; -import com.fasterxml.jackson.core.JsonParser; -import com.fasterxml.jackson.databind.DeserializationContext; -import com.fasterxml.jackson.databind.SerializerProvider; -import com.fasterxml.jackson.databind.deser.std.StdDeserializer; -import com.fasterxml.jackson.databind.ser.std.StdSerializer; import com.google.common.base.Objects; import org.apache.druid.frame.Frame; -import org.apache.druid.frame.FrameType; -import org.apache.druid.frame.channel.ByteTracker; import org.apache.druid.frame.read.FrameReader; import org.apache.druid.frame.read.columnar.FrameColumnReaders; import org.apache.druid.frame.segment.FrameStorageAdapter; import org.apache.druid.java.util.common.ISE; import org.apache.druid.java.util.common.Intervals; -import org.apache.druid.java.util.common.jackson.JacksonUtils; import org.apache.druid.query.rowsandcols.RowsAndColumns; import org.apache.druid.query.rowsandcols.column.Column; import org.apache.druid.segment.CloseableShapeshifter; @@ -43,10 +34,6 @@ import org.apache.druid.segment.column.RowSignature; import javax.annotation.Nullable; -import java.io.ByteArrayOutputStream; -import java.io.IOException; -import java.nio.ByteBuffer; -import java.nio.channels.Channels; import java.util.Collection; import java.util.LinkedHashMap; @@ -112,6 +99,9 @@ public T as(Class clazz) if (StorageAdapter.class.equals(clazz)) { return (T) new FrameStorageAdapter(frame, FrameReader.create(signature), Intervals.ETERNITY); } + if (FrameRowsAndColumns.class.equals(clazz)) { + return (T) this; + } return null; } @@ -140,56 +130,4 @@ public boolean equals(Object o) return frame.writableMemory().equals(otherFrame.frame.writableMemory()) && signature.equals(otherFrame.signature); } - - public static class FrameRACSerializer extends StdSerializer - { - public FrameRACSerializer() - { - super(FrameRowsAndColumns.class); - } - - @Override - public void serialize( - FrameRowsAndColumns frameRAC, - JsonGenerator jsonGenerator, - SerializerProvider serializerProvider - ) throws IOException - { - JacksonUtils.writeObjectUsingSerializerProvider(jsonGenerator, serializerProvider, frameRAC.getSignature()); - - Frame frame = frameRAC.getFrame(); - final ByteArrayOutputStream baos = new ByteArrayOutputStream(); - frame.writeTo( - Channels.newChannel(baos), - false, - ByteBuffer.allocate(Frame.compressionBufferSize((int) frame.numBytes())), - ByteTracker.unboundedTracker() - ); - - jsonGenerator.writeBinary(baos.toByteArray()); - } - } - - public static class FrameRACDeserializer extends StdDeserializer - { - public FrameRACDeserializer() - { - super(FrameRowsAndColumns.class); - } - - @Override - public FrameRowsAndColumns deserialize(JsonParser jsonParser, DeserializationContext deserializationContext) - throws IOException - { - RowSignature sig = jsonParser.readValueAs(RowSignature.class); - jsonParser.nextValue(); - - ByteArrayOutputStream baos = new ByteArrayOutputStream(); - jsonParser.readBinaryValue(baos); - Frame frame = Frame.wrap(baos.toByteArray()); - return (frame.type() == FrameType.COLUMNAR) - ? new ColumnBasedFrameRowsAndColumns(Frame.wrap(baos.toByteArray()), sig) - : new RowBasedFrameRowsAndColumns(Frame.wrap(baos.toByteArray()), sig); - } - } } diff --git a/processing/src/main/java/org/apache/druid/query/rowsandcols/concrete/RowBasedFrameRowsAndColumns.java b/processing/src/main/java/org/apache/druid/query/rowsandcols/concrete/RowBasedFrameRowsAndColumns.java index 03fe932325c2..06f50452ef5b 100644 --- a/processing/src/main/java/org/apache/druid/query/rowsandcols/concrete/RowBasedFrameRowsAndColumns.java +++ b/processing/src/main/java/org/apache/druid/query/rowsandcols/concrete/RowBasedFrameRowsAndColumns.java @@ -19,35 +19,13 @@ package org.apache.druid.query.rowsandcols.concrete; -import com.fasterxml.jackson.annotation.JsonCreator; -import com.fasterxml.jackson.annotation.JsonProperty; -import com.google.common.base.Objects; import org.apache.druid.frame.Frame; import org.apache.druid.frame.FrameType; -import org.apache.druid.frame.read.FrameReader; -import org.apache.druid.frame.read.columnar.FrameColumnReaders; -import org.apache.druid.frame.segment.FrameStorageAdapter; -import org.apache.druid.java.util.common.ISE; -import org.apache.druid.java.util.common.Intervals; -import org.apache.druid.query.rowsandcols.RowsAndColumns; -import org.apache.druid.query.rowsandcols.column.Column; -import org.apache.druid.query.rowsandcols.semantic.WireTransferable; -import org.apache.druid.segment.CloseableShapeshifter; -import org.apache.druid.segment.StorageAdapter; -import org.apache.druid.segment.column.ColumnType; import org.apache.druid.segment.column.RowSignature; -import javax.annotation.Nullable; - -import java.util.Collection; -import java.util.LinkedHashMap; - public class RowBasedFrameRowsAndColumns extends FrameRowsAndColumns { - //@JsonCreator - public RowBasedFrameRowsAndColumns( Frame frame, RowSignature signature) - //@JsonProperty("frame") Frame frame, - //@JsonProperty("signature") RowSignature signature) + public RowBasedFrameRowsAndColumns(Frame frame, RowSignature signature) { super(FrameType.ROW_BASED.ensureType(frame), signature); } From 16f44b21b2fb005bc9e658258148b8ee66893ef3 Mon Sep 17 00:00:00 2001 From: sreemanamala Date: Thu, 25 Jul 2024 08:18:40 +0530 Subject: [PATCH 13/22] refactor --- .../concrete/ColumnBasedFrameRowsAndColumnsTest.java | 12 +++++++++--- 1 file changed, 9 insertions(+), 3 deletions(-) diff --git a/processing/src/test/java/org/apache/druid/query/rowsandcols/concrete/ColumnBasedFrameRowsAndColumnsTest.java b/processing/src/test/java/org/apache/druid/query/rowsandcols/concrete/ColumnBasedFrameRowsAndColumnsTest.java index f0afbb6ab5b9..d1e9d6aae4d6 100644 --- a/processing/src/test/java/org/apache/druid/query/rowsandcols/concrete/ColumnBasedFrameRowsAndColumnsTest.java +++ b/processing/src/test/java/org/apache/druid/query/rowsandcols/concrete/ColumnBasedFrameRowsAndColumnsTest.java @@ -34,17 +34,23 @@ public ColumnBasedFrameRowsAndColumnsTest() } public static Function MAKER = input -> { - return buildFrame(input); }; public static ColumnBasedFrameRowsAndColumns buildFrame(MapOfColumnsRowsAndColumns input) { - LazilyDecoratedRowsAndColumns rac = new LazilyDecoratedRowsAndColumns(input, null, null, null, OffsetLimit.limit(Integer.MAX_VALUE), null, null); + LazilyDecoratedRowsAndColumns rac = new LazilyDecoratedRowsAndColumns( + input, + null, + null, + null, + OffsetLimit.limit(Integer.MAX_VALUE), + null, + null + ); rac.numRows(); // materialize return (ColumnBasedFrameRowsAndColumns) rac.getBase(); } - } From b2e2498956a74143d7d1ca04d52d4334a3ca7240 Mon Sep 17 00:00:00 2001 From: sreemanamala Date: Mon, 29 Jul 2024 09:43:43 +0530 Subject: [PATCH 14/22] refactor --- .../druid/query/operator/WindowOperatorQueryQueryToolChest.java | 1 + .../java/org/apache/druid/jackson/DefaultObjectMapperTest.java | 1 - .../query/rowsandcols/semantic/RowsAndColumnsDecoratorTest.java | 1 - sql/src/test/resources/calcite/tests/window/simpleSum.sqlTest | 2 +- 4 files changed, 2 insertions(+), 3 deletions(-) diff --git a/processing/src/main/java/org/apache/druid/query/operator/WindowOperatorQueryQueryToolChest.java b/processing/src/main/java/org/apache/druid/query/operator/WindowOperatorQueryQueryToolChest.java index 60c618fa7285..bec529eedefa 100644 --- a/processing/src/main/java/org/apache/druid/query/operator/WindowOperatorQueryQueryToolChest.java +++ b/processing/src/main/java/org/apache/druid/query/operator/WindowOperatorQueryQueryToolChest.java @@ -45,6 +45,7 @@ public class WindowOperatorQueryQueryToolChest extends QueryToolChest { + @Override @SuppressWarnings("unchecked") public QueryRunner mergeResults(QueryRunner runner) diff --git a/processing/src/test/java/org/apache/druid/jackson/DefaultObjectMapperTest.java b/processing/src/test/java/org/apache/druid/jackson/DefaultObjectMapperTest.java index 4f8f7b3ddbdd..92c8a2cb2989 100644 --- a/processing/src/test/java/org/apache/druid/jackson/DefaultObjectMapperTest.java +++ b/processing/src/test/java/org/apache/druid/jackson/DefaultObjectMapperTest.java @@ -128,5 +128,4 @@ public void testColumnBasedFrameRowsAndColumns() throws Exception ColumnBasedFrameRowsAndColumns frc2 = (ColumnBasedFrameRowsAndColumns) om.readValue(bytes, RowsAndColumns.class); assertEquals(frc, frc2); } - } diff --git a/processing/src/test/java/org/apache/druid/query/rowsandcols/semantic/RowsAndColumnsDecoratorTest.java b/processing/src/test/java/org/apache/druid/query/rowsandcols/semantic/RowsAndColumnsDecoratorTest.java index 44ffe928997a..4ae3eb612a8a 100644 --- a/processing/src/test/java/org/apache/druid/query/rowsandcols/semantic/RowsAndColumnsDecoratorTest.java +++ b/processing/src/test/java/org/apache/druid/query/rowsandcols/semantic/RowsAndColumnsDecoratorTest.java @@ -364,5 +364,4 @@ private void validateDecorated( } } } - } diff --git a/sql/src/test/resources/calcite/tests/window/simpleSum.sqlTest b/sql/src/test/resources/calcite/tests/window/simpleSum.sqlTest index 150c9225ee6e..9ca9f88e850b 100644 --- a/sql/src/test/resources/calcite/tests/window/simpleSum.sqlTest +++ b/sql/src/test/resources/calcite/tests/window/simpleSum.sqlTest @@ -29,4 +29,4 @@ expectedResults: - [ 946857600000, 1, 3 ] - [ 978307200000, 1, 4 ] - [ 978393600000, 1, 5 ] - - [ 978480000000, 1, 6 ] \ No newline at end of file + - [ 978480000000, 1, 6 ] From b19d2b3cea62fcd9d9fb57027c69e285348d3ca7 Mon Sep 17 00:00:00 2001 From: sreemanamala Date: Tue, 3 Sep 2024 07:44:56 +0530 Subject: [PATCH 15/22] checksty;e --- .../org/apache/druid/query/rowsandcols/RowsAndColumns.java | 1 - .../druid/query/rowsandcols/concrete/FrameRowsAndColumns.java | 4 ---- .../org/apache/druid/sql/calcite/run/NativeSqlEngine.java | 2 +- 3 files changed, 1 insertion(+), 6 deletions(-) diff --git a/processing/src/main/java/org/apache/druid/query/rowsandcols/RowsAndColumns.java b/processing/src/main/java/org/apache/druid/query/rowsandcols/RowsAndColumns.java index 5b9fc8aa1da1..84e2dd607971 100644 --- a/processing/src/main/java/org/apache/druid/query/rowsandcols/RowsAndColumns.java +++ b/processing/src/main/java/org/apache/druid/query/rowsandcols/RowsAndColumns.java @@ -44,7 +44,6 @@ import java.io.IOException; import java.nio.ByteBuffer; import java.nio.channels.Channels; - import java.util.Collection; /** diff --git a/processing/src/main/java/org/apache/druid/query/rowsandcols/concrete/FrameRowsAndColumns.java b/processing/src/main/java/org/apache/druid/query/rowsandcols/concrete/FrameRowsAndColumns.java index 0a9393879c7e..32be7acc9472 100644 --- a/processing/src/main/java/org/apache/druid/query/rowsandcols/concrete/FrameRowsAndColumns.java +++ b/processing/src/main/java/org/apache/druid/query/rowsandcols/concrete/FrameRowsAndColumns.java @@ -20,18 +20,14 @@ package org.apache.druid.query.rowsandcols.concrete; import com.google.common.base.Objects; -import org.apache.druid.error.DruidException; import org.apache.druid.frame.Frame; import org.apache.druid.frame.read.FrameReader; -import org.apache.druid.frame.read.columnar.FrameColumnReaders; import org.apache.druid.frame.segment.FrameStorageAdapter; -import org.apache.druid.java.util.common.ISE; import org.apache.druid.java.util.common.Intervals; import org.apache.druid.query.rowsandcols.RowsAndColumns; import org.apache.druid.query.rowsandcols.column.Column; import org.apache.druid.segment.CloseableShapeshifter; import org.apache.druid.segment.StorageAdapter; -import org.apache.druid.segment.column.ColumnType; import org.apache.druid.segment.column.RowSignature; import javax.annotation.Nullable; diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/run/NativeSqlEngine.java b/sql/src/main/java/org/apache/druid/sql/calcite/run/NativeSqlEngine.java index 2477ac38dec1..d02d302437b8 100644 --- a/sql/src/main/java/org/apache/druid/sql/calcite/run/NativeSqlEngine.java +++ b/sql/src/main/java/org/apache/druid/sql/calcite/run/NativeSqlEngine.java @@ -109,7 +109,6 @@ public boolean featureAvailable(EngineFeature feature) case ALLOW_TOP_LEVEL_UNION_ALL: case TIME_BOUNDARY_QUERY: case GROUPBY_IMPLICITLY_SORTS: - case WINDOW_LEAF_OPERATOR: return true; case CAN_INSERT: case CAN_REPLACE: @@ -117,6 +116,7 @@ public boolean featureAvailable(EngineFeature feature) case WRITE_EXTERNAL_DATA: case SCAN_ORDER_BY_NON_TIME: case SCAN_NEEDS_SIGNATURE: + case WINDOW_LEAF_OPERATOR: return false; default: throw SqlEngines.generateUnrecognizedFeatureException(NativeSqlEngine.class.getSimpleName(), feature); From 2ac95097fd547c3e61ae7cd106ded9a1ddc390b4 Mon Sep 17 00:00:00 2001 From: sreemanamala Date: Sun, 8 Sep 2024 08:54:30 +0530 Subject: [PATCH 16/22] retire WireTransferable --- ...WindowOperatorQueryQueryRunnerFactory.java | 17 +-------- .../LazilyDecoratedRowsAndColumns.java | 17 ++++----- .../semantic/WireTransferable.java | 37 ------------------- 3 files changed, 8 insertions(+), 63 deletions(-) delete mode 100644 processing/src/main/java/org/apache/druid/query/rowsandcols/semantic/WireTransferable.java diff --git a/processing/src/main/java/org/apache/druid/query/operator/WindowOperatorQueryQueryRunnerFactory.java b/processing/src/main/java/org/apache/druid/query/operator/WindowOperatorQueryQueryRunnerFactory.java index d18f6c252c1a..70b39a7d8843 100644 --- a/processing/src/main/java/org/apache/druid/query/operator/WindowOperatorQueryQueryRunnerFactory.java +++ b/processing/src/main/java/org/apache/druid/query/operator/WindowOperatorQueryQueryRunnerFactory.java @@ -21,7 +21,6 @@ import com.google.common.base.Function; import org.apache.druid.error.DruidException; -import org.apache.druid.frame.Frame; import org.apache.druid.java.util.common.Intervals; import org.apache.druid.java.util.common.guava.Sequence; import org.apache.druid.java.util.common.guava.Sequences; @@ -31,10 +30,7 @@ import org.apache.druid.query.QueryToolChest; import org.apache.druid.query.rowsandcols.LazilyDecoratedRowsAndColumns; import org.apache.druid.query.rowsandcols.RowsAndColumns; -import org.apache.druid.query.rowsandcols.concrete.ColumnBasedFrameRowsAndColumns; -import org.apache.druid.query.rowsandcols.semantic.WireTransferable; import org.apache.druid.segment.Segment; -import org.apache.druid.segment.column.RowSignature; import org.joda.time.Interval; import javax.annotation.Nullable; @@ -100,19 +96,8 @@ public Sequence apply( @Override public RowsAndColumns apply(@Nullable RowsAndColumns input) { - // This is interim code to force a materialization by synthesizing the wire transfer - // that will need to naturally happen as we flesh out this code more. For now, we - // materialize the bytes on-heap and then read them back in as a frame. if (input instanceof LazilyDecoratedRowsAndColumns) { - final WireTransferable wire = WireTransferable.fromRAC(input); - final byte[] frameBytes = wire.bytesToTransfer(); - - RowSignature.Builder sigBob = RowSignature.builder(); - for (String column : input.getColumnNames()) { - sigBob.add(column, input.findColumn(column).toAccessor().getType()); - } - - return new ColumnBasedFrameRowsAndColumns(Frame.wrap(frameBytes), sigBob.build()); + return ((LazilyDecoratedRowsAndColumns) input).toFrameRowsAndColumns(); } return input; } diff --git a/processing/src/main/java/org/apache/druid/query/rowsandcols/LazilyDecoratedRowsAndColumns.java b/processing/src/main/java/org/apache/druid/query/rowsandcols/LazilyDecoratedRowsAndColumns.java index 6be9500b9fe3..54dc5470b75e 100644 --- a/processing/src/main/java/org/apache/druid/query/rowsandcols/LazilyDecoratedRowsAndColumns.java +++ b/processing/src/main/java/org/apache/druid/query/rowsandcols/LazilyDecoratedRowsAndColumns.java @@ -39,10 +39,10 @@ import org.apache.druid.query.rowsandcols.column.Column; import org.apache.druid.query.rowsandcols.column.ColumnAccessor; import org.apache.druid.query.rowsandcols.concrete.ColumnBasedFrameRowsAndColumns; +import org.apache.druid.query.rowsandcols.concrete.FrameRowsAndColumns; import org.apache.druid.query.rowsandcols.semantic.ColumnSelectorFactoryMaker; import org.apache.druid.query.rowsandcols.semantic.DefaultRowsAndColumnsDecorator; import org.apache.druid.query.rowsandcols.semantic.RowsAndColumnsDecorator; -import org.apache.druid.query.rowsandcols.semantic.WireTransferable; import org.apache.druid.segment.ColumnSelectorFactory; import org.apache.druid.segment.Cursor; import org.apache.druid.segment.CursorBuildSpec; @@ -150,16 +150,13 @@ public RowsAndColumnsDecorator toRowsAndColumnsDecorator() @SuppressWarnings("unused") @SemanticCreator - public WireTransferable toWireTransferable() + public FrameRowsAndColumns toFrameRowsAndColumns() { - return () -> { - final Pair materialized = materialize(); - if (materialized == null) { - return new byte[]{}; - } else { - return materialized.lhs; - } - }; + maybeMaterialize(); + if (base instanceof FrameRowsAndColumns) { + return (FrameRowsAndColumns) base; + } + return null; } private void maybeMaterialize() diff --git a/processing/src/main/java/org/apache/druid/query/rowsandcols/semantic/WireTransferable.java b/processing/src/main/java/org/apache/druid/query/rowsandcols/semantic/WireTransferable.java deleted file mode 100644 index a7d55f599293..000000000000 --- a/processing/src/main/java/org/apache/druid/query/rowsandcols/semantic/WireTransferable.java +++ /dev/null @@ -1,37 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.druid.query.rowsandcols.semantic; - -import org.apache.druid.java.util.common.ISE; -import org.apache.druid.query.rowsandcols.RowsAndColumns; - -public interface WireTransferable -{ - static WireTransferable fromRAC(RowsAndColumns rac) - { - WireTransferable retVal = rac.as(WireTransferable.class); - if (retVal == null) { - throw new ISE("Rac[%s] cannot be transferred over the wire", rac.getClass()); - } - return retVal; - } - - byte[] bytesToTransfer(); -} From b925341bf37cbf8aacb976a9ec382b5f765160a4 Mon Sep 17 00:00:00 2001 From: sreemanamala Date: Sun, 8 Sep 2024 09:44:31 +0530 Subject: [PATCH 17/22] default RowsAndColumns::as --- .../query/rowsandcols/AppendableMapOfColumns.java | 10 ---------- .../druid/query/rowsandcols/ConcatRowsAndColumns.java | 7 ------- .../druid/query/rowsandcols/EmptyRowsAndColumns.java | 8 -------- .../druid/query/rowsandcols/LimitedRowsAndColumns.java | 9 --------- .../query/rowsandcols/MapOfColumnsRowsAndColumns.java | 2 +- .../query/rowsandcols/RearrangedRowsAndColumns.java | 7 ------- .../apache/druid/query/rowsandcols/RowsAndColumns.java | 10 +++++++++- .../rowsandcols/StorageAdapterRowsAndColumns.java | 2 +- .../rowsandcols/concrete/FrameRowsAndColumns.java | 5 +---- .../druid/query/rowsandcols/NoAsRowsAndColumns.java | 9 --------- 10 files changed, 12 insertions(+), 57 deletions(-) diff --git a/processing/src/main/java/org/apache/druid/query/rowsandcols/AppendableMapOfColumns.java b/processing/src/main/java/org/apache/druid/query/rowsandcols/AppendableMapOfColumns.java index 61f6855cd01c..d83f56c7ba5f 100644 --- a/processing/src/main/java/org/apache/druid/query/rowsandcols/AppendableMapOfColumns.java +++ b/processing/src/main/java/org/apache/druid/query/rowsandcols/AppendableMapOfColumns.java @@ -79,14 +79,4 @@ public Column findColumn(String name) } return retVal; } - - @Override - @SuppressWarnings("unchecked") - public T as(Class clazz) - { - if (AppendableRowsAndColumns.class.equals(clazz)) { - return (T) this; - } - return null; - } } diff --git a/processing/src/main/java/org/apache/druid/query/rowsandcols/ConcatRowsAndColumns.java b/processing/src/main/java/org/apache/druid/query/rowsandcols/ConcatRowsAndColumns.java index c6ced60849d4..3f70f82a2537 100644 --- a/processing/src/main/java/org/apache/druid/query/rowsandcols/ConcatRowsAndColumns.java +++ b/processing/src/main/java/org/apache/druid/query/rowsandcols/ConcatRowsAndColumns.java @@ -141,13 +141,6 @@ public Column findColumn(String name) } } - @Nullable - @Override - public T as(Class clazz) - { - return null; - } - private class ConcatedidColumn implements Column { diff --git a/processing/src/main/java/org/apache/druid/query/rowsandcols/EmptyRowsAndColumns.java b/processing/src/main/java/org/apache/druid/query/rowsandcols/EmptyRowsAndColumns.java index dd0c7dab1cda..56647e0f5687 100644 --- a/processing/src/main/java/org/apache/druid/query/rowsandcols/EmptyRowsAndColumns.java +++ b/processing/src/main/java/org/apache/druid/query/rowsandcols/EmptyRowsAndColumns.java @@ -21,7 +21,6 @@ import org.apache.druid.query.rowsandcols.column.Column; -import javax.annotation.Nullable; import java.util.ArrayList; import java.util.Collection; @@ -44,11 +43,4 @@ public Column findColumn(String name) { return null; } - - @Nullable - @Override - public T as(Class clazz) - { - return null; - } } diff --git a/processing/src/main/java/org/apache/druid/query/rowsandcols/LimitedRowsAndColumns.java b/processing/src/main/java/org/apache/druid/query/rowsandcols/LimitedRowsAndColumns.java index abb3d4649b1a..8cfadecb4dd2 100644 --- a/processing/src/main/java/org/apache/druid/query/rowsandcols/LimitedRowsAndColumns.java +++ b/processing/src/main/java/org/apache/druid/query/rowsandcols/LimitedRowsAndColumns.java @@ -23,7 +23,6 @@ import org.apache.druid.query.rowsandcols.column.Column; import org.apache.druid.query.rowsandcols.column.LimitedColumn; -import javax.annotation.Nullable; import java.util.Collection; public class LimitedRowsAndColumns implements RowsAndColumns @@ -66,12 +65,4 @@ public Column findColumn(String name) return new LimitedColumn(column, start, end); } - - @Nullable - @Override - public T as(Class clazz) - { - return null; - } - } diff --git a/processing/src/main/java/org/apache/druid/query/rowsandcols/MapOfColumnsRowsAndColumns.java b/processing/src/main/java/org/apache/druid/query/rowsandcols/MapOfColumnsRowsAndColumns.java index 29f092f67440..d6bc1026a98d 100644 --- a/processing/src/main/java/org/apache/druid/query/rowsandcols/MapOfColumnsRowsAndColumns.java +++ b/processing/src/main/java/org/apache/druid/query/rowsandcols/MapOfColumnsRowsAndColumns.java @@ -164,7 +164,7 @@ public T as(Class clazz) if (AppendableRowsAndColumns.class.equals(clazz)) { return (T) new AppendableMapOfColumns(this); } - return null; + return RowsAndColumns.super.as(clazz); } public static class Builder diff --git a/processing/src/main/java/org/apache/druid/query/rowsandcols/RearrangedRowsAndColumns.java b/processing/src/main/java/org/apache/druid/query/rowsandcols/RearrangedRowsAndColumns.java index f1793f8fd0e4..e64f086edd7f 100644 --- a/processing/src/main/java/org/apache/druid/query/rowsandcols/RearrangedRowsAndColumns.java +++ b/processing/src/main/java/org/apache/druid/query/rowsandcols/RearrangedRowsAndColumns.java @@ -164,11 +164,4 @@ public int compareRows(int lhsRowNum, int rhsRowNum) ); } } - - @Nullable - @Override - public T as(Class clazz) - { - return null; - } } diff --git a/processing/src/main/java/org/apache/druid/query/rowsandcols/RowsAndColumns.java b/processing/src/main/java/org/apache/druid/query/rowsandcols/RowsAndColumns.java index 84e2dd607971..4cbaa25bfcc4 100644 --- a/processing/src/main/java/org/apache/druid/query/rowsandcols/RowsAndColumns.java +++ b/processing/src/main/java/org/apache/druid/query/rowsandcols/RowsAndColumns.java @@ -44,6 +44,7 @@ import java.io.IOException; import java.nio.ByteBuffer; import java.nio.channels.Channels; +import java.util.Arrays; import java.util.Collection; /** @@ -129,8 +130,15 @@ static AppendableRowsAndColumns expectAppendable(RowsAndColumns input) * @return A concrete implementation of the interface, or null if there is no meaningful optimization to be had * through a local implementation of the interface. */ + @SuppressWarnings("unchecked") @Nullable - T as(Class clazz); + default T as(Class clazz) + { + if (Arrays.asList(getClass().getInterfaces()).contains(clazz)) { + return (T) this; + } + return null; + } /** * Serializer for {@link RowsAndColumns} by converting the instance to {@link FrameRowsAndColumns} diff --git a/processing/src/main/java/org/apache/druid/query/rowsandcols/StorageAdapterRowsAndColumns.java b/processing/src/main/java/org/apache/druid/query/rowsandcols/StorageAdapterRowsAndColumns.java index 2d49cc324400..62d20cdd4adf 100644 --- a/processing/src/main/java/org/apache/druid/query/rowsandcols/StorageAdapterRowsAndColumns.java +++ b/processing/src/main/java/org/apache/druid/query/rowsandcols/StorageAdapterRowsAndColumns.java @@ -58,7 +58,7 @@ public T as(Class clazz) if (StorageAdapter.class == clazz) { return (T) storageAdapter; } - return null; + return RowsAndColumns.super.as(clazz); } @Override diff --git a/processing/src/main/java/org/apache/druid/query/rowsandcols/concrete/FrameRowsAndColumns.java b/processing/src/main/java/org/apache/druid/query/rowsandcols/concrete/FrameRowsAndColumns.java index 32be7acc9472..1e058b70e093 100644 --- a/processing/src/main/java/org/apache/druid/query/rowsandcols/concrete/FrameRowsAndColumns.java +++ b/processing/src/main/java/org/apache/druid/query/rowsandcols/concrete/FrameRowsAndColumns.java @@ -76,10 +76,7 @@ public T as(Class clazz) if (StorageAdapter.class.equals(clazz)) { return (T) new FrameStorageAdapter(frame, FrameReader.create(signature), Intervals.ETERNITY); } - if (FrameRowsAndColumns.class.equals(clazz)) { - return (T) this; - } - return null; + return RowsAndColumns.super.as(clazz); } @Override diff --git a/processing/src/test/java/org/apache/druid/query/rowsandcols/NoAsRowsAndColumns.java b/processing/src/test/java/org/apache/druid/query/rowsandcols/NoAsRowsAndColumns.java index 422c87c8b7c6..16cd44e870ba 100644 --- a/processing/src/test/java/org/apache/druid/query/rowsandcols/NoAsRowsAndColumns.java +++ b/processing/src/test/java/org/apache/druid/query/rowsandcols/NoAsRowsAndColumns.java @@ -21,7 +21,6 @@ import org.apache.druid.query.rowsandcols.column.Column; -import javax.annotation.Nullable; import java.util.Collection; public class NoAsRowsAndColumns implements RowsAndColumns @@ -50,12 +49,4 @@ public Column findColumn(String name) { return rac.findColumn(name); } - - @Nullable - @Override - public T as(Class clazz) - { - // Pretend like this doesn't implement any semantic interfaces - return null; - } } From b0b606a0764aa86e73c5747ba960bfe71d587800 Mon Sep 17 00:00:00 2001 From: sreemanamala Date: Sun, 8 Sep 2024 19:08:21 +0530 Subject: [PATCH 18/22] refactors --- .../apache/druid/query/rowsandcols/RowsAndColumns.java | 8 +------- .../druid/common/semantic/SemanticCreatorUsageTest.java | 6 +++--- .../org/apache/druid/sql/calcite/CalciteQueryTest.java | 2 +- .../org/apache/druid/sql/calcite/NotYetSupported.java | 2 +- 4 files changed, 6 insertions(+), 12 deletions(-) diff --git a/processing/src/main/java/org/apache/druid/query/rowsandcols/RowsAndColumns.java b/processing/src/main/java/org/apache/druid/query/rowsandcols/RowsAndColumns.java index 4cbaa25bfcc4..f59f1ef3cab2 100644 --- a/processing/src/main/java/org/apache/druid/query/rowsandcols/RowsAndColumns.java +++ b/processing/src/main/java/org/apache/druid/query/rowsandcols/RowsAndColumns.java @@ -42,7 +42,6 @@ import javax.annotation.Nullable; import java.io.ByteArrayOutputStream; import java.io.IOException; -import java.nio.ByteBuffer; import java.nio.channels.Channels; import java.util.Arrays; import java.util.Collection; @@ -165,12 +164,7 @@ public void serialize( Frame frame = frameRAC.getFrame(); final ByteArrayOutputStream baos = new ByteArrayOutputStream(); - frame.writeTo( - Channels.newChannel(baos), - false, - ByteBuffer.allocate(Frame.compressionBufferSize((int) frame.numBytes())), - ByteTracker.unboundedTracker() - ); + frame.writeTo(Channels.newChannel(baos), false, null, ByteTracker.unboundedTracker()); jsonGenerator.writeBinary(baos.toByteArray()); } diff --git a/processing/src/test/java/org/apache/druid/common/semantic/SemanticCreatorUsageTest.java b/processing/src/test/java/org/apache/druid/common/semantic/SemanticCreatorUsageTest.java index 0dd61fb4b3ea..a6821d0ffda4 100644 --- a/processing/src/test/java/org/apache/druid/common/semantic/SemanticCreatorUsageTest.java +++ b/processing/src/test/java/org/apache/druid/common/semantic/SemanticCreatorUsageTest.java @@ -78,7 +78,7 @@ public void testPublic() } /** - * {@link SemanticCreator} must return with an interface. + * {@link SemanticCreator} must return with an interface or abstract class. *

* An exact implementation may indicate that some interface methods might be missing. */ @@ -87,8 +87,8 @@ public void testReturnType() { Class returnType = method.getReturnType(); assertTrue( - returnType + " is not an interface; this method must return with an interface; ", - returnType.isInterface() + returnType + " is not an interface or abstract class; this method must return with either of them; ", + returnType.isInterface() || Modifier.isAbstract(returnType.getModifiers()) ); } diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/CalciteQueryTest.java b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteQueryTest.java index 58d29a738c09..73ca25508670 100644 --- a/sql/src/test/java/org/apache/druid/sql/calcite/CalciteQueryTest.java +++ b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteQueryTest.java @@ -16085,7 +16085,7 @@ public void testScanAndSortOnJoin() .run(); } - @NotYetSupported(Modes.CANNOT_RETRIEVE_ROWS) + @NotYetSupported(Modes.UNSUPPORTED_DATASOURCE) @Test public void testWindowingOverJoin() { diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/NotYetSupported.java b/sql/src/test/java/org/apache/druid/sql/calcite/NotYetSupported.java index e5442a2bda24..f0c48ff44f2f 100644 --- a/sql/src/test/java/org/apache/druid/sql/calcite/NotYetSupported.java +++ b/sql/src/test/java/org/apache/druid/sql/calcite/NotYetSupported.java @@ -89,7 +89,7 @@ enum Modes RESULT_MISMATCH(AssertionError.class, "(assertResulEquals|AssertionError: column content mismatch)"), LONG_CASTING(AssertionError.class, "expected: java.lang.Long"), UNSUPPORTED_NULL_ORDERING(DruidException.class, "(A|DE)SCENDING ordering with NULLS (LAST|FIRST)"), - CANNOT_RETRIEVE_ROWS(UnsupportedOperationException.class, "Cannot retrieve number of rows from join segment"), + UNSUPPORTED_DATASOURCE(DruidException.class, "WindowOperatorQuery must run on top of a query or inline data source"), UNION_WITH_COMPLEX_OPERAND(DruidException.class, "Only Table and Values are supported as inputs for Union"), UNION_MORE_STRICT_ROWTYPE_CHECK(DruidException.class, "Row signature mismatch in Union inputs"), JOIN_CONDITION_NOT_PUSHED_CONDITION(DruidException.class, "SQL requires a join with '.*' condition"), From 98805d82d9d4c36714c6e070a4543cf56fffed9b Mon Sep 17 00:00:00 2001 From: sreemanamala Date: Mon, 9 Sep 2024 08:30:48 +0530 Subject: [PATCH 19/22] bug --- .../operator/WindowOperatorQueryQueryRunnerFactory.java | 3 ++- .../org/apache/druid/query/rowsandcols/RowsAndColumns.java | 7 +++++++ 2 files changed, 9 insertions(+), 1 deletion(-) diff --git a/processing/src/main/java/org/apache/druid/query/operator/WindowOperatorQueryQueryRunnerFactory.java b/processing/src/main/java/org/apache/druid/query/operator/WindowOperatorQueryQueryRunnerFactory.java index 70b39a7d8843..f86f91be18be 100644 --- a/processing/src/main/java/org/apache/druid/query/operator/WindowOperatorQueryQueryRunnerFactory.java +++ b/processing/src/main/java/org/apache/druid/query/operator/WindowOperatorQueryQueryRunnerFactory.java @@ -30,6 +30,7 @@ import org.apache.druid.query.QueryToolChest; import org.apache.druid.query.rowsandcols.LazilyDecoratedRowsAndColumns; import org.apache.druid.query.rowsandcols.RowsAndColumns; +import org.apache.druid.query.rowsandcols.concrete.FrameRowsAndColumns; import org.apache.druid.segment.Segment; import org.joda.time.Interval; @@ -97,7 +98,7 @@ public Sequence apply( public RowsAndColumns apply(@Nullable RowsAndColumns input) { if (input instanceof LazilyDecoratedRowsAndColumns) { - return ((LazilyDecoratedRowsAndColumns) input).toFrameRowsAndColumns(); + return input.as(FrameRowsAndColumns.class); } return input; } diff --git a/processing/src/main/java/org/apache/druid/query/rowsandcols/RowsAndColumns.java b/processing/src/main/java/org/apache/druid/query/rowsandcols/RowsAndColumns.java index f59f1ef3cab2..08b533096eaf 100644 --- a/processing/src/main/java/org/apache/druid/query/rowsandcols/RowsAndColumns.java +++ b/processing/src/main/java/org/apache/druid/query/rowsandcols/RowsAndColumns.java @@ -136,6 +136,13 @@ default T as(Class clazz) if (Arrays.asList(getClass().getInterfaces()).contains(clazz)) { return (T) this; } + Class superClass = this.getClass().getSuperclass(); + while (superClass != null) { + if (superClass.equals(clazz)) { + return (T) this; + } + superClass = superClass.getSuperclass(); + } return null; } From ce2be7a22c227a5f28fe3adfe13a9abd87264100 Mon Sep 17 00:00:00 2001 From: sreemanamala Date: Tue, 10 Sep 2024 12:30:14 +0530 Subject: [PATCH 20/22] comments and conflicts --- .../LazilyDecoratedRowsAndColumns.java | 5 +---- .../query/rowsandcols/RowsAndColumns.java | 18 ++++++------------ 2 files changed, 7 insertions(+), 16 deletions(-) diff --git a/processing/src/main/java/org/apache/druid/query/rowsandcols/LazilyDecoratedRowsAndColumns.java b/processing/src/main/java/org/apache/druid/query/rowsandcols/LazilyDecoratedRowsAndColumns.java index 31342008b650..bb35f6837976 100644 --- a/processing/src/main/java/org/apache/druid/query/rowsandcols/LazilyDecoratedRowsAndColumns.java +++ b/processing/src/main/java/org/apache/druid/query/rowsandcols/LazilyDecoratedRowsAndColumns.java @@ -153,10 +153,7 @@ public RowsAndColumnsDecorator toRowsAndColumnsDecorator() public FrameRowsAndColumns toFrameRowsAndColumns() { maybeMaterialize(); - if (base instanceof FrameRowsAndColumns) { - return (FrameRowsAndColumns) base; - } - return null; + return base.as(FrameRowsAndColumns.class); } private void maybeMaterialize() diff --git a/processing/src/main/java/org/apache/druid/query/rowsandcols/RowsAndColumns.java b/processing/src/main/java/org/apache/druid/query/rowsandcols/RowsAndColumns.java index 08b533096eaf..a34d0e463c07 100644 --- a/processing/src/main/java/org/apache/druid/query/rowsandcols/RowsAndColumns.java +++ b/processing/src/main/java/org/apache/druid/query/rowsandcols/RowsAndColumns.java @@ -43,7 +43,6 @@ import java.io.ByteArrayOutputStream; import java.io.IOException; import java.nio.channels.Channels; -import java.util.Arrays; import java.util.Collection; /** @@ -133,16 +132,9 @@ static AppendableRowsAndColumns expectAppendable(RowsAndColumns input) @Nullable default T as(Class clazz) { - if (Arrays.asList(getClass().getInterfaces()).contains(clazz)) { + if (clazz.isInstance(this)) { return (T) this; } - Class superClass = this.getClass().getSuperclass(); - while (superClass != null) { - if (superClass.equals(clazz)) { - return (T) this; - } - superClass = superClass.getSuperclass(); - } return null; } @@ -197,9 +189,11 @@ public FrameRowsAndColumns deserialize(JsonParser jsonParser, DeserializationCon ByteArrayOutputStream baos = new ByteArrayOutputStream(); jsonParser.readBinaryValue(baos); Frame frame = Frame.wrap(baos.toByteArray()); - return (frame.type() == FrameType.COLUMNAR) - ? new ColumnBasedFrameRowsAndColumns(Frame.wrap(baos.toByteArray()), sig) - : new RowBasedFrameRowsAndColumns(Frame.wrap(baos.toByteArray()), sig); + if (frame.type() == FrameType.COLUMNAR) { + return new ColumnBasedFrameRowsAndColumns(frame, sig); + } else { + return new RowBasedFrameRowsAndColumns(frame, sig); + } } } } From 151f6d29e5bd1f2425f8a972af5e6b9afb302e89 Mon Sep 17 00:00:00 2001 From: sreemanamala Date: Sat, 14 Sep 2024 13:07:20 +0530 Subject: [PATCH 21/22] refactor --- .../concrete/AbstractFrameRowsAndColumns.java | 106 ++++++++++++++++++ .../ColumnBasedFrameRowsAndColumns.java | 2 +- .../concrete/FrameRowsAndColumns.java | 73 +----------- .../concrete/RowBasedFrameRowsAndColumns.java | 2 +- .../semantic/SemanticCreatorUsageTest.java | 6 +- .../sql/calcite/CalciteWindowQueryTest.java | 2 +- 6 files changed, 116 insertions(+), 75 deletions(-) create mode 100644 processing/src/main/java/org/apache/druid/query/rowsandcols/concrete/AbstractFrameRowsAndColumns.java diff --git a/processing/src/main/java/org/apache/druid/query/rowsandcols/concrete/AbstractFrameRowsAndColumns.java b/processing/src/main/java/org/apache/druid/query/rowsandcols/concrete/AbstractFrameRowsAndColumns.java new file mode 100644 index 000000000000..5295326c8622 --- /dev/null +++ b/processing/src/main/java/org/apache/druid/query/rowsandcols/concrete/AbstractFrameRowsAndColumns.java @@ -0,0 +1,106 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.query.rowsandcols.concrete; + +import com.google.common.base.Objects; +import org.apache.druid.frame.Frame; +import org.apache.druid.frame.read.FrameReader; +import org.apache.druid.query.rowsandcols.column.Column; +import org.apache.druid.segment.CloseableShapeshifter; +import org.apache.druid.segment.CursorFactory; +import org.apache.druid.segment.column.RowSignature; + +import javax.annotation.Nullable; +import java.util.Collection; +import java.util.LinkedHashMap; + +public abstract class AbstractFrameRowsAndColumns implements FrameRowsAndColumns, AutoCloseable, CloseableShapeshifter +{ + final Frame frame; + final RowSignature signature; + final LinkedHashMap colCache = new LinkedHashMap<>(); + + public AbstractFrameRowsAndColumns(Frame frame, RowSignature signature) + { + this.frame = frame; + this.signature = signature; + } + + @Override + public Frame getFrame() + { + return frame; + } + + @Override + public RowSignature getSignature() + { + return signature; + } + + @Override + public Collection getColumnNames() + { + return signature.getColumnNames(); + } + + @Override + public int numRows() + { + return frame.numRows(); + } + + @SuppressWarnings("unchecked") + @Nullable + @Override + public T as(Class clazz) + { + if (CursorFactory.class.equals(clazz)) { + return (T) FrameReader.create(signature).makeCursorFactory(frame); + } + return FrameRowsAndColumns.super.as(clazz); + } + + @Override + public void close() + { + // nothing to close + } + + @Override + public int hashCode() + { + return Objects.hashCode(frame, signature); + } + + @Override + public boolean equals(Object o) + { + if (this == o) { + return true; + } + if (!(o instanceof AbstractFrameRowsAndColumns)) { + return false; + } + AbstractFrameRowsAndColumns otherFrame = (AbstractFrameRowsAndColumns) o; + + return frame.writableMemory().equals(otherFrame.frame.writableMemory()) && signature.equals(otherFrame.signature); + } +} diff --git a/processing/src/main/java/org/apache/druid/query/rowsandcols/concrete/ColumnBasedFrameRowsAndColumns.java b/processing/src/main/java/org/apache/druid/query/rowsandcols/concrete/ColumnBasedFrameRowsAndColumns.java index a7b0b4bab951..c4a4577dc1af 100644 --- a/processing/src/main/java/org/apache/druid/query/rowsandcols/concrete/ColumnBasedFrameRowsAndColumns.java +++ b/processing/src/main/java/org/apache/druid/query/rowsandcols/concrete/ColumnBasedFrameRowsAndColumns.java @@ -29,7 +29,7 @@ import javax.annotation.Nullable; -public class ColumnBasedFrameRowsAndColumns extends FrameRowsAndColumns +public class ColumnBasedFrameRowsAndColumns extends AbstractFrameRowsAndColumns { public ColumnBasedFrameRowsAndColumns(Frame frame, RowSignature signature) { diff --git a/processing/src/main/java/org/apache/druid/query/rowsandcols/concrete/FrameRowsAndColumns.java b/processing/src/main/java/org/apache/druid/query/rowsandcols/concrete/FrameRowsAndColumns.java index 827e6a8457bb..882aa4904852 100644 --- a/processing/src/main/java/org/apache/druid/query/rowsandcols/concrete/FrameRowsAndColumns.java +++ b/processing/src/main/java/org/apache/druid/query/rowsandcols/concrete/FrameRowsAndColumns.java @@ -19,87 +19,22 @@ package org.apache.druid.query.rowsandcols.concrete; -import com.google.common.base.Objects; import org.apache.druid.frame.Frame; -import org.apache.druid.frame.read.FrameReader; import org.apache.druid.query.rowsandcols.RowsAndColumns; -import org.apache.druid.query.rowsandcols.column.Column; -import org.apache.druid.segment.CloseableShapeshifter; -import org.apache.druid.segment.CursorFactory; import org.apache.druid.segment.column.RowSignature; import javax.annotation.Nullable; -import java.util.Collection; -import java.util.LinkedHashMap; -public abstract class FrameRowsAndColumns implements RowsAndColumns, AutoCloseable, CloseableShapeshifter +public interface FrameRowsAndColumns extends RowsAndColumns { - final Frame frame; - final RowSignature signature; - final LinkedHashMap colCache = new LinkedHashMap<>(); + Frame getFrame(); - public FrameRowsAndColumns(Frame frame, RowSignature signature) - { - this.frame = frame; - this.signature = signature; - } - - public Frame getFrame() - { - return frame; - } - - public RowSignature getSignature() - { - return signature; - } + RowSignature getSignature(); - @Override - public Collection getColumnNames() - { - return signature.getColumnNames(); - } - - @Override - public int numRows() - { - return frame.numRows(); - } - - @SuppressWarnings("unchecked") @Nullable @Override - public T as(Class clazz) + default T as(Class clazz) { - if (CursorFactory.class.equals(clazz)) { - return (T) FrameReader.create(signature).makeCursorFactory(frame); - } return RowsAndColumns.super.as(clazz); } - - @Override - public void close() - { - // nothing to close - } - - @Override - public int hashCode() - { - return Objects.hashCode(frame, signature); - } - - @Override - public boolean equals(Object o) - { - if (this == o) { - return true; - } - if (!(o instanceof FrameRowsAndColumns)) { - return false; - } - FrameRowsAndColumns otherFrame = (FrameRowsAndColumns) o; - - return frame.writableMemory().equals(otherFrame.frame.writableMemory()) && signature.equals(otherFrame.signature); - } } diff --git a/processing/src/main/java/org/apache/druid/query/rowsandcols/concrete/RowBasedFrameRowsAndColumns.java b/processing/src/main/java/org/apache/druid/query/rowsandcols/concrete/RowBasedFrameRowsAndColumns.java index 1b5a9e3e43cf..c702c210775c 100644 --- a/processing/src/main/java/org/apache/druid/query/rowsandcols/concrete/RowBasedFrameRowsAndColumns.java +++ b/processing/src/main/java/org/apache/druid/query/rowsandcols/concrete/RowBasedFrameRowsAndColumns.java @@ -30,7 +30,7 @@ import javax.annotation.Nullable; -public class RowBasedFrameRowsAndColumns extends FrameRowsAndColumns +public class RowBasedFrameRowsAndColumns extends AbstractFrameRowsAndColumns { public RowBasedFrameRowsAndColumns(Frame frame, RowSignature signature) { diff --git a/processing/src/test/java/org/apache/druid/common/semantic/SemanticCreatorUsageTest.java b/processing/src/test/java/org/apache/druid/common/semantic/SemanticCreatorUsageTest.java index a6821d0ffda4..0dd61fb4b3ea 100644 --- a/processing/src/test/java/org/apache/druid/common/semantic/SemanticCreatorUsageTest.java +++ b/processing/src/test/java/org/apache/druid/common/semantic/SemanticCreatorUsageTest.java @@ -78,7 +78,7 @@ public void testPublic() } /** - * {@link SemanticCreator} must return with an interface or abstract class. + * {@link SemanticCreator} must return with an interface. *

* An exact implementation may indicate that some interface methods might be missing. */ @@ -87,8 +87,8 @@ public void testReturnType() { Class returnType = method.getReturnType(); assertTrue( - returnType + " is not an interface or abstract class; this method must return with either of them; ", - returnType.isInterface() || Modifier.isAbstract(returnType.getModifiers()) + returnType + " is not an interface; this method must return with an interface; ", + returnType.isInterface() ); } diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/CalciteWindowQueryTest.java b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteWindowQueryTest.java index ccf459e743e7..b03938b6ee96 100644 --- a/sql/src/test/java/org/apache/druid/sql/calcite/CalciteWindowQueryTest.java +++ b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteWindowQueryTest.java @@ -266,7 +266,7 @@ public void testFailure_partitionByMVD() ); assertEquals( - "Encountered a multi value column [v0]. Window processing does not support MVDs. " + "Encountered a multi value column. Window processing does not support MVDs. " + "Consider using UNNEST or MV_TO_ARRAY.", e.getMessage() ); From f1dc27a93e07fbbaf69f7ccb8973c67fc739dba7 Mon Sep 17 00:00:00 2001 From: sreemanamala Date: Mon, 16 Sep 2024 12:58:23 +0530 Subject: [PATCH 22/22] comments --- .../query/rowsandcols/concrete/FrameRowsAndColumns.java | 9 --------- 1 file changed, 9 deletions(-) diff --git a/processing/src/main/java/org/apache/druid/query/rowsandcols/concrete/FrameRowsAndColumns.java b/processing/src/main/java/org/apache/druid/query/rowsandcols/concrete/FrameRowsAndColumns.java index 882aa4904852..022a0f91ac16 100644 --- a/processing/src/main/java/org/apache/druid/query/rowsandcols/concrete/FrameRowsAndColumns.java +++ b/processing/src/main/java/org/apache/druid/query/rowsandcols/concrete/FrameRowsAndColumns.java @@ -23,18 +23,9 @@ import org.apache.druid.query.rowsandcols.RowsAndColumns; import org.apache.druid.segment.column.RowSignature; -import javax.annotation.Nullable; - public interface FrameRowsAndColumns extends RowsAndColumns { Frame getFrame(); RowSignature getSignature(); - - @Nullable - @Override - default T as(Class clazz) - { - return RowsAndColumns.super.as(clazz); - } }