requeue after the last node has its node-state label set to Started during cluster creation (#77)

* requeue after the last node has its node-state label set to Started

This commit fixes an issue in which status updates appear to happen out of order. Specifically, the
.Status.SuperUserUpserted property is set before the last node is added to .Status.NodeStatuses. The
actual order of operations is correct: the super user is not created until all C* nodes are started.
It is only the status updates that are recorded out of order. (A simplified sketch of the ordering this
change enforces follows the changed-files summary below.)

* add integration test (WIP)

* fix file name

* a few updates based on PR review

* add comment
jsanda authored May 8, 2020
1 parent 5ae9bee commit b96bfd7
Showing 4 changed files with 136 additions and 5 deletions.
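
To make the fix easier to follow outside the diff, here is a minimal, self-contained Go sketch of the decision the patched CheckPodsReady makes. The names (checkPodsReady, reconcileResult) are simplified stand-ins for illustration, not the operator's actual types; the real change is in the reconcile_racks.go diff below.

package main

import "fmt"

type reconcileResult string

const (
	resultContinue    reconcileResult = "continue"
	resultRequeueSoon reconcileResult = "requeue soon"
)

// checkPodsReady mirrors the shape of the patched check: once the ready and started
// counts match the desired size, requeue if a node was started in this very pass so
// that the nodeStatuses update is persisted before superUserUpserted is set.
func checkPodsReady(desired, ready, started int, nodeJustStarted bool) (reconcileResult, error) {
	if desired == ready && desired == started {
		if nodeJustStarted {
			return resultRequeueSoon, nil
		}
		return resultContinue, nil
	}
	return "", fmt.Errorf("checks failed desired:%d, ready:%d, started:%d", desired, ready, started)
}

func main() {
	r, _ := checkPodsReady(6, 6, 6, true)
	fmt.Println(r) // "requeue soon": the last node was just labeled Started
	r, _ = checkPodsReady(6, 6, 6, false)
	fmt.Println(r) // "continue": all nodes were already recorded as Started
}
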
19 changes: 19 additions & 0 deletions mage/ginkgo/lib.go
@@ -240,6 +240,25 @@ func (ns *NsWrapper) WaitForDatacenterOperatorProgress(dcName string, progressVa
ns.WaitForOutputAndLog(step, k, progressValue, timeout)
}

func (ns *NsWrapper) WaitForSuperUserUpserted(dcName string, timeout int) {
json := "jsonpath={.status.superUserUpserted}"
k := kubectl.Get("CassandraDatacenter", dcName).
FormatOutput(json)
execErr := ns.WaitForOutputPattern(k, `\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}Z`, timeout)
Expect(execErr).ToNot(HaveOccurred())
}

func (ns *NsWrapper) GetNodeStatusesHostIds(dcName string) []string {
json := "jsonpath={.status.nodeStatuses['*'].hostID}"
k := kubectl.Get("CassandraDatacenter", dcName).
FormatOutput(json)

output := ns.OutputPanic(k)
hostIds := strings.Split(output, " ")

return hostIds
}

func (ns *NsWrapper) WaitForDatacenterReadyPodCount(dcName string, count int) {
timeout := count * 400
step := "waiting for the node to become ready"
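
A note on the two helpers added above: WaitForSuperUserUpserted polls until .status.superUserUpserted renders as a UTC timestamp matching the regex, and GetNodeStatusesHostIds assumes kubectl's jsonpath output joins the hostID values with single spaces. A standalone sketch of both assumptions (the host IDs below are made up):

package main

import (
	"fmt"
	"regexp"
	"strings"
)

func main() {
	// The pattern WaitForSuperUserUpserted waits for: an RFC 3339 UTC timestamp
	// such as 2020-05-08T17:30:00Z.
	upserted := regexp.MustCompile(`\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}Z`)
	fmt.Println(upserted.MatchString("2020-05-08T17:30:00Z")) // true once the field is set
	fmt.Println(upserted.MatchString(""))                     // false while the field is still empty

	// The output shape GetNodeStatusesHostIds expects: one hostID per started node,
	// space-separated (these UUIDs are hypothetical).
	output := "11111111-2222-3333-4444-555555555555 66666666-7777-8888-9999-000000000000"
	hostIds := strings.Split(output, " ")
	fmt.Println(len(hostIds)) // 2
}
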
23 changes: 18 additions & 5 deletions operator/pkg/reconciliation/reconcile_racks.go
@@ -432,7 +432,7 @@ func (rc *ReconciliationContext) CheckPodsReady(endpointData httphelper.CassMeta

// step 1 - see if any nodes are already coming up

nodeIsStarting, err := rc.findStartingNodes()
nodeIsStarting, nodeStarted, err := rc.findStartingNodes()

if err != nil {
return result.Error(err)
@@ -476,6 +476,11 @@
desiredSize := int(rc.Datacenter.Spec.Size)

if desiredSize == readyPodCount && desiredSize == startedLabelCount {
// When the ready and started counts match the desired counts and nodeStarted is true, then that means we have
// just started the last node in the data center.
if nodeStarted {
return result.RequeueSoon(2)
}
return result.Continue()
} else {
err := fmt.Errorf("checks failed desired:%d, ready:%d, started:%d", desiredSize, readyPodCount, startedLabelCount)
@@ -1332,7 +1337,13 @@ func (rc *ReconciliationContext) callNodeManagementStart(pod *corev1.Pod) error
return err
}

func (rc *ReconciliationContext) findStartingNodes() (bool, error) {
// Checks whether any node is starting by looking for pods whose cassandra.datastax.com/node-state label is set to
// Starting. If such a pod's C* node is ready, the label is updated to Started. This function returns two bools and an
// error. The first bool is true if a C* node is still Starting. The second bool is true if a C* node has just
// transitioned to Started, that is, its cassandra.datastax.com/node-state label was just set to Started. The error is
// non-nil if updating the pod's labels fails.
func (rc *ReconciliationContext) findStartingNodes() (bool, bool, error) {
rc.ReqLogger.Info("reconcile_racks::findStartingNodes")

for _, pod := range rc.clusterPods {
@@ -1341,7 +1352,9 @@
rc.Recorder.Eventf(rc.Datacenter, corev1.EventTypeNormal, events.StartedCassandra,
"Started Cassandra for pod %s", pod.Name)
if err := rc.labelServerPodStarted(pod); err != nil {
return false, err
return false, false, err
} else {
return false, true, nil
}
} else {
// TODO Calling start again on the pod seemed like a good defensive practice
@@ -1351,11 +1364,11 @@
// if err := rc.callNodeManagementStart(pod); err != nil {
// return false, err
// }
return true, nil
return true, false, nil
}
}
}
return false, nil
return false, false, nil
}

func (rc *ReconciliationContext) findStartedNotReadyNodes() (bool, error) {
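
For quick reference, the non-error return combinations of the new findStartingNodes signature read as follows, expressed as a tiny standalone sketch (the describe helper is illustrative only, not operator code):

package main

import "fmt"

// describe spells out what each (nodeIsStarting, nodeStarted) combination from
// findStartingNodes means; CheckPodsReady uses the second value to decide whether
// to requeue once the ready and started counts match the desired size.
func describe(nodeIsStarting, nodeStarted bool) string {
	switch {
	case nodeIsStarting:
		return "a pod is labeled node-state=Starting and its C* node is not ready yet"
	case nodeStarted:
		return "a Starting pod became ready and its label was just flipped to Started"
	default:
		return "no pods are in the Starting state"
	}
}

func main() {
	fmt.Println(describe(true, false))
	fmt.Println(describe(false, true))
	fmt.Println(describe(false, false))
}
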
71 changes: 71 additions & 0 deletions (new integration test file)
@@ -0,0 +1,71 @@
package scale_up_status_updates

import (
"fmt"
"testing"

. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"

ginkgo_util "github.com/datastax/cass-operator/mage/ginkgo"
"github.com/datastax/cass-operator/mage/kubectl"
)

var (
testName = "Scale up status updates"
namespace = "test-scale-up-status-updates"
dcName = "dc1"
dcYaml = "../testdata/oss-two-rack-six-node-dc.yaml"
operatorYaml = "../testdata/operator.yaml"
dcResource = fmt.Sprintf("CassandraDatacenter/%s", dcName)
dcLabel = fmt.Sprintf("cassandra.datastax.com/datacenter=%s", dcName)
ns = ginkgo_util.NewWrapper(testName, namespace)
)

func TestLifecycle(t *testing.T) {
AfterSuite(func() {
logPath := fmt.Sprintf("%s/aftersuite", ns.LogDir)
kubectl.DumpAllLogs(logPath).ExecV()
fmt.Printf("\n\tPost-run logs dumped at: %s\n\n", logPath)
ns.Terminate()
})

RegisterFailHandler(Fail)
RunSpecs(t, testName)
}

var _ = Describe(testName, func() {
Context("when in a new cluster", func() {
Specify("the operator can scale up a datacenter and does not upsert the super user until all nodes have been started", func() {
By("creating a namespace")
err := kubectl.CreateNamespace(namespace).ExecV()
Expect(err).ToNot(HaveOccurred())

step := "setting up cass-operator resources via helm chart"
ns.HelmInstall("../../charts/cass-operator-chart")

ns.WaitForOperatorReady()

step = "creating a datacenter resource with 2 racks/6 nodes"
k := kubectl.ApplyFiles(dcYaml)
ns.ExecAndLog(step, k)

ns.WaitForSuperUserUpserted(dcName, 600)

step = "checking that all nodes have been started"
nodeStatusesHostIds := ns.GetNodeStatusesHostIds(dcName)
Expect(len(nodeStatusesHostIds)).To(Equal(6))

step = "deleting the dc"
k = kubectl.DeleteFromFiles(dcYaml)
ns.ExecAndLog(step, k)

step = "checking that the dc no longer exists"
json := "jsonpath={.items}"
k = kubectl.Get("CassandraDatacenter").
WithLabel(dcLabel).
FormatOutput(json)
ns.WaitForOutputAndLog(step, k, "[]", 300)
})
})
})
28 changes: 28 additions & 0 deletions tests/testdata/oss-two-rack-six-node-dc.yaml
@@ -0,0 +1,28 @@
apiVersion: cassandra.datastax.com/v1beta1
kind: CassandraDatacenter
metadata:
name: dc1
spec:
clusterName: cluster1
serverType: cassandra
serverVersion: "3.11.6"
serverImage: datastax/cassandra-mgmtapi-3_11_6:v0.1.0
configBuilderImage: datastax/cass-config-builder:1.0.0
managementApiAuth:
insecure: {}
size: 6
storageConfig:
cassandraDataVolumeClaimSpec:
storageClassName: server-storage
accessModes:
- ReadWriteOnce
resources:
requests:
storage: 1Gi
racks:
- name: r1
- name: r2
config:
jvm-options:
initial_heap_size: "800m"
max_heap_size: "800m"
