From 673674780619dc08d99c3ab1e04edae047d6c9ca Mon Sep 17 00:00:00 2001 From: r-vasquez Date: Wed, 22 May 2024 09:16:52 -0700 Subject: [PATCH] rpk: avoid checking for topic existence with regex Fixes #18602 --- src/go/rpk/pkg/cli/topic/consume.go | 16 +++++++++------- 1 file changed, 9 insertions(+), 7 deletions(-) diff --git a/src/go/rpk/pkg/cli/topic/consume.go b/src/go/rpk/pkg/cli/topic/consume.go index a5fd56e25f3c9..4c1eac621bd63 100644 --- a/src/go/rpk/pkg/cli/topic/consume.go +++ b/src/go/rpk/pkg/cli/topic/consume.go @@ -89,13 +89,15 @@ func newConsumeCommand(fs afero.Fs, p *config.Params) *cobra.Command { out.MaybeDie(err, "unable to initialize admin kafka client: %v", err) // We fail if the topic does not exist. - listed, err := adm.ListTopics(cmd.Context(), topics...) - out.MaybeDie(err, "unable to check topic existence: %v", err) - listed.EachError(func(d kadm.TopicDetail) { - if errors.Is(d.Err, kerr.UnknownTopicOrPartition) { - out.Die("unable to consume topic %q: %v", d.Topic, d.Err.Error()) - } - }) + if !c.regex { + listed, err := adm.ListTopics(cmd.Context(), topics...) + out.MaybeDie(err, "unable to check topic existence: %v", err) + listed.EachError(func(d kadm.TopicDetail) { + if errors.Is(d.Err, kerr.UnknownTopicOrPartition) { + out.Die("unable to consume topic %q: %v", d.Topic, d.Err.Error()) + } + }) + } err = c.parseOffset(offset, topics, adm) out.MaybeDie(err, "invalid --offset %q: %v", offset, err)