Merged

70 commits
5cae2a6
Officially support running more than one scheduler concurrently.
ashb Sep 15, 2020
06c07be
fixup! Officially support running more than one scheduler concurrently.
ashb Sep 16, 2020
38b049c
fixup! Officially support running more than one scheduler concurrently.
ashb Sep 16, 2020
f46abde
fixup! Officially support running more than one scheduler concurrently.
ashb Sep 16, 2020
ae75728
fixup! Officially support running more than one scheduler concurrently.
ashb Sep 16, 2020
6ffb762
fixup! Officially support running more than one scheduler concurrently.
kaxil Sep 16, 2020
d89827b
fixup! Officially support running more than one scheduler concurrently.
kaxil Sep 16, 2020
254aff1
fixup! Officially support running more than one scheduler concurrently.
kaxil Sep 16, 2020
8122862
fixup! Officially support running more than one scheduler concurrently.
kaxil Sep 16, 2020
2d4fe38
fixup! Officially support running more than one scheduler concurrently.
kaxil Sep 16, 2020
bfe9cad
fixup! Officially support running more than one scheduler concurrently.
kaxil Sep 16, 2020
4ce98c7
fixup! Officially support running more than one scheduler concurrently.
ashb Sep 16, 2020
ef766b1
fixup! Officially support running more than one scheduler concurrently.
kaxil Sep 16, 2020
87fed6c
fixup! Officially support running more than one scheduler concurrently.
ashb Sep 16, 2020
98402c5
fixup! Officially support running more than one scheduler concurrently.
ashb Sep 17, 2020
9a891e8
fixup! Officially support running more than one scheduler concurrently.
ashb Sep 18, 2020
807819d
fixup! Officially support running more than one scheduler concurrently.
ashb Sep 21, 2020
2d5b067
Move callbacks from Scheduler loop to DagProcessorProcess
kaxil Sep 18, 2020
58b8514
fixup! Officially support running more than one scheduler concurrently.
kaxil Sep 23, 2020
44c60e5
fixup! Officially support running more than one scheduler concurrently.
ashb Sep 23, 2020
c73b078
fixup! Move callbacks from Scheduler loop to DagProcessorProcess
ashb Sep 24, 2020
c806d1f
fixup! Officially support running more than one scheduler concurrently.
ashb Sep 24, 2020
46f848f
Don’t run verify_integrity if the Serialized DAG hasn’t changed
kaxil Sep 24, 2020
5a0a73f
Do not Execute tasks with DummyOperators
kaxil Sep 24, 2020
6e94088
fixup! Don’t run verify_integrity if the Serialized DAG hasn’t changed
kaxil Sep 25, 2020
48a9d5d
fixup! Do not Execute tasks with DummyOperators
kaxil Sep 25, 2020
c79c88e
fixup! Officially support running more than one scheduler concurrently.
kaxil Sep 25, 2020
d61496a
fixup! Do not Execute tasks with DummyOperators
kaxil Sep 25, 2020
291be0f
fixup! Officially support running more than one scheduler concurrently.
kaxil Sep 25, 2020
daad3f8
fixup! Officially support running more than one scheduler concurrently.
ashb Sep 28, 2020
742d633
fixup! Do not Execute tasks with DummyOperators
ashb Sep 28, 2020
167e146
fixup! Don’t run verify_integrity if the Serialized DAG hasn’t changed
ashb Sep 28, 2020
764aa20
fixup! Officially support running more than one scheduler concurrently.
ashb Sep 28, 2020
59dab94
fixup! Officially support running more than one scheduler concurrently.
ashb Sep 28, 2020
d3364d1
fixup! Officially support running more than one scheduler concurrently.
ashb Sep 28, 2020
71ad5af
fixup! Officially support running more than one scheduler concurrently.
ashb Sep 28, 2020
b1c114c
fixup! Officially support running more than one scheduler concurrently.
ashb Sep 28, 2020
5625737
fixup! Officially support running more than one scheduler concurrently.
kaxil Sep 28, 2020
9496651
fixup! Officially support running more than one scheduler concurrently.
ashb Sep 29, 2020
aa4b03a
fixup! Officially support running more than one scheduler concurrently.
ashb Sep 29, 2020
aec5b21
fixup! Officially support running more than one scheduler concurrently.
ashb Sep 29, 2020
b1bdb28
fixup! Officially support running more than one scheduler concurrently.
ashb Sep 29, 2020
0f08190
fixup! Officially support running more than one scheduler concurrently.
ashb Sep 29, 2020
6b979c4
fixup! Officially support running more than one scheduler concurrently.
ashb Oct 1, 2020
6b214a2
Add escape hatch to disable newly added "SELECT ... FOR UPDATE" queries
ashb Oct 1, 2020
6ef9195
fixup! Officially support running more than one scheduler concurrently.
ashb Oct 1, 2020
d5d6839
fixup! Officially support running more than one scheduler concurrently.
kaxil Oct 1, 2020
1981e61
fixup! Officially support running more than one scheduler concurrently.
kaxil Oct 1, 2020
5824505
fixup! Officially support running more than one scheduler concurrently.
kaxil Oct 1, 2020
35af8bf
fixup! Officially support running more than one scheduler concurrently.
kaxil Oct 2, 2020
fc91f65
fixup! Don’t run verify_integrity if the Serialized DAG hasn’t changed
ashb Oct 5, 2020
a6b5444
fixup! Move callbacks from Scheduler loop to DagProcessorProcess
kaxil Oct 5, 2020
20a7fb5
fixup! Officially support running more than one scheduler concurrently.
kaxil Oct 5, 2020
c783cde
fixup! Officially support running more than one scheduler concurrently.
ashb Oct 5, 2020
81516d1
fixup! Add escape hatch to disable newly added "SELECT ... FOR UPDATE…
ashb Oct 5, 2020
d5b6ca8
fixup! Add escape hatch to disable newly added "SELECT ... FOR UPDATE…
ashb Oct 5, 2020
4f04929
fixup! Don’t run verify_integrity if the Serialized DAG hasn’t changed
ashb Oct 5, 2020
08cd501
fixup! Officially support running more than one scheduler concurrently.
ashb Oct 6, 2020
e6fda59
fixup! Officially support running more than one scheduler concurrently.
ashb Oct 6, 2020
403c39c
fixup! Officially support running more than one scheduler concurrently.
kaxil Oct 6, 2020
0a089d6
Retry dagbag.sync_to_db on OperatorError
kaxil Oct 7, 2020
54be29d
fixup! Move callbacks from Scheduler loop to DagProcessorProcess
kaxil Oct 7, 2020
b53921e
Change State for TIs without DagRun
kaxil Oct 8, 2020
5450100
fixup! Officially support running more than one scheduler concurrently.
ashb Oct 9, 2020
324601b
fixup! Officially support running more than one scheduler concurrently.
ashb Oct 9, 2020
688ac64
fixup! Officially support running more than one scheduler concurrently.
ashb Oct 9, 2020
90692c8
Respect settings.use_job_schedule setting
kaxil Oct 9, 2020
636693a
fixup! Officially support running more than one scheduler concurrently.
ashb Oct 9, 2020
eb82b9a
fixup! Officially support running more than one scheduler concurrently.
ashb Oct 9, 2020
ee0a79a
fixup! Officially support running more than one scheduler concurrently.
ashb Oct 9, 2020
1 change: 1 addition & 0 deletions airflow/api/common/experimental/trigger_dag.py
@@ -89,6 +89,7 @@ def _trigger_dag(
state=State.RUNNING,
conf=run_conf,
external_trigger=True,
dag_hash=dag_bag.dags_hash.get(dag_id, None),
)

triggers.append(trigger)
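The dag_hash recorded here is what later lets the scheduler skip DagRun.verify_integrity when the serialized DAG has not changed (the "Don’t run verify_integrity if the Serialized DAG hasn’t changed" commit above). A minimal sketch of that check, where latest_hash_for is a hypothetical lookup helper, not a real Airflow function:

def maybe_verify_integrity(dag_run, session):
    # Hypothetical helper: fetch the current hash of the serialized DAG.
    latest_hash = latest_hash_for(dag_run.dag_id, session)
    if dag_run.dag_hash == latest_hash:
        # Serialized DAG unchanged: the run's task instances are already
        # consistent, so skip the comparatively expensive integrity check.
        return
    dag_run.dag_hash = latest_hash
    dag_run.verify_integrity(session=session)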
2 changes: 1 addition & 1 deletion airflow/cli/cli_parser.py
@@ -1335,7 +1335,7 @@ class GroupCommand(NamedTuple):
help="Start a scheduler instance",
func=lazy_load_command('airflow.cli.commands.scheduler_command.scheduler'),
args=(
ARG_DAG_ID_OPT, ARG_SUBDIR, ARG_NUM_RUNS, ARG_DO_PICKLE, ARG_PID, ARG_DAEMON, ARG_STDOUT,
ARG_SUBDIR, ARG_NUM_RUNS, ARG_DO_PICKLE, ARG_PID, ARG_DAEMON, ARG_STDOUT,
ARG_STDERR, ARG_LOG_FILE
),
),
1 change: 0 additions & 1 deletion airflow/cli/commands/scheduler_command.py
@@ -32,7 +32,6 @@ def scheduler(args):
"""Starts Airflow Scheduler"""
print(settings.HEADER)
job = SchedulerJob(
dag_id=args.dag_id,
subdir=process_subdir(args.subdir),
num_runs=args.num_runs,
do_pickle=args.do_pickle)
44 changes: 44 additions & 0 deletions airflow/config_templates/config.yml
@@ -559,6 +559,15 @@
type: string
example: "connexion,sqlalchemy"
default: ""
- name: max_db_retries
description: |
Number of times the code should be retried in case of DB Operational Errors.
Not all transactions will be retried, as doing so can cause undesired state.
Currently it is only used in ``DagFileProcessor.process_file`` to retry ``dagbag.sync_to_db``.
version_added: ~
type: int
example: ~
default: "3"
- name: secrets
description: ~
options:
@@ -1572,6 +1581,41 @@
type: string
example: ~
default: "512"
- name: use_row_level_locking
description: |
Whether the scheduler should issue `SELECT ... FOR UPDATE` in relevant queries.
If this is set to False, you should not run more than a single scheduler at once.
version_added: 2.0.0
type: boolean
example: ~
default: "True"
- name: max_dagruns_to_create_per_loop
description: |
This changes the number of DAGs that are locked by each scheduler when
creating DAG runs. One possible reason for setting this lower is if you
have huge DAGs and are running multiple schedulers: you won't want one
scheduler to do all the work.

Default: 10
example: ~
version_added: 2.0.0
type: string
default: ~
- name: max_dagruns_per_loop_to_schedule
description: |
How many DagRuns should a scheduler examine (and lock) when scheduling
and queuing tasks. Increasing this limit will allow more throughput for
smaller DAGs, but will likely slow down throughput for larger DAGs (for
example, those with more than 500 tasks). Setting this too high when using
multiple schedulers could also lead to one scheduler taking all the DAG
runs, leaving no work for the others.

Default: 20
example: ~
version_added: 2.0.0
type: string
default: ~
- name: statsd_on
description: |
Statsd (https://github.com/etsy/statsd) integration settings
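The max_db_retries option added above only guards operations that are safe to repeat. A minimal sketch of the kind of retry loop it configures, assuming plain exponential backoff (the real code may use a retry library and different timing):

import time

from sqlalchemy.exc import OperationalError

MAX_DB_RETRIES = 3  # mirrors [core] max_db_retries

def sync_to_db_with_retries(dagbag):
    # Only this idempotent write is retried; blindly retrying arbitrary
    # transactions could leave the database in an undesired state.
    for attempt in range(1, MAX_DB_RETRIES + 1):
        try:
            dagbag.sync_to_db()
            return
        except OperationalError:
            if attempt == MAX_DB_RETRIES:
                raise
            time.sleep(2 ** attempt)  # back off before the next attempt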
28 changes: 28 additions & 0 deletions airflow/config_templates/default_airflow.cfg
@@ -305,6 +305,11 @@ task_log_reader = task
# Example: extra_loggers = connexion,sqlalchemy
extra_loggers =

# Number of times the code should be retried in case of DB Operational Errors.
# Not all transactions will be retried, as doing so can cause undesired state.
# Currently it is only used in ``DagFileProcessor.process_file`` to retry ``dagbag.sync_to_db``.
max_db_retries = 3

[secrets]
# Full class name of secrets backend to enable (will precede env vars and metastore in search path)
# Example: backend = airflow.providers.amazon.aws.secrets.systems_manager.SystemsManagerParameterStoreBackend
@@ -789,6 +794,29 @@ catchup_by_default = True
# Set this to 0 for no limit (not advised)
max_tis_per_query = 512

# Whether the scheduler should issue `SELECT ... FOR UPDATE` in relevant queries.
# If this is set to False, you should not run more than a single scheduler at once.
use_row_level_locking = True

# This changes the number of DAGs that are locked by each scheduler when
# creating DAG runs. One possible reason for setting this lower is if you
# have huge DAGs and are running multiple schedulers: you won't want one
# scheduler to do all the work.
#
# Default: 10
# max_dagruns_to_create_per_loop =

# How many DagRuns should a scheduler examine (and lock) when scheduling
# and queuing tasks. Increasing this limit will allow more throughput for
# smaller DAGs, but will likely slow down throughput for larger DAGs (for
# example, those with more than 500 tasks). Setting this too high when using
# multiple schedulers could also lead to one scheduler taking all the DAG
# runs, leaving no work for the others.
#
# Default: 20
# max_dagruns_per_loop_to_schedule =

# Statsd (https://github.com/etsy/statsd) integration settings
statsd_on = False
statsd_host = localhost
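use_row_level_locking and max_dagruns_per_loop_to_schedule work together: each scheduler locks the batch of DagRun rows it examines so that other schedulers skip those rows and pick different ones. A hedged SQLAlchemy sketch of the query shape (illustrative, not the scheduler's exact code):

from airflow.models import DagRun
from airflow.utils.state import State

def next_dagruns_to_examine(session, limit=20):
    # limit mirrors max_dagruns_per_loop_to_schedule (default 20).
    return (
        session.query(DagRun)
        .filter(DagRun.state == State.RUNNING)
        .order_by(DagRun.execution_date)
        .limit(limit)
        # Emits SELECT ... FOR UPDATE SKIP LOCKED where the database
        # supports it, so concurrent schedulers never grab the same rows.
        .with_for_update(skip_locked=True)
        .all()
    )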
33 changes: 22 additions & 11 deletions airflow/executors/base_executor.py
@@ -17,11 +17,12 @@
"""
Base executor - this is the base class for all the implemented executors.
"""
import sys
from collections import OrderedDict
from typing import Any, Dict, List, Optional, Set, Tuple, Union
from typing import Any, Dict, List, Optional, Set, Tuple

from airflow.configuration import conf
from airflow.models.taskinstance import SimpleTaskInstance, TaskInstance, TaskInstanceKey
from airflow.models.taskinstance import TaskInstance, TaskInstanceKey
from airflow.stats import Stats
from airflow.utils.log.logging_mixin import LoggingMixin
from airflow.utils.state import State
@@ -39,8 +40,8 @@
# Task that is queued. It contains all the information that is
# needed to run the task.
#
# Tuple of: command, priority, queue name, SimpleTaskInstance
QueuedTaskInstanceType = Tuple[CommandType, int, Optional[str], Union[SimpleTaskInstance, TaskInstance]]
# Tuple of: command, priority, queue name, TaskInstance
QueuedTaskInstanceType = Tuple[CommandType, int, Optional[str], TaskInstance]

# Event_buffer dict value type
# Tuple of: state, info
@@ -72,16 +73,16 @@ def start(self): # pragma: no cover
"""

def queue_command(self,
simple_task_instance: SimpleTaskInstance,
task_instance: TaskInstance,
command: CommandType,
priority: int = 1,
queue: Optional[str] = None):
"""Queues command to task"""
if simple_task_instance.key not in self.queued_tasks and simple_task_instance.key not in self.running:
if task_instance.key not in self.queued_tasks and task_instance.key not in self.running:
self.log.info("Adding to queue: %s", command)
self.queued_tasks[simple_task_instance.key] = (command, priority, queue, simple_task_instance)
self.queued_tasks[task_instance.key] = (command, priority, queue, task_instance)
else:
self.log.error("could not queue task %s", simple_task_instance.key)
self.log.error("could not queue task %s", task_instance.key)

def queue_task_instance(
self,
@@ -112,7 +113,7 @@ def queue_task_instance(
pickle_id=pickle_id,
cfg_path=cfg_path)
self.queue_command(
SimpleTaskInstance(task_instance),
task_instance,
command_list_to_run,
priority=task_instance.task.priority_weight_total,
queue=task_instance.task.queue)
@@ -178,13 +179,13 @@ def trigger_tasks(self, open_slots: int) -> None:
sorted_queue = self.order_queued_tasks_by_priority()

for _ in range(min((open_slots, len(self.queued_tasks)))):
key, (command, _, _, simple_ti) = sorted_queue.pop(0)
key, (command, _, _, ti) = sorted_queue.pop(0)
self.queued_tasks.pop(key)
self.running.add(key)
self.execute_async(key=key,
command=command,
queue=None,
executor_config=simple_ti.executor_config)
executor_config=ti.executor_config)

def change_state(self, key: TaskInstanceKey, state: str, info=None) -> None:
"""
@@ -282,6 +283,16 @@ def try_adopt_task_instances(self, tis: List[TaskInstance]) -> List[TaskInstance
# Subclasses can do better!
return tis

@property
def slots_available(self):
"""
Number of new tasks this executor instance can accept
"""
if self.parallelism:
return self.parallelism - len(self.running) - len(self.queued_tasks)
else:
return sys.maxsize

@staticmethod
def validate_command(command: List[str]) -> None:
"""Check if the command to execute is airflow command"""
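A quick illustration of the new slots_available property, following the logic in the diff above, where a parallelism of 0 is treated as "no limit":

from airflow.executors.base_executor import BaseExecutor

executor = BaseExecutor(parallelism=8)
print(executor.slots_available)  # 8: parallelism minus running minus queued

executor.parallelism = 0
print(executor.slots_available)  # sys.maxsize, i.e. effectively unlimited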
22 changes: 12 additions & 10 deletions airflow/executors/celery_kubernetes_executor.py
@@ -62,17 +62,19 @@ def start(self) -> None:
self.celery_executor.start()
self.kubernetes_executor.start()

def queue_command(self,
simple_task_instance: SimpleTaskInstance,
command: CommandType,
priority: int = 1,
queue: Optional[str] = None):
def queue_command(
self,
task_instance: TaskInstance,
command: CommandType,
priority: int = 1,
queue: Optional[str] = None
):
"""Queues command via celery or kubernetes executor"""
executor = self._router(simple_task_instance)
self.log.debug("Using executor: %s for %s",
executor.__class__.__name__, simple_task_instance.key
)
executor.queue_command(simple_task_instance, command, priority, queue)
executor = self._router(task_instance)
self.log.debug(
"Using executor: %s for %s", executor.__class__.__name__, task_instance.key
)
executor.queue_command(task_instance, command, priority, queue)

def queue_task_instance(
self,
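queue_command now hands the full TaskInstance to whichever backing executor self._router selects. The router itself is not shown in this hunk; a sketch of the idea, assuming it keys off the task's queue name (the queue value here is an assumption, normally taken from configuration):

KUBERNETES_QUEUE = 'kubernetes'  # assumption: the configured kubernetes queue

def _router(self, task_instance):
    # Route tasks that explicitly target the kubernetes queue to the
    # KubernetesExecutor; everything else goes to the CeleryExecutor.
    if task_instance.queue == KUBERNETES_QUEUE:
        return self.kubernetes_executor
    return self.celery_executor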