-
Notifications
You must be signed in to change notification settings - Fork 3.6k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
PIP-23: Pulsar Java Client Interceptors. #2471
Conversation
Add interceptor(ProducerInterceptor<T>... interceptors) method to ProducerBuilder.java and ProducerBuilderImpl.java Add ProducerInterceptors to ProducerBase.java
Implement before consume in consumer interceptors. Add UT for before consumer in consumer interceptors.
Add interceptor(ProducerInterceptor<T>... interceptors) method to ProducerBuilder.java and ProducerBuilderImpl.java Add ProducerInterceptors to ProducerBase.java
Implement before consume in consumer interceptors. Add UT for before consumer in consumer interceptors.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@codelipenghui the change LGTM! a few nits around using mockito
pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java
Outdated
Show resolved
Hide resolved
pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java
Outdated
Show resolved
Hide resolved
pulsar-broker/src/test/java/org/apache/pulsar/broker/service/v1/V1_ReplicatorTest.java
Outdated
Show resolved
Hide resolved
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The change LGTM, just left few comments.
One thing we should consider is to add ReaderInterceptor
as well. Even though the implementation is completely based on consumer, from an API perspective they are completely different.
Internally the ReaderInterceptor
would be just a wrapper of ConsumerInterceptor
.
Another idea, which might be out of the scope of this PR is to have Client level interceptor, both to be automatically attached to any producer/consumer created by the client instance and to also intercept events like consumer/producer created.
pulsar-client/src/main/java/org/apache/pulsar/client/api/ConsumerInterceptor.java
Outdated
Show resolved
Hide resolved
* @param messageId message to ack, null if acknowledge fail. | ||
* @param cause the exception on acknowledge. | ||
*/ | ||
void onAcknowledge(MessageId messageId, Throwable cause); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The exception on acknowledge is an artifact of existing APIs. The current implementation is not throwing exceptions anymore (based on the fact that messages that are failed to be acknowledged will be replayed).
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In doAcknowledge method, if check state failed.
if (getState() != State.Ready && getState() != State.Connecting) {
stats.incrementNumAcksFailed();
PulsarClientException exception = new PulsarClientException("Consumer not ready. State: " + getState());
if (AckType.Individual.equals(ackType)) {
onAcknowledge(messageId, exception);
} else if (AckType.Cumulative.equals(ackType)) {
onAcknowledgeCumulative(messageId, exception);
}
return FutureUtil.failedFuture(exception);
}
So i thought parameter of exception should be keep.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This should actually only be called when the consumer has been closed.
pulsar-client/src/main/java/org/apache/pulsar/client/api/ProducerInterceptor.java
Outdated
Show resolved
Hide resolved
@@ -105,7 +105,6 @@ void add(MessageImpl<?> msg, SendCallback callback) { | |||
batchedMessageMetadataAndPayload = Commands.serializeSingleMessageInBatchWithPayload(msgBuilder, | |||
msg.getDataBuffer(), batchedMessageMetadataAndPayload); | |||
messages.add(msg); | |||
msgBuilder.recycle(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Any reason for why the message builder cannot be recycled anymore?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ProducerIntercetpor want to get the message properties. The message properties is lazy init by call getProperties(). In getProperties method:
public synchronized Map<String, String> getProperties() {
if (this.properties == null) {
if (msgMetadataBuilder.getPropertiesCount() > 0) {
this.properties = Collections.unmodifiableMap(msgMetadataBuilder.getPropertiesList().stream()
.collect(Collectors.toMap(KeyValue::getKey, KeyValue::getValue)));
} else {
this.properties = Collections.emptyMap();
}
}
return this.properties;
}
So i change call msgBuilder.recycle() after call interceptor.
public void sendComplete(Exception e) {
if (e != null) {
stats.incrementSendFailed();
onSendAcknowledgement(interceptorMessage, null, e);
future.completeExceptionally(e);
} else {
onSendAcknowledgement(interceptorMessage, interceptorMessage.getMessageId(), null);
future.complete(interceptorMessage.getMessageId());
stats.incrementNumAcksReceived(System.nanoTime() - createdAt);
}
interceptorMessage.getDataBuffer().release();
interceptorMessage.getMessageBuilder().recycle();
while (nextCallback != null) {
SendCallback sendCallback = nextCallback;
MessageImpl<?> msg = nextMsg;
msg.getDataBuffer().retain();
if (e != null) {
stats.incrementSendFailed();
onSendAcknowledgement((Message<T>) msg, null, e);
sendCallback.getFuture().completeExceptionally(e);
} else {
onSendAcknowledgement((Message<T>) msg, msg.getMessageId(), null);
sendCallback.getFuture().complete(msg.getMessageId());
stats.incrementNumAcksReceived(System.nanoTime() - createdAt);
}
msg.getDataBuffer().release();
msg.getMessageBuilder().recycle();
nextMsg = nextCallback.getNextMessage();
nextCallback = nextCallback.getNextSendCallback();
}
}
pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerInterceptors.java
Outdated
Show resolved
Hide resolved
pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
Outdated
Show resolved
Hide resolved
pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
Outdated
Show resolved
Hide resolved
pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerInterceptors.java
Outdated
Show resolved
Hide resolved
|
||
sendAsync(message, new SendCallback() { | ||
MessageImpl<T> interceptorMessage = (MessageImpl<T>) beforeSend(message); | ||
interceptorMessage.getDataBuffer().retain(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can you add a comment on why we are retaining the buffer here and when it will be released?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ok
pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
Outdated
Show resolved
Hide resolved
@merlimat : created #2476 for tracking the tasks for interceptors. lets focus on producer/consumer interceptor for this PR.
Created #2477
Created #2478 |
Implement before consume in consumer interceptors. Add UT for before consumer in consumer interceptors.
…consumers/producers, it might be good to also pass a reference to the Consumer and Producer.
…ally to ensure release. Change foreach to for++." This reverts commit d605cad
… try/finally to ensure release. Change foreach to for++."" This reverts commit aaffaf7
@merlimat I'm already address all your comments. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
👍
retest this please |
@codelipenghui There is one test failing : https://builds.apache.org/job/pulsar_precommit_java8/3387/testReport/junit/org.apache.pulsar.functions.instance/ContextImplTest/testPublishUsingDefaultSchema_on_testPublishUsingDefaultSchema_org_apache_pulsar_functions_instance_ContextImplTest_/ I believe it might be related to the test mocking the producer implementation and then find the interceptor variable as null. |
# Conflicts: # pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
# Conflicts: # pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
run java8 tests |
@codelipenghui I fixed ReplicatorTest and V1_ReplicatorTest. so the CI should be passing now. once the CI is passed, will merge it. |
run java8 tests |
@codelipenghui I just merged the interceptors change. Great work! |
@sijie Thanks. |
Motivation
Support user to add interceptors to producer and consumer.
Modifications
Add Consumer interceptors.
Add Producer interceptors.
Result
Users can using interceptors in multiple scenarios, such as for applications to add
custom logging or processing.
Master Issue: #2476