Skip to content

Commit

Permalink
Merge pull request #23891: Fix Beam Sql Calcite Util does not support…
Browse files Browse the repository at this point in the history
… CHAR, VARCHAR, BINARY, VARBINARY

Fixes #23747
  • Loading branch information
apilloud authored Oct 31, 2022
2 parents 1e72e60 + 1cc5e4b commit a443df1
Show file tree
Hide file tree
Showing 4 changed files with 128 additions and 31 deletions.
20 changes: 19 additions & 1 deletion CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,6 @@
## Bugfixes
* Fixed JmsIO acknowledgment issue (https://github.com/apache/beam/issues/20814)
* Fixed X (Java/Python) ([#X](https://github.com/apache/beam/issues/X)).
## Known Issues
Expand All @@ -52,6 +51,15 @@

# [2.44.0] - Unreleased

## Highlights

* New highly anticipated feature X added to Python SDK ([#X](https://github.com/apache/beam/issues/X)).
* New highly anticipated feature Y added to Java SDK ([#Y](https://github.com/apache/beam/issues/Y)).

## I/Os

* Support for X source added (Java/Python) ([#X](https://github.com/apache/beam/issues/X)).

## New Features / Improvements

* Local packages can now be used as dependencies in the requirements.txt file, rather
Expand All @@ -63,6 +71,16 @@
* `ParquetIO.withSplit` was removed since splittable reading has been the default behavior since 2.35.0. The effect of
this change is to drop support for non-splittable reading ([#23832](https://github.com/apache/beam/issues/23832)).

## Deprecations

* X behavior is deprecated and will be removed in X versions ([#X](https://github.com/apache/beam/issues/X)).

## Bugfixes

* Fixed X (Java/Python) ([#X](https://github.com/apache/beam/issues/X)).
* Fixed JmsIO acknowledgment issue (https://github.com/apache/beam/issues/20814)
* Fixed Beam SQL CalciteUtils (Java) and Cross-language JdbcIO (Python) did not support JDBC CHAR/VARCHAR, BINARY/VARBINARY logical types ([#23747](https://github.com/apache/beam/issues/23747), [#23526](https://github.com/apache/beam/issues/23526)).

# [2.43.0] - Unreleased

## Highlights
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,11 @@
import org.apache.beam.sdk.extensions.sql.impl.utils.CalciteUtils.TimeWithLocalTzType;
import org.apache.beam.sdk.schemas.FieldAccessDescriptor;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.schemas.logicaltypes.FixedBytes;
import org.apache.beam.sdk.schemas.logicaltypes.FixedString;
import org.apache.beam.sdk.schemas.logicaltypes.SqlTypes;
import org.apache.beam.sdk.schemas.logicaltypes.VariableBytes;
import org.apache.beam.sdk.schemas.logicaltypes.VariableString;
import org.apache.beam.sdk.schemas.utils.SelectHelpers;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.PTransform;
Expand Down Expand Up @@ -404,8 +408,13 @@ static Object toBeamObject(Object value, FieldType fieldType, boolean verifyValu
return toBeamRow((List<Object>) value, fieldType.getRowSchema(), verifyValues);
case LOGICAL_TYPE:
String identifier = fieldType.getLogicalType().getIdentifier();
if (CharType.IDENTIFIER.equals(identifier)) {
if (CharType.IDENTIFIER.equals(identifier)
|| FixedString.IDENTIFIER.equals(identifier)
|| VariableString.IDENTIFIER.equals(identifier)) {
return (String) value;
} else if (FixedBytes.IDENTIFIER.equals(identifier)
|| VariableBytes.IDENTIFIER.equals(identifier)) {
return (byte[]) value;
} else if (TimeWithLocalTzType.IDENTIFIER.equals(identifier)) {
return Instant.ofEpochMilli(((Number) value).longValue());
} else if (SqlTypes.DATE.getIdentifier().equals(identifier)) {
Expand Down Expand Up @@ -552,8 +561,13 @@ private static Expression getBeamField(
break;
case LOGICAL_TYPE:
String identifier = fieldType.getLogicalType().getIdentifier();
if (CharType.IDENTIFIER.equals(identifier)) {
if (CharType.IDENTIFIER.equals(identifier)
|| FixedString.IDENTIFIER.equals(identifier)
|| VariableString.IDENTIFIER.equals(identifier)) {
value = Expressions.call(expression, "getString", fieldName);
} else if (FixedBytes.IDENTIFIER.equals(identifier)
|| VariableBytes.IDENTIFIER.equals(identifier)) {
value = Expressions.call(expression, "getBytes", fieldName);
} else if (TimeWithLocalTzType.IDENTIFIER.equals(identifier)) {
value = Expressions.call(expression, "getDateTime", fieldName);
} else if (SqlTypes.DATE.getIdentifier().equals(identifier)) {
Expand Down Expand Up @@ -629,8 +643,13 @@ private static Expression toCalciteValue(Expression value, FieldType fieldType)
return nullOr(value, toCalciteRow(value, fieldType.getRowSchema()));
case LOGICAL_TYPE:
String identifier = fieldType.getLogicalType().getIdentifier();
if (CharType.IDENTIFIER.equals(identifier)) {
if (CharType.IDENTIFIER.equals(identifier)
|| FixedString.IDENTIFIER.equals(identifier)
|| VariableString.IDENTIFIER.equals(identifier)) {
return Expressions.convert_(value, String.class);
} else if (FixedBytes.IDENTIFIER.equals(identifier)
|| VariableBytes.IDENTIFIER.equals(identifier)) {
return Expressions.convert_(value, byte[].class);
} else if (TimeWithLocalTzType.IDENTIFIER.equals(identifier)) {
return nullOr(
value, Expressions.call(Expressions.convert_(value, DateTime.class), "getMillis"));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -194,6 +194,13 @@ public static SqlTypeName toSqlTypeName(FieldType type) {
typeName = BEAM_TO_CALCITE_DEFAULT_MAPPING.get(type);
}
if (typeName == null) {
if (type.getLogicalType() != null) {
Schema.LogicalType<?, ?> logicalType = type.getLogicalType();
if (logicalType instanceof PassThroughLogicalType) {
// for pass through logical type, just return its base type
return toSqlTypeName(logicalType.getBaseType());
}
}
throw new IllegalArgumentException(
String.format("Cannot find a matching Calcite SqlTypeName for Beam type: %s", type));
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
*/
package org.apache.beam.sdk.extensions.sql;

import java.nio.charset.StandardCharsets;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.LocalTime;
Expand All @@ -25,11 +26,16 @@
import java.util.Map;
import org.apache.beam.sdk.extensions.sql.impl.BeamSqlEnv;
import org.apache.beam.sdk.extensions.sql.impl.rel.BeamSqlRelUtils;
import org.apache.beam.sdk.extensions.sql.meta.BeamSqlTable;
import org.apache.beam.sdk.extensions.sql.meta.provider.ReadOnlyTableProvider;
import org.apache.beam.sdk.extensions.sql.meta.provider.test.TestBoundedTable;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.schemas.Schema.FieldType;
import org.apache.beam.sdk.schemas.logicaltypes.FixedBytes;
import org.apache.beam.sdk.schemas.logicaltypes.FixedString;
import org.apache.beam.sdk.schemas.logicaltypes.SqlTypes;
import org.apache.beam.sdk.schemas.logicaltypes.VariableBytes;
import org.apache.beam.sdk.schemas.logicaltypes.VariableString;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.Create;
Expand Down Expand Up @@ -82,36 +88,66 @@ public class BeamComplexTypeTest {
.addArrayField("field3", FieldType.INT64)
.build();

private static final Schema rowWithLogicalTypeSchema =
Schema.builder()
.addLogicalTypeField("field1", FixedString.of(10))
.addLogicalTypeField("field2", VariableString.of(10))
.addLogicalTypeField("field3", FixedBytes.of(10))
.addLogicalTypeField("field4", VariableBytes.of(10))
.build();

private static final ReadOnlyTableProvider readOnlyTableProvider =
new ReadOnlyTableProvider(
"test_provider",
ImmutableMap.of(
"arrayWithRowTestTable",
TestBoundedTable.of(FieldType.array(FieldType.row(innerRowSchema)), "col")
.addRows(
Arrays.asList(Row.withSchema(innerRowSchema).addValues("str", 1L).build())),
"nestedArrayTestTable",
TestBoundedTable.of(FieldType.array(FieldType.array(FieldType.INT64)), "col")
.addRows(Arrays.asList(Arrays.asList(1L, 2L, 3L), Arrays.asList(4L, 5L))),
"nestedRowTestTable",
TestBoundedTable.of(Schema.FieldType.row(nestedRowSchema), "col")
.addRows(
Row.withSchema(nestedRowSchema)
.addValues(
"str",
Row.withSchema(innerRowSchema).addValues("inner_str_one", 1L).build(),
2L,
Row.withSchema(innerRowSchema).addValues("inner_str_two", 3L).build())
.build()),
"basicRowTestTable",
TestBoundedTable.of(Schema.FieldType.row(innerRowSchema), "col")
.addRows(Row.withSchema(innerRowSchema).addValues("innerStr", 1L).build()),
"rowWithArrayTestTable",
TestBoundedTable.of(Schema.FieldType.row(rowWithArraySchema), "col")
.addRows(
Row.withSchema(rowWithArraySchema)
.addValues("str", 4L, Arrays.asList(5L, 6L))
.build())));
ImmutableMap.<String, BeamSqlTable>builder()
.put(
"arrayWithRowTestTable",
TestBoundedTable.of(FieldType.array(FieldType.row(innerRowSchema)), "col")
.addRows(
Arrays.asList(
Row.withSchema(innerRowSchema).addValues("str", 1L).build())))
.put(
"nestedArrayTestTable",
TestBoundedTable.of(FieldType.array(FieldType.array(FieldType.INT64)), "col")
.addRows(Arrays.asList(Arrays.asList(1L, 2L, 3L), Arrays.asList(4L, 5L))))
.put(
"nestedRowTestTable",
TestBoundedTable.of(FieldType.row(nestedRowSchema), "col")
.addRows(
Row.withSchema(nestedRowSchema)
.addValues(
"str",
Row.withSchema(innerRowSchema)
.addValues("inner_str_one", 1L)
.build(),
2L,
Row.withSchema(innerRowSchema)
.addValues("inner_str_two", 3L)
.build())
.build()))
.put(
"basicRowTestTable",
TestBoundedTable.of(FieldType.row(innerRowSchema), "col")
.addRows(Row.withSchema(innerRowSchema).addValues("innerStr", 1L).build()))
.put(
"rowWithArrayTestTable",
TestBoundedTable.of(FieldType.row(rowWithArraySchema), "col")
.addRows(
Row.withSchema(rowWithArraySchema)
.addValues("str", 4L, Arrays.asList(5L, 6L))
.build()))
.put(
"rowWithLogicalTypeSchema",
TestBoundedTable.of(FieldType.row(rowWithLogicalTypeSchema), "col")
.addRows(
Row.withSchema(rowWithLogicalTypeSchema)
.addValues(
"1234567890",
"1",
"1234567890".getBytes(StandardCharsets.UTF_8),
"1".getBytes(StandardCharsets.UTF_8))
.build()))
.build());

@Rule public transient TestPipeline pipeline = TestPipeline.create();

Expand Down Expand Up @@ -211,6 +247,23 @@ public void testRowWithArray() {
pipeline.run().waitUntilFinish(Duration.standardMinutes(2));
}

@Test
public void testRowWithLogicalTypeSchema() {
BeamSqlEnv sqlEnv = BeamSqlEnv.inMemory(readOnlyTableProvider);
PCollection<Row> stream =
BeamSqlRelUtils.toPCollection(
pipeline,
sqlEnv.parseQuery(
"SELECT rowWithLogicalTypeSchema.col.field1, rowWithLogicalTypeSchema.col.field4 FROM rowWithLogicalTypeSchema"));
PAssert.that(stream)
.containsInAnyOrder(
Row.withSchema(
Schema.builder().addStringField("field1").addByteArrayField("field2").build())
.addValues("1234567890", "1".getBytes(StandardCharsets.UTF_8))
.build());
pipeline.run().waitUntilFinish(Duration.standardMinutes(2));
}

@Test
public void testFieldAccessToNestedRow() {
BeamSqlEnv sqlEnv = BeamSqlEnv.inMemory(readOnlyTableProvider);
Expand Down

0 comments on commit a443df1

Please sign in to comment.