Skip to content
This repository has been archived by the owner on Jul 11, 2023. It is now read-only.

Commit

Permalink
catalog: Convert connectedProxies to sync.Map
Browse files Browse the repository at this point in the history
  • Loading branch information
draychev committed Nov 14, 2020
1 parent e533370 commit c8e220f
Show file tree
Hide file tree
Showing 5 changed files with 16 additions and 24 deletions.
1 change: 0 additions & 1 deletion pkg/catalog/catalog.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ func NewMeshCatalog(kubeController k8s.Controller, kubeClient kubernetes.Interfa
configurator: cfg,

expectedProxies: make(map[certificate.CommonName]expectedProxy),
connectedProxies: make(map[certificate.CommonName]connectedProxy),
disconnectedProxies: make(map[certificate.CommonName]disconnectedProxy),

// Kubernetes needed to determine what Services a pod that connects to XDS belongs to.
Expand Down
18 changes: 8 additions & 10 deletions pkg/catalog/debugger.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,9 @@ import (
func (mc *MeshCatalog) ListExpectedProxies() map[certificate.CommonName]time.Time {
proxies := make(map[certificate.CommonName]time.Time)
mc.expectedProxiesLock.Lock()
mc.connectedProxiesLock.Lock()
mc.disconnectedProxiesLock.Lock()
for cn, props := range mc.expectedProxies {
if _, ok := mc.connectedProxies[cn]; ok {
if _, ok := mc.connectedProxies.Load(cn); ok {
continue
}
if _, ok := mc.disconnectedProxies[cn]; ok {
Expand All @@ -28,24 +27,23 @@ func (mc *MeshCatalog) ListExpectedProxies() map[certificate.CommonName]time.Tim
proxies[cn] = props.certificateIssuedAt
}
mc.disconnectedProxiesLock.Unlock()
mc.connectedProxiesLock.Unlock()
mc.expectedProxiesLock.Unlock()
return proxies
}

// ListConnectedProxies lists the Envoy proxies already connected and the time they first connected.
func (mc *MeshCatalog) ListConnectedProxies() map[certificate.CommonName]*envoy.Proxy {
proxies := make(map[certificate.CommonName]*envoy.Proxy)
mc.connectedProxiesLock.Lock()
mc.disconnectedProxiesLock.Lock()
for cn, props := range mc.connectedProxies {
if _, ok := mc.disconnectedProxies[cn]; ok {
continue
mc.connectedProxies.Range(func(cnIface, propsIface interface{}) bool {
cn := cnIface.(certificate.CommonName)
props := propsIface.(connectedProxy)
if _, isDisconnected := mc.disconnectedProxies[cn]; !isDisconnected {
proxies[cn] = props.proxy
}
proxies[cn] = props.proxy
}
return true
})
mc.disconnectedProxiesLock.Unlock()
mc.connectedProxiesLock.Unlock()
return proxies
}

Expand Down
10 changes: 3 additions & 7 deletions pkg/catalog/proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,20 +18,16 @@ func (mc *MeshCatalog) ExpectProxy(cn certificate.CommonName) {

// RegisterProxy implements MeshCatalog and registers a newly connected proxy.
func (mc *MeshCatalog) RegisterProxy(p *envoy.Proxy) {
mc.connectedProxiesLock.Lock()
mc.connectedProxies[p.CommonName] = connectedProxy{
mc.connectedProxies.Store(p.CommonName, connectedProxy{
proxy: p,
connectedAt: time.Now(),
}
mc.connectedProxiesLock.Unlock()
})
log.Info().Msgf("Registered new proxy: CN=%v, ip=%v", p.GetCommonName(), p.GetIP())
}

// UnregisterProxy unregisters the given proxy from the catalog.
func (mc *MeshCatalog) UnregisterProxy(p *envoy.Proxy) {
mc.connectedProxiesLock.Lock()
delete(mc.connectedProxies, p.CommonName)
mc.connectedProxiesLock.Unlock()
mc.connectedProxies.Delete(p.CommonName)

mc.disconnectedProxiesLock.Lock()
mc.disconnectedProxies[p.CommonName] = disconnectedProxy{
Expand Down
8 changes: 4 additions & 4 deletions pkg/catalog/repeater.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,14 +57,14 @@ func (mc *MeshCatalog) getCases() ([]reflect.SelectCase, []string) {
}

func (mc *MeshCatalog) broadcastToAllProxies(message announcements.Announcement) {
mc.connectedProxiesLock.Lock()
for _, connectedEnvoy := range mc.connectedProxies {
mc.connectedProxies.Range(func(_, connectedEnvoyInterface interface{}) bool {
connectedEnvoy := connectedEnvoyInterface.(connectedProxy)
log.Debug().Msgf("[repeater] Broadcast announcement to Envoy with CN %s", connectedEnvoy.proxy.GetCommonName())
select {
// send the message if possible - do not block
case connectedEnvoy.proxy.GetAnnouncementsChannel() <- message:
default:
}
}
mc.connectedProxiesLock.Unlock()
return true
})
}
3 changes: 1 addition & 2 deletions pkg/catalog/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,7 @@ type MeshCatalog struct {
expectedProxies map[certificate.CommonName]expectedProxy
expectedProxiesLock sync.Mutex

connectedProxies map[certificate.CommonName]connectedProxy
connectedProxiesLock sync.Mutex
connectedProxies sync.Map

disconnectedProxies map[certificate.CommonName]disconnectedProxy
disconnectedProxiesLock sync.Mutex
Expand Down

0 comments on commit c8e220f

Please sign in to comment.