Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Rework how the client connects to brokers. #10

Merged
merged 4 commits into from
Aug 14, 2013
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 30 additions & 2 deletions broker.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ type Broker struct {

correlation_id int32
conn net.Conn
conn_err error
lock sync.Mutex

responses chan responsePromise
Expand All @@ -27,7 +28,7 @@ type responsePromise struct {
}

// NewBroker creates and returns a Broker targeting the given host:port address.
// This does not attempt to actually connect, you have to call Connect() for that.
// This does not attempt to actually connect, you have to call Connect() or AsyncConnect() for that.
func NewBroker(host string, port int32) *Broker {
b := new(Broker)
b.id = -1 // don't know it yet
Expand All @@ -40,17 +41,39 @@ func (b *Broker) Connect() error {
b.lock.Lock()
defer b.lock.Unlock()

return b.connect()
}

// AsyncConnect tries to connect to the Broker in a non-blocking way. Calling `broker.AsyncConnect()` is
// *NOT* the same as calling `go broker.Connect()` - AsyncConnect takes the broker lock synchronously before
// launching its goroutine, so that subsequent operations on the broker are guaranteed to block waiting for
// the connection instead of simply returning NotConnected. This does mean that if someone is already operating
// on the broker, AsyncConnect may not be truly asynchronous while it waits for the lock.
func (b *Broker) AsyncConnect() {
	b.lock.Lock()

	go func() {
		defer b.lock.Unlock()
		// Deliberately discard the error: connect() records any failure in
		// b.conn_err, which is surfaced to callers on their next broker operation.
		_ = b.connect()
	}()
}

func (b *Broker) connect() error {
if b.conn != nil {
return AlreadyConnected
}
b.conn_err = nil

addr, err := net.ResolveIPAddr("ip", b.host)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Consider that you could do addr, b.conn_err = .... Just a thought; feel free to ignore if you have some reason not to do that.

if err != nil {
b.conn_err = err
return err
}

b.conn, err = net.DialTCP("tcp", nil, &net.TCPAddr{IP: addr.IP, Port: int(b.port)})
if err != nil {
b.conn_err = err
return err
}

Expand Down Expand Up @@ -78,6 +101,7 @@ func (b *Broker) Close() error {
err := b.conn.Close()

b.conn = nil
b.conn_err = nil
b.done = nil
b.responses = nil

Expand Down Expand Up @@ -184,7 +208,11 @@ func (b *Broker) send(clientID string, req requestEncoder, promiseResponse bool)
defer b.lock.Unlock()

if b.conn == nil {
return nil, NotConnected
if b.conn_err != nil {
return nil, b.conn_err
} else {
return nil, NotConnected
}
}

fullRequest := request{b.correlation_id, clientID, req}
Expand Down
32 changes: 12 additions & 20 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -177,7 +177,7 @@ func (client *Client) cachedLeader(topic string, partition_id int32) *Broker {
partitions := client.leaders[topic]
if partitions != nil {
leader, ok := partitions[partition_id]
if ok && leader != -1 {
if ok {
return client.brokers[leader]
}
}
Expand Down Expand Up @@ -205,34 +205,29 @@ func (client *Client) cachedPartitions(topic string) []int32 {

// if no fatal error, returns a list of topics that need retrying due to LEADER_NOT_AVAILABLE
func (client *Client) update(data *MetadataResponse) ([]string, error) {
client.lock.Lock()
defer client.lock.Unlock()

// First discard brokers that we already know about. This avoids bouncing TCP connections,
// and especially avoids closing valid connections out from under other code which may be trying
// to use them. We only need a read-lock for this.
// to use them.
var newBrokers []*Broker
client.lock.RLock()
for _, broker := range data.Brokers {
if !broker.Equals(client.brokers[broker.ID()]) {
newBrokers = append(newBrokers, broker)
}
}
client.lock.RUnlock()

// connect to the brokers before taking the write lock, as this can take a while
// to timeout if one of them isn't reachable
for _, broker := range newBrokers {
err := broker.Connect()
if err != nil {
return nil, err
}
}

client.lock.Lock()
defer client.lock.Unlock()

// Now asynchronously try to open connections to the new brokers. We don't care if they
// fail, since maybe that broker is unreachable but doesn't have a topic we care about.
// If it fails and we do care, whoever tries to use it will get the connection error.
// If we have an old broker with that ID (but a different host/port, since they didn't
// compare as equals above) then close and remove that broker before saving the new one.
for _, broker := range newBrokers {
if client.brokers[broker.ID()] != nil {
go client.brokers[broker.ID()].Close()
}
broker.AsyncConnect()
client.brokers[broker.ID()] = broker
}

Expand All @@ -251,11 +246,8 @@ func (client *Client) update(data *MetadataResponse) ([]string, error) {
for _, partition := range topic.Partitions {
switch partition.Err {
case LEADER_NOT_AVAILABLE:
// in the LEADER_NOT_AVAILABLE case partition.Leader will be -1 because the
// partition is in the middle of leader election, so we fallthrough to save it
// anyways in order to avoid returning the stale leader (since -1 isn't a valid broker ID)
toRetry[topic.Name] = true
fallthrough
delete(client.leaders[topic.Name], partition.Id)
case NO_ERROR:
client.leaders[topic.Name][partition.Id] = partition.Leader
default:
Expand Down