diff --git a/bricklayer/__version__.py b/bricklayer/__version__.py index fa9c4ec..2792152 100644 --- a/bricklayer/__version__.py +++ b/bricklayer/__version__.py @@ -1 +1 @@ -__version__ = '0.0.6' +__version__ = '0.0.7' diff --git a/bricklayer/api/__init__.py b/bricklayer/api/__init__.py index 6e8a891..08db29b 100644 --- a/bricklayer/api/__init__.py +++ b/bricklayer/api/__init__.py @@ -23,6 +23,7 @@ from databricks_cli.workspace.api import WorkspaceApi from databricks_cli.jobs.api import JobsApi from databricks_cli.sdk import ApiClient +from databricks_cli.sdk import JobsService from databricks_cli.clusters.api import ClusterApi from databricks_cli.runs.api import RunsApi @@ -35,7 +36,7 @@ def __init__(self, job, run_id, client): self.job = job self.run_id = run_id self._client = client - + @property def data(self): '''Return the data from the raw API call''' @@ -75,7 +76,7 @@ def __init__(self, job_id, client): self.job_id = job_id self._client = client self.runs = [] - + def run_now(self, jar_params=None, notebook_params=None, python_params=None, spark_submit_params=None): """Run this job. @@ -96,6 +97,15 @@ def run_now(self, jar_params=None, notebook_params=None, python_params=None, self.runs.append(run) return run + def stop(self): + "Stop this job." + for run in self.runs: + JobsService(self._client).client.perform_query( + 'POST', '/jobs/runs/cancel', data={ + "run_id": run.run_id + } + ) + class DBSApi(object): @@ -107,7 +117,7 @@ def __init__( ): if token is None: token = get_notebook_context().get_api_token() - + if host is None: host = get_notebook_context().get_browser_host_name_url() @@ -151,36 +161,43 @@ def mkdir(self, dir_path): ) ) - def backup_notebook(self, source_path, target_path, tmp_dir): + def backup_notebook(self, source_path, target_path, fmt="DBC"): "Backup a notebook to another place in the workspace" - tmp_name = f'backup_{random.randint(0,1000)}' + tmp_dir = '/dbfs/tmp/' + tmp_name = 'backup' intermediate_location = pathlib.Path(tmp_dir).joinpath(tmp_name) - self.export_notebook(source_path, intermediate_location.as_posix()) + self.export_notebook(source_path, intermediate_location.as_posix(), fmt) try: - self.import_notebook(intermediate_location, target_path) + self.import_notebook(intermediate_location, target_path, fmt) finally: intermediate_location.unlink() - def export_current_notebook_run(self, runs_dir, tmp_dir): - """Save the current notebook to a given location preserving - the path and timestamp""" + def export_current_notebook_run(self, target_path, fmt="DBC"): + """Save the current notebook to a given location in the required format (default DBC) + and preserving the path and timestamp. + Formats allowed: + SOURCE : The notebook will be imported/exported as source code. + HTML : The notebook will be imported/exported as an HTML file. + JUPYTER: The notebook will be imported/exported as a Jupyter/IPython Notebook file. + DBC : The notebook will be imported/exported as Databricks archive format. + """ current_path = get_notebook_context().get_notebook_path() timestamp = datetime.datetime.now().strftime('%Y%m%d%H%M%S') target_path = ( - pathlib.Path(runs_dir) + pathlib.Path(target_path) .joinpath(current_path[1:]) .joinpath(timestamp) ) try: - self.backup_notebook(current_path, target_path.as_posix(), tmp_dir) + self.backup_notebook(current_path, target_path.as_posix(), fmt) except requests.exceptions.HTTPError as _e: error_code = _e.response.json()['error_code'] if error_code == 'RESOURCE_DOES_NOT_EXIST': self.mkdir(target_path.parent.as_posix()) - self.backup_notebook(current_path, target_path.as_posix(), tmp_dir) + self.backup_notebook(current_path, target_path.as_posix(), fmt) else: raise - + def create_job(self, notebook_path, job_name=None, cluster_name=None, cluster_id=None, notifications_email=None): """Create a databricks job. @@ -191,7 +208,7 @@ def create_job(self, notebook_path, job_name=None, cluster_name=None, be provided at the same time with cluster_name) :param notifications_email: If provided notifications on success or failure on the job run will be sent to this email address. - + Examples -------- ``` @@ -245,7 +262,7 @@ def create_job(self, notebook_path, job_name=None, cluster_name=None, .joinpath(notebook_path) .as_posix() ) - + _json = ( { "name": _job_name, @@ -269,4 +286,33 @@ def create_job(self, notebook_path, job_name=None, cluster_name=None, self._client ) + def list_jobs(self, job_name='', job_id=''): + """List all jobs with job name or job id + """ + jobs = [] + _jobs = JobsApi(self._client).list_jobs()['jobs'] + + if job_name: + result = list( + filter( + lambda job: + job_name in job['settings']['name'], + _jobs + )) + + if job_id: + result = list( + filter( + lambda job: + job_id in job['job_id'], + _jobs + )) + + for jobdata in result: + job = DBJob( + jobdata['job_id'], + self._client + ) + jobs.append(job) + return jobs