Skip to content

Commit

Permalink
Merge pull request #548 from Shopify/die-broker-die
Browse files Browse the repository at this point in the history
Make dead brokers die harder
  • Loading branch information
eapache committed Oct 1, 2015
2 parents a23cf43 + b51603a commit bc4baeb
Showing 1 changed file with 12 additions and 5 deletions.
17 changes: 12 additions & 5 deletions broker.go
Original file line number Diff line number Diff line change
Expand Up @@ -344,40 +344,47 @@ func (b *Broker) encode(pe packetEncoder) (err error) {
}

func (b *Broker) responseReceiver() {
var dead error
header := make([]byte, 8)
for response := range b.responses {
if dead != nil {
response.errors <- dead
continue
}

err := b.conn.SetReadDeadline(time.Now().Add(b.conf.Net.ReadTimeout))
if err != nil {
dead = err
response.errors <- err
continue
}

_, err = io.ReadFull(b.conn, header)
if err != nil {
dead = err
response.errors <- err
continue
}

decodedHeader := responseHeader{}
err = decode(header, &decodedHeader)
if err != nil {
dead = err
response.errors <- err
continue
}
if decodedHeader.correlationID != response.correlationID {
// TODO if decoded ID < cur ID, discard until we catch up
// TODO if decoded ID > cur ID, save it so when cur ID catches up we have a response
response.errors <- PacketDecodingError{fmt.Sprintf("correlation ID didn't match, wanted %d, got %d", response.correlationID, decodedHeader.correlationID)}
dead = PacketDecodingError{fmt.Sprintf("correlation ID didn't match, wanted %d, got %d", response.correlationID, decodedHeader.correlationID)}
response.errors <- dead
continue
}

buf := make([]byte, decodedHeader.length-4)
_, err = io.ReadFull(b.conn, buf)
if err != nil {
// XXX: the above ReadFull call inherits the same ReadDeadline set at the top of this loop, so it may
// fail with a timeout error. If this happens, our connection is permanently toast since we will no longer
// be aligned correctly on the stream (we'll be reading garbage Kafka headers from the middle of data).
// Can we/should we fail harder in that case?
dead = err
response.errors <- err
continue
}
Expand Down

0 comments on commit bc4baeb

Please sign in to comment.