Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Transaction] Fix transaction sequenceId generate error. #13209

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,6 @@
import org.apache.bookkeeper.mledger.impl.ManagedLedgerFactoryImpl;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.commons.collections.map.SingletonMap;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.service.Topic;
import org.apache.pulsar.broker.service.persistent.PersistentSubscription;
Expand All @@ -65,7 +64,6 @@
import org.apache.pulsar.broker.transaction.buffer.matadata.TransactionBufferSnapshot;
import org.apache.pulsar.broker.transaction.pendingack.impl.MLPendingAckStore;
import org.apache.pulsar.broker.transaction.pendingack.impl.MLPendingAckStoreProvider;
import org.apache.pulsar.broker.transaction.timeout.TransactionTimeoutTrackerImpl;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
Expand Down Expand Up @@ -572,7 +570,8 @@ public void testEndTCRecoveringWhenManagerLedgerDisReadable() throws Exception{
null);
return null;
}).when(managedCursor).asyncReadEntries(anyInt(), any(), any(), any());

MLTransactionLogInterceptor mlTransactionLogInterceptor = new MLTransactionLogInterceptor();
persistentTopic.getManagedLedger().getConfig().setManagedLedgerInterceptor(mlTransactionLogInterceptor);
MLTransactionLogImpl mlTransactionLog =
new MLTransactionLogImpl(new TransactionCoordinatorID(1), null,
persistentTopic.getManagedLedger().getConfig());
Expand All @@ -591,7 +590,8 @@ public void testEndTCRecoveringWhenManagerLedgerDisReadable() throws Exception{
doNothing().when(timeoutTracker).start();
MLTransactionMetadataStore metadataStore1 =
new MLTransactionMetadataStore(new TransactionCoordinatorID(1),
mlTransactionLog, timeoutTracker, transactionRecoverTracker);
mlTransactionLog, timeoutTracker, transactionRecoverTracker,
mlTransactionLogInterceptor.getSequenceId());

