Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Async call for broker stats #305

Merged
merged 15 commits into from
Apr 7, 2017
85 changes: 85 additions & 0 deletions pulsar-client-cpp/include/pulsar/BrokerConsumerStats.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
/**
* Copyright 2016 Yahoo Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#ifndef PULSAR_CPP_BROKERCONSUMERSTATS_H
#define PULSAR_CPP_BROKERCONSUMERSTATS_H

#include <boost/date_time/posix_time/ptime.hpp>
#include <string.h>
#include <iostream>
#include <pulsar/Result.h>
#include <boost/function.hpp>
#include <pulsar/ConsumerType.h>

namespace pulsar {
class BrokerConsumerStatsImplBase;
class BrokerConsumerStats {
private:
boost::shared_ptr<BrokerConsumerStatsImplBase> impl_;
public:
BrokerConsumerStats(boost::shared_ptr<BrokerConsumerStatsImplBase> impl);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this constructor should be explicit


BrokerConsumerStats();

/** Returns true if the Stats are still valid **/
virtual bool isValid() const;

/** Returns the rate of messages delivered to the consumer. msg/s */
virtual double getMsgRateOut() const;

/** Returns the throughput delivered to the consumer. bytes/s */
virtual double getMsgThroughputOut() const;

/** Returns the rate of messages redelivered by this consumer. msg/s */
virtual double getMsgRateRedeliver() const;

/** Returns the Name of the consumer */
virtual const std::string getConsumerName() const;

/** Returns the Number of available message permits for the consumer */
virtual uint64_t getAvailablePermits() const;

/** Returns the Number of unacknowledged messages for the consumer */
virtual uint64_t getUnackedMessages() const;

/** Returns true if the consumer is blocked due to unacked messages. */
virtual bool isBlockedConsumerOnUnackedMsgs() const;

/** Returns the Address of this consumer */
virtual const std::string getAddress() const;

/** Returns the Timestamp of connection */
virtual const std::string getConnectedSince() const;

/** Returns Whether this subscription is Exclusive or Shared or Failover */
virtual const ConsumerType getType() const;

/** Returns the rate of messages expired on this subscription. msg/s */
virtual double getMsgRateExpired() const;

/** Returns the Number of messages in the subscription backlog */
virtual uint64_t getMsgBacklog() const;

/** @deprecated */
boost::shared_ptr<BrokerConsumerStatsImplBase> getImpl() const;

friend std::ostream& operator<<(std::ostream &os, const BrokerConsumerStats &obj);
};
typedef boost::shared_ptr<BrokerConsumerStats> BrokerConsumerStatsPtr;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why do you need a shared_ptr here if BrokerConsumerStats is already basically a shared_ptr and is allowed to be invalid?

typedef boost::function<void(Result result, BrokerConsumerStats brokerConsumerStats)> BrokerConsumerStatsCallback;

}
#endif //PULSAR_CPP_BROKERCONSUMERSTATS_H
101 changes: 25 additions & 76 deletions pulsar-client-cpp/include/pulsar/Consumer.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@
#include <pulsar/Result.h>
#include <boost/date_time/posix_time/ptime.hpp>
#include <iostream>
#include <pulsar/ConsumerType.h>
#include <pulsar/BrokerConsumerStats.h>
#pragma GCC visibility push(default)

class PulsarFriend;
Expand All @@ -37,80 +39,6 @@ typedef boost::function<void(Result result)> ResultCallback;
/// Callback definition for MessageListener
typedef boost::function<void(Consumer consumer, const Message& msg)> MessageListener;

enum ConsumerType {
/**
* There can be only 1 consumer on the same topic with the same consumerName
*/
ConsumerExclusive,

/**
* Multiple consumers will be able to use the same consumerName and the messages
* will be dispatched according to a round-robin rotation between the connected consumers
*/
ConsumerShared,

/** Only one consumer is active on the subscription; Subscription can have N consumers
* connected one of which will get promoted to master if the current master becomes inactive
*/

ConsumerFailover
};

