-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathmain.py
210 lines (149 loc) · 6.08 KB
/
main.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
import pandas as pd
import glob
import os
import psycopg2
from sqlalchemy import create_engine
import time
import frictionless
from config import remove_path_redundancies, dtypes_from_datapackage
# Name of the target PostgreSQL database.
DB_NAME = 'dadosmg'
# Path components for the dataset directory; combined below into DATA_PATH
# ('datasets/despesa/data/').
DATASETS_DIR = 'despesa/'
DATASET_DIR = 'datasets/'
DATA_DIR = 'data/'
DATA_PATH = DATASET_DIR + DATASETS_DIR + DATA_DIR
# When True, drop every existing table in the database before loading.
DROP_TABLES = True
def drop_tables(con=None):
    """
    Drop all tables in the 'public' schema of a database.

    :param con: open psycopg2 connection to the database.
    :return: None.
    """
    cur = con.cursor()
    try:
        cur.execute("""
            SELECT table_name
            FROM information_schema.tables
            WHERE table_schema = 'public'""")
        res = cur.fetchall()
        if res:
            # fetchall() yields one-element tuples; unpack so the message
            # prints the bare name instead of "('foo',)".
            for (table_name,) in res:
                # Name comes from information_schema, not user input, but
                # quote it so unusual identifiers cannot break the statement.
                cur.execute(f'DROP TABLE "{table_name}"')
                print(f"Tabela {table_name} apagada.")
        else:
            print(f"Não há tabelas em {DB_NAME}")
        con.commit()
    finally:
        # Always release the cursor, even if a DROP fails.
        cur.close()
def create_database(database_name):
    """
    Create a PostgreSQL database with the given name, if it does not exist.

    :param database_name: name of the database to be created.
    :return: None.
    """
    con = psycopg2.connect("user=postgres password=postgres host=localhost")
    # CREATE DATABASE cannot run inside a transaction block.
    con.autocommit = True
    cur = con.cursor()
    try:
        # Bind the name as a parameter instead of interpolating it into the
        # query text (avoids SQL injection through database_name).
        cur.execute(
            "SELECT 1 FROM pg_catalog.pg_database WHERE datname = %s",
            (database_name,),
        )
        if cur.fetchone() is None:
            # Identifiers cannot be bound as parameters; the name comes from
            # the module-level DB_NAME constant, not from user input.
            cur.execute(f"CREATE DATABASE {database_name}")
    finally:
        cur.close()
        con.close()
def tables_from_csv(package, connection_string):
    """
    Load each CSV resource of a frictionless package into its own table.

    :param package: frictionless Package whose resources point to CSV files.
    :param connection_string: SQLAlchemy connection string for the target DB.
    :return: None.
    """
    db = create_engine(connection_string)
    # Context manager guarantees the connection is closed even on error.
    with db.connect() as conn:
        for resource in package.resources:
            file = remove_path_redundancies(resource)
            print(f"Lendo {file}")
            df = pd.read_csv(file, delimiter=';', decimal='.')
            # Cast columns to the dtypes declared in the datapackage schema.
            df = dtypes_from_datapackage(resource, df)
            # if_exists='replace': each resource overwrites any pre-existing
            # table with the same name.
            df.to_sql(resource.name, conn, if_exists='replace', index=False)
            print(f"Arquivo {file} carregado para tabela {resource.name}\n")
        conn.commit()
    print('-------------------------------------------------------\n')
def append_from_csv(package, connection_string, tbl_agg_name):
    """
    Append every CSV resource of a frictionless package into one table.

    :param package: frictionless Package whose resources point to CSV files.
    :param connection_string: SQLAlchemy connection string for the target DB.
    :param tbl_agg_name: name of the table the data is appended to.
    :return: None.
    """
    num_linhas = 0
    files_error = []
    db = create_engine(connection_string)
    # Context manager guarantees the connection is closed even on error.
    with db.connect() as conn:
        for resource in package.resources:
            file = remove_path_redundancies(resource)
            print(f"Lendo arquivo {file}")
            try:
                df = pd.read_csv(file, delimiter=';', decimal='.')
                df = dtypes_from_datapackage(resource, df)
                df.to_sql(tbl_agg_name, conn, if_exists='append', index=False)
                conn.commit()
            except Exception:
                # Header-only files or files with a divergent schema fail
                # here; record them and keep loading the remaining resources.
                print(f"ERRO: Arquivo {file} está vazio ou contém schema divergente dos demais.\n")
                files_error.append(file)
                continue
            print(f"Arquivo {file} concatenado na tabela {tbl_agg_name}\n")
            num_linhas += len(df)
            print('-------------------------------------------------------\n')
    print('-------------------------------------------------------')
    print('Total de linhas das tabelas lidas:', num_linhas)
    # Alert for files that could not be loaded into the database.
    if files_error:
        print("ATENÇÃO! os seguintes arquivos não foram carregados para a base de dados:")
        # Unpack the list so sep='\n' prints one file per line
        # (passing the list itself would ignore sep).
        print(*files_error, sep='\n')
        print('\n\n')
def show_tables(con=None):
    """
    Print every table in the 'public' schema of the database.

    :param con: connection object to the database.
    :return: none.
    """
    query = """
    SELECT table_name
    FROM information_schema.tables
    WHERE table_schema = 'public'"""
    cursor = con.cursor()
    cursor.execute(query)
    tables = cursor.fetchall()
    if not tables:
        print(f"Não há tabelas na database {DB_NAME}")
    else:
        print(tables)
    cursor.close()
if __name__ == '__main__':
    start_time = time.time()
    create_database(DB_NAME)
    con = psycopg2.connect(user="postgres",
                           password="postgres",
                           host="localhost",
                           database=DB_NAME)
    connection_string = f"postgresql+psycopg2://postgres:postgres@localhost/{DB_NAME}"
    # Build the datapackage path from the module constants instead of
    # duplicating the hard-coded 'datasets/despesa/data/' string.
    package = frictionless.Package(DATA_PATH + 'datapackage.json')
    # Resources split by year: appended into one aggregate table each.
    resources_desp = [package.get_resource(x) for x in package.resource_names if "dm_empenho_desp_" in x]
    resources_ft = [package.get_resource(x) for x in package.resource_names if "ft_despesa_" in x]
    # Resources NOT split by year: each gets its own table.
    singleTable_resources = [r for r in package.resources if r not in resources_desp and r not in resources_ft]
    singleTable_resources = frictionless.Package(resources=singleTable_resources)
    resources_desp = frictionless.Package(resources=resources_desp)
    resources_ft = frictionless.Package(resources=resources_ft)
    try:
        if DROP_TABLES:
            drop_tables(con)
        tables_from_csv(singleTable_resources, connection_string)
        append_from_csv(resources_desp, connection_string, 'dm_empenho_desp')
        append_from_csv(resources_ft, connection_string, 'ft_despesa')
    finally:
        # Release the psycopg2 connection even if a load step fails.
        con.close()
    end_time = time.time()
    print(f"Tempo Total de execução: {end_time - start_time}")