Skip to content

Commit

Permalink
Fix to handle hash collisions correctly for DaemonSet
Browse files Browse the repository at this point in the history
  • Loading branch information
mortent committed Aug 8, 2018
1 parent e626d35 commit a93ea43
Show file tree
Hide file tree
Showing 3 changed files with 164 additions and 5 deletions.
10 changes: 5 additions & 5 deletions pkg/controller/daemon/update.go
Original file line number Diff line number Diff line change
Expand Up @@ -330,16 +330,16 @@ func (dsc *DaemonSetsController) snapshot(ds *apps.DaemonSet, revision int64) (*
}

history, err = dsc.kubeClient.AppsV1().ControllerRevisions(ds.Namespace).Create(history)
if errors.IsAlreadyExists(err) {
if outerErr := err; errors.IsAlreadyExists(outerErr) {
// TODO: Is it okay to get from historyLister?
existedHistory, getErr := dsc.kubeClient.AppsV1().ControllerRevisions(ds.Namespace).Get(name, metav1.GetOptions{})
if getErr != nil {
return nil, getErr
}
// Check if we already created it
done, err := Match(ds, existedHistory)
if err != nil {
return nil, err
done, matchErr := Match(ds, existedHistory)
if matchErr != nil {
return nil, matchErr
}
if done {
return existedHistory, nil
Expand All @@ -360,7 +360,7 @@ func (dsc *DaemonSetsController) snapshot(ds *apps.DaemonSet, revision int64) (*
return nil, updateErr
}
glog.V(2).Infof("Found a hash collision for DaemonSet %q - bumping collisionCount to %d to resolve it", ds.Name, *currDS.Status.CollisionCount)
return nil, err
return nil, outerErr
}
return history, err
}
Expand Down
2 changes: 2 additions & 0 deletions test/integration/daemonset/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,14 @@ go_test(
deps = [
"//pkg/api/legacyscheme:go_default_library",
"//pkg/api/v1/pod:go_default_library",
"//pkg/controller:go_default_library",
"//pkg/controller/daemon:go_default_library",
"//pkg/features:go_default_library",
"//pkg/scheduler:go_default_library",
"//pkg/scheduler/algorithm:go_default_library",
"//pkg/scheduler/algorithmprovider:go_default_library",
"//pkg/scheduler/factory:go_default_library",
"//pkg/util/labels:go_default_library",
"//pkg/util/metrics:go_default_library",
"//staging/src/k8s.io/api/apps/v1:go_default_library",
"//staging/src/k8s.io/api/core/v1:go_default_library",
Expand Down
157 changes: 157 additions & 0 deletions test/integration/daemonset/daemonset_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,13 +41,15 @@ import (
"k8s.io/client-go/tools/record"
"k8s.io/kubernetes/pkg/api/legacyscheme"
podutil "k8s.io/kubernetes/pkg/api/v1/pod"
"k8s.io/kubernetes/pkg/controller"
"k8s.io/kubernetes/pkg/controller/daemon"
"k8s.io/kubernetes/pkg/features"
"k8s.io/kubernetes/pkg/scheduler"
"k8s.io/kubernetes/pkg/scheduler/algorithm"
"k8s.io/kubernetes/pkg/scheduler/algorithmprovider"
_ "k8s.io/kubernetes/pkg/scheduler/algorithmprovider"
"k8s.io/kubernetes/pkg/scheduler/factory"
labelsutil "k8s.io/kubernetes/pkg/util/labels"
"k8s.io/kubernetes/pkg/util/metrics"
"k8s.io/kubernetes/test/integration/framework"
)
Expand Down Expand Up @@ -372,6 +374,52 @@ func waitForPodsCreated(podInformer cache.SharedIndexInformer, num int) error {
})
}

func waitForDaemonSetAndControllerRevisionCreated(c clientset.Interface, name string, namespace string) error {
return wait.PollImmediate(100*time.Millisecond, 10*time.Second, func() (bool, error) {
ds, err := c.AppsV1().DaemonSets(namespace).Get(name, metav1.GetOptions{})
if err != nil {
return false, err
}
if ds == nil {
return false, nil
}

revs, err := c.AppsV1().ControllerRevisions(namespace).List(metav1.ListOptions{})
if err != nil {
return false, err
}
if revs.Size() == 0 {
return false, nil
}

for _, rev := range revs.Items {
for _, oref := range rev.OwnerReferences {
if oref.Kind == "DaemonSet" && oref.UID == ds.UID {
return true, nil
}
}
}
return false, nil
})
}

func hashAndNameForDaemonSet(ds *apps.DaemonSet) (string, string) {
hash := fmt.Sprint(controller.ComputeHash(&ds.Spec.Template, ds.Status.CollisionCount))
name := ds.Name + "-" + hash
return hash, name
}

func validateDaemonSetCollisionCount(dsClient appstyped.DaemonSetInterface, dsName string, expCount int32, t *testing.T) {
ds, err := dsClient.Get(dsName, metav1.GetOptions{})
if err != nil {
t.Fatalf("Failed to look up DaemonSet: %v", err)
}
collisionCount := ds.Status.CollisionCount
if *collisionCount != expCount {
t.Fatalf("Expected collisionCount to be %d, but found %d", expCount, *collisionCount)
}
}

func validateDaemonSetStatus(
dsClient appstyped.DaemonSetInterface,
dsName string,
Expand Down Expand Up @@ -740,3 +788,112 @@ func TestInsufficientCapacityNodeWhenScheduleDaemonSetPodsEnabled(t *testing.T)
validateDaemonSetStatus(dsClient, ds.Name, 1, t)
})
}

