Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[improve][broker] Reduce the CPU pressure from the transaction buffer in rolling restarts #23062

Original file line number Diff line number Diff line change
Expand Up @@ -992,7 +992,7 @@ public void start() throws PulsarServerException {
MLTransactionMetadataStoreProvider.initBufferedWriterMetrics(getAdvertisedAddress());
MLPendingAckStoreProvider.initBufferedWriterMetrics(getAdvertisedAddress());

this.transactionBufferSnapshotServiceFactory = new TransactionBufferSnapshotServiceFactory(getClient());
this.transactionBufferSnapshotServiceFactory = new TransactionBufferSnapshotServiceFactory(this);

this.transactionTimer =
new HashedWheelTimer(new DefaultThreadFactory("pulsar-transaction-timer"));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,16 @@
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.broker.PulsarServerException;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.systopic.NamespaceEventsSystemTopicFactory;
import org.apache.pulsar.broker.systopic.SystemTopicClient;
import org.apache.pulsar.broker.systopic.SystemTopicClientBase;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.broker.transaction.buffer.impl.TableView;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.impl.PulsarClientImpl;
import org.apache.pulsar.common.events.EventType;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.TopicName;
Expand All @@ -42,6 +46,8 @@ public class SystemTopicTxnBufferSnapshotService<T> {
protected final EventType systemTopicType;

private final ConcurrentHashMap<NamespaceName, ReferenceCountedWriter<T>> refCountedWriterMap;
@Getter
private final TableView<T> tableView;

// The class ReferenceCountedWriter will maintain the reference count,
// when the reference count decrement to 0, it will be removed from writerFutureMap, the writer will be closed.
Expand Down Expand Up @@ -95,13 +101,16 @@ public synchronized void release() {

}

public SystemTopicTxnBufferSnapshotService(PulsarClient client, EventType systemTopicType,
Class<T> schemaType) {
public SystemTopicTxnBufferSnapshotService(PulsarService pulsar, EventType systemTopicType,
Class<T> schemaType) throws PulsarServerException {
final var client = (PulsarClientImpl) pulsar.getClient();
this.namespaceEventsSystemTopicFactory = new NamespaceEventsSystemTopicFactory(client);
this.systemTopicType = systemTopicType;
this.schemaType = schemaType;
this.clients = new ConcurrentHashMap<>();
this.refCountedWriterMap = new ConcurrentHashMap<>();
this.tableView = new TableView<>(this::createReader,
client.getConfiguration().getOperationTimeoutMs(), pulsar.getExecutor());
}

public CompletableFuture<SystemTopicClient.Reader<T>> createReader(TopicName topicName) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,15 @@
*/
package org.apache.pulsar.broker.service;

import lombok.Getter;
import org.apache.pulsar.broker.PulsarServerException;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.transaction.buffer.metadata.TransactionBufferSnapshot;
import org.apache.pulsar.broker.transaction.buffer.metadata.v2.TransactionBufferSnapshotIndexes;
import org.apache.pulsar.broker.transaction.buffer.metadata.v2.TransactionBufferSnapshotSegment;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.common.events.EventType;

@Getter
public class TransactionBufferSnapshotServiceFactory {

private SystemTopicTxnBufferSnapshotService<TransactionBufferSnapshot> txnBufferSnapshotService;
Expand All @@ -33,29 +36,16 @@ public class TransactionBufferSnapshotServiceFactory {

private SystemTopicTxnBufferSnapshotService<TransactionBufferSnapshotIndexes> txnBufferSnapshotIndexService;

public TransactionBufferSnapshotServiceFactory(PulsarClient pulsarClient) {
this.txnBufferSnapshotSegmentService = new SystemTopicTxnBufferSnapshotService<>(pulsarClient,
public TransactionBufferSnapshotServiceFactory(PulsarService pulsar) throws PulsarServerException {
this.txnBufferSnapshotSegmentService = new SystemTopicTxnBufferSnapshotService<>(pulsar,
EventType.TRANSACTION_BUFFER_SNAPSHOT_SEGMENTS,
TransactionBufferSnapshotSegment.class);
this.txnBufferSnapshotIndexService = new SystemTopicTxnBufferSnapshotService<>(pulsarClient,
this.txnBufferSnapshotIndexService = new SystemTopicTxnBufferSnapshotService<>(pulsar,
EventType.TRANSACTION_BUFFER_SNAPSHOT_INDEXES, TransactionBufferSnapshotIndexes.class);
this.txnBufferSnapshotService = new SystemTopicTxnBufferSnapshotService<>(pulsarClient,
this.txnBufferSnapshotService = new SystemTopicTxnBufferSnapshotService<>(pulsar,
EventType.TRANSACTION_BUFFER_SNAPSHOT, TransactionBufferSnapshot.class);
}

public SystemTopicTxnBufferSnapshotService<TransactionBufferSnapshotIndexes> getTxnBufferSnapshotIndexService() {
return this.txnBufferSnapshotIndexService;
}

public SystemTopicTxnBufferSnapshotService<TransactionBufferSnapshotSegment>
getTxnBufferSnapshotSegmentService() {
return this.txnBufferSnapshotSegmentService;
}

public SystemTopicTxnBufferSnapshotService<TransactionBufferSnapshot> getTxnBufferSnapshotService() {
return this.txnBufferSnapshotService;
}

public void close() throws Exception {
if (this.txnBufferSnapshotIndexService != null) {
this.txnBufferSnapshotIndexService.close();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,26 +21,19 @@
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.mledger.Position;
import org.apache.bookkeeper.mledger.PositionFactory;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
import org.apache.commons.collections4.map.LinkedMap;
import org.apache.pulsar.broker.service.BrokerServiceException;
import org.apache.pulsar.broker.service.SystemTopicTxnBufferSnapshotService.ReferenceCountedWriter;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.broker.systopic.SystemTopicClient;
import org.apache.pulsar.broker.transaction.buffer.AbortedTxnProcessor;
import org.apache.pulsar.broker.transaction.buffer.metadata.AbortTxnMetadata;
import org.apache.pulsar.broker.transaction.buffer.metadata.TransactionBufferSnapshot;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.transaction.TxnID;
import org.apache.pulsar.client.impl.PulsarClientImpl;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.TransactionBufferStats;
import org.apache.pulsar.common.util.FutureUtil;

@Slf4j
public class SingleSnapshotAbortedTxnProcessorImpl implements AbortedTxnProcessor {
Expand Down Expand Up @@ -91,48 +84,27 @@ public boolean checkAbortedTransaction(TxnID txnID) {
return aborts.containsKey(txnID);
}

private long getSystemClientOperationTimeoutMs() throws Exception {
PulsarClientImpl pulsarClient = (PulsarClientImpl) topic.getBrokerService().getPulsar().getClient();
return pulsarClient.getConfiguration().getOperationTimeoutMs();
}

@Override
public CompletableFuture<Position> recoverFromSnapshot() {
return topic.getBrokerService().getPulsar().getTransactionBufferSnapshotServiceFactory()
.getTxnBufferSnapshotService()
.createReader(TopicName.get(topic.getName())).thenComposeAsync(reader -> {
try {
Position startReadCursorPosition = null;
while (reader.hasMoreEvents()) {
Message<TransactionBufferSnapshot> message = reader.readNextAsync()
.get(getSystemClientOperationTimeoutMs(), TimeUnit.MILLISECONDS);
if (topic.getName().equals(message.getKey())) {
TransactionBufferSnapshot transactionBufferSnapshot = message.getValue();
if (transactionBufferSnapshot != null) {
handleSnapshot(transactionBufferSnapshot);
startReadCursorPosition = PositionFactory.create(
transactionBufferSnapshot.getMaxReadPositionLedgerId(),
transactionBufferSnapshot.getMaxReadPositionEntryId());
}
}
}
return CompletableFuture.completedFuture(startReadCursorPosition);
} catch (TimeoutException ex) {
Throwable t = FutureUtil.unwrapCompletionException(ex);
String errorMessage = String.format("[%s] Transaction buffer recover fail by read "
+ "transactionBufferSnapshot timeout!", topic.getName());
log.error(errorMessage, t);
return FutureUtil.failedFuture(
new BrokerServiceException.ServiceUnitNotReadyException(errorMessage, t));
} catch (Exception ex) {
log.error("[{}] Transaction buffer recover fail when read "
+ "transactionBufferSnapshot!", topic.getName(), ex);
return FutureUtil.failedFuture(ex);
} finally {
closeReader(reader);
}
}, topic.getBrokerService().getPulsar().getTransactionExecutorProvider()
.getExecutor(this));
final var future = new CompletableFuture<Position>();
final var pulsar = topic.getBrokerService().getPulsar();
pulsar.getTransactionExecutorProvider().getExecutor(this).execute(() -> {
try {
final var snapshot = pulsar.getTransactionBufferSnapshotServiceFactory().getTxnBufferSnapshotService()
.getTableView().readLatest(topic.getName());
if (snapshot != null) {
handleSnapshot(snapshot);
final var startReadCursorPosition = PositionFactory.create(snapshot.getMaxReadPositionLedgerId(),
snapshot.getMaxReadPositionEntryId());
future.complete(startReadCursorPosition);
} else {
future.complete(null);
}
} catch (Throwable e) {
future.completeExceptionally(e);
}
});
return future;
}

@Override
Expand Down Expand Up @@ -191,13 +163,6 @@ public synchronized CompletableFuture<Void> closeAsync() {
return CompletableFuture.completedFuture(null);
}

private void closeReader(SystemTopicClient.Reader<TransactionBufferSnapshot> reader) {
reader.closeAsync().exceptionally(e -> {
log.error("[{}]Transaction buffer reader close error!", topic.getName(), e);
return null;
});
}

private void handleSnapshot(TransactionBufferSnapshot snapshot) {
if (snapshot.getAborts() != null) {
snapshot.getAborts().forEach(abortTxnMetadata ->
Expand Down
Loading
Loading