Skip to content

Commit

Permalink
Merge pull request #39 from Miles-Garnsey/feature/dataplane-registration
Browse files Browse the repository at this point in the history
Data plane registration command in k8ssandra-client.
  • Loading branch information
Miles-Garnsey authored May 27, 2024
2 parents 0e1bf9a + 4994b44 commit 82f4078
Show file tree
Hide file tree
Showing 15 changed files with 769 additions and 52 deletions.
2 changes: 2 additions & 0 deletions cmd/kubectl-k8ssandra/k8ssandra/k8ssandra.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"github.com/k8ssandra/k8ssandra-client/cmd/kubectl-k8ssandra/config"
"github.com/k8ssandra/k8ssandra-client/cmd/kubectl-k8ssandra/helm"
"github.com/k8ssandra/k8ssandra-client/cmd/kubectl-k8ssandra/operate"
"github.com/k8ssandra/k8ssandra-client/cmd/kubectl-k8ssandra/register"
"github.com/k8ssandra/k8ssandra-client/cmd/kubectl-k8ssandra/users"

"github.com/spf13/cobra"
Expand Down Expand Up @@ -53,6 +54,7 @@ func NewCmd(streams genericclioptions.IOStreams) *cobra.Command {
// cmd.AddCommand(migrate.NewInstallCmd(streams))
cmd.AddCommand(config.NewCmd(streams))
cmd.AddCommand(helm.NewHelmCmd(streams))
register.SetupRegisterClusterCmd(cmd, streams)

// cmd.Flags().BoolVar(&o.listNamespaces, "list", o.listNamespaces, "if true, print the list of all namespaces in the current KUBECONFIG")
o.configFlags.AddFlags(cmd.Flags())
Expand Down
80 changes: 80 additions & 0 deletions cmd/kubectl-k8ssandra/register/command.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
package register

import (
"fmt"

"github.com/charmbracelet/log"
"github.com/k8ssandra/k8ssandra-client/pkg/registration"
"github.com/spf13/cobra"
"k8s.io/cli-runtime/pkg/genericclioptions"
)

var RegisterClusterCmd = &cobra.Command{
Use: "register [flags]",
Short: "register a data plane into the control plane.",
Long: `register creates a ServiceAccount on a source cluster, copies its credentials and then creates a secret containing them on the destination cluster. It then also creates a ClientConfig on the destination cluster to reference the secret.`,
Run: entrypoint,
}

func SetupRegisterClusterCmd(cmd *cobra.Command, streams genericclioptions.IOStreams) {
RegisterClusterCmd.Flags().String("source-kubeconfig",
"",
"path to source cluster's kubeconfig file - defaults to KUBECONFIG then ~/.kube/config")
RegisterClusterCmd.Flags().String("dest-kubeconfig",
"",
"path to destination cluster's kubeconfig file - defaults to KUBECONFIG then ~/.kube/config")
RegisterClusterCmd.Flags().String("source-context", "", "context name for source cluster")
RegisterClusterCmd.Flags().String("dest-context", "", "context name for destination cluster")
RegisterClusterCmd.Flags().String("source-namespace", "k8ssandra-operator", "namespace containing service account for source cluster")
RegisterClusterCmd.Flags().String("dest-namespace", "k8ssandra-operator", "namespace where secret and clientConfig will be created on destination cluster")
RegisterClusterCmd.Flags().String("serviceaccount-name", "k8ssandra-operator", "serviceaccount name for destination cluster")
RegisterClusterCmd.Flags().String("destination-name", "", "name for remote clientConfig and secret on destination cluster")

if err := RegisterClusterCmd.MarkFlagRequired("source-context"); err != nil {
panic(err)

}
if err := RegisterClusterCmd.MarkFlagRequired("dest-context"); err != nil {
panic(err)
}
cmd.AddCommand(RegisterClusterCmd)
}

func entrypoint(cmd *cobra.Command, args []string) {
executor := NewRegistrationExecutorFromRegisterClusterCmd(*cmd)

for i := 0; i < 30; i++ {
res := executor.RegisterCluster()
switch v := res.(type) {
case RetryableError:
log.Info("Registration continuing", "msg", v.Error())
continue
case nil:
log.Info("Registration completed successfully")
return
case NonRecoverableError:
panic(fmt.Sprintf("Registration failed: %s", v.Error()))
}
}
fmt.Println("Registration failed - retries exceeded")
}

func NewRegistrationExecutorFromRegisterClusterCmd(cmd cobra.Command) *RegistrationExecutor {

destName := cmd.Flag("destination-name").Value.String()
srcContext := cmd.Flag("source-context").Value.String()
if destName == "" {
destName = registration.CleanupForKubernetes(srcContext)
}
return &RegistrationExecutor{
SourceKubeconfig: cmd.Flag("source-kubeconfig").Value.String(),
DestKubeconfig: cmd.Flag("dest-kubeconfig").Value.String(),
SourceContext: srcContext,
DestContext: cmd.Flag("dest-context").Value.String(),
SourceNamespace: cmd.Flag("source-namespace").Value.String(),
DestNamespace: cmd.Flag("dest-namespace").Value.String(),
ServiceAccount: cmd.Flag("serviceaccount-name").Value.String(),
Context: cmd.Context(),
DestinationName: destName,
}
}
17 changes: 17 additions & 0 deletions cmd/kubectl-k8ssandra/register/errors.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
package register

type RetryableError struct {
Message string
}

func (e RetryableError) Error() string {
return e.Message
}

type NonRecoverableError struct {
Message string
}

func (e NonRecoverableError) Error() string {
return e.Message
}
159 changes: 159 additions & 0 deletions cmd/kubectl-k8ssandra/register/register_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,159 @@
package register

import (
"context"
"fmt"
"os"
"testing"
"time"

configapi "github.com/k8ssandra/k8ssandra-operator/apis/config/v1beta1"
"github.com/stretchr/testify/require"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/tools/clientcmd"
clientcmdapi "k8s.io/client-go/tools/clientcmd/api"
"sigs.k8s.io/controller-runtime/pkg/client"
)

func TestRegister(t *testing.T) {
require := require.New(t)
client1 := (*multiEnv)[0].GetClientInNamespace("source-namespace")
client2 := (*multiEnv)[1].GetClientInNamespace("dest-namespace")
require.NoError(client1.Create((*multiEnv)[0].Context, &corev1.Namespace{ObjectMeta: metav1.ObjectMeta{Name: "source-namespace"}}))
require.NoError(client2.Create((*multiEnv)[1].Context, &corev1.Namespace{ObjectMeta: metav1.ObjectMeta{Name: "dest-namespace"}}))

testDir, err := os.MkdirTemp("", "k8ssandra-operator-test-****")
require.NoError(err)
// buildDir := filepath.Join(envtest.RootDir(), "build")
// testDir := filepath.Join(buildDir, time.Now())

// if _, err := os.Stat(testDir); os.IsNotExist(err) {
// err := os.MkdirAll(testDir, os.ModePerm)
// require.NoError(err)
// } else if err != nil {
// require.NoError(err)
// }
t.Cleanup(func() {
require.NoError(os.RemoveAll(testDir))
})

kc1, err := (*multiEnv)[0].GetKubeconfig()
require.NoError(err)
f1, err := os.Create(testDir + "/kubeconfig1")
require.NoError(err)
t.Cleanup(func() {
require.NoError(f1.Close())
})
_, err = f1.Write(kc1)
require.NoError(err)

f2, err := os.Create(testDir + "/kubeconfig2")
require.NoError(err)
t.Cleanup(func() {
require.NoError(f2.Close())
})

kc2, err := (*multiEnv)[1].GetKubeconfig()
require.NoError(err)
_, err = f2.Write(kc2)
require.NoError(err)

ex := RegistrationExecutor{
SourceKubeconfig: testDir + "/kubeconfig1",
DestKubeconfig: testDir + "/kubeconfig2",
SourceContext: "default-context",
DestContext: "default-context",
SourceNamespace: "source-namespace",
DestNamespace: "dest-namespace",
ServiceAccount: "k8ssandra-operator",
Context: context.TODO(),
DestinationName: "test-destination",
}
ctx := context.Background()

require.Eventually(func() bool {
res := ex.RegisterCluster()
switch v := res.(type) {
case RetryableError:
if res.Error() == "no secret found for service account k8ssandra-operator" {
return true
}
case nil:
return true
case NonRecoverableError:
panic(fmt.Sprintf("Registration failed: %s", v.Error()))
}
return false
}, time.Second*30, time.Second*5)

// This relies on a controller that is not running in the envtest.

desiredSaSecret := &corev1.Secret{}
require.NoError(client1.Get(context.Background(), client.ObjectKey{Name: "k8ssandra-operator-secret", Namespace: "source-namespace"}, desiredSaSecret))
patch := client.MergeFrom(desiredSaSecret.DeepCopy())
desiredSaSecret.Data = map[string][]byte{
"token": []byte("test-token"),
"ca.crt": []byte("test-ca"),
}
require.NoError(client1.Patch(ctx, desiredSaSecret, patch))

desiredSa := &corev1.ServiceAccount{}
require.NoError(client1.Get(
context.Background(),
client.ObjectKey{Name: "k8ssandra-operator", Namespace: "source-namespace"},
desiredSa))

patch = client.MergeFrom(desiredSa.DeepCopy())
desiredSa.Secrets = []corev1.ObjectReference{
{
Name: "k8ssandra-operator-secret",
},
}
require.NoError(client1.Patch(ctx, desiredSa, patch))

// Continue reconciliation

require.Eventually(func() bool {
res := ex.RegisterCluster()
return res == nil
}, time.Second*300, time.Second*1)

if err := configapi.AddToScheme(client2.Scheme()); err != nil {
require.NoError(err)
}
destSecret := &corev1.Secret{}
require.Eventually(func() bool {
err = client2.Get(ctx,
client.ObjectKey{Name: "test-destination", Namespace: "dest-namespace"}, destSecret)
if err != nil {
t.Log("didn't find dest secret")
return false
}
clientConfig := &configapi.ClientConfig{}
err = client2.Get(ctx,
client.ObjectKey{Name: "test-destination", Namespace: "dest-namespace"}, clientConfig)
if err != nil {
t.Log("didn't find dest client config")
return false
}
return err == nil
}, time.Second*60, time.Second*5)

destKubeconfig := ClientConfigFromSecret(destSecret)
require.Equal(
desiredSaSecret.Data["ca.crt"],
destKubeconfig.Clusters["test-destination"].CertificateAuthorityData)

require.Equal(
string(desiredSaSecret.Data["token"]),
destKubeconfig.AuthInfos["test-destination"].Token)
}

func ClientConfigFromSecret(s *corev1.Secret) clientcmdapi.Config {
out, err := clientcmd.Load(s.Data["kubeconfig"])
if err != nil {
panic(err)
}
return *out
}
Loading

0 comments on commit 82f4078

Please sign in to comment.