Skip to content

Commit

Permalink
Merge pull request #23352 from r-vasquez/manual-backport-rpk-connect-…
Browse files Browse the repository at this point in the history
…v24.1.x

[v24.1.x] rpk connect as a managed plugin
  • Loading branch information
r-vasquez authored Sep 18, 2024
2 parents 5a8c35e + 09c4a4c commit d60a81b
Show file tree
Hide file tree
Showing 16 changed files with 763 additions and 57 deletions.
36 changes: 10 additions & 26 deletions src/go/rpk/pkg/cli/cloud/byoc/uninstall.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@
package byoc

import (
"fmt"
"os"

"github.com/redpanda-data/redpanda/src/go/rpk/pkg/out"
Expand Down Expand Up @@ -38,33 +37,18 @@ disk space.
if !pluginExists {
out.Exit("The BYOC managed plugin is not installed!")
}
messages, anyFailed := removePluginAll(byoc)
for _, message := range messages {
fmt.Println(message)
}
if anyFailed {
os.Exit(1)
messages, anyFailed := byoc.Uninstall(true)
tw := out.NewTable("PATH", "MESSAGE")
defer func() {
tw.Flush()
if anyFailed {
os.Exit(1)
}
}()
for _, m := range messages {
tw.Print(m.Path, m.Message)
}
},
}
return cmd
}

func removePluginAll(p *plugin.Plugin) (messages []string, anyFailed bool) {
if err := os.Remove(p.Path); err != nil {
messages = append(messages, fmt.Sprintf("Unable to remove %q: %v", p.Path, err))
anyFailed = true
} else {
messages = append(messages, fmt.Sprintf("Removed %q", p.Path))
}

for _, shadowed := range p.ShadowedPaths {
if err := os.Remove(shadowed); err != nil {
messages = append(messages, fmt.Sprintf("Unable to remove shadowed at %q: %v", p.Path, err))
anyFailed = true
} else {
messages = append(messages, fmt.Sprintf("Remove shadowed at %q", p.Path))
}
}
return
}
112 changes: 112 additions & 0 deletions src/go/rpk/pkg/cli/connect/client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
// Copyright 2024 Redpanda Data, Inc.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.md
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0

package connect

import (
"context"
"errors"
"fmt"
"net/http"
"os"
"runtime"
"time"

"github.com/redpanda-data/redpanda/src/go/rpk/pkg/httpapi"
)

const pluginBaseURL = "https://rpk-plugins.redpanda.com"

type connectArtifact struct {
Path string `json:"path"`
Sha256 string `json:"sha256"`
}

type archive struct {
Version string `json:"version"`
IsLatest bool `json:"is_latest"`
Artifacts map[string]connectArtifact `json:"artifacts"`
}

type connectManifest struct {
Archives []archive `json:"archives"`
}

func (c *connectManifest) LatestArtifact() (connectArtifact, string, error) {
osArch := runtime.GOOS + "-" + runtime.GOARCH
for _, a := range c.Archives {
if a.IsLatest {
if artifact, ok := a.Artifacts[osArch]; ok {
return artifact, a.Version, nil
} else {
return connectArtifact{}, "", fmt.Errorf("no artifact found for os-arch: %s in our latest release. Please report this issue with Redpanda Support", osArch)
}
}
}
return connectArtifact{}, "", errors.New("no latest artifact found. Please report this issue with Redpanda Support")
}

func (c *connectManifest) ArtifactVersion(version string) (connectArtifact, error) {
osArch := runtime.GOOS + "-" + runtime.GOARCH
for _, a := range c.Archives {
if a.Version == version {
if artifact, ok := a.Artifacts[osArch]; ok {
return artifact, nil
} else {
return connectArtifact{}, fmt.Errorf("no artifact found for os-arch: %s in Redpanda Connect version %q. Please report this issue with Redpanda Support", osArch, version)
}
}
}
return connectArtifact{}, fmt.Errorf("unable to find version %q", version)
}

// connectRepoClient is a client to connect against our repository containing
// the Redpanda Connect packages.
type connectRepoClient struct {
cl *httpapi.Client
os string
arch string
}

func newRepoClient() (*connectRepoClient, error) {
timeout := 240 * time.Second
if t := os.Getenv("RPK_PLUGIN_DOWNLOAD_TIMEOUT"); t != "" {
duration, err := time.ParseDuration(t)
if err != nil {
return nil, fmt.Errorf("unable to parse RPK_PLUGIN_DOWNLOAD_TIMEOUT: %v", err)
}
timeout = duration
}
return &connectRepoClient{
cl: httpapi.NewClient(
httpapi.HTTPClient(&http.Client{
Timeout: timeout,
}),
),
os: runtime.GOOS,
arch: runtime.GOARCH,
}, nil
}

func (c *connectRepoClient) Manifest(ctx context.Context) (*connectManifest, error) {
var manifest connectManifest
err := c.cl.Get(ctx, fmt.Sprintf("%v/connect/manifest.json", getPluginURL()), nil, &manifest)
if err != nil {
return nil, fmt.Errorf("unable to retrieve Redpanda Connect manifest: %v", err)
}
return &manifest, nil
}

