Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(proto): use full ranges for remaining proto #2570

Merged
merged 2 commits into from
Aug 7, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion add_offsets_to_txn_request.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,9 @@ func (a *AddOffsetsToTxnRequest) requiredVersion() KafkaVersion {
return V2_7_0_0
case 1:
return V2_0_0_0
default:
case 0:
return V0_11_0_0
default:
return V2_7_0_0
}
}
4 changes: 3 additions & 1 deletion add_offsets_to_txn_response.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,8 +55,10 @@ func (a *AddOffsetsToTxnResponse) requiredVersion() KafkaVersion {
return V2_7_0_0
case 1:
return V2_0_0_0
default:
case 0:
return V0_11_0_0
default:
return V2_7_0_0
}
}

Expand Down
30 changes: 25 additions & 5 deletions admin.go
Original file line number Diff line number Diff line change
Expand Up @@ -238,11 +238,15 @@ func (ca *clusterAdmin) CreateTopic(topic string, detail *TopicDetail, validateO
Timeout: ca.conf.Admin.Timeout,
}

if ca.conf.Version.IsAtLeast(V0_11_0_0) {
request.Version = 1
}
if ca.conf.Version.IsAtLeast(V1_0_0_0) {
if ca.conf.Version.IsAtLeast(V2_0_0_0) {
// Version 3 is the same as version 2 (brokers response before throttling)
request.Version = 3
} else if ca.conf.Version.IsAtLeast(V0_11_0_0) {
// Version 2 is the same as version 1 (response has ThrottleTime)
request.Version = 2
} else if ca.conf.Version.IsAtLeast(V0_10_2_0) {
// Version 1 adds validateOnly.
request.Version = 1
}

return ca.retryOnError(isErrNoController, func() error {
Expand Down Expand Up @@ -424,7 +428,12 @@ func (ca *clusterAdmin) DeleteTopic(topic string) error {
Timeout: ca.conf.Admin.Timeout,
}

if ca.conf.Version.IsAtLeast(V0_11_0_0) {
// Versions 0, 1, 2, and 3 are the same.
if ca.conf.Version.IsAtLeast(V2_1_0_0) {
request.Version = 3
} else if ca.conf.Version.IsAtLeast(V2_0_0_0) {
request.Version = 2
} else if ca.conf.Version.IsAtLeast(V0_11_0_0) {
request.Version = 1
}

Expand Down Expand Up @@ -918,8 +927,19 @@ func (ca *clusterAdmin) DescribeConsumerGroups(groups []string) (result []*Group
describeReq := &DescribeGroupsRequest{
Groups: brokerGroups,
}

if ca.conf.Version.IsAtLeast(V2_4_0_0) {
// Starting in version 4, the response will include group.instance.id info for members.
describeReq.Version = 4
} else if ca.conf.Version.IsAtLeast(V2_3_0_0) {
// Starting in version 3, authorized operations can be requested.
describeReq.Version = 3
} else if ca.conf.Version.IsAtLeast(V2_0_0_0) {
// Version 2 is the same as version 0.
describeReq.Version = 2
} else if ca.conf.Version.IsAtLeast(V1_1_0_0) {
// Version 1 is the same as version 0.
describeReq.Version = 1
}
response, err := broker.DescribeGroups(describeReq)
if err != nil {
Expand Down
10 changes: 7 additions & 3 deletions api_versions_request.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,11 +63,15 @@ func (r *ApiVersionsRequest) isValidVersion() bool {

func (r *ApiVersionsRequest) requiredVersion() KafkaVersion {
switch r.Version {
case 0:
return V0_10_0_0
case 3:
return V2_4_0_0
default:
case 2:
return V2_0_0_0
case 1:
return V0_11_0_0
case 0:
return V0_10_0_0
default:
return V2_4_0_0
}
}
10 changes: 7 additions & 3 deletions api_versions_response.go
Original file line number Diff line number Diff line change
Expand Up @@ -152,12 +152,16 @@ func (r *ApiVersionsResponse) isValidVersion() bool {

func (r *ApiVersionsResponse) requiredVersion() KafkaVersion {
switch r.Version {
case 0:
return V0_10_0_0
case 3:
return V2_4_0_0
default:
case 2:
return V2_0_0_0
case 1:
return V0_11_0_0
case 0:
return V0_10_0_0
default:
return V2_4_0_0
}
}

Expand Down
20 changes: 17 additions & 3 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -251,12 +251,26 @@ func (client *client) Broker(brokerID int32) (*Broker, error) {
}

func (client *client) InitProducerID() (*InitProducerIDResponse, error) {
// FIXME: this InitProducerID seems to only be called from client_test.go (TestInitProducerIDConnectionRefused) and has been superceded by transaction_manager.go?
brokerErrors := make([]error, 0)
for broker := client.anyBroker(); broker != nil; broker = client.anyBroker() {
var response *InitProducerIDResponse
req := &InitProducerIDRequest{}
request := &InitProducerIDRequest{}

if client.conf.Version.IsAtLeast(V2_7_0_0) {
// Version 4 adds the support for new error code PRODUCER_FENCED.
request.Version = 4
} else if client.conf.Version.IsAtLeast(V2_5_0_0) {
// Version 3 adds ProducerId and ProducerEpoch, allowing producers to try to resume after an INVALID_PRODUCER_EPOCH error
request.Version = 3
} else if client.conf.Version.IsAtLeast(V2_4_0_0) {
// Version 2 is the first flexible version.
request.Version = 2
} else if client.conf.Version.IsAtLeast(V2_0_0_0) {
// Version 1 is the same as version 0.
request.Version = 1
}

response, err := broker.InitProducerID(req)
response, err := broker.InitProducerID(request)
if err == nil {
return response, nil
} else {
Expand Down
4 changes: 3 additions & 1 deletion create_topics_request.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,8 +95,10 @@ func (c *CreateTopicsRequest) requiredVersion() KafkaVersion {
return V0_11_0_0
case 1:
return V0_10_2_0
default:
case 0:
return V0_10_1_0
default:
return V2_8_0_0
}
}

Expand Down
4 changes: 3 additions & 1 deletion create_topics_response.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,8 +86,10 @@ func (c *CreateTopicsResponse) requiredVersion() KafkaVersion {
return V0_11_0_0
case 1:
return V0_10_2_0
default:
case 0:
return V0_10_1_0
default:
return V2_8_0_0
}
}

Expand Down
4 changes: 3 additions & 1 deletion delete_topics_request.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,9 @@ func (d *DeleteTopicsRequest) requiredVersion() KafkaVersion {
return V2_0_0_0
case 1:
return V0_11_0_0
default:
case 0:
return V0_10_1_0
default:
return V2_2_0_0
}
}
4 changes: 3 additions & 1 deletion delete_topics_response.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,8 +84,10 @@ func (d *DeleteTopicsResponse) requiredVersion() KafkaVersion {
return V2_0_0_0
case 1:
return V0_11_0_0
default:
case 0:
return V0_10_1_0
default:
return V2_2_0_0
}
}

Expand Down
15 changes: 9 additions & 6 deletions describe_groups_request.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,16 +48,19 @@ func (r *DescribeGroupsRequest) isValidVersion() bool {

func (r *DescribeGroupsRequest) requiredVersion() KafkaVersion {
switch r.Version {
case 1:
return V1_1_0_0
case 2:
return V2_0_0_0
case 4:
return V2_4_0_0
case 3:
return V2_3_0_0
case 4:
case 2:
return V2_0_0_0
case 1:
return V0_11_0_0
case 0:
return V0_9_0_0
default:
return V2_4_0_0
}
return V0_9_0_0
}

func (r *DescribeGroupsRequest) AddGroup(group string) {
Expand Down
15 changes: 9 additions & 6 deletions describe_groups_response.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,16 +71,19 @@ func (r *DescribeGroupsResponse) isValidVersion() bool {

func (r *DescribeGroupsResponse) requiredVersion() KafkaVersion {
switch r.Version {
case 1:
return V1_1_0_0
case 2:
return V2_0_0_0
case 4:
return V2_4_0_0
case 3:
return V2_3_0_0
case 4:
case 2:
return V2_0_0_0
case 1:
return V0_11_0_0
case 0:
return V0_9_0_0
default:
return V2_4_0_0
}
return V0_9_0_0
}

func (r *DescribeGroupsResponse) throttleTime() time.Duration {
Expand Down
4 changes: 3 additions & 1 deletion init_producer_id_request.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,9 @@ func (i *InitProducerIDRequest) requiredVersion() KafkaVersion {
return V2_4_0_0
case 1:
return V2_0_0_0
default:
case 0:
return V0_11_0_0
default:
return V2_7_0_0
}
}
65 changes: 53 additions & 12 deletions transaction_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -161,8 +161,10 @@ type topicPartition struct {
}

// to ensure that we don't do a full scan every time a partition or an offset is added.
type topicPartitionSet map[topicPartition]struct{}
type topicPartitionOffsets map[topicPartition]*PartitionOffsetMetadata
type (
topicPartitionSet map[topicPartition]struct{}
topicPartitionOffsets map[topicPartition]*PartitionOffsetMetadata
)

func (s topicPartitionSet) mapToRequest() map[string][]int32 {
result := make(map[string][]int32, len(s))
Expand Down Expand Up @@ -315,12 +317,20 @@ func (t *transactionManager) publishOffsetsToTxn(offsets topicPartitionOffsets,
if err != nil {
return true, err
}
response, err := coordinator.AddOffsetsToTxn(&AddOffsetsToTxnRequest{
request := &AddOffsetsToTxnRequest{
TransactionalID: t.transactionalID,
ProducerEpoch: t.producerEpoch,
ProducerID: t.producerID,
GroupID: groupId,
})
}
if t.client.Config().Version.IsAtLeast(V2_7_0_0) {
// Version 2 adds the support for new error code PRODUCER_FENCED.
request.Version = 2
} else if t.client.Config().Version.IsAtLeast(V2_0_0_0) {
// Version 1 is the same as version 0.
request.Version = 1
}
response, err := coordinator.AddOffsetsToTxn(request)
if err != nil {
// If an error occurred try to refresh current transaction coordinator.
_ = coordinator.Close()
Expand Down Expand Up @@ -390,13 +400,18 @@ func (t *transactionManager) publishOffsetsToTxn(offsets topicPartitionOffsets,
if err != nil {
return resultOffsets, true, err
}
responses, err := consumerGroupCoordinator.TxnOffsetCommit(&TxnOffsetCommitRequest{
request := &TxnOffsetCommitRequest{
TransactionalID: t.transactionalID,
ProducerEpoch: t.producerEpoch,
ProducerID: t.producerID,
GroupID: groupId,
Topics: offsets.mapToRequest(),
})
}
if t.client.Config().Version.IsAtLeast(V2_0_0_0) {
// Version 1 is the same as version 0.
request.Version = 1
}
responses, err := consumerGroupCoordinator.TxnOffsetCommit(request)
if err != nil {
_ = consumerGroupCoordinator.Close()
_ = t.client.RefreshCoordinator(groupId)
Expand Down Expand Up @@ -466,13 +481,24 @@ func (t *transactionManager) initProducerId() (int64, int16, error) {
}

if t.client.Config().Version.IsAtLeast(V2_5_0_0) {
req.Version = 3
if t.client.Config().Version.IsAtLeast(V2_7_0_0) {
// Version 4 adds the support for new error code PRODUCER_FENCED.
req.Version = 4
} else {
// Version 3 adds ProducerId and ProducerEpoch, allowing producers to try
// to resume after an INVALID_PRODUCER_EPOCH error
req.Version = 3
}
isEpochBump = t.producerID != noProducerID && t.producerEpoch != noProducerEpoch
t.coordinatorSupportsBumpingEpoch = true
req.ProducerID = t.producerID
req.ProducerEpoch = t.producerEpoch
} else if t.client.Config().Version.IsAtLeast(V2_4_0_0) {
// Version 2 is the first flexible version.
req.Version = 2
} else if t.client.Config().Version.IsAtLeast(V2_0_0_0) {
// Version 1 is the same as version 0.
req.Version = 1
}

if isEpochBump {
Expand Down Expand Up @@ -610,12 +636,20 @@ func (t *transactionManager) endTxn(commit bool) error {
if err != nil {
return true, err
}
response, err := coordinator.EndTxn(&EndTxnRequest{
request := &EndTxnRequest{
TransactionalID: t.transactionalID,
ProducerEpoch: t.producerEpoch,
ProducerID: t.producerID,
TransactionResult: commit,
})
}
if t.client.Config().Version.IsAtLeast(V2_7_0_0) {
// Version 2 adds the support for new error code PRODUCER_FENCED.
request.Version = 2
} else if t.client.Config().Version.IsAtLeast(V2_0_0_0) {
// Version 1 is the same as version 0.
request.Version = 1
}
response, err := coordinator.EndTxn(request)
if err != nil {
// Always retry on network error
_ = coordinator.Close()
Expand Down Expand Up @@ -779,13 +813,20 @@ func (t *transactionManager) publishTxnPartitions() error {
if err != nil {
return true, err
}
addPartResponse, err := coordinator.AddPartitionsToTxn(&AddPartitionsToTxnRequest{
request := &AddPartitionsToTxnRequest{
TransactionalID: t.transactionalID,
ProducerID: t.producerID,
ProducerEpoch: t.producerEpoch,
TopicPartitions: t.pendingPartitionsInCurrentTxn.mapToRequest(),
})

}
if t.client.Config().Version.IsAtLeast(V2_7_0_0) {
// Version 2 adds the support for new error code PRODUCER_FENCED.
request.Version = 2
} else if t.client.Config().Version.IsAtLeast(V2_0_0_0) {
// Version 1 is the same as version 0.
request.Version = 1
}
addPartResponse, err := coordinator.AddPartitionsToTxn(request)
if err != nil {
_ = coordinator.Close()
_ = t.client.RefreshTransactionCoordinator(t.transactionalID)
Expand Down