Skip to content

Commit

Permalink
Support safely scaling down dc/decommissioning nodes (#242)
Browse files Browse the repository at this point in the history
* support scaling down cluster by decommissioning nodes

* add integration test helper functions for executing CQL

* use reason and message fields on conditions
  • Loading branch information
sandoichi authored Sep 17, 2020
1 parent f7ef6c8 commit 8d0516a
Show file tree
Hide file tree
Showing 18 changed files with 823 additions and 39 deletions.
1 change: 1 addition & 0 deletions charts/cass-operator-chart/templates/clusterrole.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ rules:
- ""
resources:
- nodes
- persistentvolumes
verbs:
- get
- list
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6247,11 +6247,17 @@ spec:
lastTransitionTime:
format: date-time
type: string
message:
type: string
reason:
type: string
status:
type: string
type:
type: string
required:
- message
- reason
- status
- type
type: object
Expand Down
43 changes: 43 additions & 0 deletions mage/ginkgo/lib.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
package ginkgo_util

import (
"encoding/base64"
"fmt"
"os"
"regexp"
Expand Down Expand Up @@ -223,6 +224,14 @@ func (ns *NsWrapper) WaitForDatacenterCondition(dcName string, conditionType str
ns.WaitForOutputAndLog(step, k, value, 600)
}

// WaitForDatacenterConditionWithReason waits until the condition of type
// conditionType on the cassandradatacenter resource dcName reports the status
// value and, when reason is non-empty, also reports the given reason.
//
// The status and the reason are checked by two consecutive kubectl jsonpath
// queries, each handed to WaitForOutputAndLog with 600 as its last argument
// (presumably a timeout in seconds — confirm against WaitForOutputAndLog).
// Passing an empty reason skips the reason check, so only the status is
// awaited.
func (ns *NsWrapper) WaitForDatacenterConditionWithReason(dcName string, conditionType string, value string, reason string) {
	statusStep := fmt.Sprintf("checking that dc condition %s has value %s", conditionType, value)
	statusPath := fmt.Sprintf("jsonpath={.status.conditions[?(.type=='%s')].status}", conditionType)
	statusCmd := kubectl.Get("cassandradatacenter", dcName).
		FormatOutput(statusPath)
	ns.WaitForOutputAndLog(statusStep, statusCmd, value, 600)

	// Previously the reason argument was silently ignored, which made this
	// helper indistinguishable from WaitForDatacenterCondition.
	if reason == "" {
		return
	}

	reasonStep := fmt.Sprintf("checking that dc condition %s has reason %s", conditionType, reason)
	reasonPath := fmt.Sprintf("jsonpath={.status.conditions[?(.type=='%s')].reason}", conditionType)
	reasonCmd := kubectl.Get("cassandradatacenter", dcName).
		FormatOutput(reasonPath)
	ns.WaitForOutputAndLog(reasonStep, reasonCmd, reason, 600)
}

func (ns *NsWrapper) WaitForDatacenterToHaveNoPods(dcName string) {
step := "checking that no dc pods remain"
json := "jsonpath={.items}"
Expand Down Expand Up @@ -512,3 +521,37 @@ func (ns NsWrapper) RetrieveStatusFromNodetool(podName string) []NodetoolNodeInf
}
return nodeInfo
}

// RetrieveSuperuserCreds reads the username and password entries from the
// secret named "<clusterName>-superuser" and returns them base64-decoded, in
// that order.
//
// Each value is fetched with its own kubectl jsonpath query. The helper
// asserts (via Expect) that neither encoded value is empty and that both
// decode as standard base64.
func (ns NsWrapper) RetrieveSuperuserCreds(clusterName string) (string, string) {
	secretResource := fmt.Sprintf("secret/%s-superuser", clusterName)

	ginkgo.By("get superuser username")
	userCmd := kubectl.Get(secretResource).FormatOutput("jsonpath={.data.username}")
	encodedUser := ns.OutputPanic(userCmd)
	Expect(encodedUser).ToNot(Equal(""), "Expected secret to specify a username")
	rawUser, userErr := base64.StdEncoding.DecodeString(encodedUser)
	Expect(userErr).ToNot(HaveOccurred())

	ginkgo.By("get superuser password")
	passCmd := kubectl.Get(secretResource).FormatOutput("jsonpath={.data.password}")
	encodedPass := ns.OutputPanic(passCmd)
	Expect(encodedPass).ToNot(Equal(""), "Expected secret to specify a password")
	rawPass, passErr := base64.StdEncoding.DecodeString(encodedPass)
	Expect(passErr).ToNot(HaveOccurred())

	return string(rawUser), string(rawPass)
}

// CqlExecute runs the statement cql through cqlsh inside the "cassandra"
// container of pod podName, authenticating with user and pw. stepDesc is
// reported as the ginkgo step description before the command is executed
// with ExecVPanic.
func (ns NsWrapper) CqlExecute(podName string, stepDesc string, cql string, user string, pw string) {
	cqlsh := kubectl.ExecOnPod(podName, "--", "cqlsh", "--user", user, "--password", pw, "-e", cql)
	cmd := cqlsh.WithFlag("container", "cassandra")

	ginkgo.By(stepDesc)
	ns.ExecVPanic(cmd)
}
Original file line number Diff line number Diff line change
Expand Up @@ -6237,11 +6237,17 @@ spec:
lastTransitionTime:
format: date-time
type: string
message:
type: string
reason:
type: string
status:
type: string
type:
type: string
required:
- message
- reason
- status
- type
type: object
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,3 +27,7 @@ spec:
jvm-options:
initial_heap_size: "800M"
max_heap_size: "800M"
additional-jvm-opts:
# As the database comes up for the first time, set system keyspaces to RF=3
- "-Ddse.system_distributed_replication_dc_names=dc1"
- "-Ddse.system_distributed_replication_per_dc=3"
Original file line number Diff line number Diff line change
Expand Up @@ -23,3 +23,7 @@ spec:
jvm-server-options:
initial_heap_size: "800M"
max_heap_size: "800M"
additional-jvm-opts:
# As the database comes up for the first time, set system keyspaces to RF=3
- "-Ddse.system_distributed_replication_dc_names=dc1"
- "-Ddse.system_distributed_replication_per_dc=3"
21 changes: 18 additions & 3 deletions operator/pkg/apis/cassandra/v1beta1/cassandradatacenter_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -229,22 +229,37 @@ const (
DatacenterInitialized DatacenterConditionType = "Initialized"
DatacenterReplacingNodes DatacenterConditionType = "ReplacingNodes"
DatacenterScalingUp DatacenterConditionType = "ScalingUp"
DatacenterScalingDown DatacenterConditionType = "ScalingDown"
DatacenterUpdating DatacenterConditionType = "Updating"
DatacenterStopped DatacenterConditionType = "Stopped"
DatacenterResuming DatacenterConditionType = "Resuming"
DatacenterRollingRestart DatacenterConditionType = "RollingRestart"
DatacenterConditionValid DatacenterConditionType = "DatacenterConditionValid"
)

// DatacenterCondition records the state of a single condition of a
// datacenter, identified by its Type.
type DatacenterCondition struct {
// Type identifies which condition this entry describes (for example
// DatacenterScalingDown).
Type DatacenterConditionType `json:"type"`
// Status is the current status of the condition.
Status corev1.ConditionStatus `json:"status"`
// Reason is always serialized (no omitempty); it is the empty string when
// the condition was built with NewDatacenterCondition.
Reason string `json:"reason"`
// Message is always serialized (no omitempty); it is the empty string when
// the condition was built with NewDatacenterCondition.
Message string `json:"message"`
// LastTransitionTime is not set by the constructors in this file.
// NOTE(review): presumably the time the condition last changed status — confirm against the code that sets it.
LastTransitionTime metav1.Time `json:"lastTransitionTime,omitempty"`
}

// NewDatacenterCondition returns a pointer to a new DatacenterCondition with
// the given type and status. Reason and Message are set to the empty string
// and LastTransitionTime is left at its zero value.
func NewDatacenterCondition(conditionType DatacenterConditionType, status corev1.ConditionStatus) *DatacenterCondition {
	// Each field may appear only once in a composite literal; the duplicated
	// Type/Status entries have been removed.
	return &DatacenterCondition{
		Type:    conditionType,
		Status:  status,
		Reason:  "",
		Message: "",
	}
}

// NewDatacenterConditionWithReason returns a pointer to a new
// DatacenterCondition populated with the given type, status, reason and
// message. LastTransitionTime is left at its zero value.
func NewDatacenterConditionWithReason(conditionType DatacenterConditionType, status corev1.ConditionStatus, reason string, message string) *DatacenterCondition {
	condition := new(DatacenterCondition)
	condition.Type = conditionType
	condition.Status = status
	condition.Reason = reason
	condition.Message = message
	return condition
}

Expand Down Expand Up @@ -391,7 +406,7 @@ func (status *CassandraDatacenterStatus) GetConditionStatus(conditionType Datace
return condition.Status
}
}
return corev1.ConditionFalse
return corev1.ConditionUnknown
}

