-
Notifications
You must be signed in to change notification settings - Fork 3
/
db.py
75 lines (59 loc) · 2.27 KB
/
db.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
import json
import os
import sys
from typing import Iterator

from dotenv import load_dotenv
from sqlalchemy import create_engine
from sqlalchemy import MetaData
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import Session
from sqlalchemy.orm import sessionmaker
from sqlalchemy.pool import StaticPool

from app.config.base import db_settings
load_dotenv()

# Default connection parameters, taken from the application settings. The RDS
# branch below overrides them with the values stored in the cluster secret.
HOST = db_settings.DB_HOSTNAME
PORT = db_settings.DB_PORT
DBNAME = db_settings.DB_NAME
USERNAME = db_settings.DB_USERNAME
PASSWORD = db_settings.DB_PASSWORD

# NOTE(review): USERNAME/PASSWORD are interpolated into the URL verbatim, so
# characters such as "@" or "/" in the password will produce a malformed URL.
# Confirm how the credentials are stored before adding URL-escaping here.
if "pytest" in sys.modules:
    # Running under pytest: use an in-memory SQLite database instead of MySQL.
    # StaticPool keeps one shared connection so every user of the engine sees
    # the same in-memory database; check_same_thread=False allows that
    # connection to be used from more than one thread.
    SQLALCHEMY_DATABASE_URL = "sqlite://"
    engine = create_engine(
        SQLALCHEMY_DATABASE_URL,
        connect_args={"check_same_thread": False},
        poolclass=StaticPool,
    )
    _status_uri = SQLALCHEMY_DATABASE_URL
elif "PYTHON_FASTAPI_TEMPLATE_CLUSTER_SECRET" in os.environ:
    print("Connecting to database on RDS..\n")
    # The secret is a JSON document carrying the full set of connection
    # parameters; it takes precedence over the settings-based defaults.
    dbSecretJSON = os.environ["PYTHON_FASTAPI_TEMPLATE_CLUSTER_SECRET"]
    dbSecretParsed = json.loads(dbSecretJSON)
    HOST = dbSecretParsed["host"]
    PORT = dbSecretParsed["port"]
    DBNAME = dbSecretParsed["dbname"]
    USERNAME = dbSecretParsed["username"]
    PASSWORD = dbSecretParsed["password"]
    # Include the port so the configured value is actually honoured.
    engine = create_engine(
        f"mysql+pymysql://{USERNAME}:{PASSWORD}@{HOST}:{PORT}/{DBNAME}"
    )
    _status_uri = f"mysql:{USERNAME}@{HOST}/{DBNAME}"
else:
    print("Connecting local database..\n")
    # Include the port so the configured value is actually honoured.
    engine = create_engine(
        f"mysql+pymysql://{USERNAME}:{PASSWORD}@{HOST}:{PORT}/{DBNAME}"
    )
    _status_uri = f"mysql:{USERNAME}@{HOST}/{DBNAME}"

meta = MetaData()

# Test the connection and print the status. The connection is only a probe, so
# it is released straight away instead of staying checked out of the pool for
# the lifetime of the process. `conn` remains bound (to a closed connection).
try:
    with engine.connect() as conn:
        pass
except Exception as e:
    print(f"Failed to connect to database. Error: {e}")
    # Chain the original error so the driver-level traceback is preserved.
    raise Exception(f"Failed to connect to database. Error: {e}") from e
else:
    print("-------------------------- Database connected ----------------------------")
    # Password-free description of the database the engine really points at.
    print(f"{{ \n\tdb_uri: {_status_uri} \n }}")
    print("\nxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx")

# Module-wide session bound to the engine above.
# NOTE(review): a single shared Session is not safe for concurrent use —
# confirm that callers only use it from one thread/task at a time.
localSession = Session(engine)
def create_local_session() -> Iterator[Session]:
    """Yield a new database session and close it once the caller is finished.

    This is a generator: it yields exactly one session and closes it in the
    ``finally`` clause when the generator is resumed or closed, whether or not
    the caller raised. It is presumably meant to be used as a ``yield``-style
    dependency (e.g. with FastAPI's ``Depends``) — verify against callers.

    The session is bound to the module-level ``engine`` rather than to a newly
    created one. Building an engine per call would allocate a fresh connection
    pool each time, and would bypass the engine chosen at import time (for
    example the in-memory SQLite engine used when running under pytest).

    Yields:
        Session: a session created with ``autocommit=False`` and
        ``autoflush=False``.
    """
    SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)
    db = SessionLocal()
    try:
        yield db
    finally:
        # Always release the session's connection back to the pool.
        db.close()
# Declarative base class created for ORM models. No model is defined in this
# file; presumably model classes elsewhere in the project subclass it.
# NOTE(review): newer SQLAlchemy releases expose this factory as
# `sqlalchemy.orm.declarative_base` and deprecate the `sqlalchemy.ext.declarative`
# import used by this file — confirm the installed version before switching.
Base = declarative_base()