Skip to content

Commit

Permalink
Fix #13660 - Metadata Backup Memory Exhaustion (#13935)
Browse files Browse the repository at this point in the history
  • Loading branch information
pmbrull committed Nov 10, 2023
1 parent c00160e commit 2d32146
Showing 1 changed file with 17 additions and 15 deletions.
32 changes: 17 additions & 15 deletions ingestion/src/metadata/cli/db_dump.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,10 @@
import json
from functools import singledispatch
from pathlib import Path
from typing import List, Optional, Union
from typing import Iterable, List, Optional, Union

from sqlalchemy import inspect, text
from sqlalchemy.engine import Engine
from sqlalchemy.engine import Engine, Row

from metadata.utils.constants import UTF_8

Expand Down Expand Up @@ -121,6 +121,13 @@ def get_hash_column_name(engine: Engine, table_name: str) -> Optional[str]:
return None


def run_query_iter(
    engine: Engine, query: str, chunk_size: int = 100
) -> Iterable[Row]:
    """Stream the rows of `query` lazily, one row at a time.

    Rows are fetched from the database in server-side batches of
    `chunk_size` (via `yield_per`) so that at most `chunk_size` rows
    are held in memory at once — this is the fix for the metadata
    backup memory exhaustion when dumping large tables.

    :param engine: SQLAlchemy engine to execute the query against
    :param query: raw SQL statement to run
    :param chunk_size: max number of rows buffered in memory (default 100,
        matching the original hard-coded limit)
    :return: generator yielding one `Row` at a time
    """
    # `yield from` delegates directly to the batched result iterator;
    # equivalent to the manual `for row in ...: yield row` loop.
    yield from engine.execute(text(query)).yield_per(chunk_size)


def dump_json(tables: List[str], engine: Engine, output: Path) -> None:
"""
Dumps JSON data.
Expand All @@ -135,14 +142,10 @@ def dump_json(tables: List[str], engine: Engine, output: Path) -> None:

hash_column_name = get_hash_column_name(engine=engine, table_name=table)
if hash_column_name:
res = engine.execute(
text(
STATEMENT_HASH_JSON.format(
table=table, hash_column_name=hash_column_name
)
)
).all()
for row in res:
query = STATEMENT_HASH_JSON.format(
table=table, hash_column_name=hash_column_name
)
for row in run_query_iter(engine=engine, query=query):
insert = f"INSERT INTO {table} (json, {hash_column_name}) VALUES ({clean_col(row.json, engine)}, {clean_col(row[1], engine)});\n" # pylint: disable=line-too-long
file.write(insert)
else:
Expand All @@ -161,8 +164,8 @@ def dump_all(tables: List[str], engine: Engine, output: Path) -> None:
truncate = STATEMENT_TRUNCATE.format(table=table)
file.write(truncate)

res = engine.execute(text(STATEMENT_ALL.format(table=table))).all()
for row in res:
query = STATEMENT_ALL.format(table=table)
for row in run_query_iter(engine=engine, query=query):
data = ",".join(clean_col(col, engine) for col in row)

insert = f"INSERT INTO {table} VALUES ({data});\n"
Expand All @@ -180,16 +183,15 @@ def dump_entity_custom(engine: Engine, output: Path, inspector) -> None:

columns = inspector.get_columns(table_name=table)

statement = STATEMENT_ALL_NEW.format(
query = STATEMENT_ALL_NEW.format(
cols=",".join(
col["name"]
for col in columns
if col["name"] not in data["exclude_columns"]
),
table=table,
)
res = engine.execute(text(statement)).all()
for row in res:
for row in run_query_iter(engine=engine, query=query):
# Let's use .format here to not add more variables
# pylint: disable=consider-using-f-string
insert = "INSERT INTO {table} ({cols}) VALUES ({data});\n".format(
Expand Down

0 comments on commit 2d32146

Please sign in to comment.