Skip to content

Commit

Permalink
Merge branch 'refs/heads/master' into enhancement/annotation-metadata
Browse files Browse the repository at this point in the history
  • Loading branch information
kbolashev committed Jul 22, 2024
2 parents 9017527 + 3daa07d commit bb531a6
Show file tree
Hide file tree
Showing 3 changed files with 31 additions and 12 deletions.
2 changes: 1 addition & 1 deletion dagshub/__init__.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
__version__ = "0.3.31"
__version__ = "0.3.32"
from .logger import DAGsHubLogger, dagshub_logger
from .common.init import init
from .upload.wrapper import upload_files
Expand Down
34 changes: 24 additions & 10 deletions dagshub/common/download.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,8 @@
from pathlib import Path
from typing import Tuple, Callable, Optional, List, Union, Dict

from httpx import Auth
from httpx import Auth, Response
from tenacity import stop_after_attempt, wait_exponential, before_sleep_log, retry, retry_if_exception

from dagshub.common import config

Expand Down Expand Up @@ -126,16 +127,29 @@ def download_url_to_bucket_path(url: str) -> Optional[Tuple[str, str, str]]:
return groups["proto"], groups["bucket"], groups["path"]


class DownloadError(Exception):
def __init__(self, response: Response):
self.response = response
super().__init__(f"Download failed with status code {response.status_code}")


def is_download_server_error(error: BaseException) -> bool:
if not isinstance(error, DownloadError):
return False
return error.response.status_code >= 500


@retry(
retry=retry_if_exception(is_download_server_error),
stop=stop_after_attempt(5),
wait=wait_exponential(multiplier=1, min=4, max=10),
before_sleep=before_sleep_log(logger, logging.WARNING),
)
def _dagshub_download(url: str, auth: Auth) -> bytes:
resp = http_request("GET", url, auth=auth, timeout=600)
try:
assert resp.status_code == 200
# TODO: retry
except AssertionError:
raise RuntimeError(
f"Couldn't download file at URL {url}. Response code {resp.status_code} (Body: {resp.content})"
)
return resp.content
if resp.status_code == 200:
return resp.content
raise DownloadError(resp)


BucketDownloaderFuncType = Callable[[str, str], bytes]
Expand Down Expand Up @@ -219,7 +233,7 @@ def download_files(

# Convert string paths to Path objects
for i, file_tuple in enumerate(files):
if type(file_tuple[1]) is str:
if isinstance(file_tuple[1], str):
files[i] = (file_tuple[0], Path(file_tuple[1]))

if download_fn is None:
Expand Down
7 changes: 6 additions & 1 deletion dagshub/data_engine/client/gql_queries.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,12 @@ def datasource_params(id: Optional[Union[int, str]], name: Optional[str]) -> Dic
@staticmethod
@functools.lru_cache()
def datasource_query(include_metadata: bool, introspection: "TypesIntrospection") -> GqlQuery:
metadata_fields = "metadata { key value timeZone }" if include_metadata else ""
metadata_fields = ""
if include_metadata:
# Filter out the unavailable fields
queryable_fields = ["key", "value", "timeZone"]
queryable_fields = Validators.filter_supported_fields(queryable_fields, "MetadataField", introspection)
metadata_fields = "metadata " + "{" + " ".join(queryable_fields) + "}"
q = (
GqlQuery()
.operation(
Expand Down

0 comments on commit bb531a6

Please sign in to comment.