Skip to content
Permalink

Comparing changes

This is a direct comparison between two commits made in this repository or its related repositories. View the default comparison for this range or learn more about diff comparisons.

Open a pull request

Create a new pull request by comparing changes across two branches. If you need to, you can also . Learn more about diff comparisons here.
base repository: apache/pulsar
Failed to load repositories. Confirm that selected base ref is valid, then try again.
Loading
base: 3c0819421a4e4ed573fbdade774fe098092b0d68
Choose a base ref
..
head repository: apache/pulsar
Failed to load repositories. Confirm that selected head ref is valid, then try again.
Loading
compare: 4749d21c675798bd93688bb9d2d99e10fad9d6d1
Choose a head ref
Showing with 341 additions and 15,701 deletions.
  1. +1 −1 .github/workflows/ci-documentbot.yml
  2. +55 −0 pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
  3. +36 −0 pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
  4. +70 −0 pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java
  5. +18 −0 pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Topics.java
  6. +15 −0 pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java
  7. +35 −2 pulsar-client-cpp/lib/ConsumerImpl.cc
  8. +4 −0 pulsar-client-cpp/lib/ConsumerImpl.h
  9. +32 −0 pulsar-client-cpp/tests/ConsumerTest.cc
  10. +4 −0 pulsar-client-cpp/tests/PulsarFriend.h
  11. +27 −0 pulsar-client-cpp/tests/ReaderTest.cc
  12. +7 −0 pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java
  13. +21 −0 pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java
  14. +9 −8 ...ge/src/test/java/org/apache/pulsar/packages/management/storage/bookkeeper/DLOutputStreamTest.java
  15. +1 −1 ...ache/pulsar/packages/management/storage/bookkeeper/bookkeeper/test/BookKeeperClusterTestCase.java
  16. +6 −2 pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceConsumer.java
  17. +0 −581 site2/website/versioned_docs/version-2.2.0/develop-binary-protocol.md
  18. +0 −581 site2/website/versioned_docs/version-2.2.1/develop-binary-protocol.md
  19. +0 −581 site2/website/versioned_docs/version-2.3.0/develop-binary-protocol.md
  20. +0 −581 site2/website/versioned_docs/version-2.3.1/develop-binary-protocol.md
  21. +0 −581 site2/website/versioned_docs/version-2.3.2/develop-binary-protocol.md
  22. +0 −581 site2/website/versioned_docs/version-2.4.0/develop-binary-protocol.md
  23. +0 −581 site2/website/versioned_docs/version-2.4.1/develop-binary-protocol.md
  24. +0 −581 site2/website/versioned_docs/version-2.4.2/develop-binary-protocol.md
  25. +0 −581 site2/website/versioned_docs/version-2.5.0/develop-binary-protocol.md
  26. +0 −581 site2/website/versioned_docs/version-2.5.1/develop-binary-protocol.md
  27. +0 −581 site2/website/versioned_docs/version-2.5.2/develop-binary-protocol.md
  28. +0 −581 site2/website/versioned_docs/version-2.6.0/develop-binary-protocol.md
  29. +0 −581 site2/website/versioned_docs/version-2.6.1/develop-binary-protocol.md
  30. +0 −581 site2/website/versioned_docs/version-2.6.2/develop-binary-protocol.md
  31. +0 −581 site2/website/versioned_docs/version-2.6.3/develop-binary-protocol.md
  32. +0 −581 site2/website/versioned_docs/version-2.6.4/develop-binary-protocol.md
  33. +0 −581 site2/website/versioned_docs/version-2.7.0/develop-binary-protocol.md
  34. +0 −581 site2/website/versioned_docs/version-2.7.1/develop-binary-protocol.md
  35. +0 −581 site2/website/versioned_docs/version-2.7.3/develop-binary-protocol.md
  36. +0 −581 site2/website/versioned_docs/version-2.7.4/develop-binary-protocol.md
  37. +0 −581 site2/website/versioned_docs/version-2.8.0-deprecated/develop-binary-protocol.md
  38. +0 −581 site2/website/versioned_docs/version-2.8.1-deprecated/develop-binary-protocol.md
  39. +0 −581 site2/website/versioned_docs/version-2.8.2-deprecated/develop-binary-protocol.md
  40. +0 −581 site2/website/versioned_docs/version-2.8.x/develop-binary-protocol.md
  41. +0 −581 site2/website/versioned_docs/version-2.9.0-deprecated/develop-binary-protocol.md
  42. +0 −581 site2/website/versioned_docs/version-2.9.1-deprecated/develop-binary-protocol.md
  43. +0 −581 site2/website/versioned_docs/version-2.9.x/develop-binary-protocol.md
