Skip to content

Commit

Permalink
Merge pull request #672 from Shopify/more-protocol-support
Browse files Browse the repository at this point in the history
Add support for latest protocol messages
  • Loading branch information
eapache committed Jun 9, 2016
2 parents 8bf3440 + c5f4248 commit 50ae3cc
Show file tree
Hide file tree
Showing 10 changed files with 255 additions and 0 deletions.
20 changes: 20 additions & 0 deletions api_versions_request.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
package sarama

type ApiVersionsRequest struct {
}

func (r *ApiVersionsRequest) encode(pe packetEncoder) error {
return nil
}

func (r *ApiVersionsRequest) decode(pd packetDecoder) (err error) {
return nil
}

func (r *ApiVersionsRequest) key() int16 {
return 18
}

func (r *ApiVersionsRequest) version() int16 {
return 0
}
14 changes: 14 additions & 0 deletions api_versions_request_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
package sarama

import "testing"

var (
apiVersionRequest = []byte{}
)

func TestApiVersionsRequest(t *testing.T) {
var request *ApiVersionsRequest

request = new(ApiVersionsRequest)
testRequest(t, "basic", request, apiVersionRequest)
}
74 changes: 74 additions & 0 deletions api_versions_response.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
package sarama

type ApiVersionsResponseBlock struct {
ApiKey int16
MinVersion int16
MaxVersion int16
}

func (r *ApiVersionsResponseBlock) encode(pe packetEncoder) error {
pe.putInt16(r.ApiKey)
pe.putInt16(r.MinVersion)
pe.putInt16(r.MaxVersion)
return nil
}

func (r *ApiVersionsResponseBlock) decode(pd packetDecoder) error {
var err error

if r.ApiKey, err = pd.getInt16(); err != nil {
return err
}

if r.MinVersion, err = pd.getInt16(); err != nil {
return err
}

if r.MaxVersion, err = pd.getInt16(); err != nil {
return err
}

return nil
}

type ApiVersionsResponse struct {
Err KError
ApiVersions []*ApiVersionsResponseBlock
}

func (r *ApiVersionsResponse) encode(pe packetEncoder) error {
pe.putInt16(int16(r.Err))
if err := pe.putArrayLength(len(r.ApiVersions)); err != nil {
return err
}
for _, apiVersion := range r.ApiVersions {
if err := apiVersion.encode(pe); err != nil {
return err
}
}
return nil
}

func (r *ApiVersionsResponse) decode(pd packetDecoder) error {
if kerr, err := pd.getInt16(); err != nil {
return err
} else {
r.Err = KError(kerr)
}

numBlocks, err := pd.getArrayLength()
if err != nil {
return err
}

r.ApiVersions = make([]*ApiVersionsResponseBlock, numBlocks)
for i := 0; i < numBlocks; i++ {
block := new(ApiVersionsResponseBlock)
if err := block.decode(pd); err != nil {
return err
}
r.ApiVersions[i] = block
}

return nil
}
32 changes: 32 additions & 0 deletions api_versions_response_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
package sarama

import "testing"

var (
apiVersionResponse = []byte{
0x00, 0x00,
0x00, 0x00, 0x00, 0x01,
0x00, 0x03,
0x00, 0x02,
0x00, 0x01,
}
)

