Skip to content

Commit

Permalink
Merge branch 'release/0.10.0'
Browse files Browse the repository at this point in the history
  • Loading branch information
s3rius committed Oct 17, 2023
2 parents 3e9ff9d + e9431bd commit 0adfcee
Show file tree
Hide file tree
Showing 28 changed files with 899 additions and 167 deletions.
13 changes: 9 additions & 4 deletions .github/workflows/test.yml
Original file line number Diff line number Diff line change
@@ -1,6 +1,11 @@
name: Testing taskiq

on: [pull_request]
on:
pull_request:
push:
branches:
- develop
- master

jobs:
lint:
Expand All @@ -21,7 +26,7 @@ jobs:
python-version: "3.11"
cache: "poetry"
- name: Install deps
run: poetry install
run: poetry install --all-extras
- name: Run lint check
run: poetry run pre-commit run -a ${{ matrix.cmd }}
pytest:
Expand All @@ -31,7 +36,7 @@ jobs:
contents: write
strategy:
matrix:
py_version: ["3.8", "3.9", "3.10", "3.11"]
py_version: ["3.8", "3.9", "3.10", "3.11", "3.12"]
os: [ubuntu-latest, windows-latest]
runs-on: "${{ matrix.os }}"
steps:
Expand All @@ -44,7 +49,7 @@ jobs:
python-version: "${{ matrix.py_version }}"
cache: "poetry"
- name: Install deps
run: poetry install
run: poetry install --all-extras
- name: Run pytest check
run: poetry run pytest -vv -n auto --cov="taskiq" .
- name: Generate report
Expand Down
9 changes: 3 additions & 6 deletions docs/examples/extending/schedule_source.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,17 +19,14 @@ async def get_schedules(self) -> List["ScheduledTask"]:
args=[],
kwargs={},
cron="* * * * *",
#
# We need point on self source for calling pre_send / post_send when
# task is ready to be enqueued.
source=self,
),
]

# This method is optional. You may not implement this.
# It's just a helper to people to be able to interact with your source.
async def add_schedule(self, schedule: "ScheduledTask") -> None:
return await super().add_schedule(schedule)
# This method can be either sync or async.
def add_schedule(self, schedule: "ScheduledTask") -> None:
print("New schedule added:", schedule)

# This method is optional. You may not implement this.
# It's just a helper to people to be able to interact with your source.
Expand Down
2 changes: 1 addition & 1 deletion docs/examples/schedule/intro.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
from taskiq_aio_pika import AioPikaBroker

from taskiq.schedule_sources import LabelScheduleSource
from taskiq.scheduler import TaskiqScheduler
from taskiq import TaskiqScheduler

broker = AioPikaBroker("amqp://guest:guest@localhost:5672/")

Expand Down
113 changes: 113 additions & 0 deletions docs/guide/message-format.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
---
order: 11
---

# Taskiq message format

Taskiq doesn't force you to use any specific message format. We define default message format,
but you can use any format you want.

The default message format is:


::: tabs

@tab example

```json
{
"task_name": "my_project.module1.task",
"args": [1, 2, 3],
"kwargs": {"a": 1, "b": 2, "c": 3},
"labels": {
"label1": "value1",
"label2": "value2"
}
}
```

@tab json schema

```json
{
"properties": {
"task_id": {
"title": "Task Id",
"type": "string"
},
"task_name": {
"title": "Name of the task",
"type": "string"
},
"labels": {
"title": "Additional labels",
"type": "object"
},
"args": {
"items": {},
"title": "Arguments",
"type": "array"
},
"kwargs": {
"title": "Keyword arguments",
"type": "object"
}
},
"required": [
"task_id",
"task_name",
"labels",
"args",
"kwargs"
],
"type": "object"
}
```

:::

But this can be easily changed by creating your own implementation of the TaskiqFormatter class or TaskiqSerializer class.


### Serializers

Serializers define the format of the message but not the structure. For example, if you want to use msgpack or ORJson to serialize your message, you should update the serializer of your broker.

Be default, Taskiq uses JSON serializer. But we also have some implementations of other serializers:

* ORJSONSerializer - faster [JSON implementation](https://pypi.org/project/orjson/). Also, it supports datetime and UUID serialization.
* MSGPackSerializer - [MsgPack](https://pypi.org/project/msgpack/) format serializer. It might be useful to send less data over the network.
* CBORSerializer - [CBOR](https://pypi.org/project/cbor2/) format serializer. It is also has a smaller size than JSON.

To define your own serializer, you have to subclass the TaskiqSerializer class and implement `dumpb` and `loadb` methods. You can take a look at the existing implementations from the `taskiq.serializers` module.

To install taskiq with libraries for non-JSON serializers, you should install taskiq with extras.

::: tabs

@tab orjson

```bash
pip install "taskiq[orjson]"
```

@tab msgpack

```bash
pip install "taskiq[msgpack]"
```

@tab cbor

```bash
pip install "taskiq[cbor]"
```

:::

### Formatters

Formatters define the format of the message. It might be useful if you'd like to send a task to a celery worker for a different project. You can do it in seriazier as well, but formatters give you correct type hints.

By default we use a formatter that dumps the message to dict and serializes it using serializer. But you can define your own formatter to send a message in any format you want. To define a new formatter, you have to subclass the TaskiqFormatter class and implement `dumps` and `loads` methods.
As an example, you can take a look at the `JSONFormatter` from `taskiq.formatters` implementation.
Loading

0 comments on commit 0adfcee

Please sign in to comment.