Skip to content
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.

Commit 8c3e205

Browse files
committedAug 29, 2021
Accept matchmaking requests from rabbitmq
1 parent fa1b2bf commit 8c3e205

File tree

7 files changed

+463
-29
lines changed

7 files changed

+463
-29
lines changed
 

‎server/ladder_service.py

+152-1
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
from collections import defaultdict
99
from typing import Dict, List, Optional, Set, Tuple
1010

11+
import aio_pika
1112
import aiocron
1213
from sqlalchemy import and_, func, select, text, true
1314

@@ -30,9 +31,12 @@
3031
matchmaker_queue_map_pool
3132
)
3233
from .decorators import with_logger
34+
from .factions import Faction
3335
from .game_service import GameService
3436
from .games import Game, InitMode, LadderGame
3537
from .matchmaker import MapPool, MatchmakerQueue, OnMatchedCallback, Search
38+
from .message_queue_service import MessageQueueService
39+
from .player_service import PlayerService
3640
from .players import Player, PlayerState
3741
from .protocol import DisconnectedError
3842
from .types import GameLaunchOptions, Map, NeroxisGeneratedMap
@@ -55,17 +59,35 @@ def __init__(
5559
self,
5660
database: FAFDatabase,
5761
game_service: GameService,
62+
player_service: PlayerService,
63+
message_queue_service: MessageQueueService
5864
):
5965
self._db = database
6066
self._informed_players: Set[Player] = set()
6167
self.game_service = game_service
68+
self.player_service = player_service
69+
self.message_queue_service = message_queue_service
6270
self.queues = {}
71+
self._initialized = False
6372

6473
self._searches: Dict[Player, Dict[str, Search]] = defaultdict(dict)
6574

6675
async def initialize(self) -> None:
76+
if self._initialized:
77+
return
78+
6779
await self.update_data()
80+
await self.message_queue_service.declare_exchange(
81+
config.MQ_EXCHANGE_NAME
82+
)
83+
await self.message_queue_service.consume(
84+
config.MQ_EXCHANGE_NAME,
85+
"request.match.create",
86+
self.handle_mq_matchmaking_request
87+
)
88+
6889
self._update_cron = aiocron.crontab("*/10 * * * *", func=self.update_data)
90+
self._initialized = True
6991

7092
async def update_data(self) -> None:
7193
async with self._db.acquire() as conn:
@@ -325,6 +347,135 @@ def write_rating_progress(self, player: Player, rating_type: str) -> None:
325347
)
326348
})
327349

