Skip to content

Commit

Permalink
Headless ClusterIP service for additional seeds (#175)
Browse files Browse the repository at this point in the history
* initial implementation of externalname services for additional seeds

* Sprintf flag fix

* WIP - test

* WIP - create endpoints for additional seed service

* reference fix

* test fixes

* removed unneeded function

* removed comment

* re-added yaml file

* removed comment

* added guard clause
  • Loading branch information
respringer authored Jul 24, 2020
1 parent bac509a commit 5f72962
Show file tree
Hide file tree
Showing 8 changed files with 490 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -479,6 +479,10 @@ func (dc *CassandraDatacenter) GetSeedServiceName() string {
return dc.Spec.ClusterName + "-seed-service"
}

func (dc *CassandraDatacenter) GetAdditionalSeedsServiceName() string {
return dc.Spec.ClusterName + "-" + dc.Name + fmt.Sprintf("-additional-seed-service")
}

func (dc *CassandraDatacenter) GetAllPodsServiceName() string {
return dc.Spec.ClusterName + "-" + dc.Name + "-all-pods-service"
}
Expand Down
48 changes: 48 additions & 0 deletions operator/pkg/reconciliation/constructor.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,54 @@ func newSeedServiceForCassandraDatacenter(dc *api.CassandraDatacenter) *corev1.S
return service
}

// newAdditionalSeedServiceForCassandraDatacenter creates a headless service owned by the CassandraDatacenter,
// whether the additional seed pods are ready or not
func newAdditionalSeedServiceForCassandraDatacenter(dc *api.CassandraDatacenter) *corev1.Service {
labels := dc.GetDatacenterLabels()
oplabels.AddManagedByLabel(labels)
var service corev1.Service
service.ObjectMeta.Name = dc.GetAdditionalSeedsServiceName()
service.ObjectMeta.Namespace = dc.Namespace
service.ObjectMeta.Labels = labels
// We omit the label selector because we will create the endpoints manually
service.Spec.Type = "ClusterIP"
service.Spec.ClusterIP = "None"
service.Spec.PublishNotReadyAddresses = true

addHashAnnotation(&service)

return &service
}

func newEndpointsForAdditionalSeeds(dc *api.CassandraDatacenter) *corev1.Endpoints {
labels := dc.GetDatacenterLabels()
oplabels.AddManagedByLabel(labels)
var endpoints corev1.Endpoints
endpoints.ObjectMeta.Name = dc.GetAdditionalSeedsServiceName()
endpoints.ObjectMeta.Namespace = dc.Namespace
endpoints.ObjectMeta.Labels = labels

var addresses []corev1.EndpointAddress

for seedIdx := range dc.Spec.AdditionalSeeds {
additionalSeed := dc.Spec.AdditionalSeeds[seedIdx]
addresses = append(addresses, corev1.EndpointAddress{
IP: additionalSeed,
})
}

// See: https://godoc.org/k8s.io/api/core/v1#Endpoints
endpoints.Subsets = []corev1.EndpointSubset{
{
Addresses: addresses,
},
}

addHashAnnotation(&endpoints)

return &endpoints
}

