DBZ-8114 Implementing VariableScaleDecimal and improving tests
nrkljo committed Aug 5, 2024
1 parent c91a2ad commit d730b7d
Showing 26 changed files with 3,617 additions and 343 deletions.
4 changes: 2 additions & 2 deletions .github/workflows/cross-maven.yml
@@ -126,6 +126,6 @@ jobs:
           -Dhttp.keepAlive=false
           -Dmaven.wagon.http.pool=false
           -Dmaven.wagon.httpconnectionManager.ttlSeconds=120
-          -Ddebezium.test.records.waittime=7
-          -Ddebezium.test.records.waittime.after.nulls=13
+          -Ddebezium.test.records.waittime=10
+          -Ddebezium.test.records.waittime.after.nulls=10
           -DfailFlakyTests=false
4 changes: 2 additions & 2 deletions .github/workflows/maven.yml
@@ -97,6 +97,6 @@ jobs:
           -Dhttp.keepAlive=false
           -Dmaven.wagon.http.pool=false
           -Dmaven.wagon.httpconnectionManager.ttlSeconds=120
-          -Ddebezium.test.records.waittime=7
-          -Ddebezium.test.records.waittime.after.nulls=13
+          -Ddebezium.test.records.waittime=10
+          -Ddebezium.test.records.waittime.after.nulls=10
           -DfailFlakyTests=false
2 changes: 1 addition & 1 deletion README.md
@@ -1,7 +1,7 @@

 [![License](http://img.shields.io/:license-apache%202.0-brightgreen.svg)](http://www.apache.org/licenses/LICENSE-2.0.html)
 [![Maven Central](https://maven-badges.herokuapp.com/maven-central/io.debezium/debezium-connector-informix/badge.svg)](http://search.maven.org/#search%7Cga%7C1%7Cg%3A%22io.debezium%22)
-[![Build Status](https://github.com/debezium/debezium-connector-informix/actions/workflows/maven.yml/badge.svg?branch=main)](https://github.com/debezium/debezium-connector-informix/actions)
+[![Build Status](https://github.com/debezium/debezium-connector-informix/actions/workflows/Maven%20CI/badge.svg?branch=main)](https://github.com/debezium/debezium-connector-informix/actions)
 [![User chat](https://img.shields.io/badge/chat-users-brightgreen.svg)](https://gitter.im/debezium/user)
 [![Developer chat](https://img.shields.io/badge/chat-devs-brightgreen.svg)](https://gitter.im/debezium/dev)
 [![Google Group](https://img.shields.io/:mailing%20list-debezium-brightgreen.svg)](https://groups.google.com/forum/#!forum/debezium)
49 changes: 7 additions & 42 deletions TODO.md
@@ -1,47 +1,12 @@
-## 1. Fix reload of schema after restart
-
-## 2. Fix blocking snapshot
-
-## 3. Successfully determine current maximum LSN
-
-## 4. Testcases
-
-Adapt more standard testcases from Debezium parent
+## ~~1. Fix reload of schema after restart~~
+## ~~2. Fix blocking snapshot~~
+## ~~3. Successfully determine current maximum LSN~~
+## ~~4. Testcases~~
+~~Adapt more standard testcases from Debezium parent~~

## 5. Implements Metrics

- Reference: https://github.com/debezium/debezium/blob/main/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlChangeEventSourceMetricsFactory.java

-## 6. Decimal handling mode
-
-If we set "decimal.handling.mode=percision", which is the default option, it will cause the following exception:
-
-```text
-[2022-05-15 00:48:42,552] ERROR WorkerSourceTask{id=informix-connector-214414-0} Task threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask:179)
-org.apache.kafka.connect.errors.ConnectException: Tolerance exceeded in error handler
-    at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:178)
-    at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:104)
-    at org.apache.kafka.connect.runtime.WorkerSourceTask.convertTransformedRecord(WorkerSourceTask.java:290)
-    at org.apache.kafka.connect.runtime.WorkerSourceTask.sendRecords(WorkerSourceTask.java:316)
-    at org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:240)
-    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:177)
-    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:227)
-    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539)
-    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
-    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
-    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
-    at java.base/java.lang.Thread.run(Thread.java:833)
-Caused by: org.apache.kafka.connect.errors.DataException: BigDecimal has mismatching scale value for given Decimal schema
-    at org.apache.kafka.connect.data.Decimal.fromLogical(Decimal.java:68)
-    at org.apache.kafka.connect.json.JsonConverter$13.toJson(JsonConverter.java:206)
-    at org.apache.kafka.connect.json.JsonConverter.convertToJson(JsonConverter.java:606)
-    at org.apache.kafka.connect.json.JsonConverter.convertToJson(JsonConverter.java:693)
-    at org.apache.kafka.connect.json.JsonConverter.convertToJson(JsonConverter.java:693)
-    at org.apache.kafka.connect.json.JsonConverter.convertToJsonWithEnvelope(JsonConverter.java:581)
-    at org.apache.kafka.connect.json.JsonConverter.fromConnectData(JsonConverter.java:335)
-    at org.apache.kafka.connect.storage.Converter.fromConnectData(Converter.java:62)
-    at org.apache.kafka.connect.runtime.WorkerSourceTask.lambda$convertTransformedRecord$2(WorkerSourceTask.java:290)
-    at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:128)
-    at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:162)
-    ... 11 more
-```
+## ~~6. Decimal handling mode~~
+~~If we set "decimal.handling.mode=percision", which is the default option, it will cause the following exception: ...~~
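The item struck through above is resolved by this commit: Kafka Connect's `Decimal` logical type pins a single scale in the schema, so a column whose values carry varying scales raises "BigDecimal has mismatching scale value for given Decimal schema". A variable-scale representation avoids this by carrying the scale alongside the unscaled value. The sketch below shows that encoding idea in plain Java; `encode`/`decode` are hypothetical helpers for illustration, not the actual Debezium `VariableScaleDecimal` API.

```java
import java.math.BigDecimal;
import java.math.BigInteger;
import java.util.AbstractMap.SimpleEntry;
import java.util.Map;

public class VariableScaleSketch {

    // Encode as (scale, unscaled bytes) -- the same information a
    // variable-scale decimal struct carries, so each row may use its own scale.
    static Map.Entry<Integer, byte[]> encode(BigDecimal value) {
        return new SimpleEntry<>(value.scale(), value.unscaledValue().toByteArray());
    }

    // Rebuild the exact BigDecimal, scale included.
    static BigDecimal decode(Map.Entry<Integer, byte[]> encoded) {
        return new BigDecimal(new BigInteger(encoded.getValue()), encoded.getKey());
    }

    public static void main(String[] args) {
        // Two values with different scales -- a fixed-scale Decimal schema
        // could represent at most one of them without rescaling.
        for (BigDecimal v : new BigDecimal[]{ new BigDecimal("12.345"), new BigDecimal("7") }) {
            System.out.println(v + " round-trips: " + decode(encode(v)).equals(v)); // prints true for both
        }
    }
}
```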
7 changes: 7 additions & 0 deletions pom.xml
@@ -500,6 +500,12 @@
             <source>${project.basedir}/src/test/docker/informix-cdc-docker/12/informix_init.sh</source>
             <outputDirectory>scripts</outputDirectory>
             <lineEnding>unix</lineEnding>
+            <fileMode>755</fileMode>
           </file>
+          <file>
+            <source>${project.basedir}/src/test/docker/informix-cdc-docker/12/testdb.sql</source>
+            <outputDirectory>informix/etc</outputDirectory>
+            <lineEnding>unix</lineEnding>
+          </file>
         </files>
       </inline>
@@ -550,6 +556,7 @@
             <source>${project.basedir}/src/test/docker/informix-cdc-docker/14/informix_post_init.sh</source>
             <outputDirectory>config</outputDirectory>
             <lineEnding>unix</lineEnding>
+            <fileMode>755</fileMode>
           </file>
           <file>
             <source>${project.basedir}/src/test/docker/informix-cdc-docker/14/testdb.sql</source>
@@ -57,7 +57,7 @@ public class InformixConnection extends JdbcConnection {

     private static final ConnectionFactory FACTORY = JdbcConnection.patternBasedFactory(
             URL_PATTERN,
-            IfxDriver.class.getName(),
+            IfxDriver.class.getCanonicalName(),
             InformixConnection.class.getClassLoader(),
             JdbcConfiguration.PORT.withDefault(InformixConnectorConfig.PORT.defaultValueAsString()));

@@ -81,7 +81,7 @@ private String retrieveRealDatabaseName() {
             return queryAndMap(GET_DATABASE_NAME, singleResultMapper(rs -> rs.getString(1), "Could not retrieve database name"));
         }
         catch (SQLException e) {
-            throw new RuntimeException("Couldn't obtain database name", e);
+            throw new DebeziumException("Couldn't obtain database name", e);
         }
     }

@@ -186,6 +186,8 @@ public String quotedColumnIdString(String columnName) {

     public DataSource datasource() {
         return new DataSource() {
+            private PrintWriter logWriter;
+
             @Override
             public Connection getConnection() throws SQLException {
                 return connection();
@@ -199,12 +201,12 @@ public Connection getConnection(String username, String password) throws SQLExce

             @Override
             public PrintWriter getLogWriter() {
-                throw new UnsupportedOperationException("getLogWriter");
+                return this.logWriter;
             }

             @Override
             public void setLogWriter(PrintWriter out) {
-                throw new UnsupportedOperationException("setLogWriter");
+                this.logWriter = out;
             }

@@ -214,12 +216,12 @@ public void setLoginTimeout(int seconds) {

             @Override
             public int getLoginTimeout() {
-                return 0;
+                return (int) config().getConnectionTimeout().toSeconds();
             }

             @Override
             public java.util.logging.Logger getParentLogger() throws SQLFeatureNotSupportedException {
-                throw new SQLFeatureNotSupportedException("getParentLogger");
+                return java.util.logging.Logger.getLogger("io.debezium.connector.informix");
             }

             @Override
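The `DataSource` changes in this file replace `UnsupportedOperationException` stubs with a field-backed `getLogWriter`/`setLogWriter` pair, as the `javax.sql.DataSource` contract expects. A standalone sketch of the same pattern follows; unlike the real `InformixConnection`, the connection plumbing here is stubbed out, so only the log-writer behavior is meaningful.

```java
import java.io.PrintWriter;
import java.sql.Connection;
import java.sql.SQLException;
import javax.sql.DataSource;

public class LogWriterDataSourceSketch {

    // Minimal DataSource whose get/setLogWriter are backed by a field
    // instead of throwing, mirroring the change in this commit.
    static DataSource dataSource() {
        return new DataSource() {
            private PrintWriter logWriter;

            @Override public Connection getConnection() throws SQLException {
                throw new SQLException("connection plumbing stubbed out in this sketch");
            }
            @Override public Connection getConnection(String user, String pass) throws SQLException {
                return getConnection();
            }
            @Override public PrintWriter getLogWriter() { return logWriter; }
            @Override public void setLogWriter(PrintWriter out) { this.logWriter = out; }
            @Override public void setLoginTimeout(int seconds) { /* no-op */ }
            @Override public int getLoginTimeout() { return 0; }
            @Override public java.util.logging.Logger getParentLogger() {
                return java.util.logging.Logger.getLogger("sketch");
            }
            @Override public <T> T unwrap(Class<T> iface) throws SQLException {
                throw new SQLException("not a wrapper");
            }
            @Override public boolean isWrapperFor(Class<?> iface) { return false; }
        };
    }

    public static void main(String[] args) {
        DataSource ds = dataSource();
        PrintWriter out = new PrintWriter(System.out, true);
        ds.setLogWriter(out);
        System.out.println(ds.getLogWriter() == out); // prints true
    }
}
```

Returning the stored writer (possibly null before any set) is what JDBC pooling and tracing tools expect, which is why the original throwing stubs were a problem.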
@@ -68,6 +68,7 @@ public enum SnapshotMode implements EnumeratedValue {
      * Perform a snapshot of the schema but no data upon initial startup of a connector.
      * @deprecated to be removed in Debezium 3.0, replaced by {{@link #NO_DATA}}
      */
+    @Deprecated
     SCHEMA_ONLY("schema_only"),

     /**
@@ -340,7 +341,7 @@ public String getValue() {
     public static final Field CDC_TIMEOUT = Field.create("cdc.timeout")
             .withDisplayName("CDC Engine timeout")
             .withType(ConfigDef.Type.INT)
-            .withGroup(Field.createGroupEntry(Field.Group.CONNECTOR_ADVANCED, 0))
+            .withGroup(Field.createGroupEntry(Field.Group.CONNECTOR_ADVANCED, 1))
             .withWidth(Width.MEDIUM)
             .withImportance(Importance.MEDIUM)
             .withDescription("Specifies a timeout to interrupt blocking to wait on an event, in seconds. "
@@ -359,8 +360,8 @@ public String getValue() {
                     PORT,
                     USER,
                     PASSWORD,
-                    QUERY_TIMEOUT_MS,
-                    DATABASE_NAME)
+                    DATABASE_NAME,
+                    QUERY_TIMEOUT_MS)
             .connector(
                     SNAPSHOT_MODE,
                     SNAPSHOT_ISOLATION_MODE,
@@ -373,7 +374,6 @@ public String getValue() {
                     SCHEMA_EXCLUDE_LIST,
                     INCLUDE_SCHEMA_COMMENTS,
                     INCREMENTAL_SNAPSHOT_ALLOW_SCHEMA_CHANGES,
-                    SNAPSHOT_MAX_THREADS,
                     DatabaseHeartbeatImpl.HEARTBEAT_ACTION_QUERY)
             .create();
@@ -5,12 +5,22 @@
*/
package io.debezium.connector.informix;

import java.io.BufferedReader;
import java.math.BigDecimal;
import java.sql.Clob;
import java.sql.SQLException;
import java.sql.Types;
import java.time.ZoneOffset;
import java.util.stream.Collectors;

import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.SchemaBuilder;

import com.informix.jdbc.IfxCblob;

import io.debezium.config.CommonConnectorConfig.BinaryHandlingMode;
import io.debezium.data.SpecialValueDecimal;
import io.debezium.data.VariableScaleDecimal;
import io.debezium.jdbc.JdbcValueConverters;
import io.debezium.jdbc.TemporalPrecisionMode;
import io.debezium.relational.Column;
@@ -24,6 +34,8 @@
*/
public class InformixValueConverters extends JdbcValueConverters {

private static final int FLOATING_POINT_DECIMAL_SCALE = 255;

/**
* Create a new instance that always uses UTC for the default time zone when
* converting values without timezone information to values that require
@@ -41,20 +53,106 @@ public InformixValueConverters(DecimalMode decimalMode, TemporalPrecisionMode te

@Override
public SchemaBuilder schemaBuilder(Column column) {
logger.debug("Building schema for column {} of type {} named {} with constraints ({},{})",
column.name(),
column.jdbcType(),
column.typeName(),
column.length(),
column.scale());

switch (column.jdbcType()) {
case Types.NUMERIC:
case Types.DECIMAL:
return getNumericSchema(column);
default:
return super.schemaBuilder(column);
SchemaBuilder builder = super.schemaBuilder(column);
logger.debug("JdbcValueConverters returned '{}' for column '{}'", builder != null ? builder.getClass().getName() : null, column.name());
return builder;
}
}

private SchemaBuilder getNumericSchema(Column column) {
if (column.scale().isPresent()) {

if (column.scale().get() == FLOATING_POINT_DECIMAL_SCALE && decimalMode == DecimalMode.PRECISE) {
return VariableScaleDecimal.builder();
}

return super.schemaBuilder(column);
}

if (decimalMode == DecimalMode.PRECISE) {
return VariableScaleDecimal.builder();
}

if (column.length() == 0) {
// Defined as DECIMAL without specifying a length and scale, treat as DECIMAL(16)
return SpecialValueDecimal.builder(decimalMode, 16, -1);
}

return SpecialValueDecimal.builder(decimalMode, column.length(), -1);
}

@Override
public ValueConverter converter(Column column, Field fieldDefn) {
switch (column.jdbcType()) {
case Types.NUMERIC:
case Types.DECIMAL:
return getNumericConverter(column, fieldDefn);
default:
return super.converter(column, fieldDefn);
}
}

private ValueConverter getNumericConverter(Column column, Field fieldDefn) {
if (column.scale().isPresent()) {

if (column.scale().get() == FLOATING_POINT_DECIMAL_SCALE && decimalMode == DecimalMode.PRECISE) {
return data -> convertVariableScale(column, fieldDefn, data);
}

return data -> convertNumeric(column, fieldDefn, data);
}

return data -> convertVariableScale(column, fieldDefn, data);
}

private Object convertVariableScale(Column column, Field fieldDefn, Object data) {
data = convertNumeric(column, fieldDefn, data); // provides default value

if (data == null) {
return null;
}
if (decimalMode == DecimalMode.PRECISE) {
if (data instanceof SpecialValueDecimal) {
return VariableScaleDecimal.fromLogical(fieldDefn.schema(), (SpecialValueDecimal) data);
}
else if (data instanceof BigDecimal) {
return VariableScaleDecimal.fromLogical(fieldDefn.schema(), (BigDecimal) data);
}
}
else {
return data;
}
return handleUnknownData(column, fieldDefn, data);
}

@Override
protected Object convertString(Column column, Field fieldDefn, Object data) {
if (data instanceof Clob) {
return convertValue(column, fieldDefn, data, "", (receiver) -> {
try {
receiver.deliver(new BufferedReader(((IfxCblob) data).getCharacterStream()).lines().collect(Collectors.joining(System.lineSeparator())));
}
catch (SQLException e) {
throw new RuntimeException("Error processing data from " + column.jdbcType() + " and column " + column +
": class=" + data.getClass(), e);
}
});
}
return super.convertString(column, fieldDefn, data);
}

@Override
protected int getTimePrecision(Column column) {
return column.length() < 20 ? 0 : column.length() - 20;
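The `convertString` override added above drains a CLOB's character stream line by line and re-joins it with the platform line separator. The same read pattern can be sketched in plain Java, with `StringReader` standing in for `IfxCblob.getCharacterStream()` (an assumption for the sketch; the real code reads from the Informix JDBC CLOB):

```java
import java.io.BufferedReader;
import java.io.Reader;
import java.io.StringReader;
import java.util.stream.Collectors;

public class ClobReadSketch {

    // Join a character stream line by line, as convertString() does for CLOBs.
    static String readAll(Reader characterStream) {
        return new BufferedReader(characterStream)
                .lines()
                .collect(Collectors.joining(System.lineSeparator()));
    }

    public static void main(String[] args) {
        String joined = readAll(new StringReader("line one\nline two"));
        System.out.println(joined.equals("line one" + System.lineSeparator() + "line two")); // prints true
    }
}
```

Note that joining with `System.lineSeparator()` normalizes the CLOB's original line endings to the platform's, which is a deliberate trade-off in this approach.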
1 change: 1 addition & 0 deletions src/test/docker/informix-cdc-docker/12/Dockerfile
@@ -1,6 +1,7 @@
 FROM icr.io/informix/informix-developer-database:12.10.FC12W1DE

 ADD --chown=informix:informix informix_init.sh /opt/ibm/scripts
+ADD --chown=informix:informix testdb.sql /opt/ibm/informix/etc

 RUN sed -Ei 's/^(USEOSTIME)\s+\S/\1 1/' /opt/ibm/informix/etc/onconfig.std
10 changes: 4 additions & 6 deletions src/test/docker/informix-cdc-docker/12/informix_init.sh
@@ -25,12 +25,10 @@ if [ ${iter} -gt 120 ]; then
   exit
 fi

-dbaccess sysadmin $INFORMIX_DATA_DIR/extend_root.sql >>$INIT_LOG 2>&1
+dbaccess sysadmin $BASEDIR/sql/informix_extend_root.sql >>$INIT_LOG 2>&1

-if [ $DB_SBSPACE ]; then
-  dbaccess sysadmin $INFORMIX_DATA_DIR/sbspace.sql >>$INIT_LOG 2>&1
-fi
+dbaccess sysadmin $BASEDIR/sql/informix_sbspace.sql >>$INIT_LOG 2>&1

-dbaccess < $INFORMIXDIR/etc/syscdcv1.sql
+dbaccess sysadmin $INFORMIXDIR/etc/syscdcv1.sql >>$INIT_LOG 2>&1

-echo 'create database testdb with log;' | dbaccess - -
+dbaccess sysadmin $INFORMIXDIR/etc/testdb.sql >>$INIT_LOG 2>&1
2 changes: 2 additions & 0 deletions src/test/docker/informix-cdc-docker/12/testdb.sql
@@ -0,0 +1,2 @@
+set lock mode to wait;
+create database testdb with log;
10 changes: 4 additions & 6 deletions src/test/docker/informix-cdc-docker/14/informix_post_init.sh
@@ -1,11 +1,9 @@
 #!/bin/bash

-dbaccess < $INFORMIXDIR/etc/syscdcv1.sql
+dbaccess sysadmin $BASEDIR/sql/informix_extend_root.sql >>$INIT_LOG 2>&1

-dbaccess < $INFORMIXDIR/etc/testdb.sql
+dbaccess sysadmin $BASEDIR/sql/informix_sbspace.sql >>$INIT_LOG 2>&1

-# cat /dev/null > $INFORMIX_DATA_DIR/spaces/rootdbs.001 > $INFORMIX_DATA_DIR/spaces/rootdbs.002
-# chmod 660 $INFORMIX_DATA_DIR/spaces/rootdbs.001 $INFORMIX_DATA_DIR/spaces/rootdbs.002
-# onspaces -c -d rootdbs1 -p $INFORMIX_DATA_DIR/spaces/rootdbs.001 -o 0 -s 350000
-# onspaces -c -d rootdbs2 -p $INFORMIX_DATA_DIR/spaces/rootdbs.002 -o 0 -s 350000
+dbaccess sysadmin $INFORMIXDIR/etc/syscdcv1.sql >>$INIT_LOG 2>&1

+dbaccess sysadmin $INFORMIXDIR/etc/testdb.sql >>$INIT_LOG 2>&1