2 changes: 1 addition & 1 deletion .github/workflows/ci-documentbot.yml
Original file line number Diff line number Diff line change
@@ -28,7 +28,7 @@ on:
- unlabeled

concurrency:
group: ${{ github.workflow }}-${{ github.ref }}
group: ${{ github.workflow }}-${{ github.ref }}-${{ github.event.number }}
cancel-in-progress: true

jobs:
Original file line number Diff line number Diff line change
@@ -62,6 +62,7 @@
import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerOfflineBacklog;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.commons.collections4.MapUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.broker.PulsarServerException;
import org.apache.pulsar.broker.PulsarService;
@@ -600,6 +601,60 @@ private CompletableFuture<Map<String, String>> getPropertiesAsync() {
});
}

protected CompletableFuture<Void> internalUpdatePropertiesAsync(boolean authoritative,
Map<String, String> properties) {
if (properties == null || properties.isEmpty()) {
log.warn("[{}] [{}] properties is empty, ignore update", clientAppId(), topicName);
return CompletableFuture.completedFuture(null);
}
return validateTopicOwnershipAsync(topicName, authoritative)
.thenCompose(__ -> validateTopicOperationAsync(topicName, TopicOperation.PRODUCE))
.thenCompose(__ -> {
if (topicName.isPartitioned()) {
return internalUpdateNonPartitionedTopicProperties(properties);
} else {
return pulsar().getBrokerService().fetchPartitionedTopicMetadataAsync(topicName)
.thenCompose(metadata -> {
if (metadata.partitions == 0) {
return internalUpdateNonPartitionedTopicProperties(properties);
}
return namespaceResources()
.getPartitionedTopicResources().updatePartitionedTopicAsync(topicName,
p -> new PartitionedTopicMetadata(p.partitions,
MapUtils.putAll(p.properties, properties.entrySet().toArray())));
});
}
}).thenAccept(__ ->
log.info("[{}] [{}] update properties success with properties {}",
clientAppId(), topicName, properties));
}

private CompletableFuture<Void> internalUpdateNonPartitionedTopicProperties(Map<String, String> properties) {
CompletableFuture<Void> future = new CompletableFuture<>();
pulsar().getBrokerService().getTopicIfExists(topicName.toString())
.thenAccept(opt -> {
if (!opt.isPresent()) {
throw new RestException(Status.NOT_FOUND,
getTopicNotFoundErrorMessage(topicName.toString()));
}
ManagedLedger managedLedger = ((PersistentTopic) opt.get()).getManagedLedger();
managedLedger.asyncSetProperties(properties, new AsyncCallbacks.UpdatePropertiesCallback() {

@Override
public void updatePropertiesComplete(Map<String, String> properties, Object ctx) {
managedLedger.getConfig().getProperties().putAll(properties);
future.complete(null);
}

@Override
public void updatePropertiesFailed(ManagedLedgerException exception, Object ctx) {
future.completeExceptionally(exception);
}
}, null);
});
return future;
}

