Skip to content

Commit

Permalink
Merge branch 'gcs_system_tests' of https://github.com/lwyszomi/airflow
Browse files Browse the repository at this point in the history
…into gcs_system_tests

Change-Id: Ie7a572289872d10c649d8697a5819ddcac9d9e99
  • Loading branch information
Bartlomiej Hirsz committed Apr 14, 2022
2 parents 3bffe04 + a1a685c commit 6492a1e
Show file tree
Hide file tree
Showing 35 changed files with 515 additions and 228 deletions.
4 changes: 3 additions & 1 deletion .github/workflows/ci.yml
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
# Licensed to the Apache Software Foundation (ASF) under one
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
Expand Down Expand Up @@ -675,6 +674,9 @@ ${{ hashFiles('.pre-commit-config.yaml') }}"
uses: actions/setup-python@v2
with:
python-version: ${{ needs.build-info.outputs.defaultPythonVersion }}
cache: 'pip'
cache-dependency-path: ./dev/breeze/setup*
- run: python -m pip install --editable ./dev/breeze/
- name: >
Fetch incoming commit ${{ github.sha }} with its parent
uses: actions/checkout@v2
Expand Down
26 changes: 13 additions & 13 deletions UPDATING.md
Original file line number Diff line number Diff line change
Expand Up @@ -231,21 +231,9 @@ To support operator-mapping (AIP 42), the `deps` attribute on operator class mus

If you set the `dag_default_view` config option or the `default_view` argument to `DAG()` to `tree` you will need to update your deployment. The old name will continue to work but will issue warnings.

## Airflow 2.2.5

No breaking changes

## Airflow 2.2.4

### Smart sensors deprecated

Smart sensors, an "early access" feature added in Airflow 2, are now deprecated and will be removed in Airflow 2.4.0. They have been superseded by Deferrable Operators, added in Airflow 2.2.0.

