Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -260,15 +260,38 @@ def _record_attrs_to_ignore(self) -> Iterable[str]:
)
return frozenset(record.__dict__).difference({"msg", "args"})

def _redact_exception_with_context(self, exception):
def _redact_exception_with_context_or_cause(self, exception, visited=None):
# Exception class may not be modifiable (e.g. declared by an
# extension module such as JDBC).
with contextlib.suppress(AttributeError):
exception.args = (self.redact(v) for v in exception.args)
if exception.__context__:
self._redact_exception_with_context(exception.__context__)
if exception.__cause__ and exception.__cause__ is not exception.__context__:
self._redact_exception_with_context(exception.__cause__)
if visited is None:
visited = set()

if id(exception) in visited:
# already visited - it was redacted earlier
return exception

# Check depth before adding to visited to ensure we skip exceptions beyond the limit
if len(visited) >= self.MAX_RECURSION_DEPTH:
return RuntimeError(
f"Stack trace redaction hit recursion limit of {self.MAX_RECURSION_DEPTH} "
f"when processing exception of type {type(exception).__name__}. "
f"The remaining exceptions will be skipped to avoid "
f"infinite recursion and protect against revealing sensitive information."
)

visited.add(id(exception))

exception.args = tuple(self.redact(v) for v in exception.args)
if exception.__context__:
exception.__context__ = self._redact_exception_with_context_or_cause(
exception.__context__, visited
)
if exception.__cause__ and exception.__cause__ is not exception.__context__:
exception.__cause__ = self._redact_exception_with_context_or_cause(
exception.__cause__, visited
)
return exception

def filter(self, record) -> bool:
if not self.is_log_masking_enabled():
Expand All @@ -285,7 +308,7 @@ def filter(self, record) -> bool:
record.__dict__[k] = self.redact(v)
if record.exc_info and record.exc_info[1] is not None:
exc = record.exc_info[1]
self._redact_exception_with_context(exc)
self._redact_exception_with_context_or_cause(exc)
record.__dict__[self.ALREADY_FILTERED_FLAG] = True