func TestApiVersionsResponse(t *testing.T) {
var response *ApiVersionsResponse

response = new(ApiVersionsResponse)
testDecodable(t, "no error", response, apiVersionResponse)
if response.Err != ErrNoError {
t.Error("Decoding error failed: no error expected but found", response.Err)
}
if response.ApiVersions[0].ApiKey != 0x03 {
t.Error("Decoding error: expected 0x03 but got", response.ApiVersions[0].ApiKey)
}
if response.ApiVersions[0].MinVersion != 0x02 {
t.Error("Decoding error: expected 0x02 but got", response.ApiVersions[0].MinVersion)
}
if response.ApiVersions[0].MaxVersion != 0x01 {
t.Error("Decoding error: expected 0x01 but got", response.ApiVersions[0].MaxVersion)
}
}
15 changes: 15 additions & 0 deletions errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@ const (
ErrMessageSizeTooLarge KError = 10
ErrStaleControllerEpochCode KError = 11
ErrOffsetMetadataTooLarge KError = 12
ErrNetworkException KError = 13
ErrOffsetsLoadInProgress KError = 14
ErrConsumerCoordinatorNotAvailable KError = 15
ErrNotCoordinatorForConsumer KError = 16
Expand All @@ -103,6 +104,10 @@ const (
ErrTopicAuthorizationFailed KError = 29
ErrGroupAuthorizationFailed KError = 30
ErrClusterAuthorizationFailed KError = 31
ErrInvalidTimestamp KError = 32
ErrUnsupportedSASLMechanism KError = 33
ErrIllegalSASLState KError = 34
ErrUnsupportedVersion KError = 35
)

func (err KError) Error() string {
Expand Down Expand Up @@ -137,6 +142,8 @@ func (err KError) Error() string {
return "kafka server: StaleControllerEpochCode (internal error code for broker-to-broker communication)."
case ErrOffsetMetadataTooLarge:
return "kafka server: Specified a string larger than the configured maximum for offset metadata."
case ErrNetworkException:
return "kafka server: The server disconnected before a response was received."
case ErrOffsetsLoadInProgress:
return "kafka server: The broker is still loading offsets after a leader change for that offset's topic partition."
case ErrConsumerCoordinatorNotAvailable:
Expand Down Expand Up @@ -173,6 +180,14 @@ func (err KError) Error() string {
return "kafka server: The client is not authorized to access this group."
case ErrClusterAuthorizationFailed:
return "kafka server: The client is not authorized to send this request type."
case ErrInvalidTimestamp:
return "kafka server: The timestamp of the message is out of acceptable range."
case ErrUnsupportedSASLMechanism:
return "kafka server: The broker does not support the requested SASL mechanism."
case ErrIllegalSASLState:
return "kafka server: Request is not valid given the current SASL state."
case ErrUnsupportedVersion:
return "kafka server: The version of API is not supported."
}

return fmt.Sprintf("Unknown error, how did this happen? Error code = %d", err)
Expand Down
4 changes: 4 additions & 0 deletions request.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,10 @@ func allocateBody(key, version int16) requestBody {
return &DescribeGroupsRequest{}
case 16:
return &ListGroupsRequest{}
case 17:
return &SaslHandshakeRequest{}
case 18:
return &ApiVersionsRequest{}
}
return nil
}
29 changes: 29 additions & 0 deletions sasl_handshake_request.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
package sarama

type SaslHandshakeRequest struct {
Mechanism string
}

func (r *SaslHandshakeRequest) encode(pe packetEncoder) error {
if err := pe.putString(r.Mechanism); err != nil {
return err
}

return nil
}

func (r *SaslHandshakeRequest) decode(pd packetDecoder) (err error) {
if r.Mechanism, err = pd.getString(); err != nil {
return err
}

return nil
}

func (r *SaslHandshakeRequest) key() int16 {
return 17
}

func (r *SaslHandshakeRequest) version() int16 {
return 0
}
17 changes: 17 additions & 0 deletions sasl_handshake_request_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
package sarama

import "testing"

var (
baseSaslRequest = []byte{
0, 3, 'f', 'o', 'o', // Mechanism
}
)

func TestSaslHandshakeRequest(t *testing.T) {
var request *SaslHandshakeRequest

request = new(SaslHandshakeRequest)
request.Mechanism = "foo"
testRequest(t, "basic", request, baseSaslRequest)
}
26 changes: 26 additions & 0 deletions sasl_handshake_response.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
package sarama

type SaslHandshakeResponse struct {
Err KError
EnabledMechanisms []string
}

func (r *SaslHandshakeResponse) encode(pe packetEncoder) error {
pe.putInt16(int16(r.Err))
return pe.putStringArray(r.EnabledMechanisms)
}

func (r *SaslHandshakeResponse) decode(pd packetDecoder) error {
if kerr, err := pd.getInt16(); err != nil {
return err
} else {
r.Err = KError(kerr)
}

var err error
if r.EnabledMechanisms, err = pd.getStringArray(); err != nil {
return err
}

return nil
}
24 changes: 24 additions & 0 deletions sasl_handshake_response_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
package sarama

import "testing"

var (
saslHandshakeResponse = []byte{
0x00, 0x00,
0x00, 0x00, 0x00, 0x01,
0x00, 0x03, 'f', 'o', 'o',
}
)

func TestSaslHandshakeResponse(t *testing.T) {
var response *SaslHandshakeResponse

response = new(SaslHandshakeResponse)
testDecodable(t, "no error", response, saslHandshakeResponse)
if response.Err != ErrNoError {
t.Error("Decoding error failed: no error expected but found", response.Err)
}
if response.EnabledMechanisms[0] != "foo" {
t.Error("Decoding error failed: expected 'foo' but found", response.EnabledMechanisms)
}
}

0 comments on commit 50ae3cc

Please sign in to comment.