Support flattening and unflattening structured types #79

Merged: 1 commit, Dec 23, 2024
5 changes: 4 additions & 1 deletion Makefile
@@ -2,6 +2,9 @@
install:
./gradlew compileJava installDist

test:
./gradlew test -x spotbugsMain -x spotbugsTest -x spotbugsTestFixtures

build:
./gradlew build
docker build . -t hoptimator
@@ -77,4 +80,4 @@ release:
test -n "$(VERSION)" # MISSING ARG: $$VERSION
./gradlew publish

.PHONY: build clean quickstart deploy-dev-environment deploy deploy-samples deploy-demo deploy-config integration-tests bounce generate-models release
.PHONY: build test install clean quickstart deploy-dev-environment deploy deploy-samples deploy-demo deploy-config integration-tests bounce generate-models release
46 changes: 46 additions & 0 deletions hoptimator-avro/build.gradle
@@ -1,9 +1,55 @@
plugins {
id 'java'
id 'maven-publish'
}

dependencies {
implementation project(':hoptimator-api')
implementation libs.avro
implementation libs.calcite.core
}

publishing {
repositories {
maven {
name 'GitHubPackages'
url = 'https://maven.pkg.github.com/linkedin/Hoptimator'
credentials {
username = System.getenv('GITHUB_ACTOR')
password = System.getenv('GITHUB_TOKEN')
}
}
maven {
name 'LinkedInJFrog'
url 'https://linkedin.jfrog.io/artifactory/hoptimator'
credentials {
username = System.getenv('JFROG_USERNAME')
password = System.getenv('JFROG_API_KEY')
}
}
}
publications {
maven(MavenPublication) {
groupId = 'com.linkedin.hoptimator'
artifactId = 'hoptimator-avro'
version = System.getenv('VERSION')
from components.java
pom {
name = 'hoptimator-avro'
description = 'Hoptimator plugin for Apache Avro'
url = 'https://github.com/linkedin/Hoptimator'
licenses {
license {
name = 'BSD 2-Clause'
url = 'https://raw.githubusercontent.com/linkedin/Hoptimator/main/LICENSE'
}
}
scm {
connection = 'scm:git:git://github.com:linkedin/Hoptimator.git'
developerConnection = 'scm:git:ssh://github.com:linkedin/Hoptimator.git'
url = 'https://github.com/linkedin/Hoptimator'
}
}
}
}
}
@@ -14,6 +14,7 @@ public class AdsSchema extends AbstractSchema {
public AdsSchema() {
tableMap.put("PAGE_VIEWS", new PageViewTable());
tableMap.put("AD_CLICKS", new AdClickTable());
tableMap.put("CAMPAIGNS", new CampaignTable());
}

@Override
1 change: 0 additions & 1 deletion hoptimator-jdbc-driver/build.gradle
@@ -5,7 +5,6 @@ plugins {
}

dependencies {
implementation project(':hoptimator-avro')
implementation project(':hoptimator-demodb')
implementation project(':hoptimator-jdbc')
implementation project(':hoptimator-util')
@@ -7,7 +7,7 @@
import com.linkedin.hoptimator.util.RemoteTable;


/** A table populated with all available Catlaogs. */
/** A table populated with all available Catalogs. */
public class CatalogTable extends RemoteTable<Catalog, CatalogTable.Row> {

// This and other Row classes are used by generated code, so it is important
@@ -2,7 +2,7 @@

import java.sql.SQLException;
import java.sql.Wrapper;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;

import org.apache.calcite.schema.SchemaPlus;
@@ -15,7 +15,7 @@
/** Built-in utility tables. */
public class UtilityCatalog extends AbstractSchema implements Catalog {

private final Map<String, Table> tableMap = new HashMap<>();
private final Map<String, Table> tableMap = new LinkedHashMap<>();

public UtilityCatalog() {
tableMap.put("PRINT", new PrintTable());
@@ -3,7 +3,7 @@
import java.io.IOException;
import java.io.StringReader;
import java.sql.SQLException;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Locale;
import java.util.Map;
import java.util.Properties;
@@ -47,10 +47,9 @@ public Map<String, String> configure(Source source) throws SQLException {
} catch (IOException e) {
throw new SQLException(e);
}
Map<String, String> map = new HashMap<>();
for (String key : props.stringPropertyNames()) {
map.put(key, props.getProperty(key));
}
Map<String, String> map = new LinkedHashMap<>();
props.stringPropertyNames().stream().sorted().forEach(k ->
map.put(k, props.getProperty(k)));
return map;
}
}
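
Note on the HashMap to LinkedHashMap changes in this file and in UtilityCatalog and ConnectionService: a plain HashMap has no defined iteration order, so connector configs (and the Flink DDL assembled from them, as seen in the kafka-ddl.id fixture below) could be emitted in a different order on every run. Sorting the property names and inserting them into a LinkedHashMap keeps the output stable. A minimal standalone sketch of the idea, with hypothetical property values:

import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Properties;

public class DeterministicConfigExample {
  public static void main(String[] args) {
    Properties props = new Properties();
    props.setProperty("topic", "existing-topic-1");
    props.setProperty("connector", "kafka");
    props.setProperty("properties.bootstrap.servers", "localhost:9092");

    // Sort the keys first; LinkedHashMap then preserves that insertion order.
    Map<String, String> map = new LinkedHashMap<>();
    props.stringPropertyNames().stream().sorted().forEach(k ->
        map.put(k, props.getProperty(k)));

    // Always prints connector, properties.bootstrap.servers, topic, in that order.
    map.forEach((k, v) -> System.out.println(k + "=" + v));
  }
}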
@@ -47,7 +47,7 @@ public K8sDatabaseTable(K8sContext context) {
public void addDatabases(SchemaPlus parentSchema) {
for (Row row : rows()) {
parentSchema.add(schemaName(row),
HoptimatorJdbcSchema.create(row.NAME, null, row.SCHEMA, dataSource(row), parentSchema, dialect(row)));
HoptimatorJdbcSchema.create(row.NAME, row.SCHEMA, dataSource(row), parentSchema, dialect(row)));
}
}

4 changes: 2 additions & 2 deletions hoptimator-kafka/src/test/resources/kafka-ddl.id
@@ -24,8 +24,8 @@ spec:
job:
entryClass: com.linkedin.hoptimator.flink.runner.FlinkRunner
args:
- CREATE TABLE IF NOT EXISTS `existing-topic-2` (`KEY` VARCHAR, `VALUE` BINARY) WITH ('properties.bootstrap.servers'='localhost:9092', 'topic'='existing-topic-2', 'connector'='kafka')
- CREATE TABLE IF NOT EXISTS `existing-topic-1` (`KEY` VARCHAR, `VALUE` BINARY) WITH ('properties.bootstrap.servers'='localhost:9092', 'topic'='existing-topic-1', 'connector'='kafka')
- CREATE TABLE IF NOT EXISTS `existing-topic-2` (`KEY` VARCHAR, `VALUE` BINARY) WITH ('connector'='kafka', 'properties.bootstrap.servers'='localhost:9092', 'topic'='existing-topic-2')
- CREATE TABLE IF NOT EXISTS `existing-topic-1` (`KEY` VARCHAR, `VALUE` BINARY) WITH ('connector'='kafka', 'properties.bootstrap.servers'='localhost:9092', 'topic'='existing-topic-1')
- INSERT INTO `existing-topic-1` (`KEY`, `VALUE`) SELECT * FROM `KAFKA`.`existing-topic-2`
jarURI: local:///opt/hoptimator-flink-runner.jar
parallelism: 1
61 changes: 61 additions & 0 deletions hoptimator-util/build.gradle
@@ -1,8 +1,69 @@
plugins {
id 'java'
id 'maven-publish'
}

dependencies {
implementation project(':hoptimator-api')
implementation libs.calcite.core

testImplementation(testFixtures(project(':hoptimator-jdbc')))
testImplementation(platform('org.junit:junit-bom:5.11.3'))
testImplementation 'org.junit.jupiter:junit-jupiter'
testRuntimeOnly 'org.junit.platform:junit-platform-launcher'
testRuntimeOnly 'org.junit.jupiter:junit-jupiter-engine'
}

test {
useJUnitPlatform {
excludeTags 'integration'
}
testLogging {
events "passed", "skipped", "failed"
}
}

publishing {
repositories {
maven {
name 'GitHubPackages'
url = 'https://maven.pkg.github.com/linkedin/Hoptimator'
credentials {
username = System.getenv('GITHUB_ACTOR')
password = System.getenv('GITHUB_TOKEN')
}
}
maven {
name 'LinkedInJFrog'
url 'https://linkedin.jfrog.io/artifactory/hoptimator'
credentials {
username = System.getenv('JFROG_USERNAME')
password = System.getenv('JFROG_API_KEY')
}
}
}
publications {
maven(MavenPublication) {
groupId = 'com.linkedin.hoptimator'
artifactId = 'hoptimator-util'
version = System.getenv('VERSION')
from components.java
pom {
name = 'hoptimator-util'
description = 'Utilities to help with extending Hoptimator'
url = 'https://github.com/linkedin/Hoptimator'
licenses {
license {
name = 'BSD 2-Clause'
url = 'https://raw.githubusercontent.com/linkedin/Hoptimator/main/LICENSE'
}
}
scm {
connection = 'scm:git:git://github.com:linkedin/Hoptimator.git'
developerConnection = 'scm:git:ssh://github.com:linkedin/Hoptimator.git'
url = 'https://github.com/linkedin/Hoptimator'
}
}
}
}
}
@@ -3,7 +3,7 @@
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.ServiceLoader;
@@ -19,7 +19,7 @@ private ConnectionService() {
}

public static <T> Map<String, String> configure(T object, Class<T> clazz) throws SQLException {
Map<String, String> configs = new HashMap<>();
Map<String, String> configs = new LinkedHashMap<>();
for (Connector<T> connector : connectors(clazz)) {
configs.putAll(connector.configure(object));
}
@@ -0,0 +1,101 @@
package com.linkedin.hoptimator.util;

import java.util.Collections;
import java.util.List;
import java.util.LinkedHashMap;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import org.apache.calcite.rel.type.RelDataType;
import org.apache.calcite.rel.type.RelDataTypeFactory;
import org.apache.calcite.rel.type.RelDataTypeField;
import org.apache.calcite.sql.type.SqlTypeName;


public final class DataTypeUtils {

private DataTypeUtils() {
}

/**
* Flattens nested structs and complex arrays.
*
* Nested structs like `FOO Row(BAR Row(QUX VARCHAR))` are promoted to
* top-level fields like `FOO$BAR$QUX VARCHAR`.
*
* Complex arrays are demoted to just `ANY ARRAY`. Primitive arrays are
* unchanged.
*
*/
public static RelDataType flatten(RelDataType dataType, RelDataTypeFactory typeFactory) {
[Collaborator comment] Will we need anything complicated here once we support map types?

if (!dataType.isStruct()) {
return dataType;
}
RelDataTypeFactory.Builder builder = new RelDataTypeFactory.Builder(typeFactory);
flattenInto(typeFactory, dataType, builder, Collections.emptyList());
return builder.build();
}

private static void flattenInto(RelDataTypeFactory typeFactory, RelDataType dataType,
RelDataTypeFactory.Builder builder, List<String> path) {
if (dataType.getComponentType() != null && (dataType.getComponentType().isStruct()
|| dataType.getComponentType().getComponentType() != null)) {
// demote complex arrays to just `ANY ARRAY`
builder.add(path.stream().collect(Collectors.joining("$")), typeFactory.createArrayType(
typeFactory.createSqlType(SqlTypeName.ANY), -1));
} else if (!dataType.isStruct()) {
builder.add(path.stream().collect(Collectors.joining("$")), dataType);
} else {
for (RelDataTypeField field : dataType.getFieldList()) {
flattenInto(typeFactory, field.getType(), builder, Stream.concat(path.stream(),
Stream.of(field.getName())).collect(Collectors.toList()));
}
}
}

/** Restructures flattened types, from `FOO$BAR VARCHAR` to `FOO Row(BAR VARCHAR...)` */
public static RelDataType unflatten(RelDataType dataType, RelDataTypeFactory typeFactory) {
if (!dataType.isStruct()) {
throw new IllegalArgumentException("Can only unflatten a struct type.");
}
Node root = new Node();
for (RelDataTypeField field : dataType.getFieldList()) {
buildNodes(root, field.getName(), field.getType());
}
return buildRecord(root, typeFactory);
}

private static void buildNodes(Node pos, String name, RelDataType dataType) {
if (!name.contains("$")) {
pos.children.put(name, new Node(dataType));
} else {
String[] parts = name.split("\\$", 2);
Node child = pos.children.computeIfAbsent(parts[0], x -> new Node());
buildNodes(child, parts[1], dataType);
}
}

private static RelDataType buildRecord(Node node, RelDataTypeFactory typeFactory) {
if (node.dataType != null) {
return node.dataType;
}
RelDataTypeFactory.Builder builder = new RelDataTypeFactory.Builder(typeFactory);
for (LinkedHashMap.Entry<String, Node> child : node.children.entrySet()) {
builder.add(child.getKey(), buildRecord(child.getValue(), typeFactory));
}
return builder.build();
}

private static class Node {
RelDataType dataType;
LinkedHashMap<String, Node> children = new LinkedHashMap<>();

Node(RelDataType dataType) {
this.dataType = dataType;
}

Node() {
// nop
}
}
}
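
To make the new utility concrete, here is a small usage sketch based on the Javadoc above: it builds the nested type FOO Row(BAR Row(QUX VARCHAR)) with a Calcite type factory, flattens it into a single FOO$BAR$QUX VARCHAR field, and unflattens it back. The class name FlattenExample and the use of SqlTypeFactoryImpl are illustrative, not part of this PR:

import org.apache.calcite.rel.type.RelDataType;
import org.apache.calcite.rel.type.RelDataTypeFactory;
import org.apache.calcite.rel.type.RelDataTypeSystem;
import org.apache.calcite.sql.type.SqlTypeFactoryImpl;
import org.apache.calcite.sql.type.SqlTypeName;

import com.linkedin.hoptimator.util.DataTypeUtils;

public class FlattenExample {
  public static void main(String[] args) {
    RelDataTypeFactory typeFactory = new SqlTypeFactoryImpl(RelDataTypeSystem.DEFAULT);

    // Build FOO Row(BAR Row(QUX VARCHAR)) from the inside out.
    RelDataType quxRow = new RelDataTypeFactory.Builder(typeFactory)
        .add("QUX", SqlTypeName.VARCHAR).build();
    RelDataType barRow = new RelDataTypeFactory.Builder(typeFactory)
        .add("BAR", quxRow).build();
    RelDataType fooRow = new RelDataTypeFactory.Builder(typeFactory)
        .add("FOO", barRow).build();

    // Flatten: expect a single top-level field named FOO$BAR$QUX of type VARCHAR.
    RelDataType flat = DataTypeUtils.flatten(fooRow, typeFactory);
    System.out.println(flat.getFullTypeString());

    // Unflatten: expect the original nested Row structure back.
    RelDataType nested = DataTypeUtils.unflatten(flat, typeFactory);
    System.out.println(nested.getFullTypeString());
  }
}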

This file was deleted.
