Skip to content

Commit

Permalink
[AIRFLOW-3865] Add API endpoint to get python code of dag by id (#4687)
Browse files Browse the repository at this point in the history
  • Loading branch information
ramandumcs authored and ashb committed Mar 22, 2019
1 parent 1045a46 commit c316e2e
Show file tree
Hide file tree
Showing 5 changed files with 98 additions and 0 deletions.
42 changes: 42 additions & 0 deletions airflow/api/common/experimental/get_code.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
# -*- coding: utf-8 -*-
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

from airflow.exceptions import AirflowException, DagNotFound
from airflow import models, settings
from airflow.www import utils as wwwutils


def get_code(dag_id):
"""Return python code of a given dag_id."""
session = settings.Session()
DM = models.DagModel
dag = session.query(DM).filter(DM.dag_id == dag_id).first()
session.close()
# Check DAG exists.
if dag is None:
error_message = "Dag id {} not found".format(dag_id)
raise DagNotFound(error_message)

try:
with wwwutils.open_maybe_zipped(dag.fileloc, 'r') as f:
code = f.read()
return code
except IOError as e:
error_message = "Error {} while reading Dag id {} Code".format(str(e), dag_id)
raise AirflowException(error_message)
14 changes: 14 additions & 0 deletions airflow/www/api/experimental/endpoints.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
from airflow.api.common.experimental.get_dag_runs import get_dag_runs
from airflow.api.common.experimental.get_task import get_task
from airflow.api.common.experimental.get_task_instance import get_task_instance
from airflow.api.common.experimental.get_code import get_code
from airflow.api.common.experimental.get_dag_run_state import get_dag_run_state
from airflow.exceptions import AirflowException
from airflow.utils import timezone
Expand Down Expand Up @@ -138,6 +139,19 @@ def test():
return jsonify(status='OK')


@api_experimental.route('/dags/<string:dag_id>/code', methods=['GET'])
@requires_authentication
def get_dag_code(dag_id):
"""Return python code of a given dag_id."""
try:
return get_code(dag_id)
except AirflowException as err:
_log.info(err)
response = jsonify(error="{}".format(err))
response.status_code = err.status_code
return response


@api_experimental.route('/dags/<string:dag_id>/tasks/<string:task_id>', methods=['GET'])
@requires_authentication
def task_info(dag_id, task_id):
Expand Down
14 changes: 14 additions & 0 deletions airflow/www_rbac/api/experimental/endpoints.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
from airflow.api.common.experimental.get_dag_runs import get_dag_runs
from airflow.api.common.experimental.get_task import get_task
from airflow.api.common.experimental.get_task_instance import get_task_instance
from airflow.api.common.experimental.get_code import get_code
from airflow.api.common.experimental.get_dag_run_state import get_dag_run_state
from airflow.exceptions import AirflowException
from airflow.utils.log.logging_mixin import LoggingMixin
Expand Down Expand Up @@ -119,6 +120,19 @@ def test():
return jsonify(status='OK')


@api_experimental.route('/dags/<string:dag_id>/code', methods=['GET'])
@requires_authentication
def get_dag_code(dag_id):
"""Return python code of a given dag_id."""
try:
return get_code(dag_id)
except AirflowException as err:
_log.info(err)
response = jsonify(error="{}".format(err))
response.status_code = err.status_code
return response


@api_experimental.route('/dags/<string:dag_id>/tasks/<string:task_id>', methods=['GET'])
@requires_authentication
def task_info(dag_id, task_id):
Expand Down
14 changes: 14 additions & 0 deletions tests/www/api/experimental/test_endpoints.py
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,20 @@ def test_task_info(self):
self.assertIn('error', response.data.decode('utf-8'))
self.assertEqual(404, response.status_code)

def test_get_dag_code(self):
url_template = '/api/experimental/dags/{}/code'

response = self.app.get(
url_template.format('example_bash_operator')
)
self.assertIn('BashOperator(', response.data.decode('utf-8'))
self.assertEqual(200, response.status_code)

response = self.app.get(
url_template.format('xyz')
)
self.assertEqual(404, response.status_code)

def test_task_paused(self):
url_template = '/api/experimental/dags/{}/paused/{}'

Expand Down
14 changes: 14 additions & 0 deletions tests/www_rbac/api/experimental/test_endpoints.py
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,20 @@ def test_task_info(self):
self.assertIn('error', response.data.decode('utf-8'))
self.assertEqual(404, response.status_code)

def test_get_dag_code(self):
url_template = '/api/experimental/dags/{}/code'

response = self.client.get(
url_template.format('example_bash_operator')
)
self.assertIn('BashOperator(', response.data.decode('utf-8'))
self.assertEqual(200, response.status_code)

response = self.client.get(
url_template.format('xyz')
)
self.assertEqual(404, response.status_code)

def test_task_paused(self):
url_template = '/api/experimental/dags/{}/paused/{}'

Expand Down

0 comments on commit c316e2e

Please sign in to comment.