feat: Add static getter for TableDefinition in KafkaTools #5956

Merged · 2 commits · Aug 21, 2024

Changes from 1 commit
102 changes: 74 additions & 28 deletions extensions/kafka/src/main/java/io/deephaven/kafka/KafkaTools.java
@@ -1218,33 +1218,30 @@ public Function<TopicPartition, KafkaRecordConsumer> visit(@NotNull final PerPar
}

/**
* Consume from Kafka to {@link StreamConsumer stream consumers} supplied by {@code streamConsumerRegistrar}.
*
* @param kafkaProperties Properties to configure this table and also to be passed to create the KafkaConsumer
* @param topic Kafka topic name
* @param partitionFilter A predicate returning true for the partitions to consume. The convenience constant
* {@code ALL_PARTITIONS} is defined to facilitate requesting all partitions.
* @param partitionToInitialOffset A function specifying the desired initial offset for each partition consumed
* @param keySpec Conversion specification for Kafka record keys
* @param valueSpec Conversion specification for Kafka record values
* @param streamConsumerRegistrarProvider A provider for a function to
* {@link StreamPublisher#register(StreamConsumer) register} {@link StreamConsumer} instances. The registered
* stream consumers must accept {@link ChunkType chunk types} that correspond to
* {@link StreamChunkUtils#chunkTypeForColumnIndex(TableDefinition, int)} for the supplied
* {@link TableDefinition}. See {@link StreamConsumerRegistrarProvider#single(SingleConsumerRegistrar)
* single} and {@link StreamConsumerRegistrarProvider#perPartition(PerPartitionConsumerRegistrar)
* per-partition}.
* @param consumerLoopCallback callback to inject logic into the ingester's consumer loop
* Basic holder structure used to pass multiple objects back to a calling method.
*/
public static void consume(
private static class ConsumeStruct {
final TableDefinition tableDefinition;
final KafkaStreamPublisher.Parameters publisherParameters;
final Deserializer<?> keyDeser;
final Deserializer<?> valueDeser;

private ConsumeStruct(
@NotNull final TableDefinition tableDefinition,
@NotNull final KafkaStreamPublisher.Parameters publisherParameters,
@NotNull final Deserializer<?> keyDeser,
@NotNull final Deserializer<?> valueDeser) {
this.tableDefinition = tableDefinition;
this.publisherParameters = publisherParameters;
this.keyDeser = keyDeser;
this.valueDeser = valueDeser;
}
}

private static ConsumeStruct getConsumeStruct(
@NotNull final Properties kafkaProperties,
@NotNull final String topic,
@NotNull final IntPredicate partitionFilter,
@NotNull final InitialOffsetLookup partitionToInitialOffset,
@NotNull final Consume.KeyOrValueSpec keySpec,
@NotNull final Consume.KeyOrValueSpec valueSpec,
@NotNull final StreamConsumerRegistrarProvider streamConsumerRegistrarProvider,
@Nullable final ConsumerLoopCallback consumerLoopCallback) {
@NotNull final Consume.KeyOrValueSpec valueSpec) {
if (Consume.isIgnore(keySpec) && Consume.isIgnore(valueSpec)) {
throw new IllegalArgumentException(
"can't ignore both key and value: keySpec and valueSpec can't both be ignore specs");
@@ -1297,12 +1294,61 @@ public static void consume(
.setValueToChunkObjectMapper(valueIngestData.toObjectChunkMapper);
}

final KafkaStreamPublisher.Parameters publisherParameters = publisherParametersBuilder.build();
return new ConsumeStruct(tableDefinition, publisherParametersBuilder.build(), keyDeser, valueDeser);
}

/**
* Construct a {@link TableDefinition} based on the input Properties and {@link Consume.KeyOrValueSpec} parameters.
*
* @param kafkaProperties Properties to configure this table
* @param keySpec Conversion specification for Kafka record keys
* @param valueSpec Conversion specification for Kafka record values
* @return A TableDefinition derived from the input Properties and KeyOrValueSpec instances
*/
@SuppressWarnings("unused")
public static TableDefinition getTableDefinition(
Review comment (Member):
We should explicitly call out the potential statefulness of KeyOrValueSpec. If the caller is going to rely on the correctness of this method, they must call consume with the exact same instances of keySpec / valueSpec.

I.e., the following is incorrect:

TableDefinition tableDefinition = getTableDefinition(properties, null, avroSchema("MyValueSchema"));
Table table = consume(properties, ..., avroSchema("MyValueSchema"), ...)

It must be:

KeyOrValueSpec myValueSpec = avroSchema("MyValueSchema");
TableDefinition tableDefinition = getTableDefinition(properties, null, myValueSpec);
Table table = consume(properties, ..., myValueSpec, ...)

devinrsmith (Member) commented on Aug 20, 2024:
Or rather, we should add documentation about the "guarantees" between getTableDefinition and consume: under what conditions is the table definition from getTableDefinition guaranteed to be the table definition of consume? (In my mind, it should be guaranteed when you use the exact same keySpec / valueSpec, due to their statefulness.)

@NotNull final Properties kafkaProperties,
@NotNull final Consume.KeyOrValueSpec keySpec,
@NotNull final Consume.KeyOrValueSpec valueSpec) {
return getConsumeStruct(kafkaProperties, keySpec, valueSpec).tableDefinition;
}

/**
* Consume from Kafka to {@link StreamConsumer stream consumers} supplied by {@code streamConsumerRegistrar}.
*
* @param kafkaProperties Properties to configure this table and also to be passed to create the KafkaConsumer
* @param topic Kafka topic name
* @param partitionFilter A predicate returning true for the partitions to consume. The convenience constant
* {@code ALL_PARTITIONS} is defined to facilitate requesting all partitions.
* @param partitionToInitialOffset A function specifying the desired initial offset for each partition consumed
* @param keySpec Conversion specification for Kafka record keys
* @param valueSpec Conversion specification for Kafka record values
* @param streamConsumerRegistrarProvider A provider for a function to
* {@link StreamPublisher#register(StreamConsumer) register} {@link StreamConsumer} instances. The registered
* stream consumers must accept {@link ChunkType chunk types} that correspond to
* {@link StreamChunkUtils#chunkTypeForColumnIndex(TableDefinition, int)} for the supplied
* {@link TableDefinition}. See {@link StreamConsumerRegistrarProvider#single(SingleConsumerRegistrar)
* single} and {@link StreamConsumerRegistrarProvider#perPartition(PerPartitionConsumerRegistrar)
* per-partition}.
* @param consumerLoopCallback callback to inject logic into the ingester's consumer loop
*/
public static void consume(
@NotNull final Properties kafkaProperties,
@NotNull final String topic,
@NotNull final IntPredicate partitionFilter,
@NotNull final InitialOffsetLookup partitionToInitialOffset,
@NotNull final Consume.KeyOrValueSpec keySpec,
@NotNull final Consume.KeyOrValueSpec valueSpec,
@NotNull final StreamConsumerRegistrarProvider streamConsumerRegistrarProvider,
@Nullable final ConsumerLoopCallback consumerLoopCallback) {
final ConsumeStruct consumeStruct = getConsumeStruct(kafkaProperties, keySpec, valueSpec);

final MutableObject<KafkaIngester> kafkaIngesterHolder = new MutableObject<>();

final Function<TopicPartition, KafkaRecordConsumer> kafkaRecordConsumerFactory =
streamConsumerRegistrarProvider.walk(
new KafkaRecordConsumerFactoryCreator(publisherParameters, kafkaIngesterHolder::getValue));
new KafkaRecordConsumerFactoryCreator(consumeStruct.publisherParameters,
kafkaIngesterHolder::getValue));

final KafkaIngester ingester = new KafkaIngester(
log,
@@ -1311,8 +1357,8 @@ public static void consume(
partitionFilter,
kafkaRecordConsumerFactory,
partitionToInitialOffset,
keyDeser,
valueDeser,
consumeStruct.keyDeser,
consumeStruct.valueDeser,
consumerLoopCallback);
kafkaIngesterHolder.setValue(ingester);
ingester.start();
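For context, a minimal usage sketch of the new getter, following the review guidance above: build the key/value specs once and reuse the exact same instances for getTableDefinition and the later consume call. The broker address, column name, and the Consume.ignoreSpec() / Consume.simpleSpec(...) factory methods used here are illustrative assumptions and are not part of this diff.

import io.deephaven.engine.table.TableDefinition;
import io.deephaven.kafka.KafkaTools;
import io.deephaven.kafka.KafkaTools.Consume;

import java.util.Properties;

public class TableDefinitionPreview {
    public static void main(String[] args) {
        final Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // assumed broker address

        // Build the specs once and reuse them; per the review discussion, the
        // definition is only guaranteed to match a subsequent consume() call when
        // the exact same keySpec/valueSpec instances are passed to both methods.
        final Consume.KeyOrValueSpec keySpec = Consume.ignoreSpec();
        final Consume.KeyOrValueSpec valueSpec = Consume.simpleSpec("Value", String.class);

        // New static getter added by this PR.
        final TableDefinition definition = KafkaTools.getTableDefinition(props, keySpec, valueSpec);
        System.out.println(definition);

        // Later, pass the SAME keySpec and valueSpec instances to KafkaTools.consume(...).
    }
}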