Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Broker concurrent limit is invalid #396

Open
LL-Ling opened this issue Dec 31, 2024 · 1 comment
Open

Broker concurrent limit is invalid #396

LL-Ling opened this issue Dec 31, 2024 · 1 comment

Comments

@LL-Ling
Copy link

LL-Ling commented Dec 31, 2024

Env

python: 3.12.3
os: window 10
taskiq: 0.11.10

Q

I want to limit the concurrent execution of tasks in the broker, so I set sync_tasks_pool_size=1 and max_async_tasks=1, but it didn't work as expected, and the tasks are still running all at once. I noticed that the Semaphore defined by these two options in the Receiver is not applied in callback and run_task. Is my usage incorrect, or is there a way to limit the number of concurrent tasks? Below is my code. I really like the design of TaskIQ, and I look forward to your response.

Code

import asyncio

from taskiq import InMemoryBroker
from taskiq.kicker import AsyncKicker

from taskiq_example.middleware import TestMiddleware

broker = InMemoryBroker(
sync_tasks_pool_size=1,
max_async_tasks=1
)
broker.add_middlewares(TestMiddleware())

async def best_task_ever(value: int) -> bool:
"""Solve all problems in the world."""
print("execute best_task_ever", value)
await asyncio.sleep(5)
return True

async def main():
await broker.register_task(best_task_ever).kiq(1)
await broker.register_task(best_task_ever).kiq(2)
await broker.register_task(best_task_ever).kiq(3)
await broker.register_task(best_task_ever).kiq(4)

task = broker.register_task(best_task_ever)
await AsyncKicker(
    task.task_name,
    broker,
    task.labels
).with_labels(
    func_value=best_task_ever.__name__,
    func_label="test func"
).kiq(
    value=5
)

await broker.wait_all()

if name == "main":
asyncio.run(main())

Result

It executes 5 tasks concurrently, taking about 5-6 seconds.
I need to control it so that the tasks are executed one by one.

@s3rius
Copy link
Member

s3rius commented Feb 27, 2025

Are you sure that you run it with only one worker process? Most probably you are starting multiple worker processes and they all have this semaphor, therefore number of tasks executed simultaneously will be equal to number of worker processes. You can change it by specifying -w argument.

I have tested it out and indeed you can achieve such behavior, by limiting number of async tasks.

Code I used to verify:

from taskiq_redis import ListQueueBroker
import asyncio
from datetime import datetime

broker = ListQueueBroker("redis://localhost")


@broker.task
async def meme():
    print(f"Start {datetime.now()}")
    await asyncio.sleep(1)
    print(f"Finish {datetime.now()}")


async def main():
    print("Sending")
    await broker.startup()

    for _ in range(5):
        await meme.kiq()

    await broker.shutdown()


if __name__ == "__main__":
    asyncio.run(main())

Here's terminal output:

❯ taskiq worker b:broker -w 1 --max-async-tasks 1
[2025-02-27 21:27:06,046][taskiq.worker][INFO   ][MainProcess] Pid of a main process: 88108
[2025-02-27 21:27:06,046][taskiq.worker][INFO   ][MainProcess] Starting 1 worker processes.
[2025-02-27 21:27:06,050][taskiq.process-manager][INFO   ][MainProcess] Started process worker-0 with pid 88128 
[2025-02-27 21:27:06,084][taskiq.receiver.receiver][INFO   ][worker-0] Listening started.
[2025-02-27 21:27:06,093][taskiq.receiver.receiver][INFO   ][worker-0] Executing task b:meme with ID: 8e10089193cd46b9ae41f4b7d2bb7a70
Start 2025-02-27 21:27:06.093212
Finish 2025-02-27 21:27:07.093863
[2025-02-27 21:27:07,094][taskiq.receiver.receiver][INFO   ][worker-0] Executing task b:meme with ID: 3dbdf2d699184334af7f88e159f89017
Start 2025-02-27 21:27:07.094954
Finish 2025-02-27 21:27:08.095884
[2025-02-27 21:27:08,096][taskiq.receiver.receiver][INFO   ][worker-0] Executing task b:meme with ID: c997754565d448aaa6ca3a7c23ccb074
Start 2025-02-27 21:27:08.096466
Finish 2025-02-27 21:27:09.098120
[2025-02-27 21:27:09,098][taskiq.receiver.receiver][INFO   ][worker-0] Executing task b:meme with ID: 969240ccd1804f93a0d5b89b0d5a9b67
Start 2025-02-27 21:27:09.098773
Finish 2025-02-27 21:27:10.100585
[2025-02-27 21:27:10,101][taskiq.receiver.receiver][INFO   ][worker-0] Executing task b:meme with ID: 8c99db59b7e54ea6b7ee59c1b64ae9e5
Start 2025-02-27 21:27:10.101450
Finish 2025-02-27 21:27:11.102767

As I understood the question, this is exact behavior you are looking for.

P.S. Sorry for such a late reply. And thanks for your warm words about the lib.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

2 participants