Skip to content

Commit

Permalink
Refactor oracle temp table
Browse files Browse the repository at this point in the history
  • Loading branch information
karolisg committed Mar 28, 2024
1 parent 7a772b0 commit f800a4b
Show file tree
Hide file tree
Showing 4 changed files with 35 additions and 65 deletions.
5 changes: 3 additions & 2 deletions dozer-sink-aerospike/src/aerospike.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use std::{
use itertools::Itertools;

use aerospike_client_sys::*;
use dozer_types::log::{debug, info};
use dozer_types::log::debug;
use dozer_types::{
chrono::{DateTime, NaiveDate},
geo::{Coord, Point},
Expand Down Expand Up @@ -258,6 +258,8 @@ impl Client {
&self,
batch: *mut as_batch_records,
) -> Result<(), AerospikeError> {
debug!(target: "aerospike_sink", "Writing batch of size {}", batch.as_ref().unwrap().list.size);

let started = Instant::now();
let policy = self.inner.as_ref().config.policies.batch;
as_try(|err| {
Expand Down Expand Up @@ -323,7 +325,6 @@ impl Client {
request: &CStr,
response: &mut *mut i8,
) -> Result<(), AerospikeError> {
info!("Info");
as_try(|err| {
aerospike_info_any(
self.inner.as_ptr(),
Expand Down
3 changes: 1 addition & 2 deletions dozer-sink-aerospike/src/denorm_dag.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use dozer_core::petgraph::visit::{
EdgeRef, IntoEdgesDirected, IntoNeighborsDirected, IntoNodeReferences,
};
use dozer_types::indexmap::IndexMap;
use dozer_types::log::debug;

use dozer_types::models::sink::{AerospikeSet, AerospikeSinkTable};
use dozer_types::thiserror;
use dozer_types::types::{Field, Record, Schema, TableOperation};
Expand Down Expand Up @@ -499,7 +499,6 @@ impl DenormalizationState {
.sum();

let batch_size: u32 = batch_size_upper_bound.try_into().unwrap();
debug!(target: "aerospike_sink", "Writing denorm batch of size {}", batch_size);
let mut write_batch = RecordBatch::new(batch_size, batch_size);

for node in self.dag.node_weights_mut() {
Expand Down
3 changes: 1 addition & 2 deletions dozer-sink-aerospike/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ pub use crate::aerospike::Client;
use aerospike_client_sys::*;
use denorm_dag::DenormalizationState;
use dozer_core::event::EventHub;
use dozer_types::log::{debug, error};
use dozer_types::log::error;
use dozer_types::models::connection::AerospikeConnection;
use dozer_types::node::OpIdentifier;
use dozer_types::thiserror;
Expand Down Expand Up @@ -440,7 +440,6 @@ impl AerospikeSinkWorker {
// Write denormed tables
let mut batch = RecordBatch::new(batch_size_est as u32, batch_size_est as u32);

debug!(target: "aerospike_sink", "Sink batch size {batch_size_est}");
for table in denormalized_tables {
for (key, record) in table.records {
batch.add_write(
Expand Down
89 changes: 30 additions & 59 deletions dozer-sink-oracle/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,7 @@ struct OracleSink {
select_metadata: String,
latest_txid: Option<u64>,
insert_statement: String,
delete_statement: String,
}

#[derive(Debug)]
Expand Down Expand Up @@ -270,11 +271,7 @@ impl OracleSinkFactory {
temp_table: Option<&Table>,
schema: &Schema,
) -> Result<(), Error> {
if self.validate_table(connection, table, schema)? {
return Ok(());
}

let mut column_defs = Vec::with_capacity(schema.fields.len() + 2);
let mut column_defs = Vec::with_capacity(schema.fields.len());
for field in &schema.fields {
let name = &field.name;
let col_type = match field.typ {
Expand All @@ -300,13 +297,16 @@ impl OracleSinkFactory {
if field.nullable { "" } else { " NOT NULL" }
));
}
let table_query = format!("CREATE TABLE {table} ({})", column_defs.join(",\n"));
info!("### CREATE TABLE #### \n: {:?}", table_query);
connection.execute(&table_query, &[])?;

if !(self.validate_table(connection, table, schema)?) {
let table_query = format!("CREATE TABLE {table} ({})", column_defs.join(",\n"));
info!("### CREATE TABLE ####\n{}", table_query);
connection.execute(&table_query, &[])?;
}

if let Some(temp_table) = temp_table {
let temp_table_query = format!("CREATE GLOBAL TEMPORARY TABLE {temp_table} ({},\n {OPKIND_COL} NUMBER(1)) ON COMMIT DELETE ROWS", column_defs.join(",\n"));
info!("### CREATE TEMPORARY TABLE #### \n: {:?}", temp_table_query);
let temp_table_query = format!("CREATE PRIVATE TEMPORARY TABLE {temp_table} ({},\n {OPKIND_COL} NUMBER(1)) ON COMMIT PRESERVE DEFINITION", column_defs.join(",\n")).replace("NOT NULL", "");
info!("### CREATE TEMPORARY TABLE ####\n{}", temp_table_query);
connection.execute(&temp_table_query, &[])?;
}

Expand Down Expand Up @@ -357,44 +357,8 @@ impl OracleSinkFactory {

Ok(())
}

/// Ensures `table` carries a primary-key constraint over the columns named
/// by the schema's primary index, creating the constraint when the table
/// has no index yet.
///
/// The constraint name is derived from the table owner, table name and the
/// upper-cased column names; `#` characters are stripped from the generated
/// identifier.
fn create_pk(
    &self,
    connection: &Connection,
    table: &Table,
    schema: &Schema,
) -> Result<(), Error> {
    // Resolve primary-index positions to field names, upper-casing each one.
    let columns: Vec<String> = schema
        .primary_index
        .iter()
        .map(|ix| schema.fields[*ix].name.to_uppercase())
        .collect();

    // Constraint identifier: OWNER_TABLE_COL1_COL2_PK, with '#' removed.
    let index_name =
        format!("{}_{}_{}_PK", table.owner, table.name, columns.join("_")).replace('#', "");

    let query = "SELECT index_name FROM all_indexes WHERE table_name = :1 AND owner = :2";
    info!("Index check query {query}");

    // Only add the constraint when no index is already registered for the table.
    let mut index_exist = connection.query(query, &[&table.name, &table.owner])?;
    if index_exist.next().is_none() {
        let query = format!(
            "ALTER TABLE {table} ADD CONSTRAINT {index_name} PRIMARY KEY ({})",
            columns.join(", ")
        );
        info!("### CREATE PK #### \n: {index_name}. Query: {query}");
        connection.execute(&query, &[])?;
    } else {
        info!("Index {index_name} already exist");
    }

    Ok(())
}
}

fn generate_merge_statement(table: &Table, temp_table: &Table, schema: &Schema) -> String {
let field_names = schema
.fields
Expand Down Expand Up @@ -461,12 +425,6 @@ fn generate_insert_statement(table: &Table, schema: &Schema) -> String {
.map(|field| field.name.as_str())
.chain([TXN_ID_COL, TXN_SEQ_COL, OPKIND_COL]);

let destination_columns = field_names
.clone()
.map(|name| format!("\"{name}\""))
.collect::<Vec<_>>()
.join(", ");

let mut parameter_index = 1usize..;
let input_fields = field_names
.clone()
Expand All @@ -480,13 +438,16 @@ fn generate_insert_statement(table: &Table, schema: &Schema) -> String {
// If the record exists, but the txid is higher than the operation's txid,
// do nothing (if the op is INSERT,
format!(
r#"INSERT INTO {table} ({destination_columns})
r#"INSERT INTO {table}
SELECT *
FROM
(SELECT {input_fields} FROM DUAL)
"#
)
}
/// Builds the SQL statement that removes every row from `table`.
///
/// NOTE(review): the caller appears to run this against the temporary table
/// right after the merge statement — confirm against the flush path.
fn generate_delete_statement(table: &Table) -> String {
    format!("DELETE FROM {table}")
}

#[derive(Debug, Clone)]
struct Table {
Expand Down Expand Up @@ -550,7 +511,7 @@ impl SinkFactory for OracleSinkFactory {

let temp_table = Table {
owner: self.table.owner.clone(),
name: format!("{}_TEMP", &self.table.name),
name: format!("ORA$PTT_{}", &self.table.name),
};

self.validate_or_create_table(
Expand All @@ -560,7 +521,6 @@ impl SinkFactory for OracleSinkFactory {
&amended_schema,
)?;
self.create_index(&connection, &self.table, &amended_schema)?;
self.create_pk(&connection, &temp_table, &amended_schema)?;
let meta_table = Table {
owner: self.table.owner.clone(),
name: METADATA_TABLE.to_owned(),
Expand Down Expand Up @@ -602,11 +562,21 @@ impl SinkFactory for OracleSinkFactory {

let field_types = schema.fields.iter().map(|field| field.typ).collect();

let merge_statement = generate_merge_statement(&self.table, &temp_table, &schema);
info!(target: "oracle_sink", "Merge statement {}", merge_statement);

let insert_statement = generate_insert_statement(&temp_table, &schema);
info!(target: "oracle_sink", "Insert statement {}", insert_statement);

let delete_statement = generate_delete_statement(&temp_table);
info!(target: "oracle_sink", "Delete statement {}", delete_statement);

Ok(Box::new(OracleSink {
conn: connection,
insert_append,
merge_statement: generate_merge_statement(&self.table, &temp_table, &schema),
insert_statement: generate_insert_statement(&temp_table, &schema),
merge_statement,
insert_statement,
delete_statement,
field_types,
pk: schema.primary_index,
batch_params: Vec::new(),
Expand Down Expand Up @@ -710,7 +680,7 @@ impl OracleSink {

self.conn.execute(&self.merge_statement, &[])?;

self.conn.commit()?;
self.conn.execute(&self.delete_statement, &[])?;

debug!(target: "oracle_sink", "Execution took {:?}", started.elapsed());
Ok(())
Expand Down Expand Up @@ -739,6 +709,7 @@ impl Sink for OracleSink {
&mut self,
_epoch_details: &dozer_core::epoch::Epoch,
) -> Result<(), dozer_types::errors::internal::BoxedError> {
// Ok(self.conn.commit()?)
Ok(())
}

Expand Down

0 comments on commit f800a4b

Please sign in to comment.