Skip to content

Commit

Permalink
Termination grace time and prestop drain (#269)
Browse files Browse the repository at this point in the history
* default DefaultTerminationGracePeriodSeconds to 120

* adding test for prestop

* upgraded the sleep test

* comment fixes

* Extracted function BuildMgmtApiWgetAction

* Set prestop hook

* WIP - introduced secure and insecure methods for making wget mgmt api actions

* WIP - default to insecure mgmt api if nothing is specified

* WIP - insecure node drain in prestop works

* WIP - insecure node drain in prestop works

* Test now looks for successful drain

* WIP - working on secure node drain call

* WIP - updating test

* rename

* WIP - fixing tests

* disabled drain check for now

* introduced new oss test file

* WIP - moving to oss

* removed test-jks

* always remove test-jks
  • Loading branch information
respringer authored Oct 5, 2020
1 parent 08ecb09 commit 3686fd1
Show file tree
Hide file tree
Showing 10 changed files with 330 additions and 46 deletions.
4 changes: 4 additions & 0 deletions mage/kubectl/lib.go
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,10 @@ func CreateFromFiles(paths ...string) KCmd {
return KCmd{Command: "create", Args: args}
}

func Logs(args ...string) KCmd {
return KCmd{Command: "logs", Args: args}
}

func Get(args ...string) KCmd {
return KCmd{Command: "get", Args: args}
}
Expand Down
115 changes: 86 additions & 29 deletions operator/pkg/httphelper/security.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,18 @@ import (
"sigs.k8s.io/controller-runtime/pkg/client"
)

const (
WgetNodeDrainEndpoint = "localhost:8080/api/v0/ops/node/drain"
// TODO: Get endpoint from configured HTTPGet probe
livenessEndpoint = "localhost:8080/api/v0/probes/liveness"
// TODO: Get endpoint from configured HTTPGet probe
readinessEndpoint = "localhost:8080/api/v0/probes/readiness"

caCertPath = "/management-api-certs/ca.crt"
tlsCrt = "/management-api-certs/tls.crt"
tlsKey = "/management-api-certs/tls.key"
)

// API for Node Management mAuth Config
func GetManagementApiProtocol(dc *api.CassandraDatacenter) (string, error) {
provider, err := BuildManagmenetApiSecurityProvider(dc)
Expand Down Expand Up @@ -84,6 +96,8 @@ func ValidateManagementApiConfig(dc *api.CassandraDatacenter, client client.Clie
// SPI for adding new mechanisms for securing the management API
type ManagementApiSecurityProvider interface {
BuildHttpClient(client client.Client, ctx context.Context) (HttpClient, error)
BuildMgmtApiWgetAction(endpoint string) *corev1.ExecAction
BuildMgmtApiWgetPostAction(endpoint string, postData string) *corev1.ExecAction
AddServerSecurity(pod *corev1.PodTemplateSpec) error
GetProtocol() string
ValidateConfig(client client.Client, ctx context.Context) []error
Expand All @@ -93,7 +107,8 @@ type InsecureManagementApiSecurityProvider struct {
}

func buildInsecureManagementApiSecurityProvider(dc *api.CassandraDatacenter) (ManagementApiSecurityProvider, error) {
if dc.Spec.ManagementApiAuth.Insecure != nil {
// If both are nil, then default to insecure
if dc.Spec.ManagementApiAuth.Insecure != nil || (dc.Spec.ManagementApiAuth.Manual == nil && dc.Spec.ManagementApiAuth.Insecure == nil) {
return &InsecureManagementApiSecurityProvider{}, nil
}
return nil, nil
Expand Down Expand Up @@ -134,10 +149,75 @@ func (provider *ManualManagementApiSecurityProvider) GetProtocol() string {
return "https"
}

func GetMgmtApiWgetAction(dc *api.CassandraDatacenter, endpoint string) (*corev1.ExecAction, error) {
provider, err := BuildManagmenetApiSecurityProvider(dc)
if err != nil {
return nil, err
}
return provider.BuildMgmtApiWgetAction(endpoint), nil
}

func GetMgmtApiWgetPostAction(dc *api.CassandraDatacenter, endpoint string, postData string) (*corev1.ExecAction, error) {
provider, err := BuildManagmenetApiSecurityProvider(dc)
if err != nil {
return nil, err
}
return provider.BuildMgmtApiWgetPostAction(endpoint, postData), nil
}

func (provider *InsecureManagementApiSecurityProvider) BuildMgmtApiWgetAction(endpoint string) *corev1.ExecAction {
return &corev1.ExecAction{
Command: []string{
"wget",
"--output-document", "/dev/null",
"--no-check-certificate",
fmt.Sprintf("http://%s", endpoint),
},
}
}

func (provider *ManualManagementApiSecurityProvider) BuildMgmtApiWgetAction(endpoint string) *corev1.ExecAction {
return &corev1.ExecAction{
Command: []string{
"wget",
"--output-document", "/dev/null",
"--no-check-certificate",
"--certificate", tlsCrt,
"--private-key", tlsKey,
"--ca-certificate", caCertPath,
fmt.Sprintf("https://%s", endpoint),
},
}
}

func (provider *InsecureManagementApiSecurityProvider) BuildMgmtApiWgetPostAction(endpoint string, postData string) *corev1.ExecAction {
return &corev1.ExecAction{
Command: []string{
"wget",
"--output-document", "/dev/null",
"--no-check-certificate",
fmt.Sprintf("--post-data='%s'", postData),
fmt.Sprintf("http://%s", endpoint),
},
}
}

func (provider *ManualManagementApiSecurityProvider) BuildMgmtApiWgetPostAction(endpoint string, postData string) *corev1.ExecAction {
return &corev1.ExecAction{
Command: []string{
"wget",
"--output-document", "/dev/null",
"--no-check-certificate",
"--certificate", tlsCrt,
"--private-key", tlsKey,
"--ca-certificate", caCertPath,
fmt.Sprintf("--post-data='%s'", postData),
fmt.Sprintf("https://%s", endpoint),
},
}
}

func (provider *ManualManagementApiSecurityProvider) AddServerSecurity(pod *corev1.PodTemplateSpec) error {
caCertPath := "/management-api-certs/ca.crt"
tlsCrt := "/management-api-certs/tls.crt"
tlsKey := "/management-api-certs/tls.key"

// find the container
var container *corev1.Container = nil
Expand Down Expand Up @@ -217,48 +297,25 @@ func (provider *ManualManagementApiSecurityProvider) AddServerSecurity(pod *core
container.Env = append(envVars, container.Env...)

// Update Liveness probe to account for mutual auth (can't just use HTTP probe now)
// TODO: Get endpoint from configured HTTPGet probe
livenessEndpoint := "https://localhost:8080/api/v0/probes/liveness"
if container.LivenessProbe == nil {
container.LivenessProbe = &corev1.Probe{
Handler: corev1.Handler{},
}
}
container.LivenessProbe.Handler.HTTPGet = nil
container.LivenessProbe.Handler.TCPSocket = nil
container.LivenessProbe.Handler.Exec = &corev1.ExecAction{
Command: []string{
"wget",
"--output-document", "/dev/null",
"--no-check-certificate",
"--certificate", tlsCrt,
"--private-key", tlsKey,
"--ca-certificate", caCertPath,
livenessEndpoint,
},
}
container.LivenessProbe.Handler.Exec = provider.BuildMgmtApiWgetAction(livenessEndpoint)

// Update Readiness probe to account for mutual auth (can't just use HTTP probe now)
// TODO: Get endpoint from configured HTTPGet probe
readinessEndpoint := "https://localhost:8080/api/v0/probes/readiness"
if container.ReadinessProbe == nil {
container.ReadinessProbe = &corev1.Probe{
Handler: corev1.Handler{},
}
}
container.ReadinessProbe.Handler.HTTPGet = nil
container.ReadinessProbe.Handler.TCPSocket = nil
container.ReadinessProbe.Handler.Exec = &corev1.ExecAction{
Command: []string{
"wget",
"--output-document", "/dev/null",
"--no-check-certificate",
"--certificate", tlsCrt,
"--private-key", tlsKey,
"--ca-certificate", caCertPath,
readinessEndpoint,
},
}
container.ReadinessProbe.Handler.Exec = provider.BuildMgmtApiWgetAction(readinessEndpoint)

return nil
}
Expand Down
30 changes: 26 additions & 4 deletions operator/pkg/reconciliation/construct_podtemplatespec.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"reflect"

api "github.com/datastax/cass-operator/operator/pkg/apis/cassandra/v1beta1"
"github.com/datastax/cass-operator/operator/pkg/httphelper"
"github.com/datastax/cass-operator/operator/pkg/images"
"github.com/datastax/cass-operator/operator/pkg/oplabels"
"github.com/datastax/cass-operator/operator/pkg/utils"
Expand All @@ -20,10 +21,11 @@ import (
)

const (
ServerConfigContainerName = "server-config-init"
CassandraContainerName = "cassandra"
PvcName = "server-data"
SystemLoggerContainerName = "server-system-logger"
DefaultTerminationGracePeriodSeconds = 120
ServerConfigContainerName = "server-config-init"
CassandraContainerName = "cassandra"
PvcName = "server-data"
SystemLoggerContainerName = "server-system-logger"
)

// calculateNodeAffinity provides a way to pin all pods within a statefulset to the same zone
Expand Down Expand Up @@ -316,6 +318,20 @@ func buildContainers(dc *api.CassandraDatacenter, baseTemplate *corev1.PodTempla
cassContainer.ReadinessProbe = probe(8080, "/api/v0/probes/readiness", 20, 10)
}

if cassContainer.Lifecycle == nil {
cassContainer.Lifecycle = &corev1.Lifecycle{}
}

if cassContainer.Lifecycle.PreStop == nil {
action, err := httphelper.GetMgmtApiWgetPostAction(dc, httphelper.WgetNodeDrainEndpoint, "")
if err != nil {
return err
}
cassContainer.Lifecycle.PreStop = &corev1.Handler{
Exec: action,
}
}

// Combine env vars

envDefaults := []corev1.EnvVar{
Expand Down Expand Up @@ -437,6 +453,12 @@ func buildPodTemplateSpec(dc *api.CassandraDatacenter, zone string, rackName str
baseTemplate.Spec.DNSPolicy = corev1.DNSClusterFirstWithHostNet
}

if baseTemplate.Spec.TerminationGracePeriodSeconds == nil {
// Note: we cannot take the address of a constant
gracePeriodSeconds := int64(DefaultTerminationGracePeriodSeconds)
baseTemplate.Spec.TerminationGracePeriodSeconds = &gracePeriodSeconds
}

// Adds custom registry pull secret if needed

_ = images.AddDefaultRegistryImagePullSecrets(&baseTemplate.Spec)
Expand Down
17 changes: 11 additions & 6 deletions operator/pkg/utils/crypto_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,18 +2,20 @@ package utils

import (
"crypto/rsa"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"crypto/x509"
"encoding/pem"
"fmt"
"testing"
"io/ioutil"
"os"
"testing"

corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

func Test_newCA(t *testing.T) {
var verify_key *rsa.PrivateKey
pem_key, cert, err := GetNewCAandKey("cassandradatacenter-webhook-service","somenamespace")
pem_key, cert, err := GetNewCAandKey("cassandradatacenter-webhook-service", "somenamespace")
certpool := x509.NewCertPool()
key, _ := pem.Decode([]byte(pem_key))
if block, _ := pem.Decode([]byte(cert)); block != nil {
Expand Down Expand Up @@ -42,7 +44,6 @@ func Test_newCA(t *testing.T) {
if verify_cert_key, ok := cert.PublicKey.(*rsa.PublicKey); !ok {
t.Errorf("Error: couldn't typecast cert key")


} else {
verify_key_public, _ := verify_key.Public().(*rsa.PublicKey)
if fmt.Sprintf("%+v", verify_key_public) != fmt.Sprintf("%+v", verify_cert_key) {
Expand Down Expand Up @@ -79,6 +80,10 @@ func Test_GetJKS(t *testing.T) {
if len(jks) == 0 {
t.Errorf("JKS blob too small")
}
ioutil.WriteFile("test-jks", jks, 0644)

defer func() {
_ = os.Remove("test-jks")
}()

ioutil.WriteFile("test-jks", jks, 0644)
}
9 changes: 7 additions & 2 deletions tests/terminate/terminate_suite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@ import (
var (
testName = "Cluster resource cleanup after termination"
namespace = "test-terminate-cleanup"
dcName = "dc2"
dcYaml = "../testdata/default-single-rack-single-node-dc.yaml"
dcName = "dc1"
dcYaml = "../testdata/oss-one-node-dc-without-mtls.yaml"
operatorYaml = "../testdata/operator.yaml"
dcResource = fmt.Sprintf("CassandraDatacenter/%s", dcName)
dcLabel = fmt.Sprintf("cassandra.datastax.com/datacenter=%s", dcName)
Expand Down Expand Up @@ -77,6 +77,11 @@ var _ = Describe(testName, func() {
k = kubectl.DeleteFromFiles(dcYaml)
ns.ExecAndLog(step, k)

k = kubectl.Logs().
WithLabel("statefulset.kubernetes.io/pod-name=cluster1-dc1-r1-sts-0").
WithFlag("container", "cassandra")
ns.WaitForOutputContainsAndLog(step, k, "node/drain status=200 OK", 30)

step = "checking that the dc no longer exists"
json = "jsonpath={.items}"
k = kubectl.Get("CassandraDatacenter").
Expand Down
12 changes: 10 additions & 2 deletions tests/test_mtls_mgmt_api/test_mtls_mgmt_api_suite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ var (
testName = "test mtls protecting mgmt api"
namespace = "test-mtls-for-mgmt-api"
dcName = "dc1"
dcYaml = "../testdata/dse-one-node-dc-with-mtls.yaml"
dcYaml = "../testdata/oss-one-node-dc-with-mtls.yaml"
operatorYaml = "../testdata/operator.yaml"
dcResource = fmt.Sprintf("CassandraDatacenter/%s", dcName)
dcLabel = fmt.Sprintf("cassandra.datastax.com/datacenter=%s", dcName)
Expand Down Expand Up @@ -61,7 +61,8 @@ var _ = Describe(testName, func() {
k = kubectl.ApplyFiles(dcYaml)
ns.ExecAndLog(step, k)

ns.WaitForDatacenterReady(dcName)
// This takes a while sometimes in my dev environment
ns.WaitForDatacenterReadyWithTimeouts(dcName, 600, 120)

step = "scale up to 2 nodes"
json := "{\"spec\": {\"size\": 2}}"
Expand All @@ -75,6 +76,13 @@ var _ = Describe(testName, func() {
k = kubectl.DeleteFromFiles(dcYaml)
ns.ExecAndLog(step, k)

// TODO FIXME: re-enable this when the following issue is fixed:
// https://github.com/datastax/management-api-for-apache-cassandra/issues/42
// k = kubectl.Logs().
// WithLabel("statefulset.kubernetes.io/pod-name=cluster1-dc1-r1-sts-0").
// WithFlag("container", "cassandra")
// ns.WaitForOutputContainsAndLog(step, k, "node/drain status=200 OK", 30)

step = "checking that the dc no longer exists"
json = "jsonpath={.items}"
k = kubectl.Get("CassandraDatacenter").
Expand Down
36 changes: 36 additions & 0 deletions tests/testdata/default-single-rack-single-node-prestop-dc.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
apiVersion: cassandra.datastax.com/v1beta1
kind: CassandraDatacenter
metadata:
name: dc2
spec:
clusterName: cluster2
serverType: dse
serverVersion: "6.8.4"
managementApiAuth:
insecure: {}
size: 1
podTemplateSpec:
spec:
containers:
- name: "cassandra"
lifecycle:
preStop:
exec:
command: ["/bin/sleep", "6000s"]
storageConfig:
cassandraDataVolumeClaimSpec:
storageClassName: server-storage
accessModes:
- ReadWriteOnce
resources:
requests:
storage: 1Gi
racks:
- name: r1
config:
jvm-server-options:
initial_heap_size: "800m"
max_heap_size: "800m"
cassandra-yaml:
file_cache_size_in_mb: 100
memtable_space_in_mb: 100
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@ metadata:
name: dc1
spec:
clusterName: cluster1
serverType: dse
serverVersion: "6.8.4"
serverType: cassandra
serverVersion: "3.11.7"
managementApiAuth:
manual:
clientSecretName: mgmt-api-client-credentials
Expand All @@ -22,6 +22,6 @@ spec:
racks:
- name: r1
config:
jvm-server-options:
jvm-options:
initial_heap_size: "800m"
max_heap_size: "800m"
Loading

0 comments on commit 3686fd1

Please sign in to comment.