This repository has been archived by the owner on Jan 30, 2024. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 0
/
run.py
182 lines (169 loc) · 6.36 KB
/
run.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
import os
import asyncio
import glob
from typing import List, Dict, Union
from itertools import groupby
import xml.etree.ElementTree as ET
import aiohttp
from aiolimiter import AsyncLimiter
import pandas as pd
import numpy as np
import sys
sys.path.insert(0, "..")
from src.kmeans import KMeans, Score
async def download_xml(year: int, semaphore: asyncio.Semaphore) -> None:
"""
Realiza download assíncrono de xml para um único ano.
Argumentos:
year (int): Ano do arquivo xml.
semaphore (asyncio.Semaphore): Controlador de acesso concorrente.
"""
limiter = AsyncLimiter(1, 0.125)
USER_AGENT = ""
headers = {"User-Agent": USER_AGENT}
DATA_DIR = os.path.join(os.getcwd(), "../data")
if not os.path.exists(DATA_DIR):
os.mkdir(DATA_DIR)
url = f"https://www.al.sp.gov.br/repositorioDados/deputados/despesas_gabinetes_{str(year)}.xml"
async with aiohttp.ClientSession(headers=headers) as session:
await semaphore.acquire()
async with limiter:
async with session.get(url) as resp:
content = await resp.read()
semaphore.release()
file = f"despesas_gabinetes_{str(year)}.xml"
with open(os.path.join(DATA_DIR, file), "wb") as f:
f.write(content)
async def fetch_expenses(year_start: int, year_end: int) -> None:
"""
Realiza download assíncrono de xml para um período.
Argumentos:
year_start (int): Início do período.
year_end (int): Fim do período.
"""
tasks = set()
semaphore = asyncio.Semaphore(value=10)
for i in range(int(year_start), int(year_end) + 1):
task = asyncio.create_task(download_xml(i, semaphore))
tasks.add(task)
await asyncio.wait(tasks, return_when=asyncio.ALL_COMPLETED)
def parse_data(list_files: List[str]) -> List[Dict[str, Union[str, None]]]:
"""
Interpreta dados dos arquivos xml e extrai informações relevantes.
Argumentos:
list_files (list): Lista dos caminhos para os arquivos xml.
Retorna:
data (list): Lista de dicionários de despesas.
"""
data = list()
for file in list_files:
tree = ET.parse(file)
xroot = tree.getroot()
for child in xroot.iter("despesa"):
cols = [elem.tag for elem in child]
values = [elem.text for elem in child]
data.append(dict(zip(cols, values)))
return data
# executa `fetch_expenses` no período de 2013 a 2022
asyncio.run(fetch_expenses(2013, 2022))
# observa se há o diretório `data`
if os.path.exists(os.path.join(os.getcwd(), "../data")):
# acessa diretório
os.chdir("../data")
# lista arquivos xml
files = glob.glob("*.xml")
# interpreta os arquivos
load = parse_data(files)
# armazena os dados na variável `despesas`
despesas = pd.DataFrame.from_dict(load)
# leitura dos data de IPCA
ipca = pd.read_csv("../data/ipca.csv")
# conversão da variável Data para datetime
ipca["Data"] = pd.to_datetime(ipca["Data"])
# parseamento da data
despesas["Data"] = pd.to_datetime(
despesas["Ano"].astype(str) + (despesas["Mes"].astype(str)).str.zfill(2) + "01"
)
# filtro da categoria de despesa
despesas = despesas[
despesas["Tipo"] == "I - HOSPEDAGEM, ALIMENTAÇÃO E DESPESAS DE LOCOMOÇÃO"
]
# manutenção das colunas estritamente necessárias
despesas = despesas[["Data", "CNPJ", "Valor"]]
# filtro a partir de 2018
despesas = despesas[despesas["Data"].dt.year > 2017]
# junção das duas bases
data = pd.merge(left=despesas, right=ipca, on="Data", how="inner")
# ajuste para o valor de dezembro de 2022
data["Valor_ref"] = ipca[ipca["Data"] == "2022-12-01"]["Valor"].values[0]
# cálculo da deflação
data["Valor_corrigido"] = round(
(data["Valor_ref"].astype(float) / data["Valor_y"].astype(float))
* data["Valor_x"].astype(float),
2,
)
# remoção de variáveis desnecessárias
data = data[["CNPJ", "Valor_corrigido"]]
# remoção de linhas com CNPJ nulos
data = data[data["CNPJ"].notnull()]
# filtro para CNPJs com apenas >= 20 entradas
data = data.groupby("CNPJ").filter(lambda x: len(x) >= 20)
# criação de listas para comportar os valores do método de silhueta e
# índice de Davies-Bouldin
sils, dbs = list(), list()
# inicialização do algoritmo de K-Means
kmeans = KMeans()
# organização dos dados
selecao_dados = sorted(zip(data["CNPJ"], data["Valor_corrigido"]), key=lambda x: x[0])
# lista vazia para resultados finais
resultados_lista = []
# iteração por CNPJ e coleção de despesas
for cnpj, grupo in groupby(selecao_dados, key=lambda x: x[0]):
# lista vazia de centroides
centroids_list = []
# conversão para array
values = np.array([item[1] for item in grupo])
# obtenção do k ideal
kmeans.k = kmeans.get_optimal_k(values.reshape(-1, 1))
# ajuste de dados ao algoritmo
kmeans.fit(values.reshape(-1, 1))
# detecção de anomalias
anomalies_kmeans = kmeans.detect(values.reshape(-1, 1))
# cálculo do método de silhueta
silhouette_score = Score.silhouette(
values.reshape(-1, 1), kmeans.get_labels(values.reshape(-1, 1))
)
# cálculo do índice de Davies-Bouldin
db_score = Score.daviesbouldin(
values.reshape(-1, 1), kmeans.get_labels(values.reshape(-1, 1))
)
# obtenção de labels
labels = kmeans.get_labels(values.reshape(-1, 1))
# iteração sobre labels e valores
for value, label in zip(values, labels):
# adição de label no dicionário
centroids_list.append({"centroid": kmeans.centroids[label][0]})
# contador zerado
centroid_idx = 0
# iteração sobre despesas
for value in values:
# atribuição de 1 para anomalia, 0 para não anomalia
is_anomaly = 1 if value in anomalies_kmeans else 0
# adição de dicionário na lista final
resultados_lista.append(
{
"CNPJ": cnpj,
"Valor": value,
"Anomalia": is_anomaly,
"Centroide": centroids_list[centroid_idx]["centroid"],
"Clusters": kmeans.k,
"Silhueta": silhouette_score,
"Davies_Bouldin": db_score,
}
)
# incremento do contador
centroid_idx += 1
# conversão dos resultados em dataframe
resultados = pd.DataFrame(resultados_lista)
# salvamento como csv
resultados.to_csv("../prd/resultado.csv", index=False, encoding="utf-8")