Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support ListOffsetRequest v1 [KIP-79] #775

Merged
merged 1 commit into from
Nov 22, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ env:
- KAFKA_VERSION=0.8.2.2
- KAFKA_VERSION=0.9.0.1
- KAFKA_VERSION=0.10.0.1
- KAFKA_VERSION=0.10.1.0

before_install:
- export REPOSITORY_ROOT=${TRAVIS_BUILD_DIR}
Expand Down
3 changes: 3 additions & 0 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -521,6 +521,9 @@ func (client *client) getOffset(topic string, partitionID int32, time int64) (in
}

request := &OffsetRequest{}
if client.conf.Version.IsAtLeast(V0_10_1_0) {
request.Version = 1
}
request.AddBlock(topic, partitionID, time, 1)

response, err := broker.GetAvailableOffsets(request)
Expand Down
3 changes: 3 additions & 0 deletions errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,7 @@ const (
ErrUnsupportedSASLMechanism KError = 33
ErrIllegalSASLState KError = 34
ErrUnsupportedVersion KError = 35
ErrUnsupportedForMessageFormat KError = 43
)

func (err KError) Error() string {
Expand Down Expand Up @@ -188,6 +189,8 @@ func (err KError) Error() string {
return "kafka server: Request is not valid given the current SASL state."
case ErrUnsupportedVersion:
return "kafka server: The version of API is not supported."
case ErrUnsupportedForMessageFormat:
return "kafka server: The requested operation is not supported by the message format version."
}

return fmt.Sprintf("Unknown error, how did this happen? Error code = %d", err)
Expand Down
39 changes: 27 additions & 12 deletions offset_request.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,27 +2,33 @@ package sarama

type offsetRequestBlock struct {
time int64
maxOffsets int32
maxOffsets int32 // Only used in version 0
}

func (b *offsetRequestBlock) encode(pe packetEncoder) error {
func (b *offsetRequestBlock) encode(pe packetEncoder, version int16) error {
pe.putInt64(int64(b.time))
pe.putInt32(b.maxOffsets)
if version == 0 {
pe.putInt32(b.maxOffsets)
}

return nil
}

func (b *offsetRequestBlock) decode(pd packetDecoder) (err error) {
func (b *offsetRequestBlock) decode(pd packetDecoder, version int16) (err error) {
if b.time, err = pd.getInt64(); err != nil {
return err
}
if b.maxOffsets, err = pd.getInt32(); err != nil {
return err
if version == 0 {
if b.maxOffsets, err = pd.getInt32(); err != nil {
return err
}
}
return nil
}

type OffsetRequest struct {
blocks map[string]map[int32]*offsetRequestBlock
Version int16
blocks map[string]map[int32]*offsetRequestBlock
}

func (r *OffsetRequest) encode(pe packetEncoder) error {
Expand All @@ -42,7 +48,7 @@ func (r *OffsetRequest) encode(pe packetEncoder) error {
}
for partition, block := range partitions {
pe.putInt32(partition)
if err = block.encode(pe); err != nil {
if err = block.encode(pe, r.Version); err != nil {
return err
}
}
Expand All @@ -51,6 +57,8 @@ func (r *OffsetRequest) encode(pe packetEncoder) error {
}

func (r *OffsetRequest) decode(pd packetDecoder, version int16) error {
r.Version = version

// Ignore replica ID
if _, err := pd.getInt32(); err != nil {
return err
Expand Down Expand Up @@ -79,7 +87,7 @@ func (r *OffsetRequest) decode(pd packetDecoder, version int16) error {
return err
}
block := &offsetRequestBlock{}
if err := block.decode(pd); err != nil {
if err := block.decode(pd, version); err != nil {
return err
}
r.blocks[topic][partition] = block
Expand All @@ -93,11 +101,16 @@ func (r *OffsetRequest) key() int16 {
}

func (r *OffsetRequest) version() int16 {
return 0
return r.Version
}

func (r *OffsetRequest) requiredVersion() KafkaVersion {
return minVersion
switch r.Version {
case 1:
return V0_10_1_0
default:
return minVersion
}
}

func (r *OffsetRequest) AddBlock(topic string, partitionID int32, time int64, maxOffsets int32) {
Expand All @@ -111,7 +124,9 @@ func (r *OffsetRequest) AddBlock(topic string, partitionID int32, time int64, ma

tmp := new(offsetRequestBlock)
tmp.time = time
tmp.maxOffsets = maxOffsets
if r.Version == 0 {
tmp.maxOffsets = maxOffsets
}

r.blocks[topic][partitionID] = tmp
}
17 changes: 17 additions & 0 deletions offset_request_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,14 @@ var (
0x00, 0x00, 0x00, 0x04,
0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x01,
0x00, 0x00, 0x00, 0x02}

offsetRequestOneBlockV1 = []byte{
0xFF, 0xFF, 0xFF, 0xFF,
0x00, 0x00, 0x00, 0x01,
0x00, 0x03, 'b', 'a', 'r',
0x00, 0x00, 0x00, 0x01,
0x00, 0x00, 0x00, 0x04,
0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x01}
)

func TestOffsetRequest(t *testing.T) {
Expand All @@ -24,3 +32,12 @@ func TestOffsetRequest(t *testing.T) {
request.AddBlock("foo", 4, 1, 2)
testRequest(t, "one block", request, offsetRequestOneBlock)
}

func TestOffsetRequestV1(t *testing.T) {
request := new(OffsetRequest)
request.Version = 1
testRequest(t, "no blocks", request, offsetRequestNoBlocks)

request.AddBlock("bar", 4, 1, 2) // Last argument is ignored for V1
testRequest(t, "one block", request, offsetRequestOneBlockV1)
}
58 changes: 45 additions & 13 deletions offset_response.go
Original file line number Diff line number Diff line change
@@ -1,30 +1,57 @@
package sarama

type OffsetResponseBlock struct {
Err KError
Offsets []int64
Err KError
Offsets []int64 // Version 0
Offset int64 // Version 1
Timestamp int64 // Version 1
}

func (b *OffsetResponseBlock) decode(pd packetDecoder) (err error) {
func (b *OffsetResponseBlock) decode(pd packetDecoder, version int16) (err error) {
tmp, err := pd.getInt16()
if err != nil {
return err
}
b.Err = KError(tmp)

b.Offsets, err = pd.getInt64Array()
if version == 0 {
b.Offsets, err = pd.getInt64Array()

return err
return err
}

b.Timestamp, err = pd.getInt64()
if err != nil {
return err
}

b.Offset, err = pd.getInt64()
if err != nil {
return err
}

// For backwards compatibility put the offset in the offsets array too
b.Offsets = []int64{b.Offset}

return nil
}

func (b *OffsetResponseBlock) encode(pe packetEncoder) (err error) {
func (b *OffsetResponseBlock) encode(pe packetEncoder, version int16) (err error) {
pe.putInt16(int16(b.Err))

return pe.putInt64Array(b.Offsets)
if version == 0 {
return pe.putInt64Array(b.Offsets)
}

pe.putInt64(b.Timestamp)
pe.putInt64(b.Offset)

return nil
}

type OffsetResponse struct {
Blocks map[string]map[int32]*OffsetResponseBlock
Version int16
Blocks map[string]map[int32]*OffsetResponseBlock
}

func (r *OffsetResponse) decode(pd packetDecoder, version int16) (err error) {
Expand Down Expand Up @@ -54,7 +81,7 @@ func (r *OffsetResponse) decode(pd packetDecoder, version int16) (err error) {
}

block := new(OffsetResponseBlock)
err = block.decode(pd)
err = block.decode(pd, version)
if err != nil {
return err
}
Expand Down Expand Up @@ -106,7 +133,7 @@ func (r *OffsetResponse) encode(pe packetEncoder) (err error) {
}
for partition, block := range partitions {
pe.putInt32(partition)
if err = block.encode(pe); err != nil {
if err = block.encode(pe, r.version()); err != nil {
return err
}
}
Expand All @@ -120,11 +147,16 @@ func (r *OffsetResponse) key() int16 {
}

func (r *OffsetResponse) version() int16 {
return 0
return r.Version
}

func (r *OffsetResponse) requiredVersion() KafkaVersion {
return minVersion
switch r.Version {
case 1:
return V0_10_1_0
default:
return minVersion
}
}

// testing API
Expand All @@ -138,5 +170,5 @@ func (r *OffsetResponse) AddTopicPartition(topic string, partition int32, offset
byTopic = make(map[int32]*OffsetResponseBlock)
r.Blocks[topic] = byTopic
}
byTopic[partition] = &OffsetResponseBlock{Offsets: []int64{offset}}
byTopic[partition] = &OffsetResponseBlock{Offsets: []int64{offset}, Offset: offset}
}
49 changes: 49 additions & 0 deletions offset_response_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,19 @@ var (
0x00, 0x00, 0x00, 0x02,
0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x05,
0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x06}

normalOffsetResponseV1 = []byte{
0x00, 0x00, 0x00, 0x02,

0x00, 0x01, 'a',
0x00, 0x00, 0x00, 0x00,

0x00, 0x01, 'z',
0x00, 0x00, 0x00, 0x01,
0x00, 0x00, 0x00, 0x02,
0x00, 0x00,
0x00, 0x00, 0x01, 0x58, 0x1A, 0xE6, 0x48, 0x86,
0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x06}
)

func TestEmptyOffsetResponse(t *testing.T) {
Expand All @@ -28,6 +41,13 @@ func TestEmptyOffsetResponse(t *testing.T) {
if len(response.Blocks) != 0 {
t.Error("Decoding produced", len(response.Blocks), "topics where there were none.")
}

response = OffsetResponse{}

testVersionDecodable(t, "empty", &response, emptyOffsetResponse, 1)
if len(response.Blocks) != 0 {
t.Error("Decoding produced", len(response.Blocks), "topics where there were none.")
}
}

func TestNormalOffsetResponse(t *testing.T) {
Expand Down Expand Up @@ -58,5 +78,34 @@ func TestNormalOffsetResponse(t *testing.T) {
if response.Blocks["z"][2].Offsets[0] != 5 || response.Blocks["z"][2].Offsets[1] != 6 {
t.Fatal("Decoding produced invalid offsets for topic z partition 2.")
}
}

func TestNormalOffsetResponseV1(t *testing.T) {
response := OffsetResponse{}

testVersionDecodable(t, "normal", &response, normalOffsetResponseV1, 1)

if len(response.Blocks) != 2 {
t.Fatal("Decoding produced", len(response.Blocks), "topics where there were two.")
}

if len(response.Blocks["a"]) != 0 {
t.Fatal("Decoding produced", len(response.Blocks["a"]), "partitions for topic 'a' where there were none.")
}

if len(response.Blocks["z"]) != 1 {
t.Fatal("Decoding produced", len(response.Blocks["z"]), "partitions for topic 'z' where there was one.")
}

if response.Blocks["z"][2].Err != ErrNoError {
t.Fatal("Decoding produced invalid error for topic z partition 2.")
}

if response.Blocks["z"][2].Timestamp != 1477920049286 {
t.Fatal("Decoding produced invalid timestamp for topic z partition 2.", response.Blocks["z"][2].Timestamp)
}

if response.Blocks["z"][2].Offset != 6 {
t.Fatal("Decoding produced invalid offsets for topic z partition 2.")
}
}
2 changes: 1 addition & 1 deletion request.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ func allocateBody(key, version int16) protocolBody {
case 1:
return &FetchRequest{}
case 2:
return &OffsetRequest{}
return &OffsetRequest{Version: version}
case 3:
return &MetadataRequest{}
case 8:
Expand Down
1 change: 1 addition & 0 deletions utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,5 +147,6 @@ var (
V0_9_0_1 = newKafkaVersion(0, 9, 0, 1)
V0_10_0_0 = newKafkaVersion(0, 10, 0, 0)
V0_10_0_1 = newKafkaVersion(0, 10, 0, 1)
V0_10_1_0 = newKafkaVersion(0, 10, 1, 0)
minVersion = V0_8_2_0
)