From a67fbfc6bc45692f58482e12e0d4aae0f1251292 Mon Sep 17 00:00:00 2001
From: Leonard Xu
Date: Thu, 29 Apr 2021 18:11:14 +0800
Subject: [PATCH] [hotfix][hive / test] Add an ITCase that thoroughly checks partition-time commit

---
 .../connectors/hive/HiveTableSinkITCase.java | 188 ++++++++++++------
 1 file changed, 128 insertions(+), 60 deletions(-)

diff --git a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveTableSinkITCase.java b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveTableSinkITCase.java
index 410ebe90b31c7..493a5df51ad99 100644
--- a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveTableSinkITCase.java
+++ b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveTableSinkITCase.java
@@ -23,15 +23,19 @@
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.util.FiniteTestSource;
+import org.apache.flink.table.api.DataTypes;
 import org.apache.flink.table.api.ExplainDetail;
+import org.apache.flink.table.api.Expressions;
+import org.apache.flink.table.api.Schema;
 import org.apache.flink.table.api.SqlDialect;
+import org.apache.flink.table.api.Table;
 import org.apache.flink.table.api.TableEnvironment;
 import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
 import org.apache.flink.table.catalog.ObjectPath;
 import org.apache.flink.table.catalog.hive.HiveCatalog;
 import org.apache.flink.table.catalog.hive.HiveTestUtils;
-import org.apache.flink.table.planner.factories.TestValuesTableFactory;
 import org.apache.flink.types.Row;
+import org.apache.flink.util.CloseableIterator;
 import org.apache.flink.util.CollectionUtil;
 
 import org.junit.AfterClass;
@@ -46,7 +50,10 @@
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Comparator;
+import java.util.HashMap;
+import java.util.Iterator;
 import java.util.List;
+import java.util.Map;
 import java.util.function.Consumer;
 
 import static org.apache.flink.table.api.Expressions.$;
@@ -232,82 +239,133 @@ public void testStreamingSinkWithTimestampLtzWatermark() throws Exception {
         env.enableCheckpointing(100);
 
         StreamTableEnvironment tEnv = HiveTestUtils.createTableEnvWithBlinkPlannerStreamMode(env);
+        tEnv.getConfig().setLocalTimeZone(ZoneId.of("Asia/Shanghai"));
         tEnv.registerCatalog(hiveCatalog.getName(), hiveCatalog);
         tEnv.useCatalog(hiveCatalog.getName());
         tEnv.getConfig().setSqlDialect(SqlDialect.HIVE);
         try {
             tEnv.executeSql("create database db1");
             tEnv.useDatabase("db1");
+
+            // source table DDL
+            tEnv.executeSql(
+                    "create external table source_table ("
+                            + " a int,"
+                            + " b string,"
+                            + " c string,"
+                            + " epoch_ts bigint)"
+                            + " partitioned by ("
+                            + " pt_day string, pt_hour string) TBLPROPERTIES("
+                            + "'partition.time-extractor.timestamp-pattern'='$pt_day $pt_hour:00:00',"
+                            + "'streaming-source.enable'='true',"
+                            + "'streaming-source.monitor-interval'='1s',"
+                            + "'streaming-source.consume-order'='partition-time'"
+                            + ")");
+
             tEnv.executeSql(
                     "create external table sink_table ("
                             + " a int,"
                             + " b string,"
                             + " c string)"
-                            + " partitioned by (d string,e string)"
-                            + " stored as parquet TBLPROPERTIES ("
-                            + " 'partition.time-extractor.timestamp-pattern'='$d $e:00:00',"
+                            + " partitioned by ("
+                            + " d string, e string) TBLPROPERTIES("
+                            + " 'partition.time-extractor.timestamp-pattern' = '$d $e:00:00',"
+                            + " 'auto-compaction'='true',"
+                            + " 'compaction.file-size' = '128MB',"
                             + " 'sink.partition-commit.trigger'='partition-time',"
-                            + " 'sink.partition-commit.delay'='1h',"
+                            + " 'sink.partition-commit.delay'='30min',"
                             + " 'sink.partition-commit.watermark-time-zone'='Asia/Shanghai',"
                             + " 'sink.partition-commit.policy.kind'='metastore,success-file',"
-                            + " 'sink.partition-commit.success-file.name'='_MY_SUCCESS')");
+                            + " 'sink.partition-commit.success-file.name'='_MY_SUCCESS',"
+                            + " 'streaming-source.enable'='true',"
+                            + " 'streaming-source.monitor-interval'='1s',"
+                            + " 'streaming-source.consume-order'='partition-time'"
+                            + ")");
 
-            // hive dialect only works with hive tables at the moment, switch to default dialect
             tEnv.getConfig().setSqlDialect(SqlDialect.DEFAULT);
-            tEnv.getConfig().setLocalTimeZone(ZoneId.of("Asia/Shanghai"));
-            // prepare source
-            // epoch mills 1588460400000L <=>  local timestamp 2020-05-03 07:00:00 in Shanghai
-            // epoch mills 1588464000000L <=>  local timestamp 2020-05-03 08:00:00 in Shanghai
-            // epoch mills 1588467600000L <=>  local timestamp 2020-05-03 09:00:00 in Shanghai
-            // epoch mills 1588471200000L <=>  local timestamp 2020-05-03 10:00:00 in Shanghai
-            // epoch mills 1588474800000L <=>  local timestamp 2020-05-03 11:00:00 in Shanghai
-            List<Row> data =
-                    Arrays.asList(
-                            Row.of(1, "a", "b", "2020-05-03", "7", 1588460400000L),
-                            Row.of(1, "a", "b", "2020-05-03", "7", 1588460400000L),
-                            Row.of(2, "p", "q", "2020-05-03", "8", 1588464000000L),
-                            Row.of(2, "p", "q", "2020-05-03", "8", 1588464000000L),
-                            Row.of(3, "x", "y", "2020-05-03", "9", 1588467600000L),
-                            Row.of(3, "x", "y", "2020-05-03", "9", 1588467600000L),
-                            Row.of(4, "x", "y", "2020-05-03", "10", 1588471200000L),
-                            Row.of(4, "x", "y", "2020-05-03", "10", 1588471200000L),
-                            Row.of(5, "x", "y", "2020-05-03", "11", 1588474800000L),
-                            Row.of(5, "x", "y", "2020-05-03", "11", 1588474800000L));
-
-            String dataId = TestValuesTableFactory.registerData(data);
-            String sourceTableDDL =
-                    String.format(
-                            "create table my_table("
-                                    + " a INT,"
-                                    + " b STRING,"
-                                    + " c STRING,"
-                                    + " d STRING,"
-                                    + " e STRING,"
-                                    + " ts BIGINT,"
-                                    + " ts_ltz AS TO_TIMESTAMP_LTZ(ts, 3),"
-                                    + " WATERMARK FOR ts_ltz as ts_ltz"
-                                    + ") with ("
-                                    + " 'connector' = 'values',"
-                                    + " 'data-id' = '%s',"
-                                    + " 'failing-source' = 'true')",
-                            dataId);
-            tEnv.executeSql(sourceTableDDL);
-            tEnv.executeSql("insert into sink_table select a, b, c, d, e from my_table").await();
-
-            assertBatch(
-                    "db1.sink_table",
-                    Arrays.asList(
-                            "+I[1, a, b, 2020-05-03, 7]",
-                            "+I[1, a, b, 2020-05-03, 7]",
-                            "+I[2, p, q, 2020-05-03, 8]",
-                            "+I[2, p, q, 2020-05-03, 8]",
-                            "+I[3, x, y, 2020-05-03, 9]",
-                            "+I[3, x, y, 2020-05-03, 9]",
-                            "+I[4, x, y, 2020-05-03, 10]",
-                            "+I[4, x, y, 2020-05-03, 10]",
-                            "+I[5, x, y, 2020-05-03, 11]",
-                            "+I[5, x, y, 2020-05-03, 11]"));
+            // Build a partitioned table source with a watermark based on the streaming-hive table
+            DataStream<Row> dataStream =
+                    tEnv.toDataStream(
+                            tEnv.sqlQuery(
+                                    "select a, b, c, epoch_ts, pt_day, pt_hour from source_table"));
+            Table table =
+                    tEnv.fromDataStream(
+                            dataStream,
+                            Schema.newBuilder()
+                                    .column("a", DataTypes.INT())
+                                    .column("b", DataTypes.STRING())
+                                    .column("c", DataTypes.STRING())
+                                    .column("epoch_ts", DataTypes.BIGINT())
+                                    .column("pt_day", DataTypes.STRING())
+                                    .column("pt_hour", DataTypes.STRING())
+                                    .columnByExpression(
+                                            "ts_ltz",
+                                            Expressions.callSql("TO_TIMESTAMP_LTZ(epoch_ts, 3)"))
+                                    .watermark("ts_ltz", "ts_ltz - INTERVAL '1' SECOND")
+                                    .build());
+            tEnv.createTemporaryView("my_table", table);
+            /*
+             * Prepare test data. The epoch mills are used to define the watermark; the watermark
+             * value is the max timestamp value of all the partition data, i.e.
+             * partition timestamp + 1 hour - 1 second in this case.
+             *
+             * <pre>
+             * epoch mills 1588464000000L <=>  local timestamp 2020-05-03 08:00:00 in Shanghai
+             * epoch mills 1588467600000L <=>  local timestamp 2020-05-03 09:00:00 in Shanghai
+             * epoch mills 1588471200000L <=>  local timestamp 2020-05-03 10:00:00 in Shanghai
+             * epoch mills 1588474800000L <=>  local timestamp 2020-05-03 11:00:00 in Shanghai
+             * epoch mills 1588478400000L <=>  local timestamp 2020-05-03 12:00:00 in Shanghai
+             * </pre>
+             */
+            Map<Integer, Object[]> testData = new HashMap<>();
+            testData.put(1, new Object[] {1, "a", "b", 1588464000000L});
+            testData.put(2, new Object[] {2, "p", "q", 1588467600000L});
+            testData.put(3, new Object[] {3, "x", "y", 1588471200000L});
+            testData.put(4, new Object[] {4, "x", "y", 1588474800000L});
+            testData.put(5, new Object[] {5, "x", "y", 1588478400000L});
+
+            Map<Integer, String> testPartition = new HashMap<>();
+            testPartition.put(1, "pt_day='2020-05-03',pt_hour='7'");
+            testPartition.put(2, "pt_day='2020-05-03',pt_hour='8'");
+            testPartition.put(3, "pt_day='2020-05-03',pt_hour='9'");
+            testPartition.put(4, "pt_day='2020-05-03',pt_hour='10'");
+            testPartition.put(5, "pt_day='2020-05-03',pt_hour='11'");
+
+            Map<Integer, Object[]> expectedData = new HashMap<>();
+            expectedData.put(1, new Object[] {1, "a", "b", "2020-05-03", "7"});
+            expectedData.put(2, new Object[] {2, "p", "q", "2020-05-03", "8"});
+            expectedData.put(3, new Object[] {3, "x", "y", "2020-05-03", "9"});
+            expectedData.put(4, new Object[] {4, "x", "y", "2020-05-03", "10"});
+            expectedData.put(5, new Object[] {5, "x", "y", "2020-05-03", "11"});
+
+            tEnv.executeSql("insert into sink_table select a, b, c, pt_day, pt_hour from my_table");
+            CloseableIterator<Row> iter = tEnv.executeSql("select * from sink_table").collect();
+
+            HiveTestUtils.createTextTableInserter(hiveCatalog, "db1", "source_table")
+                    .addRow(testData.get(1))
+                    .addRow(testData.get(1))
+                    .commit(testPartition.get(1));
+
+            for (int i = 2; i < 7; i++) {
+                try {
+                    Thread.sleep(1_000);
+                } catch (InterruptedException e) {
+                    throw new RuntimeException(e);
+                }
+                Assert.assertEquals(
+                        Arrays.asList(
+                                Row.of(expectedData.get(i - 1)).toString(),
+                                Row.of(expectedData.get(i - 1)).toString()),
+                        fetchRows(iter, 2));
+
+                if (i < 6) {
+                    HiveTestUtils.createTextTableInserter(hiveCatalog, "db1", "source_table")
+                            .addRow(testData.get(i))
+                            .addRow(testData.get(i))
+                            .commit(testPartition.get(i));
+                }
+            }
             this.checkSuccessFiles(
                     URI.create(
                                     hiveCatalog
@@ -320,6 +378,16 @@ public void testStreamingSinkWithTimestampLtzWatermark() throws Exception {
         }
     }
 
+    private static List<String> fetchRows(Iterator<Row> iter, int size) {
+        List<String> strings = new ArrayList<>(size);
+        for (int i = 0; i < size; i++) {
+            Assert.assertTrue(iter.hasNext());
+            strings.add(iter.next().toString());
+        }
+        strings.sort(String::compareTo);
+        return strings;
+    }
+
     private void checkSuccessFiles(String path) {
         File basePath = new File(path, "d=2020-05-03");
         Assert.assertEquals(5, basePath.list().length);
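Note (illustrative sketch, not part of the patch): the assertions in the loop above rely on the partition-time commit rule that a partition becomes committable once the watermark passes the partition time extracted via 'partition.time-extractor.timestamp-pattern' plus 'sink.partition-commit.delay'. The standalone Java sketch below walks through that arithmetic for the first partition (pt_day='2020-05-03', pt_hour='7'), assuming the same '30min' delay, the 'Asia/Shanghai' watermark time zone, and the "ts_ltz - INTERVAL '1' SECOND" watermark used in the test; the class and variable names are invented for the example.

import java.time.Duration;
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;

public class PartitionCommitMath {
    public static void main(String[] args) {
        ZoneId watermarkZone = ZoneId.of("Asia/Shanghai");
        Duration commitDelay = Duration.ofMinutes(30); // 'sink.partition-commit.delay'='30min'

        // Partition (pt_day='2020-05-03', pt_hour='7') => extracted partition time 07:00:00 Shanghai.
        Instant partitionTime =
                LocalDateTime.parse("2020-05-03T07:00:00").atZone(watermarkZone).toInstant();

        // The second round of rows carries epoch_ts = 1588464000000L (08:00:00 in Shanghai);
        // with the "ts_ltz - INTERVAL '1' SECOND" watermark this advances the watermark to 07:59:59.
        Instant watermark = Instant.ofEpochMilli(1588464000000L).minusSeconds(1);

        // Partition-time trigger: commit once the watermark passes partition time + delay.
        boolean committable = watermark.isAfter(partitionTime.plus(commitDelay));
        System.out.println("commit threshold = " + partitionTime.plus(commitDelay)); // 2020-05-02T23:30:00Z
        System.out.println("watermark        = " + watermark);                       // 2020-05-02T23:59:59Z
        System.out.println("committable      = " + committable);                     // true
    }
}

This is why each round of rows written for hour N makes the committed data of hour N-1 observable through the collect() iterator, which is exactly what the assertions on expectedData.get(i - 1) check.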