return True
Expand Down
339 changes: 339 additions & 0 deletions shared/secrets_masker/tests/secrets_masker/test_secrets_masker.py
Original file line number Diff line number Diff line change
Expand Up @@ -238,6 +238,345 @@ def test_masking_in_explicit_context_exceptions(self, logger, caplog):
"""
)

def test_redact_exception_with_context_simple(self):
"""
Test _redact_exception_with_context_or_cause with a simple exception without context or cause.
"""
masker = SecretsMasker()
configure_secrets_masker_for_test(masker)
masker.add_mask(PASSWORD)

exc = RuntimeError(f"Cannot connect to user:{PASSWORD}")
masker._redact_exception_with_context_or_cause(exc)

assert "password" not in str(exc.args[0])
assert "user:***" in str(exc.args[0])

def test_redact_exception_with_implicit_context(self):
"""
Test _redact_exception_with_context with exception __context__ (implicit chaining).
"""
masker = SecretsMasker()
configure_secrets_masker_for_test(masker)
masker.add_mask(PASSWORD)

try:
try:
raise RuntimeError(f"Inner error with {PASSWORD}")
except RuntimeError:
raise RuntimeError(f"Outer error with {PASSWORD}")
except RuntimeError as exc:
captured_exc = exc

masker._redact_exception_with_context_or_cause(captured_exc)
assert "password" not in str(captured_exc.args[0])
assert "password" not in str(captured_exc.__context__.args[0])

def test_redact_exception_with_explicit_cause(self):
"""
Test _redact_exception_with_context with exception __cause__ (explicit chaining).
"""
masker = SecretsMasker()
configure_secrets_masker_for_test(masker)
masker.add_mask(PASSWORD)

try:
inner = RuntimeError(f"Cause error: {PASSWORD}")
raise RuntimeError(f"Main error: {PASSWORD}") from inner
except RuntimeError as exc:
captured_exc = exc

masker._redact_exception_with_context_or_cause(captured_exc)
assert "password" not in str(captured_exc.args[0])
assert "password" not in str(captured_exc.__cause__.args[0])

def test_redact_exception_with_circular_context_reference(self):
"""
Test _redact_exception_with_context handles circular references without infinite recursion.
"""
masker = SecretsMasker()
configure_secrets_masker_for_test(masker)
masker.add_mask(PASSWORD)

exc1 = RuntimeError(f"Error with {PASSWORD}")
exc2 = RuntimeError(f"Another error with {PASSWORD}")
# Create circular reference
exc1.__context__ = exc2
exc2.__context__ = exc1

# Should not raise RecursionError
masker._redact_exception_with_context_or_cause(exc1)

assert "password" not in str(exc1.args[0])
assert "password" not in str(exc2.args[0])

def test_redact_exception_with_max_context_recursion_depth(self):
"""
Test _redact_exception_with_context respects MAX_RECURSION_DEPTH.
Once the depth limit is reached, the remaining exception chain is not traversed;
instead, it is truncated and replaced with a sentinel exception indicating the
recursion limit was hit, and further chaining is dropped.
"""
masker = SecretsMasker()
configure_secrets_masker_for_test(masker)
masker.add_mask(PASSWORD)

# Create a long chain of exceptions
exc_chain = RuntimeError(f"Error 0 with {PASSWORD}")
current = exc_chain
for i in range(1, 10):
new_exc = RuntimeError(f"Error {i} with {PASSWORD}")
current.__context__ = new_exc
current = new_exc

masker._redact_exception_with_context_or_cause(exc_chain)

# Verify redaction happens up to MAX_RECURSION_DEPTH
# The check is `len(visited) >= MAX_RECURSION_DEPTH` before adding current exception
# So it processes exactly MAX_RECURSION_DEPTH exceptions (0 through MAX_RECURSION_DEPTH-1)
current = exc_chain
for i in range(10):
assert current, "We should always get some exception here"
if i < masker.MAX_RECURSION_DEPTH:
# Should be redacted within the depth limit
assert "password" not in str(current.args[0]), f"Exception {i} should be redacted"
else:
assert "hit recursion limit" in str(current.args[0]), (
f"Exception {i} should indicate recursion depth limit hit"
)
assert "password" not in str(current.args[0]), f"Exception {i} should not be present"
assert current.__context__ is None, (
f"Exception {i} should not have a context due to depth limit"
)
break
current = current.__context__

def test_redact_exception_with_circular_cause_reference(self):
"""
Test _redact_exception_with_context_or_cause handles circular __cause__ references without infinite recursion.
"""
masker = SecretsMasker()
configure_secrets_masker_for_test(masker)
masker.add_mask(PASSWORD)

exc1 = RuntimeError(f"Error with {PASSWORD}")
exc2 = RuntimeError(f"Another error with {PASSWORD}")
# Create circular reference using __cause__
exc1.__cause__ = exc2
exc2.__cause__ = exc1

# Should not raise RecursionError
masker._redact_exception_with_context_or_cause(exc1)

assert "password" not in str(exc1.args[0])
assert "password" not in str(exc2.args[0])

def test_redact_exception_with_max_cause_recursion_depth(self):
"""
Test _redact_exception_with_context_or_cause respects MAX_RECURSION_DEPTH for __cause__ chains.
Exceptions beyond the depth limit should be skipped (not redacted).
"""
masker = SecretsMasker()
configure_secrets_masker_for_test(masker)
masker.add_mask(PASSWORD)

# Create a long chain of exceptions using __cause__
exc_chain = RuntimeError(f"Error 0 with {PASSWORD}")
current = exc_chain
for i in range(1, 10):
new_exc = RuntimeError(f"Error {i} with {PASSWORD}")
current.__cause__ = new_exc
current = new_exc

masker._redact_exception_with_context_or_cause(exc_chain)

# Verify redaction happens up to MAX_RECURSION_DEPTH
# The check is `len(visited) >= MAX_RECURSION_DEPTH` before adding current exception
# So it processes exactly MAX_RECURSION_DEPTH exceptions (0 through MAX_RECURSION_DEPTH-1)
current = exc_chain
for i in range(10):
assert current, "We should always get some exception here"
if i < masker.MAX_RECURSION_DEPTH:
# Should be redacted within the depth limit
assert "password" not in str(current.args[0]), f"Exception {i} should be redacted"
else:
assert "hit recursion limit" in str(current.args[0]), (
f"Exception {i} should indicate recursion depth limit hit"
)
assert "password" not in str(current.args[0]), f"Exception {i} should not be present"
assert current.__cause__ is None, f"Exception {i} should not have a cause due to depth limit"
break
current = current.__cause__

def test_redact_exception_with_mixed_cause_and_context_linear(self):
"""
Test _redact_exception_with_context_or_cause with mixed __cause__ and __context__ in a linear chain.
This simulates: exception with cause, which has context, which has cause, etc.
"""
masker = SecretsMasker()
configure_secrets_masker_for_test(masker)
masker.add_mask(PASSWORD)

# Build a chain: exc1 -> (cause) -> exc2 -> (context) -> exc3 -> (cause) -> exc4
exc4 = RuntimeError(f"Error 4 with {PASSWORD}")
exc3 = RuntimeError(f"Error 3 with {PASSWORD}")
exc3.__cause__ = exc4
exc2 = RuntimeError(f"Error 2 with {PASSWORD}")
exc2.__context__ = exc3
exc1 = RuntimeError(f"Error 1 with {PASSWORD}")
exc1.__cause__ = exc2

masker._redact_exception_with_context_or_cause(exc1)

# All exceptions should be redacted
assert "password" not in str(exc1.args[0])
assert "password" not in str(exc2.args[0])
assert "password" not in str(exc3.args[0])
assert "password" not in str(exc4.args[0])

def test_redact_exception_with_mixed_cause_and_context_branching(self):
"""
Test with an exception that has both __cause__ and __context__ pointing to different exceptions.
This creates a branching structure.
"""
masker = SecretsMasker()
configure_secrets_masker_for_test(masker)
masker.add_mask(PASSWORD)

# Create branching structure:
# exc1
# / \
# cause context
# | |
# exc2 exc3
exc2 = RuntimeError(f"Cause error with {PASSWORD}")
exc3 = RuntimeError(f"Context error with {PASSWORD}")
exc1 = RuntimeError(f"Main error with {PASSWORD}")
exc1.__cause__ = exc2
exc1.__context__ = exc3

masker._redact_exception_with_context_or_cause(exc1)

# All three should be redacted
assert "password" not in str(exc1.args[0])
assert "password" not in str(exc2.args[0])
assert "password" not in str(exc3.args[0])

def test_redact_exception_with_mixed_circular_reference(self):
"""
Test with circular references involving both __cause__ and __context__.
"""
masker = SecretsMasker()
configure_secrets_masker_for_test(masker)
masker.add_mask(PASSWORD)

# Create circular mixed reference: exc1 -cause-> exc2 -context-> exc1
exc1 = RuntimeError(f"Error 1 with {PASSWORD}")
exc2 = RuntimeError(f"Error 2 with {PASSWORD}")
exc1.__cause__ = exc2
exc2.__context__ = exc1

# Should not raise RecursionError
masker._redact_exception_with_context_or_cause(exc1)

assert "password" not in str(exc1.args[0])
assert "password" not in str(exc2.args[0])

def test_redact_exception_with_mixed_deep_chain(self):
"""
Test with a deep chain alternating between __cause__ and __context__.
"""
masker = SecretsMasker()
configure_secrets_masker_for_test(masker)
masker.add_mask(PASSWORD)

# Create alternating chain exceeding MAX_RECURSION_DEPTH
exceptions = [RuntimeError(f"Error {i} with {PASSWORD}") for i in range(10)]

# Link them: 0 -cause-> 1 -context-> 2 -cause-> 3 -context-> 4 ...
for i in range(len(exceptions) - 1):
if i % 2 == 0:
exceptions[i].__cause__ = exceptions[i + 1]
else:
exceptions[i].__context__ = exceptions[i + 1]

masker._redact_exception_with_context_or_cause(exceptions[0])

# Check that first MAX_RECURSION_DEPTH are redacted, rest hit the limit
for i in range(min(len(exceptions), masker.MAX_RECURSION_DEPTH)):
assert "password" not in str(exceptions[i].args[0]), f"Exception {i} should be redacted"

def test_redact_exception_with_mixed_diamond_structure(self):
"""
Test with diamond structure: top exception has both cause and context that converge to same exception.
exc1
/ \
cause context
| |
exc2 exc3
\\ /
exc4
"""
masker = SecretsMasker()
configure_secrets_masker_for_test(masker)
masker.add_mask(PASSWORD)

exc4 = RuntimeError(f"Bottom error with {PASSWORD}")
exc2 = RuntimeError(f"Left error with {PASSWORD}")
exc2.__cause__ = exc4
exc3 = RuntimeError(f"Right error with {PASSWORD}")
exc3.__context__ = exc4
exc1 = RuntimeError(f"Top error with {PASSWORD}")
exc1.__cause__ = exc2
exc1.__context__ = exc3

masker._redact_exception_with_context_or_cause(exc1)

# All should be redacted, exc4 should be visited only once
assert "password" not in str(exc1.args[0])
assert "password" not in str(exc2.args[0])
assert "password" not in str(exc3.args[0])
assert "password" not in str(exc4.args[0])

def test_redact_exception_with_immutable_args(self):
"""
Test _redact_exception_with_context handles exceptions with immutable args gracefully.
"""
masker = SecretsMasker()
configure_secrets_masker_for_test(masker)
masker.add_mask(PASSWORD)

class ImmutableException(Exception):
@property
def args(self):
return (f"Immutable error with {PASSWORD}",)

exc = ImmutableException()
# Should not raise AttributeError
masker._redact_exception_with_context_or_cause(exc)
# Note: Since args is immutable, it won't be redacted, but the method shouldn't crash

def test_redact_exception_with_same_cause_and_context(self):
"""
Test _redact_exception_with_context when __cause__ is same as __context__.
"""
masker = SecretsMasker()
configure_secrets_masker_for_test(masker)
masker.add_mask(PASSWORD)

try:
inner = RuntimeError(f"Base error: {PASSWORD}")
raise RuntimeError(f"Derived error: {PASSWORD}") from inner
except RuntimeError as exc:
# Make __context__ same as __cause__
exc.__context__ = exc.__cause__
captured_exc = exc

masker._redact_exception_with_context_or_cause(captured_exc)
assert "password" not in str(captured_exc.args[0])
assert "password" not in str(captured_exc.__cause__.args[0])
# Should only process __cause__ once (optimization check)

@pytest.mark.parametrize(
("name", "value", "expected_mask"),
[
Expand Down