Skip to content

Commit 04e3185

Browse files
✨ Added the --app-dir param to specify the application/sources directory (#490)
Co-authored-by: Pavel Kirilin <win10@list.ru>
1 parent 4cbdd31 commit 04e3185

File tree

5 files changed

+30
-5
lines changed

5 files changed

+30
-5
lines changed

taskiq/cli/scheduler/args.py

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ class SchedulerArgs:
1212

1313
scheduler: Union[str, TaskiqScheduler]
1414
modules: List[str]
15+
app_dir: Optional[str] = None
1516
log_level: LogLevel = LogLevel.INFO
1617
configure_logging: bool = True
1718
fs_discover: bool = False
@@ -42,6 +43,16 @@ def from_cli(cls, args: Optional[Sequence[str]] = None) -> "SchedulerArgs":
4243
help="List of modules where to look for tasks.",
4344
nargs=ZERO_OR_MORE,
4445
)
46+
parser.add_argument(
47+
"--app-dir",
48+
"-d",
49+
default=None,
50+
help=(
51+
"Path to application directory. "
52+
"This path will be used to import tasks modules. "
53+
"If not specified, current working directory will be used."
54+
),
55+
)
4556
parser.add_argument(
4657
"--fs-discover",
4758
"-fsd",

taskiq/cli/scheduler/run.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -249,7 +249,7 @@ async def run_scheduler(args: SchedulerArgs) -> None:
249249
getLogger("taskiq").setLevel(level=args.log_level)
250250

251251
if isinstance(args.scheduler, str):
252-
scheduler = import_object(args.scheduler)
252+
scheduler = import_object(args.scheduler, app_dir=args.app_dir)
253253
if inspect.isfunction(scheduler):
254254
scheduler = scheduler()
255255
else:

taskiq/cli/utils.py

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44
from importlib import import_module
55
from logging import getLogger
66
from pathlib import Path
7-
from typing import Any, Generator, List, Sequence, Union
7+
from typing import Any, Generator, List, Sequence, Union, Optional
88

99
logger = getLogger("taskiq.worker")
1010

@@ -35,18 +35,21 @@ def add_cwd_in_path() -> Generator[None, None, None]:
3535
logger.warning(f"Cannot remove '{cwd}' from sys.path")
3636

3737

38-
def import_object(object_spec: str) -> Any:
38+
def import_object(object_spec: str, app_dir: Optional[str] = None) -> Any:
3939
"""
4040
It parses python object spec and imports it.
4141
4242
:param object_spec: string in format like `package.module:variable`
43+
:param app_dir: directory to add in sys.path for importing.
4344
:raises ValueError: if spec has unknown format.
4445
:returns: imported broker.
4546
"""
4647
import_spec = object_spec.split(":")
4748
if len(import_spec) != 2:
4849
raise ValueError("You should provide object path in `module:variable` format.")
4950
with add_cwd_in_path():
51+
if app_dir:
52+
sys.path.insert(0, app_dir)
5053
module = import_module(import_spec[0])
5154
return getattr(module, import_spec[1])
5255

taskiq/cli/worker/args.py

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ class WorkerArgs:
2626

2727
broker: str
2828
modules: List[str]
29+
app_dir: Optional[str] = None
2930
tasks_pattern: Sequence[str] = ("**/tasks.py",)
3031
fs_discover: bool = False
3132
configure_logging: bool = True
@@ -73,6 +74,16 @@ def from_cli(
7374
"'module.module:variable' format."
7475
),
7576
)
77+
parser.add_argument(
78+
"--app-dir",
79+
"-d",
80+
default=None,
81+
help=(
82+
"Path to application directory. "
83+
"This path will be used to import tasks modules. "
84+
"If not specified, current working directory will be used."
85+
),
86+
)
7687
parser.add_argument(
7788
"--receiver",
7889
default="taskiq.receiver:Receiver",

taskiq/cli/worker/run.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,7 @@ def get_receiver_type(args: WorkerArgs) -> Type[Receiver]:
6464
:raises ValueError: if receiver is not a Receiver type.
6565
:return: Receiver type.
6666
"""
67-
receiver_type = import_object(args.receiver)
67+
receiver_type = import_object(args.receiver, app_dir=args.app_dir)
6868
if not (isinstance(receiver_type, type) and issubclass(receiver_type, Receiver)):
6969
raise ValueError("Unknown receiver type. Please use Receiver class.")
7070
return receiver_type
@@ -133,7 +133,7 @@ def interrupt_handler(signum: int, _frame: Any) -> None:
133133
# We must set this field before importing tasks,
134134
# so broker will remember all tasks it's related to.
135135

136-
broker = import_object(args.broker)
136+
broker = import_object(args.broker, app_dir=args.app_dir)
137137
if inspect.isfunction(broker):
138138
broker = broker()
139139
if not isinstance(broker, AsyncBroker):

0 commit comments

Comments
 (0)