Skip to content

Commit

Permalink
[DP-1767] - minor fixtures
Browse files Browse the repository at this point in the history
  • Loading branch information
ssingudasu committed Sep 19, 2023
1 parent 3e18b3d commit e131d6f
Show file tree
Hide file tree
Showing 2 changed files with 42 additions and 37 deletions.
37 changes: 26 additions & 11 deletions pkg/admin/format.go
Original file line number Diff line number Diff line change
Expand Up @@ -887,7 +887,8 @@ func maxMapValues(inputMap map[int]int) int {

// FormatPartitionsByStatus creates a pretty table that lists the details of the partitions by status
func FormatPartitionsByStatus(
partitionsInfoByStatus map[string][]PartitionStatusInfo,
partitionsInfoAllStatus map[string][]PartitionStatusInfo,
status PartitionStatus,
full bool,
) string {
buf := &bytes.Buffer{}
Expand Down Expand Up @@ -933,7 +934,7 @@ func FormatPartitionsByStatus(
},
)

for topicName, partitionsStatusInfo := range partitionsInfoByStatus {
for topicName, partitionsStatusInfo := range partitionsInfoAllStatus {
if full {
for _, partitionStatusInfo := range partitionsStatusInfo {
partitionIsrs := []int{}
Expand All @@ -946,20 +947,34 @@ func FormatPartitionsByStatus(
partitionReplicas = append(partitionReplicas, partitionReplica.ID)
}

row := []string{
topicName,
fmt.Sprintf("%d", partitionStatusInfo.Partition.ID),
fmt.Sprintf("%d", partitionStatusInfo.Partition.Leader.ID),
fmt.Sprintf("%+v", partitionIsrs),
fmt.Sprintf("%+v", partitionReplicas),
for _, partitionStatus := range partitionStatusInfo.Statuses {
if partitionStatus != status {
continue
}

row := []string{
topicName,
fmt.Sprintf("%d", partitionStatusInfo.Partition.ID),
fmt.Sprintf("%d", partitionStatusInfo.Partition.Leader.ID),
fmt.Sprintf("%+v", partitionIsrs),
fmt.Sprintf("%+v", partitionReplicas),
}
table.Append(row)
}

table.Append(row)
}
} else {
partitionIDs := []int{}
for _, partitionStatusInfo := range partitionsStatusInfo {
partitionIDs = append(partitionIDs, partitionStatusInfo.Partition.ID)
for _, partitionStatus := range partitionStatusInfo.Statuses {
if partitionStatus != status {
continue
}
partitionIDs = append(partitionIDs, partitionStatusInfo.Partition.ID)
}
}

if len(partitionIDs) == 0 {
continue
}

row := []string{
Expand Down
42 changes: 16 additions & 26 deletions pkg/cli/cli.go
Original file line number Diff line number Diff line change
Expand Up @@ -612,42 +612,32 @@ func (c *CLIRunner) GetPartitionsStatus(
return nil
}

partitionsInfoByStatus := make(map[string][]admin.PartitionStatusInfo)
partitionsCountByStatus := 0
foundStatus := false

for topicName, partitionsStatusInfo := range partitionsInfoAllStatus {
statusPartitions := []admin.PartitionStatusInfo{}
c.printer(
"%v Partitions:\n%s",
status,
admin.FormatPartitionsByStatus(
partitionsInfoAllStatus,
status,
full,
),
)

// Summary: get the count of partitions for the status
partitionsCountByStatus := 0
for _, partitionsStatusInfo := range partitionsInfoAllStatus {
for _, partitionStatusInfo := range partitionsStatusInfo {
for _, partitionStatus := range partitionStatusInfo.Statuses {
if partitionStatus == status {
statusPartitions = append(statusPartitions, partitionStatusInfo)
foundStatus = true
partitionsCountByStatus += 1
}
}
}

if len(statusPartitions) == 0 {
continue
}

partitionsInfoByStatus[topicName] = statusPartitions
partitionsCountByStatus += len(statusPartitions)
}

c.printer(
"%v Partitions:\n%s",
status,
admin.FormatPartitionsByStatus(partitionsInfoByStatus, full),
)

log.Infof("%d Partitions are %v for topics", partitionsCountByStatus, status)

// preferred leader is not an error condition
if foundStatus && status != admin.PreferredLeader {
return fmt.Errorf("%v partitions are found for topics", status)
if partitionsCountByStatus > 0 && status != admin.PreferredLeader {
return fmt.Errorf("%d Partitions are %v", partitionsCountByStatus, status)
}
log.Infof("%d Partitions are %v", partitionsCountByStatus, status)

return nil
}
Expand Down

0 comments on commit e131d6f

Please sign in to comment.