Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Mockbroker #51

Merged
merged 3 commits into from
Nov 26, 2013
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
*.o
*.a
*.so
*.test
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What are these?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Compiled test files. Go creates them sometimes for reasons I'm not especially clear on.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've never seen them. You can pass a flag to go test to make it leave them behind, but in normal case it does everything in /tmp so I'm not sure why these would show up.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not too sure. Anyway, they're sometimes created on some machines, so no reason not to gitignore them.


# Folders
_obj
Expand Down
147 changes: 5 additions & 142 deletions broker_test.go
Original file line number Diff line number Diff line change
@@ -1,147 +1,11 @@
package sarama

import (
"encoding/binary"
"fmt"
"io"
"net"
"strconv"
"github.com/Shopify/sarama/mockbroker"
"testing"
"time"
)

// MockBroker is a mock Kafka broker. It consists of a TCP server on a kernel-selected localhost port that
// accepts a single connection. It reads Kafka requests from that connection and returns each response
// from the channel provided at creation-time (if a response has a len of 0, nothing is sent, if a response
// the server sleeps for 250ms instead of reading a request).
//
// When running tests with one of these, it is strongly recommended to specify a timeout to `go test` so that if the broker hangs
// waiting for a response, the test panics.
//
// It is not necessary to prefix message length or correlation ID to your response bytes, the server does that
// automatically as a convenience.
type MockBroker struct {
port int32
stopper chan bool
responses chan []byte
listener net.Listener
t *testing.T
}

func (b *MockBroker) Port() int32 {
return b.port
}

func (b *MockBroker) Addr() string {
return b.listener.Addr().String()
}

// Close closes the response channel originally provided, then waits to make sure
// that all requests/responses matched up before exiting.
func (b *MockBroker) Close() {
close(b.responses)
<-b.stopper
}

func (b *MockBroker) serverLoop() {
defer close(b.stopper)
conn, err := b.listener.Accept()
if err != nil {
b.t.Error(err)
conn.Close()
b.listener.Close()
return
}
reqHeader := make([]byte, 4)
resHeader := make([]byte, 8)
for response := range b.responses {
if response == nil {
time.Sleep(250 * time.Millisecond)
continue
}
_, err := io.ReadFull(conn, reqHeader)
if err != nil {
b.t.Error(err)
conn.Close()
b.listener.Close()
return
}
body := make([]byte, binary.BigEndian.Uint32(reqHeader))
if len(body) < 10 {
b.t.Error("Kafka request too short.")
conn.Close()
b.listener.Close()
return
}
_, err = io.ReadFull(conn, body)
if err != nil {
b.t.Error(err)
conn.Close()
b.listener.Close()
return
}
if len(response) == 0 {
continue
}
binary.BigEndian.PutUint32(resHeader, uint32(len(response)+4))
binary.BigEndian.PutUint32(resHeader[4:], binary.BigEndian.Uint32(body[4:]))
_, err = conn.Write(resHeader)
if err != nil {
b.t.Error(err)
conn.Close()
b.listener.Close()
return
}
_, err = conn.Write(response)
if err != nil {
b.t.Error(err)
conn.Close()
b.listener.Close()
return
}
}
err = conn.Close()
if err != nil {
b.t.Error(err)
b.listener.Close()
return
}
err = b.listener.Close()
if err != nil {
b.t.Error(err)
return
}
}

// NewMockBroker launches a fake Kafka broker. It takes a testing.T as provided by the test framework and a channel of responses to use.
// If an error occurs it is simply logged to the testing.T and the broker exits.
func NewMockBroker(t *testing.T, responses chan []byte) *MockBroker {
var err error

broker := new(MockBroker)
broker.stopper = make(chan bool)
broker.responses = responses
broker.t = t

broker.listener, err = net.Listen("tcp", "localhost:0")
if err != nil {
t.Fatal(err)
}
_, portStr, err := net.SplitHostPort(broker.listener.Addr().String())
if err != nil {
t.Fatal(err)
}
tmp, err := strconv.ParseInt(portStr, 10, 32)
if err != nil {
t.Fatal(err)
}
broker.port = int32(tmp)

go broker.serverLoop()

return broker
}

