Skip to content

Commit

Permalink
Add serde for ColumnBasedRowsAndColumns to fix window queries without…
Browse files Browse the repository at this point in the history
… group by (apache#16658)

Register a Ser-De for RowsAndColumns so that the window operator query running on leaf operators would be transferred properly on the wire. Would fix the empty response given by window queries without group by on the native engine.

(cherry picked from commit bb1c3c1)
  • Loading branch information
sreemanamala committed Sep 18, 2024
1 parent a63ac25 commit 07104e9
Show file tree
Hide file tree
Showing 24 changed files with 296 additions and 197 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@
import org.apache.druid.query.context.ResponseContext;
import org.apache.druid.query.context.ResponseContextDeserializer;
import org.apache.druid.query.rowsandcols.RowsAndColumns;
import org.apache.druid.query.rowsandcols.semantic.WireTransferable;
import org.joda.time.DateTimeZone;

import java.io.IOException;
Expand Down Expand Up @@ -189,20 +188,7 @@ public ByteOrder deserialize(JsonParser jp, DeserializationContext ctxt) throws
);
addDeserializer(ResponseContext.class, new ResponseContextDeserializer());

addSerializer(RowsAndColumns.class, new JsonSerializer<RowsAndColumns>()
{
@Override
public void serialize(
RowsAndColumns value,
JsonGenerator gen,
SerializerProvider serializers
) throws IOException
{
// It would be really cool if jackson offered an output stream that would allow us to push bytes
// through, but it doesn't right now, so we have to build a byte[] instead. Maybe something to contribute
// back to Jackson at some point.
gen.writeBinary(WireTransferable.fromRAC(value).bytesToTransfer());
}
});
addSerializer(RowsAndColumns.class, new RowsAndColumns.RowsAndColumnsSerializer());
addDeserializer(RowsAndColumns.class, new RowsAndColumns.RowsAndColumnsDeserializer());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@

import com.google.common.base.Function;
import org.apache.druid.error.DruidException;
import org.apache.druid.frame.Frame;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.guava.Sequence;
import org.apache.druid.java.util.common.guava.Sequences;
Expand All @@ -31,10 +30,8 @@
import org.apache.druid.query.QueryToolChest;
import org.apache.druid.query.rowsandcols.LazilyDecoratedRowsAndColumns;
import org.apache.druid.query.rowsandcols.RowsAndColumns;
import org.apache.druid.query.rowsandcols.concrete.ColumnBasedFrameRowsAndColumns;
import org.apache.druid.query.rowsandcols.semantic.WireTransferable;
import org.apache.druid.query.rowsandcols.concrete.FrameRowsAndColumns;
import org.apache.druid.segment.Segment;
import org.apache.druid.segment.column.RowSignature;
import org.joda.time.Interval;

import javax.annotation.Nullable;
Expand Down Expand Up @@ -100,19 +97,8 @@ public Sequence<RowsAndColumns> apply(
@Override
public RowsAndColumns apply(@Nullable RowsAndColumns input)
{
// This is interim code to force a materialization by synthesizing the wire transfer
// that will need to naturally happen as we flesh out this code more. For now, we
// materialize the bytes on-heap and then read them back in as a frame.
if (input instanceof LazilyDecoratedRowsAndColumns) {
final WireTransferable wire = WireTransferable.fromRAC(input);
final byte[] frameBytes = wire.bytesToTransfer();

RowSignature.Builder sigBob = RowSignature.builder();
for (String column : input.getColumnNames()) {
sigBob.add(column, input.findColumn(column).toAccessor().getType());
}

return new ColumnBasedFrameRowsAndColumns(Frame.wrap(frameBytes), sigBob.build());
return input.as(FrameRowsAndColumns.class);
}
return input;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,14 +79,4 @@ public Column findColumn(String name)
}
return retVal;
}

@Override
@SuppressWarnings("unchecked")
public <T> T as(Class<T> clazz)
{
if (AppendableRowsAndColumns.class.equals(clazz)) {
return (T) this;
}
return null;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -141,13 +141,6 @@ public Column findColumn(String name)
}
}

@Nullable
@Override
public <T> T as(Class<T> clazz)
{
return null;
}

