Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Fix] Adjust data conversion logic for compatibility with clickhouse-jdbc 0.6 #136 #145

Merged
merged 1 commit into from
Aug 1, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,14 @@
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.MapType;

import com.clickhouse.data.value.UnsignedByte;
import com.clickhouse.data.value.UnsignedInteger;
import com.clickhouse.data.value.UnsignedLong;
import com.clickhouse.data.value.UnsignedShort;

import java.math.BigDecimal;
import java.math.BigInteger;
import java.net.InetAddress;
import java.sql.Array;
import java.sql.Date;
import java.sql.SQLException;
Expand All @@ -39,8 +45,10 @@
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.LocalTime;
import java.time.OffsetDateTime;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;

import static org.apache.flink.connector.clickhouse.util.ClickHouseUtil.getFlinkTimeZone;
import static org.apache.flink.connector.clickhouse.util.ClickHouseUtil.toEpochDayOneTimestamp;
Expand Down Expand Up @@ -129,35 +137,51 @@ public static Object toInternal(Object value, LogicalType type) throws SQLExcept
case DOUBLE:
case INTERVAL_YEAR_MONTH:
case INTERVAL_DAY_TIME:
case INTEGER:
case BIGINT:
case TINYINT:
case BINARY:
case VARBINARY:
return value;
case TINYINT:
return ((Integer) value).byteValue();
case SMALLINT:
return value instanceof Integer ? ((Integer) value).shortValue() : value;
return value instanceof UnsignedByte ? ((UnsignedByte) value).shortValue() : value;
case INTEGER:
return value instanceof UnsignedShort ? ((UnsignedShort) value).intValue() : value;
case BIGINT:
return value instanceof UnsignedInteger
? ((UnsignedInteger) value).longValue()
: value;
case DECIMAL:
final int precision = ((DecimalType) type).getPrecision();
final int scale = ((DecimalType) type).getScale();
return value instanceof BigInteger
? DecimalData.fromBigDecimal(
new BigDecimal((BigInteger) value, 0), precision, scale)
: DecimalData.fromBigDecimal((BigDecimal) value, precision, scale);
BigDecimal decimalValue =
value instanceof BigDecimal
? (BigDecimal) value
: new BigDecimal(
value instanceof UnsignedLong
? ((UnsignedLong) value).bigIntegerValue()
: (BigInteger) value);
return DecimalData.fromBigDecimal(decimalValue, precision, scale);
case DATE:
return (int) (((Date) value).toLocalDate().toEpochDay());
return (int) (((LocalDate) value).toEpochDay());
case TIME_WITHOUT_TIME_ZONE:
return (int) (((Time) value).toLocalTime().toNanoOfDay() / 1_000_000L);
case TIMESTAMP_WITH_TIME_ZONE:
case TIMESTAMP_WITHOUT_TIME_ZONE:
return TimestampData.fromLocalDateTime((LocalDateTime) value);
return TimestampData.fromLocalDateTime(
value instanceof OffsetDateTime
? ((OffsetDateTime) value).toLocalDateTime()
: (LocalDateTime) value);
case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
return TimestampData.fromInstant(
((LocalDateTime) value).atZone(getFlinkTimeZone().toZoneId()).toInstant());
case CHAR:
case VARCHAR:
return StringData.fromString((String) value);
if (value instanceof UUID) {
return StringData.fromString(value.toString());
} else if (value instanceof InetAddress) {
return StringData.fromString(((InetAddress) value).getHostAddress());
} else {
return StringData.fromString((String) value);
}
case ARRAY:
LogicalType elementType =
type.getChildren().stream()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,12 +31,17 @@
import org.apache.flink.table.types.logical.TimestampType;
import org.apache.flink.util.Preconditions;

import com.clickhouse.data.value.UnsignedByte;
import com.clickhouse.data.value.UnsignedInteger;
import com.clickhouse.data.value.UnsignedLong;
import com.clickhouse.data.value.UnsignedShort;
import com.clickhouse.jdbc.ClickHousePreparedStatement;
import com.clickhouse.jdbc.ClickHouseResultSet;

import java.io.Serializable;
import java.math.BigDecimal;
import java.math.BigInteger;
import java.net.InetAddress;
import java.sql.Date;
import java.sql.ResultSet;
import java.sql.SQLException;
Expand All @@ -45,6 +50,7 @@
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.LocalTime;
import java.time.OffsetDateTime;
import java.util.UUID;

