Skip to content

Commit 1f7308c

Browse files
SARomanchukSARomanchuk
authored andcommitted
issue:493
add: task_id for retry scheduled task
1 parent 34e280a commit 1f7308c

File tree

4 files changed

+9
-1
lines changed

4 files changed

+9
-1
lines changed

taskiq/kicker.py

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -69,7 +69,10 @@ def with_labels(
6969
self.labels.update(labels)
7070
return self
7171

72-
def with_task_id(self, task_id: str) -> "AsyncKicker[_FuncParams, _ReturnType]":
72+
def with_task_id(
73+
self,
74+
task_id: Optional[str],
75+
) -> "AsyncKicker[_FuncParams, _ReturnType]":
7376
"""
7477
Set task_id for current execution.
7578
@@ -208,6 +211,7 @@ async def schedule_by_cron(
208211
labels=message.labels,
209212
args=message.args,
210213
kwargs=message.kwargs,
214+
task_id=self.custom_task_id,
211215
cron=cron_str,
212216
cron_offset=cron_offset,
213217
)
@@ -239,6 +243,7 @@ async def schedule_by_time(
239243
labels=message.labels,
240244
args=message.args,
241245
kwargs=message.kwargs,
246+
task_id=self.custom_task_id,
242247
time=time,
243248
)
244249
await source.add_schedule(scheduled)

taskiq/scheduler/scheduled_task/v1.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ class ScheduledTask(BaseModel):
1212
labels: Dict[str, Any]
1313
args: List[Any]
1414
kwargs: Dict[str, Any]
15+
task_id: Optional[str] = None
1516
schedule_id: str = Field(default_factory=lambda: uuid.uuid4().hex)
1617
cron: Optional[str] = None
1718
cron_offset: Optional[Union[str, timedelta]] = None

taskiq/scheduler/scheduled_task/v2.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ class ScheduledTask(BaseModel):
1313
labels: Dict[str, Any]
1414
args: List[Any]
1515
kwargs: Dict[str, Any]
16+
task_id: Optional[str] = None
1617
schedule_id: str = Field(default_factory=lambda: uuid.uuid4().hex)
1718
cron: Optional[str] = None
1819
cron_offset: Optional[Union[str, timedelta]] = None

taskiq/scheduler/scheduler.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,7 @@ async def on_ready(self, source: "ScheduleSource", task: ScheduledTask) -> None:
5151
.with_labels(
5252
schedule_id=task.schedule_id,
5353
)
54+
.with_task_id(task_id=task.task_id)
5455
.kiq(
5556
*task.args,
5657
**task.kwargs,

0 commit comments

Comments
 (0)