diff --git a/cmd/topicctl/subcmd/get.go b/cmd/topicctl/subcmd/get.go
index 642a9031..e56cf560 100644
--- a/cmd/topicctl/subcmd/get.go
+++ b/cmd/topicctl/subcmd/get.go
@@ -2,9 +2,11 @@ package subcmd
 
 import (
 	"context"
+	"fmt"
 	"strings"
 
 	"github.com/aws/aws-sdk-go/aws/session"
+	"github.com/segmentio/topicctl/pkg/admin"
 	"github.com/segmentio/topicctl/pkg/cli"
 	log "github.com/sirupsen/logrus"
 	"github.com/spf13/cobra"
@@ -31,6 +33,14 @@ type getCmdConfig struct {
 
 var getConfig getCmdConfig
 
+type partitionsStatusCmdConfig struct {
+	topics []string
+	status string
+}
+
+var partitionsStatusConfig partitionsStatusCmdConfig
+var partitionsStatusHelpText = "Allowed values: under-replicated, offline, preferred-leader, not-preferred-leader"
+
 func init() {
 	getCmd.PersistentFlags().BoolVar(
 		&getConfig.full,
@@ -55,6 +65,7 @@ func init() {
 		partitionsCmd(),
 		offsetsCmd(),
 		topicsCmd(),
+		partitionsStatusCmd(),
 	)
 	RootCmd.AddCommand(getCmd)
 }
@@ -272,3 +283,54 @@ func topicsCmd() *cobra.Command {
 		},
 	}
 }
+
+func partitionsStatusCmd() *cobra.Command {
+	partitionsStatusCommand := &cobra.Command{
+		Use:   "partitions-status",
+		Short: "Fetch the status of all partitions",
+		Args:  cobra.MinimumNArgs(0),
+		RunE: func(cmd *cobra.Command, args []string) error {
+			ctx := context.Background()
+			sess := session.Must(session.NewSession())
+
+			adminClient, err := getConfig.shared.getAdminClient(ctx, sess, true)
+			if err != nil {
+				return err
+			}
+			defer adminClient.Close()
+
+			status := strings.ToUpper(partitionsStatusConfig.status)
+			if status != "" && !cli.IsValidPartitionStatusString(status) {
+				return fmt.Errorf("invalid status flag\n%s", partitionsStatusHelpText)
+			}
+
+			cliRunner := cli.NewCLIRunner(adminClient, log.Infof, !noSpinner)
+			return cliRunner.GetPartitionsStatus(
+				ctx,
+				partitionsStatusConfig.topics,
+				admin.PartitionStatus(status),
+				getConfig.full,
+			)
+		},
+	}
+
+	partitionsStatusCommand.Flags().StringSliceVarP(
+		&partitionsStatusConfig.topics,
+		"topics",
+		"t",
+		[]string{},
+		"fetch partition status for specific topics only (comma-delimited)",
+	)
+
+	partitionsStatusCommand.Flags().StringVarP(
+		&partitionsStatusConfig.status,
+		"status",
+		"s",
+		"",
+		fmt.Sprintf("Partition status\n%s", partitionsStatusHelpText),
+	)
+
+	// partitionsStatusCommand.MarkFlagRequired("status")
+
+	return partitionsStatusCommand
+}
diff --git a/pkg/admin/format.go b/pkg/admin/format.go
index c6b9c64f..61912cb0 100644
--- a/pkg/admin/format.go
+++ b/pkg/admin/format.go
@@ -884,3 +884,179 @@ func maxMapValues(inputMap map[int]int) int {
 
 	return maxValue
 }
+
+// FormatPartitionsByStatus creates a pretty table that lists the details of the partitions with the given status
+func FormatPartitionsByStatus(
+	partitionsInfoAllStatus map[string][]PartitionStatusInfo,
+	status PartitionStatus,
+	full bool,
+) string {
+	buf := &bytes.Buffer{}
+
+	headers := []string{
+		"Topic",
+		"Partitions",
+		"Count",
+	}
+	colAlignment := []int{
+		tablewriter.ALIGN_LEFT,
+		tablewriter.ALIGN_LEFT,
+		tablewriter.ALIGN_LEFT,
+	}
+
+	if full {
+		headers = []string{
+			"Topic",
+			"Partition",
+			"Leader",
+			"ISR",
+			"Replicas",
+		}
+		colAlignment = []int{
+			tablewriter.ALIGN_LEFT,
+			tablewriter.ALIGN_LEFT,
+			tablewriter.ALIGN_LEFT,
+			tablewriter.ALIGN_LEFT,
+			tablewriter.ALIGN_LEFT,
+		}
+	}
+
+	table := tablewriter.NewWriter(buf)
+	table.SetHeader(headers)
+	table.SetAutoWrapText(true)
+	table.SetColumnAlignment(colAlignment)
+	table.SetBorders(
+		tablewriter.Border{
+			Left:   false,
+			Top:    true,
+			Right:  false,
+			Bottom: true,
+		},
+	)
+
+	for topicName, partitionsStatusInfo := range partitionsInfoAllStatus {
+		if full {
+			for _, partitionStatusInfo := range partitionsStatusInfo {
+				partitionIsrs := []int{}
+				for _, partitionStatusIsr := range partitionStatusInfo.Partition.Isr {
+					partitionIsrs = append(partitionIsrs, partitionStatusIsr.ID)
+				}
+
+				partitionReplicas := []int{}
+				for _, partitionReplica := range partitionStatusInfo.Partition.Replicas {
+					partitionReplicas = append(partitionReplicas, partitionReplica.ID)
+				}
+
+				for _, partitionStatus := range partitionStatusInfo.Statuses {
+					if partitionStatus != status {
+						continue
+					}
+
+					row := []string{
+						topicName,
+						fmt.Sprintf("%d", partitionStatusInfo.Partition.ID),
+						fmt.Sprintf("%d", partitionStatusInfo.Partition.Leader.ID),
+						fmt.Sprintf("%+v", partitionIsrs),
+						fmt.Sprintf("%+v", partitionReplicas),
+					}
+					table.Append(row)
+				}
+			}
+		} else {
+			partitionIDs := []int{}
+			for _, partitionStatusInfo := range partitionsStatusInfo {
+				for _, partitionStatus := range partitionStatusInfo.Statuses {
+					if partitionStatus != status {
+						continue
+					}
+					partitionIDs = append(partitionIDs, partitionStatusInfo.Partition.ID)
+				}
+			}
+
+			if len(partitionIDs) == 0 {
+				continue
+			}
+
+			row := []string{
+				topicName,
+				fmt.Sprintf("%+v", partitionIDs),
+				fmt.Sprintf("%d", len(partitionIDs)),
+			}
+
+			table.Append(row)
+		}
+	}
+
+	table.Render()
+	return string(bytes.TrimRight(buf.Bytes(), "\n"))
+}
+
+// FormatPartitionsAllStatus creates a pretty table that lists the statuses of all partitions
+func FormatPartitionsAllStatus(
+	partitionsInfoAllStatus map[string][]PartitionStatusInfo,
+) string {
+	buf := &bytes.Buffer{}
+
+	headers := []string{
+		"Topic",
+		"Partition",
+		"Leader",
+		"ISR",
+		"Replicas",
+		"Status",
+	}
+	colAlignment := []int{
+		tablewriter.ALIGN_LEFT,
+		tablewriter.ALIGN_LEFT,
+		tablewriter.ALIGN_LEFT,
+		tablewriter.ALIGN_LEFT,
+		tablewriter.ALIGN_LEFT,
+		tablewriter.ALIGN_LEFT,
+	}
+
+	table := tablewriter.NewWriter(buf)
+	table.SetHeader(headers)
+	table.SetAutoWrapText(false)
+	table.SetColumnAlignment(colAlignment)
+	table.SetBorders(
+		tablewriter.Border{
+			Left:   false,
+			Top:    true,
+			Right:  false,
+			Bottom: true,
+		},
+	)
+
+	for topicName, partitionsStatusInfo := range partitionsInfoAllStatus {
+		for _, partitionStatusInfo := range partitionsStatusInfo {
+			partitionStatusIsrs := []int{}
+			for _, partitionStatusIsr := range partitionStatusInfo.Partition.Isr {
+				partitionStatusIsrs = append(partitionStatusIsrs, partitionStatusIsr.ID)
+			}
+
+			partitionReplicas := []int{}
+			for _, partitionReplica := range partitionStatusInfo.Partition.Replicas {
+				partitionReplicas = append(partitionReplicas, partitionReplica.ID)
+			}
+
+			partitionStatuses := []string{}
+			for _, partitionStatus := range partitionStatusInfo.Statuses {
+				partitionStatuses = append(partitionStatuses, string(partitionStatus))
+			}
+
+			row := []string{
+				topicName,
+				fmt.Sprintf("%d", partitionStatusInfo.Partition.ID),
+				fmt.Sprintf("%d", partitionStatusInfo.Partition.Leader.ID),
+				fmt.Sprintf("%+v", partitionStatusIsrs),
+				fmt.Sprintf("%+v", partitionReplicas),
+				strings.Join(partitionStatuses, ","),
+			}
+
+			table.Append(row)
+		}
+	}
+
+	table.Render()
+	return string(bytes.TrimRight(buf.Bytes(), "\n"))
+}
diff --git a/pkg/admin/types.go b/pkg/admin/types.go
index 0221a39b..3068f46e 100644
--- a/pkg/admin/types.go
+++ b/pkg/admin/types.go
@@ -8,6 +8,7 @@ import (
 	"strconv"
 	"time"
 
+	"github.com/segmentio/kafka-go"
 	"github.com/segmentio/topicctl/pkg/util"
 )
 
@@ -136,6 +137,25 @@ type zkChangeNotification struct {
 	EntityPath string `json:"entity_path"`
 }
 
+type PartitionStatus string
+
+const (
+	UnderReplicated    PartitionStatus = "UNDER-REPLICATED"
+	Offline            PartitionStatus = "OFFLINE"
+	PreferredLeader    PartitionStatus = "PREFERRED-LEADER"
+	NotPreferredLeader PartitionStatus = "NOT-PREFERRED-LEADER"
+)
+
+type PartitionStatusInfo struct {
+	Topic     string
+	Partition kafka.Partition
+	Statuses  []PartitionStatus
+}
+
+const (
+	ListenerNotFoundError kafka.Error = 72
+)
+
 // Addr returns the address of the current BrokerInfo.
 func (b BrokerInfo) Addr() string {
 	return fmt.Sprintf("%s:%d", b.Host, b.Port)
diff --git a/pkg/cli/cli.go b/pkg/cli/cli.go
index 7b0542c7..81b52ed1 100644
--- a/pkg/cli/cli.go
+++ b/pkg/cli/cli.go
@@ -14,6 +14,7 @@ import (
 
 	"github.com/briandowns/spinner"
 	"github.com/fatih/color"
+	"github.com/segmentio/kafka-go"
 	"github.com/segmentio/topicctl/pkg/admin"
 	"github.com/segmentio/topicctl/pkg/apply"
 	"github.com/segmentio/topicctl/pkg/check"
@@ -577,3 +578,249 @@ func stringsToInts(strs []string) ([]int, error) {
 
 	return ints, nil
 }
+
+// GetPartitionsStatus fetches partition statuses, optionally filtered by topics and a single status.
+// It displays the matching partitions in a pretty table and returns an error when it finds:
+// - under-replicated partitions
+// - offline partitions
+// - not-preferred-leader partitions
+func (c *CLIRunner) GetPartitionsStatus(
+	ctx context.Context,
+	topics []string,
+	status admin.PartitionStatus,
+	full bool,
+) error {
+	c.startSpinner()
+
+	metadata, err := c.GetAllTopicsMetaData(ctx)
+	if err != nil {
+		c.stopSpinner()
+		return err
+	}
+
+	c.stopSpinner()
+	log.Debugf("kafka-go metadata response: %v", metadata)
+
+	partitionsInfoAllStatus := GetPartitionsStatusInfo(topics, metadata)
+	log.Debugf("partitionsInfoAllStatus: %v", partitionsInfoAllStatus)
+	if status == "" {
+		c.printer(
+			"Partitions Status:\n%s",
+			admin.FormatPartitionsAllStatus(partitionsInfoAllStatus),
+		)
+
+		return nil
+	}
+
+	c.printer(
+		"%v Partitions:\n%s",
+		status,
+		admin.FormatPartitionsByStatus(
+			partitionsInfoAllStatus,
+			status,
+			full,
+		),
+	)
+
+	// Summary: count the partitions that have the requested status
+	partitionsCountByStatus := 0
+	for _, partitionsStatusInfo := range partitionsInfoAllStatus {
+		for _, partitionStatusInfo := range partitionsStatusInfo {
+			for _, partitionStatus := range partitionStatusInfo.Statuses {
+				if partitionStatus == status {
+					partitionsCountByStatus++
+				}
+			}
+		}
+	}
+
+	if partitionsCountByStatus > 0 && status != admin.PreferredLeader {
+		return fmt.Errorf("%d partitions are %v", partitionsCountByStatus, status)
+	}
+	log.Infof("%d partitions are %v", partitionsCountByStatus, status)
+
+	return nil
+}
+
+// GetAllTopicsMetaData fetches metadata for all topics via a kafka-go Metadata request.
+func (c *CLIRunner) GetAllTopicsMetaData(
+	ctx context.Context,
+) (*kafka.MetadataResponse, error) {
+	topicsInfo, err := c.adminClient.GetTopics(ctx, nil, false)
+	if err != nil {
+		return nil, fmt.Errorf("error fetching all topics info: %+v", err)
+	}
+
+	allTopics := []string{}
+	for _, topicInfo := range topicsInfo {
+		allTopics = append(allTopics, topicInfo.Name)
+	}
+
+	client := c.adminClient.GetConnector().KafkaClient
+	req := kafka.MetadataRequest{
+		Topics: allTopics,
+	}
+
+	log.Debugf("Metadata request: %+v", req)
+	metadata, err := client.Metadata(ctx, &req)
+	if err != nil {
+		return nil, fmt.Errorf("error fetching topic metadata: %+v", err)
+	}
+
+	return metadata, nil
+}
+
+// GetPartitionsStatusInfo builds the per-topic partition status info from a metadata response.
+func GetPartitionsStatusInfo(
+	topics []string,
+	metadata *kafka.MetadataResponse,
+) map[string][]admin.PartitionStatusInfo {
+	filterTopics := make(map[string]bool)
+	for _, topic := range topics {
+		filterTopics[topic] = true
+	}
+
+	partitionsStatusInfo := make(map[string][]admin.PartitionStatusInfo)
+	for _, topicMetadata := range metadata.Topics {
+		if topicMetadata.Error != nil {
+			log.Errorf("topic metadata error: %v", topicMetadata.Error)
+			continue
+		}
+
+		if len(topics) != 0 && !filterTopics[topicMetadata.Name] {
+			continue
+		}
+
+		partitions := []admin.PartitionStatusInfo{}
+		for _, partition := range topicMetadata.Partitions {
+			partitionStatuses := GetPartitionStatuses(partition)
+			log.Debugf("Topic: %s, partition statuses: %v, partition info: %+v",
+				topicMetadata.Name,
+				partitionStatuses,
+				partition,
+			)
+
+			// The kafka-go metadata call reports the broker ID as 0 for brokers
+			// that are not found.
+			//
+			// i.e.
+			// - if a replica is missing for a partition, we get a broker ID of 0
+			// - if an ISR is missing for a partition, we still get a broker ID of 0
+			// - if a leader is missing for a partition, we still get a broker ID of 0 (this is an offline partition)
+			//
+			// This can be confusing to kafka-go users since valid broker IDs can also start from 0.
+			//
+			// However, the Burrow metadata call reports missing brokers with an ID of -1.
+			//
+			// For readability, we rewrite the ID from 0 to -1 for:
+			// - any ISR broker that does not have a valid Host or Port
+			// - any Replica broker that does not have a valid Host or Port
+			// - an (offline) leader broker that does not have a valid Host or Port
+			//
+			for _, status := range partitionStatuses {
+				if status == admin.Offline || status == admin.UnderReplicated {
+					if status == admin.Offline {
+						partition.Leader.ID = -1
+					}
+
+					for i := range partition.Isr {
+						if partition.Isr[i].Host == "" && partition.Isr[i].Port == 0 {
+							partition.Isr[i].ID = -1
+						}
+					}
+
+					for i := range partition.Replicas {
+						if partition.Replicas[i].Host == "" && partition.Replicas[i].Port == 0 {
+							partition.Replicas[i].ID = -1
+						}
+					}
+				}
+			}
+
+			partitions = append(partitions, admin.PartitionStatusInfo{
+				Topic:     topicMetadata.Name,
+				Partition: partition,
+				Statuses:  partitionStatuses,
+			})
+		}
+
+		partitionsStatusInfo[topicMetadata.Name] = partitions
+	}
+
+	return partitionsStatusInfo
+}
+
+// GetPartitionStatuses returns the statuses of a single partition.
+// Currently supported statuses:
+// - under-replicated
+// - offline
+// - preferred-leader
+// - not-preferred-leader
+//
+// NOTE: a partition is
+// 1. offline - if a ListenerNotFound error is observed for the partition leader
+// 2. under-replicated - if the number of ISRs is less than the number of replicas
+// 3. preferred-leader - if the leader's broker ID equals the ID of the first available replica
+// 4. not-preferred-leader - if the leader's broker ID differs from the ID of the first available replica
+func GetPartitionStatuses(partition kafka.Partition) []admin.PartitionStatus {
+	//
+	// NOTE:
+	// In general, partition status precedence is
+	// Offline > UnderReplicated > NotPreferredLeader
+	//
+	// BUT an offline partition trumps everything
+	//
+	// Examples:
+	// - An under-replicated partition can be a preferred leader or not a preferred leader
+	// - An offline partition is NOT marked under-replicated since there are no ISRs
+	// Gotcha: what if an offline partition has ISRs? Not sure how to reproduce this
+	//
+
+	statuses := []admin.PartitionStatus{}
+
+	// check offline; if offline, return the statuses immediately
+	if partition.Leader.Host == "" && partition.Leader.Port == 0 &&
+		partition.Error != nil && partition.Error.Error() == admin.ListenerNotFoundError.Error() {
+		statuses = append(statuses, admin.Offline)
+		return statuses
+	}
+
+	// check under-replicated
+	if len(partition.Isr) < len(partition.Replicas) {
+		statuses = append(statuses, admin.UnderReplicated)
+	}
+
+	// check preferred-leader or not-preferred-leader
+	if len(partition.Replicas) > 0 {
+		firstValidReplicaID := -1
+		for _, replica := range partition.Replicas {
+			if replica.Host == "" && replica.Port == 0 {
+				continue
+			}
+
+			firstValidReplicaID = replica.ID
+			break
+		}
+
+		if partition.Leader.ID == firstValidReplicaID {
+			statuses = append(statuses, admin.PreferredLeader)
+		} else {
+			statuses = append(statuses, admin.NotPreferredLeader)
+		}
+	}
+
+	return statuses
+}
+
+// IsValidPartitionStatusString reports whether the given string is a valid PartitionStatus value.
+func IsValidPartitionStatusString(status string) bool {
+	switch status {
+	case string(admin.UnderReplicated),
+		string(admin.Offline),
+		string(admin.PreferredLeader),
+		string(admin.NotPreferredLeader):
+		return true
+	default:
+		return false
+	}
+}
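Reviewer note: below is a minimal, hypothetical sketch (not part of the diff) of how the new status classification could be exercised in isolation. It assumes the topicctl module and kafka-go are available on the module path; the topic name, broker IDs, hosts, and ports are made up for illustration.

package main

import (
	"fmt"

	"github.com/segmentio/kafka-go"
	"github.com/segmentio/topicctl/pkg/cli"
)

func main() {
	// A partition led by broker 3 whose first replica is broker 1 and which has
	// lost one ISR: GetPartitionStatuses should report it as both
	// UNDER-REPLICATED and NOT-PREFERRED-LEADER.
	partition := kafka.Partition{
		Topic:  "example-topic", // hypothetical topic
		ID:     0,
		Leader: kafka.Broker{ID: 3, Host: "broker-3", Port: 9092},
		Replicas: []kafka.Broker{
			{ID: 1, Host: "broker-1", Port: 9092},
			{ID: 3, Host: "broker-3", Port: 9092},
		},
		Isr: []kafka.Broker{
			{ID: 3, Host: "broker-3", Port: 9092},
		},
	}

	fmt.Println(cli.GetPartitionStatuses(partition))
	// Expected: [UNDER-REPLICATED NOT-PREFERRED-LEADER]
}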