-
-
Notifications
You must be signed in to change notification settings - Fork 4
feat(py-client): Add Python client implementation #153
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Merged
Merged
Changes from all commits
Commits
Show all changes
63 commits
Select commit
Hold shift + click to select a range
eb22f80
feat(client-py): Add a (empty) package for the Python client
lcian d37032c
use workspace, use internal PyPI for whole workspace, uv sync
lcian 86c75cb
specify build system, install client as editable
lcian 8f932f2
uv sync --all-packages in envrc for unified venv
lcian 987c9c6
rebased
lcian 1e9388c
lockfile
lcian ac5bc03
adjust commands
lcian a29e932
add pre-commit
lcian 58d5309
disclaimer on pre-commit
lcian daca135
readme
lcian ca248cb
build_system
lcian 1c93543
os matrix
lcian 33ff8b4
os matrix
lcian 1f5d760
improve
lcian 790e69a
improve
lcian e209b31
only tests in matrix
lcian ad1a998
exclude mypy on windows
lcian 25c4df6
exclude precommit on windows
lcian 5296238
make internal pypi not exclusive - packages are missing on win
lcian 41c5edf
ci only on win and macos
lcian 5c2305e
default = true
lcian 7a6d503
lower py version, regenerate uv.lock
lcian e0c1a52
lower .python-version as well for CI
lcian 1ac9c9c
just require 3.11
lcian 5c0e09f
lower req to 3.11.9 due to GH runners availability
lcian b7ebd66
regenerate lockfile with 3.11
lcian 08fd439
feat(py-client): Add Python client implementation
lcian 984888c
improve
lcian d24bf59
improve
lcian f62f754
regenerate lockfile for 3.11
lcian 53d7f41
vendor in itertools.batched
lcian ce3dcb6
improve
lcian db5229d
move flake8 config to pyproject.toml
lcian 5cffe3f
improve
lcian 8ab5467
readd back setup.cfg
lcian 75c6065
use public PyPI for now
lcian 06d8d11
Merge branch 'main' into lcian/feat/python-client-devsetup
lcian 074da70
back to python 3.13
lcian e1b78e9
use ruff; less pre-commit hooks; less overriding things
lcian 0bbe63b
use uv run for CI
lcian b592a34
test only on ubuntu-latest
lcian 10d6c92
specify workflow permissions; remove concurrency
lcian 114e0d4
format py file in devenv dir
lcian 96c25d5
move pytest to top level; run uv sync in CI
lcian 664e211
try removing uv run prefix from CI commands
lcian 76c8943
delete gitignore
lcian 395963c
ignore just dist
lcian 0bb4a6f
unify CI
lcian 2e19ec3
fix workflow file
lcian 088fd48
improve
lcian 7d1d979
improve
lcian 6f6a046
improve
lcian 5389be3
change mypy rules slightly
lcian 10fb7cd
pin minor python version
lcian 28f41b4
simplify build requirement
lcian 93ada0e
improve
lcian 1f49192
Merge branch 'lcian/feat/python-client-devsetup' into lcian/feat/pyth…
lcian 1687ac2
remove metrics.timing in favor of distribution
lcian e861c49
Update .pre-commit-config.yaml
lcian de40c3e
Merge branch 'lcian/feat/python-client-devsetup' into lcian/feat/pyth…
lcian e9b2306
remove vendored itertools thingy; satisfy linter
lcian 3fd52f2
Merge branch 'main' into lcian/feat/python-client-initial-code
lcian bb300b6
remove setup.cfg
lcian File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
239 changes: 239 additions & 0 deletions
239
python-objectstore-client/src/objectstore_client/client.py
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,239 @@ | ||
| from __future__ import annotations | ||
|
|
||
| from io import BytesIO | ||
| from typing import IO, Literal, NamedTuple, NotRequired, Self, TypedDict, cast | ||
| from urllib.parse import urlencode | ||
|
|
||
| import sentry_sdk | ||
| import urllib3 | ||
| import zstandard | ||
| from urllib3.connectionpool import HTTPConnectionPool | ||
|
|
||
| from objectstore_client.metadata import ( | ||
| HEADER_EXPIRATION, | ||
| HEADER_META_PREFIX, | ||
| Compression, | ||
| ExpirationPolicy, | ||
| Metadata, | ||
| format_expiration, | ||
| ) | ||
| from objectstore_client.metrics import ( | ||
| MetricsBackend, | ||
| NoOpMetricsBackend, | ||
| measure_storage_operation, | ||
| ) | ||
|
|
||
# Access level a token/credential may hold for a scope.
Permission = Literal["read", "write"]


