
client consume from kafka crashed occasionally #254

Closed
mahb94 opened this issue Apr 21, 2015 · 16 comments

@mahb94 commented Apr 21, 2015

I use librdkafka to consume messages from Kafka, but there is a coredump every 2 or 3 days. Here is the stack from one of the coredump files. I don't have any ideas from this information yet.

(gdb) bt
#0  0x0000003dbae784aa in _int_free () from /lib64/libc.so.6
#1  0x00007fb46f5b0cb4 in rd_kafka_q_serve (rkq=0x9ae848, timeout_ms=, callback=0x7fb46f5b1f00 <rd_kafka_consume_cb>, opaque=0x7fff80a3f960) at rdkafka.c:471
#2  0x00007fb46f5b0da4 in rd_kafka_consume_callback0 (rkt=0x9ae170, partition=0, timeout_ms=1000, consume_cb=0x405688 <msg_consume>, opaque=0x0) at rdkafka.c:1481
#3  rd_kafka_consume_callback (rkt=0x9ae170, partition=0, timeout_ms=1000, consume_cb=0x405688 <msg_consume>, opaque=0x0) at rdkafka.c:1508
#4  0x0000000000407ad9 in kafkacli_consume (index=0, offset=42192633) at kafkacli.c:1774
#5  0x0000000000404918 in main (argc=3, argv=0x7fff80a3fb98) at consumer.c:601

@edenhill (Contributor)

Which version of librdkafka is this?

Can you share your code for your consume callback?

Thanks

@mahb94 (Author) commented Apr 21, 2015

0.8.5. g_conf.run is a global variable; I use it to stop the consumer outside kafkacli_consume, which calls rd_kafka_consume_callback.

static void msg_consume (rd_kafka_message_t *rkmessage, void *opaque)
{
    int r;
    char path[1024];
    char buffer[64];
    if(g_conf.run == 0)
        return;
    if (rkmessage->err)
    {
        if (rkmessage->err == RD_KAFKA_RESP_ERR__PARTITION_EOF)
        {
            kafkacli_log(KFCLI_LOG_WARN,
                 "kafka",
                 "Consumer reached end of "
                 "%s "
                 "message queue at offset %"PRId64,
                 g_conf.kafkacli_tp[0].topic,
                 rkmessage->offset);

            sprintf(path, "/zmd_consumers/%s/%d", g_conf.kafkacli_tp[0].topic, g_conf.consumer_group_id);
            sprintf(buffer, "%"PRId64, rkmessage->offset);
            r = zoo_set(g_conf.zh_owneridc, path, buffer, sizeof(buffer), -1);
            if(r != ZOK)
            {
                kafkacli_log(KFCLI_LOG_WARN,
                 "zookeeper",
                 "failed to set topic %s offset:%"PRId64,
                 g_conf.kafkacli_tp[0].topic,
                 rkmessage->offset);
                g_conf.run = 0;
                return;
            }
            kafkacli_log(KFCLI_LOG_DEBUG,
                 "zookeeper",
                 "set topic %s offset:%s",
                 g_conf.kafkacli_tp[0].topic, buffer);
        }
        else
        {
            kafkacli_log(KFCLI_LOG_ERROR,
                 "kafka",
                 "Consume error for topic %s "
                 "offset %"PRId64": %s",
                 g_conf.kafkacli_tp[0].topic,
                 rkmessage->offset,
                 rd_kafka_message_errstr(rkmessage));
            g_conf.run = 0;
        }

        if(g_conf.cli_cb)
            g_conf.cli_cb(rkmessage->payload, rkmessage->len,
                          rkmessage->offset, rkmessage->err,
                          rd_kafka_message_errstr(rkmessage));


    }
    else
    {
        if(g_conf.cli_cb)
        {
            r = g_conf.cli_cb(rkmessage->payload, rkmessage->len,
                          rkmessage->offset, rkmessage->err,
                          rd_kafka_message_errstr(rkmessage));
            if(r)
            {
                //processing failed in the message callback; stop consuming
                g_conf.run = 0;

                sprintf(path, "/zmd_consumers/%s/%d", g_conf.kafkacli_tp[0].topic, g_conf.consumer_group_id);
                sprintf(buffer, "%"PRId64, rkmessage->offset);
                r = zoo_set(g_conf.zh_owneridc, path, buffer, sizeof(buffer), -1);
                if(r != ZOK)
                {
                    kafkacli_log(KFCLI_LOG_ERROR,
                         "zookeeper",
                         "failed to set topic %s offset:%s",
                         g_conf.kafkacli_tp[0].topic, buffer);
                    g_conf.run = 0;
                    return;
                }

                kafkacli_log(KFCLI_LOG_ERROR,
                         "zookeeper",
                         "business callback failed, set topic %s offset:%s",
                         g_conf.kafkacli_tp[0].topic, buffer);
                return;
            }

            if(rkmessage->offset%g_conf.offset_commit_interval==0)
            {
                //force-commit the offset every g_conf.offset_commit_interval messages
                sprintf(path, "/zmd_consumers/%s/%d", g_conf.kafkacli_tp[0].topic, g_conf.consumer_group_id);
                sprintf(buffer, "%"PRId64, rkmessage->offset);
                r = zoo_set(g_conf.zh_owneridc, path, buffer, sizeof(buffer), -1);
                if(r != ZOK)
                {
                    kafkacli_log(KFCLI_LOG_ERROR,
                         "zookeeper",
                         "failed to set topic %s offset:%s",
                         g_conf.kafkacli_tp[0].topic, buffer);
                    g_conf.run = 0;
                    return;
                }

                kafkacli_log(KFCLI_LOG_ERROR,
                         "zookeeper",
                         "business callback succeeded, set topic %s offset:%s",
                         g_conf.kafkacli_tp[0].topic, buffer);
                return;
            }
        }

        /* We store offset when we're done processing
         * the current message. */
        //rd_kafka_offset_store(rkmessage->rkt, rkmessage->partition, rkmessage->offset);
        #if 0
        sprintf(path, "/zmd_consumers/%s/%d", rd_kafka_topic_name(rkmessage->rkt), g_conf.consumer_group_id);
        sprintf(buffer, "%"PRId64, rkmessage->offset);
        r = zoo_set(g_conf.zh, path, buffer, sizeof(buffer), -1);
        if(r != ZOK)
        {
            kafkacli_log(KFCLI_LOG_ERROR,
                 "zookeeper",
                 "failed to set offset:%s at%"PRId64,
                 buffer, rkmessage->offset);
            g_conf.run = 0;
            return;
        }

        kafkacli_log(KFCLI_LOG_WARN,
                 "zookeeper",
                 "set offset:%s",
                 buffer);
        #endif
    }

}

