Skip to content

Commit

Permalink
fall back to direct client if split client GET returns 404
Browse files Browse the repository at this point in the history
Cf. bluek8s#267

To ensure this, remove the direct export of the client from the shared pkg, and instead require calling shared.Get, shared.Create, etc.

For the most part those functions will call the split client, but Get will have the desired special behavior.
  • Loading branch information
joel-bluedata committed Feb 11, 2020
1 parent 46b55d4 commit 7b4744f
Show file tree
Hide file tree
Showing 11 changed files with 117 additions and 39 deletions.
2 changes: 1 addition & 1 deletion pkg/controller/kubedirectorcluster/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ func (r *ReconcileKubeDirectorCluster) syncCluster(
if (updateErr == nil) && finalizersChanged {
// See https://github.com/bluek8s/kubedirector/issues/194
// Migrate Client().Update() calls back to Patch() calls.
updateErr = shared.Client().Update(context.TODO(), cr)
updateErr = shared.Update(context.TODO(), cr)
}
// Bail out if we're done.
if updateErr == nil {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ func (r *ReconcileKubeDirectorCluster) Reconcile(request reconcile.Request) (rec

// Fetch the KubeDirectorCluster instance.
cr := &kdv1.KubeDirectorCluster{}
err := shared.Client().Get(context.TODO(), request.NamespacedName, cr)
err := shared.Get(context.TODO(), request.NamespacedName, cr)
if err != nil {
// If the resource is not found, that means all of the finalizers have
// been removed, and the kubedirectorcluster resource has been deleted,
Expand Down
4 changes: 2 additions & 2 deletions pkg/controller/kubedirectorconfig/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ func (r *ReconcileKubeDirectorConfig) syncConfig(
if statusChanged {
cr.Status.GenerationUID = uuid.New().String()
StatusGens.WriteStatusGen(cr.UID, cr.Status.GenerationUID)
updateErr = shared.Client().Status().Update(context.TODO(), cr)
updateErr = shared.StatusUpdate(context.TODO(), cr)
// If this succeeded, no need to do it again on next iteration
// if we're just cycling because of a failure to update the
// finalizer.
Expand All @@ -90,7 +90,7 @@ func (r *ReconcileKubeDirectorConfig) syncConfig(
if (updateErr == nil) && finalizersChanged {
// See https://github.com/bluek8s/kubedirector/issues/194
// Migrate Client().Update() calls back to Patch() calls.
updateErr = shared.Client().Update(context.TODO(), cr)
updateErr = shared.Update(context.TODO(), cr)
}
// Bail out if we're done.
if updateErr == nil {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ func (r *ReconcileKubeDirectorConfig) Reconcile(request reconcile.Request) (reco

// Fetch the KubeDirectorConfig instance.
cr := &kdv1.KubeDirectorConfig{}
err := shared.Client().Get(context.TODO(), request.NamespacedName, cr)
err := shared.Get(context.TODO(), request.NamespacedName, cr)
if err != nil {
if errors.IsNotFound(err) {
// Request object not found, could have been deleted after
Expand Down
2 changes: 1 addition & 1 deletion pkg/executor/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ func UpdateClusterStatus(
// emptystring, and remove any MemberStatus where Pod is emptystring.
compact(&(cr.Status.Roles))

return shared.Client().Status().Update(context.TODO(), cr)
return shared.StatusUpdate(context.TODO(), cr)
}

// compact edits the input slice of role statuses so that any elements that
Expand Down
12 changes: 6 additions & 6 deletions pkg/executor/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ func CreateHeadlessService(
} else {
service.ObjectMeta.Name = cr.Status.ClusterService
}
err := shared.Client().Create(context.TODO(), service)
err := shared.Create(context.TODO(), service)

return service, err
}
Expand Down Expand Up @@ -132,7 +132,7 @@ func CreatePodService(
}
service.Spec.Ports = append(service.Spec.Ports, servicePort)
}
createErr := shared.Client().Create(context.TODO(), service)
createErr := shared.Create(context.TODO(), service)
return service, createErr
}

Expand Down Expand Up @@ -223,7 +223,7 @@ func DeletePodService(
},
}

return shared.Client().Delete(context.TODO(), toDelete)
return shared.Delete(context.TODO(), toDelete)
}

// UpdateService updates a service
Expand All @@ -233,7 +233,7 @@ func UpdateService(
service *corev1.Service,
) error {

err := shared.Client().Update(context.TODO(), service)
err := shared.Update(context.TODO(), service)
if err == nil {
return nil
}
Expand All @@ -254,7 +254,7 @@ func UpdateService(
// If there was a resourceVersion conflict then fetch a more
// recent version of the object and attempt to update that.
currentService := &corev1.Service{}
err = shared.Client().Get(
err = shared.Get(
context.TODO(),
types.NamespacedName{
Namespace: service.Namespace,
Expand All @@ -275,7 +275,7 @@ func UpdateService(
}

currentService.Spec.Type = service.Spec.Type
err = shared.Client().Update(context.TODO(), currentService)
err = shared.Update(context.TODO(), currentService)
if err != nil {
shared.LogErrorf(
reqLogger,
Expand Down
10 changes: 5 additions & 5 deletions pkg/executor/statefulset.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ func CreateStatefulSet(
if err != nil {
return nil, err
}
return statefulSet, shared.Client().Create(context.TODO(), statefulSet)
return statefulSet, shared.Create(context.TODO(), statefulSet)
}

// UpdateStatefulSetReplicas modifies an existing statefulset in k8s to have
Expand All @@ -67,7 +67,7 @@ func UpdateStatefulSetReplicas(
) error {

*statefulSet.Spec.Replicas = replicas
err := shared.Client().Update(context.TODO(), statefulSet)
err := shared.Update(context.TODO(), statefulSet)
if err == nil {
return nil
}
Expand All @@ -93,7 +93,7 @@ func UpdateStatefulSetReplicas(
Name: statefulSet.Name,
}
*statefulSet = appsv1.StatefulSet{}
err = shared.Client().Get(context.TODO(), name, statefulSet)
err = shared.Get(context.TODO(), name, statefulSet)
if err != nil {
shared.LogError(
reqLogger,
Expand All @@ -106,7 +106,7 @@ func UpdateStatefulSetReplicas(
}

*statefulSet.Spec.Replicas = replicas
err = shared.Client().Update(context.TODO(), statefulSet)
err = shared.Update(context.TODO(), statefulSet)
if err != nil {
shared.LogError(
reqLogger,
Expand Down Expand Up @@ -157,7 +157,7 @@ func DeleteStatefulSet(
Namespace: namespace,
},
}
return shared.Client().Delete(context.TODO(), toDelete)
return shared.Delete(context.TODO(), toDelete)
}

// getStatefulset composes the spec for creating a statefulset in k8s, based
Expand Down
2 changes: 1 addition & 1 deletion pkg/executor/volume.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,5 +39,5 @@ func DeletePVC(
Namespace: namespace,
},
}
return shared.Client().Delete(context.TODO(), toDelete)
return shared.Delete(context.TODO(), toDelete)
}
24 changes: 12 additions & 12 deletions pkg/observer/observer.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ func GetCluster(
) (*kdv1.KubeDirectorCluster, error) {

result := &kdv1.KubeDirectorCluster{}
err := shared.Client().Get(
err := shared.Get(
context.TODO(),
types.NamespacedName{Namespace: namespace, Name: clusterName},
result,
Expand All @@ -54,7 +54,7 @@ func GetStatefulSet(
) (*appsv1.StatefulSet, error) {

result := &appsv1.StatefulSet{}
err := shared.Client().Get(
err := shared.Get(
context.TODO(),
types.NamespacedName{Namespace: namespace, Name: statefulSetName},
result,
Expand All @@ -69,7 +69,7 @@ func GetService(
) (*corev1.Service, error) {

result := &corev1.Service{}
err := shared.Client().Get(
err := shared.Get(
context.TODO(),
types.NamespacedName{Namespace: namespace, Name: serviceName},
result,
Expand All @@ -84,7 +84,7 @@ func GetPod(
) (*corev1.Pod, error) {

result := &corev1.Pod{}
err := shared.Client().Get(
err := shared.Get(
context.TODO(),
types.NamespacedName{Namespace: namespace, Name: podName},
result,
Expand All @@ -100,7 +100,7 @@ func GetPVC(
) (*corev1.PersistentVolumeClaim, error) {

result := &corev1.PersistentVolumeClaim{}
err := shared.Client().Get(
err := shared.Get(
context.TODO(),
types.NamespacedName{Namespace: namespace, Name: pvcName},
result,
Expand All @@ -116,7 +116,7 @@ func GetApp(
) (*kdv1.KubeDirectorApp, error) {

result := &kdv1.KubeDirectorApp{}
err := shared.Client().Get(
err := shared.Get(
context.TODO(),
types.NamespacedName{Namespace: namespace, Name: appName},
result,
Expand All @@ -136,7 +136,7 @@ func GetValidatorWebhook(
return nil, err
}
result := &v1beta1.MutatingWebhookConfiguration{}
err = shared.Client().Get(
err = shared.Get(
context.TODO(),
types.NamespacedName{Namespace: kdNamespace, Name: validator},
result,
Expand All @@ -151,7 +151,7 @@ func GetSecret(
) (*corev1.Secret, error) {

result := &corev1.Secret{}
err := shared.Client().Get(
err := shared.Get(
context.TODO(),
types.NamespacedName{Namespace: namespace, Name: secretName},
result,
Expand All @@ -169,7 +169,7 @@ func GetDeployment(
return nil, err
}
result := &appsv1.Deployment{}
err = shared.Client().Get(
err = shared.Get(
context.TODO(),
types.NamespacedName{Namespace: kdNamespace, Name: deploymentName},
result,
Expand Down Expand Up @@ -210,7 +210,7 @@ func GetKDConfig(
}

result := &kdv1.KubeDirectorConfig{}
err = shared.Client().Get(
err = shared.Get(
context.TODO(),
types.NamespacedName{Namespace: kdNamespace, Name: kdConfigName},
result,
Expand All @@ -224,7 +224,7 @@ func GetStorageClass(
) (*storagev1.StorageClass, error) {

result := &storagev1.StorageClass{}
err := shared.Client().Get(
err := shared.Get(
context.TODO(),
types.NamespacedName{Name: storageClassName},
result,
Expand All @@ -238,7 +238,7 @@ func GetDefaultStorageClass() (*storagev1.StorageClass, error) {

// Namespace does not matter for this query; leave blank.
result := &storagev1.StorageClassList{}
err := shared.Client().List(context.TODO(), &client.ListOptions{}, result)
err := shared.List(context.TODO(), &client.ListOptions{}, result)
if err != nil {
return nil, err
}
Expand Down
90 changes: 84 additions & 6 deletions pkg/shared/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,14 @@
package shared

import (
"context"
"os"

"github.com/operator-framework/operator-sdk/pkg/k8sutil"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/kubernetes/scheme"
typedcorev1 "k8s.io/client-go/kubernetes/typed/core/v1"
Expand All @@ -33,9 +37,13 @@ var (
// config is a config to talk to the apiserver.
config *rest.Config

// client is a k8s client to perform K8s CRUD operations.
// client is a k8s client to perform K8s CRUD operations. Will be set to
// the split client (caching reads) after manager startup.
client k8sClient.Client

// directClient is always a non-caching client.
directClient k8sClient.Client

// clientSet is a REST API client that will be used for actions not
// supported through the operator SDK.
clientSet kubernetes.Interface
Expand All @@ -51,6 +59,7 @@ func init() {

config = getConfigFromServiceAccount()
client = getClient(config)
directClient = getClient(config)
clientSet = getClientSet(config)
eventRecorder = getEventRecorder()
}
Expand Down Expand Up @@ -116,11 +125,6 @@ func Config() *rest.Config {
return config
}

// Client getter ...
func Client() k8sClient.Client {
return client
}

// SetClient setter ...
func SetClient(c k8sClient.Client) {
client = c
Expand All @@ -130,3 +134,77 @@ func SetClient(c k8sClient.Client) {
func ClientSet() kubernetes.Interface {
return clientSet
}

// Create uses the split client. Should write back directly to K8s, but we'll
// use the split client in case it ever wants to use the knowledge that we
// are changing the object.
func Create(
ctx context.Context,
obj runtime.Object,
) error {

return client.Create(ctx, obj)
}

// Get will first try a GET through the split client. If this returns 404,
// it will try the direct client.
// Cf. https://github.com/bluek8s/kubedirector/issues/267
func Get(
ctx context.Context,
key types.NamespacedName,
obj runtime.Object,
) error {

getErr := client.Get(ctx, key, obj)
if (getErr == nil) || (!errors.IsNotFound(getErr)) {
return getErr
}
return directClient.Get(ctx, key, obj)
}

// List uses the split client. Currently we don't have usecases where we
// would need to fall back to the direct client if the list has zero items,
// and it would be somewhat involved to examine the list object here to
// determine the zero-items case.
func List(
ctx context.Context,
opts *k8sClient.ListOptions,
list runtime.Object,
) error {

return client.List(ctx, opts, list)
}

// Update uses the split client. Should write back directly to K8s, but we'll
// use the split client in case it ever wants to use the knowledge that we
// are changing the object.
func Update(
ctx context.Context,
obj runtime.Object,
) error {

return client.Update(ctx, obj)
}

// StatusUpdate uses the split client. Should write back directly to K8s, but
// we'll use the split client in case it ever wants to use the knowledge that
// we are changing the object.
func StatusUpdate(
ctx context.Context,
obj runtime.Object,
) error {

return client.Status().Update(ctx, obj)
}

// Delete uses the split client. Should write back directly to K8s, but
// we'll use the split client in case it ever wants to use the knowledge that
// we are deleting the object.
func Delete(
ctx context.Context,
obj runtime.Object,
opts ...k8sClient.DeleteOptionFunc,
) error {

return client.Delete(ctx, obj, opts...)
}
Loading

0 comments on commit 7b4744f

Please sign in to comment.