Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion task-sdk/src/airflow/sdk/bases/operator.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@

from airflow.exceptions import RemovedInAirflow4Warning
from airflow.sdk import TriggerRule, timezone
from airflow.sdk._shared.secrets_masker import redact
from airflow.sdk.definitions._internal.abstractoperator import (
DEFAULT_IGNORE_FIRST_DEPENDS_ON_PAST,
DEFAULT_OWNER,
Expand Down Expand Up @@ -1050,7 +1051,7 @@ def __init__(
if kwargs:
raise TypeError(
f"Invalid arguments were passed to {self.__class__.__name__} (task_id: {task_id}). "
f"Invalid arguments were:\n**kwargs: {kwargs}",
f"Invalid arguments were:\n**kwargs: {redact(kwargs)}",
)
validate_key(self.task_id)

Expand Down
17 changes: 17 additions & 0 deletions task-sdk/tests/task_sdk/bases/test_operator.py
Original file line number Diff line number Diff line change
Expand Up @@ -241,6 +241,23 @@ def test_illegal_args_forbidden(self):
illegal_argument_1234="hello?",
)

@mock.patch("airflow.sdk.bases.operator.redact")
def test_illegal_args_with_secrets(self, mock_redact):
"""
Tests that operators on illegal arguments with secrets are correctly masked.
"""
secret = "secretP4ssw0rd!"
mock_redact.side_effect = ["***"]

msg = r"Invalid arguments were passed to BaseOperator"
with pytest.raises(TypeError, match=msg) as exc_info:
BaseOperator(
task_id="test_illegal_args",
secret_argument=secret,
)
assert "***" in str(exc_info.value)
assert secret not in str(exc_info.value)

def test_invalid_type_for_default_arg(self):
error_msg = "'max_active_tis_per_dag' for task 'test' expects <class 'int'>, got <class 'str'> with value 'not_an_int'"
with pytest.raises(TypeError, match=error_msg):
Expand Down