Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(txmgr): ErrOffsetsLoadInProgress is retriable #2701

Merged
merged 1 commit into from
Nov 2, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
186 changes: 93 additions & 93 deletions errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -173,98 +173,98 @@ type KError int16

// Numeric error codes returned by the Kafka server.
const (
ErrNoError KError = 0
ErrUnknown KError = -1
ErrOffsetOutOfRange KError = 1
ErrInvalidMessage KError = 2
ErrUnknownTopicOrPartition KError = 3
ErrInvalidMessageSize KError = 4
ErrLeaderNotAvailable KError = 5
ErrNotLeaderForPartition KError = 6
ErrRequestTimedOut KError = 7
ErrBrokerNotAvailable KError = 8
ErrReplicaNotAvailable KError = 9
ErrMessageSizeTooLarge KError = 10
ErrStaleControllerEpochCode KError = 11
ErrOffsetMetadataTooLarge KError = 12
ErrNetworkException KError = 13
ErrOffsetsLoadInProgress KError = 14
ErrConsumerCoordinatorNotAvailable KError = 15
ErrNotCoordinatorForConsumer KError = 16
ErrInvalidTopic KError = 17
ErrMessageSetSizeTooLarge KError = 18
ErrNotEnoughReplicas KError = 19
ErrNotEnoughReplicasAfterAppend KError = 20
ErrInvalidRequiredAcks KError = 21
ErrIllegalGeneration KError = 22
ErrInconsistentGroupProtocol KError = 23
ErrInvalidGroupId KError = 24
ErrUnknownMemberId KError = 25
ErrInvalidSessionTimeout KError = 26
ErrRebalanceInProgress KError = 27
ErrInvalidCommitOffsetSize KError = 28
ErrTopicAuthorizationFailed KError = 29
ErrGroupAuthorizationFailed KError = 30
ErrClusterAuthorizationFailed KError = 31
ErrInvalidTimestamp KError = 32
ErrUnsupportedSASLMechanism KError = 33
ErrIllegalSASLState KError = 34
ErrUnsupportedVersion KError = 35
ErrTopicAlreadyExists KError = 36
ErrInvalidPartitions KError = 37
ErrInvalidReplicationFactor KError = 38
ErrInvalidReplicaAssignment KError = 39
ErrInvalidConfig KError = 40
ErrNotController KError = 41
ErrInvalidRequest KError = 42
ErrUnsupportedForMessageFormat KError = 43
ErrPolicyViolation KError = 44
ErrOutOfOrderSequenceNumber KError = 45
ErrDuplicateSequenceNumber KError = 46
ErrInvalidProducerEpoch KError = 47
ErrInvalidTxnState KError = 48
ErrInvalidProducerIDMapping KError = 49
ErrInvalidTransactionTimeout KError = 50
ErrConcurrentTransactions KError = 51
ErrTransactionCoordinatorFenced KError = 52
ErrTransactionalIDAuthorizationFailed KError = 53
ErrSecurityDisabled KError = 54
ErrOperationNotAttempted KError = 55
ErrKafkaStorageError KError = 56
ErrLogDirNotFound KError = 57
ErrSASLAuthenticationFailed KError = 58
ErrUnknownProducerID KError = 59
ErrReassignmentInProgress KError = 60
ErrDelegationTokenAuthDisabled KError = 61
ErrDelegationTokenNotFound KError = 62
ErrDelegationTokenOwnerMismatch KError = 63
ErrDelegationTokenRequestNotAllowed KError = 64
ErrDelegationTokenAuthorizationFailed KError = 65
ErrDelegationTokenExpired KError = 66
ErrInvalidPrincipalType KError = 67
ErrNonEmptyGroup KError = 68
ErrGroupIDNotFound KError = 69
ErrFetchSessionIDNotFound KError = 70
ErrInvalidFetchSessionEpoch KError = 71
ErrListenerNotFound KError = 72
ErrTopicDeletionDisabled KError = 73
ErrFencedLeaderEpoch KError = 74
ErrUnknownLeaderEpoch KError = 75
ErrUnsupportedCompressionType KError = 76
ErrStaleBrokerEpoch KError = 77
ErrOffsetNotAvailable KError = 78
ErrMemberIdRequired KError = 79
ErrPreferredLeaderNotAvailable KError = 80
ErrGroupMaxSizeReached KError = 81
ErrFencedInstancedId KError = 82
ErrEligibleLeadersNotAvailable KError = 83
ErrElectionNotNeeded KError = 84
ErrNoReassignmentInProgress KError = 85
ErrGroupSubscribedToTopic KError = 86
ErrInvalidRecord KError = 87
ErrUnstableOffsetCommit KError = 88
ErrThrottlingQuotaExceeded KError = 89
ErrProducerFenced KError = 90
ErrUnknown KError = -1 // Errors.UNKNOWN_SERVER_ERROR
ErrNoError KError = 0 // Errors.NONE
ErrOffsetOutOfRange KError = 1 // Errors.OFFSET_OUT_OF_RANGE
ErrInvalidMessage KError = 2 // Errors.CORRUPT_MESSAGE
ErrUnknownTopicOrPartition KError = 3 // Errors.UNKNOWN_TOPIC_OR_PARTITION
ErrInvalidMessageSize KError = 4 // Errors.INVALID_FETCH_SIZE
ErrLeaderNotAvailable KError = 5 // Errors.LEADER_NOT_AVAILABLE
ErrNotLeaderForPartition KError = 6 // Errors.NOT_LEADER_OR_FOLLOWER
ErrRequestTimedOut KError = 7 // Errors.REQUEST_TIMED_OUT
ErrBrokerNotAvailable KError = 8 // Errors.BROKER_NOT_AVAILABLE
ErrReplicaNotAvailable KError = 9 // Errors.REPLICA_NOT_AVAILABLE
ErrMessageSizeTooLarge KError = 10 // Errors.MESSAGE_TOO_LARGE
ErrStaleControllerEpochCode KError = 11 // Errors.STALE_CONTROLLER_EPOCH
ErrOffsetMetadataTooLarge KError = 12 // Errors.OFFSET_METADATA_TOO_LARGE
ErrNetworkException KError = 13 // Errors.NETWORK_EXCEPTION
ErrOffsetsLoadInProgress KError = 14 // Errors.COORDINATOR_LOAD_IN_PROGRESS
ErrConsumerCoordinatorNotAvailable KError = 15 // Errors.COORDINATOR_NOT_AVAILABLE
ErrNotCoordinatorForConsumer KError = 16 // Errors.NOT_COORDINATOR
ErrInvalidTopic KError = 17 // Errors.INVALID_TOPIC_EXCEPTION
ErrMessageSetSizeTooLarge KError = 18 // Errors.RECORD_LIST_TOO_LARGE
ErrNotEnoughReplicas KError = 19 // Errors.NOT_ENOUGH_REPLICAS
ErrNotEnoughReplicasAfterAppend KError = 20 // Errors.NOT_ENOUGH_REPLICAS_AFTER_APPEND
ErrInvalidRequiredAcks KError = 21 // Errors.INVALID_REQUIRED_ACKS
ErrIllegalGeneration KError = 22 // Errors.ILLEGAL_GENERATION
ErrInconsistentGroupProtocol KError = 23 // Errors.INCONSISTENT_GROUP_PROTOCOL
ErrInvalidGroupId KError = 24 // Errors.INVALID_GROUP_ID
ErrUnknownMemberId KError = 25 // Errors.UNKNOWN_MEMBER_ID
ErrInvalidSessionTimeout KError = 26 // Errors.INVALID_SESSION_TIMEOUT
ErrRebalanceInProgress KError = 27 // Errors.REBALANCE_IN_PROGRESS
ErrInvalidCommitOffsetSize KError = 28 // Errors.INVALID_COMMIT_OFFSET_SIZE
ErrTopicAuthorizationFailed KError = 29 // Errors.TOPIC_AUTHORIZATION_FAILED
ErrGroupAuthorizationFailed KError = 30 // Errors.GROUP_AUTHORIZATION_FAILED
ErrClusterAuthorizationFailed KError = 31 // Errors.CLUSTER_AUTHORIZATION_FAILED
ErrInvalidTimestamp KError = 32 // Errors.INVALID_TIMESTAMP
ErrUnsupportedSASLMechanism KError = 33 // Errors.UNSUPPORTED_SASL_MECHANISM
ErrIllegalSASLState KError = 34 // Errors.ILLEGAL_SASL_STATE
ErrUnsupportedVersion KError = 35 // Errors.UNSUPPORTED_VERSION
ErrTopicAlreadyExists KError = 36 // Errors.TOPIC_ALREADY_EXISTS
ErrInvalidPartitions KError = 37 // Errors.INVALID_PARTITIONS
ErrInvalidReplicationFactor KError = 38 // Errors.INVALID_REPLICATION_FACTOR
ErrInvalidReplicaAssignment KError = 39 // Errors.INVALID_REPLICA_ASSIGNMENT
ErrInvalidConfig KError = 40 // Errors.INVALID_CONFIG
ErrNotController KError = 41 // Errors.NOT_CONTROLLER
ErrInvalidRequest KError = 42 // Errors.INVALID_REQUEST
ErrUnsupportedForMessageFormat KError = 43 // Errors.UNSUPPORTED_FOR_MESSAGE_FORMAT
ErrPolicyViolation KError = 44 // Errors.POLICY_VIOLATION
ErrOutOfOrderSequenceNumber KError = 45 // Errors.OUT_OF_ORDER_SEQUENCE_NUMBER
ErrDuplicateSequenceNumber KError = 46 // Errors.DUPLICATE_SEQUENCE_NUMBER
ErrInvalidProducerEpoch KError = 47 // Errors.INVALID_PRODUCER_EPOCH
ErrInvalidTxnState KError = 48 // Errors.INVALID_TXN_STATE
ErrInvalidProducerIDMapping KError = 49 // Errors.INVALID_PRODUCER_ID_MAPPING
ErrInvalidTransactionTimeout KError = 50 // Errors.INVALID_TRANSACTION_TIMEOUT
ErrConcurrentTransactions KError = 51 // Errors.CONCURRENT_TRANSACTIONS
ErrTransactionCoordinatorFenced KError = 52 // Errors.TRANSACTION_COORDINATOR_FENCED
ErrTransactionalIDAuthorizationFailed KError = 53 // Errors.TRANSACTIONAL_ID_AUTHORIZATION_FAILED
ErrSecurityDisabled KError = 54 // Errors.SECURITY_DISABLED
ErrOperationNotAttempted KError = 55 // Errors.OPERATION_NOT_ATTEMPTED
ErrKafkaStorageError KError = 56 // Errors.KAFKA_STORAGE_ERROR
ErrLogDirNotFound KError = 57 // Errors.LOG_DIR_NOT_FOUND
ErrSASLAuthenticationFailed KError = 58 // Errors.SASL_AUTHENTICATION_FAILED
ErrUnknownProducerID KError = 59 // Errors.UNKNOWN_PRODUCER_ID
ErrReassignmentInProgress KError = 60 // Errors.REASSIGNMENT_IN_PROGRESS
ErrDelegationTokenAuthDisabled KError = 61 // Errors.DELEGATION_TOKEN_AUTH_DISABLED
ErrDelegationTokenNotFound KError = 62 // Errors.DELEGATION_TOKEN_NOT_FOUND
ErrDelegationTokenOwnerMismatch KError = 63 // Errors.DELEGATION_TOKEN_OWNER_MISMATCH
ErrDelegationTokenRequestNotAllowed KError = 64 // Errors.DELEGATION_TOKEN_REQUEST_NOT_ALLOWED
ErrDelegationTokenAuthorizationFailed KError = 65 // Errors.DELEGATION_TOKEN_AUTHORIZATION_FAILED
ErrDelegationTokenExpired KError = 66 // Errors.DELEGATION_TOKEN_EXPIRED
ErrInvalidPrincipalType KError = 67 // Errors.INVALID_PRINCIPAL_TYPE
ErrNonEmptyGroup KError = 68 // Errors.NON_EMPTY_GROUP
ErrGroupIDNotFound KError = 69 // Errors.GROUP_ID_NOT_FOUND
ErrFetchSessionIDNotFound KError = 70 // Errors.FETCH_SESSION_ID_NOT_FOUND
ErrInvalidFetchSessionEpoch KError = 71 // Errors.INVALID_FETCH_SESSION_EPOCH
ErrListenerNotFound KError = 72 // Errors.LISTENER_NOT_FOUND
ErrTopicDeletionDisabled KError = 73 // Errors.TOPIC_DELETION_DISABLED
ErrFencedLeaderEpoch KError = 74 // Errors.FENCED_LEADER_EPOCH
ErrUnknownLeaderEpoch KError = 75 // Errors.UNKNOWN_LEADER_EPOCH
ErrUnsupportedCompressionType KError = 76 // Errors.UNSUPPORTED_COMPRESSION_TYPE
ErrStaleBrokerEpoch KError = 77 // Errors.STALE_BROKER_EPOCH
ErrOffsetNotAvailable KError = 78 // Errors.OFFSET_NOT_AVAILABLE
ErrMemberIdRequired KError = 79 // Errors.MEMBER_ID_REQUIRED
ErrPreferredLeaderNotAvailable KError = 80 // Errors.PREFERRED_LEADER_NOT_AVAILABLE
ErrGroupMaxSizeReached KError = 81 // Errors.GROUP_MAX_SIZE_REACHED
ErrFencedInstancedId KError = 82 // Errors.FENCED_INSTANCE_ID
ErrEligibleLeadersNotAvailable KError = 83 // Errors.ELIGIBLE_LEADERS_NOT_AVAILABLE
ErrElectionNotNeeded KError = 84 // Errors.ELECTION_NOT_NEEDED
ErrNoReassignmentInProgress KError = 85 // Errors.NO_REASSIGNMENT_IN_PROGRESS
ErrGroupSubscribedToTopic KError = 86 // Errors.GROUP_SUBSCRIBED_TO_TOPIC
ErrInvalidRecord KError = 87 // Errors.INVALID_RECORD
ErrUnstableOffsetCommit KError = 88 // Errors.UNSTABLE_OFFSET_COMMIT
ErrThrottlingQuotaExceeded KError = 89 // Errors.THROTTLING_QUOTA_EXCEEDED
ErrProducerFenced KError = 90 // Errors.PRODUCER_FENCED
)