class Scope(TypedDict):
    """Ownership scope that objects are stored under.

    ``project`` may be omitted for organization-wide scopes.
    """

    organization: int
    project: NotRequired[int]


class GetResult(NamedTuple):
    """Result of `Client.get`: parsed object metadata plus a readable payload stream."""

    metadata: Metadata
    payload: IO[bytes]
|
|
||
|
|
||
class ClientBuilder:
    """Builder that configures and produces scoped `Client` instances.

    A builder is bound to one objectstore base URL and one usecase; call
    `for_organization` or `for_project` to obtain a client for a concrete scope.
    """

    def __init__(
        self,
        objectstore_base_url: str,
        usecase: str,
        metrics_backend: MetricsBackend | None = None,
        propagate_traces: bool = False,
    ):
        self._base_url = objectstore_base_url
        self._usecase = usecase
        # Uploads are zstd-compressed unless overridden via `default_compression`.
        self._default_compression: Compression = "zstd"
        self._propagate_traces = propagate_traces
        if not metrics_backend:
            metrics_backend = NoOpMetricsBackend()
        self._metrics_backend = metrics_backend

    def default_compression(self, default_compression: Compression) -> Self:
        """Override the compression applied to uploads by default; returns self."""
        self._default_compression = default_compression
        return self

    def for_organization(self, organization_id: int) -> Client:
        """Create a client scoped to an entire organization."""
        return self._make_client(f"org.{organization_id}")

    def for_project(self, organization_id: int, project_id: int) -> Client:
        """Create a client scoped to a single project within an organization."""
        return self._make_client(f"org.{organization_id}/proj.{project_id}")

    def _make_client(self, scope: str) -> Client:
        # A dedicated connection pool is derived from the configured base URL
        # for every client built.
        pool = urllib3.connectionpool.connection_from_url(self._base_url)
        return Client(
            pool,
            self._default_compression,
            self._usecase,
            scope,
            self._propagate_traces,
            self._metrics_backend,
        )
