Skip to content

Commit

Permalink
Merge pull request #23575 from gene-redpanda/add-mount-unmount
Browse files Browse the repository at this point in the history
rpk: Add Support for Topic Mounting and Unmounting
  • Loading branch information
gene-redpanda authored Oct 16, 2024
2 parents 41595d8 + 75c20ea commit b91e736
Show file tree
Hide file tree
Showing 8 changed files with 494 additions and 3 deletions.
12 changes: 10 additions & 2 deletions src/go/rpk/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,14 @@
GOCMD=go
GOLANGCILINTCMD=golangci-lint
GOFUMPTCMD=gofumpt
BAZELCMD=bazel

ifeq ($(shell uname),Darwin)
BAZELCMD := bazelisk
BAZEL_GAZELLE_CMD := run --config system-clang //:gazelle
else
BAZELCMD := bazel
BAZEL_GAZELLE_CMD := run //:gazelle
endif

GOOS ?= $(shell go env GOOS)
GOARCH ?= $(shell go env GOARCH)
Expand All @@ -29,6 +36,7 @@ all: help

ready: build test fmt lint bazel check_diff ## Runs all. Ensures commit is ready.

.PHONY: build
build: ## Build rpk.
$(shell mkdir -p $(OUTDIR))
$(GOCMD) build -ldflags '$(LDFLAGS)' -o $(OUTDIR) ./...
Expand Down Expand Up @@ -65,7 +73,7 @@ install_bazelisk:
@$(GOCMD) install github.com/bazelbuild/bazelisk@latest

bazel_generate_build: install_bazelisk
@$(BAZELCMD) run //:gazelle
@$(BAZELCMD) $(BAZEL_GAZELLE_CMD)

bazel_tidy: install_bazelisk
@$(BAZELCMD) mod tidy
Expand Down
12 changes: 11 additions & 1 deletion src/go/rpk/pkg/cli/cluster/storage/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,22 @@ load("@rules_go//go:def.bzl", "go_library")

