Skip to content

Commit 7bf377f

Browse files
authored
Merge pull request #14 from taosdata/fix/TS-6006
fix: failed to adapt varchar type to different data sources
2 parents 6af0485 + b529b0d commit 7bf377f

File tree

5 files changed

+45
-28
lines changed

5 files changed

+45
-28
lines changed

src/main/java/com/taosdata/flink/cdc/serializable/RowDataCdcDeserializer.java

+10-1
Original file line numberDiff line numberDiff line change
@@ -16,8 +16,11 @@
1616
import org.slf4j.Logger;
1717
import org.slf4j.LoggerFactory;
1818

19+
import java.nio.charset.StandardCharsets;
1920
import java.sql.*;
2021

22+
import static java.nio.charset.StandardCharsets.UTF_8;
23+
2124
public class RowDataCdcDeserializer implements Deserializer<RowData>, ResultTypeQueryable<RowData> {
2225
private final Logger LOG = LoggerFactory.getLogger(RowDataCdcDeserializer.class);
2326
/**
@@ -63,7 +66,13 @@ public RowData deserialize(ResultSet data, String topic, String dbName) throws D
6366
} else if (value instanceof Short) {
6467
binaryRowWriter.writeShort(i - 1, (Short) value);
6568
} else if (value instanceof byte[]) {
66-
binaryRowWriter.writeBinary(i - 1, (byte[]) value);
69+
if (metaData.getColumnTypeName(i - 1).equals("VARCHAR") || metaData.getColumnTypeName(i - 1).equals("BINARY")) {
70+
String strVal = new String((byte[]) value, StandardCharsets.UTF_8);
71+
binaryRowWriter.writeString(i - 1, StringData.fromString(strVal));
72+
}else {
73+
binaryRowWriter.writeBinary(i - 1, (byte[]) value);
74+
}
75+
6776
} else {
6877
LOG.error("Unknown data type:" + value.getClass().getName());
6978
throw TSDBError.createSQLException(TSDBErrorNumbers.ERROR_UNKNOWN_SQL_TYPE_IN_TDENGINE);

src/main/java/com/taosdata/flink/sink/serializer/RowDataSerializerBase.java

+8-6
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,6 @@
99
import org.slf4j.LoggerFactory;
1010

1111
import java.io.IOException;
12-
import java.nio.charset.StandardCharsets;
1312
import java.sql.SQLException;
1413
import java.util.ArrayList;
1514
import java.util.List;
@@ -27,8 +26,11 @@ public TDengineSinkRecord getSinkRecord(RowData record, List<SinkMetaInfo> sinkM
2726
switch (sinkMetaInfos.get(i).getFieldType()) {
2827
case DATA_TYPE_BINARY:
2928
case DATA_TYPE_VARCHAR:
30-
Object binaryVal = new String(record.getBinary(i), StandardCharsets.UTF_8);
31-
columnParams.add(binaryVal);
29+
Object strVal = null;
30+
if (record.getString(i) != null) {
31+
strVal = record.getString(i).toString();
32+
}
33+
columnParams.add(strVal);
3234
break;
3335
case DATA_TYPE_INT:
3436
Object intVal = record.getInt(i);
@@ -56,11 +58,11 @@ public TDengineSinkRecord getSinkRecord(RowData record, List<SinkMetaInfo> sinkM
5658
break;
5759
case DATA_TYPE_JSON:
5860
case DATA_TYPE_NCHAR:
59-
Object strVal = null;
61+
Object strNcharVal = null;
6062
if (record.getString(i) != null) {
61-
strVal = record.getString(i).toString();
63+
strNcharVal = record.getString(i).toString();
6264
}
63-
columnParams.add(strVal);
65+
columnParams.add(strNcharVal);
6466
break;
6567
case DATA_TYPE_VARBINARY:
6668
case DATA_TYPE_GEOMETRY:

src/main/java/com/taosdata/flink/source/serializable/TDengineRowDataDeserialization.java

+8-2
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
import org.slf4j.Logger;
1313
import org.slf4j.LoggerFactory;
1414

15+
import java.nio.charset.StandardCharsets;
1516
import java.sql.ResultSet;
1617
import java.sql.ResultSetMetaData;
1718
import java.sql.SQLException;
@@ -31,7 +32,7 @@ public RowData convert(SplitResultRecord splitResultRecord) throws SQLException
3132
BinaryRowData binaryRowData = new BinaryRowData(rowData.size());
3233
BinaryRowWriter binaryRowWriter = new BinaryRowWriter(binaryRowData);
3334
binaryRowWriter.writeRowKind(RowKind.INSERT);
34-
35+
ResultSetMetaData metaData = splitResultRecord.getMetaData();
3536
for (int i = 0; i < rowData.size(); i++) {
3637
Object value = rowData.get(i);
3738
if (value == null) {
@@ -58,7 +59,12 @@ public RowData convert(SplitResultRecord splitResultRecord) throws SQLException
5859
} else if (value instanceof Short) {
5960
binaryRowWriter.writeShort(i, (Short) value);
6061
} else if (value instanceof byte[]) {
61-
binaryRowWriter.writeBinary(i, (byte[]) value);
62+
if (metaData.getColumnTypeName(i + 1).equals("VARCHAR") || metaData.getColumnTypeName(i + 1).equals("BINARY")) {
63+
String strVal = new String((byte[]) value, StandardCharsets.UTF_8);
64+
binaryRowWriter.writeString(i, StringData.fromString(strVal));
65+
}else {
66+
binaryRowWriter.writeBinary(i, (byte[]) value);
67+
}
6268
} else {
6369
LOG.error("Unknown data type:" + value.getClass().getName());
6470
throw TSDBError.createSQLException(TSDBErrorNumbers.ERROR_UNKNOWN_SQL_TYPE_IN_TDENGINE);

src/test/java/com/taosdata/flink/table/TDFlinkTableAllTypesTest.java

+8-8
Original file line numberDiff line numberDiff line change
@@ -305,7 +305,7 @@ void testTableToSink() throws Exception {
305305
"long_col BIGINT, " +
306306
"double_col DOUBLE, " +
307307
"bool_col BOOLEAN, " +
308-
"binary_col BINARY, " +
308+
"binary_col VARCHAR(100), " +
309309
"nchar_col String, " +
310310
"varbinary_col BINARY, " +
311311
"geometry_col BINARY, " +
@@ -321,7 +321,7 @@ void testTableToSink() throws Exception {
321321
"geometry_tag BINARY, " +
322322
"tinyint_tag TINYINT, " +
323323
"smallint_tag SMALLINT, " +
324-
"tbname VARBINARY" +
324+
"tbname VARCHAR(255)" +
325325
") WITH (" +
326326
" 'connector' = 'tdengine-connector'," +
327327
" 'td.jdbc.url' = 'jdbc:TAOS-WS://localhost:6041/table_all_type_stmt0?user=root&password=taosdata'," +
@@ -337,7 +337,7 @@ void testTableToSink() throws Exception {
337337
"long_col BIGINT, " +
338338
"double_col DOUBLE, " +
339339
"bool_col BOOLEAN, " +
340-
"binary_col BINARY, " +
340+
"binary_col VARCHAR(100), " +
341341
"nchar_col String, " +
342342
"varbinary_col BINARY, " +
343343
"geometry_col BINARY, " +
@@ -353,7 +353,7 @@ void testTableToSink() throws Exception {
353353
"geometry_tag BINARY, " +
354354
"tinyint_tag TINYINT, " +
355355
"smallint_tag SMALLINT, " +
356-
"tbname VARBINARY" +
356+
"tbname VARCHAR(255)" +
357357
") WITH (" +
358358
" 'connector' = 'tdengine-connector'," +
359359
" 'td.jdbc.mode' = 'sink'," +
@@ -386,7 +386,7 @@ void testCdcTableToSink() throws Exception {
386386
"long_col BIGINT, " +
387387
"double_col DOUBLE, " +
388388
"bool_col BOOLEAN, " +
389-
"binary_col BINARY, " +
389+
"binary_col VARCHAR(100), " +
390390
"nchar_col String, " +
391391
"varbinary_col BINARY, " +
392392
"geometry_col BINARY, " +
@@ -402,7 +402,7 @@ void testCdcTableToSink() throws Exception {
402402
"geometry_tag BINARY, " +
403403
"tinyint_tag TINYINT, " +
404404
"smallint_tag SMALLINT, " +
405-
"tbname VARBINARY" +
405+
"tbname VARCHAR(255)" +
406406
") WITH (" +
407407
" 'connector' = 'tdengine-connector'," +
408408
" 'bootstrap.servers' = 'localhost:6041'," +
@@ -420,7 +420,7 @@ void testCdcTableToSink() throws Exception {
420420
"long_col BIGINT, " +
421421
"double_col DOUBLE, " +
422422
"bool_col BOOLEAN, " +
423-
"binary_col BINARY, " +
423+
"binary_col VARCHAR(100), " +
424424
"nchar_col String, " +
425425
"varbinary_col BINARY, " +
426426
"geometry_col BINARY, " +
@@ -436,7 +436,7 @@ void testCdcTableToSink() throws Exception {
436436
"geometry_tag BINARY, " +
437437
"tinyint_tag TINYINT, " +
438438
"smallint_tag SMALLINT, " +
439-
"tbname VARBINARY" +
439+
"tbname VARCHAR(255)" +
440440
") WITH (" +
441441
" 'connector' = 'tdengine-connector'," +
442442
" 'td.jdbc.mode' = 'cdc'," +

src/test/java/com/taosdata/flink/table/TDFlinkTableTest.java

+11-11
Original file line numberDiff line numberDiff line change
@@ -179,7 +179,7 @@ void testTable() throws Exception {
179179
" `current` FLOAT," +
180180
" voltage INT," +
181181
" phase FLOAT," +
182-
" tbname VARBINARY" +
182+
" tbname VARCHAR(255)" +
183183
") WITH (" +
184184
" 'connector' = 'tdengine-connector'," +
185185
" 'td.jdbc.url' = 'jdbc:TAOS-WS://localhost:6041/power?user=root&password=taosdata'," +
@@ -235,9 +235,9 @@ void testTableCdc() throws Exception {
235235
" `current` FLOAT," +
236236
" voltage INT," +
237237
" phase FLOAT," +
238-
" location VARBINARY," +
238+
" location VARCHAR(255)," +
239239
" groupid INT," +
240-
" tbname VARBINARY" +
240+
" tbname VARCHAR(255)" +
241241
") WITH (" +
242242
" 'connector' = 'tdengine-connector'," +
243243
" 'bootstrap.servers' = 'localhost:6041'," +
@@ -300,9 +300,9 @@ void testTableToSink() throws Exception {
300300
" `current` FLOAT," +
301301
" voltage INT," +
302302
" phase FLOAT," +
303-
" location VARBINARY," +
303+
" location VARCHAR(255)," +
304304
" groupid INT," +
305-
" tbname VARBINARY," +
305+
" tbname VARCHAR(255)," +
306306
" PRIMARY KEY (ts) NOT ENFORCED" +
307307
") WITH (" +
308308
" 'connector' = 'tdengine-connector'," +
@@ -318,9 +318,9 @@ void testTableToSink() throws Exception {
318318
" `current` FLOAT," +
319319
" voltage INT," +
320320
" phase FLOAT," +
321-
" location VARBINARY," +
321+
" location VARCHAR(255)," +
322322
" groupid INT," +
323-
" tbname VARBINARY" +
323+
" tbname VARCHAR(255)" +
324324
") WITH (" +
325325
" 'connector' = 'tdengine-connector'," +
326326
" 'td.jdbc.mode' = 'sink'," +
@@ -353,9 +353,9 @@ void testCdcTableToSink() throws Exception {
353353
" `current` FLOAT," +
354354
" voltage INT," +
355355
" phase FLOAT," +
356-
" location VARBINARY," +
356+
" location VARCHAR(255)," +
357357
" groupid INT," +
358-
" tbname VARBINARY" +
358+
" tbname VARCHAR(255)" +
359359
") WITH (" +
360360
" 'connector' = 'tdengine-connector'," +
361361
" 'bootstrap.servers' = 'localhost:6041'," +
@@ -371,9 +371,9 @@ void testCdcTableToSink() throws Exception {
371371
" `current` FLOAT," +
372372
" voltage INT," +
373373
" phase FLOAT," +
374-
" location VARBINARY," +
374+
" location VARCHAR(255)," +
375375
" groupid INT," +
376-
" tbname VARBINARY" +
376+
" tbname VARCHAR(255)" +
377377
") WITH (" +
378378
" 'connector' = 'tdengine-connector'," +
379379
" 'td.jdbc.mode' = 'cdc'," +

0 commit comments

Comments
 (0)