Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Pulsar Transaction]Transaction client transaction state check #9776

Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,9 @@ private void testTransactionBufferLowWaterMark() throws Exception {
message = consumer.receive(2, TimeUnit.SECONDS);
assertNull(message);

Field field = TransactionImpl.class.getDeclaredField("state");
field.setAccessible(true);
field.set(txn, TransactionImpl.State.OPEN);
producer.newMessage(txn).value(TEST2.getBytes()).send();

message = consumer.receive(2, TimeUnit.SECONDS);
Expand Down Expand Up @@ -236,6 +239,9 @@ private void testPendingAckLowWaterMark() throws Exception {
assertTrue(individualAckOfTransaction.containsKey(new TxnID(((TransactionImpl) txn).getTxnIdMostBits(),
((TransactionImpl) txn).getTxnIdLeastBits())));
txn.commit().get();
Field field = TransactionImpl.class.getDeclaredField("state");
field.setAccessible(true);
field.set(txn, TransactionImpl.State.OPEN);
assertFalse(individualAckOfTransaction.containsKey(new TxnID(((TransactionImpl) txn).getTxnIdMostBits(),
((TransactionImpl) txn).getTxnIdLeastBits())));

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -299,7 +299,7 @@ public void txnAckTestBatchAndSharedSubMemoryDeleteTest() throws Exception {
private Transaction getTxn() throws Exception {
return pulsarClient
.newTransaction()
.withTransactionTimeout(2, TimeUnit.SECONDS)
.withTransactionTimeout(10, TimeUnit.SECONDS)
.build()
.get();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.apache.pulsar.client.impl;

import static java.nio.charset.StandardCharsets.UTF_8;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;

Expand All @@ -30,13 +31,15 @@
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

import lombok.Cleanup;
import lombok.extern.slf4j.Slf4j;

import org.apache.bookkeeper.mledger.Position;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.pulsar.broker.TransactionMetadataStoreService;
import org.apache.pulsar.broker.service.BrokerService;
import org.apache.pulsar.broker.service.Topic;
import org.apache.pulsar.broker.service.persistent.PersistentSubscription;
Expand All @@ -52,6 +55,7 @@
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.client.api.transaction.Transaction;
import org.apache.pulsar.client.api.transaction.TransactionCoordinatorClient;
import org.apache.pulsar.client.api.transaction.TransactionCoordinatorClientException;
import org.apache.pulsar.client.api.transaction.TransactionCoordinatorClientException.TransactionNotFoundException;
import org.apache.pulsar.client.api.transaction.TxnID;
import org.apache.pulsar.client.impl.transaction.TransactionImpl;
Expand All @@ -62,6 +66,8 @@
import org.apache.pulsar.common.policies.data.PersistentTopicInternalStats;
import org.apache.pulsar.common.policies.data.TenantInfo;
import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
import org.apache.pulsar.transaction.coordinator.TransactionCoordinatorID;
import org.apache.pulsar.transaction.coordinator.TransactionMetadataStore;
import org.awaitility.Awaitility;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
Expand Down Expand Up @@ -345,6 +351,9 @@ private void txnAckTest(boolean batchEnable, int maxBatchSize,
message = consumer.receive(2, TimeUnit.SECONDS);
Assert.assertNull(message);

Field field = TransactionImpl.class.getDeclaredField("state");
field.setAccessible(true);
field.set(commitTxn, TransactionImpl.State.OPEN);
try {
commitTxn.commit().get();
fail("recommit one transaction should be failed.");
Expand Down Expand Up @@ -549,6 +558,9 @@ public void txnCumulativeAckTest(boolean batchEnable, int maxBatchSize, Subscrip
}

commitTxn.commit().get();
Field field = TransactionImpl.class.getDeclaredField("state");
field.setAccessible(true);
field.set(commitTxn, TransactionImpl.State.OPEN);
try {
commitTxn.commit().get();
fail("recommit one transaction should be failed.");
Expand Down Expand Up @@ -676,4 +688,99 @@ public void produceTxnMessageOrderTest() throws Exception {
}
}

@Test
public void produceAndConsumeCloseStateTxnTest() throws Exception {
String topic = NAMESPACE1 + "/txn-close-state";

@Cleanup
Consumer<byte[]> consumer = pulsarClient.newConsumer()
.topic(topic)
.subscriptionName("test")
.subscribe();

@Cleanup
Producer<byte[]> producer = pulsarClient.newProducer()
.topic(topic)
.sendTimeout(0, TimeUnit.SECONDS)
.producerName("txn-close-state")
.create();

Transaction produceTxn = pulsarClient
.newTransaction()
.withTransactionTimeout(2, TimeUnit.SECONDS)
.build().get();

Transaction consumeTxn = pulsarClient
.newTransaction()
.withTransactionTimeout(2, TimeUnit.SECONDS)
.build().get();

producer.newMessage(produceTxn).value(("Hello Pulsar!").getBytes()).sendAsync().get();
produceTxn.commit().get();
try {
producer.newMessage(produceTxn).value(("Hello Pulsar!").getBytes()).sendAsync().get();
fail();
} catch (Exception e) {
assertTrue(e.getCause() instanceof TransactionCoordinatorClientException.InvalidTxnStatusException);
}

try {
produceTxn.commit().get();
fail();
} catch (Exception e) {
assertTrue(e.getCause() instanceof TransactionCoordinatorClientException.InvalidTxnStatusException);
}


Message<byte[]> message = consumer.receive(5, TimeUnit.SECONDS);
consumer.acknowledgeAsync(message.getMessageId(), consumeTxn).get();
consumeTxn.commit().get();
try {
consumer.acknowledgeAsync(message.getMessageId(), consumeTxn).get();
fail();
} catch (Exception e) {
assertTrue(e.getCause() instanceof TransactionCoordinatorClientException.InvalidTxnStatusException);
}

try {
consumeTxn.commit().get();
fail();
} catch (Exception e) {
assertTrue(e.getCause() instanceof TransactionCoordinatorClientException.InvalidTxnStatusException);
}

Transaction timeoutTxn = pulsarClient
.newTransaction()
.withTransactionTimeout(1, TimeUnit.SECONDS)
.build().get();
AtomicReference<TransactionMetadataStore> transactionMetadataStore = new AtomicReference<>();
getPulsarServiceList().forEach(pulsarService -> {
if (pulsarService.getTransactionMetadataStoreService().getStores()
.containsKey(TransactionCoordinatorID.get(((TransactionImpl) timeoutTxn).getTxnIdMostBits()))) {
transactionMetadataStore.set(pulsarService.getTransactionMetadataStoreService().getStores()
.get(TransactionCoordinatorID.get(((TransactionImpl) timeoutTxn).getTxnIdMostBits())));
}
});

Awaitility.await().atMost(3000, TimeUnit.MILLISECONDS).until(() -> {
try {
transactionMetadataStore.get().getTxnMeta(new TxnID(((TransactionImpl) timeoutTxn)
.getTxnIdMostBits(), ((TransactionImpl) timeoutTxn).getTxnIdLeastBits())).get();
return false;
} catch (Exception e) {
return true;
}
});

try {
timeoutTxn.commit().get();
fail();
} catch (Exception e) {
assertTrue(e.getCause() instanceof TransactionNotFoundException);
}
Field field = TransactionImpl.class.getDeclaredField("state");
field.setAccessible(true);
TransactionImpl.State state = (TransactionImpl.State) field.get(timeoutTxn);
assertEquals(state, TransactionImpl.State.ERROR);
}
}
Loading