Skip to content

Commit

Permalink
Flink: Refactoring code and properties to make Flink 1.19 work
Browse files Browse the repository at this point in the history
  • Loading branch information
rodmeneses authored and pvary committed Apr 16, 2024
1 parent f761d98 commit b3ebcf1
Show file tree
Hide file tree
Showing 25 changed files with 110 additions and 79 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/flink-ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ jobs:
strategy:
matrix:
jvm: [8, 11]
flink: ['1.16', '1.17', '1.18']
flink: ['1.17', '1.18', '1.19']
env:
SPARK_LOCAL_IP: localhost
steps:
Expand Down
2 changes: 1 addition & 1 deletion dev/stage-binaries.sh
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
#

SCALA_VERSION=2.12
FLINK_VERSIONS=1.16,1.17,1.18
FLINK_VERSIONS=1.17,1.18,1.19
SPARK_VERSIONS=3.3,3.4,3.5
HIVE_VERSIONS=2,3

Expand Down
7 changes: 4 additions & 3 deletions flink/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,6 @@

def flinkVersions = (System.getProperty("flinkVersions") != null ? System.getProperty("flinkVersions") : System.getProperty("defaultFlinkVersions")).split(",")

if (flinkVersions.contains("1.16")) {
apply from: file("$projectDir/v1.16/build.gradle")
}