func (err KError) Error() string {
Expand Down Expand Up @@ -302,7 +302,7 @@ func (err KError) Error() string {
case ErrNetworkException:
return "kafka server: The server disconnected before a response was received"
case ErrOffsetsLoadInProgress:
return "kafka server: The broker is still loading offsets after a leader change for that offset's topic partition"
return "kafka server: The coordinator is still loading offsets and cannot currently process requests"
case ErrConsumerCoordinatorNotAvailable:
return "kafka server: Offset's topic has not yet been created"
case ErrNotCoordinatorForConsumer:
Expand Down
40 changes: 40 additions & 0 deletions mockresponses.go
Original file line number Diff line number Diff line change
Expand Up @@ -1467,3 +1467,43 @@ func (m *MockApiVersionsResponse) For(reqBody versionedDecoder) encoderWithHeade
}
return res
}

// MockInitProducerIDResponse is an `InitProducerIDResponse` builder.
//
// Configure it through the chainable Set* methods; For then copies the
// configured values into the response it returns.
type MockInitProducerIDResponse struct {
	// producerID is copied into InitProducerIDResponse.ProducerID by For.
	producerID int64
	// producerEpoch is copied into InitProducerIDResponse.ProducerEpoch by For.
	producerEpoch int16
	// err is copied into InitProducerIDResponse.Err by For; it keeps its
	// zero value unless SetError is called.
	err KError
	// t is the reporter handed to NewMockInitProducerIDResponse. None of the
	// methods visible alongside this type read it.
	t TestReporter
}

