Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Cleanup error formatting #495

Merged
merged 2 commits into from
Jul 29, 2015
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion broker.go
Original file line number Diff line number Diff line change
Expand Up @@ -359,7 +359,7 @@ func (b *Broker) responseReceiver() {
if decodedHeader.correlationID != response.correlationID {
// TODO if decoded ID < cur ID, discard until we catch up
// TODO if decoded ID > cur ID, save it so when cur ID catches up we have a response
response.errors <- PacketDecodingError{fmt.Sprintf("CorrelationID didn't match, wanted %d, got %d", response.correlationID, decodedHeader.correlationID)}
response.errors <- PacketDecodingError{fmt.Sprintf("correlation ID didn't match, wanted %d, got %d", response.correlationID, decodedHeader.correlationID)}
continue
}

Expand Down
50 changes: 25 additions & 25 deletions config.go
Original file line number Diff line number Diff line change
Expand Up @@ -183,71 +183,71 @@ func (c *Config) Validate() error {
// validate Net values
switch {
case c.Net.MaxOpenRequests <= 0:
return ConfigurationError("Invalid Net.MaxOpenRequests, must be > 0")
return ConfigurationError("Net.MaxOpenRequests must be > 0")
case c.Net.DialTimeout <= 0:
return ConfigurationError("Invalid Net.DialTimeout, must be > 0")
return ConfigurationError("Net.DialTimeout must be > 0")
case c.Net.ReadTimeout <= 0:
return ConfigurationError("Invalid Net.ReadTimeout, must be > 0")
return ConfigurationError("Net.ReadTimeout must be > 0")
case c.Net.WriteTimeout <= 0:
return ConfigurationError("Invalid Net.WriteTimeout, must be > 0")
return ConfigurationError("Net.WriteTimeout must be > 0")
case c.Net.KeepAlive < 0:
return ConfigurationError("Invalid Net.KeepAlive, must be >= 0")
return ConfigurationError("Net.KeepAlive must be >= 0")
}

// validate the Metadata values
switch {
case c.Metadata.Retry.Max < 0:
return ConfigurationError("Invalid Metadata.Retry.Max, must be >= 0")
return ConfigurationError("Metadata.Retry.Max must be >= 0")
case c.Metadata.Retry.Backoff < 0:
return ConfigurationError("Invalid Metadata.Retry.Backoff, must be >= 0")
return ConfigurationError("Metadata.Retry.Backoff must be >= 0")
case c.Metadata.RefreshFrequency < 0:
return ConfigurationError("Invalid Metadata.RefreshFrequency, must be >= 0")
return ConfigurationError("Metadata.RefreshFrequency must be >= 0")
}

// validate the Producer values
switch {
case c.Producer.MaxMessageBytes <= 0:
return ConfigurationError("Invalid Producer.MaxMessageBytes, must be > 0")
return ConfigurationError("Producer.MaxMessageBytes must be > 0")
case c.Producer.RequiredAcks < -1:
return ConfigurationError("Invalid Producer.RequiredAcks, must be >= -1")
return ConfigurationError("Producer.RequiredAcks must be >= -1")
case c.Producer.Timeout <= 0:
return ConfigurationError("Invalid Producer.Timeout, must be > 0")
return ConfigurationError("Producer.Timeout must be > 0")
case c.Producer.Partitioner == nil:
return ConfigurationError("Invalid Producer.Partitioner, must not be nil")
return ConfigurationError("Producer.Partitioner must not be nil")
case c.Producer.Flush.Bytes < 0:
return ConfigurationError("Invalid Producer.Flush.Bytes, must be >= 0")
return ConfigurationError("Producer.Flush.Bytes must be >= 0")
case c.Producer.Flush.Messages < 0:
return ConfigurationError("Invalid Producer.Flush.Messages, must be >= 0")
return ConfigurationError("Producer.Flush.Messages must be >= 0")
case c.Producer.Flush.Frequency < 0:
return ConfigurationError("Invalid Producer.Flush.Frequency, must be >= 0")
return ConfigurationError("Producer.Flush.Frequency must be >= 0")
case c.Producer.Flush.MaxMessages < 0:
return ConfigurationError("Invalid Producer.Flush.MaxMessages, must be >= 0")
return ConfigurationError("Producer.Flush.MaxMessages must be >= 0")
case c.Producer.Flush.MaxMessages > 0 && c.Producer.Flush.MaxMessages < c.Producer.Flush.Messages:
return ConfigurationError("Invalid Producer.Flush.MaxMessages, must be >= Producer.Flush.Messages when set")
return ConfigurationError("Producer.Flush.MaxMessages must be >= Producer.Flush.Messages when set")
case c.Producer.Retry.Max < 0:
return ConfigurationError("Invalid Producer.Retry.Max, must be >= 0")
return ConfigurationError("Producer.Retry.Max must be >= 0")
case c.Producer.Retry.Backoff < 0:
return ConfigurationError("Invalid Producer.Retry.Backoff, must be >= 0")
return ConfigurationError("Producer.Retry.Backoff must be >= 0")
}

// validate the Consumer values
switch {
case c.Consumer.Fetch.Min <= 0:
return ConfigurationError("Invalid Consumer.Fetch.Min, must be > 0")
return ConfigurationError("Consumer.Fetch.Min must be > 0")
case c.Consumer.Fetch.Default <= 0:
return ConfigurationError("Invalid Consumer.Fetch.Default, must be > 0")
return ConfigurationError("Consumer.Fetch.Default must be > 0")
case c.Consumer.Fetch.Max < 0:
return ConfigurationError("Invalid Consumer.Fetch.Max, must be >= 0")
return ConfigurationError("Consumer.Fetch.Max must be >= 0")
case c.Consumer.MaxWaitTime < 1*time.Millisecond:
return ConfigurationError("Invalid Consumer.MaxWaitTime, must be > 1ms")
return ConfigurationError("Consumer.MaxWaitTime must be > 1ms")
case c.Consumer.Retry.Backoff < 0:
return ConfigurationError("Invalid Consumer.Retry.Backoff, must be >= 0")
return ConfigurationError("Consumer.Retry.Backoff must be >= 0")
}

// validate misc shared values
switch {
case c.ChannelBufferSize < 0:
return ConfigurationError("Invalid ChannelBufferSize, must be >= 0")
return ConfigurationError("ChannelBufferSize must be >= 0")
}

return nil
Expand Down
4 changes: 2 additions & 2 deletions encoder_decoder.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ func encode(e encoder) ([]byte, error) {
}

if prepEnc.length < 0 || prepEnc.length > int(MaxRequestSize) {
return nil, PacketEncodingError{fmt.Sprintf("Invalid request size: %d", prepEnc.length)}
return nil, PacketEncodingError{fmt.Sprintf("invalid request size (%d)", prepEnc.length)}
}

realEnc.raw = make([]byte, prepEnc.length)
Expand Down Expand Up @@ -55,7 +55,7 @@ func decode(buf []byte, in decoder) error {
}

if helper.off != len(buf) {
return PacketDecodingError{"Length was invalid"}
return PacketDecodingError{"invalid length"}
}

return nil
Expand Down
8 changes: 4 additions & 4 deletions errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import (

// ErrOutOfBrokers is the error returned when the client has run out of brokers to talk to because all of them errored
// or otherwise failed to respond.
var ErrOutOfBrokers = errors.New("kafka: Client has run out of available brokers to talk to. Is your cluster reachable?")
var ErrOutOfBrokers = errors.New("kafka: client has run out of available brokers to talk to (Is your cluster reachable?)")

// ErrClosedClient is the error returned when a method is called on a client that has been closed.
var ErrClosedClient = errors.New("kafka: tried to use a client that was closed")
Expand Down Expand Up @@ -44,7 +44,7 @@ type PacketEncodingError struct {
}

func (err PacketEncodingError) Error() string {
return fmt.Sprintf("kafka: Error while encoding packet: %s", err.Info)
return fmt.Sprintf("kafka: error encoding packet: %s", err.Info)
}

// PacketDecodingError is returned when there was an error (other than truncated data) decoding the Kafka broker's response.
Expand All @@ -54,15 +54,15 @@ type PacketDecodingError struct {
}

func (err PacketDecodingError) Error() string {
return fmt.Sprintf("kafka: Error while decoding packet: %s", err.Info)
return fmt.Sprintf("kafka: error decoding packet: %s", err.Info)
}

// ConfigurationError is the type of error returned from a constructor (e.g. NewClient, or NewConsumer)
// when the specified configuration is invalid.
type ConfigurationError string

func (err ConfigurationError) Error() string {
return "kafka: Invalid Configuration: " + string(err)
return "kafka: invalid configuration (" + string(err) + ")"
}

// KError is the type of error that can be returned directly by the Kafka broker.
Expand Down
2 changes: 1 addition & 1 deletion length_field.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ func (l *lengthField) run(curOffset int, buf []byte) error {

func (l *lengthField) check(curOffset int, buf []byte) error {
if uint32(curOffset-l.startOffset-4) != binary.BigEndian.Uint32(buf[l.startOffset:]) {
return PacketDecodingError{"Lengthfield check failed"}
return PacketDecodingError{"length field invalid"}
}

return nil
Expand Down
6 changes: 3 additions & 3 deletions message.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ func (m *Message) encode(pe packetEncoder) error {
m.compressedCache = tmp
payload = m.compressedCache
default:
return PacketEncodingError{fmt.Sprintf("Unsupported compression codec: %d", m.Codec)}
return PacketEncodingError{fmt.Sprintf("unsupported compression codec (%d)", m.Codec)}
}
}

Expand All @@ -92,7 +92,7 @@ func (m *Message) decode(pd packetDecoder) (err error) {
return err
}
if format != messageFormat {
return PacketDecodingError{"Unexpected messageFormat"}
return PacketDecodingError{"unexpected messageFormat"}
}

attribute, err := pd.getInt8()
Expand Down Expand Up @@ -135,7 +135,7 @@ func (m *Message) decode(pd packetDecoder) (err error) {
}
return m.decodeSet()
default:
return PacketDecodingError{fmt.Sprintf("Invalid compression specified: %d", m.Codec)}
return PacketDecodingError{fmt.Sprintf("invalid compression specified (%d)", m.Codec)}
}

err = pd.pop()
Expand Down
8 changes: 4 additions & 4 deletions prep_encoder.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ func (pe *prepEncoder) putInt64(in int64) {

func (pe *prepEncoder) putArrayLength(in int) error {
if in > math.MaxInt32 {
return PacketEncodingError{fmt.Sprintf("Array too long: %d", in)}
return PacketEncodingError{fmt.Sprintf("array too long (%d)", in)}
}
pe.length += 4
return nil
Expand All @@ -43,15 +43,15 @@ func (pe *prepEncoder) putBytes(in []byte) error {
return nil
}
if len(in) > math.MaxInt32 {
return PacketEncodingError{fmt.Sprintf("Byteslice too long: %d", len(in))}
return PacketEncodingError{fmt.Sprintf("byteslice too long (%d)", len(in))}
}
pe.length += len(in)
return nil
}

func (pe *prepEncoder) putRawBytes(in []byte) error {
if len(in) > math.MaxInt32 {
return PacketEncodingError{fmt.Sprintf("Byteslice too long: %d", len(in))}
return PacketEncodingError{fmt.Sprintf("byteslice too long (%d)", len(in))}
}
pe.length += len(in)
return nil
Expand All @@ -60,7 +60,7 @@ func (pe *prepEncoder) putRawBytes(in []byte) error {
func (pe *prepEncoder) putString(in string) error {
pe.length += 2
if len(in) > math.MaxInt16 {
return PacketEncodingError{fmt.Sprintf("String too long: %d", len(in))}
return PacketEncodingError{fmt.Sprintf("string too long (%d)", len(in))}
}
pe.length += len(in)
return nil
Expand Down
10 changes: 5 additions & 5 deletions real_decoder.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ func (rd *realDecoder) getArrayLength() (int, error) {
rd.off = len(rd.raw)
return -1, ErrInsufficientData
} else if tmp > 2*math.MaxUint16 {
return -1, PacketDecodingError{"getArrayLength failed: Invalid array length"}
return -1, PacketDecodingError{"invalid array length"}
}
return tmp, nil
}
Expand All @@ -82,7 +82,7 @@ func (rd *realDecoder) getBytes() ([]byte, error) {

switch {
case n < -1:
return nil, PacketDecodingError{"getBytes failed: Invalid length"}
return nil, PacketDecodingError{"invalid byteslice length"}
case n == -1:
return nil, nil
case n == 0:
Expand All @@ -108,7 +108,7 @@ func (rd *realDecoder) getString() (string, error) {

switch {
case n < -1:
return "", PacketDecodingError{"getString failed: invalid length"}
return "", PacketDecodingError{"invalid string length"}
case n == -1:
return "", nil
case n == 0:
Expand Down Expand Up @@ -141,7 +141,7 @@ func (rd *realDecoder) getInt32Array() ([]int32, error) {
}

if n < 0 {
return nil, PacketDecodingError{"getInt32Array failed: invalid length"}
return nil, PacketDecodingError{"invalid array length"}
}

ret := make([]int32, n)
Expand Down Expand Up @@ -170,7 +170,7 @@ func (rd *realDecoder) getInt64Array() ([]int64, error) {
}

if n < 0 {
return nil, PacketDecodingError{"getInt64Array failed: invalid length"}
return nil, PacketDecodingError{"invalid array length"}
}

ret := make([]int64, n)
Expand Down
4 changes: 2 additions & 2 deletions request.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ func (r *request) decode(pd packetDecoder) (err error) {

r.body = allocateBody(key, version)
if r.body == nil {
return PacketDecodingError{fmt.Sprintf("Unknown request key: %d", key)}
return PacketDecodingError{fmt.Sprintf("unknown request key (%d)", key)}
}
return r.body.decode(pd)
}
Expand All @@ -64,7 +64,7 @@ func decodeRequest(r io.Reader) (req *request, err error) {

length := int32(binary.BigEndian.Uint32(lengthBytes))
if length <= 4 || length > MaxRequestSize {
return nil, PacketDecodingError{fmt.Sprintf("Message of length %d too large or too small", length)}
return nil, PacketDecodingError{fmt.Sprintf("message of length %d too large or too small", length)}
}

encodedReq := make([]byte, length)
Expand Down
2 changes: 1 addition & 1 deletion response_header.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ func (r *responseHeader) decode(pd packetDecoder) (err error) {
return err
}
if r.length <= 4 || r.length > MaxResponseSize {
return PacketDecodingError{fmt.Sprintf("Message of length %d too large or too small", r.length)}
return PacketDecodingError{fmt.Sprintf("message of length %d too large or too small", r.length)}
}

r.correlationID, err = pd.getInt32()
Expand Down