Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions app/api/api_key_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ def store_api_key(


@router.get(
"/result",
"/find",
response_model=ResponseMessage[list[APIKeyResponse]],
summary="저장된 모든 API KEY 정보 조회",
description="""
Expand All @@ -67,7 +67,7 @@ def get_all_api_keys(


@router.get(
"/result/{serviceName}",
"/find/{serviceName}",
response_model=ResponseMessage[APIKeyResponse],
summary="특정 서비스의 API KEY 정보 조회",
)
Expand Down
6 changes: 6 additions & 0 deletions app/core/status.py
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,12 @@ class CommonCode(Enum):
"5105",
"디비 제약조건 또는 인덱스 정보 조회 중 에러가 발생했습니다.",
)
FAIL_FIND_CONSTRAINTS = (
status.HTTP_500_INTERNAL_SERVER_ERROR,
"5107",
"디비 제약조건 정보 조회 중 에러가 발생했습니다.",
)
FAIL_FIND_INDEXES = (status.HTTP_500_INTERNAL_SERVER_ERROR, "5108", "디비 인덱스 정보 조회 중 에러가 발생했습니다.")
FAIL_FIND_SAMPLE_ROWS = (status.HTTP_500_INTERNAL_SERVER_ERROR, "5106", "샘플 데이터 조회 중 에러가 발생했습니다.")
FAIL_SAVE_PROFILE = (status.HTTP_500_INTERNAL_SERVER_ERROR, "5130", "디비 정보 저장 중 에러가 발생했습니다.")
FAIL_UPDATE_PROFILE = (status.HTTP_500_INTERNAL_SERVER_ERROR, "5150", "디비 정보 업데이트 중 에러가 발생했습니다.")
Expand Down
261 changes: 249 additions & 12 deletions app/repository/user_db_repository.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import logging
import sqlite3
from typing import Any

Expand Down Expand Up @@ -188,7 +189,7 @@ def find_tables(self, driver_module: Any, table_query: str, schema_name: str, **
if "%s" in table_query or "?" in table_query:
cursor.execute(table_query, (schema_name,))
elif ":owner" in table_query:
cursor.execute(table_query, {"owner": schema_name})
cursor.execute(table_query, {"owner": schema_name.upper()})
else:
cursor.execute(table_query)

Expand All @@ -212,15 +213,26 @@ def find_columns(
connection = self._connect(driver_module, **kwargs)
cursor = connection.cursor()

if db_type == DBTypesEnum.sqlite.name:
columns = []
db_type_lower = db_type.lower()

if db_type_lower == DBTypesEnum.sqlite.name:
columns = self._find_columns_for_sqlite(cursor, table_name)
elif db_type == DBTypesEnum.postgresql.name:
elif db_type_lower == DBTypesEnum.postgresql.name:
columns = self._find_columns_for_postgresql(cursor, schema_name, table_name)
elif db_type_lower == DBTypesEnum.oracle.name:
columns = self._find_columns_for_oracle(cursor, schema_name, table_name)
elif db_type_lower == DBTypesEnum.mysql.name:
pass
elif db_type_lower == DBTypesEnum.mariadb.name:
pass

else:
columns = self._find_columns_for_general(cursor, column_query, schema_name, table_name)

return ColumnListResult(is_successful=True, code=CommonCode.SUCCESS_FIND_COLUMNS, columns=columns)
except Exception:
except Exception as e:
logging.error(f"Exception in find_columns for {schema_name}.{table_name}: {e}", exc_info=True)
return ColumnListResult(is_successful=False, code=CommonCode.FAIL_FIND_COLUMNS, columns=[])
finally:
if connection:
Expand Down Expand Up @@ -290,6 +302,93 @@ def _find_columns_for_postgresql(self, cursor: Any, schema_name: str, table_name
for c in columns_raw
]

def _find_columns_for_oracle(self, cursor: Any, schema_name: str, table_name: str) -> list[ColumnInfo]:
sql = """
SELECT
c.column_name,
c.data_type,
c.nullable,
c.data_default,
cc.comments,
CASE WHEN cons.constraint_type = 'P' THEN 1 ELSE 0 END AS is_pk,
c.data_length,
c.data_precision,
c.data_scale,
c.column_id as ordinal_position
FROM
user_tab_columns c
LEFT JOIN
user_col_comments cc ON c.table_name = cc.table_name AND c.column_name = cc.column_name
LEFT JOIN
(
SELECT
acc.table_name,
acc.column_name,
ac.constraint_type
FROM
user_constraints ac
JOIN
user_cons_columns acc ON ac.constraint_name = acc.constraint_name
WHERE
ac.constraint_type = 'P'
) cons ON c.table_name = cons.table_name AND c.column_name = cons.column_name
WHERE
c.table_name = :table_name
ORDER BY
c.column_id
"""
try:
logging.info(f"Executing find_columns_for_oracle for table: {table_name.upper()}")
cursor.execute(sql, {"table_name": table_name.upper()})
columns_raw = cursor.fetchall()
logging.info(f"Found {len(columns_raw)} raw columns for table: {table_name.upper()}")

columns = []
for c in columns_raw:
(
name,
data_type,
nullable,
default,
comment,
is_pk,
length,
precision,
scale,
ordinal_position,
) = c

if data_type in ["VARCHAR2", "NVARCHAR2", "CHAR", "RAW"]:
full_type = f"{data_type}({length})"
elif data_type == "NUMBER":
if precision is not None and scale is not None:
if precision == 38 and scale == 0:
full_type = "NUMBER"
else:
full_type = f"NUMBER({precision}, {scale})"
elif precision is not None:
full_type = f"NUMBER({precision})"
else:
full_type = "NUMBER"
else:
full_type = data_type

columns.append(
ColumnInfo(
name=name,
type=full_type,
nullable=(nullable == "Y"),
default=str(default).strip() if default is not None else None,
comment=comment,
is_pk=bool(is_pk),
ordinal_position=ordinal_position,
)
)
return columns
except Exception as e:
logging.error(f"Error in _find_columns_for_oracle for table {table_name}: {e}", exc_info=True)
return []

def _find_columns_for_general(
self, cursor: Any, column_query: str, schema_name: str, table_name: str
) -> list[ColumnInfo]:
Expand Down Expand Up @@ -334,19 +433,21 @@ def find_constraints(
) -> list[ConstraintInfo]:
"""
테이블의 제약 조건 정보를 조회합니다.
- 현재는 SQLite, PostgreSQL만 지원합니다.
- SQLite, PostgreSQL, Oracle을 지원합니다.
- 실패 시 DB 드라이버의 예외를 직접 발생시킵니다.
"""
connection = None
try:
connection = self._connect(driver_module, **kwargs)
cursor = connection.cursor()
db_type_lower = db_type.lower()

if db_type == DBTypesEnum.sqlite.name:
if db_type_lower == DBTypesEnum.sqlite.name:
return self._find_constraints_for_sqlite(cursor, table_name)
elif db_type == DBTypesEnum.postgresql.name:
elif db_type_lower == DBTypesEnum.postgresql.name:
return self._find_constraints_for_postgresql(cursor, schema_name, table_name)
# elif db_type == ...:
elif db_type_lower == DBTypesEnum.oracle.name:
return self._find_constraints_for_oracle(cursor, schema_name, table_name)
return []
finally:
if connection:
Expand Down Expand Up @@ -444,6 +545,85 @@ def _find_constraints_for_postgresql(self, cursor: Any, schema_name: str, table_
for name, data in constraint_map.items()
]

def _find_constraints_for_oracle(self, cursor: Any, schema_name: str, table_name: str) -> list[ConstraintInfo]:
sql = """
SELECT
ac.constraint_name,
ac.constraint_type,
acc.column_name,
ac.search_condition,
r_ac.table_name AS referenced_table,
r_acc.column_name AS referenced_column,
ac.delete_rule
FROM
user_constraints ac
JOIN
user_cons_columns acc ON ac.constraint_name = acc.constraint_name AND ac.table_name = acc.table_name
LEFT JOIN
user_constraints r_ac ON ac.r_constraint_name = r_ac.constraint_name
LEFT JOIN
user_cons_columns r_acc ON ac.r_constraint_name = r_acc.constraint_name AND acc.position = r_acc.position
WHERE
ac.table_name = :table_name
ORDER BY
ac.constraint_name, acc.position
"""
try:
logging.info(f"Executing find_constraints_for_oracle for table: {table_name.upper()}")
cursor.execute(sql, {"table_name": table_name.upper()})
raw_constraints = cursor.fetchall()
logging.info(f"Found {len(raw_constraints)} raw constraints for table: {table_name.upper()}")

constraint_map = {}
for row in raw_constraints:
name, const_type_char, column, check_expr, ref_table, ref_column, on_delete = row

const_type_map = {"P": "PRIMARY KEY", "R": "FOREIGN KEY", "U": "UNIQUE", "C": "CHECK"}
const_type = const_type_map.get(const_type_char)

if not const_type:
continue

if const_type == "CHECK":
check_expr_str = (str(check_expr) if check_expr else "").upper()
# "COL" IS NOT NULL 또는 COL IS NOT NULL 형식 모두 처리
if (
f'"{column.upper()}" IS NOT NULL' in check_expr_str
or f"{column.upper()} IS NOT NULL" in check_expr_str
):
continue

if name not in constraint_map:
constraint_map[name] = {
"type": const_type,
"columns": [],
"referenced_table": ref_table,
"referenced_columns": [],
"check_expression": check_expr if const_type == "CHECK" else None,
"on_delete": on_delete if const_type == "FOREIGN KEY" else None,
}

if column and column not in constraint_map[name]["columns"]:
constraint_map[name]["columns"].append(column)
if ref_column and ref_column not in constraint_map[name]["referenced_columns"]:
constraint_map[name]["referenced_columns"].append(ref_column)

return [
ConstraintInfo(
name=name,
type=data["type"],
columns=data["columns"],
referenced_table=data["referenced_table"],
referenced_columns=data["referenced_columns"] if data["referenced_columns"] else None,
check_expression=data["check_expression"],
on_delete=data["on_delete"],
)
for name, data in constraint_map.items()
]
except Exception as e:
logging.error(f"Error in _find_constraints_for_oracle for table {table_name}: {e}", exc_info=True)
return []

def find_indexes(
self, driver_module: Any, db_type: str, schema_name: str, table_name: str, **kwargs: Any
) -> list[IndexInfo]:
Expand All @@ -455,12 +635,14 @@ def find_indexes(
try:
connection = self._connect(driver_module, **kwargs)
cursor = connection.cursor()
db_type_lower = db_type.lower()

if db_type == DBTypesEnum.sqlite.name:
if db_type_lower == DBTypesEnum.sqlite.name:
return self._find_indexes_for_sqlite(cursor, table_name)
elif db_type == DBTypesEnum.postgresql.name:
elif db_type_lower == DBTypesEnum.postgresql.name:
return self._find_indexes_for_postgresql(cursor, schema_name, table_name)
# elif db_type == ...:
elif db_type_lower == DBTypesEnum.oracle.name:
return self._find_indexes_for_oracle(cursor, schema_name, table_name)
return []
finally:
if connection:
Expand Down Expand Up @@ -530,6 +712,45 @@ def _find_indexes_for_postgresql(self, cursor: Any, schema_name: str, table_name
for name, data in index_map.items()
]

def _find_indexes_for_oracle(self, cursor: Any, schema_name: str, table_name: str) -> list[IndexInfo]:
sql = """
SELECT
i.index_name,
i.uniqueness,
ic.column_name
FROM
user_indexes i
JOIN
user_ind_columns ic ON i.index_name = ic.index_name
LEFT JOIN
user_constraints ac ON i.index_name = ac.constraint_name AND ac.constraint_type = 'P'
WHERE
i.table_name = :table_name
AND ac.constraint_name IS NULL
ORDER BY
i.index_name, ic.column_position
"""
try:
logging.info(f"Executing find_indexes_for_oracle for table: {table_name.upper()}")
cursor.execute(sql, {"table_name": table_name.upper()})
raw_indexes = cursor.fetchall()
logging.info(f"Found {len(raw_indexes)} raw indexes for table: {table_name.upper()}")

index_map = {}
for row in raw_indexes:
index_name, uniqueness, column_name = row
if index_name not in index_map:
index_map[index_name] = {"columns": [], "is_unique": uniqueness == "UNIQUE"}
index_map[index_name]["columns"].append(column_name)

return [
IndexInfo(name=name, columns=data["columns"], is_unique=data["is_unique"])
for name, data in index_map.items()
]
except Exception:
# logging.error(f"Error in _find_indexes_for_oracle for table {table_name}: {e}", exc_info=True)
return []

def find_sample_rows(
self, driver_module: Any, db_type: str, schema_name: str, table_names: list[str], **kwargs: Any
) -> dict[str, list[dict[str, Any]]]:
Expand All @@ -546,7 +767,8 @@ def find_sample_rows(
return self._find_sample_rows_for_sqlite(cursor, table_names)
elif db_type == DBTypesEnum.postgresql.name:
return self._find_sample_rows_for_postgresql(cursor, schema_name, table_names)
# elif db_type == ...:
elif db_type == DBTypesEnum.oracle.name:
return self._find_sample_rows_for_oracle(cursor, schema_name, table_names)
return {table_name: [] for table_name in table_names}
finally:
if connection:
Expand Down Expand Up @@ -583,6 +805,21 @@ def _find_sample_rows_for_postgresql(
sample_rows_map[table_name] = []
return sample_rows_map

def _find_sample_rows_for_oracle(
self, cursor: Any, schema_name: str, table_names: list[str]
) -> dict[str, list[dict[str, Any]]]:
sample_rows_map = {}
for table_name in table_names:
try:
query = f'SELECT * FROM "{schema_name.upper()}"."{table_name.upper()}" FETCH FIRST 3 ROWS ONLY'
cursor.execute(query)
columns = [desc[0] for desc in cursor.description]
rows = cursor.fetchall()
sample_rows_map[table_name] = [dict(zip(columns, row, strict=False)) for row in rows]
except Exception:
sample_rows_map[table_name] = []
return sample_rows_map

# ─────────────────────────────
# DB 연결 메서드
# ─────────────────────────────
Expand Down
Loading