Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Revert "Synced error names and descriptions with the kafka's protocol" #1262

Merged
merged 1 commit into from
Jan 23, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions admin.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ func (ca *clusterAdmin) Controller() (*Broker, error) {
func (ca *clusterAdmin) CreateTopic(topic string, detail *TopicDetail, validateOnly bool) error {

if topic == "" {
return ErrInvalidTopicException
return ErrInvalidTopic
}

if detail == nil {
Expand Down Expand Up @@ -153,7 +153,7 @@ func (ca *clusterAdmin) CreateTopic(topic string, detail *TopicDetail, validateO
func (ca *clusterAdmin) DeleteTopic(topic string) error {

if topic == "" {
return ErrInvalidTopicException
return ErrInvalidTopic
}

request := &DeleteTopicsRequest{
Expand Down Expand Up @@ -188,7 +188,7 @@ func (ca *clusterAdmin) DeleteTopic(topic string) error {

func (ca *clusterAdmin) CreatePartitions(topic string, count int32, assignment [][]int32, validateOnly bool) error {
if topic == "" {
return ErrInvalidTopicException
return ErrInvalidTopic
}

topicPartitions := make(map[string]*TopicPartition)
Expand Down Expand Up @@ -224,7 +224,7 @@ func (ca *clusterAdmin) CreatePartitions(topic string, count int32, assignment [
func (ca *clusterAdmin) DeleteRecords(topic string, partitionOffsets map[int32]int64) error {

if topic == "" {
return ErrInvalidTopicException
return ErrInvalidTopic
}

topics := make(map[string]*DeleteRecordsRequestTopic)
Expand Down
2 changes: 1 addition & 1 deletion admin_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -182,7 +182,7 @@ func TestClusterAdminDeleteEmptyTopic(t *testing.T) {
}

err = admin.DeleteTopic("")
if err != ErrInvalidTopicException {
if err != ErrInvalidTopic {
t.Fatal(err)
}

Expand Down
6 changes: 3 additions & 3 deletions async_producer.go
Original file line number Diff line number Diff line change
Expand Up @@ -329,7 +329,7 @@ func (p *asyncProducer) dispatcher() {
continue
}
if msg.byteSize(version) > p.conf.Producer.MaxMessageBytes {
p.returnError(msg, ErrMessageTooLarge)
p.returnError(msg, ErrMessageSizeTooLarge)
continue
}

Expand Down Expand Up @@ -827,7 +827,7 @@ func (bp *brokerProducer) handleSuccess(sent *produceSet, response *ProduceRespo
case ErrDuplicateSequenceNumber:
bp.parent.returnSuccesses(pSet.msgs)
// Retriable errors
case ErrCorruptMessage, ErrUnknownTopicOrPartition, ErrLeaderNotAvailable, ErrNotLeaderForPartition,
case ErrInvalidMessage, ErrUnknownTopicOrPartition, ErrLeaderNotAvailable, ErrNotLeaderForPartition,
ErrRequestTimedOut, ErrNotEnoughReplicas, ErrNotEnoughReplicasAfterAppend:
retryTopics = append(retryTopics, topic)
// Other non-retriable errors
Expand All @@ -852,7 +852,7 @@ func (bp *brokerProducer) handleSuccess(sent *produceSet, response *ProduceRespo
}

switch block.Err {
case ErrCorruptMessage, ErrUnknownTopicOrPartition, ErrLeaderNotAvailable, ErrNotLeaderForPartition,
case ErrInvalidMessage, ErrUnknownTopicOrPartition, ErrLeaderNotAvailable, ErrNotLeaderForPartition,
ErrRequestTimedOut, ErrNotEnoughReplicas, ErrNotEnoughReplicasAfterAppend:
Logger.Printf("producer/broker/%d state change to [retrying] on %s/%d because %v\n",
bp.broker.ID(), topic, partition, block.Err)
Expand Down
10 changes: 5 additions & 5 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -401,7 +401,7 @@ func (client *client) RefreshMetadata(topics ...string) error {
// off to Kafka. See: https://github.com/Shopify/sarama/pull/38#issuecomment-26362310
for _, topic := range topics {
if len(topic) == 0 {
return ErrInvalidTopicException // this is the error that 0.8.2 and later correctly return
return ErrInvalidTopic // this is the error that 0.8.2 and later correctly return
}
}

Expand Down Expand Up @@ -465,7 +465,7 @@ func (client *client) Coordinator(consumerGroup string) (*Broker, error) {
}

if coordinator == nil {
return nil, ErrCoordinatorNotAvailable
return nil, ErrConsumerCoordinatorNotAvailable
}

_ = coordinator.Open(client.conf)
Expand Down Expand Up @@ -790,7 +790,7 @@ func (client *client) updateMetadata(data *MetadataResponse, allKnownMetaData bo
switch topic.Err {
case ErrNoError:
break
case ErrInvalidTopicException, ErrTopicAuthorizationFailed: // don't retry, don't store partial results
case ErrInvalidTopic, ErrTopicAuthorizationFailed: // don't retry, don't store partial results
err = topic.Err
continue
case ErrUnknownTopicOrPartition: // retry, do not store partial partition results
Expand Down Expand Up @@ -876,7 +876,7 @@ func (client *client) getConsumerMetadata(consumerGroup string, attemptsRemainin
Logger.Printf("client/coordinator coordinator for consumergroup %s is #%d (%s)\n", consumerGroup, response.Coordinator.ID(), response.Coordinator.Addr())
return response, nil

case ErrCoordinatorNotAvailable:
case ErrConsumerCoordinatorNotAvailable:
Logger.Printf("client/coordinator coordinator for consumer group %s is not available\n", consumerGroup)

// This is very ugly, but this scenario will only happen once per cluster.
Expand All @@ -887,7 +887,7 @@ func (client *client) getConsumerMetadata(consumerGroup string, attemptsRemainin
time.Sleep(2 * time.Second)
}

return retry(ErrCoordinatorNotAvailable)
return retry(ErrConsumerCoordinatorNotAvailable)
default:
return nil, response.Err
}
Expand Down
4 changes: 2 additions & 2 deletions client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -504,7 +504,7 @@ func TestClientCoordinatorWithConsumerOffsetsTopic(t *testing.T) {
}

coordinatorResponse1 := new(ConsumerMetadataResponse)
coordinatorResponse1.Err = ErrCoordinatorNotAvailable
coordinatorResponse1.Err = ErrConsumerCoordinatorNotAvailable
seedBroker.Returns(coordinatorResponse1)

coordinatorResponse2 := new(ConsumerMetadataResponse)
Expand Down Expand Up @@ -581,7 +581,7 @@ func TestClientCoordinatorWithoutConsumerOffsetsTopic(t *testing.T) {
}

coordinatorResponse1 := new(ConsumerMetadataResponse)
coordinatorResponse1.Err = ErrCoordinatorNotAvailable
coordinatorResponse1.Err = ErrConsumerCoordinatorNotAvailable
seedBroker.Returns(coordinatorResponse1)

metadataResponse2 := new(MetadataResponse)
Expand Down
2 changes: 1 addition & 1 deletion config.go
Original file line number Diff line number Diff line change
Expand Up @@ -255,7 +255,7 @@ type Config struct {
Default int32
// The maximum number of message bytes to fetch from the broker in a
// single request. Messages larger than this will return
// ErrMessageSizeTooLarge and will not be consumable, so you must be sure
// ErrMessageTooLarge and will not be consumable, so you must be sure
// this is at least as large as your largest message. Defaults to 0
// (no limit). Similar to the JVM's `fetch.message.max.bytes`. The
// global `sarama.MaxResponseSize` still applies.
Expand Down
2 changes: 1 addition & 1 deletion consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -560,7 +560,7 @@ func (child *partitionConsumer) parseResponse(response *FetchResponse) ([]*Consu
if partialTrailingMessage {
if child.conf.Consumer.Fetch.Max > 0 && child.fetchSize == child.conf.Consumer.Fetch.Max {
// we can't ask for more data, we've hit the configured limit
child.sendError(ErrMessageSizeTooLarge)
child.sendError(ErrMessageTooLarge)
child.offset++ // skip this one so we can keep processing future messages
} else {
child.fetchSize *= 2
Expand Down
8 changes: 4 additions & 4 deletions consumer_group.go
Original file line number Diff line number Diff line change
Expand Up @@ -193,7 +193,7 @@ func (c *consumerGroup) newSession(ctx context.Context, coordinator *Broker, top
switch join.Err {
case ErrNoError:
c.memberID = join.MemberId
case ErrUnknownMemberID, ErrIllegalGeneration: // reset member ID and retry immediately
case ErrUnknownMemberId, ErrIllegalGeneration: // reset member ID and retry immediately
c.memberID = ""
return c.newSession(ctx, coordinator, topics, handler, retries)
case ErrRebalanceInProgress: // retry after backoff
Expand Down Expand Up @@ -234,7 +234,7 @@ func (c *consumerGroup) newSession(ctx context.Context, coordinator *Broker, top
}
switch sync.Err {
case ErrNoError:
case ErrUnknownMemberID, ErrIllegalGeneration: // reset member ID and retry immediately
case ErrUnknownMemberId, ErrIllegalGeneration: // reset member ID and retry immediately
c.memberID = ""
return c.newSession(ctx, coordinator, topics, handler, retries)
case ErrRebalanceInProgress: // retry after backoff
Expand Down Expand Up @@ -366,7 +366,7 @@ func (c *consumerGroup) leave() error {

// Check response
switch resp.Err {
case ErrRebalanceInProgress, ErrUnknownMemberID, ErrNoError:
case ErrRebalanceInProgress, ErrUnknownMemberId, ErrNoError:
return nil
default:
return resp.Err
Expand Down Expand Up @@ -664,7 +664,7 @@ func (s *consumerGroupSession) heartbeatLoop() {
switch resp.Err {
case ErrNoError:
retries = s.parent.config.Metadata.Retry.Max
case ErrRebalanceInProgress, ErrUnknownMemberID, ErrIllegalGeneration:
case ErrRebalanceInProgress, ErrUnknownMemberId, ErrIllegalGeneration:
return
default:
s.parent.handleError(err, "", -1)
Expand Down
6 changes: 3 additions & 3 deletions consumer_metadata_response_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,16 +17,16 @@ var (
)

func TestConsumerMetadataResponseError(t *testing.T) {
response := &ConsumerMetadataResponse{Err: ErrCoordinatorLoadInProgress}
response := &ConsumerMetadataResponse{Err: ErrOffsetsLoadInProgress}
testEncodable(t, "", response, consumerMetadataResponseError)

decodedResp := &ConsumerMetadataResponse{}
if err := versionedDecode(consumerMetadataResponseError, decodedResp, 0); err != nil {
t.Error("could not decode: ", err)
}

if decodedResp.Err != ErrCoordinatorLoadInProgress {
t.Errorf("got %s, want %s", decodedResp.Err, ErrCoordinatorLoadInProgress)
if decodedResp.Err != ErrOffsetsLoadInProgress {
t.Errorf("got %s, want %s", decodedResp.Err, ErrOffsetsLoadInProgress)
}
}

Expand Down
Loading