Skip to content

Commit

Permalink
Fix corruption during CONFLICT upload (#3498)
Browse files Browse the repository at this point in the history
PBENCH-1219

Large uploads can time out, causing the client (e.g., the 0.69 passthrough
server's dispatch) to retry. Eventually, this will result in an `OK` (200)
response, which is good. However, if we retry before the original operation
finishes (it may be still running, despite the client timeout), we catch the
already existing "temporary intake directory" as a `CONFLICT` error.

Unfortunately, the cleanup logic doesn't recognize this distinction, and still
deletes the intake directory on exit. Timed correctly, this could break the
original upload: at best, it results in a noisy termination with complaints
that the expected temporary directory no longer exists.

Fix this problem by attempting to delete only when this API instance has
successfully created the temporary directory. Modify the `CONFLICT` unit test
case to reproduce the situation more accurately and additionally validate that
the directory still exists after completion.
  • Loading branch information
dbutenhof authored Jul 21, 2023
1 parent 5329c0f commit abfc995
Show file tree
Hide file tree
Showing 2 changed files with 35 additions and 15 deletions.
18 changes: 11 additions & 7 deletions lib/pbench/server/api/resources/intake_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -196,7 +196,7 @@ def _intake(

audit: Optional[Audit] = None
username: Optional[str] = None
tmp_dir: Optional[Path] = None
intake_dir: Optional[Path] = None

prefix = current_app.server_config.rest_uri
origin = f"{self._get_uri_base(request).host}{prefix}/datasets/"
Expand Down Expand Up @@ -245,10 +245,12 @@ def _intake(
except FileExistsError as e:
raise APIAbort(
HTTPStatus.CONFLICT,
"Temporary upload directory already exists",
"Dataset is currently being uploaded",
) from e
tar_full_path = tmp_dir / filename
md5_full_path = tmp_dir / f"{filename}.md5"
else:
intake_dir = tmp_dir
tar_full_path = intake_dir / filename
md5_full_path = intake_dir / f"{filename}.md5"

bytes_received = 0
usage = shutil.disk_usage(tar_full_path.parent)
Expand Down Expand Up @@ -515,11 +517,13 @@ def _intake(
recovery.cleanup()
raise exception from e
finally:
if tmp_dir:
if intake_dir:
try:
shutil.rmtree(tmp_dir)
shutil.rmtree(intake_dir)
except Exception as e:
current_app.logger.warning("Error removing {}: {}", tmp_dir, str(e))
current_app.logger.warning(
"Error removing {}: {}", intake_dir, str(e)
)

response = jsonify(
{
Expand Down
32 changes: 24 additions & 8 deletions lib/pbench/test/unit/server/test_upload.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
from io import BytesIO
from logging import Logger
from pathlib import Path
from typing import Any
from typing import Any, Optional

from flask import Request
import pytest
Expand Down Expand Up @@ -337,19 +337,33 @@ def test_empty_upload(
def test_temp_exists(
self, monkeypatch, client, tmp_path, server_config, pbench_drb_token
):
"""Test behavior of a conflicting upload
When the MD5-based temporary intake directory exists already, upload
will fail with CONFLICT. We want to verify that behavior, and that we
don't delete the existing directory during cleanup, which could
interfere with a running upload. This can happen, for example, when a
large upload times out and the client retries before the original is
finished.
"""
md5 = "d41d8cd98f00b204e9800998ecf8427e"
temp_path: Optional[Path] = None

def td_exists(self, *args, **kwargs):
"""Mock out Path.mkdir()
The trick here is that calling the UPLOAD API results in two calls
to Path.mkdir: one in the __init__ to be sure that ARCHIVE/UPLOAD
exists, and the second for the temporary subdirectory. We want the
first to succeed normally so we'll pass the call to the real mkdir
if the path doesn't end with our MD5 value.
exists, and the second for the temporary subdirectory. We want to
create both directories, but for the second (MD5-based intake temp)
we want to raise FileExistsError as if it had already existed, to
trigger the duplicate upload logic.
"""
retval = self.real_mkdir(*args, **kwargs)
if self.name != md5:
return self.real_mkdir(*args, **kwargs)
return retval
nonlocal temp_path
temp_path = self
raise FileExistsError(str(self))

filename = "tmp.tar.xz"
Expand All @@ -365,9 +379,11 @@ def td_exists(self, *args, **kwargs):
headers=self.gen_headers(pbench_drb_token, md5),
)
assert response.status_code == HTTPStatus.CONFLICT
assert (
response.json.get("message") == "Temporary upload directory already exists"
)

# Assert that we captured an intake temporary directory path and that
# the "duplicate" path wasn't deleted during API cleanup.
assert temp_path and temp_path.is_dir()
assert response.json.get("message") == "Dataset is currently being uploaded"
assert not self.cachemanager_created

@pytest.mark.parametrize(
Expand Down

0 comments on commit abfc995

Please sign in to comment.