Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[DP-1767] - topicctl get action partitions-status #152

Closed
62 changes: 62 additions & 0 deletions cmd/topicctl/subcmd/get.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,11 @@ package subcmd

import (
"context"
"fmt"
"strings"

"github.com/aws/aws-sdk-go/aws/session"
"github.com/segmentio/topicctl/pkg/admin"
"github.com/segmentio/topicctl/pkg/cli"
log "github.com/sirupsen/logrus"
"github.com/spf13/cobra"
Expand All @@ -31,6 +33,14 @@ type getCmdConfig struct {

var getConfig getCmdConfig

type partitionsStatusCmdConfig struct {
topics []string
status string
}

var partitionsStatusConfig partitionsStatusCmdConfig
var partitionsStatusHelpText = "Allowed values: under-replicated, offline, preferred-leader, not-preferred-leader"

func init() {
getCmd.PersistentFlags().BoolVar(
&getConfig.full,
Expand All @@ -55,6 +65,7 @@ func init() {
partitionsCmd(),
offsetsCmd(),
topicsCmd(),
partitionsStatusCmd(),
)
RootCmd.AddCommand(getCmd)
}
Expand Down Expand Up @@ -272,3 +283,54 @@ func topicsCmd() *cobra.Command {
},
}
}

func partitionsStatusCmd() *cobra.Command {
partitionsStatusCommand := &cobra.Command{
Use: "partitions-status",
Short: "Fetch all partitions status",
Args: cobra.MinimumNArgs(0),
RunE: func(cmd *cobra.Command, args []string) error {
ctx := context.Background()
sess := session.Must(session.NewSession())

adminClient, err := getConfig.shared.getAdminClient(ctx, sess, true)
if err != nil {
return err
}
defer adminClient.Close()

status := strings.ToUpper(partitionsStatusConfig.status)
if status != "" && !cli.IsValidPartitionStatusString(status) {
return fmt.Errorf("Invalid status flag\n%s", partitionsStatusHelpText)
}

cliRunner := cli.NewCLIRunner(adminClient, log.Infof, !noSpinner)
return cliRunner.GetPartitionsStatus(
ctx,
partitionsStatusConfig.topics,
admin.PartitionStatus(status),
getConfig.full,
)
},
}

partitionsStatusCommand.Flags().StringSliceVarP(
&partitionsStatusConfig.topics,
"topics",
"t",
[]string{},
"fetch specific topics partition status (comma delimitted)",
)

partitionsStatusCommand.Flags().StringVarP(
&partitionsStatusConfig.status,
"status",
"s",
"",
fmt.Sprintf("Partition status\n%s", partitionsStatusHelpText),
)

// partitionsStatusCommand.MarkFlagRequired("status")

return partitionsStatusCommand
}
176 changes: 176 additions & 0 deletions pkg/admin/format.go
Original file line number Diff line number Diff line change
Expand Up @@ -884,3 +884,179 @@ func maxMapValues(inputMap map[int]int) int {

return maxValue
}

// FormatPartitionsByStatus creates a pretty table that lists the details of the partitions by status
func FormatPartitionsByStatus(
partitionsInfoAllStatus map[string][]PartitionStatusInfo,
status PartitionStatus,
full bool,
) string {
buf := &bytes.Buffer{}

headers := []string{
"Topic",
"Partitions",
"Count",
}
colAlignment := []int{
tablewriter.ALIGN_LEFT,
tablewriter.ALIGN_LEFT,
tablewriter.ALIGN_LEFT,
}

if full {
headers = []string{
"Topic",
"Partition",
"Leader",
"ISR",
"Replicas",
}
colAlignment = []int{
tablewriter.ALIGN_LEFT,
tablewriter.ALIGN_LEFT,
tablewriter.ALIGN_LEFT,
tablewriter.ALIGN_LEFT,
tablewriter.ALIGN_LEFT,
}
}

table := tablewriter.NewWriter(buf)
table.SetHeader(headers)
table.SetAutoWrapText(true)
table.SetColumnAlignment(colAlignment)
table.SetBorders(
tablewriter.Border{
Left: false,
Top: true,
Right: false,
Bottom: true,
},
)

for topicName, partitionsStatusInfo := range partitionsInfoAllStatus {
if full {
for _, partitionStatusInfo := range partitionsStatusInfo {
partitionIsrs := []int{}
for _, partitionStatusIsr := range partitionStatusInfo.Partition.Isr {
partitionIsrs = append(partitionIsrs, partitionStatusIsr.ID)
}

partitionReplicas := []int{}
for _, partitionReplica := range partitionStatusInfo.Partition.Replicas {
partitionReplicas = append(partitionReplicas, partitionReplica.ID)
}

for _, partitionStatus := range partitionStatusInfo.Statuses {
if partitionStatus != status {
continue
}

row := []string{
topicName,
fmt.Sprintf("%d", partitionStatusInfo.Partition.ID),
fmt.Sprintf("%d", partitionStatusInfo.Partition.Leader.ID),
fmt.Sprintf("%+v", partitionIsrs),
fmt.Sprintf("%+v", partitionReplicas),
}
table.Append(row)
}
}
} else {
partitionIDs := []int{}
for _, partitionStatusInfo := range partitionsStatusInfo {
for _, partitionStatus := range partitionStatusInfo.Statuses {
if partitionStatus != status {
continue
}
partitionIDs = append(partitionIDs, partitionStatusInfo.Partition.ID)
}
}

if len(partitionIDs) == 0 {
continue
}

row := []string{
topicName,
fmt.Sprintf("%+v", partitionIDs),
fmt.Sprintf("%d", len(partitionIDs)),
}

table.Append(row)
}
}

table.Render()
return string(bytes.TrimRight(buf.Bytes(), "\n"))
}

// FormatPartitionsAllStatus creates a pretty table that lists all partitions status
func FormatPartitionsAllStatus(
partitionsInfoAllStatus map[string][]PartitionStatusInfo,
) string {
buf := &bytes.Buffer{}

headers := []string{
"Topic",
"Partition",
"Leader",
"ISR",
"Replicas",
"Status",
}
colAlignment := []int{
tablewriter.ALIGN_LEFT,
tablewriter.ALIGN_LEFT,
tablewriter.ALIGN_LEFT,
tablewriter.ALIGN_LEFT,
tablewriter.ALIGN_LEFT,
tablewriter.ALIGN_LEFT,
}

table := tablewriter.NewWriter(buf)
table.SetHeader(headers)
table.SetAutoWrapText(false)
table.SetColumnAlignment(colAlignment)
table.SetBorders(
tablewriter.Border{
Left: false,
Top: true,
Right: false,
Bottom: true,
},
)

for topicName, partitionsStatusInfo := range partitionsInfoAllStatus {
for _, partitionStatusInfo := range partitionsStatusInfo {
partitionStatusIsrs := []int{}
for _, partitionStatusIsr := range partitionStatusInfo.Partition.Isr {
partitionStatusIsrs = append(partitionStatusIsrs, partitionStatusIsr.ID)
}

partitionReplicas := []int{}
for _, partitionReplica := range partitionStatusInfo.Partition.Replicas {
partitionReplicas = append(partitionReplicas, partitionReplica.ID)
}

partitionStatuses := []string{}
for _, partitionStatus := range partitionStatusInfo.Statuses {
partitionStatuses = append(partitionStatuses, string(partitionStatus))
}

row := []string{
topicName,
fmt.Sprintf("%d", partitionStatusInfo.Partition.ID),
fmt.Sprintf("%d", partitionStatusInfo.Partition.Leader.ID),
fmt.Sprintf("%+v", partitionStatusIsrs),
fmt.Sprintf("%+v", partitionReplicas),
fmt.Sprintf("%s", strings.Join(partitionStatuses, ",")),
}

table.Append(row)
}
}

table.Render()
return string(bytes.TrimRight(buf.Bytes(), "\n"))
}
20 changes: 20 additions & 0 deletions pkg/admin/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"strconv"
"time"

"github.com/segmentio/kafka-go"
"github.com/segmentio/topicctl/pkg/util"
)

Expand Down Expand Up @@ -136,6 +137,25 @@ type zkChangeNotification struct {
EntityPath string `json:"entity_path"`
}

type PartitionStatus string

const (
UnderReplicated PartitionStatus = "UNDER-REPLICATED"
Offline PartitionStatus = "OFFLINE"
PreferredLeader PartitionStatus = "PREFERRED-LEADER"
NotPreferredLeader PartitionStatus = "NOT-PREFERRED-LEADER"
)

type PartitionStatusInfo struct {
Topic string
Partition kafka.Partition
Statuses []PartitionStatus
}

const (
ListenerNotFoundError kafka.Error = 72
)

// Addr returns the address of the current BrokerInfo.
func (b BrokerInfo) Addr() string {
return fmt.Sprintf("%s:%d", b.Host, b.Port)
Expand Down
Loading
Loading