From ffcc578e673aa6506ee62be5abd4b741898e8116 Mon Sep 17 00:00:00 2001
From: Yunze Xu
Date: Mon, 22 Jul 2024 17:47:41 +0800
Subject: [PATCH 1/6] [improve][broker] Reduce the pressure from the
transaction buffer readers and writers in rolling restarts
### Motivation
During rolling restarts, the ownership of namespace bundles will
change. Assume there is a producer created on a single topic and the
ownership was transferred to the new broker. Assume also that the
namespace bundle has N topics and the namespace is `tenant/ns`. Then:
1. All N topics in the same bundle of that topic will be loaded.
2. For each topic, the managed ledger will be initialized. When the
transaction coordinator is enabled, a `TopicTransactionBuffer` will
be created.
2.1 A Pulsar producer will be created on
`tenant/ns/__transaction_buffer_snapshot` concurrently.
2.2 A Pulsar reader will be created on
`tenant/ns/__transaction_buffer_snapshot` concurrently.
3. Once all N readers are created, the owner of the snapshot topic will
start dispatching messages to N readers. Each dispatcher will read
messages from BookKeeper concurrently and might fail with a "too many
requests" error because BK can only have
`maxPendingReadRequestsPerThread` pending read requests (default: 10000).
We have a `numTransactionReplayThreadPoolSize` config to limit the
concurrency of transaction snapshot readers. However, it only limits the
read loop. For example, if it's configured with 1, only 1 reader could
read messages at the same time. However, N readers will be created
concurrently. Whenever one of these readers explicitly calls `readNext`,
all N dispatchers on the broker side will dispatch messages to the N readers.
The behaviors above put heavy CPU pressure on the owner broker,
especially for a small cluster with only two brokers.
### Modifications
- Synchronize the reader creation, the read loop, and the subsequent
processing of its result. Maintain only one reader for each namespace.
---
.../apache/pulsar/broker/PulsarService.java | 6 ++
...SingleSnapshotAbortedTxnProcessorImpl.java | 72 ++++------------
.../buffer/impl/SnapshotTableView.java | 86 +++++++++++++++++++
.../org/apache/pulsar/utils/SimpleCache.java | 62 +++++++++++++
.../apache/pulsar/utils/SimpleCacheTest.java | 74 ++++++++++++++++
5 files changed, 246 insertions(+), 54 deletions(-)
create mode 100644 pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/SnapshotTableView.java
create mode 100644 pulsar-broker/src/main/java/org/apache/pulsar/utils/SimpleCache.java
create mode 100644 pulsar-broker/src/test/java/org/apache/pulsar/utils/SimpleCacheTest.java
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
index 848484fe3763d..9dde179955fe3 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
@@ -124,6 +124,7 @@
import org.apache.pulsar.broker.stats.prometheus.PulsarPrometheusMetricsServlet;
import org.apache.pulsar.broker.storage.ManagedLedgerStorage;
import org.apache.pulsar.broker.transaction.buffer.TransactionBufferProvider;
+import org.apache.pulsar.broker.transaction.buffer.impl.SnapshotTableView;
import org.apache.pulsar.broker.transaction.buffer.impl.TransactionBufferClientImpl;
import org.apache.pulsar.broker.transaction.pendingack.TransactionPendingAckStoreProvider;
import org.apache.pulsar.broker.transaction.pendingack.impl.MLPendingAckStoreProvider;
@@ -285,6 +286,7 @@ public class PulsarService implements AutoCloseable, ShutdownService {
private PulsarMetadataEventSynchronizer localMetadataSynchronizer;
private CoordinationService coordinationService;
private TransactionBufferSnapshotServiceFactory transactionBufferSnapshotServiceFactory;
+ private SnapshotTableView snapshotTableView;
private MetadataStore configurationMetadataStore;
private PulsarMetadataEventSynchronizer configMetadataSynchronizer;
private boolean shouldShutdownConfigurationMetadataStore;
@@ -993,6 +995,10 @@ public void start() throws PulsarServerException {
MLPendingAckStoreProvider.initBufferedWriterMetrics(getAdvertisedAddress());
this.transactionBufferSnapshotServiceFactory = new TransactionBufferSnapshotServiceFactory(getClient());
+ this.snapshotTableView = new SnapshotTableView(
+ transactionBufferSnapshotServiceFactory.getTxnBufferSnapshotService(),
+ executor, Long.parseLong(config.getProperties().getProperty(
+ "brokerClient_operationTimeoutMs", "30000")));
this.transactionTimer =
new HashedWheelTimer(new DefaultThreadFactory("pulsar-transaction-timer"));
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/SingleSnapshotAbortedTxnProcessorImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/SingleSnapshotAbortedTxnProcessorImpl.java
index 5c9075e9a3867..3c640dc41126d 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/SingleSnapshotAbortedTxnProcessorImpl.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/SingleSnapshotAbortedTxnProcessorImpl.java
@@ -21,26 +21,19 @@
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.mledger.Position;
import org.apache.bookkeeper.mledger.PositionFactory;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
import org.apache.commons.collections4.map.LinkedMap;
-import org.apache.pulsar.broker.service.BrokerServiceException;
import org.apache.pulsar.broker.service.SystemTopicTxnBufferSnapshotService.ReferenceCountedWriter;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
-import org.apache.pulsar.broker.systopic.SystemTopicClient;
import org.apache.pulsar.broker.transaction.buffer.AbortedTxnProcessor;
import org.apache.pulsar.broker.transaction.buffer.metadata.AbortTxnMetadata;
import org.apache.pulsar.broker.transaction.buffer.metadata.TransactionBufferSnapshot;
-import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.transaction.TxnID;
-import org.apache.pulsar.client.impl.PulsarClientImpl;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.TransactionBufferStats;
-import org.apache.pulsar.common.util.FutureUtil;
@Slf4j
public class SingleSnapshotAbortedTxnProcessorImpl implements AbortedTxnProcessor {
@@ -91,48 +84,26 @@ public boolean checkAbortedTransaction(TxnID txnID) {
return aborts.containsKey(txnID);
}
- private long getSystemClientOperationTimeoutMs() throws Exception {
- PulsarClientImpl pulsarClient = (PulsarClientImpl) topic.getBrokerService().getPulsar().getClient();
- return pulsarClient.getConfiguration().getOperationTimeoutMs();
- }
-
@Override
public CompletableFuture recoverFromSnapshot() {
- return topic.getBrokerService().getPulsar().getTransactionBufferSnapshotServiceFactory()
- .getTxnBufferSnapshotService()
- .createReader(TopicName.get(topic.getName())).thenComposeAsync(reader -> {
- try {
- Position startReadCursorPosition = null;
- while (reader.hasMoreEvents()) {
- Message message = reader.readNextAsync()
- .get(getSystemClientOperationTimeoutMs(), TimeUnit.MILLISECONDS);
- if (topic.getName().equals(message.getKey())) {
- TransactionBufferSnapshot transactionBufferSnapshot = message.getValue();
- if (transactionBufferSnapshot != null) {
- handleSnapshot(transactionBufferSnapshot);
- startReadCursorPosition = PositionFactory.create(
- transactionBufferSnapshot.getMaxReadPositionLedgerId(),
- transactionBufferSnapshot.getMaxReadPositionEntryId());
- }
- }
- }
- return CompletableFuture.completedFuture(startReadCursorPosition);
- } catch (TimeoutException ex) {
- Throwable t = FutureUtil.unwrapCompletionException(ex);
- String errorMessage = String.format("[%s] Transaction buffer recover fail by read "
- + "transactionBufferSnapshot timeout!", topic.getName());
- log.error(errorMessage, t);
- return FutureUtil.failedFuture(
- new BrokerServiceException.ServiceUnitNotReadyException(errorMessage, t));
- } catch (Exception ex) {
- log.error("[{}] Transaction buffer recover fail when read "
- + "transactionBufferSnapshot!", topic.getName(), ex);
- return FutureUtil.failedFuture(ex);
- } finally {
- closeReader(reader);
- }
- }, topic.getBrokerService().getPulsar().getTransactionExecutorProvider()
- .getExecutor(this));
+ final var future = new CompletableFuture();
+ final var pulsar = topic.getBrokerService().getPulsar();
+ pulsar.getTransactionExecutorProvider().getExecutor(this).execute(() -> {
+ try {
+ final var snapshot = pulsar.getSnapshotTableView().readLatest(topic.getName());
+ if (snapshot != null) {
+ handleSnapshot(snapshot);
+ final var startReadCursorPosition = PositionFactory.create(snapshot.getMaxReadPositionLedgerId(),
+ snapshot.getMaxReadPositionEntryId());
+ future.complete(startReadCursorPosition);
+ } else {
+ future.complete(null);
+ }
+ } catch (Throwable e) {
+ future.completeExceptionally(e);
+ }
+ });
+ return future;
}
@Override
@@ -191,13 +162,6 @@ public synchronized CompletableFuture closeAsync() {
return CompletableFuture.completedFuture(null);
}
- private void closeReader(SystemTopicClient.Reader reader) {
- reader.closeAsync().exceptionally(e -> {
- log.error("[{}]Transaction buffer reader close error!", topic.getName(), e);
- return null;
- });
- }
-
private void handleSnapshot(TransactionBufferSnapshot snapshot) {
if (snapshot.getAborts() != null) {
snapshot.getAborts().forEach(abortTxnMetadata ->
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/SnapshotTableView.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/SnapshotTableView.java
new file mode 100644
index 0000000000000..c622de7b47902
--- /dev/null
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/SnapshotTableView.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.transaction.buffer.impl;
+
+import static org.apache.pulsar.broker.systopic.SystemTopicClient.Reader;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.broker.service.SystemTopicTxnBufferSnapshotService;
+import org.apache.pulsar.broker.transaction.buffer.metadata.TransactionBufferSnapshot;
+import org.apache.pulsar.common.naming.NamespaceName;
+import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.utils.SimpleCache;
+
+/**
+ * Compared with the more generic {@link org.apache.pulsar.client.api.TableView}, this table view
+ * - Provides just a single public method that reads the latest value synchronously.
+ * - Maintains multiple long-lived readers that will be expired after some time (1 minute by default).
+ */
+@Slf4j
+public class SnapshotTableView {
+
+ // Remove the cached reader and snapshots if there is no refresh request in 1 minute
+ private static final long CACHE_EXPIRE_TIMEOUT_MS = 60 * 1000L;
+ private final Map snapshots = new ConcurrentHashMap<>();
+ private final SystemTopicTxnBufferSnapshotService snapshotService;
+ private final long blockTimeoutMs;
+ private final SimpleCache> readers;
+
+ public SnapshotTableView(SystemTopicTxnBufferSnapshotService snapshotService,
+ ScheduledExecutorService executor, long blockTimeoutMs) {
+ this.snapshotService = snapshotService;
+ this.blockTimeoutMs = blockTimeoutMs;
+ this.readers = new SimpleCache<>(executor, CACHE_EXPIRE_TIMEOUT_MS);
+ }
+
+ public TransactionBufferSnapshot readLatest(String topic) throws Exception {
+ final var topicName = TopicName.get(topic);
+ final var namespace = topicName.getNamespaceObject();
+ final var reader = readers.get(namespace, () -> {
+ try {
+ return wait(snapshotService.createReader(topicName), "create reader");
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }, __ -> __.closeAsync().exceptionally(e -> {
+ log.warn("Failed to close reader {}", e.getMessage());
+ return null;
+ }));
+ while (wait(reader.hasMoreEventsAsync(), "has more events")) {
+ final var msg = wait(reader.readNextAsync(), "read message");
+ if (msg.getKey() != null) {
+ snapshots.put(msg.getKey(), msg.getValue());
+ }
+ }
+ return snapshots.get(topic);
+ }
+
+ private T wait(CompletableFuture future, String msg) throws Exception {
+ try {
+ return future.get(blockTimeoutMs, TimeUnit.MILLISECONDS);
+ } catch (ExecutionException e) {
+ throw new ExecutionException("Failed to " + msg, e.getCause());
+ }
+ }
+}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/utils/SimpleCache.java b/pulsar-broker/src/main/java/org/apache/pulsar/utils/SimpleCache.java
new file mode 100644
index 0000000000000..41055b37c012f
--- /dev/null
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/utils/SimpleCache.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.utils;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Consumer;
+import java.util.function.Supplier;
+import lombok.RequiredArgsConstructor;
+
+@RequiredArgsConstructor
+public class SimpleCache {
+
+ private final Map cache = new HashMap<>();
+ private final Map> futures = new HashMap<>();
+ private final ScheduledExecutorService executor;
+ private final long timeoutMs;
+
+ public synchronized V get(final K key, final Supplier valueSupplier, final Consumer expireCallback) {
+ final V value;
+ V existingValue = cache.get(key);
+ if (existingValue != null) {
+ value = existingValue;
+ } else {
+ value = valueSupplier.get();
+ cache.put(key, value);
+ }
+ final var future = futures.remove(key);
+ if (future != null) {
+ future.cancel(true);
+ }
+ futures.put(key, executor.schedule(() -> {
+ synchronized (SimpleCache.this) {
+ futures.remove(key);
+ final var removedValue = cache.remove(key);
+ if (removedValue != null) {
+ expireCallback.accept(removedValue);
+ }
+ }
+ }, timeoutMs, TimeUnit.MILLISECONDS));
+ return value;
+ }
+}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/utils/SimpleCacheTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/utils/SimpleCacheTest.java
new file mode 100644
index 0000000000000..c2176d71fec9f
--- /dev/null
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/utils/SimpleCacheTest.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.utils;
+
+import java.util.Collections;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.Test;
+
+public class SimpleCacheTest {
+
+ private final ScheduledExecutorService executor = Executors.newScheduledThreadPool(1);
+
+ @AfterClass
+ public void shutdown() {
+ executor.shutdown();
+ }
+
+ @Test
+ public void testConcurrentUpdate() throws Exception {
+ final var cache = new SimpleCache(executor, 10000L);
+ final var pool = Executors.newFixedThreadPool(2);
+ final var latch = new CountDownLatch(2);
+ for (int i = 0; i < 2; i++) {
+ final var value = i + 100;
+ pool.execute(() -> {
+ try {
+ Thread.sleep(100);
+ } catch (InterruptedException ignored) {
+ }
+ cache.get(0, () -> value, __ -> {});
+ latch.countDown();
+ });
+ }
+ latch.await();
+ final var value = cache.get(0, () -> -1, __ -> {});
+ Assert.assertTrue(value == 100 || value == 101);
+ pool.shutdown();
+ }
+
+ @Test
+ public void testExpire() throws InterruptedException {
+ final var cache = new SimpleCache(executor, 500L);
+ final var expiredValues = new CopyOnWriteArrayList();
+ cache.get(0, () -> 100, expiredValues::add);
+ for (int i = 0; i < 100; i++) {
+ cache.get(1, () -> 101, expiredValues::add);
+ Thread.sleep(10);
+ }
+ Assert.assertEquals(cache.get(0, () -> -1, __ -> {}), -1); // the value is expired
+ Assert.assertEquals(cache.get(1, () -> -1, __ -> {}), 101);
+ Assert.assertEquals(expiredValues, Collections.singletonList(100));
+ }
+}
From 4fdab3a96b13f3003bb5906fdaf8ce152569bf0f Mon Sep 17 00:00:00 2001
From: Yunze Xu
Date: Wed, 24 Jul 2024 16:20:00 +0800
Subject: [PATCH 2/6] Remove key-value for null value
---
.../buffer/impl/SnapshotTableView.java | 6 +++++-
.../TopicTransactionBufferRecoverTest.java | 20 -------------------
2 files changed, 5 insertions(+), 21 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/SnapshotTableView.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/SnapshotTableView.java
index c622de7b47902..256084132081c 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/SnapshotTableView.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/SnapshotTableView.java
@@ -70,7 +70,11 @@ public TransactionBufferSnapshot readLatest(String topic) throws Exception {
while (wait(reader.hasMoreEventsAsync(), "has more events")) {
final var msg = wait(reader.readNextAsync(), "read message");
if (msg.getKey() != null) {
- snapshots.put(msg.getKey(), msg.getValue());
+ if (msg.getValue() != null) {
+ snapshots.put(msg.getKey(), msg.getValue());
+ } else {
+ snapshots.remove(msg.getKey());
+ }
}
}
return snapshots.get(topic);
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TopicTransactionBufferRecoverTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TopicTransactionBufferRecoverTest.java
index e4240bce700bd..3cf3a0bba3bcf 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TopicTransactionBufferRecoverTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TopicTransactionBufferRecoverTest.java
@@ -90,7 +90,6 @@
import org.apache.pulsar.common.api.proto.MessageMetadata;
import org.apache.pulsar.common.events.EventType;
import org.apache.pulsar.common.naming.TopicName;
-import org.apache.pulsar.common.policies.data.TopicStats;
import org.apache.pulsar.common.protocol.Commands;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
@@ -684,25 +683,6 @@ private void checkCloseTopic(PulsarClient pulsarClient,
txn.commit().get();
}
-
- @Test
- public void testTransactionBufferNoSnapshotCloseReader() throws Exception{
- String topic = NAMESPACE1 + "/test";
- @Cleanup
- Producer producer = pulsarClient.newProducer(Schema.STRING).producerName("testTxnTimeOut_producer")
- .topic(topic).sendTimeout(0, TimeUnit.SECONDS).enableBatching(false).create();
-
- admin.topics().unload(topic);
-
- // unload success, all readers have been closed except for the compaction sub
- producer.send("test");
- TopicStats stats = admin.topics().getStats(NAMESPACE1 + "/" + TRANSACTION_BUFFER_SNAPSHOT);
-
- // except for the compaction sub
- assertEquals(stats.getSubscriptions().size(), 1);
- assertTrue(stats.getSubscriptions().keySet().contains("__compaction"));
- }
-
@Test
public void testTransactionBufferIndexSystemTopic() throws Exception {
SystemTopicTxnBufferSnapshotService transactionBufferSnapshotIndexService =
From 3e2c82c896e4ddde65bc530a6cf0ed0662f1124d Mon Sep 17 00:00:00 2001
From: Yunze Xu
Date: Wed, 24 Jul 2024 17:34:14 +0800
Subject: [PATCH 3/6] Mock snapshot table view in tests
---
.../apache/pulsar/broker/PulsarService.java | 5 +--
.../buffer/impl/SnapshotTableView.java | 44 +++++++++++--------
.../TopicTransactionBufferRecoverTest.java | 27 +++++++++++-
3 files changed, 51 insertions(+), 25 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
index 9dde179955fe3..492db888fb260 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
@@ -995,10 +995,7 @@ public void start() throws PulsarServerException {
MLPendingAckStoreProvider.initBufferedWriterMetrics(getAdvertisedAddress());
this.transactionBufferSnapshotServiceFactory = new TransactionBufferSnapshotServiceFactory(getClient());
- this.snapshotTableView = new SnapshotTableView(
- transactionBufferSnapshotServiceFactory.getTxnBufferSnapshotService(),
- executor, Long.parseLong(config.getProperties().getProperty(
- "brokerClient_operationTimeoutMs", "30000")));
+ this.snapshotTableView = new SnapshotTableView(this);
this.transactionTimer =
new HashedWheelTimer(new DefaultThreadFactory("pulsar-transaction-timer"));
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/SnapshotTableView.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/SnapshotTableView.java
index 256084132081c..7099f22c78cf7 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/SnapshotTableView.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/SnapshotTableView.java
@@ -19,13 +19,15 @@
package org.apache.pulsar.broker.transaction.buffer.impl;
import static org.apache.pulsar.broker.systopic.SystemTopicClient.Reader;
+import com.google.common.annotations.VisibleForTesting;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.service.SystemTopicTxnBufferSnapshotService;
import org.apache.pulsar.broker.transaction.buffer.metadata.TransactionBufferSnapshot;
import org.apache.pulsar.common.naming.NamespaceName;
@@ -47,26 +49,15 @@ public class SnapshotTableView {
private final long blockTimeoutMs;
private final SimpleCache> readers;
- public SnapshotTableView(SystemTopicTxnBufferSnapshotService snapshotService,
- ScheduledExecutorService executor, long blockTimeoutMs) {
- this.snapshotService = snapshotService;
- this.blockTimeoutMs = blockTimeoutMs;
- this.readers = new SimpleCache<>(executor, CACHE_EXPIRE_TIMEOUT_MS);
+ public SnapshotTableView(PulsarService pulsar) {
+ this.snapshotService = pulsar.getTransactionBufferSnapshotServiceFactory().getTxnBufferSnapshotService();
+ this.blockTimeoutMs = Long.parseLong(pulsar.getConfig().getProperties()
+ .getProperty("brokerClient_operationTimeoutMs", "30000"));
+ this.readers = new SimpleCache<>(pulsar.getExecutor(), CACHE_EXPIRE_TIMEOUT_MS);
}
public TransactionBufferSnapshot readLatest(String topic) throws Exception {
- final var topicName = TopicName.get(topic);
- final var namespace = topicName.getNamespaceObject();
- final var reader = readers.get(namespace, () -> {
- try {
- return wait(snapshotService.createReader(topicName), "create reader");
- } catch (Exception e) {
- throw new RuntimeException(e);
- }
- }, __ -> __.closeAsync().exceptionally(e -> {
- log.warn("Failed to close reader {}", e.getMessage());
- return null;
- }));
+ final var reader = getReader(topic);
while (wait(reader.hasMoreEventsAsync(), "has more events")) {
final var msg = wait(reader.readNextAsync(), "read message");
if (msg.getKey() != null) {
@@ -80,11 +71,26 @@ public TransactionBufferSnapshot readLatest(String topic) throws Exception {
return snapshots.get(topic);
}
+ @VisibleForTesting
+ protected Reader getReader(String topic) {
+ final var topicName = TopicName.get(topic);
+ return readers.get(topicName.getNamespaceObject(), () -> {
+ try {
+ return wait(snapshotService.createReader(topicName), "create reader");
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }, __ -> __.closeAsync().exceptionally(e -> {
+ log.warn("Failed to close reader {}", e.getMessage());
+ return null;
+ }));
+ }
+
private T wait(CompletableFuture future, String msg) throws Exception {
try {
return future.get(blockTimeoutMs, TimeUnit.MILLISECONDS);
} catch (ExecutionException e) {
- throw new ExecutionException("Failed to " + msg, e.getCause());
+ throw new CompletionException("Failed to " + msg, e.getCause());
}
}
}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TopicTransactionBufferRecoverTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TopicTransactionBufferRecoverTest.java
index 3cf3a0bba3bcf..057bd4eb76d0c 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TopicTransactionBufferRecoverTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TopicTransactionBufferRecoverTest.java
@@ -66,6 +66,7 @@
import org.apache.pulsar.broker.systopic.SystemTopicClient;
import org.apache.pulsar.broker.transaction.buffer.AbortedTxnProcessor;
import org.apache.pulsar.broker.transaction.buffer.impl.SingleSnapshotAbortedTxnProcessorImpl;
+import org.apache.pulsar.broker.transaction.buffer.impl.SnapshotTableView;
import org.apache.pulsar.broker.transaction.buffer.impl.TopicTransactionBuffer;
import org.apache.pulsar.broker.transaction.buffer.metadata.TransactionBufferSnapshot;
import org.apache.pulsar.broker.transaction.buffer.metadata.v2.TransactionBufferSnapshotIndex;
@@ -581,6 +582,22 @@ private void checkSnapshotCount(TopicName topicName, boolean hasSnapshot,
reader.close();
}
+ static class MockSnapshotTableView extends SnapshotTableView {
+
+ private final PulsarService pulsar;
+
+ public MockSnapshotTableView(PulsarService pulsar) {
+ super(pulsar);
+ this.pulsar = pulsar;
+ }
+
+ @Override
+ public SystemTopicClient.Reader getReader(String topic) {
+ return pulsar.getTransactionBufferSnapshotServiceFactory().getTxnBufferSnapshotService()
+ .createReader(TopicName.get(topic)).join();
+ }
+ }
+
@Test(timeOut=30000)
public void testTransactionBufferRecoverThrowException() throws Exception {
String topic = NAMESPACE1 + "/testTransactionBufferRecoverThrowPulsarClientException";
@@ -662,7 +679,12 @@ private void checkCloseTopic(PulsarClient pulsarClient,
PersistentTopic originalTopic,
Field field,
Producer producer) throws Exception {
- field.set(getPulsarServiceList().get(0), transactionBufferSnapshotServiceFactory);
+ final var pulsar = getPulsarServiceList().get(0);
+ final var snapshotTableViewField = PulsarService.class.getDeclaredField("snapshotTableView");
+ final var originalSnapshotTableView = pulsar.getSnapshotTableView();
+ snapshotTableViewField.setAccessible(true);
+ snapshotTableViewField.set(pulsar, new MockSnapshotTableView(pulsar));
+ field.set(pulsar, transactionBufferSnapshotServiceFactory);
// recover again will throw then close topic
new TopicTransactionBuffer(originalTopic);
@@ -673,7 +695,8 @@ private void checkCloseTopic(PulsarClient pulsarClient,
assertTrue((boolean) close.get(originalTopic));
});
- field.set(getPulsarServiceList().get(0), transactionBufferSnapshotServiceFactoryOriginal);
+ field.set(pulsar, transactionBufferSnapshotServiceFactoryOriginal);
+ snapshotTableViewField.set(pulsar, originalSnapshotTableView);
Transaction txn = pulsarClient.newTransaction()
.withTransactionTimeout(5, TimeUnit.SECONDS)
From 92eede76324cb80f0dd109f4147e38618ea127f6 Mon Sep 17 00:00:00 2001
From: Yunze Xu
Date: Wed, 24 Jul 2024 17:38:11 +0800
Subject: [PATCH 4/6] Reduce a thread switch when processing the recovered
position
---
.../transaction/buffer/impl/TopicTransactionBuffer.java | 5 ++---
1 file changed, 2 insertions(+), 3 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java
index b4662e5fa83ed..7561457d11f8e 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java
@@ -632,7 +632,7 @@ public void run() {
this, topic.getName());
return;
}
- abortedTxnProcessor.recoverFromSnapshot().thenAcceptAsync(startReadCursorPosition -> {
+ abortedTxnProcessor.recoverFromSnapshot().thenAccept(startReadCursorPosition -> {
//Transaction is not use for this topic, so just make maxReadPosition as LAC.
if (startReadCursorPosition == null) {
callBack.noNeedToRecover();
@@ -678,8 +678,7 @@ public void run() {
closeCursor(SUBSCRIPTION_NAME);
callBack.recoverComplete();
- }, topic.getBrokerService().getPulsar().getTransactionExecutorProvider()
- .getExecutor(this)).exceptionally(e -> {
+ }).exceptionally(e -> {
callBack.recoverExceptionally(e.getCause());
log.error("[{}]Transaction buffer failed to recover snapshot!", topic.getName(), e);
return null;
From d24b2be860f5e8de2553182804cb7f0b55243a33 Mon Sep 17 00:00:00 2001
From: Yunze Xu
Date: Wed, 24 Jul 2024 21:22:15 +0800
Subject: [PATCH 5/6] Apply transaction TableView for all AbortedTxnProcessor
implementations
---
.../apache/pulsar/broker/PulsarService.java | 5 +-
.../SystemTopicTxnBufferSnapshotService.java | 15 +-
...ansactionBufferSnapshotServiceFactory.java | 26 +-
...SingleSnapshotAbortedTxnProcessorImpl.java | 3 +-
...napshotSegmentAbortedTxnProcessorImpl.java | 394 +++++++-----------
...{SnapshotTableView.java => TableView.java} | 36 +-
.../TopicTransactionBufferRecoverTest.java | 27 +-
7 files changed, 213 insertions(+), 293 deletions(-)
rename pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/{SnapshotTableView.java => TableView.java} (68%)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
index 492db888fb260..a63334623a48b 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
@@ -124,7 +124,6 @@
import org.apache.pulsar.broker.stats.prometheus.PulsarPrometheusMetricsServlet;
import org.apache.pulsar.broker.storage.ManagedLedgerStorage;
import org.apache.pulsar.broker.transaction.buffer.TransactionBufferProvider;
-import org.apache.pulsar.broker.transaction.buffer.impl.SnapshotTableView;
import org.apache.pulsar.broker.transaction.buffer.impl.TransactionBufferClientImpl;
import org.apache.pulsar.broker.transaction.pendingack.TransactionPendingAckStoreProvider;
import org.apache.pulsar.broker.transaction.pendingack.impl.MLPendingAckStoreProvider;
@@ -286,7 +285,6 @@ public class PulsarService implements AutoCloseable, ShutdownService {
private PulsarMetadataEventSynchronizer localMetadataSynchronizer;
private CoordinationService coordinationService;
private TransactionBufferSnapshotServiceFactory transactionBufferSnapshotServiceFactory;
- private SnapshotTableView snapshotTableView;
private MetadataStore configurationMetadataStore;
private PulsarMetadataEventSynchronizer configMetadataSynchronizer;
private boolean shouldShutdownConfigurationMetadataStore;
@@ -994,8 +992,7 @@ public void start() throws PulsarServerException {
MLTransactionMetadataStoreProvider.initBufferedWriterMetrics(getAdvertisedAddress());
MLPendingAckStoreProvider.initBufferedWriterMetrics(getAdvertisedAddress());
- this.transactionBufferSnapshotServiceFactory = new TransactionBufferSnapshotServiceFactory(getClient());
- this.snapshotTableView = new SnapshotTableView(this);
+ this.transactionBufferSnapshotServiceFactory = new TransactionBufferSnapshotServiceFactory(this);
this.transactionTimer =
new HashedWheelTimer(new DefaultThreadFactory("pulsar-transaction-timer"));
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicTxnBufferSnapshotService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicTxnBufferSnapshotService.java
index bd1b90981695e..ba6cbee355775 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicTxnBufferSnapshotService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicTxnBufferSnapshotService.java
@@ -22,12 +22,16 @@
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;
+import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.broker.PulsarServerException;
+import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.systopic.NamespaceEventsSystemTopicFactory;
import org.apache.pulsar.broker.systopic.SystemTopicClient;
import org.apache.pulsar.broker.systopic.SystemTopicClientBase;
-import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.broker.transaction.buffer.impl.TableView;
import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.impl.PulsarClientImpl;
import org.apache.pulsar.common.events.EventType;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.TopicName;
@@ -42,6 +46,8 @@ public class SystemTopicTxnBufferSnapshotService {
protected final EventType systemTopicType;
private final ConcurrentHashMap> refCountedWriterMap;
+ @Getter
+ private final TableView tableView;
// The class ReferenceCountedWriter will maintain the reference count,
// when the reference count decrement to 0, it will be removed from writerFutureMap, the writer will be closed.
@@ -95,13 +101,16 @@ public synchronized void release() {
}
- public SystemTopicTxnBufferSnapshotService(PulsarClient client, EventType systemTopicType,
- Class schemaType) {
+ public SystemTopicTxnBufferSnapshotService(PulsarService pulsar, EventType systemTopicType,
+ Class schemaType) throws PulsarServerException {
+ final var client = (PulsarClientImpl) pulsar.getClient();
this.namespaceEventsSystemTopicFactory = new NamespaceEventsSystemTopicFactory(client);
this.systemTopicType = systemTopicType;
this.schemaType = schemaType;
this.clients = new ConcurrentHashMap<>();
this.refCountedWriterMap = new ConcurrentHashMap<>();
+ this.tableView = new TableView<>(this::createReader,
+ client.getConfiguration().getOperationTimeoutMs(), pulsar.getExecutor());
}
public CompletableFuture> createReader(TopicName topicName) {
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/TransactionBufferSnapshotServiceFactory.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/TransactionBufferSnapshotServiceFactory.java
index 4b8548fae47c7..d54f65572f594 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/TransactionBufferSnapshotServiceFactory.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/TransactionBufferSnapshotServiceFactory.java
@@ -18,12 +18,15 @@
*/
package org.apache.pulsar.broker.service;
+import lombok.Getter;
+import org.apache.pulsar.broker.PulsarServerException;
+import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.transaction.buffer.metadata.TransactionBufferSnapshot;
import org.apache.pulsar.broker.transaction.buffer.metadata.v2.TransactionBufferSnapshotIndexes;
import org.apache.pulsar.broker.transaction.buffer.metadata.v2.TransactionBufferSnapshotSegment;
-import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.common.events.EventType;
+@Getter
public class TransactionBufferSnapshotServiceFactory {
private SystemTopicTxnBufferSnapshotService txnBufferSnapshotService;
@@ -33,29 +36,16 @@ public class TransactionBufferSnapshotServiceFactory {
private SystemTopicTxnBufferSnapshotService txnBufferSnapshotIndexService;
- public TransactionBufferSnapshotServiceFactory(PulsarClient pulsarClient) {
- this.txnBufferSnapshotSegmentService = new SystemTopicTxnBufferSnapshotService<>(pulsarClient,
+ public TransactionBufferSnapshotServiceFactory(PulsarService pulsar) throws PulsarServerException {
+ this.txnBufferSnapshotSegmentService = new SystemTopicTxnBufferSnapshotService<>(pulsar,
EventType.TRANSACTION_BUFFER_SNAPSHOT_SEGMENTS,
TransactionBufferSnapshotSegment.class);
- this.txnBufferSnapshotIndexService = new SystemTopicTxnBufferSnapshotService<>(pulsarClient,
+ this.txnBufferSnapshotIndexService = new SystemTopicTxnBufferSnapshotService<>(pulsar,
EventType.TRANSACTION_BUFFER_SNAPSHOT_INDEXES, TransactionBufferSnapshotIndexes.class);
- this.txnBufferSnapshotService = new SystemTopicTxnBufferSnapshotService<>(pulsarClient,
+ this.txnBufferSnapshotService = new SystemTopicTxnBufferSnapshotService<>(pulsar,
EventType.TRANSACTION_BUFFER_SNAPSHOT, TransactionBufferSnapshot.class);
}
- public SystemTopicTxnBufferSnapshotService getTxnBufferSnapshotIndexService() {
- return this.txnBufferSnapshotIndexService;
- }
-
- public SystemTopicTxnBufferSnapshotService
- getTxnBufferSnapshotSegmentService() {
- return this.txnBufferSnapshotSegmentService;
- }
-
- public SystemTopicTxnBufferSnapshotService getTxnBufferSnapshotService() {
- return this.txnBufferSnapshotService;
- }
-
public void close() throws Exception {
if (this.txnBufferSnapshotIndexService != null) {
this.txnBufferSnapshotIndexService.close();
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/SingleSnapshotAbortedTxnProcessorImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/SingleSnapshotAbortedTxnProcessorImpl.java
index 3c640dc41126d..1649349e3e6f6 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/SingleSnapshotAbortedTxnProcessorImpl.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/SingleSnapshotAbortedTxnProcessorImpl.java
@@ -90,7 +90,8 @@ public CompletableFuture recoverFromSnapshot() {
final var pulsar = topic.getBrokerService().getPulsar();
pulsar.getTransactionExecutorProvider().getExecutor(this).execute(() -> {
try {
- final var snapshot = pulsar.getSnapshotTableView().readLatest(topic.getName());
+ final var snapshot = pulsar.getTransactionBufferSnapshotServiceFactory().getTxnBufferSnapshotService()
+ .getTableView().readLatest(topic.getName());
if (snapshot != null) {
handleSnapshot(snapshot);
final var startReadCursorPosition = PositionFactory.create(snapshot.getMaxReadPositionLedgerId(),
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/SnapshotSegmentAbortedTxnProcessorImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/SnapshotSegmentAbortedTxnProcessorImpl.java
index e94e7a047797a..4ca27f77a87f5 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/SnapshotSegmentAbortedTxnProcessorImpl.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/SnapshotSegmentAbortedTxnProcessorImpl.java
@@ -24,11 +24,11 @@
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ConcurrentLinkedDeque;
+import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import java.util.function.Supplier;
@@ -54,7 +54,6 @@
import org.apache.pulsar.broker.transaction.buffer.metadata.v2.TransactionBufferSnapshotIndexesMetadata;
import org.apache.pulsar.broker.transaction.buffer.metadata.v2.TransactionBufferSnapshotSegment;
import org.apache.pulsar.broker.transaction.buffer.metadata.v2.TxnIDData;
-import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.transaction.TxnID;
import org.apache.pulsar.client.impl.MessageIdImpl;
@@ -228,220 +227,129 @@ public CompletableFuture takeAbortedTxnsSnapshot(Position maxReadPosition)
@Override
public CompletableFuture recoverFromSnapshot() {
- return topic.getBrokerService().getPulsar().getTransactionBufferSnapshotServiceFactory()
- .getTxnBufferSnapshotIndexService()
- .createReader(TopicName.get(topic.getName())).thenComposeAsync(reader -> {
- Position startReadCursorPosition = null;
- TransactionBufferSnapshotIndexes persistentSnapshotIndexes = null;
- try {
- /*
- Read the transaction snapshot segment index.
-
- The processor can get the sequence ID, unsealed transaction IDs,
- segment index list and max read position in the snapshot segment index.
- Then we can traverse the index list to read all aborted transaction IDs
- in segments to aborts.
-
- */
- while (reader.hasMoreEvents()) {
- Message message = reader.readNextAsync()
- .get(getSystemClientOperationTimeoutMs(), TimeUnit.MILLISECONDS);
- if (topic.getName().equals(message.getKey())) {
- TransactionBufferSnapshotIndexes transactionBufferSnapshotIndexes = message.getValue();
- if (transactionBufferSnapshotIndexes != null) {
- persistentSnapshotIndexes = transactionBufferSnapshotIndexes;
- startReadCursorPosition = PositionFactory.create(
- transactionBufferSnapshotIndexes.getSnapshot().getMaxReadPositionLedgerId(),
- transactionBufferSnapshotIndexes.getSnapshot().getMaxReadPositionEntryId());
- }
- }
- }
- } catch (TimeoutException ex) {
- Throwable t = FutureUtil.unwrapCompletionException(ex);
- String errorMessage = String.format("[%s] Transaction buffer recover fail by read "
- + "transactionBufferSnapshot timeout!", topic.getName());
- log.error(errorMessage, t);
- return FutureUtil.failedFuture(
- new BrokerServiceException.ServiceUnitNotReadyException(errorMessage, t));
- } catch (Exception ex) {
- log.error("[{}] Transaction buffer recover fail when read "
- + "transactionBufferSnapshot!", topic.getName(), ex);
- return FutureUtil.failedFuture(ex);
- } finally {
- closeReader(reader);
- }
- Position finalStartReadCursorPosition = startReadCursorPosition;
- TransactionBufferSnapshotIndexes finalPersistentSnapshotIndexes = persistentSnapshotIndexes;
- if (persistentSnapshotIndexes == null) {
- return recoverOldSnapshot();
- } else {
- this.unsealedTxnIds = convertTypeToTxnID(persistentSnapshotIndexes
- .getSnapshot().getAborts());
- }
- //Read snapshot segment to recover aborts.
- ArrayList> completableFutures = new ArrayList<>();
- CompletableFuture openManagedLedgerAndHandleSegmentsFuture = new CompletableFuture<>();
- AtomicBoolean hasInvalidIndex = new AtomicBoolean(false);
- AsyncCallbacks.OpenReadOnlyManagedLedgerCallback callback = new AsyncCallbacks
- .OpenReadOnlyManagedLedgerCallback() {
- @Override
- public void openReadOnlyManagedLedgerComplete(ReadOnlyManagedLedgerImpl readOnlyManagedLedger,
- Object ctx) {
- finalPersistentSnapshotIndexes.getIndexList().forEach(index -> {
- CompletableFuture handleSegmentFuture = new CompletableFuture<>();
- completableFutures.add(handleSegmentFuture);
- readOnlyManagedLedger.asyncReadEntry(
- PositionFactory.create(index.getSegmentLedgerID(),
- index.getSegmentEntryID()),
- new AsyncCallbacks.ReadEntryCallback() {
- @Override
- public void readEntryComplete(Entry entry, Object ctx) {
- handleSnapshotSegmentEntry(entry);
- indexes.put(PositionFactory.create(
- index.abortedMarkLedgerID,
- index.abortedMarkEntryID),
- index);
- entry.release();
- handleSegmentFuture.complete(null);
- }
-
- @Override
- public void readEntryFailed(ManagedLedgerException exception, Object ctx) {
- /*
- The logic flow of deleting expired segment is:
-
- 1. delete segment
- 2. update segment index
-
- If the worker delete segment successfully
- but failed to update segment index,
- the segment can not be read according to the index.
- We update index again if there are invalid indexes.
- */
- if (((ManagedLedgerImpl) topic.getManagedLedger())
- .ledgerExists(index.getAbortedMarkLedgerID())) {
- log.error("[{}] Failed to read snapshot segment [{}:{}]",
- topic.getName(), index.segmentLedgerID,
- index.segmentEntryID, exception);
- handleSegmentFuture.completeExceptionally(exception);
- } else {
- hasInvalidIndex.set(true);
- }
- }
-
- @Override
- public String toString() {
- return String.format("Transaction buffer [%s] recover from snapshot",
- SnapshotSegmentAbortedTxnProcessorImpl.this.topic.getName());
- }
- }, null);
- });
- openManagedLedgerAndHandleSegmentsFuture.complete(null);
- }
+ final var pulsar = topic.getBrokerService().getPulsar();
+ final var future = new CompletableFuture();
+ pulsar.getTransactionExecutorProvider().getExecutor(this).execute(() -> {
+ try {
+ final var indexes = pulsar.getTransactionBufferSnapshotServiceFactory()
+ .getTxnBufferSnapshotIndexService().getTableView().readLatest(topic.getName());
+ if (indexes == null) {
+ // Try recovering from the old format snapshot
+ future.complete(recoverOldSnapshot());
+ return;
+ }
+ final var snapshot = indexes.getSnapshot();
+ final var startReadCursorPosition = PositionFactory.create(snapshot.getMaxReadPositionLedgerId(),
+ snapshot.getMaxReadPositionEntryId());
+ this.unsealedTxnIds = convertTypeToTxnID(snapshot.getAborts());
+ // Read snapshot segment to recover aborts
+ final var snapshotSegmentTopicName = TopicName.get(TopicDomain.persistent.toString(),
+ TopicName.get(topic.getName()).getNamespaceObject(),
+ SystemTopicNames.TRANSACTION_BUFFER_SNAPSHOT_SEGMENTS);
+ readSegmentEntries(snapshotSegmentTopicName, indexes);
+ if (!this.indexes.isEmpty()) {
+ // If there is no segment index, the persistent worker will write segments starting from 0.
+ persistentWorker.sequenceID.set(this.indexes.get(this.indexes.lastKey()).sequenceID + 1);
+ }
+ unsealedTxnIds.forEach(txnID -> aborts.put(txnID, txnID));
+ future.complete(startReadCursorPosition);
+ } catch (Throwable throwable) {
+ future.completeExceptionally(throwable);
+ }
+ });
+ return future;
+ }
- @Override
- public void openReadOnlyManagedLedgerFailed(ManagedLedgerException exception, Object ctx) {
- log.error("[{}] Failed to open readOnly managed ledger", topic, exception);
- openManagedLedgerAndHandleSegmentsFuture.completeExceptionally(exception);
- }
- };
-
- TopicName snapshotSegmentTopicName = TopicName.get(TopicDomain.persistent.toString(),
- TopicName.get(topic.getName()).getNamespaceObject(),
- SystemTopicNames.TRANSACTION_BUFFER_SNAPSHOT_SEGMENTS);
- this.topic.getBrokerService().getPulsar().getManagedLedgerFactory()
- .asyncOpenReadOnlyManagedLedger(snapshotSegmentTopicName
- .getPersistenceNamingEncoding(), callback,
- topic.getManagedLedger().getConfig(),
- null);
- /*
- Wait the processor recover completely and then allow TB
- to recover the messages after the startReadCursorPosition.
- */
- return openManagedLedgerAndHandleSegmentsFuture
- .thenCompose((ignore) -> FutureUtil.waitForAll(completableFutures))
- .thenCompose((i) -> {
- /*
- Update the snapshot segment index if there exist invalid indexes.
- */
- if (hasInvalidIndex.get()) {
- persistentWorker.appendTask(PersistentWorker.OperationType.UpdateIndex,
- () -> persistentWorker.updateSnapshotIndex(
- finalPersistentSnapshotIndexes.getSnapshot()));
- }
- /*
- If there is no segment index, the persistent worker will write segment begin from 0.
- */
- if (indexes.size() != 0) {
- persistentWorker.sequenceID.set(indexes.get(indexes.lastKey()).sequenceID + 1);
- }
- /*
- Append the aborted txn IDs in the index metadata
- can keep the order of the aborted txn in the aborts.
- So that we can trim the expired snapshot segment in aborts
- according to the latest transaction IDs in the segmentIndex.
- */
- unsealedTxnIds.forEach(txnID -> aborts.put(txnID, txnID));
- return CompletableFuture.completedFuture(finalStartReadCursorPosition);
- }).exceptionally(ex -> {
- log.error("[{}] Failed to recover snapshot segment", this.topic.getName(), ex);
- return null;
- });
-
- }, topic.getBrokerService().getPulsar().getTransactionExecutorProvider()
- .getExecutor(this));
+ private void readSegmentEntries(TopicName topicName, TransactionBufferSnapshotIndexes indexes) throws Exception {
+ final var managedLedger = openReadOnlyManagedLedger(topicName);
+ boolean hasInvalidIndex = false;
+ for (var index : indexes.getIndexList()) {
+ final var position = PositionFactory.create(index.getSegmentLedgerID(), index.getSegmentEntryID());
+ final var abortedPosition = PositionFactory.create(index.abortedMarkLedgerID, index.abortedMarkEntryID);
+ try {
+ final var entry = readEntry(managedLedger, position);
+ try {
+ handleSnapshotSegmentEntry(entry);
+ this.indexes.put(abortedPosition, index);
+ } finally {
+ entry.release();
+ }
+ } catch (Throwable throwable) {
+ if (((ManagedLedgerImpl) topic.getManagedLedger())
+ .ledgerExists(index.getAbortedMarkLedgerID())) {
+ log.error("[{}] Failed to read snapshot segment [{}:{}]",
+ topic.getName(), index.segmentLedgerID,
+ index.segmentEntryID, throwable);
+ throw throwable;
+ } else {
+ hasInvalidIndex = true;
+ }
+ }
+ }
+ if (hasInvalidIndex) {
+ // Rewrite the index if a segment was unreadable and its aborted-mark ledger no longer exists.
+ persistentWorker.appendTask(PersistentWorker.OperationType.UpdateIndex,
+ () -> persistentWorker.updateSnapshotIndex(indexes.getSnapshot()));
+ }
+ }
+
+ private ReadOnlyManagedLedgerImpl openReadOnlyManagedLedger(TopicName topicName) throws Exception {
+ final var future = new CompletableFuture();
+ final var callback = new AsyncCallbacks.OpenReadOnlyManagedLedgerCallback() {
+ @Override
+ public void openReadOnlyManagedLedgerComplete(ReadOnlyManagedLedgerImpl managedLedger, Object ctx) {
+ future.complete(managedLedger);
+ }
+
+ @Override
+ public void openReadOnlyManagedLedgerFailed(ManagedLedgerException exception, Object ctx) {
+ future.completeExceptionally(exception);
+ }
+
+ @Override
+ public String toString() {
+ return String.format("Transaction buffer [%s] recover from snapshot",
+ SnapshotSegmentAbortedTxnProcessorImpl.this.topic.getName());
+ }
+ };
+ topic.getBrokerService().getPulsar().getManagedLedgerFactory().asyncOpenReadOnlyManagedLedger(
+ topicName.getPersistenceNamingEncoding(), callback, topic.getManagedLedger().getConfig(), null);
+ return wait(future, "open read only ml for " + topicName);
+ }
+
+ private Entry readEntry(ReadOnlyManagedLedgerImpl managedLedger, Position position) throws Exception {
+ final var future = new CompletableFuture();
+ managedLedger.asyncReadEntry(position, new AsyncCallbacks.ReadEntryCallback() {
+ @Override
+ public void readEntryComplete(Entry entry, Object ctx) {
+ future.complete(entry);
+ }
+
+ @Override
+ public void readEntryFailed(ManagedLedgerException exception, Object ctx) {
+ future.completeExceptionally(exception);
+ }
+ }, null);
+ return wait(future, "read entry from " + position);
}
// This method will be deprecated and removed in version 4.x.0
- private CompletableFuture recoverOldSnapshot() {
- return topic.getBrokerService().getPulsar().getPulsarResources().getTopicResources()
- .listPersistentTopicsAsync(NamespaceName.get(TopicName.get(topic.getName()).getNamespace()))
- .thenCompose(topics -> {
- if (!topics.contains(TopicDomain.persistent + "://"
- + TopicName.get(topic.getName()).getNamespace() + "/"
- + SystemTopicNames.TRANSACTION_BUFFER_SNAPSHOT)) {
- return CompletableFuture.completedFuture(null);
- } else {
- return topic.getBrokerService().getPulsar().getTransactionBufferSnapshotServiceFactory()
- .getTxnBufferSnapshotService()
- .createReader(TopicName.get(topic.getName())).thenComposeAsync(snapshotReader -> {
- Position startReadCursorPositionInOldSnapshot = null;
- try {
- while (snapshotReader.hasMoreEvents()) {
- Message message = snapshotReader.readNextAsync()
- .get(getSystemClientOperationTimeoutMs(), TimeUnit.MILLISECONDS);
- if (topic.getName().equals(message.getKey())) {
- TransactionBufferSnapshot transactionBufferSnapshot =
- message.getValue();
- if (transactionBufferSnapshot != null) {
- handleOldSnapshot(transactionBufferSnapshot);
- startReadCursorPositionInOldSnapshot = PositionFactory.create(
- transactionBufferSnapshot.getMaxReadPositionLedgerId(),
- transactionBufferSnapshot.getMaxReadPositionEntryId());
- }
- }
- }
- } catch (TimeoutException ex) {
- Throwable t = FutureUtil.unwrapCompletionException(ex);
- String errorMessage = String.format("[%s] Transaction buffer recover fail by "
- + "read transactionBufferSnapshot timeout!", topic.getName());
- log.error(errorMessage, t);
- return FutureUtil.failedFuture(new BrokerServiceException
- .ServiceUnitNotReadyException(errorMessage, t));
- } catch (Exception ex) {
- log.error("[{}] Transaction buffer recover fail when read "
- + "transactionBufferSnapshot!", topic.getName(), ex);
- return FutureUtil.failedFuture(ex);
- } finally {
- assert snapshotReader != null;
- closeReader(snapshotReader);
- }
- return CompletableFuture.completedFuture(startReadCursorPositionInOldSnapshot);
- },
- topic.getBrokerService().getPulsar().getTransactionExecutorProvider()
- .getExecutor(this));
- }
- });
+ private Position recoverOldSnapshot() throws Exception {
+ final var pulsar = topic.getBrokerService().getPulsar();
+ final var topicName = TopicName.get(topic.getName());
+ final var topics = wait(pulsar.getPulsarResources().getTopicResources().listPersistentTopicsAsync(
+ NamespaceName.get(topicName.getNamespace())), "list persistent topics");
+ if (!topics.contains(TopicDomain.persistent + "://" + topicName.getNamespace() + "/"
+ + SystemTopicNames.TRANSACTION_BUFFER_SNAPSHOT)) {
+ return null;
+ }
+ final var snapshot = pulsar.getTransactionBufferSnapshotServiceFactory().getTxnBufferSnapshotService()
+ .getTableView().readLatest(topic.getName());
+ if (snapshot == null) {
+ return null;
+ }
+ handleOldSnapshot(snapshot);
+ return PositionFactory.create(snapshot.getMaxReadPositionLedgerId(), snapshot.getMaxReadPositionEntryId());
}
// This method will be deprecated and removed in version 4.x.0
@@ -509,9 +417,17 @@ private long getSystemClientOperationTimeoutMs() throws Exception {
return pulsarClient.getConfiguration().getOperationTimeoutMs();
}
- private void closeReader(SystemTopicClient.Reader reader) {
+ private R wait(CompletableFuture future, String msg) throws Exception {
+ try {
+ return future.get(getSystemClientOperationTimeoutMs(), TimeUnit.MILLISECONDS);
+ } catch (ExecutionException e) {
+ throw new CompletionException("Failed to " + msg, e.getCause());
+ }
+ }
+
+ private void closeReader(SystemTopicClient.Reader reader) {
reader.closeAsync().exceptionally(e -> {
- log.error("[{}]Transaction buffer snapshot reader close error!", topic.getName(), e);
+ log.warn("[{}] Failed to close reader: {}", topic.getName(), e.getMessage());
return null;
});
}
@@ -838,25 +754,37 @@ private CompletableFuture clearSnapshotSegmentAndIndexes() {
*
*/
private CompletableFuture clearAllSnapshotSegments() {
- return topic.getBrokerService().getPulsar().getTransactionBufferSnapshotServiceFactory()
- .getTxnBufferSnapshotSegmentService()
- .createReader(TopicName.get(topic.getName())).thenComposeAsync(reader -> {
- try {
- while (reader.hasMoreEvents()) {
- Message message = reader.readNextAsync()
- .get(getSystemClientOperationTimeoutMs(), TimeUnit.MILLISECONDS);
- if (topic.getName().equals(message.getValue().getTopicName())) {
- snapshotSegmentsWriter.getFuture().get().write(message.getKey(), null);
- }
+ final var future = new CompletableFuture();
+ final var pulsar = topic.getBrokerService().getPulsar();
+ pulsar.getTransactionExecutorProvider().getExecutor(this).execute(() -> {
+ try {
+ final var reader = wait(pulsar.getTransactionBufferSnapshotServiceFactory()
+ .getTxnBufferSnapshotSegmentService().createReader(TopicName.get(topic.getName()))
+ , "create reader");
+ try {
+ while (wait(reader.hasMoreEventsAsync(), "has more events")) {
+ final var message = wait(reader.readNextAsync(), "read next");
+ if (topic.getName().equals(message.getValue().getTopicName())) {
+ snapshotSegmentsWriter.getFuture().get().write(message.getKey(), null);
}
- return CompletableFuture.completedFuture(null);
- } catch (Exception ex) {
- log.error("[{}] Transaction buffer clear snapshot segments fail!", topic.getName(), ex);
- return FutureUtil.failedFuture(ex);
- } finally {
- closeReader(reader);
}
- });
+ future.complete(null);
+ } finally {
+ closeReader(reader);
+ }
+ } catch (Throwable throwable) {
+ future.completeExceptionally(throwable);
+ }
+ });
+ return future;
+ }
+
+ private R wait(CompletableFuture future, String msg) throws Exception {
+ try {
+ return future.get(getSystemClientOperationTimeoutMs(), TimeUnit.MILLISECONDS);
+ } catch (ExecutionException e) {
+ throw new CompletionException("Failed to " + msg, e.getCause());
+ }
}
synchronized CompletableFuture closeAsync() {
@@ -882,4 +810,4 @@ private List convertTypeToTxnIDData(List abortedTxns) {
return segment;
}
-}
\ No newline at end of file
+}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/SnapshotTableView.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TableView.java
similarity index 68%
rename from pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/SnapshotTableView.java
rename to pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TableView.java
index 7099f22c78cf7..79836d8c15fc4 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/SnapshotTableView.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TableView.java
@@ -25,11 +25,10 @@
import java.util.concurrent.CompletionException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
+import java.util.function.Function;
import lombok.extern.slf4j.Slf4j;
-import org.apache.pulsar.broker.PulsarService;
-import org.apache.pulsar.broker.service.SystemTopicTxnBufferSnapshotService;
-import org.apache.pulsar.broker.transaction.buffer.metadata.TransactionBufferSnapshot;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.utils.SimpleCache;
@@ -40,23 +39,24 @@
* - Maintains multiple long-lived readers that will be expired after some time (1 minute by default).
*/
@Slf4j
-public class SnapshotTableView {
+public class TableView {
// Remove the cached reader and snapshots if there is no refresh request in 1 minute
private static final long CACHE_EXPIRE_TIMEOUT_MS = 60 * 1000L;
- private final Map snapshots = new ConcurrentHashMap<>();
- private final SystemTopicTxnBufferSnapshotService snapshotService;
- private final long blockTimeoutMs;
- private final SimpleCache> readers;
+ @VisibleForTesting
+ protected final Function>> readerCreator;
+ private final Map snapshots = new ConcurrentHashMap<>();
+ private final long clientOperationTimeoutMs;
+ private final SimpleCache> readers;
- public SnapshotTableView(PulsarService pulsar) {
- this.snapshotService = pulsar.getTransactionBufferSnapshotServiceFactory().getTxnBufferSnapshotService();
- this.blockTimeoutMs = Long.parseLong(pulsar.getConfig().getProperties()
- .getProperty("brokerClient_operationTimeoutMs", "30000"));
- this.readers = new SimpleCache<>(pulsar.getExecutor(), CACHE_EXPIRE_TIMEOUT_MS);
+ public TableView(Function>> readerCreator, long clientOperationTimeoutMs,
+ ScheduledExecutorService executor) {
+ this.readerCreator = readerCreator;
+ this.clientOperationTimeoutMs = clientOperationTimeoutMs;
+ this.readers = new SimpleCache<>(executor, CACHE_EXPIRE_TIMEOUT_MS);
}
- public TransactionBufferSnapshot readLatest(String topic) throws Exception {
+ public T readLatest(String topic) throws Exception {
final var reader = getReader(topic);
while (wait(reader.hasMoreEventsAsync(), "has more events")) {
final var msg = wait(reader.readNextAsync(), "read message");
@@ -72,11 +72,11 @@ public TransactionBufferSnapshot readLatest(String topic) throws Exception {
}
@VisibleForTesting
- protected Reader getReader(String topic) {
+ protected Reader getReader(String topic) {
final var topicName = TopicName.get(topic);
return readers.get(topicName.getNamespaceObject(), () -> {
try {
- return wait(snapshotService.createReader(topicName), "create reader");
+ return wait(readerCreator.apply(topicName), "create reader");
} catch (Exception e) {
throw new RuntimeException(e);
}
@@ -86,9 +86,9 @@ protected Reader getReader(String topic) {
}));
}
- private T wait(CompletableFuture future, String msg) throws Exception {
+ private R wait(CompletableFuture future, String msg) throws Exception {
try {
- return future.get(blockTimeoutMs, TimeUnit.MILLISECONDS);
+ return future.get(clientOperationTimeoutMs, TimeUnit.MILLISECONDS);
} catch (ExecutionException e) {
throw new CompletionException("Failed to " + msg, e.getCause());
}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TopicTransactionBufferRecoverTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TopicTransactionBufferRecoverTest.java
index 057bd4eb76d0c..8ab9d58f57076 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TopicTransactionBufferRecoverTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TopicTransactionBufferRecoverTest.java
@@ -66,7 +66,7 @@
import org.apache.pulsar.broker.systopic.SystemTopicClient;
import org.apache.pulsar.broker.transaction.buffer.AbortedTxnProcessor;
import org.apache.pulsar.broker.transaction.buffer.impl.SingleSnapshotAbortedTxnProcessorImpl;
-import org.apache.pulsar.broker.transaction.buffer.impl.SnapshotTableView;
+import org.apache.pulsar.broker.transaction.buffer.impl.TableView;
import org.apache.pulsar.broker.transaction.buffer.impl.TopicTransactionBuffer;
import org.apache.pulsar.broker.transaction.buffer.metadata.TransactionBufferSnapshot;
import org.apache.pulsar.broker.transaction.buffer.metadata.v2.TransactionBufferSnapshotIndex;
@@ -582,19 +582,16 @@ private void checkSnapshotCount(TopicName topicName, boolean hasSnapshot,
reader.close();
}
- static class MockSnapshotTableView extends SnapshotTableView {
+ // Test-only table view. Its reader creator asks the given broker's transaction buffer snapshot
+ // service to create a reader for the topic.
+ static class MockTableView extends TableView {
- private final PulsarService pulsar;
-
- public MockSnapshotTableView(PulsarService pulsar) {
- super(pulsar);
- this.pulsar = pulsar;
+ // NOTE(review): 30000L is the second super-constructor argument, presumably the client operation
+ // timeout in milliseconds; confirm against the TableView constructor.
+ public MockTableView(PulsarService pulsar) {
+ super(topic -> pulsar.getTransactionBufferSnapshotServiceFactory().getTxnBufferSnapshotService()
+ .createReader(topic), 30000L, pulsar.getExecutor());
}
+ // Does not delegate to super.getReader(topic): every call invokes readerCreator again and blocks on
+ // the returned future with join(), i.e. without a timeout.
@Override
public SystemTopicClient.Reader getReader(String topic) {
- return pulsar.getTransactionBufferSnapshotServiceFactory().getTxnBufferSnapshotService()
- .createReader(TopicName.get(topic)).join();
+ return readerCreator.apply(TopicName.get(topic)).join();
}
}
@@ -628,6 +625,7 @@ public void testTransactionBufferRecoverThrowException() throws Exception {
doReturn(CompletableFuture.completedFuture(reader))
.when(systemTopicTxnBufferSnapshotService).createReader(any());
doReturn(refCounterWriter).when(systemTopicTxnBufferSnapshotService).getReferenceWriter(any());
+ doReturn(new MockTableView(pulsarServiceList.get(0))).when(systemTopicTxnBufferSnapshotService).getTableView();
TransactionBufferSnapshotServiceFactory transactionBufferSnapshotServiceFactory =
mock(TransactionBufferSnapshotServiceFactory.class);
doReturn(systemTopicTxnBufferSnapshotService)
@@ -680,10 +678,6 @@ private void checkCloseTopic(PulsarClient pulsarClient,
Field field,
Producer producer) throws Exception {
final var pulsar = getPulsarServiceList().get(0);
- final var snapshotTableViewField = PulsarService.class.getDeclaredField("snapshotTableView");
- final var originalSnapshotTableView = pulsar.getSnapshotTableView();
- snapshotTableViewField.setAccessible(true);
- snapshotTableViewField.set(pulsar, new MockSnapshotTableView(pulsar));
field.set(pulsar, transactionBufferSnapshotServiceFactory);
// recover again will throw then close topic
@@ -696,7 +690,6 @@ private void checkCloseTopic(PulsarClient pulsarClient,
});
field.set(pulsar, transactionBufferSnapshotServiceFactoryOriginal);
- snapshotTableViewField.set(pulsar, originalSnapshotTableView);
Transaction txn = pulsarClient.newTransaction()
.withTransactionTimeout(5, TimeUnit.SECONDS)
@@ -708,8 +701,9 @@ private void checkCloseTopic(PulsarClient pulsarClient,
@Test
public void testTransactionBufferIndexSystemTopic() throws Exception {
+ final var pulsar = pulsarServiceList.get(0);
SystemTopicTxnBufferSnapshotService transactionBufferSnapshotIndexService =
- new TransactionBufferSnapshotServiceFactory(pulsarClient).getTxnBufferSnapshotIndexService();
+ new TransactionBufferSnapshotServiceFactory(pulsar).getTxnBufferSnapshotIndexService();
SystemTopicClient.Writer indexesWriter =
transactionBufferSnapshotIndexService.getReferenceWriter(
@@ -769,9 +763,10 @@ public void testTransactionBufferSegmentSystemTopic() throws Exception {
BrokerService brokerService = pulsarService.getBrokerService();
// create snapshot segment writer
+ final var pulsar = pulsarServiceList.get(0);
SystemTopicTxnBufferSnapshotService
transactionBufferSnapshotSegmentService =
- new TransactionBufferSnapshotServiceFactory(pulsarClient).getTxnBufferSnapshotSegmentService();
+ new TransactionBufferSnapshotServiceFactory(pulsar).getTxnBufferSnapshotSegmentService();
SystemTopicClient.Writer
segmentWriter = transactionBufferSnapshotSegmentService
From 3b192a589b6817cdea3efd74eed8d6484a221ea6 Mon Sep 17 00:00:00 2001
From: Yunze Xu
Date: Fri, 26 Jul 2024 12:50:40 +0800
Subject: [PATCH 6/6] Check if value is expired periodically in SimpleCache
---
.../transaction/buffer/impl/TableView.java | 3 +-
.../org/apache/pulsar/utils/SimpleCache.java | 69 ++++++++++++-------
.../apache/pulsar/utils/SimpleCacheTest.java | 31 +++++----
3 files changed, 66 insertions(+), 37 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TableView.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TableView.java
index 79836d8c15fc4..7608a393cc980 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TableView.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TableView.java
@@ -43,6 +43,7 @@ public class TableView {
// Remove the cached reader and snapshots if there is no refresh request in 1 minute
private static final long CACHE_EXPIRE_TIMEOUT_MS = 60 * 1000L;
+ // Passed to SimpleCache as its third constructor argument, which SimpleCache uses as both the initial
+ // delay and the period (in milliseconds) of its scheduled expiration check
+ private static final long CACHE_EXPIRE_CHECK_FREQUENCY_MS = 3000L;
@VisibleForTesting
protected final Function>> readerCreator;
private final Map snapshots = new ConcurrentHashMap<>();
@@ -53,7 +54,7 @@ public TableView(Function>> readerCreator
ScheduledExecutorService executor) {
this.readerCreator = readerCreator;
this.clientOperationTimeoutMs = clientOperationTimeoutMs;
- this.readers = new SimpleCache<>(executor, CACHE_EXPIRE_TIMEOUT_MS);
+ this.readers = new SimpleCache<>(executor, CACHE_EXPIRE_TIMEOUT_MS, CACHE_EXPIRE_CHECK_FREQUENCY_MS);
}
public T readLatest(String topic) throws Exception {
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/utils/SimpleCache.java b/pulsar-broker/src/main/java/org/apache/pulsar/utils/SimpleCache.java
index 41055b37c012f..6a3a6721198e1 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/utils/SimpleCache.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/utils/SimpleCache.java
@@ -19,44 +19,65 @@
package org.apache.pulsar.utils;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.Map;
import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.function.Supplier;
import lombok.RequiredArgsConstructor;
+/**
+ * A small cache whose entries expire once they have not been accessed for {@code timeoutMs} milliseconds.
+ *
+ * <p>Expiration is only detected by a periodic sweep that is scheduled in the constructor, so an entry can
+ * stay visible after its deadline until the next sweep runs; {@link #get} on such an entry returns it and
+ * refreshes its deadline. Both {@link #get} and the sweep synchronize on this instance.
+ *
+ * @param <K> the key type
+ * @param <V> the cached value type
+ */
-@RequiredArgsConstructor
 public class SimpleCache<K, V> {
-    private final Map<K, V> cache = new HashMap<>();
-    private final Map<K, ScheduledFuture<?>> futures = new HashMap<>();
-    private final ScheduledExecutorService executor;
+    private final Map<K, ExpirableValue<V>> cache = new HashMap<>();
     private final long timeoutMs;
-    public synchronized V get(final K key, final Supplier<V> valueSupplier, final Consumer<V> expireCallback) {
-        final V value;
-        V existingValue = cache.get(key);
-        if (existingValue != null) {
-            value = existingValue;
-        } else {
-            value = valueSupplier.get();
-            cache.put(key, value);
+    /** A cached value, the callback to run when it expires, and its current expiration deadline. */
+    @RequiredArgsConstructor
+    private class ExpirableValue<V> {
+
+        private final V value;
+        private final Consumer<V> expireCallback;
+        // Wall-clock deadline (System.currentTimeMillis()); only touched while holding the cache's monitor
+        private long deadlineMs;
+
+        /**
+         * Runs the expire callback if the deadline has been reached.
+         *
+         * @return true if the deadline has been reached and the entry must be removed, even when the
+         *         callback failed; false if the entry is still alive
+         */
+        boolean tryExpire() {
+            if (System.currentTimeMillis() < deadlineMs) {
+                return false;
+            }
+            try {
+                expireCallback.accept(value);
+            } catch (RuntimeException e) {
+                // A failing callback must not escape the periodic sweep: the executor would suppress
+                // every later run and the entries expired so far would never be removed from the cache.
+                // Hand the failure to the thread's uncaught-exception handler and still expire the entry.
+                final var thread = Thread.currentThread();
+                thread.getUncaughtExceptionHandler().uncaughtException(thread, e);
+            }
+            return true;
         }
-        final var future = futures.remove(key);
-        if (future != null) {
-            future.cancel(true);
+
+        /** Postpones the expiration to {@code timeoutMs} milliseconds from now. */
+        void updateDeadline() {
+            deadlineMs = System.currentTimeMillis() + timeoutMs;
         }
-        futures.put(key, executor.schedule(() -> {
+    }
+
+    /**
+     * Creates the cache and schedules its periodic expiration sweep.
+     *
+     * @param scheduler   the executor that runs the sweep
+     * @param timeoutMs   how long, in milliseconds, an entry lives after its last access
+     * @param frequencyMs the initial delay and the period, in milliseconds, of the sweep
+     */
+    public SimpleCache(final ScheduledExecutorService scheduler, final long timeoutMs, final long frequencyMs) {
+        this.timeoutMs = timeoutMs;
+        scheduler.scheduleAtFixedRate(() -> {
             synchronized (SimpleCache.this) {
-                futures.remove(key);
-                final var removedValue = cache.remove(key);
-                if (removedValue != null) {
-                    expireCallback.accept(removedValue);
-                }
+                // Collect first and remove afterwards: the map must not be modified while iterating it
+                final var keys = new HashSet<K>();
+                cache.forEach((key, value) -> {
+                    if (value.tryExpire()) {
+                        keys.add(key);
+                    }
+                });
+                cache.keySet().removeAll(keys);
             }
-        }, timeoutMs, TimeUnit.MILLISECONDS));
-        return value;
+        }, frequencyMs, frequencyMs, TimeUnit.MILLISECONDS);
+    }
+
+    /**
+     * Returns the value cached for {@code key}, creating it with {@code valueSupplier} when absent, and
+     * resets the entry's expiration deadline.
+     *
+     * @param key            the key to look up
+     * @param valueSupplier  creates the value on a miss; invoked while holding this cache's monitor, and
+     *                       nothing is cached if it throws
+     * @param expireCallback invoked with the value when the entry expires; only the callback passed when
+     *                       the entry was created is kept, it is ignored on a cache hit
+     * @return the cached or newly created value
+     */
+    public synchronized V get(final K key, final Supplier<V> valueSupplier, final Consumer<V> expireCallback) {
+        final var value = cache.get(key);
+        if (value != null) {
+            value.updateDeadline();
+            return value.value;
+        }
+
+        final var newValue = new ExpirableValue<>(valueSupplier.get(), expireCallback);
+        newValue.updateDeadline();
+        cache.put(key, newValue);
+        return newValue.value;
+    }
 }
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/utils/SimpleCacheTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/utils/SimpleCacheTest.java
index c2176d71fec9f..c590eda171804 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/utils/SimpleCacheTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/utils/SimpleCacheTest.java
@@ -19,10 +19,13 @@
package org.apache.pulsar.utils;
import java.util.Collections;
-import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.HashSet;
+import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.Test;
@@ -38,7 +41,7 @@ public void shutdown() {
@Test
public void testConcurrentUpdate() throws Exception {
- final var cache = new SimpleCache(executor, 10000L);
+ final var cache = new SimpleCache(executor, 10000L, 10000L);
final var pool = Executors.newFixedThreadPool(2);
final var latch = new CountDownLatch(2);
for (int i = 0; i < 2; i++) {
@@ -60,15 +63,19 @@ public void testConcurrentUpdate() throws Exception {
+    /**
+     * Verifies that an entry expires after not being accessed for the timeout, that accessing an entry
+     * postpones its expiration, and that the expire callback received exactly the expired values.
+     */
     @Test
     public void testExpire() throws InterruptedException {
-        final var cache = new SimpleCache<Integer, Integer>(executor, 500L);
-        final var expiredValues = new CopyOnWriteArrayList<Integer>();
-        cache.get(0, () -> 100, expiredValues::add);
-        for (int i = 0; i < 100; i++) {
-            cache.get(1, () -> 101, expiredValues::add);
-            Thread.sleep(10);
-        }
-        Assert.assertEquals(cache.get(0, () -> -1, __ -> {}), -1); // the value is expired
-        Assert.assertEquals(cache.get(1, () -> -1, __ -> {}), 101);
-        Assert.assertEquals(expiredValues, Collections.singletonList(100));
+        // Entries live 500 ms after their last access; the expiration check runs every 5 ms
+        final var cache = new SimpleCache<Integer, Integer>(executor, 500L, 5);
+        final var expiredValues = Collections.synchronizedSet(new HashSet<Integer>());
+
+        final var allKeys = IntStream.range(0, 5).boxed().collect(Collectors.toSet());
+        allKeys.forEach(key -> cache.get(key, () -> key + 100, expiredValues::add));
+
+        Thread.sleep(400L);
+        final var recentAccessedKey = Set.of(1, 2);
+        recentAccessedKey.forEach(key -> cache.get(key, () -> -1, expiredValues::add)); // access these keys
+
+        Thread.sleep(300L);
+        // About 700 ms in: keys 0, 3 and 4 passed their 500 ms deadline, keys 1 and 2 were refreshed at
+        // about 400 ms. The callback must have fired for the untouched keys only.
+        Assert.assertEquals(expiredValues, Set.of(100, 103, 104));
+        // TestNG's assertEquals takes (actual, expected)
+        recentAccessedKey.forEach(key -> Assert.assertEquals(cache.get(key, () -> -1, __ -> {}), key + 100));
+        allKeys.stream().filter(key -> !recentAccessedKey.contains(key))
+                .forEach(key -> Assert.assertEquals(cache.get(key, () -> -1, __ -> {}), -1));
     }
}