Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Rework how the client connects to brokers. #10

Merged
merged 4 commits into from
Aug 14, 2013
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
56 changes: 40 additions & 16 deletions broker.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ type Broker struct {

correlation_id int32
conn net.Conn
conn_err error
lock sync.Mutex

responses chan responsePromise
Expand All @@ -27,7 +28,7 @@ type responsePromise struct {
}

// NewBroker creates and returns a Broker targetting the given host:port address.
// This does not attempt to actually connect, you have to call Connect() for that.
// This does not attempt to actually connect, you have to call Open() for that.
func NewBroker(host string, port int32) *Broker {
b := new(Broker)
b.id = -1 // don't know it yet
Expand All @@ -36,34 +37,52 @@ func NewBroker(host string, port int32) *Broker {
return b
}

func (b *Broker) Connect() error {
// Open tries to connect to the Broker. It takes the broker lock synchronously, then spawns a goroutine which
// connects and releases the lock. This means any subsequent operations on the broker will block waiting for
// the connection to finish. To get the effect of a fully synchronous Open call, follow it by a call to Connected().
// The only error Open will return directly is AlreadyConnected.
func (b *Broker) Open() error {
b.lock.Lock()
defer b.lock.Unlock()

if b.conn != nil {
b.lock.Unlock()
return AlreadyConnected
}

addr, err := net.ResolveIPAddr("ip", b.host)
if err != nil {
return err
}
go func() {
defer b.lock.Unlock()

b.conn, err = net.DialTCP("tcp", nil, &net.TCPAddr{IP: addr.IP, Port: int(b.port)})
if err != nil {
return err
}
var addr *net.IPAddr
addr, b.conn_err = net.ResolveIPAddr("ip", b.host)
if b.conn_err != nil {
return
}

b.done = make(chan bool)
b.conn, b.conn_err = net.DialTCP("tcp", nil, &net.TCPAddr{IP: addr.IP, Port: int(b.port)})
if b.conn_err != nil {
return
}

b.done = make(chan bool)

// permit a few outstanding requests before we block waiting for responses
b.responses = make(chan responsePromise, 4)
// permit a few outstanding requests before we block waiting for responses
b.responses = make(chan responsePromise, 4)

go b.responseReceiver()
go b.responseReceiver()
}()

return nil
}

// Connected returns true if the broker is connected and false otherwise. If the broker is not
// connected but it had tried to connect, the error from that connection attempt is also returned.
func (b *Broker) Connected() (bool, error) {
b.lock.Lock()
defer b.lock.Unlock()

return b.conn != nil, b.conn_err
}

func (b *Broker) Close() error {
b.lock.Lock()
defer b.lock.Unlock()
Expand All @@ -78,6 +97,7 @@ func (b *Broker) Close() error {
err := b.conn.Close()

b.conn = nil
b.conn_err = nil
b.done = nil
b.responses = nil

Expand Down Expand Up @@ -184,7 +204,11 @@ func (b *Broker) send(clientID string, req requestEncoder, promiseResponse bool)
defer b.lock.Unlock()

if b.conn == nil {
return nil, NotConnected
if b.conn_err != nil {
return nil, b.conn_err
} else {
return nil, NotConnected
}
}

fullRequest := request{b.correlation_id, clientID, req}
Expand Down
10 changes: 6 additions & 4 deletions broker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,18 +141,20 @@ func NewMockBroker(t *testing.T, responses chan []byte) *MockBroker {

func ExampleBroker() error {
broker := NewBroker("localhost", 9092)
err := broker.Connect()
err := broker.Open()
if err != nil {
return err
}
defer broker.Close()

request := MetadataRequest{Topics: []string{"myTopic"}}
response, err := broker.GetMetadata("myClient", &request)
if err != nil {
return err
}

fmt.Println("There are", len(response.Topics), "topics active in the cluster.")

broker.Close()

return nil
}

Expand Down Expand Up @@ -217,7 +219,7 @@ func TestSimpleBrokerCommunication(t *testing.T) {
defer mockBroker.Close()

broker := NewBroker("localhost", mockBroker.Port())
err := broker.Connect()
err := broker.Open()
if err != nil {
t.Fatal(err)
}
Expand Down
38 changes: 17 additions & 21 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,11 @@ type Client struct {
// If metadata cannot be retrieved (even if the connection otherwise succeeds) then the client is not created.
func NewClient(id string, host string, port int32) (client *Client, err error) {
tmp := NewBroker(host, port)
err = tmp.Connect()
err = tmp.Open()
if err != nil {
return nil, err
}
_, err = tmp.Connected()
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -177,7 +181,7 @@ func (client *Client) cachedLeader(topic string, partition_id int32) *Broker {
partitions := client.leaders[topic]
if partitions != nil {
leader, ok := partitions[partition_id]
if ok && leader != -1 {
if ok {
return client.brokers[leader]
}
}
Expand Down Expand Up @@ -205,34 +209,29 @@ func (client *Client) cachedPartitions(topic string) []int32 {

// if no fatal error, returns a list of topics that need retrying due to LEADER_NOT_AVAILABLE
func (client *Client) update(data *MetadataResponse) ([]string, error) {
client.lock.Lock()
defer client.lock.Unlock()

// First discard brokers that we already know about. This avoids bouncing TCP connections,
// and especially avoids closing valid connections out from under other code which may be trying
// to use them. We only need a read-lock for this.
// to use them.
var newBrokers []*Broker
client.lock.RLock()
for _, broker := range data.Brokers {
if !broker.Equals(client.brokers[broker.ID()]) {
newBrokers = append(newBrokers, broker)
}
}
client.lock.RUnlock()

// connect to the brokers before taking the write lock, as this can take a while
// to timeout if one of them isn't reachable
for _, broker := range newBrokers {
err := broker.Connect()
if err != nil {
return nil, err
}
}

client.lock.Lock()
defer client.lock.Unlock()

// Now asynchronously try to open connections to the new brokers. We don't care if they
// fail, since maybe that broker is unreachable but doesn't have a topic we care about.
// If it fails and we do care, whoever tries to use it will get the connection error.
// If we have an old broker with that ID (but a different host/port, since they didn't
// compare as equals above) then close and remove that broker before saving the new one.
for _, broker := range newBrokers {
if client.brokers[broker.ID()] != nil {
go client.brokers[broker.ID()].Close()
}
broker.Open()
client.brokers[broker.ID()] = broker
}

Expand All @@ -251,11 +250,8 @@ func (client *Client) update(data *MetadataResponse) ([]string, error) {
for _, partition := range topic.Partitions {
switch partition.Err {
case LEADER_NOT_AVAILABLE:
// in the LEADER_NOT_AVAILABLE case partition.Leader will be -1 because the
// partition is in the middle of leader election, so we fallthrough to save it
// anyways in order to avoid returning the stale leader (since -1 isn't a valid broker ID)
toRetry[topic.Name] = true
fallthrough
delete(client.leaders[topic.Name], partition.Id)
case NO_ERROR:
client.leaders[topic.Name][partition.Id] = partition.Leader
default:
Expand Down
2 changes: 1 addition & 1 deletion errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ var IncompleteResponse = errors.New("kafka: Response did not contain all the exp
// (meaning one outside of the range [0...numPartitions-1]).
var InvalidPartition = errors.New("kafka: Partitioner returned an invalid partition index.")

// AlreadyConnected is the error returned when calling Connect() on a Broker that is already connected.
// AlreadyConnected is the error returned when calling Open() on a Broker that is already connected.
var AlreadyConnected = errors.New("kafka: broker: already connected")

// NotConnected is the error returned when trying to send or call Close() on a Broker that is not connected.
Expand Down