Skip to content

Commit

Permalink
tests: add checks for ContainerID in bearer token, fixes nspcc-dev#546
Browse files Browse the repository at this point in the history
Signed-off-by: Evgeniy Zayats <zayatsevgeniy@nspcc.io>
  • Loading branch information
Evgeniy Zayats committed Sep 18, 2023
1 parent 130ae0f commit e8828d4
Showing 1 changed file with 150 additions and 0 deletions.
150 changes: 150 additions & 0 deletions pytest_tests/testsuites/acl/test_bearer.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,14 @@
import os
import uuid
from collections import namedtuple

import allure
import pytest
from cluster import Cluster
from cluster_test_base import ClusterTestBase
from common import ASSETS_DIR, TEST_FILES_DIR, WALLET_PASS
from epoch import get_epoch
from neofs_testlib.shell import Shell
from neofs_testlib.utils.wallet import get_last_address_from_wallet
from python_keywords.acl import (
EACLAccess,
Expand All @@ -19,11 +22,16 @@
sign_bearer,
wait_for_cache_expired,
)
from python_keywords.container import create_container
from python_keywords.container_access import (
check_custom_access_to_container,
check_full_access_to_container,
check_no_access_to_container,
)
from python_keywords.neofs_verbs import put_object_to_random_node
from wellknown_acl import PUBLIC_ACL

ContainerTuple = namedtuple("ContainerTuple", ["cid", "objects_oids"])


@pytest.mark.acl
Expand Down Expand Up @@ -308,3 +316,145 @@ def test_bearer_token_expiration(self, wallets, eacl_container_with_objects, exp
shell=self.shell,
cluster=self.cluster,
)

def test_bearer_token_with_container_id(
self, wallets, client_shell: Shell, cluster: Cluster, file_path: str
):
user_wallet = wallets.get_wallet()
container1, container2 = self._create_containers_with_objects(
containers_count=2,
objects_count=1,
user_wallet=user_wallet,
client_shell=client_shell,
cluster=cluster,
file_path=file_path,
)

with allure.step(
f"Create bearer token with all operations allowed for cid: {container1.cid}"
):
bearer = form_bearertoken_file(
user_wallet.wallet_path,
container1.cid,
[
EACLRule(operation=op, access=EACLAccess.ALLOW, role=EACLRole.USER)
for op in EACLOperation
],
shell=self.shell,
endpoint=self.cluster.default_rpc_endpoint,
)

with allure.step(
f"Check {EACLRole.USER.value} with token has access to all operations with container {container1.cid}"
):
check_full_access_to_container(
user_wallet.wallet_path,
container1.cid,
container1.objects_oids.pop(),
file_path,
bearer=bearer,
wallet_config=user_wallet.config_path,
shell=self.shell,
cluster=self.cluster,
)

with allure.step(
f"Check {EACLRole.USER.value} has no access to all operations with another container {container2.cid}"
):
check_no_access_to_container(
user_wallet.wallet_path,
container2.cid,
container2.objects_oids.pop(),
file_path,
bearer=bearer,
wallet_config=user_wallet.config_path,
shell=self.shell,
cluster=self.cluster,
)

def test_bearer_token_without_container_id(
self, wallets, client_shell: Shell, cluster: Cluster, file_path: str
):
user_wallet = wallets.get_wallet()
container1, container2 = self._create_containers_with_objects(
containers_count=2,
objects_count=1,
user_wallet=user_wallet,
client_shell=client_shell,
cluster=cluster,
file_path=file_path,
)

with allure.step(f"Create bearer token with all operations allowed for all containers"):
bearer = form_bearertoken_file(
user_wallet.wallet_path,
None,
[
EACLRule(operation=op, access=EACLAccess.ALLOW, role=EACLRole.USER)
for op in EACLOperation
],
shell=self.shell,
endpoint=self.cluster.default_rpc_endpoint,
)

with allure.step(
f"Check {EACLRole.USER.value} with token has access to all operations with container {container1.cid}"
):
check_full_access_to_container(
user_wallet.wallet_path,
container1.cid,
container1.objects_oids.pop(),
file_path,
bearer=bearer,
wallet_config=user_wallet.config_path,
shell=self.shell,
cluster=self.cluster,
)

with allure.step(
f"Check {EACLRole.USER.value} with token has access to all operations with container {container2.cid}"
):
check_full_access_to_container(
user_wallet.wallet_path,
container2.cid,
container2.objects_oids.pop(),
file_path,
bearer=bearer,
wallet_config=user_wallet.config_path,
shell=self.shell,
cluster=self.cluster,
)

def _create_containers_with_objects(
self,
containers_count: int,
objects_count: int,
user_wallet,
client_shell: Shell,
cluster: Cluster,
file_path: str,
) -> list[ContainerTuple]:
result = []
for _ in range(containers_count):
with allure.step("Create eACL public container"):
cid = create_container(
user_wallet.wallet_path,
basic_acl=PUBLIC_ACL,
shell=client_shell,
endpoint=cluster.default_rpc_endpoint,
)

with allure.step("Add test objects to container"):
objects_oids = [
put_object_to_random_node(
user_wallet.wallet_path,
file_path,
cid,
attributes={"key1": "val1", "key": val, "key2": "abc"},
shell=client_shell,
cluster=cluster,
)
for val in range(objects_count)
]
result.append(ContainerTuple(cid=cid, objects_oids=objects_oids))
return result

0 comments on commit e8828d4

Please sign in to comment.