-
Notifications
You must be signed in to change notification settings - Fork 51
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Browse files
Browse the repository at this point in the history
- Loading branch information
Showing
3 changed files
with
264 additions
and
4 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,256 @@ | ||
# flake8: noqa: W293 | ||
import itertools | ||
import textwrap | ||
from unittest.mock import Mock | ||
|
||
import pytest | ||
from passlib.handlers.md5_crypt import md5_crypt | ||
from pytest import raises | ||
|
||
from irrd.conf import PASSWORD_HASH_DUMMY_VALUE | ||
from irrd.rpki.status import RPKIStatus | ||
from irrd.rpsl.rpsl_objects import rpsl_object_from_text | ||
from irrd.scopefilter.status import ScopeFilterStatus | ||
from irrd.scopefilter.validators import ScopeFilterValidator | ||
from irrd.storage.models import JournalEntryOrigin | ||
from irrd.utils.rpsl_samples import SAMPLE_INETNUM, SAMPLE_AS_SET, SAMPLE_MNTNER_CRYPT, SAMPLE_PERSON, SAMPLE_MNTNER, \ | ||
SAMPLE_ROUTE, SAMPLE_MNTNER_MD5 | ||
from irrd.utils.test_utils import flatten_mock_calls | ||
from irrd.utils.text import splitline_unicodesafe, remove_auth_hashes | ||
from ..parser import parse_change_requests | ||
from ..parser_state import UpdateRequestType, UpdateRequestStatus | ||
from ..validators import ReferenceValidator, AuthValidator, ValidatorResult | ||
|
||
VALID_PW = 'override-password' | ||
INVALID_PW = 'not-override-password' | ||
VALID_PW_HASH = '$1$J6KycItM$MbPaBU6iFSGFV299Rk7Di0' | ||
|
||
|
||
@pytest.fixture() | ||
def prepare_mocks(monkeypatch): | ||
mock_dh = Mock() | ||
mock_dq = Mock() | ||
monkeypatch.setattr('irrd.updates.parser.RPSLDatabaseQuery', lambda: mock_dq) | ||
monkeypatch.setattr('irrd.updates.validators.RPSLDatabaseQuery', lambda: mock_dq) | ||
|
||
validator = AuthValidator(mock_dh, None) | ||
yield validator, mock_dq, mock_dh | ||
|
||
|
||
class TestAuthValidatorOverride: | ||
def test_valid_override(self, prepare_mocks, config_override): | ||
config_override({ | ||
'auth': {'override_password': VALID_PW_HASH}, | ||
}) | ||
validator, mock_dq, mock_dh = prepare_mocks | ||
person = rpsl_object_from_text(SAMPLE_PERSON) | ||
|
||
validator.overrides = [VALID_PW] | ||
result = validator.process_auth(person, None) | ||
assert result.is_valid(), result.error_messages | ||
assert result.used_override | ||
|
||
person = rpsl_object_from_text(SAMPLE_PERSON) | ||
result = validator.process_auth(person, person) | ||
assert result.is_valid(), result.error_messages | ||
assert result.used_override | ||
|
||
def test_invalid_or_missing_override(self, prepare_mocks, config_override): | ||
# This test mostly ignores the regular process that happens | ||
# after override validation fails. | ||
validator, mock_dq, mock_dh = prepare_mocks | ||
mock_dh.execute_query = lambda q: [] | ||
person = rpsl_object_from_text(SAMPLE_PERSON) | ||
|
||
validator.overrides = [VALID_PW] | ||
result = validator.process_auth(person, None) | ||
assert not result.is_valid() | ||
assert not result.used_override | ||
|
||
config_override({ | ||
'auth': {'override_password': VALID_PW_HASH}, | ||
}) | ||
validator.overrides = [] | ||
result = validator.process_auth(person, None) | ||
assert not result.is_valid() | ||
assert not result.used_override | ||
|
||
validator.overrides = [INVALID_PW] | ||
result = validator.process_auth(person, None) | ||
assert not result.is_valid() | ||
assert not result.used_override | ||
|
||
config_override({ | ||
'auth': {'override_password': 'not-valid-hash'}, | ||
}) | ||
person = rpsl_object_from_text(SAMPLE_PERSON) | ||
result = validator.process_auth(person, None) | ||
assert not result.is_valid() | ||
assert not result.used_override | ||
|
||
|
||
class TestAuthValidator: | ||
def test_valid_new_person(self, prepare_mocks): | ||
validator, mock_dq, mock_dh = prepare_mocks | ||
person = rpsl_object_from_text(SAMPLE_PERSON) | ||
mock_dh.execute_query = lambda q: [ | ||
{'object_class': 'mntner', 'object_text': SAMPLE_MNTNER}, | ||
] | ||
|
||
validator.passwords = [SAMPLE_MNTNER_MD5] | ||
result = validator.process_auth(person, None) | ||
assert result.is_valid(), result.error_messages | ||
assert not result.used_override | ||
assert len(result.mntners_notify) == 1 | ||
assert result.mntners_notify[0].pk() == 'TEST-MNT' | ||
|
||
assert flatten_mock_calls(mock_dq, flatten_objects=True) == [ | ||
['sources', (['TEST'],), {}], | ||
['object_classes', (['mntner'],), {}], | ||
['rpsl_pks', ({'TEST-MNT'},), {}], | ||
] | ||
|
||
def test_existing_person_mntner_change(self, prepare_mocks): | ||
validator, mock_dq, mock_dh = prepare_mocks | ||
# TEST-MNT is in both maintainers | ||
person_new = rpsl_object_from_text(SAMPLE_PERSON + 'mnt-by: TEST-NEW-MNT\n') | ||
person_old = rpsl_object_from_text(SAMPLE_PERSON + 'mnt-by: TEST-OLD-MNT\n') | ||
query_results = itertools.cycle([ | ||
[ | ||
{ | ||
'object_class': 'mntner', | ||
'object_text': SAMPLE_MNTNER.replace('TEST-MNT', 'TEST-NEW-MNT').replace('MD5', 'nomd5') | ||
}, { | ||
'object_class': 'mntner', | ||
'object_text': SAMPLE_MNTNER.replace('MD5', 'nomd5').replace('CRYPT', 'nocrypt') | ||
}, | ||
], | ||
[ | ||
{ | ||
'object_class': 'mntner', | ||
'object_text': SAMPLE_MNTNER.replace('TEST-MNT', 'TEST-OLD-MNT').replace('CRYPT', 'nocrypt') | ||
}, | ||
], | ||
]) | ||
mock_dh.execute_query = lambda q: next(query_results) | ||
|
||
validator.passwords = [SAMPLE_MNTNER_CRYPT, SAMPLE_MNTNER_MD5] | ||
result = validator.process_auth(person_new, person_old) | ||
|
||
assert result.is_valid(), result.error_messages | ||
assert not result.used_override | ||
assert {m.pk() for m in result.mntners_notify} == {'TEST-MNT', 'TEST-OLD-MNT'} | ||
|
||
assert flatten_mock_calls(mock_dq, flatten_objects=True) == [ | ||
['sources', (['TEST'],), {}], | ||
['object_classes', (['mntner'],), {}], | ||
['rpsl_pks', ({'TEST-MNT', 'TEST-NEW-MNT'},), {}], | ||
['sources', (['TEST'],), {}], | ||
['object_classes', (['mntner'],), {}], | ||
['rpsl_pks', ({'TEST-OLD-MNT'},), {}], # TEST-MNT is cached | ||
] | ||
|
||
validator.passwords = [SAMPLE_MNTNER_MD5] | ||
result = validator.process_auth(person_new, person_old) | ||
assert not result.is_valid() | ||
print(result.error_messages) | ||
assert result.error_messages == {'Authorisation for person PERSON-TEST failed: ' | ||
'must by authenticated by one of: TEST-MNT, TEST-NEW-MNT'} | ||
|
||
validator.passwords = [SAMPLE_MNTNER_CRYPT] | ||
result = validator.process_auth(person_new, person_old) | ||
assert not result.is_valid() | ||
assert result.error_messages == {'Authorisation for person PERSON-TEST failed: ' | ||
'must by authenticated by one of: TEST-MNT, TEST-OLD-MNT'} | ||
|
||
# TODO: the preapprove function needs a bunch of refactoring for this to work | ||
# def test_valid_new_person_preapproved_mntner(self, prepare_mocks): | ||
# validator, mock_dq, mock_dh = prepare_mocks | ||
# person = rpsl_object_from_text(SAMPLE_PERSON) | ||
# mock_dh.execute_query = lambda q: [ | ||
# {'object_class': 'mntner', 'object_text': SAMPLE_MNTNER}, | ||
# ] | ||
|
||
# result_mntner = parse_change_requests(SAMPLE_MNTNER + 'override: override-password', | ||
# mock_dh, Mock(), Mock())[0] | ||
# validator.pre_approve([result_mntner]) | ||
|
||
# result = validator.process_auth(person, None) | ||
# assert result.is_valid(), result.error_messages | ||
# assert not result.used_override | ||
# assert len(result.mntners_notify) == 1 | ||
# assert result.mntners_notify[0].pk() == 'TEST-MNT' | ||
# assert not flatten_mock_calls(mock_dq, flatten_objects=True) | ||
|
||
def test_create_mntner_requires_override(self, prepare_mocks, config_override): | ||
validator, mock_dq, mock_dh = prepare_mocks | ||
mntner = rpsl_object_from_text(SAMPLE_MNTNER) | ||
mock_dh.execute_query = lambda q: [ | ||
{'object_class': 'mntner', 'object_text': SAMPLE_MNTNER}, | ||
] | ||
|
||
validator.passwords = [SAMPLE_MNTNER_MD5] | ||
result = validator.process_auth(mntner, None) | ||
assert not result.is_valid() | ||
assert not result.used_override | ||
assert result.error_messages == {'New mntner objects must be added by an administrator.'} | ||
|
||
assert flatten_mock_calls(mock_dq, flatten_objects=True) == [ | ||
['sources', (['TEST'],), {}], | ||
['object_classes', (['mntner'],), {}], | ||
['rpsl_pks', ({'TEST-MNT', 'OTHER1-MNT', 'OTHER2-MNT'},), {}], | ||
] | ||
|
||
validator.overrides = [VALID_PW] | ||
config_override({ | ||
'auth': {'override_password': VALID_PW_HASH}, | ||
}) | ||
|
||
result = validator.process_auth(mntner, None) | ||
assert result.is_valid(), result.error_messages | ||
assert result.used_override | ||
|
||
def test_modify_mntner(self, prepare_mocks, config_override): | ||
validator, mock_dq, mock_dh = prepare_mocks | ||
mntner = rpsl_object_from_text(SAMPLE_MNTNER) | ||
mock_dh.execute_query = lambda q: [ | ||
{'object_class': 'mntner', 'object_text': SAMPLE_MNTNER}, | ||
] | ||
|
||
# This counts as submitting all new hashes. | ||
validator.passwords = [SAMPLE_MNTNER_MD5] | ||
result = validator.process_auth(mntner, mntner) | ||
assert result.is_valid() | ||
assert not result.info_messages | ||
|
||
# This counts as submitting all new hashes, but not matching any password | ||
new_mntner = rpsl_object_from_text(SAMPLE_MNTNER.replace('CRYPT', '').replace('MD5', '')) | ||
validator.passwords = [SAMPLE_MNTNER_MD5] | ||
result = validator.process_auth(new_mntner, mntner) | ||
assert not result.is_valid() | ||
assert result.error_messages == { | ||
'Authorisation failed for the auth methods on this mntner object.' | ||
} | ||
|
||
# This counts as submitting all dummy hashes. | ||
mntner_no_auth_hashes = remove_auth_hashes(SAMPLE_MNTNER) | ||
new_mntner = rpsl_object_from_text(mntner_no_auth_hashes) | ||
result = validator.process_auth(new_mntner, mntner) | ||
assert result.is_valid() | ||
assert not new_mntner.has_dummy_auth_value() | ||
assert result.info_messages == { | ||
'As you submitted dummy hash values, all password hashes on this ' | ||
'object were replaced with a new MD5-PW hash of the password you ' | ||
'provided for authentication.' | ||
} | ||
|
||
# # This is a multi password submission which is rejected | ||
validator.passwords = [SAMPLE_MNTNER_MD5, SAMPLE_MNTNER_CRYPT] | ||
new_mntner = rpsl_object_from_text(mntner_no_auth_hashes) | ||
result = validator.process_auth(new_mntner, mntner) | ||
assert not result.is_valid() | ||
assert not result.info_messages | ||
assert result.error_messages == { | ||
'Object submitted with dummy hash values, but multiple or no passwords ' | ||
'submitted. Either submit only full hashes, or a single password.' | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters