Commit

…16692-show-cron-schedule-description-in-ui

pateash committed Oct 15, 2021
2 parents 572b8fc + b814ab4 commit c9a5852
Showing 74 changed files with 639 additions and 311 deletions.
4 changes: 2 additions & 2 deletions .github/workflows/build-images.yml
@@ -307,12 +307,12 @@ jobs:
- name: "Build PROD images ${{ matrix.python-version }}:${{ env.GITHUB_REGISTRY_PUSH_IMAGE_TAG }}"
run: ./scripts/ci/images/ci_prepare_prod_image_on_ci.sh
env:
# GITHUB_REGISTRY_PULL_IMAGE_TAG is overriden to latest in order to build PROD image using "latest"
# GITHUB_REGISTRY_PULL_IMAGE_TAG is overridden to latest in order to build PROD image using "latest"
GITHUB_REGISTRY_PULL_IMAGE_TAG: "latest"
- name: "Push PROD images ${{ matrix.python-version }}:${{ env.GITHUB_REGISTRY_PUSH_IMAGE_TAG }}"
run: ./scripts/ci/images/ci_push_production_images.sh
env:
# GITHUB_REGISTRY_PULL_IMAGE_TAG is overriden to latest in order to build PROD image using "latest"
# GITHUB_REGISTRY_PULL_IMAGE_TAG is overridden to latest in order to build PROD image using "latest"
GITHUB_REGISTRY_PULL_IMAGE_TAG: "latest"

cancel-on-ci-build:
12 changes: 12 additions & 0 deletions .pre-commit-config.yaml
@@ -258,6 +258,18 @@ repos:
exclude: |
(?x)
^airflow/_vendor/
- repo: https://github.com/codespell-project/codespell
rev: v2.1.0
hooks:
- id: codespell
name: Run codespell to check for common misspellings in files
entry: codespell
language: python
types: [text]
exclude: ^airflow/_vendor/|^CHANGELOG.txt|^airflow/www/static/css/material-icons.css
args:
- --ignore-words=docs/spelling_wordlist.txt
- --skip=docs/*/commits.rst,airflow/providers/*/*.rst,*.lock,INTHEWILD.md,*.min.js,docs/apache-airflow/pipeline_example.csv
- repo: local
hooks:
- id: lint-openapi
10 changes: 8 additions & 2 deletions BREEZE.rst
@@ -115,6 +115,12 @@ Docker in WSL 2
E.g. Run ``cd ~`` and create a development folder in your Linux distro home
and git pull the Airflow repo there.

- **WSL 2 Docker mount errors**:
Another reason to use the Linux filesystem is that sometimes, depending on the length of
your path, you might get strange errors when you try to start ``Breeze``, such as
``caused: mount through procfd: not a directory: unknown:``. Therefore, checking out
Airflow on a Windows-mounted filesystem is strongly discouraged.

- **WSL 2 Memory Usage** :
WSL 2 can consume a lot of memory under the process name "Vmmem". To reclaim the memory after
development you can:
@@ -125,7 +131,7 @@ Docker in WSL 2
* If no longer using WSL you can shut it down on the Windows Host
with the following command: ``wsl --shutdown``

- **Developing in WSL 2** :
- **Developing in WSL 2**:
You can use all the standard Linux command line utilities to develop on WSL 2.
Further VS Code supports developing in Windows but remotely executing in WSL.
If VS Code is installed on the Windows host system then in the WSL Linux Distro
@@ -2185,7 +2191,7 @@ This is the current syntax for `./breeze <./breeze>`_:
check-executables-have-shebangs check-extras-order check-hooks-apply
check-integrations check-merge-conflict check-xml daysago-import-check
debug-statements detect-private-key doctoc dont-use-safe-filter end-of-file-fixer
fix-encoding-pragma flake8 flynt forbid-tabs helm-lint identity
fix-encoding-pragma flake8 flynt codespell forbid-tabs helm-lint identity
incorrect-use-of-LoggingMixin insert-license isort json-schema language-matters
lint-dockerfile lint-openapi markdownlint mermaid mixed-line-ending mypy mypy-helm
no-providers-in-core-examples no-relative-imports pre-commit-descriptions
2 changes: 2 additions & 0 deletions STATIC_CODE_CHECKS.rst
@@ -188,6 +188,8 @@ require Breeze Docker images to be installed locally.
------------------------------------ ---------------------------------------------------------------- ------------
``flynt`` Runs flynt
------------------------------------ ---------------------------------------------------------------- ------------
``codespell`` Checks for common misspellings in files.
------------------------------------ ---------------------------------------------------------------- ------------
``forbid-tabs`` Fails if tabs are used in the project
------------------------------------ ---------------------------------------------------------------- ------------
``helm-lint`` Verifies if helm lint passes for the chart
2 changes: 1 addition & 1 deletion UPDATING.md
@@ -283,7 +283,7 @@ No breaking changes.
### `activate_dag_runs` argument of the function `clear_task_instances` is replaced with `dag_run_state`
To achieve the previous default behaviour of `clear_task_instances` with `activate_dag_runs=True`, no change is needed. To achieve the previous behaviour of `activate_dag_runs=False`, pass `dag_run_state=False` instead. (The previous paramater is still accepted, but is deprecated)
To achieve the previous default behaviour of `clear_task_instances` with `activate_dag_runs=True`, no change is needed. To achieve the previous behaviour of `activate_dag_runs=False`, pass `dag_run_state=False` instead. (The previous parameter is still accepted, but is deprecated)
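As an illustrative sketch (not part of this change), the replacement call might look like the following; the DAG id and the query used to select task instances are placeholders:

```python
from airflow.models.taskinstance import TaskInstance, clear_task_instances
from airflow.utils.session import create_session

with create_session() as session:
    # Placeholder selection: choose the task instances you want to clear.
    tis = session.query(TaskInstance).filter(TaskInstance.dag_id == "my_dag").all()
    # Previously: clear_task_instances(tis, session, activate_dag_runs=False)
    # Now: pass dag_run_state=False to leave the enclosing DagRuns' state untouched.
    clear_task_instances(tis, session, dag_run_state=False)
```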
### `dag.set_dag_runs_state` is deprecated
2 changes: 1 addition & 1 deletion airflow/_vendor/connexion/__init__.py
@@ -6,7 +6,7 @@
from .apps import AbstractApp # NOQA
from .decorators.produces import NoContent # NOQA
from .exceptions import ProblemException # NOQA
# add operation for backwards compatability
# add operation for backwards compatibility
from .operations import compat
from .problem import problem # NOQA
from .resolver import Resolution, Resolver, RestyResolver # NOQA
4 changes: 2 additions & 2 deletions airflow/cli/commands/dag_command.py
@@ -67,11 +67,11 @@ def dag_backfill(args, dag=None):
if args.ignore_first_depends_on_past is False:
args.ignore_first_depends_on_past = True

dag = dag or get_dag(args.subdir, args.dag_id)

if not args.start_date and not args.end_date:
raise AirflowException("Provide a start_date and/or end_date")

dag = dag or get_dag(args.subdir, args.dag_id)

# If only one date is passed, using same as start and end
args.end_date = args.end_date or args.start_date
args.start_date = args.start_date or args.end_date
6 changes: 5 additions & 1 deletion airflow/dag_processing/manager.py
@@ -975,7 +975,11 @@ def prepare_file_path_queue(self):
for file_path in self._file_paths:

if is_mtime_mode:
files_with_mtime[file_path] = os.path.getmtime(file_path)
try:
files_with_mtime[file_path] = os.path.getmtime(file_path)
except FileNotFoundError:
self.log.warning("Skipping processing of missing file: %s", file_path)
continue
file_modified_time = timezone.make_aware(datetime.fromtimestamp(files_with_mtime[file_path]))
else:
file_paths.append(file_path)
2 changes: 1 addition & 1 deletion airflow/example_dags/plugins/workday.py
@@ -16,7 +16,7 @@
# specific language governing permissions and limitations
# under the License.

"""Plugin to demostrate timetable registration and accomdate example DAGs."""
"""Plugin to demonstrate timetable registration and accommodate example DAGs."""

# [START howto_timetable]
from datetime import timedelta
12 changes: 10 additions & 2 deletions airflow/jobs/scheduler_job.py
@@ -27,7 +27,7 @@
import warnings
from collections import defaultdict
from datetime import timedelta
from typing import Collection, DefaultDict, Dict, List, Optional, Tuple
from typing import Collection, DefaultDict, Dict, Iterator, List, Optional, Tuple

from sqlalchemy import and_, func, not_, or_, tuple_
from sqlalchemy.exc import OperationalError
@@ -510,7 +510,15 @@ def _process_executor_events(self, session: Session = None) -> int:

# Check state of finished tasks
filter_for_tis = TI.filter_for_tis(tis_with_right_state)
tis: List[TI] = session.query(TI).filter(filter_for_tis).options(selectinload('dag_model')).all()
query = session.query(TI).filter(filter_for_tis).options(selectinload('dag_model'))
# row lock this entire set of taskinstances to make sure the scheduler doesn't fail when we have
# multi-schedulers
tis: Iterator[TI] = with_row_locks(
query,
of=TI,
session=session,
**skip_locked(session=session),
)
for ti in tis:
try_number = ti_primary_key_to_try_number_map[ti.key.primary]
buffer_key = ti.key.with_try_number(try_number)
3 changes: 2 additions & 1 deletion airflow/models/taskinstance.py
@@ -1702,7 +1702,8 @@ def handle_failure(
session.add(Log(State.FAILED, self))

# Log failure duration
session.add(TaskFail(task, self.execution_date, self.start_date, self.end_date))
dag_run = self.get_dagrun(session=session) # self.dag_run not populated by refresh_from_db
session.add(TaskFail(task, dag_run.execution_date, self.start_date, self.end_date))

# Ensure we unset next_method and next_kwargs to ensure that any
# retries don't re-use them.
75 changes: 63 additions & 12 deletions airflow/providers/amazon/aws/transfers/mysql_to_s3.py
@@ -17,17 +17,33 @@
# under the License.

import os
import warnings
from collections import namedtuple
from enum import Enum
from tempfile import NamedTemporaryFile
from typing import Optional, Union

import numpy as np
import pandas as pd
from typing_extensions import Literal

from airflow.exceptions import AirflowException
from airflow.models import BaseOperator
from airflow.providers.amazon.aws.hooks.s3 import S3Hook
from airflow.providers.mysql.hooks.mysql import MySqlHook

FILE_FORMAT = Enum(
"FILE_FORMAT",
"CSV, PARQUET",
)

FileOptions = namedtuple('FileOptions', ['mode', 'suffix'])

FILE_OPTIONS_MAP = {
FILE_FORMAT.CSV: FileOptions('r+', '.csv'),
FILE_FORMAT.PARQUET: FileOptions('rb+', '.parquet'),
}


class MySQLToS3Operator(BaseOperator):
"""
@@ -60,6 +76,11 @@ class MySQLToS3Operator(BaseOperator):
:type index: str
:param header: whether to include header or not into the S3 file
:type header: bool
:param file_format: the destination file format, only string 'csv' or 'parquet' is accepted.
:type file_format: str
:param pd_kwargs: arguments to include in ``DataFrame.to_parquet()`` or
``DataFrame.to_csv()``. This is preferred over ``pd_csv_kwargs``.
:type pd_kwargs: dict
"""

template_fields = (
@@ -68,7 +89,11 @@ def __init__(
'query',
)
template_ext = ('.sql',)
template_fields_renderers = {"query": "sql", "pd_csv_kwargs": "json"}
template_fields_renderers = {
"query": "sql",
"pd_csv_kwargs": "json",
"pd_kwargs": "json",
}

def __init__(
self,
@@ -82,6 +107,8 @@ def __init__(
pd_csv_kwargs: Optional[dict] = None,
index: bool = False,
header: bool = False,
file_format: Literal['csv', 'parquet'] = 'csv',
pd_kwargs: Optional[dict] = None,
**kwargs,
) -> None:
super().__init__(**kwargs)
@@ -92,15 +119,38 @@ def __init__(
self.aws_conn_id = aws_conn_id
self.verify = verify

self.pd_csv_kwargs = pd_csv_kwargs or {}
if "path_or_buf" in self.pd_csv_kwargs:
raise AirflowException('The argument path_or_buf is not allowed, please remove it')
if "index" not in self.pd_csv_kwargs:
self.pd_csv_kwargs["index"] = index
if "header" not in self.pd_csv_kwargs:
self.pd_csv_kwargs["header"] = header
if file_format == "csv":
self.file_format = FILE_FORMAT.CSV
else:
self.file_format = FILE_FORMAT.PARQUET

if pd_csv_kwargs:
warnings.warn(
"pd_csv_kwargs is deprecated. Please use pd_kwargs.",
DeprecationWarning,
stacklevel=2,
)
if index or header:
warnings.warn(
"index and header are deprecated. Please pass them via pd_kwargs.",
DeprecationWarning,
stacklevel=2,
)

self.pd_kwargs = pd_kwargs or pd_csv_kwargs or {}
if self.file_format == FILE_FORMAT.CSV:
if "path_or_buf" in self.pd_kwargs:
raise AirflowException('The argument path_or_buf is not allowed, please remove it')
if "index" not in self.pd_kwargs:
self.pd_kwargs["index"] = index
if "header" not in self.pd_kwargs:
self.pd_kwargs["header"] = header
else:
if pd_csv_kwargs is not None:
raise TypeError("pd_csv_kwargs may not be specified when file_format='parquet'")

def _fix_int_dtypes(self, df: pd.DataFrame) -> None:
@staticmethod
def _fix_int_dtypes(df: pd.DataFrame) -> None:
"""Mutate DataFrame to set dtypes for int columns containing NaN values."""
for col in df:
if "float" in df[col].dtype.name and df[col].hasnans:
@@ -118,9 +168,10 @@ def execute(self, context) -> None:
self.log.info("Data from MySQL obtained")

self._fix_int_dtypes(data_df)
with NamedTemporaryFile(mode='r+', suffix='.csv') as tmp_csv:
data_df.to_csv(tmp_csv.name, **self.pd_csv_kwargs)
s3_conn.load_file(filename=tmp_csv.name, key=self.s3_key, bucket_name=self.s3_bucket)
file_options = FILE_OPTIONS_MAP[self.file_format]
with NamedTemporaryFile(mode=file_options.mode, suffix=file_options.suffix) as tmp_file:
data_df.to_csv(tmp_file.name, **self.pd_kwargs)
s3_conn.load_file(filename=tmp_file.name, key=self.s3_key, bucket_name=self.s3_bucket)

if s3_conn.check_for_key(self.s3_key, bucket_name=self.s3_bucket):
file_location = os.path.join(self.s3_bucket, self.s3_key)
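For context, a hedged usage sketch of the new ``file_format`` and ``pd_kwargs`` parameters (not taken from this diff); the connection IDs, bucket, key, and query below are placeholders:

```python
from airflow.providers.amazon.aws.transfers.mysql_to_s3 import MySQLToS3Operator

# Illustrative only: export a MySQL query result to S3 as Parquet instead of CSV.
# All identifiers below (connections, bucket, key, query) are placeholders.
mysql_to_s3_parquet = MySQLToS3Operator(
    task_id="mysql_to_s3_parquet",
    query="SELECT * FROM my_table",
    s3_bucket="my-bucket",
    s3_key="exports/my_table.parquet",
    mysql_conn_id="mysql_default",
    aws_conn_id="aws_default",
    file_format="parquet",          # new parameter; defaults to "csv"
    pd_kwargs={"index": False},     # forwarded to DataFrame.to_parquet()
)
```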
2 changes: 1 addition & 1 deletion airflow/providers/cncf/kubernetes/utils/pod_launcher.py
@@ -263,7 +263,7 @@ def read_pod_logs(
)
except BaseHTTPError:
self.log.exception('There was an error reading the kubernetes API.')
# Reraise to be catched by self.monitor_pod.
# Reraise to be caught by self.monitor_pod.
raise

@tenacity.retry(stop=tenacity.stop_after_attempt(3), wait=tenacity.wait_exponential(), reraise=True)
9 changes: 5 additions & 4 deletions airflow/providers/facebook/ads/hooks/ads.py
@@ -18,7 +18,7 @@
"""This module contains Facebook Ads Reporting hooks"""
import time
from enum import Enum
from typing import Any, Dict, List
from typing import Any, Dict, List, Optional

try:
from functools import cached_property
@@ -53,8 +53,9 @@ class FacebookAdsReportingHook(BaseHook):
:param facebook_conn_id: Airflow Facebook Ads connection ID
:type facebook_conn_id: str
:param api_version: The version of Facebook API. Default to v6.0
:type api_version: str
:param api_version: The version of Facebook API. Default to None. If it is None,
it will use the Facebook business SDK default version.
:type api_version: Optional[str]
"""

@@ -66,7 +67,7 @@
def __init__(
self,
facebook_conn_id: str = default_conn_name,
api_version: str = "v6.0",
api_version: Optional[str] = None,
) -> None:
super().__init__()
self.facebook_conn_id = facebook_conn_id
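A brief, hedged illustration of the new default (not from this diff); the connection ID is a placeholder:

```python
from airflow.providers.facebook.ads.hooks.ads import FacebookAdsReportingHook

# With api_version left unset (None), the facebook-business SDK's own default
# API version is used instead of a hard-coded "v6.0".
hook = FacebookAdsReportingHook(facebook_conn_id="facebook_default")
```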
2 changes: 1 addition & 1 deletion airflow/providers/google/cloud/hooks/bigquery.py
@@ -1422,7 +1422,7 @@ def _build_new_schema(
# Turn schema_field_updates into a dict keyed on field names
schema_fields_updates = {field["name"]: field for field in deepcopy(schema_fields_updates)}

# Create a new dict for storing the new schema, initated based on the current_schema
# Create a new dict for storing the new schema, initiated based on the current_schema
# as of Python 3.6, dicts retain order.
new_schema = {field["name"]: field for field in deepcopy(current_schema)}

7 changes: 4 additions & 3 deletions airflow/providers/google/cloud/transfers/bigquery_to_mssql.py
@@ -44,13 +44,14 @@ class BigQueryToMsSqlOperator(BaseOperator):
transfer_data = BigQueryToMsSqlOperator(
task_id='task_id',
dataset_table='origin_bq_table',
source_project_dataset_table='my-project.mydataset.mytable',
mssql_table='dest_table_name',
replace=True,
)
:param dataset_table: A dotted ``<dataset>.<table>``: the big query table of origin
:type dataset_table: str
:param source_project_dataset_table: A dotted ``<project>.<dataset>.<table>``:
the big query table of origin
:type source_project_dataset_table: str
:param selected_fields: List of fields to return (comma-separated). If
unspecified, all fields are returned.
:type selected_fields: str
@@ -53,7 +53,8 @@ class FacebookAdsReportToGcsOperator(BaseOperator):
:type gcp_conn_id: str
:param facebook_conn_id: Airflow Facebook Ads connection ID
:type facebook_conn_id: str
:param api_version: The version of Facebook API. Default to v6.0
:param api_version: The version of Facebook API. Default to None. If it is None,
it will use the Facebook business SDK default version.
:type api_version: str
:param fields: List of fields that is obtained from Facebook. Found in AdsInsights.Field class.
https://developers.facebook.com/docs/marketing-api/insights/parameters/v6.0
@@ -95,7 +96,7 @@ def __init__(
params: Dict[str, Any] = None,
parameters: Dict[str, Any] = None,
gzip: bool = False,
api_version: str = "v6.0",
api_version: Optional[str] = None,
gcp_conn_id: str = "google_cloud_default",
facebook_conn_id: str = "facebook_default",
impersonation_chain: Optional[Union[str, Sequence[str]]] = None,
@@ -65,7 +65,7 @@ def __init__(
self.ignore_if_missing = ignore_if_missing

def execute(self, context: dict) -> None:
self.log.info('Deleting blob: %s\nin wasb://%s', self.blob_name, self.container_name)
self.log.info('Deleting blob: %s\n in wasb://%s', self.blob_name, self.container_name)
hook = WasbHook(wasb_conn_id=self.wasb_conn_id)

hook.delete_file(
2 changes: 1 addition & 1 deletion airflow/providers/microsoft/azure/sensors/wasb.py
@@ -57,7 +57,7 @@ def __init__(
self.check_options = check_options

def poke(self, context: dict):
self.log.info('Poking for blob: %s\nin wasb://%s', self.blob_name, self.container_name)
self.log.info('Poking for blob: %s\n in wasb://%s', self.blob_name, self.container_name)
hook = WasbHook(wasb_conn_id=self.wasb_conn_id)
return hook.check_for_blob(self.container_name, self.blob_name, **self.check_options)
