
Commit

use owner references
tejal29 committed Aug 6, 2021
1 parent 76e6a29 commit ce0e102
Showing 5 changed files with 50 additions and 106 deletions.
30 changes: 19 additions & 11 deletions pkg/diag/validator/pod.go
@@ -71,19 +71,15 @@
 
 // PodValidator implements the Validator interface for Pods
 type PodValidator struct {
-    k      kubernetes.Interface
-    recos  []Recommender
-    ignore map[string]struct{}
+    k        kubernetes.Interface
+    ownerRef metav1.OwnerReference
+    recos    []Recommender
 }
 
-// NewPodValidator initializes a PodValidator ignoring pods mentioned.
-func NewPodValidator(k kubernetes.Interface, ignore []string) *PodValidator {
+// NewPodValidator initializes a PodValidator
+func NewPodValidator(k kubernetes.Interface, ownerRef metav1.OwnerReference) *PodValidator {
     rs := []Recommender{recommender.ContainerError{}}
-    ignoreMap := map[string]struct{}{}
-    for _, s := range ignore {
-        ignoreMap[s] = struct{}{}
-    }
-    return &PodValidator{k: k, recos: rs, ignore: ignoreMap}
+    return &PodValidator{k: k, recos: rs, ownerRef: ownerRef}
 }
 
 // Validate implements the Validate method for Validator interface
@@ -95,7 +91,7 @@ func (p *PodValidator) Validate(ctx context.Context, ns string, opts metav1.List
     eventsClient := p.k.CoreV1().Events(ns)
     var rs []Resource
     for _, po := range pods.Items {
-        if _, found := p.ignore[po.Name]; found {
+        if !isPodOwnedBy(po, p.ownerRef) {
             continue
         }
         ps := p.getPodStatus(&po)
@@ -422,3 +418,15 @@ func executeCLI(cmdName string, args []string) ([]byte, error) {
     cmd := exec.Command(cmdName, args...)
     return cmd.CombinedOutput()
 }
+
+func isPodOwnedBy(po v1.Pod, ownerRef metav1.OwnerReference) bool {
+    if ownerRef.UID == "" {
+        return true
+    }
+    for _, ref := range po.OwnerReferences {
+        if ref.UID == ownerRef.UID {
+            return true
+        }
+    }
+    return false
+}
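
For context, here is a minimal standalone sketch of the new filtering behavior. The isPodOwnedBy helper is copied verbatim from this commit; the main function and the sample pod are illustrative only:

package main

import (
	"fmt"

	v1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

// Copied from the commit: an empty owner UID matches every pod;
// otherwise the pod must carry an owner reference with the same UID.
func isPodOwnedBy(po v1.Pod, ownerRef metav1.OwnerReference) bool {
	if ownerRef.UID == "" {
		return true
	}
	for _, ref := range po.OwnerReferences {
		if ref.UID == ownerRef.UID {
			return true
		}
	}
	return false
}

func main() {
	pod := v1.Pod{ObjectMeta: metav1.ObjectMeta{
		Name:            "foo",
		OwnerReferences: []metav1.OwnerReference{{UID: "rs-uid"}},
	}}
	fmt.Println(isPodOwnedBy(pod, metav1.OwnerReference{UID: "rs-uid"})) // true
	fmt.Println(isPodOwnedBy(pod, metav1.OwnerReference{UID: "other"}))  // false
	fmt.Println(isPodOwnedBy(pod, metav1.OwnerReference{}))              // true (wildcard)
}

The empty-UID case acts as a wildcard, so callers that cannot resolve an owner (see status_check.go below) still validate every pod.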
26 changes: 11 additions & 15 deletions pkg/diag/validator/pod_test.go
@@ -44,8 +44,8 @@ func TestRun(t *testing.T) {
     after := before.Add(3 * time.Second)
     tests := []struct {
         description string
+        ownerRef    metav1.OwnerReference
         pods        []*v1.Pod
-        ignore      map[string]struct{}
         logOutput   mockLogOutput
         events      []v1.Event
         expected    []Resource
@@ -726,13 +726,16 @@
                 ErrCode: proto.StatusCode_STATUSCHECK_CONTAINER_EXEC_ERROR,
             }, []string{"[foo foo-container] standard_init_linux.go:219: exec user process caused: exec format error"})},
         },
-        // ignore pods
+
+        // Check to diagnose pods with owner references
         {
-            description: "ignore pods returns no resources.",
+            description: "pods owned by a uuid",
+            ownerRef:    metav1.OwnerReference{UID: "foo"},
             pods: []*v1.Pod{{
                 ObjectMeta: metav1.ObjectMeta{
-                    Name:      "foo",
-                    Namespace: "test",
+                    Name:            "foo",
+                    Namespace:       "test",
+                    OwnerReferences: []metav1.OwnerReference{{UID: "none"}},
                 },
                 TypeMeta: metav1.TypeMeta{Kind: "Pod"},
                 Status: v1.PodStatus{
@@ -748,13 +751,6 @@
                     },
                 }},
             }},
-            ignore: map[string]struct{}{
-                "foo": {},
-            },
-            logOutput: mockLogOutput{
-                output: []byte("standard_init_linux.go:219: exec user process caused: exec format error"),
-            },
-            expected: nil,
         },
     }
@@ -775,7 +771,7 @@
             rs = append(rs, &v1.EventList{Items: test.events})
             f := fakekubeclientset.NewSimpleClientset(rs...)
 
-            actual, err := testPodValidator(f, test.ignore).Validate(context.Background(), "test", metav1.ListOptions{})
+            actual, err := testPodValidator(f, test.ownerRef).Validate(context.Background(), "test", metav1.ListOptions{})
             t.CheckNoError(err)
             t.CheckDeepEqual(test.expected, actual, cmp.AllowUnexported(Resource{}), cmp.Comparer(func(x, y error) bool {
                 if x == nil && y == nil {
@@ -790,9 +786,9 @@
 }
 
 // testPodValidator initializes a PodValidator like NewPodValidator except for loading custom rules
-func testPodValidator(k kubernetes.Interface, m map[string]struct{}) *PodValidator {
+func testPodValidator(k kubernetes.Interface, ownerRef metav1.OwnerReference) *PodValidator {
     rs := []Recommender{recommender.ContainerError{}}
-    return &PodValidator{k: k, recos: rs, ignore: m}
+    return &PodValidator{k: k, recos: rs, ownerRef: ownerRef}
 }
 
 func TestPodConditionChecks(t *testing.T) {
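
The reworked test case above exercises the mismatch path: the pod carries owner UID "none" while the validator filters on "foo", so the pod is skipped and expected stays nil. A narrower check of just that predicate might look like this (a hypothetical test, assuming it sits next to the new helper in package validator):

package validator

import (
	"testing"

	v1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

func TestIsPodOwnedBy(t *testing.T) {
	// Pod owned by UID "none", mirroring the table case above.
	pod := v1.Pod{ObjectMeta: metav1.ObjectMeta{
		OwnerReferences: []metav1.OwnerReference{{UID: "none"}},
	}}
	if isPodOwnedBy(pod, metav1.OwnerReference{UID: "foo"}) {
		t.Error("pod owned by UID \"none\" should not match owner UID \"foo\"")
	}
	if !isPodOwnedBy(pod, metav1.OwnerReference{}) {
		t.Error("an empty owner UID should match any pod")
	}
}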
15 changes: 2 additions & 13 deletions pkg/skaffold/kubernetes/status/resource/deployment.go
@@ -56,21 +56,10 @@ var (
     }
 )
 
-type Group map[string]struct{}
+type Group map[string]*Deployment
 
 func (r Group) Add(d *Deployment) {
-    r[d.ID()] = struct{}{}
-}
-
-func (r Group) AddPods(d *Deployment) {
-    for _, p := range d.pods {
-        id := fmt.Sprintf("%s:%s:%s", p.Name(), p.Namespace(), p.Kind())
-        r[id] = struct{}{}
-    }
-}
-
-func (r Group) AddID(id string) {
-    r[id] = struct{}{}
+    r[d.ID()] = d
 }
 
 func (r Group) Contains(d *Deployment) bool {
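
Group now carries the deployments themselves instead of a bare presence set, so a lookup can hand back the stored *Deployment. A rough sketch of the resulting semantics (the Deployment stand-in, its ID format, and the Contains body are assumptions here, not the real resource package):

package main

import "fmt"

// Hypothetical stand-in for resource.Deployment; the real type lives in
// pkg/skaffold/kubernetes/status/resource.
type Deployment struct{ name, namespace string }

func (d *Deployment) ID() string { return d.name + ":" + d.namespace }

// Group now maps a deployment's ID to the deployment itself.
type Group map[string]*Deployment

func (r Group) Add(d *Deployment) { r[d.ID()] = d }

func (r Group) Contains(d *Deployment) bool {
	_, ok := r[d.ID()]
	return ok
}

func main() {
	g := make(Group)
	d := &Deployment{name: "web", namespace: "default"}
	g.Add(d)
	fmt.Println(g.Contains(d))                                        // true
	fmt.Println(g.Contains(&Deployment{name: "db", namespace: "ns"})) // false
}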
37 changes: 13 additions & 24 deletions pkg/skaffold/kubernetes/status/status_check.go
@@ -29,6 +29,7 @@ import (
     "golang.org/x/sync/singleflight"
     metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
     "k8s.io/client-go/kubernetes"
+    deploymentutil "k8s.io/kubectl/pkg/util/deployment"
 
     "github.com/GoogleContainerTools/skaffold/pkg/diag"
     "github.com/GoogleContainerTools/skaffold/pkg/diag/validator"
@@ -56,6 +57,9 @@ var (
 
     // report resource status for pending resources 5 seconds.
     reportStatusTime = 5 * time.Second
+
+    // testing
+    getReplicaSet = deploymentutil.GetAllReplicaSets
 )
 
 const (
@@ -84,7 +88,6 @@ type Monitor struct {
     deadlineSeconds int
     muteLogs        bool
     seenResources   resource.Group
-    prevResources   resource.Group
     singleRun       singleflight.Group
     namespaces      *[]string
     kubeContext     string
@@ -98,7 +101,6 @@ func NewStatusMonitor(cfg Config, labeller *label.DefaultLabeller, namespaces *[
         labeller:        labeller,
         deadlineSeconds: cfg.StatusCheckDeadlineSeconds(),
         seenResources:   make(resource.Group),
-        prevResources:   make(resource.Group),
         singleRun:       singleflight.Group{},
         namespaces:      namespaces,
         kubeContext:     cfg.GetKubeContext(),
@@ -132,12 +134,6 @@ func (s *Monitor) check(ctx context.Context, out io.Writer) error {
 }
 
 func (s *Monitor) Reset() {
-    s.prevResources.Reset()
-    for k := range s.seenResources {
-        if parts := strings.Split(k, ":"); len(parts) > 0 {
-            s.prevResources.AddID(parts[0])
-        }
-    }
     s.seenResources.Reset()
 }
@@ -150,7 +146,7 @@ func (s *Monitor) statusCheck(ctx context.Context, out io.Writer) (proto.StatusC
     deployments := make([]*resource.Deployment, 0)
     for _, n := range *s.namespaces {
         newDeployments, err := getDeployments(ctx, client, n, s.labeller,
-            getDeadline(s.deadlineSeconds), s.prevResources)
+            getDeadline(s.deadlineSeconds))
         if err != nil {
             return proto.StatusCode_STATUSCHECK_DEPLOYMENT_FETCH_ERR, fmt.Errorf("could not fetch deployments: %w", err)
         }
@@ -193,24 +189,17 @@ func (s *Monitor) statusCheck(ctx context.Context, out io.Writer) (proto.StatusC
     // Wait for all deployment statuses to be fetched
     wg.Wait()
     cancel()
-    // update seen all pods retrieved.
-    s.updateSeenResources(deployments)
     return getSkaffoldDeployStatus(c, deployments)
 }
 
-func getDeployments(ctx context.Context, client kubernetes.Interface, ns string, l *label.DefaultLabeller, deadlineDuration time.Duration, prevPods resource.Group) ([]*resource.Deployment, error) {
+func getDeployments(ctx context.Context, client kubernetes.Interface, ns string, l *label.DefaultLabeller, deadlineDuration time.Duration) ([]*resource.Deployment, error) {
     deps, err := client.AppsV1().Deployments(ns).List(ctx, metav1.ListOptions{
         LabelSelector: l.RunIDSelector(),
     })
     if err != nil {
         return nil, fmt.Errorf("could not fetch deployments: %w", err)
     }
 
-    var ignore []string
-    for k := range prevPods {
-        ignore = append(ignore, k)
-    }
-
     deployments := make([]*resource.Deployment, len(deps.Items))
     for i, d := range deps.Items {
         var deadline time.Duration
@@ -219,9 +208,15 @@ func getDeployments(ctx context.Context, client kubernetes.Interface, ns string,
         } else {
             deadline = time.Duration(*d.Spec.ProgressDeadlineSeconds) * time.Second
         }
+        var ownerRef metav1.OwnerReference
+        if _, _, newRef, errRef := getReplicaSet(&d, client.AppsV1()); errRef == nil {
+            ownerRef = metav1.OwnerReference{
+                UID: newRef.GetUID(),
+            }
+        }
         pd := diag.New([]string{d.Namespace}).
             WithLabel(label.RunIDLabel, l.Labels()[label.RunIDLabel]).
-            WithValidators([]validator.Validator{validator.NewPodValidator(client, ignore)})
+            WithValidators([]validator.Validator{validator.NewPodValidator(client, ownerRef)})
 
         for k, v := range d.Spec.Template.Labels {
             pd = pd.WithLabel(k, v)
@@ -274,12 +269,6 @@ func pollDeploymentStatus(ctx context.Context, cfg kubectl.Config, r *resource.D
     }
 }
 
-func (s *Monitor) updateSeenResources(deployments []*resource.Deployment) {
-    for _, d := range deployments {
-        s.seenResources.AddPods(d)
-    }
-}
-
 func getSkaffoldDeployStatus(c *counter, rs []*resource.Deployment) (proto.StatusCode, error) {
     if c.failed == 0 {
         return proto.StatusCode_STATUSCHECK_SUCCESS, nil
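
The owner reference passed to NewPodValidator is the UID of the deployment's newest ReplicaSet, resolved through k8s.io/kubectl's deployment util. A standalone sketch of that lookup (the deployment name, namespace, and kubeconfig wiring are illustrative; unlike the commit, this sketch nil-checks the returned ReplicaSet explicitly):

package main

import (
	"context"
	"fmt"

	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/tools/clientcmd"
	deploymentutil "k8s.io/kubectl/pkg/util/deployment"
)

func main() {
	// Assumes a reachable cluster via the default kubeconfig.
	config, err := clientcmd.BuildConfigFromFlags("", clientcmd.RecommendedHomeFile)
	if err != nil {
		panic(err)
	}
	client, err := kubernetes.NewForConfig(config)
	if err != nil {
		panic(err)
	}
	dep, err := client.AppsV1().Deployments("default").Get(context.Background(), "web", metav1.GetOptions{})
	if err != nil {
		panic(err)
	}
	// GetAllReplicaSets returns the old ReplicaSets still owning pods,
	// all old ReplicaSets, and the newest ReplicaSet for the deployment.
	_, _, newRS, err := deploymentutil.GetAllReplicaSets(dep, client.AppsV1())
	if err != nil || newRS == nil {
		fmt.Println("no current ReplicaSet found")
		return
	}
	// Pods created for this rollout carry an owner reference with this UID.
	fmt.Println(metav1.OwnerReference{UID: newRS.GetUID()})
}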
48 changes: 5 additions & 43 deletions pkg/skaffold/kubernetes/status/status_check_test.go
@@ -29,6 +29,7 @@ import (
     metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
     "k8s.io/apimachinery/pkg/runtime"
     fakekubeclientset "k8s.io/client-go/kubernetes/fake"
+    appsclient "k8s.io/client-go/kubernetes/typed/apps/v1"
     utilpointer "k8s.io/utils/pointer"
 
     "github.com/GoogleContainerTools/skaffold/pkg/diag"
@@ -216,12 +217,15 @@ func TestGetDeployments(t *testing.T) {
     for _, test := range tests {
         testutil.Run(t, test.description, func(t *testutil.T) {
+            t.Override(&getReplicaSet, func(_ *appsv1.Deployment, _ appsclient.AppsV1Interface) ([]*appsv1.ReplicaSet, []*appsv1.ReplicaSet, *appsv1.ReplicaSet, error) {
+                return nil, nil, &appsv1.ReplicaSet{}, nil
+            })
             objs := make([]runtime.Object, len(test.deps))
             for i, dep := range test.deps {
                 objs[i] = dep
             }
             client := fakekubeclientset.NewSimpleClientset(objs...)
-            actual, err := getDeployments(context.Background(), client, "test", labeller, 200*time.Second, resource.Group{})
+            actual, err := getDeployments(context.Background(), client, "test", labeller, 200*time.Second)
             t.CheckErrorAndDeepEqual(test.shouldErr, err, &test.expected, &actual,
                 cmp.AllowUnexported(resource.Deployment{}, resource.Status{}),
                 cmpopts.IgnoreInterfaces(struct{ diag.Diagnose }{}))
@@ -623,48 +627,6 @@ func TestPollDeployment(t *testing.T) {
     }
 }
 
-func TestMonitorReset(t *testing.T) {
-    labeller := label.NewLabeller(true, nil, "run-id")
-    tests := []struct {
-        description  string
-        seen         resource.Group
-        prev         resource.Group
-        expectedPrev resource.Group
-    }{
-        {
-            description:  "1st dev iteration",
-            seen:         resource.Group{},
-            prev:         resource.Group{},
-            expectedPrev: resource.Group{},
-        },
-        {
-            description: "nth dev iteration",
-            prev: resource.Group{
-                "pod1": {},
-                "dep":  {},
-            },
-            seen: resource.Group{
-                "pod2:ns:pod":       {},
-                "dep:ns:deployment": {},
-            },
-            expectedPrev: resource.Group{
-                "pod2": {},
-                "dep":  {},
-            },
-        },
-    }
-    for _, test := range tests {
-        testutil.Run(t, test.description, func(t *testutil.T) {
-            m := NewStatusMonitor(&statusConfig{}, labeller, &[]string{"test"})
-            m.seenResources = test.seen
-            m.prevResources = test.prev
-            m.Reset()
-            t.CheckDeepEqual(test.expectedPrev, m.prevResources)
-            t.CheckDeepEqual(resource.Group{}, m.seenResources)
-        })
-    }
-}
-
 type mockValidator struct {
     runs      [][]validator.Resource
     iteration int
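
The override in TestGetDeployments works because getReplicaSet is a package-level function variable rather than a direct call to deploymentutil.GetAllReplicaSets. A minimal sketch of that test-seam pattern outside Skaffold's testutil (all names here are illustrative):

package main

import "fmt"

// Package-level function variable: production code calls lookup(),
// tests reassign it to a stub, mirroring the getReplicaSet seam above.
var lookup = func(name string) (string, error) {
	return "", fmt.Errorf("real lookup not available in this sketch")
}

func describe(name string) string {
	uid, err := lookup(name)
	if err != nil {
		return "unknown"
	}
	return uid
}

func main() {
	// A test would save and restore the original, e.g. via t.Cleanup.
	orig := lookup
	lookup = func(name string) (string, error) { return "uid-123", nil }
	defer func() { lookup = orig }()

	fmt.Println(describe("web")) // uid-123
}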
