Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve expectation API to allow for testing of more intricate scenarios #298

Closed
wants to merge 7 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
52 changes: 52 additions & 0 deletions client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package sarama
import (
"io"
"testing"
"time"
)

func safeClose(t *testing.T, c io.Closer) {
Expand Down Expand Up @@ -200,3 +201,54 @@ func TestClientRefreshBehaviour(t *testing.T) {
seedBroker.Close()
safeClose(t, client)
}

func TestSingleSlowBroker(t *testing.T) {
cluster := NewMockCluster(t, 2)
defer cluster.Close()

metadataResponse := new(MetadataResponse)
metadataResponse.AddBroker(cluster[1].Addr(), cluster[1].BrokerID())
metadataResponse.AddBroker(cluster[2].Addr(), cluster[2].BrokerID())
metadataResponse.AddTopicPartition("my_topic", 0, cluster[1].BrokerID(), []int32{cluster[1].BrokerID()}, []int32{cluster[1].BrokerID()}, ErrNoError)
metadataResponse.AddTopicPartition("my_topic", 1, cluster[2].BrokerID(), []int32{cluster[2].BrokerID()}, []int32{cluster[2].BrokerID()}, ErrNoError)

cluster[1].Expects(&BrokerExpectation{Response: metadataResponse, Latency: 500 * time.Millisecond, IgnoreConnectionErrors: true}) // will timeout
cluster[2].Expects(&BrokerExpectation{Response: metadataResponse}) // will succeed

config := NewClientConfig()
config.DefaultBrokerConf = NewBrokerConfig()
config.DefaultBrokerConf.ReadTimeout = 100 * time.Millisecond

client, err := NewClient("clientID", cluster.Addr(), config)
if err != nil {
t.Fatal(err)
}

safeClose(t, client)
}

func TestSlowCluster(t *testing.T) {
cluster := NewMockCluster(t, 3)
defer cluster.Close()

slowMetadataResponse := &BrokerExpectation{
Response: new(MetadataResponse),
Latency: 500 * time.Millisecond,
IgnoreConnectionErrors: true,
}

cluster[1].Expects(slowMetadataResponse)
cluster[2].Expects(slowMetadataResponse)
cluster[3].Expects(slowMetadataResponse)

config := NewClientConfig()
config.MetadataRetries = 3
config.WaitForElection = 1 * time.Millisecond
config.DefaultBrokerConf = NewBrokerConfig()
config.DefaultBrokerConf.ReadTimeout = 100 * time.Millisecond

_, err := NewClient("clientID", cluster.Addr(), config)
if err != ErrOutOfBrokers {
t.Error("Expected the client to fail due to OutOfBrokers, found: ", err)
}
}
99 changes: 77 additions & 22 deletions mockbroker.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,14 +31,24 @@ type MockBroker struct {
brokerID int32
port int32
stopper chan bool
expectations chan encoder
expectations chan *BrokerExpectation
listener net.Listener
t TestState
latency time.Duration
}

func (b *MockBroker) SetLatency(latency time.Duration) {
b.latency = latency
type MockCluster map[int32]*MockBroker
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why not just a slice?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

To make sure the index matches the BrokerID. Broker IDs are 1-indexed, while slice indices start at 0.


type callback func()
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This can now be set separately for every expectation/response.


// BrokerExpectation allos you to specify how the respond to a request the MockBroker will receive.
// See MockBroker's Expects method to add expectation to a mockbroker.
type BrokerExpectation struct {
Before callback // Before will be called before sending the response after a request has been received
Latency time.Duration // Latency before the response will be sent
Response encoder // Response holds what will be sent back to the client
After callback // After will be called after the response has been sent to the client.

IgnoreConnectionErrors bool // IgnoreConnectionErrors should be set to true if connectivity issues while receiving the request or sending the response are to be expected.
}

func (b *MockBroker) BrokerID() int32 {
Expand All @@ -55,7 +65,7 @@ func (b *MockBroker) Addr() string {

func (b *MockBroker) Close() {
if len(b.expectations) > 0 {
b.t.Errorf("Not all expectations were satisfied in mockBroker with ID=%d! Still waiting on %d", b.BrokerID(), len(b.expectations))
b.t.Errorf("Not all expectations were satisfied in mock broker with ID=%d! Still waiting on %d requests.", b.BrokerID(), len(b.expectations))
}
close(b.expectations)
<-b.stopper
Expand All @@ -69,28 +79,32 @@ func (b *MockBroker) serverLoop() (ok bool) {

defer close(b.stopper)
if conn, err = b.listener.Accept(); err != nil {
return b.serverError(err, conn)
return b.serverError(err, conn, false)
}
reqHeader := make([]byte, 4)
resHeader := make([]byte, 8)
for expectation := range b.expectations {
_, err = io.ReadFull(conn, reqHeader)
if err != nil {
return b.serverError(err, conn)
return b.serverError(err, conn, expectation.IgnoreConnectionErrors)
}
body := make([]byte, binary.BigEndian.Uint32(reqHeader))
if len(body) < 10 {
return b.serverError(errors.New("Kafka request too short."), conn)
return b.serverError(errors.New("Kafka request too short."), conn, false)
}
if _, err = io.ReadFull(conn, body); err != nil {
return b.serverError(err, conn)
return b.serverError(err, conn, expectation.IgnoreConnectionErrors)
}

if b.latency > 0 {
time.Sleep(b.latency)
if expectation.Before != nil {
expectation.Before()
}

response, err := encode(expectation)
if expectation.Latency > 0 {
time.Sleep(expectation.Latency)
}

response, err := encode(expectation.Response)
if err != nil {
return false
}
Expand All @@ -101,14 +115,18 @@ func (b *MockBroker) serverLoop() (ok bool) {
binary.BigEndian.PutUint32(resHeader, uint32(len(response)+4))
binary.BigEndian.PutUint32(resHeader[4:], binary.BigEndian.Uint32(body[4:]))
if _, err = conn.Write(resHeader); err != nil {
return b.serverError(err, conn)
return b.serverError(err, conn, expectation.IgnoreConnectionErrors)
}
if _, err = conn.Write(response); err != nil {
return b.serverError(err, conn)
return b.serverError(err, conn, expectation.IgnoreConnectionErrors)
}

if expectation.After != nil {
expectation.After()
}
}
if err = conn.Close(); err != nil {
return b.serverError(err, nil)
return b.serverError(err, nil, false)
}
if err = b.listener.Close(); err != nil {
b.t.Error(err)
Expand All @@ -117,19 +135,52 @@ func (b *MockBroker) serverLoop() (ok bool) {
return true
}

func (b *MockBroker) serverError(err error, conn net.Conn) bool {
b.t.Error(err)
func (b *MockBroker) serverError(err error, conn net.Conn, ignoreErrors bool) bool {
if !ignoreErrors {
b.t.Error(err)
}
if conn != nil {
if err := conn.Close(); err != nil {
b.t.Error(err)
if !ignoreErrors {
b.t.Error(err)
}
}
}
if err := b.listener.Close(); err != nil {
b.t.Error(err)
if !ignoreErrors {
b.t.Error(err)
}
}
return false
}

// NewMockBroker launces a fake Kafka cluster consisting of a specified number
// of brokers.
func NewMockCluster(t TestState, brokers int32) MockCluster {
cluster := make(MockCluster)
for i := int32(1); i <= brokers; i++ {
cluster[i] = NewMockBroker(t, i)
}
return cluster
}

// Returns a list of broker addresses, that can be used to initialize a sarama.Client.
func (mc MockCluster) Addr() []string {
addrs := make([]string, len(mc))
for _, broker := range mc {
addrs = append(addrs, broker.Addr())
}
return addrs
}

// Close closes all the MockBrockers in this cluster, which will validate whether all
// the expectation that were set on the mock brokers are correctly resolved.
func (mc MockCluster) Close() {
for _, broker := range mc {
broker.Close()
}
}

// NewMockBroker launches a fake Kafka broker. It takes a TestState (e.g. *testing.T) as provided by the
// test framework and a channel of responses to use. If an error occurs it is
// simply logged to the TestState and the broker exits.
Expand All @@ -146,7 +197,7 @@ func NewMockBrokerAddr(t TestState, brokerID int32, addr string) *MockBroker {
stopper: make(chan bool),
t: t,
brokerID: brokerID,
expectations: make(chan encoder, 512),
expectations: make(chan *BrokerExpectation, 512),
}

broker.listener, err = net.Listen("tcp", addr)
Expand All @@ -168,6 +219,10 @@ func NewMockBrokerAddr(t TestState, brokerID int32, addr string) *MockBroker {
return broker
}

func (b *MockBroker) Returns(e encoder) {
b.expectations <- e
func (b *MockBroker) Returns(response encoder) {
b.expectations <- &BrokerExpectation{Response: response}
}

func (b *MockBroker) Expects(expectation *BrokerExpectation) {
b.expectations <- expectation
}
Loading