Skip to content

Commit

Permalink
add schema agreement check and update tests
Browse files Browse the repository at this point in the history
  • Loading branch information
jsanda committed Jan 15, 2022
1 parent 2ad44d1 commit c7a5072
Show file tree
Hide file tree
Showing 11 changed files with 97 additions and 66 deletions.
4 changes: 4 additions & 0 deletions controllers/k8ssandra/add_dc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,7 @@ func withUserKeyspaces(ctx context.Context, t *testing.T, f *framework.Framework
mockMgmtApi.On(testutils.EnsureKeyspaceReplication, "system_traces", replication).Return(nil)
mockMgmtApi.On(testutils.EnsureKeyspaceReplication, "system_traces", updatedReplication).Return(nil)
mockMgmtApi.On(testutils.ListKeyspaces, "").Return(userKeyspaces, nil)
mockMgmtApi.On(testutils.GetSchemaVersions).Return(map[string][]string{"fake": {"test"}}, nil)

for _, ks := range userKeyspaces {
mockMgmtApi.On(testutils.EnsureKeyspaceReplication, ks, updatedReplication).Return(nil)
Expand Down Expand Up @@ -195,6 +196,7 @@ func withStargateAndReaper(ctx context.Context, t *testing.T, f *framework.Frame
mockMgmtApi.On(testutils.EnsureKeyspaceReplication, reaperapi.DefaultKeyspace, updatedReplication).Return(nil)
mockMgmtApi.On(testutils.ListTables, stargate.AuthKeyspace).Return([]string{stargate.AuthTable}, nil)
mockMgmtApi.On(testutils.ListKeyspaces, "").Return([]string{}, nil)
mockMgmtApi.On(testutils.GetSchemaVersions).Return(map[string][]string{"fake": {"test"}}, nil)

adapter := func(ctx context.Context, datacenter *cassdcapi.CassandraDatacenter, client client.Client, logger logr.Logger) (cassandra.ManagementApiFacade, error) {
return mockMgmtApi, nil
Expand Down Expand Up @@ -303,6 +305,7 @@ func failSystemKeyspaceUpdate(ctx context.Context, t *testing.T, f *framework.Fr
mockMgmtApi.On(testutils.EnsureKeyspaceReplication, "system_distributed", updatedReplication).Return(replicationCheckErr)
mockMgmtApi.On(testutils.EnsureKeyspaceReplication, "system_traces", replication).Return(nil)
mockMgmtApi.On(testutils.EnsureKeyspaceReplication, "system_traces", updatedReplication).Return(replicationCheckErr)
mockMgmtApi.On(testutils.GetSchemaVersions).Return(map[string][]string{"fake": {"test"}}, nil)

adapter := func(ctx context.Context, datacenter *cassdcapi.CassandraDatacenter, client client.Client, logger logr.Logger) (cassandra.ManagementApiFacade, error) {
return mockMgmtApi, nil
Expand Down Expand Up @@ -348,6 +351,7 @@ func failUserKeyspaceUpdate(ctx context.Context, t *testing.T, f *framework.Fram
mockMgmtApi.On(testutils.EnsureKeyspaceReplication, "system_traces", replication).Return(nil)
mockMgmtApi.On(testutils.EnsureKeyspaceReplication, "system_traces", updatedReplication).Return(nil)
mockMgmtApi.On(testutils.ListKeyspaces, "").Return(userKeyspaces, nil)
mockMgmtApi.On(testutils.GetSchemaVersions).Return(map[string][]string{"fake": {"test"}}, nil)

for _, ks := range userKeyspaces {
mockMgmtApi.On(testutils.GetKeyspaceReplication, ks).Return(updatedReplicationStr, nil)
Expand Down
17 changes: 13 additions & 4 deletions controllers/k8ssandra/datacenters.go
Original file line number Diff line number Diff line change
Expand Up @@ -152,22 +152,31 @@ func (r *K8ssandraClusterReconciler) reconcileDatacenters(ctx context.Context, k

actualDcs = append(actualDcs, actualDc)

if recResult := r.updateReplicationOfSystemKeyspaces(ctx, kc, desiredDc, remoteClient, logger); recResult.Completed() {
mgmtApi, err := r.ManagementApi.NewManagementApiFacade(ctx, actualDc, remoteClient, logger)
if err != nil {
return result.Error(err), actualDcs
}

if recResult := r.checkSchemaAgreement(mgmtApi, logger); recResult.Completed() {
return recResult, actualDcs
}

if recResult := r.updateReplicationOfSystemKeyspaces(ctx, kc, mgmtApi, logger); recResult.Completed() {
return recResult, actualDcs
}

if recResult := r.reconcileStargateAuthSchema(ctx, kc, desiredDc, remoteClient, logger); recResult.Completed() {
if recResult := r.reconcileStargateAuthSchema(ctx, kc, mgmtApi, logger); recResult.Completed() {
return recResult, actualDcs
}

if recResult := r.reconcileReaperSchema(ctx, kc, desiredDc, remoteClient, logger); recResult.Completed() {
if recResult := r.reconcileReaperSchema(ctx, kc, mgmtApi, logger); recResult.Completed() {
return recResult, actualDcs
}

if rebuildNeeded {
// TODO We need to handle the Stargate auth and Reaper keyspaces here.

if recResult := r.updateUserKeyspacesReplication(ctx, kc, desiredDc, remoteClient, logger); recResult.Completed() {
if recResult := r.updateUserKeyspacesReplication(ctx, kc, desiredDc, mgmtApi, logger); recResult.Completed() {
return recResult, actualDcs
}

Expand Down
10 changes: 2 additions & 8 deletions controllers/k8ssandra/reaper.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,7 @@ import (
func (r *K8ssandraClusterReconciler) reconcileReaperSchema(
ctx context.Context,
kc *api.K8ssandraCluster,
dc *cassdcapi.CassandraDatacenter,
remoteClient client.Client,
mgmtApi cassandra.ManagementApiFacade,
logger logr.Logger) result.ReconcileResult {

if !kc.HasReapers() {
Expand All @@ -51,11 +50,6 @@ func (r *K8ssandraClusterReconciler) reconcileReaperSchema(
return recResult
}

managementApiFacade, err := r.ManagementApi.NewManagementApiFacade(ctx, dc, remoteClient, logger)
if err != nil {
logger.Error(err, "Failed to create ManagementApiFacade")
return result.Error(err)
}
keyspace := reaperapi.DefaultKeyspace

if kc.Spec.Reaper != nil && kc.Spec.Reaper.Keyspace != "" {
Expand All @@ -68,7 +62,7 @@ func (r *K8ssandraClusterReconciler) reconcileReaperSchema(
datacenters = append(datacenters, dc)
}
}
err = managementApiFacade.EnsureKeyspaceReplication(
err := mgmtApi.EnsureKeyspaceReplication(
keyspace,
cassandra.ComputeReplication(3, datacenters...),
)
Expand Down
33 changes: 18 additions & 15 deletions controllers/k8ssandra/replication.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,22 @@ import (
"time"
)

func (r *K8ssandraClusterReconciler) checkSchemaAgreement(mgmtApi cassandra.ManagementApiFacade, logger logr.Logger) result.ReconcileResult {

versions, err := mgmtApi.GetSchemaVersions()
if err != nil {
return result.Error(err)
}

if len(versions) == 1 {
return result.Continue()
}

logger.Info("There is schema disagreement", "versions", len(versions))

return result.RequeueSoon(r.DefaultDelay)
}

// checkSystemReplication checks for the SystemReplicationAnnotation on kc. If found, the
// JSON value is unmarshalled and returned. If not found, the SystemReplication is computed
// and is stored in the SystemReplicationAnnotation on kc. The value is JSON-encoded.
Expand Down Expand Up @@ -61,20 +77,13 @@ func (r *K8ssandraClusterReconciler) checkSystemReplication(ctx context.Context,
func (r *K8ssandraClusterReconciler) updateReplicationOfSystemKeyspaces(
ctx context.Context,
kc *api.K8ssandraCluster,
dc *cassdcapi.CassandraDatacenter,
remoteClient client.Client,
mgmtApi cassandra.ManagementApiFacade,
logger logr.Logger) result.ReconcileResult {

if recResult := r.versionCheck(ctx, kc); recResult.Completed() {
return recResult
}

mgmtApi, err := r.ManagementApi.NewManagementApiFacade(ctx, dc, remoteClient, logger)
if err != nil {
logger.Error(err, "Failed to create ManagementApiFacade")
return result.Error(err)
}

keyspaces := []string{"system_traces", "system_distributed", "system_auth"}
datacenters := cassandra.GetDatacentersForSystemReplication(kc)
replication := cassandra.ComputeReplication(3, datacenters...)
Expand Down Expand Up @@ -103,7 +112,7 @@ func (r *K8ssandraClusterReconciler) updateUserKeyspacesReplication(
ctx context.Context,
kc *api.K8ssandraCluster,
dc *cassdcapi.CassandraDatacenter,
remoteClient client.Client,
mgmtApi cassandra.ManagementApiFacade,
logger logr.Logger) result.ReconcileResult {

jsonReplication := annotations.GetAnnotation(kc, api.DcReplicationAnnotation)
Expand All @@ -114,12 +123,6 @@ func (r *K8ssandraClusterReconciler) updateUserKeyspacesReplication(

logger.Info("Updating replication for user keyspaces")

mgmtApi, err := r.ManagementApi.NewManagementApiFacade(ctx, dc, remoteClient, logger)
if err != nil {
logger.Error(err, "Failed to create ManagementApiFacade")
return result.Error(err)
}

userKeyspaces, err := getUserKeyspaces(mgmtApi, kc)
if err != nil {
logger.Error(err, "Failed to get user keyspaces")
Expand Down
13 changes: 3 additions & 10 deletions controllers/k8ssandra/stargate.go
Original file line number Diff line number Diff line change
Expand Up @@ -158,8 +158,7 @@ func (r *K8ssandraClusterReconciler) setStatusForStargate(kc *api.K8ssandraClust
func (r *K8ssandraClusterReconciler) reconcileStargateAuthSchema(
ctx context.Context,
kc *api.K8ssandraCluster,
dc *cassdcapi.CassandraDatacenter,
remoteClient client.Client,
mgmtApi cassandra.ManagementApiFacade,
logger logr.Logger) result.ReconcileResult {

if !kc.HasStargates() {
Expand All @@ -170,25 +169,19 @@ func (r *K8ssandraClusterReconciler) reconcileStargateAuthSchema(
return recResult
}

managementApi, err := r.ManagementApi.NewManagementApiFacade(ctx, dc, remoteClient, logger)
if err != nil {
logger.Error(err, "Failed to create ManagementApiFacade")
return result.Error(err)
}

datacenters := make([]api.CassandraDatacenterTemplate, 0)
for _, dc := range kc.Spec.Cassandra.Datacenters {
if status, found := kc.Status.Datacenters[dc.Meta.Name]; found && status.Cassandra.GetConditionStatus(cassdcapi.DatacenterReady) == corev1.ConditionTrue {
datacenters = append(datacenters, dc)
}
}
replication := cassandra.ComputeReplication(3, datacenters...)
if err = managementApi.EnsureKeyspaceReplication(stargate.AuthKeyspace, replication); err != nil {
if err := mgmtApi.EnsureKeyspaceReplication(stargate.AuthKeyspace, replication); err != nil {
logger.Error(err, "Failed to ensure keyspace replication")
return result.Error(err)
}

if err = stargate.ReconcileAuthTable(managementApi, logger); err != nil {
if err := stargate.ReconcileAuthTable(mgmtApi, logger); err != nil {
logger.Error(err, "Failed to reconcile Stargate auth table")
return result.Error(err)
}
Expand Down
21 changes: 1 addition & 20 deletions controllers/medusa/controllers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,16 +5,10 @@ import (
"testing"
"time"

"github.com/go-logr/logr"
cassdcapi "github.com/k8ssandra/cass-operator/apis/cassandra/v1beta1"
"github.com/k8ssandra/cass-operator/pkg/httphelper"
ctrl "github.com/k8ssandra/k8ssandra-operator/controllers/k8ssandra"
"github.com/k8ssandra/k8ssandra-operator/pkg/cassandra"
"github.com/k8ssandra/k8ssandra-operator/pkg/clientcache"
"github.com/k8ssandra/k8ssandra-operator/pkg/config"
"github.com/k8ssandra/k8ssandra-operator/pkg/mocks"
"github.com/k8ssandra/k8ssandra-operator/pkg/stargate"
"github.com/stretchr/testify/mock"
"k8s.io/client-go/kubernetes/scheme"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/cluster"
Expand All @@ -32,7 +26,7 @@ var (
defaultStorageClass = "default"
testEnv *testutils.MultiClusterTestEnv
seedsResolver = &fakeSeedsResolver{}
managementApi = &fakeManagementApiFactory{}
managementApi = &testutils.FakeManagementApiFactory{}
medusaClientFactory *fakeMedusaClientFactory
)

Expand Down Expand Up @@ -135,16 +129,3 @@ type fakeSeedsResolver struct {
func (r *fakeSeedsResolver) ResolveSeedEndpoints(ctx context.Context, dc *cassdcapi.CassandraDatacenter, remoteClient client.Client) ([]string, error) {
return r.callback(dc)
}

type fakeManagementApiFactory struct {
}

func (f fakeManagementApiFactory) NewManagementApiFacade(context.Context, *cassdcapi.CassandraDatacenter, client.Client, logr.Logger) (cassandra.ManagementApiFacade, error) {
m := new(mocks.ManagementApiFacade)
m.On("EnsureKeyspaceReplication", mock.Anything, mock.Anything).Return(nil)
m.On("ListTables", stargate.AuthKeyspace).Return([]string{"token"}, nil)
m.On("CreateTable", mock.MatchedBy(func(def *httphelper.TableDefinition) bool {
return def.KeyspaceName == stargate.AuthKeyspace && def.TableName == stargate.AuthTable
})).Return(nil)
return m, nil
}
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ require (
)

replace (
github.com/k8ssandra/cass-operator => github.com/jsanda/cass-operator-1 v1.6.1-0.20220114212340-bb9f5b39297d
k8s.io/api => k8s.io/api v0.22.2
k8s.io/apiextensions-apiserver => k8s.io/apiextensions-apiserver v0.22.2
k8s.io/apimachinery => k8s.io/apimachinery v0.22.2
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -472,6 +472,8 @@ github.com/jonboulle/clockwork v0.1.0/go.mod h1:Ii8DK3G1RaLaWxj9trq07+26W01tbo22
github.com/jonboulle/clockwork v0.2.2/go.mod h1:Pkfl5aHPm1nk2H9h0bjmnJD/BcgbGXUBGnn1kMkgxc8=
github.com/josharian/intern v1.0.0/go.mod h1:5DoeVV0s6jJacbCEi61lwdGj/aVlrQvzHFFd8Hwg//Y=
github.com/jpillora/backoff v1.0.0/go.mod h1:J/6gKK9jxlEcS3zixgDgUAsiuZ7yrSoa/FX5e0EB2j4=
github.com/jsanda/cass-operator-1 v1.6.1-0.20220114212340-bb9f5b39297d h1:fiWmJ6b0FUR39NIHOk/598XQhiho0kJ3bebsqNo7Yfs=
github.com/jsanda/cass-operator-1 v1.6.1-0.20220114212340-bb9f5b39297d/go.mod h1:zsEoRc3auDcbY3GZAGv14mBZ5KTbj3EwMQzmM8/Kl4A=
github.com/json-iterator/go v1.1.6/go.mod h1:+SdeFBvtyEkXs7REEP0seUULqWtbJapLOCVDaaPEHmU=
github.com/json-iterator/go v1.1.10/go.mod h1:KdQUCv79m/52Kvf8AW2vK1V8akMuk1QjK/uOdHXbAo4=
github.com/json-iterator/go v1.1.11 h1:uVUAXhF2To8cbw/3xN3pxj6kk7TYKs98NIrTqPlMWAQ=
Expand All @@ -482,8 +484,6 @@ github.com/jtolds/gls v4.20.0+incompatible/go.mod h1:QJZ7F/aHp+rZTRtaJ1ow/lLfFfV
github.com/julienschmidt/httprouter v1.2.0/go.mod h1:SYymIcj16QtmaHHD7aYtjjsJG7VTCxuUUipMqKk8s4w=
github.com/julienschmidt/httprouter v1.3.0/go.mod h1:JR6WtHb+2LUe8TCKY3cZOxFyyO8IZAc4RVcycCCAKdM=
github.com/jung-kurt/gofpdf v1.0.3-0.20190309125859-24315acbbda5/go.mod h1:7Id9E/uU8ce6rXgefFLlgrJj/GYY22cpxn+r32jIOes=
github.com/k8ssandra/cass-operator v1.8.0-rc.2.0.20220108141909-eb7bbb91f9bb h1:ZMScu7aAIwCdCw3UJnWqUMyA7iqXGGdDRKG0xPb6ayA=
github.com/k8ssandra/cass-operator v1.8.0-rc.2.0.20220108141909-eb7bbb91f9bb/go.mod h1:zsEoRc3auDcbY3GZAGv14mBZ5KTbj3EwMQzmM8/Kl4A=
github.com/k8ssandra/reaper-client-go v0.3.1-0.20210617111910-fe2ba92f8efb h1:RvFhovAfMgALqozmpVo2F/9gSITPIUubI1153mQNiPA=
github.com/k8ssandra/reaper-client-go v0.3.1-0.20210617111910-fe2ba92f8efb/go.mod h1:WsQymIaVT39xbcstZhdqynUS13AGzP2p6U9Hsk1oy5M=
github.com/karrick/godirwalk v1.16.1/go.mod h1:j4mkqPuvaLI8mp1DroR3P6ad7cyYd4c1qeJ3RV7ULlk=
Expand Down
22 changes: 22 additions & 0 deletions pkg/cassandra/management.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package cassandra
import (
"context"
"fmt"
"github.com/k8ssandra/k8ssandra-operator/pkg/utils"
"strconv"

"github.com/go-logr/logr"
Expand Down Expand Up @@ -91,6 +92,10 @@ type ManagementApiFacade interface {
// EnsureKeyspaceReplication checks if the given keyspace has the given replication, and if it does not,
// alters it to match the desired replication.
EnsureKeyspaceReplication(keyspaceName string, replication map[string]int) error

// GetSchemaVersions list all of the schema versions know to this node. The map keys are schema version UUIDs.
// The values are list of node IPs.
GetSchemaVersions() (map[string][]string, error)
}

type defaultManagementApiFacade struct {
Expand Down Expand Up @@ -290,3 +295,20 @@ func (r *defaultManagementApiFacade) EnsureKeyspaceReplication(keyspaceName stri
}
}
}

func (r *defaultManagementApiFacade) GetSchemaVersions() (map[string][]string, error) {
pods, err := r.fetchDatacenterPods()
if err != nil {
return nil, err
}

for _, pod := range pods {
if schemaVersions, err := r.nodeMgmtClient.CallSchemaVersionsEndpoint(&pod); err != nil {
r.logger.V(4).Error(err, "failed to list schema versions", "Pod", pod.Name)
} else {
return schemaVersions, nil
}
}

return nil, fmt.Errorf("failed to get schema version on all pods in CassandraDatacenter %v", utils.GetKey(r.dc))
}
25 changes: 24 additions & 1 deletion pkg/mocks/ManagementApiFacade.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

13 changes: 7 additions & 6 deletions pkg/test/mgmtapi.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package test

import (
"context"
"fmt"
"github.com/go-logr/logr"
cassdcapi "github.com/k8ssandra/cass-operator/apis/cassandra/v1beta1"
"github.com/k8ssandra/cass-operator/pkg/httphelper"
Expand All @@ -26,12 +25,13 @@ var defaultAdapater ManagementApiFactoryAdapter = func(
logger logr.Logger) (cassandra.ManagementApiFacade, error) {

m := new(mocks.ManagementApiFacade)
m.On("EnsureKeyspaceReplication", mock.Anything, mock.Anything).Return(nil)
m.On("ListTables", stargate.AuthKeyspace).Return([]string{"token"}, nil)
m.On("CreateTable", mock.MatchedBy(func(def *httphelper.TableDefinition) bool {
m.On(EnsureKeyspaceReplication, mock.Anything, mock.Anything).Return(nil)
m.On(ListTables, stargate.AuthKeyspace).Return([]string{"token"}, nil)
m.On(CreateTable, mock.MatchedBy(func(def *httphelper.TableDefinition) bool {
return def.KeyspaceName == stargate.AuthKeyspace && def.TableName == stargate.AuthTable
})).Return(nil)
m.On("ListKeyspaces", "").Return([]string{}, nil)
m.On(ListKeyspaces, "").Return([]string{}, nil)
m.On(GetSchemaVersions).Return(map[string][]string{"fake": {"test"}}, nil)
return m, nil
}

Expand All @@ -40,7 +40,6 @@ type FakeManagementApiFactory struct {
}

func (f *FakeManagementApiFactory) Reset() {
fmt.Println("RESET")
f.adapter = nil
}

Expand Down Expand Up @@ -68,7 +67,9 @@ const (
CreateKeyspaceIfNotExists = "CreateKeyspaceIfNotExists"
AlterKeyspace = "AlterKeyspace"
ListKeyspaces = "ListKeyspaces"
CreateTable = "CreateTable"
ListTables = "ListTables"
GetSchemaVersions = "GetSchemaVersions"
)

type FakeManagementApiFacade struct {
Expand Down

0 comments on commit c7a5072

Please sign in to comment.