func getPluginURL() string {
url := pluginBaseURL
if repoURL := os.Getenv("RPK_PLUGIN_REPOSITORY"); repoURL != "" {
url = repoURL
}
return url
}
119 changes: 119 additions & 0 deletions src/go/rpk/pkg/cli/connect/connect.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
// Copyright 2024 Redpanda Data, Inc.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.md
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0

package connect

import (
"fmt"
"slices"
"strings"

"github.com/redpanda-data/redpanda/src/go/rpk/pkg/cobraext"
"github.com/redpanda-data/redpanda/src/go/rpk/pkg/config"
"github.com/redpanda-data/redpanda/src/go/rpk/pkg/out"
"github.com/redpanda-data/redpanda/src/go/rpk/pkg/plugin"
"github.com/spf13/afero"
"github.com/spf13/cobra"
"go.uber.org/zap"
)

func init() {
plugin.RegisterManaged("connect", []string{"connect"}, func(cmd *cobra.Command, _ afero.Fs, p *config.Params) *cobra.Command {
run := cmd.Run
cmd.Run = func(cmd *cobra.Command, args []string) {
pluginArgs, err := parseConnectFlags(p, cmd, args)
out.MaybeDie(err, "unable to parse flags: %v", err)
run(cmd, pluginArgs)
}
return cmd
})
}

func NewCommand(fs afero.Fs, p *config.Params, execFn func(string, []string) error) *cobra.Command {
cmd := &cobra.Command{
Use: "connect",
Short: "A stream processor for mundane tasks - https://docs.redpanda.com/redpanda-connect",
DisableFlagParsing: true, // Required for managed plugins, we manually parse the flags.
Args: cobra.MinimumNArgs(0), // Connect can be run without commands.
Run: func(cmd *cobra.Command, args []string) {
pluginArgs, err := parseConnectFlags(p, cmd, args)
out.MaybeDie(err, "unable to parse flags: %v", err)
connect, pluginExists := plugin.ListPlugins(fs, plugin.UserPaths()).Find("connect")
var pluginPath string
if !pluginExists {
// If it doesn't exist we only download when the user runs a
// subcommand.
var isSubcommand bool
for _, arg := range pluginArgs {
switch {
case arg == "-c":
out.Die("-c flag is not supported by this command; run 'rpk connect run' instead")
case arg == "--version":
fmt.Println("cannot get connect version: rpk connect is not installed; run 'rpk connect install'")
cmd.Help()
return
case strings.HasPrefix(arg, "--") || strings.HasPrefix(arg, "-"):
continue
default:
isSubcommand = true
}
}
if !isSubcommand {
cmd.Help()
return
}
fmt.Println("Downloading latest Redpanda Connect")
path, _, err := installConnect(cmd.Context(), fs, "latest")
out.MaybeDie(err, "unable to install Redpanda Connect: %v; if running on an air-gapped environment you may install 'redpanda-connect' with your package manager.", err)
pluginPath = path
}
if pluginExists {
pluginPath = connect.Path
if !connect.Managed {
zap.L().Sugar().Warn("rpk is using a self-managed version of Redpanda Connect. If you want rpk to manage connect, use rpk connect uninstall && rpk connect install. To continue managing Connect manually, use our redpanda-connect package.")
}
}
zap.L().Debug("executing connect plugin", zap.String("path", pluginPath), zap.Strings("args", pluginArgs))
err = execFn(pluginPath, pluginArgs)
out.MaybeDie(err, "unable to execute redpanda connect plugin: %v", err)
},
}
cmd.AddCommand(
installCommand(fs),
uninstallCommand(fs),
upgradeCommand(fs),
)
return cmd
}

func parseConnectFlags(p *config.Params, cmd *cobra.Command, args []string) ([]string, error) {
f := cmd.Flags()

keepForPlugin, stripForRpk := cobraext.StripFlagset(args, f)
if err := f.Parse(stripForRpk); err != nil {
return nil, err
}
// Since we are manually parsing the flags, we need to force build the
// logger again.
zap.ReplaceGlobals(p.BuildLogger())
// We need to add back the Help and Version flags manually since we strip
// them for rpk.
if cobraext.LongFlagValue(args, f, "help", "h") == "true" && !slices.Contains(keepForPlugin, "--help") {
keepForPlugin = append(keepForPlugin, "--help")
}
// In rpk --verbose has a shorthand -v, in connect -v is used for version.
// This is _only_ valid for the 'connect' command:
isSubCommand := slices.ContainsFunc(keepForPlugin, func(s string) bool { return !strings.HasPrefix(s, "-") })
if cmd.Name() == "connect" && !isSubCommand {
if cobraext.LongFlagValue(args, f, "verbose", "v") == "true" && !slices.Contains(keepForPlugin, "--version") && !slices.Contains(keepForPlugin, "-v") {
keepForPlugin = append(keepForPlugin, "--version")
}
}
return keepForPlugin, nil
}
Loading

0 comments on commit d60a81b

Please sign in to comment.