Skip to content

Commit

Permalink
feat: add isValidVersion to protocol types
Browse files Browse the repository at this point in the history
The intention here is that we can check that a request version value is
supported by the protocol encoder/decoder before sending it

Signed-off-by: Dominic Evans <dominic.evans@uk.ibm.com>
  • Loading branch information
dnwe committed Aug 1, 2023
1 parent bbee916 commit 26d9b01
Show file tree
Hide file tree
Showing 83 changed files with 367 additions and 6 deletions.
4 changes: 4 additions & 0 deletions acl_create_request.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,10 @@ func (c *CreateAclsRequest) headerVersion() int16 {
return 1
}

func (c *CreateAclsRequest) isValidVersion() bool {
return c.Version >= 0 && c.Version <= 1
}

func (c *CreateAclsRequest) requiredVersion() KafkaVersion {
switch c.Version {
case 1:
Expand Down
5 changes: 5 additions & 0 deletions acl_create_response.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import "time"

// CreateAclsResponse is a an acl response creation type
type CreateAclsResponse struct {
Version int16
ThrottleTime time.Duration
AclCreationResponses []*AclCreationResponse
}
Expand Down Expand Up @@ -59,6 +60,10 @@ func (c *CreateAclsResponse) headerVersion() int16 {
return 0
}

func (c *CreateAclsResponse) isValidVersion() bool {
return c.Version == 0
}

func (c *CreateAclsResponse) requiredVersion() KafkaVersion {
return V0_11_0_0
}
Expand Down
4 changes: 4 additions & 0 deletions acl_delete_request.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,10 @@ func (d *DeleteAclsRequest) headerVersion() int16 {
return 1
}

func (d *DeleteAclsRequest) isValidVersion() bool {
return d.Version >= 0 && d.Version <= 1
}

func (d *DeleteAclsRequest) requiredVersion() KafkaVersion {
switch d.Version {
case 1:
Expand Down
4 changes: 4 additions & 0 deletions acl_delete_response.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,10 @@ func (d *DeleteAclsResponse) headerVersion() int16 {
return 0
}

func (d *DeleteAclsResponse) isValidVersion() bool {
return d.Version == 0
}

func (d *DeleteAclsResponse) requiredVersion() KafkaVersion {
return V0_11_0_0
}
Expand Down
4 changes: 4 additions & 0 deletions acl_describe_request.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,10 @@ func (d *DescribeAclsRequest) headerVersion() int16 {
return 1
}

func (d *DescribeAclsRequest) isValidVersion() bool {
return d.Version >= 0 && d.Version <= 1
}

func (d *DescribeAclsRequest) requiredVersion() KafkaVersion {
switch d.Version {
case 1:
Expand Down
4 changes: 4 additions & 0 deletions acl_describe_response.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,10 @@ func (d *DescribeAclsResponse) headerVersion() int16 {
return 0
}

func (d *DescribeAclsResponse) isValidVersion() bool {
return d.Version >= 0 && d.Version <= 1
}

func (d *DescribeAclsResponse) requiredVersion() KafkaVersion {
switch d.Version {
case 1:
Expand Down
5 changes: 5 additions & 0 deletions add_offsets_to_txn_request.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package sarama

// AddOffsetsToTxnRequest adds offsets to a transaction request
type AddOffsetsToTxnRequest struct {
Version int16
TransactionalID string
ProducerID int64
ProducerEpoch int16
Expand Down Expand Up @@ -52,6 +53,10 @@ func (a *AddOffsetsToTxnRequest) headerVersion() int16 {
return 1
}

func (a *AddOffsetsToTxnRequest) isValidVersion() bool {
return a.Version == 0
}

func (a *AddOffsetsToTxnRequest) requiredVersion() KafkaVersion {
return V0_11_0_0
}
5 changes: 5 additions & 0 deletions add_offsets_to_txn_response.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (

// AddOffsetsToTxnResponse is a response type for adding offsets to txns
type AddOffsetsToTxnResponse struct {
Version int16
ThrottleTime time.Duration
Err KError
}
Expand Down Expand Up @@ -44,6 +45,10 @@ func (a *AddOffsetsToTxnResponse) headerVersion() int16 {
return 0
}

func (a *AddOffsetsToTxnResponse) isValidVersion() bool {
return a.Version == 0
}

func (a *AddOffsetsToTxnResponse) requiredVersion() KafkaVersion {
return V0_11_0_0
}
Expand Down
5 changes: 5 additions & 0 deletions add_partitions_to_txn_request.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package sarama

// AddPartitionsToTxnRequest is a add partition request
type AddPartitionsToTxnRequest struct {
Version int16
TransactionalID string
ProducerID int64
ProducerEpoch int16
Expand Down Expand Up @@ -76,6 +77,10 @@ func (a *AddPartitionsToTxnRequest) headerVersion() int16 {
return 1
}

func (a *AddPartitionsToTxnRequest) isValidVersion() bool {
return a.Version == 0
}

func (a *AddPartitionsToTxnRequest) requiredVersion() KafkaVersion {
return V0_11_0_0
}
5 changes: 5 additions & 0 deletions add_partitions_to_txn_response.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (

// AddPartitionsToTxnResponse is a partition errors to transaction type
type AddPartitionsToTxnResponse struct {
Version int16
ThrottleTime time.Duration
Errors map[string][]*PartitionError
}
Expand Down Expand Up @@ -83,6 +84,10 @@ func (a *AddPartitionsToTxnResponse) headerVersion() int16 {
return 0
}

func (a *AddPartitionsToTxnResponse) isValidVersion() bool {
return a.Version == 0
}

func (a *AddPartitionsToTxnResponse) requiredVersion() KafkaVersion {
return V0_11_0_0
}
Expand Down
5 changes: 5 additions & 0 deletions alter_client_quotas_request.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ package sarama
// validate_only => BOOLEAN

type AlterClientQuotasRequest struct {
Version int16
Entries []AlterClientQuotasEntry // The quota configuration entries to alter.
ValidateOnly bool // Whether the alteration should be validated, but not performed.
}
Expand Down Expand Up @@ -189,6 +190,10 @@ func (a *AlterClientQuotasRequest) headerVersion() int16 {
return 1
}

func (a *AlterClientQuotasRequest) isValidVersion() bool {
return a.Version == 0
}

func (a *AlterClientQuotasRequest) requiredVersion() KafkaVersion {
return V2_6_0_0
}
5 changes: 5 additions & 0 deletions alter_client_quotas_response.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
// entity_name => NULLABLE_STRING

type AlterClientQuotasResponse struct {
Version int16
ThrottleTime time.Duration // The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota.
Entries []AlterClientQuotasEntryResponse // The quota configuration entries altered.
}
Expand Down Expand Up @@ -140,6 +141,10 @@ func (a *AlterClientQuotasResponse) headerVersion() int16 {
return 0
}

func (a *AlterClientQuotasResponse) isValidVersion() bool {
return a.Version == 0
}

func (a *AlterClientQuotasResponse) requiredVersion() KafkaVersion {
return V2_6_0_0
}
Expand Down
5 changes: 5 additions & 0 deletions alter_configs_request.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package sarama

// AlterConfigsRequest is an alter config request type
type AlterConfigsRequest struct {
Version int16
Resources []*AlterConfigsResource
ValidateOnly bool
}
Expand Down Expand Up @@ -121,6 +122,10 @@ func (a *AlterConfigsRequest) headerVersion() int16 {
return 1
}

func (a *AlterConfigsRequest) isValidVersion() bool {
return a.Version == 0
}

func (a *AlterConfigsRequest) requiredVersion() KafkaVersion {
return V0_11_0_0
}
5 changes: 5 additions & 0 deletions alter_configs_response.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import "time"

// AlterConfigsResponse is a response type for alter config
type AlterConfigsResponse struct {
Version int16
ThrottleTime time.Duration
Resources []*AlterConfigsResourceResponse
}
Expand Down Expand Up @@ -111,6 +112,10 @@ func (a *AlterConfigsResponse) headerVersion() int16 {
return 0
}

func (a *AlterConfigsResponse) isValidVersion() bool {
return a.Version == 0
}

func (a *AlterConfigsResponse) requiredVersion() KafkaVersion {
return V0_11_0_0
}
Expand Down
4 changes: 4 additions & 0 deletions alter_partition_reassignments_request.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,10 @@ func (r *AlterPartitionReassignmentsRequest) headerVersion() int16 {
return 2
}

func (r *AlterPartitionReassignmentsRequest) isValidVersion() bool {
return r.Version == 0
}

func (r *AlterPartitionReassignmentsRequest) requiredVersion() KafkaVersion {
return V2_4_0_0
}
Expand Down
4 changes: 4 additions & 0 deletions alter_partition_reassignments_response.go
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,10 @@ func (r *AlterPartitionReassignmentsResponse) headerVersion() int16 {
return 1
}

func (r *AlterPartitionReassignmentsResponse) isValidVersion() bool {
return r.Version == 0
}

func (r *AlterPartitionReassignmentsResponse) requiredVersion() KafkaVersion {
return V2_4_0_0
}
Expand Down
4 changes: 4 additions & 0 deletions alter_user_scram_credentials_request.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,10 @@ func (r *AlterUserScramCredentialsRequest) headerVersion() int16 {
return 2
}

func (r *AlterUserScramCredentialsRequest) isValidVersion() bool {
return r.Version == 0
}

func (r *AlterUserScramCredentialsRequest) requiredVersion() KafkaVersion {
return V2_7_0_0
}
4 changes: 4 additions & 0 deletions alter_user_scram_credentials_response.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,10 @@ func (r *AlterUserScramCredentialsResponse) headerVersion() int16 {
return 2
}

func (r *AlterUserScramCredentialsResponse) isValidVersion() bool {
return r.Version == 0
}

func (r *AlterUserScramCredentialsResponse) requiredVersion() KafkaVersion {
return V2_7_0_0
}
Expand Down
4 changes: 4 additions & 0 deletions api_versions_request.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,10 @@ func (r *ApiVersionsRequest) headerVersion() int16 {
return 1
}

func (r *ApiVersionsRequest) isValidVersion() bool {
return r.Version >= 0 && r.Version <= 3
}

func (r *ApiVersionsRequest) requiredVersion() KafkaVersion {
switch r.Version {
case 0:
Expand Down
4 changes: 4 additions & 0 deletions api_versions_response.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,10 @@ func (r *ApiVersionsResponse) headerVersion() int16 {
return 0
}

func (r *ApiVersionsResponse) isValidVersion() bool {
return r.Version >= 0 && r.Version <= 3
}

func (r *ApiVersionsResponse) requiredVersion() KafkaVersion {
switch r.Version {
case 0:
Expand Down
5 changes: 5 additions & 0 deletions consumer_metadata_request.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package sarama

// ConsumerMetadataRequest is used for metadata requests
type ConsumerMetadataRequest struct {
Version int16
ConsumerGroup string
}

Expand Down Expand Up @@ -33,6 +34,10 @@ func (r *ConsumerMetadataRequest) headerVersion() int16 {
return 1
}

func (r *ConsumerMetadataRequest) isValidVersion() bool {
return r.Version == 0
}

func (r *ConsumerMetadataRequest) requiredVersion() KafkaVersion {
return V0_8_2_0
}
5 changes: 5 additions & 0 deletions consumer_metadata_response.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (

// ConsumerMetadataResponse holds the response for a consumer group meta data requests
type ConsumerMetadataResponse struct {
Version int16
Err KError
Coordinator *Broker
CoordinatorID int32 // deprecated: use Coordinator.ID()
Expand Down Expand Up @@ -77,6 +78,10 @@ func (r *ConsumerMetadataResponse) headerVersion() int16 {
return 0
}

func (r *ConsumerMetadataResponse) isValidVersion() bool {
return r.Version == 0
}

func (r *ConsumerMetadataResponse) requiredVersion() KafkaVersion {
return V0_8_2_0
}
5 changes: 5 additions & 0 deletions create_partitions_request.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package sarama
import "time"

type CreatePartitionsRequest struct {
Version int16
TopicPartitions map[string]*TopicPartition
Timeout time.Duration
ValidateOnly bool
Expand Down Expand Up @@ -71,6 +72,10 @@ func (r *CreatePartitionsRequest) headerVersion() int16 {
return 1
}

func (r *CreatePartitionsRequest) isValidVersion() bool {
return r.Version == 0
}

func (r *CreatePartitionsRequest) requiredVersion() KafkaVersion {
return V1_0_0_0
}
Expand Down
5 changes: 5 additions & 0 deletions create_partitions_response.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
)

type CreatePartitionsResponse struct {
Version int16
ThrottleTime time.Duration
TopicPartitionErrors map[string]*TopicPartitionError
}
Expand Down Expand Up @@ -67,6 +68,10 @@ func (r *CreatePartitionsResponse) headerVersion() int16 {
return 0
}

func (r *CreatePartitionsResponse) isValidVersion() bool {
return r.Version == 0
}

func (r *CreatePartitionsResponse) requiredVersion() KafkaVersion {
return V1_0_0_0
}
Expand Down
4 changes: 4 additions & 0 deletions create_topics_request.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,10 @@ func (r *CreateTopicsRequest) headerVersion() int16 {
return 1
}

func (c *CreateTopicsRequest) isValidVersion() bool {
return c.Version >= 0 && c.Version <= 2
}

func (c *CreateTopicsRequest) requiredVersion() KafkaVersion {
switch c.Version {
case 2:
Expand Down
4 changes: 4 additions & 0 deletions create_topics_response.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,10 @@ func (c *CreateTopicsResponse) headerVersion() int16 {
return 0
}

func (c *CreateTopicsResponse) isValidVersion() bool {
return c.Version >= 0 && c.Version <= 2
}

func (c *CreateTopicsResponse) requiredVersion() KafkaVersion {
switch c.Version {
case 2:
Expand Down
Loading

0 comments on commit 26d9b01

Please sign in to comment.