diff --git a/interface/lang2sql.py b/interface/lang2sql.py index 1779a5a..973831a 100644 --- a/interface/lang2sql.py +++ b/interface/lang2sql.py @@ -121,7 +121,6 @@ def display_result( db = ConnectDB() -db.connect_to_clickhouse() st.title("Lang2SQL") diff --git a/llm_utils/connect_db.py b/llm_utils/connect_db.py index aa2c099..447c7b2 100644 --- a/llm_utils/connect_db.py +++ b/llm_utils/connect_db.py @@ -1,44 +1,103 @@ +""" +이 모듈은 ClickHouse 데이터베이스에 연결하고 SQL 쿼리를 실행하여 결과를 pandas DataFrame으로 반환하는 기능을 제공합니다. + +구성 요소: +- 환경 변수에서 접속 정보를 불러와 ClickHouse 서버에 연결합니다. +- SQL 쿼리를 실행하고 결과를 pandas DataFrame으로 반환합니다. +- 연결 실패 및 쿼리 오류에 대해 로깅을 통해 디버깅을 지원합니다. +""" + +import logging import os -from typing import Union +from typing import Optional + import pandas as pd from clickhouse_driver import Client from dotenv import load_dotenv -# 환경변수 load_dotenv() +logging.basicConfig( + level=logging.INFO, + format="%(asctime)s [%(levelname)s] %(message)s", + datefmt="%Y-%m-%d %H:%M:%S", +) +logger = logging.getLogger(__name__) + class ConnectDB: + """ + ClickHouse 데이터베이스에 연결하고 SQL 쿼리를 실행하는 클래스입니다. + + 환경 변수에서 접속 정보를 로드하여 ClickHouse 서버에 연결하며, + SQL 쿼리 실행 결과를 pandas DataFrame으로 반환합니다. + """ + def __init__(self): - self.client = None + """ + ConnectDB 클래스의 인스턴스를 초기화합니다. + + 환경 변수에서 ClickHouse 접속 정보를 읽고, 즉시 서버에 연결을 시도합니다. + """ + + self.client: Optional[Client] = None self.host = os.getenv("CLICKHOUSE_HOST") self.dbname = os.getenv("CLICKHOUSE_DATABASE") self.user = os.getenv("CLICKHOUSE_USER") self.password = os.getenv("CLICKHOUSE_PASSWORD") self.port = os.getenv("CLICKHOUSE_PORT") - def connect_to_clickhouse(self): - - # ClickHouse 서버 정보 - self.client = Client( - host=self.host, - port=self.port, - user=self.user, - password=self.password, - database=self.dbname, # 예: '127.0.0.1' # 기본 TCP 포트 - ) - - def run_sql(self, sql: str) -> Union[pd.DataFrame, None]: - if self.client: - try: - result = self.client.execute(sql, with_column_types=True) - # 결과와 컬럼 정보 분리 - rows, columns = result - column_names = [col[0] for col in columns] - - # Create a pandas dataframe from the results - df = pd.DataFrame(rows, columns=column_names) - return df - - except Exception as e: - raise e + self.connect_to_clickhouse() + + def connect_to_clickhouse(self) -> None: + """ + ClickHouse 서버에 연결을 시도합니다. + + 연결에 성공하면 client 객체가 설정되며, 실패 시 예외를 발생시킵니다. + 연결 상태는 로깅을 통해 출력됩니다. + """ + + try: + self.client = Client( + host=self.host, + port=self.port, + user=self.user, + password=self.password, + database=self.dbname, + ) + logger.info("Successfully connected to ClickHouse.") + except Exception as e: + logger.error("Failed to connect to ClickHouse: %s", e) + raise + + def run_sql(self, sql: str) -> pd.DataFrame: + """ + SQL 쿼리를 실행하고 결과를 pandas DataFrame으로 반환합니다. + 내부적으로 ClickHouse 클라이언트가 없으면 자동으로 재연결을 시도합니다. + + Parameters: + sql (str): 실행할 SQL 쿼리 문자열 + + Returns: + pd.DataFrame: 쿼리 실행 결과를 담은 DataFrame 객체 + + Raises: + Exception: SQL 실행 중 오류 발생 시 예외를 발생시킵니다. + """ + + if not self.client: + logger.warning( + "ClickHouse client is not initialized. Attempting to reconnect..." + ) + self.connect_to_clickhouse() + + try: + result = self.client.execute(sql, with_column_types=True) + rows, columns = result + column_names = [col[0] for col in columns] + df = pd.DataFrame(rows, columns=column_names) + return df + + except Exception as e: + logger.exception("An error occurred while executing SQL: %s", e) + raise