func ExampleBroker() error {
broker := NewBroker("localhost:9092")
err := broker.Open(4)
Expand Down Expand Up @@ -179,19 +43,18 @@ func TestBrokerAccessors(t *testing.T) {
}

func TestSimpleBrokerCommunication(t *testing.T) {
responses := make(chan []byte)
mockBroker := NewMockBroker(t, responses)
defer mockBroker.Close()
mb := mockbroker.New(t, 0)
defer mb.Close()

broker := NewBroker(mockBroker.Addr())
broker := NewBroker(mb.Addr())
err := broker.Open(4)
if err != nil {
t.Fatal(err)
}

go func() {
for _, tt := range brokerTestTable {
responses <- tt.response
mb.ExpectBytes(tt.response)
}
}()
for _, tt := range brokerTestTable {
Expand Down
135 changes: 42 additions & 93 deletions client_test.go
Original file line number Diff line number Diff line change
@@ -1,80 +1,57 @@
package sarama

import (
"encoding/binary"
"github.com/Shopify/sarama/mockbroker"
"testing"
)

func TestSimpleClient(t *testing.T) {
responses := make(chan []byte, 1)
mockBroker := NewMockBroker(t, responses)
defer mockBroker.Close()

// Only one response needed, an empty metadata response
responses <- []byte{0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00}
mb := mockbroker.New(t, 1)

client, err := NewClient("client_id", []string{mockBroker.Addr()}, nil)
mb.ExpectMetadataRequest()

client, err := NewClient("client_id", []string{mb.Addr()}, nil)
if err != nil {
t.Fatal(err)
}
client.Close()
defer client.Close()
defer mb.Close()
}

func TestClientExtraBrokers(t *testing.T) {
responses := make(chan []byte, 1)
mockBroker := NewMockBroker(t, responses)
mockExtra := NewMockBroker(t, make(chan []byte))
defer mockBroker.Close()
defer mockExtra.Close()

// return the extra mock as another available broker
response := []byte{
0x00, 0x00, 0x00, 0x01,
0x00, 0x00, 0x00, 0x01,
0x00, 0x09, 'l', 'o', 'c', 'a', 'l', 'h', 'o', 's', 't',
0x00, 0x00, 0x00, 0x00,
0x00, 0x00, 0x00, 0x00}
binary.BigEndian.PutUint32(response[19:], uint32(mockExtra.Port()))
responses <- response

client, err := NewClient("client_id", []string{mockBroker.Addr()}, nil)

mb1 := mockbroker.New(t, 1)
mb2 := mockbroker.New(t, 2)

mb1.ExpectMetadataRequest().
AddBroker(mb2)

client, err := NewClient("client_id", []string{mb1.Addr()}, nil)
if err != nil {
t.Fatal(err)
}
client.Close()
defer client.Close()
defer mb1.Close()
defer mb2.Close()
}

func TestClientMetadata(t *testing.T) {
responses := make(chan []byte, 1)
mockBroker := NewMockBroker(t, responses)
mockExtra := NewMockBroker(t, make(chan []byte))
defer mockBroker.Close()
defer mockExtra.Close()

// return the extra mock as another available broker
response := []byte{
0x00, 0x00, 0x00, 0x01,
0x00, 0x00, 0x00, 0x05,
0x00, 0x09, 'l', 'o', 'c', 'a', 'l', 'h', 'o', 's', 't',
0x00, 0x00, 0x00, 0x00,

0x00, 0x00, 0x00, 0x01,
0x00, 0x00,
0x00, 0x08, 'm', 'y', '_', 't', 'o', 'p', 'i', 'c',
0x00, 0x00, 0x00, 0x01,
0x00, 0x00,
0x00, 0x00, 0x00, 0x00,
0x00, 0x00, 0x00, 0x05,
0x00, 0x00, 0x00, 0x00,
0x00, 0x00, 0x00, 0x00}
binary.BigEndian.PutUint32(response[19:], uint32(mockExtra.Port()))
responses <- response

client, err := NewClient("client_id", []string{mockBroker.Addr()}, nil)

mb1 := mockbroker.New(t, 1)
mb5 := mockbroker.New(t, 5)

mb1.ExpectMetadataRequest().
AddBroker(mb5).
AddTopicPartition("my_topic", 0, mb5.BrokerID())

client, err := NewClient("client_id", []string{mb1.Addr()}, nil)
if err != nil {
t.Fatal(err)
}
defer client.Close()
defer mb1.Close()
defer mb5.Close()

topics, err := client.Topics()
if err != nil {
Expand All @@ -99,50 +76,22 @@ func TestClientMetadata(t *testing.T) {
}

func TestClientRefreshBehaviour(t *testing.T) {
responses := make(chan []byte, 1)
extraResponses := make(chan []byte, 2)
mockBroker := NewMockBroker(t, responses)
mockExtra := NewMockBroker(t, extraResponses)
defer mockBroker.Close()
defer mockExtra.Close()

// return the extra mock as another available broker
response := []byte{
0x00, 0x00, 0x00, 0x01,
0x00, 0x00, 0x00, 0xaa,
0x00, 0x09, 'l', 'o', 'c', 'a', 'l', 'h', 'o', 's', 't',
0x00, 0x00, 0x00, 0x00,
0x00, 0x00, 0x00, 0x00}
binary.BigEndian.PutUint32(response[19:], uint32(mockExtra.Port()))
responses <- response
extraResponses <- []byte{
0x00, 0x00, 0x00, 0x00,
0x00, 0x00, 0x00, 0x01,
0x00, 0x00,
0x00, 0x08, 'm', 'y', '_', 't', 'o', 'p', 'i', 'c',
0x00, 0x00, 0x00, 0x01,
0x00, 0x05,
0x00, 0x00, 0x00, 0x0e,
0xFF, 0xFF, 0xFF, 0xFF,
0x00, 0x00, 0x00, 0x00,
0x00, 0x00, 0x00, 0x00}
extraResponses <- []byte{
0x00, 0x00, 0x00, 0x00,
0x00, 0x00, 0x00, 0x01,
0x00, 0x00,
0x00, 0x08, 'm', 'y', '_', 't', 'o', 'p', 'i', 'c',
0x00, 0x00, 0x00, 0x01,
0x00, 0x00,
0x00, 0x00, 0x00, 0x0b,
0x00, 0x00, 0x00, 0xaa,
0x00, 0x00, 0x00, 0x00,
0x00, 0x00, 0x00, 0x00}

client, err := NewClient("clientID", []string{mockBroker.Addr()}, &ClientConfig{MetadataRetries: 1})
mb1 := mockbroker.New(t, 1)
mb5 := mockbroker.New(t, 5)

mb1.ExpectMetadataRequest().
AddBroker(mb5)

mb5.ExpectMetadataRequest().
AddTopicPartition("my_topic", 0xb, 5)

client, err := NewClient("clientID", []string{mb1.Addr()}, &ClientConfig{MetadataRetries: 1})
if err != nil {
t.Fatal(err)
}
defer client.Close()
defer mb1.Close()
defer mb5.Close()

parts, err := client.Partitions("my_topic")
if err != nil {
Expand All @@ -154,7 +103,7 @@ func TestClientRefreshBehaviour(t *testing.T) {
tst, err := client.Leader("my_topic", 0xb)
if err != nil {
t.Error(err)
} else if tst.ID() != 0xaa {
} else if tst.ID() != 5 {
t.Error("Leader for my_topic had incorrect ID.")
}

Expand Down
Loading