Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix bug "Unhandled type for long: timestamp(0)" when query a table contains timestamp(0) cols (#28) #30

Merged
merged 7 commits into from
Oct 7, 2023
Merged
6 changes: 3 additions & 3 deletions paimon-trino-422/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,8 @@ under the License.
<jdk.test.version>17</jdk.test.version>
<trino.version>422</trino.version>
<hadoop.version>2.8.5</hadoop.version>
<configuration.version>216</configuration.version>
<slice.version>0.42</slice.version>
<configuration.version>235</configuration.version>
<slice.version>0.45</slice.version>
<maven.toolchains.plugin.version>3.1.0</maven.toolchains.plugin.version>
</properties>

Expand Down Expand Up @@ -216,4 +216,4 @@ under the License.
</plugins>
</build>

</project>
</project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,160 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.trino;

import org.apache.paimon.data.InternalArray;
import org.apache.paimon.data.InternalMap;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.reader.RecordReader;
import org.apache.paimon.types.DataType;
import org.apache.paimon.types.DataTypeChecks;
import org.apache.paimon.utils.InternalRowUtils;

import io.trino.spi.TrinoException;
import io.trino.spi.block.ArrayBlockBuilder;
import io.trino.spi.block.ArrayValueBuilder;
import io.trino.spi.block.BlockBuilder;
import io.trino.spi.block.MapBlockBuilder;
import io.trino.spi.block.MapValueBuilder;
import io.trino.spi.block.RowBlockBuilder;
import io.trino.spi.block.RowValueBuilder;
import io.trino.spi.connector.ColumnHandle;
import io.trino.spi.connector.ConnectorPageSource;
import io.trino.spi.metrics.Metrics;
import io.trino.spi.type.ArrayType;
import io.trino.spi.type.MapType;
import io.trino.spi.type.RowType;
import io.trino.spi.type.Type;

import java.util.List;
import java.util.OptionalLong;
import java.util.concurrent.CompletableFuture;

import static io.trino.spi.StandardErrorCode.GENERIC_INTERNAL_ERROR;

/** Trino {@link ConnectorPageSource}. */
public class TrinoPageSource extends TrinoPageSourceBase {

public TrinoPageSource(RecordReader<InternalRow> reader, List<ColumnHandle> projectedColumns) {
super(reader, projectedColumns);
}

protected void writeBlock(BlockBuilder output, Type type, DataType logicalType, Object value) {
if (type instanceof ArrayType) {
ArrayBlockBuilder arrayBlockBuilder = (ArrayBlockBuilder) output;
try {
arrayBlockBuilder.buildEntry(
(ArrayValueBuilder<Throwable>)
elementBuilder -> {
InternalArray arrayData = (InternalArray) value;
DataType elementType =
DataTypeChecks.getNestedTypes(logicalType).get(0);
for (int i = 0; i < arrayData.size(); i++) {
appendTo(
type.getTypeParameters().get(0),
elementType,
InternalRowUtils.get(arrayData, i, elementType),
elementBuilder);
}
});
} catch (Throwable e) {
e.printStackTrace();
}
return;
}
if (type instanceof RowType) {
RowBlockBuilder rowBlockBuilder = (RowBlockBuilder) output;
try {
rowBlockBuilder.buildEntry(
(RowValueBuilder<Throwable>)
fieldBuilders -> {
InternalRow rowData = (InternalRow) value;
for (int index = 0;
index < type.getTypeParameters().size();
index++) {
Type fieldType = type.getTypeParameters().get(index);
DataType fieldLogicalType =
((org.apache.paimon.types.RowType) logicalType)
.getTypeAt(index);
appendTo(
fieldType,
fieldLogicalType,
InternalRowUtils.get(
rowData, index, fieldLogicalType),
fieldBuilders.get(index));
}
});
} catch (Throwable e) {
e.printStackTrace();
}
return;
}
if (type instanceof MapType) {
InternalMap mapData = (InternalMap) value;
InternalArray keyArray = mapData.keyArray();
InternalArray valueArray = mapData.valueArray();
DataType keyType = ((org.apache.paimon.types.MapType) logicalType).getKeyType();
DataType valueType = ((org.apache.paimon.types.MapType) logicalType).getValueType();
MapBlockBuilder mapBlockBuilder = (MapBlockBuilder) output;
try {
mapBlockBuilder.buildEntry(
(MapValueBuilder<Throwable>)
(keyBuilder, valueBuilder) -> {
for (int i = 0; i < keyArray.size(); i++) {
appendTo(
type.getTypeParameters().get(0),
keyType,
InternalRowUtils.get(keyArray, i, keyType),
keyBuilder);
appendTo(
type.getTypeParameters().get(1),
valueType,
InternalRowUtils.get(valueArray, i, valueType),
valueBuilder);
}
});
} catch (Throwable e) {
e.printStackTrace();
}
return;
}
throw new TrinoException(
GENERIC_INTERNAL_ERROR, "Unhandled type for Block: " + type.getTypeSignature());
}

@Override
public OptionalLong getCompletedPositions() {
return super.getCompletedPositions();
}

@Override
public long getMemoryUsage() {
return 0;
}

@Override
public CompletableFuture<?> isBlocked() {
return super.isBlocked();
}

@Override
public Metrics getMetrics() {
return super.getMetrics();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@
import static io.trino.spi.type.TimeType.TIME_MICROS;
import static io.trino.spi.type.TimeZoneKey.UTC_KEY;
import static io.trino.spi.type.TimestampType.TIMESTAMP_MILLIS;
import static io.trino.spi.type.TimestampType.TIMESTAMP_SECONDS;
import static io.trino.spi.type.TimestampWithTimeZoneType.TIMESTAMP_TZ_MILLIS;
import static io.trino.spi.type.Timestamps.MICROSECONDS_PER_MILLISECOND;
import static io.trino.spi.type.TinyintType.TINYINT;
Expand Down Expand Up @@ -153,7 +154,7 @@ public void close() throws IOException {
this.reader.close();
}

private void appendTo(Type type, DataType logicalType, Object value, BlockBuilder output) {
protected void appendTo(Type type, DataType logicalType, Object value, BlockBuilder output) {
if (value == null) {
output.appendNull();
return;
Expand All @@ -175,7 +176,7 @@ private void appendTo(Type type, DataType logicalType, Object value, BlockBuilde
DecimalType decimalType = (DecimalType) type;
BigDecimal decimal = ((Decimal) value).toBigDecimal();
type.writeLong(output, encodeShortScaledValue(decimal, decimalType.getScale()));
} else if (type.equals(TIMESTAMP_MILLIS)) {
} else if (type.equals(TIMESTAMP_MILLIS) || type.equals(TIMESTAMP_SECONDS)) {
type.writeLong(
output,
((Timestamp) value).getMillisecond() * MICROSECONDS_PER_MILLISECOND);
Expand Down Expand Up @@ -229,7 +230,7 @@ private static void writeObject(BlockBuilder output, Type type, Object value) {
}
}

private void writeBlock(BlockBuilder output, Type type, DataType logicalType, Object value) {
protected void writeBlock(BlockBuilder output, Type type, DataType logicalType, Object value) {
if (type instanceof ArrayType) {
BlockBuilder builder = output.beginBlockEntry();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,10 @@

package org.apache.paimon.trino;

import org.apache.paimon.data.GenericArray;
import org.apache.paimon.data.GenericMap;
import org.apache.paimon.data.GenericRow;
import org.apache.paimon.data.Timestamp;
import org.apache.paimon.fs.Path;
import org.apache.paimon.fs.local.LocalFileIO;
import org.apache.paimon.schema.Schema;
Expand All @@ -28,6 +30,7 @@
import org.apache.paimon.table.FileStoreTableFactory;
import org.apache.paimon.table.sink.InnerTableCommit;
import org.apache.paimon.table.sink.InnerTableWrite;
import org.apache.paimon.types.ArrayType;
import org.apache.paimon.types.BigIntType;
import org.apache.paimon.types.CharType;
import org.apache.paimon.types.DataField;
Expand All @@ -36,6 +39,7 @@
import org.apache.paimon.types.MapType;
import org.apache.paimon.types.RowKind;
import org.apache.paimon.types.RowType;
import org.apache.paimon.types.TimestampType;
import org.apache.paimon.types.VarCharType;

import io.trino.testing.AbstractTestQueryFramework;
Expand All @@ -45,9 +49,11 @@
import org.testng.annotations.Test;

import java.nio.file.Files;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;

Expand Down Expand Up @@ -114,6 +120,10 @@ protected QueryRunner createQueryRunner() throws Exception {

{
Path tablePath4 = new Path(warehouse, "default.db/t4");
List<DataField> innerRowFields = new ArrayList<>();
innerRowFields.add(new DataField(4, "innercol1", new IntType()));
innerRowFields.add(
new DataField(5, "innercol2", new VarCharType(VarCharType.MAX_LENGTH)));
RowType rowType =
new RowType(
Arrays.asList(
Expand All @@ -123,7 +133,9 @@ protected QueryRunner createQueryRunner() throws Exception {
"map",
new MapType(
new VarCharType(VarCharType.MAX_LENGTH),
new VarCharType(VarCharType.MAX_LENGTH)))));
new VarCharType(VarCharType.MAX_LENGTH))),
new DataField(2, "innerrow", new RowType(true, innerRowFields)),
new DataField(3, "array", new ArrayType(new IntType()))));
new SchemaManager(LocalFileIO.create(), tablePath4)
.createTable(
new Schema(
Expand All @@ -143,7 +155,36 @@ protected QueryRunner createQueryRunner() throws Exception {
{
put(fromString("1"), fromString("2"));
}
})));
}),
GenericRow.of(2, fromString("male")),
new GenericArray(new int[] {1, 2, 3})));
commit.commit(0, writer.prepareCommit(true, 0));
}

{
Path tablePath6 = new Path(warehouse, "default.db/t99");
RowType rowType =
new RowType(
Arrays.asList(
new DataField(0, "i", new IntType()),
new DataField(1, "createdtime", new TimestampType(0)),
new DataField(2, "updatedtime", new TimestampType(3))));
new SchemaManager(LocalFileIO.create(), tablePath6)
.createTable(
new Schema(
rowType.getFields(),
Collections.emptyList(),
Collections.singletonList("i"),
new HashMap<>(),
""));
FileStoreTable table = FileStoreTableFactory.create(LocalFileIO.create(), tablePath6);
InnerTableWrite writer = table.newWrite("user");
InnerTableCommit commit = table.newCommit("user");
writer.write(
GenericRow.of(
1,
Timestamp.fromMicros(1694505288000000L),
Timestamp.fromMicros(1694505288001000L)));
commit.commit(0, writer.prepareCommit(true, 0));
}

Expand Down Expand Up @@ -178,7 +219,8 @@ private static SimpleTableTestHelper createTestHelper(Path tablePath) throws Exc

@Test
public void testComplexTypes() {
assertThat(sql("SELECT * FROM paimon.default.t4")).isEqualTo("[[1, {1=2}]]");
assertThat(sql("SELECT * FROM paimon.default.t4"))
.isEqualTo("[[1, {1=2}, [2, male], [1, 2, 3]]]");
}

@Test
Expand Down Expand Up @@ -257,7 +299,7 @@ public void testCreateTable() {
+ "changelog_producer = 'input'"
+ ")");
assertThat(sql("SHOW TABLES FROM paimon.default"))
.isEqualTo("[[orders], [t1], [t2], [t3], [t4]]");
.isEqualTo("[[orders], [t1], [t2], [t3], [t4], [t99]]");
sql("DROP TABLE IF EXISTS paimon.default.orders");
}

