Skip to content

Commit

Permalink
openlineage: make value of slots in attrs.define consistent across al…
Browse files Browse the repository at this point in the history
…l OL usages (#40992)

Signed-off-by: Kacper Muda <mudakacper@gmail.com>
  • Loading branch information
kacpermuda committed Jul 24, 2024
1 parent 25e1cf1 commit 10f2503
Show file tree
Hide file tree
Showing 5 changed files with 47 additions and 13 deletions.
14 changes: 7 additions & 7 deletions airflow/providers/openlineage/plugins/facets.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
reason="To be removed in the next release. Make sure to use information from AirflowRunFacet instead.",
category=AirflowProviderDeprecationWarning,
)
@define(slots=False)
@define
class AirflowMappedTaskRunFacet(RunFacet):
"""Run facet containing information about mapped tasks."""

Expand All @@ -47,7 +47,7 @@ def from_task_instance(cls, task_instance):
)


@define(slots=True)
@define
class AirflowJobFacet(JobFacet):
"""
Composite Airflow job facet.
Expand All @@ -70,7 +70,7 @@ class AirflowJobFacet(JobFacet):
tasks: dict


@define(slots=True)
@define
class AirflowStateRunFacet(RunFacet):
"""
Airflow facet providing state information.
Expand All @@ -89,7 +89,7 @@ class AirflowStateRunFacet(RunFacet):
tasksState: dict[str, str]


@define(slots=False)
@define
class AirflowRunFacet(RunFacet):
"""Composite Airflow run facet."""

Expand All @@ -100,15 +100,15 @@ class AirflowRunFacet(RunFacet):
taskUuid: str


@define(slots=True)
@define
class AirflowDagRunFacet(RunFacet):
"""Composite Airflow DAG run facet."""

dag: dict
dagRun: dict


@define(slots=False)
@define
class UnknownOperatorInstance(RedactMixin):
"""
Describes an unknown operator.
Expand All @@ -127,7 +127,7 @@ class UnknownOperatorInstance(RedactMixin):
reason="To be removed in the next release. Make sure to use information from AirflowRunFacet instead.",
category=AirflowProviderDeprecationWarning,
)
@define(slots=False)
@define
class UnknownOperatorAttributeRunFacet(RunFacet):
"""RunFacet that describes unknown operators in an Airflow DAG."""

Expand Down
14 changes: 13 additions & 1 deletion airflow/providers/openlineage/utils/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -219,7 +219,19 @@ def _include_fields(self):
setattr(self, field, getattr(self.obj, field))
self._fields.append(field)
else:
for field, val in self.obj.__dict__.items():
if hasattr(self.obj, "__dict__"):
obj_fields = self.obj.__dict__
elif attrs.has(self.obj.__class__): # e.g. attrs.define class with slots=True has no __dict__
obj_fields = {
field.name: getattr(self.obj, field.name) for field in attrs.fields(self.obj.__class__)
}
else:
raise ValueError(
"Cannot iterate over fields: "
f"The object of type {type(self.obj).__name__} neither has a __dict__ attribute "
"nor is defined as an attrs class."
)
for field, val in obj_fields.items():
if field not in self._fields and field not in self.excludes and field not in self.renames:
setattr(self, field, val)
self._fields.append(field)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -481,7 +481,7 @@ Writing a custom facet function
from airflow.providers.common.compat.openlineage.facet import RunFacet
@attrs.define(slots=False)
@attrs.define
class MyCustomRunFacet(RunFacet):
"""Define a custom facet."""
Expand Down
28 changes: 25 additions & 3 deletions tests/providers/openlineage/plugins/test_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,28 @@ class TestInfo(InfoJsonEncodable):
casts = {"iwanttobeint": lambda x: int(x.imastring)}
renames = {"_faulty_name": "goody_name"}

@define
class Test:
exclude_1: str
imastring: str
_faulty_name: str
donotcare: str

obj = Test("val", "123", "not_funny", "abc")

assert json.loads(json.dumps(TestInfo(obj))) == {
"iwanttobeint": 123,
"goody_name": "not_funny",
"donotcare": "abc",
}


def test_info_json_encodable_without_slots():
class TestInfo(InfoJsonEncodable):
excludes = ["exclude_1", "exclude_2", "imastring"]
casts = {"iwanttobeint": lambda x: int(x.imastring)}
renames = {"_faulty_name": "goody_name"}

@define(slots=False)
class Test:
exclude_1: str
Expand All @@ -122,7 +144,7 @@ def test_info_json_encodable_list_does_not_flatten():
class TestInfo(InfoJsonEncodable):
includes = ["alist"]

@define(slots=False)
@define
class Test:
alist: list[str]

Expand All @@ -135,7 +157,7 @@ def test_info_json_encodable_list_does_include_nonexisting():
class TestInfo(InfoJsonEncodable):
includes = ["exists", "doesnotexist"]

@define(slots=False)
@define
class Test:
exists: str

Expand Down Expand Up @@ -191,7 +213,7 @@ def __init__(self):
self.password = "passwd"
self.transparent = "123"

@define(slots=False)
@define
class NestedMixined(RedactMixin):
_skip_redact = ["nested_field"]
password: str
Expand Down
2 changes: 1 addition & 1 deletion tests/providers/openlineage/utils/custom_facet_fixture.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
from airflow.models.taskinstance import TaskInstance, TaskInstanceState


@attrs.define(slots=False)
@attrs.define
class MyCustomRunFacet(RunFacet):
"""Define a custom run facet."""

Expand Down

0 comments on commit 10f2503

Please sign in to comment.