excessive memory allocation == poor performance? #3

Closed
rngadam opened this issue Jan 10, 2013 · 5 comments


rngadam commented Jan 10, 2013

I'm testing code based on the example to stream data from rsyslog to Kafka, and I'm hitting poor performance: where I would expect 5000 messages/s, I'm only getting ~300 messages/s.

Reading the code, I'm concerned that there are at least two mallocs in the producer thread:

First, in the client:

        char *opbuf = malloc(len + 1);
        strncpy(opbuf, buf, len + 1);

...then the actual producer:

void rd_kafka_produce (rd_kafka_t *rk, char *topic, uint32_t partition,
               int msgflags,
               char *payload, size_t len) {
    rd_kafka_op_t *rko;

    rko = calloc(1, sizeof(*rko));

It seems it would be best for librdkafka to allocate, track, and reuse the rd_kafka_op_t and rd_kafka_op_t->rko_payload buffers itself for optimal performance.
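For illustration, here is a minimal, hypothetical sketch of what such reuse could look like; the op_t stand-in struct and the op_get()/op_put() helpers are assumptions for the example, not librdkafka code:

/* Hypothetical sketch of an op free-list -- not librdkafka code.
 * A stand-in struct is used so the example is self-contained. */
#include <pthread.h>
#include <stdlib.h>
#include <string.h>

typedef struct op_s {
    struct op_s *next;     /* free-list link */
    void        *payload;  /* stands in for rko_payload */
    size_t       len;
} op_t;

static op_t *op_free_list = NULL;
static pthread_mutex_t op_free_lock = PTHREAD_MUTEX_INITIALIZER;

/* Pop a recycled op if one is available, otherwise fall back to calloc(). */
static op_t *op_get (void) {
    op_t *op;
    pthread_mutex_lock(&op_free_lock);
    if ((op = op_free_list))
        op_free_list = op->next;
    pthread_mutex_unlock(&op_free_lock);
    if (op)
        memset(op, 0, sizeof(*op));
    else
        op = calloc(1, sizeof(*op));
    return op;
}

/* Return an op to the free-list instead of free():ing it. */
static void op_put (op_t *op) {
    pthread_mutex_lock(&op_free_lock);
    op->next = op_free_list;
    op_free_list = op;
    pthread_mutex_unlock(&op_free_lock);
}

Whether that would actually help depends on how much of the cost is the allocator versus the payload copy, which the numbers below speak to.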

ghost assigned edenhill Jan 11, 2013
@edenhill (Contributor)

I'm unable to reproduce your test results, but I've added a new tool to the examples directory that is better suited for performance testing: examples/rdkafka_performance.c

On an Intel Core i7 machine running a local Kafka 0.7.2 broker (with default settings, storing the Kafka logs on an in-memory filesystem) I'm getting the following performance figures:

Producer
Sending 1,000,000 messages of 200 bytes each with no copying or memduping of the payload:

# ./rdkafka_performance -P -t perf1 -p 0 -s 200 -c 1000000
...
% 1000000 messages and 200000000 bytes sent in 8342ms: 119000 msgs/s and 23975.00 Mb/s

Same test but with payload memduping and freeing:

# ./rdkafka_performance -P -t perf1 -p 0 -s 200 -c 1000000 -D
....
% 1000000 messages and 200000000 bytes sent in 8390ms: 119000 msgs/s and 23837.00 Mb/s

So I don't think the extra memduping and allocations are an issue.
Either way, it is really up to the application to decide whether the data needs to be duplicated before handing it off to librdkafka.
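
As a rough sketch of those two options, using the rd_kafka_produce() signature quoted above (the RD_KAFKA_OP_F_FREE flag name and the exact ownership semantics are assumptions about the 2013-era API; check rdkafka.h for the real constants):

/* Sketch only: two ways an application might hand a payload to
 * rd_kafka_produce().  Flag name and ownership rules are assumptions. */
#include <stdlib.h>
#include <string.h>
#include "rdkafka.h"   /* 2013-era header; provides rd_kafka_t and rd_kafka_produce() */

/* Option 1: duplicate the payload and let librdkafka free it
 * (what rdkafka_example does; one malloc + one copy per message). */
void produce_copy (rd_kafka_t *rk, const char *buf, size_t len) {
    char *opbuf = malloc(len);
    if (!opbuf)
        return;
    memcpy(opbuf, buf, len);
    rd_kafka_produce(rk, "perf1", 0, RD_KAFKA_OP_F_FREE /* assumed */, opbuf, len);
}

/* Option 2: pass a buffer the application owns and keeps valid until
 * the message has been sent; no per-message allocation, but lifetime
 * management falls on the application. */
void produce_nocopy (rd_kafka_t *rk, char *longlived_buf, size_t len) {
    rd_kafka_produce(rk, "perf1", 0, 0 /* no flags */, longlived_buf, len);
}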

(Note: with profiling enabled it shows that 92% of the time is spent calculating checksums on the payload)
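
For a rough feel for that checksum cost in isolation, here is a standalone sketch that checksums the same volume of data using zlib's crc32() (librdkafka's internal checksum routine may differ; the message size and count match the producer test above):

/* Checksum 1,000,000 x 200-byte payloads with zlib's crc32() to get a
 * rough lower bound on the CPU cost of checksumming alone.
 * Build with: cc -O2 crc_cost.c -lz  and run it under time(1). */
#include <stdio.h>
#include <zlib.h>

int main (void) {
    static char payload[200];          /* same size as the test above */
    unsigned long crc = 0;
    long i;

    for (i = 0; i < 1000000; i++)      /* 1,000,000 messages */
        crc ^= crc32(0L, (const unsigned char *)payload, sizeof(payload));

    printf("%lu\n", crc);              /* keep the loop from being optimized out */
    return 0;
}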

Consumer
Receiving 1,000,000 messages of 200 bytes each and simply discarding them (the offset is stored to a local file, without fsync(), for each received message):

# ./rdkafka_performance -C -t perf1 -c 1000000 -i 1000
....
% 1000000 messages and 200000000 bytes received in 9406ms: 106000 msgs/s and 21263.00 Mb/s
% Average application fetch latency: 3us

An important configuration setting on the consumer for this kind of throughput is conf.consumer.replyq_low_thres:

        /* Tell rdkafka to (try to) maintain 100000 messages
         * in its internal receive buffers. This is to avoid
         * application -> rdkafka -> broker  per-message ping-pong
         * latency. */
        conf.consumer.replyq_low_thres = 100000;
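
To put a rough number on that ping-pong cost: with, say, a 0.4 ms broker round-trip (about what the ping output further down shows), a strict one-message-per-round-trip pattern would cap out near 1 / 0.0004 s ≈ 2500 msgs/s no matter how fast the CPU is; keeping a large internal receive buffer is what avoids that ceiling. (Back-of-the-envelope arithmetic, not a measurement.)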

It would be interesting if you could re-run your tests using the rdkafka_performance tool instead (or base your implementation on rdkafka_performance's configuration struct).

Please also indicate the degree of network latency between the rdkafka producer and the kafka broker.


rngadam commented Jan 17, 2013

Wow, OK, these are really surprising results to me...

 ./rdkafka_performance -P -t perf1 -p 0 -s 200 -c 1000000 -b 192.168.11.250

Results:

% 1000000 messages and 200000000 bytes sent in 31744ms: 31000 msgs/s and 6300.00 Mb/s
% 1000000 messages and 200000000 bytes sent in 31736ms: 31000 msgs/s and 6301.00 Mb/s

Latency:

ping 192.168.11.250
PING 192.168.11.250 (192.168.11.250) 56(84) bytes of data.
64 bytes from 192.168.11.250: icmp_req=1 ttl=64 time=0.546 ms
64 bytes from 192.168.11.250: icmp_req=2 ttl=64 time=0.375 ms
64 bytes from 192.168.11.250: icmp_req=3 ttl=64 time=0.346 ms
64 bytes from 192.168.11.250: icmp_req=4 ttl=64 time=0.301 ms

Memory copies:

 ./rdkafka_performance -P -t perf1 -p 0 -s 200 -c 1000000 -b 192.168.11.250 -D

Results:

% 1000000 messages and 200000000 bytes sent in 31629ms: 31000 msgs/s and 6323.00 Mb/s
% 1000000 messages and 200000000 bytes sent in 31895ms: 31000 msgs/s and 6270.00 Mb/s

Larger messages (100 times larger, count 100 times smaller):

no copy:

% 10000 messages and 200000000 bytes sent in 18801ms: 0 msgs/s and 10637.00 Mb/s

copy:

% 10000 messages and 200000000 bytes sent in 18605ms: 0 msgs/s and 10749.00 Mb/s

I'd need to dig into your test to convince myself that it's doing what it's supposed to be doing ;).


rngadam commented Jan 17, 2013

Thanks for taking the time to look at this! I'll re-open if I find something interesting.

rngadam closed this as completed Jan 17, 2013
@edenhill (Contributor)

Uhm, those throughput numbers are actually in Kb/s, not Mb/s.. Sorry about that ;)
So the msgs/s calculation is correct, but the throughput is really about 6 Mb/s and 10 Mb/s respectively in your tests.
I'll push a fix.
With that small embarrassment out of the way:

Your initial msgs/s numbers are about 100 times (s)lower than in your latest test. Can it all be attributed to the differences between rdkafka_example and rdkafka_performance? I'm not so sure.

What kind of network connection do you have between the producer and the broker? If it's 100 Mbit, the throughput figures look reasonable (the link is saturated when using the bigger message size), but if it's a gigabit link I would expect much higher throughput and would have to look into that.
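
A quick back-of-the-envelope check with the corrected Kb/s units:

    10637 Kb/s ≈ 10.6 MB/s ≈ 85 Mbit/s  (close to saturating 100 Mbit Ethernet)
     6300 Kb/s ≈  6.3 MB/s ≈ 50 Mbit/s  (well below a gigabit link)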

Running Kafka locally and storing the logs on an SSD disk, with 20 kB messages, gives:
% 100000 messages and 2000000000 bytes sent in 17172ms: 5882 msgs/s and 116.47 Mb/s

Running the same test but storing logs in memory gives:
% 100000 messages and 2000000000 bytes sent in 11615ms: 9090 msgs/s and 172.19 Mb/s

The JVM (for ZooKeeper & Kafka) consumes 99% CPU while rdkafka_performance consumes about 45%.
The system I/O wait during the SSD disk test was about 7%.

edenhill reopened this Jan 17, 2013

rngadam commented Jan 18, 2013

I think the initial test wasn't really comparable: I was running everything (services and the test program) on my relatively slow laptop, and since these operations seem to be mostly CPU-bound, it's to be expected that the initially reported results would be low compared to running over the LAN with two hosts (one of them with a faster disk).

I find it really interesting that checksumming is the bottleneck; I wonder how much of a performance improvement could be gained by tweaking that particular operation.

In any case, I've since moved away from Kafka (at least for now), since on the consumer side I couldn't find a good, event-based library for Node.js. The best library I could find does continuous polling (cainus/Prozess#25), so it isn't as nice as Redis pub/sub support.

winbatch mentioned this issue Feb 10, 2014
jcalcote mentioned this issue Jun 28, 2019
azat added a commit to azat-archive/librdkafka that referenced this issue Feb 29, 2024
TSan report (found by ClickHouse CI):

    Exception: Sanitizer assert found for instance ==================
    WARNING: ThreadSanitizer: data race (pid=1)
      Read of size 8 at 0x7b7800127158 by thread T987 (mutexes: read M0, write M1, write M2):
        #0 __tsan_memcpy <null> (clickhouse+0x74eebb7) (BuildId: 7122171f6a93acda7ea89a6d10cce3ad580a715d)
        #1 rd_avg_rollover build_docker/./contrib/librdkafka/src/rdavg.h:153:22 (clickhouse+0x1e39753b) (BuildId: 7122171f6a93acda7ea89a6d10cce3ad580a715d)
        #2 rd_kafka_stats_emit_avg build_docker/./contrib/librdkafka/src/rdkafka.c:1354:9 (clickhouse+0x1e39753b)
        #3 rd_kafka_stats_emit_all build_docker/./contrib/librdkafka/src/rdkafka.c:1717:17 (clickhouse+0x1e395c8b) (BuildId: 7122171f6a93acda7ea89a6d10cce3ad580a715d)
        #4 rd_kafka_stats_emit_tmr_cb build_docker/./contrib/librdkafka/src/rdkafka.c:1898:2 (clickhouse+0x1e395c8b)
        #5 rd_kafka_timers_run build_docker/./contrib/librdkafka/src/rdkafka_timer.c:288:4 (clickhouse+0x1e46498a) (BuildId: 7122171f6a93acda7ea89a6d10cce3ad580a715d)
        #6 rd_kafka_thread_main build_docker/./contrib/librdkafka/src/rdkafka.c:2021:3 (clickhouse+0x1e3919e9) (BuildId: 7122171f6a93acda7ea89a6d10cce3ad580a715d)
        #7 _thrd_wrapper_function build_docker/./contrib/librdkafka/src/tinycthread.c:576:9 (clickhouse+0x1e47a57b) (BuildId: 7122171f6a93acda7ea89a6d10cce3ad580a715d)

      Previous write of size 8 at 0x7b7800127158 by thread T986:
        #0 rd_avg_calc build_docker/./contrib/librdkafka/src/rdavg.h:104:38 (clickhouse+0x1e37d71d) (BuildId: 7122171f6a93acda7ea89a6d10cce3ad580a715d)
        #1 rd_kafka_broker_timeout_scan build_docker/./contrib/librdkafka/src/rdkafka_broker.c:880:25 (clickhouse+0x1e37d71d)
        #2 rd_kafka_broker_ops_io_serve build_docker/./contrib/librdkafka/src/rdkafka_broker.c:3416:17 (clickhouse+0x1e37d71d)
        #3 rd_kafka_broker_consumer_serve build_docker/./contrib/librdkafka/src/rdkafka_broker.c:4975:17 (clickhouse+0x1e378e5e) (BuildId: 7122171f6a93acda7ea89a6d10cce3ad580a715d)
        #4 rd_kafka_broker_serve build_docker/./contrib/librdkafka/src/rdkafka_broker.c:5080:17 (clickhouse+0x1e378e5e)
        #5 rd_kafka_broker_thread_main build_docker/./contrib/librdkafka/src/rdkafka_broker.c:5237:25 (clickhouse+0x1e372619) (BuildId: 7122171f6a93acda7ea89a6d10cce3ad580a715d)
        #6 _thrd_wrapper_function build_docker/./contrib/librdkafka/src/tinycthread.c:576:9 (clickhouse+0x1e47a57b) (BuildId: 7122171f6a93acda7ea89a6d10cce3ad580a715d)

Refs: ClickHouse/ClickHouse#60443
azat added a commit to azat-archive/librdkafka that referenced this issue Jul 9, 2024
(Same TSan report as in the commit above.)
Refs: ClickHouse/ClickHouse#60443
Refs: ClickHouse/ClickHouse#60939
Refs: confluentinc#4625