// NewMockInitProducerIDResponse returns a builder bound to the given test
// reporter. The producer ID, producer epoch and error all start at their zero
// values until changed through the Set* methods.
func NewMockInitProducerIDResponse(t TestReporter) *MockInitProducerIDResponse {
	builder := new(MockInitProducerIDResponse)
	builder.t = t
	return builder
}

// SetProducerID records the producer ID to place in generated responses and
// returns the receiver so calls can be chained.
func (m *MockInitProducerIDResponse) SetProducerID(id int) *MockInitProducerIDResponse {
	widened := int64(id)
	m.producerID = widened
	return m
}

// SetProducerEpoch records the producer epoch to place in generated responses
// and returns the receiver so calls can be chained.
//
// The epoch is stored as an int16; values outside that range wrap around
// according to Go's integer conversion rules rather than being rejected.
func (m *MockInitProducerIDResponse) SetProducerEpoch(epoch int) *MockInitProducerIDResponse {
	narrowed := int16(epoch)
	m.producerEpoch = narrowed
	return m
}

// SetError records the Kafka error code to place in generated responses and
// returns the receiver so calls can be chained.
func (m *MockInitProducerIDResponse) SetError(err KError) *MockInitProducerIDResponse {
	builder := m
	builder.err = err
	return builder
}

