Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve compatibility with mssql #9973

Merged
merged 36 commits into from
May 26, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
36 commits
Select commit Hold shift + click to select a range
e3cdfbc
add breeze config for mssql, fix db init on mssql
aneesh-joseph Dec 15, 2020
f2b36b3
fix more incompatibilities
aneesh-joseph Mar 22, 2021
66f69a9
fix failing mssql tests
aneesh-joseph Mar 22, 2021
4f0775d
Update airflow/models/dag.py
aneesh-joseph Mar 22, 2021
8054f09
use special query for MSSQL only
aneesh-joseph Mar 22, 2021
c639eb8
fix count tests
aneesh-joseph Mar 22, 2021
6702555
fix style checks
aneesh-joseph Mar 22, 2021
eee6c4b
Update airflow/models/dagcode.py
aneesh-joseph Mar 22, 2021
9825282
make xcom fields non-nullable
aneesh-joseph Mar 22, 2021
d8901da
exclude MSSQL default tables
aneesh-joseph Mar 23, 2021
cf76124
make mssql tables in sync with SQLA models
aneesh-joseph Mar 23, 2021
7f2ccd0
fix www tests
aneesh-joseph Mar 23, 2021
3d3cb00
relax constraints on dag_run to fix tests
aneesh-joseph Mar 23, 2021
44d207d
base the migration on top of latest master
aneesh-joseph Apr 7, 2021
d1679a4
incorporate review suggestions
aneesh-joseph Apr 11, 2021
50ac279
fix pylint errors
aneesh-joseph Apr 13, 2021
f97e113
move migration on top of latest master
aneesh-joseph Apr 13, 2021
e2bc6a1
fix pylint errors
aneesh-joseph Apr 13, 2021
78b9666
add mssql to CI
aneesh-joseph Apr 14, 2021
9fd1bee
fix port conflict
aneesh-joseph Apr 14, 2021
b941b20
simplify db based count check conditionals
aneesh-joseph Apr 15, 2021
88a4bf9
fix pylint errors
aneesh-joseph May 7, 2021
ac3cefc
fix rebase and mssql tests
aneesh-joseph May 8, 2021
b16933d
fix sqlite test
aneesh-joseph May 9, 2021
a9e1a04
combine mssql specific migrations
aneesh-joseph May 9, 2021
823cd1b
revert offset changes
aneesh-joseph May 9, 2021
4af7bbe
fix smart sensor for mssql
aneesh-joseph May 9, 2021
39223e4
fix mssql CI trigger
aneesh-joseph May 9, 2021
23c3566
fixup! fix smart sensor for mssql
potiuk May 9, 2021
f00e4d6
add experimental MSSQL support details into docs
aneesh-joseph May 10, 2021
6b90ccb
incorporate review comments
aneesh-joseph May 10, 2021
ff76c41
incorporate review comments
aneesh-joseph May 10, 2021
cfb2194
Apply suggestions from code review
kaxil May 10, 2021
51b33de
Apply suggestions from code review
kaxil May 10, 2021
cfb162e
fixup method names
aneesh-joseph May 11, 2021
f12e007
add back poituk's fix which I removed in error with a force push
aneesh-joseph May 26, 2021
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
63 changes: 63 additions & 0 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,7 @@ jobs:
postgresVersions: ${{ steps.selective-checks.outputs.postgres-versions }}
defaultPostgresVersion: ${{ steps.selective-checks.outputs.default-postgres-version }}
mysqlVersions: ${{ steps.selective-checks.outputs.mysql-versions }}
mssqlVersions: ${{ steps.selective-checks.outputs.mssql-versions }}
defaultMySQLVersion: ${{ steps.selective-checks.outputs.default-mysql-version }}
helmVersions: ${{ steps.selective-checks.outputs.helm-versions }}
defaultHelmVersion: ${{ steps.selective-checks.outputs.default-helm-version }}
Expand All @@ -153,6 +154,7 @@ jobs:
testTypes: ${{ steps.selective-checks.outputs.test-types }}
postgresExclude: ${{ steps.selective-checks.outputs.postgres-exclude }}
mysqlExclude: ${{ steps.selective-checks.outputs.mysql-exclude }}
mssqlExclude: ${{ steps.selective-checks.outputs.mssql-exclude }}
sqliteExclude: ${{ steps.selective-checks.outputs.sqlite-exclude }}
run-tests: ${{ steps.selective-checks.outputs.run-tests }}
run-ui-tests: ${{ steps.selective-checks.outputs.run-ui-tests }}
Expand Down Expand Up @@ -762,6 +764,62 @@ ${{ hashFiles('.pre-commit-config.yaml') }}"
path: "./files/coverage*.xml"
retention-days: 7

