Flink SQL/Catalog Support (delta-io#555)
* [FlinkSQL_PR_1] Flink Delta Sink - Table API UPDATED (delta-io#389)

Signed-off-by: Krzysztof Chmielewski <krzysiek.chmielewski@gmail.com>
Signed-off-by: Krzysztof Chmielewski <krzysztof.chmielewski@getindata.com>
Signed-off-by: Krzysztof Chmielewski <krzysiek.chmielewski@gmail.com>
Co-authored-by: Paweł Kubit <pawel.kubit@getindata.com>
Co-authored-by: Krzysztof Chmielewski <krzysztof.chmielewski@getindata.com>

* [FlinkSQL_PR_2] - SQL Support for Delta Source connector. (delta-io#487)

Signed-off-by: Krzysztof Chmielewski <krzysiek.chmielewski@gmail.com>

* [FlinkSQL_PR_3] - Delta catalog skeleton (delta-io#503)

Signed-off-by: Krzysztof Chmielewski <krzysiek.chmielewski@gmail.com>

* [FlinkSQL_PR_4] - Delta catalog - Interactions with DeltaLog. Create and get table. (delta-io#506)

Signed-off-by: Krzysztof Chmielewski <krzysiek.chmielewski@gmail.com>

* [FlinkSQL_PR_5] - Delta catalog - DDL option validation. (delta-io#509)

Signed-off-by: Krzysztof Chmielewski <krzysiek.chmielewski@gmail.com>

* [FlinkSQL_PR_6] - Delta catalog - alter table + tests. (delta-io#510)

Signed-off-by: Krzysztof Chmielewski <krzysiek.chmielewski@gmail.com>

* [FlinkSQL_PR_7] - Delta catalog - Restrict Delta Table factory to work only with Delta Catalog + tests. (delta-io#514)

Signed-off-by: Krzysztof Chmielewski <krzysiek.chmielewski@gmail.com>

* [FlinkSQL_PR_8] - Delta Catalog - DDL/Query hint validation + tests. (delta-io#520)

Signed-off-by: Krzysztof Chmielewski <krzysiek.chmielewski@gmail.com>

* [FlinkSQL_PR_9] - Delta Catalog - Adding Flink's Hive catalog as decorated catalog. (delta-io#524)

Signed-off-by: Krzysztof Chmielewski <krzysiek.chmielewski@gmail.com>

* [FlinkSQL_PR_10] - Table API support SELECT with filter on partition column. (delta-io#528)

* [FlinkSQL_PR_10] - Table API support SELECT with filter on partition column.

---------

Signed-off-by: Krzysztof Chmielewski <krzysiek.chmielewski@gmail.com>
Co-authored-by: Scott Sandre <scott.sandre@databricks.com>

* [FlinkSQL_PR_11] - Delta Catalog - cache DeltaLog instances in DeltaCatalog. (delta-io#529)

Signed-off-by: Krzysztof Chmielewski <krzysiek.chmielewski@gmail.com>

* [FlinkSQL_PR_12] - UML diagrams. (delta-io#530)

Signed-off-by: Krzysztof Chmielewski <krzysiek.chmielewski@gmail.com>

* [FlinkSQL_PR_13] - Remove mergeSchema option from SQL API. (delta-io#531)

Signed-off-by: Krzysztof Chmielewski <krzysiek.chmielewski@gmail.com>

* [FlinkSQL_PR_14] - SQL examples. (delta-io#535)

Signed-off-by: Krzysztof Chmielewski <krzysiek.chmielewski@gmail.com>

* remove duplicate function after rebasing against master

---------

Signed-off-by: Krzysztof Chmielewski <krzysiek.chmielewski@gmail.com>
Signed-off-by: Krzysztof Chmielewski <krzysztof.chmielewski@getindata.com>
Co-authored-by: kristoffSC <krzysiek.chmielewski@gmail.com>
Co-authored-by: Paweł Kubit <pawel.kubit@getindata.com>
Co-authored-by: Krzysztof Chmielewski <krzysztof.chmielewski@getindata.com>
3 people authored and tdas committed Jun 6, 2023
1 parent f17a6fd commit 47ae5a3
Showing 97 changed files with 11,242 additions and 181 deletions.
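
Taken together, these changes let Flink SQL manage Delta tables through a dedicated Delta catalog and query them with plain DDL/DML plus query hints. Below is a condensed, illustrative sketch of that SQL surface in one Java job; the class name and table path are placeholders, while the 'delta-catalog' type, the 'delta' connector, and the 'versionAsOf'/'mode' hints follow the examples added under examples/flink-example in this commit.

package org.example.sql;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class DeltaSqlOverview {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment streamEnv =
            StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(streamEnv);

        // Register the Delta catalog and make it the current catalog. Delta tables
        // must be created through this catalog (FlinkSQL_PR_7 restricts the Delta
        // table factory to the Delta Catalog).
        tableEnv.executeSql("CREATE CATALOG myDeltaCatalog WITH ('type' = 'delta-catalog')");
        tableEnv.executeSql("USE CATALOG myDeltaCatalog");

        // DDL: a Delta table backed by a file-system path (placeholder path).
        tableEnv.executeSql(
            "CREATE TABLE exampleTable (f1 STRING, f2 STRING, f3 INT) "
                + "WITH ('connector' = 'delta', 'table-path' = '/tmp/delta/exampleTable')");

        // DML: insert a few rows and wait for the job to finish.
        tableEnv.executeSql(
            "INSERT INTO exampleTable VALUES ('a', 'b', 1), ('c', 'd', 2)").await();

        // Bounded read of a historical snapshot via a query hint (version 1 is illustrative).
        tableEnv.executeSql(
            "SELECT * FROM exampleTable /*+ OPTIONS('versionAsOf' = '1') */").print();

        // Continuous read via the 'mode' hint; this variant keeps monitoring the table
        // for new data until the job is cancelled.
        // tableEnv.executeSql(
        //     "SELECT * FROM exampleTable /*+ OPTIONS('mode' = 'streaming') */").print();
    }
}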
4 changes: 2 additions & 2 deletions .github/workflows/test.yaml
@@ -38,6 +38,6 @@ jobs:
run: build/sbt "++ ${{ matrix.scala }}" hiveMR/test hiveTez/test
- name: Run Hive 2 tests
run: build/sbt "++ ${{ matrix.scala }}" hive2MR/test hive2Tez/test
- name: Run Flink tests (Scala 2.11 and 2.12 only)
- name: Run Flink tests (Scala 2.12 only)
run: build/sbt -mem 3000 "++ ${{ matrix.scala }}" flink/test
if: ${{ !startsWith(matrix.scala, '2.13.') }}
if: ${{ startsWith(matrix.scala, '2.12.') }}
38 changes: 35 additions & 3 deletions build.sbt
@@ -741,17 +741,49 @@ lazy val flink = (project in file("flink"))
"org.apache.flink" % "flink-table-runtime" % flinkVersion % "provided",
"org.apache.flink" % "flink-scala_2.12" % flinkVersion % "provided",
"org.apache.flink" % "flink-runtime-web" % flinkVersion % "test",
"org.apache.flink" % "flink-sql-gateway-api" % flinkVersion % "test",
"org.apache.flink" % "flink-connector-hive_2.12" % flinkVersion % "provided",
"org.apache.flink" % "flink-table-planner_2.12" % flinkVersion % "provided",
"org.apache.flink" % "flink-connector-test-utils" % flinkVersion % "test",
"org.apache.flink" % "flink-clients" % flinkVersion % "test",
"org.apache.flink" % "flink-test-utils" % flinkVersion % "test",
"org.apache.hadoop" % "hadoop-common" % hadoopVersion % "test" classifier "tests",
"org.mockito" % "mockito-inline" % "3.8.0" % "test",
"org.mockito" % "mockito-inline" % "4.11.0" % "test",
"net.aichler" % "jupiter-interface" % JupiterKeys.jupiterVersion.value % Test,
"org.junit.vintage" % "junit-vintage-engine" % "5.8.2" % "test",
"org.mockito" % "mockito-junit-jupiter" % "4.5.0" % "test",
"org.mockito" % "mockito-junit-jupiter" % "4.11.0" % "test",
"org.junit.jupiter" % "junit-jupiter-params" % "5.8.2" % "test",
"io.github.artsok" % "rerunner-jupiter" % "2.1.6" % "test",

// Exclusions due to conflicts with Flink's libraries from table planner, hive, calcite etc.
"org.apache.hive" % "hive-metastore" % "3.1.2" % "test" excludeAll(
ExclusionRule("org.apache.avro", "avro"),
ExclusionRule("org.slf4j", "slf4j-log4j12"),
ExclusionRule("org.pentaho"),
ExclusionRule("org.apache.hbase"),
ExclusionRule("org.apache.hbase"),
ExclusionRule("co.cask.tephra"),
ExclusionRule("com.google.code.findbugs", "jsr305"),
ExclusionRule("org.eclipse.jetty.aggregate", "module: 'jetty-all"),
ExclusionRule("org.eclipse.jetty.orbit", "javax.servlet"),
ExclusionRule("org.apache.parquet", "parquet-hadoop-bundle"),
ExclusionRule("com.tdunning", "json"),
ExclusionRule("javax.transaction", "transaction-api"),
ExclusionRule("'com.zaxxer", "HikariCP"),
),
// Exclusions due to conflicts with Flink's libraries from table planner, hive, calcite etc.
"org.apache.hive" % "hive-exec" % "3.1.2" % "test" classifier "core" excludeAll(
ExclusionRule("'org.apache.avro", "avro"),
ExclusionRule("org.slf4j", "slf4j-log4j12"),
ExclusionRule("org.pentaho"),
ExclusionRule("com.google.code.findbugs", "jsr305"),
ExclusionRule("org.apache.calcite.avatica"),
ExclusionRule("org.apache.calcite"),
ExclusionRule("org.apache.hive", "hive-llap-tez"),
ExclusionRule("org.apache.logging.log4j"),
ExclusionRule("com.google.protobuf", "protobuf-java"),
),

// Compiler plugins
// -- Bump up the genjavadoc version explicitly to 0.18 to work with Scala 2.12
compilerPlugin("com.typesafe.genjavadoc" %% "genjavadoc-plugin" % "0.18" cross CrossVersion.full)
@@ -761,7 +793,7 @@ lazy val flink = (project in file("flink"))
Compile / sourceGenerators += Def.task {
val file = (Compile / sourceManaged).value / "meta" / "Meta.java"
IO.write(file,
s"""package io.delta.flink.sink.internal.committer;
s"""package io.delta.flink.internal;
|
|final class Meta {
| public static final String FLINK_VERSION = "${flinkVersion}";
10 changes: 8 additions & 2 deletions examples/flink-example/pom.xml
@@ -28,7 +28,7 @@ limitations under the License.-->
<maven.compiler.target>1.8</maven.compiler.target>
<staging.repo.url>""</staging.repo.url>
<scala.main.version>2.12</scala.main.version>
<connectors.version>0.6.0</connectors.version>
<connectors.version>0.6.0-SNAPSHOT</connectors.version>
<flink-version>1.16.1</flink-version>
<hadoop-version>3.1.0</hadoop-version>
<log4j.version>2.12.1</log4j.version>
@@ -56,7 +56,7 @@ limitations under the License.-->
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients</artifactId>
<version>${flink-version}</version>
<scope>test</scope>
<scope>${flink.scope}</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
@@ -86,6 +86,12 @@ limitations under the License.-->
<version>${flink-version}</version>
<scope>${flink.scope}</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner_2.12</artifactId>
<version>${flink-version}</version>
<scope>${flink.scope}</scope>
</dependency>

<!-- Add logging framework, to produce console output when running in the IDE. -->
<!-- These dependencies are excluded from the application JAR by default. -->
83 changes: 83 additions & 0 deletions .../org/example/sql/StreamingApiDeltaSourceToTableDeltaSinkJob.java
@@ -0,0 +1,83 @@
package org.example.sql;

import java.util.UUID;
import java.util.concurrent.TimeUnit;

import io.delta.flink.source.DeltaSource;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.data.RowData;
import org.apache.hadoop.conf.Configuration;
import org.utils.Utils;
import static org.utils.job.sql.SqlExampleBase.createTableStreamingEnv;
import static org.utils.job.sql.SqlExampleBase.createTestStreamEnv;

/**
* This is an example of using the Delta connector with both the DataStream (Streaming) and
* Table/SQL APIs. A Delta Source is created with the DataStream API and registered as a Flink
* table. Flink SQL is then used to read data from it with a SELECT statement and write it back
* to a newly created Delta table defined by a CREATE TABLE statement.
*/
public class StreamingApiDeltaSourceToTableDeltaSinkJob {

private static final String SOURCE_TABLE_PATH = Utils.resolveExampleTableAbsolutePath(
"data/source_table_no_partitions");

private static final String SINK_TABLE_PATH = Utils.resolveExampleTableAbsolutePath(
"example_streamingToTableAPI_table_" + UUID.randomUUID().toString().split("-")[0]);

public static void main(String[] args) throws Exception {
StreamExecutionEnvironment streamEnv = createTestStreamEnv(false); // isStreaming = false
StreamTableEnvironment tableEnv = createTableStreamingEnv(streamEnv);
createPipeline(streamEnv, tableEnv);
}

private static void createPipeline(
StreamExecutionEnvironment streamEnv,
StreamTableEnvironment tableEnv) throws Exception {

// Set up a Delta Source using Flink's Streaming API.
DeltaSource<RowData> deltaSource = DeltaSource.forBoundedRowData(
new Path(SOURCE_TABLE_PATH),
new Configuration()
).build();

// Create a source stream from the Delta Source connector.
DataStreamSource<RowData> sourceStream =
streamEnv.fromSource(deltaSource, WatermarkStrategy.noWatermarks(), "delta-source");

// setup Delta Catalog
tableEnv.executeSql("CREATE CATALOG myDeltaCatalog WITH ('type' = 'delta-catalog')");
tableEnv.executeSql("USE CATALOG myDeltaCatalog");

// Convert the source stream into a Flink table and register it as a temporary view
// named "InputTable".
Table sourceTable = tableEnv.fromDataStream(sourceStream);
tableEnv.createTemporaryView("InputTable", sourceTable);

// Create Sink Delta table using Flink SQL API.
tableEnv.executeSql(String.format(""
+ "CREATE TABLE sinkTable ("
+ "f1 STRING,"
+ "f2 STRING,"
+ "f3 INT"
+ ") WITH ("
+ " 'connector' = 'delta',"
+ " 'table-path' = '%s'"
+ ")",
SINK_TABLE_PATH)
);

// Insert into sinkTable all rows read by the Delta Source that is registered as the
// "InputTable" view.
tableEnv.executeSql("INSERT INTO sinkTable SELECT * FROM InputTable")
.await(10, TimeUnit.SECONDS);

// Read and print all rows from sinkTable using Flink SQL.
tableEnv.executeSql("SELECT * FROM sinkTable").print();
}
}
52 changes: 52 additions & 0 deletions .../org/example/sql/insert/InsertTableExample.java
@@ -0,0 +1,52 @@
package org.example.sql.insert;

import java.util.UUID;

import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.utils.Utils;
import org.utils.job.sql.SqlSinkExampleBase;

/**
* This is an example of executing an INSERT query on a Delta table using Flink SQL.
*/
public class InsertTableExample extends SqlSinkExampleBase {

static String TABLE_PATH = Utils.resolveExampleTableAbsolutePath(
"example_table_" + UUID.randomUUID().toString().split("-")[0]);

public static void main(String[] args)
throws Exception {
new InsertTableExample().run(TABLE_PATH);
}

@Override
protected Table runSqlJob(String tablePath, StreamTableEnvironment tableEnv) {

// setup Delta Catalog
tableEnv.executeSql("CREATE CATALOG myDeltaCatalog WITH ('type' = 'delta-catalog')");
tableEnv.executeSql("USE CATALOG myDeltaCatalog");

// SQL definition for the Delta table into which we will insert rows.
tableEnv.executeSql(String.format(""
+ "CREATE TABLE sinkTable ("
+ "f1 STRING,"
+ "f2 STRING,"
+ "f3 INT"
+ ") WITH ("
+ " 'connector' = 'delta',"
+ " 'table-path' = '%s'"
+ ")",
tablePath)
);

// A SQL query that inserts three rows (three columns per row) into sinkTable.
tableEnv.executeSql(""
+ "INSERT INTO sinkTable VALUES "
+ "('a', 'b', 1),"
+ "('c', 'd', 2),"
+ "('e', 'f', 3)"
);
return null;
}
}
44 changes: 44 additions & 0 deletions .../org/example/sql/select/bounded/SelectBoundedTableExample.java
@@ -0,0 +1,44 @@
package org.example.sql.select.bounded;

import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.utils.Utils;
import org.utils.job.sql.BoundedSqlSourceExampleBase;

/**
* This is an example of executing a bounded SELECT query on a Delta table using Flink SQL.
*/
public class SelectBoundedTableExample extends BoundedSqlSourceExampleBase {

private static final String TABLE_PATH =
Utils.resolveExampleTableAbsolutePath("data/source_table_no_partitions");

public static void main(String[] args) throws Exception {
new SelectBoundedTableExample().run(TABLE_PATH);
}

@Override
protected Table runSqlJob(String tablePath, StreamTableEnvironment tableEnv) {

// setup Delta Catalog
tableEnv.executeSql("CREATE CATALOG myDeltaCatalog WITH ('type' = 'delta-catalog')");
tableEnv.executeSql("USE CATALOG myDeltaCatalog");

// SQL definition for the Delta table that we will read from.
tableEnv.executeSql(String.format(""
+ "CREATE TABLE sourceTable ("
+ "f1 STRING,"
+ "f2 STRING,"
+ "f3 INT"
+ ") WITH ("
+ " 'connector' = 'delta',"
+ " 'table-path' = '%s'"
+ ")",
tablePath)
);

// A batch SQL query that fetches all columns from sourceTable. Batch mode is the
// default mode for SQL queries on a Delta table.
return tableEnv.sqlQuery("SELECT * FROM sourceTable");
}
}
45 changes: 45 additions & 0 deletions .../org/example/sql/select/bounded/SelectBoundedTableVersionAsOfExample.java
@@ -0,0 +1,45 @@
package org.example.sql.select.bounded;

import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.utils.Utils;
import org.utils.job.sql.BoundedSqlSourceExampleBase;

/**
* This is an example of executing a bounded SELECT query on a Delta table using Flink SQL
* that reads the Delta table at the version specified by the `versionAsOf` option.
*/
public class SelectBoundedTableVersionAsOfExample extends BoundedSqlSourceExampleBase {

private static final String TABLE_PATH =
Utils.resolveExampleTableAbsolutePath("data/source_table_no_partitions");

public static void main(String[] args) throws Exception {
new SelectBoundedTableVersionAsOfExample().run(TABLE_PATH);
}

@Override
protected Table runSqlJob(String tablePath, StreamTableEnvironment tableEnv) {

// setup Delta Catalog
tableEnv.executeSql("CREATE CATALOG myDeltaCatalog WITH ('type' = 'delta-catalog')");
tableEnv.executeSql("USE CATALOG myDeltaCatalog");

// SQL definition for the Delta table that we will read from.
tableEnv.executeSql(String.format(""
+ "CREATE TABLE sourceTable ("
+ "f1 STRING,"
+ "f2 STRING,"
+ "f3 INT"
+ ") WITH ("
+ " 'connector' = 'delta',"
+ " 'table-path' = '%s'"
+ ")",
tablePath)
);

// A SQL query that fetches all columns from sourceTable as of Delta table version 1.
// This query runs in batch mode, which is the default mode for SQL queries on a Delta table.
return tableEnv.sqlQuery("SELECT * FROM sourceTable /*+ OPTIONS('versionAsOf' = '1') */");
}
}
44 changes: 44 additions & 0 deletions .../org/example/sql/select/continuous/SelectContinuousTableExample.java
@@ -0,0 +1,44 @@
package org.example.sql.select.continuous;

import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.utils.Utils;
import org.utils.job.sql.ContinuousSqlSourceExampleBase;

/**
* This is an example of executing a continuous SELECT query on a Delta table using Flink SQL.
*/
public class SelectContinuousTableExample extends ContinuousSqlSourceExampleBase {

private static final String TABLE_PATH =
Utils.resolveExampleTableAbsolutePath("data/source_table_no_partitions");

public static void main(String[] args) throws Exception {
new SelectContinuousTableExample().run(TABLE_PATH);
}

@Override
protected Table runSqlJob(String tablePath, StreamTableEnvironment tableEnv) {

// setup Delta Catalog
tableEnv.executeSql("CREATE CATALOG myDeltaCatalog WITH ('type' = 'delta-catalog')");
tableEnv.executeSql("USE CATALOG myDeltaCatalog");

// SQL definition for the Delta table that we will read from.
tableEnv.executeSql(String.format(""
+ "CREATE TABLE sourceTable ("
+ "f1 STRING,"
+ "f2 STRING,"
+ "f3 INT"
+ ") WITH ("
+ " 'connector' = 'delta',"
+ " 'table-path' = '%s'"
+ ")",
tablePath)
);

// A SQL query that fetches all columns from sourceTable.
// This query runs in continuous mode.
return tableEnv.sqlQuery("SELECT * FROM sourceTable /*+ OPTIONS('mode' = 'streaming') */");
}
}
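
The SqlSinkExampleBase, BoundedSqlSourceExampleBase, and ContinuousSqlSourceExampleBase helpers that these examples extend are not among the files shown here, so how the Table returned by runSqlJob is consumed is not visible in this diff. Below is a minimal sketch of one way to consume it, assuming only the standard Flink Table API (Table.execute() / TableResult.collect()); the class and method names are illustrative.

import org.apache.flink.table.api.Table;
import org.apache.flink.types.Row;
import org.apache.flink.util.CloseableIterator;

public final class PrintResultSketch {

    // Executes the query behind the given Table and prints every produced row.
    // For the continuous ('mode' = 'streaming') examples this iterator keeps
    // emitting rows until the job is cancelled.
    static void printAll(Table resultTable) throws Exception {
        try (CloseableIterator<Row> rows = resultTable.execute().collect()) {
            while (rows.hasNext()) {
                System.out.println(rows.next());
            }
        }
    }
}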