// newAllPodsServiceForCassandraDatacenter creates a headless service owned by the CassandraDatacenter,
// which covers all server pods in the datacenter, whether they are ready or not
func newAllPodsServiceForCassandraDatacenter(dc *api.CassandraDatacenter) *corev1.Service {
Expand Down
1 change: 1 addition & 0 deletions operator/pkg/reconciliation/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ type ReconciliationContext struct {
Ctx context.Context

Services []*corev1.Service
Endpoints *corev1.Endpoints
desiredRackInformation []*RackInformation
statefulSets []*appsv1.StatefulSet
dcPods []*corev1.Pod
Expand Down
14 changes: 9 additions & 5 deletions operator/pkg/reconciliation/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@ import (

"github.com/datastax/cass-operator/operator/internal/result"
api "github.com/datastax/cass-operator/operator/pkg/apis/cassandra/v1beta1"
"github.com/datastax/cass-operator/operator/pkg/httphelper"
"github.com/datastax/cass-operator/operator/pkg/dynamicwatch"
"github.com/datastax/cass-operator/operator/pkg/httphelper"
)

// Use a var so we can mock this function
Expand All @@ -48,6 +48,10 @@ func (rc *ReconciliationContext) calculateReconciliationActions() (reconcile.Res
return result.Output()
}

if result := rc.CheckAdditionalSeedEndpoints(); result.Completed() {
return result.Output()
}

if err := rc.CalculateRackInformation(); err != nil {
return result.Error(err).Output()
}
Expand All @@ -69,12 +73,12 @@ var log = logf.Log.WithName("reconciliation_handler")
type ReconcileCassandraDatacenter struct {
// This client, initialized using mgr.client() above, is a split client
// that reads objects from the cache and writes to the apiserver
client client.Client
scheme *runtime.Scheme
recorder record.EventRecorder
client client.Client
scheme *runtime.Scheme
recorder record.EventRecorder

// SecretWatches is used in the controller when setting up the watches and
// during reconciliation where we update the mappings for the watches.
// during reconciliation where we update the mappings for the watches.
// Putting it here allows us to get it to both places.
SecretWatches dynamicwatch.DynamicWatches
}
Expand Down
111 changes: 111 additions & 0 deletions operator/pkg/reconciliation/reconcile_endpoints.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
// Copyright DataStax, Inc.
// Please see the included license file for details.

package reconciliation

import (
"github.com/datastax/cass-operator/operator/internal/result"
api "github.com/datastax/cass-operator/operator/pkg/apis/cassandra/v1beta1"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/types"

"github.com/datastax/cass-operator/operator/pkg/utils"
)

func (rc *ReconciliationContext) CreateEndpointsForAdditionalSeedService() result.ReconcileResult {
// unpacking
logger := rc.ReqLogger
client := rc.Client
endpoints := rc.Endpoints

logger.Info(
"Creating endpoints for additional seed service",
"endpointsNamespace", endpoints.Namespace,
"endpointsName", endpoints.Name)

if err := setOperatorProgressStatus(rc, api.ProgressUpdating); err != nil {
return result.Error(err)
}

if err := client.Create(rc.Ctx, endpoints); err != nil {
logger.Error(err, "Could not create endpoints for additional seed service")

return result.Error(err)
}

rc.Recorder.Eventf(rc.Datacenter, "Normal", "CreatedResource", "Created endpoints %s", endpoints.Name)

return result.Continue()
}

func (rc *ReconciliationContext) CheckAdditionalSeedEndpoints() result.ReconcileResult {
// unpacking
logger := rc.ReqLogger
dc := rc.Datacenter
client := rc.Client

logger.Info("reconcile_endpoints::CheckAdditionalSeedEndpoints")

if len(dc.Spec.AdditionalSeeds) == 0 {
return result.Continue()
}

desiredEndpoints := newEndpointsForAdditionalSeeds(dc)

createNeeded := false

// Set CassandraDatacenter dc as the owner and controller
err := setControllerReference(dc, desiredEndpoints, rc.Scheme)
if err != nil {
logger.Error(err, "Could not set controller reference for endpoints for additional seed service")
return result.Error(err)
}

// See if the Endpoints already exists
nsName := types.NamespacedName{Name: desiredEndpoints.Name, Namespace: desiredEndpoints.Namespace}
currentEndpoints := &corev1.Endpoints{}
err = client.Get(rc.Ctx, nsName, currentEndpoints)

if err != nil && errors.IsNotFound(err) {
// if it's not found, we need to create it
createNeeded = true

} else if err != nil {
// if we hit a k8s error, log it and error out
logger.Error(err, "Could not get endoints for additional seed service",
"name", nsName,
)
return result.Error(err)

} else {
// if we found the endpoints already, check if it needs updating
if !resourcesHaveSameHash(currentEndpoints, desiredEndpoints) {
resourceVersion := currentEndpoints.GetResourceVersion()
// preserve any labels and annotations that were added to the Endpoints post-creation
desiredEndpoints.Labels = utils.MergeMap(map[string]string{}, currentEndpoints.Labels, desiredEndpoints.Labels)
desiredEndpoints.Annotations = utils.MergeMap(map[string]string{}, currentEndpoints.Annotations, desiredEndpoints.Annotations)

logger.Info("Updating endpoints for additional seed service",
"endpoints", currentEndpoints,
"desired", desiredEndpoints)

desiredEndpoints.DeepCopyInto(currentEndpoints)

currentEndpoints.SetResourceVersion(resourceVersion)

if err := client.Update(rc.Ctx, currentEndpoints); err != nil {
logger.Error(err, "Unable to update endpoints for additional seed service",
"endpoints", currentEndpoints)
return result.Error(err)
}
}
}

if createNeeded {
rc.Endpoints = desiredEndpoints
return rc.CreateEndpointsForAdditionalSeedService()
}

return result.Continue()
}
5 changes: 5 additions & 0 deletions operator/pkg/reconciliation/reconcile_services.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,11 @@ func (rc *ReconciliationContext) CheckHeadlessServices() result.ReconcileResult

services := []*corev1.Service{cqlService, seedService, allPodsService}

if len(dc.Spec.AdditionalSeeds) > 0 {
additionalSeedService := newAdditionalSeedServiceForCassandraDatacenter(dc)
services = append(services, additionalSeedService)
}

createNeeded := []*corev1.Service{}

for idx := range services {
Expand Down
Loading

0 comments on commit 5f72962

Please sign in to comment.