Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

rpk: Add Support for Topic Mounting and Unmounting #23575

Merged
merged 1 commit into from
Oct 16, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 10 additions & 2 deletions src/go/rpk/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,14 @@
GOCMD=go
GOLANGCILINTCMD=golangci-lint
GOFUMPTCMD=gofumpt
BAZELCMD=bazel

ifeq ($(shell uname),Darwin)
BAZELCMD := bazelisk
BAZEL_GAZELLE_CMD := run --config system-clang //:gazelle
else
BAZELCMD := bazel
BAZEL_GAZELLE_CMD := run //:gazelle
endif

GOOS ?= $(shell go env GOOS)
GOARCH ?= $(shell go env GOARCH)
Expand All @@ -29,6 +36,7 @@ all: help

ready: build test fmt lint bazel check_diff ## Runs all. Ensures commit is ready.

.PHONY: build
build: ## Build rpk.
$(shell mkdir -p $(OUTDIR))
$(GOCMD) build -ldflags '$(LDFLAGS)' -o $(OUTDIR) ./...
Expand Down Expand Up @@ -65,7 +73,7 @@ install_bazelisk:
@$(GOCMD) install github.com/bazelbuild/bazelisk@latest

bazel_generate_build: install_bazelisk
@$(BAZELCMD) run //:gazelle
@$(BAZELCMD) $(BAZEL_GAZELLE_CMD)

bazel_tidy: install_bazelisk
@$(BAZELCMD) mod tidy
Expand Down
12 changes: 11 additions & 1 deletion src/go/rpk/pkg/cli/cluster/storage/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,22 @@ load("@rules_go//go:def.bzl", "go_library")

