Skip to content

Commit

Permalink
Merge pull request #2651 from tejal29/fix_port_forwarding
Browse files Browse the repository at this point in the history
Don't look up services in all namespaces.
  • Loading branch information
tejal29 authored Aug 15, 2019
2 parents d815a0c + 14c87ac commit 3cc1e79
Show file tree
Hide file tree
Showing 3 changed files with 136 additions and 25 deletions.
2 changes: 1 addition & 1 deletion pkg/skaffold/kubernetes/portforward/forwarder_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ func NewForwarderManager(out io.Writer, cli *kubectl.CLI, podSelector kubernetes

ForwarderManager := &ForwarderManager{
output: out,
Forwarders: []Forwarder{NewResourceForwarder(em, label, userDefined)},
Forwarders: []Forwarder{NewResourceForwarder(em, namespaces, label, userDefined)},
}

if opts.ForwardPods {
Expand Down
44 changes: 25 additions & 19 deletions pkg/skaffold/kubernetes/portforward/resource_forwarder.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
// services deployed by skaffold.
type ResourceForwarder struct {
EntryManager
namespaces []string
userDefinedResources []*latest.PortForwardResource
label string
}
Expand All @@ -39,12 +40,14 @@ var (
// For testing
retrieveAvailablePort = util.GetAvailablePort
retrieveServices = retrieveServiceResources
getClientSet = kubernetes.GetClientset
)

// NewResourceForwarder returns a struct that tracks and port-forwards pods as they are created and modified
func NewResourceForwarder(em EntryManager, label string, userDefinedResources []*latest.PortForwardResource) *ResourceForwarder {
func NewResourceForwarder(em EntryManager, namespaces []string, label string, userDefinedResources []*latest.PortForwardResource) *ResourceForwarder {
return &ResourceForwarder{
EntryManager: em,
namespaces: namespaces,
userDefinedResources: userDefinedResources,
label: label,
}
Expand All @@ -53,7 +56,7 @@ func NewResourceForwarder(em EntryManager, label string, userDefinedResources []
// Start gets a list of services deployed by skaffold as []latest.PortForwardResource and
// forwards them.
func (p *ResourceForwarder) Start(ctx context.Context) error {
serviceResources, err := retrieveServices(p.label)
serviceResources, err := retrieveServices(p.label, p.namespaces)
if err != nil {
return errors.Wrap(err, "retrieving services for automatic port forwarding")
}
Expand Down Expand Up @@ -97,27 +100,30 @@ func (p *ResourceForwarder) getCurrentEntry(resource latest.PortForwardResource)

// retrieveServiceResources retrieves all services in the cluster matching the given label
// as a list of PortForwardResources
func retrieveServiceResources(label string) ([]*latest.PortForwardResource, error) {
clientset, err := kubernetes.GetClientset()
func retrieveServiceResources(label string, namespaces []string) ([]*latest.PortForwardResource, error) {
clientset, err := getClientSet()
if err != nil {
return nil, errors.Wrap(err, "getting clientset")
}
services, err := clientset.CoreV1().Services("").List(metav1.ListOptions{
LabelSelector: label,
})
if err != nil {
return nil, errors.Wrapf(err, "selecting services by label %s", label)
}

var resources []*latest.PortForwardResource
for _, s := range services.Items {
for _, p := range s.Spec.Ports {
resources = append(resources, &latest.PortForwardResource{
Type: constants.Service,
Name: s.Name,
Namespace: s.Namespace,
Port: int(p.Port),
LocalPort: int(p.Port),
})
for _, ns := range namespaces {
services, err := clientset.CoreV1().Services(ns).List(metav1.ListOptions{
LabelSelector: label,
})
if err != nil {
return nil, errors.Wrapf(err, "selecting services by label %s", label)
}
for _, s := range services.Items {
for _, p := range s.Spec.Ports {
resources = append(resources, &latest.PortForwardResource{
Type: constants.Service,
Name: s.Name,
Namespace: s.Namespace,
Port: int(p.Port),
LocalPort: int(p.Port),
})
}
}
}
return resources, nil
Expand Down
115 changes: 110 additions & 5 deletions pkg/skaffold/kubernetes/portforward/resource_forwarder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,19 +18,26 @@ package portforward

import (
"context"
"fmt"
"io/ioutil"
"sync"
"testing"
"time"

"github.com/GoogleContainerTools/skaffold/pkg/skaffold/deploy"
"github.com/google/go-cmp/cmp"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/kubernetes"

"github.com/GoogleContainerTools/skaffold/pkg/skaffold/constants"
"github.com/GoogleContainerTools/skaffold/pkg/skaffold/event"
"github.com/GoogleContainerTools/skaffold/pkg/skaffold/schema/latest"
"github.com/GoogleContainerTools/skaffold/pkg/skaffold/util"
"github.com/GoogleContainerTools/skaffold/testutil"
fakekubeclientset "k8s.io/client-go/kubernetes/fake"
)

type testForwarder struct {
Expand Down Expand Up @@ -116,11 +123,11 @@ func TestStart(t *testing.T) {
testutil.Run(t, test.description, func(t *testutil.T) {
event.InitializeState(latest.BuildConfig{})
fakeForwarder := newTestForwarder()
rf := NewResourceForwarder(NewEntryManager(ioutil.Discard, nil), "", nil)
rf := NewResourceForwarder(NewEntryManager(ioutil.Discard, nil), []string{"test"}, "", nil)
rf.EntryForwarder = fakeForwarder

t.Override(&retrieveAvailablePort, mockRetrieveAvailablePort(map[int]struct{}{}, test.availablePorts))
t.Override(&retrieveServices, func(string) ([]*latest.PortForwardResource, error) {
t.Override(&retrieveServices, func(string, []string) ([]*latest.PortForwardResource, error) {
return test.resources, nil
})

Expand Down Expand Up @@ -183,7 +190,7 @@ func TestGetCurrentEntryFunc(t *testing.T) {
expectedEntry := test.expected
expectedEntry.resource = test.resource

rf := NewResourceForwarder(NewEntryManager(ioutil.Discard, nil), "", nil)
rf := NewResourceForwarder(NewEntryManager(ioutil.Discard, nil), []string{"test"}, "", nil)
rf.forwardedResources = forwardedResources{
resources: test.forwardedResources,
lock: &sync.Mutex{},
Expand Down Expand Up @@ -226,11 +233,11 @@ func TestUserDefinedResources(t *testing.T) {
testutil.Run(t, "one service and one user defined pod", func(t *testutil.T) {
event.InitializeState(latest.BuildConfig{})
fakeForwarder := newTestForwarder()
rf := NewResourceForwarder(NewEntryManager(ioutil.Discard, nil), "", []*latest.PortForwardResource{pod})
rf := NewResourceForwarder(NewEntryManager(ioutil.Discard, nil), []string{"test"}, "", []*latest.PortForwardResource{pod})
rf.EntryForwarder = fakeForwarder

t.Override(&retrieveAvailablePort, mockRetrieveAvailablePort(map[int]struct{}{}, []int{8080, 9000}))
t.Override(&retrieveServices, func(string) ([]*latest.PortForwardResource, error) {
t.Override(&retrieveServices, func(string, []string) ([]*latest.PortForwardResource, error) {
return []*latest.PortForwardResource{svc}, nil
})

Expand All @@ -246,3 +253,101 @@ func TestUserDefinedResources(t *testing.T) {
}
})
}

func mockClient(m kubernetes.Interface) func() (kubernetes.Interface, error) {
return func() (kubernetes.Interface, error) {
return m, nil
}

}

func TestRetrieveServices(t *testing.T) {
tests := []struct {
description string
namespaces []string
services []*v1.Service
expected []*latest.PortForwardResource
}{
{
description: "multiple services in multiple namespaces",
namespaces: []string{"test", "test1"},
services: []*v1.Service{
{
ObjectMeta: metav1.ObjectMeta{
Name: "svc1",
Namespace: "test",
Labels: map[string]string{
deploy.RunIDLabel: "9876-6789",
},
},
Spec: v1.ServiceSpec{Ports: []v1.ServicePort{{Port: 8080}}},
}, {
ObjectMeta: metav1.ObjectMeta{
Name: "svc2",
Namespace: "test1",
Labels: map[string]string{
deploy.RunIDLabel: "9876-6789",
},
},
Spec: v1.ServiceSpec{Ports: []v1.ServicePort{{Port: 8081}}},
},
},
expected: []*latest.PortForwardResource{{
Type: constants.Service,
Name: "svc1",
Namespace: "test",
Port: 8080,
LocalPort: 8080,
}, {
Type: constants.Service,
Name: "svc2",
Namespace: "test1",
Port: 8081,
LocalPort: 8081,
}},
}, {
description: "no services in given namespace",
namespaces: []string{"randon"},
services: []*v1.Service{
{
ObjectMeta: metav1.ObjectMeta{
Name: "svc1",
Namespace: "test",
Labels: map[string]string{
deploy.RunIDLabel: "9876-6789",
},
},
Spec: v1.ServiceSpec{Ports: []v1.ServicePort{{Port: 8080}}},
},
},
}, {
description: "services present but does not expose any port",
namespaces: []string{"test"},
services: []*v1.Service{
{
ObjectMeta: metav1.ObjectMeta{
Name: "svc1",
Namespace: "test",
Labels: map[string]string{
deploy.RunIDLabel: "9876-6789",
},
},
},
},
},
}

for _, test := range tests {
testutil.Run(t, test.description, func(t *testutil.T) {
objs := make([]runtime.Object, len(test.services))
for i, s := range test.services {
objs[i] = s
}
client := fakekubeclientset.NewSimpleClientset(objs...)
t.Override(&getClientSet, mockClient(client))
actual, err := retrieveServiceResources(fmt.Sprintf("%s=9876-6789", deploy.RunIDLabel), test.namespaces)
t.CheckNoError(err)
t.CheckDeepEqual(test.expected, actual)
})
}
}

0 comments on commit 3cc1e79

Please sign in to comment.