Skip to content

Commit

Permalink
temporal
Browse files Browse the repository at this point in the history
  • Loading branch information
aledbf committed Nov 1, 2021
1 parent b7056d9 commit b8afa3f
Show file tree
Hide file tree
Showing 2 changed files with 42 additions and 12 deletions.
52 changes: 40 additions & 12 deletions components/supervisor/pkg/ports/ports.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"net/http/httputil"
"net/url"
"reflect"
"sort"
"sync"
"time"

Expand Down Expand Up @@ -142,7 +143,7 @@ func (pm *Manager) Run(ctx context.Context, wg *sync.WaitGroup) {
pm.mu.Unlock()

for _, s := range subs {
s.Close()
_ = s.Close()
}
}()
defer cancel()
Expand Down Expand Up @@ -230,16 +231,16 @@ func (pm *Manager) updateState(ctx context.Context, exposed []ExposedPort, serve
pm.mu.Lock()
defer pm.mu.Unlock()

if exposed != nil && !reflect.DeepEqual(pm.exposed, exposed) {
if exposed != nil {
pm.exposed = exposed
}

if tunneled != nil && !reflect.DeepEqual(pm.tunneled, tunneled) {
if tunneled != nil {
pm.tunneled = tunneled
}

if served != nil {
var servedKeys []uint32 // to preserve insertion order
var servedKeys []uint32
servedMap := make(map[uint32]ServedPort)
for _, port := range served {
current, exists := servedMap[port.Port]
Expand All @@ -250,10 +251,16 @@ func (pm *Manager) updateState(ctx context.Context, exposed []ExposedPort, serve
servedMap[port.Port] = port
}
}

sort.Slice(servedKeys, func(i, j int) bool {
return servedKeys[i] < servedKeys[j]
})

var newServed []ServedPort
for _, key := range servedKeys {
newServed = append(newServed, servedMap[key])
}

if !reflect.DeepEqual(pm.served, newServed) {
pm.served = newServed
pm.updateProxies()
Expand Down Expand Up @@ -281,7 +288,7 @@ func (pm *Manager) updateState(ctx context.Context, exposed []ExposedPort, serve
case sub.updates <- status:
case <-time.After(5 * time.Second):
log.Error("ports subscription droped out")
sub.Close()
_ = sub.Close()
}
}
}
Expand Down Expand Up @@ -385,7 +392,22 @@ func (pm *Manager) nextState(ctx context.Context) map[uint32]*managedPort {

mp.AutoExposure = pm.autoExpose(ctx, mp.LocalhostPort, public).state
}
return state

var ports []uint32
for port := range state {
ports = append(ports, port)
}

sort.Slice(ports, func(i, j int) bool {
return ports[i] < ports[j]
})

newState := make(map[uint32]*managedPort)
for _, mp := range ports {
newState[mp] = state[mp]
}

return newState
}

// clients should guard a call with check whether such port is already exposed or auto exposed
Expand Down Expand Up @@ -483,27 +505,33 @@ func (pm *Manager) updateSlirp() {
func (pm *Manager) updateProxies() {
opened := make(map[uint32]struct{}, len(pm.served))
for _, p := range pm.served {
if !p.BoundToLocalhost {
continue
}

opened[p.Port] = struct{}{}
}

for localPort, proxy := range pm.proxies {
globalPort := proxy.proxyPort
_, openedLocal := opened[localPort]
_, openedGlobal := opened[globalPort]
_, opened := opened[localPort]

if !openedLocal && openedGlobal {
if !opened {
delete(pm.proxies, localPort)

err := proxy.Close()
if err != nil {
log.WithError(err).WithField("globalPort", globalPort).WithField("localPort", localPort).Warn("cannot stop localhost proxy")
log.WithError(err).WithField("localPort", localPort).Warn("cannot stop localhost proxy")
} else {
log.WithField("globalPort", globalPort).WithField("localPort", localPort).Info("localhost proxy has been stopped")
log.WithField("localPort", localPort).Info("localhost proxy has been stopped")
}
}
}

for _, served := range pm.served {
if served.BoundToLocalhost {
continue
}

localPort := served.Port
_, exists := pm.proxies[localPort]
if exists || !served.BoundToLocalhost {
Expand Down
2 changes: 2 additions & 0 deletions components/supervisor/pkg/ports/ports_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ import (
)

func TestPortsUpdateState(t *testing.T) {
// t.Skip("for now")

type ExposureExpectation []ExposedPort
type UpdateExpectation [][]*api.PortsStatus
type ConfigChange struct {
Expand Down

0 comments on commit b8afa3f

Please sign in to comment.