From 8967d8f2096d57209621fa43ab4bb37438270aa4 Mon Sep 17 00:00:00 2001 From: menshibin Date: Tue, 31 Dec 2024 11:09:33 +0800 Subject: [PATCH 1/6] add flink doc --- docs/examples/flink/Main.java | 586 ++++++++++++++++++ .../10-third-party/01-collection/12-flink.md | 366 +++++++++++ 2 files changed, 952 insertions(+) create mode 100644 docs/examples/flink/Main.java create mode 100644 docs/zh/10-third-party/01-collection/12-flink.md diff --git a/docs/examples/flink/Main.java b/docs/examples/flink/Main.java new file mode 100644 index 00000000000..b6017787764 --- /dev/null +++ b/docs/examples/flink/Main.java @@ -0,0 +1,586 @@ +package com.taosdata.flink.example; + +import com.taosdata.flink.cdc.TDengineCdcSource; +import com.taosdata.flink.common.TDengineCdcParams; +import com.taosdata.flink.common.TDengineConfigParams; +import com.taosdata.flink.sink.TDengineSink; +import com.taosdata.flink.source.TDengineSource; +import com.taosdata.flink.source.entity.SourceSplitSql; +import com.taosdata.flink.source.entity.SplitType; +import com.taosdata.flink.source.entity.TimestampSplitInfo; +import com.taosdata.jdbc.TSDBDriver; +import com.taosdata.jdbc.tmq.ConsumerRecords; +import org.apache.flink.api.common.eventtime.WatermarkStrategy; +import org.apache.flink.core.execution.JobClient; +import org.apache.flink.shaded.curator5.com.google.common.base.Strings; +import org.apache.flink.streaming.api.datastream.DataStreamSource; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.table.api.EnvironmentSettings; +import org.apache.flink.table.api.TableResult; +import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; +import org.apache.flink.table.data.RowData; +import org.junit.Assert; + +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.SQLException; +import java.sql.Statement; +import java.text.SimpleDateFormat; +import java.time.Duration; + +import java.time.ZoneId; +import java.util.Arrays; +import java.util.List; +import java.util.Properties; + +import javax.xml.transform.Source; + +import org.apache.flink.streaming.api.CheckpointingMode; + + + +public class Main { + static String jdbcUrl = "jdbc:TAOS-WS://localhost:6041?user=root&password=taosdata"; + static void prepare() throws ClassNotFoundException, SQLException { + Properties properties = new Properties(); + properties.setProperty(TSDBDriver.PROPERTY_KEY_ENABLE_AUTO_RECONNECT, "true"); + properties.setProperty(TSDBDriver.PROPERTY_KEY_CHARSET, "UTF-8"); + properties.setProperty(TSDBDriver.PROPERTY_KEY_TIME_ZONE, "UTC-8"); + String insertQuery = "INSERT INTO " + + "power.d1001 USING power.meters TAGS('California.SanFrancisco', 1) " + + "VALUES " + + "('2024-12-19 19:12:45.642', 50.30000, 201, 0.31000) " + + "('2024-12-19 19:12:46.642', 82.60000, 202, 0.33000) " + + "('2024-12-19 19:12:47.642', 92.30000, 203, 0.31000) " + + "('2024-12-19 18:12:45.642', 50.30000, 201, 0.31000) " + + "('2024-12-19 18:12:46.642', 82.60000, 202, 0.33000) " + + "('2024-12-19 18:12:47.642', 92.30000, 203, 0.31000) " + + "('2024-12-19 17:12:45.642', 50.30000, 201, 0.31000) " + + "('2024-12-19 17:12:46.642', 82.60000, 202, 0.33000) " + + "('2024-12-19 17:12:47.642', 92.30000, 203, 0.31000) " + + "power.d1002 USING power.meters TAGS('Alabama.Montgomery', 2) " + + "VALUES " + + "('2024-12-19 19:12:45.642', 50.30000, 204, 0.25000) " + + "('2024-12-19 19:12:46.642', 62.60000, 205, 0.33000) " + + "('2024-12-19 19:12:47.642', 72.30000, 206, 0.31000) " + + "('2024-12-19 18:12:45.642', 
50.30000, 204, 0.25000) " + + "('2024-12-19 18:12:46.642', 62.60000, 205, 0.33000) " + + "('2024-12-19 18:12:47.642', 72.30000, 206, 0.31000) " + + "('2024-12-19 17:12:45.642', 50.30000, 204, 0.25000) " + + "('2024-12-19 17:12:46.642', 62.60000, 205, 0.33000) " + + "('2024-12-19 17:12:47.642', 72.30000, 206, 0.31000) "; + + Class.forName("com.taosdata.jdbc.ws.WebSocketDriver"); + try (Connection connection = DriverManager.getConnection(jdbcUrl, properties); + Statement stmt = connection.createStatement()) { + + stmt.executeUpdate("DROP TOPIC IF EXISTS topic_meters"); + + stmt.executeUpdate("DROP database IF EXISTS power"); + // create database + int rowsAffected = stmt.executeUpdate("CREATE DATABASE IF NOT EXISTS power vgroups 5"); + + stmt.executeUpdate("use power"); + // you can check rowsAffected here + System.out.println("Create database power successfully, rowsAffected: " + rowsAffected); + // create table + rowsAffected = stmt.executeUpdate("CREATE STABLE IF NOT EXISTS meters (ts timestamp, current float, voltage int, phase float) TAGS (location binary(64), groupId int);"); + // you can check rowsAffected here + System.out.println("Create stable power.meters successfully, rowsAffected: " + rowsAffected); + + stmt.executeUpdate("CREATE TOPIC topic_meters as SELECT ts, `current`, voltage, phase, location, groupid, tbname FROM meters"); + + int affectedRows = stmt.executeUpdate(insertQuery); + // you can check affectedRows here + System.out.println("Successfully inserted " + affectedRows + " rows to power.meters."); + + stmt.executeUpdate("DROP database IF EXISTS power_sink"); + // create database + stmt.executeUpdate("CREATE DATABASE IF NOT EXISTS power_sink vgroups 5"); + + stmt.executeUpdate("use power_sink"); + // you can check rowsAffected here + System.out.println("Create database power successfully, rowsAffected: " + rowsAffected); + // create table + stmt.executeUpdate("CREATE STABLE IF NOT EXISTS sink_meters (ts timestamp, current float, voltage int, phase float) TAGS (location binary(64), groupId int);"); + // you can check rowsAffected here + + stmt.executeUpdate("CREATE TABLE IF NOT EXISTS sink_normal (ts timestamp, current float, voltage int, phase float);"); + // you can check rowsAffected here + + + } catch (Exception ex) { + // please refer to the JDBC specifications for detailed exceptions info + System.out.printf("Failed to create database power or stable meters, %sErrMessage: %s%n", + ex instanceof SQLException ? "ErrCode: " + ((SQLException) ex).getErrorCode() + ", " : "", + ex.getMessage()); + // Print stack trace for context in examples. Use logging in production. 
            throw ex;
        }
    }

    public static void main(String[] args) throws Exception {
        prepare();
        if (args != null && args.length > 0 && args[0].equals("source")) {
            testSource();
        } else if (args != null && args.length > 0 && args[0].equals("table")) {
            testTableToSink();
        } else if (args != null && args.length > 0 && args[0].equals("cdc")) {
            testCustomTypeCdc();
        } else if (args != null && args.length > 0 && args[0].equals("table-cdc")) {
            testCdcTableToSink();
        }
    }

    static SourceSplitSql getTimeSplit() {
// ANCHOR: time_interval
SourceSplitSql splitSql = new SourceSplitSql();
splitSql.setSql("select ts, `current`, voltage, phase, groupid, location, tbname from meters")
.setSplitType(SplitType.SPLIT_TYPE_TIMESTAMP)
.setTimestampSplitInfo(new TimestampSplitInfo(
        "2024-12-19 16:12:48.000",
        "2024-12-19 19:12:48.000",
        "ts",
        Duration.ofHours(1),
        new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS"),
        ZoneId.of("Asia/Shanghai")));
// ANCHOR_END: time_interval
return splitSql;
    }

    static SourceSplitSql getTagSplit() throws Exception {
// ANCHOR: tag_split
SourceSplitSql splitSql = new SourceSplitSql();
splitSql.setSql("select ts, current, voltage, phase, groupid, location from meters where voltage > 100")
        .setTagList(Arrays.asList("groupid >100 and location = 'Shanghai'",
                "groupid >50 and groupid < 100 and location = 'Guangzhou'",
                "groupid >0 and groupid < 50 and location = 'Beijing'"))
        .setSplitType(SplitType.SPLIT_TYPE_TAG);
// ANCHOR_END: tag_split
return splitSql;
    }

    static SourceSplitSql getTableSplit() {
// ANCHOR: table_split
SourceSplitSql splitSql = new SourceSplitSql();
splitSql.setSelect("ts, current, voltage, phase, groupid, location")
        .setTableList(Arrays.asList("d1001", "d1002"))
        .setOther("order by ts limit 100")
        .setSplitType(SplitType.SPLIT_TYPE_TABLE);
// ANCHOR_END: table_split
return splitSql;
    }

    //ANCHOR: source_test
    static void testSource() throws Exception {
        Properties connProps = new Properties();
        connProps.setProperty(TDengineConfigParams.PROPERTY_KEY_ENABLE_AUTO_RECONNECT, "true");
        connProps.setProperty(TDengineConfigParams.PROPERTY_KEY_TIME_ZONE, "UTC-8");
        connProps.setProperty(TDengineConfigParams.VALUE_DESERIALIZER, "RowData");
        connProps.setProperty(TDengineConfigParams.TD_JDBC_URL, "jdbc:TAOS-WS://localhost:6041/power?user=root&password=taosdata");
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(3);

        SourceSplitSql splitSql = new SourceSplitSql();
        splitSql.setSql("select ts, `current`, voltage, phase, groupid, location, tbname from meters")
                .setSplitType(SplitType.SPLIT_TYPE_TIMESTAMP)
                .setTimestampSplitInfo(new TimestampSplitInfo(
                        "2024-12-19 16:12:48.000",
                        "2024-12-19 19:12:48.000",
                        "ts",
                        Duration.ofHours(1),
                        new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS"),
                        ZoneId.of("Asia/Shanghai")));

        TDengineSource<RowData> source = new TDengineSource<>(connProps, splitSql, RowData.class);
        DataStreamSource<RowData> input = env.fromSource(source, WatermarkStrategy.noWatermarks(), "tdengine-source");
        DataStream<String> resultStream = input.map((MapFunction<RowData, String>) rowData -> {
            StringBuilder sb = new StringBuilder();
            sb.append("ts: " + rowData.getTimestamp(0, 0) +
                    ", current: " + rowData.getFloat(1) +
                    ", voltage: " + rowData.getInt(2) +
                    ", phase: " + rowData.getFloat(3) +
                    ", location: " + new String(rowData.getBinary(4)));
            sb.append("\n");
            return sb.toString();
        });
        resultStream.print();
        env.execute("tdengine flink source");
    }
    //ANCHOR_END: source_test

    //ANCHOR: source_custom_type_test
    void testCustomTypeSource() throws Exception {
        System.out.println("testTDengineSourceByTimeSplit start!");
        Properties connProps = new Properties();
        connProps.setProperty(TSDBDriver.PROPERTY_KEY_ENABLE_AUTO_RECONNECT, "true");
        connProps.setProperty(TSDBDriver.PROPERTY_KEY_TIME_ZONE, "UTC-8");
        connProps.setProperty(TDengineConfigParams.VALUE_DESERIALIZER, "com.taosdata.flink.entity.ResultSoureDeserialization");
        connProps.setProperty(TDengineConfigParams.TD_JDBC_URL, "jdbc:TAOS-WS://localhost:6041/power?user=root&password=taosdata");
        SourceSplitSql splitSql = new SourceSplitSql();
        splitSql.setSql("select ts, `current`, voltage, phase, groupid, location, tbname from meters")
                .setSplitType(SplitType.SPLIT_TYPE_TIMESTAMP)
                // split the query by time range
                .setTimestampSplitInfo(new TimestampSplitInfo(
                        "2024-12-19 16:12:48.000",
                        "2024-12-19 19:12:48.000",
                        "ts",
                        Duration.ofHours(1),
                        new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS"),
                        ZoneId.of("Asia/Shanghai")));

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(3);
        TDengineSource<ResultBean> source = new TDengineSource<>(connProps, splitSql, ResultBean.class);
        DataStreamSource<ResultBean> input = env.fromSource(source, WatermarkStrategy.noWatermarks(), "tdengine-source");
        DataStream<String> resultStream = input.map((MapFunction<ResultBean, String>) rowData -> {
            StringBuilder sb = new StringBuilder();
            sb.append("ts: " + rowData.getTs() +
                    ", current: " + rowData.getCurrent() +
                    ", voltage: " + rowData.getVoltage() +
                    ", phase: " + rowData.getPhase() +
                    ", groupid: " + rowData.getGroupid() +
                    ", location: " + rowData.getLocation() +
                    ", tbname: " + rowData.getTbname());
            sb.append("\n");
            totalVoltage.addAndGet(rowData.getVoltage());
            return sb.toString();
        });
        resultStream.print();
        env.execute("flink tdengine source");
    }
    //ANCHOR_END: source_custom_type_test

    //ANCHOR: source_batch_test
    void testBatchSource() throws Exception {
        Properties connProps = new Properties();
        connProps.setProperty(TDengineConfigParams.PROPERTY_KEY_ENABLE_AUTO_RECONNECT, "true");
        connProps.setProperty(TDengineConfigParams.PROPERTY_KEY_TIME_ZONE, "UTC-8");
        connProps.setProperty(TDengineConfigParams.VALUE_DESERIALIZER, "RowData");
        connProps.setProperty(TDengineConfigParams.TD_BATCH_MODE, "true");
        connProps.setProperty(TDengineConfigParams.TD_JDBC_URL, "jdbc:TAOS-WS://localhost:6041/power?user=root&password=taosdata");
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(3);
        Class<SourceRecords<RowData>> typeClass = (Class<SourceRecords<RowData>>) (Class<?>) SourceRecords.class;
        SourceSplitSql sql = new SourceSplitSql("select ts, `current`, voltage, phase, tbname from meters");
        TDengineSource<SourceRecords<RowData>> source = new TDengineSource<>(connProps, sql, typeClass);
        DataStreamSource<SourceRecords<RowData>> input = env.fromSource(source, WatermarkStrategy.noWatermarks(), "tdengine-source");
        DataStream<String> resultStream = input.map((MapFunction<SourceRecords<RowData>, String>) records -> {
            StringBuilder sb = new StringBuilder();
            Iterator<RowData> iterator = records.iterator();
            while (iterator.hasNext()) {
                GenericRowData row = (GenericRowData) iterator.next();
                sb.append("ts: " + row.getTimestamp(0, 0) +
                        ", current: " + row.getFloat(1) +
                        ", voltage: " + row.getInt(2) +
                        ", phase: " + row.getFloat(3) +
                        ", location: " + new String(row.getBinary(4)));
                sb.append("\n");
                totalVoltage.addAndGet(row.getInt(2));
            }
            return sb.toString();
        });
        resultStream.print();
        env.execute("flink tdengine source");
    }
    //ANCHOR_END: source_batch_test

    //ANCHOR: cdc_source
    void testTDengineCdc() throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(3);
        env.enableCheckpointing(100, CheckpointingMode.AT_LEAST_ONCE);
        env.getConfig().setRestartStrategy(RestartStrategies.noRestart());
        Properties config = new Properties();
        config.setProperty(TDengineCdcParams.CONNECT_TYPE, "ws");
        config.setProperty(TDengineCdcParams.BOOTSTRAP_SERVERS, "localhost:6041");
        config.setProperty(TDengineCdcParams.AUTO_OFFSET_RESET, "earliest");
        config.setProperty(TDengineCdcParams.MSG_WITH_TABLE_NAME, "true");
        config.setProperty(TDengineCdcParams.AUTO_COMMIT_INTERVAL_MS, "1000");
        config.setProperty(TDengineCdcParams.GROUP_ID, "group_1");
        config.setProperty(TDengineCdcParams.ENABLE_AUTO_COMMIT, "true");
        config.setProperty(TDengineCdcParams.CONNECT_USER, "root");
        config.setProperty(TDengineCdcParams.CONNECT_PASS, "taosdata");
        config.setProperty(TDengineCdcParams.VALUE_DESERIALIZER, "RowData");
        config.setProperty(TDengineCdcParams.VALUE_DESERIALIZER_ENCODING, "UTF-8");
        TDengineCdcSource<RowData> tdengineSource = new TDengineCdcSource<>("topic_meters", config, RowData.class);
        DataStreamSource<RowData> input = env.fromSource(tdengineSource, WatermarkStrategy.noWatermarks(), "tdengine-cdc-source");
        DataStream<String> resultStream = input.map((MapFunction<RowData, String>) rowData -> {
            StringBuilder sb = new StringBuilder();
            sb.append("ts: " + rowData.getTimestamp(0, 0) +
                    ", current: " + rowData.getFloat(1) +
                    ", voltage: " + rowData.getInt(2) +
                    ", phase: " + rowData.getFloat(3) +
                    ", location: " + new String(rowData.getBinary(4)));
            sb.append("\n");
            totalVoltage.addAndGet(rowData.getInt(2));
            return sb.toString();
        });
        resultStream.print();
        JobClient jobClient = env.executeAsync("Flink test cdc Example");
        Thread.sleep(5000L);
        // A job submitted from the Flink UI cannot be cancelled here; stop it on the UI page instead.
        jobClient.cancel().get();
    }
    //ANCHOR_END: cdc_source

    //ANCHOR: cdc_batch_source
    void testTDengineCdcBatch() throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(3);
        Properties config = new Properties();
        config.setProperty(TDengineCdcParams.CONNECT_TYPE, "ws");
        config.setProperty(TDengineCdcParams.BOOTSTRAP_SERVERS, "localhost:6041");
        config.setProperty(TDengineCdcParams.AUTO_OFFSET_RESET, "earliest");
        config.setProperty(TDengineCdcParams.MSG_WITH_TABLE_NAME, "true");
        config.setProperty(TDengineCdcParams.AUTO_COMMIT_INTERVAL_MS, "1000");
        config.setProperty(TDengineCdcParams.GROUP_ID, "group_1");
        config.setProperty(TDengineCdcParams.CONNECT_USER, "root");
        config.setProperty(TDengineCdcParams.CONNECT_PASS, "taosdata");
        config.setProperty(TDengineCdcParams.VALUE_DESERIALIZER, "RowData");
        config.setProperty(TDengineCdcParams.VALUE_DESERIALIZER_ENCODING, "UTF-8");
        config.setProperty(TDengineCdcParams.TMQ_BATCH_MODE, "true");

        Class<ConsumerRecords<RowData>> typeClass = (Class<ConsumerRecords<RowData>>) (Class<?>) ConsumerRecords.class;
        TDengineCdcSource<ConsumerRecords<RowData>> tdengineSource = new TDengineCdcSource<>("topic_meters", config, typeClass);
        DataStreamSource<ConsumerRecords<RowData>> input = env.fromSource(tdengineSource, WatermarkStrategy.noWatermarks(), "tdengine-cdc-source");
        DataStream<String> resultStream = input.map((MapFunction<ConsumerRecords<RowData>, String>) records -> {
            Iterator<ConsumerRecord<RowData>> iterator = records.iterator();
            StringBuilder sb = new StringBuilder();
            while (iterator.hasNext()) {
                GenericRowData row = (GenericRowData) iterator.next().value();
                sb.append("ts: " + row.getTimestamp(0, 0) +
                        ", current: " + row.getFloat(1) +
                        ", voltage: " + row.getInt(2) +
                        ", phase: " + row.getFloat(3) +
                        ", location: " + new String(row.getBinary(4)));
                sb.append("\n");
                totalVoltage.addAndGet(row.getInt(2));
            }
            return sb.toString();
        });

        resultStream.print();
        JobClient jobClient = env.executeAsync("Flink test cdc Example");
        Thread.sleep(5000L);
        jobClient.cancel().get();
    }
    //ANCHOR_END: cdc_batch_source

    //ANCHOR: cdc_custom_type_test
    static void testCustomTypeCdc() throws Exception {
        System.out.println("testCustomTypeTDengineCdc start!");
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(3);
        env.enableCheckpointing(100, CheckpointingMode.AT_LEAST_ONCE);
        env.getConfig().setRestartStrategy(RestartStrategies.noRestart());
        env.getCheckpointConfig().setTolerableCheckpointFailureNumber(4);
        Properties config = new Properties();
        config.setProperty(TDengineCdcParams.CONNECT_TYPE, "ws");
        config.setProperty(TDengineCdcParams.BOOTSTRAP_SERVERS, "localhost:6041");
        config.setProperty(TDengineCdcParams.AUTO_OFFSET_RESET, "earliest");
        config.setProperty(TDengineCdcParams.MSG_WITH_TABLE_NAME, "true");
        config.setProperty(TDengineCdcParams.AUTO_COMMIT_INTERVAL_MS, "1000");
        config.setProperty(TDengineCdcParams.GROUP_ID, "group_1");
        config.setProperty(TDengineCdcParams.CONNECT_USER, "root");
        config.setProperty(TDengineCdcParams.CONNECT_PASS, "taosdata");
        config.setProperty(TDengineCdcParams.VALUE_DESERIALIZER, "com.taosdata.flink.entity.ResultDeserializer");
        config.setProperty(TDengineCdcParams.VALUE_DESERIALIZER_ENCODING, "UTF-8");
        TDengineCdcSource<ResultBean> tdengineSource = new TDengineCdcSource<>("topic_meters", config, ResultBean.class);
        DataStreamSource<ResultBean> input = env.fromSource(tdengineSource, WatermarkStrategy.noWatermarks(), "tdengine-cdc-source");
        DataStream<String> resultStream = input.map((MapFunction<ResultBean, String>) rowData -> {
            StringBuilder sb = new StringBuilder();
            sb.append("ts: " + rowData.getTs() +
                    ", current: " + rowData.getCurrent() +
                    ", voltage: " + rowData.getVoltage() +
                    ", phase: " + rowData.getPhase() +
                    ", groupid: " + rowData.getGroupid() +
                    ", location: " + rowData.getLocation() +
                    ", tbname: " + rowData.getTbname());
            sb.append("\n");
            totalVoltage.addAndGet(rowData.getVoltage());
            return sb.toString();
        });
        resultStream.print();
        JobClient jobClient = env.executeAsync("Flink test cdc Example");
        Thread.sleep(5000L);
        jobClient.cancel().get();
    }
    //ANCHOR_END: cdc_custom_type_test

    //ANCHOR: RowDataToSink
    static void testRowDataToSink() throws Exception {
        Properties connProps = new Properties();
        connProps.setProperty(TDengineConfigParams.VALUE_DESERIALIZER, "RowData");
        connProps.setProperty(TDengineConfigParams.TD_JDBC_URL, "jdbc:TAOS-WS://localhost:6041/power?user=root&password=taosdata");
        SourceSplitSql splitSql = getTimeSplit();
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        env.enableCheckpointing(1000, CheckpointingMode.AT_LEAST_ONCE);
        TDengineSource<RowData> source = new TDengineSource<>(connProps, splitSql, RowData.class);
        DataStreamSource<RowData> input = env.fromSource(source, WatermarkStrategy.noWatermarks(), "tdengine-source");
        Properties sinkProps = new Properties();
        sinkProps.setProperty(TDengineConfigParams.VALUE_DESERIALIZER, "RowData");
        sinkProps.setProperty(TDengineConfigParams.TD_SOURCE_TYPE, "tdengine_source");
        sinkProps.setProperty(TDengineConfigParams.TD_DATABASE_NAME, "power_sink");
        sinkProps.setProperty(TDengineConfigParams.TD_SUPERTABLE_NAME, "sink_meters");
        sinkProps.setProperty(TDengineConfigParams.TD_JDBC_URL, "jdbc:TAOS-WS://localhost:6041/power_sink?user=root&password=taosdata");
        sinkProps.setProperty(TDengineConfigParams.TD_BATCH_SIZE, "2000");

        // The list of target table field names must match the order of the queried columns.
        TDengineSink<RowData> sink = new TDengineSink<>(sinkProps,
                Arrays.asList("ts", "current", "voltage", "phase", "groupid", "location", "tbname"));

        input.sinkTo(sink);
        env.execute("flink tdengine sink");
    }
    //ANCHOR_END: RowDataToSink

    //ANCHOR: BatchRowDataToSink
    static void testBatchToTdSink() throws Exception {
        System.out.println("testTDengineCdcToTdSink start!");
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(3);
        env.enableCheckpointing(500, CheckpointingMode.AT_LEAST_ONCE);
        env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500);
        env.getCheckpointConfig().setCheckpointTimeout(5000);
        Properties config = new Properties();
        config.setProperty(TDengineCdcParams.CONNECT_TYPE, "ws");
        config.setProperty(TDengineCdcParams.BOOTSTRAP_SERVERS, "localhost:6041");
        config.setProperty(TDengineCdcParams.AUTO_OFFSET_RESET, "earliest");
        config.setProperty(TDengineCdcParams.MSG_WITH_TABLE_NAME, "true");
        config.setProperty(TDengineCdcParams.AUTO_COMMIT_INTERVAL_MS, "1000");
        config.setProperty(TDengineCdcParams.GROUP_ID, "group_1");
        config.setProperty(TDengineCdcParams.CONNECT_USER, "root");
        config.setProperty(TDengineCdcParams.CONNECT_PASS, "taosdata");
        config.setProperty(TDengineCdcParams.VALUE_DESERIALIZER, "RowData");
        config.setProperty(TDengineCdcParams.VALUE_DESERIALIZER_ENCODING, "UTF-8");
        config.setProperty(TDengineCdcParams.TMQ_BATCH_MODE, "true");

        Class<ConsumerRecords<RowData>> typeClass = (Class<ConsumerRecords<RowData>>) (Class<?>) ConsumerRecords.class;
        TDengineCdcSource<ConsumerRecords<RowData>> tdengineSource = new TDengineCdcSource<>("topic_meters", config, typeClass);
        DataStreamSource<ConsumerRecords<RowData>> input = env.fromSource(tdengineSource, WatermarkStrategy.noWatermarks(), "tdengine-source");

        Properties sinkProps = new Properties();
        sinkProps.setProperty(TSDBDriver.PROPERTY_KEY_ENABLE_AUTO_RECONNECT, "true");
        sinkProps.setProperty(TSDBDriver.PROPERTY_KEY_CHARSET, "UTF-8");
        sinkProps.setProperty(TSDBDriver.PROPERTY_KEY_TIME_ZONE, "UTC-8");
        sinkProps.setProperty(TDengineConfigParams.VALUE_DESERIALIZER, "RowData");
        sinkProps.setProperty(TDengineConfigParams.TD_BATCH_MODE, "true");
        sinkProps.setProperty(TDengineConfigParams.TD_SOURCE_TYPE, "tdengine_cdc");
        sinkProps.setProperty(TDengineConfigParams.TD_DATABASE_NAME, "power_sink");
        sinkProps.setProperty(TDengineConfigParams.TD_SUPERTABLE_NAME, "sink_meters");
        sinkProps.setProperty(TDengineConfigParams.TD_JDBC_URL, "jdbc:TAOS-WS://localhost:6041/power?user=root&password=taosdata");
        sinkProps.setProperty(TDengineConfigParams.TD_BATCH_SIZE, "2000");

        TDengineSink<ConsumerRecords<RowData>> sink = new TDengineSink<>(sinkProps, Arrays.asList("ts", "current", "voltage", "phase", "location", "groupid", "tbname"));
        input.sinkTo(sink);
        JobClient jobClient = env.executeAsync("Flink test cdc Example");
        Thread.sleep(6000L);
        jobClient.cancel().get();
        System.out.println("testTDengineCdcToTdSink finish!");
    }
    //ANCHOR_END: BatchRowDataToSink

    //ANCHOR: source_table
    static void testTableToSink() throws Exception {
        System.out.println("testTableToSink start!");
        EnvironmentSettings fsSettings = EnvironmentSettings.newInstance().inStreamingMode().build();
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(3);
        env.enableCheckpointing(1000, CheckpointingMode.AT_LEAST_ONCE);
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, fsSettings);
        String tdengineSourceTableDDL = "CREATE TABLE `meters` (" +
                " ts TIMESTAMP," +
                " `current` FLOAT," +
                " voltage INT," +
                " phase FLOAT," +
                " location VARBINARY," +
                " groupid INT," +
                " tbname VARBINARY" +
                ") WITH (" +
                " 'connector' = 'tdengine-connector'," +
                " 'td.jdbc.url' = 'jdbc:TAOS-WS://localhost:6041/power?user=root&password=taosdata'," +
                " 'td.jdbc.mode' = 'source'," +
                " 'table-name' = 'meters'," +
                " 'scan.query' = 'SELECT ts, `current`, voltage, phase, location, groupid, tbname FROM `meters`'" +
                ")";

        String tdengineSinkTableDDL = "CREATE TABLE `sink_meters` (" +
                " ts TIMESTAMP," +
                " `current` FLOAT," +
                " voltage INT," +
                " phase FLOAT," +
                " location VARBINARY," +
                " groupid INT," +
                " tbname VARBINARY" +
                ") WITH (" +
                " 'connector' = 'tdengine-connector'," +
                " 'td.jdbc.mode' = 'sink'," +
                " 'td.jdbc.url' = 'jdbc:TAOS-WS://localhost:6041/power_sink?user=root&password=taosdata'," +
                " 'sink.db.name' = 'power_sink'," +
                " 'sink.supertable.name' = 'sink_meters'" +
                ")";

        tableEnv.executeSql(tdengineSourceTableDDL);
        tableEnv.executeSql(tdengineSinkTableDDL);
        tableEnv.executeSql("INSERT INTO sink_meters SELECT ts, `current`, voltage, phase, location, groupid, tbname FROM `meters`");
    }
    //ANCHOR_END: source_table

    //ANCHOR: cdc_table
    static void testCdcTableToSink() throws Exception {
        EnvironmentSettings fsSettings = EnvironmentSettings.newInstance().inStreamingMode().build();
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(5);
        env.enableCheckpointing(1000, CheckpointingMode.AT_LEAST_ONCE);
        StreamTableEnvironment
tableEnv = StreamTableEnvironment.create(env, fsSettings); + String tdengineSourceTableDDL = "CREATE TABLE `meters` (" + + " ts TIMESTAMP," + + " `current` FLOAT," + + " voltage INT," + + " phase FLOAT," + + " location VARBINARY," + + " groupid INT," + + " tbname VARBINARY" + + ") WITH (" + + " 'connector' = 'tdengine-connector'," + + " 'bootstrap.servers' = 'localhost:6041'," + + " 'td.jdbc.mode' = 'cdc'," + + " 'group.id' = 'group_22'," + + " 'auto.offset.reset' = 'earliest'," + + " 'enable.auto.commit' = 'false'," + + " 'topic' = 'topic_meters'" + + ")"; + + + String tdengineSinkTableDDL = "CREATE TABLE `sink_meters` (" + + " ts TIMESTAMP," + + " `current` FLOAT," + + " voltage INT," + + " phase FLOAT," + + " location VARBINARY," + + " groupid INT," + + " tbname VARBINARY" + + ") WITH (" + + " 'connector' = 'tdengine-connector'," + + " 'td.jdbc.mode' = 'cdc'," + + " 'td.jdbc.url' = 'jdbc:TAOS-WS://localhost:6041/power_sink?user=root&password=taosdata'," + + " 'sink.db.name' = 'power_sink'," + + " 'sink.supertable.name' = 'sink_meters'" + + ")"; + + tableEnv.executeSql(tdengineSourceTableDDL); + tableEnv.executeSql(tdengineSinkTableDDL); + + TableResult tableResult = tableEnv.executeSql("INSERT INTO sink_meters SELECT ts, `current`, voltage, phase, location, groupid, tbname FROM `meters`"); + + Thread.sleep(5000L); + tableResult.getJobClient().get().cancel().get(); + } + //ANCHOR_END: cdc_table + + +} diff --git a/docs/zh/10-third-party/01-collection/12-flink.md b/docs/zh/10-third-party/01-collection/12-flink.md new file mode 100644 index 00000000000..8b2e5422b1c --- /dev/null +++ b/docs/zh/10-third-party/01-collection/12-flink.md @@ -0,0 +1,366 @@ +--- +sidebar_label: Flink +title: TDengine Flink Connector +--- + +import Tabs from '@theme/Tabs'; +import TabItem from '@theme/TabItem'; + +Apache Flink 是一款由 Apache 软件基金会支持的开源分布式流批一体化处理框架,可用于流处理、批处理、复杂事件处理、实时数据仓库构建及为机器学习提供实时数据支持等诸多大数据处理场景。与此同时,Flink 拥有丰富的连接器与各类工具,可对接众多不同类型的数据源实现数据的读取与写入。在数据处理的过程中,Flink 还提供了一系列可靠的容错机制,有力保障任务即便遭遇意外状况,依然能稳定、持续运行。 + +借助 TDengine 的 Flink 连接器,Apache Flink 得以与 TDengine 数据库无缝对接,一方面能够将经过复杂运算和深度分析后所得到的结果精准存入 TDengine 数据库,实现数据的高效存储与管理;另一方面,也可以从 TDengine 数据库中快速、稳定地读取海量数据,并在此基础上进行全面、深入的分析处理,充分挖掘数据的潜在价值,为企业的决策制定提供有力的数据支持和科学依据,极大地提升数据处理的效率和质量,增强企业在数字化时代的竞争力和创新能力。 + +## 前置条件 + +准备以下环境: +- TDengine 集群已部署并正常运行(企业及社区版均可) +- taosAdapter 能够正常运行。详细参考 [taosAdapter 使用手册](../../../reference/components/taosadapter) +- Apache Flink v1.19.0 或以上版本已安装。安装 Apache Flink 请参考 [官方文档](https://flink.apache.org/) + +## JRE 版本兼容性 + +- JRE: 支持 JRE 8 及以上版本。 + +# 支持的平台 + +Flink Connector 支持所有能运行 Flink 1.19 及以上版本的平台。 + +## 版本历史 +| Flink Connector 版本 | 主要变化 | TDengine 版本 | +| ------------------| ------------------------------------ | ---------------- | +| 1.1.0 | 1. 支持 SQL 查询 TDengine 数据库中的数据
2. 支持 CDC 订阅 TDengine 数据库中的数据
3. 支持 Table SQL 方式读取和写入 TDengine 数据库| 3.3.5.0 及以上版本 | + +## 异常和错误码 + +在任务执行失败后,查看 Flink 任务执行日志确认失败原因 + +具体的错误码请参考: + +| Error Code | Description | Suggested Actions | +| ---------------- |------------------------------------------------------- | -------------------- | +| 0xa000 |connection param error |连接器参数错误 +| 0xa001 |the groupid parameter of CDC is incorrect |CDC 的 groupid 参数错误。| +| 0xa002 |wrong topic parameter for CDC |CDC 的 topic 参数错误。| +| 0xa010 |database name configuration error |数据库名配置错误| +| 0xa011 |table name configuration error |表名配置错误| +| 0xa012 |no data was obtained from the data source |从数据源中获取数据失败| +| 0xa013 |value.deserializer parameter not set |未设置序列化方式| +| 0xa014 |list of column names for target table not set |未设置目标表的列名列表| +| 0x2301 |connection already closed |连接已经关闭,检查连接情况,或重新创建连接去执行相关指令。| +| 0x2302 |this operation is NOT supported currently! |当前使用接口不支持,可以更换其他连接方式。| +| 0x2303 |invalid variables |参数不合法,请检查相应接口规范,调整参数类型及大小。| +| 0x2304 |statement is closed |statement 已经关闭,请检查 statement 是否关闭后再次使用,或是连接是否正常。| +| 0x2305 |resultSet is closed |resultSet 结果集已经释放,请检查 resultSet 是否释放后再次使用。| +| 0x230d |parameter index out of range |参数越界,请检查参数的合理范围。| +| 0x230e |connection already closed |连接已经关闭,请检查 Connection 是否关闭后再次使用,或是连接是否正常。| +| 0x230f |unknown sql type in tdengine |请检查 TDengine 支持的 Data Type 类型。| +| 0x2315 |unknown taos type in tdengine |在 TDengine 数据类型与 JDBC 数据类型转换时,是否指定了正确的 TDengine 数据类型。| +| 0x2319 |user is required |创建连接时缺少用户名信息。| +| 0x231a |password is required |创建连接时缺少密码信息。| +| 0x231d |can't create connection with server within |通过增加参数 httpConnectTimeout 增加连接耗时,或是请检查与 taosAdapter 之间的连接情况。| +| 0x231e |failed to complete the task within the specified time |通过增加参数 messageWaitTimeout 增加执行耗时,或是请检查与 taosAdapter 之间的连接情况。| +| 0x2352 |Unsupported encoding |本地连接下指定了不支持的字符编码集。| +| 0x2353 |internal error of database, please see taoslog for more details |本地连接执行 prepareStatement 时出现错误,请检查 taos log 进行问题定位。| +| 0x2354 |connection is NULL |本地连接执行命令时,Connection 已经关闭。请检查与 TDengine 的连接情况。| +| 0x2355 |result set is NULL |本地连接获取结果集,结果集异常,请检查连接情况,并重试。| +| 0x2356 |invalid num of fields |本地连接获取结果集的 meta 信息不匹配。| +| 0x2357 |empty sql string |填写正确的 SQL 进行执行。| +| 0x2371 |consumer properties must not be null! 
|创建订阅时参数为空,请填写正确的参数。| +| 0x2375 |topic reference has been destroyed |创建数据订阅过程中,topic 引用被释放。请检查与 TDengine 的连接情况。| +| 0x2376 |failed to set consumer topic, topic name is empty |创建数据订阅过程中,订阅 topic 名称为空。请检查指定的 topic 名称是否填写正确。| +| 0x2377 |consumer reference has been destroyed |订阅数据传输通道已经关闭,请检查与 TDengine 的连接情况。| +| 0x2378 |consumer create error |创建数据订阅失败,请根据错误信息检查 taos log 进行问题定位。| +| 0x237a |vGroup not found in result set VGroup |没有分配给当前 consumer,由于 Rebalance 机制导致 Consumer 与 VGroup 不是绑定的关系。| + +## 数据类型映射 +TDengine 目前支持时间戳、数字、字符、布尔类型,与 Flink RowData Type 对应类型转换如下: + +| TDengine DataType | Flink RowDataType | +| ----------------- | ------------------ | +| TIMESTAMP | TimestampData | +| INT | Integer | +| BIGINT | Long | +| FLOAT | Float | +| DOUBLE | Double | +| SMALLINT | Short | +| TINYINT | Byte | +| BOOL | Boolean | +| BINARY | byte[] | +| NCHAR | StringData | +| JSON | StringData | +| VARBINARY | byte[] | +| GEOMETRY | byte[] | + +## 使用说明 + +### Flink 语义选择说明 + +采用 At-Least-Once(至少一次)语义原因: + - TDengine 目前不支持事务,不能进行频繁的检查点操作和复杂的事务协调。 + - 由于 TDengine 采用时间戳作为主键,重复数据下游算子可以进行过滤操作,避免重复计算。 + - 采用 At-Least-Once(至少一次)确保达到较高的数据处理的性能和较低的数据延时,设置方式如下: + ```java + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + env.enableCheckpointing(5000); + env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.AT_LEAST_ONCE); + ``` + +如果使用 Maven 管理项目,只需在 pom.xml 中加入以下依赖。 + +```xml + + com.taosdata.flink + flink-connector-tdengine + 1.0.0 + +``` + +### 连接参数 + +建立连接的参数有 URL 和 Properties。 +URL 规范格式为: +`jdbc:TAOS-WS://[host_name]:[port]/[database_name]?[user={user}|&password={password}|&timezone={timezone}]` + +参数说明: +- user:登录 TDengine 用户名,默认值 'root'。 +- password:用户登录密码,默认值 'taosdata'。 +- batchErrorIgnore:true:在执行 Statement 的 executeBatch 时,如果中间有一条 SQL 执行失败,继续执行下面的 SQL 了。false:不再执行失败 SQL 后的任何语句。默认值为:false。 +- httpConnectTimeout: 连接超时时间,单位 ms, 默认值为 60000。 +- messageWaitTimeout: 消息超时时间,单位 ms, 默认值为 60000。 +- useSSL: 连接中是否使用 SSL。 + +### Source + +Source 拉取 TDengine 数据库中的数据,并将获取到的数据转换为 Flink 内部可处理的格式和类型,并以并行的方式进行读取和分发,为后续的数据处理提供高效的输入。 +通过设置数据源的并行度,实现多个线程并行地从数据源中读取数据,提高数据读取的效率和吞吐量,充分利用集群资源进行大规模数据处理能力。 + +#### Source Properties + +Properties 中配置参数如下: + +| 参数名称 | 类型 | 参数说明 | 备注 | +| ----------------------- | :-----: | ------------------------------------------------------------------------------------------------------------------------------------------------- | ------------------------------------------------------------------------------------------------------------------------------------------------------------------- | +| TDengineConfigParams.PROPERTY_KEY_USER | string | 登录 TDengine 用户名,默认值 'root'| | +| TDengineConfigParams.PROPERTY_KEY_PASSWORD| string | 用户登录密码,默认值 'taosdata'| | +| TDengineConfigParams.VALUE_DESERIALIZER| string | 游算子接收数据的类型 | 如果下游算子接收数据的类型是 RowData 仅需要设置为 RowData, 如果用户需要自定义类型这里需要设置完整的类路径| +| TDengineConfigParams.TD_BATCH_MODE | boolean | 此参数用于批量将数据推送给下游算子,如果设置为 True,创建 TDengineSource 对象时需要指定数据类型为 SourceRecords\<类型\> | 此处的类型为用下游算子接收数据的类型| +| TDengineConfigParams.PROPERTY_KEY_CHARSET | string | 客户端使用的字符集,默认值为系统字符集。| | +| TDengineConfigParams.PROPERTY_KEY_MESSAGE_WAIT_TIMEOUT | integer | 消息超时时间,单位 ms, 默认值为 60000| | +| TDengineConfigParams.PROPERTY_KEY_ENABLE_COMPRESSION | boolean | 传输过程是否启用压缩。true: 启用,false: 不启用。默认为 false| | +| TDengineConfigParams.PROPERTY_KEY_ENABLE_AUTO_RECONNECT| boolean| 是否启用自动重连。true: 启用,false: 不启用。默认为 false|| +| TDengineConfigParams.PROPERTY_KEY_RECONNECT_RETRY_COUNT| integer| 自动重连重试次数,默认值 3 | 仅在 PROPERTY_KEY_ENABLE_AUTO_RECONNECT 为 
true 时生效| +| TDengineConfigParams.PROPERTY_KEY_DISABLE_SSL_CERT_VALIDATION| boolean| 关闭 SSL 证书验证 。true: 关闭,false: 不关闭。默认为 false。|| + +#### 按时间分片 + +用户可以对查询的 SQL 按照时间拆分为多个子任务,输入:开始时间,结束时间,拆分间隔,时间字段名称,系统会按照设置的间隔(时间左闭右开)进行拆分并行获取数据。 + +```java +{{#include docs/examples/flink/Main.java:time_interval}} +``` + +#### 按超级表 TAG 分片 + +用户可以按照超级表的 TAG 字段将查询的 SQL 拆分为多个查询条件,系统会以一个查询条件对应一个子任务的方式对其进行拆分,进而并行获取数据。 + +```java +{{#include docs/examples/flink/Main.java:tag_split}} +``` + +#### 按表名分片 + +支持输入多个相同表结构的超级表或普通表进行分片,系统会按照一个表一个任务的方式进行拆分,进而并行获取数据。 + +```java +{{#include docs/examples/flink/Main.java:table_split}} +``` + +#### 使用 Source 连接器 + +查询结果为 RowData 数据类型示例: + +
+RowData Source +```java +{{#include docs/examples/flink/Main.java:source_test}} +``` +
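
上例中的查询 SQL 也可以直接替换为前文任意一种分片配置。下面是一个最小示意(假设 splitSql 为前文“按时间分片”示例中构造的对象,connProps、env 的构造方式与上例一致),展示如何把分片配置传入 TDengineSource:

```java
// splitSql 为前文“按时间分片”示例中构造的分片配置
TDengineSource<RowData> source = new TDengineSource<>(connProps, splitSql, RowData.class);
DataStreamSource<RowData> input = env.fromSource(source,
        WatermarkStrategy.noWatermarks(), "tdengine-source");
```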
+ +批量查询结果示例: + +
+Batch Source +```java +{{#include docs/examples/flink/Main.java:source_batch_test}} +``` +
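
批量模式下需要向 TDengineSource 传入带泛型的 Class 对象。Java 无法直接写 `SourceRecords<RowData>.class`,上例因此使用了“先退化为原始类型、再强转回类型令牌”的常见写法,单独拆出来如下(编译器的 unchecked 警告属预期行为):

```java
// 先退化为原始类型,再强转为带泛型的类型令牌
@SuppressWarnings("unchecked")
Class<SourceRecords<RowData>> typeClass =
        (Class<SourceRecords<RowData>>) (Class<?>) SourceRecords.class;
```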
+ +查询结果为自定义数据类型示例: + +
+Custom Type Source +```java +{{#include docs/examples/flink/Main.java:source_custom_type_test}} +``` +
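
示例中引用的 ResultBean 大致形如下面的 POJO(仅为假设性示意,字段名与查询列一一对应;配套的反序列化类需按连接器实际的 TdengineRecordDeserialization 接口实现 convert 与 getProducedType 方法,具体签名以连接器源码为准):

```java
// 示意性 POJO:字段与查询结果列一一对应
public class ResultBean {
    private java.sql.Timestamp ts;
    private float current;
    private int voltage;
    private float phase;
    private int groupid;
    private String location;
    private String tbname;

    public java.sql.Timestamp getTs() { return ts; }
    public void setTs(java.sql.Timestamp ts) { this.ts = ts; }
    public float getCurrent() { return current; }
    public void setCurrent(float current) { this.current = current; }
    public int getVoltage() { return voltage; }
    public void setVoltage(int voltage) { this.voltage = voltage; }
    public float getPhase() { return phase; }
    public void setPhase(float phase) { this.phase = phase; }
    public int getGroupid() { return groupid; }
    public void setGroupid(int groupid) { this.groupid = groupid; }
    public String getLocation() { return location; }
    public void setLocation(String location) { this.location = location; }
    public String getTbname() { return tbname; }
    public void setTbname(String tbname) { this.tbname = tbname; }
}
```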
+

- ResultBean 是自定义的一个内部类,用于定义 Source 查询结果的数据类型。
- ResultSoureDeserialization 是自定义的一个内部类,通过继承 TdengineRecordDeserialization 并实现 convert 和 getProducedType 方法。

### CDC 数据订阅

Flink CDC 主要用于提供数据订阅功能,能实时监控 TDengine 数据库的数据变化,并将这些变更以数据流形式传输到 Flink 中进行处理,同时确保数据的一致性和完整性。

#### 参数说明

| 参数名称 | 类型 | 参数说明 | 备注 |
| ----------------------- | :-----: | ------------------------- | -------------------------- |
| TDengineCdcParams.BOOTSTRAP_SERVERS| string | 服务端的 IP 地址 | |
| TDengineCdcParams.CONNECT_USER| string | 用户名 | |
| TDengineCdcParams.CONNECT_PASS| string | 密码 | |
| TDengineCdcParams.POLL_INTERVAL_MS| int | 拉取数据间隔,默认 500ms | |
| TDengineConfigParams.VALUE_DESERIALIZER| string | 下游算子接收数据的类型 | 如果下游算子接收数据的类型是 RowData,仅需要设置为 RowData;如果用户需要自定义类型,这里需要设置完整的类路径 |
| TDengineCdcParams.TMQ_BATCH_MODE| boolean | 此参数用于批量将数据推送给下游算子,如果设置为 True,创建 TDengineCdcSource 对象时需要指定数据类型为 ConsumerRecords\<类型\> | 此处的类型为下游算子接收数据的类型 |
| TDengineCdcParams.GROUP_ID| string | 消费组 ID,同一消费组共享消费进度 | **必填项**。最大长度:192。<br/>每个 topic 最多可建立 100 个 consumer |
| TDengineCdcParams.AUTO_OFFSET_RESET| string | 消费组订阅的初始位置 | earliest: 从头开始订阅;<br/>latest(默认): 仅从最新数据开始订阅 |
| TDengineCdcParams.ENABLE_AUTO_COMMIT| boolean | 是否自动提交,true: 启用(用于下游均为无状态算子);false: 由 checkpoint 触发 commit | 默认 false |
| TDengineCdcParams.AUTO_COMMIT_INTERVAL_MS| integer | 消费记录自动提交消费位点时间间隔,单位为毫秒 | 默认值为 5000,此参数在 ENABLE_AUTO_COMMIT 为 true 时生效 |
| TDengineCdcParams.TMQ_SESSION_TIMEOUT_MS| integer | consumer 心跳丢失后超时时间,超时后会触发 rebalance 逻辑,成功后该 consumer 会被删除(从 TDengine 3.3.3.0 版本开始支持)| 默认值为 12000,取值范围 [6000, 1800000] |
| TDengineCdcParams.TMQ_MAX_POLL_INTERVAL_MS| integer | consumer poll 拉取数据间隔的最长时间,超过该时间,会认为该 consumer 离线,触发 rebalance 逻辑,成功后该 consumer 会被删除(从 3.3.3.0 版本开始支持) | 默认值为 300000,取值范围 [1000, INT32_MAX] |

#### 使用 CDC 连接器

CDC 连接器会根据用户设置的并行度创建 consumer,因此用户需根据资源情况合理设置并行度。

订阅结果为 RowData 数据类型示例:

+CDC Source +```java +{{#include docs/examples/flink/Main.java:cdc_source}} +``` +
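
由于整体采用 At-Least-Once 语义,故障恢复后下游可能收到重复数据。如前文“Flink 语义选择说明”所述,可利用时间戳主键在下游做去重。下面是一个基于 keyed state 的去重示意(假设以第 0 列时间戳作为去重键,仅为示意性写法):

```java
DataStream<RowData> deduped = input
        .keyBy(row -> row.getTimestamp(0, 0).getMillisecond())
        .flatMap(new RichFlatMapFunction<RowData, RowData>() {
            private transient ValueState<Boolean> seen;

            @Override
            public void open(Configuration parameters) {
                seen = getRuntimeContext().getState(
                        new ValueStateDescriptor<>("seen", Boolean.class));
            }

            @Override
            public void flatMap(RowData row, Collector<RowData> out) throws Exception {
                // 同一时间戳的记录只向下游发送一次
                if (seen.value() == null) {
                    seen.update(true);
                    out.collect(row);
                }
            }
        });
```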
+ +批量查询结果示例: + +
+CDC Batch Source +```java +{{#include docs/examples/flink/Main.java:cdc_batch_source}} +``` +
+ +查询结果为自定义数据类型示例: + +
+CDC Custom Type +```java +{{#include docs/examples/flink/Main.java:cdc_custom_type_test}} +``` +
+

- ResultBean 是自定义的一个内部类,其字段名和数据类型与列的名称和数据类型一一对应,这样根据 value.deserializer 属性对应的反序列化类可以反序列化出 ResultBean 类型的对象。

### Sink

Sink 的核心功能在于高效且精准地将经过 Flink 处理的、源自不同数据源或算子的数据写入 TDengine。在这一过程中,TDengine 所具备的高效写入机制发挥了至关重要的作用,有力保障了数据的快速和稳定存储。

#### Sink Properties

| 参数名称 | 类型 | 参数说明 | 备注 |
| ----------------------- | :-----: | ------------- | ------------- |
| TDengineConfigParams.PROPERTY_KEY_USER | string | 登录 TDengine 用户名,默认值 'root' | |
| TDengineConfigParams.PROPERTY_KEY_PASSWORD| string | 用户登录密码,默认值 'taosdata' | |
| TDengineConfigParams.PROPERTY_KEY_DBNAME| string | 写入的数据库名称 | |
| TDengineConfigParams.TD_SUPERTABLE_NAME| string | 写入的超级表名称 | 写入超级表时数据必须包含 tbname 字段,用于确定写入哪张子表 |
| TDengineConfigParams.TD_TABLE_NAME| string | 写入的表名,此参数和 TD_SUPERTABLE_NAME 仅需要设置一个即可 | 用于确定写入哪张子表或普通表 |
| TDengineConfigParams.TD_BATCH_SIZE| integer | 设置批大小 | 当到达批的数量后进行写入,或是一个 checkpoint 的时间也会触发写入数据库 |
| TDengineConfigParams.VALUE_DESERIALIZER| string | 下游算子接收数据的类型 | 如果下游算子接收数据的类型是 RowData,仅需要设置为 RowData;如果用户需要自定义类型,这里需要设置完整的类路径 |
| TDengineConfigParams.TD_BATCH_MODE | boolean | 此参数用于设置接收批量数据 | 如果设置为 True:<br/>来源是 TDengine Source,使用 SourceRecords\<类型\> 创建 TDengineSink 对象;<br/>来源是 TDengine CDC,使用 ConsumerRecords\<类型\> 创建 TDengineSink 对象。此处的类型为接收数据的类型 |
| TDengineConfigParams.TD_SOURCE_TYPE | string | 标识数据的来源类型,例如 source 或 cdc | TDengine source 设置为 "tdengine_source",TDengine cdc 设置为 "tdengine_cdc" |
| TDengineConfigParams.PROPERTY_KEY_MESSAGE_WAIT_TIMEOUT | integer | 消息超时时间,单位 ms,默认值为 60000 | |
| TDengineConfigParams.PROPERTY_KEY_ENABLE_COMPRESSION | boolean | 传输过程是否启用压缩。true: 启用,false: 不启用。默认为 false | |
| TDengineConfigParams.PROPERTY_KEY_ENABLE_AUTO_RECONNECT| boolean| 是否启用自动重连。true: 启用,false: 不启用。默认为 false | |
| TDengineConfigParams.PROPERTY_KEY_RECONNECT_RETRY_COUNT| integer| 自动重连重试次数,默认值 3 | 仅在 PROPERTY_KEY_ENABLE_AUTO_RECONNECT 为 true 时生效 |
| TDengineConfigParams.PROPERTY_KEY_DISABLE_SSL_CERT_VALIDATION| boolean| 关闭 SSL 证书验证。true: 关闭,false: 不关闭。默认为 false | |

#### 使用 Sink 连接器

将接收的 RowData 类型数据写入 TDengine 示例:

+Sink RowData +```java +{{#include docs/examples/flink/Main.java:RowDataToSink}} +``` +
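
创建 TDengineSink 时,目标表字段列表的顺序必须与查询结果列的顺序一致,否则会错位写入。上例的对应关系单独列出如下(示意):

```java
// 查询列顺序:ts, current, voltage, phase, groupid, location, tbname
// 字段列表必须按同样的顺序给出,tbname 用于路由到对应子表
TDengineSink<RowData> sink = new TDengineSink<>(sinkProps,
        Arrays.asList("ts", "current", "voltage", "phase", "groupid", "location", "tbname"));
```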
+ + +将批量接收的 RowData 类型数据写入 TDengine 示例: + +
+Sink RowData +```java +{{#include docs/examples/flink/Main.java:BatchRowDataToSink}} +``` +
+ +### Table SQL + +数据处理 ETL(Extract,Transform,Load):可以使用 Flink SQL With JDBC 从多个不同的数据源数据库(如 TDengine、MySQL、Oracle 等)中提取数据,在 Flink 中进行转换操作(如数据清洗、格式转换、关联不同表的数据等),然后将处理后的结果加载到目标数据源(如 TDengine、Mysql 等)中。 + + +#### Source 连接器 + +参数配置说明: + +| 参数名称 | 类型 | 参数说明 | 备注 | +| ----------------------- | :-----: | ------------ | ------ | +| connector | string | 连接器标识,设置 tdengine-connector || +| td.jdbc.url| string | 连接的 url | | +| td.jdbc.mode | strng | 连接器类型, 设置 source, cdc, sink| | +| table.name| string| 原表或目标表名称| | +| scan.query| string| 获取数据的 SQL 语句|| +| sink.db.name|string| 目标数据库名称|| +| sink.supertable.name|string |写入的超级表名称|| +| sink.batch.size | integer | 写入的批大小|| +| sink.table.name|string|写入的普通表或子表名称|| + +#### Source 连接器使用示例 + +
+Table Source +```java +{{#include docs/examples/flink/Main.java:source_table}} +``` +
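
在 INSERT 语句中可以直接加入清洗、过滤等转换逻辑,实现简单的 ETL。例如只同步电压大于 200 的记录(示意,表定义沿用上例):

```java
// 仅同步电压大于 200 的记录(示意)
tableEnv.executeSql(
        "INSERT INTO sink_meters " +
        "SELECT ts, `current`, voltage, phase, location, groupid, tbname " +
        "FROM `meters` WHERE voltage > 200");
```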
+ +#### CDC 连接器 + +参数配置说明: + +| 参数名称 | 类型 | 参数说明 | 备注 | +| ----------------------- | :-----: | ------------ |-------| +| connector | string | 连接器标识,设置 tdengine-connector || +| user| string | 用户名, 默认 root| | +| password | string | 密码, 默认taosdata| | +| bootstrap.servers| string | 服务器地址 | | +| topic | string | 订阅主题 || +| td.jdbc.mode | strng | 连接器类型, cdc, sink| | +| group.id| string| 消费组 ID,同一消费组共享消费进度 | | +| auto.offset.reset| string| 消费组订阅的初始位置 | earliest: 从头开始订阅
latest: default; 仅从最新数据开始订阅| +| poll.interval_ms| integer| 拉取数据间隔, 默认 500ms| | +| sink.db.name|string| 目标数据库名称|| +| sink.supertable.name|string |写入的超级表名称|| +| sink.batch.size | integer | 写入的批大小|| +| sink.table.name|string|写入的普通表或子表名称|| + + + +#### CDC 连接器使用示例 + +
+Table CDC +```java +{{#include docs/examples/flink/Main.java:cdc_table}} +``` +
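
除了原样写入,也可以先对订阅到的流做聚合再消费结果。下面用 sqlQuery 对每张子表求平均电压(示意,基于上例已注册的 `meters` 表;CDC 流上的聚合结果为持续更新的结果流):

```java
// 按子表名聚合平均电压(示意)
Table avgVoltage = tableEnv.sqlQuery(
        "SELECT tbname, AVG(voltage) AS avg_voltage FROM `meters` GROUP BY tbname");
avgVoltage.execute().print();
```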
From 6dfd0747264be360ded119fbbdfce713adf80376 Mon Sep 17 00:00:00 2001 From: menshibin Date: Tue, 31 Dec 2024 14:13:08 +0800 Subject: [PATCH 2/6] modify flink connector version --- docs/zh/10-third-party/01-collection/12-flink.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/zh/10-third-party/01-collection/12-flink.md b/docs/zh/10-third-party/01-collection/12-flink.md index 8b2e5422b1c..d411b728cb0 100644 --- a/docs/zh/10-third-party/01-collection/12-flink.md +++ b/docs/zh/10-third-party/01-collection/12-flink.md @@ -111,7 +111,7 @@ TDengine 目前支持时间戳、数字、字符、布尔类型,与 Flink RowD com.taosdata.flink flink-connector-tdengine - 1.0.0 + 2.0.0 ``` From e5bd690c7a9c16b76bb36cc3de92c1274cf332de Mon Sep 17 00:00:00 2001 From: menshibin Date: Tue, 31 Dec 2024 14:15:59 +0800 Subject: [PATCH 3/6] modify flink connector history version --- docs/zh/10-third-party/01-collection/12-flink.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/zh/10-third-party/01-collection/12-flink.md b/docs/zh/10-third-party/01-collection/12-flink.md index d411b728cb0..46374f80695 100644 --- a/docs/zh/10-third-party/01-collection/12-flink.md +++ b/docs/zh/10-third-party/01-collection/12-flink.md @@ -28,7 +28,7 @@ Flink Connector 支持所有能运行 Flink 1.19 及以上版本的平台。 ## 版本历史 | Flink Connector 版本 | 主要变化 | TDengine 版本 | | ------------------| ------------------------------------ | ---------------- | -| 1.1.0 | 1. 支持 SQL 查询 TDengine 数据库中的数据
2. 支持 CDC 订阅 TDengine 数据库中的数据
3. 支持 Table SQL 方式读取和写入 TDengine 数据库| 3.3.5.0 及以上版本 | +| 2.0.0 | 1. 支持 SQL 查询 TDengine 数据库中的数据
2. 支持 CDC 订阅 TDengine 数据库中的数据
3. 支持 Table SQL 方式读取和写入 TDengine 数据库| 3.3.5.0 及以上版本 | ## 异常和错误码 From 511b83f187695e0cd6f49c9b0ffccd8ea40e811b Mon Sep 17 00:00:00 2001 From: menshibin Date: Tue, 31 Dec 2024 17:00:27 +0800 Subject: [PATCH 4/6] modify flink connector docs format --- .../10-third-party/01-collection/12-flink.md | 122 +++++++++--------- 1 file changed, 61 insertions(+), 61 deletions(-) diff --git a/docs/zh/10-third-party/01-collection/12-flink.md b/docs/zh/10-third-party/01-collection/12-flink.md index 46374f80695..b58a013f384 100644 --- a/docs/zh/10-third-party/01-collection/12-flink.md +++ b/docs/zh/10-third-party/01-collection/12-flink.md @@ -38,14 +38,14 @@ Flink Connector 支持所有能运行 Flink 1.19 及以上版本的平台。 | Error Code | Description | Suggested Actions | | ---------------- |------------------------------------------------------- | -------------------- | -| 0xa000 |connection param error |连接器参数错误 +| 0xa000 |connection param error |连接器参数错误。 | 0xa001 |the groupid parameter of CDC is incorrect |CDC 的 groupid 参数错误。| | 0xa002 |wrong topic parameter for CDC |CDC 的 topic 参数错误。| -| 0xa010 |database name configuration error |数据库名配置错误| -| 0xa011 |table name configuration error |表名配置错误| -| 0xa012 |no data was obtained from the data source |从数据源中获取数据失败| -| 0xa013 |value.deserializer parameter not set |未设置序列化方式| -| 0xa014 |list of column names for target table not set |未设置目标表的列名列表| +| 0xa010 |database name configuration error |数据库名配置错误。| +| 0xa011 |table name configuration error |表名配置错误。| +| 0xa012 |no data was obtained from the data source |从数据源中获取数据失败。| +| 0xa013 |value.deserializer parameter not set |未设置序列化方式。| +| 0xa014 |list of column names for target table not set |未设置目标表的列名列表。| | 0x2301 |connection already closed |连接已经关闭,检查连接情况,或重新创建连接去执行相关指令。| | 0x2302 |this operation is NOT supported currently! 
|当前使用接口不支持,可以更换其他连接方式。| | 0x2303 |invalid variables |参数不合法,请检查相应接口规范,调整参数类型及大小。| @@ -140,15 +140,15 @@ Properties 中配置参数如下: | 参数名称 | 类型 | 参数说明 | 备注 | | ----------------------- | :-----: | ------------------------------------------------------------------------------------------------------------------------------------------------- | ------------------------------------------------------------------------------------------------------------------------------------------------------------------- | -| TDengineConfigParams.PROPERTY_KEY_USER | string | 登录 TDengine 用户名,默认值 'root'| | -| TDengineConfigParams.PROPERTY_KEY_PASSWORD| string | 用户登录密码,默认值 'taosdata'| | -| TDengineConfigParams.VALUE_DESERIALIZER| string | 游算子接收数据的类型 | 如果下游算子接收数据的类型是 RowData 仅需要设置为 RowData, 如果用户需要自定义类型这里需要设置完整的类路径| -| TDengineConfigParams.TD_BATCH_MODE | boolean | 此参数用于批量将数据推送给下游算子,如果设置为 True,创建 TDengineSource 对象时需要指定数据类型为 SourceRecords\<类型\> | 此处的类型为用下游算子接收数据的类型| +| TDengineConfigParams.PROPERTY_KEY_USER | string | 登录 TDengine 用户名,默认值 'root'。| | +| TDengineConfigParams.PROPERTY_KEY_PASSWORD| string | 用户登录密码,默认值 'taosdata'。| | +| TDengineConfigParams.VALUE_DESERIALIZER| string | 游算子接收数据的类型 | 如果下游算子接收数据的类型是 RowData 仅需要设置为 RowData, 如果用户需要自定义类型这里需要设置完整的类路径。| +| TDengineConfigParams.TD_BATCH_MODE | boolean | 此参数用于批量将数据推送给下游算子,如果设置为 True,创建 TDengineSource 对象时需要指定数据类型为 SourceRecords\<类型\> 。 | 此处的类型为用下游算子接收数据的类型。| | TDengineConfigParams.PROPERTY_KEY_CHARSET | string | 客户端使用的字符集,默认值为系统字符集。| | -| TDengineConfigParams.PROPERTY_KEY_MESSAGE_WAIT_TIMEOUT | integer | 消息超时时间,单位 ms, 默认值为 60000| | -| TDengineConfigParams.PROPERTY_KEY_ENABLE_COMPRESSION | boolean | 传输过程是否启用压缩。true: 启用,false: 不启用。默认为 false| | -| TDengineConfigParams.PROPERTY_KEY_ENABLE_AUTO_RECONNECT| boolean| 是否启用自动重连。true: 启用,false: 不启用。默认为 false|| -| TDengineConfigParams.PROPERTY_KEY_RECONNECT_RETRY_COUNT| integer| 自动重连重试次数,默认值 3 | 仅在 PROPERTY_KEY_ENABLE_AUTO_RECONNECT 为 true 时生效| +| TDengineConfigParams.PROPERTY_KEY_MESSAGE_WAIT_TIMEOUT | integer | 消息超时时间,单位 ms, 默认值为 60000。| | +| TDengineConfigParams.PROPERTY_KEY_ENABLE_COMPRESSION | boolean | 传输过程是否启用压缩。true: 启用,false: 不启用。默认为 false。| | +| TDengineConfigParams.PROPERTY_KEY_ENABLE_AUTO_RECONNECT| boolean| 是否启用自动重连。true: 启用,false: 不启用。默认为 false。|| +| TDengineConfigParams.PROPERTY_KEY_RECONNECT_RETRY_COUNT| integer| 自动重连重试次数,默认值 3 | 仅在 PROPERTY_KEY_ENABLE_AUTO_RECONNECT 为 true 时生效。| | TDengineConfigParams.PROPERTY_KEY_DISABLE_SSL_CERT_VALIDATION| boolean| 关闭 SSL 证书验证 。true: 关闭,false: 不关闭。默认为 false。|| #### 按时间分片 @@ -214,18 +214,18 @@ Flink CDC 主要用于提供数据订阅功能,能实时监控 TDengine 数据 #### 参数说明 | 参数名称 | 类型 | 参数说明 | 备注 | | ----------------------- | :-----: | ------------------------- | -------------------------- | -| TDengineCdcParams.BOOTSTRAP_SERVERS| string | 服务端的 IP 地址 | | -| TDengineCdcParams.CONNECT_USER| string | 用户名 | | -| TDengineCdcParams.CONNECT_PASS| string | 密码 | | -| TDengineCdcParams.POLL_INTERVAL_MS|int| 拉取数据间隔, 默认 500ms| | -| TDengineConfigParams.VALUE_DESERIALIZER| string | 游算子接收数据的类型 | 如果下游算子接收数据的类型是 RowData 仅需要设置为 RowData, 如果用户需要自定义类型这里需要设置完整的类路径| -| TDengineCdcParams.TMQ_BATCH_MODE| boolean | 此参数用于批量将数据推送给下游算子,如果设置为 True,创建 TDengineCdcSource 对象时需要指定数据类型为 ConsumerRecords\<类型\> | 此处的类型为用下游算子接收数据的类型| -| TDengineCdcParams.GROUP_ID| string | 消费组 ID,同一消费组共享消费进度 |
**必填项**。最大长度:192。
每个 topic 最多可建立 100 个 consumer -| TDengineCdcParams.AUTO_OFFSET_RESET| string | 消费组订阅的初始位置 | earliest: 从头开始订阅
latest: default; 仅从最新数据开始订阅| -| TDengineCdcParams.ENABLE_AUTO_COMMIT| boolean | 是否自动提交,true: 启用(用于下游均为无状态算子) ;false:由 checkpoint 触发 commit | 默认 false| -| TDengineCdcParams.AUTO_COMMIT_INTERVAL_MS| integer|消费记录自动提交消费位点时间间隔,单位为毫秒| 默认值为 5000, 此参数在 AUTO_OFFSET_RESET 为 true 生效| -| TDengineCdcParams.TMQ_SESSION_TIMEOUT_MS| integer | consumer 心跳丢失后超时时间,超时后会触发 rebalance 逻辑,成功后该 consumer 会被删除(从 TDengine 3.3.3.0 版本开始支持)| 默认值为 12000,取值范围 [6000, 1800000] | -| TDengineCdcParams.TMQ_MAX_POLL_INTERVAL_MS| integer | consumer poll 拉取数据间隔的最长时间,超过该时间,会认为该 consumer 离线,触发 rebalance 逻辑,成功后该 consumer 会被删除(从 3.3.3.0 版本开始支持) | 默认值为 300000,[1000,INT32_MAX] +| TDengineCdcParams.BOOTSTRAP_SERVERS| string | 服务端的 IP 地址。 | | +| TDengineCdcParams.CONNECT_USER| string | 用户名。 | | +| TDengineCdcParams.CONNECT_PASS| string | 密码。 | | +| TDengineCdcParams.POLL_INTERVAL_MS|int| 拉取数据间隔, 默认 500ms。| | +| TDengineConfigParams.VALUE_DESERIALIZER| string | 游算子接收数据的类型。 | 如果下游算子接收数据的类型是 RowData 仅需要设置为 RowData, 如果用户需要自定义类型这里需要设置完整的类路径。| +| TDengineCdcParams.TMQ_BATCH_MODE| boolean | 此参数用于批量将数据推送给下游算子,如果设置为 True,创建 TDengineCdcSource 对象时需要指定数据类型为 ConsumerRecords\<类型\>。| 此处的类型为用下游算子接收数据的类型。| +| TDengineCdcParams.GROUP_ID| string | 消费组 ID,同一消费组共享消费进度。 |
**必填项**。最大长度:192。
每个 topic 最多可建立 100 个 consumer 。| +| TDengineCdcParams.AUTO_OFFSET_RESET| string | 消费组订阅的初始位置。 | earliest: 从头开始订阅
latest: default; 仅从最新数据开始订阅。| +| TDengineCdcParams.ENABLE_AUTO_COMMIT| boolean | 是否自动提交,true: 启用(用于下游均为无状态算子) ;false:由 checkpoint 触发 commit 。| 默认 false。| +| TDengineCdcParams.AUTO_COMMIT_INTERVAL_MS| integer|消费记录自动提交消费位点时间间隔,单位为毫秒。| 默认值为 5000, 此参数在 AUTO_OFFSET_RESET 为 true 生效。| +| TDengineCdcParams.TMQ_SESSION_TIMEOUT_MS| integer | consumer 心跳丢失后超时时间,超时后会触发 rebalance 逻辑,成功后该 consumer 会被删除(从 TDengine 3.3.3.0 版本开始支持)。| 默认值为 12000,取值范围 [6000, 1800000]。 | +| TDengineCdcParams.TMQ_MAX_POLL_INTERVAL_MS| integer | consumer poll 拉取数据间隔的最长时间,超过该时间,会认为该 consumer 离线,触发 rebalance 逻辑,成功后该 consumer 会被删除(从 3.3.3.0 版本开始支持)。 | 默认值为 300000,[1000,INT32_MAX]。| #### 使用 CDC 连接器 @@ -268,19 +268,19 @@ Sink 的核心功能在于高效且精准地将经过 Flink 处理的、源自 | 参数名称 | 类型 | 参数说明 | 备注 | | ----------------------- | :-----: | ------------------------------------------------------------------------------------------------------------------------------------------------- | ------------------------------------------------------------------------------------------------------------------------------------------------------------------- | -| TDengineConfigParams.PROPERTY_KEY_USER | string | 登录 TDengine 用户名,默认值 'root'| | -| TDengineConfigParams.PROPERTY_KEY_PASSWORD| string | 用户登录密码,默认值 'taosdata'| | -| TDengineConfigParams.PROPERTY_KEY_DBNAME| string | 写入的数据库名称|| -| TDengineConfigParams.TD_SUPERTABLE_NAME| string | 写入的超级表名称| 如果是超级表接收的数据必须有 tbname 字段,确定写入那张子表| -| TDengineConfigParams.TD_TABLE_NAME| string | 写入的表名,此参数和TD_SUPERTABLE_NAME 仅需要设置一个即可| 用于确定写入那张子表或普通表| -| TDengineConfigParams.TD_BATCH_SIZE| integer | 设置批大小 | 当到达批的数量后进行写入,或是一个checkpoint的时间也会触发写入数据库| -| TDengineConfigParams.VALUE_DESERIALIZER| string | 游算子接收数据的类型 | 如果下游算子接收数据的类型是 RowData 仅需要设置为 RowData, 如果用户需要自定义类型这里需要设置完整的类路径| -| TDengineConfigParams.TD_BATCH_MODE | boolean | 此参数用于设置接收批量数据 | 如果设置为 True:
来源是 TDengine Source, 使用SourceRecords\<类型\> 创建 TDengineSink 对象
来源是 TDengine CDC 使用 ConsumerRecords\<类型\> 创建 TDengineSink 对象 | 此处的类型为接收数据的类型| -| TDengineConfigParams.TD_SOURCE_TYPE | string | 如果数据是表示数据来源是,source 或者 cdc 等 | TDengine source 设置为 "tdengine_source", TDengine cdc 设置为 "tdengine_cdc"| -| TDengineConfigParams.PROPERTY_KEY_MESSAGE_WAIT_TIMEOUT | integer | 消息超时时间,单位 ms, 默认值为 60000| | -| TDengineConfigParams.PROPERTY_KEY_ENABLE_COMPRESSION | boolean | 传输过程是否启用压缩。true: 启用,false: 不启用。默认为 false| | -| TDengineConfigParams.PROPERTY_KEY_ENABLE_AUTO_RECONNECT| boolean| 是否启用自动重连。true: 启用,false: 不启用。默认为 false|| -| TDengineConfigParams.PROPERTY_KEY_RECONNECT_RETRY_COUNT| integer| 自动重连重试次数,默认值 3 | 仅在 PROPERTY_KEY_ENABLE_AUTO_RECONNECT 为 true 时生效| +| TDengineConfigParams.PROPERTY_KEY_USER | string | 登录 TDengine 用户名,默认值 'root'。| | +| TDengineConfigParams.PROPERTY_KEY_PASSWORD| string | 用户登录密码,默认值 'taosdata'。| | +| TDengineConfigParams.PROPERTY_KEY_DBNAME| string | 写入的数据库名称。|| +| TDengineConfigParams.TD_SUPERTABLE_NAME| string | 写入的超级表名称。| 如果是超级表接收的数据必须有 tbname 字段,确定写入那张子表。| +| TDengineConfigParams.TD_TABLE_NAME| string | 写入的表名,此参数和TD_SUPERTABLE_NAME 仅需要设置一个即可。| 用于确定写入那张子表或普通表。| +| TDengineConfigParams.TD_BATCH_SIZE| integer | 设置批大小 | 当到达批的数量后进行写入,或是一个checkpoint的时间也会触发写入数据库。| +| TDengineConfigParams.VALUE_DESERIALIZER| string | 游算子接收数据的类型。 | 如果下游算子接收数据的类型是 RowData 仅需要设置为 RowData, 如果用户需要自定义类型这里需要设置完整的类路径。| +| TDengineConfigParams.TD_BATCH_MODE | boolean | 此参数用于设置接收批量数据。 | 如果设置为 True:
来源是 TDengine Source, 使用SourceRecords\<类型\> 创建 TDengineSink 对象
来源是 TDengine CDC 使用 ConsumerRecords\<类型\> 创建 TDengineSink 对象。| 此处的类型为接收数据的类型。| +| TDengineConfigParams.TD_SOURCE_TYPE | string | 如果数据是表示数据来源是,source 或者 cdc 等。 | TDengine source 设置为 "tdengine_source", TDengine cdc 设置为 "tdengine_cdc"。| +| TDengineConfigParams.PROPERTY_KEY_MESSAGE_WAIT_TIMEOUT | integer | 消息超时时间,单位 ms, 默认值为 60000。| | +| TDengineConfigParams.PROPERTY_KEY_ENABLE_COMPRESSION | boolean | 传输过程是否启用压缩。true: 启用,false: 不启用。默认为 false。| | +| TDengineConfigParams.PROPERTY_KEY_ENABLE_AUTO_RECONNECT| boolean| 是否启用自动重连。true: 启用,false: 不启用。默认为 false。|| +| TDengineConfigParams.PROPERTY_KEY_RECONNECT_RETRY_COUNT| integer| 自动重连重试次数,默认值 3。 | 仅在 PROPERTY_KEY_ENABLE_AUTO_RECONNECT 为 true 时生效。| | TDengineConfigParams.PROPERTY_KEY_DISABLE_SSL_CERT_VALIDATION| boolean| 关闭 SSL 证书验证 。true: 关闭,false: 不关闭。默认为 false。|| #### 使用 Sink 连接器 @@ -315,15 +315,15 @@ Sink 的核心功能在于高效且精准地将经过 Flink 处理的、源自 | 参数名称 | 类型 | 参数说明 | 备注 | | ----------------------- | :-----: | ------------ | ------ | -| connector | string | 连接器标识,设置 tdengine-connector || -| td.jdbc.url| string | 连接的 url | | -| td.jdbc.mode | strng | 连接器类型, 设置 source, cdc, sink| | -| table.name| string| 原表或目标表名称| | -| scan.query| string| 获取数据的 SQL 语句|| -| sink.db.name|string| 目标数据库名称|| -| sink.supertable.name|string |写入的超级表名称|| -| sink.batch.size | integer | 写入的批大小|| -| sink.table.name|string|写入的普通表或子表名称|| +| connector | string | 连接器标识,设置 `tdengine-connector` 。|| +| td.jdbc.url| string | 连接的 url 。| | +| td.jdbc.mode | strng | 连接器类型, 设置 `source`, `cdc`, `sink`。| | +| table.name| string| 原表或目标表名称。| | +| scan.query| string| 获取数据的 SQL 语句。|| +| sink.db.name|string| 目标数据库名称。|| +| sink.supertable.name|string |写入的超级表名称。|| +| sink.batch.size | integer | 写入的批大小。|| +| sink.table.name|string|写入的普通表或子表名称。|| #### Source 连接器使用示例 @@ -340,19 +340,19 @@ Sink 的核心功能在于高效且精准地将经过 Flink 处理的、源自 | 参数名称 | 类型 | 参数说明 | 备注 | | ----------------------- | :-----: | ------------ |-------| -| connector | string | 连接器标识,设置 tdengine-connector || -| user| string | 用户名, 默认 root| | -| password | string | 密码, 默认taosdata| | -| bootstrap.servers| string | 服务器地址 | | -| topic | string | 订阅主题 || -| td.jdbc.mode | strng | 连接器类型, cdc, sink| | -| group.id| string| 消费组 ID,同一消费组共享消费进度 | | -| auto.offset.reset| string| 消费组订阅的初始位置 | earliest: 从头开始订阅
latest: default; 仅从最新数据开始订阅| -| poll.interval_ms| integer| 拉取数据间隔, 默认 500ms| | -| sink.db.name|string| 目标数据库名称|| -| sink.supertable.name|string |写入的超级表名称|| -| sink.batch.size | integer | 写入的批大小|| -| sink.table.name|string|写入的普通表或子表名称|| +| connector | string | 连接器标识,设置 `tdengine-connector`。|| +| user| string | 用户名, 默认 root。| | +| password | string | 密码, 默认taosdata。| | +| bootstrap.servers| string | 服务器地址。| | +| topic | string | 订阅主题。|| +| td.jdbc.mode | strng | 连接器类型, cdc, sink。| | +| group.id| string| 消费组 ID,同一消费组共享消费进度。 | | +| auto.offset.reset| string| 消费组订阅的初始位置。 | earliest: 从头开始订阅
latest: default; 仅从最新数据开始订阅。| +| poll.interval_ms| integer| 拉取数据间隔, 默认 500ms。| | +| sink.db.name|string| 目标数据库名称。|| +| sink.supertable.name|string |写入的超级表名称。|| +| sink.batch.size | integer | 写入的批大小。|| +| sink.table.name|string|写入的普通表或子表名称。|| From db459aca3776e400420c4605b099d9e29d0a6b5f Mon Sep 17 00:00:00 2001 From: menshibin Date: Tue, 31 Dec 2024 17:34:45 +0800 Subject: [PATCH 5/6] add english flink connector docs --- docs/en/10-third-party/01-collection/flink.md | 358 ++++++++++++++++++ 1 file changed, 358 insertions(+) create mode 100644 docs/en/10-third-party/01-collection/flink.md diff --git a/docs/en/10-third-party/01-collection/flink.md b/docs/en/10-third-party/01-collection/flink.md new file mode 100644 index 00000000000..f14ffc33a67 --- /dev/null +++ b/docs/en/10-third-party/01-collection/flink.md @@ -0,0 +1,358 @@ +--- +sidebar_label: Flink +title: TDengine Flink Connector +--- + +import Tabs from '@theme/Tabs'; +import TabItem from '@theme/TabItem'; + +## Preconditions + +Prepare the following environment: + +- TDengine cluster has been deployed and is running normally (both enterprise and community versions are available) +- TaosAdapter can run normally. +- Apache Flink v1.19.0 or above is installed. Please refer to the installation of Apache Flink [Official documents](https://flink.apache.org/) + +## JRE version compatibility + +JRE: Supports JRE 8 and above versions. + +## Supported platforms + +Flink Connector supports all platforms that can run Flink 1.19 and above versions. + +## Version History + + +| Flink Connector Version | Major Changes | TDengine Version| +|-------------------------| ------------------------------------ | ---------------- | +| 2.0.0 | 1. Support SQL queries on data in TDengine database
2. Support CDC subscription to data in TDengine database
3. Supports reading and writing to TDengine database using Table SQL | 3.3.5.0 and above versions|
+
+## Exception and error codes
+
+After a task fails, check the Flink task execution log to confirm the reason for the failure, referring to the following table:
+
+| Error Code | Description | Suggested Actions |
+| ---------------- |------------------------------------------------------- | -------------------- |
+|0xa000 | connection param error | Connector parameter error|
+|0xa001 | the groupid parameter of CDC is incorrect | The groupid parameter of CDC is incorrect|
+|0xa002 | wrong topic parameter for CDC | The topic parameter for CDC is incorrect|
+|0xa010 | database name configuration error | Database name configuration error|
+|0xa011 | table name configuration error | Table name configuration error|
+|0xa012 | no data was obtained from the data source | Failed to retrieve data from the data source|
+|0xa013 | value.deserializer parameter not set | No deserialization method set|
+|0xa014 | list of column names for target table not set | List of column names for the target table not set|
+|0x2301 | connection already closed | The connection has been closed. Check the connection status, or create a new connection to execute the relevant instructions|
+|0x2302 | this operation is NOT supported currently | The current interface is not supported; switch to another connection method|
+|0x2303 | invalid variables | The parameter is invalid. Check the corresponding interface specification and adjust the parameter type and size|
+|0x2304 | statement is closed | The statement has already been closed. Check whether the statement was closed and reused, or whether the connection is working properly|
+|0x2305 | resultSet is closed | The ResultSet has been released. Check whether the ResultSet was released and then used again|
+|0x230d | parameter index out of range | The parameter is out of range. Check the reasonable range of the parameter|
+|0x230e | connection already closed | The connection has been closed. Check whether the connection was closed and then used again, or whether the connection is working properly|
+|0x230f | unknown sql type in TDengine | Check the Data Types supported by TDengine|
+|0x2315 | unknown taos type in TDengine | Check whether the correct TDengine data type was specified when converting a TDengine data type to a JDBC data type|
+|0x2319 | user is required | Username information is missing when creating a connection|
+|0x231a | password is required | Password information is missing when creating a connection|
+|0x231d | can't create connection with server within | Increase the connection time with the parameter httpConnectTimeout, or check the connection status with taosAdapter|
+|0x231e | failed to complete the task within the specified time | Increase the execution time with the parameter messageWaitTimeout, or check the connection with taosAdapter|
+|0x2352 | unsupported encoding | An unsupported character encoding set was specified under the local connection|
+|0x2353 | internal error of database, please see taoslog for more details | An error occurred while executing prepareStatement on the local connection. Check the taoslog to locate the problem|
+|0x2354 | connection is NULL | The connection has already been closed while executing the command on the local connection.
Please check the connection with TDengine|
+|0x2355 | result set is NULL | The result set obtained over the local connection is abnormal. Check the connection status and retry|
+|0x2356 | invalid num of fields | The meta information obtained from the local connection result set does not match|
+|0x2357 | empty SQL string | Fill in the correct SQL for execution|
+|0x2371 | consumer properties must not be null | A parameter was empty when creating the subscription. Fill in the correct parameters|
+|0x2375 | topic reference has been destroyed | The topic reference was released while creating the data subscription. Check the connection with TDengine|
+|0x2376 | failed to set consumer topic, topic name is empty | The subscription topic name is empty. Check whether the specified topic name is filled in correctly|
+|0x2377 | consumer reference has been destroyed | The subscription data transmission channel has been closed. Check the connection with TDengine|
+|0x2378 | consumer create error | Failed to create the data subscription. Check the taos log based on the error message to locate the problem|
+|0x237a | vGroup not found in result set | The VGroup is not assigned to the current consumer; due to the rebalance mechanism, the consumer and the VGroup are not yet bound|
+
+## Data type mapping
+
+TDengine currently supports timestamp, number, character, and boolean types, and the corresponding type conversions with Flink RowData Type are as follows:
+
+| TDengine DataType | Flink RowDataType |
+| ----------------- | ------------------ |
+| TIMESTAMP | TimestampData |
+| INT | Integer |
+| BIGINT | Long |
+| FLOAT | Float |
+| DOUBLE | Double |
+| SMALLINT | Short |
+| TINYINT | Byte |
+| BOOL | Boolean |
+| BINARY | byte[] |
+| NCHAR | StringData |
+| JSON | StringData |
+| VARBINARY | byte[] |
+| GEOMETRY | byte[] |
+
+## Instructions for use
+### Flink Semantic Selection Instructions
+
+The reasons for using At-Least-Once (at least once) semantics are:
+
+- TDengine currently does not support transactions and cannot perform frequent checkpoint operations or complex transaction coordination.
+- Because TDengine uses timestamps as primary keys, downstream operators can filter out duplicate data to avoid duplicate calculations.
+- At-Least-Once (at least once) semantics ensure high data processing performance and low data latency. The setting method is as follows:
+
+```text
+StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+env.enableCheckpointing(5000);
+env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.AT_LEAST_ONCE);
+```
+
+If you use Maven to manage the project, simply add the following dependency to pom.xml.
+
+```xml
+<dependency>
+    <groupId>com.taosdata.flink</groupId>
+    <artifactId>flink-connector-tdengine</artifactId>
+    <version>2.0.0</version>
+</dependency>
+```
+
+The parameters for establishing a connection include the URL and Properties.
+The URL specification format is:
+
+`jdbc:TAOS-WS://[host_name]:[port]/[database_name]?[user={user}|&password={password}|&timezone={timezone}]`
+
+Parameter description:
+
+- user: TDengine username, default value 'root'.
+- password: user login password, default value 'taosdata'.
+- batchErrorIgnore: true: if one SQL statement fails in the middle of a Statement's executeBatch, the following SQL statements continue to be executed. false: do not execute any statements after the failed SQL. The default value is false.
+- httpConnectTimeout: connection timeout, in milliseconds, default value 60000.
+- messageWaitTimeout: message timeout, in milliseconds, default value 60000.
+- useSSL: whether SSL is used in the connection.
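+
+As a quick orientation, here is a minimal sketch (not a complete program) of how the URL and these options are typically carried: the per-connection settings go into a `Properties` object that is handed to the source or sink. The property keys are the ones used by the examples later in this document; the host, database, and credentials are placeholders.
+
+```java
+Properties connProps = new Properties();
+// Deliver query results to downstream operators as Flink RowData.
+connProps.setProperty(TDengineConfigParams.VALUE_DESERIALIZER, "RowData");
+// WebSocket JDBC URL following the format described above.
+connProps.setProperty(TDengineConfigParams.TD_JDBC_URL,
+        "jdbc:TAOS-WS://localhost:6041/power?user=root&password=taosdata");
+```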
+
+### Source
+
+Source retrieves data from the TDengine database, converts it into a format and type that Flink can handle internally, and reads and distributes it in parallel, providing efficient input for subsequent data processing.
+By setting the parallelism of the data source, multiple threads can read data from the data source in parallel, improving the efficiency and throughput of data reading and fully utilizing cluster resources for large-scale data processing.
+
+#### Source Properties
+
+The configuration parameters in Properties are as follows:
+
+|Parameter Name | Type | Parameter Description | Remarks|
+| ----------------------- | :-----: | ------------------------------------ | ------------------------------------ |
+| TDengineConfigParams.PROPERTY_KEY_USER | string | Login TDengine username, default value 'root' ||
+| TDengineConfigParams.PROPERTY_KEY_PASSWORD | string | User login password, default value 'taosdata' ||
+| TDengineConfigParams.VALUE_DESERIALIZER | string | The type of data received by the downstream operator | If the downstream operator receives data of RowData type, it only needs to be set to RowData. If the user needs a custom type, the complete class path needs to be set here|
+| TDengineConfigParams.TD_BATCH_MODE | boolean | This parameter is used to push data to downstream operators in batches | If set to true, when creating the TDengine Source object, the data type needs to be specified as SourceRecords\<type\>, where the type is the type received by downstream operators|
+| TDengineConfigParams.PROPERTY_KEY_CHARSET | string | The character set used by the client; the default value is the system character set ||
+| TDengineConfigParams.PROPERTY_KEY_MESSAGE_WAIT_TIMEOUT | integer | Message timeout, in milliseconds, default value 60000 ||
+| TDengineConfigParams.PROPERTY_KEY_ENABLE_COMPRESSION | boolean | Whether compression is enabled during transmission. true: enabled, false: not enabled. Default is false ||
+| TDengineConfigParams.PROPERTY_KEY_ENABLE_AUTO_RECONNECT | boolean | Whether to enable automatic reconnection. true: enabled, false: not enabled. Default is false ||
+| TDengineConfigParams.PROPERTY_KEY_RECONNECT_RETRY_COUNT | integer | Number of automatic reconnection retries, default value 3 | Only takes effect when PROPERTY_KEY_ENABLE_AUTO_RECONNECT is true|
+| TDengineConfigParams.PROPERTY_KEY_DISABLE_SSL_CERT_VALIDATION | boolean | Disable SSL certificate verification. true: disabled, false: not disabled. Default is false ||
+
+#### Split by time
+
+Users can split the SQL query into multiple subtasks based on time by entering the start time, end time, split interval, and time field name. The system splits the query according to the set interval (time ranges are left-closed and right-open) and obtains the data in parallel.
+
+```java
+{{#include docs/examples/flink/Main.java:time_interval}}
+```
+
+#### Split by Super Table TAG
+
+Users can split the query SQL into multiple query conditions based on the TAG fields of the super table, and the system will split them into subtasks corresponding to each query condition, thereby obtaining data in parallel.
+
+```java
+{{#include docs/examples/flink/Main.java:tag_split}}
+```
+
+#### Classify by table
+
+Sharding is also supported for multiple super tables or regular tables that share the same table structure. The system splits the work one table per task and then obtains the data in parallel.
+
+```java
+{{#include docs/examples/flink/Main.java:table_split}}
+```
+
+#### Use Source connector
+
+Example where the query result is of RowData type:
+
+RowData Source +```java +{{#include docs/examples/flink/Main.java:source_test}} +``` +
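+
+As a hedged illustration of consuming this stream, the `RowData` rows can be read with Flink's positional accessors; the field order and types below assume the query selected `ts`, `current`, `voltage`, `phase` in that order (adjust the positions to your own query):
+
+```java
+// Hypothetical downstream operator over the DataStreamSource<RowData> named `input`.
+input.map((MapFunction<RowData, String>) row -> String.format(
+                "ts=%s, current=%.2f, voltage=%d, phase=%.2f",
+                row.getTimestamp(0, 3), // TIMESTAMP column, millisecond precision
+                row.getFloat(1),        // FLOAT column
+                row.getInt(2),          // INT column
+                row.getFloat(3)))       // FLOAT column
+        .print();
+```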
+ +Example of batch query results: + +
+Batch Source +```java +{{#include docs/examples/flink/Main.java:source_batch_test}} +``` +
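+
+A sketch of unpacking one batch, under the assumption that `SourceRecords<RowData>` can be iterated over its rows (one `SourceRecords` element arrives per poll); this is illustrative, not part of the official examples:
+
+```java
+input.flatMap((FlatMapFunction<SourceRecords<RowData>, RowData>) (records, out) -> {
+    // Re-emit every row of the batch individually for row-level operators.
+    for (RowData row : records) {
+        out.collect(row);
+    }
+}).returns(RowData.class);
+```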
+ +Example of custom data type query result: + +
+Custom Type Source +```java +{{#include docs/examples/flink/Main.java:source_custom_type_test}} +``` +
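+
+For orientation only, a custom result bean might look like the following; the field names here mirror the queried columns, but the authoritative `ResultBean` definition lives in the example source referenced above:
+
+```java
+// Illustrative POJO: field names and types correspond to ts, current, voltage, phase.
+public class ResultBean {
+    private java.sql.Timestamp ts;
+    private float current;
+    private int voltage;
+    private float phase;
+    // getters and setters omitted for brevity
+}
+```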
+
+- ResultBean is a custom inner class used to define the data type of the Source query results.
+- ResultSoureDeserialization is a custom inner class that implements TDengine's RecordDeserialization interface and provides the convert and getProducedType methods.
+
+### CDC Data Subscription
+
+Flink CDC is mainly used to provide data subscription functionality, which can monitor real-time changes in TDengine database data and transmit these changes in the form of data streams to Flink for processing, while ensuring data consistency and integrity.
+
+#### Parameter Description
+
+| Parameter Name | Type | Parameter Description | Remarks |
+|----------------|:----:|-----------------------|---------|
+| TDengineCdcParams.BOOTSTRAP_SERVERS | string | IP address of the server | |
+| TDengineCdcParams.CONNECT_USER | string | Username | |
+| TDengineCdcParams.CONNECT_PASS | string | Password | |
+| TDengineCdcParams.POLL_INTERVAL_MS | integer | Pull data interval, default 500 ms | |
+| TDengineConfigParams.VALUE_DESERIALIZER | string | The type of data received by the operator | If the downstream operator receives data of RowData type, it only needs to be set to RowData. If the user needs a custom type, the complete class path needs to be set here |
+| TDengineCdcParams.TMQ_BATCH_MODE | boolean | This parameter is used to push data to downstream operators in batches | If set to true, when creating the TDengine CdcSource object, the data type needs to be specified as ConsumerRecords\<type\>, where the type is the type received by downstream operators |
+| TDengineCdcParams.GROUP_ID | string | Consumer group ID; the same consumer group shares consumption progress | **Required field**. Maximum length: 192. Each topic can have up to 100 consumer groups |
+| TDengineCdcParams.AUTO_OFFSET_RESET | string | Initial position of the consumer group subscription | earliest: subscribe from the beginning; latest (default): subscribe only from the latest data |
+| TDengineCdcParams.ENABLE_AUTO_COMMIT | boolean | Whether to commit offsets automatically. true: enabled (for downstream stateless operators); false: commit triggered by checkpoint. Default is false | |
+| TDengineCdcParams.AUTO_COMMIT_INTERVAL_MS | integer | Interval for automatically committing consumption offsets, in milliseconds | Default value is 5000; this parameter takes effect when ENABLE_AUTO_COMMIT is set to true |
+| TDengineCdcParams.TMQ_SESSION_TIMEOUT_MS | integer | Timeout after a consumer heartbeat is lost; the rebalance logic is triggered, and on success the consumer is removed (supported from TDengine 3.3.3.0) | Default value is 12000, value range [6000, 1800000] |
+| TDengineCdcParams.TMQ_MAX_POLL_INTERVAL_MS | integer | The longest interval between consumer polls; if exceeded, the consumer is considered offline, the rebalance logic is triggered, and on success the consumer is removed (supported from TDengine 3.3.3.0) | Default value is 300000, value range [1000, INT32_MAX] |
+
+#### Use CDC connector
+
+The CDC connector creates consumers according to the parallelism set by the user, so the user should set the parallelism reasonably according to the available resources.
+Example where the subscription result is of RowData type:
+
+CDC Source +```java +{{#include docs/examples/flink/Main.java:cdc_source}} +``` +
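+
+Since one consumer is created per parallel subtask, a reasonable rule of thumb (an assumption based on how TDengine topic consumption is shared, not a connector requirement) is to keep the parallelism no larger than the number of vgroups backing the subscribed topic, so that no consumer sits idle:
+
+```java
+StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+// e.g. with a topic backed by 5 vgroups, a parallelism of up to 5 keeps all consumers busy.
+env.setParallelism(3);
+```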
+ +Example of batch query results: + +
+CDC Batch Source +```java +{{#include docs/examples/flink/Main.java:cdc_batch_source}} +``` +
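+
+A sketch of flattening one subscription batch, assuming `ConsumerRecords<RowData>` iterates over `ConsumerRecord` entries the way its Kafka counterpart does (treat this as an assumption to verify against the connector sources):
+
+```java
+input.flatMap((FlatMapFunction<ConsumerRecords<RowData>, RowData>) (records, out) -> {
+    for (ConsumerRecord<RowData> record : records) {
+        out.collect(record.value()); // emit each subscribed row individually
+    }
+}).returns(RowData.class);
+```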
+ +Example of custom data type query result: + +
+CDC Custom Type +```java +{{#include docs/examples/flink/Main.java:cdc_custom_type_test}} +``` +
+
+- ResultBean is a custom inner class whose field names and data types correspond one-to-one with the column names and data types, so that the deserialization class configured in the value.deserializer property can deserialize objects of ResultBean type.
+
+### Sink
+
+The core function of Sink is to efficiently and accurately write Flink processed data from different data sources or operators into TDengine. In this process, TDengine's efficient write mechanism plays a crucial role, effectively ensuring fast and stable storage of data.
+
+#### Sink Properties
+
+| Parameter Name | Type | Parameter Description | Remarks|
+|----------------|:----:|-----------------------|--------|
+| TDengineConfigParams.PROPERTY_KEY_USER | string | Login TDengine username, default value 'root' ||
+| TDengineConfigParams.PROPERTY_KEY_PASSWORD | string | User login password, default value 'taosdata' ||
+| TDengineConfigParams.TD_DATABASE_NAME | string | Name of the database to write to ||
+| TDengineConfigParams.TD_SUPERTABLE_NAME | string | Name of the super table to write to | When writing to a super table, the received data must contain a tbname field to determine which sub table to write to|
+| TDengineConfigParams.TD_TABLE_NAME | string | Name of the table to write to; only one of this parameter and TD_SUPERTABLE_NAME needs to be set | Used to determine which sub table or regular table to write to|
+| TDengineConfigParams.TD_BATCH_SIZE | integer | Batch size for writing | Writing is triggered when the batch size is reached, or when a checkpoint occurs|
+| TDengineConfigParams.VALUE_DESERIALIZER | string | The type of the received data | If the received data is of RowData type, it only needs to be set to RowData. If the user needs a custom type, the complete class path needs to be set here |
+| TDengineConfigParams.TD_BATCH_MODE | boolean | This parameter is used to receive data in batches | If set to true: when the source is a TDengine Source, use SourceRecords\<type\> to create the TDengineSink object; when the source is TDengine CDC, use ConsumerRecords\<type\> to create the TDengineSink object, where the type is the type of the received data|
+| TDengineConfigParams.TD_SOURCE_TYPE | string | Indicates where the data comes from, such as a source or cdc | TDengine source is set to "tdengine_source", TDengine cdc is set to "tdengine_cdc"|
+| TDengineConfigParams.PROPERTY_KEY_MESSAGE_WAIT_TIMEOUT | integer | Message timeout, in milliseconds, default value 60000 ||
+| TDengineConfigParams.PROPERTY_KEY_ENABLE_COMPRESSION | boolean | Whether compression is enabled during transmission. true: enabled, false: not enabled. Default is false | |
+| TDengineConfigParams.PROPERTY_KEY_ENABLE_AUTO_RECONNECT | boolean | Whether to enable automatic reconnection. true: enabled, false: not enabled. Default is false | |
+| TDengineConfigParams.PROPERTY_KEY_RECONNECT_RETRY_COUNT | integer | Number of automatic reconnection retries, default value 3 | Only takes effect when PROPERTY_KEY_ENABLE_AUTO_RECONNECT is true|
+| TDengineConfigParams.PROPERTY_KEY_DISABLE_SSL_CERT_VALIDATION | boolean | Disable SSL certificate verification. true: disabled, false: not disabled. Default is false ||
+
+#### Use Sink connector
+
+Example of writing the received RowData type data into TDengine:
+
+Sink RowData +```java +{{#include docs/examples/flink/Main.java:RowDataToSink}} +``` +
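+
+For reference, the essential sink configuration used by the example above boils down to the following sketch; the database, table, and URL values are placeholders, and the column list passed to `TDengineSink` names the fields being written (`tbname` selects the target sub table):
+
+```java
+Properties sinkProps = new Properties();
+sinkProps.setProperty(TDengineConfigParams.VALUE_DESERIALIZER, "RowData");
+sinkProps.setProperty(TDengineConfigParams.TD_DATABASE_NAME, "power_sink");
+sinkProps.setProperty(TDengineConfigParams.TD_SUPERTABLE_NAME, "sink_meters");
+sinkProps.setProperty(TDengineConfigParams.TD_JDBC_URL,
+        "jdbc:TAOS-WS://localhost:6041/power?user=root&password=taosdata");
+sinkProps.setProperty(TDengineConfigParams.TD_BATCH_SIZE, "2000");
+
+TDengineSink<RowData> sink = new TDengineSink<>(sinkProps,
+        Arrays.asList("ts", "current", "voltage", "phase", "location", "groupid", "tbname"));
+input.sinkTo(sink);
+```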
+ +Write batch received RowData data into TDengine example: + +
+Sink RowData +```java +{{#include docs/examples/flink/Main.java:BatchRowDataToSink}} +``` +
+### Table SQL
+
+ETL (Extract, Transform, Load) data processing: Flink SQL with JDBC can be used to extract data from multiple different data source databases (such as TDengine, MySQL, Oracle, etc.), perform transformation operations in Flink (such as data cleaning, format conversion, and associating data from different tables), and then load the processed results into the target data source (such as TDengine, MySQL, etc.).
+
+#### Source connector
+
+Parameter configuration instructions:
+
+| Parameter Name | Type | Parameter Description | Remarks|
+|-----------------------| :-----: | ------------ | ------ |
+| connector | string | Connector identifier, set `tdengine-connector`||
+| td.jdbc.url | string | URL of the connection ||
+| td.jdbc.mode | string | Connector type: `source`, `cdc`, `sink`| |
+| table.name | string | Original or target table name ||
+| scan.query | string | SQL statement to retrieve data||
+| sink.db.name | string | Target database name||
+| sink.supertable.name | string | Name of the super table to write to||
+| sink.batch.size | integer| Batch size for writing||
+| sink.table.name | string | Name of the regular table or sub table to write to||
+
+#### Example of using Source connector
+
+Table Source +```java +{{#include docs/examples/flink/Main.java:source_table}} +``` +
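+
+To make the options above concrete, a hedged sketch of registering a TDengine-backed source table with Table SQL follows, assuming a `StreamTableEnvironment` named `tableEnv`; the table name, schema, URL, and query are placeholders matching the meters example used throughout this document:
+
+```java
+String ddl = "CREATE TABLE `meters_source` (" +
+        " ts TIMESTAMP(3), `current` FLOAT, voltage INT, phase FLOAT, tbname STRING" +
+        ") WITH (" +
+        " 'connector' = 'tdengine-connector'," +
+        " 'td.jdbc.url' = 'jdbc:TAOS-WS://localhost:6041/power?user=root&password=taosdata'," +
+        " 'td.jdbc.mode' = 'source'," +
+        " 'table.name' = 'meters'," +
+        " 'scan.query' = 'SELECT ts, `current`, voltage, phase, tbname FROM `meters`'" +
+        ")";
+tableEnv.executeSql(ddl);
+```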
+#### CDC connector
+
+Parameter configuration instructions:
+
+| Parameter Name | Type | Parameter Description | Remarks|
+|-------------------| :-----: |------------------------|-------|
+| connector | string | Connector identifier, set `tdengine-connector` ||
+| user | string | Username, default root ||
+| password | string | Password, default taosdata ||
+| bootstrap.servers | string | Server address ||
+| topic | string | Topic to subscribe to ||
+| td.jdbc.mode | string | Connector type: `cdc`, `sink` | |
+| group.id | string | Consumer group ID; the same consumer group shares consumption progress ||
+| auto.offset.reset | string | Initial position of the consumer group subscription | earliest: subscribe from the beginning; latest (default): subscribe only from the latest data|
+| poll.interval_ms | integer | Pull data interval, default 500 ms ||
+| sink.db.name | string | Target database name ||
+| sink.supertable.name | string | Name of the super table to write to ||
+| sink.batch.size | integer | Batch size for writing ||
+| sink.table.name | string | Name of the regular table or sub table to write to ||
+
+#### Example of using CDC connector
+
+Table CDC +```java +{{#include docs/examples/flink/Main.java:cdc_table}} +``` +
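+
+Analogously, a hedged sketch of a CDC-backed table (again assuming a `StreamTableEnvironment` named `tableEnv`); the topic, group, and server values mirror the subscription examples earlier in this document:
+
+```java
+String cdcDdl = "CREATE TABLE `meters_cdc` (" +
+        " ts TIMESTAMP(3), `current` FLOAT, voltage INT, phase FLOAT" +
+        ") WITH (" +
+        " 'connector' = 'tdengine-connector'," +
+        " 'td.jdbc.mode' = 'cdc'," +
+        " 'bootstrap.servers' = 'localhost:6041'," +
+        " 'topic' = 'topic_meters'," +
+        " 'group.id' = 'group_1'," +
+        " 'auto.offset.reset' = 'earliest'," +
+        " 'poll.interval_ms' = '500'" +
+        ")";
+tableEnv.executeSql(cdcDdl);
+```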
+ From fa78cda0ad2708d673fd5fd2d5265911ddfe4d12 Mon Sep 17 00:00:00 2001 From: menshibin Date: Tue, 31 Dec 2024 18:58:06 +0800 Subject: [PATCH 6/6] add english flink connector docs --- docs/en/10-third-party/01-collection/flink.md | 80 +++++++------ docs/examples/flink/Main.java | 21 ++-- .../10-third-party/01-collection/12-flink.md | 108 +++++++++--------- 3 files changed, 110 insertions(+), 99 deletions(-) diff --git a/docs/en/10-third-party/01-collection/flink.md b/docs/en/10-third-party/01-collection/flink.md index f14ffc33a67..e716d5a757d 100644 --- a/docs/en/10-third-party/01-collection/flink.md +++ b/docs/en/10-third-party/01-collection/flink.md @@ -6,7 +6,11 @@ title: TDengine Flink Connector import Tabs from '@theme/Tabs'; import TabItem from '@theme/TabItem'; -## Preconditions +Apache Flink is an open-source distributed stream batch integrated processing framework supported by the Apache Software Foundation, which can be used for many big data processing scenarios such as stream processing, batch processing, complex event processing, real-time data warehouse construction, and providing real-time data support for machine learning. At the same time, Flink has a wealth of connectors and various tools that can interface with numerous different types of data sources to achieve data reading and writing. In the process of data processing, Flink also provides a series of reliable fault-tolerant mechanisms, effectively ensuring that tasks can run stably and continuously even in the event of unexpected situations. + +With the help of TDengine's Flink connector, Apache Flink can seamlessly integrate with the TDengine database. On the one hand, it can accurately store the results obtained after complex calculations and deep analysis into the TDengine database, achieving efficient storage and management of data; On the other hand, it is also possible to quickly and stably read massive amounts of data from the TDengine database, and conduct comprehensive and in-depth analysis and processing on this basis, fully tapping into the potential value of the data, providing strong data support and scientific basis for enterprise decision-making, greatly improving the efficiency and quality of data processing, and enhancing the competitiveness and innovation ability of enterprises in the digital age. + +## Prerequisites Prepare the following environment: @@ -14,20 +18,16 @@ Prepare the following environment: - TaosAdapter can run normally. - Apache Flink v1.19.0 or above is installed. Please refer to the installation of Apache Flink [Official documents](https://flink.apache.org/) -## JRE version compatibility - -JRE: Supports JRE 8 and above versions. - ## Supported platforms Flink Connector supports all platforms that can run Flink 1.19 and above versions. ## Version History - | Flink Connector Version | Major Changes | TDengine Version| |-------------------------| ------------------------------------ | ---------------- | | 2.0.0 | 1. Support SQL queries on data in TDengine database
2. Support CDC subscription to data in TDengine database
3 Supports reading and writing to TDengine database using Table SQL | 3.3.5.0 and above versions| +| 1.0.0 | Support Sink function to write data from other sources to TDengine in the future| 3.3.2.0 and above versions| ## Exception and error codes @@ -91,13 +91,17 @@ TDengine currently supports timestamp, number, character, and boolean types, and | GEOMETRY | byte[] | ## Instructions for use + ### Flink Semantic Selection Instructions The semantic reason for using At Least One (at least once) is: + -TDengine currently does not support transactions and cannot perform frequent checkpoint operations and complex transaction coordination. -Due to TDengine's use of timestamps as primary keys, downstream operators of duplicate data can perform filtering operations to avoid duplicate calculations. -Using At Least One (at least once) to ensure high data processing performance and low data latency, the setting method is as follows: +Instructions: + ```text StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.enableCheckpointing(5000); @@ -123,7 +127,8 @@ Parameter description: - User: Login TDengine username, default value is' root '. - Password: User login password, default value 'taosdata'. -- batchErrorIgnore: true: If there is an SQL execution failure in the middle of the ExecutBatch of Statement, continue to execute the following SQL. false: Do not execute any statements after failed SQL. The default value is: false。 +- database_name: database name。 +- timezone: time zone。 - HttpConnectTimeout: The connection timeout time, measured in milliseconds, with a default value of 60000. - MessageWaitTimeout: The timeout period for a message, measured in milliseconds, with a default value of 60000. - UseSSL: Whether SSL is used in the connection. @@ -281,8 +286,9 @@ The core function of Sink is to efficiently and accurately write Flink processed | TDengineConfigParams.PROPERTYKEY_RECONNECT_RETR_COUNT | integer | number of automatic reconnection retries, default value 3 | only takes effect when PROPERTYKEY-INABLE AUTO-RECONNECT is true| | TDengineConfigParams.PROPERTYKEYDISABLE_SSL_CERTVNet | boolean | Disable SSL certificate verification. true: close, false: Not closed. The default is false || -#### Use Sink connector -Write the received RowData type data into TDengine example: +Usage example: + +Write the sub table data of the meters table in the power database into the corresponding sub table of the sink_meters super table in the power_stink database.
Sink RowData @@ -291,18 +297,21 @@ Write the received RowData type data into TDengine example: ```
-Write batch received RowData data into TDengine example:
+Usage example:
+
+Subscribe to the sub table data of the meters super table in the power database and write it to the corresponding sub table of the sink_meters super table in the power_sink database.
 
-Sink RowData +Cdc Sink ```java -{{#include docs/examples/flink/Main.java:BatchRowDataToSink}} +{{#include docs/examples/flink/Main.java:CdcRowDataToSink}} ```
 ### Table SQL
 
-ETL (Extract, Transform, Load) data processing: Flink SQL with JDBC can be used to extract data from multiple different data source databases (such as TDengine, MySQL, Oracle, etc.), perform transformation operations (such as data cleaning, format conversion, associating data from different tables, etc.) in Flink, and then load the processed results into the target data source (such as TDengine, MySQL, etc.).
+Extract data from multiple different data source databases (such as TDengine, MySQL, Oracle, etc.) using Table SQL, perform custom operator operations (such as data cleaning, format conversion, and associating data from different tables), and then load the processed results into the target data source (such as TDengine, MySQL, etc.).
+
 #### Source connector
 
 Parameter configuration instructions:
@@ -311,7 +320,7 @@ Parameter configuration instructions:
 |-----------------------| :-----: | ------------ | ------ |
 | connector | string | connector identifier, set `tdengine-connector`||
 | td.jdbc.url | string | url of the connection ||
-| td.jdbc.mode | strng | connector type: `source`, `cdc`, `sink`| |
+| td.jdbc.mode | string | connector type: `source`, `sink`| |
 | table.name | string | original or target table name ||
 | scan.query | string | SQL statement to retrieve data||
 | sink.db.name | string | target database name||
@@ -319,7 +328,9 @@ Parameter configuration instructions:
 | sink.batch.size | integer| batch size written||
 | sink.table.name | string | name of the regular table or sub table written||
 
-#### Example of using Source connector
+Usage example:
+
+Write the sub table data of the meters table in the power database into the corresponding sub table of the sink_meters super table in the power_sink database.
 
Table Source @@ -328,26 +339,29 @@ Parameter configuration instructions: ```
-#### CDC connector +#### Table CDC connector + Parameter configuration instructions: -| Parameter Name | Type | Parameter Description | Remarks| -|-------------------| :-----: |--------------------------------------------------------------------------------------|-------| -| connector | string | connector identifier, set `tdengine-connector` || -| user | string | username, default root || -| password | string | password, default taosdata || -| bootstrap. servers| string | server address || -| topic | string | subscribe to topic || -| td.jdbc.mode | strng | connector type: `cdc`, `sink` | | -| group.id | string | Consumption group ID, sharing consumption progress within the same consumption group || -| auto.offset.reset | string | initial position for consumer group subscription | earliest: subscribe from scratch
latest: default; Subscribe only from the latest data|
-| poll.interval_mas | integer | Pull data interval, default 500ms ||
-| sink.db.name | string | Target database name ||
-| sink.superstable.name | string | Write the name of the superstable ||
-| sink.batch.size | integer | batch size written ||
-| sink.table.name | string | Name of the regular table or sub table written ||
-
-#### Example of using CDC connector
+| Parameter Name | Type | Parameter Description |
+|-------------------| :-----: |-----------------------|
+| connector | string | connector identifier, set `tdengine-connector` |
+| user | string | username, default root |
+| password | string | password, default taosdata |
+| bootstrap.servers | string | server address |
+| topic | string | topic to subscribe to |
+| td.jdbc.mode | string | connector type: `cdc`, `sink` |
+| group.id | string | consumer group ID; the same consumer group shares consumption progress |
+| auto.offset.reset | string | initial position of the consumer group subscription. earliest: subscribe from the beginning; latest (default): subscribe only from the latest data|
+| poll.interval_ms | integer | pull data interval, default 500 ms |
+| sink.db.name | string | target database name |
+| sink.supertable.name | string | name of the super table to write to |
+| sink.batch.size | integer | batch size for writing |
+| sink.table.name | string | name of the regular table or sub table to write to |
+
+Usage example:
+
+Subscribe to the sub table data of the meters super table in the power database and write it to the corresponding sub table of the sink_meters super table in the power_sink database.
 
Table CDC diff --git a/docs/examples/flink/Main.java b/docs/examples/flink/Main.java index b6017787764..12d79126cfd 100644 --- a/docs/examples/flink/Main.java +++ b/docs/examples/flink/Main.java @@ -414,7 +414,7 @@ static void testRowDataToSink() throws Exception { Properties connProps = new Properties(); connProps.setProperty(TDengineConfigParams.VALUE_DESERIALIZER, "RowData"); connProps.setProperty(TDengineConfigParams.TD_JDBC_URL, "jdbc:TAOS-WS://localhost:6041/power?user=root&password=taosdata"); - SourceSplitSql splitSql = getTimeSplit(); + SourceSplitSql sql = new SourceSplitSql("select ts, `current`, voltage, phase, tbname from meters"); StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(1); env.enableCheckpointing(1000, CheckpointingMode.AT_LEAST_ONCE); @@ -437,8 +437,8 @@ static void testRowDataToSink() throws Exception { } //ANCHOR_END: RowDataToSink - //ANCHOR: BatchRowDataToSink - static void testBatchToTdSink() throws Exception { + //ANCHOR: CdcRowDataToSink + static void testCdcToSink() throws Exception { System.out.println("testTDengineCdcToTdSink start!"); StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(3); @@ -449,39 +449,32 @@ static void testBatchToTdSink() throws Exception { config.setProperty(TDengineCdcParams.CONNECT_TYPE, "ws"); config.setProperty(TDengineCdcParams.BOOTSTRAP_SERVERS, "localhost:6041"); config.setProperty(TDengineCdcParams.AUTO_OFFSET_RESET, "earliest"); - config.setProperty(TDengineCdcParams.MSG_WITH_TABLE_NAME, "true"); - config.setProperty(TDengineCdcParams.AUTO_COMMIT_INTERVAL, "1000"); config.setProperty(TDengineCdcParams.GROUP_ID, "group_1"); config.setProperty(TDengineCdcParams.CONNECT_USER, "root"); config.setProperty(TDengineCdcParams.CONNECT_PASS, "taosdata"); config.setProperty(TDengineCdcParams.VALUE_DESERIALIZER, "RowData"); config.setProperty(TDengineCdcParams.VALUE_DESERIALIZER_ENCODING, "UTF-8"); - config.setProperty(TDengineCdcParams.TMQ_BATCH_MODE, "true"); - Class> typeClass = (Class>) (Class) ConsumerRecords.class; - TDengineCdcSource> tdengineSource = new TDengineCdcSource<>("topic_meters", config, typeClass); - DataStreamSource> input = env.fromSource(tdengineSource, WatermarkStrategy.noWatermarks(), "tdengine-source"); + TDengineCdcSource tdengineSource = new TDengineCdcSource<>("topic_meters", config, RowData.class); + DataStreamSource input = env.fromSource(tdengineSource, WatermarkStrategy.noWatermarks(), "tdengine-source"); Properties sinkProps = new Properties(); sinkProps.setProperty(TSDBDriver.PROPERTY_KEY_ENABLE_AUTO_RECONNECT, "true"); - sinkProps.setProperty(TSDBDriver.PROPERTY_KEY_CHARSET, "UTF-8"); sinkProps.setProperty(TSDBDriver.PROPERTY_KEY_TIME_ZONE, "UTC-8"); sinkProps.setProperty(TDengineConfigParams.VALUE_DESERIALIZER, "RowData"); - sinkProps.setProperty(TDengineConfigParams.TD_BATCH_MODE, "true"); - sinkProps.setProperty(TDengineConfigParams.TD_SOURCE_TYPE, "tdengine_cdc"); sinkProps.setProperty(TDengineConfigParams.TD_DATABASE_NAME, "power_sink"); sinkProps.setProperty(TDengineConfigParams.TD_SUPERTABLE_NAME, "sink_meters"); sinkProps.setProperty(TDengineConfigParams.TD_JDBC_URL, "jdbc:TAOS-WS://localhost:6041/power?user=root&password=taosdata"); sinkProps.setProperty(TDengineConfigParams.TD_BATCH_SIZE, "2000"); - TDengineSink> sink = new TDengineSink<>(sinkProps, Arrays.asList("ts", "current", "voltage", "phase", "location", "groupid", "tbname")); + TDengineSink sink = new 
TDengineSink<>(sinkProps, Arrays.asList("ts", "current", "voltage", "phase", "location", "groupid", "tbname")); input.sinkTo(sink); JobClient jobClient = env.executeAsync("Flink test cdc Example"); Thread.sleep(6000L); jobClient.cancel().get(); System.out.println("testTDengineCdcToTdSink finish!"); } - //ANCHOR_END: BatchRowDataToSink + //ANCHOR_END: CdcRowDataToSink //ANCHOR: source_table static void testTableToSink() throws Exception { diff --git a/docs/zh/10-third-party/01-collection/12-flink.md b/docs/zh/10-third-party/01-collection/12-flink.md index b58a013f384..af5434e0b34 100644 --- a/docs/zh/10-third-party/01-collection/12-flink.md +++ b/docs/zh/10-third-party/01-collection/12-flink.md @@ -17,10 +17,6 @@ Apache Flink 是一款由 Apache 软件基金会支持的开源分布式流批 - taosAdapter 能够正常运行。详细参考 [taosAdapter 使用手册](../../../reference/components/taosadapter) - Apache Flink v1.19.0 或以上版本已安装。安装 Apache Flink 请参考 [官方文档](https://flink.apache.org/) -## JRE 版本兼容性 - -- JRE: 支持 JRE 8 及以上版本。 - # 支持的平台 Flink Connector 支持所有能运行 Flink 1.19 及以上版本的平台。 @@ -29,6 +25,7 @@ Flink Connector 支持所有能运行 Flink 1.19 及以上版本的平台。 | Flink Connector 版本 | 主要变化 | TDengine 版本 | | ------------------| ------------------------------------ | ---------------- | | 2.0.0 | 1. 支持 SQL 查询 TDengine 数据库中的数据
2. 支持 CDC 订阅 TDengine 数据库中的数据
3. 支持 Table SQL 方式读取和写入 TDengine 数据库| 3.3.5.0 及以上版本 | +| 1.0.0 | 支持 Sink 功能,将来着其他数据源的数据写入到 TDengine| 3.3.2.0 及以上版本| ## 异常和错误码 @@ -53,8 +50,8 @@ Flink Connector 支持所有能运行 Flink 1.19 及以上版本的平台。 | 0x2305 |resultSet is closed |resultSet 结果集已经释放,请检查 resultSet 是否释放后再次使用。| | 0x230d |parameter index out of range |参数越界,请检查参数的合理范围。| | 0x230e |connection already closed |连接已经关闭,请检查 Connection 是否关闭后再次使用,或是连接是否正常。| -| 0x230f |unknown sql type in tdengine |请检查 TDengine 支持的 Data Type 类型。| -| 0x2315 |unknown taos type in tdengine |在 TDengine 数据类型与 JDBC 数据类型转换时,是否指定了正确的 TDengine 数据类型。| +| 0x230f |unknown sql type in TDengine |请检查 TDengine 支持的 Data Type 类型。| +| 0x2315 |unknown taos type in TDengine |在 TDengine 数据类型与 JDBC 数据类型转换时,是否指定了正确的 TDengine 数据类型。| | 0x2319 |user is required |创建连接时缺少用户名信息。| | 0x231a |password is required |创建连接时缺少密码信息。| | 0x231d |can't create connection with server within |通过增加参数 httpConnectTimeout 增加连接耗时,或是请检查与 taosAdapter 之间的连接情况。| @@ -99,11 +96,14 @@ TDengine 目前支持时间戳、数字、字符、布尔类型,与 Flink RowD - TDengine 目前不支持事务,不能进行频繁的检查点操作和复杂的事务协调。 - 由于 TDengine 采用时间戳作为主键,重复数据下游算子可以进行过滤操作,避免重复计算。 - 采用 At-Least-Once(至少一次)确保达到较高的数据处理的性能和较低的数据延时,设置方式如下: - ```java - StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); - env.enableCheckpointing(5000); - env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.AT_LEAST_ONCE); - ``` + +使用方式: + +```java +StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); +env.enableCheckpointing(5000); +env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.AT_LEAST_ONCE); +``` 如果使用 Maven 管理项目,只需在 pom.xml 中加入以下依赖。 @@ -124,7 +124,8 @@ URL 规范格式为: 参数说明: - user:登录 TDengine 用户名,默认值 'root'。 - password:用户登录密码,默认值 'taosdata'。 -- batchErrorIgnore:true:在执行 Statement 的 executeBatch 时,如果中间有一条 SQL 执行失败,继续执行下面的 SQL 了。false:不再执行失败 SQL 后的任何语句。默认值为:false。 +- database_name: 数据库名称。 +- timezone: 时区设置。 - httpConnectTimeout: 连接超时时间,单位 ms, 默认值为 60000。 - messageWaitTimeout: 消息超时时间,单位 ms, 默认值为 60000。 - useSSL: 连接中是否使用 SSL。 @@ -240,7 +241,7 @@ CDC 连接器会根据用户设置的并行度进行创建 consumer,因此用 ```
-批量查询结果示例: +将订阅结果批量下发到算子的示例:
CDC Batch Source @@ -249,7 +250,7 @@ CDC 连接器会根据用户设置的并行度进行创建 consumer,因此用 ```
-查询结果为自定义数据类型示例: +订阅结果为自定义数据类型示例:
CDC Custom Type @@ -283,9 +284,9 @@ Sink 的核心功能在于高效且精准地将经过 Flink 处理的、源自 | TDengineConfigParams.PROPERTY_KEY_RECONNECT_RETRY_COUNT| integer| 自动重连重试次数,默认值 3。 | 仅在 PROPERTY_KEY_ENABLE_AUTO_RECONNECT 为 true 时生效。| | TDengineConfigParams.PROPERTY_KEY_DISABLE_SSL_CERT_VALIDATION| boolean| 关闭 SSL 证书验证 。true: 关闭,false: 不关闭。默认为 false。|| -#### 使用 Sink 连接器 +使用示例: -将接收的 RowData 类型数据写入 TDengine 示例: +将 power 库的 meters 表的子表数据,写入 power_sink 库的 sink_meters 超级表对应的子表中。
Sink RowData @@ -294,38 +295,41 @@ Sink 的核心功能在于高效且精准地将经过 Flink 处理的、源自 ```
+使用示例: -将批量接收的 RowData 类型数据写入 TDengine 示例: +订阅 power 库的 meters 超级表的子表数据,写入 power_sink 库的 sink_meters 超级表对应的子表中。
-Sink RowData +Cdc Sink ```java -{{#include docs/examples/flink/Main.java:BatchRowDataToSink}} +{{#include docs/examples/flink/Main.java:CdcRowDataToSink}} ```
 ### Table SQL
 
-数据处理 ETL(Extract,Transform,Load):可以使用 Flink SQL With JDBC 从多个不同的数据源数据库(如 TDengine、MySQL、Oracle 等)中提取数据,在 Flink 中进行转换操作(如数据清洗、格式转换、关联不同表的数据等),然后将处理后的结果加载到目标数据源(如 TDengine、Mysql 等)中。
+使用 Table SQL 的方式从多个不同的数据源数据库(如 TDengine、MySQL、Oracle 等)中提取数据后,再进行自定义的算子操作(如数据清洗、格式转换、关联不同表的数据等),然后将处理后的结果加载到目标数据源(如 TDengine、MySQL 等)中。
 
-#### Source 连接器
+#### Table Source 连接器
 
 参数配置说明:
 
-| 参数名称 | 类型 | 参数说明 | 备注 |
-| ----------------------- | :-----: | ------------ | ------ |
-| connector | string | 连接器标识,设置 `tdengine-connector` 。||
-| td.jdbc.url| string | 连接的 url 。| |
-| td.jdbc.mode | strng | 连接器类型, 设置 `source`, `cdc`, `sink`。| |
-| table.name| string| 原表或目标表名称。| |
-| scan.query| string| 获取数据的 SQL 语句。||
-| sink.db.name|string| 目标数据库名称。||
-| sink.supertable.name|string |写入的超级表名称。||
-| sink.batch.size | integer | 写入的批大小。||
-| sink.table.name|string|写入的普通表或子表名称。||
-
-#### Source 连接器使用示例
+| 参数名称 | 类型 | 参数说明 |
+| ----------------------- | :-----: | ------------ |
+| connector | string | 连接器标识,设置 `tdengine-connector`。|
+| td.jdbc.url| string | 连接的 url。|
+| td.jdbc.mode | string | 连接器类型,设置 `source`、`sink`。|
+| table.name| string| 原表或目标表名称。|
+| scan.query| string| 获取数据的 SQL 语句。|
+| sink.db.name|string| 目标数据库名称。|
+| sink.supertable.name|string |写入的超级表名称。|
+| sink.batch.size | integer | 写入的批大小。|
+| sink.table.name|string|写入的普通表或子表名称。|
+
+使用示例:
+
+将 power 库的 meters 表的子表数据,写入 power_sink 库的 sink_meters 超级表对应的子表中。
 
Table Source @@ -334,29 +338,29 @@ Sink 的核心功能在于高效且精准地将经过 Flink 处理的、源自 ```
-#### CDC 连接器 +#### Table CDC 连接器 参数配置说明: -| 参数名称 | 类型 | 参数说明 | 备注 | -| ----------------------- | :-----: | ------------ |-------| -| connector | string | 连接器标识,设置 `tdengine-connector`。|| -| user| string | 用户名, 默认 root。| | -| password | string | 密码, 默认taosdata。| | -| bootstrap.servers| string | 服务器地址。| | +| 参数名称 | 类型 | 参数说明 | +| ----------------------- | :-----: | ------------ | +| connector | string | 连接器标识,设置 `tdengine-connector`。| +| user| string | 用户名, 默认 root。| +| password | string | 密码, 默认taosdata。| +| bootstrap.servers| string | 服务器地址。| | topic | string | 订阅主题。|| -| td.jdbc.mode | strng | 连接器类型, cdc, sink。| | -| group.id| string| 消费组 ID,同一消费组共享消费进度。 | | -| auto.offset.reset| string| 消费组订阅的初始位置。 | earliest: 从头开始订阅
latest: default; 仅从最新数据开始订阅。|
-| poll.interval_ms| integer| 拉取数据间隔, 默认 500ms。| |
-| sink.db.name|string| 目标数据库名称。||
-| sink.supertable.name|string |写入的超级表名称。||
-| sink.batch.size | integer | 写入的批大小。||
-| sink.table.name|string|写入的普通表或子表名称。||
-
-
-
-#### CDC 连接器使用示例
+| td.jdbc.mode | string | 连接器类型,cdc、sink。|
+| group.id| string| 消费组 ID,同一消费组共享消费进度。 |
+| auto.offset.reset| string| 消费组订阅的初始位置。earliest:从头开始订阅;latest(默认):仅从最新数据开始订阅。|
+| poll.interval_ms| integer| 拉取数据间隔,默认 500ms。|
+| sink.db.name|string| 目标数据库名称。|
+| sink.supertable.name|string |写入的超级表名称。|
+| sink.batch.size | integer | 写入的批大小。|
+| sink.table.name|string|写入的普通表或子表名称。|
+
+使用示例:
+
+订阅 power 库的 meters 超级表的子表数据,写入 power_sink 库的 sink_meters 超级表对应的子表中。
Table CDC