Skip to content

Commit

Permalink
tests: add usage of address for tests with eacl
Browse files Browse the repository at this point in the history
closes #838

Signed-off-by: Evgeniy Zayats <zayatsevgeniy@nspcc.io>
  • Loading branch information
Evgeniy Zayats committed Aug 23, 2024
1 parent eb74a7e commit 7316457
Show file tree
Hide file tree
Showing 4 changed files with 209 additions and 63 deletions.
29 changes: 23 additions & 6 deletions pytest_tests/lib/helpers/acl.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
from helpers.grpc_responses import EACL_NOT_FOUND, EACL_TABLE_IS_NOT_SET
from neofs_testlib.cli import NeofsCli
from neofs_testlib.shell import Shell
from neofs_testlib.utils.wallet import get_last_address_from_wallet

logger = logging.getLogger("NeoLogger")
EACL_LIFETIME = 100500
Expand Down Expand Up @@ -42,6 +43,26 @@ class EACLRole(Enum):
SYSTEM = "system"


class EACLRoleExtendedType(Enum):
PUBKEY = "pubkey"
ADDRESS = "address"

def get_value(self, wallet_path: str, wallet_password: str) -> str:
match self:
case EACLRoleExtendedType.PUBKEY:
return get_wallet_public_key(wallet_path, wallet_password)
case EACLRoleExtendedType.ADDRESS:
return get_last_address_from_wallet(wallet_path, wallet_password)
case _:
raise RuntimeError(f"Invalid EACLRoleExtendedType: {self}")


@dataclass
class EACLRoleExtended:
role_type: EACLRoleExtendedType
value: str


class EACLHeaderType(Enum):
REQUEST = "req" # Filter request headers
OBJECT = "obj" # Filter object headers
Expand Down Expand Up @@ -110,7 +131,7 @@ class EACLPubKey:
class EACLRule:
operation: Optional[EACLOperation] = None
access: Optional[EACLAccess] = None
role: Optional[Union[EACLRole, str]] = None
role: Optional[Union[EACLRole, EACLRoleExtended]] = None
filters: Optional[EACLFilters] = None
password: str = ""

Expand All @@ -123,11 +144,7 @@ def to_dict(self) -> Dict[str, Any]:
}

def __str__(self):
role = (
self.role.value
if isinstance(self.role, EACLRole)
else f"pubkey:{get_wallet_public_key(self.role, self.password)}"
)
role = self.role.value if isinstance(self.role, EACLRole) else f"{self.role.role_type.value}:{self.role.value}"
return f'{self.access.value} {self.operation.value} {self.filters or ""} {role}'


Expand Down
74 changes: 60 additions & 14 deletions pytest_tests/tests/acl/test_bearer.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@
EACLAccess,
EACLOperation,
EACLRole,
EACLRoleExtended,
EACLRoleExtendedType,
EACLRule,
create_bearer_token,
create_eacl,
Expand Down Expand Up @@ -47,8 +49,9 @@
@pytest.mark.acl
@pytest.mark.acl_bearer
class TestACLBearer(NeofsEnvTestBase):
@pytest.mark.parametrize("address", [EACLRoleExtendedType.ADDRESS, None])
@pytest.mark.parametrize("role", [EACLRole.USER, EACLRole.OTHERS])
def test_bearer_token_operations(self, wallets, eacl_container_with_objects, role):
def test_bearer_token_operations(self, wallets, eacl_container_with_objects, role, address):
allure.dynamic.title(f"Testcase to validate NeoFS operations with {role.value} BearerToken")
cid, objects_oids, file_path = eacl_container_with_objects
user_wallet = wallets.get_wallet()
Expand All @@ -67,6 +70,10 @@ def test_bearer_token_operations(self, wallets, eacl_container_with_objects, rol
)

with allure.step(f"Set deny all operations for {role.value} via eACL"):
if address:
role = EACLRoleExtended(
address, address.get_value(deny_wallet.wallet_path, self.neofs_env.default_password)
)
eacl = [EACLRule(access=EACLAccess.DENY, role=role, operation=op) for op in EACLOperation]
eacl_file = create_eacl(cid, eacl, shell=self.shell)
set_eacl(user_wallet.wallet_path, cid, eacl_file, shell=self.shell, endpoint=endpoint)
Expand Down Expand Up @@ -122,7 +129,8 @@ def test_bearer_token_operations(self, wallets, eacl_container_with_objects, rol
)