go_library(
name = "storage",
srcs = ["storage.go"],
srcs = [
"cancel-mount.go",
"list-mount.go",
"mount.go",
"status-mount.go",
"storage.go",
"unmount.go",
],
importpath = "github.com/redpanda-data/redpanda/src/go/rpk/pkg/cli/cluster/storage",
visibility = ["//visibility:public"],
deps = [
"//src/go/rpk/pkg/adminapi",
"//src/go/rpk/pkg/cli/cluster/storage/recovery",
"//src/go/rpk/pkg/config",
"//src/go/rpk/pkg/out",
"@com_github_redpanda_data_common_go_rpadmin//:rpadmin",
"@com_github_spf13_afero//:afero",
"@com_github_spf13_cobra//:cobra",
],
Expand Down
54 changes: 54 additions & 0 deletions src/go/rpk/pkg/cli/cluster/storage/cancel-mount.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
// Copyright 2024 Redpanda Data, Inc.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.md
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0

package storage

import (
"fmt"
"strconv"

"github.com/redpanda-data/common-go/rpadmin"
"github.com/redpanda-data/redpanda/src/go/rpk/pkg/adminapi"
"github.com/redpanda-data/redpanda/src/go/rpk/pkg/config"
"github.com/redpanda-data/redpanda/src/go/rpk/pkg/out"
"github.com/spf13/afero"
"github.com/spf13/cobra"
)

func newMountCancel(fs afero.Fs, p *config.Params) *cobra.Command {
cmd := &cobra.Command{
Use: "cancel-mount [MIGRATION ID]",
Aliases: []string{"cancel-unmount"},
Short: "Cancels a mount/unmount operation",
Long: `Cancels a mount/unmount operation on a topic.

Use the migration ID that is emitted when the mount or unmount operation is executed.
You can also get the migration ID by listing the mount/unmount operations.`,
Example: `
Cancel a mount/unmount operation
rpk cluster storage cancel-mount 123
`,
Args: cobra.ExactArgs(1),
Run: func(cmd *cobra.Command, from []string) {
pf, err := p.LoadVirtualProfile(fs)
out.MaybeDie(err, "rpk unable to load config: %v", err)
config.CheckExitCloudAdmin(pf)
adm, err := adminapi.NewClient(cmd.Context(), fs, pf)
out.MaybeDie(err, "unable to initialize admin client: %v", err)

migrationID, err := strconv.Atoi(from[0])
out.MaybeDie(err, "invalid migration ID: %v", err)

err = adm.ExecuteMigration(cmd.Context(), migrationID, rpadmin.MigrationActionCancel)
out.MaybeDie(err, "unable to cancel the mount/unmount operation: %v", err)
fmt.Printf("Successfully canceled the operation with ID %v", migrationID)
},
}
return cmd
}
165 changes: 165 additions & 0 deletions src/go/rpk/pkg/cli/cluster/storage/list-mount.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,165 @@
// Copyright 2024 Redpanda Data, Inc.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.md
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0

package storage

import (
"fmt"
"io"
"os"
"strings"

"github.com/redpanda-data/common-go/rpadmin"
"github.com/redpanda-data/redpanda/src/go/rpk/pkg/adminapi"
"github.com/redpanda-data/redpanda/src/go/rpk/pkg/config"
"github.com/redpanda-data/redpanda/src/go/rpk/pkg/out"
"github.com/spf13/afero"
"github.com/spf13/cobra"
)

func newMountList(fs afero.Fs, p *config.Params) *cobra.Command {
var filter string
cmd := &cobra.Command{
Use: "list-mount",
Short: "List mount/unmount operations",
Aliases: []string{"list-unmount"},
Long: `List mount/unmount operations on a topic to the Redpanda cluster from Tiered Storage.

You can also filter the list by state using the --filter flag. The possible states are:
gene-redpanda marked this conversation as resolved.
Show resolved Hide resolved
- planned
- prepared
- executed
- finished

If no filter is provided, all migrations will be listed.`,
Example: `
Lists mount/unmount operations
rpk cluster storage list-mount

Use filter to list only migrations in a specific state
rpk cluster storage list-mount --filter planned
`,
Args: cobra.NoArgs,
Run: func(cmd *cobra.Command, _ []string) {
f := p.Formatter
if h, ok := f.Help([]migrationState{}); ok {
out.Exit(h)
}

pf, err := p.LoadVirtualProfile(fs)
out.MaybeDie(err, "rpk unable to load config: %v", err)
config.CheckExitCloudAdmin(pf)
adm, err := adminapi.NewClient(cmd.Context(), fs, pf)
out.MaybeDie(err, "unable to initialize admin client: %v", err)

migrations, err := adm.ListMigrations(cmd.Context())
out.MaybeDie(err, "unable to list migrations: %v", err)
printDetailedListMount(p.Formatter, filterOptFromString(filter), rpadminMigrationStateToMigrationState(migrations), os.Stdout)
},
}
p.InstallFormatFlag(cmd)
cmd.Flags().StringVarP(&filter, "filter", "f", "", "Filter the list of migrations by state. Only valid for text")
return cmd
}

type filterOpts int

const (
FilterOptsAll filterOpts = iota
FilterOptsPlanned
FilterOptsPrepared
FilterOptsExecuted
FilterOptsFinished
)

func (f filterOpts) String() string {
switch f {
case FilterOptsAll:
return "all"
case FilterOptsPlanned:
return "planned"
case FilterOptsPrepared:
return "prepared"
case FilterOptsExecuted:
return "executed"
case FilterOptsFinished:
return "finished"
default:
return ""
}
}

func filterOptFromString(s string) filterOpts {
switch s {
case "planned":
return FilterOptsPlanned
case "prepared":
return FilterOptsPrepared
case "executed":
return FilterOptsExecuted
case "finished":
return FilterOptsFinished
case "all":
return FilterOptsAll
default:
return -1
}
}

func printDetailedListMount(f config.OutFormatter, ft filterOpts, d []migrationState, w io.Writer) {
if isText, _, t, err := f.Format(d); !isText {
out.MaybeDie(err, "unable to print in the requested format %q: %v", f.Kind, err)
fmt.Fprintln(w, t)
return
}
tw := out.NewTableTo(w, "ID", "State", "Migration", "Topics")
defer tw.Flush()
for _, m := range d {
if ft != FilterOptsAll {
if m.State != ft.String() {
continue
}
}
tw.Print(m.ID, m.State, m.MigrationType, strings.Join(m.Topics, ", "))
}
}

func rpadminMigrationStateToMigrationState(in []rpadmin.MigrationState) (resp []migrationState) {
resp = make([]migrationState, 0, len(in))
for _, entry := range in {
resp = append(resp, migrationState{
ID: entry.ID,
State: entry.State,
MigrationType: entry.Migration.MigrationType,
Topics: rpadminTopicsToStringSlice(entry.Migration.Topics),
})
}
return resp
}

// rpadminTopicsToStringSlice converts a slice of rpadmin.NamespacedTopic to a slice of strings
// if the topic has a non nil namespace it will appear as `namespace:topic`
// otherwise it will appear as `topic`.
func rpadminTopicsToStringSlice(in []rpadmin.NamespacedTopic) (resp []string) {
for _, entry := range in {
if entry.Namespace != nil {
resp = append(resp, fmt.Sprintf("%s/%s", *entry.Namespace, entry.Topic))
continue
}
resp = append(resp, entry.Topic)
}
return
}

type migrationState struct {
ID int `json:"id" yaml:"id"`
State string `json:"state" yaml:"state"`
MigrationType string `json:"type" yaml:"type"`
Topics []string `json:"topics" yaml:"topics"`
}
104 changes: 104 additions & 0 deletions src/go/rpk/pkg/cli/cluster/storage/mount.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
// Copyright 2024 Redpanda Data, Inc.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.md
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0

package storage

import (
"fmt"
"strings"

"github.com/redpanda-data/common-go/rpadmin"
"github.com/redpanda-data/redpanda/src/go/rpk/pkg/adminapi"
"github.com/redpanda-data/redpanda/src/go/rpk/pkg/config"
"github.com/redpanda-data/redpanda/src/go/rpk/pkg/out"
"github.com/spf13/afero"
"github.com/spf13/cobra"
)

func newMountCommand(fs afero.Fs, p *config.Params) *cobra.Command {
var to string

cmd := &cobra.Command{
Use: "mount [TOPIC]",
Short: "Mount a topic",
Long: `Mount a topic to the Redpanda cluster from Tiered Storage.

This command mounts a topic in the Redpanda cluster using log segments stored
in Tiered Storage. The topic may be optionally renamed with the --to flag.

Requirements:
- Tiered Storage must be enabled.
- Log segments for the topic must be available in Tiered Storage.
gene-redpanda marked this conversation as resolved.
Show resolved Hide resolved
- A topic with the same name must not already exist in the cluster.`,
gene-redpanda marked this conversation as resolved.
Show resolved Hide resolved
Example: `
Mounts topic my-typic from Tiered Storage to the cluster in the my-namespace
rpk cluster storage mount my-topic
r-vasquez marked this conversation as resolved.
Show resolved Hide resolved

Mount topic my-topic from Tiered Storage to the cluster in the my-namespace
with my-new-topic as the new topic name
rpk cluster storage mount my-namespace/my-topic --to my-namespace/my-new-topic

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are other namespaces supported? It appears that at least the Admin API call only supports kafka

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Well there's support for namespace in the spec for migration/mount and we auto-define the kafka namespace if they don't provide one

`,
Args: cobra.ExactArgs(1),
Run: func(cmd *cobra.Command, from []string) {
pf, err := p.LoadVirtualProfile(fs)
out.MaybeDie(err, "rpk unable to load config: %v", err)
config.CheckExitCloudAdmin(pf)
adm, err := adminapi.NewClient(cmd.Context(), fs, pf)
out.MaybeDie(err, "unable to initialize admin client: %v", err)

n, t := nsTopic(from[0])
if t == "" {
out.Die("topic is required")
}
topic := rpadmin.InboundTopic{
SourceTopic: rpadmin.NamespacedTopic{
Namespace: string2pointer(n),
Topic: t,
},
}
an, at := nsTopic(to)
alias := t
if at != "" {
alias = at
topic.Alias = &rpadmin.NamespacedTopic{
Namespace: string2pointer(an),
Topic: alias,
}
}

mg, err := adm.MountTopics(cmd.Context(), rpadmin.MountConfiguration{Topics: []rpadmin.InboundTopic{topic}})
out.MaybeDie(err, "unable to mount topic: %v", err)

fmt.Printf(`
Topic mount from Tiered Storage topic %v to your Redpanda Cluster topic %v
has started with Migration ID %v
To check the status run 'rpk cluster storage status-mount %d'\n`, t, alias, mg.ID, mg.ID)
},
}
cmd.Flags().StringVar(&to, "to", "", "New namespace/topic name for the mounted topic (optional)")
return cmd
}

// nsTopic splits a topic string consisting of <namespace>/<topicName> and
// returns each component, if the namespace is not specified, returns 'kafka'.
func nsTopic(nst string) (namespace string, topic string) {
nsTopic := strings.SplitN(nst, "/", 2)
if len(nsTopic) == 1 {
namespace = "kafka"
topic = nsTopic[0]
} else {
namespace = nsTopic[0]
topic = nsTopic[1]
}
return namespace, topic
}

func string2pointer(s string) *string {
return &s
}
Loading