diff --git a/Makefile b/Makefile new file mode 100644 index 0000000..ded697d --- /dev/null +++ b/Makefile @@ -0,0 +1,12 @@ +SHELL := bash + +PYTHONPATH = . + +.PHONY: test +test: + pytest --cov=audit_log -vv tests + + +.PHONY: fmt +fmt: ## Format the source code using pre-commit hooks + pre-commit run --all-files diff --git a/requirements-dev.txt b/requirements-dev.txt index 3d2fad5..afa1923 100644 --- a/requirements-dev.txt +++ b/requirements-dev.txt @@ -1,5 +1,8 @@ black +freezegun mypy pip-tools pre-commit +pytest +pytest-cov ruff diff --git a/tests/test_headers.py b/tests/test_headers.py new file mode 100644 index 0000000..21c1e06 --- /dev/null +++ b/tests/test_headers.py @@ -0,0 +1,80 @@ +import pytest + +from audit_log.exceptions import AuditPrincipalError +from audit_log.headers import ( + ParsedSPIFFE, + Principal, + PrincipalType, + get_principal_from_headers, + parse_spiffe, +) + +VALID_SPIFFE_HEADER = "URI=spiffe://example.com/ns/namespace/sa/service-account" +INVALID_SPIFFE_HEADER = "URI=invalid-uri" +VALID_SPIFFE_HEADER_INVALID_PATH = ( + "URI=spiffe://example.com//namespace/sa/service-account" +) + +VALID_HEADERS = { + "x-forwarded-client-cert": VALID_SPIFFE_HEADER, +} +INVALID_HEADERS = {} + +VALID_JWT_HEADERS = { + "x-jwt-claim-sub": "user123", + "x-jwt-claim-iss": "example.com", + "x-jwt-claim-sub-type": "USER", +} +INVALID_JWT_HEADERS = { + "x-jwt-claim-sub": "user123", + "x-jwt-claim-iss": "example.com", + "x-jwt-claim-sub-type": "TEST", +} + + +def test_parse_spiffe_valid(): + parsed_spiffe = parse_spiffe(VALID_SPIFFE_HEADER) + assert isinstance(parsed_spiffe, ParsedSPIFFE) + assert parsed_spiffe.domain == "example.com" + assert parsed_spiffe.namespace == "namespace" + assert parsed_spiffe.service_account == "service-account" + assert ( + parsed_spiffe.spiffe_id + == "spiffe://example.com/ns/namespace/sa/service-account" + ) + + +def test_parse_spiffe_invalid(): + with pytest.raises(ValueError, match="Invalid SPIFFE header"): + parse_spiffe(INVALID_SPIFFE_HEADER) + + +def test_parse_spiffe_invalid_header(): + with pytest.raises(ValueError, match="Invalid SPIFFE header"): + parse_spiffe(VALID_SPIFFE_HEADER_INVALID_PATH) + + +def test_get_principal_from_headers_valid(): + principal = get_principal_from_headers(VALID_HEADERS) + assert isinstance(principal, Principal) + assert principal.type == PrincipalType.SERVICE + assert principal.authority == "example.com" + assert principal.id == "spiffe://example.com/ns/namespace/sa/service-account" + + +def test_get_principal_from_headers_invalid(): + with pytest.raises(AuditPrincipalError, match="Invalid SPIFFE header"): + get_principal_from_headers(INVALID_HEADERS) + + +def test_get_principal_from_headers_with_jwt_valid(): + principal = get_principal_from_headers(VALID_JWT_HEADERS) + assert isinstance(principal, Principal) + assert principal.type == PrincipalType.USER + assert principal.authority == "example.com" + assert principal.id == "user123" + + +def test_get_principal_from_headers_with_jwt_invalid(): + with pytest.raises(AuditPrincipalError, match="Invalid JWT headers"): + get_principal_from_headers(INVALID_JWT_HEADERS) diff --git a/tests/test_log.py b/tests/test_log.py new file mode 100644 index 0000000..6d8c3ac --- /dev/null +++ b/tests/test_log.py @@ -0,0 +1,83 @@ +import pytest +from freezegun import freeze_time + +from audit_log.log import log, to_serializable +from audit_log.schema import ActionType, OutcomeResult, Principal, PrincipalType + + +@pytest.mark.parametrize( + ("value", "expected_result"), + [ + (123, "123"), + ({"key": "value"}, "{'key': 'value'}"), + ], +) +def test_to_serializable_default(value, expected_result): + assert to_serializable(value) == expected_result + + +def test_to_serializable_set(): + value = {1, 2, 3} + assert to_serializable(value) == [1, 2, 3] + + +def test_to_serializable_exception(): + val = ValueError("test") + assert to_serializable(val) == repr(val) + + +@pytest.mark.parametrize( + ( + "action_type", + "resource_type", + "resource_id", + "result", + "request_id", + "outcome_reason", + "principal", + "expected_log", + ), + [ + ( + ActionType.CREATE, + "test", + "123e4567-e89b-12d3-a456-426614174000", + OutcomeResult.SUCCEEDED, + "123e4567-e89b-12d3-a456-426614174000", + "Some reason", + Principal( + type=PrincipalType.USER, + authority="respect_mine", + id="test.user@test.com", + ), + '{"type": "audit-log", "timestamp": "2022-04-20T12:00:00+00:00", "level": "INFO", "version": 1, ' + '"resource": {"type": "test", "id": "123e4567-e89b-12d3-a456-426614174000"}, "action": {"type": ' + '"CREATE"}, "outcome": {"result": "SUCCEEDED", "reason": "Some reason", "before": null, ' + '"after": null}, "context": {"request": {"id": "123e4567-e89b-12d3-a456-426614174000"}}, ' + '"principal": {"type": "USER", "authority": "respect_mine", "id": "test.user@test.com"}}', + ), + ], +) +def test_log( + action_type, + resource_type, + resource_id, + result, + request_id, + outcome_reason, + principal, + expected_log, + capsys, +): + with freeze_time("2022-04-20T12:00:00+00:00"): + log( + action_type=action_type, + resource_type=resource_type, + resource_id=resource_id, + result=result, + request_id=request_id, + outcome_reason=outcome_reason, + principal=principal, + ) + printed_message = capsys.readouterr().out.strip() + assert printed_message == expected_log