# main.py
import http
import logging
import os
import xml.etree.ElementTree as ET
from collections import namedtuple

import mysql.connector
import requests
from apscheduler.schedulers.blocking import BlockingScheduler
from dotenv import load_dotenv

FORMAT = "%(asctime)-15s %(name)s:%(lineno)s %(levelname)s:%(message)s"
logging.basicConfig(format=FORMAT)
logger = logging.getLogger(__name__)
logger.setLevel(logging.DEBUG)

SS = "ss"
MLWH = "mlwh"


def get_config(testing: bool = False):  # type: ignore
    """
    Get the required config parameters
    """
    if testing:
        load_dotenv()

    required_config = (
        "MLWH_DB_HOST",
        "MLWH_DB_PORT",
        "MLWH_DB_USER",
        "MLWH_DB_PASSWORD",
        "MLWH_DB_DBNAME",
        "SS_DB_HOST",
        "SS_DB_PORT",
        "SS_DB_USER",
        "SS_DB_PASSWORD",
        "SS_DB_DBNAME",
        "SS_URL_HOST",
    )
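    # These are read from the environment (or from a .env file when
    # testing=True). Illustrative values only, e.g.:
    #   SS_URL_HOST=https://sequencescape.example.com
    #   SS_DB_HOST=localhost  SS_DB_PORT=3306  SS_DB_USER=...  SS_DB_PASSWORD=...  SS_DB_DBNAME=...
    #   MLWH_DB_HOST=localhost  MLWH_DB_PORT=3306  MLWH_DB_USER=...  ...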

    for config in required_config:
        if not os.getenv(config):
            raise Exception(f"A required config is missing: {config}")

    # Split the keys per system and strip the leading "SS_"/"MLWH_" prefix,
    # lower-casing the rest (e.g. "SS_DB_HOST" -> "db_host")
    ss_config_keys = list(filter(lambda x: "SS_" in x, required_config))
    mlwh_config_keys = list(filter(lambda x: "MLWH_" in x, required_config))

    Config = namedtuple("Config", [SS, MLWH])  # type: ignore
    SsConfig = namedtuple("SsConfig", [conf[conf.find("_") + 1 :].lower() for conf in ss_config_keys])  # type: ignore # noqa: E501, E203
    MlwhConfig = namedtuple("MlwhConfig", [conf[conf.find("_") + 1 :].lower() for conf in mlwh_config_keys])  # type: ignore # noqa: E501, E203

    ss_dict = {item[item.find("_") + 1 :].lower(): os.getenv(item) for item in ss_config_keys}  # noqa: E203
    mlwh_dict = {item[item.find("_") + 1 :].lower(): os.getenv(item) for item in mlwh_config_keys}  # noqa: E203

    ss_config = SsConfig(**ss_dict)  # type: ignore
    mlwh_config = MlwhConfig(**mlwh_dict)  # type: ignore

    config = Config(**{SS: ss_config, MLWH: mlwh_config})  # type: ignore

    logger.debug(config)

    return config


def connect_to_db(config, db):
    """
    Connects to the given database using the MySQL connector.
    """
    db_config = getattr(config, db)

    connection = mysql.connector.connect(
        user=db_config.db_user,
        password=db_config.db_password,
        host=db_config.db_host,
        port=db_config.db_port,
        database=db_config.db_dbname,
    )
    cursor = connection.cursor()

    return connection, cursor


def past_tense(status):
    """Get the past tense of the words pass and fail ONLY"""
    return status + "ed"


def qc_status(qc_seq):
    """Simple rule to determine the QC status using the 'qc_seq' field"""
    if qc_seq in (None, 1):
        return "pass"

    return "fail"


def build_xml_document(asset_id, status):
    """Build an XML document to send with the POST request to SS"""
    qc_information_tag = ET.Element("qc_information")

    message_tag = ET.SubElement(qc_information_tag, "message")
    message_tag.text = f"Asset {asset_id} {past_tense(status)} manual qc"

    xml_doc_string = ET.tostring(qc_information_tag, encoding="unicode", method="xml", xml_declaration=True)
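    # For example, build_xml_document(123, "pass") should produce something like
    # (the exact XML declaration depends on the Python/locale defaults):
    #   <?xml version='1.0' encoding='UTF-8'?>
    #   <qc_information><message>Asset 123 passed manual qc</message></qc_information>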
    logger.debug(xml_doc_string)

    return xml_doc_string


def url_for_action(config, asset_id, status):
    """Get the URL required for the status update"""
    url = f"{config.ss.url_host}/npg_actions/assets/{asset_id}/{status}_qc_state"
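    # e.g. with a hypothetical SS_URL_HOST=https://sequencescape.example.com,
    # a "pass" for asset 123 is posted to:
    #   https://sequencescape.example.com/npg_actions/assets/123/pass_qc_state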
    logger.debug(f"URL: {url}")

    return url


def send_npg_action(config, asset_id, status):
    """Send an HTTP POST request to the SS endpoint provided for NPG"""
    xml_document = build_xml_document(asset_id, status)

    logger.info(f"Sending {status} for asset {asset_id}")

    headers = {"Content-Type": "application/xml"}
    url = url_for_action(config, asset_id, status)

    response = requests.post(url, data=xml_document, headers=headers)

    if response.status_code != http.HTTPStatus.OK:
        raise Exception(f"Error while processing {url}")


def get_lanes_info(config):
    """Get the information about the lanes from SS"""
    lanes_info = []

    connection, cursor = connect_to_db(config, SS)

    with open("ss_script.sql") as f:
        cursor.execute(f.read())

    for (target_asset_id, position, batch_id) in cursor:
        record = {"target_asset_id": target_asset_id, "position": position, "batch_id": batch_id}
        lanes_info.append(record)

    logger.info(f"Lanes to process: {len(lanes_info)}")

    cursor.close()
    connection.close()

    return lanes_info


def find_and_complete(config, lanes_info):
    """Find the run in MLWH and send PASS or FAIL to SS"""
    connection, cursor = connect_to_db(config, MLWH)

    with open("mlwh_script.sql") as f:
        mlwh_query_template = f.read()
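    # Assumption from the substitution below: mlwh_script.sql contains the
    # literal placeholders __BATCH_ID__ and __POSITION__, which are replaced
    # per lane before the query is executed.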

    for lane in lanes_info:
        logger.info(f'{"-" * 80}')
        logger.info(f"lane info: {lane}")

        mlwh_query = mlwh_query_template
        for r in (("__BATCH_ID__", str(lane["batch_id"])), ("__POSITION__", str(lane["position"]))):
            mlwh_query = mlwh_query.replace(*r)

        cursor.execute(mlwh_query)

        for (id_run, position, qc_seq) in cursor:
            logger.debug(f"id_run: {id_run}, position: {position}, qc_seq: {qc_seq}")

            send_npg_action(config, lane["target_asset_id"], qc_status(qc_seq))

    cursor.close()
    connection.close()


def magic(config):
    """Fetch the lanes to process from SS, look up their QC outcome in MLWH and report it back to SS."""
    try:
        lanes_info = get_lanes_info(config)
        find_and_complete(config, lanes_info)

        logger.info("DONE")
    except (Exception, mysql.connector.Error) as error:
        logging.error(error)
        logger.exception(error)


def main():
    scheduler = BlockingScheduler()

    try:
        config = get_config()

        scheduler.add_job(magic, "interval", (config,), hours=12)
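        # With an "interval" trigger the first run typically happens one
        # interval (12 hours) after start-up rather than immediately, unless a
        # start_date/next_run_time is supplied.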
        scheduler.start()
    except (KeyboardInterrupt, SystemExit):
        pass


if __name__ == "__main__":
    main()