Skip to content

Commit

Permalink
Add support for FLOAT to Python RowCoder
Browse files Browse the repository at this point in the history
  • Loading branch information
TheNeuralBit committed Aug 12, 2022
1 parent 7a9bb76 commit 699ecf4
Show file tree
Hide file tree
Showing 9 changed files with 87 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -455,6 +455,17 @@ examples:

---

coder:
urn: "beam:coder:row:v1"
# f_float: float32
payload: "\n\r\n\x07f_float\x1a\x02\x10\x05\x12$8c97b6c5-69e5-4733-907b-26cd8edae612"
examples:
"\x01\x00\x00\x00\x00\x00": {f_float: "0.0"}
"\x01\x00?\x80\x00\x00": {f_float: "1.0"}
"\x01\x00@I\x0eV": {f_float: "3.1415"}

---

coder:
urn: "beam:coder:sharded_key:v1"
components: [{urn: "beam:coder:string_utf8:v1"}]
Expand Down
16 changes: 16 additions & 0 deletions sdks/python/apache_beam/coders/coder_impl.py
Original file line number Diff line number Diff line change
Expand Up @@ -750,6 +750,22 @@ def estimate_size(self, unused_value, nested=False):
if unused_value is not None else 0)


class SinglePrecisionFloatCoderImpl(StreamCoderImpl):
  """For internal use only; no backwards-compatibility guarantees.

  Stream coder implementation for single-precision floats. Values are
  written with ``write_bigendian_float`` and read back with
  ``read_bigendian_float``, i.e. as a fixed 4-byte big-endian encoding.
  """
  def encode_to_stream(self, value, out, nested):
    # type: (float, create_OutputStream, bool) -> None
    # The encoding is fixed-width, so ``nested`` does not affect the output.
    out.write_bigendian_float(value)

  def decode_from_stream(self, in_stream, nested):
    # type: (create_InputStream, bool) -> float
    # The encoding is fixed-width, so ``nested`` does not affect decoding.
    return in_stream.read_bigendian_float()

  def estimate_size(self, unused_value, nested=False):
    # type: (Any, bool) -> int
    # A single-precision float is encoded as 4 bytes, regardless of nesting.
    return 4


class FloatCoderImpl(StreamCoderImpl):
"""For internal use only; no backwards-compatibility guarantees."""
def encode_to_stream(self, value, out, nested):
Expand Down
28 changes: 27 additions & 1 deletion sdks/python/apache_beam/coders/coders.py
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,7 @@
'ProtoCoder',
'ProtoPlusCoder',
'ShardedKeyCoder',
'SinglePrecisionFloatCoder',
'SingletonCoder',
'StrUtf8Coder',
'TimestampCoder',
Expand Down Expand Up @@ -678,8 +679,33 @@ def __hash__(self):
Coder.register_structured_urn(common_urns.coders.VARINT.urn, VarIntCoder)


class SinglePrecisionFloatCoder(FastCoder):
  """Coder for single-precision floating-point values.

  The encoding work is delegated to
  ``coder_impl.SinglePrecisionFloatCoderImpl``.
  """
  def __eq__(self, other):
    # Instances carry no state, so equality is decided by exact type alone.
    own_type = type(self)
    other_type = type(other)
    return own_type == other_type

  def __hash__(self):
    # Kept consistent with __eq__: every instance of a type hashes alike.
    own_type = type(self)
    return hash(own_type)

  def _create_impl(self):
    """Build the implementation object that performs the encoding."""
    impl = coder_impl.SinglePrecisionFloatCoderImpl()
    return impl

  def is_deterministic(self):
    # type: () -> bool
    """Report this coder as deterministic."""
    return True

  def to_type_hint(self):
    """Return the Python type produced by decoding, the built-in ``float``."""
    return float


class FloatCoder(FastCoder):
"""A coder used for floating-point values."""
"""A coder used for **double-precision** floating-point values.
Note that the name "FloatCoder" is in reference to Python's ``float`` built-in
which is generally implemented using C doubles. See
:class:`SinglePrecisionFloatCoder` for a single-precision version of this
coder.
"""
def _create_impl(self):
return coder_impl.FloatCoderImpl()

Expand Down
3 changes: 2 additions & 1 deletion sdks/python/apache_beam/coders/coders_test_common.py
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,8 @@ def tearDownClass(cls):
coders.ListLikeCoder,
coders.ProtoCoder,
coders.ProtoPlusCoder,
coders.ToBytesCoder
coders.SinglePrecisionFloatCoder,
coders.ToBytesCoder,
])
cls.seen_nested -= set(
[coders.ProtoCoder, coders.ProtoPlusCoder, CustomCoder])
Expand Down
3 changes: 3 additions & 0 deletions sdks/python/apache_beam/coders/row_coder.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
from apache_beam.coders.coders import IterableCoder
from apache_beam.coders.coders import MapCoder
from apache_beam.coders.coders import NullableCoder
from apache_beam.coders.coders import SinglePrecisionFloatCoder
from apache_beam.coders.coders import StrUtf8Coder
from apache_beam.coders.coders import VarIntCoder
from apache_beam.portability import common_urns
Expand Down Expand Up @@ -148,6 +149,8 @@ def _nonnull_coder_from_type(field_type):
if type_info == "atomic_type":
if field_type.atomic_type in (schema_pb2.INT32, schema_pb2.INT64):
return VarIntCoder()
elif field_type.atomic_type == schema_pb2.FLOAT:
return SinglePrecisionFloatCoder()
elif field_type.atomic_type == schema_pb2.DOUBLE:
return FloatCoder()
elif field_type.atomic_type == schema_pb2.STRING:
Expand Down
6 changes: 6 additions & 0 deletions sdks/python/apache_beam/coders/slow_stream.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,9 @@ def write_bigendian_int32(self, v):
def write_bigendian_double(self, v):
  """Write ``v`` to the stream packed as a big-endian double (``'>d'``)."""
  packed = struct.pack('>d', v)
  self.write(packed)

