Skip to content

Commit

Permalink
Add note about large workflows (#3)
Browse files Browse the repository at this point in the history
  • Loading branch information
pencil authored Dec 20, 2024
1 parent aaf43ed commit 2f0a94a
Showing 1 changed file with 44 additions and 0 deletions.
44 changes: 44 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,50 @@ workflow = Workflow(
In this example, Task 2 will run roughly 1 second after Task 1 finishes, and
Task 3 and will run 2 seconds after Task 2 finishes.

### Large Workflows

Because of how `dramatiq-workflow` is implemented, each task in a workflow has
to know about the remaining tasks in the workflow that could potentially run
after it. When a workflow has a large number of tasks, this can lead to an
increase of memory usage in the broker and increased network traffic between
the broker and the workers, especially when using `Group` tasks: Each task in a
`Group` can potentially be the last one to finish, so each task has to retain a
copy of the remaining tasks that run after the `Group`.

There are a few things you can do to alleviate this issue:

- Minimize the usage of parameters in the `message` method. Instead, consider
using a database to store data that is required by your tasks.
- Limit the size of groups to a reasonable number of tasks. Instead of
scheduling one task with 1000 tasks in a group, consider scheduling 10 groups
with 100 tasks each and chaining them together.
- Consider breaking down large workflows into smaller partial workflows that
then schedule a subsequent workflow at the very end of the outermost `Chain`.

Lastly, you can use compression to reduce the size of the messages in your
queue. While dramatiq does not provide a compression implementation by default,
one can be added with just a few lines of code. For example:

```python
import dramatiq
from dramatiq.encoder import JSONEncoder, MessageData
import lz4.frame

class DramatiqLz4JSONEncoder(JSONEncoder):
def encode(self, data: MessageData) -> bytes:
return lz4.frame.compress(super().encode(data))

def decode(self, data: bytes) -> MessageData:
try:
decompressed = lz4.frame.decompress(data)
except RuntimeError:
# Uncompressed data from before the switch to lz4
decompressed = data
return super().decode(decompressed)

dramatiq.set_encoder(DramatiqLz4JSONEncoder())
```

## License

This project is licensed under the MIT License. See the [LICENSE](LICENSE) file
Expand Down

0 comments on commit 2f0a94a

Please sign in to comment.