Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support publishing BigDecimal columns to Kafka Avro. #1894

Merged
merged 10 commits into from
Jan 27, 2022
2 changes: 2 additions & 0 deletions Integrations/python/deephaven/ProduceKafka.py
Original file line number Diff line number Diff line change
Expand Up @@ -185,7 +185,9 @@ def avro(
field_to_col_mapping = _dictToMap(field_to_col_mapping)
column_properties = _dictToProperties(column_properties)
include_only_columns = _seqToSet(include_only_columns)
include_only_columns = _java_type_.predicateFromSet(include_only_columns)
exclude_columns = _seqToSet(exclude_columns)
exclude_columns = _java_type_.predicateFromSet(exclude_columns)
publish_schema = bool(publish_schema)
if have_actual_schema:
return _produce_jtype_.avroSpec(
Expand Down
61 changes: 44 additions & 17 deletions debezium-demo/demo.py
Original file line number Diff line number Diff line change
Expand Up @@ -78,9 +78,15 @@ def make_cdc_table(table_name:str):
.dropColumns('item_id') \
.updateView('conversion_rate = orders / (double) pageviews')

top_5_pageviews = item_summary \
# These two 'top_*' tables match the 'Business Intelligence: Metabase' / dashboard
# part of the original example.
top_viewed_items = item_summary \
.sortDescending('pageviews') \
.head(5)
.head(20)

top_converting_items = item_summary \
.sortDescending('conversion_rate') \
.head(20)

minute_in_nanos = 60 * 1000 * 1000 * 1000

Expand Down Expand Up @@ -130,6 +136,7 @@ def make_cdc_table(table_name:str):
dd_flagged_profiles = ck.consumeToTable(
consume_properties,
topic = 'dd_flagged_profiles',
offsets = ck.ALL_PARTITIONS_SEEK_TO_BEGINNING,
key = ck.IGNORE,
value = ck.simple('user_id_str', dh.string),
table_type = 'append'
Expand All @@ -139,8 +146,9 @@ def make_cdc_table(table_name:str):
.naturalJoin(pageviews_stg, 'user_id')

high_value_users = purchases \
.updateView('purchase_total = purchase_price.multiply(java.math.BigDecimal.valueOf(quantity))') \
.aggBy(
.updateView(
'purchase_total = purchase_price.multiply(java.math.BigDecimal.valueOf(quantity))'
).aggBy(
as_list([
agg.AggSum('lifetime_value = purchase_total'),
agg.AggCount('purchases'),
Expand All @@ -150,16 +158,35 @@ def make_cdc_table(table_name:str):
.where('lifetime_value > 10000') \
.naturalJoin(users, 'user_id = id', 'email')

#
# TODO: Publish to kafka the high-value-users-sink topic. Two missing pieces:
# * Need a way to automatically generate (and post) an Avro schema from a table definition.
# * Need support for publishing BigDecimal as Avro decimal logical type.
#
# callback = pk.produceFromTable(
# high_value_users,
# kafka_base_properties,
# 'high-value-users-sink',
# key = pk.avro(),
# value = pk.avro(),
# last_by_key_columns = True
#)
schema_namespace = 'io.deephaven.examples'

cancel_callback = pk.produceFromTable(
high_value_users,
kafka_base_properties,
topic = 'high_value_users_sink',
key = pk.avro(
'high_value_users_sink_key',
publish_schema = True,
schema_namespace = schema_namespace,
include_only_columns = [ 'user_id' ]
),
value = pk.avro(
'high_value_users_sink_value',
publish_schema = True,
schema_namespace = schema_namespace,
column_properties = {
"lifetime_value.precision" : "12",
"lifetime_value.scale" : "4"
}
),
last_by_key_columns = True
)

hvu_test = ck.consumeToTable(
consume_properties,
topic = 'high_value_users_sink',
offsets = ck.ALL_PARTITIONS_SEEK_TO_BEGINNING,
key = ck.IGNORE,
value = ck.avro('high_value_users_sink_value'),
table_type = 'append'
)
Original file line number Diff line number Diff line change
@@ -0,0 +1,170 @@
package io.deephaven.engine.util;

import io.deephaven.chunk.ObjectChunk;
import io.deephaven.chunk.attributes.Values;
import io.deephaven.engine.rowset.RowSequence;
import io.deephaven.engine.rowset.TrackingRowSet;
import io.deephaven.engine.table.ChunkSource;
import io.deephaven.engine.table.ColumnSource;
import io.deephaven.engine.table.Table;

import java.math.BigDecimal;
import java.util.Properties;

/**
* Utilities to support BigDecimal exhaust.
*
* Parquet and Avro decimal types make a whole column decimal type have a fixed precision and scale;
* BigDecimal columns in Deephaven are, each value, arbitrary precision (its own precision and scale).
*
* For static tables, it is possible to compute overall precision and scale values that fit every existing value.
* For refreshing tables, we need the user to tell us.
*/
public class BigDecimalUtils {
public static final int INVALID_PRECISION_OR_SCALE = -1;

/**
* Immutable way to store and pass precision and scale values.
*/
public static class PrecisionAndScale {
public final int precision;
public final int scale;

public PrecisionAndScale(final int precision, final int scale) {
this.precision = precision;
this.scale = scale;
}
}

/**
* Compute an overall precision and scale that would fit all existing values in a table.
*
* @param t a Deephaven table
* @param colName a Column for {@code t}, which should be of {@code BigDecimal} type
* @return a {@code PrecisionAndScale} object result.
*/
public static PrecisionAndScale computePrecisionAndScale(
final Table t, final String colName) {
final ColumnSource<BigDecimal> src = t.getColumnSource(colName, BigDecimal.class);
return computePrecisionAndScale(t.getRowSet(), src);
}

/**
* Compute an overall precision and scale that would fit all existing values in a column source.
*
* @param rowSet The rowset for the provided column
* @param source a {@code ColumnSource} of {@code BigDecimal} type
* @return a {@code PrecisionAndScale} object result.
*/
public static PrecisionAndScale computePrecisionAndScale(
final TrackingRowSet rowSet,
final ColumnSource<BigDecimal> source) {
final int sz = 4096;
// we first compute max(precision - scale) and max(scale), which corresponds to
// max(digits left of the decimal point), max(digits right of the decimal point).
// Then we convert to (precision, scale) before returning.
int maxPrecisionMinusScale = 0;
int maxScale = 0;
try (final ChunkSource.GetContext context = source.makeGetContext(sz);
final RowSequence.Iterator it = rowSet.getRowSequenceIterator()) {
final RowSequence rowSeq = it.getNextRowSequenceWithLength(sz);
final ObjectChunk<BigDecimal, ? extends Values> chunk = source.getChunk(context, rowSeq).asObjectChunk();
for (int i = 0; i < chunk.size(); ++i) {
final BigDecimal x = chunk.get(i);
final int precision = x.precision();
final int scale = x.scale();
final int precisionMinusScale = precision - scale;
if (precisionMinusScale > maxPrecisionMinusScale) {
maxPrecisionMinusScale = precisionMinusScale;
}
if (scale > maxScale) {
maxScale = scale;
}
}
}
return new PrecisionAndScale(maxPrecisionMinusScale + maxScale, maxScale);
}

/**
* Immutable way to store and pass properties to get precision and scale for a given named column.
*/
public static class PropertyNames {
public final String columnName;
public final String precisionProperty;
public final String scaleProperty;

public PropertyNames(final String columnName) {
this.columnName = columnName;
precisionProperty = columnName + ".precision";
scaleProperty = columnName + ".scale";
}
}

private static int getPrecisionAndScaleFromColumnProperties(
final String columnName,
final String property,
final Properties columnProperties,
final boolean allowNulls) {
if (columnProperties == null) {
return INVALID_PRECISION_OR_SCALE;
}
final String propertyValue = columnProperties.getProperty(property);
if (propertyValue == null) {
if (!allowNulls) {
throw new IllegalArgumentException(
"column name '" + columnName + "' has type " + BigDecimal.class.getSimpleName() + "" +
" but no property '" + property + "' defined.");
}
return INVALID_PRECISION_OR_SCALE;
}
final int parsedResult;
try {
parsedResult = Integer.parseInt(propertyValue);
} catch (NumberFormatException e) {
throw new IllegalArgumentException(
"Couldn't parse as int value '" + propertyValue + "' for property " + property);
}
if (parsedResult < 1) {
throw new IllegalArgumentException("Invalid value '" + parsedResult + "' for property " + property);
}
return parsedResult;
}

/**
* Get a {@code PrecisionAndScale} value from a {@Properties} object.
*
* @param propertyNames The property names to read.
* @param columnProperties The {@Properties} object from where to read the properties
* @param allowNulls If true, do not throw when a property is missing, instead set the value to
* {@Code INVALID_PRECISION_OR_SCALE}
* @return A {@PrecisionAndScale} object with the values read.
*/
public static PrecisionAndScale getPrecisionAndScaleFromColumnProperties(
final PropertyNames propertyNames,
final Properties columnProperties,
final boolean allowNulls) {
final int precision = getPrecisionAndScaleFromColumnProperties(
propertyNames.columnName,
propertyNames.precisionProperty,
columnProperties,
allowNulls);
final int scale = getPrecisionAndScaleFromColumnProperties(
propertyNames.columnName,
propertyNames.scaleProperty,
columnProperties,
allowNulls);
return new PrecisionAndScale(precision, scale);
}

/**
* Set the given names and values in the supplied {@code Properties} object.
*
* @param props Properties where the given property names and values would be set.
* @param names Property names to set
* @param values Property values to set
*/
public static void setProperties(final Properties props, final PropertyNames names, final PrecisionAndScale values) {
props.setProperty(names.precisionProperty, Integer.toString(values.precision));
props.setProperty(names.scaleProperty, Integer.toString(values.scale));
}
}
Loading