Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support flattening and unflattening structured types #79

Merged
merged 1 commit into from
Dec 23, 2024

Conversation

ryannedolan
Copy link
Collaborator

Summary

  • Added DataTypeUtils with flatten and unflatten.
  • Added hoptimator-utils and hoptimator-avro artifacts.
  • Added support for transparently unflattening records in target execution engines (e.g. Flink).
  • Fixed (more) non-determinism in integration tests.

Details

SQL via JDBC has poor support for structured/nested types. Nested records show up with opaque STRUCTURED types, which have limited utility in the SQL CLI or JDBC driver.

In order to offer improved support for complex records in data sources such as Avro-encoded Kafka topics, this PR introduces support for flattened types. If a JDBC driver so chooses, it may flatten complex records using a FOO$BAR$QUX naming convention. These will automatically get re-structured as FOO Row(BAR Row(QUX ...)), FOO.BAR.QUX, or FOO_BAR_QUX (depending on context) in the output SQL job.

Arrays with nested structs are elided as ANY ARRAY, which is the closest we can get in JDBC. Primitive arrays are supported, e.g. INTEGER ARRAY.

This means SQL authors need only deal in primitive types, or arrays of primitive types.

N.B. we still support non-flattened data sources, but this requires writing a custom Calcite adapter, for now.

Testing

In addition to new unit tests, the new logic was tested against production Kafka topics with complex Avro schemas. The Kafka JDBC driver was modified to invoke DataTypeUtils.flatten().

(Details elided.)

0: Hoptimator> !pipeline select "document$actorUrn" from tracking."XyzEvent"
CREATE TABLE IF NOT EXISTS `XyzEvent` (`header` ROW(`memberId` INTEGER, ...), `document` ROW(`partitionKey` VARCHAR, `documentUrn` VARCHAR, ...)) WITH ('cluster'='tracking', 'connector'='likafka', 'fabric'='...', 'topic'='XyzEvent ');
CREATE TABLE IF NOT EXISTS `SINK` (`document_actorUrn` VARCHAR) WITH ('cluster'='tracking', 'connector'='likafka', 'fabric'='...', 'topic'='SINK ');
INSERT INTO `SINK` (`document_actorUrn`) SELECT `document`.`actorUrn` FROM `TRACKING`.`XyzEvent`;

As expected:

  • The source table retains the complex structure of the underlying Avro schema.
  • The author's query is flat, but the output SQL is properly restructured.
  • The generated sink table is flat. This is preferable to inheriting the complex structure of the source table.

@ryannedolan ryannedolan mentioned this pull request Dec 23, 2024
@@ -134,7 +144,14 @@ public RelNode copy(RelTraitSet traitSet, List<RelNode> inputs) {

@Override
public void implement(Implementor implementor) throws SQLException {
implementor.setSink(database, table.getQualifiedName(), table.getRowType(), Collections.emptyMap());
RelDataTypeFactory typeFactory = new SqlTypeFactoryImpl(RelDataTypeSystem.DEFAULT);
RelDataType flattened = DataTypeUtils.flatten(table.getRowType(), typeFactory);
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@jogrogan I wonder if this would make KEY ROW(...) possible? If you were to take a Venice key schema and shove it into a nested record named KEY, it would get flattened as KEY$FOO, KEY$BAR automatically. And then in the sink table it would appear as KEY_FOO, KEY_BAR (see comment below).

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wasn't thinking of keys when I wrote this, but it seems like it will help. We could have select key$foo from venice.t1, and the result would be key_foo in the physical venice sink.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yea that makes sense, I can play around with it once merged

* unchanged.
*
*/
public static RelDataType flatten(RelDataType dataType, RelDataTypeFactory typeFactory) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will be need anything complicated here once we support map types?

Copy link
Collaborator

@jogrogan jogrogan left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM

@ryannedolan ryannedolan merged commit 64b7b73 into main Dec 23, 2024
1 check passed
@ryannedolan ryannedolan deleted the flatten-unflatten branch December 23, 2024 21:55
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants