Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

transfer client local_user support #736

Merged
merged 5 commits into from
Jun 5, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions changelog.d/20230602_150515_aaschaer_local_user.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@

* Several TransferClient methods, TransferData, and DeleteData now support the local_user, source_local_user, and destination_local_user parameters (:pr:`NUMBER`)

* The TransferRequestsTransport will no longer automatically retry errors with a code of EndpointError
19 changes: 19 additions & 0 deletions src/globus_sdk/_testing/data/transfer/operation_mkdir.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
from globus_sdk._testing.models import RegisteredResponse, ResponseSet

from ._common import ENDPOINT_ID

RESPONSES = ResponseSet(
metadata={"endpoint_id": ENDPOINT_ID},
default=RegisteredResponse(
service="transfer",
method="POST",
path=f"/operation/endpoint/{ENDPOINT_ID}/mkdir",
json={
"DATA_TYPE": "mkdir_result",
"code": "DirectoryCreated",
"message": "The directory was created successfully",
"request_id": "ShbIUzrWT",
"resource": f"/operation/endpoint/{ENDPOINT_ID}/mkdir",
},
),
)
19 changes: 19 additions & 0 deletions src/globus_sdk/_testing/data/transfer/operation_rename.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
from globus_sdk._testing.models import RegisteredResponse, ResponseSet

from ._common import ENDPOINT_ID

RESPONSES = ResponseSet(
metadata={"endpoint_id": ENDPOINT_ID},
default=RegisteredResponse(
service="transfer",
method="POST",
path=f"/operation/endpoint/{ENDPOINT_ID}/rename",
json={
"DATA_TYPE": "result",
"code": "FileRenamed",
"message": "File or directory renamed successfully",
"request_id": "ShbIUzrWT",
"resource": f"/operation/endpoint/{ENDPOINT_ID}/rename",
},
),
)
26 changes: 24 additions & 2 deletions src/globus_sdk/services/transfer/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -1215,6 +1215,7 @@ def operation_ls(
filter: (
str | TransferFilterDict | list[str | TransferFilterDict] | None
) = None,
local_user: str | None = None,
query_params: dict[str, t.Any] | None = None,
) -> IterableTransferResponse:
"""
Expand All @@ -1235,6 +1236,10 @@ def operation_ls(
If a list is supplied, it is passed as multiple params.
See :ref:`filter formatting <transfer_filter_formatting>` for details.
:type filter: str or dict, optional
:param local_user: Optional value passed to identity mapping specifying which
local user account to map to. Only usable with Globus Connect Server v5
mapped collections.
:type local_user: str, optional
:param query_params: Additional passthrough query parameters
:type query_params: dict, optional

Expand Down Expand Up @@ -1291,6 +1296,8 @@ def operation_ls(
query_params["orderby"] = ",".join(orderby)
if filter is not None:
query_params["filter"] = _format_filter(filter)
if local_user is not None:
query_params["local_user"] = local_user

log.info(f"TransferClient.operation_ls({endpoint_id}, {query_params})")
return IterableTransferResponse(
Expand All @@ -1302,13 +1309,18 @@ def operation_mkdir(
endpoint_id: UUIDLike,
path: str,
*,
local_user: str | None = None,
query_params: dict[str, t.Any] | None = None,
) -> response.GlobusHTTPResponse:
"""
:param endpoint_id: The ID of the endpoint on which to create a directory
:type endpoint_id: str or UUID
:param path: Path to the new directory to create
:type path: str
:param local_user: Optional value passed to identity mapping specifying which
local user account to map to. Only usable with Globus Connect Server v5
mapped collections.
:type local_user: str, optional
:param query_params: Additional passthrough query parameters
:type query_params: dict, optional

