diff --git a/broker.go b/broker.go
index 5958476a7..7a5685a8a 100644
--- a/broker.go
+++ b/broker.go
@@ -14,6 +14,7 @@ type Broker struct {
 
 	correlation_id int32
 	conn           net.Conn
+	conn_err       error
 	lock           sync.Mutex
 
 	responses chan responsePromise
@@ -27,7 +28,7 @@ type responsePromise struct {
 }
 
 // NewBroker creates and returns a Broker targetting the given host:port address.
-// This does not attempt to actually connect, you have to call Connect() for that.
+// This does not attempt to actually connect, you have to call Open() for that.
 func NewBroker(host string, port int32) *Broker {
 	b := new(Broker)
 	b.id = -1 // don't know it yet
@@ -36,34 +37,52 @@ func NewBroker(host string, port int32) *Broker {
 	return b
 }
 
-func (b *Broker) Connect() error {
+// Open tries to connect to the Broker. It takes the broker lock synchronously, then spawns a goroutine which
+// connects and releases the lock. This means any subsequent operations on the broker will block waiting for
+// the connection to finish. To get the effect of a fully synchronous Open call, follow it by a call to Connected().
+// The only error Open will return directly is AlreadyConnected.
+func (b *Broker) Open() error {
 	b.lock.Lock()
-	defer b.lock.Unlock()
 
 	if b.conn != nil {
+		b.lock.Unlock()
 		return AlreadyConnected
 	}
 
-	addr, err := net.ResolveIPAddr("ip", b.host)
-	if err != nil {
-		return err
-	}
+	go func() {
+		defer b.lock.Unlock()
 
-	b.conn, err = net.DialTCP("tcp", nil, &net.TCPAddr{IP: addr.IP, Port: int(b.port)})
-	if err != nil {
-		return err
-	}
+		var addr *net.IPAddr
+		addr, b.conn_err = net.ResolveIPAddr("ip", b.host)
+		if b.conn_err != nil {
+			return
+		}
 
-	b.done = make(chan bool)
+		b.conn, b.conn_err = net.DialTCP("tcp", nil, &net.TCPAddr{IP: addr.IP, Port: int(b.port)})
+		if b.conn_err != nil {
+			return
+		}
+
+		b.done = make(chan bool)
 
-	// permit a few outstanding requests before we block waiting for responses
-	b.responses = make(chan responsePromise, 4)
+		// permit a few outstanding requests before we block waiting for responses
+		b.responses = make(chan responsePromise, 4)
 
-	go b.responseReceiver()
+		go b.responseReceiver()
+	}()
 
 	return nil
 }
 
+// Connected returns true if the broker is connected and false otherwise. If the broker is not
+// connected but it had tried to connect, the error from that connection attempt is also returned.
+func (b *Broker) Connected() (bool, error) {
+	b.lock.Lock()
+	defer b.lock.Unlock()
+
+	return b.conn != nil, b.conn_err
+}
+
 func (b *Broker) Close() error {
 	b.lock.Lock()
 	defer b.lock.Unlock()
@@ -78,6 +97,7 @@ func (b *Broker) Close() error {
 	err := b.conn.Close()
 
 	b.conn = nil
+	b.conn_err = nil
 	b.done = nil
 	b.responses = nil
 
@@ -184,7 +204,11 @@ func (b *Broker) send(clientID string, req requestEncoder, promiseResponse bool)
 	defer b.lock.Unlock()
 
 	if b.conn == nil {
-		return nil, NotConnected
+		if b.conn_err != nil {
+			return nil, b.conn_err
+		} else {
+			return nil, NotConnected
+		}
 	}
 
 	fullRequest := request{b.correlation_id, clientID, req}
diff --git a/broker_test.go b/broker_test.go
index a63191856..23dd573f8 100644
--- a/broker_test.go
+++ b/broker_test.go
@@ -141,18 +141,20 @@ func NewMockBroker(t *testing.T, responses chan []byte) *MockBroker {
 
 func ExampleBroker() error {
 	broker := NewBroker("localhost", 9092)
-	err := broker.Connect()
+	err := broker.Open()
 	if err != nil {
 		return err
 	}
+	defer broker.Close()
 
 	request := MetadataRequest{Topics: []string{"myTopic"}}
 
 	response, err := broker.GetMetadata("myClient", &request)
+	if err != nil {
+		return err
+	}
 
 	fmt.Println("There are", len(response.Topics), "topics active in the cluster.")
 
-	broker.Close()
-
 	return nil
 }
@@ -217,7 +219,7 @@ func TestSimpleBrokerCommunication(t *testing.T) {
 	defer mockBroker.Close()
 
 	broker := NewBroker("localhost", mockBroker.Port())
-	err := broker.Connect()
+	err := broker.Open()
 	if err != nil {
 		t.Fatal(err)
 	}
diff --git a/client.go b/client.go
index 80d70df95..691662ed8 100644
--- a/client.go
+++ b/client.go
@@ -22,7 +22,11 @@ type Client struct {
 // If metadata cannot be retrieved (even if the connection otherwise succeeds) then the client is not created.
 func NewClient(id string, host string, port int32) (client *Client, err error) {
 	tmp := NewBroker(host, port)
-	err = tmp.Connect()
+	err = tmp.Open()
+	if err != nil {
+		return nil, err
+	}
+	_, err = tmp.Connected()
 	if err != nil {
 		return nil, err
 	}
@@ -177,7 +181,7 @@ func (client *Client) cachedLeader(topic string, partition_id int32) *Broker {
 	partitions := client.leaders[topic]
 	if partitions != nil {
 		leader, ok := partitions[partition_id]
-		if ok && leader != -1 {
+		if ok {
 			return client.brokers[leader]
 		}
 	}
@@ -205,34 +209,29 @@ func (client *Client) cachedPartitions(topic string) []int32 {
 
 // if no fatal error, returns a list of topics that need retrying due to LEADER_NOT_AVAILABLE
 func (client *Client) update(data *MetadataResponse) ([]string, error) {
+	client.lock.Lock()
+	defer client.lock.Unlock()
+
 	// First discard brokers that we already know about. This avoids bouncing TCP connections,
 	// and especially avoids closing valid connections out from under other code which may be trying
-	// to use them. We only need a read-lock for this.
+	// to use them.
 	var newBrokers []*Broker
-	client.lock.RLock()
 	for _, broker := range data.Brokers {
 		if !broker.Equals(client.brokers[broker.ID()]) {
			newBrokers = append(newBrokers, broker)
 		}
 	}
-	client.lock.RUnlock()
-
-	// connect to the brokers before taking the write lock, as this can take a while
-	// to timeout if one of them isn't reachable
-	for _, broker := range newBrokers {
-		err := broker.Connect()
-		if err != nil {
-			return nil, err
-		}
-	}
-
-	client.lock.Lock()
-	defer client.lock.Unlock()
 
+	// Now asynchronously try to open connections to the new brokers. We don't care if they
+	// fail, since maybe that broker is unreachable but doesn't have a topic we care about.
+	// If it fails and we do care, whoever tries to use it will get the connection error.
+	// If we have an old broker with that ID (but a different host/port, since they didn't
+	// compare as equals above) then close and remove that broker before saving the new one.
 	for _, broker := range newBrokers {
 		if client.brokers[broker.ID()] != nil {
 			go client.brokers[broker.ID()].Close()
 		}
+		broker.Open()
 		client.brokers[broker.ID()] = broker
 	}
 
@@ -251,11 +250,8 @@ func (client *Client) update(data *MetadataResponse) ([]string, error) {
 		for _, partition := range topic.Partitions {
 			switch partition.Err {
 			case LEADER_NOT_AVAILABLE:
-				// in the LEADER_NOT_AVAILABLE case partition.Leader will be -1 because the
-				// partition is in the middle of leader election, so we fallthrough to save it
-				// anyways in order to avoid returning the stale leader (since -1 isn't a valid broker ID)
 				toRetry[topic.Name] = true
-				fallthrough
+				delete(client.leaders[topic.Name], partition.Id)
 			case NO_ERROR:
 				client.leaders[topic.Name][partition.Id] = partition.Leader
 			default:
diff --git a/errors.go b/errors.go
index 818525494..7d2262071 100644
--- a/errors.go
+++ b/errors.go
@@ -17,7 +17,7 @@ var IncompleteResponse = errors.New("kafka: Response did not contain all the exp
 // (meaning one outside of the range [0...numPartitions-1]).
 var InvalidPartition = errors.New("kafka: Partitioner returned an invalid partition index.")
 
-// AlreadyConnected is the error returned when calling Connect() on a Broker that is already connected.
+// AlreadyConnected is the error returned when calling Open() on a Broker that is already connected.
 var AlreadyConnected = errors.New("kafka: broker: already connected")
 
 // NotConnected is the error returned when trying to send or call Close() on a Broker that is not connected.
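
Note on usage: with this change, Open() only reports AlreadyConnected directly; the DNS lookup and TCP dial
happen in a goroutine that holds the broker lock until they finish. A caller that wants the old synchronous
Connect() behaviour can follow Open() with Connected(), which blocks on that lock and then returns any stored
connection error, as NewClient now does. A minimal sketch, written as if it lived in the same package as
broker.go; the helper name openAndWait is hypothetical and not part of this patch:

    // openAndWait is a hypothetical helper showing the Open() + Connected() pattern.
    // It returns only once the dial goroutine has finished, whether it succeeded or failed.
    func openAndWait(host string, port int32) (*Broker, error) {
        broker := NewBroker(host, port)
        if err := broker.Open(); err != nil {
            return nil, err // only AlreadyConnected is returned synchronously
        }
        connected, err := broker.Connected() // blocks until the dial goroutine unlocks
        if err != nil {
            return nil, err // the resolve/dial error stored in conn_err
        }
        if !connected {
            return nil, NotConnected // defensive; should not happen once the dial has settled
        }
        return broker, nil
    }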
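
Design note: Open() relies on the fact that a Go sync.Mutex may be locked in one goroutine and unlocked in
another. The caller takes the lock synchronously, the dial goroutine releases it when done, and every later
method that takes the same lock (Connected, Close, send) therefore waits for the connection attempt to settle.
A stripped-down, self-contained sketch of that hand-off idiom, with hypothetical names unrelated to this
codebase:

    package main

    import (
        "fmt"
        "sync"
        "time"
    )

    type resource struct {
        mu  sync.Mutex
        val string
        err error
    }

    // Open locks mu and returns immediately; the goroutine unlocks it when setup is done.
    func (r *resource) Open() {
        r.mu.Lock()
        go func() {
            defer r.mu.Unlock()
            time.Sleep(50 * time.Millisecond) // stands in for the slow resolve/dial
            r.val = "ready"
        }()
    }

    // Ready blocks until the setup goroutine has released the lock, then reports the result.
    func (r *resource) Ready() (string, error) {
        r.mu.Lock()
        defer r.mu.Unlock()
        return r.val, r.err
    }

    func main() {
        r := new(resource)
        r.Open()
        fmt.Println(r.Ready()) // prints "ready <nil>" once setup has completed
    }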