diff --git a/app/api/api_key_api.py b/app/api/api_key_api.py index 0617237..d14cf52 100644 --- a/app/api/api_key_api.py +++ b/app/api/api_key_api.py @@ -40,7 +40,7 @@ def store_api_key( @router.get( - "/result", + "/find", response_model=ResponseMessage[list[APIKeyResponse]], summary="저장된 모든 API KEY 정보 조회", description=""" @@ -67,7 +67,7 @@ def get_all_api_keys( @router.get( - "/result/{serviceName}", + "/find/{serviceName}", response_model=ResponseMessage[APIKeyResponse], summary="특정 서비스의 API KEY 정보 조회", ) diff --git a/app/core/status.py b/app/core/status.py index 130614f..2acf92c 100644 --- a/app/core/status.py +++ b/app/core/status.py @@ -129,6 +129,12 @@ class CommonCode(Enum): "5105", "디비 제약조건 또는 인덱스 정보 조회 중 에러가 발생했습니다.", ) + FAIL_FIND_CONSTRAINTS = ( + status.HTTP_500_INTERNAL_SERVER_ERROR, + "5107", + "디비 제약조건 정보 조회 중 에러가 발생했습니다.", + ) + FAIL_FIND_INDEXES = (status.HTTP_500_INTERNAL_SERVER_ERROR, "5108", "디비 인덱스 정보 조회 중 에러가 발생했습니다.") FAIL_FIND_SAMPLE_ROWS = (status.HTTP_500_INTERNAL_SERVER_ERROR, "5106", "샘플 데이터 조회 중 에러가 발생했습니다.") FAIL_SAVE_PROFILE = (status.HTTP_500_INTERNAL_SERVER_ERROR, "5130", "디비 정보 저장 중 에러가 발생했습니다.") FAIL_UPDATE_PROFILE = (status.HTTP_500_INTERNAL_SERVER_ERROR, "5150", "디비 정보 업데이트 중 에러가 발생했습니다.") diff --git a/app/repository/user_db_repository.py b/app/repository/user_db_repository.py index d30d7c7..eb3421b 100644 --- a/app/repository/user_db_repository.py +++ b/app/repository/user_db_repository.py @@ -1,3 +1,4 @@ +import logging import sqlite3 from typing import Any @@ -188,7 +189,7 @@ def find_tables(self, driver_module: Any, table_query: str, schema_name: str, ** if "%s" in table_query or "?" in table_query: cursor.execute(table_query, (schema_name,)) elif ":owner" in table_query: - cursor.execute(table_query, {"owner": schema_name}) + cursor.execute(table_query, {"owner": schema_name.upper()}) else: cursor.execute(table_query) @@ -212,15 +213,26 @@ def find_columns( connection = self._connect(driver_module, **kwargs) cursor = connection.cursor() - if db_type == DBTypesEnum.sqlite.name: + columns = [] + db_type_lower = db_type.lower() + + if db_type_lower == DBTypesEnum.sqlite.name: columns = self._find_columns_for_sqlite(cursor, table_name) - elif db_type == DBTypesEnum.postgresql.name: + elif db_type_lower == DBTypesEnum.postgresql.name: columns = self._find_columns_for_postgresql(cursor, schema_name, table_name) + elif db_type_lower == DBTypesEnum.oracle.name: + columns = self._find_columns_for_oracle(cursor, schema_name, table_name) + elif db_type_lower == DBTypesEnum.mysql.name: + pass + elif db_type_lower == DBTypesEnum.mariadb.name: + pass + else: columns = self._find_columns_for_general(cursor, column_query, schema_name, table_name) return ColumnListResult(is_successful=True, code=CommonCode.SUCCESS_FIND_COLUMNS, columns=columns) - except Exception: + except Exception as e: + logging.error(f"Exception in find_columns for {schema_name}.{table_name}: {e}", exc_info=True) return ColumnListResult(is_successful=False, code=CommonCode.FAIL_FIND_COLUMNS, columns=[]) finally: if connection: @@ -290,6 +302,93 @@ def _find_columns_for_postgresql(self, cursor: Any, schema_name: str, table_name for c in columns_raw ] + def _find_columns_for_oracle(self, cursor: Any, schema_name: str, table_name: str) -> list[ColumnInfo]: + sql = """ + SELECT + c.column_name, + c.data_type, + c.nullable, + c.data_default, + cc.comments, + CASE WHEN cons.constraint_type = 'P' THEN 1 ELSE 0 END AS is_pk, + c.data_length, + c.data_precision, + c.data_scale, + c.column_id as ordinal_position + FROM + user_tab_columns c + LEFT JOIN + user_col_comments cc ON c.table_name = cc.table_name AND c.column_name = cc.column_name + LEFT JOIN + ( + SELECT + acc.table_name, + acc.column_name, + ac.constraint_type + FROM + user_constraints ac + JOIN + user_cons_columns acc ON ac.constraint_name = acc.constraint_name + WHERE + ac.constraint_type = 'P' + ) cons ON c.table_name = cons.table_name AND c.column_name = cons.column_name + WHERE + c.table_name = :table_name + ORDER BY + c.column_id + """ + try: + logging.info(f"Executing find_columns_for_oracle for table: {table_name.upper()}") + cursor.execute(sql, {"table_name": table_name.upper()}) + columns_raw = cursor.fetchall() + logging.info(f"Found {len(columns_raw)} raw columns for table: {table_name.upper()}") + + columns = [] + for c in columns_raw: + ( + name, + data_type, + nullable, + default, + comment, + is_pk, + length, + precision, + scale, + ordinal_position, + ) = c + + if data_type in ["VARCHAR2", "NVARCHAR2", "CHAR", "RAW"]: + full_type = f"{data_type}({length})" + elif data_type == "NUMBER": + if precision is not None and scale is not None: + if precision == 38 and scale == 0: + full_type = "NUMBER" + else: + full_type = f"NUMBER({precision}, {scale})" + elif precision is not None: + full_type = f"NUMBER({precision})" + else: + full_type = "NUMBER" + else: + full_type = data_type + + columns.append( + ColumnInfo( + name=name, + type=full_type, + nullable=(nullable == "Y"), + default=str(default).strip() if default is not None else None, + comment=comment, + is_pk=bool(is_pk), + ordinal_position=ordinal_position, + ) + ) + return columns + except Exception as e: + logging.error(f"Error in _find_columns_for_oracle for table {table_name}: {e}", exc_info=True) + return [] + def _find_columns_for_general( self, cursor: Any, column_query: str, schema_name: str, table_name: str ) -> list[ColumnInfo]: @@ -334,19 +433,21 @@ def find_constraints( ) -> list[ConstraintInfo]: """ 테이블의 제약 조건 정보를 조회합니다. - - 현재는 SQLite, PostgreSQL만 지원합니다. + - SQLite, PostgreSQL, Oracle을 지원합니다. - 실패 시 DB 드라이버의 예외를 직접 발생시킵니다. """ connection = None try: connection = self._connect(driver_module, **kwargs) cursor = connection.cursor() + db_type_lower = db_type.lower() - if db_type == DBTypesEnum.sqlite.name: + if db_type_lower == DBTypesEnum.sqlite.name: return self._find_constraints_for_sqlite(cursor, table_name) - elif db_type == DBTypesEnum.postgresql.name: + elif db_type_lower == DBTypesEnum.postgresql.name: return self._find_constraints_for_postgresql(cursor, schema_name, table_name) - # elif db_type == ...: + elif db_type_lower == DBTypesEnum.oracle.name: + return self._find_constraints_for_oracle(cursor, schema_name, table_name) return [] finally: if connection: @@ -444,6 +545,85 @@ def _find_constraints_for_postgresql(self, cursor: Any, schema_name: str, table_ for name, data in constraint_map.items() ] + def _find_constraints_for_oracle(self, cursor: Any, schema_name: str, table_name: str) -> list[ConstraintInfo]: + sql = """ + SELECT + ac.constraint_name, + ac.constraint_type, + acc.column_name, + ac.search_condition, + r_ac.table_name AS referenced_table, + r_acc.column_name AS referenced_column, + ac.delete_rule + FROM + user_constraints ac + JOIN + user_cons_columns acc ON ac.constraint_name = acc.constraint_name AND ac.table_name = acc.table_name + LEFT JOIN + user_constraints r_ac ON ac.r_constraint_name = r_ac.constraint_name + LEFT JOIN + user_cons_columns r_acc ON ac.r_constraint_name = r_acc.constraint_name AND acc.position = r_acc.position + WHERE + ac.table_name = :table_name + ORDER BY + ac.constraint_name, acc.position + """ + try: + logging.info(f"Executing find_constraints_for_oracle for table: {table_name.upper()}") + cursor.execute(sql, {"table_name": table_name.upper()}) + raw_constraints = cursor.fetchall() + logging.info(f"Found {len(raw_constraints)} raw constraints for table: {table_name.upper()}") + + constraint_map = {} + for row in raw_constraints: + name, const_type_char, column, check_expr, ref_table, ref_column, on_delete = row + + const_type_map = {"P": "PRIMARY KEY", "R": "FOREIGN KEY", "U": "UNIQUE", "C": "CHECK"} + const_type = const_type_map.get(const_type_char) + + if not const_type: + continue + + if const_type == "CHECK": + check_expr_str = (str(check_expr) if check_expr else "").upper() + # "COL" IS NOT NULL 또는 COL IS NOT NULL 형식 모두 처리 + if ( + f'"{column.upper()}" IS NOT NULL' in check_expr_str + or f"{column.upper()} IS NOT NULL" in check_expr_str + ): + continue + + if name not in constraint_map: + constraint_map[name] = { + "type": const_type, + "columns": [], + "referenced_table": ref_table, + "referenced_columns": [], + "check_expression": check_expr if const_type == "CHECK" else None, + "on_delete": on_delete if const_type == "FOREIGN KEY" else None, + } + + if column and column not in constraint_map[name]["columns"]: + constraint_map[name]["columns"].append(column) + if ref_column and ref_column not in constraint_map[name]["referenced_columns"]: + constraint_map[name]["referenced_columns"].append(ref_column) + + return [ + ConstraintInfo( + name=name, + type=data["type"], + columns=data["columns"], + referenced_table=data["referenced_table"], + referenced_columns=data["referenced_columns"] if data["referenced_columns"] else None, + check_expression=data["check_expression"], + on_delete=data["on_delete"], + ) + for name, data in constraint_map.items() + ] + except Exception as e: + logging.error(f"Error in _find_constraints_for_oracle for table {table_name}: {e}", exc_info=True) + return [] + def find_indexes( self, driver_module: Any, db_type: str, schema_name: str, table_name: str, **kwargs: Any ) -> list[IndexInfo]: @@ -455,12 +635,14 @@ def find_indexes( try: connection = self._connect(driver_module, **kwargs) cursor = connection.cursor() + db_type_lower = db_type.lower() - if db_type == DBTypesEnum.sqlite.name: + if db_type_lower == DBTypesEnum.sqlite.name: return self._find_indexes_for_sqlite(cursor, table_name) - elif db_type == DBTypesEnum.postgresql.name: + elif db_type_lower == DBTypesEnum.postgresql.name: return self._find_indexes_for_postgresql(cursor, schema_name, table_name) - # elif db_type == ...: + elif db_type_lower == DBTypesEnum.oracle.name: + return self._find_indexes_for_oracle(cursor, schema_name, table_name) return [] finally: if connection: @@ -530,6 +712,45 @@ def _find_indexes_for_postgresql(self, cursor: Any, schema_name: str, table_name for name, data in index_map.items() ] + def _find_indexes_for_oracle(self, cursor: Any, schema_name: str, table_name: str) -> list[IndexInfo]: + sql = """ + SELECT + i.index_name, + i.uniqueness, + ic.column_name + FROM + user_indexes i + JOIN + user_ind_columns ic ON i.index_name = ic.index_name + LEFT JOIN + user_constraints ac ON i.index_name = ac.constraint_name AND ac.constraint_type = 'P' + WHERE + i.table_name = :table_name + AND ac.constraint_name IS NULL + ORDER BY + i.index_name, ic.column_position + """ + try: + logging.info(f"Executing find_indexes_for_oracle for table: {table_name.upper()}") + cursor.execute(sql, {"table_name": table_name.upper()}) + raw_indexes = cursor.fetchall() + logging.info(f"Found {len(raw_indexes)} raw indexes for table: {table_name.upper()}") + + index_map = {} + for row in raw_indexes: + index_name, uniqueness, column_name = row + if index_name not in index_map: + index_map[index_name] = {"columns": [], "is_unique": uniqueness == "UNIQUE"} + index_map[index_name]["columns"].append(column_name) + + return [ + IndexInfo(name=name, columns=data["columns"], is_unique=data["is_unique"]) + for name, data in index_map.items() + ] + except Exception: + # logging.error(f"Error in _find_indexes_for_oracle for table {table_name}: {e}", exc_info=True) + return [] + def find_sample_rows( self, driver_module: Any, db_type: str, schema_name: str, table_names: list[str], **kwargs: Any ) -> dict[str, list[dict[str, Any]]]: @@ -546,7 +767,8 @@ def find_sample_rows( return self._find_sample_rows_for_sqlite(cursor, table_names) elif db_type == DBTypesEnum.postgresql.name: return self._find_sample_rows_for_postgresql(cursor, schema_name, table_names) - # elif db_type == ...: + elif db_type == DBTypesEnum.oracle.name: + return self._find_sample_rows_for_oracle(cursor, schema_name, table_names) return {table_name: [] for table_name in table_names} finally: if connection: @@ -583,6 +805,21 @@ def _find_sample_rows_for_postgresql( sample_rows_map[table_name] = [] return sample_rows_map + def _find_sample_rows_for_oracle( + self, cursor: Any, schema_name: str, table_names: list[str] + ) -> dict[str, list[dict[str, Any]]]: + sample_rows_map = {} + for table_name in table_names: + try: + query = f'SELECT * FROM "{schema_name.upper()}"."{table_name.upper()}" FETCH FIRST 3 ROWS ONLY' + cursor.execute(query) + columns = [desc[0] for desc in cursor.description] + rows = cursor.fetchall() + sample_rows_map[table_name] = [dict(zip(columns, row, strict=False)) for row in rows] + except Exception: + sample_rows_map[table_name] = [] + return sample_rows_map + # ───────────────────────────── # DB 연결 메서드 # ───────────────────────────── diff --git a/app/services/annotation_service.py b/app/services/annotation_service.py index e10fe0b..5da83e2 100644 --- a/app/services/annotation_service.py +++ b/app/services/annotation_service.py @@ -1,3 +1,4 @@ +import logging import sqlite3 from datetime import datetime from typing import Any @@ -34,7 +35,6 @@ from app.schemas.user_db.result_model import TableInfo as UserDBTableInfo from app.services.user_db_service import UserDbService, user_db_service -annotation_repository_dependency = Depends(lambda: annotation_repository) user_db_service_dependency = Depends(lambda: user_db_service) # AI 서버의 주소 (임시) @@ -63,6 +63,7 @@ async def create_annotation(self, request: AnnotationCreateRequest) -> FullAnnot 3. TODO: AI 서버에 요청 (현재는 Mock 데이터 사용) 4. 트랜잭션 내에서 전체 어노테이션 정보 저장 및 DB 프로필 업데이트 """ + logging.info(f"Starting annotation creation for db_profile_id: {request.db_profile_id}") try: request.validate() except ValueError as e: @@ -70,15 +71,23 @@ async def create_annotation(self, request: AnnotationCreateRequest) -> FullAnnot # 1. DB 프로필, 전체 스키마 정보, 샘플 데이터 조회 db_profile = self.user_db_service.find_profile(request.db_profile_id) + logging.info("Successfully fetched DB profile.") + full_schema_info = self.user_db_service.get_full_schema_info(db_profile) + logging.info(f"Successfully fetched full schema info with {len(full_schema_info)} tables.") + sample_rows = self.user_db_service.get_sample_rows(db_profile, full_schema_info) + logging.info(f"Successfully fetched sample rows for {len(sample_rows)} tables.") # 2. AI 서버에 요청할 데이터 모델 생성 ai_request_body = self._prepare_ai_request_body(db_profile, full_schema_info, sample_rows) - print(ai_request_body.model_dump_json(indent=2)) + logging.info("Prepared AI request body.") + logging.debug(f"AI Request Body: {ai_request_body.model_dump_json(indent=2)}") # 3. AI 서버에 요청 (현재는 Mock 데이터 사용) ai_response = await self._request_annotation_to_ai_server(ai_request_body) + logging.info("Received AI response.") + logging.debug(f"AI Response: {ai_response}") # 4. 트랜잭션 내에서 전체 어노테이션 정보 저장 및 DB 프로필 업데이트 db_path = get_db_path() @@ -90,23 +99,29 @@ async def create_annotation(self, request: AnnotationCreateRequest) -> FullAnnot db_models = self._transform_ai_response_to_db_models( ai_response, db_profile, request.db_profile_id, full_schema_info ) + logging.info("Transformed AI response to DB models.") self.repository.create_full_annotation(db_conn=conn, **db_models) + logging.info("Successfully saved full annotation to the database.") annotation_id = db_models["db_annotation"].id self.repository.update_db_profile_annotation_id( db_conn=conn, db_profile_id=request.db_profile_id, annotation_id=annotation_id ) + logging.info(f"Updated db_profile with new annotation_id: {annotation_id}") conn.commit() + logging.info("Database transaction committed.") except sqlite3.Error as e: if conn: conn.rollback() + logging.error("Database transaction failed and rolled back.", exc_info=True) raise APIException(CommonCode.FAIL_CREATE_ANNOTATION, detail=f"Database transaction failed: {e}") from e finally: if conn: conn.close() + logging.info(f"Annotation creation process completed for annotation_id: {annotation_id}") return self.get_full_annotation(annotation_id) def get_annotation_by_db_profile_id(self, db_profile_id: str) -> FullAnnotationResponse: @@ -225,6 +240,9 @@ def _transform_ai_response_to_db_models( for tbl_data in ai_response.get("tables", []): original_table = schema_lookup.get(tbl_data["table_name"]) if not original_table: + logging.warning( + f"Table '{tbl_data['table_name']}' from AI response not found in original schema. Skipping." + ) continue ( diff --git a/app/services/user_db_service.py b/app/services/user_db_service.py index 782f396..e139ced 100644 --- a/app/services/user_db_service.py +++ b/app/services/user_db_service.py @@ -1,6 +1,7 @@ # app/service/driver_service.py import importlib +import logging import sqlite3 from typing import Any @@ -180,68 +181,112 @@ def get_full_schema_info( DB 프로필 정보를 받아 해당 데이터베이스의 전체 스키마 정보 (테이블, 컬럼, 제약조건, 인덱스)를 조회하여 반환합니다. """ + logging.info(f"Starting schema scan for db_profile: {db_info.id}") try: driver_module = self._get_driver_module(db_info.type) connect_kwargs = self._prepare_connection_args(db_info) - - # 1. 모든 스키마(DB) 목록 조회 - schemas_result = repository.find_schemas( - driver_module, self._get_schema_query(db_info.type), **connect_kwargs - ) - if not schemas_result.is_successful: - raise APIException(schemas_result.code) + schemas_to_scan = self._get_schemas_to_scan(db_info, repository, driver_module, connect_kwargs) full_schema_info = [] - - # 2. 각 스키마의 모든 테이블 목록 조회 - for schema_name in schemas_result.schemas: + for schema_name in schemas_to_scan: tables_result = repository.find_tables( driver_module, self._get_table_query(db_info.type), schema_name, **connect_kwargs ) + logging.info( + f"Found {len(tables_result.tables)} tables in schema '{schema_name}': {tables_result.tables}" + ) + if not tables_result.is_successful: - # 특정 스키마에서 테이블 조회 실패 시 건너뛰거나 로깅 + logging.warning(f"Failed to find tables for schema '{schema_name}'. Skipping.") continue - # 3. 각 테이블의 상세 정보 조회 for table_name in tables_result.tables: - columns_result = repository.find_columns( - driver_module, - self._get_column_query(db_info.type), - schema_name, - db_info.type, - table_name, - **connect_kwargs, - ) - - try: - constraints = repository.find_constraints( - driver_module, db_info.type, schema_name, table_name, **connect_kwargs - ) - indexes = repository.find_indexes( - driver_module, db_info.type, schema_name, table_name, **connect_kwargs - ) - except (sqlite3.Error, self._get_driver_module(db_info.type).Error) as e: - # 레포지토리에서 발생한 DB 예외를 서비스에서 처리 - raise APIException(CommonCode.FAIL_FIND_CONSTRAINTS_OR_INDEXES) from e - - table_info = TableInfo( - name=table_name, - columns=columns_result.columns if columns_result.is_successful else [], - constraints=constraints, - indexes=indexes, - comment=None, # 테이블 코멘트는 현재 조회 로직에 없음 + table_info = self._get_table_details( + driver_module, db_info, schema_name, table_name, connect_kwargs, repository ) full_schema_info.append(table_info) + logging.info( + f"Finished schema scan. Total tables found: {len(full_schema_info)}. " + f"Table names: {[t.name for t in full_schema_info]}" + ) return full_schema_info - except APIException: - # 이미 APIException인 경우 그대로 전달 raise except Exception as e: - # 그 외 모든 예외는 일반 실패로 처리 + logging.error("An unexpected error occurred in get_full_schema_info", exc_info=True) raise APIException(CommonCode.FAIL) from e + def _get_schemas_to_scan( + self, + db_info: AllDBProfileInfo, + repository: UserDbRepository, + driver_module: Any, + connect_kwargs: dict[str, Any], + ) -> set[str]: + """어노테이션을 위해 스캔할 스키마 목록을 결정합니다.""" + schemas_to_scan = set() + if db_info.type.lower() == "oracle": + if not db_info.username: + logging.error("Oracle profile is missing a username, cannot determine schema.") + raise APIException(CommonCode.FAIL_FIND_SCHEMAS) + schemas_to_scan.add(db_info.username) + logging.info(f"Oracle DB detected. Limiting schema scan to user: {db_info.username}") + else: + logging.info(f"'{db_info.type}' DB detected. Fetching all available schemas.") + schemas_result = repository.find_schemas( + driver_module, self._get_schema_query(db_info.type), **connect_kwargs + ) + if schemas_result.is_successful and schemas_result.schemas: + schemas_to_scan.update(schemas_result.schemas) + else: + logging.warning("Could not retrieve schema list. Falling back to username if available.") + if db_info.username: + schemas_to_scan.add(db_info.username) + logging.info(f"Final schemas to scan: {list(schemas_to_scan)}") + return schemas_to_scan + + def _get_table_details( + self, + driver_module: Any, + db_info: AllDBProfileInfo, + schema_name: str, + table_name: str, + connect_kwargs: dict[str, Any], + repository: UserDbRepository, + ) -> TableInfo: + """단일 테이블의 상세 정보(컬럼, 제약조건, 인덱스)를 조회합니다.""" + logging.info(f"Fetching details for table: {schema_name}.{table_name}") + columns_result = repository.find_columns( + driver_module, self._get_column_query(db_info.type), schema_name, db_info.type, table_name, **connect_kwargs + ) + try: + constraints = repository.find_constraints( + driver_module, db_info.type, schema_name, table_name, **connect_kwargs + ) + except (sqlite3.Error, self._get_driver_module(db_info.type).DatabaseError) as e: + logging.error(f"Error finding constraints for {schema_name}.{table_name}: {e}", exc_info=True) + raise APIException(CommonCode.FAIL_FIND_CONSTRAINTS) from e + try: + indexes = repository.find_indexes(driver_module, db_info.type, schema_name, table_name, **connect_kwargs) + except (sqlite3.Error, self._get_driver_module(db_info.type).DatabaseError) as e: + raise APIException(CommonCode.FAIL_FIND_INDEXES) from e + + table_info = TableInfo( + name=table_name, + columns=columns_result.columns if columns_result.is_successful else [], + constraints=constraints, + indexes=indexes, + comment=None, + ) + logging.info( + f"Successfully fetched details for {schema_name}.{table_name}. " + f"Columns: {len(table_info.columns)}, " + f"Constraints: {len(table_info.constraints)}, " + f"Indexes: {len(table_info.indexes)}" + ) + return table_info + def get_sample_rows( self, db_info: AllDBProfileInfo, table_infos: list[TableInfo], repository: UserDbRepository = user_db_repository ) -> dict[str, list[dict[str, Any]]]: @@ -249,11 +294,19 @@ def get_sample_rows( 테이블 정보 목록을 받아 각 테이블의 샘플 데이터를 조회하여 반환합니다. """ try: + if not table_infos: + return {} + driver_module = self._get_driver_module(db_info.type) connect_kwargs = self._prepare_connection_args(db_info) - # SQLite는 스키마 이름이 필요 없음 - schema_name = db_info.name if db_info.type != "sqlite" else "" + schema_name = "" + if db_info.type == "oracle": + schema_name = db_info.username # Oracle은 사용자 이름이 스키마 + elif db_info.type != "sqlite": + # TODO: PostgreSQL 등 다른 DB는 여러 스키마를 가질 수 있어 리팩토링 필요 + schema_name = db_info.name + table_names = [table.name for table in table_infos] return repository.find_sample_rows(driver_module, db_info.type, schema_name, table_names, **connect_kwargs) @@ -301,8 +354,9 @@ def _prepare_connection_args(self, db_info: DBProfileInfo) -> dict[str, Any]: kwargs["dbname"] = db_info.name elif db_info.type in ["mysql", "mariadb"]: kwargs["database"] = db_info.name - elif db_info.type == "oracle": - kwargs["dsn"] = f"{db_info.host}:{db_info.port}/{db_info.name}" + elif db_info.type.lower() == "oracle": + # dsn을 직접 사용하는 대신 service_name을 명시적으로 전달 + kwargs["service_name"] = db_info.name return kwargs @@ -316,7 +370,7 @@ def _get_schema_query(self, db_type: str) -> str | None: elif db_type in ["mysql", "mariadb"]: return "SELECT schema_name FROM information_schema.schemata" elif db_type == "oracle": - return "SELECT username FROM all_users" + return "SELECT username FROM all_users WHERE ORACLE_MAINTAINED = 'N'" elif db_type == "sqlite": return None return None @@ -346,7 +400,7 @@ def _get_table_query(self, db_type: str, for_all_schemas: bool = False) -> str | WHERE table_type = 'BASE TABLE' AND table_schema = %s """ elif db_type == "oracle": - return "SELECT table_name FROM all_tables WHERE owner = :owner" + return "SELECT table_name FROM user_tables" elif db_type == "sqlite": return "SELECT name FROM sqlite_master WHERE type='table'" return None @@ -395,11 +449,7 @@ def _get_column_query(self, db_type: str) -> str | None: WHERE table_schema = %s AND table_name = %s """ elif db_type == "oracle": - return """ - SELECT column_name, data_type, nullable, data_default, table_name - FROM all_tab_columns - WHERE owner = :owner AND table_name = :table - """ + return "SELECT column_name FROM user_tab_columns WHERE table_name = :table" elif db_type == "sqlite": return None return None