This cheat sheet is filled with some handy tips, commands and code snippets to get you streaming data using Apache Pulsar in no time!
Of course, this list isn’t exhaustive. But contributing to this project is easy! Just send a pull request and help the list grow! Any feedback and bug reports are also greatly appreciated!
Did this list help you? Please ⭐ this repository to say 🙏 to the contributors!
- Use Pulsar in Docker
- Basic CLI Commands
- Client API
- Dependencies
- Client: Create basic PulsarClient
- Producer: Create basic Producer
- Producer: Create Producer with RoundRobin Routing Mode
- Producer: Create Producer with Single Partition Routing Mode
- Producer: Create Producer with Custom Router
- Producer: Create Batching Producer
- Consumer: Create a durable Consumer with Key Shared subscription
- Consumer: Read a partitioned topic from the beginning to the last published message
- Consumer: Reset consumer subscription to either Earliest or Latest
- Admin API
- License
$ docker run -it \
    -p 6650:6650 \
    -p 8080:8080 \
    --mount source=pulsardata,target=/pulsar/data \
    --mount source=pulsarconf,target=/pulsar/conf \
    apachepulsar/pulsar:latest \
    bin/pulsar standalone
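The two `-p` flags publish port 6650, which clients use to talk to the broker over Pulsar's binary protocol, and port 8080, which serves the HTTP admin API used by `pulsar-admin`. A minimal sketch, assuming the container is reachable on `localhost`, of the URLs each side would use; `StandaloneEndpoints` is a hypothetical helper, and its strings are what you would pass to `PulsarClient.builder().serviceUrl(...)` and `PulsarAdmin.builder().serviceHttpUrl(...)`.

```java
/**
 * Hypothetical helper: the URLs exposed by `bin/pulsar standalone`
 * when ports 6650 and 8080 are published by `docker run -p`.
 */
final class StandaloneEndpoints {
    static final int BROKER_PORT = 6650; // binary protocol, used by PulsarClient
    static final int HTTP_PORT = 8080;   // REST admin API, used by PulsarAdmin / pulsar-admin

    private StandaloneEndpoints() {}

    /** URL for PulsarClient.builder().serviceUrl(...). */
    static String serviceUrl(String host) {
        return "pulsar://" + host + ":" + BROKER_PORT;
    }

    /** URL for PulsarAdmin.builder().serviceHttpUrl(...). */
    static String adminUrl(String host) {
        return "http://" + host + ":" + HTTP_PORT;
    }
}
```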
$ pulsar-admin topics create-partitioned-topic cheat_sheet_topic --partitions 3
Note: by default, a topic is persistent, i.e. messages are persisted to storage nodes (bookies). To create the topic under a specific tenant and namespace, use its fully qualified name:
$ pulsar-admin topics create-partitioned-topic persistent://my-tenant/my-namespace/cheat_sheet_topic --partitions 3
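Pulsar resolves a short topic name such as `cheat_sheet_topic` to `persistent://public/default/cheat_sheet_topic`, and a `tenant/namespace/topic` name to the `persistent://` domain. Each partition of a partitioned topic is stored as an internal topic whose name ends with `-partition-<index>`. The sketch below reproduces that naming for illustration only; `TopicNames` is a hypothetical helper, not Pulsar's `TopicName` class.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper mirroring Pulsar's topic and partition naming (simplified). */
final class TopicNames {
    private TopicNames() {}

    /** Expands a short or "tenant/namespace/topic" name to a fully qualified name. */
    static String fullyQualified(String name) {
        if (name.contains("://")) {
            return name; // already qualified, e.g. persistent://... or non-persistent://...
        }
        String[] parts = name.split("/");
        if (parts.length == 1) {
            // Short name: persistent domain, "public" tenant, "default" namespace
            return "persistent://public/default/" + name;
        }
        if (parts.length == 3) {
            // tenant/namespace/topic: only the domain is missing
            return "persistent://" + name;
        }
        throw new IllegalArgumentException("Invalid topic name: " + name);
    }

    /** Names of the internal topics backing a topic with the given number of partitions. */
    static List<String> partitionNames(String name, int partitions) {
        String base = fullyQualified(name);
        List<String> result = new ArrayList<>();
        for (int i = 0; i < partitions; i++) {
            result.add(base + "-partition-" + i);
        }
        return result;
    }
}
```

For the topic created above with `--partitions 3`, this yields three names ending in `-partition-0` to `-partition-2`, which should match what `client.getPartitionsForTopic(...)` lists in the reader example.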
$ pulsar-client produce cheat_sheet_topic --messages "first message, second message, third message"
With the SinglePartition routing mode, if no key is provided on the message, the producer randomly picks a single partition and publishes all messages to that partition. Messages that carry a key are instead hashed to a partition.
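The SinglePartition and RoundRobin routing modes differ mainly in how they treat messages without a key; keyed messages are hashed in both. The following is a simplified model, not Pulsar's router implementation: `RoutingModel` is hypothetical, `String.hashCode()` stands in for the producer's hashing scheme, and round-robin is modeled per message (with batching enabled, Pulsar rotates partitions on batching-delay boundaries rather than per message).

```java
import java.util.Random;

/** Simplified, hypothetical model of partition selection in two routing modes. */
final class RoutingModel {
    enum Mode { SINGLE_PARTITION, ROUND_ROBIN }

    private final Mode mode;
    private final int numPartitions;
    private final int singlePartition; // picked once, at random, for keyless messages
    private int nextPartition;         // round-robin cursor for keyless messages

    RoutingModel(Mode mode, int numPartitions, Random random) {
        if (numPartitions <= 0) {
            throw new IllegalArgumentException("numPartitions must be positive");
        }
        this.mode = mode;
        this.numPartitions = numPartitions;
        this.singlePartition = random.nextInt(numPartitions);
        this.nextPartition = random.nextInt(numPartitions);
    }

    /** Returns the partition for a message; key may be null. */
    int choosePartition(String key) {
        if (key != null) {
            // Keyed messages: same key, same partition, whatever the mode
            return Math.floorMod(key.hashCode(), numPartitions);
        }
        if (mode == Mode.SINGLE_PARTITION) {
            return singlePartition;
        }
        // Round robin: cycle through every partition in turn
        int partition = nextPartition;
        nextPartition = (nextPartition + 1) % numPartitions;
        return partition;
    }
}
```

With three partitions, the SinglePartition model returns the same index for every keyless message, while the RoundRobin model visits all three within three keyless messages.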
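Producer<String> producer = client.newProducer(Schema.STRING)
        .topic("cheat_sheet_topic")
        .messageRoutingMode(MessageRoutingMode.CustomPartition)
        .messageRouter(new MessageRouter() {
            @Override
            public int choosePartition(Message<?> msg, TopicMetadata metadata) {
                // Route on a message property instead of the message key
                String key = msg.getProperty("routing_key");
                if (key == null) {
                    // No routing key set: fall back to the first partition (arbitrary choice)
                    return 0;
                }
                // signSafeMod keeps the result in [0, numPartitions) even for negative hashes
                return MathUtils.signSafeMod(Murmur3_32Hash.getInstance().makeHash(key), metadata.numPartitions());
            }
        })
        .create();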
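`MathUtils.signSafeMod` matters because a hash can be negative, and Java's `%` then returns a negative remainder that is not a valid partition index. A plain-JDK sketch of the same partition choice, assuming `String.hashCode()` as a stand-in for `Murmur3_32Hash` (so the partition numbers differ from Pulsar's, but the properties are the same); `KeyPartitioner` is hypothetical.

```java
/** Hypothetical plain-JDK sketch of the custom router's partition choice. */
final class KeyPartitioner {
    private KeyPartitioner() {}

    /** Sign-safe modulo: always in [0, divisor) for a positive divisor. */
    static int signSafeMod(int dividend, int divisor) {
        int mod = dividend % divisor; // negative when dividend is negative
        return mod < 0 ? mod + divisor : mod;
    }

    /** Maps a routing key to a partition; a null key goes to partition 0. */
    static int choosePartition(String routingKey, int numPartitions) {
        if (routingKey == null) {
            return 0;
        }
        // String.hashCode() stands in for Murmur3_32Hash here
        return signSafeMod(routingKey.hashCode(), numPartitions);
    }
}
```

For example, `-7 % 3` is `-1` in Java, whereas `signSafeMod(-7, 3)` is `2`; for a positive divisor this is the same result as the JDK's `Math.floorMod`.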
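Producer<String> producer = client.newProducer(Schema.STRING)
        .topic("cheat_sheet_topic")
        .enableBatching(true)
        .batchingMaxBytes(5 * 1024 * 1024) // send a batch once it reaches 5 MiB...
        .batchingMaxPublishDelay(200, TimeUnit.MILLISECONDS) // ...or at the latest after 200 ms
        .blockIfQueueFull(true) // block send() instead of failing when the pending queue is full
        .sendTimeout(30, TimeUnit.SECONDS)
        .compressionType(CompressionType.ZSTD) // compression is applied to the whole batch
        .batcherBuilder(BatcherBuilder.KEY_BASED) // one batch per key, recommended with Key_Shared
        .hashingScheme(HashingScheme.Murmur3_32Hash) // hash used to route keyed messages
        .create();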
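`BatcherBuilder.KEY_BASED` makes the producer keep a separate batch per message key, so a batch never mixes keys. This is what you want when the topic is consumed through a Key_Shared subscription, since a batch is delivered as a unit. The model below is a hypothetical simplification (`KeyBasedBatcher`) that only applies the byte limit; the real producer also sends a batch on `batchingMaxMessages` and when `batchingMaxPublishDelay` expires.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical model of key-based batching with a byte limit only. */
final class KeyBasedBatcher {
    /** A message reduced to what matters for batching: its key and payload size. */
    static final class Msg {
        final String key;
        final int sizeBytes;

        Msg(String key, int sizeBytes) {
            this.key = key;
            this.sizeBytes = sizeBytes;
        }
    }

    private KeyBasedBatcher() {}

    /** Groups messages into batches that never mix keys nor exceed maxBatchBytes. */
    static List<List<Msg>> batch(List<Msg> messages, int maxBatchBytes) {
        Map<String, List<Msg>> open = new LinkedHashMap<>();    // current batch per key
        Map<String, Integer> openBytes = new LinkedHashMap<>(); // its size in bytes
        List<List<Msg>> closed = new ArrayList<>();
        for (Msg m : messages) {
            List<Msg> current = open.computeIfAbsent(m.key, k -> new ArrayList<>());
            int bytes = openBytes.getOrDefault(m.key, 0);
            if (!current.isEmpty() && bytes + m.sizeBytes > maxBatchBytes) {
                // Adding m would overflow this key's batch: close it and start a new one
                closed.add(current);
                current = new ArrayList<>();
                open.put(m.key, current);
                bytes = 0;
            }
            current.add(m);
            openBytes.put(m.key, bytes + m.sizeBytes);
        }
        // Flush what is still open (in Pulsar, the publish delay would trigger this)
        closed.addAll(open.values());
        return closed;
    }
}
```

With messages `a(40) b(40) a(40) a(40)` and a 100-byte limit, the model produces three batches: `[a, a]`, `[a]` and `[b]`.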
try (Consumer<String> consumer = client.newConsumer(Schema.STRING)
        .topic("cheat_sheet_topic")
        .subscriptionName("cheatSheetSubscription")
        .subscriptionMode(SubscriptionMode.Durable)
        .subscriptionType(SubscriptionType.Key_Shared)
        // Only applies when the subscription is created for the first time
        .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
        .subscribe()
) {
    while (true) {
        Message<String> message = consumer.receive();
        try {
            System.out.printf(
                    "Message received: key=%s, value=%s, topic=%s, id=%s%n",
                    message.getKey(),
                    message.getValue(),
                    message.getTopicName(),
                    message.getMessageId().toString());
            consumer.acknowledge(message);
        } catch (Exception e) {
            // Failed to process message, mark it for redelivery
            consumer.negativeAcknowledge(message);
        }
    }
}
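A Key_Shared subscription lets several consumers share a topic while guaranteeing that all messages with a given key go to the same consumer, which preserves per-key ordering. A simplified model, assuming a hash space of 65536 slots split into equal contiguous ranges, one per consumer; `KeySharedModel` is hypothetical, `String.hashCode()` stands in for Pulsar's key hash, and the real auto-split mode reassigns ranges as consumers join or leave.

```java
import java.util.List;

/** Hypothetical model of Key_Shared dispatching over a fixed hash space. */
final class KeySharedModel {
    static final int HASH_RANGE_SIZE = 65536;

    private KeySharedModel() {}

    /** Returns the consumer that receives every message with this key. */
    static String ownerOf(String key, List<String> consumers) {
        if (consumers.isEmpty()) {
            throw new IllegalArgumentException("no consumers");
        }
        int slot = Math.floorMod(key.hashCode(), HASH_RANGE_SIZE);
        // Consumer i owns the slots [i * size / n, (i + 1) * size / n)
        int index = (int) ((long) slot * consumers.size() / HASH_RANGE_SIZE);
        return consumers.get(index);
    }
}
```

Because the owner depends only on the key and the consumer list, messages for one key keep their order even though the subscription is shared.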
// Create a PulsarClient
PulsarClient client = ...
// List all partitions for topic
List<String> topics = client.getPartitionsForTopic("cheat_sheet_topic").get();
// Create as many readers as topic-partitions
List<CompletableFuture<Reader<String>>> readers = topics.stream()
        .map(topic ->
                client.newReader(Schema.STRING)
                        .topic(topic)
                        .startMessageId(MessageId.earliest)
                        .createAsync()
        ).collect(Collectors.toList());
// Create a fixed-size thread pool, one thread per reader
ExecutorService service = Executors.newFixedThreadPool(readers.size());
// Submit one task for each reader
for (CompletableFuture<Reader<String>> future : readers) {
    service.submit(() -> {
        try (Reader<String> reader = future.get()) {
            // Read until no more messages are available, i.e. up to the last published message
            while (reader.hasMessageAvailable()) {
                Message<String> message = reader.readNext();
                System.out.printf(
                        "Message received: key=%s, value=%s, topic=%s, id=%s%n",
                        message.getKey(),
                        message.getValue(),
                        message.getTopicName(),
                        message.getMessageId().toString());
            }
            System.err.printf("[%s] No more messages available for topic %s%n",
                    Thread.currentThread().getName(),
                    reader.getTopic());
        } catch (InterruptedException e) {
            // Restore the interrupt flag and stop this task
            Thread.currentThread().interrupt();
        } catch (ExecutionException e) {
            throw new RuntimeException("Cannot create reader", e);
        } catch (IOException e) {
            // PulsarClientException extends IOException: covers read and close failures
            throw new RuntimeException("Failed to read from topic partition", e);
        }
    });
}
// Stop accepting new tasks, then wait for all readers to complete
service.shutdown();
service.awaitTermination(5, TimeUnit.MINUTES);
client.close();
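The reader example fans out one task per partition onto a fixed-size pool, then shuts the pool down and waits. The sketch below keeps that structure with plain JDK types so it runs without a broker: in-memory lists stand in for partitions (a hypothetical substitution), and it also keeps the `Future` returned by each `submit()` so that a failing task surfaces through `get()` instead of being silently dropped.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

/** Hypothetical plain-JDK sketch of the "one reader task per partition" fan-out. */
final class PartitionFanOut {
    private PartitionFanOut() {}

    /** Drains every partition on its own thread; returns messages read per partition. */
    static Map<String, Integer> readAll(Map<String, List<String>> partitions)
            throws InterruptedException, ExecutionException {
        Map<String, Integer> counts = new ConcurrentHashMap<>();
        ExecutorService service = Executors.newFixedThreadPool(Math.max(1, partitions.size()));
        try {
            List<Future<?>> tasks = new ArrayList<>();
            for (Map.Entry<String, List<String>> partition : partitions.entrySet()) {
                tasks.add(service.submit(() -> {
                    int read = 0;
                    for (String message : partition.getValue()) {
                        System.out.printf("[%s] %s%n", partition.getKey(), message);
                        read++;
                    }
                    counts.put(partition.getKey(), read);
                }));
            }
            // get() rethrows any exception raised inside a task
            for (Future<?> task : tasks) {
                task.get();
            }
        } finally {
            service.shutdown();
        }
        return counts;
    }
}
```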
public void resetSubscriptionOffsetsTo(final Consumer<?> consumer,
                                       final SubscriptionInitialPosition strategy) throws PulsarClientException {
    Objects.requireNonNull(consumer, "consumer cannot be null");
    Objects.requireNonNull(strategy, "strategy cannot be null");
    System.out.printf(
            "Resetting topic %s for subscription %s to %s position%n",
            consumer.getTopic(),
            consumer.getSubscription(),
            strategy
    );
    // Move the subscription cursor to the earliest available message or past the latest one
    consumer.seek(strategy == SubscriptionInitialPosition.Earliest ? MessageId.earliest : MessageId.latest);
}
Note: this operation can only be done on non-partitioned topics.
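Seeking moves the subscription's cursor, so the chosen position decides what the consumer sees next: `Earliest` replays everything still stored on the topic, `Latest` skips it and waits for new messages. A hypothetical, simplified model (`SeekModel`), in which lists of strings stand in for the messages stored on the topic:

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical model of what a subscription receives after Consumer.seek(). */
final class SeekModel {
    enum Position { EARLIEST, LATEST }

    private SeekModel() {}

    static List<String> deliveredAfterSeek(List<String> stored,
                                           List<String> publishedAfterSeek,
                                           Position position) {
        List<String> delivered = new ArrayList<>();
        if (position == Position.EARLIEST) {
            // Cursor moved to the first stored message: everything is replayed
            delivered.addAll(stored);
        }
        // In both cases, messages published after the seek are delivered
        delivered.addAll(publishedAfterSeek);
        return delivered;
    }
}
```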
public boolean topicExists(final PulsarAdmin admin,
                           final String topicName) throws PulsarAdminException {
    // 0 partitions means the topic is either non-partitioned or does not exist
    int partitionNum = admin.topics().getPartitionedTopicMetadata(topicName).partitions;
    if (partitionNum == 0) {
        try {
            // Fails with NotFoundException if the non-partitioned topic does not exist
            admin.topics().getStats(topicName);
        } catch (PulsarAdminException.NotFoundException e) {
            return false;
        }
    }
    return true;
}
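`topicExists` needs two admin calls because a partition count of 0 does not distinguish a non-partitioned topic from a missing one. To exercise that decision without a broker, the sketch below puts the two calls behind a hypothetical `TopicLookup` interface; a real implementation would delegate to `getPartitionedTopicMetadata(...)` and `getStats(...)` as in the method above.

```java
/** Hypothetical abstraction over the two admin calls used by topicExists(). */
interface TopicLookup {
    /** Mirrors getPartitionedTopicMetadata(topic).partitions: 0 if non-partitioned or missing. */
    int partitions(String topic);

    /** Mirrors getStats(topic): true if it succeeds, false on NotFoundException. */
    boolean hasStats(String topic);
}

final class TopicExistence {
    private TopicExistence() {}

    static boolean topicExists(TopicLookup lookup, String topic) {
        if (lookup.partitions(topic) > 0) {
            return true; // partitioned topic: its metadata alone proves it exists
        }
        // Non-partitioned or missing: only stats can tell the two apart
        return lookup.hasStats(topic);
    }
}
```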
Copyright 2020 StreamThoughts.
Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at
Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.