Expand All @@ -1333,7 +1345,7 @@ def operation_mkdir(
endpoint_id, path, query_params
)
)
json_body = {"DATA_TYPE": "mkdir", "path": path}
json_body = {"DATA_TYPE": "mkdir", "path": path, "local_user": local_user}
return self.post(
f"operation/endpoint/{endpoint_id}/mkdir",
data=json_body,
Expand All @@ -1346,6 +1358,7 @@ def operation_rename(
oldpath: str,
newpath: str,
*,
local_user: str | None = None,
query_params: dict[str, t.Any] | None = None,
) -> response.GlobusHTTPResponse:
"""
Expand All @@ -1355,6 +1368,10 @@ def operation_rename(
:type oldpath: str
:param newpath: Path to the new filename
:type newpath: str
:param local_user: Optional value passed to identity mapping specifying which
local user account to map to. Only usable with Globus Connect Server v5
mapped collections.
:type local_user: str, optional
:param query_params: Additional passthrough query parameters
:type query_params: dict, optional

Expand All @@ -1379,7 +1396,12 @@ def operation_rename(
endpoint_id, oldpath, newpath, query_params
)
)
json_body = {"DATA_TYPE": "rename", "old_path": oldpath, "new_path": newpath}
json_body = {
"DATA_TYPE": "rename",
"old_path": oldpath,
"new_path": newpath,
"local_user": local_user,
}
return self.post(
f"operation/endpoint/{endpoint_id}/rename",
data=json_body,
Expand Down
6 changes: 6 additions & 0 deletions src/globus_sdk/services/transfer/data/delete_data.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,10 @@ class DeleteData(utils.PayloadWrapper):
status to INACTIVE. e.g. From credentials expiring.
[default: ``True``]
:type notify_on_inactive: bool, optional
:param local_user: Optional value passed to identity mapping specifying which local
user account to map to. Only usable with Globus Connect Server v5 mapped
collections.
:type local_user: string, optional
:param additional_fields: additional fields to be added to the delete
document. Mostly intended for internal use
:type additional_fields: dict, optional
Expand Down Expand Up @@ -109,6 +113,7 @@ def __init__(
notify_on_succeeded: bool = True,
notify_on_failed: bool = True,
notify_on_inactive: bool = True,
local_user: str | None = None,
additional_fields: dict[str, t.Any] | None = None,
) -> None:
super().__init__()
Expand All @@ -129,6 +134,7 @@ def __init__(
else None
),
deadline=deadline,
local_user=local_user,
)
self._set_optbools(
recursive=recursive,
Expand Down
12 changes: 12 additions & 0 deletions src/globus_sdk/services/transfer/data/transfer_data.py
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,14 @@ class TransferData(utils.PayloadWrapper):
status to INACTIVE. e.g. From credentials expiring.
[default: ``True``]
:type notify_on_inactive: bool, optional
:param source_local_user: Optional value passed to the source's identity mapping
specifying which local user account to map to. Only usable with Globus Connect
Server v5 mapped collections.
:type source_local_user: string, optional
:param destination_local_user: Optional value passed to the destination's identity
mapping specifying which local user account to map to. Only usable with Globus
Connect Server v5 mapped collections.
:type destination_local_user: string, optional
:param additional_fields: additional fields to be added to the transfer
document. Mostly intended for internal use
:type additional_fields: dict, optional
Expand Down Expand Up @@ -200,6 +208,8 @@ def __init__(
notify_on_succeeded: bool = True,
notify_on_failed: bool = True,
notify_on_inactive: bool = True,
source_local_user: str | None = None,
destination_local_user: str | None = None,
additional_fields: dict[str, t.Any] | None = None,
) -> None:
super().__init__()
Expand All @@ -225,6 +235,8 @@ def __init__(
),
recursive_symlinks=recursive_symlinks,
deadline=deadline,
source_local_user=source_local_user,
destination_local_user=destination_local_user,
)
self._set_optbools(
verify_checksum=verify_checksum,
Expand Down
12 changes: 8 additions & 4 deletions src/globus_sdk/services/transfer/transport.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,8 @@ class TransferRequestsTransport(RequestsTransport):
def default_check_transient_error(self, ctx: RetryContext) -> RetryCheckResult:
"""
check for transient error status codes which could be resolved by
retrying the request. Does not retry ExternalErrors as those are
unlikely to actually be transient.
retrying the request. Does not retry ExternalErrors or EndpointErrors
as those are unlikely to actually be transient.
"""
if ctx.response is not None and (
ctx.response.status_code in self.TRANSIENT_ERROR_STATUS_CODES
Expand All @@ -19,7 +19,11 @@ def default_check_transient_error(self, ctx: RetryContext) -> RetryCheckResult:
code = ctx.response.json()["code"]
except (ValueError, KeyError):
code = ""
if "ExternalError" not in code:
return RetryCheckResult.do_retry

for non_retry_code in ("ExternalError", "EndpointError"):
if non_retry_code in code:
return RetryCheckResult.no_decision

return RetryCheckResult.do_retry

return RetryCheckResult.no_decision
27 changes: 23 additions & 4 deletions tests/functional/services/transfer/test_operation_mkdir.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,25 @@
import pytest
import json
import urllib.parse

from globus_sdk._testing import get_last_request, load_response

@pytest.mark.xfail
def test_operation_mkdir():
raise NotImplementedError

def test_operation_mkdir(client):
meta = load_response(client.operation_mkdir).metadata
endpoint_id = meta["endpoint_id"]

res = client.operation_mkdir(
endpoint_id=endpoint_id,
path="~/dir/",
local_user="my-user",
query_params={"foo": "bar"},
)
assert res["DATA_TYPE"] == "mkdir_result"
assert res["code"] == "DirectoryCreated"

req = get_last_request()
body = json.loads(req.body)
assert body["path"] == "~/dir/"
assert body["local_user"] == "my-user"
query_params = urllib.parse.parse_qs(urllib.parse.urlparse(req.url).query)
assert query_params["foo"] == ["bar"]
29 changes: 25 additions & 4 deletions tests/functional/services/transfer/test_operation_rename.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,27 @@
import pytest
import json
import urllib.parse

from globus_sdk._testing import get_last_request, load_response

@pytest.mark.xfail
def test_operation_rename():
raise NotImplementedError

def test_operation_rename(client):
meta = load_response(client.operation_rename).metadata
endpoint_id = meta["endpoint_id"]

res = client.operation_rename(
endpoint_id=endpoint_id,
oldpath="~/old-name",
newpath="~/new-name",
local_user="my-user",
query_params={"foo": "bar"},
)
assert res["DATA_TYPE"] == "result"
assert res["code"] == "FileRenamed"

req = get_last_request()
body = json.loads(req.body)
assert body["old_path"] == "~/old-name"
assert body["new_path"] == "~/new-name"
assert body["local_user"] == "my-user"
query_params = urllib.parse.parse_qs(urllib.parse.urlparse(req.url).query)
assert query_params["foo"] == ["bar"]
6 changes: 6 additions & 0 deletions tests/functional/services/transfer/test_simple.py
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,12 @@ def test_operation_ls(client):
parsed_qs = urllib.parse.parse_qs(urllib.parse.urlparse(req.url).query)
assert parsed_qs == {"orderby": ["name"], "filter": ["name:~*.png"]}

# local_user
client.operation_ls(GO_EP1_ID, local_user="my-user")
req = get_last_request()
parsed_qs = urllib.parse.parse_qs(urllib.parse.urlparse(req.url).query)
assert parsed_qs == {"local_user": ["my-user"]}


def test_autoactivation(client):
"""
Expand Down
10 changes: 10 additions & 0 deletions tests/functional/services/transfer/test_task_submit.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@ def test_transfer_submit_success(client):
label="mytask",
sync_level="exists",
deadline="2018-06-01",
source_local_user="my-source-user",
destination_local_user="my-dest-user",
additional_fields={"custom_param": "foo"},
)
assert tdata["custom_param"] == "foo"
Expand All @@ -48,6 +50,10 @@ def test_transfer_submit_success(client):
assert res["submission_id"] == meta["submission_id"]
assert res["task_id"] == meta["task_id"]

