
Refactor and Improve streaming engines Kafka/RabbitMQ/NATS and data formats #42777

Merged
33 commits, Jan 2, 2023

Conversation

Avogar
Member

@Avogar Avogar commented Oct 28, 2022

Changelog category (leave one):

  • Improvement

Changelog entry (a user-readable short description of the changes that goes to CHANGELOG.md):

Refactor and Improve streaming engines Kafka/RabbitMQ/NATS and add support for all formats, also refactor formats a bit:

  • Fix producing messages in row-based formats with suffixes/prefixes. Now every message is formatted completely with all delimiters and can be parsed back using the input format.
  • Support block-based formats like Native, Parquet, ORC, etc. Every block is formatted as a separate message. The number of rows in one message depends on the block size, so you can control it via the setting max_block_size.
  • Add new engine settings kafka_max_rows_per_message/rabbitmq_max_rows_per_message/nats_max_rows_per_message. They control the number of rows formatted in one message in row-based formats. Default value: 1.
  • Fix high memory consumption in NATS table engine.
  • Support arbitrary binary data in the NATS producer (previously it worked only with strings containing \0 at the end)
  • Add missing Kafka/RabbitMQ/NATS engine settings in documentation.
  • Refactor producing and consuming in Kafka/RabbitMQ/NATS, separate it from WriteBuffers/ReadBuffers semantic.
  • Refactor output formats: remove the per-row callbacks used in Kafka/RabbitMQ/NATS (we no longer use callbacks there), allow using IRowOutputFormat directly, clarify row-end and row-between delimiters, and make it possible to reset an output format to start formatting again
  • Add a proper implementation of the formatRow function (a bonus of the formats refactoring)
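As an illustration of the `*_max_rows_per_message` behavior described above, here is a minimal sketch of how rows could be grouped into messages. This is illustrative Python, not the actual C++ implementation; the function name and the `serialize` callback are hypothetical:

```python
def rows_to_messages(rows, max_rows_per_message=1, serialize=str):
    """Group rows into messages of at most max_rows_per_message rows each.

    Mirrors the idea behind kafka_max_rows_per_message /
    rabbitmq_max_rows_per_message / nats_max_rows_per_message for
    row-based formats: with the default of 1, each row becomes its
    own message.
    """
    messages = []
    for start in range(0, len(rows), max_rows_per_message):
        chunk = rows[start:start + max_rows_per_message]
        # Each message is formatted completely (with all delimiters),
        # so it can be parsed back independently by the input format.
        messages.append("\n".join(serialize(row) for row in chunk))
    return messages
```

For example, `rows_to_messages([1, 2, 3], max_rows_per_message=2)` yields two messages: one containing the first two rows and one containing the last row.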

CC: @filimonov, @kssenii

Information about CI checks: https://clickhouse.com/docs/en/development/continuous-integration/

@robot-clickhouse robot-clickhouse added the pr-improvement Pull request with some product improvements label Oct 28, 2022
Comment on lines +355 to +357
/// Need for backward compatibility.
if (format_name == "Avro" && local_context->getSettingsRef().output_format_avro_rows_in_file.changed)
max_rows = local_context->getSettingsRef().output_format_avro_rows_in_file.value;
Member Author

This is temporary; I will open a PR to deprecate the setting output_format_avro_rows_in_file after this PR, because this setting doesn't make sense anymore

@filimonov
Contributor

Quite a big change; I have not finished reading it yet.

We also need to run benchmarks before merging

@Avogar
Member Author

Avogar commented Nov 2, 2022

We also need to run benchmarks before merging

It would be great if you could help me with it

src/Processors/Executors/StreamingFormatExecutor.cpp Outdated Show resolved Hide resolved
src/Storages/MessageQueueSink.h Show resolved Hide resolved
src/Storages/MessageQueueSink.cpp Outdated Show resolved Hide resolved
src/Storages/Kafka/KafkaConsumer.h Outdated Show resolved Hide resolved
src/Storages/IMessageProducer.h Outdated Show resolved Hide resolved
src/Storages/IMessageProducer.h Outdated Show resolved Hide resolved
src/Storages/IMessageProducer.cpp Outdated Show resolved Hide resolved
src/Storages/IMessageProducer.h Show resolved Hide resolved
src/Storages/IMessageProducer.cpp Show resolved Hide resolved
@Avogar Avogar requested a review from kssenii December 15, 2022 19:56
@Avogar
Member Author

Avogar commented Dec 30, 2022

I checked the performance difference in Kafka and found that async producing is less effective. It seems the overhead of copying each message from the buffer to the queue outweighs all the benefits. I will think about how to optimize producing later. Let's make the Kafka producing process single-threaded, as it was before.
BTW, @kssenii did you test whether async producing for RabbitMQ gives real benefits?
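The copy-per-message overhead discussed here can be illustrated with a small, self-contained Python sketch (not the actual C++ producer code); the two producer functions below are hypothetical stand-ins for the single-threaded and async paths:

```python
import queue
import threading

def produce_sync(messages, sink):
    # Single-threaded producing: hand each message directly to the sink.
    for msg in messages:
        sink.append(msg)

def produce_async(messages, sink):
    # Async producing: copy each message into a queue drained by a worker
    # thread. The per-message copy plus queue synchronization is the
    # overhead that can outweigh the benefit of the extra thread.
    q = queue.Queue()

    def worker():
        while True:
            msg = q.get()
            if msg is None:  # sentinel: no more messages
                break
            sink.append(msg)

    t = threading.Thread(target=worker)
    t.start()
    for msg in messages:
        q.put(bytes(msg))  # explicit copy of the payload
    q.put(None)
    t.join()
```

Timing both paths over many small messages (e.g. with `time.perf_counter()`) shows the queue-based path paying a fixed cost per message, which matches the observation above for small rows.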

@Avogar Avogar merged commit 966f57e into ClickHouse:master Jan 2, 2023
@kssenii
Member

kssenii commented Jan 2, 2023

I checked the performance difference in Kafka and found that async producing is less effective. It seems the overhead of copying each message from the buffer to the queue outweighs all the benefits. I will think about how to optimize producing later. Let's make the Kafka producing process single-threaded, as it was before.
BTW, @kssenii did you test whether async producing for RabbitMQ gives real benefits?

Hm, no. Initially it seemed that asynchronous writing was preferable because the RabbitMQ library is event-based and requires running event loops, but indeed we need to compare and check the difference.

Avogar added a commit to Avogar/ClickHouse that referenced this pull request Jan 3, 2023
Avogar added a commit that referenced this pull request Jan 4, 2023
Revert some changes from #42777 to fix performance tests
azat added a commit to azat/ClickHouse that referenced this pull request Dec 27, 2023
Fixes: ClickHouse#42777
Signed-off-by: Azat Khuzhin <a.khuzhin@semrush.com>
azat added a commit to azat/ClickHouse that referenced this pull request Dec 28, 2023
Fixes: ClickHouse#42777
Signed-off-by: Azat Khuzhin <a.khuzhin@semrush.com>
(cherry picked from commit 51d4f58)
5 participants