From d0340489e8be655e6f978bf93d1d35f1efc62f14 Mon Sep 17 00:00:00 2001 From: Robin Date: Thu, 25 Jan 2018 10:27:38 +0100 Subject: [PATCH] remove deprecated fields --- consumer_metadata_response.go | 41 ++++++++++++++++++------ consumer_metadata_response_test.go | 13 ++++++-- find_coordinator_response.go | 50 +++++------------------------- find_coordinator_response_test.go | 13 +++----- 4 files changed, 55 insertions(+), 62 deletions(-) diff --git a/consumer_metadata_response.go b/consumer_metadata_response.go index 722035205..442cbde7a 100644 --- a/consumer_metadata_response.go +++ b/consumer_metadata_response.go @@ -1,5 +1,10 @@ package sarama +import ( + "net" + "strconv" +) + type ConsumerMetadataResponse struct { Err KError Coordinator *Broker @@ -16,22 +21,40 @@ func (r *ConsumerMetadataResponse) decode(pd packetDecoder, version int16) (err } r.Err = tmp.Err + r.Coordinator = tmp.Coordinator - r.CoordinatorID = tmp.CoordinatorID - r.CoordinatorHost = tmp.CoordinatorHost - r.CoordinatorPort = tmp.CoordinatorPort + if tmp.Coordinator == nil { + return nil + } + + // this can all go away in 2.0, but we have to fill in deprecated fields to maintain + // backwards compatibility + host, portstr, err := net.SplitHostPort(r.Coordinator.Addr()) + if err != nil { + return err + } + port, err := strconv.ParseInt(portstr, 10, 32) + if err != nil { + return err + } + r.CoordinatorID = r.Coordinator.ID() + r.CoordinatorHost = host + r.CoordinatorPort = int32(port) return nil } func (r *ConsumerMetadataResponse) encode(pe packetEncoder) error { + if r.Coordinator == nil { + r.Coordinator = new(Broker) + r.Coordinator.id = r.CoordinatorID + r.Coordinator.addr = net.JoinHostPort(r.CoordinatorHost, strconv.Itoa(int(r.CoordinatorPort))) + } + tmp := &FindCoordinatorResponse{ - Version: 0, - Err: r.Err, - Coordinator: r.Coordinator, - CoordinatorID: r.CoordinatorID, - CoordinatorHost: r.CoordinatorHost, - CoordinatorPort: r.CoordinatorPort, + Version: 0, + Err: r.Err, + Coordinator: r.Coordinator, } if err := tmp.encode(pe); err != nil { diff --git a/consumer_metadata_response_test.go b/consumer_metadata_response_test.go index b748784d7..8482f6ff1 100644 --- a/consumer_metadata_response_test.go +++ b/consumer_metadata_response_test.go @@ -17,8 +17,17 @@ var ( ) func TestConsumerMetadataResponseError(t *testing.T) { - response := ConsumerMetadataResponse{Err: ErrOffsetsLoadInProgress} - testResponse(t, "error", &response, consumerMetadataResponseError) + response := &ConsumerMetadataResponse{Err: ErrOffsetsLoadInProgress} + testEncodable(t, "", response, consumerMetadataResponseError) + + decodedResp := &ConsumerMetadataResponse{} + if err := versionedDecode(consumerMetadataResponseError, decodedResp, 0); err != nil { + t.Error("could not decode: ", err) + } + + if decodedResp.Err != ErrOffsetsLoadInProgress { + t.Errorf("got %s, want %s", decodedResp.Err, ErrOffsetsLoadInProgress) + } } func TestConsumerMetadataResponseSuccess(t *testing.T) { diff --git a/find_coordinator_response.go b/find_coordinator_response.go index 2bd6b88f9..f2d178f7c 100644 --- a/find_coordinator_response.go +++ b/find_coordinator_response.go @@ -1,20 +1,15 @@ package sarama import ( - "net" - "strconv" "time" ) type FindCoordinatorResponse struct { - Version int16 - ThrottleTime time.Duration - Err KError - ErrMsg *string - Coordinator *Broker - CoordinatorID int32 // deprecated: use Coordinator.ID() - CoordinatorHost string // deprecated: use Coordinator.Addr() - CoordinatorPort int32 // deprecated: use Coordinator.Addr() + Version int16 + ThrottleTime time.Duration + Err KError + ErrMsg *string + Coordinator *Broker } func (f *FindCoordinatorResponse) decode(pd packetDecoder, version int16) (err error) { @@ -49,20 +44,6 @@ func (f *FindCoordinatorResponse) decode(pd packetDecoder, version int16) (err e } f.Coordinator = coordinator - // this can all go away in 2.0, but we have to fill in deprecated fields to maintain - // backwards compatibility - host, portstr, err := net.SplitHostPort(f.Coordinator.Addr()) - if err != nil { - return err - } - port, err := strconv.ParseInt(portstr, 10, 32) - if err != nil { - return err - } - f.CoordinatorID = f.Coordinator.ID() - f.CoordinatorHost = host - f.CoordinatorPort = int32(port) - return nil } @@ -79,27 +60,10 @@ func (f *FindCoordinatorResponse) encode(pe packetEncoder) error { } } - if f.Coordinator != nil { - host, portstr, err := net.SplitHostPort(f.Coordinator.Addr()) - if err != nil { - return err - } - port, err := strconv.ParseInt(portstr, 10, 32) - if err != nil { - return err - } - pe.putInt32(f.Coordinator.ID()) - if err := pe.putString(host); err != nil { - return err - } - pe.putInt32(int32(port)) - return nil - } - pe.putInt32(f.CoordinatorID) - if err := pe.putString(f.CoordinatorHost); err != nil { + if err := f.Coordinator.encode(pe); err != nil { return err } - pe.putInt32(f.CoordinatorPort) + return nil } diff --git a/find_coordinator_response_test.go b/find_coordinator_response_test.go index 6bc5d3cf5..39cec6469 100644 --- a/find_coordinator_response_test.go +++ b/find_coordinator_response_test.go @@ -29,14 +29,11 @@ func TestFindCoordinatorResponse(t *testing.T) { broker := NewBroker("host:9092") broker.id = 1 resp := &FindCoordinatorResponse{ - Version: 1, - ThrottleTime: 100 * time.Millisecond, - Err: ErrNoError, - ErrMsg: nil, - CoordinatorID: 1, - CoordinatorHost: "host", - CoordinatorPort: 9092, - Coordinator: broker, + Version: 1, + ThrottleTime: 100 * time.Millisecond, + Err: ErrNoError, + ErrMsg: nil, + Coordinator: broker, } testResponse(t, "version 1 - no error", resp, findCoordinatorResponse)