forked from nim65s/matrix-webhook
-
Notifications
You must be signed in to change notification settings - Fork 0
/
matrix_webhook.py
executable file
·104 lines (81 loc) · 2.99 KB
/
matrix_webhook.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
#!/usr/bin/env python3
"""
Matrix Webhook.
Post a message to a matrix room with a simple HTTP POST
v1: matrix-client & http.server
v2: matrix-nio & aiohttp & markdown
"""
import asyncio
import json
import os
import logging
from http import HTTPStatus
from signal import SIGINT, SIGTERM
from aiohttp import web
from markdown import markdown
from nio import AsyncClient
SERVER_ADDRESS = ('', int(os.environ.get('PORT', 4785)))
MATRIX_URL = os.environ.get('MATRIX_URL', 'https://matrix.org')
MATRIX_ID = os.environ.get('MATRIX_ID', '@wwm:matrix.org')
MATRIX_PW = os.environ['MATRIX_PW']
API_KEY = os.environ['API_KEY']
CLIENT = AsyncClient(MATRIX_URL, MATRIX_ID)
async def handler(request):
"""
Coroutine given to the server, st. it knows what to do with an HTTP request.
This one handles a POST, checks its content, and forwards it to the matrix room.
"""
data = await request.read()
try:
data = json.loads(data.decode())
status, ret = HTTPStatus.BAD_REQUEST, 'I need a json dict with text & key'
except json.decoder.JSONDecodeError:
data = {}
status, ret = HTTPStatus.BAD_REQUEST, 'This was not valid JSON'
if all(key in data for key in ['text', 'key', 'room']):
status, ret = HTTPStatus.UNAUTHORIZED, 'I need the good "key"'
if data['key'] == API_KEY:
status, ret = HTTPStatus.OK, 'OK'
await CLIENT.room_send(room_id=data['room'],
# str(request.rel_url)[1:],
message_type="m.room.message",
content={
"msgtype": "m.text",
"body": data['text'],
"format": "org.matrix.custom.html",
"formatted_body": markdown(data['text'], extensions=['extra']),
})
return web.Response(text='{"status": %i, "ret": "%s"}' % (status, ret),
content_type='application/json',
status=status)
async def main(event):
"""
Launch main coroutine.
matrix client login & start web server
"""
await CLIENT.login(MATRIX_PW)
server = web.Server(handler)
runner = web.ServerRunner(server)
logging.basicConfig(level=logging.DEBUG)
await runner.setup()
site = web.TCPSite(runner, *SERVER_ADDRESS)
await site.start()
# Run until we get a shutdown request
await event.wait()
# Cleanup
await runner.cleanup()
await CLIENT.close()
def terminate(event, signal):
"""Close handling stuff."""
event.set()
asyncio.get_event_loop().remove_signal_handler(signal)
def run():
"""Launch everything."""
loop = asyncio.get_event_loop()
event = asyncio.Event()
for sig in (SIGINT, SIGTERM):
loop.add_signal_handler(sig, terminate, event, sig)
loop.run_until_complete(main(event))
loop.close()
if __name__ == '__main__':
run()