tests-mssql:
timeout-minutes: 130
name: >
MSSQL${{matrix.mssql-version}}, Py${{matrix.python-version}}: ${{needs.build-info.outputs.testTypes}}
runs-on: ${{ fromJson(needs.build-info.outputs.runsOn) }}
needs: [build-info, ci-images]
strategy:
matrix:
python-version: ${{ fromJson(needs.build-info.outputs.pythonVersions) }}
mssql-version: ${{ fromJson(needs.build-info.outputs.mssqlVersions) }}
exclude: ${{ fromJson(needs.build-info.outputs.mssqlExclude) }}
fail-fast: false
env:
RUNS_ON: ${{ fromJson(needs.build-info.outputs.runsOn) }}
BACKEND: mssql
PYTHON_MAJOR_MINOR_VERSION: ${{ matrix.python-version }}
MSSQL_VERSION: ${{ matrix.mssql-version }}
TEST_TYPES: "${{needs.build-info.outputs.testTypes}}"
GITHUB_REGISTRY: ${{ needs.ci-images.outputs.githubRegistry }}
if: needs.build-info.outputs.run-tests == 'true'
steps:
- name: "Checkout ${{ github.ref }} ( ${{ github.sha }} )"
uses: actions/checkout@v2
with:
persist-credentials: false
- name: "Setup python"
uses: actions/setup-python@v2
with:
python-version: ${{ env.PYTHON_MAJOR_MINOR_VERSION }}
- name: "Free space"
run: ./scripts/ci/tools/ci_free_space_on_ci.sh
- name: "Prepare CI image ${{env.PYTHON_MAJOR_MINOR_VERSION}}:${{ env.GITHUB_REGISTRY_PULL_IMAGE_TAG }}"
run: ./scripts/ci/images/ci_prepare_ci_image_on_ci.sh
- name: "Tests: ${{needs.build-info.outputs.testTypes}}"
run: ./scripts/ci/testing/ci_run_airflow_testing.sh
- name: "Upload airflow logs"
uses: actions/upload-artifact@v2
if: failure()
with:
name: airflow-logs-${{matrix.python-version}}-${{matrix.mssql-version}}
path: "./files/airflow_logs*"
retention-days: 7
- name: "Upload container logs"
uses: actions/upload-artifact@v2
if: failure()
with:
name: container-logs-mssql-${{matrix.python-version}}-${{matrix.mssql-version}}
path: "./files/container_logs*"
retention-days: 7
- name: "Upload artifact for coverage"
uses: actions/upload-artifact@v2
with:
name: coverage-mssql-${{matrix.python-version}}-${{matrix.mssql-version}}
path: "./files/coverage*.xml"
retention-days: 7

tests-sqlite:
timeout-minutes: 130
name: >
Expand Down Expand Up @@ -898,6 +956,7 @@ ${{ hashFiles('.pre-commit-config.yaml') }}"
- tests-postgres
- tests-sqlite
- tests-mysql
- tests-mssql
- tests-quarantined
env:
RUNS_ON: ${{ fromJson(needs.build-info.outputs.runsOn) }}
Expand Down Expand Up @@ -1046,6 +1105,7 @@ ${{ hashFiles('.pre-commit-config.yaml') }}"
- tests-sqlite
- tests-postgres
- tests-mysql
- tests-mssql
- tests-kubernetes
- prod-images
- docs
Expand Down Expand Up @@ -1107,6 +1167,7 @@ ${{ hashFiles('.pre-commit-config.yaml') }}"
- tests-sqlite
- tests-postgres
- tests-mysql
- tests-mssql
- tests-kubernetes
- ci-images
- docs
Expand Down Expand Up @@ -1151,6 +1212,7 @@ ${{ hashFiles('.pre-commit-config.yaml') }}"
- static-checks-pylint
- tests-sqlite
- tests-mysql
- tests-mssql
- tests-postgres
- tests-kubernetes
env:
Expand Down Expand Up @@ -1221,6 +1283,7 @@ ${{ hashFiles('.pre-commit-config.yaml') }}"
- tests-sqlite
- tests-postgres
- tests-mysql
- tests-mssql
- tests-kubernetes
- constraints
- prepare-test-provider-packages-wheel
Expand Down
4 changes: 2 additions & 2 deletions BREEZE.rst
Original file line number Diff line number Diff line change
Expand Up @@ -1883,7 +1883,7 @@ This is the current syntax for `./breeze <./breeze>`_:
Backend to use for tests - it determines which database is used.
One of:

