Skip to content

Commit ea60449

Browse files
tjiumingwuzhanpeng
authored and
wuzhanpeng
committed
[Pulsar SQL] support protobuf/timestamp (apache#13287)
1 parent 8044117 commit ea60449

File tree

5 files changed

+770
-197
lines changed

5 files changed

+770
-197
lines changed

pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/decoder/protobufnative/PulsarProtobufNativeColumnDecoder.java

+14-2
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,7 @@
5050
import io.prestosql.spi.type.RowType;
5151
import io.prestosql.spi.type.RowType.Field;
5252
import io.prestosql.spi.type.SmallintType;
53+
import io.prestosql.spi.type.TimestampType;
5354
import io.prestosql.spi.type.TinyintType;
5455
import io.prestosql.spi.type.Type;
5556
import io.prestosql.spi.type.VarbinaryType;
@@ -69,7 +70,8 @@ public class PulsarProtobufNativeColumnDecoder {
6970
BigintType.BIGINT,
7071
RealType.REAL,
7172
DoubleType.DOUBLE,
72-
VarbinaryType.VARBINARY);
73+
VarbinaryType.VARBINARY,
74+
TimestampType.TIMESTAMP);
7375

7476
private final Type columnType;
7577
private final String columnMapping;
@@ -192,6 +194,15 @@ public long getLong() {
192194
return floatToIntBits((Float) value);
193195
}
194196

197+
//return millisecond which parsed from protobuf/timestamp
198+
if (columnType instanceof TimestampType && value instanceof DynamicMessage) {
199+
DynamicMessage message = (DynamicMessage) value;
200+
int nanos = (int) message.getField(message.getDescriptorForType().findFieldByName("nanos"));
201+
long seconds = (long) message.getField(message.getDescriptorForType().findFieldByName("seconds"));
202+
//maybe an exception here, but seems will never happen in hundred years.
203+
return seconds * MILLIS_PER_SECOND + nanos / NANOS_PER_MILLISECOND;
204+
}
205+
195206
throw new PrestoException(DECODER_CONVERSION_NOT_SUPPORTED,
196207
format("cannot decode object of '%s' as '%s' for column '%s'",
197208
value.getClass(), columnType, columnName));
@@ -376,5 +387,6 @@ private static Block serializeRow(BlockBuilder parentBlockBuilder, Object value,
376387

377388
protected static final String PROTOBUF_MAP_KEY_NAME = "key";
378389
protected static final String PROTOBUF_MAP_VALUE_NAME = "value";
379-
390+
private static final long MILLIS_PER_SECOND = 1000;
391+
private static final long NANOS_PER_MILLISECOND = 1000000;
380392
}

pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/decoder/protobufnative/PulsarProtobufNativeRowDecoderFactory.java

+13-5
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525

2626
import com.google.common.collect.ImmutableList;
2727
import com.google.protobuf.Descriptors;
28+
import com.google.protobuf.TimestampProto;
2829
import io.airlift.log.Logger;
2930
import io.prestosql.decoder.DecoderColumnHandle;
3031
import io.prestosql.spi.PrestoException;
@@ -37,11 +38,13 @@
3738
import io.prestosql.spi.type.RealType;
3839
import io.prestosql.spi.type.RowType;
3940
import io.prestosql.spi.type.StandardTypes;
41+
import io.prestosql.spi.type.TimestampType;
4042
import io.prestosql.spi.type.Type;
4143
import io.prestosql.spi.type.TypeManager;
4244
import io.prestosql.spi.type.TypeSignature;
4345
import io.prestosql.spi.type.TypeSignatureParameter;
4446
import io.prestosql.spi.type.VarbinaryType;
47+
4548
import java.util.List;
4649
import java.util.Optional;
4750
import java.util.Set;
@@ -142,11 +145,16 @@ private Type parseProtobufPrestoType(Descriptors.FieldDescriptor field) {
142145
ImmutableList.of(TypeSignatureParameter.typeParameter(keyType),
143146
TypeSignatureParameter.typeParameter(valueType)));
144147
} else {
145-
//row
146-
dataType = RowType.from(msg.getFields().stream()
147-
.map(rowField -> new RowType.Field(Optional.of(rowField.getName()),
148-
parseProtobufPrestoType(rowField)))
149-
.collect(toImmutableList()));
148+
if (TimestampProto.getDescriptor().toProto().getName().equals(msg.getFile().toProto().getName())) {
149+
//if msg type is protobuf/timestamp
150+
dataType = TimestampType.TIMESTAMP;
151+
} else {
152+
//row
153+
dataType = RowType.from(msg.getFields().stream()
154+
.map(rowField -> new RowType.Field(Optional.of(rowField.getName()),
155+
parseProtobufPrestoType(rowField)))
156+
.collect(toImmutableList()));
157+
}
150158
}
151159
break;
152160
default:

0 commit comments

Comments
 (0)