From 52aa756669ddaffeb75387557a4d375019aec8f7 Mon Sep 17 00:00:00 2001 From: Sri Harsha Singudasu Date: Mon, 18 Sep 2023 21:07:43 -0700 Subject: [PATCH 1/9] [DP-1767] - topicctl get action partitions-status --- cmd/topicctl/subcmd/get.go | 62 ++++++++++ pkg/admin/format.go | 162 ++++++++++++++++++++++++ pkg/admin/types.go | 20 +++ pkg/cli/cli.go | 248 +++++++++++++++++++++++++++++++++++++ 4 files changed, 492 insertions(+) diff --git a/cmd/topicctl/subcmd/get.go b/cmd/topicctl/subcmd/get.go index 642a9031..b7957cf9 100644 --- a/cmd/topicctl/subcmd/get.go +++ b/cmd/topicctl/subcmd/get.go @@ -2,9 +2,11 @@ package subcmd import ( "context" + "fmt" "strings" "github.com/aws/aws-sdk-go/aws/session" + "github.com/segmentio/topicctl/pkg/admin" "github.com/segmentio/topicctl/pkg/cli" log "github.com/sirupsen/logrus" "github.com/spf13/cobra" @@ -31,6 +33,14 @@ type getCmdConfig struct { var getConfig getCmdConfig +type partitionsStatusCmdConfig struct { + topics []string + status string +} + +var partitionsStatusConfig partitionsStatusCmdConfig +var partitionsStatusHelpText = "Allowed values: under-replicated, offline, preferred-leder, not-preferred-leader" + func init() { getCmd.PersistentFlags().BoolVar( &getConfig.full, @@ -55,6 +65,7 @@ func init() { partitionsCmd(), offsetsCmd(), topicsCmd(), + partitionsStatusCmd(), ) RootCmd.AddCommand(getCmd) } @@ -272,3 +283,54 @@ func topicsCmd() *cobra.Command { }, } } + +func partitionsStatusCmd() *cobra.Command { + partitionsStatusCommand := &cobra.Command{ + Use: "partitions-status", + Short: "Fetch all partitions status", + Args: cobra.MinimumNArgs(0), + RunE: func(cmd *cobra.Command, args []string) error { + ctx := context.Background() + sess := session.Must(session.NewSession()) + + adminClient, err := getConfig.shared.getAdminClient(ctx, sess, true) + if err != nil { + return err + } + defer adminClient.Close() + + status := strings.ToUpper(partitionsStatusConfig.status) + if status != "" && !cli.IsValidPartitionStatusString(status) { + return fmt.Errorf("Invalid status flag\n%s", partitionsStatusHelpText) + } + + cliRunner := cli.NewCLIRunner(adminClient, log.Infof, !noSpinner) + return cliRunner.GetPartitionsStatus( + ctx, + partitionsStatusConfig.topics, + admin.PartitionStatus(status), + getConfig.full, + ) + }, + } + + partitionsStatusCommand.Flags().StringSliceVarP( + &partitionsStatusConfig.topics, + "topics", + "t", + []string{}, + "fetch specific topics partition status", + ) + + partitionsStatusCommand.Flags().StringVarP( + &partitionsStatusConfig.status, + "status", + "s", + "", + fmt.Sprintf("Partition status\n%s", partitionsStatusHelpText), + ) + + // partitionsStatusCommand.MarkFlagRequired("status") + + return partitionsStatusCommand +} diff --git a/pkg/admin/format.go b/pkg/admin/format.go index c6b9c64f..305ddd92 100644 --- a/pkg/admin/format.go +++ b/pkg/admin/format.go @@ -11,6 +11,7 @@ import ( "github.com/fatih/color" "github.com/olekukonko/tablewriter" + "github.com/segmentio/kafka-go" "github.com/segmentio/topicctl/pkg/util" ) @@ -884,3 +885,164 @@ func maxMapValues(inputMap map[int]int) int { return maxValue } + +// FormatPartitionsByStatus creates a pretty table that lists the details of the partitions by status +func FormatPartitionsByStatus( + partitionsInfo map[string][]kafka.Partition, + full bool, +) string { + buf := &bytes.Buffer{} + + headers := []string{ + "Topic", + "Partitions", + "Count", + } + colAlignment := []int{ + tablewriter.ALIGN_LEFT, + tablewriter.ALIGN_LEFT, + tablewriter.ALIGN_LEFT, + } + + if full { + headers = []string{ + "Topic", + "Partition", + "Leader", + "ISR", + "Replicas", + } + colAlignment = []int{ + tablewriter.ALIGN_LEFT, + tablewriter.ALIGN_LEFT, + tablewriter.ALIGN_LEFT, + tablewriter.ALIGN_LEFT, + tablewriter.ALIGN_LEFT, + } + } + + table := tablewriter.NewWriter(buf) + table.SetHeader(headers) + table.SetAutoWrapText(false) + table.SetColumnAlignment(colAlignment) + table.SetBorders( + tablewriter.Border{ + Left: false, + Top: true, + Right: false, + Bottom: true, + }, + ) + + for topicName, partitions := range partitionsInfo { + if full { + for _, partition := range partitions { + partitionIsrs := []int{} + for _, partitionStatusIsr := range partition.Isr { + partitionIsrs = append(partitionIsrs, partitionStatusIsr.ID) + } + + partitionReplicas := []int{} + for _, partitionReplica := range partition.Replicas { + partitionReplicas = append(partitionReplicas, partitionReplica.ID) + } + + row := []string{ + topicName, + fmt.Sprintf("%d", partition.ID), + fmt.Sprintf("%d", partition.Leader.ID), + fmt.Sprintf("%+v", partitionIsrs), + fmt.Sprintf("%+v", partitionReplicas), + } + + table.Append(row) + } + } else { + partitionIDs := []int{} + for _, partition := range partitions { + partitionIDs = append(partitionIDs, partition.ID) + } + + row := []string{ + topicName, + fmt.Sprintf("%+v", partitionIDs), + fmt.Sprintf("%d", len(partitionIDs)), + } + + table.Append(row) + } + } + + table.Render() + return string(bytes.TrimRight(buf.Bytes(), "\n")) +} + +// FormatPartitionsAllStatus creates a pretty table that lists all partitions status +func FormatPartitionsAllStatus( + partitionsInfo map[string][]PartitionStatusInfo, +) string { + buf := &bytes.Buffer{} + + headers := []string{ + "Topic", + "Partition", + "Leader", + "ISR", + "Replicas", + "Status", + } + colAlignment := []int{ + tablewriter.ALIGN_LEFT, + tablewriter.ALIGN_LEFT, + tablewriter.ALIGN_LEFT, + tablewriter.ALIGN_LEFT, + tablewriter.ALIGN_LEFT, + tablewriter.ALIGN_LEFT, + } + + table := tablewriter.NewWriter(buf) + table.SetHeader(headers) + table.SetAutoWrapText(false) + table.SetColumnAlignment(colAlignment) + table.SetBorders( + tablewriter.Border{ + Left: false, + Top: true, + Right: false, + Bottom: true, + }, + ) + + for topicName, partitions := range partitionsInfo { + for _, partition := range partitions { + partitionStatusIsrs := []int{} + for _, partitionStatusIsr := range partition.Partition.Isr { + partitionStatusIsrs = append(partitionStatusIsrs, partitionStatusIsr.ID) + } + + partitionReplicas := []int{} + for _, partitionReplica := range partition.Partition.Replicas { + partitionReplicas = append(partitionReplicas, partitionReplica.ID) + } + + partitionStatuses := []string{} + for _, status := range partition.Statuses { + partitionStatuses = append(partitionStatuses, string(status)) + } + + row := []string{ + topicName, + fmt.Sprintf("%d", partition.Partition.ID), + fmt.Sprintf("%d", partition.Partition.Leader.ID), + fmt.Sprintf("%+v", partitionStatusIsrs), + fmt.Sprintf("%+v", partitionReplicas), + fmt.Sprintf("%s", strings.Join(partitionStatuses, ",")), + } + + table.Append(row) + } + } + + table.Render() + return string(bytes.TrimRight(buf.Bytes(), "\n")) +} diff --git a/pkg/admin/types.go b/pkg/admin/types.go index 0221a39b..3068f46e 100644 --- a/pkg/admin/types.go +++ b/pkg/admin/types.go @@ -8,6 +8,7 @@ import ( "strconv" "time" + "github.com/segmentio/kafka-go" "github.com/segmentio/topicctl/pkg/util" ) @@ -136,6 +137,25 @@ type zkChangeNotification struct { EntityPath string `json:"entity_path"` } +type PartitionStatus string + +const ( + UnderReplicated PartitionStatus = "UNDER-REPLICATED" + Offline PartitionStatus = "OFFLINE" + PreferredLeader PartitionStatus = "PREFERRED-LEADER" + NotPreferredLeader PartitionStatus = "NOT-PREFERRED-LEADER" +) + +type PartitionStatusInfo struct { + Topic string + Partition kafka.Partition + Statuses []PartitionStatus +} + +const ( + ListenerNotFoundError kafka.Error = 72 +) + // Addr returns the address of the current BrokerInfo. func (b BrokerInfo) Addr() string { return fmt.Sprintf("%s:%d", b.Host, b.Port) diff --git a/pkg/cli/cli.go b/pkg/cli/cli.go index 7b0542c7..2274037c 100644 --- a/pkg/cli/cli.go +++ b/pkg/cli/cli.go @@ -14,6 +14,7 @@ import ( "github.com/briandowns/spinner" "github.com/fatih/color" + "github.com/segmentio/kafka-go" "github.com/segmentio/topicctl/pkg/admin" "github.com/segmentio/topicctl/pkg/apply" "github.com/segmentio/topicctl/pkg/check" @@ -577,3 +578,250 @@ func stringsToInts(strs []string) ([]int, error) { return ints, nil } + +// kafka-go metadata call +func (c *CLIRunner) GetAllTopicsMetaData( + ctx context.Context, +) (*kafka.MetadataResponse, error) { + topicsInfo, err := c.adminClient.GetTopics(ctx, nil, false) + if err != nil { + return nil, fmt.Errorf("Error fetching all topics info: %+v", err) + } + + allTopics := []string{} + for _, topicInfo := range topicsInfo { + allTopics = append(allTopics, topicInfo.Name) + } + + client := c.adminClient.GetConnector().KafkaClient + req := kafka.MetadataRequest{ + Topics: allTopics, + } + + log.Debugf("Metadata request: %+v", req) + metadata, err := client.Metadata(ctx, &req) + if err != nil { + return nil, fmt.Errorf("Error fetching topic metadata: %+v", err) + } + + return metadata, nil +} + +// Get the partitions status for the specified status +// This function displays the partitions in a Pretty format. +// This functions prints and return error if +// - under replicated partitions status +// - offline partitions status +// - non preferred leader partitions status +// - unknown partitions status +func (c *CLIRunner) GetPartitionsStatus( + ctx context.Context, + topics []string, + status admin.PartitionStatus, + full bool, +) error { + c.startSpinner() + + metadata, err := c.GetAllTopicsMetaData(ctx) + if err != nil { + c.stopSpinner() + return err + } + + c.stopSpinner() + log.Debugf("kafka-go metadata response: %v", metadata) + + partitionsInfo := getPartitionsStatus(topics, metadata) + log.Debugf("partitionsInfo: %v", partitionsInfo) + if status != "" { + partitionsInfoByStatus := make(map[string][]kafka.Partition) + partitionsCountByStatus := 0 + found := false + + for topicName, partitions := range partitionsInfo { + statusPartitions := []kafka.Partition{} + + for _, partition := range partitions { + for _, partitionStatus := range partition.Statuses { + if partitionStatus == status { + statusPartitions = append(statusPartitions, partition.Partition) + found = true + } + } + } + + if len(statusPartitions) == 0 { + continue + } + + partitionsInfoByStatus[topicName] = statusPartitions + partitionsCountByStatus += len(statusPartitions) + } + + c.printer( + "%v Partitions:\n%s", + status, + admin.FormatPartitionsByStatus(partitionsInfoByStatus, full), + ) + + log.Infof("%d Partitions are %v for topics", partitionsCountByStatus, status) + + if found && status != admin.PreferredLeader { + return fmt.Errorf("%v partitions are found for topics", status) + } + + return nil + } + + c.printer( + "Partitions Status:\n%s", + admin.FormatPartitionsAllStatus(partitionsInfo), + ) + + return nil +} + +// This is the actual function where we fetch all the topic partitions with status +func getPartitionsStatus( + topics []string, + metadata *kafka.MetadataResponse, +) map[string][]admin.PartitionStatusInfo { + filterTopics := make(map[string]bool) + for _, topic := range topics { + filterTopics[topic] = true + } + + partitionsStatusInfo := make(map[string][]admin.PartitionStatusInfo) + for _, topicMetadata := range metadata.Topics { + if topicMetadata.Error != nil { + log.Errorf("topic metadata error: %v", topicMetadata.Error) + continue + } + + if len(topics) != 0 && !filterTopics[topicMetadata.Name] { + continue + } + + partitions := []admin.PartitionStatusInfo{} + for _, partition := range topicMetadata.Partitions { + partitionStatuses := GetPartitionStatuses(partition) + log.Debugf("Topic: %s, Partition is: %v. partition info: %v", + topicMetadata.Name, + partitionStatuses, + partition, + ) + + // kafka-go metadata call fetches the partition broker ID as 0 for partitions + // that are not found + // + // i.e + // - if a replica is missing for a partition, we get broker id as 0 + // - if a isr is missing for a partition, we still get broker id as 0 + // - if a leader is missing for a partition, we still get broker id as 0 (this is offline partition) + // + // It can be confusing to kafka-go users since broker IDs can start from 0 + // + // However, Burrow metadata call fetches missing partitions broker ID as -1 + // + // For user readability, we will modify + // - any ISR broker ID that does not have valid Host or Port from 0 to -1 + // - any Replica broker ID that does not have valid Host or Port from 0 to -1 + // - (Offline) Leader Broker ID that does not have a valid Host or Port from 0 to -1 + // + for _, status := range partitionStatuses { + if status == admin.Offline || status == admin.UnderReplicated { + if status == admin.Offline { + partition.Leader.ID = -1 + } + + for i, _ := range partition.Isr { + if partition.Isr[i].Host == "" && partition.Isr[i].Port == 0 { + partition.Isr[i].ID = -1 + } + } + + for i, _ := range partition.Replicas { + if partition.Replicas[i].Host == "" && partition.Replicas[i].Port == 0 { + partition.Replicas[i].ID = -1 + } + } + } + } + + partitions = append(partitions, admin.PartitionStatusInfo{ + Topic: topicMetadata.Name, + Partition: partition, + Statuses: partitionStatuses, + }) + } + + partitionsStatusInfo[topicMetadata.Name] = partitions + } + + return partitionsStatusInfo +} + +// Get the Partition Status +// Currently supports +// - under-replicated +// - offline +// - preferred-leader +// - not-prefered-leader +// +// NOTE: partition is +// 1. offline - if ListenerNotFound Error observed for leader partition +// 2. underreplicated - if number of isrs are lesser than the replicas +// 3. preferred leader - if the leader partition broker id is similar to first isr broker id +// 4. not preferred leader - if the leader partitions broker id is not similar to first isr broker id +func GetPartitionStatuses(partition kafka.Partition) []admin.PartitionStatus { + // + // NOTE: + // In general, partition error precedence is + // Offline > UnderReplicated > PreferredLeader > NotPreferredLeader + // + // BUT offline partition triumps everything + // + // Examples: + // - An under replicated can be preferredleader or not a preferred leader + // - An offline partition is NOT under replicated since there are no isrs + // Gotcha: what if offline partition has ISRs? Not sure how to replicate this + // + statuses := []admin.PartitionStatus{} + + // check offline. If offline, return the statuses + if partition.Leader.Host == "" && partition.Leader.Port == 0 && + admin.ListenerNotFoundError.Error() == partition.Error.Error() { + statuses = append(statuses, admin.Offline) + return statuses + } + + // check under replicated + if len(partition.Isr) < len(partition.Replicas) { + statuses = append(statuses, admin.UnderReplicated) + } + + // check preferred leader + if len(partition.Isr) > 0 && partition.Leader.ID == partition.Isr[0].ID { + statuses = append(statuses, admin.PreferredLeader) + } + + // check not preferred leader + if len(partition.Isr) > 0 && partition.Leader.ID != partition.Isr[0].ID { + statuses = append(statuses, admin.NotPreferredLeader) + } + + return statuses +} + +// returns true|false when a string status value is given. Refer type PartitionStatus +func IsValidPartitionStatusString(status string) bool { + switch status { + case string(admin.UnderReplicated), + string(admin.Offline), + string(admin.PreferredLeader), + string(admin.NotPreferredLeader): + return true + default: + return false + } +} From ebea876a27538c7fc32d7d1544f4adefd9293bd3 Mon Sep 17 00:00:00 2001 From: Sri Harsha Singudasu Date: Mon, 18 Sep 2023 22:16:13 -0700 Subject: [PATCH 2/9] [DP-1767] - topicctl get action partitions-status --- cmd/topicctl/subcmd/get.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cmd/topicctl/subcmd/get.go b/cmd/topicctl/subcmd/get.go index b7957cf9..29aba1ea 100644 --- a/cmd/topicctl/subcmd/get.go +++ b/cmd/topicctl/subcmd/get.go @@ -319,7 +319,7 @@ func partitionsStatusCmd() *cobra.Command { "topics", "t", []string{}, - "fetch specific topics partition status", + "fetch specific topics partition status (comma delimitted)", ) partitionsStatusCommand.Flags().StringVarP( From dd9a7b6eb6b195199c33a60054b2d317fcc207c8 Mon Sep 17 00:00:00 2001 From: Sri Harsha Singudasu Date: Mon, 18 Sep 2023 22:24:12 -0700 Subject: [PATCH 3/9] [DP-1767] - topicctl get action partitions-status --- cmd/topicctl/subcmd/get.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cmd/topicctl/subcmd/get.go b/cmd/topicctl/subcmd/get.go index 29aba1ea..e56cf560 100644 --- a/cmd/topicctl/subcmd/get.go +++ b/cmd/topicctl/subcmd/get.go @@ -39,7 +39,7 @@ type partitionsStatusCmdConfig struct { } var partitionsStatusConfig partitionsStatusCmdConfig -var partitionsStatusHelpText = "Allowed values: under-replicated, offline, preferred-leder, not-preferred-leader" +var partitionsStatusHelpText = "Allowed values: under-replicated, offline, preferred-leader, not-preferred-leader" func init() { getCmd.PersistentFlags().BoolVar( From a35b80c4700d248b1951b19d792c1ef53b104ba5 Mon Sep 17 00:00:00 2001 From: Sri Harsha Singudasu Date: Mon, 18 Sep 2023 22:27:33 -0700 Subject: [PATCH 4/9] [DP-1767] - topicctl get action partitions-status --- pkg/cli/cli.go | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/pkg/cli/cli.go b/pkg/cli/cli.go index 2274037c..fbd1d230 100644 --- a/pkg/cli/cli.go +++ b/pkg/cli/cli.go @@ -777,15 +777,16 @@ func GetPartitionStatuses(partition kafka.Partition) []admin.PartitionStatus { // // NOTE: // In general, partition error precedence is - // Offline > UnderReplicated > PreferredLeader > NotPreferredLeader + // Offline > UnderReplicated > NotPreferredLeader // - // BUT offline partition triumps everything + // BUT offline partition triumphs everything // // Examples: // - An under replicated can be preferredleader or not a preferred leader // - An offline partition is NOT under replicated since there are no isrs // Gotcha: what if offline partition has ISRs? Not sure how to replicate this // + statuses := []admin.PartitionStatus{} // check offline. If offline, return the statuses From ecd6c127239bc6e1ad2f13ecce36b0b71b8e13bd Mon Sep 17 00:00:00 2001 From: Sri Harsha Singudasu Date: Tue, 19 Sep 2023 00:20:05 -0700 Subject: [PATCH 5/9] [DP-1767] - topicctl get action partitions-status --- pkg/admin/format.go | 2 +- pkg/cli/cli.go | 44 ++++++++++++++++++++++++++------------------ 2 files changed, 27 insertions(+), 19 deletions(-) diff --git a/pkg/admin/format.go b/pkg/admin/format.go index 305ddd92..f7f748df 100644 --- a/pkg/admin/format.go +++ b/pkg/admin/format.go @@ -923,7 +923,7 @@ func FormatPartitionsByStatus( table := tablewriter.NewWriter(buf) table.SetHeader(headers) - table.SetAutoWrapText(false) + table.SetAutoWrapText(true) table.SetColumnAlignment(colAlignment) table.SetBorders( tablewriter.Border{ diff --git a/pkg/cli/cli.go b/pkg/cli/cli.go index fbd1d230..86c4f3e1 100644 --- a/pkg/cli/cli.go +++ b/pkg/cli/cli.go @@ -608,12 +608,10 @@ func (c *CLIRunner) GetAllTopicsMetaData( } // Get the partitions status for the specified status -// This function displays the partitions in a Pretty format. -// This functions prints and return error if -// - under replicated partitions status -// - offline partitions status -// - non preferred leader partitions status -// - unknown partitions status +// This function displays the partitions in a Pretty format and return error for below: +// - under replicated partitions +// - offline partitions +// - non preferred leader partitions func (c *CLIRunner) GetPartitionsStatus( ctx context.Context, topics []string, @@ -636,7 +634,7 @@ func (c *CLIRunner) GetPartitionsStatus( if status != "" { partitionsInfoByStatus := make(map[string][]kafka.Partition) partitionsCountByStatus := 0 - found := false + statusFound := false for topicName, partitions := range partitionsInfo { statusPartitions := []kafka.Partition{} @@ -645,7 +643,7 @@ func (c *CLIRunner) GetPartitionsStatus( for _, partitionStatus := range partition.Statuses { if partitionStatus == status { statusPartitions = append(statusPartitions, partition.Partition) - found = true + statusFound = true } } } @@ -666,7 +664,8 @@ func (c *CLIRunner) GetPartitionsStatus( log.Infof("%d Partitions are %v for topics", partitionsCountByStatus, status) - if found && status != admin.PreferredLeader { + // preferred leader is not an error condition + if statusFound && status != admin.PreferredLeader { return fmt.Errorf("%v partitions are found for topics", status) } @@ -771,8 +770,8 @@ func getPartitionsStatus( // NOTE: partition is // 1. offline - if ListenerNotFound Error observed for leader partition // 2. underreplicated - if number of isrs are lesser than the replicas -// 3. preferred leader - if the leader partition broker id is similar to first isr broker id -// 4. not preferred leader - if the leader partitions broker id is not similar to first isr broker id +// 3. preferred leader - if the leader partition broker id is similar to first valid Replicas broker id +// 4. not preferred leader - if the leader partitions broker id is not similar to first valid Replicas broker id func GetPartitionStatuses(partition kafka.Partition) []admin.PartitionStatus { // // NOTE: @@ -801,14 +800,23 @@ func GetPartitionStatuses(partition kafka.Partition) []admin.PartitionStatus { statuses = append(statuses, admin.UnderReplicated) } - // check preferred leader - if len(partition.Isr) > 0 && partition.Leader.ID == partition.Isr[0].ID { - statuses = append(statuses, admin.PreferredLeader) - } + // check preferred leader or not-preferred-leader + if len(partition.Replicas) > 0 { + firstValidReplicaID := -1 + for _, replica := range partition.Replicas { + if replica.Host == "" && replica.Port == 0 { + continue + } - // check not preferred leader - if len(partition.Isr) > 0 && partition.Leader.ID != partition.Isr[0].ID { - statuses = append(statuses, admin.NotPreferredLeader) + firstValidReplicaID = replica.ID + break + } + + if partition.Leader.ID == firstValidReplicaID { + statuses = append(statuses, admin.PreferredLeader) + } else { + statuses = append(statuses, admin.NotPreferredLeader) + } } return statuses From 961f0352eed67cd02dfbfa3e4a676e66004068ce Mon Sep 17 00:00:00 2001 From: Sri Harsha Singudasu Date: Tue, 19 Sep 2023 00:24:15 -0700 Subject: [PATCH 6/9] [DP-1767] - topicctl get action partitions-status --- pkg/cli/cli.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pkg/cli/cli.go b/pkg/cli/cli.go index 86c4f3e1..0d3af890 100644 --- a/pkg/cli/cli.go +++ b/pkg/cli/cli.go @@ -770,8 +770,8 @@ func getPartitionsStatus( // NOTE: partition is // 1. offline - if ListenerNotFound Error observed for leader partition // 2. underreplicated - if number of isrs are lesser than the replicas -// 3. preferred leader - if the leader partition broker id is similar to first valid Replicas broker id -// 4. not preferred leader - if the leader partitions broker id is not similar to first valid Replicas broker id +// 3. preferred leader - if the leader partition broker id is similar to first available Replicas broker id +// 4. not preferred leader - if the leader partitions broker id is not similar to first available Replicas broker id func GetPartitionStatuses(partition kafka.Partition) []admin.PartitionStatus { // // NOTE: From 3e18b3d360ec04f5edaf4bb0e8f3d456f7a14abf Mon Sep 17 00:00:00 2001 From: Sri Harsha Singudasu Date: Tue, 19 Sep 2023 01:04:47 -0700 Subject: [PATCH 7/9] [DP-1767] - naming conventions fixtures --- pkg/admin/format.go | 37 +++++++------ pkg/cli/cli.go | 128 ++++++++++++++++++++++---------------------- 2 files changed, 82 insertions(+), 83 deletions(-) diff --git a/pkg/admin/format.go b/pkg/admin/format.go index f7f748df..65726488 100644 --- a/pkg/admin/format.go +++ b/pkg/admin/format.go @@ -11,7 +11,6 @@ import ( "github.com/fatih/color" "github.com/olekukonko/tablewriter" - "github.com/segmentio/kafka-go" "github.com/segmentio/topicctl/pkg/util" ) @@ -888,7 +887,7 @@ func maxMapValues(inputMap map[int]int) int { // FormatPartitionsByStatus creates a pretty table that lists the details of the partitions by status func FormatPartitionsByStatus( - partitionsInfo map[string][]kafka.Partition, + partitionsInfoByStatus map[string][]PartitionStatusInfo, full bool, ) string { buf := &bytes.Buffer{} @@ -934,23 +933,23 @@ func FormatPartitionsByStatus( }, ) - for topicName, partitions := range partitionsInfo { + for topicName, partitionsStatusInfo := range partitionsInfoByStatus { if full { - for _, partition := range partitions { + for _, partitionStatusInfo := range partitionsStatusInfo { partitionIsrs := []int{} - for _, partitionStatusIsr := range partition.Isr { + for _, partitionStatusIsr := range partitionStatusInfo.Partition.Isr { partitionIsrs = append(partitionIsrs, partitionStatusIsr.ID) } partitionReplicas := []int{} - for _, partitionReplica := range partition.Replicas { + for _, partitionReplica := range partitionStatusInfo.Partition.Replicas { partitionReplicas = append(partitionReplicas, partitionReplica.ID) } row := []string{ topicName, - fmt.Sprintf("%d", partition.ID), - fmt.Sprintf("%d", partition.Leader.ID), + fmt.Sprintf("%d", partitionStatusInfo.Partition.ID), + fmt.Sprintf("%d", partitionStatusInfo.Partition.Leader.ID), fmt.Sprintf("%+v", partitionIsrs), fmt.Sprintf("%+v", partitionReplicas), } @@ -959,8 +958,8 @@ func FormatPartitionsByStatus( } } else { partitionIDs := []int{} - for _, partition := range partitions { - partitionIDs = append(partitionIDs, partition.ID) + for _, partitionStatusInfo := range partitionsStatusInfo { + partitionIDs = append(partitionIDs, partitionStatusInfo.Partition.ID) } row := []string{ @@ -979,7 +978,7 @@ func FormatPartitionsByStatus( // FormatPartitionsAllStatus creates a pretty table that lists all partitions status func FormatPartitionsAllStatus( - partitionsInfo map[string][]PartitionStatusInfo, + partitionsInfoAllStatus map[string][]PartitionStatusInfo, ) string { buf := &bytes.Buffer{} @@ -1013,27 +1012,27 @@ func FormatPartitionsAllStatus( }, ) - for topicName, partitions := range partitionsInfo { - for _, partition := range partitions { + for topicName, partitionsStatusInfo := range partitionsInfoAllStatus { + for _, partitionStatusInfo := range partitionsStatusInfo { partitionStatusIsrs := []int{} - for _, partitionStatusIsr := range partition.Partition.Isr { + for _, partitionStatusIsr := range partitionStatusInfo.Partition.Isr { partitionStatusIsrs = append(partitionStatusIsrs, partitionStatusIsr.ID) } partitionReplicas := []int{} - for _, partitionReplica := range partition.Partition.Replicas { + for _, partitionReplica := range partitionStatusInfo.Partition.Replicas { partitionReplicas = append(partitionReplicas, partitionReplica.ID) } partitionStatuses := []string{} - for _, status := range partition.Statuses { - partitionStatuses = append(partitionStatuses, string(status)) + for _, partitionStatus := range partitionStatusInfo.Statuses { + partitionStatuses = append(partitionStatuses, string(partitionStatus)) } row := []string{ topicName, - fmt.Sprintf("%d", partition.Partition.ID), - fmt.Sprintf("%d", partition.Partition.Leader.ID), + fmt.Sprintf("%d", partitionStatusInfo.Partition.ID), + fmt.Sprintf("%d", partitionStatusInfo.Partition.Leader.ID), fmt.Sprintf("%+v", partitionStatusIsrs), fmt.Sprintf("%+v", partitionReplicas), fmt.Sprintf("%s", strings.Join(partitionStatuses, ",")), diff --git a/pkg/cli/cli.go b/pkg/cli/cli.go index 0d3af890..10c2b628 100644 --- a/pkg/cli/cli.go +++ b/pkg/cli/cli.go @@ -579,34 +579,6 @@ func stringsToInts(strs []string) ([]int, error) { return ints, nil } -// kafka-go metadata call -func (c *CLIRunner) GetAllTopicsMetaData( - ctx context.Context, -) (*kafka.MetadataResponse, error) { - topicsInfo, err := c.adminClient.GetTopics(ctx, nil, false) - if err != nil { - return nil, fmt.Errorf("Error fetching all topics info: %+v", err) - } - - allTopics := []string{} - for _, topicInfo := range topicsInfo { - allTopics = append(allTopics, topicInfo.Name) - } - - client := c.adminClient.GetConnector().KafkaClient - req := kafka.MetadataRequest{ - Topics: allTopics, - } - - log.Debugf("Metadata request: %+v", req) - metadata, err := client.Metadata(ctx, &req) - if err != nil { - return nil, fmt.Errorf("Error fetching topic metadata: %+v", err) - } - - return metadata, nil -} - // Get the partitions status for the specified status // This function displays the partitions in a Pretty format and return error for below: // - under replicated partitions @@ -629,57 +601,85 @@ func (c *CLIRunner) GetPartitionsStatus( c.stopSpinner() log.Debugf("kafka-go metadata response: %v", metadata) - partitionsInfo := getPartitionsStatus(topics, metadata) - log.Debugf("partitionsInfo: %v", partitionsInfo) - if status != "" { - partitionsInfoByStatus := make(map[string][]kafka.Partition) - partitionsCountByStatus := 0 - statusFound := false - - for topicName, partitions := range partitionsInfo { - statusPartitions := []kafka.Partition{} - - for _, partition := range partitions { - for _, partitionStatus := range partition.Statuses { - if partitionStatus == status { - statusPartitions = append(statusPartitions, partition.Partition) - statusFound = true - } - } - } + partitionsInfoAllStatus := getPartitionsStatus(topics, metadata) + log.Debugf("partitionsInfoAllStatus: %v", partitionsInfoAllStatus) + if status == "" { + c.printer( + "Partitions Status:\n%s", + admin.FormatPartitionsAllStatus(partitionsInfoAllStatus), + ) - if len(statusPartitions) == 0 { - continue - } + return nil + } - partitionsInfoByStatus[topicName] = statusPartitions - partitionsCountByStatus += len(statusPartitions) - } + partitionsInfoByStatus := make(map[string][]admin.PartitionStatusInfo) + partitionsCountByStatus := 0 + foundStatus := false - c.printer( - "%v Partitions:\n%s", - status, - admin.FormatPartitionsByStatus(partitionsInfoByStatus, full), - ) + for topicName, partitionsStatusInfo := range partitionsInfoAllStatus { + statusPartitions := []admin.PartitionStatusInfo{} - log.Infof("%d Partitions are %v for topics", partitionsCountByStatus, status) + for _, partitionStatusInfo := range partitionsStatusInfo { + for _, partitionStatus := range partitionStatusInfo.Statuses { + if partitionStatus == status { + statusPartitions = append(statusPartitions, partitionStatusInfo) + foundStatus = true + } + } + } - // preferred leader is not an error condition - if statusFound && status != admin.PreferredLeader { - return fmt.Errorf("%v partitions are found for topics", status) + if len(statusPartitions) == 0 { + continue } - return nil + partitionsInfoByStatus[topicName] = statusPartitions + partitionsCountByStatus += len(statusPartitions) } c.printer( - "Partitions Status:\n%s", - admin.FormatPartitionsAllStatus(partitionsInfo), + "%v Partitions:\n%s", + status, + admin.FormatPartitionsByStatus(partitionsInfoByStatus, full), ) + log.Infof("%d Partitions are %v for topics", partitionsCountByStatus, status) + + // preferred leader is not an error condition + if foundStatus && status != admin.PreferredLeader { + return fmt.Errorf("%v partitions are found for topics", status) + } + return nil } +// kafka-go metadata call +func (c *CLIRunner) GetAllTopicsMetaData( + ctx context.Context, +) (*kafka.MetadataResponse, error) { + topicsInfo, err := c.adminClient.GetTopics(ctx, nil, false) + if err != nil { + return nil, fmt.Errorf("Error fetching all topics info: %+v", err) + } + + allTopics := []string{} + for _, topicInfo := range topicsInfo { + allTopics = append(allTopics, topicInfo.Name) + } + + client := c.adminClient.GetConnector().KafkaClient + req := kafka.MetadataRequest{ + Topics: allTopics, + } + + log.Debugf("Metadata request: %+v", req) + metadata, err := client.Metadata(ctx, &req) + if err != nil { + return nil, fmt.Errorf("Error fetching topic metadata: %+v", err) + } + + return metadata, nil +} + // This is the actual function where we fetch all the topic partitions with status func getPartitionsStatus( topics []string, From e131d6f1ec1a83191a1a592fa9b40afa610f20cc Mon Sep 17 00:00:00 2001 From: Sri Harsha Singudasu Date: Tue, 19 Sep 2023 01:38:48 -0700 Subject: [PATCH 8/9] [DP-1767] - minor fixtures --- pkg/admin/format.go | 37 ++++++++++++++++++++++++++----------- pkg/cli/cli.go | 42 ++++++++++++++++-------------------------- 2 files changed, 42 insertions(+), 37 deletions(-) diff --git a/pkg/admin/format.go b/pkg/admin/format.go index 65726488..61912cb0 100644 --- a/pkg/admin/format.go +++ b/pkg/admin/format.go @@ -887,7 +887,8 @@ func maxMapValues(inputMap map[int]int) int { // FormatPartitionsByStatus creates a pretty table that lists the details of the partitions by status func FormatPartitionsByStatus( - partitionsInfoByStatus map[string][]PartitionStatusInfo, + partitionsInfoAllStatus map[string][]PartitionStatusInfo, + status PartitionStatus, full bool, ) string { buf := &bytes.Buffer{} @@ -933,7 +934,7 @@ func FormatPartitionsByStatus( }, ) - for topicName, partitionsStatusInfo := range partitionsInfoByStatus { + for topicName, partitionsStatusInfo := range partitionsInfoAllStatus { if full { for _, partitionStatusInfo := range partitionsStatusInfo { partitionIsrs := []int{} @@ -946,20 +947,34 @@ func FormatPartitionsByStatus( partitionReplicas = append(partitionReplicas, partitionReplica.ID) } - row := []string{ - topicName, - fmt.Sprintf("%d", partitionStatusInfo.Partition.ID), - fmt.Sprintf("%d", partitionStatusInfo.Partition.Leader.ID), - fmt.Sprintf("%+v", partitionIsrs), - fmt.Sprintf("%+v", partitionReplicas), + for _, partitionStatus := range partitionStatusInfo.Statuses { + if partitionStatus != status { + continue + } + + row := []string{ + topicName, + fmt.Sprintf("%d", partitionStatusInfo.Partition.ID), + fmt.Sprintf("%d", partitionStatusInfo.Partition.Leader.ID), + fmt.Sprintf("%+v", partitionIsrs), + fmt.Sprintf("%+v", partitionReplicas), + } + table.Append(row) } - - table.Append(row) } } else { partitionIDs := []int{} for _, partitionStatusInfo := range partitionsStatusInfo { - partitionIDs = append(partitionIDs, partitionStatusInfo.Partition.ID) + for _, partitionStatus := range partitionStatusInfo.Statuses { + if partitionStatus != status { + continue + } + partitionIDs = append(partitionIDs, partitionStatusInfo.Partition.ID) + } + } + + if len(partitionIDs) == 0 { + continue } row := []string{ diff --git a/pkg/cli/cli.go b/pkg/cli/cli.go index 10c2b628..52995b39 100644 --- a/pkg/cli/cli.go +++ b/pkg/cli/cli.go @@ -612,42 +612,32 @@ func (c *CLIRunner) GetPartitionsStatus( return nil } - partitionsInfoByStatus := make(map[string][]admin.PartitionStatusInfo) - partitionsCountByStatus := 0 - foundStatus := false - - for topicName, partitionsStatusInfo := range partitionsInfoAllStatus { - statusPartitions := []admin.PartitionStatusInfo{} + c.printer( + "%v Partitions:\n%s", + status, + admin.FormatPartitionsByStatus( + partitionsInfoAllStatus, + status, + full, + ), + ) + // Summary: get the count of partitions for the status + partitionsCountByStatus := 0 + for _, partitionsStatusInfo := range partitionsInfoAllStatus { for _, partitionStatusInfo := range partitionsStatusInfo { for _, partitionStatus := range partitionStatusInfo.Statuses { if partitionStatus == status { - statusPartitions = append(statusPartitions, partitionStatusInfo) - foundStatus = true + partitionsCountByStatus += 1 } } } - - if len(statusPartitions) == 0 { - continue - } - - partitionsInfoByStatus[topicName] = statusPartitions - partitionsCountByStatus += len(statusPartitions) } - c.printer( - "%v Partitions:\n%s", - status, - admin.FormatPartitionsByStatus(partitionsInfoByStatus, full), - ) - - log.Infof("%d Partitions are %v for topics", partitionsCountByStatus, status) - - // preferred leader is not an error condition - if foundStatus && status != admin.PreferredLeader { - return fmt.Errorf("%v partitions are found for topics", status) + if partitionsCountByStatus > 0 && status != admin.PreferredLeader { + return fmt.Errorf("%d Partitions are %v", partitionsCountByStatus, status) } + log.Infof("%d Partitions are %v", partitionsCountByStatus, status) return nil } From b2ece1cf5f19934e2992181c5d14151e23f49725 Mon Sep 17 00:00:00 2001 From: Sri Harsha Singudasu Date: Tue, 19 Sep 2023 01:47:27 -0700 Subject: [PATCH 9/9] [DP-1767] - minor fixtures --- pkg/cli/cli.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pkg/cli/cli.go b/pkg/cli/cli.go index 52995b39..81b52ed1 100644 --- a/pkg/cli/cli.go +++ b/pkg/cli/cli.go @@ -601,7 +601,7 @@ func (c *CLIRunner) GetPartitionsStatus( c.stopSpinner() log.Debugf("kafka-go metadata response: %v", metadata) - partitionsInfoAllStatus := getPartitionsStatus(topics, metadata) + partitionsInfoAllStatus := GetPartitionsStatusInfo(topics, metadata) log.Debugf("partitionsInfoAllStatus: %v", partitionsInfoAllStatus) if status == "" { c.printer( @@ -671,7 +671,7 @@ func (c *CLIRunner) GetAllTopicsMetaData( } // This is the actual function where we fetch all the topic partitions with status -func getPartitionsStatus( +func GetPartitionsStatusInfo( topics []string, metadata *kafka.MetadataResponse, ) map[string][]admin.PartitionStatusInfo {