Awaitility.await().untilAsserted(() ->
assertEquals(metadataStore1.getCoordinatorStats().state, "Ready"));
Expand All @@ -604,7 +604,8 @@ public void testEndTCRecoveringWhenManagerLedgerDisReadable() throws Exception{

MLTransactionMetadataStore metadataStore2 =
new MLTransactionMetadataStore(new TransactionCoordinatorID(1),
mlTransactionLog, timeoutTracker, transactionRecoverTracker);
mlTransactionLog, timeoutTracker, transactionRecoverTracker,
mlTransactionLogInterceptor.getSequenceId());
Awaitility.await().untilAsserted(() ->
assertEquals(metadataStore2.getCoordinatorStats().state, "Ready"));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,15 +68,11 @@ public class MLTransactionLogImpl implements TransactionLog {

private final TopicName topicName;

private final MLTransactionLogInterceptor mlTransactionLogInterceptor;

public MLTransactionLogImpl(TransactionCoordinatorID tcID,
ManagedLedgerFactory managedLedgerFactory,
ManagedLedgerConfig managedLedgerConfig) {
this.topicName = getMLTransactionLogName(tcID);
this.tcId = tcID.getId();
this.mlTransactionLogInterceptor = new MLTransactionLogInterceptor();
managedLedgerConfig.setManagedLedgerInterceptor(this.mlTransactionLogInterceptor);
this.managedLedgerFactory = managedLedgerFactory;
this.managedLedgerConfig = managedLedgerConfig;
this.entryQueue = new SpscArrayQueue<>(2000);
Expand Down Expand Up @@ -161,7 +157,6 @@ public CompletableFuture<Position> append(TransactionMetadataEntry transactionMe
@Override
public void addComplete(Position position, ByteBuf entryData, Object ctx) {
buf.release();
mlTransactionLogInterceptor.setMaxLocalTxnId(transactionMetadataEntry.getMaxLocalTxnId());
completableFuture.complete(position);
}

Expand Down Expand Up @@ -242,42 +237,6 @@ public void start() {
}
}

public CompletableFuture<Long> getMaxLocalTxnId() {

CompletableFuture<Long> completableFuture = new CompletableFuture<>();
PositionImpl position = (PositionImpl) managedLedger.getLastConfirmedEntry();

if (position != null && position.getEntryId() != -1
&& ((ManagedLedgerImpl) managedLedger).ledgerExists(position.getLedgerId())) {
((ManagedLedgerImpl) this.managedLedger).asyncReadEntry(position, new AsyncCallbacks.ReadEntryCallback() {
@Override
public void readEntryComplete(Entry entry, Object ctx) {
TransactionMetadataEntry lastConfirmEntry = new TransactionMetadataEntry();
ByteBuf buffer = entry.getDataBuffer();
lastConfirmEntry.parseFrom(buffer, buffer.readableBytes());
completableFuture.complete(lastConfirmEntry.getMaxLocalTxnId());
}

@Override
public void readEntryFailed(ManagedLedgerException exception, Object ctx) {
log.error("[{}] MLTransactionLog recover MaxLocalTxnId fail!", topicName, exception);
completableFuture.completeExceptionally(exception);
}
}, null);
} else if (managedLedger.getProperties()
.get(MLTransactionLogInterceptor.MAX_LOCAL_TXN_ID) != null) {
completableFuture.complete(Long.parseLong(managedLedger.getProperties()
.get(MLTransactionLogInterceptor.MAX_LOCAL_TXN_ID)));
} else {
log.error("[{}] MLTransactionLog recover MaxLocalTxnId fail! "
+ "not found MaxLocalTxnId in managedLedger and properties", topicName);
completableFuture.completeExceptionally(new ManagedLedgerException(topicName
+ "MLTransactionLog recover MaxLocalTxnId fail! "
+ "not found MaxLocalTxnId in managedLedger and properties"));
}
return completableFuture;
}

class FillEntryQueueCallback implements AsyncCallbacks.ReadEntriesCallback {

private final AtomicLong outstandingReadsRequests = new AtomicLong(0);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,46 +18,87 @@
*/
package org.apache.pulsar.transaction.coordinator.impl;

import io.netty.buffer.ByteBuf;
import lombok.Getter;
import org.apache.bookkeeper.client.LedgerHandle;
import org.apache.bookkeeper.client.api.LedgerEntry;
import org.apache.bookkeeper.mledger.impl.OpAddEntry;
import org.apache.bookkeeper.mledger.intercept.ManagedLedgerInterceptor;
import org.apache.pulsar.transaction.coordinator.proto.TransactionMetadataEntry;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicLong;

/**
* Store max sequenceID in ManagedLedger properties, in order to recover transaction log.
*/
public class MLTransactionLogInterceptor implements ManagedLedgerInterceptor {

private static final Logger log = LoggerFactory.getLogger(MLTransactionLogInterceptor.class);
private static final long TC_ID_NOT_USED = -1L;
public static final String MAX_LOCAL_TXN_ID = "max_local_txn_id";

private volatile long maxLocalTxnId = -1;
@Getter
private final AtomicLong sequenceId = new AtomicLong(TC_ID_NOT_USED);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We'd better return long not AtomicLong for the getter method.


@Override
public OpAddEntry beforeAddEntry(OpAddEntry op, int numberOfMessages) {
return null;
return op;
}

// When all of ledger have been deleted, we will generate sequenceId from managedLedger properties
@Override
public void onManagedLedgerPropertiesInitialize(Map<String, String> propertiesMap) {
if (propertiesMap == null || propertiesMap.size() == 0) {
return;
}

if (propertiesMap.containsKey(MAX_LOCAL_TXN_ID)) {
sequenceId.set(Long.parseLong(propertiesMap.get(MAX_LOCAL_TXN_ID)));
}
}

// When we don't roll over ledger, we can init sequenceId from the getLastAddConfirmed transaction metadata entry
@Override
public CompletableFuture<Void> onManagedLedgerLastLedgerInitialize(String name, LedgerHandle ledgerHandle) {
return CompletableFuture.completedFuture(null);
public CompletableFuture<Void> onManagedLedgerLastLedgerInitialize(String name, LedgerHandle lh) {
CompletableFuture<Void> promise = new CompletableFuture<>();
if (lh.getLastAddConfirmed() >= 0) {
lh.readAsync(lh.getLastAddConfirmed(), lh.getLastAddConfirmed()).whenComplete((entries, ex) -> {
if (ex != null) {
log.error("[{}] Read last entry error.", name, ex);
promise.completeExceptionally(ex);
} else {
if (entries != null) {
try {
LedgerEntry ledgerEntry = entries.getEntry(lh.getLastAddConfirmed());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

TheledgerEntryshould be closed after getting the max local txn ID.

if (ledgerEntry != null) {
TransactionMetadataEntry lastConfirmEntry = new TransactionMetadataEntry();
ByteBuf buffer = ledgerEntry.getEntryBuffer();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The buffer should be released after getting the max local txn ID.

lastConfirmEntry.parseFrom(buffer, buffer.readableBytes());
this.sequenceId.set(lastConfirmEntry.getMaxLocalTxnId());
}
entries.close();
promise.complete(null);
} catch (Exception e) {
log.error("[{}] Failed to recover the tc sequenceId from the last add confirmed entry.",
name, e);
promise.completeExceptionally(e);
}
} else {
promise.complete(null);
}
}
});
} else {
promise.complete(null);
}
return promise;
}

// roll over ledger will update sequenceId to managedLedger properties
@Override
public void onUpdateManagedLedgerInfo(Map<String, String> propertiesMap) {
propertiesMap.put(MAX_LOCAL_TXN_ID, maxLocalTxnId + "");
}

protected void setMaxLocalTxnId(long maxLocalTxnId) {
this.maxLocalTxnId = maxLocalTxnId;
propertiesMap.put(MAX_LOCAL_TXN_ID, sequenceId.get() + "");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -60,9 +60,8 @@ public class MLTransactionMetadataStore
private static final Logger log = LoggerFactory.getLogger(MLTransactionMetadataStore.class);

private final TransactionCoordinatorID tcID;
private final AtomicLong sequenceId = new AtomicLong(TC_ID_NOT_USED);
private final AtomicLong sequenceId;
private final MLTransactionLogImpl transactionLog;
private static final long TC_ID_NOT_USED = -1L;
private final ConcurrentSkipListMap<Long, Pair<TxnMeta, List<Position>>> txnMetaMap = new ConcurrentSkipListMap<>();
private final TransactionTimeoutTracker timeoutTracker;
private final TransactionMetadataStoreStats transactionMetadataStoreStats;
Expand All @@ -75,8 +74,10 @@ public class MLTransactionMetadataStore
public MLTransactionMetadataStore(TransactionCoordinatorID tcID,
MLTransactionLogImpl mlTransactionLog,
TransactionTimeoutTracker timeoutTracker,
TransactionRecoverTracker recoverTracker) {
TransactionRecoverTracker recoverTracker,
AtomicLong sequenceId) {
super(State.None);
this.sequenceId = sequenceId;
this.tcID = tcID;
this.transactionLog = mlTransactionLog;
this.timeoutTracker = timeoutTracker;
Expand All @@ -96,16 +97,13 @@ public MLTransactionMetadataStore(TransactionCoordinatorID tcID,

@Override
public void replayComplete() {
mlTransactionLog.getMaxLocalTxnId().thenAccept(id -> {
recoverTracker.appendOpenTransactionToTimeoutTracker();
sequenceId.set(id);
if (!changeToReadyState()) {
log.error("Managed ledger transaction metadata store change state error when replay complete");
} else {
recoverTracker.handleCommittingAndAbortingTransaction();
timeoutTracker.start();
}
});
recoverTracker.appendOpenTransactionToTimeoutTracker();
if (!changeToReadyState()) {
log.error("Managed ledger transaction metadata store change state error when replay complete");
} else {
recoverTracker.handleCommittingAndAbortingTransaction();
timeoutTracker.start();
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,10 +42,14 @@ public CompletableFuture<TransactionMetadataStore> openStore(TransactionCoordina
ManagedLedgerConfig managedLedgerConfig,
TransactionTimeoutTracker timeoutTracker,
TransactionRecoverTracker recoverTracker) {
MLTransactionLogInterceptor mlTransactionLogInterceptor = new MLTransactionLogInterceptor();
managedLedgerConfig.setManagedLedgerInterceptor(new MLTransactionLogInterceptor());
MLTransactionLogImpl txnLog = new MLTransactionLogImpl(transactionCoordinatorId,
managedLedgerFactory, managedLedgerConfig);

// MLTransactionLogInterceptor will init sequenceId and update the sequenceId to managedLedger properties.
return txnLog.initialize().thenApply(__ ->
new MLTransactionMetadataStore(transactionCoordinatorId, txnLog, timeoutTracker, recoverTracker));
new MLTransactionMetadataStore(transactionCoordinatorId, txnLog, timeoutTracker,
recoverTracker, mlTransactionLogInterceptor.getSequenceId()));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's better to introduce modify sequence ID method for mlTransactionLogInterceptor, share AtomicLong across multiple instances is not good for maintainance

}
}
Loading