diff --git a/pytest_tests/testsuites/acl/test_bearer.py b/pytest_tests/testsuites/acl/test_bearer.py index 18a8bf228..d8827e6ea 100644 --- a/pytest_tests/testsuites/acl/test_bearer.py +++ b/pytest_tests/testsuites/acl/test_bearer.py @@ -1,11 +1,14 @@ import os import uuid +from collections import namedtuple import allure import pytest +from cluster import Cluster from cluster_test_base import ClusterTestBase from common import ASSETS_DIR, TEST_FILES_DIR, WALLET_PASS from epoch import get_epoch +from neofs_testlib.shell import Shell from neofs_testlib.utils.wallet import get_last_address_from_wallet from python_keywords.acl import ( EACLAccess, @@ -19,11 +22,16 @@ sign_bearer, wait_for_cache_expired, ) +from python_keywords.container import create_container from python_keywords.container_access import ( check_custom_access_to_container, check_full_access_to_container, check_no_access_to_container, ) +from python_keywords.neofs_verbs import put_object_to_random_node +from wellknown_acl import PUBLIC_ACL + +ContainerTuple = namedtuple("ContainerTuple", ["cid", "objects_oids"]) @pytest.mark.acl @@ -308,3 +316,147 @@ def test_bearer_token_expiration(self, wallets, eacl_container_with_objects, exp shell=self.shell, cluster=self.cluster, ) + + @allure.title("Check bearer token with ContainerID specified") + def test_bearer_token_with_container_id( + self, wallets, client_shell: Shell, cluster: Cluster, file_path: str + ): + user_wallet = wallets.get_wallet() + container1, container2 = self._create_containers_with_objects( + containers_count=2, + objects_count=1, + user_wallet=user_wallet, + client_shell=client_shell, + cluster=cluster, + file_path=file_path, + ) + + with allure.step( + f"Create bearer token with all operations allowed for cid: {container1.cid}" + ): + bearer = form_bearertoken_file( + user_wallet.wallet_path, + container1.cid, + [ + EACLRule(operation=op, access=EACLAccess.ALLOW, role=EACLRole.USER) + for op in EACLOperation + ], + shell=self.shell, + endpoint=self.cluster.default_rpc_endpoint, + ) + + with allure.step( + f"Check {EACLRole.USER.value} with token has access to all operations with container {container1.cid}" + ): + check_full_access_to_container( + user_wallet.wallet_path, + container1.cid, + container1.objects_oids.pop(), + file_path, + bearer=bearer, + wallet_config=user_wallet.config_path, + shell=self.shell, + cluster=self.cluster, + ) + + with allure.step( + f"Check {EACLRole.USER.value} has no access to all operations with another container {container2.cid}" + ): + check_no_access_to_container( + user_wallet.wallet_path, + container2.cid, + container2.objects_oids.pop(), + file_path, + bearer=bearer, + wallet_config=user_wallet.config_path, + shell=self.shell, + cluster=self.cluster, + ) + + @allure.title("Check bearer token without ContainerID specified") + def test_bearer_token_without_container_id( + self, wallets, client_shell: Shell, cluster: Cluster, file_path: str + ): + user_wallet = wallets.get_wallet() + container1, container2 = self._create_containers_with_objects( + containers_count=2, + objects_count=1, + user_wallet=user_wallet, + client_shell=client_shell, + cluster=cluster, + file_path=file_path, + ) + + with allure.step(f"Create bearer token with all operations allowed for all containers"): + bearer = form_bearertoken_file( + user_wallet.wallet_path, + None, + [ + EACLRule(operation=op, access=EACLAccess.ALLOW, role=EACLRole.USER) + for op in EACLOperation + ], + shell=self.shell, + endpoint=self.cluster.default_rpc_endpoint, + ) + + with allure.step( + f"Check {EACLRole.USER.value} with token has access to all operations with container {container1.cid}" + ): + check_full_access_to_container( + user_wallet.wallet_path, + container1.cid, + container1.objects_oids.pop(), + file_path, + bearer=bearer, + wallet_config=user_wallet.config_path, + shell=self.shell, + cluster=self.cluster, + ) + + with allure.step( + f"Check {EACLRole.USER.value} with token has access to all operations with container {container2.cid}" + ): + check_full_access_to_container( + user_wallet.wallet_path, + container2.cid, + container2.objects_oids.pop(), + file_path, + bearer=bearer, + wallet_config=user_wallet.config_path, + shell=self.shell, + cluster=self.cluster, + ) + + def _create_containers_with_objects( + self, + containers_count: int, + objects_count: int, + user_wallet, + client_shell: Shell, + cluster: Cluster, + file_path: str, + ) -> list[ContainerTuple]: + result = [] + for _ in range(containers_count): + with allure.step("Create eACL public container"): + cid = create_container( + user_wallet.wallet_path, + basic_acl=PUBLIC_ACL, + shell=client_shell, + endpoint=cluster.default_rpc_endpoint, + ) + + with allure.step("Add test objects to container"): + objects_oids = [ + put_object_to_random_node( + user_wallet.wallet_path, + file_path, + cid, + attributes={"key1": "val1", "key": val, "key2": "abc"}, + shell=client_shell, + cluster=cluster, + ) + for val in range(objects_count) + ] + result.append(ContainerTuple(cid=cid, objects_oids=objects_oids)) + return result