
New producer design [beta] #132

Merged: 13 commits merged into master from producer-ng, Nov 13, 2014
Conversation

@eapache (Contributor) commented Jul 9, 2014

@wvanbergen @burke @sirupsen @graemej and whoever else might be interested.

This is an idea for a different API for the producer that is somewhat more channel/goroutine based. I was thinking about alternative architectures after reviewing the PR which adds the channels returning failed/successful messages. I started playing with things (after also reviewing the original multiproducer PRs) and came up with this.

It ended up being basically a complete rewrite, so reviewing the diff won't be too helpful; just read the new producer.go straight through. It's very much a work in progress, and I'm more interested in architectural thoughts right now than code nits. It seems to pass the simple tests, but that's primarily accidental, shrug.

Misc. notes on things that might cause questions:

  • User communication (input of new messages, receipt of errors) is all done with channels now, so the common idiom I expect is a select on those two channels to avoid blocking (see the sketch just after this list).
  • It doesn't have a "synchronous" mode; instead, the separate SimpleProducer uses the AckSuccesses flag to effectively imitate one.
  • In the normal success path, a message passes through five goroutines before being bundled into a request and handed to the Broker object to put on the wire. I'm not sure if this will impact performance noticeably, but it means that if any intermediate step blocks (e.g. a metadata request for a new leader for a topic), all other messages not directly affected continue processing, which is nice (assuming you configure a large enough channel buffer).
  • I delayed actually converting the Key/Value elements to bytes until the very last moment. This seems cleaner, and it also potentially simplifies the addition of streaming messages, which somebody asked for recently.
  • Hopefully the retry path is easier to reason about than the old one, as it is now exactly the same as the normal path: the message gets resubmitted on exactly the same channel the user submits messages on, it just has an extra flag set. It should still preserve ordering correctly (see the leaderDispatcher logic for this).
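
To make the first bullet concrete, here is a minimal, self-contained sketch of that select idiom; the producer is faked with two plain channels so the example runs on its own, and none of the names below are the actual API of this branch:

package main

import "fmt"

func main() {
	input := make(chan string) // stands in for the producer's input channel
	errs := make(chan error)   // stands in for the producer's error channel
	done := make(chan struct{})

	// Fake "producer": consume messages and report a single error.
	go func() {
		for i := 0; ; i++ {
			msg, ok := <-input
			if !ok {
				close(done)
				return
			}
			if i == 1 {
				errs <- fmt.Errorf("failed to deliver %q", msg)
			}
		}
	}()

	// The idiom: select on both channels so a blocked send never starves
	// the error channel, and unread errors never deadlock the sender.
	for _, m := range []string{"a", "b", "c"} {
	send:
		for {
			select {
			case input <- m:
				break send // accepted; move on to the next message
			case err := <-errs:
				fmt.Println("produce error:", err)
			}
		}
	}
	close(input)
	<-done
}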

@eapache (Contributor, Author) commented Jul 9, 2014

@wvanbergen interesting, the test failure here is exactly the same heisenbug that we've been seeing with the old producer - this quite effectively rules out a producer bug :)

@wvanbergen (Contributor) commented

Interesting. So it's probably in the test then. I don't understand what could be wrong with it though; it's very straightforward.

@wvanbergen (Contributor) commented

Pinging @mkobetic and @snormore .

@eapache (Contributor, Author) commented Jul 26, 2014

After letting it sit for a while and doing a little additional cleanup, I'm starting to feel more comfortable with this design.

}

// Errors is the output channel back to the user. If you do not read from this channel,
// the Producer may deadlock. It is suggested that you send messages and read errors in a select statement.
A Contributor commented on this hunk:
may -> will

Since consuming those events is a pretty hard requirement, either:

  1. Mention it very clearly in the package docs; or
  2. Take a function as an argument in the constructor which errors are sent to; or
  3. Do your own leaky select logic, and log errors that are dropped (sketched below).
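
For reference, a self-contained sketch of what the "leaky select" in option 3 could look like: a non-blocking send that logs and drops the error when the buffer is full (all names here are invented for the example):

package main

import (
	"errors"
	"log"
)

// reportError tries to hand err to the (buffered) errs channel; if the
// consumer has fallen behind and the buffer is full, the error is logged
// and dropped instead of blocking the producer internals.
func reportError(errs chan error, err error) {
	select {
	case errs <- err:
	default:
		log.Println("error channel full, dropping:", err)
	}
}

func main() {
	errs := make(chan error, 1) // deliberately tiny buffer for the demo
	reportError(errs, errors.New("first"))  // buffered
	reportError(errs, errors.New("second")) // dropped and logged
}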

Reply from @eapache (Contributor, Author):
Good catch, I've clarified that.

@wkuranowski commented

I am testing your new producer and it seems to work just fine. I am also unable to reproduce #150. So is it possible for your producer to send more than FlushByteCount bytes to a Kafka broker (compressed or not) and exceed message.max.bytes?

I will continue my testing tomorrow.

@eapache (Contributor, Author) commented Oct 15, 2014

I believe #150 is still a theoretical problem with this implementation, though it may be somewhat more difficult to trigger due to unrelated changes.

@wkuranowski commented

If it's possible to trigger that issue, do you have any workaround for producer-ng?
I must say that this issue is really nasty for us. When we want to send messages with a maximum size of 1MB (the default for message.max.bytes) with the old producer, we need to increase message.max.bytes on the brokers to about 10MB because of that issue. Then it's necessary to set fetch.message.max.bytes to the same value on all consumers. If a consumer (Java client) consumes from about 400 partitions, we get OOMs very often (with a 16GB heap for the JVM), because it needs 10x more memory than with the default settings. Reducing the number of threads and Kafka streams (and queued.max.message.chunks) on the consumer side helps a bit, but performance is lower...

I will be incredibly thankful if you can fix that for the new producer! :)

@eapache (Contributor, Author) commented Oct 15, 2014

@wkuranowski I just pushed a fix to this branch; I didn't test it, but if my understanding is correct it should do the right thing.

@wkuranowski commented

Thank you very much!!! The patch looks good. I will test it tomorrow.

After reading your code I have a question: is it possible (under some rare conditions) to build a request bigger than MaxRequestSize in the buildRequest() method?

@eapache (Contributor, Author) commented Oct 16, 2014

The messageAggregator ensures that the size of the messages to pack does not exceed MaxRequestSize by checking forceFlushThreshold(), although looking more closely I see a small error in that logic which I will fix shortly.
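
As an illustration of the kind of would-exceed check being described (the names below are invented for the sketch and are not the actual producer internals):

package main

import "fmt"

const maxRequestSize = 100 * 1024 // stand-in for MaxRequestSize

// batch accumulates raw message payloads destined for one broker.
type batch struct {
	msgs  [][]byte
	bytes int
}

// add appends msg to the batch, first returning the pending messages for
// flushing if appending msg would push the accumulated size past the limit.
func (b *batch) add(msg []byte) (flushed [][]byte) {
	if len(b.msgs) > 0 && b.bytes+len(msg) > maxRequestSize {
		flushed = b.msgs
		b.msgs, b.bytes = nil, 0
	}
	b.msgs = append(b.msgs, msg)
	b.bytes += len(msg)
	return flushed
}

func main() {
	var b batch
	for i := 0; i < 5; i++ {
		if flushed := b.add(make([]byte, 40*1024)); flushed != nil {
			fmt.Printf("flushing %d messages before adding message %d\n", len(flushed), i)
		}
	}
}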

Compression CompressionCodec // The type of compression to use on messages (defaults to no compression).
FlushMsgCount int // The number of messages needed to trigger a flush.
FlushFrequency time.Duration // If this amount of time elapses without a flush, one will be queued.
FlushByteCount int // If this many bytes of messages are accumulated, a flush will be triggered.
A Contributor commented on this hunk:
I assume the message count and byte count are on a per-partition basis, right? Do we want to make that explicit in the docs as well?

Reply from @eapache (Contributor, Author):
Per-broker.
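
To make the per-broker semantics concrete, a small sketch using only the four fields shown in the hunk above; the producerConfig type is a local stand-in defined for the example, not the real config struct:

package main

import (
	"fmt"
	"time"
)

// producerConfig mirrors the flush-related fields from the hunk above.
// The thresholds apply per broker: all messages bound for the same broker
// count toward the same flush trigger, regardless of partition.
type producerConfig struct {
	FlushMsgCount  int           // flush after this many buffered messages
	FlushFrequency time.Duration // flush at least this often
	FlushByteCount int           // flush after this many buffered bytes
}

func main() {
	cfg := producerConfig{
		FlushMsgCount:  500,
		FlushFrequency: 100 * time.Millisecond,
		FlushByteCount: 64 * 1024,
	}
	fmt.Printf("flush every %d msgs, %v, or %d bytes per broker\n",
		cfg.FlushMsgCount, cfg.FlushFrequency, cfg.FlushByteCount)
}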

@wvanbergen (Contributor) commented

/cc @cyprusad @snormore @mkobetic - this may be of interest to you.

@wkuranowski commented

@eapache I am producing messages to the input channel from multiple goroutines. I see that this channel is not buffered. Are you considering adding a buffer to that channel, or should I stop producing from multiple goroutines? :)

@eapache (Contributor, Author) commented Oct 16, 2014

The goroutine consuming from that channel is very fast, so even producing from multiple goroutines shouldn't block much if at all. Do you have a benchmark showing this to be a problem?

@mkobetic commented Nov 3, 2014

I didn't run any experiments myself, but I'd be surprised if buffering didn't have a notable effect. Is it easy to get a blocking profile from some of your experiments? Alternatively, do you have a testing setup here that I could play with? I'd be quite interested to sit down for a walk-through, although this week we have a remote coworker visiting Ottawa so I'd rather not divert my attention too much. Would sometime next week work for you? @snormore, interested to join as well?

@eapache (Contributor, Author) commented Nov 3, 2014

There is a Vagrantfile in this repo: you can vagrant up to get a working five-broker cluster. I've been benchmarking with https://github.com/eapache/scratch/blob/master/main.go, though note that Go's benchmarking is broken on OS X, so you'll have to run that from somewhere else (your other vagrant box will work fine).

@eapache (Contributor, Author) commented Nov 3, 2014

Quick ad hoc profiling suggests that (both with and without buffering) the cost of the channel operations is basically noise compared to the other things we have to do (CRC calculations, writing data to the socket, optional compression, etc.).

@snormore commented Nov 4, 2014

I'd be interested in a walk-through for sure!

@mkobetic commented Nov 4, 2014

Here are two blocking profiles from running your scratch app in vagrant for 1 minute: the first with ChannelBufferSize = 0 and the second with ChannelBufferSize = 10. At first sight they look largely the same, but if you look closer there are notable differences in the amount of time the various goroutines spend blocking. I suspect the total blocking time is largely the same because the duration is fixed, but I wonder if there is a difference in total throughput between the two cases; it might be worth adding some stats in that regard to the demo. Here are the tweaks I made to gather the profiles: https://github.com/mkobetic/scratch/compare/flags.

Total: 414.773 seconds (ChannelBufferSize = 0)
   0.000   0.0%   0.0%  414.773 100.0% runtime.gosched0
   0.000   0.0%   0.0%  388.649  93.7% github.com/Shopify/sarama.withRecover
 186.865  45.1%  45.1%  186.865  45.1% runtime.chanrecv2
 133.296  32.1%  77.2%  133.296  32.1% runtime.selectgo
   0.000   0.0%  77.2%   86.800  20.9% github.com/Shopify/sarama.(*Producer).leaderDispatcher
   0.000   0.0%  77.2%   86.800  20.9% github.com/Shopify/sarama.func·009
   0.000   0.0%  77.2%   69.933  16.9% github.com/Shopify/sarama.(*Broker).responseReceiver
   0.000   0.0%  77.2%   69.933  16.9% github.com/Shopify/sarama.*Broker.(github.com/Shopify/sarama.responseReceiver)·fm
   0.000   0.0%  77.2%   60.000  14.5% github.com/Shopify/sarama.(*Producer).retryHandler
   0.000   0.0%  77.2%   60.000  14.5% github.com/Shopify/sarama.*Producer.(github.com/Shopify/sarama.retryHandler)·fm
  60.000  14.5%  91.7%   60.000  14.5% runtime.chanrecv1
   0.000   0.0%  91.7%   57.862  14.0% github.com/Shopify/sarama.(*Producer).flusher
   0.000   0.0%  91.7%   57.862  14.0% github.com/Shopify/sarama.func·010
   0.000   0.0%  91.7%   57.407  13.8% github.com/Shopify/sarama.(*Producer).messageAggregator
   0.000   0.0%  91.7%   57.407  13.8% github.com/Shopify/sarama.func·011
   0.000   0.0%  91.7%   49.997  12.1% github.com/Shopify/sarama.(*Broker).sendAndReceive
   0.000   0.0%  91.7%   49.996  12.1% github.com/Shopify/sarama.(*Broker).Produce
  34.609   8.3% 100.0%   34.609   8.3% runtime.chansend1
   0.000   0.0% 100.0%   31.464   7.6% github.com/Shopify/sarama.(*Producer).topicDispatcher
   0.000   0.0% 100.0%   31.464   7.6% github.com/Shopify/sarama.*Producer.(github.com/Shopify/sarama.topicDispatcher)·fm
   0.000   0.0% 100.0%   26.124   6.3% main.main
   0.000   0.0% 100.0%   26.124   6.3% runtime.main
   0.000   0.0% 100.0%   25.183   6.1% github.com/Shopify/sarama.(*Producer).partitionDispatcher
   0.000   0.0% 100.0%   25.183   6.1% github.com/Shopify/sarama.func·008
   0.000   0.0% 100.0%    0.009   0.0% github.com/Shopify/sarama.(*Producer).Close
   0.000   0.0% 100.0%    0.009   0.0% main.func·002
   0.000   0.0% 100.0%    0.003   0.0% github.com/Shopify/sarama.(*Broker).send
   0.003   0.0% 100.0%    0.003   0.0% sync.(*Mutex).Lock
   0.000   0.0% 100.0%    0.001   0.0% github.com/Shopify/sarama.(*Broker).GetMetadata
   0.000   0.0% 100.0%    0.001   0.0% github.com/Shopify/sarama.(*Client).RefreshAllMetadata
   0.000   0.0% 100.0%    0.001   0.0% github.com/Shopify/sarama.(*Client).refreshMetadata
   0.000   0.0% 100.0%    0.001   0.0% github.com/Shopify/sarama.NewClient
Total: 412.782 seconds (ChannelBufferSize = 10)
   0.000   0.0%   0.0%  412.782 100.0% runtime.gosched0
   0.000   0.0%   0.0%  368.923  89.4% github.com/Shopify/sarama.withRecover
 177.621  43.0%  43.0%  177.621  43.0% runtime.chanrecv2
 167.884  40.7%  83.7%  167.884  40.7% runtime.selectgo
   0.000   0.0%  83.7%   70.265  17.0% github.com/Shopify/sarama.(*Producer).messageAggregator
   0.000   0.0%  83.7%   70.265  17.0% github.com/Shopify/sarama.func·011
   0.000   0.0%  83.7%   65.971  16.0% github.com/Shopify/sarama.(*Broker).responseReceiver
   0.000   0.0%  83.7%   65.971  16.0% github.com/Shopify/sarama.*Broker.(github.com/Shopify/sarama.responseReceiver)·fm
   0.000   0.0%  83.7%   59.982  14.5% github.com/Shopify/sarama.(*Producer).retryHandler
   0.000   0.0%  83.7%   59.982  14.5% github.com/Shopify/sarama.*Producer.(github.com/Shopify/sarama.retryHandler)·fm
  59.982  14.5%  98.2%   59.982  14.5% runtime.chanrecv1
   0.000   0.0%  98.2%   58.778  14.2% github.com/Shopify/sarama.(*Producer).flusher
   0.000   0.0%  98.2%   58.778  14.2% github.com/Shopify/sarama.func·010
   0.000   0.0%  98.2%   53.934  13.1% github.com/Shopify/sarama.(*Broker).sendAndReceive
   0.000   0.0%  98.2%   53.932  13.1% github.com/Shopify/sarama.(*Broker).Produce
   0.000   0.0%  98.2%   51.541  12.5% github.com/Shopify/sarama.(*Producer).leaderDispatcher
   0.000   0.0%  98.2%   51.541  12.5% github.com/Shopify/sarama.func·009
   0.000   0.0%  98.2%   46.980  11.4% github.com/Shopify/sarama.(*Producer).partitionDispatcher
   0.000   0.0%  98.2%   46.980  11.4% github.com/Shopify/sarama.func·008
   0.000   0.0%  98.2%   43.859  10.6% main.main
   0.000   0.0%  98.2%   43.859  10.6% runtime.main
   0.000   0.0%  98.2%   15.405   3.7% github.com/Shopify/sarama.(*Producer).topicDispatcher
   0.000   0.0%  98.2%   15.405   3.7% github.com/Shopify/sarama.*Producer.(github.com/Shopify/sarama.topicDispatcher)·fm
   7.286   1.8% 100.0%    7.286   1.8% runtime.chansend1
   0.000   0.0% 100.0%    0.008   0.0% github.com/Shopify/sarama.(*Broker).send
   0.008   0.0% 100.0%    0.008   0.0% sync.(*Mutex).Lock
   0.000   0.0% 100.0%    0.002   0.0% github.com/Shopify/sarama.(*Producer).Close
   0.000   0.0% 100.0%    0.002   0.0% main.func·002
   0.000   0.0% 100.0%    0.002   0.0% github.com/Shopify/sarama.(*Broker).GetMetadata
   0.000   0.0% 100.0%    0.002   0.0% github.com/Shopify/sarama.(*Client).RefreshAllMetadata
   0.000   0.0% 100.0%    0.002   0.0% github.com/Shopify/sarama.(*Client).refreshMetadata
   0.000   0.0% 100.0%    0.002   0.0% github.com/Shopify/sarama.NewClient
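
For anyone wanting to gather similar profiles, a minimal sketch of enabling Go's block profiler around a workload; this is the standard runtime/pprof approach, not necessarily the exact tweaks in the linked branch:

package main

import (
	"log"
	"os"
	"runtime"
	"runtime/pprof"
	"time"
)

func main() {
	// Record every blocking event; a real benchmark would usually use a
	// coarser sampling rate.
	runtime.SetBlockProfileRate(1)

	// ... run the producer workload here for a fixed duration ...
	time.Sleep(1 * time.Minute)

	f, err := os.Create("block.prof")
	if err != nil {
		log.Fatal(err)
	}
	defer f.Close()

	// Write the accumulated blocking profile; inspect it with
	// `go tool pprof <binary> block.prof`.
	if err := pprof.Lookup("block").WriteTo(f, 0); err != nil {
		log.Fatal(err)
	}
}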

@ORBAT commented Nov 5, 2014

The current version of Producer#Close() will return a non-nil error ("Failed to deliver 0 messages.") even when there were no actual errors.

My fix was simply

if len(errors) > 0 {
    return errors
}

return nil

@eapache (Contributor, Author) commented Nov 5, 2014

Thanks @ORBAT, I had assumed a typed nil (empty slice) would still compare as nil when put in an interface, but apparently not. Fixed now (and updated the tests to check for it as well).
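
For anyone unfamiliar with this gotcha, a self-contained sketch (the sliceError type is invented for the example): a nil value of a concrete type stored in an interface compares non-nil.

package main

import "fmt"

// sliceError is a hypothetical error type backed by a slice, similar in
// spirit to a "list of delivery errors" type.
type sliceError []string

func (e sliceError) Error() string {
	return fmt.Sprintf("failed to deliver %d messages", len(e))
}

// buggy returns its nil slice in the error interface, so the caller sees a
// non-nil error even when nothing went wrong.
func buggy() error {
	var errs sliceError
	return errs // interface holds (type=sliceError, value=nil): non-nil!
}

// fixed returns a plain nil when there is nothing to report.
func fixed() error {
	var errs sliceError
	if len(errs) > 0 {
		return errs
	}
	return nil
}

func main() {
	fmt.Println(buggy() != nil) // true, despite the nil slice
	fmt.Println(fixed() != nil) // false
}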

Commit notes from the branch:

  • Also fix one bug found by the tests.
  • Also simplify an if/else into just an if, as suggested by golint.
  • and generate a new partitioner for each topic
  • they are no longer necessary
  • This is what the Scala client defaults to.
  • Worked through the happens-before graph a little more formally and found some (admittedly unlikely) cases where shutdown would fail/panic/hang/whatever. This version *should* be correct.
  • A typed nil still compares non-nil when stuffed into an interface. Thanks to Tom (ORBAT) for pointing this out.
select {
case msg = <-p.retries:
case p.input <- buf[0]:
buf = buf[1:]

A reviewer commented on this hunk:

Might be better to preallocate some reasonably sized buf and do this with copy(buf, buf[1:]); if we just reslice buf, the append below is guaranteed to reallocate the buffer every time since it's appending past the end.

Reply from @eapache (Contributor, Author):

append grows the backing slice to some sane multiple of the current number of elements (not the textbook 2x, but something like that) when it needs to realloc, so I'm not sure what the concern is - I don't see how we could end up reallocating every time?

For optimal performance we could use https://github.com/eapache/queue/, but given that this isn't a common path (it is only used during leader-election failover) I didn't bother.

Reply from the reviewer:

Sorry, I didn't realize that append preallocates capacity, so my "reallocate every time" is clearly wrong. Shifting rather than reslicing would keep the active buffer region from creeping upwards and would probably still reduce the number of reallocations, but if reallocations are infrequent then the cost of the frequent copying may well outweigh the cost of the reallocations, so this makes my suggestion a lot less compelling.
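
For readers following along, a small self-contained sketch of the two strategies under discussion: reslicing lets the live region creep forward through the backing array, while shifting with copy keeps it anchored at the front at the cost of moving the remaining elements on every pop.

package main

import "fmt"

func main() {
	// Strategy 1: reslice. The front of the slice creeps forward through
	// the backing array, so capacity shrinks as elements are popped.
	buf := make([]int, 0, 4)
	buf = append(buf, 1, 2, 3)
	buf = buf[1:]           // drop the head without copying; cap is now 3
	buf = append(buf, 4, 5) // reallocates here because cap is exhausted
	fmt.Println(buf, cap(buf))

	// Strategy 2: shift with copy. The slice always starts at the front of
	// its backing array, so the full capacity is reused, but every pop
	// moves the remaining elements.
	buf2 := make([]int, 0, 4)
	buf2 = append(buf2, 1, 2, 3)
	n := copy(buf2, buf2[1:]) // shift everything left by one
	buf2 = buf2[:n]
	buf2 = append(buf2, 4, 5) // fits in the original capacity of 4
	fmt.Println(buf2, cap(buf2))
}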

eapache added a commit that referenced this pull request on Nov 13, 2014.
@eapache merged commit 55f2931 into master on Nov 13, 2014.
@eapache deleted the producer-ng branch on Nov 13, 2014 at 14:09.
@pjvds commented Nov 17, 2014

Why is there a lock around SendMessage? In other words, why is the producer's entire SendMessage call guarded by a mutex?

This feels like a flawed design that affects throughput: all goroutines that want to publish something are blocked and processed one by one. Shouldn't the concurrency be at the level of the broker workers?

@eapache (Contributor, Author) commented Nov 17, 2014

@pjvds For efficient, asynchronous producing you can use a Producer (instead of a SimpleProducer) which does not have this limitation. For more context please see #187.

@pjvds commented Nov 17, 2014

@eapache Thanks for the response. The Producer API lacks ack/nack feedback. Why isn't the error channel added to the MessageToSend struct? That way we could pass around the send context and ack or nack back to it.
