Skip to content

Commit

Permalink
Fix test "tests/test_notification_queue_limit.py::test_notification_q…
Browse files Browse the repository at this point in the history
…ueue_limit".
  • Loading branch information
prokher committed May 5, 2023
1 parent abbd0da commit 7d7f7fe
Showing 1 changed file with 39 additions and 43 deletions.
82 changes: 39 additions & 43 deletions tests/test_notification_queue_limit.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,8 @@

"""Test that intermediate notifications skipped."""

import time
import asyncio
from typing import List

import graphene
import pytest
Expand All @@ -30,7 +31,7 @@


@pytest.mark.asyncio
async def test_subscription_notification_queue_limit(gql):
async def test_notification_queue_limit(gql):
"""Test it is possible to skip intermediate notifications.
Here we start subscription which send 10 messages and server took
Expand All @@ -39,6 +40,10 @@ async def test_subscription_notification_queue_limit(gql):
sets notification_queue_limit to 1.
"""

# Number of test notifications to send.
msgs_count = 100
msg_proc_delay = 0.001

print("Prepare the test setup: GraphQL backend classes.")

class SendMessages(graphene.Mutation):
Expand All @@ -48,10 +53,10 @@ class SendMessages(graphene.Mutation):

@staticmethod
def mutate(root, info):
"""Broadcast 10 messages."""
"""Broadcast many messages."""
del root, info
for idx in range(10):
OnNewMessage.broadcast(payload={"message": str(idx)})
for idx in range(msgs_count):
OnNewMessage.broadcast(payload={"message": idx})
return SendMessages(is_ok=True)

class OnNewMessage(channels_graphql_ws.Subscription):
Expand All @@ -60,17 +65,17 @@ class OnNewMessage(channels_graphql_ws.Subscription):
# Leave only the last message in the server queue.
notification_queue_limit = 1

message = graphene.String()
message = graphene.Int()

@staticmethod
def publish(payload, info):
async def publish(payload, info):
"""Notify all clients except the author of the message."""
del info
# Emulate server high load. It is bad to use sleep in the
# tests but here it seems ok. If test is running within
# high load builder it will behave the same and skip
# notifications it is unable to process.
time.sleep(1)
await asyncio.sleep(msg_proc_delay)
return OnNewMessage(message=payload["message"])

class Subscription(graphene.ObjectType):
Expand All @@ -85,59 +90,50 @@ class Mutation(graphene.ObjectType):

print("Establish & initialize WebSocket GraphQL connections.")

comm1 = gql(
comm = gql(
mutation=Mutation,
subscription=Subscription,
consumer_attrs={"strict_ordering": True},
)
await comm1.connect_and_init()
await comm.connect_and_init()

comm2 = gql(
mutation=Mutation,
subscription=Subscription,
consumer_attrs={"strict_ordering": True},
)
await comm2.connect_and_init()

print("Subscribe to receive a new message notifications.")
print("Trigger notifications.")

sub_op_id = await comm2.send(
await comm.send(
msg_type="start",
payload={
"query": "subscription op_name { on_new_message { message } }",
"variables": {},
"operationName": "op_name",
},
)
await comm2.assert_no_messages("Subscribe responded with a message!")

print("Start sending notifications.")

mut_op_id = await comm1.send(
mut_op_id = await comm.send(
msg_type="start",
payload={
"query": """mutation op_name { send_messages { is_ok } }""",
"variables": {},
"operationName": "op_name",
},
)
await comm1.receive(assert_id=mut_op_id, assert_type="data")
await comm1.receive(assert_id=mut_op_id, assert_type="complete")

await comm1.assert_no_messages("Self-notification happened!")

# Client will receive only the first and the last notifications.
resp = await comm2.receive(assert_id=sub_op_id, assert_type="data")
assert resp["data"]["on_new_message"]["message"] == "0"

resp = await comm2.receive(assert_id=sub_op_id, assert_type="data")
assert resp["data"]["on_new_message"]["message"] == "9"

await comm1.assert_no_messages(
"Unexpected message received at the end of the test!"
)
await comm2.assert_no_messages(
"Unexpected message received at the end of the test!"
)
await comm1.finalize()
await comm2.finalize()
await comm.receive(assert_id=mut_op_id, assert_type="data")
await comm.receive(assert_id=mut_op_id, assert_type="complete")

# Here we store ids of processed notifications.
received_ids: List[int] = []

while True:
msg = await comm.receive(raw_response=False)
print("Received message:", msg)
received_ids.append(msg["data"]["on_new_message"]["message"])
if msg["data"]["on_new_message"]["message"] == msgs_count - 1:
break

await comm.finalize()
print("Received messages", received_ids)

# Check that there is always the first and the last message.
# Make sure that we received not all of them
assert received_ids[0] == 0, "First message ID != 0!"
assert received_ids[-1] == msgs_count - 1, f"Last message ID != {msgs_count - 1}!"
assert len(received_ids) < msgs_count, "No messages skipped!"

0 comments on commit 7d7f7fe

Please sign in to comment.