Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[AIRFLOW-5088][AIP-24] Persisting serialized DAG in DB for webserver scalability #5743

Merged
merged 64 commits into from
Oct 24, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
64 commits
Select commit Hold shift + click to select a range
67515c5
[AIRFLOW-5088] persisting serialized DAG in DB for webserver scalability
Aug 8, 2019
07a7481
clean flake8 and pylint warnings
Aug 8, 2019
bf11bba
use create_session instead of provide_session
Aug 8, 2019
e58d100
make dagbag.get_dag/dag.get_bag called by webserver/scheduler control…
Aug 8, 2019
1db7933
fix a degradation that inspect.getsource does not support functools.p…
Aug 8, 2019
f104fc3
add unit tests of SerializedDagModel
Aug 8, 2019
4559939
fall back to loading from files when serialized DAGs are unavailable
Aug 9, 2019
bb65a22
Fix param names & remove unused params
kaxil Aug 9, 2019
2efd089
Turning Serialisation Off by default
kaxil Aug 9, 2019
0eb6898
remove hardcoded example DAGs in serialization unit tests
Aug 9, 2019
2124fe8
Add logs
kaxil Aug 9, 2019
ab443c4
move enum module to enums to avoid confusion
Aug 9, 2019
b8cbd55
Add config to airflow.cfg file
kaxil Aug 9, 2019
4d46b8d
Add timezone to migration script
kaxil Aug 9, 2019
480dccc
fix operator displying as SerializedBaseOperator on UI
Aug 10, 2019
ffc1441
add subdags into dagbag when loading from database
Aug 10, 2019
55825ee
clean mypy and pylint
Aug 10, 2019
79c5f42
Replace create_session with provide_session
kaxil Aug 12, 2019
cb77194
Move the date check into SQL
kaxil Aug 12, 2019
6dd110f
Fix logging error
kaxil Aug 12, 2019
35f62b5
Sync Database schema and SQLAlchemy model
kaxil Aug 12, 2019
ef65c7b
Add test to validate if task.subdag is None if operator is not SubDag…
kaxil Aug 14, 2019
cb688d9
Use session whereever available
kaxil Aug 14, 2019
296c217
Fix operator displying as SerializedBaseOperator on UI
kaxil Aug 14, 2019
09f6f5c
Add warning when dag_id does not match
kaxil Aug 14, 2019
2e00f9c
Validate Json String before writing to DB
kaxil Aug 14, 2019
a1102a9
Store JSON schema as static, package-data JSON file
ashb Aug 15, 2019
3423182
Remove unused fields from DAGs & move tests file
kaxil Aug 19, 2019
bfb3f32
Update Serialisation tests after refactor
kaxil Aug 19, 2019
d2436ad
Update tree view to use dag from dagbag instead of creating new dagbag
kaxil Aug 20, 2019
927f610
Make Dag Schema Stricter: No Additional Properties allowed
kaxil Aug 21, 2019
825abe0
Rename the dagcached config& dagcached_min_update_interval setting
kaxil Aug 21, 2019
3228d04
Remove inlets & outlets from SerializedBaseOperator
kaxil Aug 21, 2019
c2c0ddb
improve fileloc hashing in DAG persistence
Aug 21, 2019
6e8da2c
Add documentation
kaxil Aug 23, 2019
8b84343
Allow timedelta(seconds=0) in Serialized DAGs
kaxil Aug 27, 2019
94c69c7
Fix issue with different class for Pendulum Timezones
kaxil Sep 3, 2019
01e5f73
Update timezone class
kaxil Sep 4, 2019
88ce053
Parse date from timestamp instead of string
kaxil Sep 4, 2019
b5ee858
Do not serialize dates in tasks if they have matching date in DAG
kaxil Sep 5, 2019
4932254
Change type of data column to JSON & Add metric for dagbag size
kaxil Sep 6, 2019
ebe4ec7
Code Cleanup for JSON columns
kaxil Sep 9, 2019
a2b27f0
Test code to allow old mysql & sqlite versions
kaxil Sep 9, 2019
89a03a6
Test to see if JSON column is available
kaxil Sep 10, 2019
0754f61
Add Debug info
kaxil Sep 12, 2019
da0d59a
Trial reducing size of SerializedDAGs
ashb Sep 26, 2019
ae03cf0
Add specific test for schedule_interval serialization
ashb Sep 27, 2019
69b242c
Support dateutil.relativedelta in SerializedDAGs
ashb Sep 27, 2019
4bf9eb5
Cleanup
kaxil Oct 1, 2019
a498a62
Fix imports for iSort
kaxil Oct 2, 2019
b127f37
Delete non-existent Dags
kaxil Oct 3, 2019
525cccd
Remove comment
kaxil Oct 3, 2019
21fafb6
fix bugs that date/time/IntEnum are not supported in serialization.
Oct 4, 2019
f4b3e7d
Deactivate DAGs instead of deleting if their DAG file is deleted
kaxil Oct 7, 2019
facbef7
Change maxDiff to None
kaxil Oct 7, 2019
83f2007
Fix CI
kaxil Oct 7, 2019
69f7854
Just-in-time loading of DagBag in webserver
kaxil Oct 8, 2019
7c96ed8
Add default args to decorated_fields
kaxil Oct 8, 2019
9cb6e28
Add support for OperatorLinks
kaxil Oct 10, 2019
e840616
Cleanup
kaxil Oct 14, 2019
5ccd878
Move serialization directory out of dags folder
kaxil Oct 15, 2019
2b23a65
Fix docs building
Oct 18, 2019
be21a0f
Move the debug line before merging session
kaxil Oct 22, 2019
162da54
Move Dag Serialization doc from howto to root folder
kaxil Oct 23, 2019
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 10 additions & 1 deletion airflow/api/common/experimental/delete_dag.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,10 @@

