From 63ba9c72275011bc5d8c9751cb4921c89b808367 Mon Sep 17 00:00:00 2001 From: Brian Hulette Date: Fri, 12 Aug 2022 13:12:58 -0700 Subject: [PATCH] Add support for FLOAT to Python RowCoder (#22626) * Add support for FLOAT to Python RowCoder * Skip the new test case in Go --- .../model/fnexecution/v1/standard_coders.yaml | 11 ++++++++ .../regression/coders/fromyaml/fromyaml.go | 1 + sdks/python/apache_beam/coders/coder_impl.py | 16 +++++++++++ sdks/python/apache_beam/coders/coders.py | 28 ++++++++++++++++++- .../apache_beam/coders/coders_test_common.py | 3 +- sdks/python/apache_beam/coders/row_coder.py | 3 ++ sdks/python/apache_beam/coders/slow_stream.py | 6 ++++ sdks/python/apache_beam/coders/stream.pxd | 2 ++ sdks/python/apache_beam/coders/stream.pyx | 7 +++++ sdks/python/apache_beam/coders/stream_test.py | 13 +++++++++ 10 files changed, 88 insertions(+), 2 deletions(-) diff --git a/model/fn-execution/src/main/resources/org/apache/beam/model/fnexecution/v1/standard_coders.yaml b/model/fn-execution/src/main/resources/org/apache/beam/model/fnexecution/v1/standard_coders.yaml index 8de508ca32ec..38ce3355860a 100644 --- a/model/fn-execution/src/main/resources/org/apache/beam/model/fnexecution/v1/standard_coders.yaml +++ b/model/fn-execution/src/main/resources/org/apache/beam/model/fnexecution/v1/standard_coders.yaml @@ -455,6 +455,17 @@ examples: --- +coder: + urn: "beam:coder:row:v1" + # f_float: float32 + payload: "\n\r\n\x07f_float\x1a\x02\x10\x05\x12$8c97b6c5-69e5-4733-907b-26cd8edae612" +examples: + "\x01\x00\x00\x00\x00\x00": {f_float: "0.0"} + "\x01\x00?\x80\x00\x00": {f_float: "1.0"} + "\x01\x00@I\x0eV": {f_float: "3.1415"} + +--- + coder: urn: "beam:coder:sharded_key:v1" components: [{urn: "beam:coder:string_utf8:v1"}] diff --git a/sdks/go/test/regression/coders/fromyaml/fromyaml.go b/sdks/go/test/regression/coders/fromyaml/fromyaml.go index 44e610223bc1..417f801c6154 100644 --- a/sdks/go/test/regression/coders/fromyaml/fromyaml.go +++ 
b/sdks/go/test/regression/coders/fromyaml/fromyaml.go @@ -51,6 +51,7 @@ var unimplementedCoders = map[string]bool{ var filteredCases = []struct{ filter, reason string }{ {"logical", "BEAM-9615: Support logical types"}, {"30ea5a25-dcd8-4cdb-abeb-5332d15ab4b9", "https://github.com/apache/beam/issues/21206: Support encoding position."}, + {"8c97b6c5-69e5-4733-907b-26cd8edae612", "https://github.com/apache/beam/issues/22629: Support single-precision float."}, } // Coder is a representation a serialized beam coder. diff --git a/sdks/python/apache_beam/coders/coder_impl.py b/sdks/python/apache_beam/coders/coder_impl.py index c8936e71be3d..198893440da0 100644 --- a/sdks/python/apache_beam/coders/coder_impl.py +++ b/sdks/python/apache_beam/coders/coder_impl.py @@ -750,6 +750,22 @@ def estimate_size(self, unused_value, nested=False): if unused_value is not None else 0) +class SinglePrecisionFloatCoderImpl(StreamCoderImpl): + """For internal use only; no backwards-compatibility guarantees.""" + def encode_to_stream(self, value, out, nested): + # type: (float, create_OutputStream, bool) -> None + out.write_bigendian_float(value) + + def decode_from_stream(self, in_stream, nested): + # type: (create_InputStream, bool) -> float + return in_stream.read_bigendian_float() + + def estimate_size(self, unused_value, nested=False): + # type: (Any, bool) -> int + # A single-precision float is encoded as 4 bytes, regardless of nesting. 
+ return 4 + + class FloatCoderImpl(StreamCoderImpl): """For internal use only; no backwards-compatibility guarantees.""" def encode_to_stream(self, value, out, nested): diff --git a/sdks/python/apache_beam/coders/coders.py b/sdks/python/apache_beam/coders/coders.py index 0d9bc536aa35..845020244617 100644 --- a/sdks/python/apache_beam/coders/coders.py +++ b/sdks/python/apache_beam/coders/coders.py @@ -102,6 +102,7 @@ 'ProtoCoder', 'ProtoPlusCoder', 'ShardedKeyCoder', + 'SinglePrecisionFloatCoder', 'SingletonCoder', 'StrUtf8Coder', 'TimestampCoder', @@ -678,8 +679,33 @@ def __hash__(self): Coder.register_structured_urn(common_urns.coders.VARINT.urn, VarIntCoder) +class SinglePrecisionFloatCoder(FastCoder): + """A coder used for single-precision floating-point values.""" + def _create_impl(self): + return coder_impl.SinglePrecisionFloatCoderImpl() + + def is_deterministic(self): + # type: () -> bool + return True + + def to_type_hint(self): + return float + + def __eq__(self, other): + return type(self) == type(other) + + def __hash__(self): + return hash(type(self)) + + class FloatCoder(FastCoder): - """A coder used for floating-point values.""" + """A coder used for **double-precision** floating-point values. + + Note that the name "FloatCoder" is in reference to Python's ``float`` built-in + which is generally implemented using C doubles. See + :class:`SinglePrecisionFloatCoder` for a single-precision version of this + coder. 
+ """ def _create_impl(self): return coder_impl.FloatCoderImpl() diff --git a/sdks/python/apache_beam/coders/coders_test_common.py b/sdks/python/apache_beam/coders/coders_test_common.py index 0bb769cca2d6..b8a4bcb8fea8 100644 --- a/sdks/python/apache_beam/coders/coders_test_common.py +++ b/sdks/python/apache_beam/coders/coders_test_common.py @@ -158,7 +158,8 @@ def tearDownClass(cls): coders.ListLikeCoder, coders.ProtoCoder, coders.ProtoPlusCoder, - coders.ToBytesCoder + coders.SinglePrecisionFloatCoder, + coders.ToBytesCoder, ]) cls.seen_nested -= set( [coders.ProtoCoder, coders.ProtoPlusCoder, CustomCoder]) diff --git a/sdks/python/apache_beam/coders/row_coder.py b/sdks/python/apache_beam/coders/row_coder.py index 600d6595f105..1b434d68b479 100644 --- a/sdks/python/apache_beam/coders/row_coder.py +++ b/sdks/python/apache_beam/coders/row_coder.py @@ -30,6 +30,7 @@ from apache_beam.coders.coders import IterableCoder from apache_beam.coders.coders import MapCoder from apache_beam.coders.coders import NullableCoder +from apache_beam.coders.coders import SinglePrecisionFloatCoder from apache_beam.coders.coders import StrUtf8Coder from apache_beam.coders.coders import VarIntCoder from apache_beam.portability import common_urns @@ -150,6 +151,8 @@ def _nonnull_coder_from_type(field_type): if type_info == "atomic_type": if field_type.atomic_type in (schema_pb2.INT32, schema_pb2.INT64): return VarIntCoder() + elif field_type.atomic_type == schema_pb2.FLOAT: + return SinglePrecisionFloatCoder() elif field_type.atomic_type == schema_pb2.DOUBLE: return FloatCoder() elif field_type.atomic_type == schema_pb2.STRING: diff --git a/sdks/python/apache_beam/coders/slow_stream.py b/sdks/python/apache_beam/coders/slow_stream.py index cf71c3e8ac7f..11ccf7fd2e37 100644 --- a/sdks/python/apache_beam/coders/slow_stream.py +++ b/sdks/python/apache_beam/coders/slow_stream.py @@ -72,6 +72,9 @@ def write_bigendian_int32(self, v): def write_bigendian_double(self, v): 
self.write(struct.pack('>d', v)) + def write_bigendian_float(self, v): + self.write(struct.pack('>f', v)) + def get(self): # type: () -> bytes return b''.join(self.data) @@ -172,6 +175,9 @@ def read_bigendian_int32(self): def read_bigendian_double(self): return struct.unpack('>d', self.read(8))[0] + def read_bigendian_float(self): + return struct.unpack('>f', self.read(4))[0] + def get_varint_size(v): """For internal use only; no backwards-compatibility guarantees. diff --git a/sdks/python/apache_beam/coders/stream.pxd b/sdks/python/apache_beam/coders/stream.pxd index faee45aac997..fc179bb8c1b6 100644 --- a/sdks/python/apache_beam/coders/stream.pxd +++ b/sdks/python/apache_beam/coders/stream.pxd @@ -30,6 +30,7 @@ cdef class OutputStream(object): cpdef write_bigendian_uint64(self, libc.stdint.uint64_t signed_v) cpdef write_bigendian_int32(self, libc.stdint.int32_t signed_v) cpdef write_bigendian_double(self, double d) + cpdef write_bigendian_float(self, float d) cpdef bytes get(self) cpdef size_t size(self) except? -1 @@ -62,6 +63,7 @@ cdef class InputStream(object): cpdef libc.stdint.uint64_t read_bigendian_uint64(self) except? -1 cpdef libc.stdint.int32_t read_bigendian_int32(self) except? -1 cpdef double read_bigendian_double(self) except? -1 + cpdef float read_bigendian_float(self) except? 
-1 cpdef bytes read_all(self, bint nested=*) cpdef libc.stdint.int64_t get_varint_size(libc.stdint.int64_t value) diff --git a/sdks/python/apache_beam/coders/stream.pyx b/sdks/python/apache_beam/coders/stream.pyx index e0d364c66b13..2a6451cb29ea 100644 --- a/sdks/python/apache_beam/coders/stream.pyx +++ b/sdks/python/apache_beam/coders/stream.pyx @@ -98,6 +98,9 @@ cdef class OutputStream(object): cpdef write_bigendian_double(self, double d): self.write_bigendian_int64((<libc.stdint.int64_t*><char*>&d)[0]) + cpdef write_bigendian_float(self, float f): + self.write_bigendian_int32((<libc.stdint.int32_t*><char*>&f)[0]) + cpdef bytes get(self): return self.data[:self.pos] @@ -224,6 +227,10 @@ cdef class InputStream(object): cdef libc.stdint.int64_t as_long = self.read_bigendian_int64() return (<double*><char*>&as_long)[0] + cpdef float read_bigendian_float(self) except? -1: + cdef libc.stdint.int32_t as_int = self.read_bigendian_int32() + return (<float*><char*>&as_int)[0] + cpdef libc.stdint.int64_t get_varint_size(libc.stdint.int64_t value): """Returns the size of the given integer value when encode as a VarInt.""" cdef libc.stdint.int64_t varint_size = 0 diff --git a/sdks/python/apache_beam/coders/stream_test.py b/sdks/python/apache_beam/coders/stream_test.py index 7a9c8711e7ba..35b64eb95813 100644 --- a/sdks/python/apache_beam/coders/stream_test.py +++ b/sdks/python/apache_beam/coders/stream_test.py @@ -22,6 +22,8 @@ import math import unittest +import numpy as np + from apache_beam.coders import slow_stream @@ -99,6 +101,17 @@ def test_read_write_double(self): for v in values: self.assertEqual(v, in_s.read_bigendian_double()) + def test_read_write_float(self): + values = 0, 1, -1, 1e20, 1.0 / 3, math.pi, float('inf') + # Restrict to single precision before coder roundtrip + values = tuple(float(np.float32(v)) for v in values) + out_s = self.OutputStream() + for v in values: + out_s.write_bigendian_float(v) + in_s = self.InputStream(out_s.get()) + for v in values: + self.assertEqual(v, in_s.read_bigendian_float()) + def 
test_read_write_bigendian_int64(self): values = 0, 1, -1, 2**63 - 1, -2**63, int(2**61 * math.pi) out_s = self.OutputStream()