Skip to content

Commit

Permalink
Updated output formats to output BigDecimal fields as numeric values …
Browse files Browse the repository at this point in the history
…instead of a byte representation.
  • Loading branch information
fernst committed Jun 16, 2022
1 parent 54f4a0f commit efa31f4
Show file tree
Hide file tree
Showing 6 changed files with 382 additions and 5 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
/*
* Copyright © 2022 Cask Data, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/

package io.cdap.cdap.format.io;

import com.google.gson.stream.JsonWriter;
import io.cdap.cdap.api.common.Bytes;
import io.cdap.cdap.api.data.schema.Schema;
import io.cdap.cdap.common.io.Encoder;

import java.io.IOException;
import java.math.BigDecimal;
import java.math.BigInteger;
import java.nio.ByteBuffer;

/**
* Custom Datum Writer for Structured Records, this class writes BigDecimal values as numbers instead of byte arrays in
* the JSON output.
*/
public class BigDecimalAwareJsonStructuredRecordDatumWriter extends JsonStructuredRecordDatumWriter {
@Override
protected void encode(Encoder encoder, Schema schema, Object value) throws IOException {
Schema nonNullableSchema = schema.isNullable() ? schema.getNonNullable() : schema;
Schema.LogicalType logicalType = nonNullableSchema.getLogicalType();

if (value != null && logicalType == Schema.LogicalType.DECIMAL) {
BigDecimal bdValue = fromObject(value, nonNullableSchema);
getJsonWriter(encoder).value(bdValue);
return;
}

super.encode(encoder, schema, value);
}

/**
* Extract a BigDecimal value from a supplied object
* @param value object to convert into BigDecimal
* @param logicalTypeSchema logical type schema for this field
* @return value converted ingo a BigDecimal.
*/
protected BigDecimal fromObject(Object value, Schema logicalTypeSchema) {
// Return BigDecimal as is
if (value instanceof BigDecimal) {
return (BigDecimal) value;
}

// Handle the value as a byte buffer
int scale = logicalTypeSchema.getScale();
if (value instanceof ByteBuffer) {
return new BigDecimal(new BigInteger(Bytes.toBytes((ByteBuffer) value)), scale);
}

// Handle the BigDecimal value
try {
return new BigDecimal(new BigInteger((byte[]) value), scale);
} catch (ClassCastException e) {
throw new ClassCastException(String.format("Field '%s' is expected to be a decimal, but is a %s.",
logicalTypeSchema.getDisplayName(),
value.getClass().getSimpleName()));
}
}

private JsonWriter getJsonWriter(Encoder encoder) {
// Type already checked in the encode method, hence assuming the casting is fine.
return ((JsonEncoder) encoder).getJsonWriter();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
/*
* Copyright © 2022 Cask Data, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/

package io.cdap.cdap.format.io;

import com.google.gson.stream.JsonWriter;

import java.io.IOException;
import java.io.Writer;
import java.math.BigDecimal;

/**
* JsonWriter instance which handles writing BigDecimal fields.
*/
public class BigDecimalAwareJsonWriter extends JsonWriter {
public BigDecimalAwareJsonWriter(Writer out) {
super(out);
}

@Override
public JsonWriter value(Number value) throws IOException {
if (value == null) {
return this.nullValue();
}

// Wrap BigDecimal fields in a wrapper which handles the conversion to String.
if (value instanceof BigDecimal) {
value = new BigDecimalWrapper((BigDecimal) value);
}

super.value(value);
return this;
}

/**
* Wrapper used to ensure that BigDecimals are generated as plain strings.
*/
private static class BigDecimalWrapper extends Number {
BigDecimal wrapped;

protected BigDecimalWrapper(BigDecimal wrapped) {
this.wrapped = wrapped;
}

@Override
public String toString() {
return wrapped.toPlainString();
}

@Override
public int intValue() {
return wrapped.intValue();
}

@Override
public long longValue() {
return wrapped.longValue();
}

@Override
public float floatValue() {
return wrapped.floatValue();
}

@Override
public double doubleValue() {
return wrapped.doubleValue();
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
/*
* Copyright © 2022 Cask Data, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/

package io.cdap.plugin.format;

import com.google.gson.stream.JsonWriter;
import io.cdap.cdap.api.data.format.StructuredRecord;
import io.cdap.cdap.api.data.schema.Schema;
import io.cdap.cdap.format.io.BigDecimalAwareJsonStructuredRecordDatumWriter;
import io.cdap.cdap.format.io.BigDecimalAwareJsonWriter;
import io.cdap.cdap.format.io.JsonEncoder;

import java.io.IOException;
import java.io.StringWriter;
import java.math.BigDecimal;
import java.util.stream.Collectors;

/**
* Structured record converted that handles writing decimal files as numbers in the output (instead of int arrays).
*/
public class BigDecimalAwareStructuredRecordStringConverter {
private static final BigDecimalAwareJsonStructuredRecordDatumWriter JSON_DATUM_WRITER =
new BigDecimalAwareJsonStructuredRecordDatumWriter();

/**
* Converts a {@link StructuredRecord} to a json string.
*/
public static String toJsonString(StructuredRecord record) throws IOException {
StringWriter strWriter = new StringWriter();
try (JsonWriter writer = new BigDecimalAwareJsonWriter(strWriter)) {
JSON_DATUM_WRITER.encode(record, new JsonEncoder(writer));
return strWriter.toString();
}
}

/**
* Converts a {@link StructuredRecord} to a delimited string.
*/
public static String toDelimitedString(final StructuredRecord record, String delimiter) {
return record.getSchema().getFields().stream()
.map(f -> mapField(record, f))
.collect(Collectors.joining(delimiter));
}

/**
* Get the string representation for a given record field. BigDecimals are printed as plain strings.
* @param record record to process
* @param field field to extract
* @return String representing the value for this field.
*/
private static String mapField(StructuredRecord record, Schema.Field field) {
String fieldName = field.getName();
Object value = record.get(fieldName);

// Return null value as empty string.
if (value == null) {
return "";
}

// Get the field schema.
Schema fieldSchema = field.getSchema();
if (fieldSchema.isNullable()) {
fieldSchema = fieldSchema.getNonNullable();
}

// Write decimal values as decimal strings.
if (fieldSchema.getLogicalType() != null && fieldSchema.getLogicalType() == Schema.LogicalType.DECIMAL) {
BigDecimal decimalValue = record.getDecimal(fieldName);

// Throw exception if the field is expected tu be decimal, but it could not be processed as such.
if (decimalValue == null) {
throw new IllegalArgumentException("Invalid schema for field " + fieldName + ". Decimal was expected.");
}
return decimalValue.toPlainString();
}

return value.toString();
}

private BigDecimalAwareStructuredRecordStringConverter() {
//inaccessible constructor for static class
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
/*
* Copyright © 2022 Cask Data, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/

package io.cdap.plugin.format;

import io.cdap.cdap.api.data.format.StructuredRecord;
import io.cdap.cdap.api.data.schema.Schema;
import org.junit.Assert;
import org.junit.Test;

import java.io.IOException;
import java.math.BigDecimal;
import java.math.MathContext;

public class BigDecimalAwareStructuredRecordStringConverterTest {
private static final Schema SCHEMA = Schema.recordOf(
"rec",
Schema.Field.of("i", Schema.of(Schema.Type.INT)),
Schema.Field.of("f", Schema.of(Schema.Type.FLOAT)),
Schema.Field.of("bd", Schema.nullableOf(Schema.decimalOf(32, 7))),
Schema.Field.of("d", Schema.of(Schema.Type.DOUBLE)),
Schema.Field.of("l", Schema.of(Schema.Type.LONG)));

private static final BigDecimal BD1 =
new BigDecimal("12398321.8127312", MathContext.DECIMAL128).setScale(7);

private static final BigDecimal BD2 =
new BigDecimal("1000000000000000.0", MathContext.DECIMAL128).setScale(7);

private static final StructuredRecord RECORD1 = StructuredRecord.builder(SCHEMA)
.set("i", 1)
.set("f", 256.1f)
.setDecimal("bd", BD1)
.set("d", 12345.45678d)
.set("l", 123456789L)
.build();

private static final StructuredRecord RECORD2 = StructuredRecord.builder(SCHEMA)
.set("i", 1)
.set("f", 256.1f)
.setDecimal("bd", BD2)
.set("d", 12345.45678d)
.set("l", 123456789L)
.build();

private static final StructuredRecord RECORD3 = StructuredRecord.builder(SCHEMA)
.set("i", 1)
.set("f", 256.1f)
.setDecimal("bd", null)
.set("d", 12345.45678d)
.set("l", 123456789L)
.build();

@Test
public void testToJsonString1() throws IOException {
String output = BigDecimalAwareStructuredRecordStringConverter.toJsonString(RECORD1);
Assert.assertTrue(output.startsWith("{\"i\":1,"));
Assert.assertTrue(output.contains(",\"f\":256.1"));
Assert.assertTrue(output.contains(",\"bd\":12398321.8127312,"));
Assert.assertTrue(output.contains(",\"d\":12345.45678,"));
Assert.assertTrue(output.endsWith(",\"l\":123456789}"));
}

@Test
public void testToJsonString2() throws IOException {
String output = BigDecimalAwareStructuredRecordStringConverter.toJsonString(RECORD2);
Assert.assertTrue(output.startsWith("{\"i\":1,"));
Assert.assertTrue(output.contains(",\"f\":256.1"));
Assert.assertTrue(output.contains(",\"bd\":1000000000000000.0000000,"));
Assert.assertTrue(output.contains(",\"d\":12345.45678,"));
Assert.assertTrue(output.endsWith(",\"l\":123456789}"));
}

@Test
public void testToJsonString3() throws IOException {
String output = BigDecimalAwareStructuredRecordStringConverter.toJsonString(RECORD3);
Assert.assertTrue(output.startsWith("{\"i\":1,"));
Assert.assertTrue(output.contains(",\"f\":256.1"));
Assert.assertTrue(output.contains(",\"bd\":null,"));
Assert.assertTrue(output.contains(",\"d\":12345.45678,"));
Assert.assertTrue(output.endsWith(",\"l\":123456789}"));
}

@Test
public void testToDelimitedStringUsingComma() {
String output = BigDecimalAwareStructuredRecordStringConverter.toDelimitedString(RECORD1, ",");
Assert.assertTrue(output.startsWith("1,"));
Assert.assertTrue(output.contains(",256.1,"));
Assert.assertTrue(output.contains(",12398321.8127312,"));
Assert.assertTrue(output.contains(",12345.45678,"));
Assert.assertTrue(output.endsWith(",123456789"));
}

@Test
public void testToDelimitedStringUsingPipe() {
String output = BigDecimalAwareStructuredRecordStringConverter.toDelimitedString(RECORD1, "|");
Assert.assertTrue(output.startsWith("1|"));
Assert.assertTrue(output.contains("|256.1|"));
Assert.assertTrue(output.contains("|12398321.8127312|"));
Assert.assertTrue(output.contains("|12345.45678|"));
Assert.assertTrue(output.endsWith("|123456789"));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
import io.cdap.cdap.api.data.format.StructuredRecord;
import io.cdap.cdap.api.data.schema.Schema;
import io.cdap.cdap.api.dataset.lib.KeyValue;
import io.cdap.cdap.format.StructuredRecordStringConverter;
import io.cdap.plugin.format.BigDecimalAwareStructuredRecordStringConverter;
import io.cdap.plugin.format.output.DelegatingOutputFormat;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.NullWritable;
Expand Down Expand Up @@ -63,8 +63,9 @@ protected OutputFormat<NullWritable, Text> createDelegate() {
@Override
protected Function<StructuredRecord, KeyValue<NullWritable, Text>> getConversion(TaskAttemptContext context) {
String delimiter = getDelimiter(context.getConfiguration());
return record -> new KeyValue<>(NullWritable.get(),
new Text(StructuredRecordStringConverter.toDelimitedString(record, delimiter)));
return record -> new KeyValue<>(
NullWritable.get(), new Text(BigDecimalAwareStructuredRecordStringConverter.toDelimitedString(record, delimiter))
);
}

@Nullable
Expand Down
Loading

0 comments on commit efa31f4

Please sign in to comment.