350+
async def handle_mq_matchmaking_request(
351+
self,
352+
message: aio_pika.IncomingMessage
353+
):
354+
try:
355+
game = await self._handle_mq_matchmaking_request(message)
356+
except Exception as e:
357+
if isinstance(e, GameLaunchError):
358+
code = "launch_failed"
359+
args = [{"player_id": player.id} for player in e.players]
360+
elif isinstance(e, json.JSONDecodeError):
361+
code = "malformed_request"
362+
args = [{"message": str(e)}]
363+
elif isinstance(e, KeyError):
364+
code = "malformed_request"
365+
args = [{"message": f"missing {e.args[0]}"}]
366+
else:
367+
code, *args = e.args
368+
369+
await self.message_queue_service.publish(
370+
config.MQ_EXCHANGE_NAME,
371+
"error.match.create",
372+
{"error_code": code, "args": args},
373+
correlation_id=message.correlation_id
374+
)
375+
else:
376+
await self.message_queue_service.publish(
377+
config.MQ_EXCHANGE_NAME,
378+
"success.match.create",
379+
{"game_id": game.id},
380+
correlation_id=message.correlation_id
381+
)
382+
383+
async def _handle_mq_matchmaking_request(
384+
self,
385+
message: aio_pika.IncomingMessage
386+
):
387+
self._logger.debug(
388+
"Got matchmaking request: %s", message.correlation_id
389+
)
390+
request = json.loads(message.body)
391+
# TODO: Use id instead of name?
392+
queue_name = request.get("matchmaker_queue")
393+
map_name = request["map_name"]
394+
game_name = request["game_name"]
395+
participants = request["participants"]
396+
featured_mod = request.get("featured_mod")
397+
if not featured_mod and not queue_name:
398+
raise KeyError("featured_mod")
399+
400+
if queue_name and queue_name not in self.queues:
401+
raise Exception("invalid_request", "invalid queue")
402+
403+
if not participants:
404+
raise Exception("invalid_request", "empty participants")
405+
406+
player_ids = [participant["player_id"] for participant in participants]
407+
missing_players = [
408+
id for id in player_ids if self.player_service[id] is None
409+
]
410+
if missing_players:
411+
raise Exception(
412+
"players_not_found",
413+
*[{"player_id": id} for id in missing_players]
414+
)
415+
416+
all_players = [
417+
self.player_service[player_id] for player_id in player_ids
418+
]
419+
non_idle_players = [
420+
player for player in all_players
421+
if player.state != PlayerState.IDLE
422+
]
423+
if non_idle_players:
424+
raise Exception(
425+
"invalid_state",
426+
[
427+
{"player_id": player.id, "state": player.state.name}
428+
for player in all_players
429+
]
430+
)
431+
432+
queue = self.queues[queue_name] if queue_name else None
433+
featured_mod = featured_mod or queue.featured_mod
434+
host = all_players[0]
435+
guests = all_players[1:]
436+
437+
for player in all_players:
438+
player.state = PlayerState.STARTING_AUTOMATCH
439+
440+
try:
441+
game = self.game_service.create_game(
442+
game_class=LadderGame,
443+
game_mode=featured_mod,
444+
host=host,
445+
name="Matchmaker Game",
446+
mapname=map_name,
447+
matchmaker_queue_id=queue.id if queue else None,
448+
rating_type=queue.rating_type if queue else None,
449+
max_players=len(participants)
450+
)
451+
game.init_mode = InitMode.AUTO_LOBBY
452+
game.set_name_unchecked(game_name)
453+
454+
for participant in participants:
455+
player_id = participant["player_id"]
456+
faction = Faction.from_value(participant["faction"])
457+
team = participant["team"]
458+
slot = participant["slot"]
459+
460+
game.set_player_option(player_id, "Faction", faction.value)
461+
game.set_player_option(player_id, "Team", team)
462+
game.set_player_option(player_id, "StartSpot", slot)
463+
game.set_player_option(player_id, "Army", slot)
464+
game.set_player_option(player_id, "Color", slot)
465+
466+
await self.launch_game(game, host, guests)
467+
468+
return game
469+
except Exception:
470+
self._logger.exception("")
471+
await game.on_game_end()
472+
473+
for player in all_players:
474+
if player.state == PlayerState.STARTING_AUTOMATCH:
475+
player.state = PlayerState.IDLE
476+
477+
raise
478+
328479
def on_match_found(
329480
self,
330481
s1: Search,
@@ -465,7 +616,7 @@ async def launch_game(
465616
def game_options(player: Player) -> GameLaunchOptions:
466617
return options._replace(
467618
team=game.get_player_option(player.id, "Team"),
468-
faction=player.faction,
619+
faction=game.get_player_option(player.id, "Faction"),
469620
map_position=game.get_player_option(player.id, "StartSpot")
470621
)
471622

‎server/message_queue_service.py

+43-9
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44

55
import asyncio
66
import json
7-
from typing import Dict
7+
from typing import Callable, Dict, Optional
88

99
import aio_pika
1010
from aio_pika import DeliveryMode, ExchangeType
@@ -125,34 +125,68 @@ async def _shutdown(self) -> None:
125125
async def publish(
126126
self,
127127
exchange_name: str,
128-
routing: str,
128+
routing_key: str,
129129
payload: Dict,
130130
mandatory: bool = False,
131131
delivery_mode: DeliveryMode = DeliveryMode.PERSISTENT,
132+
correlation_id: Optional[str] = None
132133
) -> None:
133134
if not self._is_ready:
134135
self._logger.warning(
135136
"Not connected to RabbitMQ, unable to publish message."
136137
)
137138
return
138139

139-
exchange = self._exchanges.get(exchange_name)
140-
if exchange is None:
141-
raise KeyError(f"Unknown exchange {exchange_name}.")
140+
exchange = self._get_exchange(exchange_name)
142141

143142
message = aio_pika.Message(
144-
json.dumps(payload).encode(), delivery_mode=delivery_mode
143+
json.dumps(payload).encode(),
144+
delivery_mode=delivery_mode,
145+
correlation_id=correlation_id,
145146
)
146147

147148
async with self._channel.transaction():
148149
await exchange.publish(
149150
message,
150-
routing_key=routing,
151-
mandatory=mandatory
151+
routing_key=routing_key,
152+
mandatory=mandatory,
152153
)
153154
self._logger.log(
154-
TRACE, "Published message %s to %s/%s", payload, exchange_name, routing
155+
TRACE, "Published message %s to %s/%s",
156+
payload,
157+
exchange_name,
158+
routing_key
159+
)
160+
161+
async def consume(
162+
self,
163+
exchange_name: str,
164+
routing_key: str,
165+
process_message: Callable[[aio_pika.IncomingMessage], None]
166+
) -> None:
167+
await self.initialize()
168+
if not self._is_ready:
169+
self._logger.warning(
170+
"Not connected to RabbitMQ, unable to declare queue."
155171
)
172+
return
173+
174+
exchange = self._get_exchange(exchange_name)
175+
queue = await self._channel.declare_queue(
176+
None,
177+
auto_delete=True,
178+
durable=False
179+
)
180+
181+
await queue.bind(exchange, routing_key)
182+
await queue.consume(process_message, exclusive=True)
183+
184+
def _get_exchange(self, exchange_name: str) -> aio_pika.Exchange:
185+
exchange = self._exchanges.get(exchange_name)
186+
if exchange is None:
187+
raise KeyError(f"Unknown exchange {exchange_name}.")
188+
189+
return exchange
156190

157191
@synchronizedmethod("initialization_lock")
158192
async def reconnect(self) -> None:

‎tests/integration_tests/conftest.py

+24-8
Original file line numberDiff line numberDiff line change
@@ -34,9 +34,20 @@ def mock_games():
3434

3535

3636
@pytest.fixture
37-
async def ladder_service(mocker, database, game_service):
37+
async def ladder_service(
38+
mocker,
39+
database,
40+
game_service,
41+
player_service,
42+
message_queue_service
43+
):
3844
mocker.patch("server.matchmaker.pop_timer.config.QUEUE_POP_TIME_MAX", 1)
39-
ladder_service = LadderService(database, game_service)
45+
ladder_service = LadderService(
46+
database,
47+
game_service,
48+
player_service,
49+
message_queue_service
50+
)
4051
await ladder_service.initialize()
4152
yield ladder_service
4253
await ladder_service.shutdown()
@@ -384,17 +395,22 @@ async def channel():
384395
await connection.close()
385396

386397

387-
async def connect_mq_consumer(server, channel, routing_key):
388-
"""
389-
Returns a subclass of Protocol that yields messages read from a rabbitmq
390-
exchange.
391-
"""
398+
async def connect_mq_queue(channel, routing_key):
392399
exchange = await channel.declare_exchange(
393400
config.MQ_EXCHANGE_NAME,
394401
aio_pika.ExchangeType.TOPIC
395402
)
396-
queue = await channel.declare_queue("", exclusive=True)
403+
queue = await channel.declare_queue(None, exclusive=True)
397404
await queue.bind(exchange, routing_key=routing_key)
405+
return queue
406+
407+
408+
async def connect_mq_consumer(channel, routing_key):
409+
"""
410+
Returns a subclass of Protocol that yields messages read from a rabbitmq
411+
exchange.
412+
"""
413+
queue = await connect_mq_queue(channel, routing_key)
398414
proto = AioQueueProtocol(queue)
399415
await proto.consume()
400416

Original file line numberDiff line numberDiff line change
@@ -0,0 +1,211 @@
1+
# External matchmaker requests over rabbitmq
2+
import asyncio
3+
import json
4+
import uuid
5+
6+
import pytest
7+
8+
from server.config import config
9+
from tests.utils import fast_forward
10+
11+
from .conftest import connect_and_sign_in, connect_mq_queue, read_until_command
12+
from .test_game import client_response
13+
14+
pytestmark = [pytest.mark.asyncio, pytest.mark.rabbitmq]
15+
16+
17+
@fast_forward(10)
18+
async def test_valid_request_1v1(
19+
lobby_server,
20+
channel,
21+
message_queue_service
22+
):
23+
test_id, _, proto1 = await connect_and_sign_in(
24+
("test", "test_password"), lobby_server
25+
)
26+
rhiza_id, _, proto2 = await connect_and_sign_in(
27+
("Rhiza", "puff_the_magic_dragon"), lobby_server
28+
)
29+
success_queue = await connect_mq_queue(channel, "success.match.create")
30+
error_queue = await connect_mq_queue(channel, "error.match.create")
31+
32+
await asyncio.gather(*(
33+
read_until_command(proto, "game_info")
34+
for proto in (proto1, proto2)
35+
))
36+
37+
# Include all the information we could possibly need
38+
correlation_id = str(uuid.uuid4())
39+
await message_queue_service.publish(
40+
config.MQ_EXCHANGE_NAME,
41+
"request.match.create",
42+
{
43+
"matchmaker_queue": "ladder1v1",
44+
"featured_mod": "ladder1v1",
45+
"game_name": "test VERSUS Rhiza",
46+
"map_name": "scmp_003",
47+
"participants": [
48+
{
49+
"player_id": test_id,
50+
"team": 2,
51+
"slot": 1,
52+
"faction": "uef"
53+
},
54+
{
55+
"player_id": rhiza_id,
56+
"team": 3,
57+
"slot": 2,
58+
"faction": "cybran"
59+
}
60+
]
61+
},
62+
correlation_id=correlation_id
63+
)
64+
65+
msg1, msg2 = await asyncio.gather(
66+
client_response(proto1),
67+
client_response(proto2)
68+
)
69+
assert msg1["uid"] == msg2["uid"]
70+
assert msg1["mapname"] == msg2["mapname"]
71+
assert msg1["name"] == msg2["name"]
72+
assert msg1["mod"] == msg2["mod"]
73+
assert msg1["rating_type"] == msg2["rating_type"]
74+
assert msg1["expected_players"] == msg2["expected_players"]
75+
76+
assert msg1["mapname"] == "scmp_003"
77+
assert msg1["name"] == "test VERSUS Rhiza"
78+
assert msg1["mod"] == "ladder1v1"
79+
assert msg1["rating_type"] == "ladder_1v1"
80+
assert msg1["expected_players"] == 2
81+
82+
assert msg1["team"] == 2
83+
assert msg1["map_position"] == 1
84+
assert msg1["faction"] == 1
85+
86+
assert msg2["team"] == 3
87+
assert msg2["map_position"] == 2
88+
assert msg2["faction"] == 3
89+
90+
await proto1.send_message({
91+
"target": "game",
92+
"command": "GameState",
93+
"args": ["Launching"]
94+
})
95+
96+
message = await success_queue.iterator(timeout=5).__anext__()
97+
assert message.correlation_id == correlation_id
98+
assert json.loads(message.body.decode()) == {
99+
"game_id": msg1["uid"]
100+
}
101+
assert await error_queue.get(fail=False) is None
102+
103+
104+
@fast_forward(10)
105+
async def test_player_offline(
106+
lobby_server,
107+
channel,
108+
message_queue_service
109+
):
110+
rhiza_id, _, proto = await connect_and_sign_in(
111+
("Rhiza", "puff_the_magic_dragon"), lobby_server
112+
)
113+
success_queue = await connect_mq_queue(channel, "success.match.create")
114+
error_queue = await connect_mq_queue(channel, "error.match.create")
115+
116+
await read_until_command(proto, "game_info")
117+
118+
# Include all the information we could possibly need
119+
correlation_id = str(uuid.uuid4())
120+
await message_queue_service.publish(
121+
config.MQ_EXCHANGE_NAME,
122+
"request.match.create",
123+
{
124+
"matchmaker_queue": "ladder1v1",
125+
"game_name": "test VERSUS Rhiza",
126+
"map_name": "scmp_003",
127+
"participants": [
128+
{
129+
"player_id": 1,
130+
"team": 2,
131+
"slot": 1,
132+
"faction": "uef"
133+
},
134+
{
135+
"player_id": rhiza_id,
136+
"team": 3,
137+
"slot": 2,
138+
"faction": "cybran"
139+
}
140+
]
141+
},
142+
correlation_id=correlation_id
143+
)
144+
145+
message = await error_queue.iterator(timeout=5).__anext__()
146+
assert message.correlation_id == correlation_id
147+
assert json.loads(message.body.decode()) == {
148+
"error_code": "players_not_found", "args": [{"player_id": 1}]
149+
}
150+
assert await success_queue.get(fail=False) is None
151+
152+
153+
@fast_forward(100)
154+
async def test_players_dont_connect(
155+
lobby_server,
156+
channel,
157+
message_queue_service
158+
):
159+
test_id, _, proto1 = await connect_and_sign_in(
160+
("test", "test_password"), lobby_server
161+
)
162+
rhiza_id, _, proto2 = await connect_and_sign_in(
163+
("Rhiza", "puff_the_magic_dragon"), lobby_server
164+
)
165+
success_queue = await connect_mq_queue(channel, "success.match.create")
166+
error_queue = await connect_mq_queue(channel, "error.match.create")
167+
168+
await asyncio.gather(*(
169+
read_until_command(proto, "game_info")
170+
for proto in (proto1, proto2)
171+
))
172+
173+
# Include all the information we could possibly need
174+
correlation_id = str(uuid.uuid4())
175+
await message_queue_service.publish(
176+
config.MQ_EXCHANGE_NAME,
177+
"request.match.create",
178+
{
179+
"matchmaker_queue": "ladder1v1",
180+
"featured_mod": "faf",
181+
"game_name": "test VERSUS Rhiza",
182+
"map_name": "scmp_003",
183+
"participants": [
184+
{
185+
"player_id": test_id,
186+
"team": 2,
187+
"slot": 1,
188+
"faction": "aeon"
189+
},
190+
{
191+
"player_id": rhiza_id,
192+
"team": 3,
193+
"slot": 2,
194+
"faction": "seraphim"
195+
}
196+
]
197+
},
198+
correlation_id=correlation_id
199+
)
200+
201+
msg = await client_response(proto1)
202+
assert msg["faction"] == 2
203+
# Mod field sould override the mod from queue
204+
assert msg["mod"] == "faf"
205+
206+
message = await error_queue.iterator(timeout=85).__anext__()
207+
assert message.correlation_id == correlation_id
208+
assert json.loads(message.body.decode()) == {
209+
"error_code": "launch_failed", "args": [{"player_id": rhiza_id}]
210+
}
211+
assert await success_queue.get(fail=False) is None

‎tests/integration_tests/test_server.py

-3
Original file line numberDiff line numberDiff line change
@@ -99,12 +99,10 @@ async def test_player_info_broadcast(lobby_server):
9999
@fast_forward(5)
100100
async def test_player_info_broadcast_to_rabbitmq(lobby_server, channel):
101101
mq_proto = await connect_mq_consumer(
102-
lobby_server,
103102
channel,
104103
"broadcast.playerInfo.update"
105104
)
106105
mq_proto_all = await connect_mq_consumer(
107-
lobby_server,
108106
channel,
109107
"broadcast.*.update"
110108
)
@@ -334,7 +332,6 @@ async def test_game_info_broadcast_to_players_in_lobby(lobby_server):
334332
@fast_forward(10)
335333
async def test_info_broadcast_to_rabbitmq(lobby_server, channel):
336334
mq_proto_all = await connect_mq_consumer(
337-
lobby_server,
338335
channel,
339336
"broadcast.*.update"
340337
)

‎tests/unit_tests/conftest.py

+21-6
Original file line numberDiff line numberDiff line change
@@ -22,17 +22,25 @@ def ladder_and_game_service_context(
2222
@asynccontextmanager
2323
async def make_ladder_and_game_service():
2424
async with database_context(request) as database:
25+
player_service = mock.Mock()
26+
message_queue_service = mock.Mock(
27+
declare_exchange=CoroutineMock(),
28+
consume=CoroutineMock()
29+
)
2530
with mock.patch("server.matchmaker.pop_timer.config.QUEUE_POP_TIME_MAX", 1):
2631
game_service = GameService(
2732
database,
28-
player_service=mock.Mock(),
33+
player_service=player_service,
2934
game_stats_service=mock.Mock(),
3035
rating_service=mock.Mock(),
31-
message_queue_service=mock.Mock(
32-
declare_exchange=CoroutineMock()
33-
)
36+
message_queue_service=message_queue_service
37+
)
38+
ladder_service = LadderService(
39+
database,
40+
game_service,
41+
player_service,
42+
message_queue_service
3443
)
35-
ladder_service = LadderService(database, game_service)
3644

3745
await game_service.initialize()
3846
await ladder_service.initialize()
@@ -50,9 +58,16 @@ async def ladder_service(
5058
mocker,
5159
database,
5260
game_service: GameService,
61+
player_service,
62+
message_queue_service
5363
):
5464
mocker.patch("server.matchmaker.pop_timer.config.QUEUE_POP_TIME_MAX", 1)
55-
ladder_service = LadderService(database, game_service)
65+
ladder_service = LadderService(
66+
database,
67+
game_service,
68+
player_service,
69+
message_queue_service
70+
)
5671
await ladder_service.initialize()
5772

5873
yield ladder_service

‎tests/unit_tests/test_ladder_service.py

+12-2
Original file line numberDiff line numberDiff line change
@@ -35,8 +35,18 @@ def game(database, game_service, game_stats_service):
3535
)
3636

3737

38-
async def test_queue_initialization(database, game_service):
39-
ladder_service = LadderService(database, game_service)
38+
async def test_queue_initialization(
39+
database,
40+
game_service,
41+
player_service,
42+
message_queue_service
43+
):
44+
ladder_service = LadderService(
45+
database,
46+
game_service,
47+
player_service,
48+
message_queue_service
49+
)
4050

4151
def make_mock_queue(*args, **kwargs):
4252
queue = create_autospec(MatchmakerQueue)

0 commit comments

Comments
 (0)
Please sign in to comment.