def write_bigendian_float(self, v):
  """Write ``v`` to the stream packed as a big-endian float (``'>f'``)."""
  packed = struct.pack('>f', v)
  self.write(packed)

def get(self):
  # type: () -> bytes
  """Return all chunks held in ``self.data`` joined into one bytes object."""
  joined = b''.join(self.data)
  return joined
Expand Down Expand Up @@ -172,6 +175,9 @@ def read_bigendian_int32(self):
def read_bigendian_double(self):
  """Read 8 bytes from the stream and decode them as a big-endian double."""
  raw = self.read(8)
  (value, ) = struct.unpack('>d', raw)
  return value

def read_bigendian_float(self):
  """Read 4 bytes from the stream and decode them as a big-endian float."""
  raw = self.read(4)
  (value, ) = struct.unpack('>f', raw)
  return value


def get_varint_size(v):
"""For internal use only; no backwards-compatibility guarantees.
Expand Down
2 changes: 2 additions & 0 deletions sdks/python/apache_beam/coders/stream.pxd
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ cdef class OutputStream(object):
cpdef write_bigendian_uint64(self, libc.stdint.uint64_t signed_v)
cpdef write_bigendian_int32(self, libc.stdint.int32_t signed_v)
cpdef write_bigendian_double(self, double d)
cpdef write_bigendian_float(self, float d)

cpdef bytes get(self)
cpdef size_t size(self) except? -1
Expand Down Expand Up @@ -62,6 +63,7 @@ cdef class InputStream(object):
cpdef libc.stdint.uint64_t read_bigendian_uint64(self) except? -1
cpdef libc.stdint.int32_t read_bigendian_int32(self) except? -1
cpdef double read_bigendian_double(self) except? -1
cpdef float read_bigendian_float(self) except? -1
cpdef bytes read_all(self, bint nested=*)

cpdef libc.stdint.int64_t get_varint_size(libc.stdint.int64_t value)
7 changes: 7 additions & 0 deletions sdks/python/apache_beam/coders/stream.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,9 @@ cdef class OutputStream(object):
cpdef write_bigendian_double(self, double d):
  # Reinterpret the bytes of the C double as an int64 (a bit-for-bit pointer
  # cast, not a numeric conversion) and emit that integer through
  # write_bigendian_int64.
  self.write_bigendian_int64((<libc.stdint.int64_t*><char*>&d)[0])

cpdef write_bigendian_float(self, float f):
  # Reinterpret the bytes of the C float as an int32 (a bit-for-bit pointer
  # cast, not a numeric conversion) and emit that integer through
  # write_bigendian_int32.
  self.write_bigendian_int32((<libc.stdint.int32_t*><char*>&f)[0])

cpdef bytes get(self):
  # Return the first ``self.pos`` bytes of the underlying buffer.
  # NOTE(review): presumably ``pos`` is the current write offset, so this is
  # everything written so far; confirm against the rest of the class.
  return self.data[:self.pos]

Expand Down Expand Up @@ -224,6 +227,10 @@ cdef class InputStream(object):
cdef libc.stdint.int64_t as_long = self.read_bigendian_int64()
return (<double*><char*>&as_long)[0]

cpdef float read_bigendian_float(self) except? -1:
  # Read a big-endian int32 from the stream, then reinterpret its bytes as a
  # C float (a bit-for-bit pointer cast, not a numeric conversion).
  # ``except? -1`` is Cython's error-return declaration: a result of -1 makes
  # the caller check whether an exception is actually pending.
  cdef libc.stdint.int32_t as_int = self.read_bigendian_int32()
  return (<float*><char*>&as_int)[0]

cpdef libc.stdint.int64_t get_varint_size(libc.stdint.int64_t value):
"""Returns the size of the given integer value when encode as a VarInt."""
cdef libc.stdint.int64_t varint_size = 0
Expand Down
13 changes: 13 additions & 0 deletions sdks/python/apache_beam/coders/stream_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@
import math
import unittest

import numpy as np

from apache_beam.coders import slow_stream


Expand Down Expand Up @@ -99,6 +101,17 @@ def test_read_write_double(self):
for v in values:
self.assertEqual(v, in_s.read_bigendian_double())

def test_read_write_float(self):
  """Round-trip a set of floats through the float stream methods.

  Each sample is first narrowed to single precision with ``np.float32`` so
  that the value read back can be compared for exact equality.
  """
  samples = (0, 1, -1, 1e20, 1.0 / 3, math.pi, float('inf'))
  # Restrict to single precision before coder roundtrip
  expected = tuple(float(np.float32(sample)) for sample in samples)
  writer = self.OutputStream()
  for item in expected:
    writer.write_bigendian_float(item)
  reader = self.InputStream(writer.get())
  for item in expected:
    self.assertEqual(item, reader.read_bigendian_float())

def test_read_write_bigendian_int64(self):
values = 0, 1, -1, 2**63 - 1, -2**63, int(2**61 * math.pi)
out_s = self.OutputStream()
Expand Down

0 comments on commit 699ecf4

Please sign in to comment.