Commit 7bf7736

[feature][transaction] Add a configuration to control max active transaction of coordinator (#15157)
Details in #15133

- **Status**: Discussion
- **Author**: Bo Cong
- **Pull Request**:
- **Mailing List discussion**:
- **Release**: 2.11

# Motivation

Currently, the transaction coordinator (TC) does not limit the number of active transactions, which may cause the following problems:

- A large number of active transactions puts a lot of pressure on memory.
- The number of transactions a single TC can handle is limited, so the set of active transactions cannot grow without bound.
- Ending a transaction has to wait until the TP or TB has recovered successfully, so many end requests can sit pending in the TP or TB while the TC does not know the state of the TB or TP. This wastes a lot of machine resources, and a large backlog of pending TB or TP requests can cause an OOM.

## Implementation

### Add config

Add maxActiveTransactions to broker.conf:

```makefile
# The max active transactions in one transaction coordinator
maxActiveTransactionsPerCoordinator=10000
```

### How to handle the number of active transactions reaching maxActiveTransactions?

One option is to return an exception to the client when maxActiveTransactions is reached. This has several disadvantages:

1. The broker has to add a ReachMaxActiveTxnException and return it when the limit is reached. The client then has to catch this exception before continuing, so every client must handle ReachMaxActiveTxnException.
2. A client that receives this exception will not stop opening transactions, because it does not know when the TC will recover. It retries immediately, and if the TC cannot recover, the client keeps retrying, which accomplishes nothing.

### Design

When an open-transaction request arrives after maxActiveTransactions has been reached, the coordinator does not return any response and ignores the request. This way, the broker does not need to return a new exception to clients for this config.

#### How does this affect the client?

If the broker does not return a response for the request, the open-transaction operation times out. The coordinator client uses a semaphore to limit transaction operations (open, add produce topic, add ack topic, end txn). Within one timeout period, the coordinator client can therefore open at most as many transactions as the semaphore has permits, and any further request is blocked.

So this design solves both problems:

1. No exception needs to be added.
2. The client does not retry indefinitely.

#### Worries

You may be worried that this design hurts the client-side experience, because opening a transaction always times out and other transaction operations are blocked. I think this worry is unnecessary: if the limit is being hit, you should consider increasing the capacity of the cluster or finding and fixing the problematic client.

### Flow chart

![image](https://user-images.githubusercontent.com/39078850/162964277-6342ae82-1691-48b5-af84-18bb7a422ff1.png)

### Compatibility, Deprecation, and Migration Plan

maxActiveTransactions defaults to 0. With the default value, opening a transaction is never blocked.

### Test Plan

When maxActiveTransactions is reached, a client's attempt to open a transaction times out.

### Rejected Alternatives

Return an exception to the client when maxActiveTransactions is reached. This was rejected because of the two disadvantages listed under "How to handle the number of active transactions reaching maxActiveTransactions?" above.
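The client-side effect described in the design can be sketched in plain Java. This is only an illustration under stated assumptions: `BoundedTxnOpClient` and its methods are hypothetical names, not Pulsar's coordinator client; the "coordinator" is simulated by a future that nobody completes; and where the description says further requests block, the sketch refuses them with `null` so that it cannot hang.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

/** Hypothetical stand-in for a coordinator client; not Pulsar's actual class. */
class BoundedTxnOpClient {
    private final Semaphore permits;   // caps the number of in-flight transaction operations
    private final long timeoutMillis;  // plays the role of the client's operation timeout

    BoundedTxnOpClient(int maxPendingOps, long timeoutMillis) {
        this.permits = new Semaphore(maxPendingOps);
        this.timeoutMillis = timeoutMillis;
    }

    /**
     * Sends an "open transaction" request to a coordinator that never answers.
     * Returns null when no permit is free (assumption: a real client would block instead).
     */
    CompletableFuture<Long> openTxnAgainstSilentCoordinator() {
        if (!permits.tryAcquire()) {
            return null;
        }
        // Nobody ever completes this future, which models the ignored request.
        CompletableFuture<Long> response = new CompletableFuture<>();
        return response
                .orTimeout(timeoutMillis, TimeUnit.MILLISECONDS)
                // The permit is returned only once the request finishes, here by timing out.
                .whenComplete((txnId, error) -> permits.release());
    }

    int availablePermits() {
        return permits.availablePermits();
    }

    public static void main(String[] args) throws InterruptedException {
        BoundedTxnOpClient client = new BoundedTxnOpClient(2, 500);
        CompletableFuture<Long> first = client.openTxnAgainstSilentCoordinator();
        CompletableFuture<Long> second = client.openTxnAgainstSilentCoordinator();
        System.out.println("third request sent: " + (client.openTxnAgainstSilentCoordinator() != null));
        try {
            first.get();
        } catch (ExecutionException e) {
            System.out.println("first request timed out: " + (e.getCause() instanceof TimeoutException));
        }
        try {
            second.get();
        } catch (ExecutionException e) {
            System.out.println("second request timed out: " + (e.getCause() instanceof TimeoutException));
        }
        System.out.println("permits after timeouts: " + client.availablePermits());
    }
}
```

With two permits, at most two unanswered requests are outstanding per timeout window, which is the bound on retries that the design relies on.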
1 parent f3df026 commit 7bf7736

14 files changed, +202 -60 lines changed

conf/broker.conf

+3
@@ -1432,6 +1432,9 @@ transactionBufferSnapshotMinTimeInMillis=5000
 # The max concurrent requests for transaction buffer client, default is 1000
 transactionBufferClientMaxConcurrentRequests=1000
 
+# The max active transactions per transaction coordinator, default value 0 indicates no limit.
+maxActiveTransactionsPerCoordinator=0
+
 # MLPendingAckStore maintains a ConcurrentSkipListMap pendingAckLogIndex,
 # It stores the position in pendingAckStore as its value and saves a position used to determine
 # whether the previous data can be cleaned up as a key.

pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java

+6
@@ -2600,6 +2600,12 @@ public class ServiceConfiguration implements PulsarConfiguration {
     )
     private long transactionBufferClientOperationTimeoutInMills = 3000L;
 
+    @FieldContext(
+            category = CATEGORY_TRANSACTION,
+            doc = "The max active transactions per transaction coordinator, default value 0 indicates no limit."
+    )
+    private long maxActiveTransactionsPerCoordinator = 0L;
+
     @FieldContext(
             category = CATEGORY_TRANSACTION,
             doc = "MLPendingAckStore maintain a ConcurrentSkipListMap pendingAckLogIndex`,"

pulsar-broker/src/main/java/org/apache/pulsar/broker/TransactionMetadataStoreService.java

+2-1
@@ -208,7 +208,8 @@ public CompletableFuture<TransactionMetadataStore> openTransactionMetadataStore(
                             timeoutTracker, tcId.getId());
                     return transactionMetadataStoreProvider
                             .openStore(tcId, pulsarService.getManagedLedgerFactory(), v,
-                                    timeoutTracker, recoverTracker);
+                                    timeoutTracker, recoverTracker,
+                                    pulsarService.getConfig().getMaxActiveTransactionsPerCoordinator());
                 });
     }
 

pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java

+13-4
@@ -2221,11 +2221,20 @@ protected void handleNewTxn(CommandNewTxn command) {
                     }
                     commandSender.sendNewTxnResponse(requestId, txnID, command.getTcId());
                 } else {
-                    ex = handleTxnException(ex, BaseCommand.Type.NEW_TXN.name(), requestId);
+                    if (ex instanceof CoordinatorException.ReachMaxActiveTxnException) {
+                        // if new txn throw ReachMaxActiveTxnException, don't return any response to client,
+                        // otherwise client will retry, it will wast o lot of resources
+                        // link https://github.com/apache/pulsar/issues/15133
+                        log.warn("New txn op reach max active transactions! tcId : {}, requestId : {}",
+                                tcId.getId(), requestId, ex);
+                        // do-nothing
+                    } else {
+                        ex = handleTxnException(ex, BaseCommand.Type.NEW_TXN.name(), requestId);
 
-                    commandSender.sendNewTxnErrorResponse(requestId, tcId.getId(),
-                            BrokerServiceException.getClientErrorCode(ex), ex.getMessage());
-                    transactionMetadataStoreService.handleOpFail(ex, tcId);
+                        commandSender.sendNewTxnErrorResponse(requestId, tcId.getId(),
+                                BrokerServiceException.getClientErrorCode(ex), ex.getMessage());
+                        transactionMetadataStoreService.handleOpFail(ex, tcId);
+                    }
                 }
             }));
     }
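
The three-way outcome in this hunk (success response, silent drop, error response) can be illustrated with a stripped-down model. This is a sketch only: `NewTxnResponder`, its nested exception, and the string "responses" are hypothetical stand-ins for `ServerCnx`, `CoordinatorException.ReachMaxActiveTxnException`, and the real protocol commands.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical model of the response handling for a new-transaction request. */
class NewTxnResponder {

    /** Stand-in for the coordinator's "limit reached" exception; not the Pulsar class. */
    static class ReachMaxActiveTxnException extends Exception {
        ReachMaxActiveTxnException(String message) {
            super(message);
        }
    }

    /** Everything "written to the connection"; a real broker would send protocol commands. */
    final List<String> sent = new ArrayList<>();

    void onNewTxnResult(long requestId, Long txnId, Throwable error) {
        if (error == null) {
            sent.add("NEW_TXN_RESPONSE requestId=" + requestId + " txnId=" + txnId);
        } else if (error instanceof ReachMaxActiveTxnException) {
            // Limit reached: send nothing, so the client's pending request times out.
        } else {
            sent.add("NEW_TXN_ERROR requestId=" + requestId + " message=" + error.getMessage());
        }
    }

    public static void main(String[] args) {
        NewTxnResponder responder = new NewTxnResponder();
        responder.onNewTxnResult(1L, 42L, null);
        responder.onNewTxnResult(2L, null, new ReachMaxActiveTxnException("limit reached"));
        responder.onNewTxnResult(3L, null, new IllegalStateException("store not ready"));
        responder.sent.forEach(System.out::println);
    }
}
```

Only requests 1 and 3 produce output in this model; request 2 is the case where the client is left to hit its operation timeout.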

pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java

+4-3
@@ -695,7 +695,7 @@ public void testEndTCRecoveringWhenManagerLedgerDisReadable() throws Exception{
         doNothing().when(timeoutTracker).start();
         MLTransactionMetadataStore metadataStore1 =
                 new MLTransactionMetadataStore(new TransactionCoordinatorID(1),
-                        mlTransactionLog, timeoutTracker, mlTransactionSequenceIdGenerator);
+                        mlTransactionLog, timeoutTracker, mlTransactionSequenceIdGenerator, 0L);
         metadataStore1.init(transactionRecoverTracker).get();
         Awaitility.await().untilAsserted(() ->
                 assertEquals(metadataStore1.getCoordinatorStats().state, "Ready"));
@@ -708,7 +708,8 @@ public void testEndTCRecoveringWhenManagerLedgerDisReadable() throws Exception{
 
         MLTransactionMetadataStore metadataStore2 =
                 new MLTransactionMetadataStore(new TransactionCoordinatorID(1),
-                        mlTransactionLog, timeoutTracker, mlTransactionSequenceIdGenerator);
+
+                        mlTransactionLog, timeoutTracker, mlTransactionSequenceIdGenerator, 0L);
         metadataStore2.init(transactionRecoverTracker).get();
         Awaitility.await().untilAsserted(() ->
                 assertEquals(metadataStore2.getCoordinatorStats().state, "Ready"));
@@ -721,7 +722,7 @@ public void testEndTCRecoveringWhenManagerLedgerDisReadable() throws Exception{
 
         MLTransactionMetadataStore metadataStore3 =
                 new MLTransactionMetadataStore(new TransactionCoordinatorID(1),
-                        mlTransactionLog, timeoutTracker, mlTransactionSequenceIdGenerator);
+                        mlTransactionLog, timeoutTracker, mlTransactionSequenceIdGenerator, 0L);
         metadataStore3.init(transactionRecoverTracker).get();
         Awaitility.await().untilAsserted(() ->
                 assertEquals(metadataStore3.getCoordinatorStats().state, "Ready"));

@@ -0,0 +1,98 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.transaction.coordinator;
+
+import static org.testng.AssertJUnit.assertTrue;
+import static org.testng.AssertJUnit.fail;
+import com.google.common.collect.Sets;
+import java.util.concurrent.TimeUnit;
+import org.apache.pulsar.broker.ServiceConfiguration;
+import org.apache.pulsar.broker.service.BrokerTestBase;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.transaction.Transaction;
+import org.apache.pulsar.common.naming.NamespaceName;
+import org.apache.pulsar.common.naming.SystemTopicNames;
+import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
+import org.apache.pulsar.common.policies.data.TenantInfoImpl;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+@Test(groups = "broker")
+public class TransactionCoordinatorConfigTest extends BrokerTestBase {
+
+    @BeforeMethod
+    @Override
+    protected void setup() throws Exception {
+        ServiceConfiguration configuration = getDefaultConf();
+        configuration.setTransactionCoordinatorEnabled(true);
+        configuration.setMaxActiveTransactionsPerCoordinator(2);
+        super.baseSetup(configuration);
+        admin.tenants().createTenant("pulsar", new TenantInfoImpl(Sets.newHashSet("appid1"), Sets.newHashSet("test")));
+        admin.namespaces().createNamespace(NamespaceName.SYSTEM_NAMESPACE.toString());
+        pulsar.getPulsarResources()
+                .getNamespaceResources()
+                .getPartitionedTopicResources()
+                .createPartitionedTopic(SystemTopicNames.TRANSACTION_COORDINATOR_ASSIGN,
+                        new PartitionedTopicMetadata(1));
+    }
+
+    @AfterMethod(alwaysRun = true)
+    @Override
+    protected void cleanup() throws Exception {
+        super.internalCleanup();
+    }
+
+    @Test
+    public void testMaxActiveTxn() throws Exception {
+        pulsarClient = PulsarClient.builder().serviceUrl(lookupUrl.toString())
+                .enableTransaction(true).operationTimeout(3, TimeUnit.SECONDS).build();
+
+        // new two txn will not reach max active txns
+        Transaction commitTxn =
+                pulsarClient.newTransaction().withTransactionTimeout(1, TimeUnit.MINUTES).build().get();
+        Transaction abortTxn =
+                pulsarClient.newTransaction().withTransactionTimeout(1, TimeUnit.MINUTES).build().get();
+        try {
+            // new the third txn will timeout, broker will return any response
+            pulsarClient.newTransaction().withTransactionTimeout(1, TimeUnit.MINUTES).build().get();
+            fail();
+        } catch (Exception e) {
+            assertTrue(e.getCause() instanceof PulsarClientException.TimeoutException);
+        }
+
+        // release active txn
+        commitTxn.commit().get();
+        abortTxn.abort().get();
+
+        // two txn end, can continue new txn
+        pulsarClient.newTransaction().withTransactionTimeout(1, TimeUnit.MINUTES).build().get();
+        pulsarClient.newTransaction().withTransactionTimeout(1, TimeUnit.MINUTES).build().get();
+
+        // reach max active txns again
+        try {
+            // new the third txn will timeout, broker will return any response
+            pulsarClient.newTransaction().withTransactionTimeout(1, TimeUnit.MINUTES).build().get();
+            fail();
+        } catch (Exception e) {
+            assertTrue(e.getCause() instanceof PulsarClientException.TimeoutException);
+        }
+    }
+}

pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/TransactionMetadataStoreProvider.java

+1-1
@@ -68,5 +68,5 @@ static TransactionMetadataStoreProvider newProvider(String providerClassName) th
     CompletableFuture<TransactionMetadataStore> openStore(
             TransactionCoordinatorID transactionCoordinatorId, ManagedLedgerFactory managedLedgerFactory,
             ManagedLedgerConfig managedLedgerConfig, TransactionTimeoutTracker timeoutTracker,
-            TransactionRecoverTracker recoverTracker);
+            TransactionRecoverTracker recoverTracker, long maxActiveTransactionsPerCoordinator);
 }

pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/exceptions/CoordinatorException.java

+12
@@ -119,4 +119,16 @@ public TransactionMetadataStoreStateException(TransactionCoordinatorID tcID,
 
         }
     }
+
+    /**
+     * Exception is thrown when a operation of new transaction reach the number of max active transactions.
+     */
+    public static class ReachMaxActiveTxnException extends CoordinatorException {
+
+        private static final long serialVersionUID = 0L;
+
+        public ReachMaxActiveTxnException(String message) {
+            super(message);
+        }
+    }
 }

pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/InMemTransactionMetadataStoreProvider.java

+2-1
@@ -37,7 +37,8 @@ public CompletableFuture<TransactionMetadataStore> openStore(TransactionCoordina
                                                                  ManagedLedgerFactory managedLedgerFactory,
                                                                  ManagedLedgerConfig managedLedgerConfig,
                                                                  TransactionTimeoutTracker timeoutTracker,
-                                                                 TransactionRecoverTracker recoverTracker) {
+                                                                 TransactionRecoverTracker recoverTracker,
+                                                                 long maxActiveTransactionsPerCoordinator) {
         return CompletableFuture.completedFuture(
                 new InMemTransactionMetadataStore(transactionCoordinatorId));
     }

pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionMetadataStore.java

+47-38
@@ -76,18 +76,21 @@ public class MLTransactionMetadataStore
     private final LongAdder appendLogCount;
     private final MLTransactionSequenceIdGenerator sequenceIdGenerator;
     private final ExecutorService internalPinnedExecutor;
+    private final long maxActiveTransactionsPerCoordinator;
 
     public MLTransactionMetadataStore(TransactionCoordinatorID tcID,
                                       MLTransactionLogImpl mlTransactionLog,
                                       TransactionTimeoutTracker timeoutTracker,
-                                      MLTransactionSequenceIdGenerator sequenceIdGenerator) {
+                                      MLTransactionSequenceIdGenerator sequenceIdGenerator,
+                                      long maxActiveTransactionsPerCoordinator) {
         super(State.None);
         this.sequenceIdGenerator = sequenceIdGenerator;
         this.tcID = tcID;
         this.transactionLog = mlTransactionLog;
         this.timeoutTracker = timeoutTracker;
         this.transactionMetadataStoreStats = new TransactionMetadataStoreStats();
 
+        this.maxActiveTransactionsPerCoordinator = maxActiveTransactionsPerCoordinator;
         this.createdTransactionCount = new LongAdder();
         this.committedTransactionCount = new LongAdder();
         this.abortedTransactionCount = new LongAdder();
@@ -219,44 +222,50 @@ public CompletableFuture<TxnMeta> getTxnMeta(TxnID txnID) {
 
     @Override
     public CompletableFuture<TxnID> newTransaction(long timeOut) {
-        CompletableFuture<TxnID> completableFuture = new CompletableFuture<>();
-        internalPinnedExecutor.execute(() -> {
-            if (!checkIfReady()) {
-                completableFuture.completeExceptionally(new CoordinatorException
-                        .TransactionMetadataStoreStateException(tcID, State.Ready, getState(), "new Transaction"));
-                return;
-            }
+        if (this.maxActiveTransactionsPerCoordinator == 0
+                || this.maxActiveTransactionsPerCoordinator > txnMetaMap.size()) {
+            CompletableFuture<TxnID> completableFuture = new CompletableFuture<>();
+            internalPinnedExecutor.execute(() -> {
+                if (!checkIfReady()) {
+                    completableFuture.completeExceptionally(new CoordinatorException
+                            .TransactionMetadataStoreStateException(tcID, State.Ready, getState(), "new Transaction"));
+                    return;
+                }
 
-            long mostSigBits = tcID.getId();
-            long leastSigBits = sequenceIdGenerator.generateSequenceId();
-            TxnID txnID = new TxnID(mostSigBits, leastSigBits);
-            long currentTimeMillis = System.currentTimeMillis();
-            TransactionMetadataEntry transactionMetadataEntry = new TransactionMetadataEntry()
-                    .setTxnidMostBits(mostSigBits)
-                    .setTxnidLeastBits(leastSigBits)
-                    .setStartTime(currentTimeMillis)
-                    .setTimeoutMs(timeOut)
-                    .setMetadataOp(TransactionMetadataEntry.TransactionMetadataOp.NEW)
-                    .setLastModificationTime(currentTimeMillis)
-                    .setMaxLocalTxnId(sequenceIdGenerator.getCurrentSequenceId());
-            transactionLog.append(transactionMetadataEntry)
-                    .whenComplete((position, throwable) -> {
-                        if (throwable != null) {
-                            completableFuture.completeExceptionally(throwable);
-                        } else {
-                            appendLogCount.increment();
-                            TxnMeta txn = new TxnMetaImpl(txnID, currentTimeMillis, timeOut);
-                            List<Position> positions = new ArrayList<>();
-                            positions.add(position);
-                            Pair<TxnMeta, List<Position>> pair = MutablePair.of(txn, positions);
-                            txnMetaMap.put(leastSigBits, pair);
-                            this.timeoutTracker.addTransaction(leastSigBits, timeOut);
-                            createdTransactionCount.increment();
-                            completableFuture.complete(txnID);
-                        }
-                    });
-        });
-        return completableFuture;
+                long mostSigBits = tcID.getId();
+                long leastSigBits = sequenceIdGenerator.generateSequenceId();
+                TxnID txnID = new TxnID(mostSigBits, leastSigBits);
+                long currentTimeMillis = System.currentTimeMillis();
+                TransactionMetadataEntry transactionMetadataEntry = new TransactionMetadataEntry()
+                        .setTxnidMostBits(mostSigBits)
+                        .setTxnidLeastBits(leastSigBits)
+                        .setStartTime(currentTimeMillis)
+                        .setTimeoutMs(timeOut)
+                        .setMetadataOp(TransactionMetadataEntry.TransactionMetadataOp.NEW)
+                        .setLastModificationTime(currentTimeMillis)
+                        .setMaxLocalTxnId(sequenceIdGenerator.getCurrentSequenceId());
+                transactionLog.append(transactionMetadataEntry)
+                        .whenComplete((position, throwable) -> {
+                            if (throwable != null) {
+                                completableFuture.completeExceptionally(throwable);
+                            } else {
+                                appendLogCount.increment();
+                                TxnMeta txn = new TxnMetaImpl(txnID, currentTimeMillis, timeOut);
+                                List<Position> positions = new ArrayList<>();
+                                positions.add(position);
+                                Pair<TxnMeta, List<Position>> pair = MutablePair.of(txn, positions);
+                                txnMetaMap.put(leastSigBits, pair);
+                                this.timeoutTracker.addTransaction(leastSigBits, timeOut);
+                                createdTransactionCount.increment();
+                                completableFuture.complete(txnID);
+                            }
+                        });
+            });
+            return completableFuture;
+        } else {
+            return FutureUtil.failedFuture(new CoordinatorException.ReachMaxActiveTxnException("New txn op "
+                    + "reach max active txn! tcId : " + getTransactionCoordinatorID().getId()));
+        }
     }
 
     @Override
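
The admission rule introduced in `newTransaction` (admit when the limit is 0 or the limit exceeds the current number of active transactions) can be exercised in isolation. The sketch below is a simplified, synchronous model: `ActiveTxnGate`, its map of transaction IDs, and the `-1` rejection value are hypothetical, and the real store appends to a log asynchronously before recording the transaction. As in the diff, the size check and the later insert are separate steps, so this model makes no attempt at a strict bound under concurrent callers.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;

/** Hypothetical, simplified model of the per-coordinator active-transaction limit. */
class ActiveTxnGate {
    private final long maxActive;                                         // 0 means "no limit"
    private final Map<Long, Long> activeTxns = new ConcurrentHashMap<>(); // txn id -> start time
    private final AtomicLong nextId = new AtomicLong();

    ActiveTxnGate(long maxActive) {
        this.maxActive = maxActive;
    }

    /** Same predicate as in the diff: unlimited, or still below the limit. */
    boolean canOpen() {
        return maxActive == 0 || maxActive > activeTxns.size();
    }

    /** Returns the new transaction id, or -1 when the limit has been reached. */
    long open() {
        if (!canOpen()) {
            return -1L;
        }
        long id = nextId.getAndIncrement();
        activeTxns.put(id, System.currentTimeMillis());
        return id;
    }

    /** Committing or aborting removes the transaction and frees a slot. */
    void end(long txnId) {
        activeTxns.remove(txnId);
    }

    public static void main(String[] args) {
        ActiveTxnGate gate = new ActiveTxnGate(2);
        long first = gate.open();
        long second = gate.open();
        System.out.println("third admitted: " + (gate.open() != -1L));
        gate.end(first);
        gate.end(second);
        System.out.println("admitted after ending both: " + (gate.open() != -1L));
    }
}
```

The sequence in `main` follows the same shape as `testMaxActiveTxn`: two transactions are admitted, a third is refused, and ending the first two frees capacity again.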

pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionMetadataStoreProvider.java

+3-2
@@ -41,7 +41,8 @@ public CompletableFuture<TransactionMetadataStore> openStore(TransactionCoordina
                                                                  ManagedLedgerFactory managedLedgerFactory,
                                                                  ManagedLedgerConfig managedLedgerConfig,
                                                                  TransactionTimeoutTracker timeoutTracker,
-                                                                 TransactionRecoverTracker recoverTracker) {
+                                                                 TransactionRecoverTracker recoverTracker,
+                                                                 long maxActiveTransactionsPerCoordinator) {
         MLTransactionSequenceIdGenerator mlTransactionSequenceIdGenerator = new MLTransactionSequenceIdGenerator();
         managedLedgerConfig.setManagedLedgerInterceptor(mlTransactionSequenceIdGenerator);
         MLTransactionLogImpl txnLog = new MLTransactionLogImpl(transactionCoordinatorId,
@@ -50,6 +51,6 @@ public CompletableFuture<TransactionMetadataStore> openStore(TransactionCoordina
         // MLTransactionLogInterceptor will init sequenceId and update the sequenceId to managedLedger properties.
         return txnLog.initialize().thenCompose(__ ->
                 new MLTransactionMetadataStore(transactionCoordinatorId, txnLog, timeoutTracker,
-                        mlTransactionSequenceIdGenerator).init(recoverTracker));
+                        mlTransactionSequenceIdGenerator, maxActiveTransactionsPerCoordinator).init(recoverTracker));
     }
 }
