-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathload_data.py
66 lines (50 loc) · 1.86 KB
/
load_data.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
import sqlite3
import psycopg2
import logging
from config import Config
from psycopg2.extensions import connection as _connection
from psycopg2.extras import DictCursor
from dotenv import load_dotenv
from utils import SQLiteLoader, PostgresSaver
load_dotenv()
def load_from_sqlite(connection: sqlite3.Connection, pg_conn: _connection):
"""Основной метод загрузки данных из SQLite в Postgres"""
cursor = connection.cursor()
pg_cursor = pg_conn.cursor()
sqlite_loader = SQLiteLoader(cursor, connection)
data_names_of_tables = sqlite_loader.load_tables_names()
data_file_paths = []
try:
for name_of_table in data_names_of_tables:
data_table_info = sqlite_loader.load_table_columns_info(name_of_table)
(
attrs,
unconflict_attrs,
) = sqlite_loader.forming_attrs_and_attrs_for_unconflict(
name_of_table, data_table_info
)
sqlite_loader.load_data(name_of_table, attrs, 10, data_file_paths)
cols_slots = sqlite_loader.create_cols_slots(name_of_table)
save_to_postgres = PostgresSaver(pg_cursor)
save_to_postgres.save_all_data(
name_of_table, cols_slots, attrs, unconflict_attrs, sqlite_loader
)
print("done!")
except Exception as e:
logging.exception()
finally:
connection.commit()
connection.close()
if __name__ == "__main__":
dsl = {
"dbname": Config.DB_NAME,
"user": Config.DB_USER,
"password": Config.DB_PASSWORD,
"host": Config.DB_HOST,
"port": Config.DB_PORT,
}
print("dsl", dsl)
with sqlite3.connect("./db.sqlite") as sqlite_conn, psycopg2.connect(
**dsl, cursor_factory=DictCursor
) as pg_conn:
load_from_sqlite(sqlite_conn, pg_conn)