Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: Don't filter oracle V$LOG by rba #2482

Merged
merged 1 commit into from
Apr 7, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 5 additions & 4 deletions dozer-ingestion/oracle/src/connector/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -506,10 +506,10 @@ mod tests {

env_logger::init();

let replicate_user = "DOZER";
let data_user = "DOZER";
let host = "database-1.cxtwfj9nkwtu.ap-southeast-1.rds.amazonaws.com";
let sid = "ORCL";
let replicate_user = "C##DOZER";
let data_user = "CHUBEI";
let host = "localhost";
let sid = "ORCLPDB1";

let mut connector = super::Connector::new(
"oracle".into(),
Expand All @@ -535,6 +535,7 @@ mod tests {
estimate_throughput(iterator);
let checkpoint = handle.join().unwrap().unwrap();

let sid = "ORCLCDB";
let mut connector = super::Connector::new(
"oracle".into(),
replicate_user.into(),
Expand Down
24 changes: 10 additions & 14 deletions dozer-ingestion/oracle/src/connector/replicate/log/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,6 @@ pub struct LogManagerContent {
pub seg_owner: Option<String>,
pub table_name: Option<String>,
pub rbasqn: u32,
pub rbablk: u32,
pub rbabyte: u16,
pub sql_redo: Option<String>,
pub csf: u8,
}
Expand Down Expand Up @@ -66,12 +64,11 @@ fn log_reader_loop(
ingestor: &Ingestor,
) {
#[derive(Debug, Clone, Copy)]
struct LastRba {
struct LastScn {
sqn: u32,
blk: u32,
byte: u16,
scn: Scn,
}
let mut last_rba: Option<LastRba> = None;
let mut last_scn: Option<LastScn> = None;

loop {
debug!(target: "oracle_replication", "Listing logs starting from SCN {}", start_scn);
Expand Down Expand Up @@ -99,18 +96,18 @@ fn log_reader_loop(
let log = logs.remove(0);
debug!(target: "oracle_replication",
"Reading log {} ({}) ({}, {}), starting from {:?}",
log.name, log.sequence, log.first_change, log.next_change, last_rba
log.name, log.sequence, log.first_change, log.next_change, last_scn
);

let iterator = {
let last_rba = last_rba.and_then(|last_rba| {
if log.sequence == last_rba.sqn {
Some((last_rba.blk, last_rba.byte))
let last_scn = last_scn.and_then(|last_scn| {
if last_scn.sqn == log.sequence {
Some(last_scn.scn)
} else {
None
}
});
match reader.read(connection, &log.name, last_rba, con_id) {
match reader.read(connection, &log.name, last_scn, con_id) {
Ok(iterator) => iterator,
Err(e) => {
if ingestor.is_closed() {
Expand All @@ -133,10 +130,9 @@ fn log_reader_loop(
break 'replicate_logs;
}
};
last_rba = Some(LastRba {
last_scn = Some(LastScn {
sqn: content.rbasqn,
blk: content.rbablk,
byte: content.rbabyte,
scn: content.scn,
});
if sender.send(content).is_err() {
return;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
use std::env;

use dozer_ingestion_connector::dozer_types::{
chrono::{DateTime, Utc},
log::{error, trace},
Expand Down Expand Up @@ -44,51 +46,65 @@ impl RedoReader for LogMiner {
&self,
connection: &'a Connection,
log_file_name: &str,
last_rba: Option<(u32, u16)>,
last_scn: Option<Scn>,
con_id: Option<u32>,
) -> Result<Self::Iterator<'a>, Error> {
let sql =
"BEGIN DBMS_LOGMNR.ADD_LOGFILE(LOGFILENAME => :name, OPTIONS => DBMS_LOGMNR.NEW); END;";
trace!(target: "oracle_log_miner", "{}, {}", sql, log_file_name);
connection.execute(sql, &[&str_to_sql!(log_file_name)])?;

let sql = "
if let Some(last_scn) = last_scn {
let start_scn = last_scn + 1;
let sql = "
BEGIN
DBMS_LOGMNR.START_LOGMNR(
STARTSCN => :start_scn,
OPTIONS =>
DBMS_LOGMNR.DICT_FROM_ONLINE_CATALOG +
DBMS_LOGMNR.PRINT_PRETTY_SQL +
DBMS_LOGMNR.NO_ROWID_IN_STMT
);
END;";
trace!(target: "oracle_log_miner", "{}", sql);
connection.execute(sql, &[])?;
trace!(target: "oracle_log_miner", "{}, {}", sql, start_scn);
connection.execute(sql, &[&start_scn])?;
} else {
let sql = "
BEGIN
DBMS_LOGMNR.START_LOGMNR(
OPTIONS =>
DBMS_LOGMNR.DICT_FROM_ONLINE_CATALOG +
DBMS_LOGMNR.PRINT_PRETTY_SQL +
DBMS_LOGMNR.NO_ROWID_IN_STMT
);
END;";
trace!(target: "oracle_log_miner", "{}", sql);
connection.execute(sql, &[])?;
};
let stmt = |sql| {
connection
.statement(sql)
.fetch_array_size(self.fetch_batch_size)
.build()
};

let base_sql = "SELECT SCN, TIMESTAMP, XID, PXID, OPERATION_CODE, SEG_OWNER, TABLE_NAME, RBASQN, RBABLK, RBABYTE, SQL_REDO, CSF FROM V$LOGMNR_CONTENTS";
let rba_filter = "(RBABLK > :last_blk OR (RBABLK = :last_blk AND RBABYTE > :last_byte))";
let base_sql = "SELECT SCN, TIMESTAMP, XID, PXID, OPERATION_CODE, SEG_OWNER, TABLE_NAME, RBASQN, SQL_REDO, CSF FROM V$LOGMNR_CONTENTS";
let operation_code_filter = env::var("DOZER_ORACLE_LOG_MINER_OPERATION_CODE_FILTER").ok();
let con_id_filter = "SRC_CON_ID = :con_id";
let started = std::time::Instant::now();
let result_set = match (last_rba, con_id) {
(Some((last_blk, last_byte)), Some(con_id)) => {
let sql = format!("{} WHERE {} AND {}", base_sql, rba_filter, con_id_filter);
trace!(target: "oracle_log_miner", "{}, {}, {}, {}", sql, last_blk, last_byte, con_id);
stmt(&sql)?.into_result_set_named(&[
("last_blk", &last_blk),
("last_byte", &last_byte),
("con_id", &con_id),
])
let result_set = match (operation_code_filter, con_id) {
(Some(operation_code_filter), Some(con_id)) => {
let sql = format!(
"{} WHERE {} AND {}",
base_sql, operation_code_filter, con_id_filter
);
trace!(target: "oracle_log_miner", "{}, {}", sql, con_id);
stmt(&sql)?.into_result_set_named(&[("con_id", &con_id)])
}
(Some((last_blk, last_byte)), None) => {
let sql = format!("{} WHERE {}", base_sql, rba_filter);
trace!(target: "oracle_log_miner", "{}, {}, {}", sql, last_blk, last_byte);
stmt(&sql)?
.into_result_set_named(&[("last_blk", &last_blk), ("last_byte", &last_byte)])
(Some(operation_code_filter), None) => {
let sql = format!("{} WHERE {}", base_sql, operation_code_filter);
trace!(target: "oracle_log_miner", "{}", sql);
stmt(&sql)?.into_result_set(&[])
}
(None, Some(con_id)) => {
let sql = format!("{} WHERE {}", base_sql, con_id_filter);
Expand Down Expand Up @@ -121,8 +137,6 @@ impl RowValue for LogManagerContent {
seg_owner,
table_name,
rbasqn,
rbablk,
rbabyte,
sql_redo,
csf,
) = <(
Expand All @@ -134,8 +148,6 @@ impl RowValue for LogManagerContent {
Option<String>,
Option<String>,
u32,
u32,
u16,
Option<String>,
u8,
) as RowValue>::get(row)?;
Expand All @@ -148,8 +160,6 @@ impl RowValue for LogManagerContent {
seg_owner,
table_name,
rbasqn,
rbablk,
rbabyte,
sql_redo,
csf,
})
Expand Down
Original file line number Diff line number Diff line change
@@ -1,20 +1,19 @@
use oracle::Connection;

use crate::connector::Error;
use crate::connector::{Error, Scn};

/// Given a log file name, a redo reader emits `LogManagerContent` rows
pub trait RedoReader {
type Iterator<'a>: Iterator<Item = Result<LogManagerContent, Error>>;

/// Reads the `LogManagerContent` rows that have:
///
/// - scn >= start_scn
/// - rba > last_rba.0 || (rba == last_rba.0 && rbabyte > last_rba.1)
/// - scn > last_scn
fn read<'a>(
&self,
connection: &'a Connection,
log_file_name: &str,
last_rba: Option<(u32, u16)>,
last_scn: Option<Scn>,
con_id: Option<u32>,
) -> Result<Self::Iterator<'a>, Error>;
}
Expand Down
Loading