|
|
||
|
|
||
class Client:
    """HTTP client for the objectstore service, bound to one usecase and scope.

    Instances are created via `ClientBuilder.for_organization` /
    `ClientBuilder.for_project` rather than constructed directly.
    """

    # Compression applied by `put` when the caller does not specify one.
    _default_compression: Compression

    def __init__(
        self,
        pool: HTTPConnectionPool,
        default_compression: Compression,
        usecase: str,
        scope: str,
        propagate_traces: bool,
        metrics_backend: MetricsBackend,
    ):
        self._pool = pool
        self._default_compression = default_compression
        self._usecase = usecase
        self._scope = scope
        self._propagate_traces = propagate_traces
        self._metrics_backend = metrics_backend

    def _make_headers(self) -> dict[str, str]:
        # When enabled, forward Sentry trace-propagation headers so requests to
        # the objectstore service are linked into the current trace.
        if self._propagate_traces:
            return dict(sentry_sdk.get_current_scope().iter_trace_propagation_headers())
        return {}

    def _make_url(self, id: str | None, full: bool = False) -> str:
        """Build a request URL; with `full=True` include scheme, host and port."""
        # Without an `id`, the collection endpoint "/v1/" is targeted, which
        # lets the service generate an object key on PUT.
        base_path = f"/v1/{id}" if id else "/v1/"
        qs = urlencode({"usecase": self._usecase, "scope": self._scope})
        if full:
            # NOTE(review): assumes the service is reached over plain HTTP —
            # confirm before handing these URLs to external consumers.
            return f"http://{self._pool.host}:{self._pool.port}{base_path}?{qs}"
        else:
            return f"{base_path}?{qs}"

    def put(
        self,
        contents: bytes | IO[bytes],
        id: str | None = None,
        compression: Compression | Literal["none"] | None = None,
        metadata: dict[str, str] | None = None,
        expiration_policy: ExpirationPolicy | None = None,
    ) -> str:
        """
        Uploads the given `contents` to blob storage.

        If no `id` is provided, one will be automatically generated and returned
        from this function.

        The client will select the configured `default_compression` if none is given
        explicitly.
        This can be overridden by explicitly giving a `compression` argument.
        Providing `"none"` as the argument will instruct the client to not apply
        any compression to this upload, which is useful for uncompressible formats.
        """
        headers = self._make_headers()
        body = BytesIO(contents) if isinstance(contents, bytes) else contents
        # Keep a handle on the uncompressed stream: its `tell()` after the
        # upload yields the uncompressed size for metrics.
        original_body: IO[bytes] = body

        compression = compression or self._default_compression
        if compression == "zstd":
            # Compress lazily while urllib3 reads the body — nothing is
            # buffered up front.
            cctx = zstandard.ZstdCompressor()
            body = cctx.stream_reader(original_body)
            headers["Content-Encoding"] = "zstd"

        if expiration_policy:
            headers[HEADER_EXPIRATION] = format_expiration(expiration_policy)

        if metadata:
            for k, v in metadata.items():
                headers[f"{HEADER_META_PREFIX}{k}"] = v

        with measure_storage_operation(
            self._metrics_backend, "put", self._usecase
        ) as metric_emitter:
            response = self._pool.request(
                "PUT",
                self._make_url(id),
                body=body,
                headers=headers,
                preload_content=True,
                decode_content=True,
            )
            raise_for_status(response)
            res = response.json()

            # Must do this after streaming `body` as that's what is responsible
            # for advancing the seek position in both streams
            metric_emitter.record_uncompressed_size(original_body.tell())
            if compression and compression != "none":
                metric_emitter.record_compressed_size(body.tell(), compression)
        return res["key"]

    def get(self, id: str, decompress: bool = True) -> GetResult:
        """
        This fetches the blob with the given `id`, returning an `IO` stream that
        can be read.

        By default, content that was uploaded compressed will be automatically
        decompressed, unless `decompress=False` is passed.
        """

        headers = self._make_headers()
        with measure_storage_operation(self._metrics_backend, "get", self._usecase):
            response = self._pool.request(
                "GET",
                self._make_url(id),
                preload_content=False,
                decode_content=False,
                headers=headers,
            )
            raise_for_status(response)
        # NOTE(review): the raw urllib3 response object is treated directly as
        # an `IO[bytes]`; `response.stream()` may be a cleaner alternative.
        stream = cast(IO[bytes], response)
        metadata = Metadata.from_headers(response.headers)

        if metadata.compression and decompress:
            if metadata.compression != "zstd":
                raise NotImplementedError(
                    "Transparent decoding of anything but `zstd` is not implemented yet"
                )

            # The returned metadata describes the payload actually handed back,
            # which is decompressed at this point.
            metadata.compression = None
            dctx = zstandard.ZstdDecompressor()
            stream = dctx.stream_reader(stream, read_across_frames=True)

        return GetResult(metadata, stream)

    def object_url(self, id: str) -> str:
        """
        Generates a GET url to the object with the given `id`.

        This can then be used by downstream services to fetch the given object.
        NOTE however that the service does not strictly follow HTTP semantics,
        in particular in relation to `Accept-Encoding`.
        """
        return self._make_url(id, full=True)

    def delete(self, id: str) -> None:
        """
        Deletes the blob with the given `id`.
        """

        headers = self._make_headers()
        with measure_storage_operation(self._metrics_backend, "delete", self._usecase):
            response = self._pool.request(
                "DELETE",
                self._make_url(id),
                headers=headers,
            )
            raise_for_status(response)
|
|
||
|
|
||
class ClientError(Exception):
    """Raised when the objectstore service answers with a 4xx/5xx status."""

    def __init__(self, message: str, status: int, response: str):
        super().__init__(message)
        # HTTP status code of the failed request.
        self.status = status
        # Response body as text, kept for debugging.
        self.response = response


def raise_for_status(response: urllib3.BaseHTTPResponse) -> None:
    """Raise `ClientError` if `response` has an error status (>= 400).

    The response body is attached to the exception, decoded as text.
    """
    if response.status >= 400:
        raw = response.data or response.read()
        # Decode the body instead of `str()`-ing it: `str(b"oops")` yields the
        # repr "b'oops'" rather than the actual payload text.
        if isinstance(raw, bytes):
            body = raw.decode("utf-8", errors="replace")
        else:
            body = str(raw or "")
        raise ClientError(
            f"Objectstore request failed with status {response.status}",
            response.status,
            body,
        )
101 changes: 101 additions & 0 deletions
101
python-objectstore-client/src/objectstore_client/metadata.py
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,101 @@ | ||
| from __future__ import annotations | ||
|
|
||
| import itertools | ||
| import re | ||
| from collections.abc import Mapping | ||
| from dataclasses import dataclass | ||
| from datetime import timedelta | ||
| from typing import Literal, cast | ||
|
|
||
# The only compression scheme currently supported for transparent
# (de)compression of payloads.
Compression = Literal["zstd"]