class BrokerConsumerStats {
private:
/*
* validTillInMs_ - Stats will be valid till this time.
*/
boost::posix_time::ptime validTill_;
public:
BrokerConsumerStats();
BrokerConsumerStats(boost::posix_time::ptime& validTill, double msgRateOut, double msgThroughputOut,
double msgRateRedeliver, std::string consumerName, int availablePermits,
int unackedMessages, bool blockedConsumerOnUnackedMsgs, std::string address,
std::string connectedSince, std::string type, double msgRateExpired, long msgBacklog);

/** Returns true if the Message is Expired **/
bool isValid() const;

/** Total rate of messages delivered to the consumer. msg/s */
double msgRateOut_;

/** Total throughput delivered to the consumer. bytes/s */
double msgThroughputOut_;

/** Total rate of messages redelivered by this consumer. msg/s */
double msgRateRedeliver_;

/** Name of the consumer */
std::string consumerName_;

/** Number of available message permits for the consumer */
int availablePermits_;

/** Number of unacknowledged messages for the consumer */
int unackedMessages_;

/** Flag to verify if consumer is blocked due to reaching threshold of unacked messages */
bool blockedConsumerOnUnackedMsgs_;

/** Address of this consumer */
std::string address_;

/** Timestamp of connection */
std::string connectedSince_;

/// Whether this subscription is Exclusive or Shared or Failover
std::string type_;

/// Total rate of messages expired on this subscription. msg/s
double msgRateExpired_;

/// Number of messages in the subscription backlog
long msgBacklog_;

friend std::ostream& operator<<(std::ostream& os, const BrokerConsumerStats& obj);
};

/**
* Class specifying the configuration of a consumer.
*/
Expand Down Expand Up @@ -182,6 +110,18 @@ class ConsumerConfiguration {
* @return the configured timeout in milliseconds for unacked messages.
*/
long getUnAckedMessagesTimeoutMs() const;

/**
* Set the time duration for which the broker side consumer stats will be cached in the client.
* @param cacheTimeInMs in milliseconds
*/
void setBrokerConsumerStatsCacheTimeInMs(const long cacheTimeInMs);

/**
* @return the configured timeout in milliseconds caching BrokerConsumerStats.
*/
long getBrokerConsumerStatsCacheTimeInMs() const;

private:
struct Impl;
boost::shared_ptr<Impl> impl_;
Expand Down Expand Up @@ -347,12 +287,21 @@ class Consumer {
* still valid.
*
* @param brokerConsumerStats - if the function returns ResultOk, this object will contain consumer stats
* @param partitionIndex - optional parameter which is to be populated only if the topic is partitioned.
*
* @note This is a blocking call with timeout of thirty seconds.
*/
Result getConsumerStats(BrokerConsumerStats& brokerConsumerStats, int partitionIndex = -1);
Result getBrokerConsumerStats(BrokerConsumerStats& brokerConsumerStats);

/**
* Asynchronous call to gets Consumer Stats from broker.
* The stats are cached for 30 seconds, if a call is made before the stats returned by the previous call expires
* then cached data will be returned. BrokerConsumerStats::isValid() function can be used to check if the stats are
* still valid.
*
* @param callback - callback function to get the brokerConsumerStats,
* if result is ResultOk then the brokerConsumerStats will be populated
*/
void getBrokerConsumerStatsAsync(BrokerConsumerStatsCallback callback);
private:
typedef boost::shared_ptr<ConsumerImplBase> ConsumerImplBasePtr;
friend class PulsarFriend;
Expand Down
40 changes: 40 additions & 0 deletions pulsar-client-cpp/include/pulsar/ConsumerType.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
/**
* Copyright 2016 Yahoo Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#ifndef PULSAR_CPP_CONSUMERTYPE_H
#define PULSAR_CPP_CONSUMERTYPE_H

namespace pulsar {
enum ConsumerType {
/**
* There can be only 1 consumer on the same topic with the same consumerName
*/
ConsumerExclusive,

/**
* Multiple consumers will be able to use the same consumerName and the messages
* will be dispatched according to a round-robin rotation between the connected consumers
*/
ConsumerShared,

/** Only one consumer is active on the subscription; Subscription can have N consumers
* connected one of which will get promoted to master if the current master becomes inactive
*/
ConsumerFailover
};
}

