How to clear message queue? #990

Closed
TvoroG opened this issue Jan 9, 2017 · 13 comments
TvoroG commented Jan 9, 2017

Description

Is there a way to remove all messages from the message queue when it is full (ENOBUFS)? I want to dump all messages to a file and clear the queue.

TvoroG commented Jan 9, 2017

By the way, thank you for this lib! :)

TvoroG commented Jan 9, 2017

As a workaround, I can dump new messages to a file instead of the messages already in the queue.
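
A minimal sketch of that workaround, assuming a plain rd_kafka_produce() producer (the queue-full signal, a -1 return with errno set to ENOBUFS, is librdkafka's documented behaviour; the file handling is only for illustration):

#include <errno.h>
#include <stdio.h>
#include <librdkafka/rdkafka.h>

/* Produce a message; if the internal queue is full (ENOBUFS),
 * append the payload to dumpfp instead of dropping it. */
static int produce_or_dump (rd_kafka_t *rk, rd_kafka_topic_t *rkt,
                            const void *payload, size_t len, FILE *dumpfp) {
        if (rd_kafka_produce(rkt, RD_KAFKA_PARTITION_UA, RD_KAFKA_MSG_F_COPY,
                             (void *)payload, len, NULL, 0, NULL) == 0)
                return 0;

        if (errno == ENOBUFS) {
                fwrite(payload, 1, len, dumpfp); /* dump to file instead of losing it */
                rd_kafka_poll(rk, 0);            /* serve delivery reports to free queue space */
                return 0;
        }

        return -1; /* some other produce error */
}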

edenhill commented Jan 9, 2017

There have been a couple of similar feature requests previously, so something like this is on the roadmap.

In my mind there are two API alternatives; which one would you prefer (or something else)?

  • an API that extracts the rd_kafka_message_t's from the internal queues and returns them in an array to the application (sketched below).
  • a purge() API that fails all outstanding messages, thus triggering a delivery report with err=ERR__PURGED for each message.

Also: would you like to do this for all queued messages, or per topic+partition?

And what about messages that are in-flight to the broker, should they be included?
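
For concreteness, the first alternative could look roughly like this (a purely hypothetical name and signature, not an existing librdkafka function):

#include <sys/types.h>
#include <librdkafka/rdkafka.h>

/* Hypothetical sketch of alternative 1: move queued rd_kafka_message_t's
 * out of the internal queues into an application-supplied array, which the
 * application would then dump and destroy itself. */
ssize_t rd_kafka_extract_queued (rd_kafka_t *rk,
                                 rd_kafka_message_t **rkmessages, /* out: extracted messages */
                                 size_t max_messages);            /* capacity of rkmessages */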

TvoroG commented Jan 10, 2017

I would choose the purge() version. In my case it would be enough to fail all queued messages.
What do you mean by in-flight messages? Are these messages that have already been sent to the broker?
I just don't want to lose any messages, and duplicating them would be bad too.

edenhill commented:

Exactly, the in-flight messages are those that have been sent to the broker but that we haven't received an ack for yet.
Including them in the purge may lead to duplicates if you choose to re-produce the purged messages at a later time.
Not including them in the purge may lead to message loss.

Messages can be in three different states in librdkafka:

  1. queued for transmission to the broker - these are easy to purge, no side effects
  2. messages in transit to the broker, awaiting ack/reply
  3. delivered or failed messages awaiting the delivery report callback to be called by poll()

purge() won't need to bother with 3, since those messages are already queued for the delivery report callbacks.
There could be an option to purge() for what to do with 2: ignore them or fail them.
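
From the application's point of view the purge would just surface in the normal delivery report callback; a minimal sketch, assuming the proposed semantics above (registered with rd_kafka_conf_set_dr_msg_cb() and served by rd_kafka_poll()/rd_kafka_flush()):

#include <stdio.h>
#include <librdkafka/rdkafka.h>

static void dr_msg_cb (rd_kafka_t *rk,
                       const rd_kafka_message_t *rkmessage, void *opaque) {
        if (rkmessage->err) {
                /* Failed or purged (e.g. the proposed ERR__PURGED):
                 * the payload is still available here, so it could be
                 * dumped to a file and re-produced later. */
                fprintf(stderr, "not delivered: %s\n",
                        rd_kafka_err2str(rkmessage->err));
        }
        /* else: acked by the broker (state 3 above), nothing to recover. */
}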

edenhill commented Jan 10, 2017

API suggestion:

/**
 * @brief Purge, fail and enqueue delivery reports for produce():d messages.
 *
 * @param partitions: list of partitions to purge, or NULL for all partitions.
 * @param purge_flags: OR'ed RD_KAFKA_MSG_F_QUEUED, RD_KAFKA_MSG_F_IN_FLIGHT
 * @param err: error code to fail the messages with
 *
 * @returns the number of messages purged, or -1 if no partitions were matched.
 */
ssize_t rd_kafka_purge (rd_kafka_t *rk, const rd_kafka_topic_partition_list_t *partitions,
                        int purge_flags, rd_kafka_resp_err_t err);

Usage example - purge queued but not transmitted messages, then wait for in-transit messages to finish:

ssize_t r;
r = rd_kafka_purge(rk, NULL, RD_KAFKA_MSG_F_QUEUED, RD_KAFKA_RESP_ERR__FAIL);
if (r > 0)
  rd_kafka_poll(rk, 0); // serve delivery reports for purged messages

/* Flush (wait for completion) remaining messages (those in-flight) */
rd_kafka_flush(rk, 5000);

(the poll() is not strictly necessary since flush() serves the delivery reports as well, but this makes the steps more explicit)

TvoroG commented Jan 11, 2017

@edenhill, does rkb_outbufs from rd_kafka_broker_s relate to RD_KAFKA_MSG_F_QUEUED messages?

edenhill commented:

Yes, rkb_outbufs are buffers waiting to be transmitted, while rkb_waitresp are buffers that have been transmitted and that we're waiting for a response to.

rthalley commented:

+1 on this feature request from me. Workarounds like lowering the message queue timeout don't work for me, because I purposely want a good-sized message queue timeout most of the time; only when I'm shutting down forcibly do I want to be able to cause shorter timeouts. I need the messages to resolve to avoid leaking.
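
For reference, the timeout workaround mentioned here is the existing topic-level message.timeout.ms setting, which has to be chosen up front and cannot be shortened later just for a forced shutdown (a sketch with an arbitrary value):

#include <stdio.h>
#include <librdkafka/rdkafka.h>

char errstr[512];
rd_kafka_topic_conf_t *tconf = rd_kafka_topic_conf_new();
if (rd_kafka_topic_conf_set(tconf, "message.timeout.ms", "300000", /* generous limit for normal operation */
                            errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK)
        fprintf(stderr, "%s\n", errstr);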

edenhill commented May 10, 2017

Understood.

I guess it sort of makes sense to forcibly flush messages in the queue on rd_kafka_destroy() and trigger the DR callbacks, seeing as the application will otherwise leak memory (in the case of pointer sharing).

The downside is that the rd_kafka_destroy() call will then change behaviour and possibly be unsafe for existing applications.

rthalley commented:

Yeah... the use case where this came up for me is mirroring/transformation, and there is bookkeeping and storage-management work I need to do that requires every produce attempt to end up in a DR callback.

nmred commented Jun 30, 2017

+1 on this feature request from me.

CoderOverflow commented Jul 5, 2017

Hi @edenhill,
Although setting a big timeout can solve this problem, it may be dangerous in the case of a message hub service. A big timeout value will lead to a huge memory cost, so I would prefer to use a small timeout value.
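
For what it's worth, the memory cost of a large timeout is bounded by the existing producer queue limits rather than by the timeout itself; a sketch with arbitrary values:

#include <librdkafka/rdkafka.h>

/* Existing global settings that cap how much the producer queue can hold,
 * regardless of how long messages are allowed to sit there. */
char errstr[512];
rd_kafka_conf_t *conf = rd_kafka_conf_new();
rd_kafka_conf_set(conf, "queue.buffering.max.messages", "100000", errstr, sizeof(errstr));
rd_kafka_conf_set(conf, "queue.buffering.max.kbytes", "1048576", errstr, sizeof(errstr)); /* ~1 GiB */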