See [Migrating to Deferrable Operators](https://airflow.apache.org/docs/apache-airflow/2.2.4/concepts/smart-sensors.html#migrating-to-deferrable-operators) for details on how to migrate.

### Database configuration moved to new section

The following configurations have been moved from `[core]` to the new `[database]` section. However when reading new option, the old option will be checked to see if it exists. If it does a DeprecationWarning will be issued and the old option will be used instead.
The following configurations have been moved from `[core]` to the new `[database]` section. However, when reading the new option, the old option will be checked to see if it exists. If it does, a DeprecationWarning will be issued and the old option will be used instead.

- sql_alchemy_conn
- sql_engine_encoding
Expand All @@ -260,6 +248,18 @@ The following configurations have been moved from `[core]` to the new `[database
- load_default_connections
- max_db_retries

## Airflow 2.2.5

No breaking changes

## Airflow 2.2.4

### Smart sensors deprecated

Smart sensors, an "early access" feature added in Airflow 2, are now deprecated and will be removed in Airflow 2.4.0. They have been superseded by Deferrable Operators, added in Airflow 2.2.0.

See [Migrating to Deferrable Operators](https://airflow.apache.org/docs/apache-airflow/2.2.4/concepts/smart-sensors.html#migrating-to-deferrable-operators) for details on how to migrate.

## Airflow 2.2.3

No breaking changes.
Expand Down
26 changes: 13 additions & 13 deletions airflow/configuration.py
Original file line number Diff line number Diff line change
Expand Up @@ -187,19 +187,19 @@ class AirflowConfigParser(ConfigParser):
('core', 'max_active_tasks_per_dag'): ('core', 'dag_concurrency', '2.2.0'),
('logging', 'worker_log_server_port'): ('celery', 'worker_log_server_port', '2.2.0'),
('api', 'access_control_allow_origins'): ('api', 'access_control_allow_origin', '2.2.0'),
('api', 'auth_backends'): ('api', 'auth_backend', '2.3'),
('database', 'sql_alchemy_conn'): ('core', 'sql_alchemy_conn', '2.3'),
('database', 'sql_engine_encoding'): ('core', 'sql_engine_encoding', '2.3'),
('database', 'sql_engine_collation_for_ids'): ('core', 'sql_engine_collation_for_ids', '2.3'),
('database', 'sql_alchemy_pool_enabled'): ('core', 'sql_alchemy_pool_enabled', '2.3'),
('database', 'sql_alchemy_pool_size'): ('core', 'sql_alchemy_pool_size', '2.3'),
('database', 'sql_alchemy_max_overflow'): ('core', 'sql_alchemy_max_overflow', '2.3'),
('database', 'sql_alchemy_pool_recycle'): ('core', 'sql_alchemy_pool_recycle', '2.3'),
('database', 'sql_alchemy_pool_pre_ping'): ('core', 'sql_alchemy_pool_pre_ping', '2.3'),
('database', 'sql_alchemy_schema'): ('core', 'sql_alchemy_schema', '2.3'),
('database', 'sql_alchemy_connect_args'): ('core', 'sql_alchemy_connect_args', '2.3'),
('database', 'load_default_connections'): ('core', 'load_default_connections', '2.3'),
('database', 'max_db_retries'): ('core', 'max_db_retries', '2.3'),
('api', 'auth_backends'): ('api', 'auth_backend', '2.3.0'),
('database', 'sql_alchemy_conn'): ('core', 'sql_alchemy_conn', '2.3.0'),
('database', 'sql_engine_encoding'): ('core', 'sql_engine_encoding', '2.3.0'),
('database', 'sql_engine_collation_for_ids'): ('core', 'sql_engine_collation_for_ids', '2.3.0'),
('database', 'sql_alchemy_pool_enabled'): ('core', 'sql_alchemy_pool_enabled', '2.3.0'),
('database', 'sql_alchemy_pool_size'): ('core', 'sql_alchemy_pool_size', '2.3.0'),
('database', 'sql_alchemy_max_overflow'): ('core', 'sql_alchemy_max_overflow', '2.3.0'),
('database', 'sql_alchemy_pool_recycle'): ('core', 'sql_alchemy_pool_recycle', '2.3.0'),
('database', 'sql_alchemy_pool_pre_ping'): ('core', 'sql_alchemy_pool_pre_ping', '2.3.0'),
('database', 'sql_alchemy_schema'): ('core', 'sql_alchemy_schema', '2.3.0'),
('database', 'sql_alchemy_connect_args'): ('core', 'sql_alchemy_connect_args', '2.3.0'),
('database', 'load_default_connections'): ('core', 'load_default_connections', '2.3.0'),
('database', 'max_db_retries'): ('core', 'max_db_retries', '2.3.0'),
}

# A mapping of old default values that we want to change and warn the user
Expand Down
27 changes: 16 additions & 11 deletions airflow/jobs/backfill_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ class _DagRunTaskStatus:

def __init__(
self,
dag,
dag: DAG,
start_date=None,
end_date=None,
mark_success=False,
Expand Down Expand Up @@ -228,7 +228,7 @@ def _update_counters(self, ti_status, session=None):

def _manage_executor_state(
self, running, session
) -> Iterator[Tuple["MappedOperator", str, Sequence[TaskInstance]]]:
) -> Iterator[Tuple["MappedOperator", str, Sequence[TaskInstance], int]]:
"""
Checks if the executor agrees with the state of task instances
that are running.
Expand All @@ -238,8 +238,6 @@ def _manage_executor_state(
:param running: dict of key, task to verify
:return: An iterable of expanded TaskInstance per MappedTask
"""
from airflow.models.mappedoperator import MappedOperator

executor = self.executor

# TODO: query all instead of refresh from db
Expand All @@ -266,9 +264,11 @@ def _manage_executor_state(
ti.handle_failure_with_callback(error=msg)
continue
if ti.state not in self.STATES_COUNT_AS_RUNNING:
for node in ti.task.mapped_dependants():
assert isinstance(node, MappedOperator)
yield node, ti.run_id, node.expand_mapped_task(ti.run_id, session=session)
# Don't use ti.task; if this task is mapped, that attribute
# would hold the unmapped task. We need the original task here.
for node in self.dag.get_task(ti.task_id, include_subdags=True).mapped_dependants():
new_tis, num_mapped_tis = node.expand_mapped_task(ti.run_id, session=session)
yield node, ti.run_id, new_tis, num_mapped_tis

@provide_session
def _get_dag_run(self, dagrun_info: DagRunInfo, dag: DAG, session: Session = None):
Expand Down Expand Up @@ -609,18 +609,23 @@ def _per_task_process(key, ti: TaskInstance, session=None):
ti_status.to_run.clear()

# check executor state -- and expand any mapped TIs
for node, run_id, mapped_tis in self._manage_executor_state(ti_status.running, session):
for node, run_id, new_mapped_tis, max_map_index in self._manage_executor_state(
ti_status.running, session
):

def to_keep(key: TaskInstanceKey) -> bool:
if key.dag_id != node.dag_id or key.task_id != node.task_id or key.run_id != run_id:
# For another Dag/Task/Run -- don't remove
return True
return False
return 0 <= key.map_index <= max_map_index

# remove the old unmapped TIs for node -- they have been replaced with the mapped TIs
ti_status.to_run = {key: ti for (key, ti) in ti_status.to_run.items() if to_keep(key)}

ti_status.to_run.update({ti.key: ti for ti in mapped_tis})
ti_status.to_run.update({ti.key: ti for ti in new_mapped_tis})

for new_ti in new_mapped_tis:
new_ti.set_state(TaskInstanceState.SCHEDULED, session=session)

# update the task counters
self._update_counters(ti_status=ti_status, session=session)
Expand Down Expand Up @@ -702,7 +707,7 @@ def tabulate_ti_keys_set(ti_keys: Iterable[TaskInstanceKey]) -> str:

return err

def _get_dag_with_subdags(self):
def _get_dag_with_subdags(self) -> List[DAG]:
return [self.dag] + self.dag.subdags

@provide_session
Expand Down
Loading

0 comments on commit 6492a1e

Please sign in to comment.