Oracle real-time reading Connector implementation based on LogMiner #374
Problems with Oracle CDC

CREATE TABLE DEBEZIUM."base" (
"base_id" NUMBER NOT NULL,
"start_time" TIMESTAMP NULL,
"update_date" DATE NULL,
"update_time" TIMESTAMP DEFAULT CURRENT_TIMESTAMP NOT NULL,
"price" NUMBER(5, 2) NULL,
"json_content" CLOB NULL,
"col_blob" BLOB NULL,
"col_text" CLOB NULL,
CONSTRAINT PK_base PRIMARY KEY ("base_id")
)
LOB ("json_content", "col_blob", "col_text") STORE AS (TABLESPACE USERS); 通过以上 create table ddl 创建的 oracle base表,执行insert一条insert语句,注意json_content,col_blob,col_text 字段均有值,flink-cdc 使用log mining 的online_catalog 策略进行监听增量更新,在redo 日志文件中会生成insert 和 update类型的 event,为什么不能合并成一条insert 消息呢? 原因分析在使用 Flink-CDC 监听 Oracle 数据库的增量更新时,如果执行了一条 INSERT 语句,并且该语句包含了对 LOB 字段(如 CLOB 或 BLOB)的赋值,可能会导致在重做日志(redo log)中生成多条事件(例如一条 INSERT 和一条或更多 UPDATE 类型的事件)。这种行为的原因主要与 Oracle 数据库处理 LOB 数据的方式有关。以下是详细的解释: LOB 数据的特殊处理
Solution: to ensure Flink-CDC handles LOB columns correctly and merges the related events, consider the following measures, starting with enabling full supplemental logging on the table:
ALTER TABLE DEBEZIUM."base" ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS;
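Whether the table-level supplemental logging took effect can be checked through a standard Oracle dictionary view; this query is an illustration, not something flink-cdc itself requires:

```sql
-- Expect a row with LOG_GROUP_TYPE = 'ALL COLUMN LOGGING' for the table
SELECT owner, table_name, log_group_type, always
FROM all_log_groups
WHERE owner = 'DEBEZIUM' AND table_name = 'base';
```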
Example: a strategy for merging events. If you cannot entirely avoid the extra events, you can implement logic in your Flink application to merge them. For instance, design a state manager that tracks the latest state of each record and triggers downstream processing only when a genuine change is detected. The snippet below sketches this with placeholder types (StateStore, ChangeRecord, and so on).

// Assume a state manager that keeps the latest version of each record
StateStore stateStore = ...;

// When new change events arrive
for (ChangeRecord record : changeRecords) {
    // Look up the existing state by primary key
    Record existingRecord = stateStore.get(record.getKey());
    if (existingRecord == null || !existingRecord.equals(record)) {
        // A new record, or a genuine change: update the state and emit downstream
        stateStore.put(record.getKey(), record);
        downstreamProcessor.process(record);
    }
}

With this approach you can minimize the redundant events caused by LOB handling and ensure the application reflects the real changes in the database.
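If the pipeline is written in Flink SQL rather than the DataStream API, the same keep-latest-per-key idea maps onto Flink's documented deduplication pattern. The sketch below assumes a hypothetical source table base_src registered over the `base` table, with a processing-time attribute proc_time:

```sql
-- Keep only the most recent event per primary key; earlier INSERT/UPDATE
-- events for the same base_id are filtered out by the ROW_NUMBER condition
SELECT base_id, start_time, update_time, price, json_content, col_text
FROM (
  SELECT *,
         ROW_NUMBER() OVER (PARTITION BY base_id ORDER BY proc_time DESC) AS row_num
  FROM base_src
)
WHERE row_num = 1;
```

Because the ordering is descending on a time attribute, Flink treats this as keep-last deduplication and emits retractions, so the downstream operator or sink must accept updating results.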
Already completed.
The following error is reported at startup:

Caused by: java.io.IOException: io.debezium.DebeziumException: The db history topic or its content is fully or partially missing. Please check database history topic configuration and re-execute the snapshot.
at org.apache.flink.cdc.connectors.base.source.reader.IncrementalSourceSplitReader.fetch(IncrementalSourceSplitReader.java:101)
at org.apache.flink.connector.base.source.reader.fetcher.FetchTask.run(FetchTask.java:58)
at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.runOnce(SplitFetcher.java:165)
... 6 more
Caused by: io.debezium.DebeziumException: The db history topic or its content is fully or partially missing. Please check database history topic configuration and re-execute the snapshot.
at io.debezium.relational.HistorizedRelationalDatabaseSchema.recover(HistorizedRelationalDatabaseSchema.java:59)
at org.apache.flink.cdc.connectors.oracle.source.reader.fetch.OracleSourceFetchTaskContext.validateAndLoadDatabaseHistory(OracleSourceFetchTaskContext.java:275)
at org.apache.flink.cdc.connectors.oracle.source.reader.fetch.OracleSourceFetchTaskContext.configure(OracleSourceFetchTaskContext.java:118)
at org.apache.flink.cdc.connectors.base.source.reader.external.IncrementalSourceStreamFetcher.submitTask(IncrementalSourceStreamFetcher.java:84)
at org.apache.flink.cdc.connectors.base.source.reader.IncrementalSourceSplitReader.submitStreamSplit(IncrementalSourceSplitReader.java:261)
at org.apache.flink.cdc.connectors.base.source.reader.IncrementalSourceSplitReader.pollSplitRecords(IncrementalSourceSplitReader.java:153)
at org.apache.flink.cdc.connectors.base.source.reader.IncrementalSourceSplitReader.fetch(IncrementalSourceSplitReader.java:98)
... 8 more
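This error typically appears when the job is restored from a checkpoint or savepoint whose schema history no longer matches the configured source; as the message says, the usual remedy is to discard the old state and let the connector re-execute the snapshot. For reference, a minimal oracle-cdc table definition in the style of the flink-cdc docs looks like the following (hostname, credentials, and table names are placeholders to adapt):

```sql
CREATE TABLE base_src (
  base_id DECIMAL(10, 0),
  json_content STRING,
  col_text STRING,
  PRIMARY KEY (base_id) NOT ENFORCED
) WITH (
  'connector' = 'oracle-cdc',
  'hostname' = 'localhost',
  'port' = '1521',
  'username' = 'flinkuser',
  'password' = 'flinkpw',
  'database-name' = 'XE',
  'schema-name' = 'DEBEZIUM',
  'table-name' = 'base',
  -- 'initial' performs a full snapshot first, then reads the redo logs
  'scan.startup.mode' = 'initial'
);
```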
Setting up a Docker-based Oracle server with LogMiner support. Docker startup command:
Make sure the local directory is writable; open up its permissions with: chmod -R 777 /root/oracle/oradata

Then, following the flink-cdc documentation, enable the redo log features: https://nightlies.apache.org/flink/flink-cdc-docs-release-3.2/docs/connectors/flink-sources/oracle-cdc/#for-non-cdb-database

Enable log archiving:
(1.1). Connect to the database as DBA: ORACLE_SID=xe
(1.2). Enable log archiving:
alter system set db_recovery_file_dest_size = 10G; -- make sure the destination directory exists and its owner is oracle
shutdown immediate;
Enabling log archiving requires a database restart, so take care when doing it.
(1.3). Check whether log archiving is enabled:
archive log list;
-- Should now show "Database log mode: Archive Mode"

Supplemental logging must be enabled for the captured tables, or for the database, in order for data changes to capture the before state of changed database rows. The following illustrates how to configure this at the table/database level.
-- Enable supplemental logging for a specific table (see the ALTER TABLE ... ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS statement earlier in this thread)

(2.1). Create a tablespace (the DDL itself is sketched after this list):
sqlplus sys/oracle@host:port/xe AS SYSDBA;
(2.2). Create a user and grant permissions:
sqlplus sys/oracle@localhost:1521/xe AS SYSDBA;
GRANT CREATE TABLE TO flinkuser;
GRANT EXECUTE ON DBMS_LOGMNR TO flinkuser;
GRANT SELECT ON V_$LOG TO flinkuser;
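The tablespace and user DDL are elided above; for reference, the flink-cdc docs use statements along these lines (the tablespace name, datafile path, and password here are illustrative):

```sql
-- Illustrative values; adjust the datafile path to your installation
CREATE TABLESPACE logminer_tbs
  DATAFILE '/u01/app/oracle/oradata/XE/logminer_tbs.dbf'
  SIZE 25M REUSE AUTOEXTEND ON MAXSIZE UNLIMITED;

CREATE USER flinkuser IDENTIFIED BY flinkpw
  DEFAULT TABLESPACE logminer_tbs
  QUOTA UNLIMITED ON logminer_tbs;
```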
https://nightlies.apache.org/flink/flink-cdc-docs-release-3.2/docs/connectors/flink-sources/oracle-cdc/
https://github.com/airbytehq/airbyte/blob/master/airbyte-integrations/connectors/source-oracle/src/main/resources/spec.json