Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[AIRFLOW-3735] Separate dag parsing from checking #4880

Closed
wants to merge 18 commits into from
Closed
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
135 changes: 33 additions & 102 deletions airflow/models/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,9 @@
from __future__ import unicode_literals

import copy
from collections import defaultdict, namedtuple, OrderedDict
from builtins import ImportError as BuiltinImportError, bytes, object, str
from collections import defaultdict, namedtuple, OrderedDict

from future.standard_library import install_aliases

from airflow.models.base import Base, ID_LEN
Expand All @@ -40,9 +41,6 @@
import dill
import functools
import getpass
import imp
import importlib
import zipfile
import jinja2
import json
import logging
Expand Down Expand Up @@ -93,7 +91,7 @@

from airflow.ti_deps.dep_context import DepContext, QUEUE_DEPS, RUN_DEPS
from airflow.utils import timezone
from airflow.utils.dag_processing import list_py_file_paths
from airflow.utils.dag_processing import list_py_file_paths, process_dag_file
from airflow.utils.dates import cron_presets, date_range as utils_date_range
from airflow.utils.db import provide_session
from airflow.utils.decorators import apply_defaults
Expand Down Expand Up @@ -302,7 +300,6 @@ def __init__(
self.file_last_changed = {}
self.executor = executor
self.import_errors = {}
self.has_logged = False

self.collect_dags(
dag_folder=dag_folder,
Expand Down Expand Up @@ -359,6 +356,9 @@ def process_file(self, filepath, only_if_updated=True, safe_mode=True):
if filepath is None or not os.path.isfile(filepath):
return found_dags

if filepath.endswith(".pyc"):
filepath = filepath.rstrip("c")

try:
# This failed before in what may have been a git sync
# race condition
Expand All @@ -369,105 +369,36 @@ def process_file(self, filepath, only_if_updated=True, safe_mode=True):
return found_dags

except Exception as e:
self.import_errors[filepath] = str(e)
self.log.exception(e)
return found_dags

mods = []
is_zipfile = zipfile.is_zipfile(filepath)
if not is_zipfile:
if safe_mode and os.path.isfile(filepath):
with open(filepath, 'rb') as f:
content = f.read()
if not all([s in content for s in (b'DAG', b'airflow')]):
self.file_last_changed[filepath] = file_last_changed_on_disk
# Don't want to spam user with skip messages
if not self.has_logged:
self.has_logged = True
self.log.info(
"File %s assumed to contain no DAGs. Skipping.",
filepath)
return found_dags

self.log.debug("Importing %s", filepath)
org_mod_name, _ = os.path.splitext(os.path.split(filepath)[-1])
mod_name = ('unusual_prefix_' +
hashlib.sha1(filepath.encode('utf-8')).hexdigest() +
'_' + org_mod_name)

if mod_name in sys.modules:
del sys.modules[mod_name]

with timeout(configuration.conf.getint('core', "DAGBAG_IMPORT_TIMEOUT")):
try:
m = imp.load_source(mod_name, filepath)
mods.append(m)
except Exception as e:
self.log.exception("Failed to import: %s", filepath)
self.import_errors[filepath] = str(e)
self.file_last_changed[filepath] = file_last_changed_on_disk

else:
zip_file = zipfile.ZipFile(filepath)
for mod in zip_file.infolist():
head, _ = os.path.split(mod.filename)
mod_name, ext = os.path.splitext(mod.filename)
if not head and (ext == '.py' or ext == '.pyc'):
if mod_name == '__init__':
self.log.warning("Found __init__.%s at root of %s", ext, filepath)
if safe_mode:
with zip_file.open(mod.filename) as zf:
self.log.debug("Reading %s from %s", mod.filename, filepath)
content = zf.read()
if not all([s in content for s in (b'DAG', b'airflow')]):
self.file_last_changed[filepath] = (
file_last_changed_on_disk)
# todo: create ignore list
# Don't want to spam user with skip messages
if not self.has_logged:
self.has_logged = True
self.log.info(
"File %s assumed to contain no DAGs. Skipping.",
filepath)

if mod_name in sys.modules:
del sys.modules[mod_name]
try:
found_dags = process_dag_file(filepath, safe_mode=safe_mode)
except Exception as e:
self.import_errors[filepath] = str(e)
self.log.exception("Failed to dag file: %s, message: $s", filepath, e)

try:
sys.path.insert(0, filepath)
m = importlib.import_module(mod_name)
mods.append(m)
except Exception as e:
self.log.exception("Failed to import: %s", filepath)
self.import_errors[filepath] = str(e)
self.file_last_changed[filepath] = file_last_changed_on_disk

for m in mods:
for dag in list(m.__dict__.values()):
if isinstance(dag, DAG):
if not dag.full_filepath:
dag.full_filepath = filepath
if dag.fileloc != filepath and not is_zipfile:
dag.fileloc = filepath
try:
dag.is_subdag = False
self.bag_dag(dag, parent_dag=dag, root_dag=dag)
if isinstance(dag._schedule_interval, six.string_types):
croniter(dag._schedule_interval)
found_dags.append(dag)
found_dags += dag.subdags
except (CroniterBadCronError,
CroniterBadDateError,
CroniterNotAlphaError) as cron_e:
self.log.exception("Failed to bag_dag: %s", dag.full_filepath)
self.import_errors[dag.full_filepath] = \
"Invalid Cron expression: " + str(cron_e)
self.file_last_changed[dag.full_filepath] = \
file_last_changed_on_disk
except AirflowDagCycleException as cycle_exception:
self.log.exception("Failed to bag_dag: %s", dag.full_filepath)
self.import_errors[dag.full_filepath] = str(cycle_exception)
self.file_last_changed[dag.full_filepath] = \
file_last_changed_on_disk
for dag in found_dags:
try:
self.bag_dag(dag, parent_dag=dag, root_dag=dag)
if isinstance(dag._schedule_interval, six.string_types):
croniter(dag._schedule_interval)
except (CroniterBadCronError,
CroniterBadDateError,
CroniterNotAlphaError) as cron_e:
found_dags.remove(dag)
self.log.exception("Failed to bag_dag: %s", dag.full_filepath)
self.import_errors[dag.full_filepath] = \
"Invalid Cron expression: " + str(cron_e)
self.file_last_changed[dag.full_filepath] = \
file_last_changed_on_disk
except AirflowDagCycleException as cycle_exception:
found_dags.remove(dag)
self.log.exception("Failed to bag_dag: %s", dag.full_filepath)
self.import_errors[dag.full_filepath] = str(cycle_exception)
self.file_last_changed[dag.full_filepath] = \
file_last_changed_on_disk

self.file_last_changed[filepath] = file_last_changed_on_disk
return found_dags
Expand Down Expand Up @@ -1730,7 +1661,7 @@ def email_alert(self, exception):
jinja_context.update(dict(
exception=exception,
exception_html=exception_html,
try_number=self.try_number - 1,
try_number=self.try_number,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why did we change this?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This was because the test order changed when I moved the DagBag tests to a separate file. During testing, the state of a task would be changed from RUNNING to something else, and `try_number` in TaskInstance does a +1 based on the state. Not the cleanest way. @ashb, I see this part is on your name. Can you double check the logic here?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

About all I remember of this code now was it was a hack on top of a hack :(

max_tries=self.max_tries))

jinja_env = self.task.get_template_env()
Expand Down
2 changes: 2 additions & 0 deletions airflow/operators/subdag_operator.py
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,8 @@ def __init__(
)

self.subdag = subdag
self.subdag.is_subdag = True
self.subdag.parent_dag = dag
# Airflow pool is not honored by SubDagOperator.
# Hence resources could be consumed by SubdagOperators
# Use other executor with your own risk.
Expand Down
84 changes: 84 additions & 0 deletions airflow/utils/dag_processing.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,9 @@
from __future__ import print_function
from __future__ import unicode_literals

import hashlib
import imp
import importlib
import logging
import multiprocessing
import os
Expand Down Expand Up @@ -52,6 +55,9 @@
from airflow.utils.db import provide_session
from airflow.utils.log.logging_mixin import LoggingMixin
from airflow.utils.state import State
from airflow.utils.timeout import timeout

log = LoggingMixin().log


class SimpleDag(BaseDag):
Expand Down Expand Up @@ -1316,3 +1322,81 @@ def end(self):
self.log.info("Killing child PID: {}".format(child.pid))
child.kill()
child.wait()


def process_dag_file(filepath, safe_mode=True):
    """
    Given a path to a python module or zip file, import the module
    and collect the top-level DAG objects defined within it.

    :param filepath: path to a ``.py`` file, or a zip archive containing
        ``.py``/``.pyc`` modules at its root
    :param safe_mode: when True, skip files whose raw content does not
        contain both ``b'DAG'`` and ``b'airflow'`` — a cheap heuristic to
        avoid importing files that cannot define DAGs
    :return: list of DAG objects found (subdags are excluded; each dag's
        ``full_filepath`` is set to ``filepath``)
    :raises FileNotFoundError: if ``filepath`` is None or not a file
        (NOTE(review): ``FileNotFoundError`` is Python 3 only — on
        Python 2 this line raises ``NameError`` instead; confirm the
        supported interpreter versions)
    :raises AirflowException: if importing a module fails
    """
    found_dags = []

    if filepath is None or not os.path.isfile(filepath):
        raise FileNotFoundError(
            "DAG file does not exist: {}".format(filepath))

    mods = []
    is_zipfile = zipfile.is_zipfile(filepath)
    if not is_zipfile:
        if safe_mode and os.path.isfile(filepath):
            with open(filepath, 'rb') as f:
                content = f.read()
                if not all([s in content for s in (b'DAG', b'airflow')]):
                    # Don't want to spam user with skip messages
                    log.debug(
                        "File %s assumed to contain no DAGs. Skipping.",
                        filepath)
                    return found_dags

        log.debug("Importing %s", filepath)
        org_mod_name, _ = os.path.splitext(os.path.split(filepath)[-1])
        # Hash the path into the module name so two files with the same
        # basename in different folders don't collide in sys.modules.
        mod_name = ('unusual_prefix_' +
                    hashlib.sha1(filepath.encode('utf-8')).hexdigest() +
                    '_' + org_mod_name)

        if mod_name in sys.modules:
            del sys.modules[mod_name]

        with timeout(conf.getint('core', "DAGBAG_IMPORT_TIMEOUT")):
            try:
                m = imp.load_source(mod_name, filepath)
                mods.append(m)
            except Exception as e:
                log.exception("Failed to import: %s", filepath)
                raise AirflowException(e)

    else:
        zip_file = zipfile.ZipFile(filepath)
        for mod in zip_file.infolist():
            head, _ = os.path.split(mod.filename)
            mod_name, ext = os.path.splitext(mod.filename)
            if not head and (ext == '.py' or ext == '.pyc'):
                if mod_name == '__init__':
                    log.warning("Found __init__.%s at root of %s",
                                ext, filepath)
                if safe_mode:
                    with zip_file.open(mod.filename) as zf:
                        log.debug("Reading %s from %s",
                                  mod.filename, filepath)
                        content = zf.read()
                        if not all([s in content
                                    for s in (b'DAG', b'airflow')]):
                            log.debug(
                                "File %s assumed to contain no DAGs. "
                                "Skipping.", filepath)
                            # Bug fix: previously this logged "Skipping"
                            # but fell through and imported the module
                            # anyway; actually skip it.
                            continue

                if mod_name in sys.modules:
                    del sys.modules[mod_name]

                try:
                    # The zip itself goes on sys.path so the archived
                    # module can be imported by name (zipimport).
                    sys.path.insert(0, filepath)
                    m = importlib.import_module(mod_name)
                    mods.append(m)
                except Exception as e:
                    log.exception("Failed to import: %s", filepath)
                    raise AirflowException(e)

    # Collect top-level DAG objects; subdags are surfaced through their
    # parent DAG, not returned here.
    for m in mods:
        for dag in list(m.__dict__.values()):
            if isinstance(dag, airflow.DAG):
                if not dag.is_subdag:
                    found_dags.append(dag)
    for dag in found_dags:
        dag.full_filepath = filepath
    return found_dags
2 changes: 1 addition & 1 deletion tests/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,4 +23,4 @@

from .api import *
from .core import *
from .models import *
from .test_models import *
Binary file added tests/dags/corrupt.zip
Binary file not shown.
25 changes: 25 additions & 0 deletions tests/dags/corrupt_dag.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
# -*- coding: utf-8 -*-
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#

from airflow.models import DAG

# Test fixture: a deliberately broken DAG file. It defines a DAG (so the
# raw file content contains both b'DAG' and b'airflow', passing the
# safe-mode content heuristic) and then raises at import time —
# presumably used to exercise DagBag/process_dag_file import-error
# handling. Do not "fix" the raise below.
dag = DAG(dag_id='test_corrupt')

raise Exception
19 changes: 19 additions & 0 deletions tests/models/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
# -*- coding: utf-8 -*-
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#
Loading