diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java index 9a2dd5c7c54fd..53e80d021d0a3 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java @@ -283,6 +283,40 @@ protected void internalGrantPermissionsOnTopic(final AsyncResponse asyncResponse }); } + private CompletableFuture revokePermissionsAsync(String topicUri, String role, boolean force) { + return namespaceResources().getPoliciesAsync(namespaceName).thenCompose( + policiesOptional -> { + Policies policies = policiesOptional.orElseThrow(() -> + new RestException(Status.NOT_FOUND, "Namespace does not exist")); + if (!policies.auth_policies.getTopicAuthentication().containsKey(topicUri) + || !policies.auth_policies.getTopicAuthentication().get(topicUri).containsKey(role)) { + log.warn("[{}] Failed to revoke permission from role {} on topic: Not set at topic level {}", + clientAppId(), role, topicUri); + if (force) { + return CompletableFuture.completedFuture(null); + } else { + return FutureUtil.failedFuture(new RestException(Status.PRECONDITION_FAILED, + "Permissions are not set at the topic level")); + } + } + // Write the new policies to metadata store + return namespaceResources().setPoliciesAsync(namespaceName, p -> { + p.auth_policies.getTopicAuthentication().computeIfPresent(topicUri, (k, roles) -> { + roles.remove(role); + if (roles.isEmpty()) { + return null; + } + return roles; + }); + return p; + }).thenAccept(__ -> + log.info("[{}] Successfully revoke access for role {} - topic {}", clientAppId(), role, + topicUri) + ); + } + ); + } + protected void internalRevokePermissionsOnTopic(AsyncResponse asyncResponse, String role) { // This operation should be reading from zookeeper and it should be allowed without having admin privileges validateAdminAccessForTenantAsync(namespaceName.getTenant()) @@ -4107,27 +4141,33 @@ private CompletableFuture internalExpireMessagesNonPartitionedTopicByPosit return; } try { - final MessageExpirer messageExpirer; - if (subName.startsWith(topic.getReplicatorPrefix())) { - String remoteCluster = PersistentReplicator.getRemoteCluster(subName); - messageExpirer = (PersistentReplicator) topic.getPersistentReplicator(remoteCluster); - } else { - messageExpirer = topic.getSubscription(subName); - } - if (messageExpirer == null) { - final String message = (subName.startsWith(topic.getReplicatorPrefix())) - ? "Replicator not found" : getSubNotFoundErrorMessage(topicName.toString(), subName); - asyncResponse.resume(new RestException(Status.NOT_FOUND, message)); + PersistentSubscription sub = topic.getSubscription(subName); + if (sub == null) { + asyncResponse.resume(new RestException(Status.NOT_FOUND, + getSubNotFoundErrorMessage(topicName.toString(), subName))); return; } - CompletableFuture batchSizeFuture = new CompletableFuture<>(); getEntryBatchSize(batchSizeFuture, topic, messageId, batchIndex); batchSizeFuture.thenAccept(bi -> { PositionImpl position = calculatePositionAckSet(isExcluded, bi, batchIndex, messageId); + boolean issued; try { - if (messageExpirer.expireMessages(position)) { + if (subName.startsWith(topic.getReplicatorPrefix())) { + String remoteCluster = PersistentReplicator.getRemoteCluster(subName); + PersistentReplicator repl = (PersistentReplicator) + topic.getPersistentReplicator(remoteCluster); + if (repl == null) { + asyncResponse.resume(new RestException(Status.NOT_FOUND, + "Replicator not found")); + return; + } + issued = repl.expireMessages(position); + } else { + issued = sub.expireMessages(position); + } + if (issued) { log.info("[{}] Message expire started up to {} on {} {}", clientAppId(), position, topicName, subName); } else { diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentMessageExpiryMonitor.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentMessageExpiryMonitor.java index ac391c1050340..84133a8383938 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentMessageExpiryMonitor.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentMessageExpiryMonitor.java @@ -24,7 +24,6 @@ import java.util.SortedMap; import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; import java.util.concurrent.atomic.LongAdder; -import javax.annotation.Nullable; import org.apache.bookkeeper.mledger.AsyncCallbacks.FindEntryCallback; import org.apache.bookkeeper.mledger.AsyncCallbacks.MarkDeleteCallback; import org.apache.bookkeeper.mledger.ManagedCursor; @@ -48,7 +47,6 @@ public class PersistentMessageExpiryMonitor implements FindEntryCallback, MessageExpirer { private final ManagedCursor cursor; private final String subName; - private final PersistentTopic topic; private final String topicName; private final Rate msgExpired; private final LongAdder totalMsgExpired; @@ -62,10 +60,9 @@ public class PersistentMessageExpiryMonitor implements FindEntryCallback, Messag expirationCheckInProgressUpdater = AtomicIntegerFieldUpdater .newUpdater(PersistentMessageExpiryMonitor.class, "expirationCheckInProgress"); - public PersistentMessageExpiryMonitor(PersistentTopic topic, String subscriptionName, ManagedCursor cursor, - @Nullable PersistentSubscription subscription) { - this.topic = topic; - this.topicName = topic.getName(); + public PersistentMessageExpiryMonitor(String topicName, String subscriptionName, ManagedCursor cursor, + PersistentSubscription subscription) { + this.topicName = topicName; this.cursor = cursor; this.subName = subscriptionName; this.subscription = subscription; @@ -141,12 +138,11 @@ private void checkExpiryByLedgerClosureTime(ManagedCursor cursor, int messageTTL @Override public boolean expireMessages(Position messagePosition) { // If it's beyond last position of this topic, do nothing. - PositionImpl topicLastPosition = (PositionImpl) this.topic.getLastPosition(); - if (topicLastPosition.compareTo((PositionImpl) messagePosition) < 0) { + if (((PositionImpl) subscription.getTopic().getLastPosition()).compareTo((PositionImpl) messagePosition) < 0) { if (log.isDebugEnabled()) { log.debug("[{}][{}] Ignore expire-message scheduled task, given position {} is beyond " - + "current topic's last position {}", topicName, subName, messagePosition, - topicLastPosition); + + "current topic's last position {}", topicName, subName, messagePosition, + subscription.getTopic().getLastPosition()); } return false; } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java index 0d387da1dd912..35e140ac3713f 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java @@ -117,7 +117,7 @@ public PersistentReplicator(String localCluster, PersistentTopic localTopic, Man brokerService, replicationClient); this.topic = localTopic; this.cursor = cursor; - this.expiryMonitor = new PersistentMessageExpiryMonitor(localTopic, + this.expiryMonitor = new PersistentMessageExpiryMonitor(localTopicName, Codec.decode(cursor.getName()), cursor, null); HAVE_PENDING_READ_UPDATER.set(this, FALSE); PENDING_MESSAGES_UPDATER.set(this, 0); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java index f00c95f7e6816..e23142e40b792 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java @@ -151,7 +151,7 @@ public PersistentSubscription(PersistentTopic topic, String subscriptionName, Ma this.topicName = topic.getName(); this.subName = subscriptionName; this.fullName = MoreObjects.toStringHelper(this).add("topic", topicName).add("name", subName).toString(); - this.expiryMonitor = new PersistentMessageExpiryMonitor(topic, subscriptionName, cursor, this); + this.expiryMonitor = new PersistentMessageExpiryMonitor(topicName, subscriptionName, cursor, this); this.setReplicated(replicated); this.subscriptionProperties = MapUtils.isEmpty(subscriptionProperties) ? Collections.emptyMap() : Collections.unmodifiableMap(subscriptionProperties); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentMessageFinderTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentMessageFinderTest.java index ac5ab94c213c5..be1e14f235c1e 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentMessageFinderTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentMessageFinderTest.java @@ -62,7 +62,6 @@ import org.apache.pulsar.broker.service.persistent.PersistentMessageExpiryMonitor; import org.apache.pulsar.broker.service.persistent.PersistentMessageFinder; import org.apache.pulsar.broker.service.persistent.PersistentSubscription; -import org.apache.pulsar.broker.service.persistent.PersistentTopic; import org.apache.pulsar.client.impl.ResetCursorData; import org.apache.pulsar.common.allocator.PulsarByteBufAllocator; import org.apache.pulsar.common.api.proto.BrokerEntryMetadata; @@ -234,11 +233,7 @@ public void findEntryFailed(ManagedLedgerException exception, Optional }); assertTrue(ex.get()); - PersistentTopic mock = mock(PersistentTopic.class); - when(mock.getName()).thenReturn("topicname"); - when(mock.getLastPosition()).thenReturn(PositionImpl.EARLIEST); - - PersistentMessageExpiryMonitor monitor = new PersistentMessageExpiryMonitor(mock, c1.getName(), c1, null); + PersistentMessageExpiryMonitor monitor = new PersistentMessageExpiryMonitor("topicname", c1.getName(), c1, null); monitor.findEntryFailed(new ManagedLedgerException.ConcurrentFindCursorPositionException("failed"), Optional.empty(), null); Field field = monitor.getClass().getDeclaredField("expirationCheckInProgress"); @@ -415,11 +410,7 @@ void testMessageExpiryWithTimestampNonRecoverableException() throws Exception { bkc.deleteLedger(ledgers.get(1).getLedgerId()); bkc.deleteLedger(ledgers.get(2).getLedgerId()); - PersistentTopic mock = mock(PersistentTopic.class); - when(mock.getName()).thenReturn("topicname"); - when(mock.getLastPosition()).thenReturn(PositionImpl.EARLIEST); - - PersistentMessageExpiryMonitor monitor = new PersistentMessageExpiryMonitor(mock, c1.getName(), c1, null); + PersistentMessageExpiryMonitor monitor = new PersistentMessageExpiryMonitor("topicname", c1.getName(), c1, null); Position previousMarkDelete = null; for (int i = 0; i < totalEntries; i++) { monitor.expireMessages(1); @@ -454,10 +445,7 @@ public void testIncorrectClientClock() throws Exception { ledger.addEntry(createMessageWrittenToLedger("msg" + i, incorrectPublishTimestamp)); } assertEquals(ledger.getLedgersInfoAsList().size(), 10); - PersistentTopic mock = mock(PersistentTopic.class); - when(mock.getName()).thenReturn("topicname"); - when(mock.getLastPosition()).thenReturn(PositionImpl.EARLIEST); - PersistentMessageExpiryMonitor monitor = new PersistentMessageExpiryMonitor(mock, c1.getName(), c1, null); + PersistentMessageExpiryMonitor monitor = new PersistentMessageExpiryMonitor("topicname", c1.getName(), c1, null); Thread.sleep(TimeUnit.SECONDS.toMillis(maxTTLSeconds)); monitor.expireMessages(maxTTLSeconds); assertEquals(c1.getNumberOfEntriesInBacklog(true), 0); @@ -479,16 +467,15 @@ void testMessageExpiryWithPosition() throws Exception { ManagedCursorImpl cursor = (ManagedCursorImpl) ledger.openCursor(ledgerAndCursorName); PersistentSubscription subscription = mock(PersistentSubscription.class); - PersistentTopic topic = mock(PersistentTopic.class); + Topic topic = mock(Topic.class); when(subscription.getTopic()).thenReturn(topic); - when(topic.getName()).thenReturn("topicname"); for (int i = 0; i < totalEntries; i++) { positions.add(ledger.addEntry(createMessageWrittenToLedger("msg" + i))); } when(topic.getLastPosition()).thenReturn(positions.get(positions.size() - 1)); - PersistentMessageExpiryMonitor monitor = spy(new PersistentMessageExpiryMonitor(topic, + PersistentMessageExpiryMonitor monitor = spy(new PersistentMessageExpiryMonitor("topicname", cursor.getName(), cursor, subscription)); assertEquals(cursor.getMarkDeletedPosition(), PositionImpl.get(positions.get(0).getLedgerId(), -1)); boolean issued; @@ -527,7 +514,7 @@ void testMessageExpiryWithPosition() throws Exception { clearInvocations(monitor); ManagedCursorImpl mockCursor = mock(ManagedCursorImpl.class); - PersistentMessageExpiryMonitor mockMonitor = spy(new PersistentMessageExpiryMonitor(topic, + PersistentMessageExpiryMonitor mockMonitor = spy(new PersistentMessageExpiryMonitor("topicname", cursor.getName(), mockCursor, subscription)); // Not calling findEntryComplete to clear expirationCheckInProgress condition, so following call to // expire message shouldn't issue. diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java index 159d49ca2e7cc..3fa3a8dfc0f15 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java @@ -727,36 +727,6 @@ public void testReplicatorClearBacklog() throws Exception { assertEquals(status.getReplicationBacklog(), 0); } - - @Test(timeOut = 30000) - public void testResetReplicatorSubscriptionPosition() throws Exception { - final TopicName dest = TopicName - .get(BrokerTestUtil.newUniqueName("persistent://pulsar/ns/resetReplicatorSubscription")); - - @Cleanup - MessageProducer producer1 = new MessageProducer(url1, dest); - - // Produce from cluster1 and consume from the rest - for (int i = 0; i < 10; i++) { - producer1.produce(2); - } - - PersistentTopic topic = (PersistentTopic) pulsar1.getBrokerService().getTopicReference(dest.toString()).get(); - - PersistentReplicator replicator = (PersistentReplicator) spy( - topic.getReplicators().get(topic.getReplicators().keys().get(0))); - - MessageId id = topic.getLastMessageId().get(); - admin1.topics().expireMessages(dest.getPartitionedTopicName(), - replicator.getCursor().getName(), - id,false); - - replicator.updateRates(); - - ReplicatorStats status = replicator.getStats(); - assertEquals(status.getReplicationBacklog(), 0); - } - @Test(timeOut = 30000) public void testResetCursorNotFail() throws Exception {