@@ -23,15 +23,19 @@
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.util.FiniteTestSource;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.ExplainDetail;
import org.apache.flink.table.api.Expressions;
import org.apache.flink.table.api.Schema;
import org.apache.flink.table.api.SqlDialect;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.catalog.ObjectPath;
import org.apache.flink.table.catalog.hive.HiveCatalog;
import org.apache.flink.table.catalog.hive.HiveTestUtils;
import org.apache.flink.table.planner.factories.TestValuesTableFactory;
import org.apache.flink.types.Row;
import org.apache.flink.util.CloseableIterator;
import org.apache.flink.util.CollectionUtil;

import org.junit.AfterClass;
@@ -46,7 +50,10 @@
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

import static org.apache.flink.table.api.Expressions.$;
@@ -232,82 +239,133 @@ public void testStreamingSinkWithTimestampLtzWatermark() throws Exception {
env.enableCheckpointing(100);
StreamTableEnvironment tEnv = HiveTestUtils.createTableEnvWithBlinkPlannerStreamMode(env);

tEnv.getConfig().setLocalTimeZone(ZoneId.of("Asia/Shanghai"));
tEnv.registerCatalog(hiveCatalog.getName(), hiveCatalog);
tEnv.useCatalog(hiveCatalog.getName());
tEnv.getConfig().setSqlDialect(SqlDialect.HIVE);
try {
tEnv.executeSql("create database db1");
tEnv.useDatabase("db1");

// source table DDL
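// the streaming-source options below make the Hive source continuously monitor the
// table for new partitions (every 1s) and consume them in partition-time order, so
// partitions written later by createTextTableInserter are picked up by the running job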
tEnv.executeSql(
"create external table source_table ("
+ " a int,"
+ " b string,"
+ " c string,"
+ " epoch_ts bigint)"
+ " partitioned by ("
+ " pt_day string, pt_hour string) TBLPROPERTIES("
+ "'partition.time-extractor.timestamp-pattern'='$pt_day $pt_hour:00:00',"
+ "'streaming-source.enable'='true',"
+ "'streaming-source.monitor-interval'='1s',"
+ "'streaming-source.consume-order'='partition-time'"
+ ")");

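// sink partitions are committed by the 'partition-time' trigger: a partition is
// committed once the watermark passes its partition time (extracted via the
// timestamp-pattern) plus the 30 min delay; 'sink.partition-commit.watermark-time-zone'
// should match the session time zone because the watermark is defined on a TIMESTAMP_LTZ column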
tEnv.executeSql(
"create external table sink_table ("
+ " a int,"
+ " b string,"
+ " c string)"
+ " partitioned by (d string,e string)"
+ " stored as parquet TBLPROPERTIES ("
+ " 'partition.time-extractor.timestamp-pattern'='$d $e:00:00',"
+ " partitioned by ("
+ " d string, e string) TBLPROPERTIES("
+ " 'partition.time-extractor.timestamp-pattern' = '$d $e:00:00',"
+ " 'auto-compaction'='true',"
+ " 'compaction.file-size' = '128MB',"
+ " 'sink.partition-commit.trigger'='partition-time',"
+ " 'sink.partition-commit.delay'='1h',"
+ " 'sink.partition-commit.delay'='30min',"
+ " 'sink.partition-commit.watermark-time-zone'='Asia/Shanghai',"
+ " 'sink.partition-commit.policy.kind'='metastore,success-file',"
+ " 'sink.partition-commit.success-file.name'='_MY_SUCCESS')");
+ " 'sink.partition-commit.success-file.name'='_MY_SUCCESS',"
+ " 'streaming-source.enable'='true',"
+ " 'streaming-source.monitor-interval'='1s',"
+ " 'streaming-source.consume-order'='partition-time'"
+ ")");

// hive dialect only works with hive tables at the moment, switch to default dialect
tEnv.getConfig().setSqlDialect(SqlDialect.DEFAULT);
tEnv.getConfig().setLocalTimeZone(ZoneId.of("Asia/Shanghai"));
// prepare source
// epoch millis 1588460400000L <=> local timestamp 2020-05-03 07:00:00 in Shanghai
// epoch millis 1588464000000L <=> local timestamp 2020-05-03 08:00:00 in Shanghai
// epoch millis 1588467600000L <=> local timestamp 2020-05-03 09:00:00 in Shanghai
// epoch millis 1588471200000L <=> local timestamp 2020-05-03 10:00:00 in Shanghai
// epoch millis 1588474800000L <=> local timestamp 2020-05-03 11:00:00 in Shanghai
List<Row> data =
Arrays.asList(
Row.of(1, "a", "b", "2020-05-03", "7", 1588460400000L),
Row.of(1, "a", "b", "2020-05-03", "7", 1588460400000L),
Row.of(2, "p", "q", "2020-05-03", "8", 1588464000000L),
Row.of(2, "p", "q", "2020-05-03", "8", 1588464000000L),
Row.of(3, "x", "y", "2020-05-03", "9", 1588467600000L),
Row.of(3, "x", "y", "2020-05-03", "9", 1588467600000L),
Row.of(4, "x", "y", "2020-05-03", "10", 1588471200000L),
Row.of(4, "x", "y", "2020-05-03", "10", 1588471200000L),
Row.of(5, "x", "y", "2020-05-03", "11", 1588474800000L),
Row.of(5, "x", "y", "2020-05-03", "11", 1588474800000L));

String dataId = TestValuesTableFactory.registerData(data);
String sourceTableDDL =
String.format(
"create table my_table("
+ " a INT,"
+ " b STRING,"
+ " c STRING,"
+ " d STRING,"
+ " e STRING,"
+ " ts BIGINT,"
+ " ts_ltz AS TO_TIMESTAMP_LTZ(ts, 3),"
+ " WATERMARK FOR ts_ltz as ts_ltz"
+ ") with ("
+ " 'connector' = 'values',"
+ " 'data-id' = '%s',"
+ " 'failing-source' = 'true')",
dataId);
tEnv.executeSql(sourceTableDDL);
tEnv.executeSql("insert into sink_table select a, b, c, d, e from my_table").await();

assertBatch(
"db1.sink_table",
Arrays.asList(
"+I[1, a, b, 2020-05-03, 7]",
"+I[1, a, b, 2020-05-03, 7]",
"+I[2, p, q, 2020-05-03, 8]",
"+I[2, p, q, 2020-05-03, 8]",
"+I[3, x, y, 2020-05-03, 9]",
"+I[3, x, y, 2020-05-03, 9]",
"+I[4, x, y, 2020-05-03, 10]",
"+I[4, x, y, 2020-05-03, 10]",
"+I[5, x, y, 2020-05-03, 11]",
"+I[5, x, y, 2020-05-03, 11]"));

// Build a partitioned table source with a watermark based on the streaming Hive table
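// TO_TIMESTAMP_LTZ(epoch_ts, 3) interprets the BIGINT epoch milliseconds as a
// TIMESTAMP_LTZ(3); the watermark lags the row time by 1 second, so the max record
// timestamp seen so far (minus 1s) is what drives partition commits in the sink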
DataStream<Row> dataStream =
tEnv.toDataStream(
tEnv.sqlQuery(
"select a, b, c, epoch_ts, pt_day, pt_hour from source_table"));
Table table =
tEnv.fromDataStream(
dataStream,
Schema.newBuilder()
.column("a", DataTypes.INT())
.column("b", DataTypes.STRING())
.column("c", DataTypes.STRING())
.column("epoch_ts", DataTypes.BIGINT())
.column("pt_day", DataTypes.STRING())
.column("pt_hour", DataTypes.STRING())
.columnByExpression(
"ts_ltz",
Expressions.callSql("TO_TIMESTAMP_LTZ(epoch_ts, 3)"))
.watermark("ts_ltz", "ts_ltz - INTERVAL '1' SECOND")
.build());
tEnv.createTemporaryView("my_table", table);
/*
* prepare test data; the epoch millis below drive the watermark. The watermark is
* derived from the max timestamp of a partition's data, i.e.
* partition timestamp + 1 hour - 1 second in this case
*
* <pre>
* epoch millis 1588464000000L <=> local timestamp 2020-05-03 08:00:00 in Shanghai
* epoch millis 1588467600000L <=> local timestamp 2020-05-03 09:00:00 in Shanghai
* epoch millis 1588471200000L <=> local timestamp 2020-05-03 10:00:00 in Shanghai
* epoch millis 1588474800000L <=> local timestamp 2020-05-03 11:00:00 in Shanghai
* epoch millis 1588478400000L <=> local timestamp 2020-05-03 12:00:00 in Shanghai
* </pre>
*/
Map<Integer, Object[]> testData = new HashMap<>();
testData.put(1, new Object[] {1, "a", "b", 1588464000000L});
testData.put(2, new Object[] {2, "p", "q", 1588467600000L});
testData.put(3, new Object[] {3, "x", "y", 1588471200000L});
testData.put(4, new Object[] {4, "x", "y", 1588474800000L});
testData.put(5, new Object[] {5, "x", "y", 1588478400000L});

Map<Integer, String> testPartition = new HashMap<>();
testPartition.put(1, "pt_day='2020-05-03',pt_hour='7'");
testPartition.put(2, "pt_day='2020-05-03',pt_hour='8'");
testPartition.put(3, "pt_day='2020-05-03',pt_hour='9'");
testPartition.put(4, "pt_day='2020-05-03',pt_hour='10'");
testPartition.put(5, "pt_day='2020-05-03',pt_hour='11'");

Map<Integer, Object[]> expectedData = new HashMap<>();
expectedData.put(1, new Object[] {1, "a", "b", "2020-05-03", "7"});
expectedData.put(2, new Object[] {2, "p", "q", "2020-05-03", "8"});
expectedData.put(3, new Object[] {3, "x", "y", "2020-05-03", "9"});
expectedData.put(4, new Object[] {4, "x", "y", "2020-05-03", "10"});
expectedData.put(5, new Object[] {5, "x", "y", "2020-05-03", "11"});

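// submit the streaming insert job (not awaited, it runs continuously) and start a
// separate streaming query that reads the committed sink partitions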
tEnv.executeSql("insert into sink_table select a, b, c, pt_day, pt_hour from my_table");
CloseableIterator<Row> iter = tEnv.executeSql("select * from sink_table").collect();

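// write the first partition directly into the Hive source table; the streaming source
// should discover it within the 1s monitor interval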
HiveTestUtils.createTextTableInserter(hiveCatalog, "db1", "source_table")
.addRow(testData.get(1))
.addRow(testData.get(1))
.commit(testPartition.get(1));

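// add one partition at a time and verify that the two rows of the previously added
// partition have been committed to the sink before adding the next one; each partition's
// own data (timestamp = partition time + 1 hour) already pushes the watermark past the
// partition time plus the 30 min commit delay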
for (int i = 2; i < 7; i++) {
try {
Thread.sleep(1_000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
Assert.assertEquals(
Arrays.asList(
Row.of(expectedData.get(i - 1)).toString(),
Row.of(expectedData.get(i - 1)).toString()),
fetchRows(iter, 2));

if (i < 6) {
HiveTestUtils.createTextTableInserter(hiveCatalog, "db1", "source_table")
.addRow(testData.get(i))
.addRow(testData.get(i))
.commit(testPartition.get(i));
}
}
this.checkSuccessFiles(
URI.create(
hiveCatalog
@@ -320,6 +378,16 @@ public void testStreamingSinkWithTimestampLtzWatermark() throws Exception {
}
}

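// drains the next 'size' rows from the streaming collect iterator and sorts their string
// representations for a deterministic comparison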
private static List<String> fetchRows(Iterator<Row> iter, int size) {
List<String> strings = new ArrayList<>(size);
for (int i = 0; i < size; i++) {
Assert.assertTrue(iter.hasNext());
strings.add(iter.next().toString());
}
strings.sort(String::compareTo);
return strings;
}

private void checkSuccessFiles(String path) {
File basePath = new File(path, "d=2020-05-03");
Assert.assertEquals(5, basePath.list().length);