sqlite mysql postgres
sqlite mysql postgres mssql

Default: sqlite

Expand Down Expand Up @@ -2349,7 +2349,7 @@ This is the current syntax for `./breeze <./breeze>`_:
Backend to use for tests - it determines which database is used.
One of:

sqlite mysql postgres
sqlite mysql postgres mssql

Default: sqlite

Expand Down
11 changes: 11 additions & 0 deletions Dockerfile.ci
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,17 @@ RUN mkdir -pv /usr/share/man/man1 \
&& apt-get autoremove -yqq --purge \
&& apt-get clean \
&& rm -rf /var/lib/apt/lists/* \
&& curl https://packages.microsoft.com/keys/microsoft.asc | apt-key add - \
&& curl https://packages.microsoft.com/config/debian/9/prod.list > /etc/apt/sources.list.d/mssql-release.list \
&& apt-get update -yqq \
&& apt-get upgrade -yqq \
&& ACCEPT_EULA=Y apt-get -yqq install -y --no-install-recommends \
gcc \
unixodbc-dev \
g++ \
msodbcsql17 \
mssql-tools \
&& rm -rf /var/lib/apt/lists/* \
&& curl https://download.docker.com/linux/static/stable/x86_64/docker-${DOCKER_CLI_VERSION}.tgz \
| tar -C /usr/bin --strip-components=1 -xvzf - docker/docker

Expand Down
15 changes: 8 additions & 7 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -97,13 +97,14 @@ We **highly** recommend upgrading to the latest Airflow major release at the ear

Apache Airflow is tested with:

| | Master version (dev) | Stable version (2.0.2) | Previous version (1.10.15) |
| ------------ | ------------------------- | ------------------------ | ------------------------- |
| Python | 3.6, 3.7, 3.8 | 3.6, 3.7, 3.8 | 2.7, 3.5, 3.6, 3.7, 3.8 |
| Kubernetes | 1.20, 1.19, 1.18 | 1.20, 1.19, 1.18 | 1.18, 1.17, 1.16 |
| PostgreSQL | 9.6, 10, 11, 12, 13 | 9.6, 10, 11, 12, 13 | 9.6, 10, 11, 12, 13 |
| MySQL | 5.7, 8 | 5.7, 8 | 5.6, 5.7 |
| SQLite | 3.15.0+ | 3.15.0+ | 3.15.0+ |
| | Master version (dev) | Stable version (2.0.2) | Previous version (1.10.15) |
| -------------------- | ------------------------- | ------------------------ | ------------------------- |
| Python | 3.6, 3.7, 3.8 | 3.6, 3.7, 3.8 | 2.7, 3.5, 3.6, 3.7, 3.8 |
| Kubernetes | 1.20, 1.19, 1.18 | 1.20, 1.19, 1.18 | 1.18, 1.17, 1.16 |
| PostgreSQL | 9.6, 10, 11, 12, 13 | 9.6, 10, 11, 12, 13 | 9.6, 10, 11, 12, 13 |
| MySQL | 5.7, 8 | 5.7, 8 | 5.6, 5.7 |
| SQLite | 3.15.0+ | 3.15.0+ | 3.15.0+ |
| MSSQL(Experimental) | 2017,2019 | | |

**Note:** MySQL 5.x versions are unable to or have limitations with
running multiple schedulers -- please see the [Scheduler docs](https://airflow.apache.org/docs/apache-airflow/stable/scheduler.html).
Expand Down
13 changes: 6 additions & 7 deletions airflow/api_connexion/parameters.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
from typing import Callable, Dict, TypeVar, cast

from pendulum.parsing import ParserError
from sqlalchemy import asc, desc
from sqlalchemy import text

from airflow.api_connexion.exceptions import BadRequest
from airflow.configuration import conf
Expand Down Expand Up @@ -97,11 +97,10 @@ def apply_sorting(query, order_by, to_replace=None, allowed_attrs=None):
detail=f"Ordering with '{lstriped_orderby}' is disallowed or "
f"the attribute does not exist on the model"
)
if to_replace:
lstriped_orderby = to_replace.get(lstriped_orderby, lstriped_orderby)
if order_by[0] == "-":
func = desc
order_by = lstriped_orderby
order_by = f"{lstriped_orderby} desc"
else:
func = asc
if to_replace:
order_by = to_replace.get(order_by, order_by)
return query.order_by(func(order_by))
order_by = f"{lstriped_orderby} asc"
return query.order_by(text(order_by))
45 changes: 28 additions & 17 deletions airflow/jobs/scheduler_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
from sqlalchemy.exc import OperationalError
from sqlalchemy.orm import load_only, selectinload
from sqlalchemy.orm.session import Session, make_transient
from sqlalchemy.sql import expression

from airflow import models, settings
from airflow.configuration import conf
Expand Down Expand Up @@ -1067,15 +1068,15 @@ def _executable_task_instances_to_queued(self, max_tis: int, session: Session =

task_instance_str = "\n\t".join(repr(x) for x in executable_tis)
self.log.info("Setting the following tasks to queued state:\n\t%s", task_instance_str)

# set TIs to queued state
filter_for_tis = TI.filter_for_tis(executable_tis)
session.query(TI).filter(filter_for_tis).update(
# TODO[ha]: should we use func.now()? How does that work with DB timezone on mysql when it's not
# UTC?
{TI.state: State.QUEUED, TI.queued_dttm: timezone.utcnow(), TI.queued_by_job_id: self.id},
synchronize_session=False,
)
if len(executable_tis) > 0:
# set TIs to queued state
filter_for_tis = TI.filter_for_tis(executable_tis)
session.query(TI).filter(filter_for_tis).update(
# TODO[ha]: should we use func.now()? How does that work with DB timezone
# on mysql when it's not UTC?
{TI.state: State.QUEUED, TI.queued_dttm: timezone.utcnow(), TI.queued_by_job_id: self.id},
synchronize_session=False,
)

for ti in executable_tis:
make_transient(ti)
Expand Down Expand Up @@ -1580,14 +1581,24 @@ def _create_dag_runs(self, dag_models: Iterable[DagModel], session: Session) ->
# as DagModel.dag_id and DagModel.next_dagrun
# This list is used to verify if the DagRun already exist so that we don't attempt to create
# duplicate dag runs
active_dagruns = (
session.query(DagRun.dag_id, DagRun.execution_date)
.filter(
tuple_(DagRun.dag_id, DagRun.execution_date).in_(
[(dm.dag_id, dm.next_dagrun) for dm in dag_models]
)

if session.bind.dialect.name == 'mssql':
active_dagruns_filter = or_(
*[
and_(
DagRun.dag_id == dm.dag_id,
DagRun.execution_date == dm.next_dagrun,
)
for dm in dag_models
]
)
.all()
else:
active_dagruns_filter = tuple_(DagRun.dag_id, DagRun.execution_date).in_(
[(dm.dag_id, dm.next_dagrun) for dm in dag_models]
)

active_dagruns = (
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why not use a join here or CTE rather than making multiple queries?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good idea. Feel free to give it a shot if you like :)

session.query(DagRun.dag_id, DagRun.execution_date).filter(active_dagruns_filter).all()
)

for dag_model in dag_models:
Expand Down Expand Up @@ -1644,7 +1655,7 @@ def _update_dag_next_dagruns(self, dag_models: Iterable[DagModel], session: Sess
.filter(
DagRun.dag_id.in_([o.dag_id for o in dag_models]),
DagRun.state == State.RUNNING, # pylint: disable=comparison-with-callable
DagRun.external_trigger.is_(False),
DagRun.external_trigger == expression.false(),
)
.group_by(DagRun.dag_id)
.all()
Expand Down
Loading