Skip to content

Commit

Permalink
[yugabyte/yugabyte-db#15579] Adding changes for consistent streaming …
Browse files Browse the repository at this point in the history
…of records (#155)

Co-authored-by: Vaibhav Kushwaha <vkushwaha@yugabyte.com>
Co-authored-by: Rajat Venkatesh <vrajat@users.noreply.github.com>
Co-authored-by: suranjan kumar <suranjan.kumar@gmail.com>
  • Loading branch information
4 people authored Jun 23, 2023
1 parent 8addddb commit 1998e09
Show file tree
Hide file tree
Showing 23 changed files with 2,850 additions and 76 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -28,5 +28,6 @@ generated-sources
.classpath
.project
.settings
.factorypath
.idea/
dependency-reduced-pom.xml
3 changes: 2 additions & 1 deletion Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,8 @@ COPY target/debezium-connector-yugabytedb-*.jar $KAFKA_CONNECT_YB_DIR/
ENV KAFKA_OPTS="-Djdk.tls.client.protocols=TLSv1.2"

# Add the required jar files to be packaged with the base connector
RUN cd $KAFKA_CONNECT_YB_DIR && curl -so kafka-connect-jdbc-10.6.0.jar https://packages.confluent.io/maven/io/confluent/kafka-connect-jdbc/10.6.0/kafka-connect-jdbc-10.6.0.jar
# RUN cd $KAFKA_CONNECT_YB_DIR && curl -so kafka-connect-jdbc-10.6.0.jar https://packages.confluent.io/maven/io/confluent/kafka-connect-jdbc/10.6.0/kafka-connect-jdbc-10.6.0.jar
COPY kafka-connect-jdbc-10.6.5-SNAPSHOT.jar $KAFKA_CONNECT_YB_DIR/
RUN cd $KAFKA_CONNECT_YB_DIR && curl -so jdbc-yugabytedb-42.3.5-yb-1.jar https://repo1.maven.org/maven2/com/yugabyte/jdbc-yugabytedb/42.3.5-yb-1/jdbc-yugabytedb-42.3.5-yb-1.jar
RUN cd $KAFKA_CONNECT_YB_DIR && curl -so mysql-connector-java-8.0.30.jar https://repo1.maven.org/maven2/mysql/mysql-connector-java/8.0.30/mysql-connector-java-8.0.30.jar
RUN cd $KAFKA_CONNECT_YB_DIR && curl -so postgresql-42.5.1.jar https://repo1.maven.org/maven2/org/postgresql/postgresql/42.5.1/postgresql-42.5.1.jar
Expand Down
6 changes: 3 additions & 3 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
<version.release.plugin>3.0.0-M5</version.release.plugin>
<version.site.plugin>3.7.1</version.site.plugin>
<version.resources.plugin>3.1.0</version.resources.plugin>
<version.docker.maven.plugin>0.31.0</version.docker.maven.plugin>
<version.docker.maven.plugin>0.40.2</version.docker.maven.plugin>
<version.protoc.maven.plugin>3.11.4</version.protoc.maven.plugin>
<version.fabric8.plugin>0.40.2</version.fabric8.plugin>
<version.jackson>2.14.1</version.jackson>
Expand All @@ -47,7 +47,7 @@
<version.kafka>3.3.1</version.kafka>
<version.org.slf4j>1.7.36</version.org.slf4j>
<version.logback>1.4.0</version.logback>
<version.ybclient>0.8.56-20230615.071435-1</version.ybclient>
<version.ybclient>0.8.56-20230623.112806-3</version.ybclient>
<version.gson>2.8.9</version.gson>

<!--
Expand Down Expand Up @@ -562,7 +562,7 @@
<plugin>
<groupId>io.fabric8</groupId>
<artifactId>docker-maven-plugin</artifactId>
<version>${version.fabric8.plugin}</version>
<version>${version.docker.maven.plugin}</version>
<extensions>true</extensions>
<configuration>
<images>
Expand Down
46 changes: 41 additions & 5 deletions src/main/java/io/debezium/connector/yugabytedb/SourceInfo.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import io.debezium.connector.common.BaseSourceInfo;
import io.debezium.connector.yugabytedb.connection.OpId;
import io.debezium.relational.TableId;
import io.debezium.time.Conversions;

/**
* Information about the source of information for a particular record.
Expand All @@ -31,6 +32,14 @@ public final class SourceInfo extends BaseSourceInfo {
public static final String TXID_KEY = "txId";
public static final String LSN_KEY = "lsn";

public static final String COMMIT_TIME = "commit_time";

public static final String RECORD_TIME = "record_time";

public static final String TABLE_ID = "table_id";
public static final String TABLET_ID = "tablet_id";
public static final String PARTITION_ID_KEY = "partition_id";

private static final ObjectMapper MAPPER = new ObjectMapper();

private final String dbName;
Expand All @@ -41,6 +50,10 @@ public final class SourceInfo extends BaseSourceInfo {
private Instant timestamp;
private String schemaName;
private String tableName;
private String tableUUID;
private String tabletId;
private Long commitTime;
private Long recordTime;

protected SourceInfo(YugabyteDBConnectorConfig connectorConfig) {
super(connectorConfig);
Expand All @@ -57,21 +70,28 @@ protected SourceInfo(YugabyteDBConnectorConfig connectorConfig, OpId lastCommitL
/**
* Updates the source with information about a particular received or read event.
*
* @param tabletId Tablet ID of the partition
* @param lsn the position in the server WAL for a particular event; may be null indicating that this information is not
* available
* @param commitTime the commit time of the transaction that generated the event;
* may be null indicating that this information is not available
* @param txId the ID of the transaction that generated the transaction; may be null if this information is not available
* @param tableId the table that should be included in the source info; may be null
* @param recordTime Hybrid Time Stamp Time of the statement within the transaction.
* @return this instance
*/
protected SourceInfo update(YBPartition partition, OpId lsn, Instant commitTime, String txId,
TableId tableId) {
protected SourceInfo update(YBPartition partition, OpId lsn, long commitTime, String txId,
TableId tableId, Long recordTime) {
this.lsn = lsn;
if (commitTime != null) {
this.timestamp = commitTime;
}
this.commitTime = commitTime;
this.txId = txId;
this.recordTime = recordTime;
this.tableUUID = partition.getTableId();
this.tabletId = partition.getTabletId();

// The commit time of the record is technically the timestamp of the record.
this.timestamp = Conversions.toInstantFromMicros(commitTime);

if (tableId != null && tableId.schema() != null) {
this.schemaName = tableId.schema();
}
Expand Down Expand Up @@ -145,6 +165,22 @@ protected String txId() {
return txId;
}

protected Long commitTime() {
return this.commitTime;
}

protected Long recordTime() {
return this.recordTime;
}

protected String tabletId() {
return this.tabletId;
}

protected String tableUUID() {
return this.tableUUID;
}

@Override
public SnapshotRecord snapshot() {
return super.snapshot();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,11 @@
import io.debezium.util.Clock;
import io.debezium.connector.base.ChangeEventQueue;
import io.debezium.pipeline.DataChangeEvent;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class YugabyteDBChangeEventSourceFactory implements ChangeEventSourceFactory<YBPartition, YugabyteDBOffsetContext> {
private static final Logger LOGGER = LoggerFactory.getLogger(YugabyteDBChangeEventSourceFactory.class);

private final YugabyteDBConnectorConfig configuration;
private final YugabyteDBConnection jdbcConnection;
Expand Down Expand Up @@ -82,17 +85,34 @@ public SnapshotChangeEventSource<YBPartition, YugabyteDBOffsetContext> getSnapsh

@Override
public StreamingChangeEventSource<YBPartition, YugabyteDBOffsetContext> getStreamingChangeEventSource() {
return new YugabyteDBStreamingChangeEventSource(
configuration,
snapshotter,
jdbcConnection,
dispatcher,
errorHandler,
clock,
schema,
taskContext,
replicationConnection,
queue);
LOGGER.info("Consistency mode is {}", configuration.consistencyMode().getValue());
if (configuration.consistencyMode() == YugabyteDBConnectorConfig.ConsistencyMode.DEFAULT) {
LOGGER.info("Instantiating Vanilla Streaming Source");
return new YugabyteDBStreamingChangeEventSource(
configuration,
snapshotter,
jdbcConnection,
dispatcher,
errorHandler,
clock,
schema,
taskContext,
replicationConnection,
queue);
} else {
LOGGER.info("Instantiating CONSISTENT Streaming Source");
return new YugabyteDBConsistentStreamingSource(
configuration,
snapshotter,
jdbcConnection,
dispatcher,
errorHandler,
clock,
schema,
taskContext,
replicationConnection,
queue);
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,13 @@ public List<Map<String, String>> taskConfigs(int maxTasks) {
throw new DebeziumException(e);
}

if (this.yugabyteDBConnectorConfig.consistencyMode() != YugabyteDBConnectorConfig.ConsistencyMode.DEFAULT
&& !enableExplicitCheckpointing) {
final String errorMessage = "Explicit checkpointing not enabled in consistent streaming mode, "
+ "create a stream with explicit checkpointing and try again";
throw new DebeziumException(errorMessage);
}

LOGGER.debug("The streamid being used is " + streamIdValue);

int numGroups = Math.min(this.tabletIds.size(), maxTasks);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -495,6 +495,48 @@ public static SchemaRefreshMode parse(String value) {
}
}

public enum ConsistencyMode implements EnumeratedValue {

/**
* Default. No consistency filters applied
*/
DEFAULT("default"),

/**
* Key-level consistency
*/
KEY("key"),

/*
* Global Consistency
*/
GLOBAL("global");

private final String value;

ConsistencyMode(String value) {
this.value = value;
}

@Override
public String getValue() {
return value;
}

public static ConsistencyMode parse(String value) {
if (value == null) {
return null;
}
value = value.trim();
for (ConsistencyMode consistencyMode : ConsistencyMode.values()) {
if (consistencyMode.getValue().equalsIgnoreCase(value)) {
return consistencyMode;
}
}
return null;
}
}

protected static final String DATABASE_CONFIG_PREFIX = "database.";
protected static final String TASK_CONFIG_PREFIX = "task.";

Expand Down Expand Up @@ -922,6 +964,18 @@ public static AutoCreateMode parse(String value, String defaultValue) {
"'skip' to skip / ignore TRUNCATE events (default), " +
"'include' to handle and include TRUNCATE events");

public static final Field CONSISTENCY_MODE = Field.create("consistency.mode")
.withDisplayName("Transaction Consistency mode")
.withGroup(Field.createGroupEntry(Field.Group.CONNECTOR, 23))
.withEnum(ConsistencyMode.class, ConsistencyMode.DEFAULT)
.withWidth(Width.MEDIUM)
.withImportance(Importance.MEDIUM)
.withValidation(YugabyteDBConnectorConfig::validateTruncateHandlingMode)
.withDescription("Specify how transactions should be managed when streaming events: " +
"'default' no consistency grouping is done. Events are generated when received, " +
"'key' Consistency grouping is at primary key level, " +
"'global' Consistency grouping is at global level across all transactions");

/**
* A comma-separated list of regular expressions that match the prefix of logical decoding messages to be excluded
* from monitoring. Must not be used with {@link #LOGICAL_DECODING_MESSAGE_PREFIX_INCLUDE_LIST}
Expand Down Expand Up @@ -1043,6 +1097,7 @@ public static AutoCreateMode parse(String value, String defaultValue) {
"If starts with 'hex:' prefix it is expected that the rest of the string repesents hexadecimally encoded octets.");

private final TruncateHandlingMode truncateHandlingMode;
private final ConsistencyMode consistencyMode;
private final HStoreHandlingMode hStoreHandlingMode;
private final IntervalHandlingMode intervalHandlingMode;
private final SnapshotMode snapshotMode;
Expand All @@ -1062,6 +1117,7 @@ public YugabyteDBConnectorConfig(Configuration config) {
ColumnFilterMode.SCHEMA);

this.truncateHandlingMode = TruncateHandlingMode.parse(config.getString(YugabyteDBConnectorConfig.TRUNCATE_HANDLING_MODE));
this.consistencyMode = ConsistencyMode.parse(config.getString(YugabyteDBConnectorConfig.CONSISTENCY_MODE));
this.logicalDecodingMessageFilter = new LogicalDecodingMessageFilter(config.getString(LOGICAL_DECODING_MESSAGE_PREFIX_INCLUDE_LIST),
config.getString(LOGICAL_DECODING_MESSAGE_PREFIX_EXCLUDE_LIST));
String hstoreHandlingModeStr = config.getString(YugabyteDBConnectorConfig.HSTORE_HANDLING_MODE);
Expand Down Expand Up @@ -1173,6 +1229,10 @@ public TruncateHandlingMode truncateHandlingMode() {
return truncateHandlingMode;
}

public ConsistencyMode consistencyMode() {
return consistencyMode;
}

protected HStoreHandlingMode hStoreHandlingMode() {
return hStoreHandlingMode;
}
Expand Down
Loading

0 comments on commit 1998e09

Please sign in to comment.