Skip to content

Commit

Permalink
[DP-1767] - naming conventions fixtures
Browse files Browse the repository at this point in the history
  • Loading branch information
ssingudasu committed Sep 19, 2023
1 parent 961f035 commit 3e18b3d
Show file tree
Hide file tree
Showing 2 changed files with 82 additions and 83 deletions.
37 changes: 18 additions & 19 deletions pkg/admin/format.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ import (

"github.com/fatih/color"
"github.com/olekukonko/tablewriter"
"github.com/segmentio/kafka-go"
"github.com/segmentio/topicctl/pkg/util"
)

Expand Down Expand Up @@ -888,7 +887,7 @@ func maxMapValues(inputMap map[int]int) int {

// FormatPartitionsByStatus creates a pretty table that lists the details of the partitions by status
func FormatPartitionsByStatus(
partitionsInfo map[string][]kafka.Partition,
partitionsInfoByStatus map[string][]PartitionStatusInfo,
full bool,
) string {
buf := &bytes.Buffer{}
Expand Down Expand Up @@ -934,23 +933,23 @@ func FormatPartitionsByStatus(
},
)

for topicName, partitions := range partitionsInfo {
for topicName, partitionsStatusInfo := range partitionsInfoByStatus {
if full {
for _, partition := range partitions {
for _, partitionStatusInfo := range partitionsStatusInfo {
partitionIsrs := []int{}
for _, partitionStatusIsr := range partition.Isr {
for _, partitionStatusIsr := range partitionStatusInfo.Partition.Isr {
partitionIsrs = append(partitionIsrs, partitionStatusIsr.ID)
}

partitionReplicas := []int{}
for _, partitionReplica := range partition.Replicas {
for _, partitionReplica := range partitionStatusInfo.Partition.Replicas {
partitionReplicas = append(partitionReplicas, partitionReplica.ID)
}

row := []string{
topicName,
fmt.Sprintf("%d", partition.ID),
fmt.Sprintf("%d", partition.Leader.ID),
fmt.Sprintf("%d", partitionStatusInfo.Partition.ID),
fmt.Sprintf("%d", partitionStatusInfo.Partition.Leader.ID),
fmt.Sprintf("%+v", partitionIsrs),
fmt.Sprintf("%+v", partitionReplicas),
}
Expand All @@ -959,8 +958,8 @@ func FormatPartitionsByStatus(
}
} else {
partitionIDs := []int{}
for _, partition := range partitions {
partitionIDs = append(partitionIDs, partition.ID)
for _, partitionStatusInfo := range partitionsStatusInfo {
partitionIDs = append(partitionIDs, partitionStatusInfo.Partition.ID)
}

row := []string{
Expand All @@ -979,7 +978,7 @@ func FormatPartitionsByStatus(

// FormatPartitionsAllStatus creates a pretty table that lists all partitions status
func FormatPartitionsAllStatus(
partitionsInfo map[string][]PartitionStatusInfo,
partitionsInfoAllStatus map[string][]PartitionStatusInfo,
) string {
buf := &bytes.Buffer{}

Expand Down Expand Up @@ -1013,27 +1012,27 @@ func FormatPartitionsAllStatus(
},
)

for topicName, partitions := range partitionsInfo {
for _, partition := range partitions {
for topicName, partitionsStatusInfo := range partitionsInfoAllStatus {
for _, partitionStatusInfo := range partitionsStatusInfo {
partitionStatusIsrs := []int{}
for _, partitionStatusIsr := range partition.Partition.Isr {
for _, partitionStatusIsr := range partitionStatusInfo.Partition.Isr {
partitionStatusIsrs = append(partitionStatusIsrs, partitionStatusIsr.ID)
}

partitionReplicas := []int{}
for _, partitionReplica := range partition.Partition.Replicas {
for _, partitionReplica := range partitionStatusInfo.Partition.Replicas {
partitionReplicas = append(partitionReplicas, partitionReplica.ID)
}

partitionStatuses := []string{}
for _, status := range partition.Statuses {
partitionStatuses = append(partitionStatuses, string(status))
for _, partitionStatus := range partitionStatusInfo.Statuses {
partitionStatuses = append(partitionStatuses, string(partitionStatus))
}

row := []string{
topicName,
fmt.Sprintf("%d", partition.Partition.ID),
fmt.Sprintf("%d", partition.Partition.Leader.ID),
fmt.Sprintf("%d", partitionStatusInfo.Partition.ID),
fmt.Sprintf("%d", partitionStatusInfo.Partition.Leader.ID),
fmt.Sprintf("%+v", partitionStatusIsrs),
fmt.Sprintf("%+v", partitionReplicas),
fmt.Sprintf("%s", strings.Join(partitionStatuses, ",")),
Expand Down
128 changes: 64 additions & 64 deletions pkg/cli/cli.go
Original file line number Diff line number Diff line change
Expand Up @@ -579,34 +579,6 @@ func stringsToInts(strs []string) ([]int, error) {
return ints, nil
}

// kafka-go metadata call
func (c *CLIRunner) GetAllTopicsMetaData(
ctx context.Context,
) (*kafka.MetadataResponse, error) {
topicsInfo, err := c.adminClient.GetTopics(ctx, nil, false)
if err != nil {
return nil, fmt.Errorf("Error fetching all topics info: %+v", err)
}

allTopics := []string{}
for _, topicInfo := range topicsInfo {
allTopics = append(allTopics, topicInfo.Name)
}

client := c.adminClient.GetConnector().KafkaClient
req := kafka.MetadataRequest{
Topics: allTopics,
}

log.Debugf("Metadata request: %+v", req)
metadata, err := client.Metadata(ctx, &req)
if err != nil {
return nil, fmt.Errorf("Error fetching topic metadata: %+v", err)
}

return metadata, nil
}

// Get the partitions status for the specified status
// This function displays the partitions in a Pretty format and return error for below:
// - under replicated partitions
Expand All @@ -629,57 +601,85 @@ func (c *CLIRunner) GetPartitionsStatus(
c.stopSpinner()
log.Debugf("kafka-go metadata response: %v", metadata)

partitionsInfo := getPartitionsStatus(topics, metadata)
log.Debugf("partitionsInfo: %v", partitionsInfo)
if status != "" {
partitionsInfoByStatus := make(map[string][]kafka.Partition)
partitionsCountByStatus := 0
statusFound := false

for topicName, partitions := range partitionsInfo {
statusPartitions := []kafka.Partition{}

for _, partition := range partitions {
for _, partitionStatus := range partition.Statuses {
if partitionStatus == status {
statusPartitions = append(statusPartitions, partition.Partition)
statusFound = true
}
}
}
partitionsInfoAllStatus := getPartitionsStatus(topics, metadata)
log.Debugf("partitionsInfoAllStatus: %v", partitionsInfoAllStatus)
if status == "" {
c.printer(
"Partitions Status:\n%s",
admin.FormatPartitionsAllStatus(partitionsInfoAllStatus),
)

if len(statusPartitions) == 0 {
continue
}
return nil
}

partitionsInfoByStatus[topicName] = statusPartitions
partitionsCountByStatus += len(statusPartitions)
}
partitionsInfoByStatus := make(map[string][]admin.PartitionStatusInfo)
partitionsCountByStatus := 0
foundStatus := false

c.printer(
"%v Partitions:\n%s",
status,
admin.FormatPartitionsByStatus(partitionsInfoByStatus, full),
)
for topicName, partitionsStatusInfo := range partitionsInfoAllStatus {
statusPartitions := []admin.PartitionStatusInfo{}

log.Infof("%d Partitions are %v for topics", partitionsCountByStatus, status)
for _, partitionStatusInfo := range partitionsStatusInfo {
for _, partitionStatus := range partitionStatusInfo.Statuses {
if partitionStatus == status {
statusPartitions = append(statusPartitions, partitionStatusInfo)
foundStatus = true
}
}
}

// preferred leader is not an error condition
if statusFound && status != admin.PreferredLeader {
return fmt.Errorf("%v partitions are found for topics", status)
if len(statusPartitions) == 0 {
continue
}

return nil
partitionsInfoByStatus[topicName] = statusPartitions
partitionsCountByStatus += len(statusPartitions)
}

c.printer(
"Partitions Status:\n%s",
admin.FormatPartitionsAllStatus(partitionsInfo),
"%v Partitions:\n%s",
status,
admin.FormatPartitionsByStatus(partitionsInfoByStatus, full),
)

log.Infof("%d Partitions are %v for topics", partitionsCountByStatus, status)

// preferred leader is not an error condition
if foundStatus && status != admin.PreferredLeader {
return fmt.Errorf("%v partitions are found for topics", status)
}

return nil
}

// kafka-go metadata call
func (c *CLIRunner) GetAllTopicsMetaData(
ctx context.Context,
) (*kafka.MetadataResponse, error) {
topicsInfo, err := c.adminClient.GetTopics(ctx, nil, false)
if err != nil {
return nil, fmt.Errorf("Error fetching all topics info: %+v", err)
}

allTopics := []string{}
for _, topicInfo := range topicsInfo {
allTopics = append(allTopics, topicInfo.Name)
}

client := c.adminClient.GetConnector().KafkaClient
req := kafka.MetadataRequest{
Topics: allTopics,
}

log.Debugf("Metadata request: %+v", req)
metadata, err := client.Metadata(ctx, &req)
if err != nil {
return nil, fmt.Errorf("Error fetching topic metadata: %+v", err)
}

return metadata, nil
}

// This is the actual function where we fetch all the topic partitions with status
func getPartitionsStatus(
topics []string,
Expand Down

0 comments on commit 3e18b3d

Please sign in to comment.