Expand All @@ -280,7 +322,7 @@ public void testRenameTable() {
+ ")");
sql("ALTER TABLE paimon.default.t5 RENAME TO t6");
assertThat(sql("SHOW TABLES FROM paimon.default"))
.isEqualTo("[[t1], [t2], [t3], [t4], [t6]]");
.isEqualTo("[[t1], [t2], [t3], [t4], [t6], [t99]]");
sql("DROP TABLE IF EXISTS paimon.default.t6");
}

Expand All @@ -302,7 +344,8 @@ public void testDropTable() {
+ "changelog_producer = 'input'"
+ ")");
sql("DROP TABLE IF EXISTS paimon.default.t5");
assertThat(sql("SHOW TABLES FROM paimon.default")).isEqualTo("[[t1], [t2], [t3], [t4]]");
assertThat(sql("SHOW TABLES FROM paimon.default"))
.isEqualTo("[[t1], [t2], [t3], [t4], [t99]]");
}

@Test
Expand Down Expand Up @@ -399,6 +442,12 @@ public void testSetTableProperties() {
sql("DROP TABLE IF EXISTS paimon.default.t5");
}

@Test
public void testTimestamp0AndTimestamp3() {
assertThat(sql("SELECT * FROM paimon.default.t99"))
.isEqualTo("[[1, 2023-09-12T07:54:48, 2023-09-12T07:54:48.001]]");
}

private String sql(String sql) {
MaterializedResult result = getQueryRunner().execute(sql);
return result.getMaterializedRows().toString();
Expand Down
Loading