-
Notifications
You must be signed in to change notification settings - Fork 1
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
* add unit tests * fix PR comments
- Loading branch information
Showing
4 changed files
with
178 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,12 @@ | ||
SHELL := bash | ||
|
||
PYTHONPATH = . | ||
|
||
.PHONY: test | ||
test: | ||
pytest --cov=audit_log -vv tests | ||
|
||
|
||
.PHONY: fmt | ||
fmt: ## Format the source code using pre-commit hooks | ||
pre-commit run --all-files |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,5 +1,8 @@ | ||
black | ||
freezegun | ||
mypy | ||
pip-tools | ||
pre-commit | ||
pytest | ||
pytest-cov | ||
ruff |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,80 @@ | ||
import pytest | ||
|
||
from audit_log.exceptions import AuditPrincipalError | ||
from audit_log.headers import ( | ||
ParsedSPIFFE, | ||
Principal, | ||
PrincipalType, | ||
get_principal_from_headers, | ||
parse_spiffe, | ||
) | ||
|
||
VALID_SPIFFE_HEADER = "URI=spiffe://example.com/ns/namespace/sa/service-account" | ||
INVALID_SPIFFE_HEADER = "URI=invalid-uri" | ||
VALID_SPIFFE_HEADER_INVALID_PATH = ( | ||
"URI=spiffe://example.com//namespace/sa/service-account" | ||
) | ||
|
||
VALID_HEADERS = { | ||
"x-forwarded-client-cert": VALID_SPIFFE_HEADER, | ||
} | ||
INVALID_HEADERS = {} | ||
|
||
VALID_JWT_HEADERS = { | ||
"x-jwt-claim-sub": "user123", | ||
"x-jwt-claim-iss": "example.com", | ||
"x-jwt-claim-sub-type": "USER", | ||
} | ||
INVALID_JWT_HEADERS = { | ||
"x-jwt-claim-sub": "user123", | ||
"x-jwt-claim-iss": "example.com", | ||
"x-jwt-claim-sub-type": "TEST", | ||
} | ||
|
||
|
||
def test_parse_spiffe_valid(): | ||
parsed_spiffe = parse_spiffe(VALID_SPIFFE_HEADER) | ||
assert isinstance(parsed_spiffe, ParsedSPIFFE) | ||
assert parsed_spiffe.domain == "example.com" | ||
assert parsed_spiffe.namespace == "namespace" | ||
assert parsed_spiffe.service_account == "service-account" | ||
assert ( | ||
parsed_spiffe.spiffe_id | ||
== "spiffe://example.com/ns/namespace/sa/service-account" | ||
) | ||
|
||
|
||
def test_parse_spiffe_invalid(): | ||
with pytest.raises(ValueError, match="Invalid SPIFFE header"): | ||
parse_spiffe(INVALID_SPIFFE_HEADER) | ||
|
||
|
||
def test_parse_spiffe_invalid_header(): | ||
with pytest.raises(ValueError, match="Invalid SPIFFE header"): | ||
parse_spiffe(VALID_SPIFFE_HEADER_INVALID_PATH) | ||
|
||
|
||
def test_get_principal_from_headers_valid(): | ||
principal = get_principal_from_headers(VALID_HEADERS) | ||
assert isinstance(principal, Principal) | ||
assert principal.type == PrincipalType.SERVICE | ||
assert principal.authority == "example.com" | ||
assert principal.id == "spiffe://example.com/ns/namespace/sa/service-account" | ||
|
||
|
||
def test_get_principal_from_headers_invalid(): | ||
with pytest.raises(AuditPrincipalError, match="Invalid SPIFFE header"): | ||
get_principal_from_headers(INVALID_HEADERS) | ||
|
||
|
||
def test_get_principal_from_headers_with_jwt_valid(): | ||
principal = get_principal_from_headers(VALID_JWT_HEADERS) | ||
assert isinstance(principal, Principal) | ||
assert principal.type == PrincipalType.USER | ||
assert principal.authority == "example.com" | ||
assert principal.id == "user123" | ||
|
||
|
||
def test_get_principal_from_headers_with_jwt_invalid(): | ||
with pytest.raises(AuditPrincipalError, match="Invalid JWT headers"): | ||
get_principal_from_headers(INVALID_JWT_HEADERS) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,83 @@ | ||
import pytest | ||
from freezegun import freeze_time | ||
|
||
from audit_log.log import log, to_serializable | ||
from audit_log.schema import ActionType, OutcomeResult, Principal, PrincipalType | ||
|
||
|
||
@pytest.mark.parametrize( | ||
("value", "expected_result"), | ||
[ | ||
(123, "123"), | ||
({"key": "value"}, "{'key': 'value'}"), | ||
], | ||
) | ||
def test_to_serializable_default(value, expected_result): | ||
assert to_serializable(value) == expected_result | ||
|
||
|
||
def test_to_serializable_set(): | ||
value = {1, 2, 3} | ||
assert to_serializable(value) == [1, 2, 3] | ||
|
||
|
||
def test_to_serializable_exception(): | ||
val = ValueError("test") | ||
assert to_serializable(val) == repr(val) | ||
|
||
|
||
@pytest.mark.parametrize( | ||
( | ||
"action_type", | ||
"resource_type", | ||
"resource_id", | ||
"result", | ||
"request_id", | ||
"outcome_reason", | ||
"principal", | ||
"expected_log", | ||
), | ||
[ | ||
( | ||
ActionType.CREATE, | ||
"test", | ||
"123e4567-e89b-12d3-a456-426614174000", | ||
OutcomeResult.SUCCEEDED, | ||
"123e4567-e89b-12d3-a456-426614174000", | ||
"Some reason", | ||
Principal( | ||
type=PrincipalType.USER, | ||
authority="respect_mine", | ||
id="test.user@test.com", | ||
), | ||
'{"type": "audit-log", "timestamp": "2022-04-20T12:00:00+00:00", "level": "INFO", "version": 1, ' | ||
'"resource": {"type": "test", "id": "123e4567-e89b-12d3-a456-426614174000"}, "action": {"type": ' | ||
'"CREATE"}, "outcome": {"result": "SUCCEEDED", "reason": "Some reason", "before": null, ' | ||
'"after": null}, "context": {"request": {"id": "123e4567-e89b-12d3-a456-426614174000"}}, ' | ||
'"principal": {"type": "USER", "authority": "respect_mine", "id": "test.user@test.com"}}', | ||
), | ||
], | ||
) | ||
def test_log( | ||
action_type, | ||
resource_type, | ||
resource_id, | ||
result, | ||
request_id, | ||
outcome_reason, | ||
principal, | ||
expected_log, | ||
capsys, | ||
): | ||
with freeze_time("2022-04-20T12:00:00+00:00"): | ||
log( | ||
action_type=action_type, | ||
resource_type=resource_type, | ||
resource_id=resource_id, | ||
result=result, | ||
request_id=request_id, | ||
outcome_reason=outcome_reason, | ||
principal=principal, | ||
) | ||
printed_message = capsys.readouterr().out.strip() | ||
assert printed_message == expected_log |