
No error callback and hanging when publishing to non existent partition #39

Closed · winbatch opened this issue Dec 27, 2013 · 29 comments

@winbatch

My topic is defined as having 10 partitions (0-9). When I attempt to send to partition 10, I expect an error callback. However, the poll (which I set to -1) just hangs. Shouldn't I get an error callback? I know the API is aware this is a bad partition, because my log callback is being called and includes this detail:
27-Dec-2013 11:44:21:918 [T07][3828] kafka/kafka.cc:23 fac:'PART' name:'LARA#producer-0' log:'LaraReplicator_kafkacluster2 partition [10] not currently available'

@winbatch

Actually, it looks like eventually (5 minutes, but I believe configurable) you get a message delivery callback with 'Local: Message timed out'. So I guess in the end I'm requesting to get a callback as soon as it's known the partition is wrong.

@edenhill

The thing is that the partition might not be available because the broker is down or connectivity to the broker is down; librdkafka can't really know, so it will hold on to the message for as long as the local message timeout is configured (message.timeout.ms, defaults to 5 minutes) before failing the message.

If you provide your own partitioner function you can use rd_kafka_topic_partition_available() to check if a partition is available (has a leader broker) before producing to it.
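For illustration, a minimal partitioner sketch along those lines (the function name and target partition are hypothetical; rd_kafka_topic_partition_available() must only be called from inside a partitioner callback):

```c
#include <librdkafka/rdkafka.h>

/* Hypothetical partitioner: prefer a fixed target partition, but only if it
 * currently has a leader broker. */
static int32_t availability_partitioner(const rd_kafka_topic_t *rkt,
                                        const void *key, size_t keylen,
                                        int32_t partition_cnt,
                                        void *rkt_opaque, void *msg_opaque) {
        int32_t wanted = 3; /* illustrative target partition */

        if (wanted < partition_cnt &&
            rd_kafka_topic_partition_available(rkt, wanted))
                return wanted;

        /* No usable partition: returning RD_KAFKA_PARTITION_UA fails the
         * message rather than queuing it. */
        return RD_KAFKA_PARTITION_UA;
}
```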

@winbatch

I haven't looked too deeply into librdkafka's metadata support, but can it get from the broker the number of partitions configured for a particular topic? If so, I might be able to use that. Right now I force the user of my app to tell me how many partitions there are - I'd rather it be more dynamic/automatic.

@edenhill

If you configure a custom partitioner with rd_kafka_topic_conf_set_partitioner_cb() it will be called for each message, and one of its arguments is partition_cnt, the number of partitions currently available for that topic.

Partitioning is usually done for one of two reasons:

  • Load balancing - use the default random partitioner
  • Data-specific segmenting/sharding: e.g., all messages for a specific user or web page go to one partition to ensure ordering.

In the latter case the partitioner must have some information to base its partitioning decision on, and that information is the optional key provided to the produce() function.
This key, an arbitrary binary blob that is usually a string, is provided by the application per message, used by the producer's partitioner, and forwarded along with the message to the broker and the consumer(s).
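As a sketch of the sharding case, a partitioner could hash the key to pick a stable partition (the hash choice and function name are illustrative, not librdkafka's built-in partitioner; it assumes the partition count does not change):

```c
/* Illustrative key-hash partitioner: all messages with the same key map to
 * the same partition, assuming a stable partition_cnt. */
static int32_t key_hash_partitioner(const rd_kafka_topic_t *rkt,
                                    const void *key, size_t keylen,
                                    int32_t partition_cnt,
                                    void *rkt_opaque, void *msg_opaque) {
        const unsigned char *k = key;
        unsigned int h = 5381;
        size_t i;

        if (!key || partition_cnt <= 0)
                return RD_KAFKA_PARTITION_UA; /* cannot decide: fail message */

        for (i = 0; i < keylen; i++)  /* djb2 hash over the key bytes */
                h = ((h << 5) + h) + k[i];

        return (int32_t)(h % (unsigned int)partition_cnt);
}
```

It would be registered on the topic configuration with rd_kafka_topic_conf_set_partitioner_cb() before the topic handle is created.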

@winbatch

To be clear, is 'partition_cnt' the number of partitions currently available (as in, the servers are up and running) or the number configured on the broker? I was hoping for the latter, but have a feeling it's the former.

My use case is that I need to make sure certain streams always go to the same partition, and I don't want it to be random. I don't care which partition is initially chosen for a stream, but I do care that, once chosen, it stays there no matter what.

@edenhill

partition_cnt is the number of configured partitions on the broker.

You can use rd_kafka_topic_partition_available() from inside a partitioner function to check if a certain partition is currently available (has a leader broker) or not.

@winbatch

So here's where I'm confused. My original question was whether librdkafka could be enhanced to reject my produce attempt when I attempt to send to a particular partition number that is beyond the number configured for that topic (remember the 0-9 vs. #10 example). It would seem (based on your response about partition_cnt) that librdkafka does know the configured number, and therefore could in fact be enhanced to reject it. Did I get that right?

@edenhill

librdkafka is asynchronous by nature: an application does not need to care or know about the current state of brokers, connectivity, etc. librdkafka takes care of queuing messages when brokers are down, resending when broker leaders change, and reporting back to the application when all attempts have failed.

It is thus not safe to discard a message at produce() time because a partition is not available: the application might have started to produce messages before the initial broker connection has been set up, or before all relevant brokers have been connected to - rejecting a message at this point is not helpful for the application and just leads to timing issues.

So, in your case, if you produce to a partition that is not available, that message will eventually time out (message.timeout.ms) because during the message's lifetime the desired partition did not materialise on any broker. This could of course be optimised: librdkafka could fail the message as soon as it has received an updated metadata list from the brokers.
But it is still an async approach, and the application should use the delivery report callbacks to check for these errors.

What you seem to be asking for is a sync interface that blocks until the message is delivered or delivery fails. This is possible to implement both in librdkafka (issue #32) and in the application (by doing a single produce() and then waiting for the delivery report via poll()).
But the big problem with sync interfaces is that they kill performance: throughput is limited by the RTT to the broker. An RTT of 1 ms = max 1000 messages per second, 5 ms = 200 messages per second, and so on.
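As a rough sketch of that application-side sync approach (names are illustrative; assumes sync_dr_cb is registered with rd_kafka_conf_set_dr_cb() and only one message is in flight at a time):

```c
#include <librdkafka/rdkafka.h>

static volatile int dr_done;          /* set when the delivery report fires */
static rd_kafka_resp_err_t dr_err;    /* delivery result */

static void sync_dr_cb(rd_kafka_t *rk, void *payload, size_t len,
                       rd_kafka_resp_err_t err, void *opaque,
                       void *msg_opaque) {
        dr_err = err;
        dr_done = 1;
}

/* Produce one message and block until its delivery report arrives. */
static rd_kafka_resp_err_t produce_sync(rd_kafka_t *rk, rd_kafka_topic_t *rkt,
                                        int32_t partition,
                                        void *buf, size_t len) {
        dr_done = 0;
        if (rd_kafka_produce(rkt, partition, RD_KAFKA_MSG_F_COPY,
                             buf, len, NULL, 0, NULL) == -1)
                return RD_KAFKA_RESP_ERR__FAIL; /* local failure; errno has details */
        while (!dr_done)
                rd_kafka_poll(rk, 100); /* serve delivery report callbacks */
        return dr_err;
}
```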

So my suggestion is to construct your application with asynchronicity and late failures in mind:
produce the message, provide a per-message msg_opaque pointer for tracking if necessary, and handle any errors in the delivery report callback.
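A minimal sketch of that pattern, using a numeric tracking id as the msg_opaque (names illustrative; configuration and topic setup omitted):

```c
#include <stdio.h>
#include <librdkafka/rdkafka.h>

/* Delivery report callback: msg_opaque carries the per-message tracking id
 * that was passed to rd_kafka_produce(). */
static void my_dr_cb(rd_kafka_t *rk, void *payload, size_t len,
                     rd_kafka_resp_err_t err, void *opaque,
                     void *msg_opaque) {
        long msgid = (long)msg_opaque;

        if (err)
                fprintf(stderr, "message #%ld failed: %s\n",
                        msgid, rd_kafka_err2str(err));
}

/* Wiring (before rd_kafka_new()):
 *   rd_kafka_conf_set_dr_cb(conf, my_dr_cb);
 * Producing with a tracking id, then serving callbacks regularly:
 *   rd_kafka_produce(rkt, partition, RD_KAFKA_MSG_F_COPY,
 *                    buf, len, NULL, 0, (void *)msgid);
 *   rd_kafka_poll(rk, 0);
 */
```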

Solutions:

  • design your application with async in mind (above suggestion)
  • librdkafka provides a sync interface (slow)
  • librdkafka provides a method for checking if a partition is available prior to producing (but this will lead to false drops - the partition might exist but the current broker connection is down for some reason).

Hope this clears things up a bit.

@winbatch

I'm still confused in this regard:

'It is thus not safe to discard a message at produce() time because a partition is not available: the application might have started to produce messages before the initial broker connection has been set up, or before all relevant brokers have been connected to'

Based on the above, how are you able to reliably provide the partition_cnt in the partitioner callback? To me, that would either not be safe for the same reason, or would be safe to the point that it could reject the produce for my scenario. I was looking for immediate rejection when the attempted partition was > the number configured, not when the partition was unavailable (which I'm OK to wait for the timeout on).

@edenhill

The partitioner callback is not called until the partition count has been retrieved from the broker (metadata response).

In the current implementation, messages produced to a partition that doesn't exist will linger in the internal queue until they time out. But I will change that behaviour to fail those messages as soon as the metadata is received from the broker, failing them with a proper error code (no such partition) instead of a timeout.

It would also be possible to reject messages at produce() time using the last cached metadata information, even though this information may be outdated (e.g., migrating from a separated broker to a broker that is properly connected to the cluster - but this is a rare case). I will add support for this as well. The problem here is with messages produced before a connection to a broker has been established: are these to be dropped immediately, or queued until a metadata response has been received? I would say the latter.

So to sum it up:

  • fail produced messages for non-existent partitions as soon as we retrieve the full partition list from the broker
  • fail produce() call if partition is not available.

@winbatch

Sounds great. Thanks very much.


edenhill added a commit that referenced this issue Jan 4, 2014
Two error paths:
 - produce() will fail with errno set to ESRCH
 - dr_cb() will provide err RD_KAFKA_RESP_ERR__UNKNOWN_PARTITION
edenhill added a commit that referenced this issue Jan 4, 2014
The way the msgtimeout test was implemented will not work any more with
the fix for issue #39
@edenhill

edenhill commented Jan 4, 2014

Please update your local master branch and give it a go; producing to unknown partitions should now fail in one of two ways (depending on whether the metadata has been fetched from the broker yet):

Two error paths:

  • produce() will fail with errno set to ESRCH
  • dr_cb() will provide err RD_KAFKA_RESP_ERR__UNKNOWN_PARTITION
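For verification, a sketch exercising both paths (topic handle and payload assumed to be set up elsewhere):

```c
#include <errno.h>
#include <stdio.h>
#include <librdkafka/rdkafka.h>

/* Producing to a partition known (from cached metadata) not to exist fails
 * immediately with errno == ESRCH; otherwise the message is enqueued and,
 * once metadata arrives, fails via the delivery report callback. */
void produce_to_bad_partition(rd_kafka_topic_t *rkt,
                              const char *buf, size_t len) {
        if (rd_kafka_produce(rkt, 10 /* beyond partitions 0-9 */,
                             RD_KAFKA_MSG_F_COPY,
                             (void *)buf, len, NULL, 0, NULL) == -1) {
                if (errno == ESRCH)
                        fprintf(stderr, "produce(): unknown partition\n");
                return;
        }
        /* Enqueued: expect dr_cb with RD_KAFKA_RESP_ERR__UNKNOWN_PARTITION
         * once metadata arrives. */
}
```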

Please close this issue if you are able to verify the fix. Thank you.

@winbatch

winbatch commented Jan 8, 2014

Is there any way you can have it fail also if the topic doesn't exist and auto create is false? Right now it times out.

@edenhill

edenhill commented Jan 8, 2014

Good point, I'll look into it.

@winbatch

winbatch commented Jan 8, 2014

The invalid partition check didn't seem to work. It ends up calling my partitioner function multiple times and then fails with a local message timeout.

@edenhill

edenhill commented Jan 8, 2014

Could you reproduce this with the "debug" config property set to "broker,metadata,topic,msg"?
Feel free to mail it to me directly if you don't want to make it public: rdkafka@edenhill.se

@winbatch

In rdkafka_performance the invalid partition check works. Just need the invalid topic check now (immediate fail rather than timeout).

@edenhill

Will be looking at this in the coming days.
I assume that you have auto.create.topics.enable set to false in your brokers?

@winbatch

Correct


@winbatch

Hey Magnus - just checking if you've had a chance to look at this one yet?

edenhill added a commit that referenced this issue Jan 25, 2014
… problem for produced messages.

rd_kafka_produce() will now return -1 and set errno to ENOENT when
the topic is unknown in the cluster, and the DR callback will use
new error code RD_KAFKA_RESP_ERR__UNKNOWN_TOPIC to indicate the same.

This also fixes an issue where messages could be produced out of order
during initial broker setup in some circumstances.
@edenhill

Unknown partitions and topics should now fail as soon as possible, both through rd_kafka_produce() and the dr_cb.

Could you please verify this in your setup as well?

@winbatch

The topic one works, but unfortunately I have to specifically check for ENOENT and log appropriately myself, because strerror(errno) will say 'No such file or directory', which will be misleading to my users.

@winbatch

Also, could we enhance it to error out early (rather than time out) if the required acks flag is > the number of ISRs?

@edenhill

Yeah, using errno for error propagation was a mistake. It will be fixed in the next version of the API.
But until then I see a couple of alternatives:

  • Application translates errno back to a rd_kafka_resp_err_t according to the rd_kafka_produce() documentation.
  • librdkafka provides a rd_kafka_errno2err() function that does the same
  • librdkafka provides a rd_kafka_produce2() which returns a rd_kafka_resp_err_t. This is the cleanest fix now, but the function will be removed when the new API is implemented which might be a hassle for the app developer.

What would your preference be?

@winbatch

#2


@edenhill

Also, could we also enhance to error out early (and not timeout) if the required acks flag is > ISR's?

The client does not know about the ISR count; this would have to be enforced by the broker, but doesn't seem to be.
In my tests with one broker (ISR=1), the message does not time out but is delivered immediately, seemingly disregarding the extra brokers in request.required.acks. I guess the value is capped on the broker at the replica count for the topic.

edenhill added a commit that referenced this issue Jan 27, 2014
…ors (issue #39)

This also changes the behaviour of rd_kafka_consume*() to set errno
to ESRCH for unknown partitions (was ENOENT) to be in line with
rd_kafka_produce() behaviour.
@edenhill

There: rd_kafka_errno2err(). Give it a go.
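A small usage sketch (topic handle and payload assumed set up elsewhere):

```c
#include <errno.h>
#include <stdio.h>
#include <librdkafka/rdkafka.h>

/* Translate errno from a failed produce() into a rd_kafka_resp_err_t so the
 * log message is meaningful instead of strerror()'s misleading
 * "No such file or directory". */
void produce_or_log(rd_kafka_topic_t *rkt, int32_t partition,
                    const char *buf, size_t len) {
        if (rd_kafka_produce(rkt, partition, RD_KAFKA_MSG_F_COPY,
                             (void *)buf, len, NULL, 0, NULL) == -1) {
                rd_kafka_resp_err_t err = rd_kafka_errno2err(errno);
                fprintf(stderr, "produce failed: %s\n",
                        rd_kafka_err2str(err));
        }
}
```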

@edenhill

Please verify this on your side and close the issue.
Thanks

@edenhill

Reopen if still an issue.
