autocommit offset does not work if i use high level api #480

Closed
ago3x opened this issue Dec 28, 2015 · 35 comments

@ago3x

ago3x commented Dec 28, 2015

I use the high-level API (rd_kafka_subscribe and rd_kafka_consumer_poll) to consume messages, but when I restart the program it consumes messages from offset 0 again.

@edenhill
Contributor

Are you calling consumer_close() before exiting?

Can you reproduce this with examples/rdkafka_consumer_example ?

@ago3x
Author

ago3x commented Dec 28, 2015

I don't call consumer_close(). Are the offsets recorded by consumer_close()?

@edenhill
Contributor

You need to call consumer_close() to properly shut down your consumer and let it perform final offset commits, group leaves, etc.

@ago3x
Author

ago3x commented Dec 28, 2015

Is there an API to record the offset myself?

@edenhill
Contributor

You may commit offsets yourself with rd_kafka_commit().

@ago3x
Author

ago3x commented Dec 28, 2015

Can't it commit automatically?

@edenhill
Contributor

auto commit is enabled by default, but you must call consumer_close() before exiting.

@ago3x
Author

ago3x commented Dec 28, 2015

I don't think this is good: if my program crashes, the offset will not be committed.

@edenhill
Contributor

If auto commit is enabled (enable.auto.commit=true) offsets will be committed periodically (auto.commit.interval.ms) to the broker.

When exiting, an application must also call consumer_close() to commit any offsets still waiting to be committed from the last interval.

This is a trade-off between performance and consistency.
If you commit the offset for each message you will get full consistency and no message replays on restart, but it will affect performance negatively.
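
The trade-off can be put into numbers with a toy model. This is a minimal sketch and not librdkafka code: the function name, the fixed message rate, and the assumption that a periodic commit covers every message processed up to that moment are all hypothetical simplifications.

```c
#include <assert.h>

/*
 * Hypothetical model: one message is processed every msg_every_ms and the
 * consumer commits its position every interval_ms (0 = no periodic commit).
 * Returns how many already-processed messages would be delivered again if
 * the process died at crash_ms without a final commit.
 */
static long sketch_replayed_after_crash(long msg_every_ms, long interval_ms,
                                        long crash_ms) {
    assert(msg_every_ms > 0 && interval_ms >= 0 && crash_ms >= 0);

    /* Messages handled before the crash. */
    long processed = crash_ms / msg_every_ms;

    /* Time of the last periodic commit before the crash. */
    long last_commit_ms = interval_ms > 0
        ? (crash_ms / interval_ms) * interval_ms
        : 0;

    /* Messages covered by that last commit. */
    long committed = last_commit_ms / msg_every_ms;

    return processed - committed;
}
```

With one message every 6 s and a crash at 119 s, this model gives 0 replayed messages for a 4000 ms interval, 9 for a 60000 ms interval, and 19 when the interval is 0. A shorter interval narrows the replay window at the cost of more commit requests, and a final commit on clean shutdown removes the window entirely.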

@DavidLiuXh

I set 'enable.auto.commit=true' and 'auto.commit.interval.ms', but offsets are not committed periodically.

@edenhill
Contributor

What do you set auto.commit.interval.ms to?
How many messages are you receiving?
How long is your program running?
Are you calling consumer_close()?

@ago3x
Author

ago3x commented Dec 28, 2015

Me too.

@ago3x
Author

ago3x commented Dec 28, 2015

auto.commit.interval.ms=4000
about 20 messages
2min
no

@edenhill
Contributor

Run your program with debug=topic and see if it says anything about offset commits.

Also, what is the reason for not calling consumer_close()?
librdkafka relies on it to perform a proper clean shut down.

@ago3x
Author

ago3x commented Dec 28, 2015

RDKAFKA-7-TOPIC: rdkafka#consumer-0: New local topic: test1
RDKAFKA-7-DESP: rdkafka#consumer-0: Adding desired topic test1 [0]
RDKAFKA-7-OFFSET: rdkafka#consumer-0: PC-ZHENGJIYUE.hikvision.com:9092/0: OffsetFetchRequest(v1) for 1 partition(s)
RDKAFKA-7-OFFSET: rdkafka#consumer-0: Topic test1 [0]: setting default offset -1001
RDKAFKA-7-CONSUMER: rdkafka#consumer-0: Start consuming test1 [0] at offset TAIL(76843841185970498)
RDKAFKA-7-OP: rdkafka#consumer-0: test1 [0] received op FETCH_START (v2) in fetch-state none
RDKAFKA-7-FETCH: rdkafka#consumer-0: Start fetch for test1 [0] in state none at offset TAIL(76843841185970498) (v2)
RDKAFKA-7-PARTSTATE: rdkafka#consumer-0: Partition test1 [0] changed fetch state none -> offset-query
RDKAFKA-7-OFFSET: rdkafka#consumer-0: test1 [0]: offset reset (at offset TAIL(76843841185970498)) to TAIL(76843841185970498): update: Success
RDKAFKA-7-OFFSET: rdkafka#consumer-0: test1 [0]: no current leader for partition, starting offset query timer for offset TAIL(76843841185970498)
RDKAFKA-7-STATE: rdkafka#consumer-0: Topic test1 changed state unknown -> exists
RDKAFKA-7-PARTCNT: rdkafka#consumer-0: Topic test1 partition count changed from 0 to 1
RDKAFKA-7-BRKDELGT: rdkafka#consumer-0: Broker PC-ZHENGJIYUE.hikvision.com:9092/0 is now leader for topic test1 [0] with 0 messages (0 bytes) queued
RDKAFKA-7-METADATA: rdkafka#consumer-0: PC-ZHENGJIYUE.hikvision.com:9093/1: Requested topic test1 seen in metadata
RDKAFKA-7-OFFSET: rdkafka#consumer-0: Topic test1 [0]: timed offset query for TAIL(76843841185970498)
RDKAFKA-7-OFFREQ: rdkafka#consumer-0: PC-ZHENGJIYUE.hikvision.com:9092/0: Partition test1 [0]: querying for logical offset TAIL(76843841185970498) (opv 2)
RDKAFKA-7-OFFSET: rdkafka#consumer-0: PC-ZHENGJIYUE.hikvision.com:9092/0: OffsetRequest (-1) for topic test1 [0]
RDKAFKA-7-PARTSTATE: rdkafka#consumer-0: Partition test1 [0] changed fetch state offset-query -> offset-wait
RDKAFKA-7-OFFSET: rdkafka#consumer-0: Offset -76843841185972498 request for test1 [0] returned offset 25 (25)
RDKAFKA-7-OFFSET: rdkafka#consumer-0: OffsetReply for topic test1 [0]: offset 25: adjusting for OFFSET_TAIL(76843841185970498): effective offset 0
RDKAFKA-7-PARTSTATE: rdkafka#consumer-0: Partition test1 [0] changed fetch state offset-wait -> active
RDKAFKA-7-FETCHADD: rdkafka#consumer-0: PC-ZHENGJIYUE.hikvision.com:9092/0: Added test1 [0] to fetch list (1 entries)
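
The huge TAIL value in the log above can be checked by hand. A minimal sketch, assuming the logical TAIL encoding from rdkafka.h (RD_KAFKA_OFFSET_TAIL_BASE is -2000 and RD_KAFKA_OFFSET_TAIL(CNT) is the base minus CNT). The sketch_ names are hypothetical local copies, and the clamp to 0 is inferred from the "effective offset 0" log line rather than taken from the library source.

```c
#include <assert.h>
#include <stdint.h>

/* Local stand-in for RD_KAFKA_OFFSET_TAIL_BASE from rdkafka.h. */
#define SKETCH_OFFSET_TAIL_BASE INT64_C(-2000)

/* Logical offset that encodes "cnt messages before the end of the log". */
static int64_t sketch_tail_logical(int64_t cnt) {
    assert(cnt >= 0);
    return SKETCH_OFFSET_TAIL_BASE - cnt;
}

/* Resolve a TAIL(cnt) request against the broker's end offset.
 * Assumption: a count larger than the log is clamped to offset 0. */
static int64_t sketch_tail_effective(int64_t end_offset, int64_t cnt) {
    assert(end_offset >= 0 && cnt >= 0);
    if (cnt > end_offset)
        return 0;
    return end_offset - cnt;
}
```

With the values from the log, sketch_tail_logical(76843841185970498) gives -76843841185972498, the number shown in the offset request, and an end offset of 25 resolves to an effective offset of 0. That would explain why consumption restarts from the beginning of the partition: the starting offset is a garbage TAIL count, not a stored commit.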

@edenhill
Contributor

This offset looks very weird:
RDKAFKA-7-CONSUMER: rdkafka#consumer-0: Start consuming test1 [0] at offset TAIL (76843841185970498)

Can you show me your code that calls rd_kafka_subscribe()?
And also, are you registering a rebalance_cb? If so, show me that code as well.
Thanks

@ago3x
Author

ago3x commented Dec 28, 2015

static void rebalance_cb(rd_kafka_t *rk,
    rd_kafka_resp_err_t err,
    rd_kafka_topic_partition_list_t *partitions,
    void *opaque) {

    fprintf(stderr, "%% Consumer group rebalanced: ");

    switch (err)
    {
    case RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS:
        fprintf(stderr, "assigned:\n");
        print_partition_list(stderr, 1, partitions);
        rd_kafka_assign(rk, partitions);
        break;

    case RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS:
        fprintf(stderr, "revoked:\n");
        print_partition_list(stderr, 0, partitions);
        rd_kafka_assign(rk, NULL);
        break;

    default:
        fprintf(stderr, "failed: %s\n",
            rd_kafka_err2str(err));
        break;
    }
}


    topics = rd_kafka_topic_partition_list_new(1);
    rd_kafka_topic_partition_list_add(topics, "test1", 0);

    if ((err = rd_kafka_subscribe(rk, topics))) {
        return;
    }

    if (rd_kafka_brokers_add(rk, brokers) == 0) {
        return;
    }

    while (run) {
        rd_kafka_message_t *rkmessage;

        rkmessage = rd_kafka_consumer_poll(rk, 100);
        if (rkmessage) {
            msg_consume(rkmessage, NULL);
            rd_kafka_message_destroy(rkmessage);
        }
    }

rebalance_cb is the code from your example.

@edenhill
Contributor

Code looks good.

Can you reproduce this behaviour with examples/rdkafka_consumer_example?

@DavidLiuXh

I just used examples/rdkafka_consumer_example, but it does not auto commit offsets:

 ./examples/rdkafka_consumer_example -b localhost:9192,localhost:9193,localhost:9194 -g rd_kafka_12 -X partition.assignment.strategy=roundrobin -A t_p_6:0 t_p_6:1 t_p_6:2

@ago3x
Author

ago3x commented Dec 30, 2015

I ran rdkafka_performance to produce messages and rdkafka_consumer_example to consume them; auto commit doesn't work.

@edenhill
Contributor

edenhill commented Jan 2, 2016

One unrelated note: in balanced consumer group mode a consumer can only subscribe to topics, not topic:partition pairs, which means that the partitions you specify on the command line are ignored.
The partition syntax is only used for manual assignments (without balanced consumer groups).

@edenhill
Contributor

edenhill commented Jan 2, 2016

I've pushed a number of fixes around offset handling, could you update to latest master and see if you can still reproduce the issue?

@DavidLiuXh

I updated to the latest version, but it still doesn't auto commit offsets.

@ago3x
Author

ago3x commented Jan 4, 2016

I found the reason.

  1. In the source code at rdkafka_conf.c:403:
    { _RK_GLOBAL|_RK_CONSUMER, "auto.commit.interval.ms", _RK_C_INT,
    _RKT(auto_commit_interval_ms)
    It should be _RK(auto_commit_interval_ms).
  2. I must call rd_kafka_conf_set() to set "auto.commit.interval.ms" to a value greater than 0, because the default value is 0.
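
Point 2 amounts to setting the interval explicitly. A minimal sketch of the two properties involved, using the 4000 ms value reported earlier in this thread. The key=value layout is only a convenient way to write them down here; each pair would be passed to rd_kafka_conf_set() or to the example program's -X option.

```
enable.auto.commit=true
auto.commit.interval.ms=4000
```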

@edenhill
Contributor

edenhill commented Jan 4, 2016

@ago3x Good find, thanks!

@DavidLiuXh

@ago3x I called rd_kafka_conf_set(), but it still doesn't auto commit offsets.

@ago3x
Author

ago3x commented Jan 4, 2016

@DavidLiuXh In rdkafka_conf.c:403:
{ _RK_GLOBAL|_RK_CONSUMER, "auto.commit.interval.ms", _RK_C_INT,
_RKT(auto_commit_interval_ms)
change _RKT(auto_commit_interval_ms) to _RK(auto_commit_interval_ms).
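
Why the one-token change matters can be sketched in isolation. The following assumes that _RK() and _RKT() expand to field offsets into the global and the per-topic configuration struct respectively, which is how the names read but is not shown in this thread. The structs, macros, and helpers below are hypothetical stand-ins, not librdkafka code.

```c
#include <assert.h>
#include <stddef.h>
#include <string.h>

/* Hypothetical stand-ins for a global and a per-topic configuration struct. */
struct sketch_global_conf { int enable_auto_commit; int auto_commit_interval_ms; };
struct sketch_topic_conf  { int auto_commit_interval_ms; int auto_commit_enable; };

#define SKETCH_RK(field)  offsetof(struct sketch_global_conf, field)
#define SKETCH_RKT(field) offsetof(struct sketch_topic_conf, field)

/* Store an int property at a byte offset inside a configuration struct,
 * the way a table-driven property setter would. */
static void sketch_set_int(void *conf, size_t conf_size, size_t offset, int value) {
    assert(offset + sizeof(value) <= conf_size);
    memcpy((char *)conf + offset, &value, sizeof(value));
}

/* Apply the property to a fresh global struct and report the interval
 * that the global struct actually ends up with. */
static int sketch_interval_after_set(size_t offset, int value) {
    struct sketch_global_conf conf = { 1, 0 };
    sketch_set_int(&conf, sizeof(conf), offset, value);
    return conf.auto_commit_interval_ms;
}
```

In this sketch, setting the property through the topic-struct offset leaves the global interval at 0 because the write lands on a different field, while the corrected offset stores 4000. That matches the symptom reported above: the value was set, yet no periodic commits happened.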

@edenhill
Contributor

edenhill commented Jan 4, 2016

Or simply update to latest master

@DavidLiuXh

Thanks @ago3x and @edenhill.

@edenhill
Contributor

edenhill commented Jan 5, 2016

Can you confirm that this fixes the issue?

@DavidLiuXh

@edenhill
It works.

@edenhill
Contributor

edenhill commented Jan 5, 2016

👍

@edenhill edenhill closed this as completed Jan 5, 2016
@ago3x
Author

ago3x commented Jan 6, 2016

@edenhill I updated to the latest master and ran rdkafka_consumer_example; it dumps core shortly after starting.
The call stack is:
#0 0x00000037ff432925 in raise (sig=6) at ../nptl/sysdeps/unix/sysv/linux/raise.c:64
#1 0x00000037ff434105 in abort () at abort.c:92
#2 0x00000037ff470837 in __libc_message (do_abort=2, fmt=0x37ff558aa0 "*** glibc detected *** %s: %s: 0x%s ***\n") at ../sysdeps/unix/sysv/linux/libc_fatal.c:198
#3 0x00000037ff476166 in malloc_printerr (action=3, str=0x37ff558de0 "double free or corruption (out)", ptr=) at malloc.c:6336
#4 0x00000037ff478ca3 in _int_free (av=0x37ff78fe80, p=0x7fffe0003ad0, have_lock=0) at malloc.c:4832
#5 0x000000000043144d in rd_free (rktparlist=0x7fffe0002900) at rd.h:97
#6 rd_kafka_topic_partition_list_destroy (rktparlist=0x7fffe0002900) at rdkafka_partition.c:1727
#7 0x00000000004220aa in rd_kafka_op_destroy (rko=0x7fffec0008c0) at rdkafka_op.c:103
#8 0x0000000000423d4f in rd_kafka_op_handle_OffsetCommit (rkb=0x7fffec002050, err=, rkbuf=, request=0x0, opaque=0x7fffec0008c0) at rdkafka_request.c:663
#9 0x000000000042e69e in rd_kafka_cgrp_op_serve (rkcg=0x658a80, rkb=0x7fffec002050) at rdkafka_cgrp.c:1353
#10 rd_kafka_cgrp_serve (rkcg=0x658a80, rkb=0x7fffec002050) at rdkafka_cgrp.c:1528
#11 0x0000000000411b64 in rd_kafka_broker_serve (rkb=0x7fffec002050, timeout_ms=0) at rdkafka_broker.c:2072
#12 0x0000000000413c0b in rd_kafka_broker_consumer_serve (arg=0x7fffec002050) at rdkafka_broker.c:3191
#13 rd_kafka_broker_thread_main (arg=0x7fffec002050) at rdkafka_broker.c:3279
#14 0x00000000004367bf in _thrd_wrapper_function (aArg=) at tinycthread.c:596
#15 0x00000037ff8079d1 in start_thread (arg=0x7ffff4d61700) at pthread_create.c:301
#16 0x00000037ff4e8b5d in clone () at ../sysdeps/unix/sysv/linux/x86_64/clone.S:115

@edenhill
Contributor

edenhill commented Jan 6, 2016

This looks similar to issue #493

@ago3x
Author

ago3x commented Jan 6, 2016

It does not happen when I disable auto commit.