#endif //PULSAR_CPP_CONSUMERTYPE_H
138 changes: 138 additions & 0 deletions pulsar-client-cpp/lib/BrokerConsumerStats.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,138 @@
/**
* Copyright 2016 Yahoo Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#include <pulsar/BrokerConsumerStats.h>
#include <lib/BrokerConsumerStatsImplBase.h>

namespace pulsar {
BrokerConsumerStats::BrokerConsumerStats(boost::shared_ptr<BrokerConsumerStatsImplBase> impl) : impl_(impl) {}

BrokerConsumerStats::BrokerConsumerStats() {}

boost::shared_ptr<BrokerConsumerStatsImplBase> BrokerConsumerStats::getImpl() const {
return impl_;
}

bool BrokerConsumerStats::isValid() const {
if (impl_) {
return impl_->isValid();
}
return false;
}

std::ostream& operator<<(std::ostream &os, const BrokerConsumerStats& obj) {
os << "\nBrokerConsumerStats ["
<< "validTill_ = " << obj.isValid()
<< ", msgRateOut_ = " << obj.getMsgRateOut()
<< ", msgThroughputOut_ = " << obj.getMsgThroughputOut()
<< ", msgRateRedeliver_ = " << obj.getMsgRateRedeliver()
<< ", consumerName_ = " << obj.getConsumerName()
<< ", availablePermits_ = " << obj.getAvailablePermits()
<< ", unackedMessages_ = " << obj.getUnackedMessages()
<< ", blockedConsumerOnUnackedMsgs_ = " << obj.isBlockedConsumerOnUnackedMsgs()
<< ", address_ = " << obj.getAddress()
<< ", connectedSince_ = " << obj.getConnectedSince()
<< ", type_ = " << obj.getType()
<< ", msgRateExpired_ = " << obj.getMsgRateExpired()
<< ", msgBacklog_ = " << obj.getMsgBacklog()
<< "]";
return os;
}

double BrokerConsumerStats::getMsgRateOut() const {
if (impl_) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it would be more idiomatic to just document that no getXXX is allowed on an invalid BrokerConsumerStats object and get rid of the check. that avoids hiding bugs.

return impl_->getMsgRateOut();
}
return 0;
}

double BrokerConsumerStats::getMsgThroughputOut() const {
if (impl_) {
return impl_->getMsgThroughputOut();
}
return 0;
}

double BrokerConsumerStats::getMsgRateRedeliver() const {
if (impl_) {
return impl_->getMsgRateRedeliver();
}
return 0;
}

const std::string BrokerConsumerStats::getConsumerName() const {
if (impl_) {
return impl_->getConsumerName();
}
return "";
}

uint64_t BrokerConsumerStats::getAvailablePermits() const {
if (impl_) {
return impl_->getAvailablePermits();
}
return 0;
}

uint64_t BrokerConsumerStats::getUnackedMessages() const {
if (impl_) {
return impl_->getUnackedMessages();
}
return 0;
}

bool BrokerConsumerStats::isBlockedConsumerOnUnackedMsgs() const {
if (impl_) {
return impl_->isBlockedConsumerOnUnackedMsgs();
}
return false;
}

const std::string BrokerConsumerStats::getAddress() const {
if (impl_) {
return impl_->getAddress();
}
return "";
}

const std::string BrokerConsumerStats::getConnectedSince() const {
if (impl_) {
return impl_->getConnectedSince();
}
return "";
}

const ConsumerType BrokerConsumerStats::getType() const {
if (impl_) {
return impl_->getType();
}
return ConsumerExclusive;
}

double BrokerConsumerStats::getMsgRateExpired() const {
if (impl_) {
return impl_->getMsgRateExpired();
}
return 0;
}

uint64_t BrokerConsumerStats::getMsgBacklog() const {
if (impl_) {
return impl_->getMsgBacklog();
}
return 0;
}
}
Loading