go_library(
name = "storage",
srcs = ["storage.go"],
srcs = [
"cancel-mount.go",
"list-mount.go",
"mount.go",
"status-mount.go",
"storage.go",
"unmount.go",
],
importpath = "github.com/redpanda-data/redpanda/src/go/rpk/pkg/cli/cluster/storage",
visibility = ["//visibility:public"],
deps = [
"//src/go/rpk/pkg/adminapi",
"//src/go/rpk/pkg/cli/cluster/storage/recovery",
"//src/go/rpk/pkg/config",
"//src/go/rpk/pkg/out",
"@com_github_redpanda_data_common_go_rpadmin//:rpadmin",
"@com_github_spf13_afero//:afero",
"@com_github_spf13_cobra//:cobra",
],
Expand Down
54 changes: 54 additions & 0 deletions src/go/rpk/pkg/cli/cluster/storage/cancel-mount.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
// Copyright 2024 Redpanda Data, Inc.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.md
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0

package storage

import (
"fmt"
"strconv"

"github.com/redpanda-data/common-go/rpadmin"
"github.com/redpanda-data/redpanda/src/go/rpk/pkg/adminapi"
"github.com/redpanda-data/redpanda/src/go/rpk/pkg/config"
"github.com/redpanda-data/redpanda/src/go/rpk/pkg/out"
"github.com/spf13/afero"
"github.com/spf13/cobra"
)

func newMountCancel(fs afero.Fs, p *config.Params) *cobra.Command {
cmd := &cobra.Command{
Use: "cancel-mount [MIGRATION ID]",
Aliases: []string{"cancel-unmount"},
Short: "Cancels a mount/unmount operation",
Long: `Cancels a mount/unmount operation on a topic.
Use the migration ID that is emitted when the mount or unmount operation is executed.
You can also get the migration ID by listing the mount/unmount operations.`,
Example: `
Cancel a mount/unmount operation
rpk cluster storage cancel-mount 123
`,
Args: cobra.ExactArgs(1),
Run: func(cmd *cobra.Command, from []string) {
pf, err := p.LoadVirtualProfile(fs)
out.MaybeDie(err, "rpk unable to load config: %v", err)
config.CheckExitCloudAdmin(pf)
adm, err := adminapi.NewClient(cmd.Context(), fs, pf)
out.MaybeDie(err, "unable to initialize admin client: %v", err)

migrationID, err := strconv.Atoi(from[0])
out.MaybeDie(err, "invalid migration ID: %v", err)

err = adm.ExecuteMigration(cmd.Context(), migrationID, rpadmin.MigrationActionCancel)
out.MaybeDie(err, "unable to cancel the mount/unmount operation: %v", err)
fmt.Printf("Successfully canceled the operation with ID %v", migrationID)
},
}
return cmd
}
165 changes: 165 additions & 0 deletions src/go/rpk/pkg/cli/cluster/storage/list-mount.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,165 @@
// Copyright 2024 Redpanda Data, Inc.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.md
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0

package storage

import (
"fmt"
"io"
"os"
"strings"

"github.com/redpanda-data/common-go/rpadmin"
"github.com/redpanda-data/redpanda/src/go/rpk/pkg/adminapi"
"github.com/redpanda-data/redpanda/src/go/rpk/pkg/config"
"github.com/redpanda-data/redpanda/src/go/rpk/pkg/out"
"github.com/spf13/afero"
"github.com/spf13/cobra"
)

func newMountList(fs afero.Fs, p *config.Params) *cobra.Command {
var filter string
cmd := &cobra.Command{
Use: "list-mount",
Short: "List mount/unmount operations",
Aliases: []string{"list-unmount"},
Long: `List mount/unmount operations on a topic to the Redpanda cluster from Tiered Storage.
You can also filter the list by state using the --filter flag. The possible states are:
- planned
- prepared
- executed
- finished
If no filter is provided, all migrations will be listed.`,
Example: `
Lists mount/unmount operations
rpk cluster storage list-mount
Use filter to list only migrations in a specific state
rpk cluster storage list-mount --filter planned
`,
Args: cobra.NoArgs,
Run: func(cmd *cobra.Command, _ []string) {
f := p.Formatter
if h, ok := f.Help([]migrationState{}); ok {
out.Exit(h)
}

pf, err := p.LoadVirtualProfile(fs)
out.MaybeDie(err, "rpk unable to load config: %v", err)
config.CheckExitCloudAdmin(pf)
adm, err := adminapi.NewClient(cmd.Context(), fs, pf)
out.MaybeDie(err, "unable to initialize admin client: %v", err)

migrations, err := adm.ListMigrations(cmd.Context())
out.MaybeDie(err, "unable to list migrations: %v", err)
printDetailedListMount(p.Formatter, filterOptFromString(filter), rpadminMigrationStateToMigrationState(migrations), os.Stdout)
},
}
p.InstallFormatFlag(cmd)
cmd.Flags().StringVarP(&filter, "filter", "f", "", "Filter the list of migrations by state. Only valid for text")
return cmd
}

type filterOpts int

const (
FilterOptsAll filterOpts = iota
FilterOptsPlanned
FilterOptsPrepared
FilterOptsExecuted
FilterOptsFinished
)

func (f filterOpts) String() string {
switch f {
case FilterOptsAll:
return "all"
case FilterOptsPlanned:
return "planned"
case FilterOptsPrepared:
return "prepared"
case FilterOptsExecuted:
return "executed"
case FilterOptsFinished:
return "finished"
default:
return ""
}
}

func filterOptFromString(s string) filterOpts {
switch s {
case "planned":
return FilterOptsPlanned
case "prepared":
return FilterOptsPrepared
case "executed":
return FilterOptsExecuted
case "finished":
return FilterOptsFinished
case "all":
return FilterOptsAll
default:
return -1
}
}

func printDetailedListMount(f config.OutFormatter, ft filterOpts, d []migrationState, w io.Writer) {
if isText, _, t, err := f.Format(d); !isText {
out.MaybeDie(err, "unable to print in the requested format %q: %v", f.Kind, err)
fmt.Fprintln(w, t)
return
}
tw := out.NewTableTo(w, "ID", "State", "Migration", "Topics")
defer tw.Flush()
for _, m := range d {
if ft != FilterOptsAll {
if m.State != ft.String() {
continue
}
}
tw.Print(m.ID, m.State, m.MigrationType, strings.Join(m.Topics, ", "))
}
}

func rpadminMigrationStateToMigrationState(in []rpadmin.MigrationState) (resp []migrationState) {
resp = make([]migrationState, 0, len(in))
for _, entry := range in {
resp = append(resp, migrationState{
ID: entry.ID,
State: entry.State,
MigrationType: entry.Migration.MigrationType,
Topics: rpadminTopicsToStringSlice(entry.Migration.Topics),
})
}
return resp
}

// rpadminTopicsToStringSlice converts a slice of rpadmin.NamespacedTopic to a slice of strings
// if the topic has a non nil namespace it will appear as `namespace:topic`
// otherwise it will appear as `topic`.
func rpadminTopicsToStringSlice(in []rpadmin.NamespacedTopic) (resp []string) {
for _, entry := range in {
if entry.Namespace != nil {
resp = append(resp, fmt.Sprintf("%s/%s", *entry.Namespace, entry.Topic))
continue
}
resp = append(resp, entry.Topic)
}
return
}

type migrationState struct {
ID int `json:"id" yaml:"id"`
State string `json:"state" yaml:"state"`
MigrationType string `json:"type" yaml:"type"`
Topics []string `json:"topics" yaml:"topics"`
}
104 changes: 104 additions & 0 deletions src/go/rpk/pkg/cli/cluster/storage/mount.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
// Copyright 2024 Redpanda Data, Inc.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.md
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0

package storage

import (
"fmt"
"strings"

"github.com/redpanda-data/common-go/rpadmin"
"github.com/redpanda-data/redpanda/src/go/rpk/pkg/adminapi"
"github.com/redpanda-data/redpanda/src/go/rpk/pkg/config"
"github.com/redpanda-data/redpanda/src/go/rpk/pkg/out"
"github.com/spf13/afero"
"github.com/spf13/cobra"
)

func newMountCommand(fs afero.Fs, p *config.Params) *cobra.Command {
var to string

cmd := &cobra.Command{
Use: "mount [TOPIC]",
Short: "Mount a topic",
Long: `Mount a topic to the Redpanda cluster from Tiered Storage.
This command mounts a topic in the Redpanda cluster using log segments stored
in Tiered Storage. The topic may be optionally renamed with the --to flag.
Requirements:
- Tiered Storage must be enabled.
- Log segments for the topic must be available in Tiered Storage.
- A topic with the same name must not already exist in the cluster.`,
Example: `
Mounts topic my-typic from Tiered Storage to the cluster in the my-namespace
rpk cluster storage mount my-topic
Mount topic my-topic from Tiered Storage to the cluster in the my-namespace
with my-new-topic as the new topic name
rpk cluster storage mount my-namespace/my-topic --to my-namespace/my-new-topic
`,
Args: cobra.ExactArgs(1),
Run: func(cmd *cobra.Command, from []string) {
pf, err := p.LoadVirtualProfile(fs)
out.MaybeDie(err, "rpk unable to load config: %v", err)
config.CheckExitCloudAdmin(pf)
adm, err := adminapi.NewClient(cmd.Context(), fs, pf)
out.MaybeDie(err, "unable to initialize admin client: %v", err)

n, t := nsTopic(from[0])
if t == "" {
out.Die("topic is required")
}
topic := rpadmin.InboundTopic{
SourceTopic: rpadmin.NamespacedTopic{
Namespace: string2pointer(n),
Topic: t,
},
}
an, at := nsTopic(to)
alias := t
if at != "" {
alias = at
topic.Alias = &rpadmin.NamespacedTopic{
Namespace: string2pointer(an),
Topic: alias,
}
}

mg, err := adm.MountTopics(cmd.Context(), rpadmin.MountConfiguration{Topics: []rpadmin.InboundTopic{topic}})
out.MaybeDie(err, "unable to mount topic: %v", err)

fmt.Printf(`
Topic mount from Tiered Storage topic %v to your Redpanda Cluster topic %v
has started with Migration ID %v
To check the status run 'rpk cluster storage status-mount %d'\n`, t, alias, mg.ID, mg.ID)
},
}
cmd.Flags().StringVar(&to, "to", "", "New namespace/topic name for the mounted topic (optional)")
return cmd
}

// nsTopic splits a topic string consisting of <namespace>/<topicName> and
// returns each component, if the namespace is not specified, returns 'kafka'.
func nsTopic(nst string) (namespace string, topic string) {
nsTopic := strings.SplitN(nst, "/", 2)
if len(nsTopic) == 1 {
namespace = "kafka"
topic = nsTopic[0]
} else {
namespace = nsTopic[0]
topic = nsTopic[1]
}
return namespace, topic
}

func string2pointer(s string) *string {
return &s
}
Loading

0 comments on commit b91e736

Please sign in to comment.