From 0b89158135aa238250de1311fe5224183146afe6 Mon Sep 17 00:00:00 2001 From: Travis Bischel Date: Sun, 21 Jan 2024 17:02:40 -0700 Subject: [PATCH] kadm: add DeleteTopic,DeleteGroup,Error Adds Error for many responses, which may sometimes be helpful. --- pkg/kadm/groups.go | 61 ++++++++++++++++++++++++++++++++++ pkg/kadm/partas.go | 12 +++++++ pkg/kadm/topics.go | 83 ++++++++++++++++++++++++++++++++++++---------- 3 files changed, 138 insertions(+), 18 deletions(-) diff --git a/pkg/kadm/groups.go b/pkg/kadm/groups.go index 0b2601b1..6adca804 100644 --- a/pkg/kadm/groups.go +++ b/pkg/kadm/groups.go @@ -162,6 +162,17 @@ func (rs DescribedGroups) On(group string, fn func(*DescribedGroup) error) (Desc return DescribedGroup{}, kerr.GroupIDNotFound } +// Error iterates over all groups and returns the first error encountered, if +// any. +func (ds DescribedGroups) Error() error { + for _, d := range ds { + if d.Err != nil { + return d.Err + } + } + return nil +} + // Topics returns a sorted list of all group names. func (ds DescribedGroups) Names() []string { all := make([]string, 0, len(ds)) @@ -385,6 +396,32 @@ func (rs DeleteGroupResponses) On(group string, fn func(*DeleteGroupResponse) er return DeleteGroupResponse{}, kerr.GroupIDNotFound } +// Error iterates over all groups and returns the first error encountered, if +// any. +func (rs DeleteGroupResponses) Error() error { + for _, r := range rs { + if r.Err != nil { + return r.Err + } + } + return nil +} + +// DeleteGroup deletes the specified group. This is similar to DeleteGroups, +// but returns the kerr.ErrorForCode(response.ErrorCode) if the request/response +// is successful. +func (cl *Client) DeleteGroup(ctx context.Context, group string) (DeleteGroupResponse, error) { + rs, err := cl.DeleteGroups(ctx, group) + if err != nil { + return DeleteGroupResponse{}, err + } + g, exists := rs[group] + if !exists { + return DeleteGroupResponse{}, errors.New("requested group was not part of the delete group response") + } + return g, g.Err +} + // DeleteGroups deletes all groups specified. // // The purpose of this request is to allow operators a way to delete groups @@ -984,6 +1021,17 @@ func (rs FetchOffsetsResponses) On(group string, fn func(*FetchOffsetsResponse) return FetchOffsetsResponse{}, kerr.GroupIDNotFound } +// Error iterates over all responses and returns the first error encountered, +// if any. +func (rs FetchOffsetsResponses) Error() error { + for _, r := range rs { + if r.Err != nil { + return r.Err + } + } + return nil +} + // FetchManyOffsets issues a fetch offsets requests for each group specified. // // This function is a batch version of FetchOffsets. FetchOffsets and @@ -1092,6 +1140,19 @@ func (ds DeleteOffsetsResponses) EachError(fn func(string, int32, error)) { } } +// Error iterates over all responses and returns the first error encountered, +// if any. +func (ds DeleteOffsetsResponses) Error() error { + for _, ps := range ds { + for _, err := range ps { + if err != nil { + return err + } + } + } + return nil +} + // DeleteOffsets deletes offsets for the given group. // // Originally, offset commits were persisted in Kafka for some retention time. diff --git a/pkg/kadm/partas.go b/pkg/kadm/partas.go index 44246da1..0c6c4a48 100644 --- a/pkg/kadm/partas.go +++ b/pkg/kadm/partas.go @@ -69,6 +69,18 @@ func (rs AlterPartitionAssignmentsResponses) Each(fn func(AlterPartitionAssignme } } +// Error returns the first error in the responses, if any. +func (rs AlterPartitionAssignmentsResponses) Error() error { + for _, ps := range rs { + for _, r := range ps { + if r.Err != nil { + return r.Err + } + } + } + return nil +} + // AlterPartitionAssignments alters partition assignments for the requested // partitions, returning an error if the response could not be issued or if // you do not have permissions. diff --git a/pkg/kadm/topics.go b/pkg/kadm/topics.go index 56461ea9..04d1e269 100644 --- a/pkg/kadm/topics.go +++ b/pkg/kadm/topics.go @@ -91,24 +91,21 @@ func (rs CreateTopicResponses) On(topic string, fn func(*CreateTopicResponse) er return CreateTopicResponse{}, kerr.UnknownTopicOrPartition } +// Error iterates over all responses and returns the first error +// encountered, if any. +func (rs CreateTopicResponses) Error() error { + for _, r := range rs { + if r.Err != nil { + return r.Err + } + } + return nil +} + // CreateTopic issues a create topics request with the given partitions, -// replication factor, and (optional) configs for the given topic name. Under -// the hood, this uses the default 15s request timeout and lets Kafka choose -// where to place partitions. This function exists to complement CreateTopics, -// making the single-topic creation case easier to handle. -// -// Version 4 of the underlying create topic request was introduced in Kafka 2.4 -// and brought client support for creation defaults. If talking to a 2.4+ -// cluster, you can use -1 for partitions and replicationFactor to use broker -// defaults. -// -// This package includes a StringPtr function to aid in building config values. -// -// If the topic could not be created this function will return an error. An -// error may be returned due to authorization failure, a failed network -// request, a missing controller or other issues. If the request was successful -// but the CreateTopicResponse.Err is non-nil, this returns the error, so you -// do not need to additionally check the Err field. +// replication factor, and (optional) configs for the given topic name. +// This is similar to CreateTopics, but returns the kerr.ErrorForCode(response.ErrorCode) +// if the request/response is successful. func (cl *Client) CreateTopic( ctx context.Context, partitions int32, @@ -277,8 +274,34 @@ func (rs DeleteTopicResponses) On(topic string, fn func(*DeleteTopicResponse) er return DeleteTopicResponse{}, kerr.UnknownTopicOrPartition } +// Error iterates over all responses and returns the first error +// encountered, if any. +func (rs DeleteTopicResponses) Error() error { + for _, r := range rs { + if r.Err != nil { + return r.Err + } + } + return nil +} + +// DeleteTopic issues a delete topic request for the given topic name with a +// (by default) 15s timeout. This is similar to DeleteTopics, but returns the +// kerr.ErrorForCode(response.ErrorCode) if the request/response is successful. +func (cl *Client) DeleteTopic(ctx context.Context, topic string) (DeleteTopicResponse, error) { + rs, err := cl.DeleteTopics(ctx, topic) + if err != nil { + return DeleteTopicResponse{}, err + } + r, exists := rs[topic] + if !exists { + return DeleteTopicResponse{}, errors.New("requested topic was not part of delete topic response") + } + return r, r.Err +} + // DeleteTopics issues a delete topics request for the given topic names with a -// 15s timeout. +// (by default) 15s timeout. // // This does not return an error on authorization failures, instead, // authorization failures are included in the responses. This only returns an @@ -402,6 +425,19 @@ func (rs DeleteRecordsResponses) On(topic string, partition int32, fn func(*Dele return DeleteRecordsResponse{}, kerr.UnknownTopicOrPartition } +// Error iterates over all responses and returns the first error +// encountered, if any. +func (rs DeleteRecordsResponses) Error() error { + for _, ps := range rs { + for _, r := range ps { + if r.Err != nil { + return r.Err + } + } + } + return nil +} + // DeleteRecords issues a delete records request for the given offsets. Per // offset, only the Offset field needs to be set. // @@ -498,6 +534,17 @@ func (rs CreatePartitionsResponses) On(topic string, fn func(*CreatePartitionsRe return CreatePartitionsResponse{}, kerr.UnknownTopicOrPartition } +// Error iterates over all responses and returns the first error +// encountered, if any. +func (rs CreatePartitionsResponses) Error() error { + for _, r := range rs { + if r.Err != nil { + return r.Err + } + } + return nil +} + // CreatePartitions issues a create partitions request for the given topics, // adding "add" partitions to each topic. This request lets Kafka choose where // the new partitions should be.