Skip to content

Commit

Permalink
[AIRFLOW-3600] Remove dagbag from trigger (apache#4407)
Browse files Browse the repository at this point in the history
* Remove dagbag from trigger call

* Adding fix to rbac

* empty commit

* Added create_dagrun to DagModel

* Adding testing to /trigger calls

* Make session a class var
  • Loading branch information
ffinfo authored and Alice Berard committed Jan 3, 2019
1 parent d67715d commit ab4d048
Show file tree
Hide file tree
Showing 5 changed files with 103 additions and 7 deletions.
38 changes: 38 additions & 0 deletions airflow/models/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -3000,6 +3000,44 @@ def get_default_view(self):
else:
return self.default_view

def get_dag(self):
return DagBag(dag_folder=self.fileloc).get_dag(self.dag_id)

@provide_session
def create_dagrun(self,
run_id,
state,
execution_date,
start_date=None,
external_trigger=False,
conf=None,
session=None):
"""
Creates a dag run from this dag including the tasks associated with this dag.
Returns the dag run.
:param run_id: defines the the run id for this dag run
:type run_id: str
:param execution_date: the execution date of this dag run
:type execution_date: datetime
:param state: the state of the dag run
:type state: State
:param start_date: the date this dag run should be evaluated
:type start_date: datetime
:param external_trigger: whether this dag run is externally triggered
:type external_trigger: bool
:param session: database session
:type session: Session
"""

return self.get_dag().create_dagrun(run_id=run_id,
state=state,
execution_date=execution_date,
start_date=start_date,
external_trigger=external_trigger,
conf=conf,
session=session)


@functools.total_ordering
class DAG(BaseDag, LoggingMixin):
Expand Down
8 changes: 4 additions & 4 deletions airflow/www/views.py
Original file line number Diff line number Diff line change
Expand Up @@ -1069,11 +1069,11 @@ def delete(self):
@login_required
@wwwutils.action_logging
@wwwutils.notify_owner
def trigger(self):
@provide_session
def trigger(self, session=None):
dag_id = request.args.get('dag_id')
origin = request.args.get('origin') or "/admin/"
dag = dagbag.get_dag(dag_id)

dag = session.query(models.DagModel).filter(models.DagModel.dag_id == dag_id).first()
if not dag:
flash("Cannot find dag {}".format(dag_id))
return redirect(origin)
Expand Down Expand Up @@ -1592,7 +1592,7 @@ class GraphForm(DateTimeWithNumRunsWithDagRunsForm):
task_instances=json.dumps(task_instances, indent=2),
tasks=json.dumps(tasks, indent=2),
nodes=json.dumps(nodes, indent=2),
edges=json.dumps(edges, indent=2), )
edges=json.dumps(edges, indent=2))

@expose('/duration')
@login_required
Expand Down
6 changes: 3 additions & 3 deletions airflow/www_rbac/views.py
Original file line number Diff line number Diff line change
Expand Up @@ -798,11 +798,11 @@ def delete(self):
@has_dag_access(can_dag_edit=True)
@has_access
@action_logging
def trigger(self):
@provide_session
def trigger(self, session=None):
dag_id = request.args.get('dag_id')
origin = request.args.get('origin') or "/"
dag = dagbag.get_dag(dag_id)

dag = session.query(models.DagModel).filter(models.DagModel.dag_id == dag_id).first()
if not dag:
flash("Cannot find dag {}".format(dag_id))
return redirect(origin)
Expand Down
31 changes: 31 additions & 0 deletions tests/www/test_views.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@

from urllib.parse import quote_plus
from werkzeug.test import Client
from sqlalchemy import func

from airflow import models, configuration
from airflow.config_templates.airflow_local_settings import DEFAULT_LOGGING_CONFIG
Expand Down Expand Up @@ -821,5 +822,35 @@ def test_delete_dag_button_for_dag_on_scheduler_only(self):
session.commit()


class TestTriggerDag(unittest.TestCase):

def setUp(self):
conf.load_test_config()
app = application.create_app(testing=True)
app.config['WTF_CSRF_METHODS'] = []
self.app = app.test_client()
self.session = Session()
models.DagBag().get_dag("example_bash_operator").sync_to_db()

def test_trigger_dag_button_normal_exist(self):
resp = self.app.get('/', follow_redirects=True)
self.assertIn('/trigger?dag_id=example_bash_operator', resp.data.decode('utf-8'))
self.assertIn("return confirmDeleteDag('example_bash_operator')", resp.data.decode('utf-8'))

def test_trigger_dag_button(self):

test_dag_id = "example_bash_operator"

DR = models.DagRun
self.session.query(DR).delete()
self.session.commit()

self.app.get('/admin/airflow/trigger?dag_id={}'.format(test_dag_id))

run = self.session.query(DR).filter(DR.dag_id == test_dag_id).first()
self.assertIsNotNone(run)
self.assertIn("manual__", run.run_id)


if __name__ == '__main__':
unittest.main()
27 changes: 27 additions & 0 deletions tests/www_rbac/test_views.py
Original file line number Diff line number Diff line change
Expand Up @@ -1428,5 +1428,32 @@ def test_start_date_filter(self):
pass


class TestTriggerDag(TestBase):

def setUp(self):
super(TestTriggerDag, self).setUp()
self.session = Session()
models.DagBag().get_dag("example_bash_operator").sync_to_db(session=self.session)

def test_trigger_dag_button_normal_exist(self):
resp = self.client.get('/', follow_redirects=True)
self.assertIn('/trigger?dag_id=example_bash_operator', resp.data.decode('utf-8'))
self.assertIn("return confirmDeleteDag('example_bash_operator')", resp.data.decode('utf-8'))

def test_trigger_dag_button(self):

test_dag_id = "example_bash_operator"

DR = models.DagRun
self.session.query(DR).delete()
self.session.commit()

resp = self.client.get('trigger?dag_id={}'.format(test_dag_id))

run = self.session.query(DR).filter(DR.dag_id == test_dag_id).first()
self.assertIsNotNone(run)
self.assertIn("manual__", run.run_id)


if __name__ == '__main__':
unittest.main()

0 comments on commit ab4d048

Please sign in to comment.