Skip to content

Commit

Permalink
Merge pull request #1050 from mailgun/maxim/develop
Browse files Browse the repository at this point in the history
 Fix FindCoordinatorResponse.encode to allow nil Coordinator
  • Loading branch information
eapache authored Feb 14, 2018
2 parents 96fcca4 + 0a875d8 commit f93325f
Show file tree
Hide file tree
Showing 3 changed files with 139 additions and 39 deletions.
11 changes: 8 additions & 3 deletions find_coordinator_response.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ import (
"time"
)

var NoNode = &Broker{id: -1, addr: ":-1"}

type FindCoordinatorResponse struct {
Version int16
ThrottleTime time.Duration
Expand Down Expand Up @@ -36,7 +38,7 @@ func (f *FindCoordinatorResponse) decode(pd packetDecoder, version int16) (err e
}

coordinator := new(Broker)
if err := coordinator.decode(pd, 0); err != nil {
if err := coordinator.decode(pd, version); err != nil {
return err
}
if coordinator.addr == ":0" {
Expand All @@ -60,10 +62,13 @@ func (f *FindCoordinatorResponse) encode(pe packetEncoder) error {
}
}

if err := f.Coordinator.encode(pe, 0); err != nil {
coordinator := f.Coordinator
if coordinator == nil {
coordinator = NoNode
}
if err := coordinator.encode(pe, f.Version); err != nil {
return err
}

return nil
}

Expand Down
113 changes: 77 additions & 36 deletions find_coordinator_response_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,42 +5,83 @@ import (
"time"
)

var (
findCoordinatorResponse = []byte{
0, 0, 0, 100,
0, 0,
255, 255, // empty ErrMsg
0, 0, 0, 1,
0, 4, 'h', 'o', 's', 't',
0, 0, 35, 132,
}

findCoordinatorResponseError = []byte{
0, 0, 0, 100,
0, 15,
0, 3, 'm', 's', 'g',
0, 0, 0, 1,
0, 4, 'h', 'o', 's', 't',
0, 0, 35, 132,
}
)

func TestFindCoordinatorResponse(t *testing.T) {
broker := NewBroker("host:9092")
broker.id = 1
resp := &FindCoordinatorResponse{
Version: 1,
ThrottleTime: 100 * time.Millisecond,
Err: ErrNoError,
ErrMsg: nil,
Coordinator: broker,
}

testResponse(t, "version 1 - no error", resp, findCoordinatorResponse)
errMsg := "kaboom"
brokerRack := "foo"

msg := "msg"
resp.Err = ErrConsumerCoordinatorNotAvailable
resp.ErrMsg = &msg

testResponse(t, "version 1 - error", resp, findCoordinatorResponseError)
for _, tc := range []struct {
desc string
response *FindCoordinatorResponse
encoded []byte
}{{
desc: "version 0 - no error",
response: &FindCoordinatorResponse{
Version: 0,
Err: ErrNoError,
Coordinator: &Broker{
id: 7,
addr: "host:9092",
},
},
encoded: []byte{
0, 0, // Err
0, 0, 0, 7, // Coordinator.ID
0, 4, 'h', 'o', 's', 't', // Coordinator.Host
0, 0, 35, 132, // Coordinator.Port
},
}, {
desc: "version 1 - no error",
response: &FindCoordinatorResponse{
Version: 1,
ThrottleTime: 100 * time.Millisecond,
Err: ErrNoError,
Coordinator: &Broker{
id: 7,
addr: "host:9092",
rack: &brokerRack,
},
},
encoded: []byte{
0, 0, 0, 100, // ThrottleTime
0, 0, // Err
255, 255, // ErrMsg: empty
0, 0, 0, 7, // Coordinator.ID
0, 4, 'h', 'o', 's', 't', // Coordinator.Host
0, 0, 35, 132, // Coordinator.Port
0, 3, 'f', 'o', 'o', // Coordinator.Rack
},
}, {
desc: "version 0 - error",
response: &FindCoordinatorResponse{
Version: 0,
Err: ErrConsumerCoordinatorNotAvailable,
Coordinator: NoNode,
},
encoded: []byte{
0, 15, // Err
255, 255, 255, 255, // Coordinator.ID: -1
0, 0, // Coordinator.Host: ""
255, 255, 255, 255, // Coordinator.Port: -1
},
}, {
desc: "version 1 - error",
response: &FindCoordinatorResponse{
Version: 1,
ThrottleTime: 100 * time.Millisecond,
Err: ErrConsumerCoordinatorNotAvailable,
ErrMsg: &errMsg,
Coordinator: NoNode,
},
encoded: []byte{
0, 0, 0, 100, // ThrottleTime
0, 15, // Err
0, 6, 'k', 'a', 'b', 'o', 'o', 'm', // ErrMsg
255, 255, 255, 255, // Coordinator.ID: -1
0, 0, // Coordinator.Host: ""
255, 255, 255, 255, // Coordinator.Port: -1
255, 255, // Coordinator.Rack: empty
},
}} {
testResponse(t, tc.desc, tc.response, tc.encoded)
}
}
54 changes: 54 additions & 0 deletions mockresponses.go
Original file line number Diff line number Diff line change
Expand Up @@ -326,6 +326,60 @@ func (mr *MockConsumerMetadataResponse) For(reqBody versionedDecoder) encoder {
return res
}

// MockFindCoordinatorResponse is a `FindCoordinatorResponse` builder.
type MockFindCoordinatorResponse struct {
groupCoordinators map[string]interface{}
transCoordinators map[string]interface{}
t TestReporter
}

func NewMockFindCoordinatorResponse(t TestReporter) *MockFindCoordinatorResponse {
return &MockFindCoordinatorResponse{
groupCoordinators: make(map[string]interface{}),
transCoordinators: make(map[string]interface{}),
t: t,
}
}

func (mr *MockFindCoordinatorResponse) SetCoordinator(coordinatorType CoordinatorType, group string, broker *MockBroker) *MockFindCoordinatorResponse {
switch coordinatorType {
case CoordinatorGroup:
mr.groupCoordinators[group] = broker
case CoordinatorTransaction:
mr.transCoordinators[group] = broker
}
return mr
}

func (mr *MockFindCoordinatorResponse) SetError(coordinatorType CoordinatorType, group string, kerror KError) *MockFindCoordinatorResponse {
switch coordinatorType {
case CoordinatorGroup:
mr.groupCoordinators[group] = kerror
case CoordinatorTransaction:
mr.transCoordinators[group] = kerror
}
return mr
}

func (mr *MockFindCoordinatorResponse) For(reqBody versionedDecoder) encoder {
req := reqBody.(*FindCoordinatorRequest)
res := &FindCoordinatorResponse{}
var v interface{}
switch req.CoordinatorType {
case CoordinatorGroup:
v = mr.groupCoordinators[req.CoordinatorKey]
case CoordinatorTransaction:
v = mr.transCoordinators[req.CoordinatorKey]
}
switch v := v.(type) {
case *MockBroker:
res.Coordinator = &Broker{id: v.BrokerID(), addr: v.Addr()}
case KError:
res.Err = v
}
return res
}

// MockOffsetCommitResponse is a `OffsetCommitResponse` builder.
type MockOffsetCommitResponse struct {
errors map[string]map[string]map[int32]KError
Expand Down

0 comments on commit f93325f

Please sign in to comment.