Skip to content

Commit

Permalink
Add quietPeriod and observedGeneration to the status (#190)
Browse files Browse the repository at this point in the history
* Put quietPeriod in the status

* Use quiet period at the very end of reconciling, and when we update an existing statefulset

* TODO

* gofmt

* Add observedGeneration to the CassDC status

* more gofmt

* When updating a statefulset, check the ObservedGeneration

* Ran codegen

* comment
  • Loading branch information
jimdickinson authored Aug 12, 2020
1 parent 4aaaff7 commit 8cf67a6
Show file tree
Hide file tree
Showing 7 changed files with 63 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -6191,6 +6191,12 @@ spec:
type: string
type: object
type: object
observedGeneration:
format: int64
type: integer
quietPeriod:
format: date-time
type: string
superUserUpserted:
description: Deprecated. Use usersUpserted instead. The timestamp at
which CQL superuser credentials were last upserted to the management
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6181,6 +6181,12 @@ spec:
type: string
type: object
type: object
observedGeneration:
format: int64
type: integer
quietPeriod:
format: date-time
type: string
superUserUpserted:
description: Deprecated. Use usersUpserted instead. The timestamp at
which CQL superuser credentials were last upserted to the management
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -351,7 +351,11 @@ type CassandraDatacenterStatus struct {
// +optional
NodeReplacements []string `json:"nodeReplacements"`

// INSERT ADDITIONAL STATUS FIELD - define observed state of cluster
// +optional
QuietPeriod metav1.Time `json:"quietPeriod,omitempty"`

// +optional
ObservedGeneration int64 `json:"observedGeneration,omitempty"`
}

// +genclient
Expand Down

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

12 changes: 8 additions & 4 deletions operator/pkg/reconciliation/constructor.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,10 @@ package reconciliation

import (
"fmt"
"k8s.io/api/batch/v1"
"os"

v1 "k8s.io/api/batch/v1"

api "github.com/datastax/cass-operator/operator/pkg/apis/cassandra/v1beta1"
"github.com/datastax/cass-operator/operator/pkg/httphelper"
"github.com/datastax/cass-operator/operator/pkg/oplabels"
Expand Down Expand Up @@ -367,6 +368,10 @@ func setOperatorProgressStatus(rc *ReconciliationContext, newState api.ProgressS

patch := client.MergeFrom(rc.Datacenter.DeepCopy())
rc.Datacenter.Status.CassandraOperatorProgress = newState
// TODO there may be a better place to push status.observedGeneration in the reconcile loop
if newState == api.ProgressReady {
rc.Datacenter.Status.ObservedGeneration = rc.Datacenter.Generation
}
if err := rc.Client.Status().Patch(rc.Ctx, rc.Datacenter, patch); err != nil {
rc.ReqLogger.Error(err, "error updating the Cassandra Operator Progress state")
return err
Expand Down Expand Up @@ -673,7 +678,7 @@ func buildInitReaperSchemaJob(dc *api.CassandraDatacenter) *v1.Job {
}
}

func addVolumes(dc *api.CassandraDatacenter,baseTemplate *corev1.PodTemplateSpec) []corev1.Volume {
func addVolumes(dc *api.CassandraDatacenter, baseTemplate *corev1.PodTemplateSpec) []corev1.Volume {
vServerConfig := corev1.Volume{}
vServerConfig.Name = "server-config"
vServerConfig.VolumeSource = corev1.VolumeSource{
Expand All @@ -690,10 +695,9 @@ func addVolumes(dc *api.CassandraDatacenter,baseTemplate *corev1.PodTemplateSpec
vServerEncryption.Name = "encryption-cred-storage"
vServerEncryption.VolumeSource = corev1.VolumeSource{
Secret: &corev1.SecretVolumeSource{
SecretName:fmt.Sprintf("%s-keystore", dc.Name)},
SecretName: fmt.Sprintf("%s-keystore", dc.Name)},
}


volumes := []corev1.Volume{vServerConfig, vServerLogs, vServerEncryption}
baseTemplate.Spec.Volumes = append(baseTemplate.Spec.Volumes, volumes...)
return volumes
Expand Down
8 changes: 8 additions & 0 deletions operator/pkg/reconciliation/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,7 @@ func (r *ReconcileCassandraDatacenter) Reconcile(request reconcile.Request) (rec
return result.Error(err).Output()
}

// TODO fold this into the quiet period
twentySecs := time.Second * 20
lastNodeStart := rc.Datacenter.Status.LastServerNodeStarted
cooldownTime := time.Until(lastNodeStart.Add(twentySecs))
Expand All @@ -141,6 +142,13 @@ func (r *ReconcileCassandraDatacenter) Reconcile(request reconcile.Request) (rec
return result.RequeueSoon(secs).Output()
}

if rc.Datacenter.Status.QuietPeriod.After(time.Now()) {
logger.Info("Ending reconciliation early because the datacenter is in a quiet period")
cooldownTime = rc.Datacenter.Status.QuietPeriod.Sub(time.Now())
secs := 1 + int(cooldownTime.Seconds())
return result.RequeueSoon(secs).Output()
}

res, err := rc.calculateReconciliationActions()
if err != nil {
logger.Error(err, "calculateReconciliationActions returned an error")
Expand Down
34 changes: 29 additions & 5 deletions operator/pkg/reconciliation/reconcile_racks.go
Original file line number Diff line number Diff line change
Expand Up @@ -264,9 +264,15 @@ func (rc *ReconciliationContext) CheckRackPodTemplate() result.ReconcileResult {
return result.Error(err)
}

if err := rc.enableQuietPeriod(20); err != nil {
logger.Error(
err,
"Error when enabling quiet period")
return result.Error(err)
}

// we just updated k8s and pods will be knocked out of ready state, so let k8s
// call us back when these changes are done and the new pods are back to ready
// TODO should we requeue for some amount of time in the future instead?
return result.Done()
} else {

Expand All @@ -275,7 +281,8 @@ func (rc *ReconciliationContext) CheckRackPodTemplate() result.ReconcileResult {
// because there's an upgrade in progress

status := statefulSet.Status
if status.Replicas != status.ReadyReplicas ||
if statefulSet.Generation != status.ObservedGeneration ||
status.Replicas != status.ReadyReplicas ||
status.Replicas != status.CurrentReplicas ||
status.Replicas != status.UpdatedReplicas {

Expand Down Expand Up @@ -1473,6 +1480,16 @@ func (rc *ReconciliationContext) labelServerPodStarting(pod *corev1.Pod) error {
return err
}

func (rc *ReconciliationContext) enableQuietPeriod(seconds int) error {
dc := rc.Datacenter

dur := time.Second * time.Duration(seconds)
statusPatch := client.MergeFrom(dc.DeepCopy())
dc.Status.QuietPeriod = metav1.NewTime(time.Now().Add(dur))
err := rc.Client.Status().Patch(rc.Ctx, dc, statusPatch)
return err
}

func (rc *ReconciliationContext) labelServerPodStarted(pod *corev1.Pod) error {
patch := client.MergeFrom(pod.DeepCopy())
pod.Labels[api.CassNodeState] = stateStarted
Expand Down Expand Up @@ -1546,9 +1563,9 @@ func (rc *ReconciliationContext) findStartedNotReadyNodes() (bool, error) {

func (rc *ReconciliationContext) copyPodCredentials(pod *corev1.Pod, jksBlob []byte) error {
_, err := rc.retrieveSecret(types.NamespacedName{
Name: fmt.Sprintf("%s-keystore", rc.Datacenter.Name),
Namespace: rc.Datacenter.Namespace,
})
Name: fmt.Sprintf("%s-keystore", rc.Datacenter.Name),
Namespace: rc.Datacenter.Namespace,
})

if err == nil { // This secret already exists, nothing to do
return nil
Expand Down Expand Up @@ -2100,6 +2117,13 @@ func (rc *ReconciliationContext) ReconcileAllRacks() (reconcile.Result, error) {
return result.Error(err).Output()
}

if err := rc.enableQuietPeriod(5); err != nil {
logger.Error(
err,
"Error when enabling quiet period")
return result.Error(err).Output()
}

rc.ReqLogger.Info("All StatefulSets should now be reconciled.")

return result.Done().Output()
Expand Down

0 comments on commit 8cf67a6

Please sign in to comment.