Commit: Merge branch 'main' into issue-1869-remove-old-deliverables
Showing 97 changed files with 3,150 additions and 2,281 deletions.
@@ -0,0 +1,173 @@
#
# Load data from legacy (Oracle) tables to staging tables.
#

import logging
import time

import sqlalchemy

import src.db.foreign
import src.db.models.staging
import src.logging
import src.task.task
from src.adapters import db
from src.util import datetime_util

from . import sql

logger = logging.getLogger(__name__)


class LoadOracleDataTask(src.task.task.Task):
    """Task to load data from legacy tables to staging tables."""

    def __init__(
        self,
        db_session: db.Session,
        foreign_tables: dict[str, sqlalchemy.Table],
        staging_tables: dict[str, sqlalchemy.Table],
    ) -> None:
        if foreign_tables.keys() != staging_tables.keys():
            raise ValueError("keys of foreign_tables and staging_tables must be equal")

        super().__init__(db_session)
        self.foreign_tables = foreign_tables
        self.staging_tables = staging_tables

    def run_task(self) -> None:
        """Main task process, called by run()."""
        with self.db_session.begin():
            self.log_database_settings()
        self.load_data()

    def log_database_settings(self) -> None:
        """Log database settings related to foreign tables for easier troubleshooting."""
        metadata = sqlalchemy.MetaData()
        engine = self.db_session.bind

        # Use reflection to define built-in views as Table objects.
        foreign_servers = sqlalchemy.Table(
            "foreign_servers", metadata, autoload_with=engine, schema="information_schema"
        )
        foreign_server_options = sqlalchemy.Table(
            "foreign_server_options", metadata, autoload_with=engine, schema="information_schema"
        )

        logger.info(
            "foreign server settings",
            extra={
                "foreign_servers": self.db_session.execute(
                    sqlalchemy.select(foreign_servers)
                ).all(),
                "foreign_server_options": self.db_session.execute(
                    sqlalchemy.select(foreign_server_options)
                ).all(),
            },
        )

    def load_data(self) -> None:
        """Load the data for all tables defined in the mapping."""
        for table_name in self.foreign_tables:
            try:
                with self.db_session.begin():
                    self.load_data_for_table(table_name)
            except Exception:
                logger.exception("table load error", extra={"table": table_name})

    def load_data_for_table(self, table_name: str) -> None:
        """Load new and updated rows for a single table from the foreign table to the staging table."""
        logger.info("process table", extra={"table": table_name})
        foreign_table = self.foreign_tables[table_name]
        staging_table = self.staging_tables[table_name]

        self.log_row_count("before", foreign_table, staging_table)

        self.do_update(foreign_table, staging_table)
        self.do_insert(foreign_table, staging_table)
        self.do_mark_deleted(foreign_table, staging_table)

        self.log_row_count("after", staging_table)

    def do_insert(self, foreign_table: sqlalchemy.Table, staging_table: sqlalchemy.Table) -> int:
        """Determine new rows by primary key, and copy them into the staging table."""

        insert_from_select_sql, select_sql = sql.build_insert_select_sql(
            foreign_table, staging_table
        )

        # COUNT has to be a separate query as INSERTs don't return a rowcount.
        insert_count = self.db_session.query(select_sql.subquery()).count()

        self.increment("count.insert.total", insert_count)
        self.set_metrics({f"count.insert.{staging_table.name}": insert_count})

        # Execute the INSERT.
        t0 = time.monotonic()
        self.db_session.execute(insert_from_select_sql)
        t1 = time.monotonic()

        self.set_metrics({f"time.insert.{staging_table.name}": round(t1 - t0, 3)})

        return insert_count

    def do_update(self, foreign_table: sqlalchemy.Table, staging_table: sqlalchemy.Table) -> int:
        """Find updated rows using last_upd_date, copy them, and reset transformed_at to NULL."""

        update_sql = sql.build_update_sql(foreign_table, staging_table).values(transformed_at=None)

        t0 = time.monotonic()
        result = self.db_session.execute(update_sql)
        t1 = time.monotonic()
        update_count = result.rowcount

        self.increment("count.update.total", update_count)
        self.set_metrics({f"count.update.{staging_table.name}": update_count})
        self.set_metrics({f"time.update.{staging_table.name}": round(t1 - t0, 3)})

        return update_count

    def do_mark_deleted(
        self, foreign_table: sqlalchemy.Table, staging_table: sqlalchemy.Table
    ) -> int:
        """Find deleted rows, set is_deleted=TRUE, and reset transformed_at to NULL."""

        update_sql = sql.build_mark_deleted_sql(foreign_table, staging_table).values(
            transformed_at=None,
            deleted_at=datetime_util.utcnow(),
        )

        t0 = time.monotonic()
        result = self.db_session.execute(update_sql)
        t1 = time.monotonic()
        delete_count = result.rowcount

        self.increment("count.delete.total", delete_count)
        self.set_metrics({f"count.delete.{staging_table.name}": delete_count})
        self.set_metrics({f"time.delete.{staging_table.name}": round(t1 - t0, 3)})

        return delete_count

    def log_row_count(self, message: str, *tables: sqlalchemy.Table) -> None:
        """Log the number of rows in each of the tables using SQL COUNT()."""
        extra = {}
        for table in tables:
            count = self.db_session.query(table).count()
            extra[f"count.{table.schema}.{table.name}"] = count
            self.set_metrics({f"count.{message}.{table.schema}.{table.name}": count})
        logger.info(f"row count {message}", extra=extra, stacklevel=2)


def main() -> None:
    with src.logging.init(__package__):
        db_client = db.PostgresDBClient()

        foreign_tables = {t.name: t for t in src.db.foreign.metadata.tables.values()}
        staging_tables = {t.name: t for t in src.db.models.staging.metadata.tables.values()}

        with db_client.get_session() as db_session:
            LoadOracleDataTask(db_session, foreign_tables, staging_tables).run()


if __name__ == "__main__":
    main()
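
The file above processes each table in its own transaction, so one table's failure is logged without aborting the rest. As a minimal sketch (hypothetical helper, not part of this commit), a one-off re-run of a single table could reuse the same name-to-Table mappings that main() builds:

# Hypothetical sketch: re-run the load for one table, for example after a
# single-table failure reported by load_data().
import src.db.foreign
import src.db.models.staging
from src.adapters import db


def load_one_table(table_name: str) -> None:
    foreign_tables = {t.name: t for t in src.db.foreign.metadata.tables.values()}
    staging_tables = {t.name: t for t in src.db.models.staging.metadata.tables.values()}

    with db.PostgresDBClient().get_session() as db_session:
        LoadOracleDataTask(
            db_session,
            {table_name: foreign_tables[table_name]},
            {table_name: staging_tables[table_name]},
        ).run()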
@@ -0,0 +1,118 @@
#
# SQL building for data load process.
#

import sqlalchemy


def build_insert_select_sql(
    source_table: sqlalchemy.Table, destination_table: sqlalchemy.Table
) -> tuple[sqlalchemy.Insert, sqlalchemy.Select]:
    """Build an `INSERT INTO ... SELECT ... FROM ...` query for new rows."""

    all_columns = tuple(c.name for c in source_table.columns)

    # Optimization: use a Common Table Expression (`WITH`) marked as MATERIALIZED. This directs
    # the PostgreSQL optimizer to run it first (prevents folding it into the parent query), so it
    # only fetches the primary key and last_upd_date columns from Oracle to perform the date
    # comparison. Without this materialized CTE, it fetches all columns and all rows from Oracle
    # before applying the WHERE, which is very slow for large tables.
    #
    # See https://www.postgresql.org/docs/current/queries-with.html#QUERIES-WITH-CTE-MATERIALIZATION

    # `WITH insert_pks AS MATERIALIZED (`
    cte = (
        # `SELECT id1, id2, id3, ... FROM <source_table>` (id1, id2, ... is the multipart primary key)
        sqlalchemy.select(*source_table.primary_key.columns)
        .where(
            # `WHERE (id1, id2, id3, ...) NOT IN`
            sqlalchemy.tuple_(*source_table.primary_key.columns).not_in(
                # `(SELECT (id1, id2, id3, ...) FROM <destination_table>)` (subquery)
                sqlalchemy.select(*destination_table.primary_key.columns)
            )
        )
        .cte("insert_pks")
        .prefix_with("MATERIALIZED")
    )

    # `SELECT col1, col2, ..., FALSE AS is_deleted FROM <source_table>`
    select_sql = sqlalchemy.select(
        source_table, sqlalchemy.literal_column("FALSE").label("is_deleted")
    ).where(
        # `WHERE (id1, id2, ...)
        #  IN (SELECT insert_pks.id1, insert_pks.id2
        #      FROM insert_pks)`
        sqlalchemy.tuple_(*source_table.primary_key.columns).in_(sqlalchemy.select(*cte.columns)),
    )
    # `INSERT INTO <destination_table> (col1, col2, ..., is_deleted) SELECT ...`
    insert_from_select_sql = sqlalchemy.insert(destination_table).from_select(
        all_columns + (destination_table.c.is_deleted,), select_sql
    )

    return insert_from_select_sql, select_sql


def build_update_sql(
    source_table: sqlalchemy.Table, destination_table: sqlalchemy.Table
) -> sqlalchemy.Update:
    """Build an `UPDATE ... SET ... WHERE ...` statement for updated rows."""

    # Optimization: use a Common Table Expression (`WITH`) marked as MATERIALIZED. This directs
    # the PostgreSQL optimizer to run it first (prevents folding it into the parent query), so it
    # only fetches the primary key and last_upd_date columns from Oracle to perform the date
    # comparison. Without this materialized CTE, it fetches all columns and all rows from Oracle
    # before applying the WHERE, which is very slow for large tables.
    #
    # See https://www.postgresql.org/docs/current/queries-with.html#QUERIES-WITH-CTE-MATERIALIZATION

    # `WITH update_pks AS MATERIALIZED (`
    cte = (
        # `SELECT id1, id2, id3, ... FROM <destination_table>`
        sqlalchemy.select(*destination_table.primary_key.columns)
        .join(
            # `JOIN <source_table>
            #  ON (id1, id2, ...) = (id1, id2, ...)`
            source_table,
            sqlalchemy.tuple_(*destination_table.primary_key.columns)
            == sqlalchemy.tuple_(*source_table.primary_key.columns),
        )
        # `WHERE ...`
        .where(destination_table.c.last_upd_date < source_table.c.last_upd_date)
        .cte("update_pks")
        .prefix_with("MATERIALIZED")
    )

    return (
        # `UPDATE <destination_table>`
        sqlalchemy.update(destination_table)
        # `SET col1=source_table.col1, col2=source_table.col2, ...`
        .values(dict(source_table.columns))
        # `WHERE ...`
        .where(
            sqlalchemy.tuple_(*destination_table.primary_key.columns)
            == sqlalchemy.tuple_(*source_table.primary_key.columns),
            sqlalchemy.tuple_(*destination_table.primary_key.columns).in_(
                sqlalchemy.select(*cte.columns)
            ),
        )
    )


def build_mark_deleted_sql(
    source_table: sqlalchemy.Table, destination_table: sqlalchemy.Table
) -> sqlalchemy.Update:
    """Build an `UPDATE ... SET is_deleted = TRUE WHERE ...` statement for deleted rows."""
    return (
        # `UPDATE <destination_table>`
        sqlalchemy.update(destination_table)
        # `SET is_deleted = TRUE`
        .values(is_deleted=True)
        # `WHERE`
        .where(
            # `is_deleted = FALSE`
            destination_table.c.is_deleted == False,  # noqa: E712
            # `AND (id1, id2, id3, ...) NOT IN` (id1, id2, ... is the multipart primary key)
            sqlalchemy.tuple_(*destination_table.primary_key.columns).not_in(
                # `(SELECT (id1, id2, id3, ...) FROM <source_table>)` (subquery)
                sqlalchemy.select(*source_table.primary_key.columns)
            ),
        )
    )
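
Because these builders return ordinary SQLAlchemy statement objects, the SQL they generate can be inspected without a live database by compiling against the PostgreSQL dialect. A minimal sketch, using hypothetical tables with a two-part primary key (not the project's real schema):

# Hypothetical sketch: print the SQL build_insert_select_sql would emit.
import sqlalchemy
from sqlalchemy.dialects import postgresql

metadata = sqlalchemy.MetaData()

source = sqlalchemy.Table(
    "opportunity", metadata,
    sqlalchemy.Column("opportunity_id", sqlalchemy.Integer, primary_key=True),
    sqlalchemy.Column("revision", sqlalchemy.Integer, primary_key=True),
    sqlalchemy.Column("title", sqlalchemy.Text),
    sqlalchemy.Column("last_upd_date", sqlalchemy.TIMESTAMP),
    schema="legacy",
)
destination = sqlalchemy.Table(
    "opportunity", metadata,
    sqlalchemy.Column("opportunity_id", sqlalchemy.Integer, primary_key=True),
    sqlalchemy.Column("revision", sqlalchemy.Integer, primary_key=True),
    sqlalchemy.Column("title", sqlalchemy.Text),
    sqlalchemy.Column("last_upd_date", sqlalchemy.TIMESTAMP),
    sqlalchemy.Column("is_deleted", sqlalchemy.Boolean),
    schema="staging",
)

insert_sql, _ = build_insert_select_sql(source, destination)
print(insert_sql.compile(dialect=postgresql.dialect()))

The compiled output includes the `WITH insert_pks AS MATERIALIZED (...)` prefix that the optimization comments above describe.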
@@ -0,0 +1,6 @@
from typing import TypeAlias

from src.db.models.staging.forecast import Tforecast, TforecastHist
from src.db.models.staging.synopsis import Tsynopsis, TsynopsisHist

SourceSummary: TypeAlias = Tforecast | Tsynopsis | TforecastHist | TsynopsisHist
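
A hypothetical usage sketch (not part of this commit): code that accepts any of the four staging models can take a SourceSummary and narrow with isinstance where the variants differ, continuing from the imports above.

def is_historical(record: SourceSummary) -> bool:
    # Hypothetical helper: True for the *Hist variants of the union.
    return isinstance(record, (TforecastHist, TsynopsisHist))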