-
Notifications
You must be signed in to change notification settings - Fork 3.6k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Get ConsumerStats using Binary Protocol #216
Conversation
protobuf/README.md
Outdated
@@ -28,6 +29,7 @@ make | |||
|
|||
# Re-generate PulsarApi | |||
cd pulsar-path/pulsar-common | |||
PROTOC=~/protobuf/src/protoc ./generate_protobuf.sh | |||
PROTOC=~/protobuf/src/protoc |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If you break it in 2 lines, then you need to use export PROTOC=...
otherwise the variable will not picked up by the next script
} | ||
} | ||
|
||
private Map<String, String> createConsumerStatsMap(Consumer consumer, PersistentSubscription subscription) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why not reuse the already existing ConsumerStats
class?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We need some subscriber level details as well - backlog, subscription type etc
But I have added a toMap method to convert the Class to a map
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@rdhabalia, @saandrews and @merlimat Would it be better if I add a class in the protobuf and client side instead of using a map?
|
||
ResultMessageTooBig /// Trying to send a messages exceeding the max size | ||
ResultMessageTooBig, /// Trying to send a messages exceeding the max size | ||
ResultConsumerStatsError /// Broker returned an error while trying to fetch consumer stats |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Cannot we use the existing ResultError
instead of ResultConsumerStatsError
. If I am making a call to get consumer stats, there's no need to re-state the type of the request.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
- all the formatting changes are with /pulsar-client-cpp/eclipse-formatter.xml ??
@@ -484,6 +484,7 @@ flexible messaging model and an intuitive client API.</description> | |||
<exclude>pulsar-client-cpp/lib/checksum/int_types.h</exclude> | |||
<exclude>pulsar-client-cpp/lib/checksum/crc32c*</exclude> | |||
<exclude>pulsar-client-cpp/lib/lz4/lz4.*</exclude> | |||
<exclude>pulsar-client-cpp/.idea/*</exclude> |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
i think cpp/.gitignore already contains: .idea so, we may not need this.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Git ignore will prevent the idea folder to be checked in and this change will prevent the local build to fail
Object msg = null; | ||
try { | ||
PersistentTopic topic; | ||
topic = (PersistentTopic) getBrokerService().getTopicReference(topicName); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
no need of multiple lines :
PersistentTopic topic = (PersistentTopic) getBrokerService().getTopicReference(topicName);
protobuf/README.md
Outdated
@@ -5,7 +5,7 @@ Pulsar uses protocol buffer messages for the client/broker wire protocol. | |||
|
|||
The protocol definition is located at `pulsar-common/src/main/proto/PulsarApi.proto` and the pre-generated Java code is at `pulsar-common/src/main/java/com/yahoo/pulsar/common/api/proto/PulsarApi.java`. | |||
|
|||
When making a change to the `PulsarApi.proto` definition, we have regenerate the `PulsarApi.java` and include that in the same commit. | |||
When making a change to the `PulsarApi.proto` definition, we have to regenerate the `PulsarApi.java` and include that in the same commit. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
now, cpp library is added, we can also mention : PulsarApi.pb.cc
log.debug("CommandConsumerStats[requestId = {}, topicName = {}, subscriptionName = {}, consumerId = {}]", requestId, topicName, subscriptionName, consumerId); | ||
} | ||
|
||
Object msg = null; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ByteBuf msg
requestId, topicName, subscriptionName, consumerId); | ||
} | ||
} catch (Exception e) { | ||
log.error("Exception: {}", e); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
should write appropriate message such as topic not found
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Have written appropriate message in line 244, 240 and 236 (same file)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Log remaining info as well in all the above error logs? topic name, requested, surname & consumerid?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I mean error log such as: Failed to get consumer-stats response
so, it can help while reading log file where exactly it occurred.
} | ||
} catch (Exception e) { | ||
log.error("Exception: {}", e); | ||
msg = Commands.newConsumerStatsResponse("Exception: " + e, requestId, topicName, subscriptionName, consumerId); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
same here: error-msg
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Have written appropriate message in line 244, 240 and 236 (same file)
This case is for handling some exception
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If this exception is sent to the client, shouldn't it be the one client knows about?
|
||
#pragma GCC visibility push(default) | ||
|
||
class PulsarFriend; | ||
|
||
namespace pulsar { | ||
|
||
typedef std::map<std::string, std::string> StatsMap; | ||
|
||
class BrokerSideConsumerStats { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
should we rename it as BrokerConsumerStats
or ConsumerStats
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There is already a ConsumerStats class in Java client which captures the message rate, acks etc as sent by client - created by Sangming and Sid.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I would prefer BrokerConsumerStats or ConsumerStatsInfo.
@@ -117,7 +117,8 @@ SharedBuffer BatchMessageContainer::getBatchedPayload() { | |||
void BatchMessageContainer::clear() { | |||
LOG_DEBUG(*this << " BatchMessageContainer::clear() called"); | |||
timer_->cancel(); | |||
averageBatchSize_ = (messagesContainerListPtr_->size() + (averageBatchSize_ * numberOfBatchesSent_))/++numberOfBatchesSent_; | |||
averageBatchSize_ = (messagesContainerListPtr_->size() + (averageBatchSize_ * numberOfBatchesSent_))/(numberOfBatchesSent_ + 1); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
is there any difference here?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yes, the compiler doesn't give ordering guarantees for ++ if used in complex calculations
Try compiling in your mac m/c and you will see the warning.
case CONSUMER_STATS: | ||
checkArgument(cmd.hasConsumerStats()); | ||
handleConsumerStats(cmd.getConsumerStats()); | ||
cmd.getConsumerStats().recycle(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
formatting
case CONSUMER_STATS_RESPONSE: | ||
checkArgument(cmd.hasConsumerStatsResponse()); | ||
handleConsumerStatsResponse(cmd.getConsumerStatsResponse()); | ||
cmd.getConsumerStatsResponse().recycle(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
formatting
just a thought:
|
protobuf/README.md
Outdated
|
||
# Apply patch | ||
patch -p1 < ../pulsar-path/protobuf/protobuf.patch | ||
## For C++ |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
for C++ -> git checkout v2.6.0
|
||
message CommandConsumerStatsResponse { | ||
required uint64 request_id = 1; | ||
required string topic_name = 2; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do we need topic_name
, subscription_name
, consumer_id
in response? can't we just return request_id
as client knows information attached with it?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes we can but a little more info might help in debug if we run into some trouble and since this call can be made at most once in 30 seconds - it won't harm the Network BW much.
Still if you feel strongly about it let me know - i will remove the fields
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yes, we should not send extra field in response except request_id
requestId, topicName, subscriptionName, consumerId); | ||
} | ||
} catch (Exception e) { | ||
log.error("Exception: {}", e); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I mean error log such as: Failed to get consumer-stats response
so, it can help while reading log file where exactly it occurred.
@@ -266,7 +274,12 @@ class ClientConnection : public boost::enable_shared_from_this<ClientConnection> | |||
// Signals whether we're waiting for a response from broker | |||
bool havePendingPingRequest_; | |||
DeadlineTimerPtr keepAliveTimer_; | |||
DeadlineTimerPtr consumerStatsRequestTimer_; | |||
|
|||
void handleConsumerStatsTimeout(const boost::system::error_code &ec, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
do we really need timer to expiry the request. we already have keep alive time which will break the connection if broker is not responding and it should clear-out pending requests.
@@ -60,6 +60,7 @@ namespace pulsar { | |||
virtual void redeliverUnacknowledgedMessages(); | |||
virtual const std::string& getName() const; | |||
virtual int getNumOfPrefetchedMessages() const ; | |||
virtual Result getConsumerStats(BrokerSideConsumerStats& brokerSideConsumerStats, int partitionIndex); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
should we also provide getConsumerStats(BrokerSideConsumerStats& brokerSideConsumerStats)
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
As discussed yesterday - We can the code is not difficult but the interface will become quite ugly since now we need to return a list of stats always.
pulsar-client-cpp/lib/Consumer.cc
Outdated
@@ -23,6 +23,17 @@ namespace pulsar { | |||
|
|||
const std::string EMPTY_STRING; | |||
|
|||
BrokerSideConsumerStats::BrokerSideConsumerStats():validTill_(now()) {}; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
should we initialize validTill_+X so it can be valid for X time.??
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
we do that in ClientConnection
boost::posix_time::ptime currentTime = now() + milliseconds(consumerStatsTTLMs_);
consumerStatsPromise.setValue(BrokerSideConsumerStats(consumerStats, currentTime));
" Sending getConsumerStats command for Consumer - " << getConsumerId() << ", requestId - "<<requestId); | ||
|
||
BrokerSideConsumerStats consumerStats; | ||
Result res = cnx->newConsumerStats(topic_, subscription_, consumerId_, requestId).get(consumerStats); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
is it a blocking call on future?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yes
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
then should we make it non blocking call by attaching the listener?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It will block the user thread not out io thread, similar logic is used to producer.send and consumer.receive
Result Producer::send(const Message& msg) {
Promise<Result, Message> promise;
sendAsync(msg, WaitForCallbackValue<Message>(promise));
Message m;
Result result = promise.getFuture().get(m);
return result;
}
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do you think Is it worth it to provide callback option also in case of client doesn't want to block such as: sendAsync(msg, callBack)
} | ||
return res; | ||
} else { | ||
LOG_ERROR(getName() << " In getConsumerStats call - Operation not supported since server protobuf version is older than proto::v6"); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
v7?
} | ||
} catch (Exception e) { | ||
log.error("Exception: {}", e); | ||
msg = Commands.newConsumerStatsResponse("Exception: " + e, requestId, topicName, subscriptionName, consumerId); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If this exception is sent to the client, shouldn't it be the one client knows about?
requestId, topicName, subscriptionName, consumerId); | ||
} | ||
} catch (Exception e) { | ||
log.error("Exception: {}", e); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Log remaining info as well in all the above error logs? topic name, requested, surname & consumerid?
|
||
#pragma GCC visibility push(default) | ||
|
||
class PulsarFriend; | ||
|
||
namespace pulsar { | ||
|
||
typedef std::map<std::string, std::string> StatsMap; | ||
|
||
class BrokerSideConsumerStats { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I would prefer BrokerConsumerStats or ConsumerStatsInfo.
consumerStats[kv.key()] = kv.value(); | ||
LOG_DEBUG(kv.key() << " = " << kv.value()); | ||
} | ||
boost::posix_time::ptime currentTime = now() + milliseconds(consumerStatsTTLMs_); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Its not really currentTime
|
||
for (PendingConsumerStatsMap::iterator it = pendingConsumerStatsMap_.begin(); | ||
it != pendingConsumerStatsMap_.end(); ++it) { | ||
LOG_ERROR(cnxString_ << " In getConsumerStats call - Closing Client Connection, please try again later"); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Copy pasted message? Also we shouldn't have to explicitly say "In getConsumerStats call"
@@ -686,4 +687,44 @@ int ConsumerImpl::getNumOfPrefetchedMessages() const { | |||
return incomingMessages_.size(); | |||
} | |||
|
|||
Result ConsumerImpl::getConsumerStats(BrokerSideConsumerStats& brokerSideConsumerStats, int partitionIndex) { | |||
if (partitionIndex != -1) { | |||
LOG_WARN(getName() << " In getConsumerStats call - Ignoring the partitionIndex since the topic is not partitioned") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Remove "In getConsumerStats call -"
} | ||
|
||
if (brokerSideConsumerStats_.isValid()) { | ||
LOG_INFO(getName() << " In getConsumerStats call - Serving data from cache"); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why INFO log?
@@ -300,7 +392,7 @@ protected void handleSubscribe(final CommandSubscribe subscribe) { | |||
}) // | |||
.exceptionally(exception -> { | |||
log.warn("[{}][{}][{}] Failed to create consumer: {}", remoteAddress, topicName, | |||
subscriptionName, exception.getCause().getMessage()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
getCause().getMessage() is not very informative
@@ -45,7 +45,9 @@ struct ClientConfiguration::Impl { | |||
operationTimeoutSeconds(30), | |||
messageListenerThreads(1), | |||
concurrentLookupRequest(5000), | |||
logConfFilePath() {} | |||
logConfFilePath(), | |||
useTls(false), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In a few test runs I found that tls was automatically enabled - hence initialized the params
@@ -113,7 +114,7 @@ void ConsumerImpl::connectionOpened(const ClientConnectionPtr& cnx) { | |||
lock.unlock(); | |||
|
|||
ClientImplPtr client = client_.lock(); | |||
int requestId = client->newRequestId(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
datatype compatible with what we send over the wire
@@ -483,13 +484,13 @@ inline proto::CommandSubscribe_SubType ConsumerImpl::getSubType() { | |||
ConsumerType type = config_.getConsumerType(); | |||
switch (type) { | |||
case ConsumerExclusive: | |||
return proto::CommandSubscribe_SubType_Exclusive; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
both mean the same - changed since underscores were ugly
static const SubType Exclusive = CommandSubscribe_SubType_Exclusive;
static const SubType Shared = CommandSubscribe_SubType_Shared;
static const SubType Failover = CommandSubscribe_SubType_Failover;
@@ -300,7 +392,7 @@ protected void handleSubscribe(final CommandSubscribe subscribe) { | |||
}) // | |||
.exceptionally(exception -> { | |||
log.warn("[{}][{}][{}] Failed to create consumer: {}", remoteAddress, topicName, | |||
subscriptionName, exception.getCause().getMessage()); | |||
subscriptionName, exception); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this will be CompletionException
and it will not be helpful, we have to log exception.getCause()
} | ||
if (!consumerFound) { | ||
log.error( | ||
"Failed to get consumer-stats response - Consumer {} not found for CommandConsumerStats request id {}", |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Log topic and subscription id as well?
LOG_ERROR(cnxString_ << " In getConsumerStats call - Closing Client Connection, please try again later"); | ||
it->second.setFailed(ResultConnectError); | ||
} | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Unlock here.
// Complex logic since promises need to be fulfilled outside the lock | ||
for (int i = 0; i < consumerStatsPromises.size(); i++) { | ||
consumerStatsPromises[i].setFailed(ResultTimeout); | ||
LOG_ERROR(cnxString_ << " In getConsumerStats call - Operation timedout, didn't get response from broker"); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Fix log msg.
|
||
if (consumerStatsResponse.has_error_code()) { | ||
if (consumerStatsResponse.has_error_message()) { | ||
LOG_ERROR(cnxString_ << " In getConsumerStats call - " |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Fix msg
83c4319
to
20675ca
Compare
} | ||
|
||
break; | ||
} | ||
|
||
case BaseCommand::SEND_ERROR: { | ||
const CommandSendError& error = incomingCmd_.send_error(); | ||
const CommandSendError &error = incomingCmd_.send_error(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Fix the formatting caused by the editor w.r.t. space and &.
97827ef
to
12cf4d1
Compare
@merlimat - if you find time, can you review this PR. |
Ok, I was about to get to it ;)
…On Mon, Feb 27, 2017 at 9:52 AM jai1 ***@***.***> wrote:
Merged #216 <#216>.
—
You are receiving this because you were mentioned.
Reply to this email directly, view it on GitHub
<#216 (comment)>, or mute the
thread
<https://github.com/notifications/unsubscribe-auth/AAD0JFYQ-dU-Mz5EZMqX8kadwm8fhT5bks5rgw1PgaJpZM4MDYHI>
.
|
Sorry - was in a hurry to get this out, will plan better next time onwards
On Mon, Feb 27, 2017 at 9:54 AM, Matteo Merli <notifications@github.com>
wrote:
… Ok, I was about to get to it ;)
On Mon, Feb 27, 2017 at 9:52 AM jai1 ***@***.***> wrote:
> Merged #216 <#216>.
>
> —
> You are receiving this because you were mentioned.
> Reply to this email directly, view it on GitHub
> <#216 (comment)>, or mute the
> thread
> <https://github.com/notifications/unsubscribe-auth/AAD0JFYQ-dU-
Mz5EZMqX8kadwm8fhT5bks5rgw1PgaJpZM4MDYHI>
> .
>
—
You are receiving this because you modified the open/close state.
Reply to this email directly, view it on GitHub
<#216 (comment)>, or mute
the thread
<https://github.com/notifications/unsubscribe-auth/AIQh_F8HALVIdU1obGNbSU6712P0g9_Bks5rgw3XgaJpZM4MDYHI>
.
|
* Rename FunctionConfig names to make them more consistent * Added generated python file
speed up entriesToRecords for consuming multi entries
Motivation
Sometimes we need topic statics before consuming messages (Common use cases listed below), currently such information is obtained by making admin calls which use HTTP. Admin calls are expensive because each call spawns a new connection and may cause a redirect if the current broker doesn’t own the topic, we need a way to get this information without using HTTP.
Common use cases
Modifications
Result
We will be able to get consumer stats without having to use HTTP