# Header carrying the object's expiration policy (e.g. "ttl:1 days").
HEADER_EXPIRATION = "x-sn-expiration"
# Prefix for headers carrying user-supplied custom metadata entries.
HEADER_META_PREFIX = "x-snme-"
|
|
||
|
|
||
@dataclass
class TimeToIdle:
    """Time-to-idle expiration policy, serialized as ``tti:<duration>``."""

    # Duration used when formatting/parsing the policy.
    delta: timedelta


@dataclass
class TimeToLive:
    """Time-to-live expiration policy, serialized as ``ttl:<duration>``."""

    # Duration used when formatting/parsing the policy.
    delta: timedelta


# Union of all supported expiration policies.
ExpirationPolicy = TimeToIdle | TimeToLive
|
|
||
|
|
||
@dataclass
class Metadata:
    """Object metadata as parsed from objectstore response headers."""

    # Compression the stored payload carries, if any.
    compression: Compression | None
    # Expiration policy attached to the object, if any.
    expiration_policy: ExpirationPolicy | None
    # User-supplied metadata entries, with the header prefix stripped.
    custom: dict[str, str]

    @classmethod
    def from_headers(cls, headers: Mapping[str, str]) -> Metadata:
        """Extract compression, expiration and custom metadata from `headers`."""
        # NOTE(review): header names are compared case-sensitively here —
        # verify the mapping passed in normalizes keys to lowercase.
        compression: Compression | None = None
        expiration_policy: ExpirationPolicy | None = None
        custom: dict[str, str] = {}
        for name, value in headers.items():
            if name == "content-encoding":
                compression = cast("Compression | None", value)
            elif name == HEADER_EXPIRATION:
                expiration_policy = parse_expiration(value)
            elif name.startswith(HEADER_META_PREFIX):
                custom[name.removeprefix(HEADER_META_PREFIX)] = value
        return cls(compression, expiration_policy, custom)
|
|
||
|
|
||
def format_expiration(expiration_policy: ExpirationPolicy) -> str:
    """Serialize an expiration policy into its header value.

    Raises:
        TypeError: if `expiration_policy` is not a supported policy type.
    """
    if isinstance(expiration_policy, TimeToIdle):
        return f"tti:{format_timedelta(expiration_policy.delta)}"
    if isinstance(expiration_policy, TimeToLive):
        return f"ttl:{format_timedelta(expiration_policy.delta)}"
    # Previously this fell through and implicitly returned None, violating the
    # `-> str` contract and producing an invalid header value; fail loudly.
    raise TypeError(f"unsupported expiration policy: {expiration_policy!r}")
|
|
||
|
|
||
def parse_expiration(value: str) -> ExpirationPolicy | None:
    """Parse a header value like ``tti:<duration>`` or ``ttl:<duration>``.

    Unrecognized values yield ``None``.
    """
    prefix, rest = value[:4], value[4:]
    if prefix == "tti:":
        return TimeToIdle(parse_timedelta(rest))
    if prefix == "ttl:":
        return TimeToLive(parse_timedelta(rest))
    return None
|
|
||
|
|
||
def format_timedelta(delta: timedelta) -> str:
    """Serialize a timedelta as ``"<D> days <S> seconds"``.

    Zero components are omitted, so a zero delta yields the empty string.
    Sub-second precision is not represented.
    """
    parts: list[str] = []
    if delta.days:
        parts.append(f"{delta.days} days")
    if delta.seconds:
        parts.append(f"{delta.seconds} seconds")
    return " ".join(parts)
|
|
||
|
|
||
| TIME_SPLIT = re.compile(r"[^\W\d_]+|\d+") | ||
|
|
||
|
|
||
| def parse_timedelta(delta: str) -> timedelta: | ||
| words = TIME_SPLIT.findall(delta) | ||
| seconds = 0 | ||
|
|
||
| for num, unit in itertools.batched(words, n=2, strict=True): | ||
| num = int(num) | ||
| multiplier = 0 | ||
|
|
||
| if unit.startswith("w"): | ||
| multiplier = 86400 * 7 | ||
| elif unit.startswith("d"): | ||
| multiplier = 86400 | ||
| elif unit.startswith("h"): | ||
| multiplier = 3600 | ||
| elif unit.startswith("m") and not unit.startswith("ms"): | ||
| multiplier = 60 | ||
| elif unit.startswith("s"): | ||
| multiplier = 1 | ||
|
|
||
| seconds += num * multiplier | ||
|
|
||
| return timedelta(seconds=seconds) |
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
Uh oh!
There was an error while loading. Please reload this page.