Skip to content

Commit aa9f34e

Browse files
authored
Merge pull request #11 from taosdata/feat/TS-5951
fix:add update and delete modes support for table sql
2 parents f358372 + 67d29da commit aa9f34e

File tree

6 files changed

+43
-5
lines changed

6 files changed

+43
-5
lines changed

src/main/java/com/taosdata/flink/table/TDengineConnectorOptions.java

+13
Original file line numberDiff line numberDiff line change
@@ -115,6 +115,19 @@ public class TDengineConnectorOptions {
115115
.defaultValue("true")
116116
.withDescription("tdengine database enable auto reconnect");
117117

118+
public static final ConfigOption<String> RECONNECT_INTERVAL_MS = ConfigOptions
119+
.key(TSDBDriver.PROPERTY_KEY_RECONNECT_INTERVAL_MS)
120+
.stringType()
121+
.defaultValue("2000")
122+
.withDescription("tdengine database auto reconnect interval");
123+
124+
public static final ConfigOption<String> RECONNECT_RETRY_COUNT = ConfigOptions
125+
.key(TSDBDriver.PROPERTY_KEY_RECONNECT_RETRY_COUNT)
126+
.stringType()
127+
.defaultValue("3")
128+
.withDescription("tdengine database auto reconnect interval");
129+
130+
118131
public static final ConfigOption<String> SCAN_QUERY =
119132
ConfigOptions.key("scan.query")
120133
.stringType()

src/main/java/com/taosdata/flink/table/TDengineDynamicTableFactory.java

+7
Original file line numberDiff line numberDiff line change
@@ -123,6 +123,8 @@ private DynamicTableSource createTableSource(ReadableConfig config, Context cont
123123

124124
Properties connProps = new Properties();
125125
connProps.setProperty(TDengineConfigParams.PROPERTY_KEY_ENABLE_AUTO_RECONNECT, config.get(TDengineConnectorOptions.ENABLE_AUTO_RECONNECT));
126+
connProps.setProperty(TDengineConfigParams.PROPERTY_KEY_RECONNECT_INTERVAL_MS, config.get(TDengineConnectorOptions.RECONNECT_INTERVAL_MS));
127+
connProps.setProperty(TDengineConfigParams.PROPERTY_KEY_RECONNECT_RETRY_COUNT, config.get(TDengineConnectorOptions.RECONNECT_RETRY_COUNT));
126128
connProps.setProperty(TDengineConfigParams.PROPERTY_KEY_CHARSET, config.get(TDengineConnectorOptions.CHARSET));
127129
return new TDengineTableSource(url, scanQurey, physicalDataType, connProps);
128130
}
@@ -170,6 +172,9 @@ private DynamicTableSource createTableCdc(ReadableConfig config) {
170172
properties.setProperty(TDengineConnectorOptions.TMQ_POLL_INTERVAL_MS.key(), optionVal);
171173
properties.setProperty(TDengineCdcParams.VALUE_DESERIALIZER, "RowData");
172174
properties.setProperty(TDengineCdcParams.VALUE_DESERIALIZER_ENCODING, config.get(TDengineConnectorOptions.CHARSET));
175+
properties.setProperty(TDengineConfigParams.PROPERTY_KEY_ENABLE_AUTO_RECONNECT, config.get(TDengineConnectorOptions.ENABLE_AUTO_RECONNECT));
176+
properties.setProperty(TDengineConfigParams.PROPERTY_KEY_RECONNECT_INTERVAL_MS, config.get(TDengineConnectorOptions.RECONNECT_INTERVAL_MS));
177+
properties.setProperty(TDengineConfigParams.PROPERTY_KEY_RECONNECT_RETRY_COUNT, config.get(TDengineConnectorOptions.RECONNECT_RETRY_COUNT));
173178
String topic = config.get(TDengineConnectorOptions.TOPIC);
174179
return new TDengineTableCdc(topic, properties);
175180
}
@@ -206,6 +211,8 @@ public Set<ConfigOption<?>> optionalOptions() {
206211
optionalOptions.add(TDengineConnectorOptions.LOCALE);
207212
optionalOptions.add(TDengineConnectorOptions.CHARSET);
208213
optionalOptions.add(TDengineConnectorOptions.ENABLE_AUTO_RECONNECT);
214+
optionalOptions.add(TDengineConnectorOptions.RECONNECT_INTERVAL_MS);
215+
optionalOptions.add(TDengineConnectorOptions.RECONNECT_RETRY_COUNT);
209216
optionalOptions.add(TDengineConnectorOptions.SERVER_TIME_ZONE);
210217
optionalOptions.add(TDengineConnectorOptions.TMQ_POLL_INTERVAL_MS);
211218

src/main/java/com/taosdata/flink/table/TDengineTableCdc.java

+7-1
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
import org.apache.flink.table.connector.source.ScanTableSource;
1414
import org.apache.flink.table.data.RowData;
1515
import org.apache.flink.table.types.DataType;
16+
import org.apache.flink.types.RowKind;
1617

1718
import java.util.Properties;
1819

@@ -31,7 +32,12 @@ public TDengineTableCdc(String topic, Properties properties) {
3132

3233
@Override
3334
public ChangelogMode getChangelogMode() {
34-
return ChangelogMode.insertOnly();
35+
return ChangelogMode.newBuilder()
36+
.addContainedKind(RowKind.INSERT)
37+
.addContainedKind(RowKind.UPDATE_BEFORE)
38+
.addContainedKind(RowKind.UPDATE_AFTER)
39+
.addContainedKind(RowKind.DELETE)
40+
.build();
3541
}
3642

3743
@Override

src/main/java/com/taosdata/flink/table/TDengineTableSink.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ public TDengineTableSink(TDengineTableSink tableSink) {
2929

3030
@Override
3131
public ChangelogMode getChangelogMode(ChangelogMode requestedMode) {
32-
return ChangelogMode.insertOnly();
32+
return requestedMode;
3333
}
3434

3535
@Override

src/main/java/com/taosdata/flink/table/TDengineTableSource.java

+7-1
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
import org.apache.flink.table.connector.source.ScanTableSource;
1616
import org.apache.flink.table.data.RowData;
1717
import org.apache.flink.table.types.DataType;
18+
import org.apache.flink.types.RowKind;
1819

1920
import java.util.Properties;
2021

@@ -35,7 +36,12 @@ public TDengineTableSource(String url, String scanQuery, DataType physicalDataTy
3536

3637
@Override
3738
public ChangelogMode getChangelogMode() {
38-
return ChangelogMode.insertOnly();
39+
return ChangelogMode.newBuilder()
40+
.addContainedKind(RowKind.INSERT)
41+
.addContainedKind(RowKind.UPDATE_BEFORE)
42+
.addContainedKind(RowKind.UPDATE_AFTER)
43+
.addContainedKind(RowKind.DELETE)
44+
.build();
3945
}
4046

4147
@Override

src/test/java/com/taosdata/flink/table/TDFlinkTableTest.java

+8-2
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,9 @@ public void prepare() throws Exception {
4545
"power.d1001 USING power.meters TAGS('California.SanFrancisco', 1) " +
4646
"VALUES " +
4747
"('2024-12-19 19:12:45.642', 50.30000, 201, 0.31000) " +
48+
"('2024-12-19 19:12:45.642', 50.30000, 201, 1.31001) " +
49+
"('2024-12-19 19:12:45.642', 50.30000, 201, 2.31002) " +
50+
"('2024-12-19 19:12:45.642', 50.30000, 201, 3.31003) " +
4851
"('2024-12-19 19:12:46.642', 82.60000, 202, 0.33000) " +
4952
"('2024-12-19 19:12:47.642', 92.30000, 203, 0.31000) " +
5053
"('2024-12-19 18:12:45.642', 50.30000, 201, 0.31000) " +
@@ -56,6 +59,9 @@ public void prepare() throws Exception {
5659
"power.d1002 USING power.meters TAGS('Alabama.Montgomery', 2) " +
5760
"VALUES " +
5861
"('2024-12-19 19:12:45.642', 50.30000, 204, 0.25000) " +
62+
"('2024-12-19 19:12:45.642', 50.30000, 204, 1.25001) " +
63+
"('2024-12-19 19:12:45.642', 50.30000, 204, 2.25002) " +
64+
"('2024-12-19 19:12:45.642', 50.30000, 204, 3.25003) " +
5965
"('2024-12-19 19:12:46.642', 62.60000, 205, 0.33000) " +
6066
"('2024-12-19 19:12:47.642', 72.30000, 206, 0.31000) " +
6167
"('2024-12-19 18:12:45.642', 50.30000, 204, 0.25000) " +
@@ -296,7 +302,8 @@ void testTableToSink() throws Exception {
296302
" phase FLOAT," +
297303
" location VARBINARY," +
298304
" groupid INT," +
299-
" tbname VARBINARY" +
305+
" tbname VARBINARY," +
306+
" PRIMARY KEY (ts) NOT ENFORCED" +
300307
") WITH (" +
301308
" 'connector' = 'tdengine-connector'," +
302309
" 'td.jdbc.url' = 'jdbc:TAOS-WS://localhost:6041/power?user=root&password=taosdata'," +
@@ -359,7 +366,6 @@ void testCdcTableToSink() throws Exception {
359366
" 'topic' = 'topic_meters'" +
360367
")";
361368

362-
363369
String tdengineSinkTableDDL = "CREATE TABLE `sink_meters` (" +
364370
" ts TIMESTAMP," +
365371
" `current` FLOAT," +

0 commit comments

Comments
 (0)