protected CompletableFuture<Void> internalCheckTopicExists(TopicName topicName) {
return pulsar().getNamespaceService().checkTopicExists(topicName)
.thenAccept(exist -> {
Original file line number Diff line number Diff line change
@@ -966,6 +966,42 @@ public void getProperties(
});
}

@PUT
@Path("/{tenant}/{namespace}/{topic}/properties")
@ApiOperation(value = "Update the properties on the given topic.")
@ApiResponses(value = {
@ApiResponse(code = 307, message = "Current broker doesn't serve the namespace of this topic"),
@ApiResponse(code = 401, message = "Don't have permission to administrate resources on this tenant or"
+ "subscriber is not authorized to access this operation"),
@ApiResponse(code = 403, message = "Don't have admin permission"),
@ApiResponse(code = 404, message = "Topic/Subscription does not exist"),
@ApiResponse(code = 405, message = "Method Not Allowed"),
@ApiResponse(code = 500, message = "Internal server error"),
@ApiResponse(code = 503, message = "Failed to validate global cluster configuration")
})
public void updateProperties(
@Suspended final AsyncResponse asyncResponse,
@ApiParam(value = "Specify the tenant", required = true)
@PathParam("tenant") String tenant,
@ApiParam(value = "Specify the namespace", required = true)
@PathParam("namespace") String namespace,
@ApiParam(value = "Specify topic name", required = true)
@PathParam("topic") @Encoded String encodedTopic,
@ApiParam(value = "Whether leader broker redirected this call to this broker. For internal use.")
@QueryParam("authoritative") @DefaultValue("false") boolean authoritative,
@ApiParam(value = "Key value pair properties for the topic metadata") Map<String, String> properties){
validatePersistentTopicName(tenant, namespace, encodedTopic);
internalUpdatePropertiesAsync(authoritative, properties)
.thenAccept(__ -> asyncResponse.resume(Response.noContent().build()))
.exceptionally(ex -> {
if (!isRedirectException(ex)) {
log.error("[{}] Failed to update topic {} properties", clientAppId(), topicName, ex);
}
resumeAsyncResponseExceptionally(asyncResponse, ex);
return null;
});
}

@DELETE
@Path("/{tenant}/{namespace}/{topic}/partitions")
@ApiOperation(value = "Delete a partitioned topic.",
Original file line number Diff line number Diff line change
@@ -887,6 +887,76 @@ public void testCreateAndGetTopicProperties() throws Exception {
Assert.assertEquals(properties22.get("key2"), "value2");
}

@Test
public void testUpdatePartitionedTopicProperties() throws Exception {
final String namespace = "prop-xyz/ns2";
final String topicName = "persistent://" + namespace + "/testUpdatePartitionedTopicProperties";
admin.namespaces().createNamespace(namespace, 20);

// create partitioned topic with properties
Map<String, String> topicProperties = new HashMap<>();
topicProperties.put("key1", "value1");
admin.topics().createPartitionedTopic(topicName, 2, topicProperties);
Map<String, String> properties = admin.topics().getProperties(topicName);
Assert.assertNotNull(properties);
Assert.assertEquals(properties.get("key1"), "value1");

// update with new key, old properties should keep
topicProperties = new HashMap<>();
topicProperties.put("key2", "value2");
admin.topics().updateProperties(topicName, topicProperties);
properties = admin.topics().getProperties(topicName);
Assert.assertNotNull(properties);
Assert.assertEquals(properties.size(), 2);
Assert.assertEquals(properties.get("key1"), "value1");
Assert.assertEquals(properties.get("key2"), "value2");

// override old values
topicProperties = new HashMap<>();
topicProperties.put("key1", "value11");
admin.topics().updateProperties(topicName, topicProperties);
properties = admin.topics().getProperties(topicName);
Assert.assertNotNull(properties);
Assert.assertEquals(properties.size(), 2);
Assert.assertEquals(properties.get("key1"), "value11");
Assert.assertEquals(properties.get("key2"), "value2");
}

@Test
public void testUpdateNonPartitionedTopicProperties() throws Exception {
final String namespace = "prop-xyz/ns2";
final String topicName = "persistent://" + namespace + "/testUpdateNonPartitionedTopicProperties";
admin.namespaces().createNamespace(namespace, 20);

// create non-partitioned topic with properties
Map<String, String> topicProperties = new HashMap<>();
topicProperties.put("key1", "value1");
admin.topics().createNonPartitionedTopic(topicName, topicProperties);
Map<String, String> properties = admin.topics().getProperties(topicName);
Assert.assertNotNull(properties);
Assert.assertEquals(properties.get("key1"), "value1");

// update with new key, old properties should keep
topicProperties = new HashMap<>();
topicProperties.put("key2", "value2");
admin.topics().updateProperties(topicName, topicProperties);
properties = admin.topics().getProperties(topicName);
Assert.assertNotNull(properties);
Assert.assertEquals(properties.size(), 2);
Assert.assertEquals(properties.get("key1"), "value1");
Assert.assertEquals(properties.get("key2"), "value2");

// override old values
topicProperties = new HashMap<>();
topicProperties.put("key1", "value11");
admin.topics().updateProperties(topicName, topicProperties);
properties = admin.topics().getProperties(topicName);
Assert.assertNotNull(properties);
Assert.assertEquals(properties.size(), 2);
Assert.assertEquals(properties.get("key1"), "value11");
Assert.assertEquals(properties.get("key2"), "value2");
}

@Test
public void testNonPersistentTopics() throws Exception {
final String namespace = "prop-xyz/ns2";
Original file line number Diff line number Diff line change
@@ -736,6 +736,24 @@ void updatePartitionedTopic(String topic, int numPartitions, boolean updateLocal
*/
CompletableFuture<Map<String, String>> getPropertiesAsync(String topic);

/**
* Update Topic Properties on a topic.
* The new properties will override the existing values, old properties in the topic will be keep if not override.
* @param topic
* @param properties
* @throws PulsarAdminException
*/
void updateProperties(String topic, Map<String, String> properties) throws PulsarAdminException;

/**
* Update Topic Properties on a topic.
* The new properties will override the existing values, old properties in the topic will be keep if not override.
* @param topic
* @param properties
* @return
*/
CompletableFuture<Void> updatePropertiesAsync(String topic, Map<String, String> properties);

/**
* Delete a partitioned topic and its schemas.
* <p/>
Original file line number Diff line number Diff line change
@@ -456,6 +456,21 @@ public CompletableFuture<Map<String, String>> getPropertiesAsync(String topic) {
return asyncGetRequest(path, new FutureCallback<Map<String, String>>(){});
}

@Override
public void updateProperties(String topic, Map<String, String> properties) throws PulsarAdminException {
sync(() -> updatePropertiesAsync(topic, properties));
}

@Override
public CompletableFuture<Void> updatePropertiesAsync(String topic, Map<String, String> properties) {
TopicName tn = validateTopic(topic);
WebTarget path = topicPath(tn, "properties");
if (properties == null) {
properties = new HashMap<>();
}
return asyncPutRequest(path, Entity.entity(properties, MediaType.APPLICATION_JSON));
}

@Override
public void deletePartitionedTopic(String topic) throws PulsarAdminException {
deletePartitionedTopic(topic, false);
37 changes: 35 additions & 2 deletions pulsar-client-cpp/lib/ConsumerImpl.cc
Original file line number Diff line number Diff line change
@@ -1299,6 +1299,16 @@ void ConsumerImpl::getLastMessageIdAsync(BrokerGetLastMessageIdCallback callback
return;
}

TimeDuration operationTimeout = seconds(client_.lock()->conf().getOperationTimeoutSeconds());
BackoffPtr backoff = std::make_shared<Backoff>(milliseconds(100), operationTimeout * 2, milliseconds(0));
DeadlineTimerPtr timer = executor_->createDeadlineTimer();

internalGetLastMessageIdAsync(backoff, operationTimeout, timer, callback);
}

void ConsumerImpl::internalGetLastMessageIdAsync(const BackoffPtr& backoff, TimeDuration remainTime,
const DeadlineTimerPtr& timer,
BrokerGetLastMessageIdCallback callback) {
ClientConnectionPtr cnx = getCnx().lock();
if (cnx) {
if (cnx->getServerProtocolVersion() >= proto::v12) {
@@ -1326,8 +1336,31 @@ void ConsumerImpl::getLastMessageIdAsync(BrokerGetLastMessageIdCallback callback
callback(ResultUnsupportedVersionError, MessageId());
}
} else {
LOG_ERROR(getName() << " Client Connection not ready for Consumer");
callback(ResultNotConnected, MessageId());
TimeDuration next = std::min(remainTime, backoff->next());
if (next.total_milliseconds() <= 0) {
LOG_ERROR(getName() << " Client Connection not ready for Consumer");
callback(ResultNotConnected, MessageId());
return;
}
remainTime -= next;

timer->expires_from_now(next);

auto self = shared_from_this();
timer->async_wait([this, backoff, remainTime, timer, next, callback,
self](const boost::system::error_code& ec) -> void {
if (ec == boost::asio::error::operation_aborted) {
LOG_DEBUG(getName() << " Get last message id operation was cancelled, code[" << ec << "].");
return;
}
if (ec) {
LOG_ERROR(getName() << " Failed to get last message id, code[" << ec << "].");
return;
}
LOG_WARN(getName() << " Could not get connection while getLastMessageId -- Will try again in "
<< next.total_milliseconds() << " ms")
this->internalGetLastMessageIdAsync(backoff, remainTime, timer, callback);
});
}
}

4 changes: 4 additions & 0 deletions pulsar-client-cpp/lib/ConsumerImpl.h
Original file line number Diff line number Diff line change
@@ -56,6 +56,7 @@ class ConsumerImpl;
class BatchAcknowledgementTracker;
typedef std::shared_ptr<MessageCrypto> MessageCryptoPtr;
typedef std::function<void(Result, const GetLastMessageIdResponse&)> BrokerGetLastMessageIdCallback;
typedef std::shared_ptr<Backoff> BackoffPtr;

enum ConsumerTopicType
{
@@ -181,6 +182,9 @@ class ConsumerImpl : public ConsumerImplBase,
void failPendingReceiveCallback();
void setNegativeAcknowledgeEnabledForTesting(bool enabled) override;
void trackMessage(const MessageId& messageId);
void internalGetLastMessageIdAsync(const BackoffPtr& backoff, TimeDuration remainTime,
const DeadlineTimerPtr& timer,
BrokerGetLastMessageIdCallback callback);

Optional<MessageId> clearReceiveQueue();

32 changes: 32 additions & 0 deletions pulsar-client-cpp/tests/ConsumerTest.cc
Original file line number Diff line number Diff line change
@@ -763,4 +763,36 @@ TEST(ConsumerTest, testPartitionsWithCloseUnblock) {
thread.join();
}

TEST(ConsumerTest, testGetLastMessageIdBlockWhenConnectionDisconnected) {
int operationTimeout = 5;
ClientConfiguration clientConfiguration;
clientConfiguration.setOperationTimeoutSeconds(operationTimeout);

Client client(lookupUrl, clientConfiguration);
const std::string topic =
"testGetLastMessageIdBlockWhenConnectionDisconnected-" + std::to_string(time(nullptr));

Consumer consumer;
ASSERT_EQ(ResultOk, client.subscribe(topic, "test-sub", consumer));

ConsumerImpl& consumerImpl = PulsarFriend::getConsumerImpl(consumer);
ClientConnectionWeakPtr conn = PulsarFriend::getClientConnection(consumerImpl);

PulsarFriend::setClientConnection(consumerImpl, std::weak_ptr<ClientConnection>());

pulsar::Latch latch(1);
auto start = TimeUtils::now();

consumerImpl.getLastMessageIdAsync([&latch](Result r, const GetLastMessageIdResponse&) -> void {
ASSERT_EQ(r, ResultNotConnected);
latch.countdown();
});

ASSERT_TRUE(latch.wait(std::chrono::seconds(20)));
auto elapsed = TimeUtils::now() - start;

// getLastMessageIdAsync should be blocked until operationTimeout when the connection is disconnected.
ASSERT_GE(elapsed.seconds(), operationTimeout);
}

} // namespace pulsar
4 changes: 4 additions & 0 deletions pulsar-client-cpp/tests/PulsarFriend.h
Original file line number Diff line number Diff line change
@@ -116,6 +116,10 @@ class PulsarFriend {

static ClientConnectionWeakPtr getClientConnection(HandlerBase& handler) { return handler.connection_; }

static void setClientConnection(HandlerBase& handler, ClientConnectionWeakPtr conn) {
handler.connection_ = conn;
}

static boost::posix_time::ptime& getFirstBackoffTime(Backoff& backoff) {
return backoff.firstBackoffTime_;
}
27 changes: 27 additions & 0 deletions pulsar-client-cpp/tests/ReaderTest.cc
Original file line number Diff line number Diff line change
@@ -579,3 +579,30 @@ TEST(ReaderTest, testHasMessageAvailableWhenCreated) {
EXPECT_FALSE(hasMessageAvailable);
client.close();
}

TEST(ReaderTest, testReceiveAfterSeek) {
Client client(serviceUrl);
const std::string topic = "reader-test-receive-after-seek-" + std::to_string(time(nullptr));

Producer producer;
ASSERT_EQ(ResultOk, client.createProducer(topic, producer));

MessageId seekMessageId;
for (int i = 0; i < 5; i++) {
MessageId messageId;
producer.send(MessageBuilder().setContent("msg-" + std::to_string(i)).build(), messageId);
if (i == 3) {
seekMessageId = messageId;
}
}

Reader reader;
ASSERT_EQ(ResultOk, client.createReader(topic, MessageId::latest(), {}, reader));

reader.seek(seekMessageId);

bool hasMessageAvailable;
ASSERT_EQ(ResultOk, reader.hasMessageAvailable(hasMessageAvailable));

client.close();
}
Loading