Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ class BaseExtractor(ABC, LoggingMixin):

_allowed_query_params: list[str] = []

def __init__(self, operator): # type: ignore
def __init__(self, operator):
super().__init__()
self.operator = operator

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ def get_or_create_openlineage_client(self) -> OpenLineageClient:
"OpenLineage configuration found. Transport type: `%s`",
config.get("transport", {}).get("type", "no type provided"),
)
self._client = OpenLineageClient(config=config) # type: ignore[call-arg]
self._client = OpenLineageClient(config=config)
else:
self.log.debug(
"OpenLineage configuration not found directly in Airflow. "
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ def on_task_instance_running( # type: ignore[misc]
self,
previous_state: TaskInstanceState,
task_instance: TaskInstance,
session: Session, # type: ignore[valid-type]
session: Session,
) -> None:
from airflow.providers.openlineage.utils.utils import is_ti_rescheduled_already

Expand Down Expand Up @@ -261,7 +261,7 @@ def on_task_instance_success( # type: ignore[misc]
self,
previous_state: TaskInstanceState,
task_instance: TaskInstance,
session: Session, # type: ignore[valid-type]
session: Session,
) -> None:
self.log.debug("OpenLineage listener got notification about task instance success")
task = task_instance.task
Expand Down Expand Up @@ -391,7 +391,7 @@ def on_task_instance_failed( # type: ignore[misc]
previous_state: TaskInstanceState,
task_instance: TaskInstance,
error: None | str | BaseException,
session: Session, # type: ignore[valid-type]
session: Session,
) -> None:
self.log.debug("OpenLineage listener got notification about task instance failure")
task = task_instance.task
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -310,7 +310,7 @@ def get_user_provided_run_facets(ti: TaskInstance, ti_state: TaskInstanceState)
def get_fully_qualified_class_name(operator: BaseOperator | MappedOperator) -> str:
if isinstance(operator, (MappedOperator, SerializedBaseOperator)):
# as in airflow.api_connexion.schemas.common_schema.ClassReferenceSchema
return operator._task_module + "." + operator._task_type # type: ignore
return operator._task_module + "." + operator._task_type
op_class = get_operator_class(operator)
return op_class.__module__ + "." + op_class.__name__

Expand Down Expand Up @@ -947,7 +947,7 @@ def translate_airflow_asset(asset: Asset, lineage_context) -> OpenLineageDataset
from airflow.sdk.definitions.asset import _get_normalized_scheme
else:
try:
from airflow.datasets import _get_normalized_scheme # type: ignore[no-redef, attr-defined]
from airflow.datasets import _get_normalized_scheme # type: ignore[no-redef]
except ImportError:
return None

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -223,7 +223,7 @@ def execute(self, context: Context) -> None:
if self.file_path is not None:
self.event_templates = {}
self.log.info("Reading OpenLineage event templates from file `%s`", self.file_path)
with open(self.file_path) as f: # type: ignore[arg-type]
with open(self.file_path) as f:
events = json.load(f)
for event in events:
# Just a single event per job and event type is loaded as this is the most common scenario
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -211,7 +211,7 @@ class DifferentOperatorLineage:
job_facets: dict[str, BaseFacet] = Factory(dict)
some_other_param: dict = Factory(dict)

return DifferentOperatorLineage( # type: ignore
return DifferentOperatorLineage(
name="unused",
inputs=INPUTS,
outputs=OUTPUTS,
Expand All @@ -232,7 +232,7 @@ class WrongOperatorLineage:
outputs: list[Dataset] = Factory(list)
some_other_param: dict = Factory(dict)

return WrongOperatorLineage( # type: ignore
return WrongOperatorLineage(
inputs=INPUTS,
outputs=OUTPUTS,
some_other_param={"asdf": "fdsa"},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1048,7 +1048,7 @@ def sample_callable(**kwargs):
start_date=date,
run_type=types.DagRunType.MANUAL,
state=DagRunState.QUEUED,
**dagrun_kwargs, # type: ignore
**dagrun_kwargs, # type: ignore[arg-type]
)
task_instance = TaskInstance(t, run_id=run_id) # type: ignore
task_instance.dag_run = dagrun
Expand All @@ -1074,7 +1074,7 @@ def _create_listener_and_task_instance(

if not runtime_ti:
# TaskInstance is used when on API server (when listener gets called about manual state change)
task_instance = TaskInstance(task=MagicMock(), dag_version_id=uuid7()) # type: ignore
task_instance = TaskInstance(task=MagicMock(), dag_version_id=uuid7())
task_instance.dag_run = DagRun()
task_instance.dag_run.dag_id = "dag_id_from_dagrun_and_not_ti"
task_instance.dag_run.run_id = "dag_run_run_id"
Expand Down Expand Up @@ -1867,7 +1867,7 @@ def simple_callable(**kwargs):
run_type=types.DagRunType.MANUAL,
state=DagRunState.QUEUED,
execution_date=date,
) # type: ignore
)
self.task_instance_1 = TaskInstance(self.task_1, run_id=run_id, map_index=-1)
self.task_instance_2 = TaskInstance(self.task_2, run_id=run_id, map_index=-1)
self.task_instance_1.dag_run = self.task_instance_2.dag_run = self.dagrun
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -243,7 +243,7 @@ def __init__(self):

@pytest.mark.enable_redact
def test_redact_with_exclusions(monkeypatch):
redactor = OpenLineageRedactor.from_masker(_secrets_masker()) # type: ignore[assignment]
redactor = OpenLineageRedactor.from_masker(_secrets_masker())

class NotMixin:
def __init__(self):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@
if AIRFLOW_V_3_0_PLUS:
from airflow.sdk import BaseOperator, task
else:
from airflow.decorators import task # type: ignore[no-redef]
from airflow.decorators import task
from airflow.models.baseoperator import BaseOperator # type: ignore[no-redef]

BASH_OPERATOR_PATH = "airflow.providers.standard.operators.bash"
Expand Down