Skip to content

Commit

Permalink
Optional replica id in offset request
Browse files Browse the repository at this point in the history
  • Loading branch information
urso committed May 5, 2018
1 parent 1c45a00 commit 8563f1c
Showing 1 changed file with 16 additions and 3 deletions.
19 changes: 16 additions & 3 deletions offset_request.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,12 +27,20 @@ func (b *offsetRequestBlock) decode(pd packetDecoder, version int16) (err error)
}

type OffsetRequest struct {
Version int16
blocks map[string]map[int32]*offsetRequestBlock
Version int16
replicaID *int32
storeReplicaID int32
blocks map[string]map[int32]*offsetRequestBlock
}

func (r *OffsetRequest) encode(pe packetEncoder) error {
pe.putInt32(-1) // replica ID is always -1 for clients
if r.replicaID == nil {
// default replica ID is always -1 for clients
pe.putInt32(-1)
} else {
pe.putInt32(*r.replicaID)
}

err := pe.putArrayLength(len(r.blocks))
if err != nil {
return err
Expand Down Expand Up @@ -113,6 +121,11 @@ func (r *OffsetRequest) requiredVersion() KafkaVersion {
}
}

func (r *OffsetRequest) SetReplicaID(id int32) {
r.storeReplicaID = id
r.replicaID = &r.storeReplicaID
}

func (r *OffsetRequest) AddBlock(topic string, partitionID int32, time int64, maxOffsets int32) {
if r.blocks == nil {
r.blocks = make(map[string]map[int32]*offsetRequestBlock)
Expand Down

0 comments on commit 8563f1c

Please sign in to comment.