-
Notifications
You must be signed in to change notification settings - Fork 18
/
Copy pathbootstrap.py
114 lines (88 loc) · 2.88 KB
/
bootstrap.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
import asyncio
import logging
from adapters import orm, steam
from adapters.faceit import FACEITAPI
from bot import bot, config
from messages import commands, events
from messages.broker import Broker
from messages.bus import MessageBus
from services.uow import SqlUnitOfWork
from shared.log import logging_config
from shared.utils import add_signal_handlers, sentry_init
logging_config(config.DEBUG)
log = logging.getLogger(__name__)
async def bootstrap(
uow_type,
start_orm: bool,
start_steam: bool,
start_faceit: bool,
start_bot: bool,
restore: bool,
) -> MessageBus:
# close_tasks = add_signal_handlers()
close_tasks = []
if start_orm:
await orm.start_orm()
bus = MessageBus(
dependencies=dict(video_upload_url=config.VIDEO_UPLOAD_URL, tokens=config.TOKENS),
factories=dict(uow=uow_type),
)
broker = Broker(
bus=bus,
identifier="bot",
publish_commands={
commands.RequestDemoParse,
commands.RequestPresignedUrl,
commands.RequestRecording,
},
consume_events={
events.PresignedUrlGenerated,
},
)
gather = asyncio.Event()
waiters = list()
if start_steam:
client, fetcher, steam_waiter = await steam.get_match_fetcher(config.STEAM_REFRESH_TOKEN)
close_tasks.append(client.close)
bus.add_dependencies(sharecode_resolver=fetcher)
waiters.append(steam_waiter)
else:
# TODO: remove this
bus.add_dependencies(sharecode_resolver=None)
if start_faceit:
faceit_api = FACEITAPI(api_key=config.FACEIT_API_KEY)
bus.add_dependencies(faceit_resolver=faceit_api.match)
bus.register_decos()
if start_bot:
bot_instance = bot.start_bot(bus, gather)
close_tasks.append(bot_instance.close)
waiters.append(bot_instance.wait_until_ready())
await asyncio.gather(*waiters)
await broker.start(config.RABBITMQ_HOST)
gather.set()
close_tasks.append(broker.connection.close)
log.info("Ready to bot!")
# this restarts any jobs that were in selectland
# within the last 12 (at the time of writing, anyway)
# minutes last we restarted
if restore:
await bus.dispatch(commands.Restore())
return bus
def main():
if config.SENTRY_DSN:
sentry_init(config.SENTRY_DSN)
loop = asyncio.get_event_loop()
logging.getLogger("aio_pika").setLevel(logging.INFO)
logging.getLogger("aiormq.connection").setLevel(logging.INFO)
coro = bootstrap(
uow_type=SqlUnitOfWork,
start_orm=True,
start_steam=True,
start_faceit=True,
start_bot=True,
restore=True,
)
loop.create_task(coro)
loop.run_forever()
if __name__ == "__main__":
main()