Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added fix for RPC port detection #5715

Merged
2 changes: 1 addition & 1 deletion pkg/skaffold/kubernetes/portforward/kubectl_forwarder.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ func (k *KubectlForwarder) forward(parentCtx context.Context, pfe *portForwardEn
}
pfe.terminationLock.Unlock()

if !isPortFree(pfe.localPort) {
if !isPortFree(util.Loopback, pfe.localPort) {
// Assuming that Skaffold brokered ports don't overlap, this has to be an external process that started
// since the dev loop kicked off. We are notifying the user in the hope that they can fix it
color.Red.Fprintf(k.out, "failed to port forward %v, port %d is taken, retrying...\n", pfe, pfe.localPort)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ func TestUnavailablePort(t *testing.T) {
// has been called
var portFreeWG sync.WaitGroup
portFreeWG.Add(1)
t.Override(&isPortFree, func(int) bool {
t.Override(&isPortFree, func(string, int) bool {
portFreeWG.Done()
return false
})
Expand Down
2 changes: 1 addition & 1 deletion pkg/skaffold/kubernetes/portforward/pod_forwarder.go
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,7 @@ func (p *WatchingPodForwarder) podForwardingEntry(resourceVersion, containerName
}

// retrieve an open port on the host
entry.localPort = retrieveAvailablePort(resource.Port.IntVal, &p.entryManager.forwardedPorts)
entry.localPort = retrieveAvailablePort(resource.Address, resource.Port.IntVal, &p.entryManager.forwardedPorts)

return entry, nil
}
3 changes: 2 additions & 1 deletion pkg/skaffold/kubernetes/portforward/pod_forwarder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
"github.com/GoogleContainerTools/skaffold/pkg/skaffold/kubernetes"
"github.com/GoogleContainerTools/skaffold/pkg/skaffold/schema/latest"
schemautil "github.com/GoogleContainerTools/skaffold/pkg/skaffold/schema/util"
"github.com/GoogleContainerTools/skaffold/pkg/skaffold/util"
"github.com/GoogleContainerTools/skaffold/testutil"
testEvent "github.com/GoogleContainerTools/skaffold/testutil/event"
)
Expand Down Expand Up @@ -403,7 +404,7 @@ func TestAutomaticPortForwardPod(t *testing.T) {
testutil.Run(t, test.description, func(t *testutil.T) {
testEvent.InitializeState([]latest.Pipeline{{}})
taken := map[int]struct{}{}
t.Override(&retrieveAvailablePort, mockRetrieveAvailablePort(taken, test.availablePorts))
t.Override(&retrieveAvailablePort, mockRetrieveAvailablePort(util.Loopback, taken, test.availablePorts))
t.Override(&topLevelOwnerKey, func(context.Context, metav1.Object, string) string { return "owner" })

if test.forwarder == nil {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"github.com/GoogleContainerTools/skaffold/pkg/skaffold/kubectl"
"github.com/GoogleContainerTools/skaffold/pkg/skaffold/schema/latest"
schemautil "github.com/GoogleContainerTools/skaffold/pkg/skaffold/schema/util"
"github.com/GoogleContainerTools/skaffold/pkg/skaffold/util"
)

// SimulateDevCycle is used for testing a port forward + stop + restart in a simulated dev cycle
Expand All @@ -37,7 +38,7 @@ func SimulateDevCycle(t *testing.T, kubectlCLI *kubectl.CLI, namespace string) {
defer func() { portForwardEvent = portForwardEventHandler }()
portForwardEvent = func(entry *portForwardEntry) {}
ctx := context.Background()
localPort := retrieveAvailablePort(9000, &em.forwardedPorts)
localPort := retrieveAvailablePort(util.Loopback, 9000, &em.forwardedPorts)
pfe := newPortForwardEntry(0, latest.PortForwardResource{
Type: "deployment",
Name: "leeroy-web",
Expand All @@ -50,7 +51,7 @@ func SimulateDevCycle(t *testing.T, kubectlCLI *kubectl.CLI, namespace string) {

logrus.Info("waiting for the same port to become available...")
if err := wait.Poll(100*time.Millisecond, 5*time.Second, func() (done bool, err error) {
nextPort := retrieveAvailablePort(localPort, &em.forwardedPorts)
nextPort := retrieveAvailablePort(util.Loopback, localPort, &em.forwardedPorts)

logrus.Infof("next port %d", nextPort)

Expand Down
2 changes: 1 addition & 1 deletion pkg/skaffold/kubernetes/portforward/resource_forwarder.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ func (p *ResourceForwarder) getCurrentEntry(resource latest.PortForwardResource)
if requestPort == 0 && resource.Port.IntVal >= 1024 {
requestPort = resource.Port.IntVal
}
entry.localPort = retrieveAvailablePort(requestPort, &p.entryManager.forwardedPorts)
entry.localPort = retrieveAvailablePort(resource.Address, requestPort, &p.entryManager.forwardedPorts)
return entry
}

Expand Down
12 changes: 6 additions & 6 deletions pkg/skaffold/kubernetes/portforward/resource_forwarder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,10 +64,10 @@ func newTestForwarder() *testForwarder {
return &testForwarder{}
}

func mockRetrieveAvailablePort(taken map[int]struct{}, availablePorts []int) func(int, *util.PortSet) int {
func mockRetrieveAvailablePort(_ string, taken map[int]struct{}, availablePorts []int) func(string, int, *util.PortSet) int {
// Return first available port in ports that isn't taken
var lock sync.Mutex
return func(int, *util.PortSet) int {
return func(string, int, *util.PortSet) int {
for _, p := range availablePorts {
lock.Lock()
if _, ok := taken[p]; ok {
Expand Down Expand Up @@ -122,7 +122,7 @@ func TestStart(t *testing.T) {
for _, test := range tests {
testutil.Run(t, test.description, func(t *testutil.T) {
testEvent.InitializeState([]latest.Pipeline{{}})
t.Override(&retrieveAvailablePort, mockRetrieveAvailablePort(map[int]struct{}{}, test.availablePorts))
t.Override(&retrieveAvailablePort, mockRetrieveAvailablePort(util.Loopback, map[int]struct{}{}, test.availablePorts))
t.Override(&retrieveServices, func(context.Context, string, []string) ([]*latest.PortForwardResource, error) {
return test.resources, nil
})
Expand Down Expand Up @@ -201,9 +201,9 @@ func TestGetCurrentEntryFunc(t *testing.T) {

for _, test := range tests {
testutil.Run(t, test.description, func(t *testutil.T) {
t.Override(&retrieveAvailablePort, func(req int, ps *util.PortSet) int {
t.Override(&retrieveAvailablePort, func(addr string, req int, ps *util.PortSet) int {
t.CheckDeepEqual(test.expectedReq, req)
return mockRetrieveAvailablePort(map[int]struct{}{}, test.availablePorts)(req, ps)
return mockRetrieveAvailablePort(util.Loopback, map[int]struct{}{}, test.availablePorts)(addr, req, ps)
})

entryManager := NewEntryManager(ioutil.Discard, newTestForwarder())
Expand Down Expand Up @@ -267,7 +267,7 @@ func TestUserDefinedResources(t *testing.T) {
for _, test := range tests {
testutil.Run(t, test.description, func(t *testutil.T) {
testEvent.InitializeState([]latest.Pipeline{{}})
t.Override(&retrieveAvailablePort, mockRetrieveAvailablePort(map[int]struct{}{}, []int{8080, 9000}))
t.Override(&retrieveAvailablePort, mockRetrieveAvailablePort(util.Loopback, map[int]struct{}{}, []int{8080, 9000}))
t.Override(&retrieveServices, func(context.Context, string, []string) ([]*latest.PortForwardResource, error) {
return []*latest.PortForwardResource{svc}, nil
})
Expand Down
2 changes: 1 addition & 1 deletion pkg/skaffold/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -242,7 +242,7 @@ func errorHandler(ctx context.Context, _ *runtime.ServeMux, marshaler runtime.Ma

func listenOnAvailablePort(preferredPort int, usedPorts *util.PortSet) (net.Listener, int, error) {
for try := 1; ; try++ {
port := util.GetAvailablePort(preferredPort, usedPorts)
port := util.GetAvailablePort(util.Loopback, preferredPort, usedPorts)

l, err := net.Listen("tcp", fmt.Sprintf("%s:%d", util.Loopback, port))
if err != nil {
Expand Down
35 changes: 24 additions & 11 deletions pkg/skaffold/util/port.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,10 @@ import (
// unless we really want to expose something to the network.
const Loopback = "127.0.0.1"

// Network address which represent any address. This is the default that
// we should use when checking if port is free.
const Any = ""

type PortSet struct {
ports map[int]bool
lock sync.Mutex
Expand Down Expand Up @@ -88,36 +92,36 @@ func (f *PortSet) List() []int {
}

// GetAvailablePort returns an available port that is near the requested port when possible.
// First, check if the provided port is available on the specified address. If so, use it.
// First, check if the provided port is available on the specified address and INADDR_ANY. If so, use it.
// If not, check if any of the next 10 subsequent ports are available.
// If not, check if any of ports 4503-4533 are available.
// If not, return a random port, which hopefully won't collide with any future containers
//
// See https://www.iana.org/assignments/service-names-port-numbers/service-names-port-numbers.txt
func GetAvailablePort(port int, usedPorts *PortSet) int {
func GetAvailablePort(address string, port int, usedPorts *PortSet) int {
if port > 0 {
dat-boris marked this conversation as resolved.
Show resolved Hide resolved
if getPortIfAvailable(port, usedPorts) {
if getPortIfAvailable(address, port, usedPorts) {
return port
}

// try the next 10 ports after the provided one
for i := 0; i < 10; i++ {
port++
if getPortIfAvailable(port, usedPorts) {
if getPortIfAvailable(address, port, usedPorts) {
logrus.Debugf("found open port: %d", port)
return port
}
}
}

for port = 4503; port <= 4533; port++ {
if getPortIfAvailable(port, usedPorts) {
if getPortIfAvailable(address, port, usedPorts) {
logrus.Debugf("found open port: %d", port)
return port
}
}

l, err := net.Listen("tcp", ":0")
l, err := net.Listen("tcp", fmt.Sprintf("%s:0", address))
if err != nil {
return -1
}
Expand All @@ -129,20 +133,29 @@ func GetAvailablePort(port int, usedPorts *PortSet) int {
return p
}

func getPortIfAvailable(p int, usedPorts *PortSet) bool {
func getPortIfAvailable(address string, p int, usedPorts *PortSet) bool {
if alreadySet := usedPorts.LoadOrSet(p); alreadySet {
return false
}

return IsPortFree(p)
return IsPortFree(address, p)
}

func IsPortFree(p int) bool {
func IsPortFree(address string, p int) bool {
// Ensure the port is available across all interfaces
l, err := net.Listen("tcp", fmt.Sprintf(":%d", p))
if err != nil {
if err != nil || l == nil {
return false
}

l.Close()

if address != Any {
// Ensure the port is available on the specific interface too
l, err := net.Listen("tcp", fmt.Sprintf("%s:%d", address, p))
if err != nil || l == nil {
return false
}
l.Close()
}
return true
}
2 changes: 1 addition & 1 deletion pkg/skaffold/util/port_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ func TestGetAvailablePort(t *testing.T) {
wg.Add(N)
for i := 0; i < N; i++ {
go func() {
port := GetAvailablePort(4503, &ports)
port := GetAvailablePort(Loopback, 4503, &ports)

l, err := net.Listen("tcp", fmt.Sprintf("%s:%d", Loopback, port))
if err != nil {
Expand Down