Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,10 @@

* Support configuring Firestore database on ReadFn transforms (Java) ([#36904](https://github.com/apache/beam/issues/36904)).

* Moved `Row` to `apache_beam.typehints.row` to avoid import cycles and improve module organization. Kept a compatibility alias in `pvalue.py` (Python) ([#35095](https://github.com/apache/beam/issues/35095)).


Comment on lines +78 to +79
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

These extra blank lines can be removed for better readability and to keep the changelog concise.


## Breaking Changes

* (Python) Some Python dependencies have been split out into extras. To ensure all previously installed dependencies are installed, when installing Beam you can `pip install apache-beam[gcp,interactive,yaml,redis,hadoop,tfrecord]`, though most users will not need all of these extras ([#34554](https://github.com/apache/beam/issues/34554)).
Expand Down
2 changes: 1 addition & 1 deletion sdks/python/apache_beam/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -90,9 +90,9 @@
from apache_beam import version
from apache_beam.pipeline import *
from apache_beam.pvalue import PCollection
from apache_beam.pvalue import Row
from apache_beam.pvalue import TaggedOutput
from apache_beam.transforms import *
from apache_beam.typehints.row import Row

try:
# Add mitigation for CVE-2023-47248 while Beam allows affected versions
Expand Down
58 changes: 1 addition & 57 deletions sdks/python/apache_beam/pvalue.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@
from typing import Dict
from typing import Generic
from typing import Iterator
from typing import NamedTuple
from typing import Optional
from typing import Sequence
from typing import TypeVar
Expand All @@ -45,6 +44,7 @@
from apache_beam.portability import common_urns
from apache_beam.portability import python_urns
from apache_beam.portability.api import beam_runner_api_pb2
from apache_beam.typehints.row import Row

if TYPE_CHECKING:
from apache_beam.pipeline import AppliedPTransform
Expand Down Expand Up @@ -636,59 +636,3 @@ class EmptySideInput(object):
want to create new instances of this class themselves.
"""
pass


class Row(object):
  """A dynamic schema'd row object.

  This object's attributes are initialized from the keywords passed into its
  constructor, e.g. Row(x=3, y=4) will create a Row with two attributes x and y.

  More importantly, when a Row object is returned from a `Map`, `FlatMap`, or
  `DoFn` type inference is able to deduce the schema of the resulting
  PCollection, e.g.

      pc | beam.Map(lambda x: Row(x=x, y=0.5 * x))

  when applied to a PCollection of ints will produce a PCollection with schema
  `(x=int, y=float)`.

  Note that in Beam 2.30.0 and later, Row objects are sensitive to field order.
  So `Row(x=3, y=4)` is not considered equal to `Row(y=4, x=3)`.
  """
  def __init__(self, **kwargs):
    """Store every keyword argument as an attribute, in the order given."""
    self.__dict__.update(kwargs)

  def as_dict(self):
    """Return a new dict mapping field names to values, in field order."""
    return dict(self.__dict__)

  # For compatibility with named tuples.
  _asdict = as_dict

  def __iter__(self):
    """Yield the field values in field order."""
    yield from self.__dict__.values()

  def __repr__(self):
    return 'Row(%s)' % ', '.join('%s=%r' % kv for kv in self.__dict__.items())

  def __hash__(self):
    """Hash the field values in order.

    A dict items view is not hashable, so the values are snapshotted into a
    tuple first. Field names are deliberately left out: two equal Rows always
    have equal values in the same order, and a Row that compares equal to a
    named tuple then hashes the same as that tuple.

    Raises:
      TypeError: if any field value is itself unhashable.
    """
    return hash(tuple(self.__dict__.values()))

  def __eq__(self, other):
    """Compare field names and values pairwise, respecting field order.

    Another Row of the same type is compared by its attributes. A named tuple
    (a tuple exposing `_asdict`) is compared through its `_asdict()` mapping.
    Anything else is unequal.
    """
    if type(self) is type(other):
      other_dict = other.__dict__
    elif isinstance(other, tuple) and hasattr(other, '_asdict'):
      # Named-tuple instances are tuples that expose _asdict(); the
      # typing.NamedTuple factory itself is never the type of an instance.
      other_dict = other._asdict()
    else:
      return False
    # Comparing the ordered item sequences checks length, names, values and
    # field order in one step.
    return tuple(self.__dict__.items()) == tuple(other_dict.items())

  def __reduce__(self):
    """Pickle as a call to `_make_Row` with the ordered (name, value) pairs."""
    return _make_Row, tuple(self.__dict__.items())

def _make_Row(*items):
  """Build a Row from positional (name, value) pairs.

  This is the callable that `Row.__reduce__` hands to pickle, so its
  arguments are the pairs that `__reduce__` emitted.
  """
  fields = dict(items)
  return Row(**fields)
74 changes: 74 additions & 0 deletions sdks/python/apache_beam/typehints/row.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

from typing import NamedTuple


class Row(object):
"""A dynamic schema'd row object.
This objects attributes are initialized from the keywords passed into its
constructor, e.g. Row(x=3, y=4) will create a Row with two attributes x and y.
More importantly, when a Row object is returned from a `Map`, `FlatMap`, or
`DoFn` type inference is able to deduce the schema of the resulting
PCollection, e.g.
pc | beam.Map(lambda x: Row(x=x, y=0.5 * x))
when applied to a PCollection of ints will produce a PCollection with schema
`(x=int, y=float)`.
Note that in Beam 2.30.0 and later, Row objects are sensitive to field order.
So `Row(x=3, y=4)` is not considered equal to `Row(y=4, x=3)`.
"""
def __init__(self, **kwargs):
self.__dict__.update(kwargs)

def as_dict(self):
return dict(self.__dict__)

# For compatibility with named tuples.
_asdict = as_dict

def __iter__(self):
for _, value in self.__dict__.items():
yield value

def __repr__(self):
return 'Row(%s)' % ', '.join('%s=%r' % kv for kv in self.__dict__.items())

def __hash__(self):
return hash(self.__dict__.items())
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

critical

The __hash__ method will raise a TypeError because dict.items() returns a view object, which is not hashable in Python 3. To fix this, you should convert the items to a hashable, ordered type like a tuple. Given that field order is important for Row objects, converting to a tuple is the correct approach.

Suggested change
return hash(self.__dict__.items())
return hash(tuple(self.__dict__.items()))


def __eq__(self, other):
if type(self) == type(other):
other_dict = other.__dict__
elif type(other) == type(NamedTuple):
other_dict = other._asdict()
Comment on lines +61 to +62
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

The check type(other) == type(NamedTuple) is incorrect for identifying NamedTuple instances. typing.NamedTuple is a class factory, and instances of named tuples do not have NamedTuple as their type. A more robust way to check for namedtuple-like objects is to see if it's a tuple and has an _asdict method.

Suggested change
elif type(other) == type(NamedTuple):
other_dict = other._asdict()
elif isinstance(other, tuple) and hasattr(other, '_asdict'):
other_dict = other._asdict()

else:
return False
return (
len(self.__dict__) == len(other_dict) and
all(s == o for s, o in zip(self.__dict__.items(), other_dict.items())))

def __reduce__(self):
  """Describe this Row to pickle as `_make_Row(*pairs)`.

  The pairs are the (name, value) items of the instance in field order.
  """
  ordered_pairs = tuple(vars(self).items())
  return _make_Row, ordered_pairs


def _make_Row(*items):
  """Build a Row from positional (name, value) pairs.

  This is the callable that `Row.__reduce__` hands to pickle, so its
  arguments are the pairs that `__reduce__` emitted.
  """
  fields = dict(items)
  return Row(**fields)
Loading