Skip to content

Commit

Permalink
Fix race in broker version check
Browse files Browse the repository at this point in the history
Move it until after we're sure we have a version to check against. Fixes a race
found by kchaliki.

Also make the which-version-do-I-need part of the protocol interface where it
belongs.
  • Loading branch information
eapache committed Jun 17, 2016
1 parent 19422bf commit 90a4cc6
Show file tree
Hide file tree
Showing 36 changed files with 164 additions and 24 deletions.
4 changes: 4 additions & 0 deletions api_versions_request.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,3 +18,7 @@ func (r *ApiVersionsRequest) key() int16 {
func (r *ApiVersionsRequest) version() int16 {
return 0
}

func (r *ApiVersionsRequest) requiredVersion() KafkaVersion {
return V0_10_0_0
}
4 changes: 4 additions & 0 deletions api_versions_response.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,3 +80,7 @@ func (r *ApiVersionsResponse) key() int16 {
func (r *ApiVersionsResponse) version() int16 {
return 0
}

func (r *ApiVersionsResponse) requiredVersion() KafkaVersion {
return V0_10_0_0
}
19 changes: 4 additions & 15 deletions broker.go
Original file line number Diff line number Diff line change
Expand Up @@ -198,21 +198,6 @@ func (b *Broker) GetAvailableOffsets(request *OffsetRequest) (*OffsetResponse, e
}

func (b *Broker) Produce(request *ProduceRequest) (*ProduceResponse, error) {
switch request.version() {
case 0:
break
case 1:
if !b.conf.Version.IsAtLeast(V0_9_0_0) {
return nil, ErrUnsupportedVersion
}
case 2:
if !b.conf.Version.IsAtLeast(V0_10_0_0) {
return nil, ErrUnsupportedVersion
}
default:
return nil, ErrUnsupportedVersion
}

var response *ProduceResponse
var err error

Expand Down Expand Up @@ -343,6 +328,10 @@ func (b *Broker) send(rb protocolBody, promiseResponse bool) (*responsePromise,
return nil, ErrNotConnected
}

if !b.conf.Version.IsAtLeast(rb.requiredVersion()) {
return nil, ErrUnsupportedVersion
}

req := &request{correlationID: b.correlationID, clientID: b.conf.ClientID, body: rb}
buf, err := encode(req)
if err != nil {
Expand Down
4 changes: 3 additions & 1 deletion broker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,9 @@ func TestSimpleBrokerCommunication(t *testing.T) {
defer mb.Close()

broker := NewBroker(mb.Addr())
err := broker.Open(nil)
conf := NewConfig()
conf.Version = V0_10_0_0
err := broker.Open(conf)
if err != nil {
t.Fatal(err)
}
Expand Down
2 changes: 1 addition & 1 deletion config.go
Original file line number Diff line number Diff line change
Expand Up @@ -267,7 +267,7 @@ func NewConfig() *Config {

c.ClientID = defaultClientID
c.ChannelBufferSize = 256
c.Version = V0_8_2_0
c.Version = minVersion

return c
}
Expand Down
4 changes: 4 additions & 0 deletions consumer_metadata_request.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,3 +20,7 @@ func (r *ConsumerMetadataRequest) key() int16 {
func (r *ConsumerMetadataRequest) version() int16 {
return 0
}

func (r *ConsumerMetadataRequest) requiredVersion() KafkaVersion {
return V0_8_2_0
}
4 changes: 4 additions & 0 deletions consumer_metadata_response.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,3 +79,7 @@ func (r *ConsumerMetadataResponse) key() int16 {
func (r *ConsumerMetadataResponse) version() int16 {
return 0
}

func (r *ConsumerMetadataResponse) requiredVersion() KafkaVersion {
return V0_8_2_0
}
4 changes: 4 additions & 0 deletions describe_groups_request.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,10 @@ func (r *DescribeGroupsRequest) version() int16 {
return 0
}

func (r *DescribeGroupsRequest) requiredVersion() KafkaVersion {
return V0_9_0_0
}

func (r *DescribeGroupsRequest) AddGroup(group string) {
r.Groups = append(r.Groups, group)
}
4 changes: 4 additions & 0 deletions describe_groups_response.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,10 @@ func (r *DescribeGroupsResponse) version() int16 {
return 0
}

func (r *DescribeGroupsResponse) requiredVersion() KafkaVersion {
return V0_9_0_0
}

type GroupDescription struct {
Err KError
GroupId string
Expand Down
4 changes: 4 additions & 0 deletions fetch_request.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,10 @@ func (f *FetchRequest) version() int16 {
return 0
}

func (r *FetchRequest) requiredVersion() KafkaVersion {
return minVersion
}

func (f *FetchRequest) AddBlock(topic string, partitionID int32, fetchOffset int64, maxBytes int32) {
if f.blocks == nil {
f.blocks = make(map[string]map[int32]*fetchRequestBlock)
Expand Down
4 changes: 4 additions & 0 deletions fetch_response.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,10 @@ func (r *FetchResponse) version() int16 {
return 0
}

func (r *FetchResponse) requiredVersion() KafkaVersion {
return minVersion
}

func (fr *FetchResponse) GetBlock(topic string, partition int32) *FetchResponseBlock {
if fr.Blocks == nil {
return nil
Expand Down
4 changes: 4 additions & 0 deletions heartbeat_request.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,3 +41,7 @@ func (r *HeartbeatRequest) key() int16 {
func (r *HeartbeatRequest) version() int16 {
return 0
}

func (r *HeartbeatRequest) requiredVersion() KafkaVersion {
return V0_9_0_0
}
4 changes: 4 additions & 0 deletions heartbeat_response.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,3 +26,7 @@ func (r *HeartbeatResponse) key() int16 {
func (r *HeartbeatResponse) version() int16 {
return 0
}

func (r *HeartbeatResponse) requiredVersion() KafkaVersion {
return V0_9_0_0
}
4 changes: 4 additions & 0 deletions join_group_request.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,10 @@ func (r *JoinGroupRequest) version() int16 {
return 0
}

func (r *JoinGroupRequest) requiredVersion() KafkaVersion {
return V0_9_0_0
}

func (r *JoinGroupRequest) AddGroupProtocol(name string, metadata []byte) {
if r.GroupProtocols == nil {
r.GroupProtocols = make(map[string][]byte)
Expand Down
4 changes: 4 additions & 0 deletions join_group_response.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,3 +108,7 @@ func (r *JoinGroupResponse) key() int16 {
func (r *JoinGroupResponse) version() int16 {
return 0
}

func (r *JoinGroupResponse) requiredVersion() KafkaVersion {
return V0_9_0_0
}
4 changes: 4 additions & 0 deletions leave_group_request.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,3 +34,7 @@ func (r *LeaveGroupRequest) key() int16 {
func (r *LeaveGroupRequest) version() int16 {
return 0
}

func (r *LeaveGroupRequest) requiredVersion() KafkaVersion {
return V0_9_0_0
}
4 changes: 4 additions & 0 deletions leave_group_response.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,3 +26,7 @@ func (r *LeaveGroupResponse) key() int16 {
func (r *LeaveGroupResponse) version() int16 {
return 0
}

func (r *LeaveGroupResponse) requiredVersion() KafkaVersion {
return V0_9_0_0
}
4 changes: 4 additions & 0 deletions list_groups_request.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,3 +18,7 @@ func (r *ListGroupsRequest) key() int16 {
func (r *ListGroupsRequest) version() int16 {
return 0
}

func (r *ListGroupsRequest) requiredVersion() KafkaVersion {
return V0_9_0_0
}
4 changes: 4 additions & 0 deletions list_groups_response.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,3 +62,7 @@ func (r *ListGroupsResponse) key() int16 {
func (r *ListGroupsResponse) version() int16 {
return 0
}

func (r *ListGroupsResponse) requiredVersion() KafkaVersion {
return V0_9_0_0
}
4 changes: 4 additions & 0 deletions metadata_request.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,3 +46,7 @@ func (mr *MetadataRequest) key() int16 {
func (mr *MetadataRequest) version() int16 {
return 0
}

func (mr *MetadataRequest) requiredVersion() KafkaVersion {
return minVersion
}
4 changes: 4 additions & 0 deletions metadata_response.go
Original file line number Diff line number Diff line change
Expand Up @@ -184,6 +184,10 @@ func (r *MetadataResponse) version() int16 {
return 0
}

func (r *MetadataResponse) requiredVersion() KafkaVersion {
return minVersion
}

// testing API

func (m *MetadataResponse) AddBroker(addr string, id int32) {
Expand Down
13 changes: 12 additions & 1 deletion offset_commit_request.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ type OffsetCommitRequest struct {
// Version can be:
// - 0 (kafka 0.8.1 and later)
// - 1 (kafka 0.8.2 and later)
// - 2 (kafka 0.8.3 and later)
// - 2 (kafka 0.9.0 and later)
Version int16
blocks map[string]map[int32]*offsetCommitRequestBlock
}
Expand Down Expand Up @@ -166,6 +166,17 @@ func (r *OffsetCommitRequest) version() int16 {
return r.Version
}

func (r *OffsetCommitRequest) requiredVersion() KafkaVersion {
switch r.Version {
case 1:
return V0_8_2_0
case 2:
return V0_9_0_0
default:
return minVersion
}
}

func (r *OffsetCommitRequest) AddBlock(topic string, partitionID int32, offset int64, timestamp int64, metadata string) {
if r.blocks == nil {
r.blocks = make(map[string]map[int32]*offsetCommitRequestBlock)
Expand Down
4 changes: 4 additions & 0 deletions offset_commit_response.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,3 +79,7 @@ func (r *OffsetCommitResponse) key() int16 {
func (r *OffsetCommitResponse) version() int16 {
return 0
}

func (r *OffsetCommitResponse) requiredVersion() KafkaVersion {
return minVersion
}
9 changes: 9 additions & 0 deletions offset_fetch_request.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,15 @@ func (r *OffsetFetchRequest) version() int16 {
return r.Version
}

func (r *OffsetFetchRequest) requiredVersion() KafkaVersion {
switch r.Version {
case 1:
return V0_8_2_0
default:
return minVersion
}
}

func (r *OffsetFetchRequest) AddPartition(topic string, partitionID int32) {
if r.partitions == nil {
r.partitions = make(map[string][]int32)
Expand Down
4 changes: 4 additions & 0 deletions offset_fetch_response.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,10 @@ func (r *OffsetFetchResponse) version() int16 {
return 0
}

func (r *OffsetFetchResponse) requiredVersion() KafkaVersion {
return minVersion
}

func (r *OffsetFetchResponse) GetBlock(topic string, partition int32) *OffsetFetchResponseBlock {
if r.Blocks == nil {
return nil
Expand Down
1 change: 1 addition & 0 deletions offset_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ func initOffsetManager(t *testing.T) (om OffsetManager,
config := NewConfig()
config.Metadata.Retry.Max = 1
config.Consumer.Offsets.CommitInterval = 1 * time.Millisecond
config.Version = V0_9_0_0

broker = NewMockBroker(t, 1)
coordinator = NewMockBroker(t, 2)
Expand Down
4 changes: 4 additions & 0 deletions offset_request.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,10 @@ func (r *OffsetRequest) version() int16 {
return 0
}

func (r *OffsetRequest) requiredVersion() KafkaVersion {
return minVersion
}

func (r *OffsetRequest) AddBlock(topic string, partitionID int32, time int64, maxOffsets int32) {
if r.blocks == nil {
r.blocks = make(map[string]map[int32]*offsetRequestBlock)
Expand Down
4 changes: 4 additions & 0 deletions offset_response.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,10 @@ func (r *OffsetResponse) version() int16 {
return 0
}

func (r *OffsetResponse) requiredVersion() KafkaVersion {
return minVersion
}

// testing API

func (r *OffsetResponse) AddTopicPartition(topic string, partition int32, offset int64) {
Expand Down
11 changes: 11 additions & 0 deletions produce_request.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,17 @@ func (p *ProduceRequest) version() int16 {
return p.Version
}

func (p *ProduceRequest) requiredVersion() KafkaVersion {
switch p.Version {
case 1:
return V0_9_0_0
case 2:
return V0_10_0_0
default:
return minVersion
}
}

func (p *ProduceRequest) AddMessage(topic string, partition int32, msg *Message) {
if p.msgSets == nil {
p.msgSets = make(map[string]map[int32]*MessageSet)
Expand Down
11 changes: 11 additions & 0 deletions produce_response.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,17 @@ func (r *ProduceResponse) version() int16 {
return r.Version
}

func (r *ProduceResponse) requiredVersion() KafkaVersion {
switch r.Version {
case 1:
return V0_9_0_0
case 2:
return V0_10_0_0
default:
return minVersion
}
}

func (pr *ProduceResponse) GetBlock(topic string, partition int32) *ProduceResponseBlock {
if pr.Blocks == nil {
return nil
Expand Down
1 change: 1 addition & 0 deletions request.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ type protocolBody interface {
versionedDecoder
key() int16
version() int16
requiredVersion() KafkaVersion
}

type request struct {
Expand Down
4 changes: 4 additions & 0 deletions sasl_handshake_request.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,3 +27,7 @@ func (r *SaslHandshakeRequest) key() int16 {
func (r *SaslHandshakeRequest) version() int16 {
return 0
}

func (r *SaslHandshakeRequest) requiredVersion() KafkaVersion {
return V0_10_0_0
}
4 changes: 4 additions & 0 deletions sasl_handshake_response.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,3 +32,7 @@ func (r *SaslHandshakeResponse) key() int16 {
func (r *SaslHandshakeResponse) version() int16 {
return 0
}

func (r *SaslHandshakeResponse) requiredVersion() KafkaVersion {
return V0_10_0_0
}
4 changes: 4 additions & 0 deletions sync_group_request.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,10 @@ func (r *SyncGroupRequest) version() int16 {
return 0
}

func (r *SyncGroupRequest) requiredVersion() KafkaVersion {
return V0_9_0_0
}

func (r *SyncGroupRequest) AddGroupAssignment(memberId string, memberAssignment []byte) {
if r.GroupAssignments == nil {
r.GroupAssignments = make(map[string][]byte)
Expand Down
4 changes: 4 additions & 0 deletions sync_group_response.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,3 +34,7 @@ func (r *SyncGroupResponse) key() int16 {
func (r *SyncGroupResponse) version() int16 {
return 0
}

func (r *SyncGroupResponse) requiredVersion() KafkaVersion {
return V0_9_0_0
}
Loading

0 comments on commit 90a4cc6

Please sign in to comment.