// TestLaunchWithHashCollision tests that a DaemonSet can be updated even if there is a
// hash collision with an existing ControllerRevision
func TestLaunchWithHashCollision(t *testing.T) {
server, closeFn, dc, informers, clientset := setup(t)
defer closeFn()
ns := framework.CreateTestingNamespace("one-node-daemonset-test", server, t)
defer framework.DeleteTestingNamespace(ns, server, t)

dsClient := clientset.AppsV1().DaemonSets(ns.Name)
podInformer := informers.Core().V1().Pods().Informer()
nodeClient := clientset.CoreV1().Nodes()

stopCh := make(chan struct{})
defer close(stopCh)

informers.Start(stopCh)
go dc.Run(1, stopCh)

setupScheduler(t, clientset, informers, stopCh)

// Create single node
_, err := nodeClient.Create(newNode("single-node", nil))
if err != nil {
t.Fatalf("Failed to create node: %v", err)
}

// Create new DaemonSet with RollingUpdate strategy
orgDs := newDaemonSet("foo", ns.Name)
oneIntString := intstr.FromInt(1)
orgDs.Spec.UpdateStrategy = apps.DaemonSetUpdateStrategy{
Type: apps.RollingUpdateDaemonSetStrategyType,
RollingUpdate: &apps.RollingUpdateDaemonSet{
MaxUnavailable: &oneIntString,
},
}
ds, err := dsClient.Create(orgDs)
if err != nil {
t.Fatalf("Failed to create DaemonSet: %v", err)
}

// Wait for the DaemonSet to be created before proceeding
err = waitForDaemonSetAndControllerRevisionCreated(clientset, ds.Name, ds.Namespace)
if err != nil {
t.Fatalf("Failed to create DeamonSet: %v", err)
}

ds, err = dsClient.Get(ds.Name, metav1.GetOptions{})
if err != nil {
t.Fatalf("Failed to get DaemonSet: %v", err)
}
var orgCollisionCount int32
if ds.Status.CollisionCount != nil {
orgCollisionCount = *ds.Status.CollisionCount
}

// Look up the ControllerRevision for the DaemonSet
_, name := hashAndNameForDaemonSet(ds)
revision, err := clientset.AppsV1().ControllerRevisions(ds.Namespace).Get(name, metav1.GetOptions{})
if err != nil || revision == nil {
t.Fatalf("Failed to look up ControllerRevision: %v", err)
}

// Create a "fake" ControllerRevision that we know will create a hash collision when we make
// the next update
one := int64(1)
ds.Spec.Template.Spec.TerminationGracePeriodSeconds = &one

newHash, newName := hashAndNameForDaemonSet(ds)
newRevision := &apps.ControllerRevision{
ObjectMeta: metav1.ObjectMeta{
Name: newName,
Namespace: ds.Namespace,
Labels: labelsutil.CloneAndAddLabel(ds.Spec.Template.Labels, apps.DefaultDaemonSetUniqueLabelKey, newHash),
Annotations: ds.Annotations,
OwnerReferences: []metav1.OwnerReference{*metav1.NewControllerRef(ds, apps.SchemeGroupVersion.WithKind("DaemonSet"))},
},
Data: revision.Data,
Revision: revision.Revision + 1,
}
_, err = clientset.AppsV1().ControllerRevisions(ds.Namespace).Create(newRevision)
if err != nil {
t.Fatalf("Failed to create ControllerRevision: %v", err)
}

// Make an update of the DaemonSet which we know will create a hash collision when
// the next ControllerRevision is created.
_, err = dsClient.Update(ds)
if err != nil {
t.Fatalf("Failed to update DaemonSet: %v", err)
}

// Wait for any pod with the latest Spec to exist
err = wait.PollImmediate(100*time.Millisecond, 10*time.Second, func() (bool, error) {
objects := podInformer.GetIndexer().List()
for _, object := range objects {
pod := object.(*v1.Pod)
if *pod.Spec.TerminationGracePeriodSeconds == *ds.Spec.Template.Spec.TerminationGracePeriodSeconds {
return true, nil
}
}
return false, nil
})
if err != nil {
t.Fatalf("Failed to wait for Pods with the latest Spec to be created: %v", err)
}

validateDaemonSetCollisionCount(dsClient, ds.Name, orgCollisionCount+1, t)
}

0 comments on commit a93ea43

Please sign in to comment.