req_body = json.loads(get_last_request().body)
assert req_body["source_local_user"] == "my-source-user"
assert req_body["destination_local_user"] == "my-dest-user"


def test_delete_submit_success(client):
load_response(client.get_submission_id)
Expand All @@ -57,6 +63,7 @@ def test_delete_submit_success(client):
endpoint=GO_EP1_ID,
label="mytask",
deadline="2018-06-01",
local_user="my-user",
additional_fields={"custom_param": "foo"},
)
assert ddata["custom_param"] == "foo"
Expand All @@ -69,6 +76,9 @@ def test_delete_submit_success(client):
assert res["submission_id"] == meta["submission_id"]
assert res["task_id"] == meta["task_id"]

req_body = json.loads(get_last_request().body)
assert req_body["local_user"] == "my-user"


@pytest.mark.parametrize("datatype", ("transfer", "delete"))
def test_submit_adds_missing_submission_id_to_data(client, datatype):
Expand Down
23 changes: 23 additions & 0 deletions tests/unit/transport/test_transfer_transport.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,29 @@ def test_transfer_does_not_retry_external():
assert checker.should_retry(ctx) is False


def test_transfer_does_not_retry_endpoint_error():
transport = TransferRequestsTransport()
checker = RetryCheckRunner(transport.retry_checks)

body = {
"HTTP status": "502",
"code": "EndpointError",
"error_name": "Transfer API Error",
"message": (
"This GCSv5 is older than version 5.4.62 and does not support local user "
"selection"
),
"request_id": "istNh0Zpz",
}

dummy_response = mock.Mock()
dummy_response.json = lambda: body
dummy_response.status_code = 502
ctx = RetryContext(1, response=dummy_response)

assert checker.should_retry(ctx) is False


def test_transfer_retries_others():
transport = TransferRequestsTransport()
checker = RetryCheckRunner(transport.retry_checks)
Expand Down