Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions airflow/models/baseoperator.py
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,7 @@
from airflow.utils.decorators import fixup_decorator_warning_stack
from airflow.utils.edgemodifier import EdgeModifier
from airflow.utils.helpers import validate_instance_args, validate_key
from airflow.utils.log.secrets_masker import redact
from airflow.utils.operator_helpers import ExecutionCallableRunner
from airflow.utils.operator_resources import Resources
from airflow.utils.session import NEW_SESSION, provide_session
Expand Down Expand Up @@ -958,12 +959,12 @@ def __init__(
if not conf.getboolean("operators", "ALLOW_ILLEGAL_ARGUMENTS"):
raise AirflowException(
f"Invalid arguments were passed to {self.__class__.__name__} (task_id: {task_id}). "
f"Invalid arguments were:\n**kwargs: {kwargs}",
f"Invalid arguments were:\n**kwargs: {redact(kwargs)}",
)
warnings.warn(
f"Invalid arguments were passed to {self.__class__.__name__} (task_id: {task_id}). "
"Support for passing such arguments will be dropped in future. "
f"Invalid arguments were:\n**kwargs: {kwargs}",
f"Invalid arguments were:\n**kwargs: {redact(kwargs)}",
category=RemovedInAirflow3Warning,
stacklevel=3,
)
Expand Down
17 changes: 17 additions & 0 deletions tests/models/test_baseoperator.py
Original file line number Diff line number Diff line change
Expand Up @@ -856,6 +856,23 @@ def test_logging_propogated_by_default(self, caplog):
# leaking a lot of state)
assert caplog.messages == ["test"]

@mock.patch("airflow.models.baseoperator.redact")
def test_illegal_args_with_secrets(self, mock_redact):
"""
Tests that operators on illegal arguments with secrets are correctly masked.
"""
secret = "secretP4ssw0rd!"
mock_redact.side_effect = ["***"]

msg = r"Invalid arguments were passed to BaseOperator"
with pytest.raises(AirflowException, match=msg) as exc_info:
BaseOperator(
task_id="test_illegal_args",
secret_argument=secret,
)
assert "***" in str(exc_info.value)
assert secret not in str(exc_info.value)

def test_invalid_type_for_default_arg(self):
error_msg = "'max_active_tis_per_dag' has an invalid type <class 'str'> with value not_an_int, expected type is <class 'int'>"
with pytest.raises(TypeError, match=error_msg):
Expand Down
Loading