Support publishing BigDecimal columns to Kafka Avro. #1894

Merged
10 commits merged on Jan 27, 2022
2 changes: 2 additions & 0 deletions Integrations/python/deephaven/ProduceKafka.py
@@ -185,7 +185,9 @@ def avro(
field_to_col_mapping = _dictToMap(field_to_col_mapping)
column_properties = _dictToProperties(column_properties)
include_only_columns = _seqToSet(include_only_columns)
include_only_columns = _java_type_.predicateFromSet(include_only_columns)
exclude_columns = _seqToSet(exclude_columns)
exclude_columns = _java_type_.predicateFromSet(exclude_columns)
publish_schema = bool(publish_schema)
if have_actual_schema:
return _produce_jtype_.avroSpec(
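The avro() wrapper now accepts plain Python sequences for include_only_columns and exclude_columns and converts them to Java predicates (via predicateFromSet) before building the producer spec, and publish_schema is coerced to a bool. A minimal sketch of a call that publishes only selected columns; the schema name and column names here are hypothetical, and the import line is an assumption about how the module is loaded:

import deephaven.ProduceKafka as pk

value_spec = pk.avro(
    'orders_value_schema',                        # hypothetical schema name
    publish_schema = True,                        # generate and register the schema
    schema_namespace = 'io.deephaven.examples',
    include_only_columns = [ 'symbol', 'price' ]  # converted to a Java predicate internally
)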
61 changes: 44 additions & 17 deletions debezium-demo/demo.py
@@ -78,9 +78,15 @@ def make_cdc_table(table_name:str):
.dropColumns('item_id') \
.updateView('conversion_rate = orders / (double) pageviews')

top_5_pageviews = item_summary \
# These two 'top_*' tables match the 'Business Intelligence: Metabase' / dashboard
# part of the original example.
top_viewed_items = item_summary \
.sortDescending('pageviews') \
.head(5)
.head(20)

top_converting_items = item_summary \
.sortDescending('conversion_rate') \
.head(20)

minute_in_nanos = 60 * 1000 * 1000 * 1000

@@ -130,6 +136,7 @@ def make_cdc_table(table_name:str):
dd_flagged_profiles = ck.consumeToTable(
consume_properties,
topic = 'dd_flagged_profiles',
offsets = ck.ALL_PARTITIONS_SEEK_TO_BEGINNING,
key = ck.IGNORE,
value = ck.simple('user_id_str', dh.string),
table_type = 'append'
@@ -139,8 +146,9 @@ def make_cdc_table(table_name:str):
.naturalJoin(pageviews_stg, 'user_id')

high_value_users = purchases \
.updateView('purchase_total = purchase_price.multiply(java.math.BigDecimal.valueOf(quantity))') \
.aggBy(
.updateView(
'purchase_total = purchase_price.multiply(java.math.BigDecimal.valueOf(quantity))'
).aggBy(
as_list([
agg.AggSum('lifetime_value = purchase_total'),
agg.AggCount('purchases'),
@@ -150,16 +158,35 @@ def make_cdc_table(table_name:str):
.where('lifetime_value > 10000') \
.naturalJoin(users, 'user_id = id', 'email')

#
# TODO: Publish to kafka the high-value-users-sink topic. Two missing pieces:
# * Need a way to automatically generate (and post) an Avro schema from a table definition.
# * Need support for publishing BigDecimal as Avro decimal logical type.
#
# callback = pk.produceFromTable(
# high_value_users,
# kafka_base_properties,
# 'high-value-users-sink',
# key = pk.avro(),
# value = pk.avro(),
# last_by_key_columns = True
#)
schema_namespace = 'io.deephaven.examples'

cancel_callback = pk.produceFromTable(
high_value_users,
kafka_base_properties,
topic = 'high_value_users_sink',
key = pk.avro(
'high_value_users_sink_key',
publish_schema = True,
schema_namespace = schema_namespace,
include_only_columns = [ 'user_id' ]
),
value = pk.avro(
'high_value_users_sink_value',
publish_schema = True,
schema_namespace = schema_namespace,
column_properties = {
"lifetime_value.precision" : "12",
"lifetime_value.scale" : "4"
}
),
last_by_key_columns = True
)

hvu_test = ck.consumeToTable(
consume_properties,
topic = 'high_value_users_sink',
offsets = ck.ALL_PARTITIONS_SEEK_TO_BEGINNING,
key = ck.IGNORE,
value = ck.avro('high_value_users_sink_value'),
table_type = 'append'
)
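For context on the column_properties passed above: Avro represents decimals as a logical type layered on a bytes (or fixed) field, carrying an explicit precision and scale. Assuming the schema generator follows that convention, the lifetime_value field in the published high_value_users_sink_value schema would have roughly the shape below, sketched here as a Python dict rather than captured from the running demo:

lifetime_value_field = {
    'name': 'lifetime_value',
    'type': {
        'type': 'bytes',
        'logicalType': 'decimal',
        'precision': 12,  # from the "lifetime_value.precision" column property
        'scale': 4        # from the "lifetime_value.scale" column property
    }
}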
New file: BigDecimalUtils.java
@@ -0,0 +1,120 @@
package io.deephaven.engine.util;

import io.deephaven.chunk.ObjectChunk;
import io.deephaven.chunk.attributes.Values;
import io.deephaven.engine.rowset.RowSequence;
import io.deephaven.engine.rowset.TrackingRowSet;
import io.deephaven.engine.table.ChunkSource;
import io.deephaven.engine.table.ColumnSource;
import io.deephaven.engine.table.Table;

import java.math.BigDecimal;
import java.util.Properties;

public class BigDecimalUtils {
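    // Sentinel value returned by the property lookups below when a property is absent and
    // missing values are allowed.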
public static final int INVALID_PRECISION_OR_SCALE = -1;
public static class PrecisionAndScale {
public final int precision;
public final int scale;

public PrecisionAndScale(final int precision, final int scale) {
this.precision = precision;
this.scale = scale;
}
}

public static PrecisionAndScale computePrecisionAndScale(
final Table t, final String colName) {
final ColumnSource<BigDecimal> src = t.getColumnSource(colName, BigDecimal.class);
return computePrecisionAndScale(t.getRowSet(), src);
}

public static PrecisionAndScale computePrecisionAndScale(
final TrackingRowSet rowSet,
final ColumnSource<BigDecimal> source
) {
final int sz = 4096;
// we first compute max(precision - scale) and max(scale), which corresponds to
// max(digits left of the decimal point), max(digits right of the decimal point).
// Then we convert to (precision, scale) before returning.
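        // Worked example: for the values 123.45 (precision 5, scale 2) and 1.2345 (precision 5,
        // scale 4), maxPrecisionMinusScale = 3 and maxScale = 4, giving a combined result of
        // precision 7, scale 4, which is wide enough to represent both values exactly.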
int maxPrecisionMinusScale = 0;
int maxScale = 0;
        try (final ChunkSource.GetContext context = source.makeGetContext(sz);
                final RowSequence.Iterator it = rowSet.getRowSequenceIterator()) {
            // Iterate over the full row set in chunks of at most sz rows.
            while (it.hasMore()) {
                final RowSequence rowSeq = it.getNextRowSequenceWithLength(sz);
                final ObjectChunk<BigDecimal, ? extends Values> chunk =
                        source.getChunk(context, rowSeq).asObjectChunk();
                for (int i = 0; i < chunk.size(); ++i) {
                    final BigDecimal x = chunk.get(i);
                    if (x == null) {
                        // Null cells contribute nothing to precision or scale.
                        continue;
                    }
                    final int precision = x.precision();
                    final int scale = x.scale();
                    final int precisionMinusScale = precision - scale;
                    if (precisionMinusScale > maxPrecisionMinusScale) {
                        maxPrecisionMinusScale = precisionMinusScale;
                    }
                    if (scale > maxScale) {
                        maxScale = scale;
                    }
                }
            }
        }
return new PrecisionAndScale(maxPrecisionMinusScale + maxScale, maxScale);
}

public static class PrecisionAndScalePropertyNames {
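        // Property-name convention: a column named "lifetime_value" is configured through the
        // keys "lifetime_value.precision" and "lifetime_value.scale", as in the demo above.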
public final String columnName;
public final String precisionProperty;
public final String scaleProperty;

public PrecisionAndScalePropertyNames(final String columnName) {
this.columnName = columnName;
precisionProperty = columnName + ".precision";
scaleProperty = columnName + ".scale";
}
}

private static int getPrecisionAndScaleFromColumnProperties(
final String columnName,
final String property,
final Properties columnProperties,
final boolean allowNulls) {
if (columnProperties == null) {
return INVALID_PRECISION_OR_SCALE;
}
final String propertyValue = columnProperties.getProperty(property);
if (propertyValue == null) {
if (!allowNulls) {
throw new IllegalArgumentException(
"column name '" + columnName + "' has type " + BigDecimal.class.getSimpleName() + "" +
" but no property '" + property + "' defined.");
}
return INVALID_PRECISION_OR_SCALE;
}
final int parsedResult;
try {
parsedResult = Integer.parseInt(propertyValue);
        } catch (NumberFormatException e) {
            throw new IllegalArgumentException(
                    "Couldn't parse value '" + propertyValue + "' as int for property " + property, e);
}
if (parsedResult < 1) {
throw new IllegalArgumentException("Invalid value '" + parsedResult + "' for property " + property);
}
return parsedResult;
}

public static PrecisionAndScale getPrecisionAndScaleFromColumnProperties(
final PrecisionAndScalePropertyNames propertyNames,
final Properties columnProperties,
final boolean allowNulls
) {
final int precision = getPrecisionAndScaleFromColumnProperties(
propertyNames.columnName,
propertyNames.precisionProperty,
columnProperties,
allowNulls);
final int scale = getPrecisionAndScaleFromColumnProperties(
propertyNames.columnName,
propertyNames.scaleProperty,
columnProperties,
allowNulls);
return new PrecisionAndScale(precision, scale);
}
}