Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[v24.1.x] rpk: topic describe supports --regex flag #18644

Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
198 changes: 113 additions & 85 deletions src/go/rpk/pkg/cli/topic/describe.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ package topic
import (
"context"
"errors"
"fmt"
"sort"

"github.com/redpanda-data/redpanda/src/go/rpk/pkg/config"
Expand All @@ -31,20 +32,32 @@ func newDescribeCommand(fs afero.Fs, p *config.Params) *cobra.Command {
summary bool
configs bool
partitions bool
re bool
stable bool
)
cmd := &cobra.Command{
Use: "describe [TOPIC]",
Use: "describe [TOPICS]",
Aliases: []string{"info"},
Short: "Describe a topic",
Long: `Describe a topic.
Short: "Describe topics",
Long: `Describe topics.

This command prints detailed information about a topic. There are three
potential sections: a summary of the topic, the topic configs, and a detailed
This command prints detailed information about topics. The output contains
up to three sections: a summary of the topic, the topic configs, and a detailed
partitions section. By default, the summary and configs sections are printed.

The --regex flag (-r) parses arguments as regular expressions
and describes topics that match any of the expressions.

For example,

describe foo bar # describe topics foo and bar
describe -r '^f.*' '.*r$' # describe any topic starting with f and any topics ending in r
describe -r '*' # describe all topics
describe -r . # describe any one-character topics

`,

Args: cobra.ExactArgs(1),
Args: cobra.MinimumNArgs(1),
Run: func(cmd *cobra.Command, topicArg []string) {
p, err := p.LoadVirtualProfile(fs)
out.MaybeDie(err, "rpk unable to load config: %v", err)
Expand All @@ -53,107 +66,121 @@ partitions section. By default, the summary and configs sections are printed.
out.MaybeDie(err, "unable to initialize kafka client: %v", err)
defer cl.Close()

topic := topicArg[0]
adm, err := kafka.NewAdmin(fs, p)
out.MaybeDie(err, "unable to initialize kafka client: %v", err)
defer adm.Close()

if re {
topicArg, err = regexTopics(adm, topicArg)
out.MaybeDie(err, "unable to filter topics by regex: %v", err)
}

// By default, if neither are specified, we opt in to
// the config section only.
if !summary && !configs && !partitions {
summary, configs = true, true
}
if all {

// We show all sections if:
// - "print-all" is used or
// - more than one topic are specified or matched.
if all || len(topicArg) > 1 {
summary, configs, partitions = true, true, true
} else if len(topicArg) == 0 {
out.Exit("did not match any topics, exiting.")
}

var t kmsg.MetadataResponseTopic
{
req := kmsg.NewPtrMetadataRequest()
req := kmsg.NewPtrMetadataRequest()
for _, topic := range topicArg {
reqTopic := kmsg.NewMetadataRequestTopic()
reqTopic.Topic = kmsg.StringPtr(topic)
req.Topics = append(req.Topics, reqTopic)

resp, err := req.RequestWith(context.Background(), cl)
out.MaybeDie(err, "unable to request topic metadata: %v", err)
if len(resp.Topics) != 1 {
out.Die("metadata response returned %d topics when we asked for 1", len(resp.Topics))
}
t = resp.Topics[0]
}
resp, err := req.RequestWith(context.Background(), cl)
out.MaybeDie(err, "unable to request topic metadata: %v", err)

const (
secSummary = "summary"
secConfigs = "configs"
secPart = "partitions"
)

sections := out.NewMaybeHeaderSections(
out.ConditionalSectionHeaders(map[string]bool{
secSummary: summary,
secConfigs: configs,
secPart: partitions,
})...,
)

sections.Add(secSummary, func() {
tw := out.NewTabWriter()
defer tw.Flush()
tw.PrintColumn("NAME", *t.Topic)
if t.IsInternal {
tw.PrintColumn("INTERNAL", t.IsInternal)
}
tw.PrintColumn("PARTITIONS", len(t.Partitions))
if len(t.Partitions) > 0 {
p0 := &t.Partitions[0]
tw.PrintColumn("REPLICAS", len(p0.Replicas))
}
if err := kerr.ErrorForCode(t.ErrorCode); err != nil {
tw.PrintColumn("ERROR", err)
}
})

sections.Add(secConfigs, func() {
req := kmsg.NewPtrDescribeConfigsRequest()
reqResource := kmsg.NewDescribeConfigsRequestResource()
reqResource.ResourceType = kmsg.ConfigResourceTypeTopic
reqResource.ResourceName = topic
req.Resources = append(req.Resources, reqResource)

resp, err := req.RequestWith(context.Background(), cl)
out.MaybeDie(err, "unable to request configs: %v", err)
if len(resp.Resources) != 1 {
out.Die("config response returned %d resources when we asked for 1", len(resp.Resources))
}
err = kerr.ErrorForCode(resp.Resources[0].ErrorCode)
out.MaybeDie(err, "config response contained error: %v", err)

tw := out.NewTable("KEY", "VALUE", "SOURCE")
defer tw.Flush()
types.Sort(resp)
for _, config := range resp.Resources[0].Configs {
var val string
if config.IsSensitive {
val = "(sensitive)"
} else if config.Value != nil {
val = *config.Value
for i, topic := range resp.Topics {
sections := out.NewMaybeHeaderSections(
out.ConditionalSectionHeaders(map[string]bool{
secSummary: summary,
secConfigs: configs,
secPart: partitions,
})...,
)

sections.Add(secSummary, func() {
tw := out.NewTabWriter()
defer tw.Flush()
tw.PrintColumn("NAME", *topic.Topic)
if topic.IsInternal {
tw.PrintColumn("INTERNAL", topic.IsInternal)
}
tw.Print(config.Name, val, config.Source)
}
})

sections.Add(secPart, func() {
offsets := listStartEndOffsets(cl, topic, len(t.Partitions), stable)

tw := out.NewTable(describePartitionsHeaders(
t.Partitions,
offsets,
)...)
defer tw.Flush()
for _, row := range describePartitionsRows(
t.Partitions,
offsets,
) {
tw.Print(row...)
tw.PrintColumn("PARTITIONS", len(topic.Partitions))
if len(topic.Partitions) > 0 {
p0 := &topic.Partitions[0]
tw.PrintColumn("REPLICAS", len(p0.Replicas))
}
if err := kerr.ErrorForCode(topic.ErrorCode); err != nil {
tw.PrintColumn("ERROR", err)
}
})

sections.Add(secConfigs, func() {
req := kmsg.NewPtrDescribeConfigsRequest()
reqResource := kmsg.NewDescribeConfigsRequestResource()
reqResource.ResourceType = kmsg.ConfigResourceTypeTopic
reqResource.ResourceName = *topic.Topic
req.Resources = append(req.Resources, reqResource)

resp, err := req.RequestWith(context.Background(), cl)
out.MaybeDie(err, "unable to request configs: %v", err)
if len(resp.Resources) != 1 {
out.Die("config response returned %d resources when we asked for 1", len(resp.Resources))
}
err = kerr.ErrorForCode(resp.Resources[0].ErrorCode)
out.MaybeDie(err, "config response contained error: %v", err)

tw := out.NewTable("KEY", "VALUE", "SOURCE")
defer tw.Flush()
types.Sort(resp)
for _, config := range resp.Resources[0].Configs {
var val string
if config.IsSensitive {
val = "(sensitive)"
} else if config.Value != nil {
val = *config.Value
}
tw.Print(config.Name, val, config.Source)
}
})

sections.Add(secPart, func() {
offsets := listStartEndOffsets(cl, *topic.Topic, len(topic.Partitions), stable)

tw := out.NewTable(describePartitionsHeaders(
topic.Partitions,
offsets,
)...)
defer tw.Flush()
for _, row := range describePartitionsRows(
topic.Partitions,
offsets,
) {
tw.Print(row...)
}
})

i++
if i < len(resp.Topics) {
fmt.Println()
}
})
}
},
}

Expand All @@ -170,6 +197,7 @@ partitions section. By default, the summary and configs sections are printed.
cmd.Flags().BoolVarP(&configs, "print-configs", "c", false, "Print the config section")
cmd.Flags().BoolVarP(&partitions, "print-partitions", "p", false, "Print the detailed partitions section")
cmd.Flags().BoolVarP(&all, "print-all", "a", false, "Print all sections")
cmd.Flags().BoolVarP(&re, "regex", "r", false, "Parse arguments as regex; describe any topic that matches any input topic expression")

cmd.Flags().BoolVar(&stable, "stable", false, "Include the stable offsets column in the partitions section; only relevant if you produce to this topic transactionally")

Expand Down
Loading