This repository has been archived by the owner on Jun 27, 2022. It is now read-only.
generated from thoth-station/template-project
-
Notifications
You must be signed in to change notification settings - Fork 8
/
app.py
executable file
·335 lines (258 loc) · 12.8 KB
/
app.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
#!/usr/bin/env python3
# Qeb-Hwt GitHub App webhook receiver
# Copyright(C) 2019, 2020 Red Hat, Inc.
#
# This program is free software: you can redistribute it and / or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
"""This is Qeb-Hwt GitHub App webhook receiver."""
import os
import json
import logging
import pathlib
import random
import re
import time
import aiohttp
import asyncio
import gidgethub
from datetime import datetime
from urllib.parse import urljoin
from octomachinery.app.server.runner import run as run_app
from octomachinery.app.routing import process_event_actions, process_event
from octomachinery.app.routing.decorators import process_webhook_payload
from octomachinery.app.runtime.context import RUNTIME_CONTEXT
from octomachinery.github.config.app import GitHubAppIntegrationConfig
from octomachinery.github.api.app_client import GitHubApp
from octomachinery.github.api.raw_client import RawGitHubAPI
from octomachinery.utils.versiontools import get_version_from_scm_tag
from thoth.common import init_logging
from thoth.common import WorkflowManager
from thoth.report_processing.components.adviser import Adviser
from thoth.qeb_hwt.version import __version__ as qeb_hwt_version
init_logging()
_LOGGER = logging.getLogger("aicoe.sesheta")
_LOGGER.info(f"Qeb-Hwt GitHub App, v{qeb_hwt_version}")
logging.getLogger("octomachinery").setLevel(logging.DEBUG)
CHECK_RUN_NAME = "Thoth: Advise (Developer Preview)"
# no trailing / !
ADVISE_API_URL = os.getenv("ADVISE_API_URL", "https://khemenu.thoth-station.ninja/api/v1/advise/python/adviser_id")
USER_API_URL = os.getenv("USER_API_URL", "https://khemenu.thoth-station.ninja/api/v1/qeb-hwt")
MAX_CHARACTERS_LENGTH = 65535
tracer = None
@process_event("ping")
@process_webhook_payload
async def on_ping(*, hook, hook_id, zen):
"""React to ping webhook event."""
app_id = hook["app_id"]
_LOGGER.info("Processing hwtping for App ID %s " "with Hook ID %s " "sharing Zen: %s", app_id, hook_id, zen)
_LOGGER.info("GitHub App from context in ping handler: %s", RUNTIME_CONTEXT.github_app)
@process_event("integration_installation", action="created")
@process_webhook_payload
async def on_install(
action, # pylint: disable=unused-argument
installation,
sender, # pylint: disable=unused-argument
repositories=None, # pylint: disable=unused-argument
):
"""React to GitHub App integration installation webhook event."""
_LOGGER.info("installed event install id %s", installation["id"])
_LOGGER.info("installation=%s", RUNTIME_CONTEXT.app_installation)
@process_event_actions("pull_request", {"opened", "reopened", "synchronize", "edited"})
@process_webhook_payload
async def on_pr_open_or_sync(*, action, number, pull_request, repository, sender, installation, **kwargs):
"""React to an opened or changed PR event.
Send a status update to GitHub via Checks API.
"""
_LOGGER.info(f"on_pr_open_or_sync: working on PR {pull_request['html_url']}")
github_api = RUNTIME_CONTEXT.app_installation_client
pr_head_sha = pull_request["head"]["sha"]
base_repo_url = pull_request["base"]["repo"]["url"]
repo_url = pull_request["head"]["repo"]["html_url"]
check_runs_base_uri = f"{base_repo_url}/check-runs"
if pr_head_sha is None:
_LOGGER.error(f"on_pr_open_or_sync: no Pull Request head sha found, stopped working!")
return
_LOGGER.info(f"on_pr_open_or_sync: head_repo_url/origin {repo_url} will be used for check-run")
_LOGGER.info(f"on_pr_open_or_sync: base_repo_url {base_repo_url} will be used for check-run")
_LOGGER.info(f"on_pr_open_or_sync: PR commit id {pr_head_sha} will be used for check-run")
resp = await github_api.post(
check_runs_base_uri,
preview_api_version="antiope",
data={
"name": CHECK_RUN_NAME,
"head_sha": pr_head_sha,
"status": "queued",
"started_at": f"{datetime.utcnow().isoformat()}Z",
},
)
check_run_id = int(resp["id"]) # TODO do we need some marshaling here?
check_runs_updates_uri = f"{check_runs_base_uri}/{check_run_id}"
_LOGGER.info(f"on_pr_open_or_sync: check_run_id: {check_run_id}")
data = {
"github_event_type": "thoth_thamos_advise",
"github_check_run_id": check_run_id,
"github_installation_id": installation["id"],
"github_base_repo_url": base_repo_url,
"github_head_repo_url": repo_url,
"origin": repo_url,
"revision": pr_head_sha,
}
async with aiohttp.ClientSession() as session:
resp = await session.post(USER_API_URL, json=data)
_LOGGER.info(f"on_pr_open_or_sync: user-api resp: {resp}")
# TODO: add timeout to keep the github check status sane
resp = await github_api.patch(
check_runs_updates_uri,
preview_api_version="antiope",
data={"name": CHECK_RUN_NAME, "head_sha": pr_head_sha, "status": "in_progress"},
)
# We simply extend the GitHub Event set for our use case ;)
@process_event("thoth_thamos_advise", action="finished")
@process_webhook_payload
async def on_thamos_workflow_finished(*, action, base_repo_url, check_run_id, installation, payload, **kwargs):
"""Advise workflow has finished, now we need to send a check-run to the PR."""
_LOGGER.info("on_thamos_workflow_finished: %s", kwargs)
github_api: RawGitHubAPI = RUNTIME_CONTEXT.app_installation_client
_LOGGER.info("on_thamos_workflow_finished: github_api=%s", github_api)
repo = base_repo_url.split("/", 4)[-1] # i.e.: thoth-station/Qeb-Hwt
check_runs_url = f"https://api.github.com/repos/{repo}/check-runs/{check_run_id}"
_LOGGER.info("on_thamos_workflow_finished: check_runs_url=%s", check_runs_url)
advise_url: str
conclusion: str
justification: str
report: str
text: str
report_message: str
async with aiohttp.ClientSession() as session:
_LOGGER.info("on_thamos_workflow_finished: payload=%s", payload)
if "exception" in payload:
exception = payload["exception"]
else:
exception = None
analysis_id = payload["analysis_id"]
_LOGGER.info("on_thamos_workflow_finished: analysis_id=%s", analysis_id)
advise_url = urljoin(ADVISE_API_URL, analysis_id)
_LOGGER.info("on_thamos_workflow_finished: advise_url=%s", advise_url)
if exception:
_LOGGER.info("on_thamos_workflow_finished: exception=%s", exception)
if "error_type" in payload:
error_type: str = payload["error_type"]
if error_type and error_type == "MissingThothYamlFile":
conclusion = "action_required"
else:
conclusion = "failure"
else:
conclusion = "failure"
justification = exception
if exception == "Internal server error occurred, please contact administrator with provided details.":
justification += (
"\nThoth Team is working to solve the issue as soon as possible. Thanks for your patience!"
)
report = "Report not produced."
text = report
report_message = ""
if analysis_id:
# TODO: Find alternative solution to this workround
attempts = 1
max_attempts = 6
while attempts < max_attempts:
try:
async with session.get(advise_url) as response:
_LOGGER.info("on_thamos_workflow_finished: response=%s", response)
_LOGGER.info("on_thamos_workflow_finished: attempts=%s", attempts)
if response.status == 200:
attempts = max_attempts
else:
attempts += 1
except Exception:
continue
async with session.get(advise_url) as response:
if response.status != 200:
conclusion = "failure"
justification = "Could not retrieve analysis results."
report = ""
text = "Report cannot be provided, Please open an issue on Qeb-Hwt."
report_message = ""
else:
adviser_payload: dict = await response.json()
adviser_result: dict = adviser_payload["result"]
if adviser_result["error"]:
error_msg: str = adviser_result["error_msg"]
conclusion = "failure"
justification = f"Analysis has encountered errors: {error_msg}."
if adviser_result["report"]:
report = adviser_result["report"]
text = "See the report below for more details."
report_message = "See the document below for more details."
else:
conclusion = "failure"
justification = f"Analysis has encountered errors: {error_msg}."
if adviser_result["report"]:
report = adviser_result["report"]
text = "See the report below for more details."
report_message = "See the document below for more details."
else:
text = "Analysis report is missing."
report_message = "See the document below for more details."
else:
conclusion = "success"
adviser_report: dict = adviser_result["report"]
justification = Adviser.create_pretty_report_from_json(report=adviser_report, is_justification=True)
# Complete report
report = Adviser.create_pretty_report_from_json(report=adviser_report)
_LOGGER.info("on_thamos_workflow_finished: len(report)=%s", len(report))
# TODO: Split report results to include only relevant information
if len(report) > MAX_CHARACTERS_LENGTH:
_LOGGER.warning("on_thamos_workflow_finished: reduced len(report)=%s", len(report))
text = f"Analysis report:\n{report}"
report_message = "See the document below for more details."
else:
analysis_id = "No-analysis-run"
_LOGGER.info("on_thamos_workflow_finished: sending check run: check_runs_url=%s", check_runs_url)
_LOGGER.info("on_thamos_workflow_finished: sending check run: conclusion=%s", conclusion)
_LOGGER.info("on_thamos_workflow_finished: sending check run: advise_url=%s", advise_url)
_LOGGER.info("on_thamos_workflow_finished: sending check run: analysis_id=%s", analysis_id)
_LOGGER.info("on_thamos_workflow_finished: sending check run: text=%s", text)
_LOGGER.info("on_thamos_workflow_finished: sending check run: text=%s", report_message)
try:
_LOGGER.info("on_thamos_workflow_finished: installation_id=%s, check_run_url=%s", installation, check_runs_url)
await github_api.patch(
check_runs_url,
preview_api_version="antiope",
data={
"name": CHECK_RUN_NAME,
"status": "completed",
"conclusion": conclusion,
"completed_at": f"{datetime.utcnow().isoformat()}Z",
"details_url": advise_url,
"external_id": analysis_id,
"output": {
"title": "Thoth's Advise",
"text": text,
"summary": (
f"Thoth's adviser finished with conclusion: '{conclusion}'\n\n"
f"Justification:\n{justification}\n\n"
f"{report_message}"
),
},
},
)
except gidgethub.BadRequest as exc:
_LOGGER.error(exc)
_LOGGER.info(f"on_thamos_workflow_finished: finished with `thamos advise`, updated %s", check_run_id)
if __name__ == "__main__":
_LOGGER.setLevel(logging.DEBUG)
_LOGGER.debug("Debug mode turned on")
run_app( # pylint: disable=expression-not-assigned
name="Qeb-Hwt GitHub App", version=qeb_hwt_version, url="https://github.com/apps/qeb-hwt",
)