import static org.apache.flink.connector.clickhouse.internal.converter.ClickHouseConverterUtils.BOOL_TRUE;
Expand Down Expand Up @@ -109,30 +115,41 @@ private DeserializationConverter createToInternalConverter(LogicalType type) {
case DOUBLE:
case INTERVAL_YEAR_MONTH:
case INTERVAL_DAY_TIME:
case INTEGER:
case BIGINT:
case TINYINT:
case BINARY:
case VARBINARY:
return val -> val;
case TINYINT:
return val -> ((Integer) val).byteValue();
case SMALLINT:
return val -> val instanceof Integer ? ((Integer) val).shortValue() : val;
return val -> val instanceof UnsignedByte ? ((UnsignedByte) val).shortValue() : val;
case INTEGER:
return val -> val instanceof UnsignedShort ? ((UnsignedShort) val).intValue() : val;
case BIGINT:
return val ->
val instanceof UnsignedInteger ? ((UnsignedInteger) val).longValue() : val;
case DECIMAL:
final int precision = ((DecimalType) type).getPrecision();
final int scale = ((DecimalType) type).getScale();
return val ->
val instanceof BigInteger
? DecimalData.fromBigDecimal(
new BigDecimal((BigInteger) val, 0), precision, scale)
: DecimalData.fromBigDecimal((BigDecimal) val, precision, scale);
return val -> {
BigDecimal decimalValue =
val instanceof BigDecimal
? (BigDecimal) val
: new BigDecimal(
val instanceof UnsignedLong
? ((UnsignedLong) val).bigIntegerValue()
: (BigInteger) val);
return DecimalData.fromBigDecimal(decimalValue, precision, scale);
};
case DATE:
return val -> (int) ((Date) val).toLocalDate().toEpochDay();
return val -> (int) ((LocalDate) val).toEpochDay();
case TIME_WITHOUT_TIME_ZONE:
return val -> (int) (((Time) val).toLocalTime().toNanoOfDay() / 1_000_000L);
case TIMESTAMP_WITH_TIME_ZONE:
case TIMESTAMP_WITHOUT_TIME_ZONE:
return val -> TimestampData.fromLocalDateTime((LocalDateTime) val);
return val ->
TimestampData.fromLocalDateTime(
val instanceof OffsetDateTime
? ((OffsetDateTime) val).toLocalDateTime()
: (LocalDateTime) val);
case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
return val ->
TimestampData.fromInstant(
Expand All @@ -141,10 +158,15 @@ private DeserializationConverter createToInternalConverter(LogicalType type) {
.toInstant());
case CHAR:
case VARCHAR:
return val ->
val instanceof UUID
? StringData.fromString(val.toString())
: StringData.fromString((String) val);
return val -> {
if (val instanceof UUID) {
return StringData.fromString(val.toString());
} else if (val instanceof InetAddress) {
return StringData.fromString(((InetAddress) val).getHostAddress());
} else {
return StringData.fromString((String) val);
}
};
case ARRAY:
case MAP:
return val -> ClickHouseConverterUtils.toInternal(val, type);
Expand Down Expand Up @@ -242,6 +264,7 @@ private SerializationConverter createToExternalConverter(LogicalType type) {

@FunctionalInterface
interface SerializationConverter extends Serializable {

/**
* Convert an internal field to java object and fill into the {@link
* ClickHousePreparedStatement}.
Expand All @@ -252,6 +275,7 @@ void serialize(RowData rowData, int index, ClickHouseStatementWrapper statement)

@FunctionalInterface
interface DeserializationConverter extends Serializable {

/**
* Convert an object of {@link ClickHouseResultSet} to the internal data structure object.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,9 +41,9 @@ public static DataType toFlinkType(ClickHouseColumn clickHouseColumnInfo) {
switch (clickHouseColumnInfo.getDataType()) {
case Int8:
return DataTypes.TINYINT();
case Int16:
case Bool:
return DataTypes.BOOLEAN();
case Int16:
case UInt8:
return DataTypes.SMALLINT();
case Int32:
Expand Down Expand Up @@ -92,6 +92,7 @@ public static DataType toFlinkType(ClickHouseColumn clickHouseColumnInfo) {
case UUID:
return DataTypes.VARCHAR(clickHouseColumnInfo.getPrecision());
case Date:
case Date32:
return DataTypes.DATE();
case DateTime:
case DateTime32:
Expand Down
Loading