if (flinkVersions.contains("1.17")) {
apply from: file("$projectDir/v1.17/build.gradle")
Expand All @@ -30,3 +27,7 @@ if (flinkVersions.contains("1.17")) {
if (flinkVersions.contains("1.18")) {
apply from: file("$projectDir/v1.18/build.gradle")
}

if (flinkVersions.contains("1.19")) {
apply from: file("$projectDir/v1.19/build.gradle")
}
36 changes: 18 additions & 18 deletions flink/v1.19/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
* under the License.
*/

String flinkMajorVersion = '1.18'
String flinkMajorVersion = '1.19'
String scalaVersion = System.getProperty("scalaVersion") != null ? System.getProperty("scalaVersion") : System.getProperty("defaultScalaVersion")

project(":iceberg-flink:iceberg-flink-${flinkMajorVersion}") {
Expand All @@ -32,15 +32,15 @@ project(":iceberg-flink:iceberg-flink-${flinkMajorVersion}") {
implementation project(':iceberg-parquet')
implementation project(':iceberg-hive-metastore')

compileOnly libs.flink118.avro
compileOnly libs.flink119.avro
// for dropwizard histogram metrics implementation
compileOnly libs.flink118.metrics.dropwizard
compileOnly libs.flink118.streaming.java
compileOnly "${libs.flink118.streaming.java.get().module}:${libs.flink118.streaming.java.get().getVersion()}:tests"
compileOnly libs.flink118.table.api.java.bridge
compileOnly "org.apache.flink:flink-table-planner_${scalaVersion}:${libs.versions.flink118.get()}"
compileOnly libs.flink118.connector.base
compileOnly libs.flink118.connector.files
compileOnly libs.flink119.metrics.dropwizard
compileOnly libs.flink119.streaming.java
compileOnly "${libs.flink119.streaming.java.get().module}:${libs.flink119.streaming.java.get().getVersion()}:tests"
compileOnly libs.flink119.table.api.java.bridge
compileOnly "org.apache.flink:flink-table-planner_${scalaVersion}:${libs.versions.flink119.get()}"
compileOnly libs.flink119.connector.base
compileOnly libs.flink119.connector.files

compileOnly libs.hadoop2.hdfs
compileOnly libs.hadoop2.common
Expand All @@ -66,13 +66,13 @@ project(":iceberg-flink:iceberg-flink-${flinkMajorVersion}") {
exclude group: 'org.slf4j'
}

testImplementation libs.flink118.connector.test.utils
testImplementation libs.flink118.core
testImplementation libs.flink118.runtime
testImplementation(libs.flink118.test.utilsjunit) {
testImplementation libs.flink119.connector.test.utils
testImplementation libs.flink119.core
testImplementation libs.flink119.runtime
testImplementation(libs.flink119.test.utilsjunit) {
exclude group: 'junit'
}
testImplementation(libs.flink118.test.utils) {
testImplementation(libs.flink119.test.utils) {
exclude group: "org.apache.curator", module: 'curator-test'
exclude group: 'junit'
}
Expand Down Expand Up @@ -166,7 +166,7 @@ project(":iceberg-flink:iceberg-flink-runtime-${flinkMajorVersion}") {
}

// for dropwizard histogram metrics implementation
implementation libs.flink118.metrics.dropwizard
implementation libs.flink119.metrics.dropwizard

// for integration testing with the flink-runtime-jar
// all of those dependencies are required because the integration test extends FlinkTestBase
Expand All @@ -176,13 +176,13 @@ project(":iceberg-flink:iceberg-flink-runtime-${flinkMajorVersion}") {
integrationImplementation project(path: ":iceberg-flink:iceberg-flink-${flinkMajorVersion}", configuration: "testArtifacts")
integrationImplementation project(path: ':iceberg-api', configuration: 'testArtifacts')
integrationImplementation project(path: ':iceberg-hive-metastore', configuration: 'testArtifacts')
integrationImplementation(libs.flink118.test.utils) {
integrationImplementation(libs.flink119.test.utils) {
exclude group: "org.apache.curator", module: 'curator-test'
exclude group: 'junit'
}

integrationImplementation libs.flink118.table.api.java.bridge
integrationImplementation "org.apache.flink:flink-table-planner_${scalaVersion}:${libs.versions.flink118.get()}"
integrationImplementation libs.flink119.table.api.java.bridge
integrationImplementation "org.apache.flink:flink-table-planner_${scalaVersion}:${libs.versions.flink119.get()}"

integrationImplementation libs.hadoop2.common
integrationImplementation libs.hadoop2.hdfs
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,8 +70,8 @@ public class FlinkCatalogFactory implements CatalogFactory {
public static final String HADOOP_CONF_DIR = "hadoop-conf-dir";
public static final String DEFAULT_DATABASE = "default-database";
public static final String DEFAULT_DATABASE_NAME = "default";
public static final String DEFAULT_CATALOG_NAME = "default_catalog";
public static final String BASE_NAMESPACE = "base-namespace";

public static final String TYPE = "type";
public static final String PROPERTY_VERSION = "property-version";

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
*/
package org.apache.iceberg.flink;

import static org.apache.iceberg.flink.FlinkCatalogFactory.DEFAULT_CATALOG_NAME;

import java.util.List;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;
Expand Down Expand Up @@ -126,4 +128,20 @@ protected void dropCatalog(String catalogName, boolean ifExists) {
sql("USE CATALOG default_catalog");
sql("DROP CATALOG %s %s", ifExists ? "IF EXISTS" : "", catalogName);
}

/**
 * After FLINK-33226 the database that is currently in use cannot be dropped, so before issuing
 * the DROP we have to make sure the target database is not the active one. This method
 * temporarily switches to the first database of the default catalog, restores the originally
 * active catalog, and only then drops the requested database.
 *
 * @param database The database to drop
 * @param ifExists If we should use the 'IF EXISTS' when dropping the database
 */
protected void dropDatabase(String database, boolean ifExists) {
  // Remember the active catalog so it can be restored after switching away.
  String activeCatalog = getTableEnv().getCurrentCatalog();
  sql("USE CATALOG %s", DEFAULT_CATALOG_NAME);
  sql("USE %s", getTableEnv().listDatabases()[0]);
  sql("USE CATALOG %s", activeCatalog);
  sql("DROP DATABASE %s %s", ifExists ? "IF EXISTS" : "", database);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
*/
package org.apache.iceberg.flink;

import static org.apache.iceberg.flink.FlinkCatalogFactory.DEFAULT_CATALOG_NAME;

import java.nio.file.Path;
import java.util.List;
import org.apache.flink.table.api.EnvironmentSettings;
Expand Down Expand Up @@ -124,7 +126,23 @@ protected void assertSameElements(String message, Iterable<Row> expected, Iterab
* @param ifExists If we should use the 'IF EXISTS' when dropping the catalog
*/
protected void dropCatalog(String catalogName, boolean ifExists) {
sql("USE CATALOG default_catalog");
sql("USE CATALOG %s", DEFAULT_CATALOG_NAME);
sql("DROP CATALOG %s %s", ifExists ? "IF EXISTS" : "", catalogName);
}

/**
 * FLINK-33226 forbids dropping the currently used database, so we have to make sure that the
 * database being dropped is not in use. To achieve this, the method switches over to the first
 * database of the default catalog, switches back to the catalog that was active before, and
 * finally drops the requested database.
 *
 * @param database The database to drop
 * @param ifExists If we should use the 'IF EXISTS' when dropping the database
 */
protected void dropDatabase(String database, boolean ifExists) {
  // Capture the catalog in use so we can return to it before dropping.
  String previousCatalog = getTableEnv().getCurrentCatalog();
  sql("USE CATALOG %s", DEFAULT_CATALOG_NAME);
  sql("USE %s", getTableEnv().listDatabases()[0]);
  sql("USE CATALOG %s", previousCatalog);
  sql("DROP DATABASE %s %s", ifExists ? "IF EXISTS" : "", database);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ public void before() {
@Override
public void clean() {
sql("DROP TABLE IF EXISTS %s", TABLE_NAME);
sql("DROP DATABASE IF EXISTS %s", DATABASE_NAME);
dropDatabase(DATABASE_NAME, true);
dropCatalog(CATALOG_NAME, true);
BoundedTableFactory.clearDataSets();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ public class TestFlinkCatalogDatabase extends CatalogTestBase {
@Override
public void clean() {
sql("DROP TABLE IF EXISTS %s.tl", flinkDatabase);
sql("DROP DATABASE IF EXISTS %s", flinkDatabase);
dropDatabase(flinkDatabase, true);
super.clean();
}

Expand All @@ -61,7 +61,7 @@ public void testCreateNamespace() {
.as("Database should still exist")
.isTrue();

sql("DROP DATABASE IF EXISTS %s", flinkDatabase);
dropDatabase(flinkDatabase, true);
assertThat(validationNamespaceCatalog.namespaceExists(icebergNamespace))
.as("Database should be dropped")
.isFalse();
Expand All @@ -81,7 +81,7 @@ public void testDropEmptyDatabase() {
assertThat(validationNamespaceCatalog.namespaceExists(icebergNamespace))
.as("Namespace should exist")
.isTrue();
sql("DROP DATABASE %s", flinkDatabase);
dropDatabase(flinkDatabase, true);
assertThat(validationNamespaceCatalog.namespaceExists(icebergNamespace))
.as("Namespace should have been dropped")
.isFalse();
Expand All @@ -105,7 +105,7 @@ public void testDropNonEmptyNamespace() {
assertThat(validationCatalog.tableExists(TableIdentifier.of(icebergNamespace, "tl")))
.as("Table should exist")
.isTrue();
Assertions.assertThatThrownBy(() -> sql("DROP DATABASE %s", flinkDatabase))
Assertions.assertThatThrownBy(() -> dropDatabase(flinkDatabase, true))
.cause()
.isInstanceOf(DatabaseNotEmptyException.class)
.hasMessage(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ public void before() {
public void cleanNamespaces() {
sql("DROP TABLE IF EXISTS %s.tl", flinkDatabase);
sql("DROP TABLE IF EXISTS %s.tl2", flinkDatabase);
sql("DROP DATABASE IF EXISTS %s", flinkDatabase);
dropDatabase(flinkDatabase, true);
super.clean();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ public void before() {
@AfterEach
public void cleanNamespaces() {
sql("DROP TABLE IF EXISTS %s.%s", flinkDatabase, tableName);
sql("DROP DATABASE IF EXISTS %s", flinkDatabase);
dropDatabase(flinkDatabase, true);
super.clean();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ private void checkSQLQuery(Map<String, String> catalogProperties, File warehouse
"Should have a .crc file and a .parquet file", 2, Files.list(dataPath).count());

sql("DROP TABLE test_table");
sql("DROP DATABASE test_db");
dropDatabase("test_db", false);
dropCatalog("test_catalog", false);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ public void before() {
@AfterEach
public void clean() {
sql("DROP TABLE IF EXISTS %s.%s", flinkDatabase, TABLE_NAME);
sql("DROP DATABASE IF EXISTS %s", flinkDatabase);
dropDatabase(flinkDatabase, true);
BoundedTableFactory.clearDataSets();
super.clean();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ public void before() {
@Override
@AfterEach
public void clean() {
sql("DROP DATABASE IF EXISTS %s", flinkDatabase);
dropDatabase(flinkDatabase, true);
super.clean();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -257,20 +257,12 @@ public void testCreateTableUnderDefaultDatabase() {
public void testCatalogDatabaseConflictWithFlinkDatabase() {
sql("CREATE DATABASE IF NOT EXISTS `%s`", databaseName());
sql("USE `%s`", databaseName());

try {
testCreateConnectorTable();
// Ensure that the table was created under the specific database.
Assertions.assertThatThrownBy(
() -> sql("CREATE TABLE `default_catalog`.`%s`.`%s`", databaseName(), TABLE_NAME))
.isInstanceOf(org.apache.flink.table.api.TableException.class)
.hasMessageStartingWith("Could not execute CreateTable in path");
} finally {
sql("DROP TABLE IF EXISTS `%s`.`%s`", databaseName(), TABLE_NAME);
if (!isDefaultDatabaseName()) {
sql("DROP DATABASE `%s`", databaseName());
}
}
testCreateConnectorTable();
// Ensure that the table was created under the specific database.
Assertions.assertThatThrownBy(
() -> sql("CREATE TABLE `default_catalog`.`%s`.`%s`", databaseName(), TABLE_NAME))
.isInstanceOf(org.apache.flink.table.api.TableException.class)
.hasMessageStartingWith("Could not execute CreateTable in path");
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ public void clean() {
sql("DROP TABLE IF EXISTS %s.%s", flinkDatabase, TABLE_NAME_UNPARTITIONED);
sql("DROP TABLE IF EXISTS %s.%s", flinkDatabase, TABLE_NAME_PARTITIONED);
sql("DROP TABLE IF EXISTS %s.%s", flinkDatabase, TABLE_NAME_WITH_PK);
sql("DROP DATABASE IF EXISTS %s", flinkDatabase);
dropDatabase(flinkDatabase, true);
super.clean();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import org.apache.flink.runtime.operators.testutils.MockEnvironmentBuilder;
import org.apache.flink.runtime.operators.testutils.MockInputSplitProvider;
import org.apache.flink.runtime.state.AbstractStateBackend;
import org.apache.flink.runtime.state.OperatorStateBackendParametersImpl;
import org.apache.flink.runtime.state.StateInitializationContext;
import org.apache.flink.runtime.state.StateInitializationContextImpl;
import org.apache.flink.runtime.state.TestTaskStateManager;
Expand Down Expand Up @@ -234,7 +235,8 @@ private StateInitializationContext getStateContext() throws Exception {
CloseableRegistry cancelStreamRegistry = new CloseableRegistry();
OperatorStateStore operatorStateStore =
abstractStateBackend.createOperatorStateBackend(
env, "test-operator", Collections.emptyList(), cancelStreamRegistry);
new OperatorStateBackendParametersImpl(
env, "test-operator", Collections.emptyList(), cancelStreamRegistry));
return new StateInitializationContextImpl(null, operatorStateStore, null, null, null);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ public void before() {
@AfterEach
public void clean() {
sql("DROP TABLE IF EXISTS %s.%s", flinkDatabase, TABLE_NAME);
sql("DROP DATABASE IF EXISTS %s", flinkDatabase);
dropDatabase(flinkDatabase, true);
super.clean();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ public void before() {
@After
public void clean() {
sql("DROP TABLE IF EXISTS %s.%s", DATABASE_NAME, TABLE_NAME);
sql("DROP DATABASE IF EXISTS %s", DATABASE_NAME);
dropDatabase(DATABASE_NAME, true);
dropCatalog(CATALOG_NAME, true);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,7 @@ public void before() {
@AfterEach
public void clean() {
sql("DROP TABLE IF EXISTS %s.%s", flinkDatabase, TABLE_NAME);
sql("DROP DATABASE IF EXISTS %s", flinkDatabase);
dropDatabase(flinkDatabase, true);
super.clean();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ public void before() {
@AfterEach
public void clean() {
sql("DROP TABLE IF EXISTS %s.%s", flinkDatabase, TABLE);
sql("DROP DATABASE IF EXISTS %s", flinkDatabase);
dropDatabase(flinkDatabase, true);
super.clean();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ public class TestFlinkPackage {
/** This unit test would need to be adjusted as new Flink version is supported. */
@Test
public void testVersion() {
assertThat(FlinkPackage.version()).isEqualTo("1.18.1");
assertThat(FlinkPackage.version()).isEqualTo("1.19.0");
}

@Test
Expand Down
4 changes: 2 additions & 2 deletions gradle.properties
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@
jmhOutputPath=build/reports/jmh/human-readable-output.txt
jmhJsonOutputPath=build/reports/jmh/results.json
jmhIncludeRegex=.*
systemProp.defaultFlinkVersions=1.18
systemProp.knownFlinkVersions=1.16,1.17,1.18
systemProp.defaultFlinkVersions=1.19
systemProp.knownFlinkVersions=1.17,1.18,1.19
systemProp.defaultHiveVersions=2
systemProp.knownHiveVersions=2,3
systemProp.defaultSparkVersions=3.5
Expand Down
Loading

0 comments on commit b3ebcf1

Please sign in to comment.