
Add support to read from multiple kafka topics in same supervisor #14424

Merged: 14 commits merged into apache:master on Aug 14, 2023

Conversation

Contributor

@abhishekagarwal87 commented Jun 14, 2023

Description

This PR adds support for reading from multiple Kafka topics in the same supervisor. Multi-topic ingestion can be useful in scenarios where a cluster admin has no control over the input streams. Different teams in an org may create different input topics to which they write their data, yet the cluster admin wants all of this data to be queryable in one datasource.

Implementing this change required:

  • Refactoring the Kafka indexing extension to change the type of the partition id from Integer to KafkaTopicPartition. The KafkaTopicPartition class stores both topic and partition information, so partition 0 of topic tp1 can be distinguished from partition 0 of topic tp2.
  • Passing a list of topics instead of a single topic when Druid looks up the list of partitions. This was a one-line change.

The refactoring itself was straightforward; the main challenge was maintaining backward compatibility. The Integer partition id is part of the serialized metadata, so simply changing the partition id type would break existing supervisors. Admins should also be able to roll back to previous versions without running into any compatibility issues.

In order to achieve that, KafkaTopicPartition has an additional field called multiTopicPartition. This field is set to true if a particular supervisor is running in multi-topic mode. That mode is triggered when the user supplies a comma-separated list of topics such as tp1,tp2 instead of a single topic.

This field must be set explicitly anywhere in the code where a KafkaTopicPartition object is created, so the code is refactored accordingly to make it clear in those places whether the supervisor is multi-topic or single-topic. If this field is false, the partition id is serialized in the old format, and the topic field in this class is completely ignored for all purposes such as comparison, hashCode, etc. If this field is true, the partition id is serialized in a new format. Below are some examples:

  • KafkaTopicPartition (multiTopicPartition: false, topic: "tp", partition: 0) is serialized as "0"
  • KafkaTopicPartition (multiTopicPartition: false, topic: null, partition: 0) is serialized as "0"
  • KafkaTopicPartition (multiTopicPartition: true, topic: "tp", partition: 0) is serialized as "tp:0"
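
For illustration, here is a minimal Java sketch of a serialization scheme like the one described above. The class, field, and method names are illustrative assumptions, not the actual KafkaTopicPartition implementation from this PR.

```java
public class TopicPartitionIdSketch
{
  private final boolean multiTopicPartition;
  private final String topic;   // ignored (and may be null) when multiTopicPartition is false
  private final int partition;

  public TopicPartitionIdSketch(boolean multiTopicPartition, String topic, int partition)
  {
    this.multiTopicPartition = multiTopicPartition;
    this.topic = topic;
    this.partition = partition;
  }

  // Old format "0" in single-topic mode; new format "tp:0" in multi-topic mode.
  public String serialize()
  {
    return multiTopicPartition ? topic + ":" + partition : String.valueOf(partition);
  }

  // Accepts either format, so metadata written by older versions still parses.
  public static TopicPartitionIdSketch deserialize(String value)
  {
    int idx = value.indexOf(':');
    if (idx < 0) {
      return new TopicPartitionIdSketch(false, null, Integer.parseInt(value));
    }
    return new TopicPartitionIdSketch(true, value.substring(0, idx), Integer.parseInt(value.substring(idx + 1)));
  }
}
```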

It's also possible that someone changes the stream in a running supervisor and tries to switch between multi-topic and single-topic modes. I will probably add some code to disallow that kind of update.

We could also enhance the serialization of KafkaTopicPartition so that it writes a format version too, though I don't think we will really be changing the format anytime soon.

With these changes, there should be no impact on currently running supervisors. If an admin tries to roll back a supervisor running in multi-topic mode, that supervisor will start failing (I am yet to explicitly test this scenario).

There is no limit on the number of topics that a user can set, and there is really no overhead that comes from multi-topic support. We didn't have any limit on the number of partitions that a user can ingest data from, and we don't have such a limit now. It's just that we store extra bits for a partition id.

There is also no change required in the core task execution engine. A task is assigned partitions the same way it was assigned partitions before; only the partition id type has changed.

Other todos

  • Add an IT for multi-topic Kafka ingestion
  • Docs changes
  • Possibly add a feature flag to turn on multi-topic ingestion

Looking for any design feedback before I finish these todos.

Release note

Now you can ingest data from multiple Kafka topics into one datasource using a single supervisor. Before this release, you needed to union multiple streams into one stream before ingesting it into Druid.


Key changed/added classes in this PR
  • KafkaDatasourceMetadata
  • KafkaTopicPartition

This PR has:

  • been self-reviewed.
  • added documentation for new or modified features or behaviors.
  • a release note entry in the PR description.
  • added Javadocs for most classes and all non-trivial methods. Linked related entities via Javadoc links.
  • added or updated version, license, or notice information in licenses.yaml
  • added comments explaining the "why" and the intent of the code wherever it would not be obvious for an unfamiliar reader.
  • added unit tests or modified existing tests to cover new code paths, ensuring the threshold for code coverage is met.
  • added integration tests.
  • been tested in a test Druid cluster.

@abhishekagarwal87 changed the title [Test PR] Add support to read from multiple kafka topics in same supervisor Jun 17, 2023
@kfaraz
Contributor

kfaraz commented Jun 19, 2023

@abhishekagarwal87 , these are very interesting changes!

Since the PR is already open for design feedback, could you please add some more details in the description:

  • A line or two describing a typical situation where multi-topic ingestion would be useful.
  • Based on the current description, it seems that this allows ingestion from any number of topics and not just 2. Would we want to put a limit on this number?
  • How can we declare separate column mappings for reading from each of the topics?
  • How does the number of tasks interplay with topics that have different numbers of partitions? Would there be a restriction on the number of partitions in the topics?

@abhishekagarwal87
Contributor Author

I added some details. There is no need to declare a separate column mapping for each topic, just like there is no need to declare a separate column mapping for each partition in a topic. Topics can have any number of partitions; the ingestion system doesn't care. The ingestion system tells Kafka

  • give me partitions for (tp1, tp2, tp3)

Kafka gives

  • tp1:0, tp2:0, tp2:1, tp3:0, tp3:1, tp3:2

From ingestion perspective, tp1:0 is just another partition like tp3:2.
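
As a rough illustration of the exchange above, here is a sketch using the standard Kafka consumer API. It is a simplified assumption of the flow, not the PR's actual KafkaRecordSupplier code.

```java
import java.util.ArrayList;
import java.util.List;

import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;

public class MultiTopicPartitionDiscovery
{
  // "give me partitions for (tp1, tp2, tp3)" -> a flat list such as tp1:0, tp2:0, tp2:1, tp3:0, ...
  public static List<TopicPartition> discover(KafkaConsumer<byte[], byte[]> consumer, String commaSeparatedTopics)
  {
    List<TopicPartition> partitions = new ArrayList<>();
    for (String topic : commaSeparatedTopics.split(",")) {
      List<PartitionInfo> infos = consumer.partitionsFor(topic.trim());
      if (infos == null) {
        continue; // topic not found; the real code raises an error instead
      }
      for (PartitionInfo info : infos) {
        partitions.add(new TopicPartition(info.topic(), info.partition()));
      }
    }
    return partitions;
  }
}
```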

@kfaraz
Contributor

kfaraz commented Jun 19, 2023

That's pretty neat, @abhishekagarwal87 , thanks for the clarification!

@pjain1
Member

pjain1 commented Jun 19, 2023

@abhishekagarwal87 In what order are records from different topics pulled? Is it round-robin? What happens if one of the topics does not have any records but the other does? Are we handling this case?

@abhishekagarwal87
Contributor Author

@pjain1 - The answer to any question of the form "How is data consumed across multiple topics if XYZ is happening with topic A and topic B?" is the same as the answer to "How is data consumed across multiple partitions if XYZ is happening with partition A and partition B?"
Q - In what order are records from different topics pulled?
A - In whatever order records from different partitions are pulled.

Q - What happens if one of the topics does not have any records?
A - The same thing that happens if one of the partitions does not have any records.

The topic really is an irrelevant entity. For Kafka servers, consuming data from tp0:0 and tp1:0 is the same as consuming from tp0:0 and tp0:1. We ask for the partitions of a list of topics, and from there on, the unit of work for the code is the partition.

@AmatyaAvadhanula
Contributor

SeekableStreamSupervisor#emitLag tries to emit lag with the stream as one of its dimensions. It may be helpful, for understanding topic-wise patterns, to emit lag metrics with the separate streams in the dimensions instead of the complete list.

@abhishekagarwal87
Contributor Author

@AmatyaAvadhanula - Good point. In that case, we will be emitting "0:tp" as the partition dimension instead of "0". The stream dimension will still be set as the comma-separated list of topics.

Contributor

@abhishekrb19 left a comment

This is neat and would help with operational simplicity. A few thoughts and questions:

  1. Will multi-stream processing work if the ioConfig is different for the topics? For example, the input topics span distinct brokers because different teams own them, the shape of data on the streams is different, or they have security settings configured differently.
    To that effect, have we considered an array of ioConfig instead of a comma-separated list of topics that share a single ioConfig? I don't know how backwards compatibility would work in this case, though.

    EDIT: It seems like @kfaraz asked something similar. Is it reasonable to say that the scope of this change is to support multi-stream processing only if the topics share the same properties -- i.e., ioConfig, dataSchema, etc. are the same? For all other use cases, I wonder what effort it would take and whether the design is extensible in the future.

  2. What are the implications for Kafka ingestion metrics and supervisor operations like reset offsets in the multi-stream supervisor mode?

  3. Fwiw, Kinesis also supports multi-stream processing by a single consumer starting with 2.3.1. Once we bump up the current Kinesis client SDK version from 1.14.4 to the latest stable, we can add similar support for Kinesis indexing as well :)

@Nullable
private final String topic;

// This flag is used to maintain backward incompatibilty with older versions of kafka indexing. If this flag
Contributor

typo: incompatibilty -> incompatibility

Also, I think this could straight-up be a javadoc for the property instead of a multi-line comment.

Contributor

+1 for making it a javadoc. Also the content needs updating given the current code.

Contributor Author

Sure. Done.

@2bethere
Contributor

A few questions on this:

  1. How will different topics with different schemas work? (Related to question 3 below)
  2. How will schema auto-discovery work?
  3. What happens if different topics have incompatible schemas? (I assume each one ends up in its own segment file with its own schema)
  4. How will auto-compaction behave for time chunks with mixed schemas?

@abhishekagarwal87
Contributor Author

abhishekagarwal87 commented Jun 27, 2023

This is neat and would help with operational simplicity. A few thoughts and questions:

  1. Will multi-stream processing work if the ioConfig is different for the topics? For example, the input topics span distinct brokers because different teams own them, the shape of data on the streams is different, or they have security settings configured differently.

The topics have to belong to the same cluster. The use cases that I know of so far do not require reading from topics in different clusters. The shape of the data can be different, just like how it can be different for multiple partitions within a topic. This patch doesn't support a topic-specific tuning config. We can't have that anyway. A task doesn't really care what topic it is reading from. It just reads from a set of partitions. Those partitions can belong to the same topic or different topics.

To that effect, have we considered an array of ioConfig instead of a comma-separated list of topics that share a single ioConfig? I don't know how backwards compatibility would work in this case, though.
EDIT: It seems like @kfaraz asked something similar. Is it reasonable to say that the scope of this change is to support multi-stream processing only if the topics share the same properties -- i.e., ioConfig, dataSchema, etc. are the same? For all other use cases, I wonder what effort it would take and whether the design is extensible in the future.

There is no reason for dataSchema to be uniform. Imagine that the ingestion system is emitting some metrics to one topic and the query system is emitting some metrics to another topic. These topics will likely have different columns. If you have decided to put this data in the same datasource, you have already assumed that you are OK with the merged schema. It's just like how you would ingest data from multiple files. A topic is the equivalent of a file.

  2. What are the implications for Kafka ingestion metrics and supervisor operations like reset offsets in the multi-stream supervisor mode?

The partition-id dimension value will change, but otherwise they are expected to work as is.

  3. Fwiw, Kinesis also supports multi-stream processing by a single consumer starting with 2.3.1. Once we bump up the current Kinesis client SDK version from 1.14.4 to the latest stable, we can add similar support for Kinesis indexing as well :)

Nice. I will look at that.

{
return new KafkaRecordSupplier(
spec.getIoConfig().getConsumerProperties(),
sortingMapper,
spec.getIoConfig().getConfigOverrides()
spec.getIoConfig().getConfigOverrides(),

Check notice (Code scanning / CodeQL): Deprecated method or constructor invocation. Invoking KafkaSupervisorSpec.getIoConfig should be avoided because it has been deprecated.
{
return new KafkaRecordSupplier(
spec.getIoConfig().getConsumerProperties(),
sortingMapper,
spec.getIoConfig().getConfigOverrides()
spec.getIoConfig().getConfigOverrides(),
spec.getIoConfig().isMultiTopic()

Check notice (Code scanning / CodeQL): Deprecated method or constructor invocation. Invoking KafkaSupervisorSpec.getIoConfig should be avoided because it has been deprecated.
@abhishekagarwal87 marked this pull request as ready for review July 14, 2023 12:32
@vogievetsky added the Needs web console change label (Backend API changes that would benefit from frontend support in the web console) Jul 21, 2023
@vogievetsky
Contributor

I added a Needs console support label because at minimum the console needs to be able to set the multiTopic flag. If the user ingests from two topics a,b, is it possible to see in the data what topic the event came from? It would be cool if it was a column in the data.

@vogievetsky
Contributor

Would it make sense to make the API such that topic in the spec could be either a string or an array of strings, like

"topic": "single_topic_ingestion"

and

"topic": ["ingest", "from", "multiple", "topics"]

That would make the multiTopic flag unnecessary.
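
For reference, here is a small sketch of how the string-or-array idea could be handled with Jackson. This is purely illustrative; it is not code from this PR, and the flag-free approach was not what was merged.

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

import com.fasterxml.jackson.core.JsonParser;
import com.fasterxml.jackson.databind.DeserializationContext;
import com.fasterxml.jackson.databind.JsonDeserializer;
import com.fasterxml.jackson.databind.JsonNode;

public class TopicOrTopicListDeserializer extends JsonDeserializer<List<String>>
{
  @Override
  public List<String> deserialize(JsonParser parser, DeserializationContext context) throws IOException
  {
    JsonNode node = parser.getCodec().readTree(parser);
    if (node.isArray()) {
      // "topic": ["ingest", "from", "multiple", "topics"]
      List<String> topics = new ArrayList<>();
      node.forEach(element -> topics.add(element.asText()));
      return topics;
    }
    // "topic": "single_topic_ingestion"
    return Collections.singletonList(node.asText());
  }
}
```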

Contributor

@kfaraz left a comment

Sorry for the delay on this, @abhishekagarwal87 .

Overall, the changes look good. I have left some comments. The code formatting seems off in a couple of places, would be nice to fix those up.

I still need to go through some classes and try out the changes in a cluster.


SeekableStreamStartSequenceNumbers<Integer, Long> startSequenceNumbers,
SeekableStreamEndSequenceNumbers<Integer, Long> endSequenceNumbers,
SeekableStreamStartSequenceNumbers<KafkaTopicPartition, Long> startSequenceNumbers,
SeekableStreamEndSequenceNumbers<KafkaTopicPartition, Long> endSequenceNumbers,
Contributor

nit: extra space

@Override
public int hashCode()
{
if (multiTopicPartition) {
Contributor

I am a little confused about excluding the topic from equals and hashCode when multiTopic is false.

In the impl here, new KafkaTopicPartition(false, "topicA", 1) would be considered equal to new KafkaTopicPartition(false, "topicB", 1).

The argument against this possibility could be that we would never have a scenario where we are required to compare two such partitions. In single-topic mode, the supervisor (or a task) would be dealing with either topicA or topicB, never both.

But I guess a similar argument would apply when comparing new KafkaTopicPartition(false, "topicA", 1) and new KafkaTopicPartition(false, null, 1). In other words, the supervisor would never have to compare two such objects (because, from the supervisor's point of view, either all tasks would be running the new Druid version or all tasks would be running the old Druid version, due to the order of upgrades), and a task would never have to compare two such objects either, because in its lifecycle a task would only ever see one format of partitions.

In short, the hashCode and equals methods can be simplified to just check all three fields.
But maybe I am missing some cases.

Contributor Author

Hmm. Good point. It's just a defensive check so that if someone passes a topic in single-topic mode, we don't compare it. I was seeing some test failures because of that. But I realize now that it's entirely in my control how those objects get created. I will simplify these equals/hashCode methods. But in the constructor, I will set the topic to null if multi-topic mode is not enabled. What do you think?
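
A sketch of that simplification (illustrative class and field names, not necessarily the final code): normalize the topic to null in the constructor when multi-topic mode is off, then compare all three fields directly.

```java
import java.util.Objects;

public class KafkaTopicPartitionSketch
{
  private final boolean multiTopicPartition;
  private final String topic;
  private final int partition;

  public KafkaTopicPartitionSketch(boolean multiTopicPartition, String topic, int partition)
  {
    this.multiTopicPartition = multiTopicPartition;
    // In single-topic mode the topic is irrelevant, so drop it here rather than
    // special-casing it in equals/hashCode.
    this.topic = multiTopicPartition ? topic : null;
    this.partition = partition;
  }

  @Override
  public boolean equals(Object o)
  {
    if (this == o) {
      return true;
    }
    if (o == null || getClass() != o.getClass()) {
      return false;
    }
    KafkaTopicPartitionSketch that = (KafkaTopicPartitionSketch) o;
    return multiTopicPartition == that.multiTopicPartition
           && partition == that.partition
           && Objects.equals(topic, that.topic);
  }

  @Override
  public int hashCode()
  {
    return Objects.hash(multiTopicPartition, topic, partition);
  }
}
```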

}
List<PartitionInfo> partitions = consumer.partitionsFor(topic.trim());
if (partitions == null) {
throw new ISE("Topic [%s] is not found in KafkaConsumer's list of topics", topic.trim());
Contributor

Throw some kind of DruidException instead?

}
List<PartitionInfo> partitions = consumer.partitionsFor(topic.trim());
if (partitions == null) {
throw new ISE("Topic [%s] is not found in KafkaConsumer's list of topics", topic.trim());
Contributor

Nit: rephrase

Suggested change
throw new ISE("Topic [%s] is not found in KafkaConsumer's list of topics", topic.trim());
throw new ISE("Could not find topic[%s] using Kafka consumer", topic.trim());

List<PartitionInfo> allPartitions = new ArrayList<>();
for (String topic : stream.split(",")) {
if (!multiTopic && !allPartitions.isEmpty()) {
throw InvalidInput.exception("Comma separated list of topics [%s] is not supported unless you enabled "
Contributor

@kfaraz Aug 2, 2023

Nit: this validation should be done before the loop using !isMultiTopic and stream.contains(",").

{
return partitionId % spec.getIoConfig().getTaskCount();
return partitionId.partition() % spec.getIoConfig().getTaskCount();
Contributor

Hmm, this seems a little tricky.

This logic would always lead to a skew even with single-topic ingestion, but the difference between the most-assigned taskGroupId and the least-assigned taskGroupId would have been only 1. With multiple topics and no limit on the number of topics, this difference can be anything.

Say there are 5 topics A, B, C, D, E with 6 partitions each and task count is 4.
Then taskGroupId 0 and 1 would get 10 partitions each
whereas taskGroupId 2 and 3 would get 5 partitions each.

The skew can be reduced in two ways:

  • Get the total partition count of each topic and then assign them new integer ids, e.g. A:1 -> 1, A:2 -> 2 ... A:5 -> 5, then B:1 -> 6, B:2 -> 7, and so on. Then do a modulo of these new overall ids instead of the partitionId.partition(). This is very cumbersome though.
  • A simpler approach could be to offset the assignment for every topic. So A starts assignment at taskGroupId 0, but B starts assignment at taskGroupId 1 and so on.
    • While constructing this class, we just need to get the list of topics from the stream.
    • Then the code here could become
topics = List of topics
return (partitionId.partition() + topics.getIndexOf(partitionId.topic())) % taskCount;

Contributor Author

Wow. Nice catch. I am wary of that logic as I don't want the assignment to change if someone adds one more topic to the stream. I am thinking of something like
(31 * topic.hashCode() + partition) % taskCount, but that may not lead to an optimal assignment.
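
To make the trade-off concrete, here is a self-contained sketch (names are illustrative, not Druid's actual code) that computes partition counts per task group for both ideas, using the 5-topics-by-6-partitions, 4-task example above:

```java
import java.util.Arrays;
import java.util.List;

public class TaskGroupAssignmentSketch
{
  // The index-offset suggestion: shift each topic's partitions by the topic's index in the stream.
  static int offsetBased(List<String> topics, String topic, int partition, int taskCount)
  {
    return (partition + topics.indexOf(topic)) % taskCount;
  }

  // Hash-based alternative: stable when a topic is added later, but balance is not guaranteed.
  static int hashBased(String topic, int partition, int taskCount)
  {
    // floorMod avoids a negative group id when the hash is negative
    return Math.floorMod(31 * topic.hashCode() + partition, taskCount);
  }

  public static void main(String[] args)
  {
    List<String> topics = Arrays.asList("A", "B", "C", "D", "E");
    int taskCount = 4;
    int[] offsetCounts = new int[taskCount];
    int[] hashCounts = new int[taskCount];
    for (String topic : topics) {
      for (int partition = 0; partition < 6; partition++) {
        offsetCounts[offsetBased(topics, topic, partition, taskCount)]++;
        hashCounts[hashBased(topic, partition, taskCount)]++;
      }
    }
    // Plain partition % taskCount gives 10/10/5/5 here; the offset scheme gives 8/8/7/7.
    System.out.println("offset-based: " + Arrays.toString(offsetCounts));
    System.out.println("hash-based:   " + Arrays.toString(hashCounts));
  }
}
```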

@kfaraz
Contributor

kfaraz commented Aug 3, 2023

If the user ingests from two topics a,b, is it possible to see in the data what topic the event came from? It would be cool if it was a column in the data.

@vogievetsky , yeah, it might be nice to have but I don't think it would be very straightforward. For now, we should ensure that the metrics such as ingest/events/* report the stream dimension as well.

cc: @abhishekagarwal87

@abhishekagarwal87
Contributor Author

abhishekagarwal87 commented Aug 4, 2023

@vogievetsky - Two reasons why I don't want that to be an array:

  • The stream string itself is used in various places outside the Kafka extension. If I make that an array, it will require a lot more refactoring.
  • I believe that in the future, people will want functionality where the stream can be a regex. It will be useful so that folks don't need to modify the ingestion spec when a new topic is added. Users will just say "ingest all topics matching the (metrics-*) pattern", so it could be metrics-aws-data-centre-1, metrics-gcp-datacentre-2, etc.

Edit - I will make that change now itself, so stream becomes a regex.
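
A rough sketch of what regex-based topic selection might look like with the plain Kafka consumer API (an assumption about the direction, not the code that was merged):

```java
import java.util.List;
import java.util.Map;
import java.util.regex.Pattern;
import java.util.stream.Collectors;

import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.PartitionInfo;

public class RegexTopicSelection
{
  // e.g. the regex "metrics-.*" matches metrics-aws-data-centre-1, metrics-gcp-datacentre-2, ...
  public static List<String> matchingTopics(KafkaConsumer<byte[], byte[]> consumer, String streamPattern)
  {
    Pattern pattern = Pattern.compile(streamPattern);
    // listTopics() returns topic name -> partition metadata for every topic visible to the consumer
    Map<String, List<PartitionInfo>> allTopics = consumer.listTopics();
    return allTopics.keySet()
                    .stream()
                    .filter(topic -> pattern.matcher(topic).matches())
                    .collect(Collectors.toList());
  }
}
```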

Contributor

@kfaraz left a comment

LGTM 🚀 , the remaining items and improvements can be addressed in follow-up PRs.

Comment on lines +233 to +234
.build("Topic [%s] is not found."
+ "Check that the topic exists in Kafka cluster", stream);

Check notice (Code scanning / CodeQL): Missing space in string literal. This string appears to be missing a space after 'found.'.
{
return partitionId % spec.getIoConfig().getTaskCount();
Integer taskCount = spec.getIoConfig().getTaskCount();

Check notice (Code scanning / CodeQL): Deprecated method or constructor invocation. Invoking KafkaSupervisorSpec.getIoConfig should be avoided because it has been deprecated.
@abhishekagarwal87
Contributor Author

Going to merge this since the failure was unrelated.

@abhishekagarwal87 merged commit 30b5dd4 into apache:master Aug 14, 2023
65 checks passed
@vogievetsky removed the Needs web console change label Aug 17, 2023
@LakshSingla added this to the 28.0 milestone Oct 12, 2023
@glasser
Contributor

glasser commented Nov 15, 2023

This is an exciting new feature!

Just to check — this feature does not allow you to ingest from multiple sets of brokers into a single data source, right? (For example, when migrating from one Kafka cluster to another, you can't set up a single data source that temporarily ingests from both clusters, right?)

@abhishekagarwal87
Contributor Author

That is correct. You cannot ingest from multiple Kafka clusters using this feature.

@maytasm
Contributor

maytasm commented Feb 7, 2024

@abhishekagarwal87
Regarding #14424 (comment) and #14424 (comment), do you see any difficulty in supporting ingestion from multiple sets of brokers? Another use case is when the streams span multiple regions (us-west, us-east, etc.). All the streams have the same schema, and we want to combine all the different regions into a single Druid datasource.

@abhishekagarwal87
Contributor Author

@maytasm - Supporting ingestion from different clusters will be a bit more involved. If you want to try it out, the best way to do it is by modifying the KafkaRecordSupplier class. As far as I can see, all the Kafka consumer operations are done through this class. So you could keep a map of topic to consumer in this class, assuming you have a topic-specific bootstrap address. Then for any call to a KafkaRecordSupplier method, rather than calling consumer#xyz you would first pull out the relevant consumer object and then call the method on that consumer.

There is the #poll call, and for that you would keep calling the poll method on each consumer. You will have to assume that the topic names are different across clusters. If they are the same, then you need to add a cluster id to the partition spec.
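
A very rough sketch of that idea (an assumption about how one might prototype it, not a supported Druid feature): keep one consumer per topic, keyed by topic name, and route each call to the consumer that owns the topic.

```java
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class MultiClusterRecordSupplierSketch
{
  // topic -> consumer connected to the cluster that hosts that topic
  private final Map<String, KafkaConsumer<byte[], byte[]>> consumersByTopic;

  public MultiClusterRecordSupplierSketch(Map<String, KafkaConsumer<byte[], byte[]>> consumersByTopic)
  {
    this.consumersByTopic = consumersByTopic;
  }

  // Route per-topic operations (e.g. partition discovery, seek) to the right consumer.
  public KafkaConsumer<byte[], byte[]> consumerFor(String topic)
  {
    return consumersByTopic.get(topic);
  }

  // Poll every cluster and merge the results, as described in the comment above.
  public List<ConsumerRecord<byte[], byte[]>> pollAll(Duration timeout)
  {
    List<ConsumerRecord<byte[], byte[]>> merged = new ArrayList<>();
    for (KafkaConsumer<byte[], byte[]> consumer : consumersByTopic.values()) {
      ConsumerRecords<byte[], byte[]> records = consumer.poll(timeout);
      records.forEach(merged::add);
    }
    return merged;
  }
}
```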
