
Transactional Producer send msg always got ErrOutOfOrderSequenceNumber error when brokers reconnected #2611

Closed
songxinjianqwe opened this issue Aug 21, 2023 · 4 comments
Labels
needs-investigation, stale

Comments

@songxinjianqwe

Versions


Sarama: 1.40.1
Kafka: 2.6.2
Go: 1.19
Configuration

What configuration values are you using for Sarama and Kafka?

	var appName = "tx-id-1"
	var config = sarama.NewConfig()
	config.ClientID = appName
	config.Producer.Idempotent = true
	config.Producer.Return.Errors = true
	config.Producer.Return.Successes = true
	config.Producer.RequiredAcks = sarama.WaitForAll
	config.Producer.Partitioner = sarama.NewManualPartitioner
	config.Producer.Transaction.Retry.Max = 3
	config.Producer.Transaction.ID = appName
	config.Net.MaxOpenRequests = 1
	config.Producer.Retry.Max = 1
	config.Metadata.Retry.Max = 0
	config.Net.DialTimeout = time.Second * 3
	config.Net.ReadTimeout = time.Second * 3
	config.Net.WriteTimeout = time.Second * 3
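
The producer itself is a transactional SyncProducer built from this config, roughly like this (sketch for completeness; the broker address is a placeholder):

producer, err := sarama.NewSyncProducer([]string{"broker-address:9092"}, config)
if err != nil {
	log.Fatalf("could not create transactional sync producer: %v", err)
}
defer producer.Close()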
Logs


2023/08/21 15:36:04 begin txn start
[Sarama] 2023/08/21 15:36:47 txnmgr/transition [tx-id-1] transition from ProducerTxnStateReady to ProducerTxnStateInTransaction
2023/08/21 15:36:53 begin txn done
2023/08/21 15:36:57 send msg: 10 start 
[Sarama] 2023/08/21 15:36:58 client/metadata fetching metadata for [slowsql_unitest] from broker alikafka-post-cn-zxu39oke500i-3-vpc.alikafka.aliyuncs.com:9092
[Sarama] 2023/08/21 15:36:58 client/brokers registered new broker #103 at 10.130.232.87:9092
[Sarama] 2023/08/21 15:36:58 client/brokers registered new broker #102 at 10.130.232.86:9092
[Sarama] 2023/08/21 15:36:58 producer/broker/102 starting up
[Sarama] 2023/08/21 15:36:58 producer/broker/102 state change to [open] on slowsql_unitest/0
[Sarama] 2023/08/21 15:36:58 producer/leader/slowsql_unitest/0 selected broker 102
[Sarama] 2023/08/21 15:36:58 Connected to broker at 10.130.232.86:9092 (registered as #102)
[Sarama] 2023/08/21 15:36:58 txnmgr/add-partition-to-txn [tx-id-1] successful to add partitions txn &{ThrottleTime:0s Errors:map[slowsql_unitest:[0xc000022310]]}
[Sarama] 2023/08/21 15:36:58 txnmgr/transition [tx-id-1] transition from ProducerTxnStateInTransaction to ProducerTxnStateInError|ProducerTxnStateAbortableError
2023/08/21 15:37:14 send msg: 10 done
2023/08/21 15:37:14 send msg 10 fail, err: kafka server: The broker received an out of order sequence number
2023/08/21 15:37:14 abort txn start
[Sarama] 2023/08/21 15:37:16 producer/txnmgr [tx-id-1] aborting transaction
[Sarama] 2023/08/21 15:37:16 txnmgr/transition [tx-id-1] transition from ProducerTxnStateInError|ProducerTxnStateAbortableError to ProducerTxnStateEndTransaction|ProducerTxnStateAbortingTransaction
[Sarama] 2023/08/21 15:37:16 txnmgr/endtxn [tx-id-1] successful to end txn &{ThrottleTime:0s Err:kafka server: Not an error, why are you printing me?}
[Sarama] 2023/08/21 15:37:16 txnmgr/transition [tx-id-1] transition from ProducerTxnStateEndTransaction|ProducerTxnStateAbortingTransaction to ProducerTxnStateReady
[Sarama] 2023/08/21 15:37:16 producer/txnmgr [tx-id-1] transaction aborted
2023/08/21 15:37:18 abort txn done

Problem Description

Hi, I am using the transactional API in a scenario where multiple messages are sent synchronously inside one transaction, but I found an unexpected case when the brokers become unreachable.
The code looks like this:

		for {
			select {
			case <-ctx.Done():
				return nil
			default:
				log.Printf("begin txn start")
				if err := producer.BeginTxn(); err != nil {
					log.Printf("could not begin transaction: %v", err)
					time.Sleep(time.Second)
					continue
				}
				log.Printf("begin txn done")
				msgList := make([]*sarama.ProducerMessage, 0, 10)
				for j := 0; j < 10; j++ {
					message := sarama.ProducerMessage{Topic: topic, Partition: 0, Key: sarama.StringEncoder(strconv.Itoa(i)), Value: sarama.StringEncoder(msgBody)}
					msgList = append(msgList, &message)
					i += 1
				}
				var commit = true
				for _, msg := range msgList {
					log.Printf("send msg: %s start ", msg.Key)
					_, _, err := producer.SendMessage(msg)
					log.Printf("send msg: %s done", msg.Key)
					if err != nil {
						log.Printf("send msg %s fail, err: %+v", msg.Key, err)
						log.Printf("abort txn start")
						if err := producer.AbortTxn(); err != nil {
							log.Printf("could not abort transaction: %+v", err)
							time.Sleep(time.Second)
							commit = false
							break
						}
						log.Printf("abort txn done")
						time.Sleep(time.Second)
						commit = false
						break
					}
					time.Sleep(time.Second)
				}
				if !commit {
					continue
				}
				log.Printf("commit txn start")
				if err := producer.CommitTxn(); err != nil {
					log.Printf("could not commit transaction: %v\n", err)
					log.Printf("abort txn start")
					if err := producer.AbortTxn(); err != nil {
						log.Printf("could not abort transaction: %v", err)
						time.Sleep(time.Second)
						continue
					}
					log.Printf("abort txn done")
					time.Sleep(time.Second)
					continue
				}
				log.Printf("commit txn done")
				time.Sleep(time.Second)
			}
		}

When the brokers die, every send call returns ErrOutOfOrderSequenceNumber (45) until I recreate the sync producer. The case looks like the problem in #1430, which was fixed by #1661.
Looking at commit ba2b4bc, I think this line of code may be the reason for ErrOutOfOrderSequenceNumber:

func (p *asyncProducer) returnError(msg *ProducerMessage, err error) {
	if p.IsTransactional() {
		_ = p.maybeTransitionToErrorState(err)
	}
	// We need to reset the producer ID epoch if we set a sequence number on it, because the broker
	// will never see a message with this number, so we can never continue the sequence.
	// this line!
	if !p.IsTransactional() && msg.hasSequence {
		Logger.Printf("producer/txnmanager rolling over epoch due to publish failure on %s/%d", msg.Topic, msg.Partition)
		p.bumpIdempotentProducerEpoch()
	}

The condition !p.IsTransactional() && msg.hasSequence bumps the epoch only when the producer is not transactional. But I wrote similar code in Java that sends messages with the Kafka Java SDK, and ErrOutOfOrderSequenceNumber does not occur after the brokers reconnect.
I found a log printed by the Kafka Java SDK:

2023-08-21 15:51:11.980 [INFO] [kafka-producer-network-thread | producer-1] [org.apache.kafka.clients.producer.internals.TransactionManager] @@@traceId=TID: N/A@@@ [Producer clientId=producer-1, transactionalId=1] ProducerId set to 42002 with epoch 46

I think this log means that, whether or not transactions are enabled, the Java client always bumps the epoch after messages fail to send, but in Sarama the epoch is bumped only when idempotence is enabled and transactions are not.
Is this a bug in Sarama? Could we fix it by removing the transactional predicate from the !p.IsTransactional() && msg.hasSequence condition?
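
In the meantime, the only thing that gets the producer working again on my side is tearing it down and creating a new one once this error appears (as mentioned above). A rough sketch of that stopgap (not a fix), where brokers and cfg stand in for the setup from the Configuration section and the errors and sarama imports are assumed:

// Sketch only: once a send fails with ErrOutOfOrderSequenceNumber, the existing
// producer can never continue its sequence, so abort the in-flight transaction,
// close the producer, and build a fresh SyncProducer (new producer ID/epoch).
func recreateOnSequenceError(producer sarama.SyncProducer, sendErr error, brokers []string, cfg *sarama.Config) (sarama.SyncProducer, error) {
	if !errors.Is(sendErr, sarama.ErrOutOfOrderSequenceNumber) {
		return producer, sendErr // some other failure: keep the current producer
	}
	_ = producer.AbortTxn() // give up on the in-flight transaction first
	_ = producer.Close()
	return sarama.NewSyncProducer(brokers, cfg)
}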

@songxinjianqwe
Author

Update: I found a way to fix this: set Version >= 2.5.0.
I read the transactionManager code and found this:

func (p *asyncProducer) maybeTransitionToErrorState(err error) error {
	if errors.Is(err, ErrClusterAuthorizationFailed) ||
		errors.Is(err, ErrProducerFenced) ||
		errors.Is(err, ErrUnsupportedVersion) ||
		errors.Is(err, ErrTransactionalIDAuthorizationFailed) {
		return p.txnmgr.transitionTo(ProducerTxnFlagInError|ProducerTxnFlagFatalError, err)
	}
	if p.txnmgr.coordinatorSupportsBumpingEpoch && p.txnmgr.currentTxnStatus()&ProducerTxnFlagEndTransaction == 0 {
		p.txnmgr.epochBumpRequired = true
	}
	return p.txnmgr.transitionTo(ProducerTxnFlagInError|ProducerTxnFlagAbortableError, err)
}

and this code:

	if t.client.Config().Version.IsAtLeast(V2_5_0_0) {
		req.Version = 3
		isEpochBump = t.producerID != noProducerID && t.producerEpoch != noProducerEpoch
		t.coordinatorSupportsBumpingEpoch = true
		req.ProducerID = t.producerID
		req.ProducerEpoch = t.producerEpoch
	} else if t.client.Config().Version.IsAtLeast(V2_4_0_0) {
		req.Version = 2
	}

So if we set Version >= 2.5.0, the producer will bump the epoch after a message fails to send.
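
For anyone else hitting this, a minimal sketch of that workaround as a complete program, assuming the IBM/sarama module path; the broker address and transactional ID are placeholders:

package main

import (
	"log"
	"time"

	"github.com/IBM/sarama"
)

func newTxnConfig() *sarama.Config {
	config := sarama.NewConfig()
	// Declaring a broker version >= 2.5.0 lets the transactional producer take the
	// epoch-bump (KIP-360) path after an abortable send failure instead of staying
	// stuck on ErrOutOfOrderSequenceNumber.
	config.Version = sarama.V2_5_0_0
	config.ClientID = "tx-id-1"                // placeholder
	config.Producer.Transaction.ID = "tx-id-1" // placeholder transactional id
	config.Producer.Idempotent = true
	config.Producer.RequiredAcks = sarama.WaitForAll
	config.Producer.Return.Errors = true
	config.Producer.Return.Successes = true
	config.Producer.Partitioner = sarama.NewManualPartitioner
	config.Producer.Retry.Max = 1  // idempotence requires at least one retry
	config.Net.MaxOpenRequests = 1 // idempotence requires a single in-flight request
	config.Net.DialTimeout = 3 * time.Second
	return config
}

func main() {
	producer, err := sarama.NewSyncProducer([]string{"localhost:9092"}, newTxnConfig()) // placeholder broker
	if err != nil {
		log.Fatalf("could not create transactional producer: %v", err)
	}
	defer producer.Close()
	log.Printf("transactional producer ready, txn status: %v", producer.TxnStatus())
}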


github-actions bot added the stale label on Jan 26, 2024
dnwe added the needs-investigation label and removed the stale label on Jan 26, 2024
@dnwe
Collaborator

dnwe commented Jan 26, 2024

@songxinjianqwe yes, exactly right. That bit of code is essentially Sarama's client-side implementation of KIP-360 (Improve reliability of idempotent/transactional producer) / KAFKA-8805 (Bump producer epoch following recoverable errors), and it relies upon a broker change (KAFKA-8710) that was made in Kafka 2.5.0.

github-actions bot

Thank you for taking the time to raise this issue. However, it has not had any activity on it in the past 90 days and will be closed in 30 days if no updates occur.
Please check if the main branch has already resolved the issue since it was raised. If you believe the issue is still valid and you would like input from the maintainers then please comment to ask for it to be reviewed.

github-actions bot added the stale label on Apr 25, 2024
github-actions bot closed this as not planned (won't fix, can't repro, duplicate, stale) on May 25, 2024