Skip to content

Commit

Permalink
Add standard coder and tests
Browse files Browse the repository at this point in the history
* Add decimal logical type in schema.proto

* Add unit test for FixedPrecisionNumeric

* Assert FixedPrecisionNumeric argument row assignable

* Add decimal test case in standard_coders.yaml
  • Loading branch information
Abacn committed Sep 27, 2022
1 parent 5158b36 commit 3e60371
Show file tree
Hide file tree
Showing 8 changed files with 118 additions and 32 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -475,6 +475,18 @@ examples:

---

coder:
urn: "beam:coder:row:v1"
# f_float: float32, f_decimal: logical(decimal)
payload: "\n\r\n\x07f_float\x1a\x02\x10\x05\n1\n\tf_decimal\x1a$:\"\n\x1cbeam:logical_type:decimal:v1\x1a\x02\x10\t\x12$800c44ae-a1b7-4def-bbf6-6217cca89ec4"
examples:
"\x02\x00\x00\x00\x00\x00\x01\x01\x00": {f_float: "0.0", f_decimal: "0.0"}
"\x02\x00?\x80\x00\x00\x01\x01\n": {f_float: "1.0", f_decimal: "1.0"}
"\x02\x00@I\x0eV\x04\x02z\xb7": {f_float: "3.1415", f_decimal: "3.1415"}
"\x02\x00\xc2\xc8>\xfa\x03\x03\xfex\xe5": {f_float: "-100.123", f_decimal: "-100.123"}

---

