diff --git a/.github/workflows/kind_e2e_tests.yaml b/.github/workflows/kind_e2e_tests.yaml index 3ff7b113d..123f9ff31 100644 --- a/.github/workflows/kind_e2e_tests.yaml +++ b/.github/workflows/kind_e2e_tests.yaml @@ -19,8 +19,12 @@ jobs: outputs: image: ${{ steps.set_image_var.outputs.image }} steps: - - name: Checkout code - uses: actions/checkout@v2 + - uses: actions/checkout@v2 + if: github.event_name == 'pull_request' + with: + ref: ${{ github.event.pull_request.head.sha }} + - uses: actions/checkout@v2 + if: github.event_name != 'pull_request' - name: Set up Docker buildx uses: docker/setup-buildx-action@v1 - name: Cache Docker layers @@ -33,7 +37,7 @@ jobs: - name: Set build tags id: set_build_tags run: | - image="k8ssandra-operator/k8ssandra-operator:latest" + image="k8ssandra/k8ssandra-operator:latest" echo "build_tags=$image" >> $GITHUB_ENV echo "image=$image" >> $GITHUB_ENV - name: Update build tags @@ -126,7 +130,7 @@ jobs: run: | docker load --input /tmp/k8ssandra-operator.tar - name: Setup kind cluster - run: make IMG=${{ needs.build_image.outputs.image }} e2e-setup-single + run: make IMG=${{ needs.build_image.outputs.image }} create-kind-cluster kind-load-image - name: Run e2e test ( ${{ matrix.e2e_test }} ) run: make E2E_TEST=TestOperator/${{ matrix.e2e_test }} e2e-test - name: Archive k8s logs diff --git a/.github/workflows/kind_multicluster_e2e_tests.yaml b/.github/workflows/kind_multicluster_e2e_tests.yaml index ba9e84588..963a59ecf 100644 --- a/.github/workflows/kind_multicluster_e2e_tests.yaml +++ b/.github/workflows/kind_multicluster_e2e_tests.yaml @@ -33,7 +33,7 @@ jobs: - name: Set build tags id: set_build_tags run: | - image="k8ssandra-operator/k8ssandra-operator:latest" + image="k8ssandra/k8ssandra-operator:latest" echo "build_tags=$image" >> $GITHUB_ENV echo "image=$image" >> $GITHUB_ENV - name: Update build tags @@ -71,6 +71,7 @@ jobs: matrix: e2e_test: - CreateMultiDatacenterCluster + - AddDcToCluster - CheckStargateApisWithMultiDcCluster - CreateMultiStargateAndDatacenter - CreateMultiReaper @@ -128,7 +129,7 @@ jobs: run: | docker load --input /tmp/k8ssandra-operator.tar - name: Setup kind clusters - run: make IMG=${{ needs.build_image.outputs.image }} e2e-setup-multi + run: make IMG=${{ needs.build_image.outputs.image }} create-kind-multicluster kind-load-image-multi - name: Run e2e test ( ${{ matrix.e2e_test }} ) run: make E2E_TEST=TestOperator/${{ matrix.e2e_test }} NODETOOL_STATUS_TIMEOUT=2m e2e-test - name: Archive k8s logs diff --git a/.github/workflows/kuttl_tests.yaml b/.github/workflows/kuttl_tests.yaml index 377af7f02..619739376 100644 --- a/.github/workflows/kuttl_tests.yaml +++ b/.github/workflows/kuttl_tests.yaml @@ -12,6 +12,7 @@ on: paths-ignore: - 'docs/**' - 'CHANGELOG/**' + jobs: build_image: name: Build image @@ -33,7 +34,7 @@ jobs: - name: Set build tags id: set_build_tags run: | - image="k8ssandra-operator/k8ssandra-operator:latest" + image="k8ssandra/k8ssandra-operator:latest" echo "build_tags=$image" >> $GITHUB_ENV echo "image=$image" >> $GITHUB_ENV - name: Update build tags diff --git a/CHANGELOG/CHANGELOG-1.0.md b/CHANGELOG/CHANGELOG-1.0.md index 1a9f6262b..96ce8d81f 100644 --- a/CHANGELOG/CHANGELOG-1.0.md +++ b/CHANGELOG/CHANGELOG-1.0.md @@ -13,6 +13,7 @@ Changelog for the K8ssandra Operator, new PRs should update the `unreleased` sec When cutting a new release, update the `unreleased` heading to the tag being generated and date, like `## vX.Y.Z - YYYY-MM-DD` and create a new placeholder section for `unreleased` entries. 
## Unreleased +* [FEATURE] [#21](https://github.com/k8ssandra/k8ssandra-operator/issues/21) Add datacenter to existing cluster ## v1.0.0-alpha.3 2022-01-23 diff --git a/Makefile b/Makefile index a187cdb01..1aad3392e 100644 --- a/Makefile +++ b/Makefile @@ -69,9 +69,15 @@ TEST_ARGS= NS ?= k8ssandra-operator -CLUSTER_SCOPE = false +# DEPLOYMENT specifies a particular kustomization to use for configuring the operator +# in a particular way, cluster-scoped for example. See config/deployments/README.md for +# more info. DEPLOYMENT = +# Indicates the number of kind clusters that are being used. Note that the clusters should +# be created with scripts/setup-kind-multicluster.sh. +NUM_CLUSTERS = 2 + ifeq ($(DEPLOYMENT), ) DEPLOY_TARGET = else @@ -173,18 +179,18 @@ multi-up: cleanup build manifests kustomize docker-build create-kind-multicluste kubectl config use-context kind-k8ssandra-0 $(KUSTOMIZE) build config/deployments/control-plane$(DEPLOY_TARGET) | kubectl apply --server-side --force-conflicts -f - ##install the data plane - kubectl config use-context kind-k8ssandra-1 - $(KUSTOMIZE) build config/deployments/data-plane$(DEPLOY_TARGET) | kubectl apply --server-side --force-conflicts -f - + for ((i = 1; i < $(NUM_CLUSTERS); ++i)); do \ + kubectl config use-context kind-k8ssandra-$$i; \ + $(KUSTOMIZE) build config/deployments/data-plane$(DEPLOY_TARGET) | kubectl apply --server-side --force-conflicts -f -; \ + done ## Create a client config - make create-client-config + make create-clientconfig ## Restart the control plane kubectl config use-context kind-k8ssandra-0 kubectl -n $(NS) delete pod -l control-plane=k8ssandra-operator kubectl -n $(NS) rollout status deployment k8ssandra-operator -ifeq ($(DEPLOYMENT), cass-operator-dev) kubectl -n $(NS) delete pod -l name=cass-operator kubectl -n $(NS) rollout status deployment cass-operator-controller-manager -endif multi-reload: build manifests kustomize docker-build kind-load-image-multi cert-manager-multi # Reload the operator on the control-plane @@ -192,19 +198,17 @@ multi-reload: build manifests kustomize docker-build kind-load-image-multi cert- $(KUSTOMIZE) build config/deployments/control-plane$(DEPLOY_TARGET) | kubectl apply --server-side --force-conflicts -f - kubectl -n $(NS) delete pod -l control-plane=k8ssandra-operator kubectl -n $(NS) rollout status deployment k8ssandra-operator -ifeq ($(DEPLOYMENT), cass-operator-dev) kubectl -n $(NS) delete pod -l name=cass-operator kubectl -n $(NS) rollout status deployment cass-operator-controller-manager -endif # Reload the operator on the data-plane - kubectl config use-context kind-k8ssandra-1 - $(KUSTOMIZE) build config/deployments/data-plane$(DEPLOY_TARGET) | kubectl apply --server-side --force-conflicts -f - - kubectl -n $(NS) delete pod -l control-plane=k8ssandra-operator - kubectl -n $(NS) rollout status deployment k8ssandra-operator -ifeq ($(DEPLOYMENT), cass-operator-dev) - kubectl -n $(NS) delete pod -l name=cass-operator - kubectl -n $(NS) rollout status deployment cass-operator-controller-manager -endif + for ((i = 1; i < $(NUM_CLUSTERS); ++i)); do \ + kubectl config use-context kind-k8ssandra-$$i; \ + $(KUSTOMIZE) build config/deployments/data-plane$(DEPLOY_TARGET) | kubectl apply --server-side --force-conflicts -f -; \ + kubectl -n $(NS) delete pod -l control-plane=k8ssandra-operator; \ + kubectl -n $(NS) rollout status deployment k8ssandra-operator; \ + kubectl -n $(NS) delete pod -l name=cass-operator; \ + kubectl -n $(NS) rollout status deployment cass-operator-controller-manager; 
\ + done single-deploy: kubectl config use-context kind-k8ssandra-0 @@ -215,18 +219,20 @@ multi-deploy: kubectl -n $(NS) apply -f test/testdata/samples/k8ssandra-multi-kind.yaml cleanup: - kind delete cluster --name k8ssandra-0 - kind delete cluster --name k8ssandra-1 + for ((i = 0; i < $(NUM_CLUSTERS); ++i)); do \ + kind delete cluster --name k8ssandra-$$i; \ + done create-kind-cluster: scripts/setup-kind-multicluster.sh --clusters 1 --kind-worker-nodes 4 create-kind-multicluster: - scripts/setup-kind-multicluster.sh --clusters 2 --kind-worker-nodes 4 + scripts/setup-kind-multicluster.sh --clusters $(NUM_CLUSTERS) --kind-worker-nodes 4 kind-load-image-multi: - kind load docker-image --name k8ssandra-0 ${IMG} - kind load docker-image --name k8ssandra-1 ${IMG} + for ((i = 0; i < $(NUM_CLUSTERS); ++i)); do \ + kind load docker-image --name k8ssandra-$$i ${IMG}; \ + done ##@ Deployment @@ -249,15 +255,18 @@ cert-manager: ## Install cert-manager to the cluster kubectl rollout status deployment cert-manager-webhook -n cert-manager cert-manager-multi: ## Install cert-manager to the clusters - kubectl config use-context kind-k8ssandra-0 - make cert-manager - kubectl config use-context kind-k8ssandra-1 - make cert-manager + for ((i = 0; i < $(NUM_CLUSTERS); ++i)); do \ + kubectl config use-context kind-k8ssandra-$$i; \ + make cert-manager; \ + done -create-client-config: +create-clientconfig: kubectl config use-context kind-k8ssandra-0 - make install - scripts/create-clientconfig.sh --namespace $(NS) --src-kubeconfig build/kubeconfigs/k8ssandra-1.yaml --dest-kubeconfig build/kubeconfigs/k8ssandra-0.yaml --in-cluster-kubeconfig build/kubeconfigs/updated/k8ssandra-1.yaml --output-dir clientconfig + for ((i = 0; i < $(NUM_CLUSTERS); ++i)); do \ + make install; \ + scripts/create-clientconfig.sh --namespace $(NS) --src-kubeconfig build/kubeconfigs/k8ssandra-$$i.yaml --dest-kubeconfig build/kubeconfigs/k8ssandra-0.yaml --in-cluster-kubeconfig build/kubeconfigs/updated/k8ssandra-$$i.yaml --output-dir clientconfig; \ + done + CONTROLLER_GEN = $(shell pwd)/bin/controller-gen controller-gen: ## Download controller-gen locally if necessary. diff --git a/apis/k8ssandra/v1alpha1/constants.go b/apis/k8ssandra/v1alpha1/constants.go index 33e3aa663..1f80dd6ee 100644 --- a/apis/k8ssandra/v1alpha1/constants.go +++ b/apis/k8ssandra/v1alpha1/constants.go @@ -3,11 +3,32 @@ package v1alpha1 const ( ResourceHashAnnotation = "k8ssandra.io/resource-hash" - // SystemReplicationAnnotation provides the initial replication of system keyspaces + // InitialSystemReplicationAnnotation provides the initial replication of system keyspaces // (system_auth, system_distributed, system_traces) encoded as JSON. This annotation // is set on a K8ssandraCluster when it is first created. The value does not change // regardless of whether the replication of the system keyspaces changes. - SystemReplicationAnnotation = "k8ssandra.io/system-replication" + InitialSystemReplicationAnnotation = "k8ssandra.io/initial-system-replication" + + // DcReplicationAnnotation tells the operator the replication settings to apply to user + // keyspaces when adding a DC to an existing cluster. The value should be serialized + // JSON, e.g., {"dc2": {"ks1": 3, "ks2": 3}}. All user keyspaces must be specified; + // otherwise, reconciliation will fail with a validation error. If you do not want to + // replicate a particular keyspace, specify a value of 0. 
Replication settings can be + // specified for multiple DCs; however, existing DCs won't be modified, and only the DC + // currently being added will be updated. Specifying multiple DCs can be useful though + // if you add multiple DCs to the cluster at once (Note that the CassandraDatacenters + // are still deployed serially). + DcReplicationAnnotation = "k8ssandra.io/dc-replication" + + // RebuildSourceDcAnnotation tells the operator the DC from which to stream when + // rebuilding a DC. If not set, the operator will choose the first DC. The value for + // this annotation must specify the name of a CassandraDatacenter whose Ready + // condition is true. + RebuildSourceDcAnnotation = "k8ssandra.io/rebuild-src-dc" + + RebuildDcAnnotation = "k8ssandra.io/rebuild-dc" + + RebuildLabel = "k8ssandra.io/rebuild" NameLabel = "app.kubernetes.io/name" NameLabelValue = "k8ssandra-operator" @@ -35,3 +56,7 @@ const ( DatacenterLabel = "k8ssandra.io/datacenter" ) + +var ( + SystemKeyspaces = []string{"system_traces", "system_distributed", "system_auth"} +) diff --git a/apis/k8ssandra/v1alpha1/k8ssandracluster_types.go b/apis/k8ssandra/v1alpha1/k8ssandracluster_types.go index 1ec007122..1926bb648 100644 --- a/apis/k8ssandra/v1alpha1/k8ssandracluster_types.go +++ b/apis/k8ssandra/v1alpha1/k8ssandracluster_types.go @@ -156,6 +156,30 @@ func (in *K8ssandraCluster) HasReapers() bool { return false } +func (in *K8ssandraCluster) GetInitializedDatacenters() []CassandraDatacenterTemplate { + datacenters := make([]CassandraDatacenterTemplate, 0) + if in != nil && in.Spec.Cassandra != nil { + for _, dc := range in.Spec.Cassandra.Datacenters { + if status, found := in.Status.Datacenters[dc.Meta.Name]; found && status.Cassandra.GetConditionStatus(cassdcapi.DatacenterInitialized) == corev1.ConditionTrue { + datacenters = append(datacenters, dc) + } + } + } + return datacenters +} + +func (in *K8ssandraCluster) GetReadyDatacenters() []CassandraDatacenterTemplate { + datacenters := make([]CassandraDatacenterTemplate, 0) + if in != nil && in.Spec.Cassandra != nil { + for _, dc := range in.Spec.Cassandra.Datacenters { + if status, found := in.Status.Datacenters[dc.Meta.Name]; found && status.Cassandra.GetConditionStatus(cassdcapi.DatacenterReady) == corev1.ConditionTrue { + datacenters = append(datacenters, dc) + } + } + } + return datacenters +} + // +kubebuilder:object:root=true // K8ssandraClusterList contains a list of K8ssandraCluster diff --git a/config/deployments/control-plane/kustomization.yaml b/config/deployments/control-plane/kustomization.yaml index ac9fbdf13..159a45a77 100644 --- a/config/deployments/control-plane/kustomization.yaml +++ b/config/deployments/control-plane/kustomization.yaml @@ -9,4 +9,4 @@ resources: images: - name: k8ssandra/cass-operator - newTag: 9d1c58a5 \ No newline at end of file + newTag: 9d1c58a5 diff --git a/config/rbac/role.yaml b/config/rbac/role.yaml index 0932e0830..e35ec9fac 100644 --- a/config/rbac/role.yaml +++ b/config/rbac/role.yaml @@ -59,6 +59,18 @@ rules: - patch - update - watch +- apiGroups: + - control.k8ssandra.io + resources: + - cassandratasks + verbs: + - create + - delete + - get + - list + - patch + - update + - watch - apiGroups: - "" resources: diff --git a/controllers/k8ssandra/add_dc_test.go b/controllers/k8ssandra/add_dc_test.go new file mode 100644 index 000000000..0cb7ed23d --- /dev/null +++ b/controllers/k8ssandra/add_dc_test.go @@ -0,0 +1,701 @@ +package k8ssandra + +import ( + "context" + "fmt" + "github.com/go-logr/logr" + cassdcapi
"github.com/k8ssandra/cass-operator/apis/cassandra/v1beta1" + cassctlapi "github.com/k8ssandra/cass-operator/apis/control/v1alpha1" + api "github.com/k8ssandra/k8ssandra-operator/apis/k8ssandra/v1alpha1" + reaperapi "github.com/k8ssandra/k8ssandra-operator/apis/reaper/v1alpha1" + stargateapi "github.com/k8ssandra/k8ssandra-operator/apis/stargate/v1alpha1" + "github.com/k8ssandra/k8ssandra-operator/pkg/annotations" + "github.com/k8ssandra/k8ssandra-operator/pkg/cassandra" + "github.com/k8ssandra/k8ssandra-operator/pkg/stargate" + testutils "github.com/k8ssandra/k8ssandra-operator/pkg/test" + "github.com/k8ssandra/k8ssandra-operator/pkg/utils" + "github.com/k8ssandra/k8ssandra-operator/test/framework" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/types" + "k8s.io/apimachinery/pkg/util/rand" + "sigs.k8s.io/controller-runtime/pkg/client" + "testing" + "time" +) + +// addDc tests scenarios that involve adding a new CassandraDatacenter to an existing +// K8ssandraCluster. +func addDc(t *testing.T, ctx context.Context, f *framework.Framework, namespace string) { + t.Run("WithUserKeyspaces", addDcTest(ctx, f, withUserKeyspaces, true)) + t.Run("WithStargateAndReaper", addDcTest(ctx, f, withStargateAndReaper, true)) + t.Run("FailSystemKeyspaceUpdate", addDcTest(ctx, f, failSystemKeyspaceUpdate, true)) + t.Run("FailUserKeyspaceUpdate", addDcTest(ctx, f, failUserKeyspaceUpdate, true)) + t.Run("ConfigureSrcDcForRebuild", addDcTest(ctx, f, configureSrcDcForRebuild, false)) +} + +type addDcTestFunc func(ctx context.Context, t *testing.T, f *framework.Framework, kc *api.K8ssandraCluster) + +func addDcSetupForSingleDc(ctx context.Context, t *testing.T, f *framework.Framework, namespace string) *api.K8ssandraCluster { + require := require.New(t) + kc := &api.K8ssandraCluster{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: namespace, + Name: "add-dc-test", + }, + Spec: api.K8ssandraClusterSpec{ + Cassandra: &api.CassandraClusterTemplate{ + Datacenters: []api.CassandraDatacenterTemplate{ + { + Meta: api.EmbeddedObjectMeta{ + Name: "dc1", + }, + K8sContext: k8sCtx0, + Size: 3, + ServerVersion: "4.0.1", + StorageConfig: &cassdcapi.StorageConfig{ + CassandraDataVolumeClaimSpec: &corev1.PersistentVolumeClaimSpec{ + StorageClassName: &defaultStorageClass, + }, + }, + }, + }, + }, + }, + } + kcKey := client.ObjectKey{Namespace: kc.Namespace, Name: kc.Name} + + createSuperuserSecret(ctx, t, f, kcKey, kc.Name) + + createReplicatedSecret(ctx, t, f, kcKey, "cluster-0") + setReplicationStatusDone(ctx, t, f, kcKey) + + createCassandraDatacenter(ctx, t, f, kc, 0) + + dc1Key := framework.ClusterKey{NamespacedName: types.NamespacedName{Namespace: namespace, Name: "dc1"}, K8sContext: k8sCtx0} + + dc := &cassdcapi.CassandraDatacenter{} + err := f.Get(ctx, dc1Key, dc) + require.NoError(err) + + err = f.SetDatacenterStatusReady(ctx, dc1Key) + require.NoError(err, "failed to set dc1 status ready") + + err = f.Client.Create(ctx, kc) + require.NoError(err, "failed to create K8ssandraCluster") + + t.Log("wait for the CassandraInitialized condition to be set") + require.Eventually(func() bool { + kc := &api.K8ssandraCluster{} + err := f.Client.Get(ctx, kcKey, kc) + if err != nil { + return false + } + initialized := kc.Status.GetConditionStatus(api.CassandraInitialized) == corev1.ConditionTrue + return initialized && len(kc.Status.Datacenters) > 0 + }, timeout, interval, "timed out waiting for 
CassandraInitialized condition check") + + return kc +} + +func addDcSetupForMultiDc(ctx context.Context, t *testing.T, f *framework.Framework, namespace string) *api.K8ssandraCluster { + require := require.New(t) + kc := &api.K8ssandraCluster{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: namespace, + Name: "add-dc-test", + }, + Spec: api.K8ssandraClusterSpec{ + Cassandra: &api.CassandraClusterTemplate{ + ServerVersion: "4.0.1", + StorageConfig: &cassdcapi.StorageConfig{ + CassandraDataVolumeClaimSpec: &corev1.PersistentVolumeClaimSpec{ + StorageClassName: &defaultStorageClass, + }, + }, + Datacenters: []api.CassandraDatacenterTemplate{ + { + Meta: api.EmbeddedObjectMeta{ + Name: "dc1", + }, + K8sContext: k8sCtx0, + Size: 3, + }, + { + Meta: api.EmbeddedObjectMeta{ + Name: "dc2", + }, + K8sContext: k8sCtx1, + Size: 3, + }, + }, + }, + }, + } + kcKey := client.ObjectKey{Namespace: kc.Namespace, Name: kc.Name} + + createSuperuserSecret(ctx, t, f, kcKey, kc.Name) + + createReplicatedSecret(ctx, t, f, kcKey, k8sCtx0, k8sCtx1) + setReplicationStatusDone(ctx, t, f, kcKey) + + createCassandraDatacenter(ctx, t, f, kc, 0) + + dc1Key := framework.ClusterKey{NamespacedName: types.NamespacedName{Namespace: namespace, Name: "dc1"}, K8sContext: k8sCtx0} + dc := &cassdcapi.CassandraDatacenter{} + err := f.Get(ctx, dc1Key, dc) + require.NoError(err) + + err = f.SetDatacenterStatusReady(ctx, dc1Key) + require.NoError(err, "failed to set dc1 status ready") + + createCassandraDatacenter(ctx, t, f, kc, 1) + + dc2Key := framework.ClusterKey{NamespacedName: types.NamespacedName{Namespace: namespace, Name: "dc2"}, K8sContext: k8sCtx1} + dc = &cassdcapi.CassandraDatacenter{} + err = f.Get(ctx, dc2Key, dc) + require.NoError(err) + + err = f.SetDatacenterStatusReady(ctx, dc2Key) + require.NoError(err, "failed to set dc2 status ready") + + err = f.Client.Create(ctx, kc) + require.NoError(err, "failed to create K8ssandraCluster") + + t.Log("wait for the CassandraInitialized condition to be set") + require.Eventually(func() bool { + kc := &api.K8ssandraCluster{} + err := f.Client.Get(ctx, kcKey, kc) + if err != nil { + return false + } + initialized := kc.Status.GetConditionStatus(api.CassandraInitialized) == corev1.ConditionTrue + return initialized && len(kc.Status.Datacenters) > 1 + }, timeout, interval, "timed out waiting for CassandraInitialized condition check") + + return kc +} + +func addDcTest(ctx context.Context, f *framework.Framework, test addDcTestFunc, single bool) func(*testing.T) { + return func(t *testing.T) { + namespace := rand.String(9) + if err := f.CreateNamespace(namespace); err != nil { + t.Fatalf("failed to create namespace %s: %v", namespace, err) + } + var kc *api.K8ssandraCluster + if single { + kc = addDcSetupForSingleDc(ctx, t, f, namespace) + } else { + kc = addDcSetupForMultiDc(ctx, t, f, namespace) + } + managementApiFactory.Reset() + test(ctx, t, f, kc) + + if err := f.DeleteK8ssandraCluster(ctx, utils.GetKey(kc)); err != nil { + t.Fatalf("failed to delete k8ssandracluster: %v", err) + } + } +} + +// withUserKeyspaces tests adding a DC to a cluster that has user-defined keyspaces. This +// is a happy path test. +func withUserKeyspaces(ctx context.Context, t *testing.T, f *framework.Framework, kc *api.K8ssandraCluster) { + require := require.New(t) + + replication := map[string]int{"dc1": 3} + updatedReplication := map[string]int{"dc1": 3, "dc2": 3} + // We need a version of the map with string values because GetKeyspaceReplication returns + // a map[string]string. 
+ updatedReplicationStr := map[string]string{"class": cassandra.NetworkTopology, "dc1": "3", "dc2": "3"} + + userKeyspaces := []string{"ks1", "ks2"} + + mockMgmtApi := testutils.NewFakeManagementApiFacade() + mockMgmtApi.On(testutils.EnsureKeyspaceReplication, "system_auth", replication).Return(nil) + mockMgmtApi.On(testutils.EnsureKeyspaceReplication, "system_auth", updatedReplication).Return(nil) + mockMgmtApi.On(testutils.EnsureKeyspaceReplication, "system_distributed", replication).Return(nil) + mockMgmtApi.On(testutils.EnsureKeyspaceReplication, "system_distributed", updatedReplication).Return(nil) + mockMgmtApi.On(testutils.EnsureKeyspaceReplication, "system_traces", replication).Return(nil) + mockMgmtApi.On(testutils.EnsureKeyspaceReplication, "system_traces", updatedReplication).Return(nil) + mockMgmtApi.On(testutils.ListKeyspaces, "").Return(userKeyspaces, nil) + mockMgmtApi.On(testutils.GetSchemaVersions).Return(map[string][]string{"fake": {"test"}}, nil) + + for _, ks := range userKeyspaces { + mockMgmtApi.On(testutils.EnsureKeyspaceReplication, ks, updatedReplication).Return(nil) + mockMgmtApi.On(testutils.GetKeyspaceReplication, ks).Return(updatedReplicationStr, nil) + } + + adapter := func(ctx context.Context, datacenter *cassdcapi.CassandraDatacenter, client client.Client, logger logr.Logger) (cassandra.ManagementApiFacade, error) { + return mockMgmtApi, nil + } + managementApiFactory.SetAdapter(adapter) + + dc2Key := framework.ClusterKey{NamespacedName: types.NamespacedName{Namespace: kc.Namespace, Name: "dc2"}, K8sContext: k8sCtx1} + + addDcToCluster(ctx, t, f, kc, dc2Key) + + verifyReplicatedSecretReconciled(ctx, t, f, kc) + + t.Log("check that dc2 was created") + require.Eventually(f.DatacenterExists(ctx, dc2Key), timeout, interval, "failed to verify dc2 was created") + + t.Log("update dc2 status to ready") + err := f.SetDatacenterStatusReady(ctx, dc2Key) + require.NoError(err, "failed to set dc2 status ready") + + verifyReplicationOfSystemKeyspacesUpdated(t, mockMgmtApi, replication, updatedReplication) + + for _, ks := range userKeyspaces { + verifyReplicationOfKeyspaceUpdated(t, mockMgmtApi, ks, updatedReplication) + } + + dc1Key := framework.ClusterKey{NamespacedName: types.NamespacedName{Namespace: kc.Namespace, Name: "dc1"}, K8sContext: k8sCtx0} + + verifyRebuildTaskCreated(ctx, t, f, dc2Key, dc1Key) +} + +// configureSrcDcForRebuild tests adding a DC to a cluster and setting the +// api.RebuildSourceDcAnnotation annotation. The test verifies that the rebuild task is +// configured with the specified source dc. 
+func configureSrcDcForRebuild(ctx context.Context, t *testing.T, f *framework.Framework, kc *api.K8ssandraCluster) { + require := require.New(t) + + replication := map[string]int{"dc1": 3, "dc2": 3} + updatedReplication := map[string]int{"dc1": 3, "dc2": 3, "dc3": 3} + + mockMgmtApi := testutils.NewFakeManagementApiFacade() + mockMgmtApi.On(testutils.EnsureKeyspaceReplication, "system_auth", replication).Return(nil) + mockMgmtApi.On(testutils.EnsureKeyspaceReplication, "system_auth", updatedReplication).Return(nil) + mockMgmtApi.On(testutils.EnsureKeyspaceReplication, "system_distributed", replication).Return(nil) + mockMgmtApi.On(testutils.EnsureKeyspaceReplication, "system_distributed", updatedReplication).Return(nil) + mockMgmtApi.On(testutils.EnsureKeyspaceReplication, "system_traces", replication).Return(nil) + mockMgmtApi.On(testutils.EnsureKeyspaceReplication, "system_traces", updatedReplication).Return(nil) + mockMgmtApi.On(testutils.GetSchemaVersions).Return(map[string][]string{"fake": {"test"}}, nil) + mockMgmtApi.On(testutils.ListKeyspaces, "").Return([]string{}, nil) + + adapter := func(ctx context.Context, datacenter *cassdcapi.CassandraDatacenter, client client.Client, logger logr.Logger) (cassandra.ManagementApiFacade, error) { + return mockMgmtApi, nil + } + managementApiFactory.SetAdapter(adapter) + + dc3Key := framework.ClusterKey{NamespacedName: types.NamespacedName{Namespace: kc.Namespace, Name: "dc3"}, K8sContext: k8sCtx2} + + kcKey := utils.GetKey(kc) + kc = &api.K8ssandraCluster{} + err := f.Client.Get(ctx, kcKey, kc) + require.NoError(err, "failed to get K8ssandraCluster") + + kc.Annotations[api.RebuildSourceDcAnnotation] = "dc2" + err = f.Client.Update(ctx, kc) + require.NoError(err, "failed to add %s annotation to K8ssandraCluster", api.RebuildSourceDcAnnotation) + + addDcToCluster(ctx, t, f, kc, dc3Key) + + verifyReplicatedSecretReconciled(ctx, t, f, kc) + + t.Log("check that dc3 was created") + require.Eventually(f.DatacenterExists(ctx, dc3Key), timeout, interval, "failed to verify dc2 was created") + + t.Log("update dc3 status to ready") + err = f.SetDatacenterStatusReady(ctx, dc3Key) + require.NoError(err, "failed to set dc3 status ready") + + verifyReplicationOfSystemKeyspacesUpdated(t, mockMgmtApi, replication, updatedReplication) + + dc2Key := framework.ClusterKey{NamespacedName: types.NamespacedName{Namespace: kc.Namespace, Name: "dc2"}, K8sContext: k8sCtx1} + + verifyRebuildTaskCreated(ctx, t, f, dc3Key, dc2Key) + + rebuildTaskKey := framework.ClusterKey{ + K8sContext: k8sCtx2, + NamespacedName: types.NamespacedName{ + Namespace: kc.Namespace, + Name: "dc3-rebuild", + }, + } + setRebuildTaskFinished(ctx, t, f, rebuildTaskKey) +} + +// withStargateAndReaper tests adding a DC to a cluster that also has Stargate and Reaper +// deployed. There are internal keyspaces for both Stargate and Reaper that this test +// verifies get updated. They are internal in that they are created and have their +// replication managed by the operator like Cassandra's system keyspaces. The test also +// verifies that Stargate and Reaper are deployed in the new DC after the rebuild finishes. 
+func withStargateAndReaper(ctx context.Context, t *testing.T, f *framework.Framework, kc *api.K8ssandraCluster) { + require := require.New(t) + + replication := map[string]int{"dc1": 3} + updatedReplication := map[string]int{"dc1": 3, "dc2": 3} + + mockMgmtApi := testutils.NewFakeManagementApiFacade() + mockMgmtApi.On(testutils.EnsureKeyspaceReplication, "system_auth", replication).Return(nil) + mockMgmtApi.On(testutils.EnsureKeyspaceReplication, "system_auth", updatedReplication).Return(nil) + mockMgmtApi.On(testutils.EnsureKeyspaceReplication, "system_distributed", replication).Return(nil) + mockMgmtApi.On(testutils.EnsureKeyspaceReplication, "system_distributed", updatedReplication).Return(nil) + mockMgmtApi.On(testutils.EnsureKeyspaceReplication, "system_traces", replication).Return(nil) + mockMgmtApi.On(testutils.EnsureKeyspaceReplication, "system_traces", updatedReplication).Return(nil) + mockMgmtApi.On(testutils.EnsureKeyspaceReplication, stargate.AuthKeyspace, replication).Return(nil) + mockMgmtApi.On(testutils.EnsureKeyspaceReplication, stargate.AuthKeyspace, updatedReplication).Return(nil) + mockMgmtApi.On(testutils.EnsureKeyspaceReplication, reaperapi.DefaultKeyspace, replication).Return(nil) + mockMgmtApi.On(testutils.EnsureKeyspaceReplication, reaperapi.DefaultKeyspace, updatedReplication).Return(nil) + mockMgmtApi.On(testutils.ListTables, stargate.AuthKeyspace).Return([]string{stargate.AuthTable}, nil) + mockMgmtApi.On(testutils.ListKeyspaces, "").Return([]string{}, nil) + mockMgmtApi.On(testutils.GetSchemaVersions).Return(map[string][]string{"fake": {"test"}}, nil) + + adapter := func(ctx context.Context, datacenter *cassdcapi.CassandraDatacenter, client client.Client, logger logr.Logger) (cassandra.ManagementApiFacade, error) { + return mockMgmtApi, nil + } + managementApiFactory.SetAdapter(adapter) + + addStargateAndReaperToCluster(ctx, t, f, kc) + + sg1Key := framework.ClusterKey{ + K8sContext: k8sCtx0, + NamespacedName: types.NamespacedName{ + Namespace: kc.Namespace, + Name: kc.Name + "-dc1-stargate", + }, + } + + t.Log("check that stargate sg1 is created") + require.Eventually(f.StargateExists(ctx, sg1Key), timeout, interval) + + t.Logf("update stargate sg1 status to ready") + err := f.SetStargateStatusReady(ctx, sg1Key) + require.NoError(err, "failed to patch stargate status") + + reaper1Key := framework.ClusterKey{ + K8sContext: k8sCtx0, + NamespacedName: types.NamespacedName{ + Namespace: kc.Namespace, + Name: kc.Name + "-dc1-reaper", + }, + } + + t.Log("check that reaper reaper1 is created") + require.Eventually(f.ReaperExists(ctx, reaper1Key), timeout, interval) + + t.Logf("update reaper reaper1 status to ready") + err = f.SetReaperStatusReady(ctx, reaper1Key) + require.NoError(err, "failed to patch reaper status") + + dc2Key := framework.ClusterKey{NamespacedName: types.NamespacedName{Namespace: kc.Namespace, Name: "dc2"}, K8sContext: k8sCtx1} + + addDcToCluster(ctx, t, f, kc, dc2Key) + + verifyReplicatedSecretReconciled(ctx, t, f, kc) + + t.Log("check that dc2 was created") + require.Eventually(f.DatacenterExists(ctx, dc2Key), timeout, interval, "failed to verify dc2 was created") + + t.Log("update dc2 status to ready") + err = f.SetDatacenterStatusReady(ctx, dc2Key) + require.NoError(err, "failed to set dc2 status ready") + + verifyReplicationOfInternalKeyspacesUpdated(t, mockMgmtApi, replication, updatedReplication) + + dc1Key := framework.ClusterKey{NamespacedName: types.NamespacedName{Namespace: kc.Namespace, Name: "dc1"}, K8sContext: k8sCtx0} + + 
verifyRebuildTaskCreated(ctx, t, f, dc2Key, dc1Key) + + rebuildTaskKey := framework.ClusterKey{ + K8sContext: k8sCtx1, + NamespacedName: types.NamespacedName{ + Namespace: kc.Namespace, + Name: "dc2-rebuild", + }, + } + setRebuildTaskFinished(ctx, t, f, rebuildTaskKey) + + sg2Key := framework.ClusterKey{ + K8sContext: k8sCtx1, + NamespacedName: types.NamespacedName{ + Namespace: kc.Namespace, + Name: kc.Name + "-dc2-stargate"}, + } + + t.Log("check that stargate sg2 is created") + require.Eventually(f.StargateExists(ctx, sg2Key), timeout, interval, "failed to verify stargate sg2 created") + + t.Logf("update stargate sg2 status to ready") + err = f.SetStargateStatusReady(ctx, sg2Key) + require.NoError(err, "failed to patch stargate status") + + reaper2Key := framework.ClusterKey{ + K8sContext: k8sCtx1, + NamespacedName: types.NamespacedName{ + Namespace: kc.Namespace, + Name: kc.Name + "-dc2-reaper", + }, + } + + t.Log("check that reaper reaper2 is created") + require.Eventually(f.ReaperExists(ctx, reaper2Key), timeout, interval, "failed to verify reaper reaper2 created") +} + +// failSystemKeyspaceUpdate tests adding a DC to an existing cluster and verifying the +// behavior when updating replication of system keyspaces fails. +func failSystemKeyspaceUpdate(ctx context.Context, t *testing.T, f *framework.Framework, kc *api.K8ssandraCluster) { + require := require.New(t) + + replication := map[string]int{"dc1": 3} + updatedReplication := map[string]int{"dc1": 3, "dc2": 3} + + replicationCheckErr := fmt.Errorf("failed to check replication") + + mockMgmtApi := testutils.NewFakeManagementApiFacade() + mockMgmtApi.On(testutils.EnsureKeyspaceReplication, "system_auth", replication).Return(nil) + mockMgmtApi.On(testutils.EnsureKeyspaceReplication, "system_auth", updatedReplication).Return(replicationCheckErr) + mockMgmtApi.On(testutils.EnsureKeyspaceReplication, "system_distributed", replication).Return(nil) + mockMgmtApi.On(testutils.EnsureKeyspaceReplication, "system_distributed", updatedReplication).Return(replicationCheckErr) + mockMgmtApi.On(testutils.EnsureKeyspaceReplication, "system_traces", replication).Return(nil) + mockMgmtApi.On(testutils.EnsureKeyspaceReplication, "system_traces", updatedReplication).Return(replicationCheckErr) + mockMgmtApi.On(testutils.GetSchemaVersions).Return(map[string][]string{"fake": {"test"}}, nil) + + adapter := func(ctx context.Context, datacenter *cassdcapi.CassandraDatacenter, client client.Client, logger logr.Logger) (cassandra.ManagementApiFacade, error) { + return mockMgmtApi, nil + } + managementApiFactory.SetAdapter(adapter) + + dc2Key := framework.ClusterKey{NamespacedName: types.NamespacedName{Namespace: kc.Namespace, Name: "dc2"}, K8sContext: k8sCtx1} + + addDcToCluster(ctx, t, f, kc, dc2Key) + + verifyReplicatedSecretReconciled(ctx, t, f, kc) + + t.Log("check that dc2 was created") + require.Eventually(f.DatacenterExists(ctx, dc2Key), timeout, interval, "failed to verify dc2 was created") + + t.Log("update dc2 status to ready") + err := f.SetDatacenterStatusReady(ctx, dc2Key) + require.NoError(err, "failed to set dc2 status ready") + + verifyRebuildTaskNotCreated(ctx, t, f, kc.Namespace, dc2Key.Name) +} + +// failUserKeyspaceUpdate tests adding a DC to an existing cluster and verifying behavior +// when updating replication of user-defined keyspaces fails. 
+func failUserKeyspaceUpdate(ctx context.Context, t *testing.T, f *framework.Framework, kc *api.K8ssandraCluster) { + require := require.New(t) + + replication := map[string]int{"dc1": 3} + updatedReplication := map[string]int{"dc1": 3, "dc2": 3} + + // We need a version of the map with string values because GetKeyspaceReplication returns + // a map[string]string. + updatedReplicationStr := map[string]string{"class": cassandra.NetworkTopology, "dc1": "3", "dc2": "3"} + + userKeyspaces := []string{"ks1", "ks2"} + + replicationCheckErr := fmt.Errorf("failed to check replication") + + mockMgmtApi := testutils.NewFakeManagementApiFacade() + mockMgmtApi.On(testutils.EnsureKeyspaceReplication, "system_auth", replication).Return(nil) + mockMgmtApi.On(testutils.EnsureKeyspaceReplication, "system_auth", updatedReplication).Return(nil) + mockMgmtApi.On(testutils.EnsureKeyspaceReplication, "system_distributed", replication).Return(nil) + mockMgmtApi.On(testutils.EnsureKeyspaceReplication, "system_distributed", updatedReplication).Return(nil) + mockMgmtApi.On(testutils.EnsureKeyspaceReplication, "system_traces", replication).Return(nil) + mockMgmtApi.On(testutils.EnsureKeyspaceReplication, "system_traces", updatedReplication).Return(nil) + mockMgmtApi.On(testutils.ListKeyspaces, "").Return(userKeyspaces, nil) + mockMgmtApi.On(testutils.GetSchemaVersions).Return(map[string][]string{"fake": {"test"}}, nil) + + for _, ks := range userKeyspaces { + mockMgmtApi.On(testutils.GetKeyspaceReplication, ks).Return(updatedReplicationStr, nil) + mockMgmtApi.On(testutils.EnsureKeyspaceReplication, ks, updatedReplication).Return(replicationCheckErr) + } + + adapter := func(ctx context.Context, datacenter *cassdcapi.CassandraDatacenter, client client.Client, logger logr.Logger) (cassandra.ManagementApiFacade, error) { + return mockMgmtApi, nil + } + managementApiFactory.SetAdapter(adapter) + + kcKey := utils.GetKey(kc) + namespace := kcKey.Namespace + + dc2Key := framework.ClusterKey{NamespacedName: types.NamespacedName{Namespace: namespace, Name: "dc2"}, K8sContext: k8sCtx1} + + addDcToCluster(ctx, t, f, kc, dc2Key) + + verifyReplicatedSecretReconciled(ctx, t, f, kc) + + t.Log("check that dc2 was created") + require.Eventually(f.DatacenterExists(ctx, dc2Key), timeout, interval, "failed to verify dc2 was created") + + t.Log("update dc2 status to ready") + err := f.SetDatacenterStatusReady(ctx, dc2Key) + require.NoError(err, "failed to set dc2 status ready") + + verifyReplicationOfSystemKeyspacesUpdated(t, mockMgmtApi, replication, updatedReplication) + + verifyRebuildTaskNotCreated(ctx, t, f, namespace, dc2Key.Name) +} + +func addStargateAndReaperToCluster(ctx context.Context, t *testing.T, f *framework.Framework, kc *api.K8ssandraCluster) { + t.Log("add Stargate and Reaper") + + key := utils.GetKey(kc) + err := f.Client.Get(ctx, key, kc) + require.NoError(t, err, "failed to get K8ssandraCluster") + + kc.Spec.Stargate = &stargateapi.StargateClusterTemplate{ + Size: 1, + } + kc.Spec.Reaper = &reaperapi.ReaperClusterTemplate{} + + err = f.Client.Update(ctx, kc) + require.NoError(t, err, "failed to add Stargate and Reaper") +} + +func addDcToCluster(ctx context.Context, t *testing.T, f *framework.Framework, kc *api.K8ssandraCluster, dcKey framework.ClusterKey) { + t.Logf("add %s to cluster", dcKey.Name) + + key := utils.GetKey(kc) + err := f.Client.Get(ctx, key, kc) + require.NoError(t, err, "failed to get K8ssandraCluster") + + kc.Spec.Cassandra.Datacenters = append(kc.Spec.Cassandra.Datacenters, 
api.CassandraDatacenterTemplate{ + Meta: api.EmbeddedObjectMeta{ + Name: dcKey.Name, + Namespace: dcKey.Namespace, + }, + K8sContext: dcKey.K8sContext, + Size: 3, + ServerVersion: "4.0.1", + StorageConfig: &cassdcapi.StorageConfig{ + CassandraDataVolumeClaimSpec: &corev1.PersistentVolumeClaimSpec{ + StorageClassName: &defaultStorageClass, + }, + }, + }) + annotations.AddAnnotation(kc, api.DcReplicationAnnotation, fmt.Sprintf(`{"%s": {"ks1": 3, "ks2": 3}}`, dcKey.Name)) + + err = f.Client.Update(ctx, kc) + require.NoError(t, err, "failed to add dc to K8ssandraCluster") +} + +func verifyReplicationOfSystemKeyspacesUpdated(t *testing.T, mockMgmtApi *testutils.FakeManagementApiFacade, replication, updatedReplication map[string]int) { + require.Eventually(t, func() bool { + for _, ks := range api.SystemKeyspaces { + if mockMgmtApi.GetFirstCall(testutils.EnsureKeyspaceReplication, ks, updatedReplication) < 0 { + return false + } + + } + return true + }, timeout, interval, "Failed to verify system keyspaces replication updated") + + for _, ks := range api.SystemKeyspaces { + lastCallOriginalReplication := mockMgmtApi.GetLastCall(testutils.EnsureKeyspaceReplication, ks, replication) + firstCallUpdatedReplication := mockMgmtApi.GetFirstCall(testutils.EnsureKeyspaceReplication, ks, updatedReplication) + assert.True(t, firstCallUpdatedReplication > lastCallOriginalReplication) + } +} + +func verifyReplicationOfInternalKeyspacesUpdated(t *testing.T, mockMgmtApi *testutils.FakeManagementApiFacade, replication, updatedReplication map[string]int) { + internalKeyspaces := append(api.SystemKeyspaces, stargate.AuthKeyspace, reaperapi.DefaultKeyspace) + + require.Eventually(t, func() bool { + for _, ks := range internalKeyspaces { + if mockMgmtApi.GetFirstCall(testutils.EnsureKeyspaceReplication, ks, updatedReplication) < 0 { + t.Logf("failed to find updated replication call for keyspace %s with replication %v", ks, updatedReplication) + return false + } + + } + return true + }, timeout*3, time.Second*1, "Failed to verify internal keyspaces replication updated") + + for _, ks := range internalKeyspaces { + lastCallOriginalReplication := mockMgmtApi.GetLastCall(testutils.EnsureKeyspaceReplication, ks, replication) + firstCallUpdatedReplication := mockMgmtApi.GetFirstCall(testutils.EnsureKeyspaceReplication, ks, updatedReplication) + msg := fmt.Sprintf("replication update check failed for keyspace %s: lastCallOriginal(%d), firstCallUpdated(%d)", ks, lastCallOriginalReplication, firstCallUpdatedReplication) + assert.True(t, firstCallUpdatedReplication > lastCallOriginalReplication, msg) + } +} + +func verifyReplicationOfKeyspaceUpdated(t *testing.T, mockMgmtApi *testutils.FakeManagementApiFacade, keyspace string, replication map[string]int) { + require.Eventually(t, func() bool { + return mockMgmtApi.GetFirstCall(testutils.EnsureKeyspaceReplication, keyspace, replication) > -1 + }, timeout, interval, fmt.Sprintf("failed to verify replication for keyspace %s updated", keyspace)) +} + +func verifyRebuildTaskCreated(ctx context.Context, t *testing.T, f *framework.Framework, targetDcKey, srcDcKey framework.ClusterKey) { + t.Log("check that rebuild task was created") + require := require.New(t) + task := &cassctlapi.CassandraTask{} + taskKey := framework.ClusterKey{ + NamespacedName: types.NamespacedName{ + Namespace: targetDcKey.Namespace, + Name: targetDcKey.Name + "-rebuild", + }, + K8sContext: targetDcKey.K8sContext, + } + + require.Eventually(func() bool { + err := f.Get(ctx, taskKey, task) + return err == nil 
+ }, timeout, interval, "failed to get rebuild task") + + require.Equal(corev1.ObjectReference{Namespace: targetDcKey.Namespace, Name: targetDcKey.Name}, task.Spec.Datacenter) + + expectedJobs := []cassctlapi.CassandraJob{ + { + Name: targetDcKey.Name + "-rebuild", + Command: "rebuild", + Arguments: map[string]string{"source_datacenter": srcDcKey.Name}, + }, + } + require.Equal(expectedJobs, task.Spec.Jobs) +} + +func setRebuildTaskFinished(ctx context.Context, t *testing.T, f *framework.Framework, key framework.ClusterKey) { + t.Log("set rebuild task to finished") + + task := &cassctlapi.CassandraTask{} + err := f.Get(ctx, key, task) + require.NoError(t, err, "failed to get rebuild task") + + task.Status.Succeeded = 1 + err = f.UpdateStatus(ctx, key, task) + require.NoError(t, err, "failed to set rebuild task finished") +} + +func verifyRebuildTaskNotCreated(ctx context.Context, t *testing.T, f *framework.Framework, namespace, dcName string) { + t.Log("check that rebuild task is not created") + + taskKey := framework.ClusterKey{ + NamespacedName: types.NamespacedName{Namespace: namespace, Name: dcName + "-rebuild"}, + K8sContext: k8sCtx1, + } + require.Never(t, func() bool { + err := f.Get(ctx, taskKey, &cassctlapi.CassandraTask{}) + return err == nil + }, timeout, interval, "Failed to verify that the rebuild task was not created") +} + +func createCassandraDatacenter(ctx context.Context, t *testing.T, f *framework.Framework, kc *api.K8ssandraCluster, dcIdx int) { + dcTemplate := kc.Spec.Cassandra.Datacenters[dcIdx] + dcConfig := cassandra.Coalesce(kc.Name, kc.Spec.Cassandra, &dcTemplate) + dc, err := cassandra.NewDatacenter(utils.GetKey(kc), dcConfig) + + require.NoError(t, err, "failed to create CassandraDatacenter") + + annotations.AddHashAnnotation(dc) + + namespace := kc.Namespace + if dcTemplate.Meta.Namespace != "" { + namespace = dcTemplate.Meta.Namespace + } + + dcKey := framework.ClusterKey{ + NamespacedName: types.NamespacedName{ + Namespace: namespace, + Name: dcTemplate.Meta.Name, + }, + K8sContext: dcTemplate.K8sContext, + } + err = f.Create(ctx, dcKey, dc) + require.NoError(t, err, "failed to create cassandradatacenter") +} diff --git a/controllers/k8ssandra/datacenters.go b/controllers/k8ssandra/datacenters.go index 5e79db47f..4aa651592 100644 --- a/controllers/k8ssandra/datacenters.go +++ b/controllers/k8ssandra/datacenters.go @@ -3,9 +3,9 @@ package k8ssandra import ( "context" "fmt" - "github.com/go-logr/logr" cassdcapi "github.com/k8ssandra/cass-operator/apis/cassandra/v1beta1" + cassctlapi "github.com/k8ssandra/cass-operator/apis/control/v1alpha1" api "github.com/k8ssandra/k8ssandra-operator/apis/k8ssandra/v1alpha1" "github.com/k8ssandra/k8ssandra-operator/pkg/annotations" "github.com/k8ssandra/k8ssandra-operator/pkg/cassandra" @@ -17,12 +17,14 @@ import ( "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" + "sigs.k8s.io/controller-runtime/pkg/client" + "strings" ) func (r *K8ssandraClusterReconciler) reconcileDatacenters(ctx context.Context, kc *api.K8ssandraCluster, logger logr.Logger) (result.ReconcileResult, []*cassdcapi.CassandraDatacenter) { kcKey := utils.GetKey(kc) - systemReplication, err := r.checkSystemReplication(ctx, kc, logger) + systemReplication, err := r.checkInitialSystemReplication(ctx, kc, logger) if err != nil { logger.Error(err, "System replication check failed") return result.Error(err), nil @@ -37,7 +39,7 @@ func (r *K8ssandraClusterReconciler) reconcileDatacenters(ctx 
context.Context, k } // Reconcile CassandraDatacenter objects only - for _, dcTemplate := range kc.Spec.Cassandra.Datacenters { + for idx, dcTemplate := range kc.Spec.Cassandra.Datacenters { if !secret.HasReplicatedSecrets(ctx, r.Client, kcKey, dcTemplate.K8sContext) { // ReplicatedSecret has not replicated yet, wait until it has logger.Info("Waiting for replication to complete") @@ -66,15 +68,24 @@ func (r *K8ssandraClusterReconciler) reconcileDatacenters(ctx context.Context, k if medusaResult := r.ReconcileMedusa(ctx, dcConfig, dcTemplate, kc, logger); medusaResult.Completed() { return medusaResult, actualDcs } + desiredDc, err := cassandra.NewDatacenter(kcKey, dcConfig) if err != nil { logger.Error(err, "Failed to create new CassandraDatacenter") return result.Error(err), actualDcs } + annotations.AddHashAnnotation(desiredDc) + dcKey := types.NamespacedName{Namespace: desiredDc.Namespace, Name: desiredDc.Name} logger := logger.WithValues("CassandraDatacenter", dcKey, "K8SContext", dcTemplate.K8sContext) - annotations.AddHashAnnotation(desiredDc) + if idx > 0 { + desiredDc.Annotations[cassdcapi.SkipUserCreationAnnotation] = "true" + } + + if recResult := r.checkRebuildAnnotation(ctx, kc, dcKey.Name); recResult.Completed() { + return recResult, actualDcs + } actualDc := &cassdcapi.CassandraDatacenter{} @@ -89,11 +100,7 @@ func (r *K8ssandraClusterReconciler) reconcileDatacenters(ctx context.Context, k } if err = remoteClient.Get(ctx, dcKey, actualDc); err == nil { - // cassdc already exists, we'll update it - if err = r.setStatusForDatacenter(kc, actualDc); err != nil { - logger.Error(err, "Failed to update status for datacenter") - return result.Error(err), actualDcs - } + r.setStatusForDatacenter(kc, actualDc) if !annotations.CompareHashAnnotations(actualDc, desiredDc) { logger.Info("Updating datacenter") @@ -105,23 +112,8 @@ func (r *K8ssandraClusterReconciler) reconcileDatacenters(ctx context.Context, k logger.Error(err, "SuperuserSecretName is immutable, reverting to existing value in CassandraDatacenter") } - desiredConfig, err := utils.UnmarshalToMap(desiredDc.Spec.Config) - if err != nil { - return result.Error(err), actualDcs - } - actualConfig, err := utils.UnmarshalToMap(actualDc.Spec.Config) - if err != nil { - return result.Error(err), actualDcs - } - - actualCassYaml, foundActualYaml := actualConfig["cassandra-yaml"].(map[string]interface{}) - desiredCassYaml, foundDesiredYaml := desiredConfig["cassandra-yaml"].(map[string]interface{}) - - if foundActualYaml && foundDesiredYaml { - if actualCassYaml["num_tokens"] != desiredCassYaml["num_tokens"] { - err = fmt.Errorf("tried to change num_tokens in an existing datacenter") - return result.Error(err), actualDcs - } + if err := cassandra.ValidateConfig(desiredDc, actualDc); err != nil { + return result.Error(fmt.Errorf("invalid Cassandra config: %v", err)), actualDcs } actualDc = actualDc.DeepCopy() @@ -143,10 +135,16 @@ func (r *K8ssandraClusterReconciler) reconcileDatacenters(ctx context.Context, k actualDcs = append(actualDcs, actualDc) - if recResult := r.updateReplicationOfSystemKeyspaces(ctx, kc, desiredDc, remoteClient, logger); recResult.Completed() { + if recResult := r.checkSchemas(ctx, kc, actualDc, remoteClient, logger); recResult.Completed() { return recResult, actualDcs } + if annotations.HasAnnotationWithValue(kc, api.RebuildDcAnnotation, dcKey.Name) { + if recResult := r.reconcileDcRebuild(ctx, kc, actualDc, remoteClient, logger); recResult.Completed() { + return recResult, actualDcs + } + } + } else { if 
errors.IsNotFound(err) { // cassdc doesn't exist, we'll create it @@ -168,7 +166,7 @@ func (r *K8ssandraClusterReconciler) reconcileDatacenters(ctx context.Context, k // or as part of an existing cluster. if kc.Status.GetConditionStatus(api.CassandraInitialized) == corev1.ConditionUnknown { now := metav1.Now() - (&kc.Status).SetCondition(api.K8ssandraClusterCondition{ + kc.Status.SetCondition(api.K8ssandraClusterCondition{ Type: api.CassandraInitialized, Status: corev1.ConditionTrue, LastTransitionTime: &now, @@ -178,7 +176,7 @@ func (r *K8ssandraClusterReconciler) reconcileDatacenters(ctx context.Context, k return result.Continue(), actualDcs } -func (r *K8ssandraClusterReconciler) setStatusForDatacenter(kc *api.K8ssandraCluster, dc *cassdcapi.CassandraDatacenter) error { +func (r *K8ssandraClusterReconciler) setStatusForDatacenter(kc *api.K8ssandraCluster, dc *cassdcapi.CassandraDatacenter) { if len(kc.Status.Datacenters) == 0 { kc.Status.Datacenters = make(map[string]api.K8ssandraStatus, 0) } @@ -192,6 +190,137 @@ func (r *K8ssandraClusterReconciler) setStatusForDatacenter(kc *api.K8ssandraClu Cassandra: dc.Status.DeepCopy(), } } +} + +func datacenterAddedToExistingCluster(kc *api.K8ssandraCluster, dcName string) bool { + _, found := kc.Status.Datacenters[dcName] + return kc.Status.GetConditionStatus(api.CassandraInitialized) == corev1.ConditionTrue && !found +} + +func getSourceDatacenterName(targetDc *cassdcapi.CassandraDatacenter, kc *api.K8ssandraCluster) (string, error) { + dcNames := make([]string, 0) + + for _, dc := range kc.Spec.Cassandra.Datacenters { + if dcStatus, found := kc.Status.Datacenters[dc.Meta.Name]; found { + if dcStatus.Cassandra.GetConditionStatus(cassdcapi.DatacenterReady) == corev1.ConditionTrue { + dcNames = append(dcNames, dc.Meta.Name) + } + } + } + + if rebuildFrom, found := kc.Annotations[api.RebuildSourceDcAnnotation]; found { + if rebuildFrom == targetDc.Name { + return "", fmt.Errorf("rebuild error: src dc and target dc cannot be the same") + } + + for _, dc := range dcNames { + if rebuildFrom == dc { + return dc, nil + } + } + + return "", fmt.Errorf("rebuild error: src dc must be a ready dc") + } + + for _, dc := range dcNames { + if dc != targetDc.Name { + return dc, nil + } + } + + return "", fmt.Errorf("rebuild error: unable to determine src dc for target dc (%s) from dc list (%s)", + targetDc.Name, strings.Trim(fmt.Sprint(dcNames), "[]")) +} + +func (r *K8ssandraClusterReconciler) checkRebuildAnnotation(ctx context.Context, kc *api.K8ssandraCluster, dcName string) result.ReconcileResult { + if !annotations.HasAnnotationWithValue(kc, api.RebuildDcAnnotation, dcName) && datacenterAddedToExistingCluster(kc, dcName) { + patch := client.MergeFromWithOptions(kc.DeepCopy()) + annotations.AddAnnotation(kc, api.RebuildDcAnnotation, dcName) + if err := r.Client.Patch(ctx, kc, patch); err != nil { + return result.Error(fmt.Errorf("failed to add rebuild annotation: %v", err)) + } + } + + return result.Continue() +} + +func (r *K8ssandraClusterReconciler) reconcileDcRebuild( + ctx context.Context, + kc *api.K8ssandraCluster, + dc *cassdcapi.CassandraDatacenter, + remoteClient client.Client, + logger logr.Logger) result.ReconcileResult { + + logger.Info("Reconciling rebuild") + + srcDc, err := getSourceDatacenterName(dc, kc) + if err != nil { + return result.Error(err) + } + + desiredTask := newRebuildTask(dc.Name, dc.Namespace, srcDc) + taskKey := client.ObjectKey{Namespace: desiredTask.Namespace, Name: desiredTask.Name} + task := &cassctlapi.CassandraTask{} + 
if err := remoteClient.Get(ctx, taskKey, task); err == nil { + if taskFinished(task) { + // TODO what should we do if it failed? + logger.Info("Datacenter rebuild finished") + patch := client.MergeFromWithOptions(kc.DeepCopy()) + delete(kc.Annotations, api.RebuildDcAnnotation) + if err = r.Client.Patch(ctx, kc, patch); err != nil { + err = fmt.Errorf("failed to remove %s annotation: %v", api.RebuildDcAnnotation, err) + return result.Error(err) + } + return result.Continue() + } else { + logger.Info("Waiting for datacenter rebuild to complete", "Task", taskKey) + return result.RequeueSoon(15) + } + } else { + if errors.IsNotFound(err) { + logger.Info("Creating rebuild task", "Task", taskKey) + if err = remoteClient.Create(ctx, desiredTask); err != nil { + logger.Error(err, "Failed to create rebuild task", "Task", taskKey) + return result.Error(err) + } + return result.RequeueSoon(15) + } + logger.Error(err, "Failed to get rebuild task", "Task", taskKey) + return result.Error(err) + } +} + +func taskFinished(task *cassctlapi.CassandraTask) bool { + return len(task.Spec.Jobs) == int(task.Status.Succeeded+task.Status.Failed) +} + +func newRebuildTask(targetDc, namespace, srcDc string) *cassctlapi.CassandraTask { + now := metav1.Now() + task := &cassctlapi.CassandraTask{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: namespace, + Name: targetDc + "-rebuild", + }, + Spec: cassctlapi.CassandraTaskSpec{ + ScheduledTime: &now, + Datacenter: corev1.ObjectReference{ + Namespace: namespace, + Name: targetDc, + }, + Jobs: []cassctlapi.CassandraJob{ + { + Name: targetDc + "-rebuild", + Command: "rebuild", + Arguments: map[string]string{ + "source_datacenter": srcDc, + }, + }, + }, + }, + } + + annotations.AddHashAnnotation(task) - return nil + return task } diff --git a/controllers/k8ssandra/k8ssandracluster_controller.go b/controllers/k8ssandra/k8ssandracluster_controller.go index c1cd8e507..78f4464f8 100644 --- a/controllers/k8ssandra/k8ssandracluster_controller.go +++ b/controllers/k8ssandra/k8ssandracluster_controller.go @@ -62,6 +62,7 @@ type K8ssandraClusterReconciler struct { // +kubebuilder:rbac:groups=k8ssandra.io,namespace="k8ssandra",resources=k8ssandraclusters/status,verbs=get;update;patch // +kubebuilder:rbac:groups=k8ssandra.io,namespace="k8ssandra",resources=k8ssandraclusters/finalizers,verbs=update // +kubebuilder:rbac:groups=cassandra.datastax.com,namespace="k8ssandra",resources=cassandradatacenters,verbs=get;list;watch;create;update;delete;patch +// +kubebuilder:rbac:groups=control.k8ssandra.io,namespace="k8ssandra",resources=cassandratasks,verbs=get;list;watch;create;update;delete;patch // +kubebuilder:rbac:groups=stargate.k8ssandra.io,namespace="k8ssandra",resources=stargates,verbs=get;list;watch;create;update;patch;delete // +kubebuilder:rbac:groups=reaper.k8ssandra.io,namespace="k8ssandra",resources=reapers,verbs=get;list;watch;create;update;patch;delete // +kubebuilder:rbac:groups=core,namespace="k8ssandra",resources=pods;secrets,verbs=get;list;watch @@ -135,17 +136,9 @@ func (r *K8ssandraClusterReconciler) reconcile(ctx context.Context, kc *api.K8ss actualDcs = dcs } - kcLogger.Info("All dcs reconciled") + kcLogger.Info("All DCs reconciled") - if recResult := r.reconcileStargateAuthSchema(ctx, kc, actualDcs, kcLogger); recResult.Completed() { - return recResult.Output() - } - - if recResult := r.reconcileReaperSchema(ctx, kc, actualDcs, kcLogger); recResult.Completed() { - return recResult.Output() - } - - if recResult := r.reconcileStargateAndReaper(ctx, kc, actualDcs, kcLogger); 
recResult.Completed() { + if recResult := r.afterCassandraReconciled(ctx, kc, actualDcs, kcLogger); recResult.Completed() { return recResult.Output() } @@ -154,7 +147,7 @@ func (r *K8ssandraClusterReconciler) reconcile(ctx context.Context, kc *api.K8ss return result.Done().Output() } -func (r *K8ssandraClusterReconciler) reconcileStargateAndReaper(ctx context.Context, kc *api.K8ssandraCluster, dcs []*cassdcapi.CassandraDatacenter, logger logr.Logger) result.ReconcileResult { +func (r *K8ssandraClusterReconciler) afterCassandraReconciled(ctx context.Context, kc *api.K8ssandraCluster, dcs []*cassdcapi.CassandraDatacenter, logger logr.Logger) result.ReconcileResult { for i, dcTemplate := range kc.Spec.Cassandra.Datacenters { dc := dcs[i] dcKey := utils.GetKey(dc) diff --git a/controllers/k8ssandra/k8ssandracluster_controller_test.go b/controllers/k8ssandra/k8ssandracluster_controller_test.go index cb438124f..78ce08808 100644 --- a/controllers/k8ssandra/k8ssandracluster_controller_test.go +++ b/controllers/k8ssandra/k8ssandracluster_controller_test.go @@ -8,19 +8,14 @@ import ( "testing" "time" - "github.com/k8ssandra/cass-operator/pkg/httphelper" telemetryapi "github.com/k8ssandra/k8ssandra-operator/apis/telemetry/v1alpha1" - "github.com/k8ssandra/k8ssandra-operator/pkg/mocks" - "github.com/k8ssandra/k8ssandra-operator/pkg/stargate" + "github.com/k8ssandra/k8ssandra-operator/pkg/labels" "github.com/k8ssandra/k8ssandra-operator/pkg/utils" promapi "github.com/prometheus-operator/prometheus-operator/pkg/apis/monitoring/v1" - "github.com/stretchr/testify/mock" k8serrors "k8s.io/apimachinery/pkg/api/errors" "k8s.io/utils/pointer" "github.com/Jeffail/gabs" - "github.com/go-logr/logr" - "strconv" "strings" @@ -52,18 +47,26 @@ import ( const ( timeout = time.Second * 5 interval = time.Millisecond * 500 + + k8sCtx0 = "cluster-0" + k8sCtx1 = "cluster-1" + k8sCtx2 = "cluster-2" ) var ( - defaultStorageClass = "default" - testEnv *testutils.MultiClusterTestEnv - managementApi = &fakeManagementApiFactory{} + defaultStorageClass = "default" + testEnv *testutils.MultiClusterTestEnv + managementApiFactory = &testutils.FakeManagementApiFactory{} ) func TestK8ssandraCluster(t *testing.T) { ctx := testutils.TestSetup(t) ctx, cancel := context.WithCancel(ctx) - testEnv = &testutils.MultiClusterTestEnv{} + testEnv = &testutils.MultiClusterTestEnv{ + BeforeTest: func() { + managementApiFactory.Reset() + }, + } reconcilerConfig := config.InitConfig() @@ -76,7 +79,7 @@ func TestK8ssandraCluster(t *testing.T) { Client: mgr.GetClient(), Scheme: scheme.Scheme, ClientCache: clientCache, - ManagementApi: managementApi, + ManagementApi: managementApiFactory, }).SetupWithManager(mgr, clusters) return err }) @@ -89,6 +92,7 @@ func TestK8ssandraCluster(t *testing.T) { t.Run("CreateSingleDcCluster", testEnv.ControllerTest(ctx, createSingleDcCluster)) t.Run("CreateMultiDcCluster", testEnv.ControllerTest(ctx, createMultiDcCluster)) + t.Run("AddDcToExistingCluster", testEnv.ControllerTest(ctx, addDc)) t.Run("ApplyClusterTemplateConfigs", testEnv.ControllerTest(ctx, applyClusterTemplateConfigs)) t.Run("ApplyDatacenterTemplateConfigs", testEnv.ControllerTest(ctx, applyDatacenterTemplateConfigs)) t.Run("ApplyClusterTemplateAndDatacenterTemplateConfigs", testEnv.ControllerTest(ctx, applyClusterTemplateAndDatacenterTemplateConfigs)) @@ -296,9 +300,6 @@ func applyClusterTemplateConfigs(t *testing.T, ctx context.Context, f *framework require := require.New(t) assert := assert.New(t) - k8sCtx0 := "cluster-0" - k8sCtx1 := "cluster-1" - 
clusterName := "cluster-configs" superUserSecretName := "test-superuser" serverVersion := "4.0.0" @@ -424,9 +425,6 @@ func applyDatacenterTemplateConfigs(t *testing.T, ctx context.Context, f *framew require := require.New(t) assert := assert.New(t) - k8sCtx0 := "cluster-0" - k8sCtx1 := "cluster-1" - clusterName := "cluster-configs" serverVersion := "4.0.0" dc1Size := int32(12) @@ -582,9 +580,6 @@ func applyClusterTemplateAndDatacenterTemplateConfigs(t *testing.T, ctx context. require := require.New(t) assert := assert.New(t) - k8sCtx0 := "cluster-0" - k8sCtx1 := "cluster-1" - clusterName := "cluster-configs" serverVersion := "4.0.0" dc1Size := int32(12) @@ -862,11 +857,6 @@ func createMultiDcCluster(t *testing.T, ctx context.Context, f *framework.Framew t.Log("check that dc2 was created") require.Eventually(f.DatacenterExists(ctx, dc2Key), timeout, interval) - t.Log("check that remote seeds are set on dc2") - dc2 = &cassdcapi.CassandraDatacenter{} - err = f.Get(ctx, dc2Key, dc2) - require.NoError(err, "failed to get dc2") - t.Log("update dc2 status to ready") err = f.SetDatacenterStatusReady(ctx, dc2Key) require.NoError(err, "failed to set dc2 status ready") @@ -917,6 +907,76 @@ func createMultiDcCluster(t *testing.T, ctx context.Context, f *framework.Framew verifyObjectDoesNotExist(ctx, t, f, dc2Key, &cassdcapi.CassandraDatacenter{}) } +func createSuperuserSecret(ctx context.Context, t *testing.T, f *framework.Framework, kcKey client.ObjectKey, secretName string) { + secret := &corev1.Secret{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: kcKey.Namespace, + Name: secretName, + }, + Data: map[string][]byte{}, + } + labels.SetManagedBy(secret, kcKey) + + err := f.Client.Create(ctx, secret) + require.NoError(t, err, "failed to create superuser secret") +} + +func createReplicatedSecret(ctx context.Context, t *testing.T, f *framework.Framework, kcKey client.ObjectKey, replicationTargets ...string) { + rsec := &replicationapi.ReplicatedSecret{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: kcKey.Namespace, + Name: kcKey.Name, + }, + Spec: replicationapi.ReplicatedSecretSpec{ + Selector: &metav1.LabelSelector{ + MatchLabels: labels.ManagedByLabels(kcKey), + }, + ReplicationTargets: []replicationapi.ReplicationTarget{}, + }, + Status: replicationapi.ReplicatedSecretStatus{ + Conditions: []replicationapi.ReplicationCondition{ + { + Type: replicationapi.ReplicationDone, + Status: corev1.ConditionTrue, + }, + }, + }, + } + labels.SetManagedBy(rsec, kcKey) + + for _, val := range replicationTargets { + rsec.Spec.ReplicationTargets = append(rsec.Spec.ReplicationTargets, replicationapi.ReplicationTarget{ + Namespace: kcKey.Namespace, + K8sContextName: val, + }) + } + + err := f.Client.Create(ctx, rsec) + require.NoError(t, err, "failed to create replicated secret") +} + +func setReplicationStatusDone(ctx context.Context, t *testing.T, f *framework.Framework, key client.ObjectKey) { + rsec := &replicationapi.ReplicatedSecret{} + err := f.Client.Get(ctx, key, rsec) + require.NoError(t, err, "failed to get ReplicatedSecret", "key", key) + + now := metav1.Now() + conditions := make([]replicationapi.ReplicationCondition, 0) + + for _, target := range rsec.Spec.ReplicationTargets { + conditions = append(conditions, replicationapi.ReplicationCondition{ + Cluster: target.K8sContextName, + Type: replicationapi.ReplicationDone, + Status: corev1.ConditionTrue, + LastTransitionTime: &now, + }) + } + rsec.Status.Conditions = conditions + err = f.Client.Status().Update(ctx, rsec) + + require.NoError(t, err, "Failed 
to update ReplicationSecret status", "key", key) +} + func createMultiDcClusterWithStargate(t *testing.T, ctx context.Context, f *framework.Framework, namespace string) { require := require.New(t) @@ -1047,14 +1107,7 @@ func createMultiDcClusterWithStargate(t *testing.T, ctx context.Context, f *fram require.True(err != nil && errors.IsNotFound(err), "dc2 should not be created until dc1 is ready") t.Log("update dc1 status to ready") - err = f.PatchDatacenterStatus(ctx, dc1Key, func(dc *cassdcapi.CassandraDatacenter) { - dc.Status.CassandraOperatorProgress = cassdcapi.ProgressReady - dc.SetCondition(cassdcapi.DatacenterCondition{ - Type: cassdcapi.DatacenterReady, - Status: corev1.ConditionTrue, - LastTransitionTime: metav1.Now(), - }) - }) + err = f.SetDatacenterStatusReady(ctx, dc1Key) require.NoError(err, "failed to update dc1 status to ready") t.Log("check that dc2 was created") @@ -1068,14 +1121,7 @@ func createMultiDcClusterWithStargate(t *testing.T, ctx context.Context, f *fram } t.Log("update dc2 status to ready") - err = f.PatchDatacenterStatus(ctx, dc2Key, func(dc *cassdcapi.CassandraDatacenter) { - dc.Status.CassandraOperatorProgress = cassdcapi.ProgressReady - dc.SetCondition(cassdcapi.DatacenterCondition{ - Type: cassdcapi.DatacenterReady, - Status: corev1.ConditionTrue, - LastTransitionTime: metav1.Now(), - }) - }) + err = f.SetDatacenterStatusReady(ctx, dc2Key) require.NoError(err, "failed to update dc2 status to ready") t.Log("check that stargate sg1 is created") @@ -1338,21 +1384,21 @@ func secretExists(f *framework.Framework, ctx context.Context, namespace, secret } func verifySystemReplicationAnnotationSet(ctx context.Context, t *testing.T, f *framework.Framework, kc *api.K8ssandraCluster) { - t.Logf("check that the %s annotation is set", api.SystemReplicationAnnotation) + t.Logf("check that the %s annotation is set", api.InitialSystemReplicationAnnotation) assert.Eventually(t, systemReplicationAnnotationIsSet(t, f, ctx, kc), timeout, interval, "Failed to verify that the system replication annotation was set correctly") } func systemReplicationAnnotationIsSet(t *testing.T, f *framework.Framework, ctx context.Context, kc *api.K8ssandraCluster) func() bool { return func() bool { key := client.ObjectKey{Namespace: kc.Namespace, Name: kc.Name} - expectedReplication := cassandra.ComputeSystemReplication(kc) + expectedReplication := cassandra.ComputeInitialSystemReplication(kc) kc = &api.K8ssandraCluster{} if err := f.Client.Get(ctx, key, kc); err != nil { t.Logf("Failed to check system replication annotation. 
Could not retrieve the K8ssandraCluster: %v", err) return false } - val, found := kc.Annotations[api.SystemReplicationAnnotation] + val, found := kc.Annotations[api.InitialSystemReplicationAnnotation] if !found { return false } @@ -1440,16 +1486,3 @@ func parseResource(quantity string) *resource.Quantity { parsed := resource.MustParse(quantity) return &parsed } - -type fakeManagementApiFactory struct { -} - -func (f fakeManagementApiFactory) NewManagementApiFacade(context.Context, *cassdcapi.CassandraDatacenter, client.Client, logr.Logger) (cassandra.ManagementApiFacade, error) { - m := new(mocks.ManagementApiFacade) - m.On("EnsureKeyspaceReplication", mock.Anything, mock.Anything).Return(nil) - m.On("ListTables", stargate.AuthKeyspace).Return([]string{"token"}, nil) - m.On("CreateTable", mock.MatchedBy(func(def *httphelper.TableDefinition) bool { - return def.KeyspaceName == stargate.AuthKeyspace && def.TableName == stargate.AuthTable - })).Return(nil) - return m, nil -} diff --git a/controllers/k8ssandra/reaper.go b/controllers/k8ssandra/reaper.go index 34c618bb3..7d9da76cb 100644 --- a/controllers/k8ssandra/reaper.go +++ b/controllers/k8ssandra/reaper.go @@ -18,7 +18,6 @@ package k8ssandra import ( "context" - "github.com/go-logr/logr" cassdcapi "github.com/k8ssandra/cass-operator/apis/cassandra/v1beta1" api "github.com/k8ssandra/k8ssandra-operator/apis/k8ssandra/v1alpha1" @@ -37,43 +36,32 @@ import ( func (r *K8ssandraClusterReconciler) reconcileReaperSchema( ctx context.Context, kc *api.K8ssandraCluster, - dcs []*cassdcapi.CassandraDatacenter, - logger logr.Logger, -) result.ReconcileResult { + mgmtApi cassandra.ManagementApiFacade, + logger logr.Logger) result.ReconcileResult { + if !kc.HasReapers() { return result.Continue() } logger.Info("Reconciling Reaper schema") - dcTemplate := kc.Spec.Cassandra.Datacenters[0] - - if remoteClient, err := r.ClientCache.GetRemoteClient(dcTemplate.K8sContext); err != nil { - logger.Error(err, "Failed to get remote client") - return result.Error(err) - } else { - dc := dcs[0] - managementApiFacade, err := r.ManagementApi.NewManagementApiFacade(ctx, dc, remoteClient, logger) - if err != nil { - logger.Error(err, "Failed to create ManagementApiFacade") - return result.Error(err) - } - keyspace := reaperapi.DefaultKeyspace - if kc.Spec.Reaper != nil && kc.Spec.Reaper.Keyspace != "" { - keyspace = kc.Spec.Reaper.Keyspace - } + if recResult := r.versionCheck(ctx, kc); recResult.Completed() { + return recResult + } - err = managementApiFacade.EnsureKeyspaceReplication( - keyspace, - cassandra.ComputeReplication(3, dcs...), - ) - if err != nil { - logger.Error(err, "Failed to ensure keyspace replication") - return result.Error(err) - } + keyspace := getReaperKeyspace(kc) - return result.Continue() + datacenters := kc.GetReadyDatacenters() + err := mgmtApi.EnsureKeyspaceReplication( + keyspace, + cassandra.ComputeReplicationFromDcTemplates(3, datacenters...), + ) + if err != nil { + logger.Error(err, "Failed to ensure keyspace replication") + return result.Error(err) } + + return result.Continue() } func (r *K8ssandraClusterReconciler) reconcileReaper( @@ -225,3 +213,11 @@ func (r *K8ssandraClusterReconciler) removeReaperStatus(kc *api.K8ssandraCluster } } } + +func getReaperKeyspace(kc *api.K8ssandraCluster) string { + keyspace := reaperapi.DefaultKeyspace + if kc.Spec.Reaper != nil && kc.Spec.Reaper.Keyspace != "" { + keyspace = kc.Spec.Reaper.Keyspace + } + return keyspace +} diff --git a/controllers/k8ssandra/replication.go 
b/controllers/k8ssandra/replication.go deleted file mode 100644 index 229ae086b..000000000 --- a/controllers/k8ssandra/replication.go +++ /dev/null @@ -1,73 +0,0 @@ -package k8ssandra - -import ( - "context" - "encoding/json" - "github.com/go-logr/logr" - cassdcapi "github.com/k8ssandra/cass-operator/apis/cassandra/v1beta1" - api "github.com/k8ssandra/k8ssandra-operator/apis/k8ssandra/v1alpha1" - "github.com/k8ssandra/k8ssandra-operator/pkg/annotations" - "github.com/k8ssandra/k8ssandra-operator/pkg/cassandra" - "github.com/k8ssandra/k8ssandra-operator/pkg/result" - "sigs.k8s.io/controller-runtime/pkg/client" -) - -// checkSystemReplication checks for the SystemReplicationAnnotation on kc. If found, the -// JSON value is unmarshalled and returned. If not found, the SystemReplication is computed -// and is stored in the SystemReplicationAnnotation on kc. The value is JSON-encoded. -// Lastly, kc is patched so that the changes are persisted, -func (r *K8ssandraClusterReconciler) checkSystemReplication(ctx context.Context, kc *api.K8ssandraCluster, logger logr.Logger) (*cassandra.SystemReplication, error) { - if val := annotations.GetAnnotation(kc, api.SystemReplicationAnnotation); val != "" { - replication := &cassandra.SystemReplication{} - if err := json.Unmarshal([]byte(val), replication); err == nil { - return replication, nil - } else { - return nil, err - } - } - - replication := cassandra.ComputeSystemReplication(kc) - bytes, err := json.Marshal(replication) - - if err != nil { - logger.Error(err, "Failed to marshal SystemReplication", "SystemReplication", replication) - return nil, err - } - - patch := client.MergeFromWithOptions(kc.DeepCopy()) - if kc.Annotations == nil { - kc.Annotations = make(map[string]string) - } - kc.Annotations[api.SystemReplicationAnnotation] = string(bytes) - if err = r.Patch(ctx, kc, patch); err != nil { - logger.Error(err, "Failed to apply "+api.SystemReplicationAnnotation+" patch") - return nil, err - } - - return &replication, nil -} - -// updateReplicationOfSystemKeyspaces ensures that the replication for the system_auth, -// system_traces, and system_distributed keyspaces is up to date. It ensures that there are -// replicas for each DC and that there is a max of 3 replicas per DC. -func (r *K8ssandraClusterReconciler) updateReplicationOfSystemKeyspaces(ctx context.Context, kc *api.K8ssandraCluster, dc *cassdcapi.CassandraDatacenter, remoteClient client.Client, logger logr.Logger) result.ReconcileResult { - managementApiFacade, err := r.ManagementApi.NewManagementApiFacade(ctx, dc, remoteClient, logger) - if err != nil { - logger.Error(err, "Failed to create ManagementApiFacade") - return result.Error(err) - } - - keyspaces := []string{"system_traces", "system_distributed", "system_auth"} - replication := cassandra.ComputeReplicationFromDcTemplates(3, kc.Spec.Cassandra.Datacenters...) 
- - logger.Info("Preparing to update replication for system keyspaces", "replication", replication) - - for _, ks := range keyspaces { - if err := managementApiFacade.EnsureKeyspaceReplication(ks, replication); err != nil { - logger.Error(err, "Failed to update replication", "keyspace", ks) - return result.Error(err) - } - } - - return result.Continue() -} diff --git a/controllers/k8ssandra/schemas.go b/controllers/k8ssandra/schemas.go new file mode 100644 index 000000000..4669f3916 --- /dev/null +++ b/controllers/k8ssandra/schemas.go @@ -0,0 +1,305 @@ +package k8ssandra + +import ( + "context" + "encoding/json" + "fmt" + "github.com/go-logr/logr" + cassdcapi "github.com/k8ssandra/cass-operator/apis/cassandra/v1beta1" + api "github.com/k8ssandra/k8ssandra-operator/apis/k8ssandra/v1alpha1" + "github.com/k8ssandra/k8ssandra-operator/pkg/annotations" + "github.com/k8ssandra/k8ssandra-operator/pkg/cassandra" + "github.com/k8ssandra/k8ssandra-operator/pkg/result" + "github.com/k8ssandra/k8ssandra-operator/pkg/stargate" + "github.com/k8ssandra/k8ssandra-operator/pkg/utils" + "k8s.io/apimachinery/pkg/api/errors" + "sigs.k8s.io/controller-runtime/pkg/client" + "strconv" + "time" +) + +func (r *K8ssandraClusterReconciler) checkSchemas( + ctx context.Context, + kc *api.K8ssandraCluster, + dc *cassdcapi.CassandraDatacenter, + remoteClient client.Client, + logger logr.Logger) result.ReconcileResult { + + mgmtApi, err := r.ManagementApi.NewManagementApiFacade(ctx, dc, remoteClient, logger) + if err != nil { + return result.Error(err) + } + + if recResult := r.checkSchemaAgreement(mgmtApi, logger); recResult.Completed() { + return recResult + } + + if recResult := r.updateReplicationOfSystemKeyspaces(ctx, kc, mgmtApi, logger); recResult.Completed() { + return recResult + } + + if recResult := r.reconcileStargateAuthSchema(ctx, kc, mgmtApi, logger); recResult.Completed() { + return recResult + } + + if recResult := r.reconcileReaperSchema(ctx, kc, mgmtApi, logger); recResult.Completed() { + return recResult + } + + if annotations.HasAnnotationWithValue(kc, api.RebuildDcAnnotation, dc.Name) { + if recResult := r.updateUserKeyspacesReplication(kc, dc, mgmtApi, logger); recResult.Completed() { + return recResult + } + } + + return result.Continue() +} + +func (r *K8ssandraClusterReconciler) checkSchemaAgreement(mgmtApi cassandra.ManagementApiFacade, logger logr.Logger) result.ReconcileResult { + + versions, err := mgmtApi.GetSchemaVersions() + if err != nil { + return result.Error(err) + } + + if len(versions) == 1 { + return result.Continue() + } + + logger.Info("There is schema disagreement", "versions", len(versions)) + + return result.RequeueSoon(r.DefaultDelay) +} + +// checkInitialSystemReplication checks for the InitialSystemReplicationAnnotation on kc. If found, the +// JSON value is unmarshalled and returned. If not found, the SystemReplication is computed +// and is stored in the InitialSystemReplicationAnnotation on kc. The value is JSON-encoded. 
+// Lastly, kc is patched so that the changes are persisted, +func (r *K8ssandraClusterReconciler) checkInitialSystemReplication( + ctx context.Context, + kc *api.K8ssandraCluster, + logger logr.Logger) (*cassandra.SystemReplication, error) { + + if val := annotations.GetAnnotation(kc, api.InitialSystemReplicationAnnotation); val != "" { + replication := &cassandra.SystemReplication{} + if err := json.Unmarshal([]byte(val), replication); err == nil { + return replication, nil + } else { + return nil, err + } + } + + replication := cassandra.ComputeInitialSystemReplication(kc) + bytes, err := json.Marshal(replication) + + if err != nil { + logger.Error(err, "Failed to marshal SystemReplication", "SystemReplication", replication) + return nil, err + } + + patch := client.MergeFromWithOptions(kc.DeepCopy()) + if kc.Annotations == nil { + kc.Annotations = make(map[string]string) + } + kc.Annotations[api.InitialSystemReplicationAnnotation] = string(bytes) + if err = r.Patch(ctx, kc, patch); err != nil { + logger.Error(err, "Failed to apply "+api.InitialSystemReplicationAnnotation+" patch") + return nil, err + } + + return &replication, nil +} + +// updateReplicationOfSystemKeyspaces ensures that the replication for the system_auth, +// system_traces, and system_distributed keyspaces is up to date. It ensures that there are +// replicas for each DC and that there is a max of 3 replicas per DC. +func (r *K8ssandraClusterReconciler) updateReplicationOfSystemKeyspaces( + ctx context.Context, + kc *api.K8ssandraCluster, + mgmtApi cassandra.ManagementApiFacade, + logger logr.Logger) result.ReconcileResult { + + if recResult := r.versionCheck(ctx, kc); recResult.Completed() { + return recResult + } + + datacenters := cassandra.GetDatacentersForSystemReplication(kc) + replication := cassandra.ComputeReplicationFromDcTemplates(3, datacenters...) + + logger.Info("Preparing to update replication for system keyspaces", "replication", replication) + + for _, ks := range api.SystemKeyspaces { + if err := mgmtApi.EnsureKeyspaceReplication(ks, replication); err != nil { + logger.Error(err, "Failed to update replication", "keyspace", ks) + return result.Error(err) + } + } + + return result.Continue() +} + +// updateUserKeyspacesReplication updates the replication factor of user-defined keyspaces. +// The K8ssandraCluster must specify the k8ssandra.io/dc-replication in order for any +// updates to be applied. The annotation can specify multiple DCs but only the DC just +// added will be considered for replication changes. For example, if dc2 is added to a +// cluster that has dc1 and if the annotation specifies changes for both dc1 and dc2, only +// changes for dc2 will be applied. Replication for all user-defined keyspaces must be +// specified; otherwise an error is returned. This is required to avoid surprises for the +// user. +func (r *K8ssandraClusterReconciler) updateUserKeyspacesReplication( + kc *api.K8ssandraCluster, + dc *cassdcapi.CassandraDatacenter, + mgmtApi cassandra.ManagementApiFacade, + logger logr.Logger) result.ReconcileResult { + + jsonReplication := annotations.GetAnnotation(kc, api.DcReplicationAnnotation) + if jsonReplication == "" { + logger.Info(api.DcReplicationAnnotation + " not set. 
Replication for user keyspaces will not be updated") + return result.Continue() + } + + logger.Info("Updating replication for user keyspaces") + + userKeyspaces, err := getUserKeyspaces(mgmtApi, kc) + if err != nil { + logger.Error(err, "Failed to get user keyspaces") + return result.Error(err) + } + + replication, err := cassandra.ParseReplication([]byte(jsonReplication)) + if err != nil { + logger.Error(err, "Failed to parse replication") + return result.Error(err) + } + + // The replication object can specify multiple DCs, new ones to be added to the cluster + // as well as existing DCs. We need to be careful about a couple of things. First, we do + // not want to modify the replication for existing DCs since this is not intended as a + // general purpose mechanism for managing keyspace replication. Secondly, we need to + // make sure we only update the replication for the DC just added to the C* cluster and + // not others which have been added to the K8ssandraCluster but not yet rolled out. This + // is because Cassandra 4 does not allow you to update a keyspace's replication with a + // non-existent DC. + + // This is validation check to make sure the user specifies all user keyspaces for each + // DC listed in the annotation. We want to force the user to be explicit to avoid any + // surprises. + if !replication.EachDcContainsKeyspaces(userKeyspaces...) { + err = fmt.Errorf("the %s annotation must include all user keyspaces for each specified DC", api.DcReplicationAnnotation) + logger.Error(err, "Invalid "+api.DcReplicationAnnotation+" annotation") + return result.Error(err) + } + + replication = getReplicationForDeployedDcs(kc, replication) + + logger.Info("computed replication") + + for _, ks := range userKeyspaces { + replicationFactor := replication.ReplicationFactor(dc.Name, ks) + logger.Info("computed replication factor", "keyspace", ks, "replication_factor", replicationFactor) + if replicationFactor == 0 { + continue + } + if err = ensureKeyspaceReplication(mgmtApi, ks, dc.Name, replicationFactor); err != nil { + logger.Error(err, "Keyspace replication check failed", "Keyspace", ks) + return result.Error(err) + } + } + + return result.Continue() +} + +func getUserKeyspaces(mgmtApi cassandra.ManagementApiFacade, kc *api.K8ssandraCluster) ([]string, error) { + keyspaces, err := mgmtApi.ListKeyspaces("") + if err != nil { + return nil, err + } + + internalKeyspaces := getInternalKeyspaces(kc) + userKeyspaces := make([]string, 0) + + for _, ks := range keyspaces { + if !utils.SliceContains(internalKeyspaces, ks) { + userKeyspaces = append(userKeyspaces, ks) + } + } + + return userKeyspaces, nil +} + +// getInternalKeyspaces returns all internal Cassandra keyspaces as well as the Stargate +// auth and Reaper keyspaces if Stargate and Reaper are enabled. +func getInternalKeyspaces(kc *api.K8ssandraCluster) []string { + keyspaces := api.SystemKeyspaces + + if kc.HasStargates() { + keyspaces = append(keyspaces, stargate.AuthKeyspace) + } + + if kc.HasReapers() { + keyspaces = append(keyspaces, getReaperKeyspace(kc)) + } + + keyspaces = append(keyspaces, "system", "system_schema", "system_views", "system_virtual_schema") + return keyspaces +} + +// getReplicationForDeployedDcs gets the replication for only those DCs that have already +// been deployed. The replication argument may include DCs that have not yet been deployed. 
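To make the flow above concrete, here is a minimal sketch (illustrative payload and keyspace names, not taken from the tests) of how a `k8ssandra.io/dc-replication` value moves through the helpers added to `pkg/cassandra` in this change:

```go
package main

import (
	"fmt"

	"github.com/k8ssandra/k8ssandra-operator/pkg/cassandra"
)

func main() {
	// Replication requested for the DC being added (dc2) and for one that has
	// not been rolled out yet (dc3).
	payload := []byte(`{"dc2": {"ks1": 3, "ks2": 3}, "dc3": {"ks1": 3, "ks2": 3}}`)

	replication, err := cassandra.ParseReplication(payload)
	if err != nil {
		panic(err)
	}

	// Validation: every DC listed in the annotation must cover all user keyspaces.
	fmt.Println(replication.EachDcContainsKeyspaces("ks1", "ks2")) // true

	// Filter down to DCs that are already part of the Cassandra cluster; dc3 is dropped.
	deployed := replication.ForDcs("dc1", "dc2")

	// Replication factor that will be applied to ks1 in the newly added DC.
	fmt.Println(deployed.ReplicationFactor("dc2", "ks1")) // 3
}
```

Keyspaces that end up with a replication factor of 0 after this filtering are simply skipped by updateUserKeyspacesReplication.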
+func getReplicationForDeployedDcs(kc *api.K8ssandraCluster, replication *cassandra.Replication) *cassandra.Replication { + dcNames := make([]string, 0) + for _, template := range kc.GetInitializedDatacenters() { + dcNames = append(dcNames, template.Meta.Name) + } + + return replication.ForDcs(dcNames...) +} + +func ensureKeyspaceReplication(mgmtApi cassandra.ManagementApiFacade, ks, dcName string, replicationFactor int) error { + replication, err := getKeyspaceReplication(mgmtApi, ks) + if err != nil { + return err + } + + replication[dcName] = replicationFactor + + return mgmtApi.EnsureKeyspaceReplication(ks, replication) +} + +// getKeyspaceReplication returns a map of DCs to their replica counts for ks. +func getKeyspaceReplication(mgmtApi cassandra.ManagementApiFacade, ks string) (map[string]int, error) { + settings, err := mgmtApi.GetKeyspaceReplication(ks) + if err != nil { + return nil, err + } + + replication := make(map[string]int) + for k, v := range settings { + if k == "class" { + continue + } + count, err := strconv.Atoi(v) + if err != nil { + return nil, err + } + replication[k] = count + } + + return replication, nil +} + +func (r *K8ssandraClusterReconciler) versionCheck(ctx context.Context, kc *api.K8ssandraCluster) result.ReconcileResult { + kcCopy := kc.DeepCopy() + patch := client.MergeFromWithOptions(kc.DeepCopy(), client.MergeFromWithOptimisticLock{}) + if err := r.ClientCache.GetLocalClient().Patch(ctx, kc, patch); err != nil { + if errors.IsConflict(err) { + return result.RequeueSoon(1 * time.Second) + } + return result.Error(fmt.Errorf("k8ssandracluster version check failed: %v", err)) + } + // Need to copy the status here as in-memory status updates can be lost by results + // returned from the api server. + kc.Status = kcCopy.Status + + return result.Continue() +} diff --git a/controllers/k8ssandra/stargate.go b/controllers/k8ssandra/stargate.go index ea06a57e9..c5ce8b489 100644 --- a/controllers/k8ssandra/stargate.go +++ b/controllers/k8ssandra/stargate.go @@ -7,6 +7,7 @@ import ( api "github.com/k8ssandra/k8ssandra-operator/apis/k8ssandra/v1alpha1" stargateapi "github.com/k8ssandra/k8ssandra-operator/apis/stargate/v1alpha1" "github.com/k8ssandra/k8ssandra-operator/pkg/annotations" + "github.com/k8ssandra/k8ssandra-operator/pkg/cassandra" "github.com/k8ssandra/k8ssandra-operator/pkg/labels" "github.com/k8ssandra/k8ssandra-operator/pkg/result" "github.com/k8ssandra/k8ssandra-operator/pkg/stargate" @@ -157,30 +158,25 @@ func (r *K8ssandraClusterReconciler) setStatusForStargate(kc *api.K8ssandraClust func (r *K8ssandraClusterReconciler) reconcileStargateAuthSchema( ctx context.Context, kc *api.K8ssandraCluster, - dcs []*cassdcapi.CassandraDatacenter, - logger logr.Logger, -) result.ReconcileResult { + mgmtApi cassandra.ManagementApiFacade, + logger logr.Logger) result.ReconcileResult { + if !kc.HasStargates() { return result.Continue() } - dcTemplate := kc.Spec.Cassandra.Datacenters[0] + if recResult := r.versionCheck(ctx, kc); recResult.Completed() { + return recResult + } + + datacenters := kc.GetReadyDatacenters() + replication := cassandra.ComputeReplicationFromDcTemplates(3, datacenters...) 
- if remoteClient, err := r.ClientCache.GetRemoteClient(dcTemplate.K8sContext); err != nil { - logger.Error(err, "Failed to get remote client") + if err := stargate.ReconcileAuthKeyspace(mgmtApi, replication, logger); err != nil { return result.Error(err) - } else { - dc := dcs[0] - managementApi, err := r.ManagementApi.NewManagementApiFacade(ctx, dc, remoteClient, logger) - if err != nil { - logger.Error(err, "Failed to create ManagementApiFacade") - return result.Error(err) - } - if err = stargate.ReconcileAuthKeyspace(dcs, managementApi, logger); err != nil { - return result.Error(err) - } - return result.Continue() } + + return result.Continue() } func (r *K8ssandraClusterReconciler) removeStargateStatus(kc *api.K8ssandraCluster, dcName string) { diff --git a/controllers/medusa/controllers_test.go b/controllers/medusa/controllers_test.go index 2dab2a442..4b50e8109 100644 --- a/controllers/medusa/controllers_test.go +++ b/controllers/medusa/controllers_test.go @@ -5,16 +5,10 @@ import ( "testing" "time" - "github.com/go-logr/logr" cassdcapi "github.com/k8ssandra/cass-operator/apis/cassandra/v1beta1" - "github.com/k8ssandra/cass-operator/pkg/httphelper" ctrl "github.com/k8ssandra/k8ssandra-operator/controllers/k8ssandra" - "github.com/k8ssandra/k8ssandra-operator/pkg/cassandra" "github.com/k8ssandra/k8ssandra-operator/pkg/clientcache" "github.com/k8ssandra/k8ssandra-operator/pkg/config" - "github.com/k8ssandra/k8ssandra-operator/pkg/mocks" - "github.com/k8ssandra/k8ssandra-operator/pkg/stargate" - "github.com/stretchr/testify/mock" "k8s.io/client-go/kubernetes/scheme" "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/cluster" @@ -32,7 +26,7 @@ var ( defaultStorageClass = "default" testEnv *testutils.MultiClusterTestEnv seedsResolver = &fakeSeedsResolver{} - managementApi = &fakeManagementApiFactory{} + managementApi = &testutils.FakeManagementApiFactory{} medusaClientFactory *fakeMedusaClientFactory ) @@ -135,16 +129,3 @@ type fakeSeedsResolver struct { func (r *fakeSeedsResolver) ResolveSeedEndpoints(ctx context.Context, dc *cassdcapi.CassandraDatacenter, remoteClient client.Client) ([]string, error) { return r.callback(dc) } - -type fakeManagementApiFactory struct { -} - -func (f fakeManagementApiFactory) NewManagementApiFacade(context.Context, *cassdcapi.CassandraDatacenter, client.Client, logr.Logger) (cassandra.ManagementApiFacade, error) { - m := new(mocks.ManagementApiFacade) - m.On("EnsureKeyspaceReplication", mock.Anything, mock.Anything).Return(nil) - m.On("ListTables", stargate.AuthKeyspace).Return([]string{"token"}, nil) - m.On("CreateTable", mock.MatchedBy(func(def *httphelper.TableDefinition) bool { - return def.KeyspaceName == stargate.AuthKeyspace && def.TableName == stargate.AuthTable - })).Return(nil) - return m, nil -} diff --git a/controllers/stargate/stargate_controller.go b/controllers/stargate/stargate_controller.go index 45a00b0af..0439515ca 100644 --- a/controllers/stargate/stargate_controller.go +++ b/controllers/stargate/stargate_controller.go @@ -151,7 +151,7 @@ func (r *StargateReconciler) Reconcile(ctx context.Context, req ctrl.Request) (c if err != nil { logger.Error(err, "Failed to create ManagementApiFacade") return ctrl.Result{}, err - } else if err = stargateutil.ReconcileAuthKeyspace([]*cassdcapi.CassandraDatacenter{actualDc}, managementApi, logger); err != nil { + } else if err = stargateutil.ReconcileAuthKeyspace(managementApi, cassandra.ComputeReplication(3, actualDc), logger); err != nil { return ctrl.Result{}, err } } 
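Since ReconcileAuthKeyspace now receives a precomputed replication map, the two call sites end up looking roughly like this. This is a sketch with the reconciler plumbing elided; the helper names match the packages in this repository, but the surrounding wiring and the shape of the two functions are assumptions:

```go
package sketch

import (
	"github.com/go-logr/logr"
	cassdcapi "github.com/k8ssandra/cass-operator/apis/cassandra/v1beta1"
	api "github.com/k8ssandra/k8ssandra-operator/apis/k8ssandra/v1alpha1"
	"github.com/k8ssandra/k8ssandra-operator/pkg/cassandra"
	"github.com/k8ssandra/k8ssandra-operator/pkg/stargate"
)

// K8ssandraCluster controller: replicate the Stargate auth keyspace across every
// ready DC, capped at 3 replicas per DC.
func reconcileAuthAcrossCluster(kc *api.K8ssandraCluster, mgmtApi cassandra.ManagementApiFacade, logger logr.Logger) error {
	replication := cassandra.ComputeReplicationFromDcTemplates(3, kc.GetReadyDatacenters()...)
	return stargate.ReconcileAuthKeyspace(mgmtApi, replication, logger)
}

// Stargate controller: only its own CassandraDatacenter is in scope.
func reconcileAuthForDc(dc *cassdcapi.CassandraDatacenter, mgmtApi cassandra.ManagementApiFacade, logger logr.Logger) error {
	return stargate.ReconcileAuthKeyspace(mgmtApi, cassandra.ComputeReplication(3, dc), logger)
}
```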
diff --git a/go.mod b/go.mod index dfaebb6ad..e67b613f9 100644 --- a/go.mod +++ b/go.mod @@ -42,7 +42,6 @@ require ( ) replace ( - github.com/k8ssandra/cass-operator => github.com/k8ssandra/cass-operator v1.8.0-rc.2.0.20220108141909-eb7bbb91f9bb k8s.io/api => k8s.io/api v0.22.2 k8s.io/apiextensions-apiserver => k8s.io/apiextensions-apiserver v0.22.2 k8s.io/apimachinery => k8s.io/apimachinery v0.22.2 diff --git a/go.sum b/go.sum index f9977a999..bf893a402 100644 --- a/go.sum +++ b/go.sum @@ -482,8 +482,8 @@ github.com/jtolds/gls v4.20.0+incompatible/go.mod h1:QJZ7F/aHp+rZTRtaJ1ow/lLfFfV github.com/julienschmidt/httprouter v1.2.0/go.mod h1:SYymIcj16QtmaHHD7aYtjjsJG7VTCxuUUipMqKk8s4w= github.com/julienschmidt/httprouter v1.3.0/go.mod h1:JR6WtHb+2LUe8TCKY3cZOxFyyO8IZAc4RVcycCCAKdM= github.com/jung-kurt/gofpdf v1.0.3-0.20190309125859-24315acbbda5/go.mod h1:7Id9E/uU8ce6rXgefFLlgrJj/GYY22cpxn+r32jIOes= -github.com/k8ssandra/cass-operator v1.8.0-rc.2.0.20220108141909-eb7bbb91f9bb h1:ZMScu7aAIwCdCw3UJnWqUMyA7iqXGGdDRKG0xPb6ayA= -github.com/k8ssandra/cass-operator v1.8.0-rc.2.0.20220108141909-eb7bbb91f9bb/go.mod h1:zsEoRc3auDcbY3GZAGv14mBZ5KTbj3EwMQzmM8/Kl4A= +github.com/k8ssandra/cass-operator v1.8.0-rc.2.0.20220124145237-9d1c58a5dec6 h1:Mos1nQAFW96Wvmwg/BjoAieTNMJYQSM51vq07SG2+Ks= +github.com/k8ssandra/cass-operator v1.8.0-rc.2.0.20220124145237-9d1c58a5dec6/go.mod h1:zsEoRc3auDcbY3GZAGv14mBZ5KTbj3EwMQzmM8/Kl4A= github.com/k8ssandra/reaper-client-go v0.3.1-0.20220114183114-6923e077c4f5 h1:Dq0VdM960G3AbhYwFuaebmsE08IzOYHYhngUfDmWaAc= github.com/k8ssandra/reaper-client-go v0.3.1-0.20220114183114-6923e077c4f5/go.mod h1:WsQymIaVT39xbcstZhdqynUS13AGzP2p6U9Hsk1oy5M= github.com/karrick/godirwalk v1.16.1/go.mod h1:j4mkqPuvaLI8mp1DroR3P6ad7cyYd4c1qeJ3RV7ULlk= diff --git a/main.go b/main.go index 4db660254..1ed0d98af 100644 --- a/main.go +++ b/main.go @@ -23,6 +23,7 @@ import ( "strings" cassdcapi "github.com/k8ssandra/cass-operator/apis/cassandra/v1beta1" + cassctl "github.com/k8ssandra/cass-operator/apis/control/v1alpha1" "github.com/k8ssandra/k8ssandra-operator/pkg/cassandra" "github.com/k8ssandra/k8ssandra-operator/pkg/clientcache" @@ -69,6 +70,7 @@ func init() { utilruntime.Must(k8ssandraiov1alpha1.AddToScheme(scheme)) utilruntime.Must(cassdcapi.AddToScheme(scheme)) + utilruntime.Must(cassctl.AddToScheme(scheme)) utilruntime.Must(replicationapi.AddToScheme(scheme)) utilruntime.Must(stargateapi.AddToScheme(scheme)) utilruntime.Must(configapi.AddToScheme(scheme)) diff --git a/pkg/cassandra/datacenter.go b/pkg/cassandra/datacenter.go index 38a68cd74..51f9b5a30 100644 --- a/pkg/cassandra/datacenter.go +++ b/pkg/cassandra/datacenter.go @@ -2,11 +2,11 @@ package cassandra import ( "fmt" - cassdcapi "github.com/k8ssandra/cass-operator/apis/cassandra/v1beta1" "github.com/k8ssandra/cass-operator/pkg/reconciliation" api "github.com/k8ssandra/k8ssandra-operator/apis/k8ssandra/v1alpha1" "github.com/k8ssandra/k8ssandra-operator/pkg/images" + "github.com/k8ssandra/k8ssandra-operator/pkg/utils" corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/resource" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -34,6 +34,52 @@ type SystemReplication struct { ReplicationFactor int `json:"replicationFactor"` } +// Replication provides a mapping of DCs to a mapping of keyspaces and their +// replica counts. NetworkTopologyStrategy is assumed for all keyspaces. 
+type Replication struct { + datacenters map[string]keyspacesReplication +} + +type keyspacesReplication map[string]int + +// EachDcContainsKeyspaces if every DC contains all the keyspaces. +func (r *Replication) EachDcContainsKeyspaces(keyspaces ...string) bool { + for _, ksMap := range r.datacenters { + for _, ks := range keyspaces { + if _, found := ksMap[ks]; !found { + return false + } + } + } + return true +} + +// ForDcs returns a new Replication that contains only the specifics dcs. +func (r *Replication) ForDcs(dcs ...string) *Replication { + replication := &Replication{datacenters: map[string]keyspacesReplication{}} + + for dc, ksReplication := range r.datacenters { + if utils.SliceContains(dcs, dc) { + ksMap := map[string]int{} + for ks, val := range ksReplication { + ksMap[ks] = val + } + replication.datacenters[dc] = ksMap + } + } + + return replication +} + +func (r *Replication) ReplicationFactor(dc, ks string) int { + if ksMap, found := r.datacenters[dc]; found { + if rf, found := ksMap[ks]; found { + return rf + } + } + return 0 +} + // DatacenterConfig provides the configuration to be applied to the CassandraDatacenter. // A DatacenterConfig is essentially a coalescence of an api.CassandraClusterTemplate and // an api.CassandraDatacenterTemplate. There are global, cluster-wide settings that need @@ -308,3 +354,23 @@ func FindAdditionalVolume(dcConfig *DatacenterConfig, volumeName string) (int, b return -1, false } + +func ValidateConfig(desiredDc, actualDc *cassdcapi.CassandraDatacenter) error { + desiredConfig, err := utils.UnmarshalToMap(desiredDc.Spec.Config) + if err != nil { + return err + } + actualConfig, err := utils.UnmarshalToMap(actualDc.Spec.Config) + if err != nil { + return err + } + + actualCassYaml, foundActualYaml := actualConfig["cassandra-yaml"].(map[string]interface{}) + desiredCassYaml, foundDesiredYaml := desiredConfig["cassandra-yaml"].(map[string]interface{}) + + if (foundActualYaml && foundDesiredYaml) && actualCassYaml["num_tokens"] != desiredCassYaml["num_tokens"] { + return fmt.Errorf("tried to change num_tokens in an existing datacenter") + } + + return nil +} diff --git a/pkg/cassandra/datacenter_test.go b/pkg/cassandra/datacenter_test.go index 21851aa56..b57ebf070 100644 --- a/pkg/cassandra/datacenter_test.go +++ b/pkg/cassandra/datacenter_test.go @@ -355,6 +355,53 @@ func TestNewDatacenter_Fail_NoStorageConfig(t *testing.T) { assert.IsType(t, DCConfigIncomplete{}, err) } +func TestDatacentersReplication(t *testing.T) { + assert := assert.New(t) + + replication := &Replication{ + datacenters: map[string]keyspacesReplication{ + "dc2": { + "ks1": 3, + "ks2": 3, + }, + "dc3": { + "ks1": 5, + "ks2": 1, + "ks3": 7, + }, + "dc4": { + "ks1": 1, + "ks2": 3, + "ks3": 3, + "ks4": 1, + }, + }, + } + + assert.True(replication.EachDcContainsKeyspaces("ks1", "ks2")) + assert.False(replication.EachDcContainsKeyspaces("ks1", "ks2", "ks3")) + + expected := &Replication{ + datacenters: map[string]keyspacesReplication{ + "dc3": { + "ks1": 5, + "ks2": 1, + "ks3": 7, + }, + "dc4": { + "ks1": 1, + "ks2": 3, + "ks3": 3, + "ks4": 1, + }, + }, + } + assert.Equal(expected, replication.ForDcs("dc3", "dc4", "dc5")) + + expected = &Replication{datacenters: map[string]keyspacesReplication{}} + assert.Equal(expected, replication.ForDcs("dc5", "dc6")) +} + // GetDatacenterConfig returns a minimum viable DataCenterConfig. 
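The new ValidateConfig guard only rejects num_tokens changes on an existing datacenter. A small test-style sketch of the expected behaviour; it assumes Spec.Config carries raw JSON as in cass-operator and that encoding/json is available alongside the file's existing imports:

```go
func TestValidateConfigRejectsNumTokensChange(t *testing.T) {
	actual := &cassdcapi.CassandraDatacenter{
		Spec: cassdcapi.CassandraDatacenterSpec{
			Config: json.RawMessage(`{"cassandra-yaml": {"num_tokens": 256}}`),
		},
	}

	desired := actual.DeepCopy()
	desired.Spec.Config = json.RawMessage(`{"cassandra-yaml": {"num_tokens": 16}}`)

	// Changing num_tokens after the DC exists must be rejected.
	assert.Error(t, ValidateConfig(desired, actual))

	// Leaving num_tokens untouched passes validation.
	assert.NoError(t, ValidateConfig(actual.DeepCopy(), actual))
}
```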
func GetDatacenterConfig() DatacenterConfig { storageClass := "default" diff --git a/pkg/cassandra/management.go b/pkg/cassandra/management.go index d1bbbf56c..d0509df71 100644 --- a/pkg/cassandra/management.go +++ b/pkg/cassandra/management.go @@ -3,6 +3,7 @@ package cassandra import ( "context" "fmt" + "github.com/k8ssandra/k8ssandra-operator/pkg/utils" "strconv" "github.com/go-logr/logr" @@ -91,6 +92,10 @@ type ManagementApiFacade interface { // EnsureKeyspaceReplication checks if the given keyspace has the given replication, and if it does not, // alters it to match the desired replication. EnsureKeyspaceReplication(keyspaceName string, replication map[string]int) error + + // GetSchemaVersions list all of the schema versions know to this node. The map keys are schema version UUIDs. + // The values are list of node IPs. + GetSchemaVersions() (map[string][]string, error) } type defaultManagementApiFacade struct { @@ -290,3 +295,20 @@ func (r *defaultManagementApiFacade) EnsureKeyspaceReplication(keyspaceName stri } } } + +func (r *defaultManagementApiFacade) GetSchemaVersions() (map[string][]string, error) { + pods, err := r.fetchDatacenterPods() + if err != nil { + return nil, err + } + + for _, pod := range pods { + if schemaVersions, err := r.nodeMgmtClient.CallSchemaVersionsEndpoint(&pod); err != nil { + r.logger.V(4).Error(err, "failed to list schema versions", "Pod", pod.Name) + } else { + return schemaVersions, nil + } + } + + return nil, fmt.Errorf("failed to get schema version on all pods in CassandraDatacenter %v", utils.GetKey(r.dc)) +} diff --git a/pkg/cassandra/util.go b/pkg/cassandra/util.go index 35507d30e..04e0be7e2 100644 --- a/pkg/cassandra/util.go +++ b/pkg/cassandra/util.go @@ -1,6 +1,8 @@ package cassandra import ( + "encoding/json" + "fmt" "math" "strconv" "time" @@ -31,14 +33,38 @@ func DatacenterStopping(dc *cassdcapi.CassandraDatacenter) bool { return dc.GetConditionStatus(cassdcapi.DatacenterStopped) == corev1.ConditionTrue && dc.Status.CassandraOperatorProgress == cassdcapi.ProgressUpdating } -func ComputeSystemReplication(kluster *api.K8ssandraCluster) SystemReplication { +// GetDatacentersForSystemReplication determines the DCs that should be included for +// replication. This function should only be used for system keyspaces. Replication for +// system keyspaces is initially set through the management-api, not CQL. This allows us +// to specify non-existent DCs for replication even though Cassandra 4 does not allow that. +// That cannot be done when configuration replication through CQL which is why this func +// should only be used for system keyspaces. 
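To illustrate the two phases: while the CassandraInitialized condition is unset, every DC in the spec participates, so the initial system replication can already name a DC that has not been created yet; once the cluster is initialized, only ready DCs are included. A minimal sketch of the pre-initialization case (API struct literals abbreviated; it assumes Spec.Cassandra is a *CassandraClusterTemplate and sets only the fields the computation needs):

```go
package main

import (
	"fmt"

	api "github.com/k8ssandra/k8ssandra-operator/apis/k8ssandra/v1alpha1"
	"github.com/k8ssandra/k8ssandra-operator/pkg/cassandra"
)

func main() {
	kc := &api.K8ssandraCluster{
		Spec: api.K8ssandraClusterSpec{
			Cassandra: &api.CassandraClusterTemplate{
				Datacenters: []api.CassandraDatacenterTemplate{
					{Meta: api.EmbeddedObjectMeta{Name: "dc1"}, Size: 3},
					{Meta: api.EmbeddedObjectMeta{Name: "dc2"}, Size: 1},
				},
			},
		},
	}

	// No CassandraInitialized condition is set, so both DCs are included and the
	// replication factor is min(3, smallest DC size) = 1.
	rep := cassandra.ComputeInitialSystemReplication(kc)
	fmt.Printf("%+v\n", rep) // dc1 and dc2 with a replication factor of 1
}
```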
+func GetDatacentersForSystemReplication(kc *api.K8ssandraCluster) []api.CassandraDatacenterTemplate { + var datacenters []api.CassandraDatacenterTemplate + if kc.Status.GetConditionStatus(api.CassandraInitialized) == corev1.ConditionTrue { + datacenters = make([]api.CassandraDatacenterTemplate, 0) + for _, dc := range kc.Spec.Cassandra.Datacenters { + if status, found := kc.Status.Datacenters[dc.Meta.Name]; found && status.Cassandra.GetConditionStatus(cassdcapi.DatacenterReady) == corev1.ConditionTrue { + datacenters = append(datacenters, dc) + } + } + } else { + datacenters = kc.Spec.Cassandra.Datacenters + } + return datacenters +} + +func ComputeInitialSystemReplication(kc *api.K8ssandraCluster) SystemReplication { rf := 3.0 - for _, dc := range kluster.Spec.Cassandra.Datacenters { + + datacenters := GetDatacentersForSystemReplication(kc) + + for _, dc := range datacenters { rf = math.Min(rf, float64(dc.Size)) } - dcNames := make([]string, 0, len(kluster.Spec.Cassandra.Datacenters)) - for _, dc := range kluster.Spec.Cassandra.Datacenters { + dcNames := make([]string, 0, len(datacenters)) + for _, dc := range datacenters { dcNames = append(dcNames, dc.Meta.Name) } @@ -66,12 +92,12 @@ func ComputeReplicationFromDcTemplates(maxReplicationPerDc int, datacenters ...a return desiredReplication } -const networkTopology = "org.apache.cassandra.locator.NetworkTopologyStrategy" +const NetworkTopology = "org.apache.cassandra.locator.NetworkTopologyStrategy" func CompareReplications(actualReplication map[string]string, desiredReplication map[string]int) bool { if len(actualReplication) == 0 { return false - } else if class := actualReplication["class"]; class != networkTopology { + } else if class := actualReplication["class"]; class != NetworkTopology { return false } else if len(actualReplication) != len(desiredReplication)+1 { return false @@ -87,3 +113,35 @@ func CompareReplications(actualReplication map[string]string, desiredReplication } return true } + +func ParseReplication(val []byte) (*Replication, error) { + var result map[string]interface{} + + if err := json.Unmarshal(val, &result); err != nil { + return nil, err + } + + dcsReplication := Replication{datacenters: map[string]keyspacesReplication{}} + + for k, v := range result { + ksMap, ok := v.(map[string]interface{}) + if !ok { + return nil, fmt.Errorf("failed to parse replication") + } + ksReplication := keyspacesReplication{} + for keyspace, replicasVal := range ksMap { + freplicas, ok := replicasVal.(float64) + if !ok { + return nil, fmt.Errorf("failed to parse replication") + } + replicas := int(freplicas) + if replicas < 0 { + return nil, fmt.Errorf("invalid replication") + } + ksReplication[keyspace] = replicas + } + dcsReplication.datacenters[k] = ksReplication + } + + return &dcsReplication, nil +} diff --git a/pkg/cassandra/util_test.go b/pkg/cassandra/util_test.go index f95a11946..831620919 100644 --- a/pkg/cassandra/util_test.go +++ b/pkg/cassandra/util_test.go @@ -99,7 +99,7 @@ func TestComputeSystemReplication(t *testing.T) { for _, tc := range tests { t.Run(tc.name, func(t *testing.T) { - tc.got = ComputeSystemReplication(tc.kluster) + tc.got = ComputeInitialSystemReplication(tc.kluster) require.Equal(t, tc.want, tc.got) }) } @@ -207,11 +207,11 @@ func TestCompareReplications(t *testing.T) { {"nil", nil, map[string]int{"dc1": 3}, false}, {"empty", map[string]string{}, map[string]int{"dc1": 3}, false}, {"wrong class", map[string]string{"class": "wrong"}, map[string]int{"dc1": 3}, false}, - {"wrong length", 
map[string]string{"class": networkTopology, "dc1": "3", "dc2": "3"}, map[string]int{"dc1": 3}, false}, - {"missing dc", map[string]string{"class": networkTopology, "dc2": "3"}, map[string]int{"dc1": 3}, false}, - {"invalid rf", map[string]string{"class": networkTopology, "dc1": "not a number"}, map[string]int{"dc1": 3}, false}, - {"wrong rf", map[string]string{"class": networkTopology, "dc1": "1"}, map[string]int{"dc1": 3}, false}, - {"success", map[string]string{"class": networkTopology, "dc1": "1", "dc2": "3"}, map[string]int{"dc1": 1, "dc2": 3}, true}, + {"wrong length", map[string]string{"class": NetworkTopology, "dc1": "3", "dc2": "3"}, map[string]int{"dc1": 3}, false}, + {"missing dc", map[string]string{"class": NetworkTopology, "dc2": "3"}, map[string]int{"dc1": 3}, false}, + {"invalid rf", map[string]string{"class": NetworkTopology, "dc1": "not a number"}, map[string]int{"dc1": 3}, false}, + {"wrong rf", map[string]string{"class": NetworkTopology, "dc1": "1"}, map[string]int{"dc1": 3}, false}, + {"success", map[string]string{"class": NetworkTopology, "dc1": "1", "dc2": "3"}, map[string]int{"dc1": 1, "dc2": 3}, true}, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { @@ -220,3 +220,75 @@ func TestCompareReplications(t *testing.T) { }) } } + +func TestParseReplication(t *testing.T) { + tests := []struct { + name string + replication []byte + want *Replication + valid bool + }{ + { + name: "valid replication - single DC", + replication: []byte(`{"dc2": {"ks1": 3, "ks2": 3}}`), + want: &Replication{ + datacenters: map[string]keyspacesReplication{ + "dc2": { + "ks1": 3, + "ks2": 3, + }, + }, + }, + valid: true, + }, + { + name: "valid replication - multiple DCs", + replication: []byte(`{"dc2": {"ks1": 3, "ks2": 3}, "dc3": {"ks1": 5, "ks2": 1}}`), + want: &Replication{ + datacenters: map[string]keyspacesReplication{ + "dc2": { + "ks1": 3, + "ks2": 3, + }, + "dc3": { + "ks1": 5, + "ks2": 1, + }, + }, + }, + valid: true, + }, + { + name: "invalid replication - wrong type", + replication: []byte(`{"dc2": {"ks1": 3, "ks2": 3}, "dc3": {"ks1": 5, "ks2": "1"}}`), + want: nil, + valid: false, + }, + { + name: "invalid replication - replica count < 0", + replication: []byte(`{"dc2": {"ks1": 3, "ks2": 3}, "dc3": {"ks1": 5, "ks2": -1}}`), + want: nil, + valid: false, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + got, err := ParseReplication(tt.replication) + if tt.valid { + require.NoError(t, err) + assert.Equal(t, tt.want, got) + } else { + require.Error(t, err) + } + }) + } +} + +func TestReplicationFactor(t *testing.T) { + replication, err := ParseReplication([]byte(`{"dc2": {"ks1": 3, "ks2": 5}}`)) + require.NoError(t, err) + + assert.Equal(t, 3, replication.ReplicationFactor("dc2", "ks1")) + assert.Equal(t, 0, replication.ReplicationFactor("dc2", "ks3")) + assert.Equal(t, 0, replication.ReplicationFactor("dc3", "ks1")) +} diff --git a/pkg/mocks/ManagementApiFacade.go b/pkg/mocks/ManagementApiFacade.go index 42d65788c..a11d70c39 100644 --- a/pkg/mocks/ManagementApiFacade.go +++ b/pkg/mocks/ManagementApiFacade.go @@ -91,6 +91,29 @@ func (_m *ManagementApiFacade) GetKeyspaceReplication(keyspaceName string) (map[ return r0, r1 } +// GetSchemaVersions provides a mock function with given fields: +func (_m *ManagementApiFacade) GetSchemaVersions() (map[string][]string, error) { + ret := _m.Called() + + var r0 map[string][]string + if rf, ok := ret.Get(0).(func() map[string][]string); ok { + r0 = rf() + } else { + if ret.Get(0) != nil { + r0 = 
ret.Get(0).(map[string][]string) + } + } + + var r1 error + if rf, ok := ret.Get(1).(func() error); ok { + r1 = rf() + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + // ListKeyspaces provides a mock function with given fields: keyspaceName func (_m *ManagementApiFacade) ListKeyspaces(keyspaceName string) ([]string, error) { ret := _m.Called(keyspaceName) diff --git a/pkg/stargate/auth_schema.go b/pkg/stargate/auth_schema.go index 437d41821..47cfeebd4 100644 --- a/pkg/stargate/auth_schema.go +++ b/pkg/stargate/auth_schema.go @@ -3,7 +3,6 @@ package stargate import ( "fmt" "github.com/go-logr/logr" - cassdcapi "github.com/k8ssandra/cass-operator/apis/cassandra/v1beta1" "github.com/k8ssandra/cass-operator/pkg/httphelper" "github.com/k8ssandra/k8ssandra-operator/pkg/cassandra" ) @@ -25,8 +24,7 @@ var authTableDefinition = &httphelper.TableDefinition{ // ReconcileAuthKeyspace ensures that the Stargate auth schema exists, has the appropriate replication, and contains the // appropriate tables. -func ReconcileAuthKeyspace(dcs []*cassdcapi.CassandraDatacenter, managementApi cassandra.ManagementApiFacade, logger logr.Logger) error { - replication := cassandra.ComputeReplication(3, dcs...) +func ReconcileAuthKeyspace(managementApi cassandra.ManagementApiFacade, replication map[string]int, logger logr.Logger) error { logger.Info(fmt.Sprintf("Reconciling Stargate auth keyspace %v", AuthKeyspace)) if err := managementApi.EnsureKeyspaceReplication(AuthKeyspace, replication); err != nil { logger.Error(err, "Failed to ensure keyspace replication") diff --git a/pkg/test/mgmtapi.go b/pkg/test/mgmtapi.go new file mode 100644 index 000000000..23b75575a --- /dev/null +++ b/pkg/test/mgmtapi.go @@ -0,0 +1,127 @@ +package test + +import ( + "context" + "github.com/go-logr/logr" + cassdcapi "github.com/k8ssandra/cass-operator/apis/cassandra/v1beta1" + "github.com/k8ssandra/cass-operator/pkg/httphelper" + "github.com/k8ssandra/k8ssandra-operator/pkg/cassandra" + "github.com/k8ssandra/k8ssandra-operator/pkg/mocks" + "github.com/k8ssandra/k8ssandra-operator/pkg/stargate" + "github.com/stretchr/testify/mock" + "sigs.k8s.io/controller-runtime/pkg/client" +) + +type ManagementApiFactoryAdapter func( + ctx context.Context, + datacenter *cassdcapi.CassandraDatacenter, + client client.Client, + logger logr.Logger) (cassandra.ManagementApiFacade, error) + +var defaultAdapter ManagementApiFactoryAdapter = func( + ctx context.Context, + datacenter *cassdcapi.CassandraDatacenter, + client client.Client, + logger logr.Logger) (cassandra.ManagementApiFacade, error) { + + m := new(mocks.ManagementApiFacade) + m.On(EnsureKeyspaceReplication, mock.Anything, mock.Anything).Return(nil) + m.On(ListTables, stargate.AuthKeyspace).Return([]string{"token"}, nil) + m.On(CreateTable, mock.MatchedBy(func(def *httphelper.TableDefinition) bool { + return def.KeyspaceName == stargate.AuthKeyspace && def.TableName == stargate.AuthTable + })).Return(nil) + m.On(ListKeyspaces, "").Return([]string{}, nil) + m.On(GetSchemaVersions).Return(map[string][]string{"fake": {"test"}}, nil) + return m, nil +} + +type FakeManagementApiFactory struct { + adapter ManagementApiFactoryAdapter +} + +func (f *FakeManagementApiFactory) Reset() { + f.adapter = nil +} + +func (f *FakeManagementApiFactory) SetAdapter(a ManagementApiFactoryAdapter) { + f.adapter = a +} + +func (f *FakeManagementApiFactory) NewManagementApiFacade( + ctx context.Context, + dc *cassdcapi.CassandraDatacenter, + client client.Client, + logger logr.Logger) 
(cassandra.ManagementApiFacade, error) { + + if f.adapter != nil { + return f.adapter(ctx, dc, client, logger) + } + return defaultAdapter(ctx, dc, client, logger) +} + +type ManagementApiMethod string + +const ( + EnsureKeyspaceReplication = "EnsureKeyspaceReplication" + GetKeyspaceReplication = "GetKeyspaceReplication" + CreateKeyspaceIfNotExists = "CreateKeyspaceIfNotExists" + AlterKeyspace = "AlterKeyspace" + ListKeyspaces = "ListKeyspaces" + CreateTable = "CreateTable" + ListTables = "ListTables" + GetSchemaVersions = "GetSchemaVersions" +) + +type FakeManagementApiFacade struct { + *mocks.ManagementApiFacade +} + +func NewFakeManagementApiFacade() *FakeManagementApiFacade { + return &FakeManagementApiFacade{ManagementApiFacade: new(mocks.ManagementApiFacade)} +} + +func (f *FakeManagementApiFacade) GetLastCall(method ManagementApiMethod, args ...interface{}) int { + idx := -1 + + calls := make([]mock.Call, 0) + for _, call := range f.Calls { + if call.Method == string(method) { + calls = append(calls, call) + } + } + + for i, call := range calls { + if _, count := call.Arguments.Diff(args); count == 0 { + idx = i + } + } + + return idx +} + +func (f *FakeManagementApiFacade) GetFirstCall(method ManagementApiMethod, args ...interface{}) int { + calls := make([]mock.Call, 0) + for _, call := range f.Calls { + if call.Method == string(method) { + calls = append(calls, call) + } + } + + for i, call := range calls { + if _, count := call.Arguments.Diff(args); count == 0 { + return i + } + } + + return -1 +} + +func (f *FakeManagementApiFacade) getCallsForMethod(method ManagementApiMethod) []mock.Call { + calls := make([]mock.Call, 0) + for _, call := range f.Calls { + if call.Method == string(method) { + calls = append(calls, call) + } + } + return calls +} diff --git a/pkg/test/testenv.go b/pkg/test/testenv.go index f463d5622..bc260bf72 100644 --- a/pkg/test/testenv.go +++ b/pkg/test/testenv.go @@ -27,6 +27,7 @@ import ( "sigs.k8s.io/controller-runtime/pkg/metrics" cassdcapi "github.com/k8ssandra/cass-operator/apis/cassandra/v1beta1" + cassctlapi "github.com/k8ssandra/cass-operator/apis/control/v1alpha1" configapi "github.com/k8ssandra/k8ssandra-operator/apis/config/v1beta1" api "github.com/k8ssandra/k8ssandra-operator/apis/k8ssandra/v1alpha1" medusaapi "github.com/k8ssandra/k8ssandra-operator/apis/medusa/v1alpha1" @@ -112,6 +113,8 @@ type MultiClusterTestEnv struct { testEnvs []*envtest.Environment clustersToCreate int + + BeforeTest func() } func (e *MultiClusterTestEnv) Start(ctx context.Context, t *testing.T, initReconcilers func(mgr manager.Manager, clientCache *clientcache.ClientCache, clusters []cluster.Cluster) error) error { @@ -221,6 +224,10 @@ func (e *MultiClusterTestEnv) ControllerTest(ctx context.Context, test Controlle t.Fatalf("failed to create namespace %s: %v", namespace, err) } + if e.BeforeTest != nil { + e.BeforeTest() + } + test(t, ctx, f, namespace) } } @@ -297,6 +304,10 @@ func registerApis() error { return err } + if err := cassctlapi.AddToScheme(scheme.Scheme); err != nil { + return err + } + if err := stargateapi.AddToScheme(scheme.Scheme); err != nil { return err } diff --git a/scripts/setup-kind-multicluster.sh b/scripts/setup-kind-multicluster.sh index 80c09ab68..44870e61d 100755 --- a/scripts/setup-kind-multicluster.sh +++ b/scripts/setup-kind-multicluster.sh @@ -18,11 +18,21 @@ fi OPTS=$(getopt -o ho --long clusters:,cluster-names:,kind-node-version:,kind-worker-nodes:,overwrite:,help -n 'create-kind-clusters' -- "$@") eval set -- "$OPTS" +function help() 
{ + echo + echo "Syntax: create-kind-clusters.sh [options]" + echo "Options:" + echo "clusters The number of clusters to create." + echo "cluster-names A comma-delimited list of cluster names to create. Takes precedence over clusters option." + echo "kind-node-version The image version of the kind nodes." + echo "kind-worker-nodes The number of worker nodes to deploy." +} + registry_name='kind-registry' registry_port='5000' num_clusters=1 cluster_names="kind" -kind_node_version="v1.22.1" +kind_node_version="v1.22.4" kind_worker_nodes=3 overwrite_clusters="no" @@ -99,7 +109,7 @@ function delete_clusters() { } function create_clusters() { - echo "Creating clusters..." + echo "Creating $num_clusters clusters..." for ((i=0; i<$num_clusters; i++)) do diff --git a/test/e2e/suite_test.go b/test/e2e/suite_test.go index 143cb82d7..dbaee21cb 100644 --- a/test/e2e/suite_test.go +++ b/test/e2e/suite_test.go @@ -4,9 +4,13 @@ import ( "context" "flag" "fmt" + reaperapi "github.com/k8ssandra/k8ssandra-operator/apis/reaper/v1alpha1" + "github.com/k8ssandra/k8ssandra-operator/pkg/annotations" "github.com/k8ssandra/k8ssandra-operator/pkg/labels" + "github.com/k8ssandra/k8ssandra-operator/pkg/stargate" "os" "path/filepath" + "strings" "testing" "time" @@ -71,6 +75,10 @@ func TestOperator(t *testing.T) { testFunc: createMultiDatacenterCluster, fixture: "multi-dc", })) + t.Run("AddDcToCluster", e2eTest(ctx, &e2eTestOpts{ + testFunc: addDcToCluster, + fixture: "add-dc", + })) t.Run("CreateMultiStargateAndDatacenter", e2eTest(ctx, &e2eTestOpts{ testFunc: createStargateAndDatacenter, fixture: "multi-stargate", @@ -616,6 +624,123 @@ func createMultiDatacenterCluster(t *testing.T, ctx context.Context, namespace s assert.NoError(t, err, "timed out waiting for nodetool status check against "+pod) } +func addDcToCluster(t *testing.T, ctx context.Context, namespace string, f *framework.E2eFramework) { + require := require.New(t) + assert := assert.New(t) + + t.Log("check that the K8ssandraCluster was created") + kcKey := client.ObjectKey{Namespace: namespace, Name: "test"} + kc := &api.K8ssandraCluster{} + err := f.Client.Get(ctx, kcKey, kc) + require.NoError(err, "failed to get K8ssandraCluster in namespace %s", namespace) + + k8sCtx0 := "kind-k8ssandra-0" + k8sCtx1 := "kind-k8ssandra-1" + + dc1Key := framework.ClusterKey{ + K8sContext: k8sCtx0, + NamespacedName: types.NamespacedName{ + Namespace: namespace, + Name: "dc1", + }, + } + checkDatacenterReady(t, ctx, dc1Key, f) + + sg1Key := framework.ClusterKey{ + K8sContext: k8sCtx0, + NamespacedName: types.NamespacedName{ + Namespace: namespace, + Name: "test-dc1-stargate", + }, + } + checkStargateReady(t, f, ctx, sg1Key) + + reaper1Key := framework.ClusterKey{ + K8sContext: k8sCtx0, + NamespacedName: types.NamespacedName{ + Namespace: namespace, + Name: "test-dc1-reaper", + }, + } + checkReaperReady(t, f, ctx, reaper1Key) + + t.Log("create keyspaces") + _, err = f.ExecuteCql(ctx, k8sCtx0, namespace, "test", "test-dc1-default-sts-0", + "CREATE KEYSPACE ks1 WITH REPLICATION = {'class' : 'NetworkTopologyStrategy', 'dc1' : 1}") + require.NoError(err, "failed to create keyspace") + + _, err = f.ExecuteCql(ctx, k8sCtx0, namespace, "test", "test-dc1-default-sts-0", + "CREATE KEYSPACE ks2 WITH REPLICATION = {'class' : 'NetworkTopologyStrategy', 'dc1' : 1}") + require.NoError(err, "failed to create keyspace") + + t.Log("add dc2 to cluster") + err = f.Client.Get(ctx, kcKey, kc) + require.NoError(err, "failed to get K8ssandraCluster %s", kcKey) + + kc.Spec.Cassandra.Datacenters = 
append(kc.Spec.Cassandra.Datacenters, api.CassandraDatacenterTemplate{ + Meta: api.EmbeddedObjectMeta{ + Name: "dc2", + }, + K8sContext: k8sCtx1, + Size: 1, + }) + annotations.AddAnnotation(kc, api.DcReplicationAnnotation, `{"dc2": {"ks1": 1, "ks2": 1}}`) + + err = f.Client.Update(ctx, kc) + require.NoError(err, "failed to update K8ssandraCluster") + + dc2Key := framework.ClusterKey{K8sContext: "kind-k8ssandra-1", NamespacedName: types.NamespacedName{Namespace: namespace, Name: "dc2"}} + checkDatacenterReady(t, ctx, dc2Key, f) + + t.Log("retrieve database credentials") + username, password, err := f.RetrieveDatabaseCredentials(ctx, namespace, kc.Name) + require.NoError(err, "failed to retrieve database credentials") + + t.Log("check that nodes in dc1 see nodes in dc2") + pod := "test-dc1-default-sts-0" + count := 2 + checkNodeToolStatusUN(t, f, "kind-k8ssandra-0", namespace, pod, count, "-u", username, "-pw", password) + + assert.NoError(err, "timed out waiting for nodetool status check against "+pod) + + t.Log("check nodes in dc2 see nodes in dc1") + pod = "test-dc2-default-sts-0" + checkNodeToolStatusUN(t, f, "kind-k8ssandra-1", namespace, pod, count, "-u", username, "-pw", password) + + assert.NoError(err, "timed out waiting for nodetool status check against "+pod) + + keyspaces := []string{"system_auth", stargate.AuthKeyspace, reaperapi.DefaultKeyspace, "ks1", "ks2"} + for _, ks := range keyspaces { + assert.Eventually(func() bool { + output, err := f.ExecuteCql(ctx, k8sCtx0, namespace, "test", "test-dc1-default-sts-0", + fmt.Sprintf("SELECT replication FROM system_schema.keyspaces WHERE keyspace_name = '%s'", ks)) + if err != nil { + t.Logf("replication check for keyspace %s failed: %v", ks, err) + return false + } + return strings.Contains(output, "'dc1': '1'") && strings.Contains(output, "'dc2': '1'") + }, 1*time.Minute, 5*time.Second, "failed to verify replication updated for keyspace %s", ks) + } + + sg2Key := framework.ClusterKey{ + K8sContext: k8sCtx1, + NamespacedName: types.NamespacedName{ + Namespace: namespace, + Name: "test-dc2-stargate", + }, + } + checkStargateReady(t, f, ctx, sg2Key) + + reaper2Key := framework.ClusterKey{ + K8sContext: k8sCtx1, + NamespacedName: types.NamespacedName{ + Namespace: namespace, + Name: "test-dc2-reaper", + }, + } + checkReaperReady(t, f, ctx, reaper2Key) +} + func checkStargateApisWithMultiDcCluster(t *testing.T, ctx context.Context, namespace string, f *framework.E2eFramework) { require := require.New(t) @@ -885,9 +1010,14 @@ func checkKeyspaceExists( ctx context.Context, k8sContext, namespace, clusterName, pod, keyspace string, ) { - keyspaces, err := f.ExecuteCql(ctx, k8sContext, namespace, clusterName, pod, "describe keyspaces") - require.NoError(t, err, "failed to describe keyspaces") - assert.Contains(t, keyspaces, keyspace) + assert.Eventually(t, func() bool { + keyspaces, err := f.ExecuteCql(ctx, k8sContext, namespace, clusterName, pod, "describe keyspaces") + if err != nil { + t.Logf("failed to desctibe keyspaces: %v", err) + return false + } + return strings.Contains(keyspaces, keyspace) + }, 1*time.Minute, 3*time.Second) } func configureZeroLog() { diff --git a/test/framework/framework.go b/test/framework/framework.go index e32619f09..9146ab30f 100644 --- a/test/framework/framework.go +++ b/test/framework/framework.go @@ -135,6 +135,32 @@ func (f *Framework) List(ctx context.Context, key ClusterKey, obj client.ObjectL return remoteClient.List(ctx, obj, opts...) 
} +func (f *Framework) Update(ctx context.Context, key ClusterKey, obj client.Object) error { + if len(key.K8sContext) == 0 { + return fmt.Errorf("the K8sContext must be specified for key %s", key) + } + + remoteClient, found := f.remoteClients[key.K8sContext] + if !found { + return fmt.Errorf("no remote client found for context %s", key.K8sContext) + } + + return remoteClient.Update(ctx, obj) +} + +func (f *Framework) UpdateStatus(ctx context.Context, key ClusterKey, obj client.Object) error { + if len(key.K8sContext) == 0 { + return fmt.Errorf("the K8sContext must be specified for key %s", key) + } + + remoteClient, found := f.remoteClients[key.K8sContext] + if !found { + return fmt.Errorf("no remote client found for context %s", key.K8sContext) + } + + return remoteClient.Status().Update(ctx, obj) +} + func (f *Framework) Patch(ctx context.Context, obj client.Object, patch client.Patch, key ClusterKey, opts ...client.PatchOption) error { if len(key.K8sContext) == 0 { return fmt.Errorf("the K8sContext must be specified for key %s", key) @@ -180,7 +206,8 @@ func (f *Framework) k8sContextNotFound(k8sContext string) error { } // SetDatacenterStatusReady fetches the CassandraDatacenter specified by key and persists -// a status update to make the CassandraDatacenter ready. +// a status update to make the CassandraDatacenter ready. It sets the DatacenterReady and +// DatacenterInitialized conditions to true. func (f *Framework) SetDatacenterStatusReady(ctx context.Context, key ClusterKey) error { now := metav1.Now() return f.PatchDatacenterStatus(ctx, key, func(dc *cassdcapi.CassandraDatacenter) { @@ -190,6 +217,11 @@ func (f *Framework) SetDatacenterStatusReady(ctx context.Context, key ClusterKey Status: corev1.ConditionTrue, LastTransitionTime: now, }) + dc.Status.SetCondition(cassdcapi.DatacenterCondition{ + Type: cassdcapi.DatacenterInitialized, + Status: corev1.ConditionTrue, + LastTransitionTime: now, + }) }) } diff --git a/test/testdata/fixtures/add-dc/k8ssandra.yaml b/test/testdata/fixtures/add-dc/k8ssandra.yaml new file mode 100644 index 000000000..f222136c3 --- /dev/null +++ b/test/testdata/fixtures/add-dc/k8ssandra.yaml @@ -0,0 +1,65 @@ +apiVersion: k8ssandra.io/v1alpha1 +kind: K8ssandraCluster +metadata: + name: test +spec: + cassandra: + serverVersion: "4.0.1" + storageConfig: + cassandraDataVolumeClaimSpec: + storageClassName: standard + accessModes: + - ReadWriteOnce + resources: + requests: + storage: 5Gi + config: + cassandraYaml: + auto_snapshot: false + memtable_flush_writers: 1 + commitlog_segment_size_in_mb: 2 + concurrent_compactors: 1 + compaction_throughput_mb_per_sec: 0 + sstable_preemptive_open_interval_in_mb: 0 + key_cache_size_in_mb: 0 + thrift_prepared_statements_cache_size_mb: 1 + prepared_statements_cache_size_mb: 1 + slow_query_log_timeout_in_ms: 0 + counter_cache_size_in_mb: 0 + concurrent_reads: 2 + concurrent_writes: 2 + concurrent_counter_writes: 2 + jvmOptions: + heapSize: 384Mi + networking: + hostNetwork: true + resources: + limits: + memory: 512Mi + datacenters: + - metadata: + name: dc1 + k8sContext: kind-k8ssandra-0 + size: 1 + reaper: {} + stargate: + size: 1 + heapSize: 384Mi + cassandraConfigMapRef: + name: cassandra-config + resources: + limits: + memory: 512Mi + livenessProbe: + initialDelaySeconds: 60 + periodSeconds: 10 + failureThreshold: 20 + successThreshold: 1 + timeoutSeconds: 20 + readinessProbe: + initialDelaySeconds: 30 + periodSeconds: 10 + failureThreshold: 20 + successThreshold: 1 + timeoutSeconds: 20 +
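The fixture above declares only dc1. The AddDcToCluster e2e test then grows that cluster by updating the K8ssandraCluster spec and setting the dc-replication annotation, roughly as in this fragment (f, ctx, and the fetched kc come from the test framework; the annotation must list every user keyspace for the new DC):

```go
// Add dc2 in the second kind cluster and tell the operator how the user
// keyspaces created earlier in the test (ks1, ks2) should be replicated there.
kc.Spec.Cassandra.Datacenters = append(kc.Spec.Cassandra.Datacenters, api.CassandraDatacenterTemplate{
	Meta:       api.EmbeddedObjectMeta{Name: "dc2"},
	K8sContext: "kind-k8ssandra-1",
	Size:       1,
})
annotations.AddAnnotation(kc, api.DcReplicationAnnotation, `{"dc2": {"ks1": 1, "ks2": 1}}`)

err = f.Client.Update(ctx, kc)
require.NoError(err, "failed to update K8ssandraCluster")
```

Once dc2 reports ready, the controller adjusts keyspace replication and runs a `<dc>-rebuild` CassandraTask (command `rebuild`, with `source_datacenter` pointing at an existing DC) against the new datacenter, removing the rebuild annotation from the K8ssandraCluster when all of the task's jobs have finished.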