@edenhill (Contributor)

Looks okay.
Could you try running the program with valgrind to find out why it crashes?
valgrind ./your-program ... ?

@mahb94 (Author) commented Apr 21, 2015

I will try it! But I'm not sure when this bug will appear again. Thanks for the reply :)

@edenhill (Contributor)

Also, would it be possible to try out the latest master version?

@mahb94 (Author) commented Apr 21, 2015

Not sure. I can update my librdkafka first.

@mahb94 (Author) commented Apr 21, 2015

Another type of coredump (most of the coredump files are one of these two types). rdkafka.c:200 is free(rko->rko_payload);

(gdb) bt
#0 0x0000003dbae32635 in raise () from /lib64/libc.so.6
#1 0x0000003dbae33e15 in abort () from /lib64/libc.so.6
#2 0x0000003dbae70547 in __libc_message () from /lib64/libc.so.6
#3 0x0000003dbae75e76 in malloc_printerr () from /lib64/libc.so.6
#4 0x00007f1d62bb4b64 in rd_kafka_op_destroy (rko=0x7f1d380008c0) at rdkafka.c:200
#5 0x00007f1d62bc3d64 in rd_kafka_broker_metadata_reply (rkb=0x7f1d40004170, err=0, reply=0x7f1d38000b00, request=0x7f1d38001420,
opaque=0x7f1d380008c0) at rdkafka_broker.c:1041
#6 0x00007f1d62bc0037 in rd_kafka_req_response (rkb=0x7f1d40004170) at rdkafka_broker.c:1311
#7 rd_kafka_recv (rkb=0x7f1d40004170) at rdkafka_broker.c:1503
#8 0x00007f1d62bc09f0 in rd_kafka_broker_io_serve (rkb=0x7f1d40004170) at rdkafka_broker.c:2436
#9 0x00007f1d62bc1017 in rd_kafka_broker_consumer_serve (rkb=0x7f1d40004170) at rdkafka_broker.c:4001
#10 0x00007f1d62bc1e37 in rd_kafka_broker_thread_main (arg=0x7f1d40004170) at rdkafka_broker.c:4050
#11 0x0000003dbb6079d1 in start_thread () from /lib64/libpthread.so.0
#12 0x0000003dbaee886d in clone () from /lib64/libc.so.6

(gdb) p *rko
$1 = {rko_link = {tqe_next = 0x0, tqe_prev = 0x0}, rko_type = RD_KAFKA_OP_METADATA_REQ, rko_flags = 1,
  rko_msgq = {rkmq_msgs = {tqh_first = 0x0, tqh_last = 0x0}, rkmq_msg_cnt = 0, rkmq_msg_bytes = 0},
  rko_replyq = 0x0, rko_intarg = 1, rko_rkm = 0x0,
  rko_rkmessage = {err = RD_KAFKA_RESP_ERR_NO_ERROR, rkt = 0x0, partition = 0, payload = 0x7f1d380013e0,
    len = 0, key = 0x0, key_len = 0, offset = 0, _private = 0x0},
  rko_rkbuf = 0x0, rko_metadata = 0x0, rko_rktp = 0x0}

@edenhill (Contributor)

Any luck with valgrind?

@edenhill (Contributor)

Is this still happening?

@mahb94 (Author) commented Jun 25, 2015

I ran the program under valgrind for 2 weeks and it did not crash anymore. Without valgrind, it crashed again. Very strange!

@DEvil0000 (Contributor)

It does not need to crash under valgrind and similar tools for them to be useful — can you attach the valgrind output file? Maybe we can see something in it that looks similar to your coredumps.

I think it might be multithreading related. Try also running it under Helgrind (valgrind --tool=helgrind) instead of plain valgrind. And of course you need to use the same setup and produce the same kind of workload; more workload is better for detecting the problem.

@edenhill (Contributor)

Did you capture the log output from valgrind?
If there are no errors reported by valgrind and no crashes it probably means the issue is timing related (since running in valgrind will slow it down a lot).

@edenhill (Contributor)

Reopen if reseen

@chriscrenshaw

I also have this issue

@edenhill (Contributor) commented Aug 5, 2016

@chriscrenshaw Which librdkafka version are you on? Do you have a gdb stack trace?

@mahb94 (Author) commented Sep 28, 2016

My problem is solved. Sorry, it was my own mistake.
