Support parallel inserts #365

Open
mkherlakian opened this issue Apr 4, 2024 · 7 comments
Labels
enhancement New feature or request

Comments

mkherlakian commented Apr 4, 2024

Is your feature request related to a problem? Please describe.
We encountered an issue while sinking high-volume topics (a few thousand records per second) that at first seemed to indicate a bottleneck in ClickHouse, but upon digging it turned out to be related to this loop in the connector:

```java
// TODO - Multi process???
for (String topicAndPartition : dataRecords.keySet()) {
    // Running on etch topic & partition
    List<Record> rec = dataRecords.get(topicAndPartition);
    processing.doLogic(rec);
}
```
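For illustration, here is a minimal sketch of what parallelizing that loop could look like. This is my own sketch, not the connector's code: the method name, pool sizing and error handling are assumptions, and it presumes `Processing.doLogic()` is safe to call concurrently, which is exactly the open question. The natural unit of parallelism is the topic/partition batch, since Kafka's ordering guarantees are already per partition.

```java
// Hypothetical sketch only (not the connector's actual code): run each topic/partition
// batch on its own thread instead of one after another. Record and Processing are the
// connector types from the snippet above. Needs java.util.* and java.util.concurrent.* imports.
private void doLogicInParallel(Map<String, List<Record>> dataRecords, Processing processing)
        throws InterruptedException, ExecutionException {
    ExecutorService pool = Executors.newFixedThreadPool(
            Math.max(1, Math.min(dataRecords.size(), Runtime.getRuntime().availableProcessors())));
    try {
        List<Future<?>> futures = new ArrayList<>();
        for (List<Record> batch : dataRecords.values()) {
            // Ordering inside each topic/partition batch is preserved; only batches
            // for *different* partitions run concurrently.
            futures.add(pool.submit(() -> {
                processing.doLogic(batch);
                return null;
            }));
        }
        for (Future<?> f : futures) {
            f.get(); // surface the first failure so Connect can retry the whole put()
        }
    } finally {
        pool.shutdown();
    }
}
```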

Our setup has 3 high-volume topics. We initially created a single connector subscribing to all 3 topics, with topic2TableMap configured to sink each topic into the proper table.

We were running with max.poll.records at 4,000 and fetch.max.bytes high enough to produce about 3-4 batches, which seemed optimal for our configuration (confirmed via the inserts and the Kafka Connect logs).
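For readers trying to reproduce this, a hedged sketch of the kind of single-connector setup described above. Topic, table and size values are placeholders rather than values from this issue, and the `consumer.override.*` keys only take effect if the worker's `connector.client.config.override.policy` allows client overrides.

```properties
# Illustrative sketch only -- names and values are placeholders, not taken from this issue.
name=clickhouse-sink-all-topics
connector.class=com.clickhouse.kafka.connect.ClickHouseSinkConnector
tasks.max=3
topics=topic_a,topic_b,topic_c
topic2TableMap=topic_a=table_a,topic_b=table_b,topic_c=table_c
# ClickHouse connection settings (hostname, port, username, password, database) omitted.
# Consumer tuning as described above; requires connector.client.config.override.policy=All on the worker.
consumer.override.max.poll.records=4000
consumer.override.fetch.max.bytes=52428800
```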

Out of the blue, we started seeing severe consumer lag that we could not fix no matter what we tried, and while ClickHouse was busy, it wasn't CPU- or memory-bound. We eventually tracked the issue down to the fact that inserts are not parallelized when a connector receives data from multiple topics. What would happen is that we'd start falling behind on the topic (its partitions) that was producing more data than the other two, while inserts for the other two topics, even when they only contained hundreds of rows, were still taking a long time (since ClickHouse is optimized for large batches), so we would never catch up.

We ended up creating 3 connectors, one per table, and things were dramatically better - on the order of 12-15x. We caught up on 5 hours of backlog in about 10 minutes.

So this request is to put parallel inserts out there as an enhancement. However, even with that, I'm not sure it completely solves the problem: since you'd presumably still wait for all inserts in a batch to finish, you're basically only as fast as your slowest insert, so you might still end up waiting on one topic's insert when you could have been processing other data.

Describe the solution you'd like
Parallelize data inserts

Describe alternatives you've considered
Single connector per topic

Additional context
Regardless, I think it would be worth stating on the ClickHouse docs page that one connector per topic is preferred when high throughput is required. I'll open a PR and see if it gets merged.

mkherlakian added the enhancement label on Apr 4, 2024
ne1r0n (Contributor) commented Apr 4, 2024

@mkherlakian, just curious, have you tried increasing the tasks.max Connect property up to the number of partitions in the topic?

mkherlakian (Author) commented Apr 4, 2024

@ne1r0n Actually that's another interesting point - thanks for bringing it up!

I did, yes, we set tasks.max to 3 - which is the number of partitions in each of the 3 topics we're interested in.
We knew Connect wasn't resource bound though as we were keeping pretty close tabs on resources, and we run it on k8s, so we have a lot of flexibility when it comes to resource allocation.

That said, and maybe more surprisingly, we also tried bumping the number of tasks up to 9, expecting that each task would get assigned one partition, but what happened instead is that it was still using 3 tasks total, with each task consuming from 3 partitions, one from each topic.

I believe, however, that this has more to do with Kafka's consumer model and the consumer API than with Kafka Connect - that's the behaviour when you subscribe to multiple topics...
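For what it's worth, this matches how the default RangeAssignor behaves: it distributes each topic's partitions across the group's members independently, so with 3 partitions per topic the same 3 tasks receive one partition of every topic and the remaining 6 tasks sit idle. A hedged sketch of the kind of override that changes this (not something tried in this thread):

```properties
# Assumption: requires connector.client.config.override.policy=All on the worker,
# or set it worker-wide as consumer.partition.assignment.strategy instead.
# RoundRobinAssignor spreads all 9 partitions across up to 9 tasks.
consumer.override.partition.assignment.strategy=org.apache.kafka.clients.consumer.RoundRobinAssignor
```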

ne1r0n (Contributor) commented Apr 4, 2024

You can also try increasing the number of replicas of the Connect workers, but the optimal thing to do would be to run one connector for each topic with tasks.max = number of partitions.

mkherlakian (Author) commented:

Yes, did that too! Tried with 3, 6 and 9 worker instances, with a variety of resource allocations (1, 2, 4 CPUs). It didn't make a difference.
And it makes sense if you think about it: each batch insert would take 2-3 seconds to return... so Connect was never resource-constrained, it was just waiting...

Paultagoras (Contributor) commented:

This is an interesting problem - on the one hand, we do the inserts serially for data integrity reasons. On the other, maybe there are performance gains to be had from parallel processing - the question is how Kafka Connect would handle that, and whether the tradeoffs are worth it when other instances could be spun up instead.

Worth a discussion though, for sure - thanks!

mkherlakian (Author) commented:

@Paultagoras Thanks for looking at this!

> and are the tradeoffs worth it when other instances could be spun up.

My understanding from experimentation is that more instances won't help with that particular issue unless the topics you're pulling from have more partitions... Our topics initially had 50 partitions, but we reduced that number in order to find the right balance of batch size, data freshness and resources (more partitions = faster pulls because more consumers, but smaller batch sizes and therefore more insert queries), so our current setup has 3 partitions for our high-throughput topics.

Adding more than 3 instances (in a config where the connector subscribes to all topics) doesn't seem to change anything in that scenario, because what ends up happening is that you get exactly 3 tasks, each subscribing to one partition from each topic, while the remainder stay idle. I tried scaling the Kafka Connect cluster both with more nodes and with varying resource configurations per node - inevitably, we ended up with 3 active CPUs. Hence the conclusion that the bottleneck is I/O: waiting for an insert to complete before initiating the next.

> on the one hand, we do the inserts serially for data integrity reasons

An insert could fail in the middle of the operation whether there is one insert or multiple ones running in parallel, right? I'm just trying to understand how serializing the inserts helps with integrity...

antcalvente commented:

Hello,
Just wanted to add my 2 cents for people visiting this thread. Something I had missed in the documentation is this: https://clickhouse.com/docs/en/integrations/kafka/cloud/amazon-msk#performance-tuning, which might solve 80% of people's issues - it took me from an average of 520 messages/second to 7,200 messages/second ingestion.

Gist with configuration example: https://gist.github.com/antcalvente/1557761bb3677f7c8a812ee5af72b466
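For readers who can't open the links right away, this kind of tuning usually comes down to consumer-side fetch settings so that each poll (and therefore each ClickHouse insert) carries a larger batch. The snippet below is illustrative only; the property names are standard Kafka consumer configs, but the values are placeholders and are not copied from the linked docs page or gist.

```properties
# Illustrative only -- placeholder values, not taken from the linked gist.
# Requires connector.client.config.override.policy=All on the worker.
consumer.override.max.poll.records=5000
consumer.override.fetch.min.bytes=1048576
consumer.override.fetch.max.wait.ms=1000
consumer.override.max.partition.fetch.bytes=10485760
```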
