diff --git a/README.md b/README.md index 9a7dae8057e..1be8961afe1 100644 --- a/README.md +++ b/README.md @@ -17,6 +17,7 @@ This README is meant as a brief walkthrough on the core features of CDC Connecto | [sqlserver-cdc](docs/content/connectors/sqlserver-cdc.md) |
Option | +Required | +Default | +Type | +Description | +
---|---|---|---|---|
connector | +required | +(none) | +String | +Specify what connector to use, here should be ‘vitess-cdc’ . |
+
hostname | +required | +(none) | +String | +IP address or hostname of the Vitess database server (VTGate). | +
keyspace | +required | +(none) | +String | +The name of the keyspace from which to stream the changes. | +
username | +optional | +(none) | +String | +An optional username of the Vitess database server (VTGate). If not configured, unauthenticated VTGate gRPC is used. | +
password | +optional | +(none) | +String | +An optional password of the Vitess database server (VTGate). If not configured, unauthenticated VTGate gRPC is used. | +
table-name | +required | +(none) | +String | +Table name of the MySQL database to monitor. | +
port | +optional | +15991 | +Integer | +Integer port number of the VTCtld server. | +
vtctld.host | +required | +(none) | +String | +IP address or hostname of the VTCtld server. | +
vtctld.port | +optional | +15999 | +Integer | +Integer port number of the VTCtld server. | +
vtctld.user | +optional | +(none) | +String | +An optional username of the VTCtld server. If not configured, unauthenticated VTCtld gRPC is used. | +
vtctld.password | +optional | +(none) | +String | +An optional password of the VTCtld server. If not configured, unauthenticated VTCtld gRPC is used. | +
tablet.type | +optional | +RDONLY | +String | +The type of Tablet (hence MySQL) from which to stream the changes: MASTER represents streaming from the master MySQL instance REPLICA represents streaming from the replica slave MySQL instance RDONLY represents streaming from the read-only slave MySQL instance. | +
extends EventDispatcher
{
private static final Logger LOG = LoggerFactory.getLogger(JdbcSourceEventDispatcher.class);
public static final String HISTORY_RECORD_FIELD = "historyRecord";
@@ -132,7 +134,9 @@ public ChangeEventQueue incrementalEventSource =
+ getIncrementalSnapshotChangeEventSource();
+ if (incrementalEventSource != null) {
+ incrementalEventSource.processSchemaChange(partition, dataCollectionId);
+ }
}
@Override
diff --git a/flink-cdc-base/src/main/java/com/ververica/cdc/connectors/base/source/EmbeddedFlinkDatabaseHistory.java b/flink-cdc-base/src/main/java/com/ververica/cdc/connectors/base/source/EmbeddedFlinkDatabaseHistory.java
index df7a4321a27..c08cd5fbe58 100644
--- a/flink-cdc-base/src/main/java/com/ververica/cdc/connectors/base/source/EmbeddedFlinkDatabaseHistory.java
+++ b/flink-cdc-base/src/main/java/com/ververica/cdc/connectors/base/source/EmbeddedFlinkDatabaseHistory.java
@@ -107,6 +107,12 @@ public void recover(
listener.recoveryStopped();
}
+ @Override
+ public void recover(
+ Map