from airflow import models
from airflow.exceptions import DagNotFound
from airflow.models import DagModel, TaskFail
from airflow.models import DagModel, SerializedDagModel, TaskFail
from airflow.settings import STORE_SERIALIZED_DAGS
from airflow.utils.db import provide_session
from airflow.utils.log.logging_mixin import LoggingMixin


@provide_session
Expand All @@ -36,10 +38,17 @@ def delete_dag(dag_id: str, keep_records_in_log: bool = True, session=None) -> i
:param session: session used
:return count of deleted dags
"""
logger = LoggingMixin()
logger.log.info("Deleting DAG: %s", dag_id)
dag = session.query(DagModel).filter(DagModel.dag_id == dag_id).first()
if dag is None:
raise DagNotFound("Dag id {} not found".format(dag_id))

# Scheduler removes DAGs without files from serialized_dag table every dag_dir_list_interval.
# There may be a lag, so explicitly removes serialized DAG here.
if STORE_SERIALIZED_DAGS and SerializedDagModel.has_dag(dag_id=dag_id, session=session):
SerializedDagModel.remove_dag(dag_id=dag_id, session=session)

count = 0

# noinspection PyUnresolvedReferences,PyProtectedMember
Expand Down
7 changes: 7 additions & 0 deletions airflow/config_templates/default_airflow.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -208,6 +208,13 @@ dag_discovery_safe_mode = True
# The number of retries each task is going to have by default. Can be overridden at dag or task level.
default_task_retries = 0

# Whether to serialises DAGs and persist them in DB.
# If set to True, Webserver reads from DB instead of parsing DAG files
# More details: https://airflow.apache.org/howto/enable-dag-serialization.html
store_serialized_dags = False

# Updating serialized DAG can not be faster than a minimum interval to reduce database write rate.
min_serialized_dag_update_interval = 30

[cli]
# In what way should the cli access the API. The LocalClient will use the
Expand Down
140 changes: 0 additions & 140 deletions airflow/dag/serialization/json_schema.py

This file was deleted.

Loading