Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: fill in the Fetch{Request,Response} protocol #1582

Merged
merged 1 commit into from
Jan 21, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
138 changes: 124 additions & 14 deletions fetch_request.go
Original file line number Diff line number Diff line change
@@ -1,20 +1,41 @@
package sarama

type fetchRequestBlock struct {
fetchOffset int64
maxBytes int32
Version int16
currentLeaderEpoch int32
fetchOffset int64
logStartOffset int64
maxBytes int32
}

func (b *fetchRequestBlock) encode(pe packetEncoder) error {
func (b *fetchRequestBlock) encode(pe packetEncoder, version int16) error {
b.Version = version
if b.Version >= 9 {
pe.putInt32(b.currentLeaderEpoch)
}
pe.putInt64(b.fetchOffset)
if b.Version >= 5 {
pe.putInt64(b.logStartOffset)
}
pe.putInt32(b.maxBytes)
return nil
}

func (b *fetchRequestBlock) decode(pd packetDecoder) (err error) {
func (b *fetchRequestBlock) decode(pd packetDecoder, version int16) (err error) {
b.Version = version
if b.Version >= 9 {
if b.currentLeaderEpoch, err = pd.getInt32(); err != nil {
return err
}
}
if b.fetchOffset, err = pd.getInt64(); err != nil {
return err
}
if b.Version >= 5 {
if b.logStartOffset, err = pd.getInt64(); err != nil {
return err
}
}
if b.maxBytes, err = pd.getInt32(); err != nil {
return err
}
Expand All @@ -25,12 +46,15 @@ func (b *fetchRequestBlock) decode(pd packetDecoder) (err error) {
// https://issues.apache.org/jira/browse/KAFKA-2063 for a discussion of the issues leading up to that. The KIP is at
// https://cwiki.apache.org/confluence/display/KAFKA/KIP-74%3A+Add+Fetch+Response+Size+Limit+in+Bytes
type FetchRequest struct {
MaxWaitTime int32
MinBytes int32
MaxBytes int32
Version int16
Isolation IsolationLevel
blocks map[string]map[int32]*fetchRequestBlock
MaxWaitTime int32
MinBytes int32
MaxBytes int32
Version int16
Isolation IsolationLevel
SessionID int32
SessionEpoch int32
blocks map[string]map[int32]*fetchRequestBlock
forgotten map[string][]int32
}

type IsolationLevel int8
Expand All @@ -50,6 +74,10 @@ func (r *FetchRequest) encode(pe packetEncoder) (err error) {
if r.Version >= 4 {
pe.putInt8(int8(r.Isolation))
}
if r.Version >= 7 {
pe.putInt32(r.SessionID)
pe.putInt32(r.SessionEpoch)
}
err = pe.putArrayLength(len(r.blocks))
if err != nil {
return err
Expand All @@ -65,17 +93,38 @@ func (r *FetchRequest) encode(pe packetEncoder) (err error) {
}
for partition, block := range blocks {
pe.putInt32(partition)
err = block.encode(pe)
err = block.encode(pe, r.Version)
if err != nil {
return err
}
}
}
if r.Version >= 7 {
err = pe.putArrayLength(len(r.forgotten))
if err != nil {
return err
}
for topic, partitions := range r.forgotten {
err = pe.putString(topic)
if err != nil {
return err
}
err = pe.putArrayLength(len(partitions))
if err != nil {
return err
}
for _, partition := range partitions {
pe.putInt32(partition)
}
}
}

return nil
}

func (r *FetchRequest) decode(pd packetDecoder, version int16) (err error) {
r.Version = version

if _, err = pd.getInt32(); err != nil {
return err
}
Expand All @@ -97,6 +146,16 @@ func (r *FetchRequest) decode(pd packetDecoder, version int16) (err error) {
}
r.Isolation = IsolationLevel(isolation)
}
if r.Version >= 7 {
r.SessionID, err = pd.getInt32()
if err != nil {
return err
}
r.SessionEpoch, err = pd.getInt32()
if err != nil {
return err
}
}
topicCount, err := pd.getArrayLength()
if err != nil {
return err
Expand All @@ -121,12 +180,43 @@ func (r *FetchRequest) decode(pd packetDecoder, version int16) (err error) {
return err
}
fetchBlock := &fetchRequestBlock{}
if err = fetchBlock.decode(pd); err != nil {
if err = fetchBlock.decode(pd, r.Version); err != nil {
return err
}
r.blocks[topic][partition] = fetchBlock
}
}

if r.Version >= 7 {
forgottenCount, err := pd.getArrayLength()
if err != nil {
return err
}
if forgottenCount == 0 {
return nil
}
r.forgotten = make(map[string][]int32)
for i := 0; i < forgottenCount; i++ {
topic, err := pd.getString()
if err != nil {
return err
}
partitionCount, err := pd.getArrayLength()
if err != nil {
return err
}
r.forgotten[topic] = make([]int32, partitionCount)

for j := 0; j < partitionCount; j++ {
partition, err := pd.getInt32()
if err != nil {
return err
}
r.forgotten[topic][j] = partition
}
}
}

return nil
}

Expand All @@ -140,16 +230,28 @@ func (r *FetchRequest) version() int16 {

func (r *FetchRequest) requiredVersion() KafkaVersion {
switch r.Version {
case 0:
return MinVersion
case 1:
return V0_9_0_0
case 2:
return V0_10_0_0
case 3:
return V0_10_1_0
case 4:
case 4, 5:
return V0_11_0_0
case 6:
return V1_0_0_0
case 7:
return V1_1_0_0
case 8:
return V2_0_0_0
case 9, 10:
return V2_1_0_0
case 11:
return V2_3_0_0
default:
return MinVersion
return MaxVersion
}
}

Expand All @@ -158,13 +260,21 @@ func (r *FetchRequest) AddBlock(topic string, partitionID int32, fetchOffset int
r.blocks = make(map[string]map[int32]*fetchRequestBlock)
}

if r.Version >= 7 && r.forgotten == nil {
r.forgotten = make(map[string][]int32)
}

if r.blocks[topic] == nil {
r.blocks[topic] = make(map[int32]*fetchRequestBlock)
}

tmp := new(fetchRequestBlock)
tmp.Version = r.Version
tmp.maxBytes = maxBytes
tmp.fetchOffset = fetchOffset
if r.Version >= 9 {
tmp.currentLeaderEpoch = int32(-1)
}

r.blocks[topic][partitionID] = tmp
}
44 changes: 28 additions & 16 deletions fetch_request_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,20 +29,32 @@ var (
)

func TestFetchRequest(t *testing.T) {
request := new(FetchRequest)
testRequest(t, "no blocks", request, fetchRequestNoBlocks)

request.MaxWaitTime = 0x20
request.MinBytes = 0xEF
testRequest(t, "with properties", request, fetchRequestWithProperties)

request.MaxWaitTime = 0
request.MinBytes = 0
request.AddBlock("topic", 0x12, 0x34, 0x56)
testRequest(t, "one block", request, fetchRequestOneBlock)

request.Version = 4
request.MaxBytes = 0xFF
request.Isolation = ReadCommitted
testRequest(t, "one block v4", request, fetchRequestOneBlockV4)
t.Run("no blocks", func(t *testing.T) {
request := new(FetchRequest)
testRequest(t, "no blocks", request, fetchRequestNoBlocks)
})

t.Run("with properties", func(t *testing.T) {
request := new(FetchRequest)
request.MaxWaitTime = 0x20
request.MinBytes = 0xEF
testRequest(t, "with properties", request, fetchRequestWithProperties)
})

t.Run("one block", func(t *testing.T) {
request := new(FetchRequest)
request.MaxWaitTime = 0
request.MinBytes = 0
request.AddBlock("topic", 0x12, 0x34, 0x56)
testRequest(t, "one block", request, fetchRequestOneBlock)
})

t.Run("one block v4", func(t *testing.T) {
request := new(FetchRequest)
request.Version = 4
request.MaxBytes = 0xFF
request.Isolation = ReadCommitted
request.AddBlock("topic", 0x12, 0x34, 0x56)
testRequest(t, "one block v4", request, fetchRequestOneBlockV4)
})
}
48 changes: 45 additions & 3 deletions fetch_response.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ type FetchResponseBlock struct {
Err KError
HighWaterMarkOffset int64
LastStableOffset int64
LogStartOffset int64
AbortedTransactions []*AbortedTransaction
Records *Records // deprecated: use FetchResponseBlock.RecordsSet
RecordsSet []*Records
Expand All @@ -57,6 +58,13 @@ func (b *FetchResponseBlock) decode(pd packetDecoder, version int16) (err error)
return err
}

if version >= 5 {
b.LogStartOffset, err = pd.getInt64()
if err != nil {
return err
}
}

numTransact, err := pd.getArrayLength()
if err != nil {
return err
Expand Down Expand Up @@ -166,6 +174,10 @@ func (b *FetchResponseBlock) encode(pe packetEncoder, version int16) (err error)
if version >= 4 {
pe.putInt64(b.LastStableOffset)

if version >= 5 {
pe.putInt64(b.LogStartOffset)
}

if err = pe.putArrayLength(len(b.AbortedTransactions)); err != nil {
return err
}
Expand Down Expand Up @@ -200,7 +212,9 @@ func (b *FetchResponseBlock) getAbortedTransactions() []*AbortedTransaction {
type FetchResponse struct {
Blocks map[string]map[int32]*FetchResponseBlock
ThrottleTime time.Duration
Version int16 // v1 requires 0.9+, v2 requires 0.10+
ErrorCode int16
SessionID int32
Version int16
LogAppendTime bool
Timestamp time.Time
}
Expand All @@ -216,6 +230,17 @@ func (r *FetchResponse) decode(pd packetDecoder, version int16) (err error) {
r.ThrottleTime = time.Duration(throttle) * time.Millisecond
}

if r.Version >= 7 {
r.ErrorCode, err = pd.getInt16()
if err != nil {
return err
}
r.SessionID, err = pd.getInt32()
if err != nil {
return err
}
}

numTopics, err := pd.getArrayLength()
if err != nil {
return err
Expand Down Expand Up @@ -258,6 +283,11 @@ func (r *FetchResponse) encode(pe packetEncoder) (err error) {
pe.putInt32(int32(r.ThrottleTime / time.Millisecond))
}

if r.Version >= 7 {
pe.putInt16(r.ErrorCode)
pe.putInt32(r.SessionID)
}

err = pe.putArrayLength(len(r.Blocks))
if err != nil {
return err
Expand Down Expand Up @@ -296,16 +326,28 @@ func (r *FetchResponse) version() int16 {

func (r *FetchResponse) requiredVersion() KafkaVersion {
switch r.Version {
case 0:
return MinVersion
case 1:
return V0_9_0_0
case 2:
return V0_10_0_0
case 3:
return V0_10_1_0
case 4:
case 4, 5:
return V0_11_0_0
case 6:
return V1_0_0_0
case 7:
return V1_1_0_0
case 8:
return V2_0_0_0
case 9, 10:
return V2_1_0_0
case 11:
return V2_3_0_0
default:
return MinVersion
return MaxVersion
}
}

Expand Down
2 changes: 1 addition & 1 deletion request.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ func allocateBody(key, version int16) protocolBody {
case 0:
return &ProduceRequest{}
case 1:
return &FetchRequest{}
return &FetchRequest{Version: version}
case 2:
return &OffsetRequest{Version: version}
case 3:
Expand Down