Skip to content

Commit

Permalink
add transaction-related request/responses (22, 24, 25, 26, 28)
Browse files Browse the repository at this point in the history
  • Loading branch information
Robin committed Jan 25, 2018
1 parent 0f4f8ca commit d813197
Show file tree
Hide file tree
Showing 22 changed files with 1,028 additions and 0 deletions.
52 changes: 52 additions & 0 deletions add_offsets_to_txn_request.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
package sarama

type AddOffsetsToTxnRequest struct {
TransactionalID string
ProducerID int64
ProducerEpoch int16
GroupID string
}

func (a *AddOffsetsToTxnRequest) encode(pe packetEncoder) error {
if err := pe.putString(a.TransactionalID); err != nil {
return err
}

pe.putInt64(a.ProducerID)

pe.putInt16(a.ProducerEpoch)

if err := pe.putString(a.GroupID); err != nil {
return err
}

return nil
}

func (a *AddOffsetsToTxnRequest) decode(pd packetDecoder, version int16) (err error) {
if a.TransactionalID, err = pd.getString(); err != nil {
return err
}
if a.ProducerID, err = pd.getInt64(); err != nil {
return err
}
if a.ProducerEpoch, err = pd.getInt16(); err != nil {
return err
}
if a.GroupID, err = pd.getString(); err != nil {
return err
}
return nil
}

func (a *AddOffsetsToTxnRequest) key() int16 {
return 25
}

func (a *AddOffsetsToTxnRequest) version() int16 {
return 0
}

func (a *AddOffsetsToTxnRequest) requiredVersion() KafkaVersion {
return V0_11_0_0
}
23 changes: 23 additions & 0 deletions add_offsets_to_txn_request_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
package sarama

import "testing"

var (
addOffsetsToTxnRequest = []byte{
0, 3, 't', 'x', 'n',
0, 0, 0, 0, 0, 0, 31, 64,
0, 0,
0, 7, 'g', 'r', 'o', 'u', 'p', 'i', 'd',
}
)

func TestAddOffsetsToTxnRequest(t *testing.T) {
req := &AddOffsetsToTxnRequest{
TransactionalID: "txn",
ProducerID: 8000,
ProducerEpoch: 0,
GroupID: "groupid",
}

testRequest(t, "", req, addOffsetsToTxnRequest)
}
44 changes: 44 additions & 0 deletions add_offsets_to_txn_response.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
package sarama

import (
"time"
)

type AddOffsetsToTxnResponse struct {
ThrottleTime time.Duration
Err KError
}

func (a *AddOffsetsToTxnResponse) encode(pe packetEncoder) error {
pe.putInt32(int32(a.ThrottleTime / time.Millisecond))
pe.putInt16(int16(a.Err))
return nil
}

func (a *AddOffsetsToTxnResponse) decode(pd packetDecoder, version int16) (err error) {
throttleTime, err := pd.getInt32()
if err != nil {
return err
}
a.ThrottleTime = time.Duration(throttleTime) * time.Millisecond

kerr, err := pd.getInt16()
if err != nil {
return err
}
a.Err = KError(kerr)

return nil
}

func (a *AddOffsetsToTxnResponse) key() int16 {
return 25
}

func (a *AddOffsetsToTxnResponse) version() int16 {
return 0
}

func (a *AddOffsetsToTxnResponse) requiredVersion() KafkaVersion {
return V0_11_0_0
}
22 changes: 22 additions & 0 deletions add_offsets_to_txn_response_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
package sarama

import (
"testing"
"time"
)

var (
addOffsetsToTxnResponse = []byte{
0, 0, 0, 100,
0, 47,
}
)

func TestAddOffsetsToTxnResponse(t *testing.T) {
resp := &AddOffsetsToTxnResponse{
ThrottleTime: 100 * time.Millisecond,
Err: ErrInvalidProducerEpoch,
}

testResponse(t, "", resp, addOffsetsToTxnResponse)
}
76 changes: 76 additions & 0 deletions add_partitions_to_txn_request.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
package sarama

type AddPartitionsToTxnRequest struct {
TransactionalID string
ProducerID int64
ProducerEpoch int16
TopicPartitions map[string][]int32
}

func (a *AddPartitionsToTxnRequest) encode(pe packetEncoder) error {
if err := pe.putString(a.TransactionalID); err != nil {
return err
}
pe.putInt64(a.ProducerID)
pe.putInt16(a.ProducerEpoch)

if err := pe.putArrayLength(len(a.TopicPartitions)); err != nil {
return err
}
for topic, partitions := range a.TopicPartitions {
if err := pe.putString(topic); err != nil {
return err
}
if err := pe.putInt32Array(partitions); err != nil {
return err
}
}

return nil
}