// For builds the InitProducerIDResponse for the given request, echoing the
// request's Version and filling in the configured error, producer ID and
// producer epoch.
//
// reqBody must be an *InitProducerIDRequest; any other type makes the type
// assertion panic.
func (m *MockInitProducerIDResponse) For(reqBody versionedDecoder) encoderWithHeader {
	request := reqBody.(*InitProducerIDRequest)

	response := new(InitProducerIDResponse)
	response.Version = request.Version
	response.Err = m.err
	response.ProducerID = m.producerID
	response.ProducerEpoch = m.producerEpoch
	return response
}
5 changes: 2 additions & 3 deletions transaction_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -569,9 +569,8 @@ func (t *transactionManager) initProducerId() (int64, int16, error) {
return response.ProducerID, response.ProducerEpoch, false, nil
}
switch response.Err {
case ErrConsumerCoordinatorNotAvailable:
fallthrough
case ErrNotCoordinatorForConsumer:
// Retriable errors
case ErrConsumerCoordinatorNotAvailable, ErrNotCoordinatorForConsumer, ErrOffsetsLoadInProgress:
if t.isTransactional() {
_ = coordinator.Close()
_ = t.client.RefreshTransactionCoordinator(t.transactionalID)
Expand Down
47 changes: 47 additions & 0 deletions transaction_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,53 @@ func TestTxnmgrInitProducerIdTxn(t *testing.T) {
require.Equal(t, ProducerTxnFlagReady, txmng.status)
}

// TestTxnmgrInitProducerIdTxnCoordinatorLoading ensures that initProducerId is
// retried when either FindCoordinator or InitProducerID returns
// ErrOffsetsLoadInProgress.
//
// Both mocked request types answer with ErrOffsetsLoadInProgress twice before
// succeeding on the third response; the transaction manager is then expected
// to hold producer ID 1, epoch 0 and the ready status.
func TestTxnmgrInitProducerIdTxnCoordinatorLoading(t *testing.T) {
	const txnID = "txid-group"

	cfg := NewTestConfig()
	cfg.Version = V0_11_0_0
	cfg.Net.MaxOpenRequests = 1
	cfg.Producer.Idempotent = true
	cfg.Producer.RequiredAcks = WaitForAll
	cfg.Producer.Transaction.ID = txnID

	broker := NewMockBroker(t, 1)
	defer broker.Close()

	metadata := NewMockMetadataResponse(t).
		SetController(broker.BrokerID()).
		SetBroker(broker.Addr(), broker.BrokerID())

	// Coordinator lookup: two "still loading" answers, then the real coordinator.
	findCoordinator := NewMockSequence(
		NewMockFindCoordinatorResponse(t).
			SetError(CoordinatorTransaction, txnID, ErrOffsetsLoadInProgress),
		NewMockFindCoordinatorResponse(t).
			SetError(CoordinatorTransaction, txnID, ErrOffsetsLoadInProgress),
		NewMockFindCoordinatorResponse(t).
			SetCoordinator(CoordinatorTransaction, txnID, broker),
	)

	// Producer ID initialisation: two "still loading" answers, then success.
	initProducerID := NewMockSequence(
		NewMockInitProducerIDResponse(t).
			SetError(ErrOffsetsLoadInProgress),
		NewMockInitProducerIDResponse(t).
			SetError(ErrOffsetsLoadInProgress),
		NewMockInitProducerIDResponse(t).
			SetProducerID(1).
			SetProducerEpoch(0),
	)

	broker.SetHandlerByMap(map[string]MockResponse{
		"MetadataRequest":        metadata,
		"FindCoordinatorRequest": findCoordinator,
		"InitProducerIDRequest":  initProducerID,
	})

	client, err := NewClient([]string{broker.Addr()}, cfg)
	require.NoError(t, err)
	defer client.Close()

	manager, err := newTransactionManager(cfg, client)
	require.NoError(t, err)

	require.Equal(t, int64(1), manager.producerID)
	require.Equal(t, int16(0), manager.producerEpoch)
	require.Equal(t, ProducerTxnFlagReady, manager.status)
}

func TestMaybeAddPartitionToCurrentTxn(t *testing.T) {
type testCase struct {
initialFlags ProducerTxnStatusFlag
Expand Down
Loading