
Commit f358372

Merge pull request #10 from taosdata/feat/TD-32275 (Feat/td 32275)
Parents: 8fc866d, 2aa7cac
21 files changed: +898 −151 lines

README.md: +12 −9
@@ -87,19 +87,22 @@ TDengine currently supports timestamp, number, character, and boolean types, and
 | GEOMETRY | byte[] |
 
 ## Instructions for use
+
 ### Flink Semantic Selection Instructions
 
 The reasons for using At Least Once (at-least-once) semantics are:
 - TDengine currently does not support transactions, so it cannot perform frequent checkpoint operations or complex transaction coordination.
 - Because TDengine uses the timestamp as the primary key, downstream operators can filter out duplicate data and avoid redundant computation.
 - At Least Once (at-least-once) ensures high data-processing performance and low data latency. It is configured as follows:
 
-```text
+```java
 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
 env.enableCheckpointing(5000);
 env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.AT_LEAST_ONCE);
 ```
 
+### Usage mode
+
 If using Maven to manage a project, simply add the following dependencies in pom.xml.
 
 ```xml
@@ -152,14 +155,14 @@ Users can split the SQL query into multiple subtasks based on time, entering: st
 ```java
 SourceSplitSql splitSql = new SourceSplitSql();
 splitSql.setSql("select ts, `current`, voltage, phase, groupid, location, tbname from meters")
-    .setSplitType(SplitType.SPLIT_TYPE_TIMESTAMP)
-    .setTimestampSplitInfo(new TimestampSplitInfo(
-        "2024-12-19 16:12:48.000",
-        "2024-12-19 19:12:48.000",
-        "ts",
-        Duration.ofHours(1),
-        new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS"),
-        ZoneId.of("Asia/Shanghai")));
+        .setSplitType(SplitType.SPLIT_TYPE_TIMESTAMP)
+        .setTimestampSplitInfo(new TimestampSplitInfo(
+            "2024-12-19 16:12:48.000",
+            "2024-12-19 19:12:48.000",
+            "ts",
+            Duration.ofHours(1),
+            new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS"),
+            ZoneId.of("Asia/Shanghai")));
 ```
 
 Splitting by Super Table TAG
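
The at-least-once rationale above leans on downstream deduplication: because the TDengine timestamp column is the primary key, rows replayed after a failure carry timestamps the pipeline has already seen. A minimal sketch of such a filter, assuming rows are keyed by `tbname`, the `ts` column is field 0 with millisecond precision, and timestamps per key arrive in nondecreasing order (the class name and field positions are illustrative, not part of this commit):

```java
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.table.data.RowData;
import org.apache.flink.util.Collector;

public class DedupByTimestamp extends KeyedProcessFunction<String, RowData, RowData> {
    private transient ValueState<Long> lastTs;

    @Override
    public void open(Configuration parameters) {
        // One "last seen timestamp" per key (per table).
        lastTs = getRuntimeContext().getState(
                new ValueStateDescriptor<>("last-ts", Long.class));
    }

    @Override
    public void processElement(RowData row, Context ctx, Collector<RowData> out) throws Exception {
        // Assumes field 0 is the TDengine `ts` primary-key column, millisecond precision.
        long ts = row.getTimestamp(0, 3).getMillisecond();
        Long seen = lastTs.value();
        if (seen == null || ts > seen) {
            // New timestamp for this key: emit and remember it.
            lastTs.update(ts);
            out.collect(row);
        }
        // Otherwise the row is a replay under at-least-once recovery; drop it.
    }
}
```

In the query above, `tbname` is the seventh selected column, so the operator could be attached as `stream.keyBy(r -> r.getString(6).toString()).process(new DedupByTimestamp())`; the key index is likewise illustrative.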

deploy-pom.xml: +141 −32
@@ -6,7 +6,7 @@
 
     <groupId>com.taosdata.flink</groupId>
    <artifactId>flink-connector-tdengine</artifactId>
-    <version>2.0.0</version>
+    <version>2.0.1</version>
     <packaging>jar</packaging>
 
     <name>flink-connector-tdengine</name>
@@ -42,12 +42,52 @@
         <product.version>3.0.0.0</product.version>
         <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
         <java.version>1.8</java.version>
-        <taos.jdbcdriver.version>3.3.0</taos.jdbcdriver.version>
-        <flink.version>1.18.0</flink.version>
+        <flink.version>1.20.0</flink.version>
         <scala.binary.version>2.12</scala.binary.version>
         <maven-compiler-plugin.version>3.6.0</maven-compiler-plugin.version>
+        <hamcrest.version>1.3</hamcrest.version>
+        <powermock.version>2.0.9</powermock.version>
+        <jackson.version>2.18.0</jackson.version>
+        <taos.jdbcdriver.version>3.5.2</taos.jdbcdriver.version>
     </properties>
 
+    <dependencyManagement>
+        <dependencies>
+            <dependency>
+                <groupId>com.fasterxml.jackson.core</groupId>
+                <artifactId>jackson-annotations</artifactId>
+                <version>${jackson.version}</version>
+            </dependency>
+            <dependency>
+                <groupId>com.fasterxml.jackson.core</groupId>
+                <artifactId>jackson-core</artifactId>
+                <version>${jackson.version}</version>
+            </dependency>
+
+            <dependency>
+                <groupId>com.fasterxml.jackson.core</groupId>
+                <artifactId>jackson-databind</artifactId>
+                <version>${jackson.version}</version>
+            </dependency>
+            <dependency>
+                <groupId>com.fasterxml.jackson.datatype</groupId>
+                <artifactId>jackson-datatype-jsr310</artifactId>
+                <version>${jackson.version}</version>
+            </dependency>
+
+            <dependency>
+                <groupId>org.apache.logging.log4j</groupId>
+                <artifactId>log4j-api</artifactId>
+                <version>2.17.1</version>
+            </dependency>
+            <dependency>
+                <groupId>org.apache.logging.log4j</groupId>
+                <artifactId>log4j-core</artifactId>
+                <version>2.17.1</version>
+            </dependency>
+        </dependencies>
+    </dependencyManagement>
+
     <dependencies>
         <dependency>
             <groupId>com.taosdata.jdbc</groupId>
@@ -59,12 +99,11 @@
             <groupId>org.apache.flink</groupId>
             <artifactId>flink-streaming-java</artifactId>
             <version>${flink.version}</version>
-            <scope>compile</scope>
         </dependency>
 
         <dependency>
             <groupId>org.apache.flink</groupId>
-            <artifactId>flink-scala_${scala.binary.version}</artifactId>
+            <artifactId>flink-connector-base</artifactId>
             <version>${flink.version}</version>
         </dependency>
 
@@ -76,9 +115,65 @@
 
         <dependency>
             <groupId>org.apache.flink</groupId>
-            <artifactId>flink-tests</artifactId>
+            <artifactId>flink-table-planner_${scala.binary.version}</artifactId>
             <version>${flink.version}</version>
-            <type>test-jar</type>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-scala_${scala.binary.version}</artifactId>
+            <version>${flink.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-core</artifactId>
+            <version>${flink.version}</version>
+        </dependency>
+
+        <!-- Table ecosystem -->
+        <!-- Projects depending on this project won't depend on flink-table-*. -->
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-table-api-java-bridge</artifactId>
+            <version>${flink.version}</version>
+        </dependency>
+
+        <!-- Tests -->
+        <dependency>
+            <groupId>org.hamcrest</groupId>
+            <artifactId>hamcrest-all</artifactId>
+            <version>${hamcrest.version}</version>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.powermock</groupId>
+            <artifactId>powermock-module-junit4</artifactId>
+            <version>${powermock.version}</version>
+            <type>jar</type>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.powermock</groupId>
+            <artifactId>powermock-api-mockito2</artifactId>
+            <version>${powermock.version}</version>
+            <type>jar</type>
+            <scope>test</scope>
+            <exclusions>
+                <exclusion>
+                    <groupId>org.mockito</groupId>
+                    <artifactId>mockito-core</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+
+        <dependency>
+            <groupId>org.yaml</groupId>
+            <artifactId>snakeyaml</artifactId>
+            <version>2.2</version>
             <scope>test</scope>
         </dependency>
 
@@ -90,41 +185,55 @@
         </dependency>
 
         <dependency>
-            <groupId>ch.qos.logback</groupId>
-            <artifactId>logback-classic</artifactId>
-            <version>1.5.0</version>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-connector-test-utils</artifactId>
+            <version>${flink.version}</version>
+            <scope>test</scope>
         </dependency>
 
         <dependency>
-            <groupId>junit</groupId>
-            <artifactId>junit</artifactId>
-            <version>4.13.2</version>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-runtime</artifactId>
+            <version>${flink.version}</version>
+            <type>test-jar</type>
             <scope>test</scope>
         </dependency>
 
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-connector-base</artifactId>
+            <version>${flink.version}</version>
+            <scope>test</scope>
+            <type>test-jar</type>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-metrics-jmx</artifactId>
+            <version>${flink.version}</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.testcontainers</groupId>
+            <artifactId>junit-jupiter</artifactId>
+            <version>1.17.2</version>
+        </dependency>
+        <dependency>
+            <groupId>org.junit.jupiter</groupId>
+            <artifactId>junit-jupiter</artifactId>
+            <version>5.10.1</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.junit.jupiter</groupId>
+            <artifactId>junit-jupiter-api</artifactId>
+            <version>5.10.1</version>
+        </dependency>
+
     </dependencies>
 
     <build>
         <plugins>
-            <!-- <plugin>-->
-            <!--     <groupId>org.apache.maven.plugins</groupId>-->
-            <!--     <artifactId>maven-assembly-plugin</artifactId>-->
-            <!--     <version>3.0.0</version>-->
-            <!--     <configuration>-->
-            <!--         <descriptors>-->
-            <!--             <descriptor>src/main/assembly/assembly-jar.xml</descriptor>-->
-            <!--         </descriptors>-->
-            <!--     </configuration>-->
-            <!--     <executions>-->
-            <!--         <execution>-->
-            <!--             <id>make-assembly</id>-->
-            <!--             <phase>package</phase>-->
-            <!--             <goals>-->
-            <!--                 <goal>single</goal>-->
-            <!--             </goals>-->
-            <!--         </execution>-->
-            <!--     </executions>-->
-            <!-- </plugin>-->
             <plugin>
                 <groupId>org.apache.maven.plugins</groupId>
                 <artifactId>maven-assembly-plugin</artifactId>
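
For downstream projects, the visible effect of this pom change is the bumped artifact version; a consumer would pick up the release with coordinates like the following (a sketch assembled from the groupId, artifactId, and version shown in this diff; the surrounding pom structure is assumed):

```xml
<dependency>
    <groupId>com.taosdata.flink</groupId>
    <artifactId>flink-connector-tdengine</artifactId>
    <version>2.0.1</version>
</dependency>
```

Note that the build now targets Flink 1.20.0 and taos-jdbcdriver 3.5.2, so a consuming job should run against a matching Flink runtime.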

src/main/java/com/taosdata/flink/cdc/serializable/RowDataCdcDeserializer.java: +46 −15
@@ -1,23 +1,29 @@
 package com.taosdata.flink.cdc.serializable;
 
+import com.taosdata.jdbc.TSDBError;
+import com.taosdata.jdbc.TSDBErrorNumbers;
 import com.taosdata.jdbc.tmq.Deserializer;
 import com.taosdata.jdbc.tmq.DeserializerException;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
-import org.apache.flink.table.data.GenericRowData;
+
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.data.StringData;
 import org.apache.flink.table.data.TimestampData;
+import org.apache.flink.table.data.binary.BinaryRowData;
+import org.apache.flink.table.data.writer.BinaryRowWriter;
+import org.apache.flink.types.RowKind;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
-import java.sql.ResultSet;
-import java.sql.ResultSetMetaData;
-import java.sql.SQLException;
-import java.sql.Timestamp;
+import java.sql.*;
 
 public class RowDataCdcDeserializer implements Deserializer<RowData>, ResultTypeQueryable<RowData> {
+    private final Logger LOG = LoggerFactory.getLogger(RowDataCdcDeserializer.class);
     /**
      * Serialize ResultSet data to RowData
-     * @param data poll data
+     *
+     * @param data   poll data
     * @param topic topic
     * @param dbName database name
     * @return RowData
@@ -27,20 +33,45 @@ public class RowDataCdcDeserializer implements Deserializer<RowData>, ResultType
     @Override
     public RowData deserialize(ResultSet data, String topic, String dbName) throws DeserializerException, SQLException {
         ResultSetMetaData metaData = data.getMetaData();
-        GenericRowData row = new GenericRowData(metaData.getColumnCount());
+        BinaryRowData binaryRowData = new BinaryRowData(metaData.getColumnCount());
+        BinaryRowWriter binaryRowWriter = new BinaryRowWriter(binaryRowData);
+        binaryRowWriter.writeRowKind(RowKind.INSERT);
+
         for (int i = 1; i <= metaData.getColumnCount(); i++) {
             Object value = data.getObject(i);
-            if (value instanceof Timestamp) {
-                // Convert Timestamp to the TimestampData type supported by RowData
-                row.setField(i - 1, TimestampData.fromTimestamp((Timestamp) value));
-            } else if (value instanceof String) {
-                // Convert String to the StringData type supported by RowData
-                row.setField(i - 1, StringData.fromString((String) value));
+            if (value == null) {
+                binaryRowWriter.setNullAt(i - 1);
             } else {
-                row.setField(i - 1, value);
+                if (value instanceof Timestamp) {
+                    // Convert Timestamp to the TimestampData type supported by RowData
+                    binaryRowWriter.writeTimestamp(i - 1, TimestampData.fromTimestamp((Timestamp) value), 5);
+                } else if (value instanceof String) {
+                    // Convert String to the StringData type supported by RowData
+                    binaryRowWriter.writeString(i - 1, StringData.fromString((String) value));
+                } else if (value instanceof Byte) {
+                    binaryRowWriter.writeByte(i - 1, (Byte) value);
+                } else if (value instanceof Integer) {
+                    binaryRowWriter.writeInt(i - 1, (Integer) value);
+                } else if (value instanceof Boolean) {
+                    binaryRowWriter.writeBoolean(i - 1, (Boolean) value);
+                } else if (value instanceof Float) {
+                    binaryRowWriter.writeFloat(i - 1, (Float) value);
+                } else if (value instanceof Double) {
+                    binaryRowWriter.writeDouble(i - 1, (Double) value);
+                } else if (value instanceof Long) {
+                    binaryRowWriter.writeLong(i - 1, (Long) value);
+                } else if (value instanceof Short) {
+                    binaryRowWriter.writeShort(i - 1, (Short) value);
+                } else if (value instanceof byte[]) {
+                    binaryRowWriter.writeBinary(i - 1, (byte[]) value);
+                } else {
+                    LOG.error("Unknown data type:" + value.getClass().getName());
+                    throw TSDBError.createSQLException(TSDBErrorNumbers.ERROR_UNKNOWN_SQL_TYPE_IN_TDENGINE);
+                }
             }
         }
-        return row;
+        return binaryRowData;
     }
 
     @Override
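
The rewritten deserialize method now handles NULL columns and dispatches on every scalar JDBC type before falling back to an error. A minimal JUnit 5/Mockito sketch of how that path could be exercised, using the test dependencies added in deploy-pom.xml (the test class, stubbed schema, and assertions are illustrative, not part of this commit):

```java
import com.taosdata.flink.cdc.serializable.RowDataCdcDeserializer;
import org.apache.flink.table.data.RowData;
import org.junit.jupiter.api.Test;
import org.mockito.Mockito;

import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
import java.sql.Timestamp;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;

class RowDataCdcDeserializerTest {

    @Test
    void deserializeHandlesNullAndScalarColumns() throws Exception {
        // Stub a three-column schema: timestamp, NULL, int.
        ResultSetMetaData meta = Mockito.mock(ResultSetMetaData.class);
        Mockito.when(meta.getColumnCount()).thenReturn(3);

        ResultSet rs = Mockito.mock(ResultSet.class);
        Mockito.when(rs.getMetaData()).thenReturn(meta);
        Mockito.when(rs.getObject(1)).thenReturn(new Timestamp(0L));
        Mockito.when(rs.getObject(2)).thenReturn(null);   // exercises setNullAt
        Mockito.when(rs.getObject(3)).thenReturn(42);     // exercises writeInt

        RowData row = new RowDataCdcDeserializer().deserialize(rs, "topic", "db");

        assertEquals(3, row.getArity());
        assertTrue(row.isNullAt(1));
        assertEquals(42, row.getInt(2));
    }
}
```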
