Skip to content

Commit

Permalink
Merge pull request #1100 from urso/offset-req-replica-id
Browse files Browse the repository at this point in the history
Optional replica id in offset request
  • Loading branch information
bai authored Dec 10, 2018
2 parents 879f631 + 374f0e3 commit 49e0aa4
Show file tree
Hide file tree
Showing 2 changed files with 45 additions and 5 deletions.
34 changes: 29 additions & 5 deletions offset_request.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,12 +27,20 @@ func (b *offsetRequestBlock) decode(pd packetDecoder, version int16) (err error)
}

type OffsetRequest struct {
Version int16
blocks map[string]map[int32]*offsetRequestBlock
Version int16
replicaID int32
isReplicaIDSet bool
blocks map[string]map[int32]*offsetRequestBlock
}

func (r *OffsetRequest) encode(pe packetEncoder) error {
pe.putInt32(-1) // replica ID is always -1 for clients
if r.isReplicaIDSet {
pe.putInt32(r.replicaID)
} else {
// default replica ID is always -1 for clients
pe.putInt32(-1)
}

err := pe.putArrayLength(len(r.blocks))
if err != nil {
return err
Expand All @@ -59,10 +67,14 @@ func (r *OffsetRequest) encode(pe packetEncoder) error {
func (r *OffsetRequest) decode(pd packetDecoder, version int16) error {
r.Version = version

// Ignore replica ID
if _, err := pd.getInt32(); err != nil {
replicaID, err := pd.getInt32()
if err != nil {
return err
}
if replicaID >= 0 {
r.SetReplicaID(replicaID)
}

blockCount, err := pd.getArrayLength()
if err != nil {
return err
Expand Down Expand Up @@ -113,6 +125,18 @@ func (r *OffsetRequest) requiredVersion() KafkaVersion {
}
}

func (r *OffsetRequest) SetReplicaID(id int32) {
r.replicaID = id
r.isReplicaIDSet = true
}

func (r *OffsetRequest) ReplicaID() int32 {
if r.isReplicaIDSet {
return r.replicaID
}
return -1
}

func (r *OffsetRequest) AddBlock(topic string, partitionID int32, time int64, maxOffsets int32) {
if r.blocks == nil {
r.blocks = make(map[string]map[int32]*offsetRequestBlock)
Expand Down
16 changes: 16 additions & 0 deletions offset_request_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,10 @@ var (
0x00, 0x00, 0x00, 0x01,
0x00, 0x00, 0x00, 0x04,
0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x01}

offsetRequestReplicaID = []byte{
0x00, 0x00, 0x00, 0x2a,
0x00, 0x00, 0x00, 0x00}
)

func TestOffsetRequest(t *testing.T) {
Expand All @@ -41,3 +45,15 @@ func TestOffsetRequestV1(t *testing.T) {
request.AddBlock("bar", 4, 1, 2) // Last argument is ignored for V1
testRequest(t, "one block", request, offsetRequestOneBlockV1)
}

func TestOffsetRequestReplicaID(t *testing.T) {
request := new(OffsetRequest)
replicaID := int32(42)
request.SetReplicaID(replicaID)

if found := request.ReplicaID(); found != replicaID {
t.Errorf("replicaID: expected %v, found %v", replicaID, found)
}

testRequest(t, "with replica ID", request, offsetRequestReplicaID)
}

0 comments on commit 49e0aa4

Please sign in to comment.