From 7d7f7fe482500a2a18324f39c2c1b4c9c776c71a Mon Sep 17 00:00:00 2001 From: "Alexander A. Prokhorov" Date: Fri, 5 May 2023 15:37:04 +0300 Subject: [PATCH] Fix test "tests/test_notification_queue_limit.py::test_notification_queue_limit". --- tests/test_notification_queue_limit.py | 82 ++++++++++++-------------- 1 file changed, 39 insertions(+), 43 deletions(-) diff --git a/tests/test_notification_queue_limit.py b/tests/test_notification_queue_limit.py index e73b2b8..78c0258 100644 --- a/tests/test_notification_queue_limit.py +++ b/tests/test_notification_queue_limit.py @@ -21,7 +21,8 @@ """Test that intermediate notifications skipped.""" -import time +import asyncio +from typing import List import graphene import pytest @@ -30,7 +31,7 @@ @pytest.mark.asyncio -async def test_subscription_notification_queue_limit(gql): +async def test_notification_queue_limit(gql): """Test it is possible to skip intermediate notifications. Here we start subscription which send 10 messages and server took @@ -39,6 +40,10 @@ async def test_subscription_notification_queue_limit(gql): sets notification_queue_limit to 1. """ + # Number of test notifications to send. + msgs_count = 100 + msg_proc_delay = 0.001 + print("Prepare the test setup: GraphQL backend classes.") class SendMessages(graphene.Mutation): @@ -48,10 +53,10 @@ class SendMessages(graphene.Mutation): @staticmethod def mutate(root, info): - """Broadcast 10 messages.""" + """Broadcast many messages.""" del root, info - for idx in range(10): - OnNewMessage.broadcast(payload={"message": str(idx)}) + for idx in range(msgs_count): + OnNewMessage.broadcast(payload={"message": idx}) return SendMessages(is_ok=True) class OnNewMessage(channels_graphql_ws.Subscription): @@ -60,17 +65,17 @@ class OnNewMessage(channels_graphql_ws.Subscription): # Leave only the last message in the server queue. notification_queue_limit = 1 - message = graphene.String() + message = graphene.Int() @staticmethod - def publish(payload, info): + async def publish(payload, info): """Notify all clients except the author of the message.""" del info # Emulate server high load. It is bad to use sleep in the # tests but here it seems ok. If test is running within # high load builder it will behave the same and skip # notifications it is unable to process. - time.sleep(1) + await asyncio.sleep(msg_proc_delay) return OnNewMessage(message=payload["message"]) class Subscription(graphene.ObjectType): @@ -85,23 +90,16 @@ class Mutation(graphene.ObjectType): print("Establish & initialize WebSocket GraphQL connections.") - comm1 = gql( + comm = gql( mutation=Mutation, subscription=Subscription, consumer_attrs={"strict_ordering": True}, ) - await comm1.connect_and_init() + await comm.connect_and_init() - comm2 = gql( - mutation=Mutation, - subscription=Subscription, - consumer_attrs={"strict_ordering": True}, - ) - await comm2.connect_and_init() - - print("Subscribe to receive a new message notifications.") + print("Trigger notifications.") - sub_op_id = await comm2.send( + await comm.send( msg_type="start", payload={ "query": "subscription op_name { on_new_message { message } }", @@ -109,11 +107,8 @@ class Mutation(graphene.ObjectType): "operationName": "op_name", }, ) - await comm2.assert_no_messages("Subscribe responded with a message!") - - print("Start sending notifications.") - mut_op_id = await comm1.send( + mut_op_id = await comm.send( msg_type="start", payload={ "query": """mutation op_name { send_messages { is_ok } }""", @@ -121,23 +116,24 @@ class Mutation(graphene.ObjectType): "operationName": "op_name", }, ) - await comm1.receive(assert_id=mut_op_id, assert_type="data") - await comm1.receive(assert_id=mut_op_id, assert_type="complete") - - await comm1.assert_no_messages("Self-notification happened!") - - # Client will receive only the first and the last notifications. - resp = await comm2.receive(assert_id=sub_op_id, assert_type="data") - assert resp["data"]["on_new_message"]["message"] == "0" - - resp = await comm2.receive(assert_id=sub_op_id, assert_type="data") - assert resp["data"]["on_new_message"]["message"] == "9" - - await comm1.assert_no_messages( - "Unexpected message received at the end of the test!" - ) - await comm2.assert_no_messages( - "Unexpected message received at the end of the test!" - ) - await comm1.finalize() - await comm2.finalize() + await comm.receive(assert_id=mut_op_id, assert_type="data") + await comm.receive(assert_id=mut_op_id, assert_type="complete") + + # Here we store ids of processed notifications. + received_ids: List[int] = [] + + while True: + msg = await comm.receive(raw_response=False) + print("Received message:", msg) + received_ids.append(msg["data"]["on_new_message"]["message"]) + if msg["data"]["on_new_message"]["message"] == msgs_count - 1: + break + + await comm.finalize() + print("Received messages", received_ids) + + # Check that there is always the first and the last message. + # Make sure that we received not all of them + assert received_ids[0] == 0, "First message ID != 0!" + assert received_ids[-1] == msgs_count - 1, f"Last message ID != {msgs_count - 1}!" + assert len(received_ids) < msgs_count, "No messages skipped!"