From 38f222b59e8ca0f48666dfcd019057e47c80bd7a Mon Sep 17 00:00:00 2001 From: Cristian Ferretti Date: Tue, 25 Jan 2022 21:31:56 -0500 Subject: [PATCH 1/9] Initial BigDecimal Kafka Avro Publish support. --- .../java/io/deephaven/kafka/KafkaTools.java | 2 +- .../GenericRecordKeyOrValueSerializer.java | 141 ++++++++++++++---- 2 files changed, 109 insertions(+), 34 deletions(-) diff --git a/extensions/kafka/src/main/java/io/deephaven/kafka/KafkaTools.java b/extensions/kafka/src/main/java/io/deephaven/kafka/KafkaTools.java index 74033498784..f78ee7d902f 100644 --- a/extensions/kafka/src/main/java/io/deephaven/kafka/KafkaTools.java +++ b/extensions/kafka/src/main/java/io/deephaven/kafka/KafkaTools.java @@ -1251,7 +1251,7 @@ private static KeyOrValueSerializer getAvroSerializer( @NotNull final Produce.KeyOrValueSpec.Avro avroSpec, @NotNull final String[] columnNames) { return new GenericRecordKeyOrValueSerializer( - t, avroSpec.schema, columnNames, avroSpec.timestampFieldName); + t, avroSpec.schema, columnNames, avroSpec.timestampFieldName, avroSpec.columnProperties); } private static KeyOrValueSerializer getJsonSerializer( diff --git a/extensions/kafka/src/main/java/io/deephaven/kafka/publish/GenericRecordKeyOrValueSerializer.java b/extensions/kafka/src/main/java/io/deephaven/kafka/publish/GenericRecordKeyOrValueSerializer.java index 2bfb5ff57ef..595f286d4de 100644 --- a/extensions/kafka/src/main/java/io/deephaven/kafka/publish/GenericRecordKeyOrValueSerializer.java +++ b/extensions/kafka/src/main/java/io/deephaven/kafka/publish/GenericRecordKeyOrValueSerializer.java @@ -4,7 +4,6 @@ import io.deephaven.engine.table.ChunkSource; import io.deephaven.engine.table.Table; import io.deephaven.kafka.KafkaSchemaUtils; -import io.deephaven.kafka.ingest.GenericRecordUtil; import io.deephaven.time.DateTime; import io.deephaven.engine.util.string.StringUtils; import io.deephaven.engine.table.ColumnSource; @@ -19,6 +18,10 @@ import org.apache.avro.generic.GenericData; import org.apache.avro.generic.GenericRecord; +import java.math.BigDecimal; +import java.math.BigInteger; +import java.math.MathContext; +import java.math.RoundingMode; import java.util.*; import java.util.concurrent.TimeUnit; @@ -36,10 +39,13 @@ public class GenericRecordKeyOrValueSerializer implements KeyOrValueSerializer fieldProcessors = new ArrayList<>(); - public GenericRecordKeyOrValueSerializer(final Table source, + public GenericRecordKeyOrValueSerializer( + final Table source, final Schema schema, final String[] columnNames, - final String timestampFieldName) { + final String timestampFieldName, + final Properties columnProperties + ) { this.source = source; if (schema.isUnion()) { throw new UnsupportedOperationException("Schemas defined as a union of records are not supported"); @@ -61,11 +67,7 @@ public GenericRecordKeyOrValueSerializer(final Table source, if (haveTimestampField && timestampFieldName.equals(field.name())) { continue; } - if (columnNames == null) { - makeFieldProcessor(field, null); - } else { - makeFieldProcessor(field, columnNames[i]); - } + makeFieldProcessor(field, (columnNames == null) ? 
null : columnNames[i], columnProperties); ++i; } @@ -299,6 +301,32 @@ private static GenericRecordFieldProcessor makeDateTimeToMicrosFieldProcessor( (final int ii, final ObjectChunk inputChunk) -> ((DateTime) inputChunk.get(ii)).getMicros()); } + private static BigInteger toBigIntegerAtPrecisionAndScale( + final BigDecimal v, final MathContext mathContext, final int scale) { + final BigDecimal rescaled = v + .scaleByPowerOfTen(scale) + .setScale(0, mathContext.getRoundingMode()) + .round(mathContext) + ; + return rescaled.toBigIntegerExact(); + } + + private static GenericRecordFieldProcessor makeBigDecimalFieldProcessor( + final String fieldName, + final ColumnSource chunkSource, + final int precision, + final int scale) { + final MathContext mathContext = new MathContext(precision, RoundingMode.HALF_UP); + return makeGenericFieldProcessor( + fieldName, + chunkSource, + (final int ii, final ObjectChunk inputChunk) -> { + final BigDecimal bd = (BigDecimal) inputChunk.get(ii); + final BigInteger bi = toBigIntegerAtPrecisionAndScale(bd, mathContext, scale); + return bi.toByteArray(); + }); + } + private static class TimestampFieldProcessor extends GenericRecordFieldProcessor { private final long fromNanosToUnitDenominator; @@ -364,46 +392,93 @@ final String getLogicalType(final String fieldName, final Schema.Field field) { * @param field The field in the schema. * @param columnNameIn The Deephaven column to be translated into publishable format */ - private void makeFieldProcessor(final Schema.Field field, final String columnNameIn) { + private void makeFieldProcessor( + final Schema.Field field, + final String columnNameIn, + final Properties columnProperties + ) { final String fieldName = field.name(); final String columnName = (columnNameIn == null) ? fieldName : columnNameIn; // getColumnSource should throw a ColumnNotFoundException if it can't find the column, // which will blow us up here. 
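        // Once the column source is resolved, its Deephaven type drives the per-type dispatch
        // in getFieldProcessorForType below; BigDecimal columns additionally consult
        // columnProperties for their Avro decimal precision and scale.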
final ColumnSource src = source.getColumnSource(columnName); final Class type = src.getType(); - final GenericRecordFieldProcessor proc; + final GenericRecordFieldProcessor proc = getFieldProcessorForType(type, field, fieldName, columnName, src, columnProperties); + fieldProcessors.add(proc); + } + + private GenericRecordFieldProcessor getFieldProcessorForType( + final Class type, + final Schema.Field field, + final String fieldName, + final String columnName, + final ColumnSource src, + final Properties columnProperties) { if (type == char.class) { - proc = makeCharFieldProcessor(fieldName, src); - } else if (type == byte.class) { - proc = makeByteFieldProcessor(fieldName, src); - } else if (type == short.class) { - proc = makeShortFieldProcessor(fieldName, src); - } else if (type == int.class) { - proc = makeIntFieldProcessor(fieldName, src); - } else if (type == long.class) { - proc = getLongProcessor(field, fieldName, source.getColumn(columnName).getType(), src); - } else if (type == float.class) { - proc = makeFloatFieldProcessor(fieldName, src); - } else if (type == double.class) { - proc = makeDoubleFieldProcessor(fieldName, src); - } else if (type == DateTime.class) { + return makeCharFieldProcessor(fieldName, src); + } + if (type == byte.class) { + return makeByteFieldProcessor(fieldName, src); + } + if (type == short.class) { + return makeShortFieldProcessor(fieldName, src); + } + if (type == int.class) { + return makeIntFieldProcessor(fieldName, src); + } + if (type == long.class) { + return getLongProcessor(field, fieldName, source.getColumn(columnName).getType(), src); + } + if (type == float.class) { + return makeFloatFieldProcessor(fieldName, src); + } + if (type == double.class) { + return makeDoubleFieldProcessor(fieldName, src); + } + if (type == DateTime.class) { final String logicalType = getLogicalType(fieldName, field); if (logicalType == null) { throw new IllegalArgumentException( "field " + fieldName + " for column " + columnName + " has no logical type."); } if (logicalType.equals("timestamp-millis")) { - proc = makeDateTimeToMillisFieldProcessor(fieldName, src); - } else if (logicalType.equals("timestamp-micros")) { - proc = makeDateTimeToMicrosFieldProcessor(fieldName, src); - } else { - throw new IllegalArgumentException("field " + fieldName + " for column " + columnName - + " has unrecognized logical type " + logicalType); + return makeDateTimeToMillisFieldProcessor(fieldName, src); } - } else { - proc = makeObjectFieldProcessor(fieldName, src); + if (logicalType.equals("timestamp-micros")) { + return makeDateTimeToMicrosFieldProcessor(fieldName, src); + } + throw new IllegalArgumentException("field " + fieldName + " for column " + columnName + + " has unrecognized logical type " + logicalType); } - fieldProcessors.add(proc); + if (type == BigDecimal.class) { + final int precision = getPrecisionOrScale("precision", columnName, columnProperties); + final int scale = getPrecisionOrScale("scale", columnName, columnProperties); + return makeBigDecimalFieldProcessor(fieldName, src, precision, scale); + } + return makeObjectFieldProcessor(fieldName, src); + } + + private static final int getPrecisionOrScale( + final String precisionOrScale, + final String columnName, + final Properties columnProperties) { + final String property = columnName + "." 
+                + precisionOrScale;
+        final String propertyValue = columnProperties.getProperty(property);
+        if (propertyValue == null) {
+            throw new IllegalArgumentException(
+                    "column name '" + columnName + "' has type " + BigDecimal.class.getSimpleName() + ""
+                            + " but no property '" + property + "' defined.");
+        }
+        final int parsedResult;
+        try {
+            parsedResult = Integer.parseInt(propertyValue);
+        } catch(NumberFormatException e) {
+            throw new IllegalArgumentException("Couldn't parse as int value '" + propertyValue + "' for property " + property);
+        }
+        if (parsedResult < 1) {
+            throw new IllegalArgumentException("Invalid value '" + parsedResult + "' for proprety " + property);
+        }
+        return parsedResult;
+    }
+
     /**

From 0dafa0fa331836514d76a45e87c000ad86b273aa Mon Sep 17 00:00:00 2001
From: Cristian Ferretti
Date: Tue, 25 Jan 2022 22:36:04 -0500
Subject: [PATCH 2/9] Shared precision/scale BigDecimal calculation from
 Parquet to common utility.

---
 .../engine/util/BigDecimalUtils.java          | 59 ++++++++++++++
 .../java/io/deephaven/kafka/KafkaTools.java   | 81 ++++++++++++++-----
 .../parquet/table/ParquetTableWriter.java     |  3 +-
 .../io/deephaven/parquet/table/TypeInfos.java | 45 +----------
 4 files changed, 127 insertions(+), 61 deletions(-)
 create mode 100644 engine/table/src/main/java/io/deephaven/engine/util/BigDecimalUtils.java

diff --git a/engine/table/src/main/java/io/deephaven/engine/util/BigDecimalUtils.java b/engine/table/src/main/java/io/deephaven/engine/util/BigDecimalUtils.java
new file mode 100644
index 00000000000..b7b3d60b236
--- /dev/null
+++ b/engine/table/src/main/java/io/deephaven/engine/util/BigDecimalUtils.java
@@ -0,0 +1,59 @@
+package io.deephaven.engine.util;
+
+import io.deephaven.chunk.ObjectChunk;
+import io.deephaven.chunk.attributes.Values;
+import io.deephaven.engine.rowset.RowSequence;
+import io.deephaven.engine.rowset.TrackingRowSet;
+import io.deephaven.engine.table.ChunkSource;
+import io.deephaven.engine.table.ColumnSource;
+import io.deephaven.engine.table.Table;
+
+import java.math.BigDecimal;
+
+public class BigDecimalUtils {
+    public static class PrecisionAndScale {
+        public final int precision;
+        public final int scale;
+
+        public PrecisionAndScale(final int precision, final int scale) {
+            this.precision = precision;
+            this.scale = scale;
+        }
+    }
+
+    public static PrecisionAndScale computePrecisionAndScale(
+            final Table t, final String colName) {
+        final ColumnSource src = t.getColumnSource(colName, BigDecimal.class);
+        return computePrecisionAndScale(t.getRowSet(), src);
+    }
+
+    public static PrecisionAndScale computePrecisionAndScale(
+            final TrackingRowSet rowSet,
+            final ColumnSource source
+    ) {
+        final int sz = 4096;
+        // we first compute max(precision - scale) and max(scale), which corresponds to
+        // max(digits left of the decimal point), max(digits right of the decimal point).
+        // Then we convert to (precision, scale) before returning.
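+        // For example, values 123.45 (precision 5, scale 2) and 0.6789 (precision 4, scale 4)
+        // yield max(precision - scale) = 3 and max(scale) = 4, i.e. precision 7 and scale 4.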
+ int maxPrecisionMinusScale = 0; + int maxScale = 0; + try (final ChunkSource.GetContext context = source.makeGetContext(sz); + final RowSequence.Iterator it = rowSet.getRowSequenceIterator()) { + final RowSequence rowSeq = it.getNextRowSequenceWithLength(sz); + final ObjectChunk chunk = source.getChunk(context, rowSeq).asObjectChunk(); + for (int i = 0; i < chunk.size(); ++i) { + final BigDecimal x = chunk.get(i); + final int precision = x.precision(); + final int scale = x.scale(); + final int precisionMinusScale = precision - scale; + if (precisionMinusScale > maxPrecisionMinusScale) { + maxPrecisionMinusScale = precisionMinusScale; + } + if (scale > maxScale) { + maxScale = scale; + } + } + } + return new PrecisionAndScale(maxPrecisionMinusScale + maxScale, maxScale); + } +} diff --git a/extensions/kafka/src/main/java/io/deephaven/kafka/KafkaTools.java b/extensions/kafka/src/main/java/io/deephaven/kafka/KafkaTools.java index f78ee7d902f..a89cd7f14c6 100644 --- a/extensions/kafka/src/main/java/io/deephaven/kafka/KafkaTools.java +++ b/extensions/kafka/src/main/java/io/deephaven/kafka/KafkaTools.java @@ -27,6 +27,7 @@ import io.deephaven.engine.table.impl.StreamTableTools; import io.deephaven.engine.table.TableMap; import io.deephaven.engine.table.TransformableTableMap; +import io.deephaven.engine.util.BigDecimalUtils; import io.deephaven.internal.log.LoggerFactory; import io.deephaven.io.logger.Logger; import io.deephaven.kafka.ingest.*; @@ -192,13 +193,16 @@ public static String putAvroSchema( } public static Schema columnDefinitionsToAvroSchema( - final ColumnDefinition[] colDefs, + final Table t, final String schemaName, final String namespace, final Properties colProps, final Predicate includeOnly, - final Predicate exclude) { + final Predicate exclude, + final MutableObject colPropsOut) { SchemaBuilder.FieldAssembler fass = SchemaBuilder.record(schemaName).namespace(namespace).fields(); + final ColumnDefinition[] colDefs = t.getDefinition().getColumns(); + colPropsOut.setValue(colProps); for (final ColumnDefinition colDef : colDefs) { if (includeOnly != null && !includeOnly.test(colDef.getName())) { continue; @@ -206,14 +210,16 @@ public static Schema columnDefinitionsToAvroSchema( if (exclude != null && exclude.test(colDef.getName())) { continue; } - fass = addFieldForColDef(fass, colDef, colProps); + fass = addFieldForColDef(t, fass, colDef, colPropsOut); } return fass.endRecord(); } private static SchemaBuilder.FieldAssembler addFieldForColDef( - final SchemaBuilder.FieldAssembler fassIn, final ColumnDefinition colDef, - final Properties colProps) { + final Table t, + final SchemaBuilder.FieldAssembler fassIn, + final ColumnDefinition colDef, + final MutableObject colPropsMu) { final String colNameToPropSeparator = "."; final String logicalTypeName = "logicalType"; final String dhTypeAttribute = "dhType"; @@ -221,6 +227,7 @@ private static SchemaBuilder.FieldAssembler addFieldForColDef( final Class type = colDef.getDataType(); final String colName = colDef.getName(); final SchemaBuilder.BaseFieldTypeBuilder base = fass.name(colName).type().nullable(); + final Properties colProps = colPropsMu.getValue(); if (type == byte.class || type == char.class || type == short.class) { fass = base.intBuilder().prop(dhTypeAttribute, type.getName()).endInt().noDefault(); } else if (type == int.class) { @@ -239,16 +246,55 @@ private static SchemaBuilder.FieldAssembler addFieldForColDef( final String precisionName = "precision"; final String scaleName = "scale"; String precision = null; + final 
String precisionProperty = colName + colNameToPropSeparator + precisionName; String scale = null; + final String scaleProperty = colName + colNameToPropSeparator + scaleName; if (colProps != null) { - precision = colProps.getProperty(colName + colNameToPropSeparator + precisionName); - scale = colProps.getProperty(colName + colNameToPropSeparator + scaleName); + precision = colProps.getProperty(precisionProperty); + scale = colProps.getProperty(scaleProperty); } - if (precision == null || scale == null) { - throw new IllegalArgumentException( - "When " + BigDecimal.class.getSimpleName() + " type columns are included, " + - " column properties '" + precisionName + "' and '" + scaleName - + "' for the given columns should be provided."); + if (t.isRefreshing()) { + final String exBaseMsg = "Column " + colName + " of type " + BigDecimal.class.getSimpleName() + + " in a refreshing table implies both properties '" + + precisionProperty + "' and '" + scaleProperty + "' should be defined; "; + + if (precision == null && scale == null) { + throw new IllegalArgumentException(exBaseMsg + " missing both"); + } + if (precision == null) { + throw new IllegalArgumentException(exBaseMsg + " missing '" + precisionProperty + "'"); + } + if (scale == null) { + throw new IllegalArgumentException(exBaseMsg + " missing '" + scaleProperty + "'"); + } + } else { // non refreshing table + if (precision == null || scale == null) { + final String exBaseMsg = "Column " + colName + " of type " + BigDecimal.class.getSimpleName() + + " in a non refreshing table implies either both properties '" + + precisionProperty + "' and '" + scaleProperty + "' should be defined, or none of them;"; + if (precision != null) { + throw new IllegalArgumentException( + exBaseMsg + " only '" + precisionProperty + "' is defined, missing '" + scaleProperty + "'"); + } + if (scale != null) { + throw new IllegalArgumentException( + exBaseMsg + " only '" + scaleProperty + "' is defined, missing '" + precisionProperty + "'"); + } + // Both precision and scale are null; compute them ourselves. 
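+                    // This one-time scan is only safe for a non-refreshing table: the data
+                    // cannot change and invalidate the computed precision and scale.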
+ final BigDecimalUtils.PrecisionAndScale precisionAndScale = + BigDecimalUtils.computePrecisionAndScale(t, colName); + precision = Integer.toString(precisionAndScale.precision); + scale = Integer.toString(precisionAndScale.scale); + final Properties toSet; + if (colProps == null) { + toSet = new Properties(); + colPropsMu.setValue(toSet); + } else { + toSet = colProps; + } + toSet.setProperty(precisionProperty, precision); + toSet.setProperty(scaleProperty, scale); + } } fass = base.bytesBuilder() .prop(logicalTypeName, "decimal") @@ -715,7 +761,7 @@ static final class Avro extends KeyOrValueSpec { final Predicate excludeColumns; final boolean publishSchema; final String schemaNamespace; - final Properties columnProperties; + final MutableObject columnProperties; Avro(final Schema schema, final String schemaName, @@ -736,7 +782,7 @@ static final class Avro extends KeyOrValueSpec { this.excludeColumns = excludeColumns; this.publishSchema = publishSchema; this.schemaNamespace = schemaNamespace; - this.columnProperties = columnProperties; + this.columnProperties = new MutableObject<>(columnProperties); } @Override @@ -750,8 +796,8 @@ void ensureSchema(final Table t, final Properties kafkaProperties) { } final String schemaServiceUrl = ensureAndGetSchemaServerProprety(kafkaProperties); if (publishSchema) { - schema = columnDefinitionsToAvroSchema(t.getDefinition().getColumns(), - schemaName, schemaNamespace, columnProperties, includeOnlyColumns, excludeColumns); + schema = columnDefinitionsToAvroSchema(t, + schemaName, schemaNamespace, columnProperties.getValue(), includeOnlyColumns, excludeColumns, columnProperties); final String putVersion = putAvroSchema(schema, schemaServiceUrl, schemaName); if (putVersion != null && schemaVersion != null && !schemaVersion.equals(putVersion)) { throw new IllegalStateException("Specified expected version " + schemaVersion @@ -792,7 +838,6 @@ String[] getColumnNames(final Table t, final Properties kafkaProperties) { } } final int timestampFieldCount = ((timestampFieldName != null) ? 
1 : 0); - final int nColNames = fields.size() - timestampFieldCount; final List columnNames = new ArrayList<>(); for (final Schema.Field field : fields) { final String fieldName = field.name(); @@ -1251,7 +1296,7 @@ private static KeyOrValueSerializer getAvroSerializer( @NotNull final Produce.KeyOrValueSpec.Avro avroSpec, @NotNull final String[] columnNames) { return new GenericRecordKeyOrValueSerializer( - t, avroSpec.schema, columnNames, avroSpec.timestampFieldName, avroSpec.columnProperties); + t, avroSpec.schema, columnNames, avroSpec.timestampFieldName, avroSpec.columnProperties.getValue()); } private static KeyOrValueSerializer getJsonSerializer( diff --git a/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/ParquetTableWriter.java b/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/ParquetTableWriter.java index 1bc83d9f4dc..5c70b98e8d2 100644 --- a/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/ParquetTableWriter.java +++ b/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/ParquetTableWriter.java @@ -7,6 +7,7 @@ import io.deephaven.api.agg.Aggregation; import io.deephaven.engine.table.*; import io.deephaven.engine.table.impl.CodecLookup; +import io.deephaven.engine.util.BigDecimalUtils; import io.deephaven.vector.Vector; import io.deephaven.stringset.StringSet; import io.deephaven.time.DateTime; @@ -669,7 +670,7 @@ private static TransferObject getDestinationBuffer( } else if (BigDecimal.class.equals(columnType)) { // noinspection unchecked final ColumnSource bigDecimalColumnSource = (ColumnSource) columnSource; - final TypeInfos.PrecisionAndScale precisionAndScale = TypeInfos.getPrecisionAndScale( + final BigDecimalUtils.PrecisionAndScale precisionAndScale = TypeInfos.getPrecisionAndScale( computedCache, columnDefinition.getName(), tableRowSet, () -> bigDecimalColumnSource); final ObjectCodec codec = new BigDecimalParquetBytesCodec( precisionAndScale.precision, precisionAndScale.scale, -1); diff --git a/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/TypeInfos.java b/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/TypeInfos.java index 95aab437d12..fa6482fd1d0 100644 --- a/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/TypeInfos.java +++ b/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/TypeInfos.java @@ -1,15 +1,11 @@ package io.deephaven.parquet.table; -import io.deephaven.chunk.attributes.Values; import io.deephaven.engine.table.impl.CodecLookup; import io.deephaven.engine.table.ColumnDefinition; import io.deephaven.stringset.StringSet; import io.deephaven.time.DateTime; import io.deephaven.engine.table.ColumnSource; -import io.deephaven.engine.table.ChunkSource; -import io.deephaven.chunk.ObjectChunk; import io.deephaven.engine.rowset.TrackingRowSet; -import io.deephaven.engine.rowset.RowSequence; import io.deephaven.util.codec.ExternalizableCodec; import io.deephaven.util.codec.SerializableCodec; import org.apache.commons.lang3.tuple.ImmutablePair; @@ -22,6 +18,9 @@ import org.apache.parquet.schema.Types.PrimitiveBuilder; import org.jetbrains.annotations.NotNull; +import static io.deephaven.engine.util.BigDecimalUtils.PrecisionAndScale; +import static io.deephaven.engine.util.BigDecimalUtils.computePrecisionAndScale; + import java.io.Externalizable; import java.math.BigDecimal; import java.util.*; @@ -101,44 +100,6 @@ static Pair getCodecAndArgs( return new ImmutablePair<>(SerializableCodec.class.getName(), null); } - static class 
PrecisionAndScale { - public final int precision; - public final int scale; - - public PrecisionAndScale(final int precision, final int scale) { - this.precision = precision; - this.scale = scale; - } - } - - private static PrecisionAndScale computePrecisionAndScale(final TrackingRowSet rowSet, - final ColumnSource source) { - final int sz = 4096; - // we first compute max(precision - scale) and max(scale), which corresponds to - // max(digits left of the decimal point), max(digits right of the decimal point). - // Then we convert to (precision, scale) before returning. - int maxPrecisionMinusScale = 0; - int maxScale = 0; - try (final ChunkSource.GetContext context = source.makeGetContext(sz); - final RowSequence.Iterator it = rowSet.getRowSequenceIterator()) { - final RowSequence rowSeq = it.getNextRowSequenceWithLength(sz); - final ObjectChunk chunk = source.getChunk(context, rowSeq).asObjectChunk(); - for (int i = 0; i < chunk.size(); ++i) { - final BigDecimal x = chunk.get(i); - final int precision = x.precision(); - final int scale = x.scale(); - final int precisionMinusScale = precision - scale; - if (precisionMinusScale > maxPrecisionMinusScale) { - maxPrecisionMinusScale = precisionMinusScale; - } - if (scale > maxScale) { - maxScale = scale; - } - } - } - return new PrecisionAndScale(maxPrecisionMinusScale + maxScale, maxScale); - } - static PrecisionAndScale getPrecisionAndScale( final Map> computedCache, final String columnName, From 24fd462ca33a19bac9b3eab1eea68965569ed3d5 Mon Sep 17 00:00:00 2001 From: Cristian Ferretti Date: Wed, 26 Jan 2022 19:00:04 -0500 Subject: [PATCH 3/9] Support BigDecimal columns for Kafka Avro publishing. --- .../engine/util/BigDecimalUtils.java | 61 ++++++++++++++++++ .../java/io/deephaven/kafka/KafkaTools.java | 64 ++++++++++--------- .../GenericRecordKeyOrValueSerializer.java | 35 +++------- 3 files changed, 102 insertions(+), 58 deletions(-) diff --git a/engine/table/src/main/java/io/deephaven/engine/util/BigDecimalUtils.java b/engine/table/src/main/java/io/deephaven/engine/util/BigDecimalUtils.java index b7b3d60b236..621a0b68d23 100644 --- a/engine/table/src/main/java/io/deephaven/engine/util/BigDecimalUtils.java +++ b/engine/table/src/main/java/io/deephaven/engine/util/BigDecimalUtils.java @@ -9,8 +9,10 @@ import io.deephaven.engine.table.Table; import java.math.BigDecimal; +import java.util.Properties; public class BigDecimalUtils { + public static final int INVALID_PRECISION_OR_SCALE = -1; public static class PrecisionAndScale { public final int precision; public final int scale; @@ -56,4 +58,63 @@ public static PrecisionAndScale computePrecisionAndScale( } return new PrecisionAndScale(maxPrecisionMinusScale + maxScale, maxScale); } + + public static class PrecisionAndScalePropertyNames { + public final String columnName; + public final String precisionProperty; + public final String scaleProperty; + + public PrecisionAndScalePropertyNames(final String columnName) { + this.columnName = columnName; + precisionProperty = columnName + ".precision"; + scaleProperty = columnName + ".scale"; + } + } + + private static int getPrecisionAndScaleFromColumnProperties( + final String columnName, + final String property, + final Properties columnProperties, + final boolean allowNulls) { + if (columnProperties == null) { + return INVALID_PRECISION_OR_SCALE; + } + final String propertyValue = columnProperties.getProperty(property); + if (propertyValue == null) { + if (!allowNulls) { + throw new IllegalArgumentException( + "column name '" + columnName + "' 
has type " + BigDecimal.class.getSimpleName() + "" + + " but no property '" + property + "' defined."); + } + return INVALID_PRECISION_OR_SCALE; + } + final int parsedResult; + try { + parsedResult = Integer.parseInt(propertyValue); + } catch(NumberFormatException e) { + throw new IllegalArgumentException("Couldn't parse as int value '" + propertyValue + "' for property " + property); + } + if (parsedResult < 1) { + throw new IllegalArgumentException("Invalid value '" + parsedResult + "' for property " + property); + } + return parsedResult; + } + + public static PrecisionAndScale getPrecisionAndScaleFromColumnProperties( + final PrecisionAndScalePropertyNames propertyNames, + final Properties columnProperties, + final boolean allowNulls + ) { + final int precision = getPrecisionAndScaleFromColumnProperties( + propertyNames.columnName, + propertyNames.precisionProperty, + columnProperties, + allowNulls); + final int scale = getPrecisionAndScaleFromColumnProperties( + propertyNames.columnName, + propertyNames.scaleProperty, + columnProperties, + allowNulls); + return new PrecisionAndScale(precision, scale); + } } diff --git a/extensions/kafka/src/main/java/io/deephaven/kafka/KafkaTools.java b/extensions/kafka/src/main/java/io/deephaven/kafka/KafkaTools.java index a89cd7f14c6..75cc3e5d6c9 100644 --- a/extensions/kafka/src/main/java/io/deephaven/kafka/KafkaTools.java +++ b/extensions/kafka/src/main/java/io/deephaven/kafka/KafkaTools.java @@ -243,48 +243,44 @@ private static SchemaBuilder.FieldAssembler addFieldForColDef( } else if (type == DateTime.class) { fass = base.longBuilder().prop(logicalTypeName, "timestamp-micros").endLong().noDefault(); } else if (type == BigDecimal.class) { - final String precisionName = "precision"; - final String scaleName = "scale"; - String precision = null; - final String precisionProperty = colName + colNameToPropSeparator + precisionName; - String scale = null; - final String scaleProperty = colName + colNameToPropSeparator + scaleName; - if (colProps != null) { - precision = colProps.getProperty(precisionProperty); - scale = colProps.getProperty(scaleProperty); - } + final BigDecimalUtils.PrecisionAndScalePropertyNames propertyNames = + new BigDecimalUtils.PrecisionAndScalePropertyNames(colName); + final BigDecimalUtils.PrecisionAndScale precisionAndScaleFromProperties = + BigDecimalUtils.getPrecisionAndScaleFromColumnProperties(propertyNames, colProps, true); + int precision = precisionAndScaleFromProperties.precision; + int scale = precisionAndScaleFromProperties.scale; if (t.isRefreshing()) { final String exBaseMsg = "Column " + colName + " of type " + BigDecimal.class.getSimpleName() + " in a refreshing table implies both properties '" + - precisionProperty + "' and '" + scaleProperty + "' should be defined; "; + propertyNames.precisionProperty + "' and '" + propertyNames.scaleProperty + "' should be defined; "; - if (precision == null && scale == null) { + if (precision == BigDecimalUtils.INVALID_PRECISION_OR_SCALE && scale == BigDecimalUtils.INVALID_PRECISION_OR_SCALE) { throw new IllegalArgumentException(exBaseMsg + " missing both"); } - if (precision == null) { - throw new IllegalArgumentException(exBaseMsg + " missing '" + precisionProperty + "'"); + if (precision == BigDecimalUtils.INVALID_PRECISION_OR_SCALE) { + throw new IllegalArgumentException(exBaseMsg + " missing '" + propertyNames.precisionProperty + "'"); } - if (scale == null) { - throw new IllegalArgumentException(exBaseMsg + " missing '" + scaleProperty + "'"); + if (scale == 
BigDecimalUtils.INVALID_PRECISION_OR_SCALE) { + throw new IllegalArgumentException(exBaseMsg + " missing '" + propertyNames.scaleProperty + "'"); } } else { // non refreshing table - if (precision == null || scale == null) { + if (precision == BigDecimalUtils.INVALID_PRECISION_OR_SCALE || scale == BigDecimalUtils.INVALID_PRECISION_OR_SCALE) { final String exBaseMsg = "Column " + colName + " of type " + BigDecimal.class.getSimpleName() + " in a non refreshing table implies either both properties '" + - precisionProperty + "' and '" + scaleProperty + "' should be defined, or none of them;"; - if (precision != null) { + propertyNames.precisionProperty + "' and '" + propertyNames.scaleProperty + "' should be defined, or none of them;"; + if (precision != BigDecimalUtils.INVALID_PRECISION_OR_SCALE) { throw new IllegalArgumentException( - exBaseMsg + " only '" + precisionProperty + "' is defined, missing '" + scaleProperty + "'"); + exBaseMsg + " only '" + propertyNames.precisionProperty + "' is defined, missing '" + propertyNames.scaleProperty + "'"); } - if (scale != null) { + if (scale != BigDecimalUtils.INVALID_PRECISION_OR_SCALE) { throw new IllegalArgumentException( - exBaseMsg + " only '" + scaleProperty + "' is defined, missing '" + precisionProperty + "'"); + exBaseMsg + " only '" + propertyNames.scaleProperty + "' is defined, missing '" + propertyNames.precisionProperty + "'"); } // Both precision and scale are null; compute them ourselves. final BigDecimalUtils.PrecisionAndScale precisionAndScale = BigDecimalUtils.computePrecisionAndScale(t, colName); - precision = Integer.toString(precisionAndScale.precision); - scale = Integer.toString(precisionAndScale.scale); + precision = precisionAndScale.precision; + scale = precisionAndScale.scale; final Properties toSet; if (colProps == null) { toSet = new Properties(); @@ -292,14 +288,14 @@ private static SchemaBuilder.FieldAssembler addFieldForColDef( } else { toSet = colProps; } - toSet.setProperty(precisionProperty, precision); - toSet.setProperty(scaleProperty, scale); + toSet.setProperty(propertyNames.precisionProperty, Integer.toString(precision)); + toSet.setProperty(propertyNames.scaleProperty, Integer.toString(scale)); } } fass = base.bytesBuilder() .prop(logicalTypeName, "decimal") - .prop(precisionName, precision) - .prop(scaleName, scale) + .prop("precision", precision) + .prop("scale", scale) .endBytes() .noDefault(); } else { @@ -340,6 +336,11 @@ private static void pushColumnTypesFromAvroField( fieldSchema, mappedName, fieldType, fieldPathToColumnName); } + private static LogicalType getEffectiveLogicalType(final String fieldName, final Schema fieldSchema) { + final Schema effectiveSchema = KafkaSchemaUtils.getEffectiveSchema(fieldName, fieldSchema); + return effectiveSchema.getLogicalType(); + } + private static void pushColumnTypesFromAvroField( final List> columnsOut, final Map fieldPathToColumnNameOut, @@ -357,7 +358,7 @@ private static void pushColumnTypesFromAvroField( columnsOut.add(ColumnDefinition.ofInt(mappedName)); break; case LONG: { - final LogicalType logicalType = fieldSchema.getLogicalType(); + final LogicalType logicalType = getEffectiveLogicalType(fieldName, fieldSchema); if (LogicalTypes.timestampMicros().equals(logicalType) || LogicalTypes.timestampMillis().equals(logicalType)) { columnsOut.add(ColumnDefinition.ofTime(mappedName)); @@ -376,13 +377,14 @@ private static void pushColumnTypesFromAvroField( case STRING: columnsOut.add(ColumnDefinition.ofString(mappedName)); break; - case UNION: + case UNION: { 
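                // A nullable Avro field is represented as a union (e.g. of null and a base
                // type); unwrap it to its effective schema and recurse on that.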
final Schema effectiveSchema = KafkaSchemaUtils.getEffectiveSchema(fieldName, fieldSchema); pushColumnTypesFromAvroField( columnsOut, fieldPathToColumnNameOut, prefix, fieldName, effectiveSchema, mappedName, effectiveSchema.getType(), fieldPathToColumnName); return; + } case RECORD: // Linearize any nesting. for (final Schema.Field nestedField : fieldSchema.getFields()) { @@ -394,7 +396,7 @@ private static void pushColumnTypesFromAvroField( return; case BYTES: case FIXED: { - final LogicalType logicalType = fieldSchema.getLogicalType(); + final LogicalType logicalType = getEffectiveLogicalType(fieldName, fieldSchema); if (logicalType instanceof LogicalTypes.Decimal) { columnsOut.add(ColumnDefinition.fromGenericType(mappedName, BigDecimal.class)); break; diff --git a/extensions/kafka/src/main/java/io/deephaven/kafka/publish/GenericRecordKeyOrValueSerializer.java b/extensions/kafka/src/main/java/io/deephaven/kafka/publish/GenericRecordKeyOrValueSerializer.java index 595f286d4de..37251530b77 100644 --- a/extensions/kafka/src/main/java/io/deephaven/kafka/publish/GenericRecordKeyOrValueSerializer.java +++ b/extensions/kafka/src/main/java/io/deephaven/kafka/publish/GenericRecordKeyOrValueSerializer.java @@ -3,6 +3,7 @@ import io.deephaven.chunk.attributes.Values; import io.deephaven.engine.table.ChunkSource; import io.deephaven.engine.table.Table; +import io.deephaven.engine.util.BigDecimalUtils; import io.deephaven.kafka.KafkaSchemaUtils; import io.deephaven.time.DateTime; import io.deephaven.engine.util.string.StringUtils; @@ -22,6 +23,7 @@ import java.math.BigInteger; import java.math.MathContext; import java.math.RoundingMode; +import java.nio.ByteBuffer; import java.util.*; import java.util.concurrent.TimeUnit; @@ -323,7 +325,7 @@ private static GenericRecordFieldProcessor makeBigDecimalFieldProcessor( (final int ii, final ObjectChunk inputChunk) -> { final BigDecimal bd = (BigDecimal) inputChunk.get(ii); final BigInteger bi = toBigIntegerAtPrecisionAndScale(bd, mathContext, scale); - return bi.toByteArray(); + return ByteBuffer.wrap(bi.toByteArray()); }); } @@ -451,36 +453,15 @@ private GenericRecordFieldProcessor getFieldProcessorForType( + " has unrecognized logical type " + logicalType); } if (type == BigDecimal.class) { - final int precision = getPrecisionOrScale("precision", columnName, columnProperties); - final int scale = getPrecisionOrScale("scale", columnName, columnProperties); - return makeBigDecimalFieldProcessor(fieldName, src, precision, scale); + final BigDecimalUtils.PrecisionAndScalePropertyNames propertyNames = + new BigDecimalUtils.PrecisionAndScalePropertyNames(columnName); + final BigDecimalUtils.PrecisionAndScale precisionAndScale = + BigDecimalUtils.getPrecisionAndScaleFromColumnProperties(propertyNames, columnProperties, true); + return makeBigDecimalFieldProcessor(fieldName, src, precisionAndScale.precision, precisionAndScale.scale); } return makeObjectFieldProcessor(fieldName, src); } - private static final int getPrecisionOrScale( - final String precisionOrScale, - final String columnName, - final Properties columnProperties) { - final String property = columnName + "." 
+ precisionOrScale; - final String propertyValue = columnProperties.getProperty(property); - if (propertyValue == null) { - throw new IllegalArgumentException( - "column name '" + columnName + "' has type " + BigDecimal.class.getSimpleName() + "" + - " but no property '" + property + "' defined."); - } - final int parsedResult; - try { - parsedResult = Integer.parseInt(propertyValue); - } catch(NumberFormatException e) { - throw new IllegalArgumentException("Couldn't parse as int value '" + propertyValue + "' for property " + property); - } - if (parsedResult < 1) { - throw new IllegalArgumentException("Invalid value '" + parsedResult + "' for proprety " + property); - } - return parsedResult; - } - /** * Process the given update RowSequence and returns a list of JSON strings, reach representing one row of data. * From aff0f2fe63ad8a78b00e1ac7976614fb879ae9d1 Mon Sep 17 00:00:00 2001 From: Cristian Ferretti Date: Wed, 26 Jan 2022 19:43:51 -0500 Subject: [PATCH 4/9] Fixed an issue with mappings for columns in ProduceKafka.py; add sink to debezium demo. --- Integrations/python/deephaven/ProduceKafka.py | 2 + debezium-demo/demo.py | 43 +++++++++++++------ 2 files changed, 32 insertions(+), 13 deletions(-) diff --git a/Integrations/python/deephaven/ProduceKafka.py b/Integrations/python/deephaven/ProduceKafka.py index f33959a0c4b..d59cf358b39 100644 --- a/Integrations/python/deephaven/ProduceKafka.py +++ b/Integrations/python/deephaven/ProduceKafka.py @@ -185,7 +185,9 @@ def avro( field_to_col_mapping = _dictToMap(field_to_col_mapping) column_properties = _dictToProperties(column_properties) include_only_columns = _seqToSet(include_only_columns) + include_only_columns = _java_type_.predicateFromSet(include_only_columns) exclude_columns = _seqToSet(exclude_columns) + exclude_columns = _java_type_.predicateFromSet(exclude_columns) publish_schema = bool(publish_schema) if have_actual_schema: return _produce_jtype_.avroSpec( diff --git a/debezium-demo/demo.py b/debezium-demo/demo.py index 30d56d30adf..c421e6468b1 100644 --- a/debezium-demo/demo.py +++ b/debezium-demo/demo.py @@ -130,6 +130,7 @@ def make_cdc_table(table_name:str): dd_flagged_profiles = ck.consumeToTable( consume_properties, topic = 'dd_flagged_profiles', + offsets = ck.ALL_PARTITIONS_SEEK_TO_BEGINNING, key = ck.IGNORE, value = ck.simple('user_id_str', dh.string), table_type = 'append' @@ -150,16 +151,32 @@ def make_cdc_table(table_name:str): .where('lifetime_value > 10000') \ .naturalJoin(users, 'user_id = id', 'email') -# -# TODO: Publish to kafka the high-value-users-sink topic. Two missing pieces: -# * Need a way to automatically generate (and post) an Avro schema from a table definition. -# * Need support for publishing BigDecimal as Avro decimal logical type. 
-# -# callback = pk.produceFromTable( -# high_value_users, -# kafka_base_properties, -# 'high-value-users-sink', -# key = pk.avro(), -# value = pk.avro(), -# last_by_key_columns = True -#) +schema_namespace = 'io.deephaven.examples' + +cancel_callback = pk.produceFromTable( + high_value_users, + kafka_base_properties, + topic = 'high_value_users_sink', + key = pk.avro( + 'high_value_users_sink_key', + publish_schema = True, + schema_namespace = schema_namespace, + include_only_columns = [ 'user_id' ] + ), + value = pk.avro( + 'high_value_users_sink_value', + publish_schema = True, + schema_namespace = schema_namespace, + column_properties = { "lifetime_value.precision" : "12", "lifetime_value.scale" : "4" } + ), + last_by_key_columns = True +) + +hvu_test = ck.consumeToTable( + consume_properties, + topic = 'high_value_users_sink', + offsets = ck.ALL_PARTITIONS_SEEK_TO_BEGINNING, + key = ck.IGNORE, + value = ck.avro('high_value_users_sink_value'), + table_type = 'append' +) From f4fd64d4994b2afcfa181a08317dce3b2eddaba0 Mon Sep 17 00:00:00 2001 From: Cristian Ferretti Date: Wed, 26 Jan 2022 19:54:14 -0500 Subject: [PATCH 5/9] Cosmetic change to demo.py. --- debezium-demo/demo.py | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/debezium-demo/demo.py b/debezium-demo/demo.py index c421e6468b1..3398a75b600 100644 --- a/debezium-demo/demo.py +++ b/debezium-demo/demo.py @@ -140,8 +140,9 @@ def make_cdc_table(table_name:str): .naturalJoin(pageviews_stg, 'user_id') high_value_users = purchases \ - .updateView('purchase_total = purchase_price.multiply(java.math.BigDecimal.valueOf(quantity))') \ - .aggBy( + .updateView( + 'purchase_total = purchase_price.multiply(java.math.BigDecimal.valueOf(quantity))' + ).aggBy( as_list([ agg.AggSum('lifetime_value = purchase_total'), agg.AggCount('purchases'), @@ -167,7 +168,10 @@ def make_cdc_table(table_name:str): 'high_value_users_sink_value', publish_schema = True, schema_namespace = schema_namespace, - column_properties = { "lifetime_value.precision" : "12", "lifetime_value.scale" : "4" } + column_properties = { + "lifetime_value.precision" : "12", + "lifetime_value.scale" : "4" + } ), last_by_key_columns = True ) From 8232629f93e7db92c201b86433dbe036569b7a97 Mon Sep 17 00:00:00 2001 From: Cristian Ferretti Date: Wed, 26 Jan 2022 20:04:52 -0500 Subject: [PATCH 6/9] Added 'business intelligence' (...) dashboards (tables). --- debezium-demo/demo.py | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/debezium-demo/demo.py b/debezium-demo/demo.py index 3398a75b600..f5c2939f8d1 100644 --- a/debezium-demo/demo.py +++ b/debezium-demo/demo.py @@ -78,9 +78,15 @@ def make_cdc_table(table_name:str): .dropColumns('item_id') \ .updateView('conversion_rate = orders / (double) pageviews') -top_5_pageviews = item_summary \ +# These two 'top_*' tables match the 'Business Intelligence: Metabase' / dashboard +# part of the original example. +top_viewed_items = item_summary \ .sortDescending('pageviews') \ - .head(5) + .head(20) + +top_converting_items = item_summary \ + .sortDescending('conversion_rate') \ + .head(20) minute_in_nanos = 60 * 1000 * 1000 * 1000 From d9d399c42ae83f37970136e32d4956d58305c04f Mon Sep 17 00:00:00 2001 From: Cristian Ferretti Date: Wed, 26 Jan 2022 20:33:34 -0500 Subject: [PATCH 7/9] Spotlessfy. 
--- .../engine/util/BigDecimalUtils.java | 14 +++++----- .../java/io/deephaven/kafka/KafkaTools.java | 26 ++++++++++++------- .../GenericRecordKeyOrValueSerializer.java | 12 ++++----- 3 files changed, 29 insertions(+), 23 deletions(-) diff --git a/engine/table/src/main/java/io/deephaven/engine/util/BigDecimalUtils.java b/engine/table/src/main/java/io/deephaven/engine/util/BigDecimalUtils.java index 621a0b68d23..4b46c87a2cd 100644 --- a/engine/table/src/main/java/io/deephaven/engine/util/BigDecimalUtils.java +++ b/engine/table/src/main/java/io/deephaven/engine/util/BigDecimalUtils.java @@ -13,6 +13,7 @@ public class BigDecimalUtils { public static final int INVALID_PRECISION_OR_SCALE = -1; + public static class PrecisionAndScale { public final int precision; public final int scale; @@ -31,8 +32,7 @@ public static PrecisionAndScale computePrecisionAndScale( public static PrecisionAndScale computePrecisionAndScale( final TrackingRowSet rowSet, - final ColumnSource source - ) { + final ColumnSource source) { final int sz = 4096; // we first compute max(precision - scale) and max(scale), which corresponds to // max(digits left of the decimal point), max(digits right of the decimal point). @@ -40,7 +40,7 @@ public static PrecisionAndScale computePrecisionAndScale( int maxPrecisionMinusScale = 0; int maxScale = 0; try (final ChunkSource.GetContext context = source.makeGetContext(sz); - final RowSequence.Iterator it = rowSet.getRowSequenceIterator()) { + final RowSequence.Iterator it = rowSet.getRowSequenceIterator()) { final RowSequence rowSeq = it.getNextRowSequenceWithLength(sz); final ObjectChunk chunk = source.getChunk(context, rowSeq).asObjectChunk(); for (int i = 0; i < chunk.size(); ++i) { @@ -91,8 +91,9 @@ private static int getPrecisionAndScaleFromColumnProperties( final int parsedResult; try { parsedResult = Integer.parseInt(propertyValue); - } catch(NumberFormatException e) { - throw new IllegalArgumentException("Couldn't parse as int value '" + propertyValue + "' for property " + property); + } catch (NumberFormatException e) { + throw new IllegalArgumentException( + "Couldn't parse as int value '" + propertyValue + "' for property " + property); } if (parsedResult < 1) { throw new IllegalArgumentException("Invalid value '" + parsedResult + "' for property " + property); @@ -103,8 +104,7 @@ private static int getPrecisionAndScaleFromColumnProperties( public static PrecisionAndScale getPrecisionAndScaleFromColumnProperties( final PrecisionAndScalePropertyNames propertyNames, final Properties columnProperties, - final boolean allowNulls - ) { + final boolean allowNulls) { final int precision = getPrecisionAndScaleFromColumnProperties( propertyNames.columnName, propertyNames.precisionProperty, diff --git a/extensions/kafka/src/main/java/io/deephaven/kafka/KafkaTools.java b/extensions/kafka/src/main/java/io/deephaven/kafka/KafkaTools.java index 75cc3e5d6c9..5d0d8ee8a31 100644 --- a/extensions/kafka/src/main/java/io/deephaven/kafka/KafkaTools.java +++ b/extensions/kafka/src/main/java/io/deephaven/kafka/KafkaTools.java @@ -252,29 +252,36 @@ private static SchemaBuilder.FieldAssembler addFieldForColDef( if (t.isRefreshing()) { final String exBaseMsg = "Column " + colName + " of type " + BigDecimal.class.getSimpleName() + " in a refreshing table implies both properties '" + - propertyNames.precisionProperty + "' and '" + propertyNames.scaleProperty + "' should be defined; "; + propertyNames.precisionProperty + "' and '" + propertyNames.scaleProperty + + "' should be defined; "; - if 
(precision == BigDecimalUtils.INVALID_PRECISION_OR_SCALE && scale == BigDecimalUtils.INVALID_PRECISION_OR_SCALE) { + if (precision == BigDecimalUtils.INVALID_PRECISION_OR_SCALE + && scale == BigDecimalUtils.INVALID_PRECISION_OR_SCALE) { throw new IllegalArgumentException(exBaseMsg + " missing both"); } if (precision == BigDecimalUtils.INVALID_PRECISION_OR_SCALE) { - throw new IllegalArgumentException(exBaseMsg + " missing '" + propertyNames.precisionProperty + "'"); + throw new IllegalArgumentException( + exBaseMsg + " missing '" + propertyNames.precisionProperty + "'"); } if (scale == BigDecimalUtils.INVALID_PRECISION_OR_SCALE) { throw new IllegalArgumentException(exBaseMsg + " missing '" + propertyNames.scaleProperty + "'"); } - } else { // non refreshing table - if (precision == BigDecimalUtils.INVALID_PRECISION_OR_SCALE || scale == BigDecimalUtils.INVALID_PRECISION_OR_SCALE) { + } else { // non refreshing table + if (precision == BigDecimalUtils.INVALID_PRECISION_OR_SCALE + || scale == BigDecimalUtils.INVALID_PRECISION_OR_SCALE) { final String exBaseMsg = "Column " + colName + " of type " + BigDecimal.class.getSimpleName() + " in a non refreshing table implies either both properties '" + - propertyNames.precisionProperty + "' and '" + propertyNames.scaleProperty + "' should be defined, or none of them;"; + propertyNames.precisionProperty + "' and '" + propertyNames.scaleProperty + + "' should be defined, or none of them;"; if (precision != BigDecimalUtils.INVALID_PRECISION_OR_SCALE) { throw new IllegalArgumentException( - exBaseMsg + " only '" + propertyNames.precisionProperty + "' is defined, missing '" + propertyNames.scaleProperty + "'"); + exBaseMsg + " only '" + propertyNames.precisionProperty + "' is defined, missing '" + + propertyNames.scaleProperty + "'"); } if (scale != BigDecimalUtils.INVALID_PRECISION_OR_SCALE) { throw new IllegalArgumentException( - exBaseMsg + " only '" + propertyNames.scaleProperty + "' is defined, missing '" + propertyNames.precisionProperty + "'"); + exBaseMsg + " only '" + propertyNames.scaleProperty + "' is defined, missing '" + + propertyNames.precisionProperty + "'"); } // Both precision and scale are null; compute them ourselves. 
final BigDecimalUtils.PrecisionAndScale precisionAndScale = @@ -799,7 +806,8 @@ void ensureSchema(final Table t, final Properties kafkaProperties) { final String schemaServiceUrl = ensureAndGetSchemaServerProprety(kafkaProperties); if (publishSchema) { schema = columnDefinitionsToAvroSchema(t, - schemaName, schemaNamespace, columnProperties.getValue(), includeOnlyColumns, excludeColumns, columnProperties); + schemaName, schemaNamespace, columnProperties.getValue(), includeOnlyColumns, + excludeColumns, columnProperties); final String putVersion = putAvroSchema(schema, schemaServiceUrl, schemaName); if (putVersion != null && schemaVersion != null && !schemaVersion.equals(putVersion)) { throw new IllegalStateException("Specified expected version " + schemaVersion diff --git a/extensions/kafka/src/main/java/io/deephaven/kafka/publish/GenericRecordKeyOrValueSerializer.java b/extensions/kafka/src/main/java/io/deephaven/kafka/publish/GenericRecordKeyOrValueSerializer.java index 37251530b77..bd64426225e 100644 --- a/extensions/kafka/src/main/java/io/deephaven/kafka/publish/GenericRecordKeyOrValueSerializer.java +++ b/extensions/kafka/src/main/java/io/deephaven/kafka/publish/GenericRecordKeyOrValueSerializer.java @@ -46,8 +46,7 @@ public GenericRecordKeyOrValueSerializer( final Schema schema, final String[] columnNames, final String timestampFieldName, - final Properties columnProperties - ) { + final Properties columnProperties) { this.source = source; if (schema.isUnion()) { throw new UnsupportedOperationException("Schemas defined as a union of records are not supported"); @@ -308,8 +307,7 @@ private static BigInteger toBigIntegerAtPrecisionAndScale( final BigDecimal rescaled = v .scaleByPowerOfTen(scale) .setScale(0, mathContext.getRoundingMode()) - .round(mathContext) - ; + .round(mathContext); return rescaled.toBigIntegerExact(); } @@ -397,15 +395,15 @@ final String getLogicalType(final String fieldName, final Schema.Field field) { private void makeFieldProcessor( final Schema.Field field, final String columnNameIn, - final Properties columnProperties - ) { + final Properties columnProperties) { final String fieldName = field.name(); final String columnName = (columnNameIn == null) ? fieldName : columnNameIn; // getColumnSource should throw a ColumnNotFoundException if it can't find the column, // which will blow us up here. final ColumnSource src = source.getColumnSource(columnName); final Class type = src.getType(); - final GenericRecordFieldProcessor proc = getFieldProcessorForType(type, field, fieldName, columnName, src, columnProperties); + final GenericRecordFieldProcessor proc = + getFieldProcessorForType(type, field, fieldName, columnName, src, columnProperties); fieldProcessors.add(proc); } From fc3fc0eb0bfc84f5514d379c9b068a2ca135b984 Mon Sep 17 00:00:00 2001 From: Cristian Ferretti Date: Thu, 27 Jan 2022 13:15:25 -0500 Subject: [PATCH 8/9] Followup to review comments. 
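
Documents BigDecimalUtils; renames PrecisionAndScalePropertyNames to
PropertyNames and adds a setProperties helper; factors the KafkaTools
precision/scale checks into validatePrecisionAndScaleForRefreshingTable
and ensurePrecisionAndScaleForStaticTable.

For reference, a minimal sketch of the per-column property contract as
defined in this series; the column name 'Price' is hypothetical:

import java.util.Properties;
import io.deephaven.engine.util.BigDecimalUtils;

public class PrecisionScaleExample {
    public static void main(final String[] args) {
        final Properties columnProperties = new Properties();
        // Per-column properties follow the '<column>.precision' / '<column>.scale' convention.
        columnProperties.setProperty("Price.precision", "12");
        columnProperties.setProperty("Price.scale", "4");
        final BigDecimalUtils.PropertyNames names = new BigDecimalUtils.PropertyNames("Price");
        // allowNulls == true: a missing property maps to INVALID_PRECISION_OR_SCALE (-1)
        // instead of throwing, so the caller may fall back to computing values from the data.
        final BigDecimalUtils.PrecisionAndScale ps =
                BigDecimalUtils.getPrecisionAndScaleFromColumnProperties(names, columnProperties, true);
        System.out.println("precision=" + ps.precision + ", scale=" + ps.scale);
    }
}
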
--- .../engine/util/BigDecimalUtils.java | 56 +++++++- .../java/io/deephaven/kafka/KafkaTools.java | 124 ++++++++++-------- .../GenericRecordKeyOrValueSerializer.java | 4 +- 3 files changed, 122 insertions(+), 62 deletions(-) diff --git a/engine/table/src/main/java/io/deephaven/engine/util/BigDecimalUtils.java b/engine/table/src/main/java/io/deephaven/engine/util/BigDecimalUtils.java index 4b46c87a2cd..8e623bae61d 100644 --- a/engine/table/src/main/java/io/deephaven/engine/util/BigDecimalUtils.java +++ b/engine/table/src/main/java/io/deephaven/engine/util/BigDecimalUtils.java @@ -11,9 +11,21 @@ import java.math.BigDecimal; import java.util.Properties; +/** + * Utilities to support BigDecimal exhaust. + * + * Parquet and Avro decimal types make a whole column decimal type have a fixed precision and scale; + * BigDecimal columns in Deephaven are, each value, arbitrary precision (its own precision and scale). + * + * For static tables, it is possible to compute overall precision and scale values that fit every existing value. + * For refreshing tables, we need the user to tell us. + */ public class BigDecimalUtils { public static final int INVALID_PRECISION_OR_SCALE = -1; + /** + * Immutable way to store and pass precision and scale values. + */ public static class PrecisionAndScale { public final int precision; public final int scale; @@ -24,12 +36,26 @@ public PrecisionAndScale(final int precision, final int scale) { } } + /** + * Compute an overall precision and scale that would fit all existing values in a table. + * + * @param t a Deephaven table + * @param colName a Column for {@code t}, which should be of {@code BigDecimal} type + * @return a {@code PrecisionAndScale} object result. + */ public static PrecisionAndScale computePrecisionAndScale( final Table t, final String colName) { final ColumnSource src = t.getColumnSource(colName, BigDecimal.class); return computePrecisionAndScale(t.getRowSet(), src); } + /** + * Compute an overall precision and scale that would fit all existing values in a column source. + * + * @param rowSet The rowset for the provided column + * @param source a {@code ColumnSource} of {@code BigDecimal} type + * @return a {@code PrecisionAndScale} object result. + */ public static PrecisionAndScale computePrecisionAndScale( final TrackingRowSet rowSet, final ColumnSource source) { @@ -59,12 +85,15 @@ public static PrecisionAndScale computePrecisionAndScale( return new PrecisionAndScale(maxPrecisionMinusScale + maxScale, maxScale); } - public static class PrecisionAndScalePropertyNames { + /** + * Immutable way to store and pass properties to get precision and scale for a given named column. + */ + public static class PropertyNames { public final String columnName; public final String precisionProperty; public final String scaleProperty; - public PrecisionAndScalePropertyNames(final String columnName) { + public PropertyNames(final String columnName) { this.columnName = columnName; precisionProperty = columnName + ".precision"; scaleProperty = columnName + ".scale"; @@ -101,8 +130,17 @@ private static int getPrecisionAndScaleFromColumnProperties( return parsedResult; } + /** + * Get a {@code PrecisionAndScale} value from a {@Properties} object. + * + * @param propertyNames The property names to read. 
+ * @param columnProperties The {@Properties} object from where to read the properties + * @param allowNulls If true, do not throw when a property is missing, instead set the value to + * {@Code INVALID_PRECISION_OR_SCALE} + * @return A {@PrecisionAndScale} object with the values read. + */ public static PrecisionAndScale getPrecisionAndScaleFromColumnProperties( - final PrecisionAndScalePropertyNames propertyNames, + final PropertyNames propertyNames, final Properties columnProperties, final boolean allowNulls) { final int precision = getPrecisionAndScaleFromColumnProperties( @@ -117,4 +155,16 @@ public static PrecisionAndScale getPrecisionAndScaleFromColumnProperties( allowNulls); return new PrecisionAndScale(precision, scale); } + + /** + * Set the given names and values in the supplied {@code Properties} object. + * + * @param props Properties where the given property names and values would be set. + * @param names Property names to set + * @param values Property values to set + */ + public static void setProperties(final Properties props, final PropertyNames names, final PrecisionAndScale values) { + props.setProperty(names.precisionProperty, Integer.toString(values.precision)); + props.setProperty(names.scaleProperty, Integer.toString(values.scale)); + } } diff --git a/extensions/kafka/src/main/java/io/deephaven/kafka/KafkaTools.java b/extensions/kafka/src/main/java/io/deephaven/kafka/KafkaTools.java index 5d0d8ee8a31..cbb78273904 100644 --- a/extensions/kafka/src/main/java/io/deephaven/kafka/KafkaTools.java +++ b/extensions/kafka/src/main/java/io/deephaven/kafka/KafkaTools.java @@ -215,19 +215,76 @@ public static Schema columnDefinitionsToAvroSchema( return fass.endRecord(); } + private static void validatePrecisionAndScaleForRefreshingTable( + final BigDecimalUtils.PropertyNames names, + final BigDecimalUtils.PrecisionAndScale values) { + final String exBaseMsg = "Column " + names.columnName + " of type " + BigDecimal.class.getSimpleName() + + " in a refreshing table implies both properties '" + + names.precisionProperty + "' and '" + names.scaleProperty + + "' should be defined; "; + + if (values.precision == BigDecimalUtils.INVALID_PRECISION_OR_SCALE + && values.scale == BigDecimalUtils.INVALID_PRECISION_OR_SCALE) { + throw new IllegalArgumentException(exBaseMsg + " missing both"); + } + if (values.precision == BigDecimalUtils.INVALID_PRECISION_OR_SCALE) { + throw new IllegalArgumentException( + exBaseMsg + " missing '" + names.precisionProperty + "'"); + } + if (values.scale == BigDecimalUtils.INVALID_PRECISION_OR_SCALE) { + throw new IllegalArgumentException(exBaseMsg + " missing '" + names.scaleProperty + "'"); + } + } + + private static BigDecimalUtils.PrecisionAndScale ensurePrecisionAndScaleForStaticTable( + final MutableObject colPropsMu, + final Table t, + final BigDecimalUtils.PropertyNames names, + final BigDecimalUtils.PrecisionAndScale valuesIn + ) { + if (valuesIn.precision != BigDecimalUtils.INVALID_PRECISION_OR_SCALE + && valuesIn.scale != BigDecimalUtils.INVALID_PRECISION_OR_SCALE) { + return valuesIn; + } + final String exBaseMsg = "Column " + names.columnName + " of type " + BigDecimal.class.getSimpleName() + + " in a non refreshing table implies either both properties '" + + names.precisionProperty + "' and '" + names.scaleProperty + + "' should be defined, or none of them;"; + if (valuesIn.precision != BigDecimalUtils.INVALID_PRECISION_OR_SCALE) { + throw new IllegalArgumentException( + exBaseMsg + " only '" + names.precisionProperty + "' is defined, missing 
+                            + names.scaleProperty + "'");
+        }
+        if (valuesIn.scale != BigDecimalUtils.INVALID_PRECISION_OR_SCALE) {
+            throw new IllegalArgumentException(
+                    exBaseMsg + " only '" + names.scaleProperty + "' is defined, missing '"
+                            + names.precisionProperty + "'");
+        }
+        // Both precision and scale are invalid; compute them ourselves.
+        final BigDecimalUtils.PrecisionAndScale newValues = BigDecimalUtils.computePrecisionAndScale(t, names.columnName);
+        final Properties toSet;
+        final Properties colProps = colPropsMu.getValue();
+        if (colProps == null) {
+            toSet = new Properties();
+            colPropsMu.setValue(toSet);
+        } else {
+            toSet = colProps;
+        }
+        BigDecimalUtils.setProperties(toSet, names, newValues);
+        return newValues;
+    }
+
     private static SchemaBuilder.FieldAssembler<Schema> addFieldForColDef(
             final Table t,
             final SchemaBuilder.FieldAssembler<Schema> fassIn,
             final ColumnDefinition<?> colDef,
             final MutableObject<Properties> colPropsMu) {
-        final String colNameToPropSeparator = ".";
         final String logicalTypeName = "logicalType";
         final String dhTypeAttribute = "dhType";
         SchemaBuilder.FieldAssembler<Schema> fass = fassIn;
         final Class<?> type = colDef.getDataType();
         final String colName = colDef.getName();
         final SchemaBuilder.BaseFieldTypeBuilder<Schema> base = fass.name(colName).type().nullable();
-        final Properties colProps = colPropsMu.getValue();
         if (type == byte.class || type == char.class || type == short.class) {
             fass = base.intBuilder().prop(dhTypeAttribute, type.getName()).endInt().noDefault();
         } else if (type == int.class) {
@@ -243,66 +300,19 @@ private static SchemaBuilder.FieldAssembler<Schema> addFieldForColDef(
         } else if (type == DateTime.class) {
             fass = base.longBuilder().prop(logicalTypeName, "timestamp-micros").endLong().noDefault();
         } else if (type == BigDecimal.class) {
-            final BigDecimalUtils.PrecisionAndScalePropertyNames propertyNames =
-                    new BigDecimalUtils.PrecisionAndScalePropertyNames(colName);
-            final BigDecimalUtils.PrecisionAndScale precisionAndScaleFromProperties =
-                    BigDecimalUtils.getPrecisionAndScaleFromColumnProperties(propertyNames, colProps, true);
-            int precision = precisionAndScaleFromProperties.precision;
-            int scale = precisionAndScaleFromProperties.scale;
+            final BigDecimalUtils.PropertyNames propertyNames =
+                    new BigDecimalUtils.PropertyNames(colName);
+            BigDecimalUtils.PrecisionAndScale values =
+                    BigDecimalUtils.getPrecisionAndScaleFromColumnProperties(propertyNames, colPropsMu.getValue(), true);
             if (t.isRefreshing()) {
-                final String exBaseMsg = "Column " + colName + " of type " + BigDecimal.class.getSimpleName() +
-                        " in a refreshing table implies both properties '" +
-                        propertyNames.precisionProperty + "' and '" + propertyNames.scaleProperty
-                        + "' should be defined; ";
-
-                if (precision == BigDecimalUtils.INVALID_PRECISION_OR_SCALE
-                        && scale == BigDecimalUtils.INVALID_PRECISION_OR_SCALE) {
-                    throw new IllegalArgumentException(exBaseMsg + " missing both");
-                }
-                if (precision == BigDecimalUtils.INVALID_PRECISION_OR_SCALE) {
-                    throw new IllegalArgumentException(
-                            exBaseMsg + " missing '" + propertyNames.precisionProperty + "'");
-                }
-                if (scale == BigDecimalUtils.INVALID_PRECISION_OR_SCALE) {
-                    throw new IllegalArgumentException(exBaseMsg + " missing '" + propertyNames.scaleProperty + "'");
-                }
+                validatePrecisionAndScaleForRefreshingTable(propertyNames, values);
             } else { // non refreshing table
-                if (precision == BigDecimalUtils.INVALID_PRECISION_OR_SCALE
-                        || scale == BigDecimalUtils.INVALID_PRECISION_OR_SCALE) {
-                    final String exBaseMsg = "Column " + colName + " of type " + BigDecimal.class.getSimpleName() +
" in a non refreshing table implies either both properties '" + - propertyNames.precisionProperty + "' and '" + propertyNames.scaleProperty - + "' should be defined, or none of them;"; - if (precision != BigDecimalUtils.INVALID_PRECISION_OR_SCALE) { - throw new IllegalArgumentException( - exBaseMsg + " only '" + propertyNames.precisionProperty + "' is defined, missing '" - + propertyNames.scaleProperty + "'"); - } - if (scale != BigDecimalUtils.INVALID_PRECISION_OR_SCALE) { - throw new IllegalArgumentException( - exBaseMsg + " only '" + propertyNames.scaleProperty + "' is defined, missing '" - + propertyNames.precisionProperty + "'"); - } - // Both precision and scale are null; compute them ourselves. - final BigDecimalUtils.PrecisionAndScale precisionAndScale = - BigDecimalUtils.computePrecisionAndScale(t, colName); - precision = precisionAndScale.precision; - scale = precisionAndScale.scale; - final Properties toSet; - if (colProps == null) { - toSet = new Properties(); - colPropsMu.setValue(toSet); - } else { - toSet = colProps; - } - toSet.setProperty(propertyNames.precisionProperty, Integer.toString(precision)); - toSet.setProperty(propertyNames.scaleProperty, Integer.toString(scale)); - } + ensurePrecisionAndScaleForStaticTable(colPropsMu, t, propertyNames, values); } fass = base.bytesBuilder() .prop(logicalTypeName, "decimal") - .prop("precision", precision) - .prop("scale", scale) + .prop("precision", values.precision) + .prop("scale", values.scale) .endBytes() .noDefault(); } else { diff --git a/extensions/kafka/src/main/java/io/deephaven/kafka/publish/GenericRecordKeyOrValueSerializer.java b/extensions/kafka/src/main/java/io/deephaven/kafka/publish/GenericRecordKeyOrValueSerializer.java index bd64426225e..e5090c951a2 100644 --- a/extensions/kafka/src/main/java/io/deephaven/kafka/publish/GenericRecordKeyOrValueSerializer.java +++ b/extensions/kafka/src/main/java/io/deephaven/kafka/publish/GenericRecordKeyOrValueSerializer.java @@ -451,8 +451,8 @@ private GenericRecordFieldProcessor getFieldProcessorForType( + " has unrecognized logical type " + logicalType); } if (type == BigDecimal.class) { - final BigDecimalUtils.PrecisionAndScalePropertyNames propertyNames = - new BigDecimalUtils.PrecisionAndScalePropertyNames(columnName); + final BigDecimalUtils.PropertyNames propertyNames = + new BigDecimalUtils.PropertyNames(columnName); final BigDecimalUtils.PrecisionAndScale precisionAndScale = BigDecimalUtils.getPrecisionAndScaleFromColumnProperties(propertyNames, columnProperties, true); return makeBigDecimalFieldProcessor(fieldName, src, precisionAndScale.precision, precisionAndScale.scale); From 9d6f43962bd3960529eb8bf3f1517fab5440b7c5 Mon Sep 17 00:00:00 2001 From: Cristian Ferretti Date: Thu, 27 Jan 2022 13:42:50 -0500 Subject: [PATCH 9/9] Spotlessfy. --- .../engine/util/BigDecimalUtils.java | 21 ++++++++++--------- .../java/io/deephaven/kafka/KafkaTools.java | 9 ++++---- 2 files changed, 16 insertions(+), 14 deletions(-) diff --git a/engine/table/src/main/java/io/deephaven/engine/util/BigDecimalUtils.java b/engine/table/src/main/java/io/deephaven/engine/util/BigDecimalUtils.java index 8e623bae61d..b633730a75a 100644 --- a/engine/table/src/main/java/io/deephaven/engine/util/BigDecimalUtils.java +++ b/engine/table/src/main/java/io/deephaven/engine/util/BigDecimalUtils.java @@ -14,11 +14,11 @@ /** * Utilities to support BigDecimal exhaust. 
  *
- * Parquet and Avro decimal types fix a single precision and scale for an entire column;
- * BigDecimal columns in Deephaven are arbitrary precision per value (each value carries its own precision and scale).
+ * Parquet and Avro decimal types fix a single precision and scale for an entire column; BigDecimal columns in
+ * Deephaven are arbitrary precision per value (each value carries its own precision and scale).
  *
- * For static tables, it is possible to compute overall precision and scale values that fit every existing value.
- * For refreshing tables, we need the user to tell us.
+ * For static tables, it is possible to compute overall precision and scale values that fit every existing value. For
+ * refreshing tables, we need the user to tell us.
  */
 public class BigDecimalUtils {
     public static final int INVALID_PRECISION_OR_SCALE = -1;
@@ -133,11 +133,11 @@ private static int getPrecisionAndScaleFromColumnProperties(
     /**
      * Get a {@code PrecisionAndScale} value from a {@code Properties} object.
      *
-     * @param propertyNames The property names to read.
-     * @param columnProperties The {@code Properties} object from which to read the properties
-     * @param allowNulls If true, do not throw when a property is missing; instead, set the value to
-     *        {@code INVALID_PRECISION_OR_SCALE}
-     * @return A {@code PrecisionAndScale} object with the values read.
+     * @param propertyNames The property names to read.
+     * @param columnProperties The {@code Properties} object from which to read the properties
+     * @param allowNulls If true, do not throw when a property is missing; instead, set the value to
+     *            {@code INVALID_PRECISION_OR_SCALE}
+     * @return A {@code PrecisionAndScale} object with the values read.
      */
     public static PrecisionAndScale getPrecisionAndScaleFromColumnProperties(
             final PropertyNames propertyNames,
             final Properties columnProperties,
             final boolean allowNulls) {
@@ -163,7 +163,8 @@ public static PrecisionAndScale getPrecisionAndScaleFromColumnProperties(
      * @param names Property names to set
      * @param values Property values to set
      */
-    public static void setProperties(final Properties props, final PropertyNames names, final PrecisionAndScale values) {
+    public static void setProperties(final Properties props, final PropertyNames names,
+            final PrecisionAndScale values) {
         props.setProperty(names.precisionProperty, Integer.toString(values.precision));
         props.setProperty(names.scaleProperty, Integer.toString(values.scale));
     }
diff --git a/extensions/kafka/src/main/java/io/deephaven/kafka/KafkaTools.java b/extensions/kafka/src/main/java/io/deephaven/kafka/KafkaTools.java
index cbb78273904..8a03e7d181d 100644
--- a/extensions/kafka/src/main/java/io/deephaven/kafka/KafkaTools.java
+++ b/extensions/kafka/src/main/java/io/deephaven/kafka/KafkaTools.java
@@ -240,8 +240,7 @@ private static BigDecimalUtils.PrecisionAndScale ensurePrecisionAndScaleForStati
             final MutableObject<Properties> colPropsMu,
             final Table t,
             final BigDecimalUtils.PropertyNames names,
-            final BigDecimalUtils.PrecisionAndScale valuesIn
-    ) {
+            final BigDecimalUtils.PrecisionAndScale valuesIn) {
         if (valuesIn.precision != BigDecimalUtils.INVALID_PRECISION_OR_SCALE
                 && valuesIn.scale != BigDecimalUtils.INVALID_PRECISION_OR_SCALE) {
             return valuesIn;
         }
@@ -261,7 +260,8 @@ private static BigDecimalUtils.PrecisionAndScale ensurePrecisionAndScaleForStati
                             + names.precisionProperty + "'");
         }
         // Both precision and scale are invalid; compute them ourselves.
-        final BigDecimalUtils.PrecisionAndScale newValues = BigDecimalUtils.computePrecisionAndScale(t, names.columnName);
+        final BigDecimalUtils.PrecisionAndScale newValues =
+                BigDecimalUtils.computePrecisionAndScale(t, names.columnName);
         final Properties toSet;
         final Properties colProps = colPropsMu.getValue();
         if (colProps == null) {
@@ -303,7 +303,8 @@ private static SchemaBuilder.FieldAssembler<Schema> addFieldForColDef(
             final BigDecimalUtils.PropertyNames propertyNames =
                     new BigDecimalUtils.PropertyNames(colName);
             BigDecimalUtils.PrecisionAndScale values =
-                    BigDecimalUtils.getPrecisionAndScaleFromColumnProperties(propertyNames, colPropsMu.getValue(), true);
+                    BigDecimalUtils.getPrecisionAndScaleFromColumnProperties(propertyNames, colPropsMu.getValue(),
+                            true);
             if (t.isRefreshing()) {
                 validatePrecisionAndScaleForRefreshingTable(propertyNames, values);
             } else { // non refreshing table
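
Reviewer note: as a quick illustration of how the pieces in this series fit together from calling code, here is a minimal sketch. It is hypothetical (the column name "Price" and the literal precision and scale values are made up for illustration); the only APIs assumed are the BigDecimalUtils entry points added in these patches, plus Table.isRefreshing(), which the patches themselves use.

    import java.util.Properties;

    import io.deephaven.engine.table.Table;
    import io.deephaven.engine.util.BigDecimalUtils;

    public class DecimalColumnPropsExample {
        // Build the "<col>.precision" / "<col>.scale" column properties that the
        // Avro schema generation in KafkaTools expects for a BigDecimal column.
        public static Properties decimalColumnProps(final Table t) {
            final BigDecimalUtils.PropertyNames names = new BigDecimalUtils.PropertyNames("Price");
            final Properties props = new Properties();
            if (t.isRefreshing()) {
                // A refreshing table cannot be scanned for values that do not exist yet,
                // so the caller must supply both properties (values here are illustrative).
                props.setProperty(names.precisionProperty, "20");
                props.setProperty(names.scaleProperty, "4");
            } else {
                // A static table can be scanned: compute a precision and scale that fit
                // every existing value, then record them in the properties.
                final BigDecimalUtils.PrecisionAndScale values =
                        BigDecimalUtils.computePrecisionAndScale(t, names.columnName);
                BigDecimalUtils.setProperties(props, names, values);
            }
            return props;
        }
    }

With these properties present, schema generation emits an Avro bytes field with logicalType "decimal" and the given precision and scale; if they are missing for a refreshing table, validatePrecisionAndScaleForRefreshingTable fails fast instead of producing an unusable schema.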