-
Notifications
You must be signed in to change notification settings - Fork 3.6k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Broker side deduplication #751
Conversation
} | ||
|
||
public PersistentTopic(String topic, ManagedLedger ledger, BrokerService brokerService, | ||
PersistentTopicConfiguration topicConfiguration) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
is there any reason of having PersistentTopicConfiguration
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
explained here below.
new OpenLedgerCallback() { | ||
@Override | ||
public void openLedgerComplete(ManagedLedger ledger, Object ctx) { | ||
PersistentTopic persistentTopic = new PersistentTopic(topic, ledger, BrokerService.this); | ||
PersistentTopic persistentTopic = new PersistentTopic(topic, ledger, BrokerService.this, | ||
config); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't see PersistentTopicConfiguration
being used in PersistentTopic
. In that case, we don't need PersistentTopicConfiguration
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Before this PR, we were getting the per-topic (actually it's per-namespace) configuration only for ManagedLedger purposes: https://github.com/apache/incubator-pulsar/blob/aa40ebbe02f9b4f60b7f3910c293c50c18f35687/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java#L567
For deduplication, when creating the topic I need to know wether it's enabled or not, depending on the broker default and on the per-namespace policy.
PersistentTopicConfiguration
is grouping both the policy and the customized ManagedLedgerConfig
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
For deduplication, when creating the topic I need to know wether it's enabled or not
PersistentTopicConfiguration is grouping both the policy and the customized ManagedLedgerConfig.
Yes, I understood that part. But correct if I am wrong:
MessageDeduplication checks namespace-dedup by directly fetching from policies-cache
and PersistentTopicConfiguration.namespacePolicies
is not being used anywhere.
so, I don't see any difference if we just return ManagedLedgerConfig
instead PersistentTopicConfiguration
because it seems we are not using it anywhere?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Uhm, I guess you're right :)
I have indeed changed the approach after adding the PersistentTopicConfiguration
so that isDeduplicationEnabled()
is checking the policy and returning a future.
I'll remove PersistentTopicConfiguration
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'll remove
PersistentTopicConfiguration
Should we revert getTopicConfiguration()
to getManagedLedgerConfig()
if we are not using PersistentTopicConfiguration
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ouch, I finally forgot to remove it. Will do that
highestSequencedPersisted.put(producerName, sequenceId); | ||
|
||
md.recycle(); | ||
messageMetadataAndPayload.release(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
should we release entry
as well?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
either we release the entry
or the buffer messageMetadataAndPayload
, the effect is the same, though we cannot release both.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
either we release the entry or the buffer messageMetadataAndPayload, the effect is the same
Releasing Entry also releases payload(messageMetadataAndPayload
) but just releasing payload
will not release Entry
so, can you correct if it it's wrong but here we are not releasing Entry
so, isn't that leak?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'll change it to call entry.release()
. In any case it won't be a "leak", just a missed opportunity to recycling the entry instance.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In any case it won't be a "leak"
Yes, but if we don't recycle EntryImpl
then don't we see leak with netty-detection report and it will keep creating new objects and will go out of memory.?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
No no, it's not that bad. When "leaking" objects from a recycler, they will just get normally GCed.
|
||
String producerName = md.getProducerName(); | ||
long sequenceId = md.getSequenceId(); | ||
highestSequencedPushed.put(producerName, sequenceId); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
what if we have dedup disabled first and then enabled/ Or old-client lib published msg with null producer-name. In that case, some of the metadata
will have producerName=null
. So, ConcurrentOpenHashMap.put(null,x)
will throw RuntimeException
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Producer name was present in the metadata (and it's required) since the very beginning of Pulsar. It will never be null.
* returning a future to track the completion of the task | ||
*/ | ||
public CompletableFuture<Void> checkStatus() { | ||
if (status == Status.Recovering || status == Status.Removing) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
can we move this check at ln#197 after getting isDeduplicationEnabled()
because sometimes, we have seen (mostly while removing replicator cluster from policies) that broker receives multiple zk-watch event and we have to handle this race condition. Here, isDeduplicationEnabled()
is an async therefore, it make sense to move this check once we get the result from it (if possible then under synchronized block).
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Makes sense, also I'll add a mutex in the state mutation
public void deleteCursorFailed(ManagedLedgerException exception, Object ctx) { | ||
log.warn("[{}] Failed to disable deduplication: {}", topic.getName(), exception.getMessage()); | ||
status = Status.Failed; | ||
future.completeExceptionally(exception); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
is it a ledger leak
? should we retry X time?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It's not a ledger leak since it's the delete cursor operation that failed. In this case the cursor will stay there. I'll make sure to schedule a check in this case to retry to disable deduplication.
}).exceptionally(ex -> { | ||
status = Status.Failed; | ||
log.warn("[{}] Failed to enable deduplication: {}", topic.getName(), ex.getMessage()); | ||
future.completeExceptionally(ex); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think status
should be part of client metrics to verify that dedup is not enabled.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You mean, reported in the topic stats ?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yes, it would be useful to know in case status=failed
. so, does it make sense to add in topic-stats?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
added
return true; | ||
} | ||
|
||
highestSequencedPushed.put(producerName, sequenceId); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I feel isMessageAlreadyStored
name is little confusing.. as per method name it should not update the map. I know we want to add the sequenceId as well. how about pushSequenceId()
and if it returns false
means couldn't store.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ok, let me rethink the method name.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Renaming into shouldPublishNextMessage()
@rdhabalia Updated addressing comments. Also added logic to check for original producer name in case of replicated messages. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM.. just a minor question regarding use of PersistentTopicConfiguration
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM +1
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
👍
I think we don't have a testcase for dedup..right? |
Motivation
Context in PIP-6.
This PR contains the broker side changes to implement the deduplication using the logic described in the design doc.
This is not the complete feature. I will add unit tests after pushing the client library changes as well. All tasks are in the deduplication project
Modifications