29 changes: 22 additions & 7 deletions crates/ruff_linter/resources/test/fixtures/airflow/AIR301_args.py
@@ -5,14 +5,15 @@
from airflow import DAG, dag
from airflow.operators.datetime import BranchDateTimeOperator
from airflow.operators.trigger_dagrun import TriggerDagRunOperator
from airflow.operators.weekday import BranchDayOfWeekOperator
from airflow.providers.amazon.aws.log.s3_task_handler import S3TaskHandler
from airflow.providers.apache.hdfs.log.hdfs_task_handler import HdfsTaskHandler
from airflow.providers.elasticsearch.log.es_task_handler import ElasticsearchTaskHandler
from airflow.providers.fab.auth_manager.fab_auth_manager import FabAuthManager
from airflow.providers.google.cloud.log.gcs_task_handler import GCSTaskHandler
from airflow.providers.standard.operators import datetime, trigger_dagrun
from airflow.providers.standard.sensors import weekday
from airflow.sensors.weekday import BranchDayOfWeekOperator, DayOfWeekSensor
from airflow.sensors.weekday import DayOfWeekSensor
from airflow.timetables.simple import NullTimetable

DAG(dag_id="class_schedule", schedule="@hourly")
@@ -50,10 +50,10 @@ def decorator_timetable():
@dag()
def decorator_deprecated_operator_args():
trigger_dagrun_op = trigger_dagrun.TriggerDagRunOperator(
task_id="trigger_dagrun_op1", execution_date="2024-12-04"
task_id="trigger_dagrun_op1", trigger_dag_id="test", execution_date="2024-12-04"
)
trigger_dagrun_op2 = TriggerDagRunOperator(
task_id="trigger_dagrun_op2", execution_date="2024-12-04"
task_id="trigger_dagrun_op2", trigger_dag_id="test", execution_date="2024-12-04"
)

branch_dt_op = datetime.BranchDateTimeOperator(
@@ -66,16 +67,30 @@ def decorator_deprecated_operator_args():
)

dof_task_sensor = weekday.DayOfWeekSensor(
task_id="dof_task_sensor", use_task_execution_day=True
task_id="dof_task_sensor",
week_day=1,
use_task_execution_day=True,
)
dof_task_sensor2 = DayOfWeekSensor(
task_id="dof_task_sensor2", use_task_execution_day=True
task_id="dof_task_sensor2",
week_day=1,
use_task_execution_day=True,
)

bdow_op = weekday.BranchDayOfWeekOperator(
task_id="bdow_op", use_task_execution_day=True
task_id="bdow_op",
follow_task_ids_if_false=None,
follow_task_ids_if_true=None,
week_day=1,
use_task_execution_day=True,
)
bdow_op2 = BranchDayOfWeekOperator(
task_id="bdow_op2",
follow_task_ids_if_false=None,
follow_task_ids_if_true=None,
week_day=1,
use_task_execution_day=True,
)
bdow_op2 = BranchDayOfWeekOperator(task_id="bdow_op2", use_task_execution_day=True)

trigger_dagrun_op >> trigger_dagrun_op2
branch_dt_op >> branch_dt_op2
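For context, the keyword arguments added here (`trigger_dag_id`, `week_day`, `follow_task_ids_if_*`) make the fixture calls realistic so the rule can single out just the deprecated argument. A minimal sketch of the post-fix form, assuming Airflow's documented renames (`execution_date` → `logical_date` on `TriggerDagRunOperator`, `use_task_execution_day` → `use_task_logical_date` on the weekday sensor/operator):

```python
# Minimal sketch, assuming the documented Airflow renames noted above.
from airflow.operators.trigger_dagrun import TriggerDagRunOperator
from airflow.sensors.weekday import DayOfWeekSensor

# `execution_date` is removed in Airflow 3; pass `logical_date` instead.
trigger = TriggerDagRunOperator(
    task_id="trigger", trigger_dag_id="test", logical_date="2024-12-04"
)

# `use_task_execution_day` is removed; `use_task_logical_date` replaces it.
sensor = DayOfWeekSensor(task_id="sensor", week_day=1, use_task_logical_date=True)
```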
@@ -10,7 +10,7 @@
)
from airflow.datasets.manager import DatasetManager
from airflow.lineage.hook import DatasetLineageInfo, HookLineageCollector
from airflow.providers.amazon.auth_manager.aws_auth_manager import AwsAuthManager
from airflow.providers.amazon.aws.auth_manager.aws_auth_manager import AwsAuthManager
from airflow.providers.apache.beam.hooks import BeamHook, NotAir302HookError
from airflow.providers.google.cloud.secrets.secret_manager import (
CloudSecretManagerBackend,
@@ -83,8 +83,7 @@

# airflow.providers_manager
pm = ProvidersManager()
pm.initialize_providers_asset_uri_resources()
pm.dataset_factories
pm.initialize_providers_dataset_uri_resources()
pm.dataset_factories
pm.dataset_uri_handlers
pm.dataset_to_openlineage_converters
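For context, the fixture is corrected here to call the deprecated dataset-named `ProvidersManager` members, since those are what the rule must flag. A sketch of the asset-named replacements, following the mapping encoded in `removal_in_3.rs` below (the `asset_*` counterparts are assumptions based on that mapping):

```python
# Sketch of the dataset -> asset renames this fixture exercises.
from airflow.providers_manager import ProvidersManager

pm = ProvidersManager()
pm.initialize_providers_asset_uri_resources()  # was initialize_providers_dataset_uri_resources()
pm.asset_factories                             # was dataset_factories
pm.asset_uri_handlers                          # was dataset_uri_handlers
pm.asset_to_openlineage_converters             # was dataset_to_openlineage_converters
```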
10 changes: 10 additions & 0 deletions crates/ruff_linter/resources/test/fixtures/airflow/AIR301_names.py
@@ -159,3 +159,13 @@
from airflow.providers.trino.datasets.trino import sanitize_uri

sanitize_uri

# airflow.notifications.basenotifier
from airflow.notifications.basenotifier import BaseNotifier

BaseNotifier()

# airflow.auth.manager
from airflow.auth.managers.base_auth_manager import BaseAuthManager

BaseAuthManager()
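For context, these two additions exercise moves whose Airflow 3 target modules appear in the `removal_in_3.rs` changes later in this PR. A sketch of the suggested replacements (paths taken from that diff):

```python
# Replacement locations as encoded in removal_in_3.rs below.
from airflow.api_fastapi.auth.managers.base_auth_manager import BaseAuthManager
from airflow.sdk.bases.notifier import BaseNotifier

BaseNotifier()
BaseAuthManager()
```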
@@ -3,7 +3,7 @@
from airflow.api_connexion.security import requires_access_dataset
from airflow.auth.managers.models.resource_details import (
DatasetDetails,
is_authorized_dataset,

)
from airflow.datasets.manager import (
DatasetManager,
@@ -19,7 +19,7 @@
requires_access_dataset()

DatasetDetails()
is_authorized_dataset()


DatasetManager()
dataset_manager()
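For context, the standalone `is_authorized_dataset` import is dropped from this fixture because the name is a method on auth-manager classes rather than a module-level symbol; the `is_airflow_auth_manager` branch added to `check_method` below matches the method form instead. A hedged sketch of what that branch flags (the constructor and call arguments here are illustrative assumptions, not the real signature):

```python
# Illustrative only: the rule now flags the *method* call on an auth manager
# and suggests is_authorized_asset; the arguments below are assumed.
from airflow.providers.amazon.aws.auth_manager.aws_auth_manager import AwsAuthManager

auth_manager = AwsAuthManager()
auth_manager.is_authorized_dataset(method="GET")  # flagged: use is_authorized_asset()
```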
@@ -1,6 +1,6 @@
from __future__ import annotations

from airflow.providers.amazon.aws.auth_manager.avp.entities.AvpEntities import DATASET
from airflow.providers.amazon.aws.auth_manager.avp.entities import AvpEntities
from airflow.providers.amazon.aws.datasets.s3 import (
convert_dataset_to_openlineage as s3_convert_dataset_to_openlineage,
)
@@ -9,7 +9,7 @@
convert_dataset_to_openlineage as io_convert_dataset_to_openlineage,
)
from airflow.providers.common.io.dataset.file import create_dataset as io_create_dataset
from airflow.providers.fab.auth_manager.fab_auth_manager import is_authorized_dataset as fab_is_authorized_dataset

from airflow.providers.google.datasets.bigquery import (
create_dataset as bigquery_create_dataset,
)
@@ -22,15 +22,15 @@
translate_airflow_dataset,
)

DATASET
AvpEntities.DATASET

s3_create_dataset()
s3_convert_dataset_to_openlineage()

io_create_dataset()
io_convert_dataset_to_openlineage()

fab_is_authorized_dataset()


# airflow.providers.google.datasets.bigquery
bigquery_create_dataset()
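For context, the fixture now keeps the `AvpEntities` import and references the enum member as an attribute, which is exactly the case the revised autofix (discussed in the review thread below) has to preserve. A before/after sketch using the names from this diff:

```python
from airflow.providers.amazon.aws.auth_manager.avp.entities import AvpEntities

AvpEntities.DATASET  # flagged by the rule
AvpEntities.ASSET    # after the fix: only the attribute access changes
```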
114 changes: 48 additions & 66 deletions crates/ruff_linter/src/rules/airflow/rules/removal_in_3.rs
@@ -100,8 +100,8 @@ pub(crate) fn airflow_3_removal_expr(checker: &Checker, expr: &Expr) {
check_method(checker, call_expr);
check_context_key_usage_in_call(checker, call_expr);
}
Expr::Attribute(attribute_expr @ ExprAttribute { attr, .. }) => {
check_name(checker, expr, attr.range());
Expr::Attribute(attribute_expr @ ExprAttribute { range, .. }) => {
check_name(checker, expr, *range);
check_class_attribute(checker, attribute_expr);
}
Expr::Name(ExprName { id, ctx, range }) => {
@@ -260,8 +260,9 @@ fn check_call_arguments(checker: &Checker, qualified_name: &QualifiedName, argum
..,
"operators",
"weekday",
"DayOfWeekSensor" | "BranchDayOfWeekOperator",
] => {
"BranchDayOfWeekOperator",
]
| ["airflow", .., "sensors", "weekday", "DayOfWeekSensor"] => {
checker.report_diagnostics(diagnostic_for_argument(
arguments,
"use_task_execution_day",
@@ -496,17 +497,6 @@ fn check_method(checker: &Checker, call_expr: &ExprCall) {
"collected_datasets" => Replacement::AttrName("collected_assets"),
_ => return,
},
[
"airflow",
"providers",
"amazon",
"auth_manager",
"aws_auth_manager",
"AwsAuthManager",
] => match attr.as_str() {
"is_authorized_dataset" => Replacement::AttrName("is_authorized_asset"),
_ => return,
},
["airflow", "providers_manager", "ProvidersManager"] => match attr.as_str() {
"initialize_providers_dataset_uri_resources" => {
Replacement::AttrName("initialize_providers_asset_uri_resources")
@@ -539,6 +529,12 @@ fn check_method(checker: &Checker, call_expr: &ExprCall) {
"get_connections" => Replacement::AttrName("get_connection"),
_ => return,
}
} else if is_airflow_auth_manager(segments) {
if attr.as_str() == "is_authorized_dataset" {
Replacement::AttrName("is_authorized_asset")
} else {
return;
}
} else {
return;
}
@@ -596,16 +592,16 @@ fn check_name(checker: &Checker, expr: &Expr, range: TextRange) {
] => Replacement::Message("Use `sys.version_info` instead"),

// airflow.api_connexion.security
["airflow", "api_connexion", "security", "requires_access"] => {
Replacement::Message("Use `airflow.api_connexion.security.requires_access_*` instead")
}
["airflow", "api_connexion", "security", "requires_access"] => Replacement::Message(
"Use `airflow.api_fastapi.core_api.security.requires_access_*` instead",
),
[
"airflow",
"api_connexion",
"security",
"requires_access_dataset",
] => Replacement::AutoImport {
module: "airflow.api_connexion.security",
module: "airflow.api_fastapi.core_api.security",
name: "requires_access_asset",
},

@@ -614,33 +610,34 @@
"airflow",
"auth",
"managers",
"models",
"resource_details",
"DatasetDetails",
"base_auth_manager",
"BaseAuthManager",
] => Replacement::AutoImport {
module: "airflow.api_fastapi.auth.managers.models.resource_details",
name: "AssetDetails",
module: "airflow.api_fastapi.auth.managers.base_auth_manager",
name: "BaseAuthManager",
},
[
"airflow",
"auth",
"managers",
"base_auth_manager",
"is_authorized_dataset",
"models",
"resource_details",
"DatasetDetails",
] => Replacement::AutoImport {
module: "airflow.api_fastapi.auth.managers.base_auth_manager",
name: "is_authorized_asset",
module: "airflow.api_fastapi.auth.managers.models.resource_details",
name: "AssetDetails",
},

// airflow.configuration
// TODO: check whether we could improve it
[
"airflow",
"configuration",
rest @ ("as_dict" | "get" | "getboolean" | "getfloat" | "getint" | "has_option"
| "remove_option" | "set"),
] => Replacement::SourceModuleMoved {
module: "airflow.configuration.conf",
name: (*rest).to_string(),
module: "airflow.configuration",
name: format!("conf.{rest}"),
},

// airflow.contrib.*
@@ -680,7 +677,6 @@ fn check_name(checker: &Checker, expr: &Expr, range: TextRange) {
},

// airflow.listeners.spec
// TODO: this is removed
["airflow", "listeners", "spec", "dataset", rest] => match *rest {
"on_dataset_created" => Replacement::AutoImport {
module: "airflow.listeners.spec.asset",
@@ -708,7 +704,7 @@ fn check_name(checker: &Checker, expr: &Expr, range: TextRange) {

// airflow.notifications
["airflow", "notifications", "basenotifier", "BaseNotifier"] => Replacement::AutoImport {
module: "airflow.sdk",
module: "airflow.sdk.bases.notifier",
name: "BaseNotifier",
},

@@ -818,22 +814,18 @@ fn check_name(checker: &Checker, expr: &Expr, range: TextRange) {
},

// airflow.www
// TODO: www has been removed
["airflow", "www", "auth", "has_access"] => {
Replacement::Message("Use `airflow.www.auth.has_access_*` instead")
}
["airflow", "www", "auth", "has_access_dataset"] => Replacement::AutoImport {
module: "airflow.www.auth",
name: "has_access_asset",
},
["airflow", "www", "utils", "get_sensitive_variables_fields"] => Replacement::AutoImport {
module: "airflow.utils.log.secrets_masker",
name: "get_sensitive_variables_fields",
},
["airflow", "www", "utils", "should_hide_value_for_key"] => Replacement::AutoImport {
module: "airflow.utils.log.secrets_masker",
name: "should_hide_value_for_key",
},
[
"airflow",
"www",
"auth",
"has_access" | "has_access_dataset",
] => Replacement::None,
[
"airflow",
"www",
"utils",
"get_sensitive_variables_fields" | "should_hide_value_for_key",
] => Replacement::None,

// airflow.providers.amazon
[
@@ -870,8 +862,8 @@ fn check_name(checker: &Checker, expr: &Expr, range: TextRange) {
"AvpEntities",
"DATASET",
] => Replacement::AutoImport {
module: "airflow.providers.amazon.aws.auth_manager.avp.entities.AvpEntities",
name: "ASSET",
module: "airflow.providers.amazon.aws.auth_manager.avp.entities",
name: "AvpEntities.ASSET",
},

// airflow.providers.common.io
@@ -900,19 +892,6 @@ fn check_name(checker: &Checker, expr: &Expr, range: TextRange) {
_ => return,
},

// airflow.providers.fab
[
"airflow",
"providers",
"fab",
"auth_manager",
"fab_auth_manager",
"is_authorized_dataset",
] => Replacement::AutoImport {
module: "airflow.providers.fab.auth_manager.fab_auth_manager",
name: "is_authorized_asset",
},

// airflow.providers.google
// airflow.providers.google.datasets
["airflow", "providers", "google", "datasets", rest @ ..] => match &rest {
@@ -1016,13 +995,16 @@ fn check_name(checker: &Checker, expr: &Expr, range: TextRange) {
if is_guarded_by_try_except(expr, module, name, semantic) {
return;
}

let import_target = name.split('.').next().unwrap_or(name);

diagnostic.try_set_fix(|| {
let (import_edit, binding) = checker.importer().get_or_import_symbol(
&ImportRequest::import_from(module, name),
let (import_edit, _) = checker.importer().get_or_import_symbol(
[Review comment thread]

Contributor: I think you're supposed to use the binding returned by `get_or_import_symbol`. Was something going wrong here when you did that?

Contributor (Author): There are two cases like the following that need to be handled this way:

    from airflow.providers.amazon.aws.auth_manager.avp.entities import AvpEntities

    AvpEntities.DATASET

which should be fixed as:

    from airflow.providers.amazon.aws.auth_manager.avp.entities import AvpEntities

    AvpEntities.ASSET

If we use the binding, it will be fixed as:

    from airflow.providers.amazon.aws.auth_manager.avp.entities import AvpEntities

    AvpEntities

which is wrong.

Contributor: Okay, I think that makes sense. Thanks!

[End of review thread]

&ImportRequest::import_from(module, import_target),
expr.start(),
checker.semantic(),
)?;
let replacement_edit = Edit::range_replacement(binding, range);
let replacement_edit = Edit::range_replacement(name.to_string(), range);
Ok(Fix::safe_edits(import_edit, [replacement_edit]))
});
}
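To summarize the fix logic discussed in the review thread above: the import edit targets only the first dotted segment of the replacement name, while the use site receives the full dotted name. A hedged Python analogue of the Rust code (illustrative only, not the actual ruff API):

```python
# Python analogue of the Rust fix above: import the first dotted segment,
# then substitute the full replacement name at the use site.
def build_fix(module: str, name: str) -> tuple[str, str]:
    import_target = name.split(".")[0]  # "AvpEntities" from "AvpEntities.ASSET"
    import_edit = f"from {module} import {import_target}"
    return import_edit, name  # the replacement text keeps the attribute access

# build_fix("airflow.providers.amazon.aws.auth_manager.avp.entities", "AvpEntities.ASSET")
# -> ("from airflow.providers.amazon.aws.auth_manager.avp.entities import AvpEntities",
#     "AvpEntities.ASSET")
```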