From 14a4992248cd50204f012502fa0f99f9230abf42 Mon Sep 17 00:00:00 2001 From: Fernando Velasquez Date: Wed, 15 Jun 2022 23:14:49 -0400 Subject: [PATCH] Updated output formats to output BigDecimal fields as numeric values instead of a byte representation. --- ...lAwareJsonStructuredRecordDatumWriter.java | 80 +++++++++ .../format/io/BigDecimalAwareJsonWriter.java | 83 ++++++++++ ...lAwareStructuredRecordStringConverter.java | 96 +++++++++++ ...reStructuredRecordStringConverterTest.java | 156 ++++++++++++++++++ .../StructuredDelimitedOutputFormat.java | 7 +- .../output/StructuredJsonOutputFormat.java | 5 +- 6 files changed, 422 insertions(+), 5 deletions(-) create mode 100644 format-common/src/main/java/io/cdap/cdap/format/io/BigDecimalAwareJsonStructuredRecordDatumWriter.java create mode 100644 format-common/src/main/java/io/cdap/cdap/format/io/BigDecimalAwareJsonWriter.java create mode 100644 format-common/src/main/java/io/cdap/plugin/format/BigDecimalAwareStructuredRecordStringConverter.java create mode 100644 format-common/src/test/java/io/cdap/plugin/format/BigDecimalAwareStructuredRecordStringConverterTest.java diff --git a/format-common/src/main/java/io/cdap/cdap/format/io/BigDecimalAwareJsonStructuredRecordDatumWriter.java b/format-common/src/main/java/io/cdap/cdap/format/io/BigDecimalAwareJsonStructuredRecordDatumWriter.java new file mode 100644 index 000000000..2c7bec335 --- /dev/null +++ b/format-common/src/main/java/io/cdap/cdap/format/io/BigDecimalAwareJsonStructuredRecordDatumWriter.java @@ -0,0 +1,80 @@ +/* + * Copyright © 2022 Cask Data, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package io.cdap.cdap.format.io; + +import com.google.gson.stream.JsonWriter; +import io.cdap.cdap.api.common.Bytes; +import io.cdap.cdap.api.data.schema.Schema; +import io.cdap.cdap.common.io.Encoder; + +import java.io.IOException; +import java.math.BigDecimal; +import java.math.BigInteger; +import java.nio.ByteBuffer; + +/** + * Custom Datum Writer for Structured Records, this class writes BigDecimal values as numbers instead of byte arrays in + * the JSON output. + */ +public class BigDecimalAwareJsonStructuredRecordDatumWriter extends JsonStructuredRecordDatumWriter { + @Override + protected void encode(Encoder encoder, Schema schema, Object value) throws IOException { + Schema nonNullableSchema = schema.isNullable() ? schema.getNonNullable() : schema; + Schema.LogicalType logicalType = nonNullableSchema.getLogicalType(); + + if (value != null && logicalType == Schema.LogicalType.DECIMAL) { + BigDecimal bdValue = fromObject(value, nonNullableSchema); + getJsonWriter(encoder).value(bdValue); + return; + } + + super.encode(encoder, schema, value); + } + + /** + * Extract a BigDecimal value from a supplied object + * @param value object to convert into BigDecimal + * @param logicalTypeSchema logical type schema for this field + * @return value converted ingo a BigDecimal. + */ + protected BigDecimal fromObject(Object value, Schema logicalTypeSchema) { + // Return BigDecimal as is + if (value instanceof BigDecimal) { + return (BigDecimal) value; + } + + // Handle the value as a byte buffer + int scale = logicalTypeSchema.getScale(); + if (value instanceof ByteBuffer) { + return new BigDecimal(new BigInteger(Bytes.toBytes((ByteBuffer) value)), scale); + } + + // Handle the BigDecimal value + try { + return new BigDecimal(new BigInteger((byte[]) value), scale); + } catch (ClassCastException e) { + throw new ClassCastException(String.format("Field '%s' is expected to be a decimal, but is a %s.", + logicalTypeSchema.getDisplayName(), + value.getClass().getSimpleName())); + } + } + + private JsonWriter getJsonWriter(Encoder encoder) { + // Type already checked in the encode method, hence assuming the casting is fine. + return ((JsonEncoder) encoder).getJsonWriter(); + } +} diff --git a/format-common/src/main/java/io/cdap/cdap/format/io/BigDecimalAwareJsonWriter.java b/format-common/src/main/java/io/cdap/cdap/format/io/BigDecimalAwareJsonWriter.java new file mode 100644 index 000000000..674cdf008 --- /dev/null +++ b/format-common/src/main/java/io/cdap/cdap/format/io/BigDecimalAwareJsonWriter.java @@ -0,0 +1,83 @@ +/* + * Copyright © 2022 Cask Data, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package io.cdap.cdap.format.io; + +import com.google.gson.stream.JsonWriter; + +import java.io.IOException; +import java.io.Writer; +import java.math.BigDecimal; + +/** + * JsonWriter instance which handles writing BigDecimal fields. + */ +public class BigDecimalAwareJsonWriter extends JsonWriter { + public BigDecimalAwareJsonWriter(Writer out) { + super(out); + } + + @Override + public JsonWriter value(Number value) throws IOException { + if (value == null) { + return this.nullValue(); + } + + // Wrap BigDecimal fields in a wrapper which handles the conversion to String. + if (value instanceof BigDecimal) { + value = new BigDecimalWrapper((BigDecimal) value); + } + + super.value(value); + return this; + } + + /** + * Wrapper used to ensure that BigDecimals are generated as plain strings. + */ + private static class BigDecimalWrapper extends Number { + BigDecimal wrapped; + + protected BigDecimalWrapper(BigDecimal wrapped) { + this.wrapped = wrapped; + } + + @Override + public String toString() { + return wrapped.toPlainString(); + } + + @Override + public int intValue() { + return wrapped.intValue(); + } + + @Override + public long longValue() { + return wrapped.longValue(); + } + + @Override + public float floatValue() { + return wrapped.floatValue(); + } + + @Override + public double doubleValue() { + return wrapped.doubleValue(); + } + } +} diff --git a/format-common/src/main/java/io/cdap/plugin/format/BigDecimalAwareStructuredRecordStringConverter.java b/format-common/src/main/java/io/cdap/plugin/format/BigDecimalAwareStructuredRecordStringConverter.java new file mode 100644 index 000000000..20ee4154e --- /dev/null +++ b/format-common/src/main/java/io/cdap/plugin/format/BigDecimalAwareStructuredRecordStringConverter.java @@ -0,0 +1,96 @@ +/* + * Copyright © 2022 Cask Data, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package io.cdap.plugin.format; + +import com.google.gson.stream.JsonWriter; +import io.cdap.cdap.api.data.format.StructuredRecord; +import io.cdap.cdap.api.data.schema.Schema; +import io.cdap.cdap.format.io.BigDecimalAwareJsonStructuredRecordDatumWriter; +import io.cdap.cdap.format.io.BigDecimalAwareJsonWriter; +import io.cdap.cdap.format.io.JsonEncoder; + +import java.io.IOException; +import java.io.StringWriter; +import java.math.BigDecimal; +import java.util.stream.Collectors; + +/** + * Structured record converted that handles writing decimal files as numbers in the output (instead of int arrays). + */ +public class BigDecimalAwareStructuredRecordStringConverter { + private static final BigDecimalAwareJsonStructuredRecordDatumWriter JSON_DATUM_WRITER = + new BigDecimalAwareJsonStructuredRecordDatumWriter(); + + /** + * Converts a {@link StructuredRecord} to a json string. + */ + public static String toJsonString(StructuredRecord record) throws IOException { + StringWriter strWriter = new StringWriter(); + try (JsonWriter writer = new BigDecimalAwareJsonWriter(strWriter)) { + JSON_DATUM_WRITER.encode(record, new JsonEncoder(writer)); + return strWriter.toString(); + } + } + + /** + * Converts a {@link StructuredRecord} to a delimited string. + */ + public static String toDelimitedString(final StructuredRecord record, String delimiter) { + return record.getSchema().getFields().stream() + .map(f -> mapField(record, f)) + .collect(Collectors.joining(delimiter)); + } + + /** + * Get the string representation for a given record field. BigDecimals are printed as plain strings. + * @param record record to process + * @param field field to extract + * @return String representing the value for this field. + */ + private static String mapField(StructuredRecord record, Schema.Field field) { + String fieldName = field.getName(); + Object value = record.get(fieldName); + + // Return null value as empty string. + if (value == null) { + return ""; + } + + // Get the field schema. + Schema fieldSchema = field.getSchema(); + if (fieldSchema.isNullable()) { + fieldSchema = fieldSchema.getNonNullable(); + } + + // Write decimal values as decimal strings. + if (fieldSchema.getLogicalType() != null && fieldSchema.getLogicalType() == Schema.LogicalType.DECIMAL) { + BigDecimal decimalValue = record.getDecimal(fieldName); + + // Throw exception if the field is expected tu be decimal, but it could not be processed as such. + if (decimalValue == null) { + throw new IllegalArgumentException("Invalid schema for field " + fieldName + ". Decimal was expected."); + } + return decimalValue.toPlainString(); + } + + return value.toString(); + } + + private BigDecimalAwareStructuredRecordStringConverter() { + //inaccessible constructor for static class + } +} diff --git a/format-common/src/test/java/io/cdap/plugin/format/BigDecimalAwareStructuredRecordStringConverterTest.java b/format-common/src/test/java/io/cdap/plugin/format/BigDecimalAwareStructuredRecordStringConverterTest.java new file mode 100644 index 000000000..1ae6b0972 --- /dev/null +++ b/format-common/src/test/java/io/cdap/plugin/format/BigDecimalAwareStructuredRecordStringConverterTest.java @@ -0,0 +1,156 @@ +/* + * Copyright © 2022 Cask Data, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package io.cdap.plugin.format; + +import io.cdap.cdap.api.data.format.StructuredRecord; +import io.cdap.cdap.api.data.schema.Schema; +import org.junit.Assert; +import org.junit.Test; + +import java.io.IOException; +import java.math.BigDecimal; +import java.math.MathContext; + +public class BigDecimalAwareStructuredRecordStringConverterTest { + private static final Schema SCHEMA = Schema.recordOf( + "rec", + Schema.Field.of("i", Schema.of(Schema.Type.INT)), + Schema.Field.of("f", Schema.of(Schema.Type.FLOAT)), + Schema.Field.of("bd", Schema.nullableOf(Schema.decimalOf(32, 7))), + Schema.Field.of("d", Schema.of(Schema.Type.DOUBLE)), + Schema.Field.of("l", Schema.of(Schema.Type.LONG))); + + private static final BigDecimal BD1 = + new BigDecimal("12398321.8127312", MathContext.DECIMAL128).setScale(7); + + private static final BigDecimal BD2 = + new BigDecimal("1000000000000000.0", MathContext.DECIMAL128).setScale(7); + + private static final StructuredRecord RECORD1 = StructuredRecord.builder(SCHEMA) + .set("i", 1) + .set("f", 256.1f) + .setDecimal("bd", BD1) + .set("d", 12345.45678d) + .set("l", 123456789L) + .build(); + + private static final StructuredRecord RECORD2 = StructuredRecord.builder(SCHEMA) + .set("i", 1) + .set("f", 256.1f) + .setDecimal("bd", BD2) + .set("d", 12345.45678d) + .set("l", 123456789L) + .build(); + + private static final StructuredRecord RECORD3 = StructuredRecord.builder(SCHEMA) + .set("i", 1) + .set("f", 256.1f) + .setDecimal("bd", null) + .set("d", 12345.45678d) + .set("l", 123456789L) + .build(); + + @Test + public void testToJsonString1() throws IOException { + String output = BigDecimalAwareStructuredRecordStringConverter.toJsonString(RECORD1); + Assert.assertTrue(output.startsWith("{\"i\":1,")); + Assert.assertTrue(output.contains(",\"f\":256.1")); + Assert.assertTrue(output.contains(",\"bd\":12398321.8127312,")); + Assert.assertTrue(output.contains(",\"d\":12345.45678,")); + Assert.assertTrue(output.endsWith(",\"l\":123456789}")); + } + + @Test + public void testToJsonString2() throws IOException { + String output = BigDecimalAwareStructuredRecordStringConverter.toJsonString(RECORD2); + Assert.assertTrue(output.startsWith("{\"i\":1,")); + Assert.assertTrue(output.contains(",\"f\":256.1")); + Assert.assertTrue(output.contains(",\"bd\":1000000000000000.0000000,")); + Assert.assertTrue(output.contains(",\"d\":12345.45678,")); + Assert.assertTrue(output.endsWith(",\"l\":123456789}")); + } + + @Test + public void testToJsonString3() throws IOException { + String output = BigDecimalAwareStructuredRecordStringConverter.toJsonString(RECORD3); + Assert.assertTrue(output.startsWith("{\"i\":1,")); + Assert.assertTrue(output.contains(",\"f\":256.1")); + Assert.assertTrue(output.contains(",\"bd\":null,")); + Assert.assertTrue(output.contains(",\"d\":12345.45678,")); + Assert.assertTrue(output.endsWith(",\"l\":123456789}")); + } + + @Test + public void testToDelimitedStringUsingComma1() { + String output = BigDecimalAwareStructuredRecordStringConverter.toDelimitedString(RECORD1, ","); + Assert.assertTrue(output.startsWith("1,")); + Assert.assertTrue(output.contains(",256.1,")); + Assert.assertTrue(output.contains(",12398321.8127312,")); + Assert.assertTrue(output.contains(",12345.45678,")); + Assert.assertTrue(output.endsWith(",123456789")); + } + + @Test + public void testToDelimitedStringUsingComma2() { + String output = BigDecimalAwareStructuredRecordStringConverter.toDelimitedString(RECORD2, ","); + Assert.assertTrue(output.startsWith("1,")); + Assert.assertTrue(output.contains(",256.1,")); + Assert.assertTrue(output.contains(",1000000000000000.0000000,")); + Assert.assertTrue(output.contains(",12345.45678,")); + Assert.assertTrue(output.endsWith(",123456789")); + } + + @Test + public void testToDelimitedStringUsingComma3() { + String output = BigDecimalAwareStructuredRecordStringConverter.toDelimitedString(RECORD3, ","); + Assert.assertTrue(output.startsWith("1,")); + Assert.assertTrue(output.contains(",256.1,")); + Assert.assertTrue(output.contains(",,")); + Assert.assertTrue(output.contains(",12345.45678,")); + Assert.assertTrue(output.endsWith(",123456789")); + } + + @Test + public void testToDelimitedStringUsingPipe1() { + String output = BigDecimalAwareStructuredRecordStringConverter.toDelimitedString(RECORD1, "|"); + Assert.assertTrue(output.startsWith("1|")); + Assert.assertTrue(output.contains("|256.1|")); + Assert.assertTrue(output.contains("|12398321.8127312|")); + Assert.assertTrue(output.contains("|12345.45678|")); + Assert.assertTrue(output.endsWith("|123456789")); + } + + @Test + public void testToDelimitedStringUsingPipe2() { + String output = BigDecimalAwareStructuredRecordStringConverter.toDelimitedString(RECORD2, "|"); + Assert.assertTrue(output.startsWith("1|")); + Assert.assertTrue(output.contains("|256.1|")); + Assert.assertTrue(output.contains("|1000000000000000.0000000|")); + Assert.assertTrue(output.contains("|12345.45678|")); + Assert.assertTrue(output.endsWith("|123456789")); + } + + @Test + public void testToDelimitedStringUsingPipe3() { + String output = BigDecimalAwareStructuredRecordStringConverter.toDelimitedString(RECORD3, "|"); + Assert.assertTrue(output.startsWith("1|")); + Assert.assertTrue(output.contains("|256.1|")); + Assert.assertTrue(output.contains("||")); + Assert.assertTrue(output.contains("|12345.45678|")); + Assert.assertTrue(output.endsWith("|123456789")); + } +} diff --git a/format-delimited/src/main/java/io/cdap/plugin/format/delimited/output/StructuredDelimitedOutputFormat.java b/format-delimited/src/main/java/io/cdap/plugin/format/delimited/output/StructuredDelimitedOutputFormat.java index bf280c38a..014321a85 100644 --- a/format-delimited/src/main/java/io/cdap/plugin/format/delimited/output/StructuredDelimitedOutputFormat.java +++ b/format-delimited/src/main/java/io/cdap/plugin/format/delimited/output/StructuredDelimitedOutputFormat.java @@ -19,7 +19,7 @@ import io.cdap.cdap.api.data.format.StructuredRecord; import io.cdap.cdap.api.data.schema.Schema; import io.cdap.cdap.api.dataset.lib.KeyValue; -import io.cdap.cdap.format.StructuredRecordStringConverter; +import io.cdap.plugin.format.BigDecimalAwareStructuredRecordStringConverter; import io.cdap.plugin.format.output.DelegatingOutputFormat; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.io.NullWritable; @@ -63,8 +63,9 @@ protected OutputFormat createDelegate() { @Override protected Function> getConversion(TaskAttemptContext context) { String delimiter = getDelimiter(context.getConfiguration()); - return record -> new KeyValue<>(NullWritable.get(), - new Text(StructuredRecordStringConverter.toDelimitedString(record, delimiter))); + return record -> new KeyValue<>( + NullWritable.get(), new Text(BigDecimalAwareStructuredRecordStringConverter.toDelimitedString(record, delimiter)) + ); } @Nullable diff --git a/format-json/src/main/java/io/cdap/plugin/format/json/output/StructuredJsonOutputFormat.java b/format-json/src/main/java/io/cdap/plugin/format/json/output/StructuredJsonOutputFormat.java index 972857ab9..e472d2a96 100644 --- a/format-json/src/main/java/io/cdap/plugin/format/json/output/StructuredJsonOutputFormat.java +++ b/format-json/src/main/java/io/cdap/plugin/format/json/output/StructuredJsonOutputFormat.java @@ -18,7 +18,7 @@ import io.cdap.cdap.api.data.format.StructuredRecord; import io.cdap.cdap.api.dataset.lib.KeyValue; -import io.cdap.cdap.format.StructuredRecordStringConverter; +import io.cdap.plugin.format.BigDecimalAwareStructuredRecordStringConverter; import io.cdap.plugin.format.output.DelegatingOutputFormat; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Text; @@ -43,7 +43,8 @@ protected OutputFormat createDelegate() { protected Function> getConversion(TaskAttemptContext context) { return record -> { try { - return new KeyValue<>(NullWritable.get(), new Text(StructuredRecordStringConverter.toJsonString(record))); + return new KeyValue<>(NullWritable.get(), + new Text(BigDecimalAwareStructuredRecordStringConverter.toJsonString(record))); } catch (IOException e) { throw new RuntimeException("Unable to convert record into a json object", e); }