Skip to content

Commit

Permalink
Add support for timedelta as actor delay (#569)
Browse files Browse the repository at this point in the history
  • Loading branch information
h3nnn4n authored Aug 20, 2023
1 parent 82e09c3 commit 7434977
Show file tree
Hide file tree
Showing 3 changed files with 10 additions and 4 deletions.
1 change: 1 addition & 0 deletions CONTRIBUTORS.md
Original file line number Diff line number Diff line change
Expand Up @@ -59,3 +59,4 @@ of those changes to CLEARTYPE SRL.
| [@dancardin](https://github.com/dancardin) | Dan Cardin |
| [@caspervdw](https://github.com/caspervdw) | Casper van der Wel |
| [@jenstroeger](https://github.com/jenstroeger/) | Jens Troeger |
| [@h3nnn4n](https://github.com/h3nnn4n/) | Renan S Silva |
8 changes: 6 additions & 2 deletions dramatiq/actor.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

import re
import time
from datetime import timedelta
from inspect import iscoroutinefunction
from typing import TYPE_CHECKING, Any, Awaitable, Callable, Dict, Generic, Optional, TypeVar, Union, overload

Expand Down Expand Up @@ -141,7 +142,7 @@ def send_with_options(
self, *,
args: tuple = (),
kwargs: Optional[Dict[str, Any]] = None,
delay: Optional[int] = None,
delay: Optional[timedelta | int] = None,
**options,
) -> Message[R]:
"""Asynchronously send a message to this actor, along with an
Expand All @@ -152,13 +153,16 @@ def send_with_options(
args(tuple): Positional arguments that are passed to the actor.
kwargs(dict): Keyword arguments that are passed to the actor.
delay(int): The minimum amount of time, in milliseconds, the
message should be delayed by.
message should be delayed by. Also accepts a timedelta.
**options: Arbitrary options that are passed to the
broker and any registered middleware.
Returns:
Message: The enqueued message.
"""
if isinstance(delay, timedelta):
delay = delay.total_seconds() * 1000

message = self.message_with_options(args=args, kwargs=kwargs, **options)
return self.broker.enqueue(message, delay=delay)

Expand Down
5 changes: 3 additions & 2 deletions tests/test_actors.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import time
from datetime import timedelta
from unittest.mock import patch

import pytest
Expand Down Expand Up @@ -258,8 +259,8 @@ def append(x):
# If I send it a delayed message
append.send_with_options(args=(1,), delay=1500)

# And then another delayed message with a smaller delay
append.send_with_options(args=(2,), delay=1000)
# And then another delayed message with a smaller delay and using a timedelta
append.send_with_options(args=(2,), delay=timedelta(seconds=1))

# Then join on the queue
stub_broker.join(append.queue_name)
Expand Down

0 comments on commit 7434977

Please sign in to comment.