Change Kafka Lookup Extractor to not register consumer group #12842

Merged
54 changes: 41 additions & 13 deletions docs/development/extensions-core/kafka-extraction-namespace.md
@@ -32,28 +32,56 @@ If you need updates to populate as promptly as possible, it is possible to plug
{
"type":"kafka",
"kafkaTopic":"testTopic",
"kafkaProperties":{"zookeeper.connect":"somehost:2181/kafka"}
"kafkaProperties":{
"bootstrap.servers":"kafka.service:9092"
}
}
```

|Parameter|Description|Required|Default|
|---------|-----------|--------|-------|
|`kafkaTopic`|The Kafka topic to read the data from|Yes||
|`kafkaProperties`|Kafka consumer properties. At least "zookeeper.connect" must be specified. Only the zookeeper connector is supported|Yes||
|`connectTimeout`|How long to wait for an initial connection|No|`0` (do not wait)|
|`isOneToOne`|The map is a one-to-one (see [Lookup DimensionSpecs](../../querying/dimensionspecs.md))|No|`false`|
| Parameter | Description | Required | Default |
|-------------------|-----------------------------------------------------------------------------------------|----------|-------------------|
| `kafkaTopic` | The Kafka topic to read the data from | Yes ||
| `kafkaProperties` | Kafka consumer properties (`bootstrap.servers` must be specified) | Yes ||
| `connectTimeout` | How long to wait for an initial connection | No | `0` (do not wait) |
| `isOneToOne` | The map is a one-to-one (see [Lookup DimensionSpecs](../../querying/dimensionspecs.md)) | No | `false` |

The extension `kafka-extraction-namespace` enables reading from a Kafka feed which has name/key pairs to allow renaming of dimension values. An example use case would be to rename an ID to a human readable format.
The extension `kafka-extraction-namespace` enables reading from an [Apache Kafka](https://kafka.apache.org/) topic which has name/key pairs to allow renaming of dimension values. An example use case would be to rename an ID to a human-readable format.

The consumer properties `group.id` and `auto.offset.reset` CANNOT be set in `kafkaProperties` as they are set by the extension as `UUID.randomUUID().toString()` and `smallest` respectively.
## How it Works

See [lookups](../../querying/lookups.md) for how to configure and use lookups.
The extractor works by consuming the configured Kafka topic from the beginning, and appending every record to an internal map. The key of the Kafka record is used as the key of the map, and the payload of the record is used as the value. At query time, a lookup can be used to transform the key into the associated value. See [lookups](../../querying/lookups.md) for how to configure and use lookups in a query. Keys and values are both stored as strings by the lookup extractor.

The extractor remains subscribed to the topic, so new records are added to the lookup map as they appear. This allows for lookup values to be updated in near-realtime. If two records are added to the topic with the same key, the record with the larger offset will replace the previous record in the lookup map. A record with a `null` payload will be treated as a tombstone record, and the associated key will be removed from the lookup map.
**Contributor:** The PR that has the code to remove a payload with a null message is yet to be merged.

**Contributor (author):** Yes, this change will need to wait for the PR to be merged. I will resolve any merge conflicts when that happens.

The extractor treats the input topic much like a [KTable](https://kafka.apache.org/23/javadoc/org/apache/kafka/streams/kstream/KTable.html). As such, it is best to create your Kafka topic using a [log compaction](https://kafka.apache.org/documentation/#compaction) strategy, so that the most-recent version of a key is always preserved in Kafka. Without properly configuring retention and log compaction, older keys that are automatically removed from Kafka will not be available and will be lost when Druid services are restarted.
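
The consume-and-replay behavior described above can be summarized with a minimal, standalone sketch. This is an illustration only, not the extension's actual implementation; the broker address `kafka.service:9092` and the topic name `country_codes` are assumptions. The topic is read from the beginning, every record is upserted into a map, and a `null` payload removes its key.

```java
// Minimal sketch of the consume-and-replay pattern described above.
// NOT the extension's actual implementation; broker address and topic name are assumptions.
import java.time.Duration;
import java.util.Collections;
import java.util.Map;
import java.util.Properties;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class LookupMapSketch
{
  public static void main(String[] args)
  {
    final Properties props = new Properties();
    props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka.service:9092");  // assumed broker
    props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, UUID.randomUUID().toString());   // throwaway group id
    props.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");            // read from the beginning
    props.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");              // never commit offsets
    props.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    props.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

    final Map<String, String> lookupMap = new ConcurrentHashMap<>();

    try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
      consumer.subscribe(Collections.singletonList("country_codes"));                  // assumed topic
      while (!Thread.currentThread().isInterrupted()) {
        for (ConsumerRecord<String, String> record : consumer.poll(Duration.ofMillis(500))) {
          if (record.value() == null) {
            lookupMap.remove(record.key());              // tombstone: drop the mapping
          } else {
            lookupMap.put(record.key(), record.value()); // later offsets overwrite earlier ones
          }
        }
      }
    }
  }
}
```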

### Example

Suppose a `country_codes` topic is being consumed, and the following records are added to the topic in the order shown:

| Offset | Key | Payload |
|--------|-----|-------------|
| 1 | NZ | Nu Zeelund |
| 2 | AU | Australia |
| 3 | NZ | New Zealand |
| 4 | AU | `null` |
| 5 | NZ | Aotearoa |
| 6 | CZ | Czechia |

This input topic would be consumed from the beginning, and result in a lookup namespace containing the following mappings (notice that the entry for _Australia_ was added and then deleted):

| Key | Value |
|-----|-----------|
| NZ | Aotearoa |
| CZ | Czechia |

Now when a query uses this extraction namespace, the country codes can be mapped to the full country name at query time.
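
For illustration only (this snippet is not part of the change), replaying the six example records above with the replacement and tombstone rules produces the same result in plain Java:

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class ReplayExample
{
  public static void main(String[] args)
  {
    // The example records in offset order; a null payload is a tombstone.
    final String[][] records = {
        {"NZ", "Nu Zeelund"},
        {"AU", "Australia"},
        {"NZ", "New Zealand"},
        {"AU", null},
        {"NZ", "Aotearoa"},
        {"CZ", "Czechia"}
    };

    final Map<String, String> lookup = new LinkedHashMap<>();
    for (String[] record : records) {
      if (record[1] == null) {
        lookup.remove(record[0]);          // offset 4 deletes the AU mapping
      } else {
        lookup.put(record[0], record[1]);  // later offsets overwrite earlier values
      }
    }

    System.out.println(lookup);            // {NZ=Aotearoa, CZ=Czechia}
  }
}
```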

## Limitations

Currently the Kafka lookup extractor feeds the entire Kafka stream into a local cache. If you are using on-heap caching, this can easily clobber your java heap if the Kafka stream spews a lot of unique keys.
off-heap caching should alleviate these concerns, but there is still a limit to the quantity of data that can be stored.
There is currently no eviction policy.
The consumer properties `group.id`, `auto.offset.reset` and `enable.auto.commit` cannot be set in `kafkaProperties` as they are set by the extension as `UUID.randomUUID().toString()`, `earliest` and `false` respectively. This is because the entire topic must be consumed by the Druid service from the very beginning so that a complete map of lookup values can be built. Setting any of these consumer properties will cause the extractor to not start.

Currently, the Kafka lookup extractor feeds the entire Kafka topic into a local cache. If you are using on-heap caching, this can easily clobber your Java heap if the Kafka stream spews a lot of unique keys. Off-heap caching should alleviate these concerns, but there is still a limit to the quantity of data that can be stored. There is currently no eviction policy.

## Testing the Kafka rename functionality

@@ -350,6 +350,13 @@ ListenableFuture<?> getFuture()
return future;
}

/**
* Check that the user has not set forbidden Kafka consumer props
*
* Some consumer properties must be set in order to guarantee that
* the consumer will consume the entire topic from the beginning.
* Otherwise, lookup data may not be loaded completely.
*/
private void verifyKafkaProperties()
{
if (kafkaProperties.containsKey(ConsumerConfig.GROUP_ID_CONFIG)) {
@@ -360,10 +367,17 @@ private void verifyKafkaProperties()
}
if (kafkaProperties.containsKey(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG)) {
throw new IAE(
"Cannot set kafka property [auto.offset.reset]. Property will be forced to [smallest]. Found [%s]",
"Cannot set kafka property [auto.offset.reset]. Property will be forced to [earliest]. Found [%s]",
kafkaProperties.get(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG)
);
}
if (kafkaProperties.containsKey(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG) &&
!kafkaProperties.get(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG).equals("false")) {
throw new IAE(
"Cannot set kafka property [enable.auto.commit]. Property will be forced to [false]. Found [%s]",
kafkaProperties.get(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG)
);
}
Preconditions.checkNotNull(
kafkaProperties.get(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG),
"bootstrap.servers required property"
@@ -391,9 +405,10 @@ private Properties getConsumerProperties()
{
final Properties properties = new Properties();
properties.putAll(kafkaProperties);
// Enable publish-subscribe
// Set the consumer to consume everything and never commit offsets
properties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, factoryId);
properties.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
return properties;
}
}
@@ -398,6 +398,22 @@ public void testStartFailsOnAutoOffset()
Assert.assertTrue(factory.close());
}

@Test
public void testStartFailsOnAutoCommit()
{
final KafkaLookupExtractorFactory factory = new KafkaLookupExtractorFactory(
cacheManager,
TOPIC,
ImmutableMap.of("enable.auto.commit", "true")
);
Assert.assertThrows(
"Cannot set kafka property [enable.auto.commit]. Property will be forced to [false]. Found [true]",
IAE.class,
() -> factory.start()
);
Assert.assertTrue(factory.close());
}

@Test
public void testFailsGetNotStarted()
{
8 changes: 8 additions & 0 deletions website/.spelling
@@ -883,7 +883,11 @@ gcs-connector
hadoop2
hdfs
- ../docs/development/extensions-core/kafka-extraction-namespace.md
Aotearoa
Czechia
KTable
LookupExtractorFactory
Zeelund
zookeeper.connect
- ../docs/development/extensions-core/kafka-ingestion.md
0.11.x.
@@ -2107,3 +2111,7 @@ TIMESTAMPDIFF
TRUNC
VAR_POP
VAR_SAMP
KTable
Aotearoa
Czechia
Zeelund