Skip to content

Commit

Permalink
Merge pull request #10 from Shopify/handle_unreachable_brokers
Browse files Browse the repository at this point in the history
Rework how the client connects to brokers.
  • Loading branch information
eapache committed Aug 14, 2013
2 parents 0d0b110 + cb8190c commit 7626e3d
Show file tree
Hide file tree
Showing 4 changed files with 64 additions and 42 deletions.
56 changes: 40 additions & 16 deletions broker.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ type Broker struct {

correlation_id int32
conn net.Conn
conn_err error
lock sync.Mutex

responses chan responsePromise
Expand All @@ -27,7 +28,7 @@ type responsePromise struct {
}

// NewBroker creates and returns a Broker targetting the given host:port address.
// This does not attempt to actually connect, you have to call Connect() for that.
// This does not attempt to actually connect, you have to call Open() for that.
func NewBroker(host string, port int32) *Broker {
b := new(Broker)
b.id = -1 // don't know it yet
Expand All @@ -36,34 +37,52 @@ func NewBroker(host string, port int32) *Broker {
return b
}

func (b *Broker) Connect() error {
// Open tries to connect to the Broker. It takes the broker lock synchronously, then spawns a goroutine which
// connects and releases the lock. This means any subsequent operations on the broker will block waiting for
// the connection to finish. To get the effect of a fully synchronous Open call, follow it by a call to Connected().
// The only error Open will return directly is AlreadyConnected.
func (b *Broker) Open() error {
b.lock.Lock()
defer b.lock.Unlock()

if b.conn != nil {
b.lock.Unlock()
return AlreadyConnected
}

addr, err := net.ResolveIPAddr("ip", b.host)
if err != nil {
return err
}
go func() {
defer b.lock.Unlock()

b.conn, err = net.DialTCP("tcp", nil, &net.TCPAddr{IP: addr.IP, Port: int(b.port)})
if err != nil {
return err
}
var addr *net.IPAddr
addr, b.conn_err = net.ResolveIPAddr("ip", b.host)
if b.conn_err != nil {
return
}

b.done = make(chan bool)
b.conn, b.conn_err = net.DialTCP("tcp", nil, &net.TCPAddr{IP: addr.IP, Port: int(b.port)})
if b.conn_err != nil {
return
}

b.done = make(chan bool)

// permit a few outstanding requests before we block waiting for responses
b.responses = make(chan responsePromise, 4)
// permit a few outstanding requests before we block waiting for responses
b.responses = make(chan responsePromise, 4)

go b.responseReceiver()
go b.responseReceiver()
}()

return nil
}

// Connected returns true if the broker is connected and false otherwise. If the broker is not
// connected but it had tried to connect, the error from that connection attempt is also returned.
func (b *Broker) Connected() (bool, error) {
b.lock.Lock()
defer b.lock.Unlock()

return b.conn != nil, b.conn_err
}

func (b *Broker) Close() error {
b.lock.Lock()
defer b.lock.Unlock()
Expand All @@ -78,6 +97,7 @@ func (b *Broker) Close() error {
err := b.conn.Close()

b.conn = nil
b.conn_err = nil
b.done = nil
b.responses = nil

Expand Down Expand Up @@ -184,7 +204,11 @@ func (b *Broker) send(clientID string, req requestEncoder, promiseResponse bool)
defer b.lock.Unlock()

if b.conn == nil {
return nil, NotConnected
if b.conn_err != nil {
return nil, b.conn_err
} else {
return nil, NotConnected
}
}

fullRequest := request{b.correlation_id, clientID, req}
Expand Down
10 changes: 6 additions & 4 deletions broker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,18 +141,20 @@ func NewMockBroker(t *testing.T, responses chan []byte) *MockBroker {

func ExampleBroker() error {
broker := NewBroker("localhost", 9092)
err := broker.Connect()
err := broker.Open()
if err != nil {
return err
}
defer broker.Close()

request := MetadataRequest{Topics: []string{"myTopic"}}
response, err := broker.GetMetadata("myClient", &request)
if err != nil {
return err
}

fmt.Println("There are", len(response.Topics), "topics active in the cluster.")

broker.Close()

return nil
}

Expand Down Expand Up @@ -217,7 +219,7 @@ func TestSimpleBrokerCommunication(t *testing.T) {
defer mockBroker.Close()

broker := NewBroker("localhost", mockBroker.Port())
err := broker.Connect()
err := broker.Open()
if err != nil {
t.Fatal(err)
}
Expand Down
38 changes: 17 additions & 21 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,11 @@ type Client struct {
// If metadata cannot be retrieved (even if the connection otherwise succeeds) then the client is not created.
func NewClient(id string, host string, port int32) (client *Client, err error) {
tmp := NewBroker(host, port)
err = tmp.Connect()
err = tmp.Open()
if err != nil {
return nil, err
}
_, err = tmp.Connected()
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -177,7 +181,7 @@ func (client *Client) cachedLeader(topic string, partition_id int32) *Broker {
partitions := client.leaders[topic]
if partitions != nil {
leader, ok := partitions[partition_id]
if ok && leader != -1 {
if ok {
return client.brokers[leader]
}
}
Expand Down Expand Up @@ -205,34 +209,29 @@ func (client *Client) cachedPartitions(topic string) []int32 {

// if no fatal error, returns a list of topics that need retrying due to LEADER_NOT_AVAILABLE
func (client *Client) update(data *MetadataResponse) ([]string, error) {
client.lock.Lock()
defer client.lock.Unlock()

// First discard brokers that we already know about. This avoids bouncing TCP connections,
// and especially avoids closing valid connections out from under other code which may be trying
// to use them. We only need a read-lock for this.
// to use them.
var newBrokers []*Broker
client.lock.RLock()
for _, broker := range data.Brokers {
if !broker.Equals(client.brokers[broker.ID()]) {
newBrokers = append(newBrokers, broker)
}
}
client.lock.RUnlock()

// connect to the brokers before taking the write lock, as this can take a while
// to timeout if one of them isn't reachable
for _, broker := range newBrokers {
err := broker.Connect()
if err != nil {
return nil, err
}
}

client.lock.Lock()
defer client.lock.Unlock()

// Now asynchronously try to open connections to the new brokers. We don't care if they
// fail, since maybe that broker is unreachable but doesn't have a topic we care about.
// If it fails and we do care, whoever tries to use it will get the connection error.
// If we have an old broker with that ID (but a different host/port, since they didn't
// compare as equals above) then close and remove that broker before saving the new one.
for _, broker := range newBrokers {
if client.brokers[broker.ID()] != nil {
go client.brokers[broker.ID()].Close()
}
broker.Open()
client.brokers[broker.ID()] = broker
}

Expand All @@ -251,11 +250,8 @@ func (client *Client) update(data *MetadataResponse) ([]string, error) {
for _, partition := range topic.Partitions {
switch partition.Err {
case LEADER_NOT_AVAILABLE:
// in the LEADER_NOT_AVAILABLE case partition.Leader will be -1 because the
// partition is in the middle of leader election, so we fallthrough to save it
// anyways in order to avoid returning the stale leader (since -1 isn't a valid broker ID)
toRetry[topic.Name] = true
fallthrough
delete(client.leaders[topic.Name], partition.Id)
case NO_ERROR:
client.leaders[topic.Name][partition.Id] = partition.Leader
default:
Expand Down
2 changes: 1 addition & 1 deletion errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ var IncompleteResponse = errors.New("kafka: Response did not contain all the exp
// (meaning one outside of the range [0...numPartitions-1]).
var InvalidPartition = errors.New("kafka: Partitioner returned an invalid partition index.")

// AlreadyConnected is the error returned when calling Connect() on a Broker that is already connected.
// AlreadyConnected is the error returned when calling Open() on a Broker that is already connected.
var AlreadyConnected = errors.New("kafka: broker: already connected")

// NotConnected is the error returned when trying to send or call Close() on a Broker that is not connected.
Expand Down

0 comments on commit 7626e3d

Please sign in to comment.