@allure.title("BearerToken Operations for compound Operations")
def test_bearer_token_compound_operations(self, wallets, eacl_container_with_objects):
@pytest.mark.parametrize("address", [EACLRoleExtendedType.ADDRESS, None])
def test_bearer_token_compound_operations(self, wallets, eacl_container_with_objects, address):
endpoint = self.neofs_env.sn_rpc
cid, objects_oids, file_path = eacl_container_with_objects
user_wallet = wallets.get_wallet()
Expand Down Expand Up @@ -159,6 +167,9 @@ def test_bearer_token_compound_operations(self, wallets, eacl_container_with_obj

eacl_deny = []
for role, operations in deny_map.items():
if address:
wallet = user_wallet if role == EACLRole.USER else other_wallet
role = EACLRoleExtended(address, address.get_value(wallet.wallet_path, self.neofs_env.default_password))
eacl_deny += [EACLRule(access=EACLAccess.DENY, role=role, operation=op) for op in operations]
set_eacl(
user_wallet.wallet_path,
Expand Down Expand Up @@ -192,22 +203,29 @@ def test_bearer_token_compound_operations(self, wallets, eacl_container_with_obj
)

with allure.step("Check rule consistency using bearer token"):
user_role = EACLRole.USER
if address:
user_role = EACLRoleExtended(
address, address.get_value(user_wallet.wallet_path, self.neofs_env.default_password)
)
bearer_user = form_bearertoken_file(
user_wallet.wallet_path,
cid,
[
EACLRule(operation=op, access=EACLAccess.ALLOW, role=EACLRole.USER)
for op in bearer_map[EACLRole.USER]
],
[EACLRule(operation=op, access=EACLAccess.ALLOW, role=user_role) for op in bearer_map[EACLRole.USER]],
shell=self.shell,
endpoint=self.neofs_env.sn_rpc,
)

others_role = EACLRole.OTHERS
if address:
others_role = EACLRoleExtended(
address, address.get_value(other_wallet.wallet_path, self.neofs_env.default_password)
)
bearer_other = form_bearertoken_file(
user_wallet.wallet_path,
cid,
[
EACLRule(operation=op, access=EACLAccess.ALLOW, role=EACLRole.OTHERS)
EACLRule(operation=op, access=EACLAccess.ALLOW, role=others_role)
for op in bearer_map[EACLRole.OTHERS]
],
shell=self.shell,
Expand Down Expand Up @@ -237,15 +255,21 @@ def test_bearer_token_compound_operations(self, wallets, eacl_container_with_obj
neofs_env=self.neofs_env,
)

@pytest.mark.parametrize("address", [EACLRoleExtendedType.ADDRESS, None])
@pytest.mark.parametrize("expiration_flag", ["lifetime", "expire_at"])
def test_bearer_token_expiration(self, wallets, eacl_container_with_objects, expiration_flag):
def test_bearer_token_expiration(self, wallets, eacl_container_with_objects, expiration_flag, address):
self.tick_epochs_and_wait(1)
current_epoch = neofs_epoch.get_epoch(self.neofs_env)
cid, objects_oids, file_path = eacl_container_with_objects
user_wallet = wallets.get_wallet()

with allure.step("Create and sign bearer token via cli"):
eacl = [EACLRule(access=EACLAccess.ALLOW, role=EACLRole.USER, operation=op) for op in EACLOperation]
user_role = EACLRole.USER
if address:
user_role = EACLRoleExtended(
address, address.get_value(user_wallet.wallet_path, self.neofs_env.default_password)
)
eacl = [EACLRule(access=EACLAccess.ALLOW, role=user_role, operation=op) for op in EACLOperation]

path_to_bearer = os.path.join(os.getcwd(), ASSETS_DIR, TEST_FILES_DIR, f"bearer_token_{str(uuid.uuid4())}")

Expand Down Expand Up @@ -298,7 +322,10 @@ def test_bearer_token_expiration(self, wallets, eacl_container_with_objects, exp
)

@allure.title("Check bearer token with ContainerID specified")
def test_bearer_token_with_container_id(self, wallets, client_shell: Shell, neofs_env: NeoFSEnv, file_path: str):
@pytest.mark.parametrize("address", [EACLRoleExtendedType.ADDRESS, None])
def test_bearer_token_with_container_id(
self, wallets, client_shell: Shell, neofs_env: NeoFSEnv, file_path: str, address
):
user_wallet = wallets.get_wallet()
container1, container2 = self._create_containers_with_objects(
containers_count=2,
Expand All @@ -310,10 +337,15 @@ def test_bearer_token_with_container_id(self, wallets, client_shell: Shell, neof
)

with allure.step(f"Create bearer token with all operations allowed for cid: {container1.cid}"):
user_role = EACLRole.USER
if address:
user_role = EACLRoleExtended(
address, address.get_value(user_wallet.wallet_path, self.neofs_env.default_password)
)
bearer = form_bearertoken_file(
user_wallet.wallet_path,
container1.cid,
[EACLRule(operation=op, access=EACLAccess.ALLOW, role=EACLRole.USER) for op in EACLOperation],
[EACLRule(operation=op, access=EACLAccess.ALLOW, role=user_role) for op in EACLOperation],
shell=self.shell,
endpoint=self.neofs_env.sn_rpc,
)
Expand Down Expand Up @@ -347,7 +379,10 @@ def test_bearer_token_with_container_id(self, wallets, client_shell: Shell, neof
)

@allure.title("Check bearer token without ContainerID specified")
def test_bearer_token_without_container_id(self, wallets, client_shell: Shell, neofs_env: NeoFSEnv, file_path: str):
@pytest.mark.parametrize("address", [EACLRoleExtendedType.ADDRESS, None])
def test_bearer_token_without_container_id(
self, wallets, client_shell: Shell, neofs_env: NeoFSEnv, file_path: str, address
):
user_wallet = wallets.get_wallet()
container1, container2 = self._create_containers_with_objects(
containers_count=2,
Expand All @@ -359,10 +394,15 @@ def test_bearer_token_without_container_id(self, wallets, client_shell: Shell, n
)

with allure.step("Create bearer token with all operations allowed for all containers"):
user_role = EACLRole.USER
if address:
user_role = EACLRoleExtended(
address, address.get_value(user_wallet.wallet_path, self.neofs_env.default_password)
)
bearer = form_bearertoken_file(
user_wallet.wallet_path,
None,
[EACLRule(operation=op, access=EACLAccess.ALLOW, role=EACLRole.USER) for op in EACLOperation],
[EACLRule(operation=op, access=EACLAccess.ALLOW, role=user_role) for op in EACLOperation],
shell=self.shell,
endpoint=self.neofs_env.sn_rpc,
)
Expand Down Expand Up @@ -395,8 +435,9 @@ def test_bearer_token_without_container_id(self, wallets, client_shell: Shell, n
neofs_env=self.neofs_env,
)

@pytest.mark.parametrize("address", [EACLRoleExtendedType.ADDRESS, None])
@pytest.mark.parametrize("operation", list(EACLOperation))
def test_bearer_token_separate_operations(self, wallets, eacl_container_with_objects, operation):
def test_bearer_token_separate_operations(self, wallets, eacl_container_with_objects, operation, address):
role = EACLRole.USER
not_allowed_operations = [op for op in EACLOperation if op != operation]

Expand All @@ -406,6 +447,11 @@ def test_bearer_token_separate_operations(self, wallets, eacl_container_with_obj
deny_wallet = wallets.get_wallet(role)
endpoint = self.neofs_env.sn_rpc

if address:
role = EACLRoleExtended(
address, address.get_value(deny_wallet.wallet_path, self.neofs_env.default_password)
)

with allure.step(f"Set deny all operations for {role.value} via eACL"):
eacl = [EACLRule(access=EACLAccess.DENY, role=role, operation=op) for op in EACLOperation]
eacl_file = create_eacl(cid, eacl, shell=self.shell)
Expand Down
Loading

0 comments on commit 7316457

Please sign in to comment.