Feature/terminate jobs #15

Merged · 6 commits · Aug 20, 2021
2 changes: 1 addition & 1 deletion bricklayer/__version__.py
Original file line number Diff line number Diff line change
@@ -1 +1 @@
__version__ = '0.0.6'
__version__ = '0.0.7'
78 changes: 62 additions & 16 deletions bricklayer/api/__init__.py
Original file line number Diff line number Diff line change
@@ -23,6 +23,7 @@
 from databricks_cli.workspace.api import WorkspaceApi
 from databricks_cli.jobs.api import JobsApi
 from databricks_cli.sdk import ApiClient
+from databricks_cli.sdk import JobsService
 from databricks_cli.clusters.api import ClusterApi
 from databricks_cli.runs.api import RunsApi

@@ -35,7 +36,7 @@ def __init__(self, job, run_id, client):
         self.job = job
         self.run_id = run_id
         self._client = client
 
     @property
     def data(self):
         '''Return the data from the raw API call'''
@@ -75,7 +76,7 @@ def __init__(self, job_id, client):
         self.job_id = job_id
         self._client = client
         self.runs = []
 
     def run_now(self, jar_params=None, notebook_params=None, python_params=None,
                 spark_submit_params=None):
         """Run this job.
@@ -96,6 +97,15 @@ def run_now(self, jar_params=None, notebook_params=None, python_params=None,
         self.runs.append(run)
         return run
 
+    def stop(self):
+        "Stop this job."
+        for run in self.runs:
+            JobsService(self._client).client.perform_query(
+                'POST', '/jobs/runs/cancel', data={
+                    "run_id": run.run_id
+                }
+            )
+
 
 class DBSApi(object):
 
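The new `stop()` method cancels every run that was started through a `DBJob` instance by POSTing each tracked `run_id` to the `/jobs/runs/cancel` endpoint. A minimal usage sketch, assuming a notebook context; the notebook path and cluster id are hypothetical:

```python
from bricklayer.api import DBSApi

# Inside a Databricks notebook, token and host default from the
# notebook context, so no arguments are needed.
api = DBSApi()

# The notebook path and cluster id below are illustrative only.
job = api.create_job('/Shared/etl/ingest', cluster_id='0820-123456-abc123')
job.run_now()

# Cancel the runs this object started. Only runs recorded in job.runs
# (i.e. started via run_now on this same instance) are cancelled.
job.stop()
```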
@@ -107,7 +117,7 @@ def __init__(
     ):
         if token is None:
             token = get_notebook_context().get_api_token()
 
        if host is None:
             host = get_notebook_context().get_browser_host_name_url()
 
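The diff elides the constructor's full parameter list, so the keyword names below are inferred from the two `if` branches above and should be treated as an assumption. A sketch of constructing the client outside a notebook:

```python
from bricklayer.api import DBSApi

# Outside a notebook the context lookups fail, so pass credentials
# explicitly (parameter names inferred from the branches above;
# the token and workspace URL are hypothetical).
api = DBSApi(
    token='dapi0123456789abcdef',
    host='https://my-workspace.cloud.databricks.com',
)
```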
@@ -151,36 +161,43 @@ def mkdir(self, dir_path):
             )
         )
 
-    def backup_notebook(self, source_path, target_path, tmp_dir):
+    def backup_notebook(self, source_path, target_path, fmt="DBC"):
         "Backup a notebook to another place in the workspace"
-        tmp_name = f'backup_{random.randint(0,1000)}'
+        tmp_dir = '/dbfs/tmp/'
+        tmp_name = 'backup'
         intermediate_location = pathlib.Path(tmp_dir).joinpath(tmp_name)
-        self.export_notebook(source_path, intermediate_location.as_posix())
+        self.export_notebook(source_path, intermediate_location.as_posix(), fmt)
         try:
-            self.import_notebook(intermediate_location, target_path)
+            self.import_notebook(intermediate_location, target_path, fmt)
         finally:
             intermediate_location.unlink()
 
-    def export_current_notebook_run(self, runs_dir, tmp_dir):
-        """Save the current notebook to a given location preserving
-        the path and timestamp"""
+    def export_current_notebook_run(self, target_path, fmt="DBC"):
+        """Save the current notebook to a given location in the required format (default DBC)
+        and preserving the path and timestamp.
+        Formats allowed:
+        SOURCE : The notebook will be imported/exported as source code.
+        HTML   : The notebook will be imported/exported as an HTML file.
+        JUPYTER: The notebook will be imported/exported as a Jupyter/IPython Notebook file.
+        DBC    : The notebook will be imported/exported as Databricks archive format.
+        """
         current_path = get_notebook_context().get_notebook_path()
         timestamp = datetime.datetime.now().strftime('%Y%m%d%H%M%S')
         target_path = (
-            pathlib.Path(runs_dir)
+            pathlib.Path(target_path)
             .joinpath(current_path[1:])
             .joinpath(timestamp)
         )
         try:
-            self.backup_notebook(current_path, target_path.as_posix(), tmp_dir)
+            self.backup_notebook(current_path, target_path.as_posix(), fmt)
         except requests.exceptions.HTTPError as _e:
             error_code = _e.response.json()['error_code']
             if error_code == 'RESOURCE_DOES_NOT_EXIST':
                 self.mkdir(target_path.parent.as_posix())
-                self.backup_notebook(current_path, target_path.as_posix(), tmp_dir)
+                self.backup_notebook(current_path, target_path.as_posix(), fmt)
             else:
                 raise
 
     def create_job(self, notebook_path, job_name=None, cluster_name=None,
                    cluster_id=None, notifications_email=None):
         """Create a databricks job.
@@ -191,7 +208,7 @@ def create_job(self, notebook_path, job_name=None, cluster_name=None,
             be provided at the same time with cluster_name)
         :param notifications_email: If provided notifications on success or failure on the job run
             will be sent to this email address.
 
         Examples
         --------
         ```
@@ -245,7 +262,7 @@ def create_job(self, notebook_path, job_name=None, cluster_name=None,
             .joinpath(notebook_path)
             .as_posix()
         )
 
         _json = (
             {
                 "name": _job_name,
@@ -269,4 +286,33 @@ def create_job(self, notebook_path, job_name=None, cluster_name=None,
             self._client
         )
 
+    def list_jobs(self, job_name='', job_id=''):
+        """List all jobs with job name or job id
+        """
+        jobs = []
+        _jobs = JobsApi(self._client).list_jobs()['jobs']
+
+        if job_name:
+            result = list(
+                filter(
+                    lambda job:
+                        job_name in job['settings']['name'],
+                    _jobs
+                ))
+
+        if job_id:
+            result = list(
+                filter(
+                    lambda job:
+                        job_id in job['job_id'],
+                    _jobs
+                ))
+
+        for jobdata in result:
+            job = DBJob(
+                jobdata['job_id'],
+                self._client
+            )
+            jobs.append(job)
+
+        return jobs
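Two caveats for callers of the new `list_jobs`, as merged: `result` is only assigned inside the two `if` branches, so calling `list_jobs()` with neither filter raises `NameError`, and the `job_id in job['job_id']` test assumes a string-valued id even though the Jobs API returns `job_id` as an integer, so filtering by name is the safer path. A defensive usage sketch with a hypothetical job name:

```python
from bricklayer.api import DBSApi

api = DBSApi()

# Always pass a filter; name matching is by substring against the
# job's settings.name field.
for job in api.list_jobs(job_name='nightly-etl'):
    print(job.job_id)
    # Note: DBJob.runs is only populated by run_now(), so stop() on a
    # job returned by list_jobs() has no run ids queued to cancel.
```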