func (dc *CassandraDatacenter) GetConditionStatus(conditionType DatacenterConditionType) corev1.ConditionStatus {
Expand Down
4 changes: 0 additions & 4 deletions operator/pkg/apis/cassandra/v1beta1/webhook.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,10 +133,6 @@ func ValidateDatacenterFieldChanges(oldDc CassandraDatacenter, newDc CassandraDa
return attemptedTo("change storageConfig")
}

if oldDc.Spec.Size > newDc.Spec.Size {
return attemptedTo("decrease size")
}

// Topology changes - Racks
// - Rack Name and Zone changes are disallowed.
// - Removing racks is not supported.
Expand Down
2 changes: 1 addition & 1 deletion operator/pkg/apis/cassandra/v1beta1/webhook_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -502,7 +502,7 @@ func Test_ValidateDatacenterFieldChanges(t *testing.T) {
Size: 3,
},
},
errString: "decrease size",
errString: "",
},
{
name: "Changed a rack name",
Expand Down
6 changes: 5 additions & 1 deletion operator/pkg/events/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,10 @@ package events

import (
"fmt"

"github.com/go-logr/logr"
"k8s.io/client-go/tools/record"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/client-go/tools/record"
)

const (
Expand All @@ -16,9 +17,12 @@ const (
CreatedResource string = "CreatedResource"
StartedCassandra string = "StartedCassandra"
LabeledPodAsSeed string = "LabeledPodAsSeed"
LabeledPodAsDecommissioning string = "LabeledPodAsDecommissioning"
DeletedPvc string = "DeletedPvc"
UnlabeledPodAsSeed string = "UnlabeledPodAsSeed"
LabeledRackResource string = "LabeledRackResource"
ScalingUpRack string = "ScalingUpRack"
ScalingDownRack string = "ScalingDownRack"
CreatedSuperuser string = "CreatedSuperuser" // deprecated
CreatedUsers string = "CreatedUsers"
FinishedReplaceNode string = "FinishedReplaceNode"
Expand Down
23 changes: 23 additions & 0 deletions operator/pkg/httphelper/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,8 @@ type EndpointState struct {
IsAlive string `json:"IS_ALIVE"`
NativeTransportAddress string `json:"NATIVE_TRANSPORT_ADDRESS"`
RpcAddress string `json:"RPC_ADDRESS"`
Status string `json:"STATUS"`
Load string `json:"LOAD"`
}

func (x *EndpointState) GetRpcAddress() string {
Expand Down Expand Up @@ -290,6 +292,27 @@ func (client *NodeMgmtClient) CallReloadSeedsEndpoint(pod *corev1.Pod) error {
return err
}

// CallDecommissionNodeEndpoint issues a POST to /api/v0/ops/node/decommission
// on the host derived from pod. The response body is discarded; the returned
// error is whatever BuildPodHostFromPod or callNodeMgmtEndpoint reports.
func (client *NodeMgmtClient) CallDecommissionNodeEndpoint(pod *corev1.Pod) error {
	client.Log.Info(
		"calling Management API decommission node - POST /api/v0/ops/node/decommission",
		"pod", pod.Name,
	)

	host, hostErr := BuildPodHostFromPod(pod)
	if hostErr != nil {
		return hostErr
	}

	decommission := nodeMgmtRequest{
		endpoint: "/api/v0/ops/node/decommission",
		host:     host,
		method:   http.MethodPost,
	}

	// Only the error matters here; the response payload is not used.
	_, callErr := callNodeMgmtEndpoint(client, decommission, "")
	return callErr
}

func callNodeMgmtEndpoint(client *NodeMgmtClient, request nodeMgmtRequest, contentType string) ([]byte, error) {
client.Log.Info("client::callNodeMgmtEndpoint")

Expand Down
Loading

0 comments on commit 8d0516a

Please sign in to comment.