diff --git a/broker.go b/broker.go index 6bdea3869..46f06a0f3 100644 --- a/broker.go +++ b/broker.go @@ -344,16 +344,24 @@ func (b *Broker) encode(pe packetEncoder) (err error) { } func (b *Broker) responseReceiver() { + var dead error header := make([]byte, 8) for response := range b.responses { + if dead != nil { + response.errors <- dead + continue + } + err := b.conn.SetReadDeadline(time.Now().Add(b.conf.Net.ReadTimeout)) if err != nil { + dead = err response.errors <- err continue } _, err = io.ReadFull(b.conn, header) if err != nil { + dead = err response.errors <- err continue } @@ -361,23 +369,22 @@ func (b *Broker) responseReceiver() { decodedHeader := responseHeader{} err = decode(header, &decodedHeader) if err != nil { + dead = err response.errors <- err continue } if decodedHeader.correlationID != response.correlationID { // TODO if decoded ID < cur ID, discard until we catch up // TODO if decoded ID > cur ID, save it so when cur ID catches up we have a response - response.errors <- PacketDecodingError{fmt.Sprintf("correlation ID didn't match, wanted %d, got %d", response.correlationID, decodedHeader.correlationID)} + dead = PacketDecodingError{fmt.Sprintf("correlation ID didn't match, wanted %d, got %d", response.correlationID, decodedHeader.correlationID)} + response.errors <- dead continue } buf := make([]byte, decodedHeader.length-4) _, err = io.ReadFull(b.conn, buf) if err != nil { - // XXX: the above ReadFull call inherits the same ReadDeadline set at the top of this loop, so it may - // fail with a timeout error. If this happens, our connection is permanently toast since we will no longer - // be aligned correctly on the stream (we'll be reading garbage Kafka headers from the middle of data). - // Can we/should we fail harder in that case? + dead = err response.errors <- err continue }