Skip to content

Commit 024b1b9

Browse files
authored
Merge pull request allan-simon#12 from weenect/feature-9
fixes allan-simon#9 - add basic nack support
2 parents ddd41d8 + ce1171f commit 024b1b9

File tree

5 files changed

+78
-1
lines changed

5 files changed

+78
-1
lines changed

README.md

+2
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,8 @@ After some searching, it was a bug in the 3rd party library and my code, that wa
6161

6262
* `GET /authentification-done-with-success-on/$USERNAME` return if a client has successfully connected with the given username
6363
* `GET /messages-acknowledged/$DELIVERY_TAG` this call will block until the given delivery tag was acknowledged as consumed by a worker, if the message has already been consumed before the call, the call directly returns. => this call is super pratical to write your integration tests (i.e to check if you worker has written a record in database after the message was consumed)
64+
* `GET /messages-not-acknowledged/$DELIVERY_TAG` this call will block until the given delivery tag was marked as rejected by a worker (basic_nack), if the message has already been rejected before the call, the call directly returns. => this call is super practical to write your integration tests (i.e worker rejected the message for any reason)
65+
* `GET /messages-requeued/$DELIVERY_TAG` this call will block until the given delivery tag was marked as rejected by a worker with requeue parameter to true (basic_nack with requeue), if the message has already been requeued before the call, the call directly returns. => this call is super practical to write your integration tests (i.e worker rejected the message to be treated later)
6466
* `GET /messages-in-queue/$QUEUE_NAME` get all the messages waiting to be consumed in a given queue
6567
* `GET /messages-in-exchange/$EXCHANGE_NAME` get all the messages waiting to be consumed in a given exchange
6668
* `GET /queue-bound-to-exchange/$QUEUE_NAME/$EXCHANGE_NAME` wait until a given queue is bound to the given exchange

service/http_protocol.py

+28-1
Original file line numberDiff line numberDiff line change
@@ -109,6 +109,34 @@ def _on_get(self, target):
109109
future.add_done_callback(self._on_get_done)
110110
return
111111

112+
###
113+
# Wait for a message identified by a delivery_tag to be nack
114+
# by the consumer or timeout
115+
###
116+
if target.startswith(b'/messages-not-acknowledged/'):
117+
delivery_tag = target.split(b'/', maxsplit=2)[2]
118+
future = asyncio.ensure_future(
119+
self._global_state.wait_message_not_acknowledged(
120+
int(delivery_tag.decode('utf-8')),
121+
)
122+
)
123+
future.add_done_callback(self._on_get_done)
124+
return
125+
126+
###
127+
# Wait for a message identified by a delivery_tag to be nack and requeued
128+
# by the consumer or timeout
129+
###
130+
if target.startswith(b'/messages-requeued/'):
131+
delivery_tag = target.split(b'/', maxsplit=2)[2]
132+
future = asyncio.ensure_future(
133+
self._global_state.wait_message_requeued(
134+
int(delivery_tag.decode('utf-8')),
135+
)
136+
)
137+
future.add_done_callback(self._on_get_done)
138+
return
139+
112140
###
113141
# Inspect the content of a queue where the program we test
114142
# publish messages.
@@ -248,7 +276,6 @@ def _on_put(self, target, data):
248276
self._send_http_response_not_found()
249277

250278
def _on_get_done(self, future):
251-
252279
try:
253280
success = future.result()
254281
if not success:

service/method.py

+15
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ class MethodIDs(IntEnum):
1818
BASIC_PUBLISH = 0x003C0028
1919
BASIC_CONSUME = 0x003C0014
2020
BASIC_ACK = 0x003C0050
21+
BASIC_NACK = 0x003C0078
2122

2223
EXCHANGE_DECLARE = 0x0028000A
2324

@@ -296,6 +297,19 @@ def _decode_basic_ack(payload):
296297
}
297298

298299

300+
def _decode_basic_nack(payload):
301+
values, _ = loads(
302+
'Lbb',
303+
payload,
304+
offset=4,
305+
)
306+
return {
307+
'delivery-tag': values[0],
308+
'multiple': values[1],
309+
'requeue': values[2],
310+
}
311+
312+
299313
def _decode_basic_cancel(payload):
300314

301315
values, _ = loads(
@@ -323,6 +337,7 @@ def _decode_basic_cancel(payload):
323337
0x003C0028: _decode_basic_publish,
324338
MethodIDs.BASIC_CONSUME: _decode_basic_consume,
325339
MethodIDs.BASIC_ACK: _decode_basic_ack,
340+
MethodIDs.BASIC_NACK: _decode_basic_nack,
326341

327342
MethodIDs.BASIC_CANCEL: _decode_basic_cancel,
328343

service/protocol.py

+7
Original file line numberDiff line numberDiff line change
@@ -333,6 +333,13 @@ def _treat_channel_frame(self, frame_value):
333333
)
334334
return
335335

336+
if frame_value.method_id == MethodIDs.BASIC_NACK:
337+
self._global_state.message_nack(
338+
frame_value.properties['delivery-tag'],
339+
frame_value.properties['requeue'],
340+
)
341+
return
342+
336343
if frame_value.method_id == MethodIDs.BASIC_CANCEL:
337344
send_basic_cancel_ok(
338345
self.transport,

service/state.py

+26
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,8 @@ def __init__(self):
2222
self._queues_bound_exhanges = {}
2323
self._authentication_tried_on = {}
2424
self._message_acknowledged = set()
25+
self._message_not_acknowledged = set()
26+
self._message_requeued = set()
2527

2628
def check_credentials(self, username, password):
2729
is_authentified = self._users.get(username, None) == password
@@ -270,6 +272,12 @@ def publish_message_in_queue(
270272
def message_ack(self, delivery_tag):
271273
self._message_acknowledged.add(delivery_tag)
272274

275+
def message_nack(self, delivery_tag, requeue: bool = False):
276+
if requeue:
277+
self._message_requeued.add(delivery_tag)
278+
else:
279+
self._message_not_acknowledged.add(delivery_tag)
280+
273281
async def wait_authentication_performed_on(self, username, timeout=10):
274282
for _ in range(timeout):
275283
decoded_username = username.decode('utf-8')
@@ -289,6 +297,24 @@ async def wait_message_acknoledged(self, delivery_tag, timeout=10):
289297

290298
raise WaitTimeout()
291299

300+
async def wait_message_not_acknowledged(self, delivery_tag, timeout=10):
301+
for _ in range(timeout):
302+
if delivery_tag in self._message_not_acknowledged:
303+
return True
304+
305+
await asyncio.sleep(1)
306+
307+
raise WaitTimeout()
308+
309+
async def wait_message_requeued(self, delivery_tag, timeout=10):
310+
for _ in range(timeout):
311+
if delivery_tag in self._message_requeued:
312+
return True
313+
314+
await asyncio.sleep(1)
315+
316+
raise WaitTimeout()
317+
292318
async def wait_queue_bound(self, queue, exchange, timeout=10):
293319
for _ in range(timeout):
294320

0 commit comments

Comments
 (0)