private class ConcatedidColumn implements Column
{

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ public <T> T as(Class<T> clazz)
if (CursorFactory.class == clazz) {
return (T) cursorFactory;
}
return null;
return RowsAndColumns.super.as(clazz);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@

import org.apache.druid.query.rowsandcols.column.Column;

import javax.annotation.Nullable;
import java.util.ArrayList;
import java.util.Collection;

Expand All @@ -44,11 +43,4 @@ public Column findColumn(String name)
{
return null;
}

@Nullable
@Override
public <T> T as(Class<T> clazz)
{
return null;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -39,10 +39,10 @@
import org.apache.druid.query.rowsandcols.column.Column;
import org.apache.druid.query.rowsandcols.column.ColumnAccessor;
import org.apache.druid.query.rowsandcols.concrete.ColumnBasedFrameRowsAndColumns;
import org.apache.druid.query.rowsandcols.concrete.FrameRowsAndColumns;
import org.apache.druid.query.rowsandcols.semantic.ColumnSelectorFactoryMaker;
import org.apache.druid.query.rowsandcols.semantic.DefaultRowsAndColumnsDecorator;
import org.apache.druid.query.rowsandcols.semantic.RowsAndColumnsDecorator;
import org.apache.druid.query.rowsandcols.semantic.WireTransferable;
import org.apache.druid.segment.ColumnSelectorFactory;
import org.apache.druid.segment.Cursor;
import org.apache.druid.segment.CursorBuildSpec;
Expand Down Expand Up @@ -150,16 +150,10 @@ public RowsAndColumnsDecorator toRowsAndColumnsDecorator()

@SuppressWarnings("unused")
@SemanticCreator
public WireTransferable toWireTransferable()
public FrameRowsAndColumns toFrameRowsAndColumns()
{
return () -> {
final Pair<byte[], RowSignature> materialized = materialize();
if (materialized == null) {
return new byte[]{};
} else {
return materialized.lhs;
}
};
maybeMaterialize();
return base.as(FrameRowsAndColumns.class);
}

private void maybeMaterialize()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
import org.apache.druid.query.rowsandcols.column.Column;
import org.apache.druid.query.rowsandcols.column.LimitedColumn;

import javax.annotation.Nullable;
import java.util.Collection;

public class LimitedRowsAndColumns implements RowsAndColumns
Expand Down Expand Up @@ -66,12 +65,4 @@ public Column findColumn(String name)

return new LimitedColumn(column, start, end);
}

@Nullable
@Override
public <T> T as(Class<T> clazz)
{
return null;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,7 @@ public <T> T as(Class<T> clazz)
if (AppendableRowsAndColumns.class.equals(clazz)) {
return (T) new AppendableMapOfColumns(this);
}
return null;
return RowsAndColumns.super.as(clazz);
}

public static class Builder
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -164,11 +164,4 @@ public int compareRows(int lhsRowNum, int rhsRowNum)
);
}
}

@Nullable
@Override
public <T> T as(Class<T> clazz)
{
return null;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,30 @@

package org.apache.druid.query.rowsandcols;

import com.fasterxml.jackson.core.JsonGenerator;
import com.fasterxml.jackson.core.JsonParser;
import com.fasterxml.jackson.databind.DeserializationContext;
import com.fasterxml.jackson.databind.SerializerProvider;
import com.fasterxml.jackson.databind.deser.std.StdDeserializer;
import com.fasterxml.jackson.databind.ser.std.StdSerializer;
import org.apache.druid.error.DruidException;
import org.apache.druid.frame.Frame;
import org.apache.druid.frame.FrameType;
import org.apache.druid.frame.channel.ByteTracker;
import org.apache.druid.java.util.common.jackson.JacksonUtils;
import org.apache.druid.query.rowsandcols.column.Column;
import org.apache.druid.query.rowsandcols.concrete.ColumnBasedFrameRowsAndColumns;
import org.apache.druid.query.rowsandcols.concrete.FrameRowsAndColumns;
import org.apache.druid.query.rowsandcols.concrete.RowBasedFrameRowsAndColumns;
import org.apache.druid.query.rowsandcols.semantic.AppendableRowsAndColumns;
import org.apache.druid.query.rowsandcols.semantic.FramedOnHeapAggregatable;
import org.apache.druid.segment.column.RowSignature;

import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.channels.Channels;
import java.util.Collection;

/**
Expand Down Expand Up @@ -110,6 +128,72 @@ static AppendableRowsAndColumns expectAppendable(RowsAndColumns input)
* @return A concrete implementation of the interface, or null if there is no meaningful optimization to be had
* through a local implementation of the interface.
*/
@SuppressWarnings("unchecked")
@Nullable
<T> T as(Class<T> clazz);
default <T> T as(Class<T> clazz)
{
if (clazz.isInstance(this)) {
return (T) this;
}
return null;
}

/**
* Serializer for {@link RowsAndColumns} by converting the instance to {@link FrameRowsAndColumns}
*/
class RowsAndColumnsSerializer extends StdSerializer<RowsAndColumns>
{
public RowsAndColumnsSerializer()
{
super(RowsAndColumns.class);
}

@Override
public void serialize(
RowsAndColumns rac,
JsonGenerator jsonGenerator,
SerializerProvider serializerProvider
) throws IOException
{
FrameRowsAndColumns frameRAC = rac.as(FrameRowsAndColumns.class);
if (frameRAC == null) {
throw DruidException.defensive("Unable to serialize RAC");
}
JacksonUtils.writeObjectUsingSerializerProvider(jsonGenerator, serializerProvider, frameRAC.getSignature());

Frame frame = frameRAC.getFrame();
final ByteArrayOutputStream baos = new ByteArrayOutputStream();
frame.writeTo(Channels.newChannel(baos), false, null, ByteTracker.unboundedTracker());

jsonGenerator.writeBinary(baos.toByteArray());
}
}

/**
* Deserializer for {@link RowsAndColumns} returning as an instance of {@link FrameRowsAndColumns}
*/
class RowsAndColumnsDeserializer extends StdDeserializer<RowsAndColumns>
{
public RowsAndColumnsDeserializer()
{
super(RowsAndColumns.class);
}

@Override
public FrameRowsAndColumns deserialize(JsonParser jsonParser, DeserializationContext deserializationContext)
throws IOException
{
RowSignature sig = jsonParser.readValueAs(RowSignature.class);
jsonParser.nextValue();

ByteArrayOutputStream baos = new ByteArrayOutputStream();
jsonParser.readBinaryValue(baos);
Frame frame = Frame.wrap(baos.toByteArray());
if (frame.type() == FrameType.COLUMNAR) {
return new ColumnBasedFrameRowsAndColumns(frame, sig);
} else {
return new RowBasedFrameRowsAndColumns(frame, sig);
}
}
}
}
Loading

0 comments on commit 07104e9

Please sign in to comment.