Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Performance and testing pass #42

Merged
merged 9 commits into from
Oct 19, 2016
Merged

Performance and testing pass #42

merged 9 commits into from
Oct 19, 2016

Conversation

webmakersteve
Copy link
Contributor

Lots of changes here and I'm not done. Just publishing this initially as a PR so it can get some visibility.

1. Producer now has a closer mapping to the underlying
RdKafka produce method. No longer does object enumeration
2. Consumer now copies memory instead of creating a reference
to the message in the underlying buffer. Should allow better
free'ing
3. Do version checking to support flush and additional constants
4. Export constants in C++ Land and enumerate the object on init
Intermediary class was doing little more than wrapping
the object so it could be checked for fatal errors.

Instead, consume can encapsulate message deleting
when it is an error and return a baton. Baton
has a valid message inside when it is not an error.

Then, we can leverage a converter which copies the data
over so the message can be safely deleted. This should
allow for graceful disconnects.
@webmakersteve
Copy link
Contributor Author

  1. Producer now has a closer mapping to the underlying RdKafka produce method. No longer does object enumeration. Should be about a 10-15% performance gain in producing especially with large messages.
  2. Consumer now copies memory instead of creating a reference to the message in the underlying buffer. Should allow better free'ing, but does slow performance down based on how large the messages read are.
  3. Do version checking to support flush and additional constants if they are in librdkafka
  4. Export constants in C++ Land and enumerate the object on library initialization to make them more maintainable

1. Got rid of the original produce method and other
deprecated messages that no one was using.
2. RdKafka::err2str now has an exported library function
so the internal error class can use it to turn any old
integer into an error string. This is currently implemented
for the synchronous produce method, which now returns the same
int that the librdkafka return did.
3. Updated benchmark and it is showing around a 50% improvement
over the old async + object enumeration implementation
@webmakersteve
Copy link
Contributor Author

webmakersteve commented Oct 17, 2016

Updated TopicWritable to use the new produce method

The way that stream backpressure works now that the method is synchronous:
The stream will keep writing to the producer until it gets a ERR__QUEUE_FULL. When it does, it holds off on writing and does not call the callback. It then retries the message in 500ms and furthermore until it gets a fatal error or the queue accepts the message.

You will be able to increase the amount the queue takes through the librdkafka configuration parameter..

Other change of note: consumed message data is no longer message.message. It is message.payload to be more in line with the librdkafka message object.

Also, at this point, converted end to end tests to use mocha, as the big thing stopping it was the graceful disconnect issue which appears to be as gone as it will be.

NOTE: sometimes, shutting down can take up to 10-15 seconds. I believe these are outgoing sockets still timing out in the background. It generally hangs that long on error'd requests, but it should still exit out eventually.

@edenhill
Copy link

edenhill commented Oct 18, 2016

You might be interested in the new .._F_MSG_BLOCK flag:
https://github.com/edenhill/librdkafka/blob/master/src-cpp/rdkafkacpp.h#L1608

Re message.message: standard Kafka semantics is actually message.value, so you might want to go with that rather than .payload.

Re slow shutdown: debug=broker,cgrp now (master) prints what a broker or cgrp might be stalling on when trying to shut down, be it outstanding requests, pending commits, etc.

@webmakersteve
Copy link
Contributor Author

@edenhill I did take note of that MSG_BLOCK flag but saw it was not included in the current release. Was going to put some precompiler directives to support the flag if it was available but it would change the underlying API so I opted not to until it's been released.

Re: message.message - Sure. Thanks for the suggestion! Makes sense to me.

Re: slow shutdown: I'll take a look today with debug config on and see what I can see. Thanks!

@webmakersteve
Copy link
Contributor Author

New Changes

  1. message.value is now the buffer consumed from kafka.
  2. Produced message now supports NULL payload
  3. Produces message now supports NULL key or empty string key

@webmakersteve
Copy link
Contributor Author

@edenhill From what I can gather it's another GC related issue. If a created topic object is not cleaned up when delete is called on the producer, it blocks in the destructor and never gets free'd. Can be worked around depending on how you structure your JS so I don't think it's a big problem, but will probably look into other ways to do it.

@webmakersteve
Copy link
Contributor Author

New Changes

  1. message.key is now null when there is no key present instead of undefined.

@webmakersteve
Copy link
Contributor Author

New Changes

  1. Added missing consumer method subscription
  2. Added missing consumer method position
  3. Added missing consumer method committed (async)
  4. Began refactor of topic readable to make it a first class citizen
  5. subscribe is now sync and throws
  6. unsubscribe is now sync and throws

1. Added missing consumer method `subscription`
2. Added missing consumer method `position`
3. Added missing consumer method `committed`
4. Began refactor of topic readable to make it a first class citizen
5. subscribe is now sync and throws
6. unsubscribe is now sync and throws
@webmakersteve webmakersteve merged commit 02e3f29 into master Oct 19, 2016
@webmakersteve webmakersteve deleted the performance-pass branch October 19, 2016 22:05
@ottomata ottomata mentioned this pull request Oct 24, 2016
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants