Skip to content

Commit

Permalink
Cleanup usage of marquez-client (apache#1)
Browse files Browse the repository at this point in the history
  • Loading branch information
roaraya8 authored Feb 8, 2019
1 parent 8d12824 commit e40e068
Show file tree
Hide file tree
Showing 2 changed files with 20 additions and 77 deletions.
40 changes: 20 additions & 20 deletions marquez/airflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
import pendulum
import airflow.models
from airflow.utils.db import provide_session
from marquez.client import MarquezClient, RunState
from marquez_client.marquez import MarquezClient


class MarquezDag(airflow.models.DAG):
Expand All @@ -20,14 +20,14 @@ def create_dagrun(self, *args, **kwargs):
job_run_args = "{}" # TODO retrieve from DAG/tasks
start_time = pendulum.instance(kwargs['execution_date']).to_datetime_string()
end_time = None
state = RunState.RUNNING

self.mqz_client.set_namespace(self.mqz_namespace)
self.mqz_client.create_job(job_name, self.mqz_location, self.mqz_input_datasets, self.mqz_output_datasets,
self.description)
mqz_job_run_id = self.mqz_client.create_job_run(job_name, job_run_args=job_run_args,
nominal_start_time=start_time, nominal_end_time=end_time)
self.mqz_client.set_jobrun_state(mqz_job_run_id, state)
nominal_start_time=start_time,
nominal_end_time=end_time).run_id
self.mqz_client.mark_job_run_running(mqz_job_run_id)

self.marquez_log('job_running', json.dumps(
{'namespace': self.mqz_namespace,
Expand All @@ -39,8 +39,7 @@ def create_dagrun(self, *args, **kwargs):
'nominal_end_time': end_time,
'jobrun_id': mqz_job_run_id,
'inputDatasetUrns': self.mqz_input_datasets,
'outputDatasetUrns': self.mqz_output_datasets,
'state': str(state.name)
'outputDatasetUrns': self.mqz_output_datasets
}))

run = super().create_dagrun(*args, **kwargs)
Expand All @@ -50,22 +49,23 @@ def create_dagrun(self, *args, **kwargs):

def handle_callback(self, *args, **kwargs):
job_name = self.dag_id

if kwargs.get('success'):
state = RunState.COMPLETED
else:
state = RunState.FAILED



mqz_job_run_id = self.get_and_delete(args[0].run_id)

if mqz_job_run_id:
self.mqz_client.set_jobrun_state(mqz_job_run_id, state)
self.marquez_log('job_state_change',
json.dumps({'job_name': job_name,
'jobrun_id': mqz_job_run_id,
'state': str(state.name)}))

if kwargs.get('success'):
self.mqz_client.mark_job_run_completed(mqz_job_run_id)
self.marquez_log('job_state_change',
json.dumps({'job_name': job_name,
'jobrun_id': mqz_job_run_id,
'state': 'COMPLETED'}))
else:
self.mqz_client.mark_job_run_failed(mqz_job_run_id)
self.marquez_log('job_state_change',
json.dumps({'job_name': job_name,
'jobrun_id': mqz_job_run_id,
'state': 'FAILED'}))

else:
# TODO warn that the jobrun_id couldn't be found
pass
Expand All @@ -90,4 +90,4 @@ def marquez_log(self, event, extras, session=None):
owner="marquez",
extra=extras,
task_id=None,
dag_id=self.dag_id))
dag_id=self.dag_id))
57 changes: 0 additions & 57 deletions marquez/client/__init__.py

This file was deleted.

0 comments on commit e40e068

Please sign in to comment.