Motivation
Currently, we have a reconciliation system that compares the messages (or entries) produced and consumed at minute granularity to indicate whether all the data has been fully consumed by the consumers.
We want to track message (or entry) properties (including the timestamp and msgSize) for messages that have been persisted to the bookie and for messages that have been consumed and acked. We also want to track producer and consumer close events.
A good way to achieve this is to use the BrokerInterceptor to intercept messages at certain points, so we want to extend the BrokerInterceptor interface, as was done in #12858.
Goal
Get the timestamp and size of each entry that has been persisted to the bookie.
Trace producer and consumer close events.
Get the timestamp and size of each entry that has been consumed and acked (already supported by BrokerInterceptor).
API Changes
Intercept the message when the broker receives a send request, to get the timestamp and size
/**
 * Intercept the message when the broker receives a send request.
 *
 * @param headersAndPayload entry's header and payload
 * @param publishContext Publish Context
 */
default void onMessagePublish(Producer producer,
                              ByteBuf headersAndPayload,
                              Topic.PublishContext publishContext) {
}
Add methods that are called when a producer or consumer is closed
/**
 * Called by the broker when a producer is closed.
 *
 * @param cnx client Connection
 * @param producer Producer object
 * @param metadata A map of metadata
 */
default void producerClosed(ServerCnx cnx,
                            Producer producer,
                            Map<String, String> metadata) {
}
/**
 * Called by the broker when a consumer is closed.
 *
 * @param cnx client Connection
 * @param consumer Consumer object
 * @param metadata A map of metadata
 */
default void consumerClosed(ServerCnx cnx,
                            Consumer consumer,
                            Map<String, String> metadata) {
}
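Because the new hooks are declared as default methods with empty bodies, existing interceptors keep compiling and only interested implementations need to override them. The sketch below illustrates that property with a simplified, hypothetical stand-in interface (CloseHooks, LegacyInterceptor and CloseRecorder are not Pulsar classes, and plain strings replace the ServerCnx, Producer and Consumer parameters of the proposed methods).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for the proposed close hooks. The proposed
// BrokerInterceptor methods take ServerCnx, Producer and Consumer objects;
// strings are used here only to keep the sketch self-contained.
interface CloseHooks {
    default void producerClosed(String producerName, Map<String, String> metadata) {
    }

    default void consumerClosed(String consumerName, Map<String, String> metadata) {
    }
}

// An interceptor written before the hooks existed: it overrides nothing
// and still compiles, because both hooks have default no-op bodies.
final class LegacyInterceptor implements CloseHooks {
}

// An interceptor that records close events in the order they arrive.
final class CloseRecorder implements CloseHooks {
    final List<String> events = new ArrayList<>();

    @Override
    public void producerClosed(String producerName, Map<String, String> metadata) {
        events.add("producer-closed:" + producerName);
    }

    @Override
    public void consumerClosed(String consumerName, Map<String, String> metadata) {
        events.add("consumer-closed:" + consumerName);
    }
}

final class CloseHooksDemo {
    public static void main(String[] args) {
        CloseRecorder recorder = new CloseRecorder();
        recorder.producerClosed("p1", Map.of());
        recorder.consumerClosed("c1", Map.of("subscription", "s1"));
        // The legacy interceptor silently ignores the event.
        new LegacyInterceptor().producerClosed("p1", Map.of());
        System.out.println(recorder.events);
    }
}
```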
Extend beforeSendMessage with a consumer parameter
/**
 * Intercept messages before sending them to the consumers.
 *
 * @param subscription pulsar subscription
 * @param entry entry
 * @param ackSet entry ack bitset. It is either <tt>null</tt> or an array of long-based bitsets.
 * @param msgMetadata message metadata. The message metadata will be recycled after this call.
 * @param consumer the consumer to which the entry is sent.
 */
default void beforeSendMessage(Subscription subscription,
                               Entry entry,
                               long[] ackSet,
                               MessageMetadata msgMetadata,
                               Consumer consumer) {
}
Implementation
First, change the APIs as described above.
Then implement all the newly added methods in BrokerInterceptors.java and BrokerInterceptorWithClassLoader.
In onMessagePublish, set all properties of interest (such as timestamp and msgSize) on the MessagePublishContext before the entry is persisted to the bookie.
After the entry has been persisted to the bookie, invoke BrokerInterceptor.messageProduced() and read the properties from the MessagePublishContext for reconciliation.
For consumption, record the properties of interest in beforeSendMessage before the entry is sent to the consumer, and then, when the message is acked, retrieve those properties for reconciliation.
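As a rough illustration of how the collected timestamp and size could feed a minute-level reconciliation, here is a minimal sketch. MinuteReconciler and its method names are hypothetical and not part of Pulsar; in a real interceptor, onPersisted would presumably be driven from messageProduced() and onAcked from the ack hook, using the properties recorded earlier.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical helper, not a Pulsar class: keeps per-minute totals of
// message count and bytes for persisted and for acked messages.
final class MinuteReconciler {
    // Count and byte totals for a single minute bucket.
    static final class Totals {
        long count;
        long bytes;
    }

    private final Map<Long, Totals> persisted = new TreeMap<>();
    private final Map<Long, Totals> acked = new TreeMap<>();

    // Assumed to be called once the entry has been persisted to the bookie.
    synchronized void onPersisted(long publishTimeMs, long sizeBytes) {
        add(persisted, publishTimeMs, sizeBytes);
    }

    // Assumed to be called once the message has been consumed and acked.
    synchronized void onAcked(long publishTimeMs, long sizeBytes) {
        add(acked, publishTimeMs, sizeBytes);
    }

    private static void add(Map<Long, Totals> buckets, long timeMs, long sizeBytes) {
        // Bucket key is the minute since the epoch (non-negative timestamps assumed).
        Totals totals = buckets.computeIfAbsent(timeMs / 60_000L, k -> new Totals());
        totals.count++;
        totals.bytes += sizeBytes;
    }

    // Returns, in ascending order, the minutes whose persisted totals are
    // not matched by the acked totals. Minutes that appear only on the
    // acked side are not reported by this sketch.
    synchronized List<Long> unreconciledMinutes() {
        List<Long> result = new ArrayList<>();
        for (Map.Entry<Long, Totals> e : persisted.entrySet()) {
            Totals p = e.getValue();
            Totals a = acked.get(e.getKey());
            if (a == null || a.count != p.count || a.bytes != p.bytes) {
                result.add(e.getKey());
            }
        }
        return result;
    }
}

final class ReconciliationDemo {
    public static void main(String[] args) {
        MinuteReconciler reconciler = new MinuteReconciler();
        reconciler.onPersisted(60_000L, 100);  // minute 1
        reconciler.onPersisted(61_000L, 50);   // minute 1
        reconciler.onPersisted(120_000L, 10);  // minute 2, never acked
        reconciler.onAcked(60_000L, 100);
        reconciler.onAcked(61_000L, 50);
        System.out.println(reconciler.unreconciledMinutes()); // prints [2]
    }
}
```

Keying both sides by the publish timestamp, rather than by the time of the ack, lets a message acked in a later minute still be matched against the minute in which it was produced.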