From b9f9bd5643cd824402b7cbc8126e5ba1e44818a9 Mon Sep 17 00:00:00 2001 From: Serge Smertin Date: Wed, 17 May 2023 17:12:46 +0200 Subject: [PATCH 1/2] Fix broken `dbutils.fs.mount` and `dbutils.fs.updateMount` Fixes #112 --- databricks/sdk/dbutils.py | 106 +++++++++++++++++++++++++++++--------- tests/test_dbutils.py | 2 +- 2 files changed, 83 insertions(+), 25 deletions(-) diff --git a/databricks/sdk/dbutils.py b/databricks/sdk/dbutils.py index 7204fd016..e959cd012 100644 --- a/databricks/sdk/dbutils.py +++ b/databricks/sdk/dbutils.py @@ -1,14 +1,17 @@ import base64 import json +import logging import threading import typing from collections import namedtuple -from .core import ApiClient, Config +from .core import ApiClient, Config, DatabricksError from .mixins import compute as compute_ext from .mixins import dbfs as dbfs_ext from .service import compute, workspace +_LOG = logging.getLogger('databricks.sdk') + class FileInfo(namedtuple('FileInfo', ['path', 'name', 'size', "modificationTime"])): pass @@ -77,38 +80,42 @@ def rm(self, dir: str, recurse: bool = False) -> bool: def mount(self, source: str, - mountPoint: str, - encryptionType: str = "", - owner: str = "", - extraConfigs: 'typing.Dict[str, str]' = None, - ) -> bool: + mount_point: str, + encryption_type: str = None, + owner: str = None, + extra_configs: 'typing.Dict[str, str]' = None) -> bool: """Mounts the given source directory into DBFS at the given mount point""" fs = self._proxy_factory('fs') - return fs.mount(source=source, - mountPoint=mountPoint, - encryptionType=encryptionType, - owner=owner, - extraConfigs=extraConfigs) - - def unmount(self, mountPoint: str) -> bool: + kwargs = {} + if encryption_type: + kwargs['encryption_type'] = encryption_type + if owner: + kwargs['owner'] = owner + if extra_configs: + kwargs['extra_configs'] = extra_configs + return fs.mount(source, mount_point, **kwargs) + + def unmount(self, mount_point: str) -> bool: """Deletes a DBFS mount point""" fs = self._proxy_factory('fs') - return fs.unmount(mountPoint) + return fs.unmount(mount_point) def updateMount(self, source: str, - mountPoint: str, - encryptionType: str = "", - owner: str = "", - extraConfigs: 'typing.Dict[str, str]' = None, - ) -> bool: + mount_point: str, + encryption_type: str = None, + owner: str = None, + extra_configs: 'typing.Dict[str, str]' = None) -> bool: """ Similar to mount(), but updates an existing mount point (if present) instead of creating a new one """ fs = self._proxy_factory('fs') - return fs.updateMount(source=source, - mountPoint=mountPoint, - encryptionType=encryptionType, - owner=owner, - extraConfigs=extraConfigs) + kwargs = {} + if encryption_type: + kwargs['encryption_type'] = encryption_type + if owner: + kwargs['owner'] = owner + if extra_configs: + kwargs['extra_configs'] = extra_configs + return fs.updateMount(source, mount_point, **kwargs) def mounts(self) -> typing.List[MountInfo]: """ Displays information about what is mounted within DBFS """ @@ -214,6 +221,10 @@ def __getattr__(self, method: str) -> '_ProxyCall': method=method) +import html +import re + + class _ProxyCall: def __init__(self, *, command_execution: compute.CommandExecutionAPI, @@ -225,6 +236,52 @@ def __init__(self, *, command_execution: compute.CommandExecutionAPI, self._util = util self._method = method + _out_re = re.compile(r'Out\[[\d\s]+]:\s') + _tag_re = re.compile(r'<[^>]*>') + _exception_re = re.compile(r'.*Exception:\s+(.*)') + _execution_error_re = re.compile( + r'ExecutionError: ([\s\S]*)\n(StatusCode=[0-9]*)\n(StatusDescription=.*)\n') + _error_message_re = re.compile(r'ErrorMessage=(.+)\n') + _ascii_escape_re = re.compile(r'(\x9B|\x1B\[)[0-?]*[ -/]*[@-~]') + + def _is_failed(self, results: compute.Results) -> bool: + return results.result_type == compute.ResultType.error + + def _text(self, results: compute.Results) -> str: + if results.result_type != compute.ResultType.text: + return '' + return self._out_re.sub("", str(results.data)) + + def _raise_if_failed(self, results: compute.Results): + if not self._is_failed(results): + return + raise DatabricksError(self._error_from_results(results)) + + def _error_from_results(self, results: compute.Results): + if not self._is_failed(results): + return + if results.cause: + _LOG.debug(f'{self._ascii_escape_re.sub("", results.cause)}') + + summary = self._tag_re.sub("", results.summary) + summary = html.unescape(summary) + + exception_matches = self._exception_re.findall(summary) + if len(exception_matches) == 1: + summary = exception_matches[0].replace("; nested exception is:", "") + summary = summary.rstrip(" ") + return summary + + execution_error_matches = self._execution_error_re.findall(results.cause) + if len(execution_error_matches) == 1: + return "\n".join(execution_error_matches[0]) + + error_message_matches = self._error_message_re.findall(results.cause) + if len(error_message_matches) == 1: + return error_message_matches[0] + + return summary + def __call__(self, *args, **kwargs): raw = json.dumps((args, kwargs)) code = f''' @@ -239,6 +296,7 @@ def __call__(self, *args, **kwargs): context_id=ctx.id, command=code).result() if result.status == compute.CommandStatus.Finished: + self._raise_if_failed(result.results) raw = result.results.data return json.loads(raw) else: diff --git a/tests/test_dbutils.py b/tests/test_dbutils.py index ff13ca95e..f956fffcd 100644 --- a/tests/test_dbutils.py +++ b/tests/test_dbutils.py @@ -178,7 +178,7 @@ def test_fs_mounts(dbutils_proxy): mounts = dbutils.fs.mounts() assert len(mounts) == 2 - assert mounts[0].mountPoint == 'a' + assert mounts[0].mount_point == 'a' assert mounts[0].source == 'b' assertions() From 0ca9991e0bc1af60ff46bc09365a33a17797056e Mon Sep 17 00:00:00 2001 From: Serge Smertin Date: Wed, 17 May 2023 17:25:28 +0200 Subject: [PATCH 2/2] Fix broken `dbutils.fs.mount` and `dbutils.fs.updateMount` Fixes #112 --- tests/test_dbutils.py | 10 +++------- 1 file changed, 3 insertions(+), 7 deletions(-) diff --git a/tests/test_dbutils.py b/tests/test_dbutils.py index f956fffcd..42ff9bc63 100644 --- a/tests/test_dbutils.py +++ b/tests/test_dbutils.py @@ -137,9 +137,7 @@ def assertions(): def test_fs_mount(dbutils_proxy): command = ('\n' ' import json\n' - ' (args, kwargs) = json.loads(\'[[], {"source": "s3://foo", ' - '"mountPoint": "bar", "encryptionType": "", "owner": "", ' - '"extraConfigs": null}]\')\n' + ' (args, kwargs) = json.loads(\'[["s3://foo", "bar"], {}]\')\n' ' result = dbutils.fs.mount(*args, **kwargs)\n' ' dbutils.notebook.exit(json.dumps(result))\n' ' ') @@ -153,9 +151,7 @@ def test_fs_mount(dbutils_proxy): def test_fs_update_mount(dbutils_proxy): command = ('\n' ' import json\n' - ' (args, kwargs) = json.loads(\'[[], {"source": ' - '"s3://foo2", "mountPoint": "bar", "encryptionType": "", "owner": ' - '"", "extraConfigs": null}]\')\n' + ' (args, kwargs) = json.loads(\'[["s3://foo2", "bar"], {}]\')\n' ' result = dbutils.fs.updateMount(*args, **kwargs)\n' ' dbutils.notebook.exit(json.dumps(result))\n' ' ') @@ -178,7 +174,7 @@ def test_fs_mounts(dbutils_proxy): mounts = dbutils.fs.mounts() assert len(mounts) == 2 - assert mounts[0].mount_point == 'a' + assert mounts[0].mountPoint == 'a' assert mounts[0].source == 'b' assertions()