coder:
urn: "beam:coder:sharded_key:v1"
components: [{urn: "beam:coder:string_utf8:v1"}]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,14 @@ message LogicalTypes {
// corresponds to chronological order.
MILLIS_INSTANT = 2 [(org.apache.beam.model.pipeline.v1.beam_urn) =
"beam:logical_type:millis_instant:v1"];

// A URN for Decimal type
// - Representation type: BYTES
// - A decimal number with specified scale and value. Its BYTES
// representation consists of an integer (INT32) scale followed by a
// two's complement encoded big integer.
DECIMAL = 3 [(org.apache.beam.model.pipeline.v1.beam_urn) =
"beam:logical_type:decimal:v1"];
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.math.BigDecimal;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collections;
Expand Down Expand Up @@ -425,6 +426,8 @@ private static Object parseField(Object value, Schema.FieldType fieldType) {
case DATETIME:
// convert shifted millis to epoch millis as in InstantCoder
return new Instant((Long) value + -9223372036854775808L);
case DECIMAL:
return new BigDecimal((String) value);
case BYTES:
// extract String as byte[]
return ((String) value).getBytes(StandardCharsets.ISO_8859_1);
Expand Down Expand Up @@ -468,7 +471,7 @@ private static Object parseField(Object value, Schema.FieldType fieldType) {
return fieldType
.getLogicalType()
.toInputType(parseField(value, fieldType.getLogicalType().getBaseType()));
default: // DECIMAL
default:
throw new IllegalArgumentException("Unsupported type name: " + fieldType.getTypeName());
}
}
Expand Down
1 change: 1 addition & 0 deletions sdks/go/test/regression/coders/fromyaml/fromyaml.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ var filteredCases = []struct{ filter, reason string }{
{"logical", "BEAM-9615: Support logical types"},
{"30ea5a25-dcd8-4cdb-abeb-5332d15ab4b9", "https://github.com/apache/beam/issues/21206: Support encoding position."},
{"80be749a-5700-4ede-89d8-dd9a4433a3f8", "https://github.com/apache/beam/issues/19817: Support millis_instant."},
{"800c44ae-a1b7-4def-bbf6-6217cca89ec4", "https://github.com/apache/beam/issues/19817: Support decimal."},
}

// Coder is a representation of a serialized beam coder.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@

import java.math.BigDecimal;
import java.math.MathContext;
import org.apache.beam.model.pipeline.v1.RunnerApi;
import org.apache.beam.model.pipeline.v1.SchemaApi;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.schemas.Schema.FieldType;
import org.apache.beam.sdk.values.Row;
Expand All @@ -30,14 +32,21 @@
public class FixedPrecisionNumeric extends PassThroughLogicalType<BigDecimal> {
public static final String IDENTIFIER = "beam:logical_type:fixed_decimal:v1";

// TODO(https://github.com/apache/beam/issues/19817) implement beam:logical_type:decimal:v1 as
// TODO(https://github.com/apache/beam/issues/23374) implement beam:logical_type:decimal:v1 as
// CoderLogicalType (once CoderLogicalType is implemented).
/**
* Identifier of the unspecified precision numeric type. It corresponds to Java SDK's {@link
* FieldType#DECIMAL}. It is the underlying representation type of FixedPrecisionNumeric logical
* type in order to be compatible with existing Java field types.
*/
public static final String BASE_IDENTIFIER = "beam:logical_type:decimal:v1";
public static final String BASE_IDENTIFIER =
SchemaApi.LogicalTypes.Enum.DECIMAL
.getValueDescriptor()
.getOptions()
.getExtension(RunnerApi.beamUrn);

private static final Schema SCHEMA =
Schema.builder().addInt32Field("precision").addInt32Field("scale").build();

private final int precision;
private final int scale;
Expand All @@ -47,8 +56,7 @@ public class FixedPrecisionNumeric extends PassThroughLogicalType<BigDecimal> {
* indicates unspecified precision.
*/
public static FixedPrecisionNumeric of(int precision, int scale) {
Schema schema = Schema.builder().addInt32Field("precision").addInt32Field("scale").build();
return new FixedPrecisionNumeric(schema, precision, scale);
return new FixedPrecisionNumeric(precision, scale);
}

/** Create a FixedPrecisionNumeric instance with specified scale and unspecified precision. */
Expand All @@ -58,6 +66,11 @@ public static FixedPrecisionNumeric of(int scale) {

/** Create a FixedPrecisionNumeric instance with specified argument row. */
public static FixedPrecisionNumeric of(Row row) {
checkArgument(
row.getSchema().assignableTo(SCHEMA),
"Row has an incompatible schema to construct the logical type object: %s",
row.getSchema());

final Integer precision = row.getInt32("precision");
final Integer scale = row.getInt32("scale");
checkArgument(
Expand All @@ -67,28 +80,30 @@ public static FixedPrecisionNumeric of(Row row) {
return of(firstNonNull(precision, -1), firstNonNull(scale, 0));
}

private FixedPrecisionNumeric(Schema schema, int precision, int scale) {
private FixedPrecisionNumeric(int precision, int scale) {
super(
IDENTIFIER,
FieldType.row(schema),
Row.withSchema(schema).addValues(precision, scale).build(),
FieldType.row(SCHEMA),
Row.withSchema(SCHEMA).addValues(precision, scale).build(),
FieldType.DECIMAL);
this.precision = precision;
this.scale = scale;
}

@Override
public BigDecimal toInputType(BigDecimal base) {
checkArgument(
base == null
|| (base.precision() <= precision && base.scale() <= scale)
// for cases when received values can be safely coerced to the schema
|| base.round(new MathContext(precision)).compareTo(base) == 0,
"Expected BigDecimal base to be null or have precision <= %s (was %s), scale <= %s (was %s)",
precision,
(base == null) ? null : base.precision(),
scale,
(base == null) ? null : base.scale());
if (precision != -1) {
// check value not causing overflow when precision is fixed.
checkArgument(
base == null
|| (base.precision() <= precision && base.scale() <= scale)
|| base.round(new MathContext(precision)).compareTo(base) == 0,
"Expected BigDecimal base to be null or have precision <= %s (was %s), scale <= %s (was %s)",
precision,
(base == null) ? null : base.precision(),
scale,
(base == null) ? null : base.scale());
}
return base;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,13 @@
package org.apache.beam.sdk.schemas.logicaltypes;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertThrows;

import java.math.BigDecimal;
import java.time.Duration;
import java.time.Instant;
import java.util.Map;
import java.util.Random;
import java.util.UUID;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.schemas.Schema.Field;
Expand Down Expand Up @@ -130,4 +133,51 @@ public void testSchema() {
schemaValue,
new SchemaLogicalType().toInputType(new SchemaLogicalType().toBaseType(schemaValue)));
}

@Test
public void testFixedPrecisionNumeric() {
  // Exercises FixedPrecisionNumeric construction from an argument row and the value checks
  // that run when a row holding the logical type is built.
  final int precision = 10;
  final int scale = 2;
  Random random = new Random();

  // A value with 12 significant digits and a non-zero tail. It always exceeds the 10-digit
  // precision and cannot be rounded to 10 digits without changing its value. Deriving this
  // value from random.nextLong() plus a constant is not safe: the sum may wrap around, may
  // happen to have 10 or fewer digits, or may end in trailing zeros, any of which would make
  // the overflow assertion below fail sporadically.
  final BigDecimal overflowing = BigDecimal.valueOf(123_456_789_012L, scale);

  Schema argumentSchema =
      Schema.builder().addInt32Field("precision").addInt32Field("scale").build();

  // invalid schema: field names do not match the expected "precision"/"scale" arguments
  final Schema invalidArgumentSchema =
      Schema.builder().addInt32Field("invalid").addInt32Field("schema").build();
  assertThrows(
      IllegalArgumentException.class,
      () ->
          FixedPrecisionNumeric.of(
              Row.withSchema(invalidArgumentSchema).addValues(precision, scale).build()));

  // FixedPrecisionNumeric specified precision and scale
  FixedPrecisionNumeric numeric =
      FixedPrecisionNumeric.of(
          Row.withSchema(argumentSchema).addValues(precision, scale).build());
  Schema schema = Schema.builder().addLogicalTypeField("decimal", numeric).build();

  // check argument valid case: any int has at most 10 digits, so it always fits
  BigDecimal decimal = BigDecimal.valueOf(random.nextInt(), scale);
  Row row = Row.withSchema(schema).addValues(decimal).build();
  assertEquals(decimal, row.getLogicalTypeValue(0, BigDecimal.class));

  // check argument invalid case: the value needs more digits than the fixed precision allows
  assertThrows(
      IllegalArgumentException.class, Row.withSchema(schema).addValues(overflowing)::build);

  // FixedPrecisionNumeric without specifying precision
  numeric = FixedPrecisionNumeric.of(scale);
  schema = Schema.builder().addLogicalTypeField("decimal", numeric).build();

  // check argument always valid
  decimal = BigDecimal.valueOf(random.nextInt(), scale);
  row = Row.withSchema(schema).addValues(decimal).build();
  assertEquals(decimal, row.getLogicalTypeValue(0, BigDecimal.class));

  // the same value rejected above is accepted once the precision is unspecified
  row = Row.withSchema(schema).addValues(overflowing).build();
  assertEquals(overflowing, row.getLogicalTypeValue(0, BigDecimal.class));
}
}
1 change: 1 addition & 0 deletions sdks/python/apache_beam/portability/common_urns.py
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@

java_class_lookup = ExpansionMethods.Enum.JAVA_CLASS_LOOKUP

decimal = LogicalTypes.Enum.DECIMAL
micros_instant = LogicalTypes.Enum.MICROS_INSTANT
millis_instant = LogicalTypes.Enum.MILLIS_INSTANT
python_callable = LogicalTypes.Enum.PYTHON_CALLABLE
24 changes: 10 additions & 14 deletions sdks/python/apache_beam/typehints/schemas.py
Original file line number Diff line number Diff line change
Expand Up @@ -276,8 +276,8 @@ def typing_to_runner_api(self, type_: type) -> schema_pb2.FieldType:
representation=self.typing_to_runner_api(
logical_type.representation_type())))
else:
# TODO(bhulette,yathu): Complete support for logical types that require
# arguments.
# TODO(https://github.com/apache/beam/issues/23373): Complete support
# for logical types that require arguments.
# This includes implementing SchemaTranslation.value_to_runner_api (see Java
# SDK's SchemaTranslation.fieldValueToProto)
return schema_pb2.FieldType(
Expand Down Expand Up @@ -773,14 +773,12 @@ def to_language_type(self, value):


class DecimalLogicalType(NoArgumentLogicalType[decimal.Decimal, bytes]):
def __init__(self):
from apache_beam.coders.coder_impl import DecimalCoderImpl

self.coder_impl = DecimalCoderImpl()

"""A logical type for decimal objects handling values consistent with that
encoded by ``BigDecimalCoder`` in the Java SDK.
"""
@classmethod
def urn(cls):
return "beam:logical_type:decimal:v1"
return common_urns.decimal.urn

@classmethod
def representation_type(cls):
Expand All @@ -793,13 +791,11 @@ def language_type(cls):

def to_representation_type(self, value):
# type: (decimal.Decimal) -> bytes

return self.coder_impl.encode(value)
return str(value).encode()

def to_language_type(self, value):
# type: (bytes) -> decimal.Decimal

return self.coder_impl.decode(value)
return decimal.Decimal(value.decode())


@LogicalType.register_logical_type
Expand Down Expand Up @@ -829,12 +825,12 @@ def language_type(cls):
def to_representation_type(self, value):
# type: (decimal.Decimal) -> bytes

return DecimalLogicalType().coder_impl.encode(value)
return DecimalLogicalType().to_representation_type(value)

def to_language_type(self, value):
# type: (bytes) -> decimal.Decimal

return DecimalLogicalType().coder_impl.decode(value)
return DecimalLogicalType().to_language_type(value)

@classmethod
def argument_type(cls):
Expand Down

0 comments on commit 3e60371

Please sign in to comment.