Skip to content

Commit

Permalink
[flink] Support the watermark function of auditlog table (#3837)
Browse files Browse the repository at this point in the history
  • Loading branch information
discivigour authored Jul 31, 2024
1 parent 26afa3e commit 5bd3c6f
Show file tree
Hide file tree
Showing 2 changed files with 55 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,17 +19,24 @@
package org.apache.paimon.flink;

import org.apache.paimon.table.Table;
import org.apache.paimon.table.system.AuditLogTable;

import org.apache.flink.table.api.Schema;
import org.apache.flink.table.api.WatermarkSpec;
import org.apache.flink.table.catalog.CatalogTable;
import org.apache.flink.table.types.utils.TypeConversions;

import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;

import static org.apache.flink.table.descriptors.DescriptorProperties.WATERMARK;
import static org.apache.flink.table.descriptors.Schema.SCHEMA;
import static org.apache.paimon.flink.LogicalTypeConversion.toLogicalType;
import static org.apache.paimon.flink.utils.FlinkCatalogPropertiesUtil.compoundKey;
import static org.apache.paimon.flink.utils.FlinkCatalogPropertiesUtil.deserializeWatermarkSpec;

/** A {@link CatalogTable} to represent system table. */
public class SystemCatalogTable implements CatalogTable {
Expand All @@ -46,10 +53,21 @@ public Table table() {

@Override
public Schema getUnresolvedSchema() {
return Schema.newBuilder()
.fromRowDataType(
TypeConversions.fromLogicalToDataType(toLogicalType(table.rowType())))
.build();
Schema.Builder builder = Schema.newBuilder();
builder.fromRowDataType(
TypeConversions.fromLogicalToDataType(toLogicalType(table.rowType())));
if (table instanceof AuditLogTable) {
Map<String, String> newOptions = new HashMap<>(table.options());
if (newOptions.keySet().stream()
.anyMatch(key -> key.startsWith(compoundKey(SCHEMA, WATERMARK)))) {
WatermarkSpec watermarkSpec = deserializeWatermarkSpec(newOptions);
return builder.watermark(
watermarkSpec.getRowtimeAttribute(),
watermarkSpec.getWatermarkExpr())
.build();
}
}
return builder.build();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,39 @@ public void testWatermark() throws Exception {
innerTestWatermark();
}

@Test
public void testAuditLogWatermark() throws Exception {
String[] options =
new String[] {
"'scan.watermark.idle-timeout'='1s'",
"'scan.watermark.alignment.group'='group'",
"'scan.watermark.alignment.update-interval'='2s'",
"'scan.watermark.alignment.max-drift'='1s'"
};
sql(
"CREATE TABLE T (f0 INT, ts TIMESTAMP(3), WATERMARK FOR ts AS ts) WITH ("
+ String.join(",", options)
+ ")");

BlockingIterator<Row, Row> select =
BlockingIterator.of(
streamSqlIter(
"SELECT window_start, window_end, SUM(f0) FROM TABLE("
+ "TUMBLE(TABLE T$audit_log, DESCRIPTOR(ts), INTERVAL '10' MINUTES))\n"
+ " GROUP BY window_start, window_end;"));

sql("INSERT INTO T VALUES (1, TIMESTAMP '2023-02-02 12:00:00')");
sql("INSERT INTO T VALUES (1, TIMESTAMP '2023-02-02 12:10:05')");

assertThat(select.collect(1))
.containsExactlyInAnyOrder(
Row.of(
LocalDateTime.parse("2023-02-02T12:00"),
LocalDateTime.parse("2023-02-02T12:10"),
1));
select.close();
}

@Disabled // TODO unstable alignment may block watermark generation
@Test
public void testWatermarkAlignment() throws Exception {
Expand Down

0 comments on commit 5bd3c6f

Please sign in to comment.