func (a *AddPartitionsToTxnRequest) decode(pd packetDecoder, version int16) (err error) {
if a.TransactionalID, err = pd.getString(); err != nil {
return err
}
if a.ProducerID, err = pd.getInt64(); err != nil {
return err
}
if a.ProducerEpoch, err = pd.getInt16(); err != nil {
return err
}

n, err := pd.getArrayLength()
if err != nil {
return err
}

a.TopicPartitions = make(map[string][]int32)
for i := 0; i < n; i++ {
topic, err := pd.getString()
if err != nil {
return err
}

partitions, err := pd.getInt32Array()
if err != nil {
return err
}

a.TopicPartitions[topic] = partitions
}

return nil
}

func (a *AddPartitionsToTxnRequest) key() int16 {
return 24
}

func (a *AddPartitionsToTxnRequest) version() int16 {
return 0
}

func (a *AddPartitionsToTxnRequest) requiredVersion() KafkaVersion {
return V0_11_0_0
}
27 changes: 27 additions & 0 deletions add_partitions_to_txn_request_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
package sarama

import "testing"

var (
addPartitionsToTxnRequest = []byte{
0, 3, 't', 'x', 'n',
0, 0, 0, 0, 0, 0, 31, 64, // ProducerID
0, 0, 0, 0, // ProducerEpoch
0, 1, // 1 topic
0, 5, 't', 'o', 'p', 'i', 'c',
0, 0, 0, 1, 0, 0, 0, 1,
}
)

func TestAddPartitionsToTxnRequest(t *testing.T) {
req := &AddPartitionsToTxnRequest{
TransactionalID: "txn",
ProducerID: 8000,
ProducerEpoch: 0,
TopicPartitions: map[string][]int32{
"topic": []int32{1},
},
}

testRequest(t, "", req, addPartitionsToTxnRequest)
}
108 changes: 108 additions & 0 deletions add_partitions_to_txn_response.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
package sarama

import (
"time"
)

type AddPartitionsToTxnResponse struct {
ThrottleTime time.Duration
Errors map[string][]*PartitionError
}

func (a *AddPartitionsToTxnResponse) encode(pe packetEncoder) error {
pe.putInt32(int32(a.ThrottleTime / time.Millisecond))
if err := pe.putArrayLength(len(a.Errors)); err != nil {
return err
}

for topic, e := range a.Errors {
if err := pe.putString(topic); err != nil {
return err
}
if err := pe.putArrayLength(len(e)); err != nil {
return err
}
for _, partitionError := range e {
if err := partitionError.encode(pe); err != nil {
return err
}
}
}

return nil
}

func (a *AddPartitionsToTxnResponse) decode(pd packetDecoder, version int16) (err error) {
throttleTime, err := pd.getInt32()
if err != nil {
return err
}
a.ThrottleTime = time.Duration(throttleTime) * time.Millisecond

n, err := pd.getArrayLength()
if err != nil {
return err
}

a.Errors = make(map[string][]*PartitionError)

for i := 0; i < n; i++ {
topic, err := pd.getString()
if err != nil {
return err
}

m, err := pd.getArrayLength()
if err != nil {
return err
}

a.Errors[topic] = make([]*PartitionError, m)

for j := 0; j < m; j++ {
a.Errors[topic][j] = new(PartitionError)
if err := a.Errors[topic][j].decode(pd, version); err != nil {
return err
}
}
}

return nil
}

func (a *AddPartitionsToTxnResponse) key() int16 {
return 24
}

func (a *AddPartitionsToTxnResponse) version() int16 {
return 0
}

func (a *AddPartitionsToTxnResponse) requiredVersion() KafkaVersion {
return V0_11_0_0
}

type PartitionError struct {
Partition int32
Err KError
}

func (p *PartitionError) encode(pe packetEncoder) error {
pe.putInt32(p.Partition)
pe.putInt16(int16(p.Err))
return nil
}

func (p *PartitionError) decode(pd packetDecoder, version int16) (err error) {
if p.Partition, err = pd.getInt32(); err != nil {
return err
}

kerr, err := pd.getInt16()
if err != nil {
return err
}
p.Err = KError(kerr)

return nil
}
31 changes: 31 additions & 0 deletions add_partitions_to_txn_response_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
package sarama

import (
"testing"
"time"
)

var (
addPartitionsToTxnResponse = []byte{
0, 0, 0, 100,
0, 0, 0, 1,
0, 5, 't', 'o', 'p', 'i', 'c',
0, 0, 0, 1, // 1 partition error
0, 0, 0, 2, // partition 2
0, 48, // error
}
)

func TestAddPartitionsToTxnResponse(t *testing.T) {
resp := &AddPartitionsToTxnResponse{
ThrottleTime: 100 * time.Millisecond,
Errors: map[string][]*PartitionError{
"topic": []*PartitionError{&PartitionError{
Err: ErrInvalidTxnState,
Partition: 2,
}},
},
}

testResponse(t, "", resp, addPartitionsToTxnResponse)
}
Loading

0 comments on commit d813197

Please sign in to comment.