Skip to content

Commit

Permalink
[AIRFLOW-160] Parse DAG files through child processes
Browse files Browse the repository at this point in the history
Instead of parsing the DAG definition files in the same process as the
scheduler, this change parses the files in a child process. This helps
to isolate the scheduler from bad user code.
  • Loading branch information
plypaul committed Jun 23, 2016
1 parent 1d0d868 commit 084196a
Show file tree
Hide file tree
Showing 18 changed files with 2,126 additions and 313 deletions.
33 changes: 30 additions & 3 deletions airflow/bin/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@
from daemon.pidfile import TimeoutPIDLockFile
import signal
import sys
import threading
import traceback

import airflow
from airflow import jobs, settings
Expand All @@ -31,10 +33,28 @@
DAGS_FOLDER = os.path.expanduser(conf.get('core', 'DAGS_FOLDER'))


def sigint_handler(signal, frame):
def sigint_handler(sig, frame):
    """Signal handler that terminates the process with exit status 0.

    :param sig: signal number supplied by the ``signal`` module (unused)
    :param frame: interrupted stack frame supplied by the ``signal`` module
        (unused)
    """
    sys.exit(0)


def sigquit_handler(sig, frame):
    """Print the stack trace of every thread in this process to stdout.

    Helps debug deadlocks: send the process a SIGQUIT, e.g. with
    ``kill -s QUIT <PID>`` or by pressing CTRL+backslash in the terminal.

    :param sig: signal number supplied by the ``signal`` module (unused)
    :param frame: interrupted stack frame supplied by the ``signal`` module
        (unused)
    """
    print("Dumping stack traces for all threads in PID {}".format(os.getpid()))
    # sys._current_frames() is keyed by thread ident only, so build a lookup
    # table to label each stack with the owning thread's name.
    id_to_name = {th.ident: th.name for th in threading.enumerate()}
    code = []
    for thread_id, stack in sys._current_frames().items():
        code.append("\n# Thread: {}({})"
                    .format(id_to_name.get(thread_id, ""), thread_id))
        for filename, line_number, name, line in traceback.extract_stack(stack):
            # The three fields must be passed as separate arguments; passing
            # them as one tuple leaves the 2nd and 3rd placeholders without a
            # value and raises IndexError.
            code.append('File: "{}", line {}, in {}'
                        .format(filename, line_number, name))
            # Source text may be unavailable (e.g. code run from a string).
            if line:
                code.append(" {}".format(line.strip()))
    print("\n".join(code))


def setup_logging(filename):
root = logging.getLogger()
handler = logging.FileHandler(filename)
Expand Down Expand Up @@ -429,6 +449,7 @@ def scheduler(args):
job = jobs.SchedulerJob(
dag_id=args.dag_id,
subdir=process_subdir(args.subdir),
run_duration=args.run_duration,
num_runs=args.num_runs,
do_pickle=args.do_pickle)

Expand All @@ -452,6 +473,7 @@ def scheduler(args):
else:
signal.signal(signal.SIGINT, sigint_handler)
signal.signal(signal.SIGTERM, sigint_handler)
signal.signal(signal.SIGQUIT, sigquit_handler)
job.run()


Expand Down Expand Up @@ -776,6 +798,10 @@ class CLIFactory(object):
default=False),
# scheduler
'dag_id_opt': Arg(("-d", "--dag_id"), help="The id of the dag to run"),
'run_duration': Arg(
("-r", "--run-duration"),
default=None, type=int,
help="Set number of seconds to execute before exiting"),
'num_runs': Arg(
("-n", "--num_runs"),
default=None, type=int,
Expand Down Expand Up @@ -903,8 +929,9 @@ class CLIFactory(object):
}, {
'func': scheduler,
'help': "Start a scheduler scheduler instance",
'args': ('dag_id_opt', 'subdir', 'num_runs', 'do_pickle',
'pid', 'daemon', 'stdout', 'stderr', 'log_file'),
'args': ('dag_id_opt', 'subdir', 'run_duration', 'num_runs',
'do_pickle', 'pid', 'daemon', 'stdout', 'stderr',
'log_file'),
}, {
'func': worker,
'help': "Start a Celery worker node",
Expand Down
5 changes: 5 additions & 0 deletions airflow/configuration.py
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,11 @@ def run_command(command):
'scheduler_heartbeat_sec': 60,
'authenticate': False,
'max_threads': 2,
'run_duration': 30 * 60,
'dag_dir_list_interval': 5 * 60,
'print_stats_interval': 30,
'min_file_process_interval': 180,
'child_process_log_directory': '/tmp/airflow/scheduler/logs'
},
'celery': {
'default_queue': 'default',
Expand Down
2 changes: 1 addition & 1 deletion airflow/executors/base_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ def heartbeat(self):
# TODO(jlowin) without a way to know what Job ran which tasks,
# there is a danger that another Job started running a task
# that was also queued to this executor. This is the last chance
# to check if that hapened. The most probable way is that a
# to check if that happened. The most probable way is that a
# Scheduler tried to run a task that was originally queued by a
# Backfill. This fix reduces the probability of a collision but
# does NOT eliminate it.
Expand Down
Loading

0 comments on commit 084196a

Please sign in to comment.