Skip to content
This repository has been archived by the owner on Jul 11, 2023. It is now read-only.

Commit

Permalink
Delete Certificate for an Envoy for a Pod that was Terminated (#1956)
Browse files Browse the repository at this point in the history
  • Loading branch information
draychev authored Nov 17, 2020
1 parent 2e81275 commit bf58d62
Show file tree
Hide file tree
Showing 11 changed files with 234 additions and 19 deletions.
52 changes: 52 additions & 0 deletions pkg/catalog/announcement_handlers.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
package catalog

import (
"errors"

"k8s.io/apimachinery/pkg/types"

"github.com/openservicemesh/osm/pkg/announcements"
"github.com/openservicemesh/osm/pkg/certificate"
)

var errEventNotHandled = errors.New("event not handled")

// releaseCertificate is an Announcement handler, which on receiving a PodDeleted event
// it releases the xDS certificate for the Envoy for that Pod.
func (mc *MeshCatalog) releaseCertificate(ann announcements.Announcement) error {
whatWeGot := ann.Type
whatWeCanHandle := announcements.PodDeleted
if whatWeCanHandle != whatWeGot {
log.Error().Msgf("releaseCertificate function received an announcement with type %s; it can only handle %s", whatWeGot, whatWeCanHandle)
return errEventNotHandled
}

if podUID, ok := ann.ReferencedObjectID.(types.UID); ok {
if podIface, ok := mc.podUIDToCN.Load(podUID); ok {
endpointCN := podIface.(certificate.CommonName)
log.Warn().Msgf("Pod with UID %s found in Mesh Catalog; Releasing certificate %s", podUID, endpointCN)
mc.certManager.ReleaseCertificate(endpointCN)
} else {
log.Warn().Msgf("Pod with UID %s not found in Mesh Catalog", podUID)
}
}

return nil
}

// updateRelatedProxies is an Announcement handler, which augments the handling of PodDeleted events
// and leverages broadcastToAllProxies() to let all proxies know that something has changed.
// TODO: The use of broadcastToAllProxies() needs to be deprecated in favor of more granular approach.
func (mc *MeshCatalog) updateRelatedProxies(ann announcements.Announcement) error {
whatWeGot := ann.Type
whatWeCanHandle := announcements.PodDeleted
if whatWeCanHandle != whatWeGot {
log.Error().Msgf("updateRelatedProxies function received an announcement with type %s; it can only handle %s", whatWeGot, whatWeCanHandle)
return errEventNotHandled
}

// TODO: the function below updates all proxies; understand what proxies need to be updated and update only these
mc.broadcastToAllProxies(ann)

return nil
}
104 changes: 104 additions & 0 deletions pkg/catalog/announcement_handlers_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
package catalog

import (
"time"

. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"

"github.com/google/uuid"
"k8s.io/apimachinery/pkg/types"
testclient "k8s.io/client-go/kubernetes/fake"

"github.com/openservicemesh/osm/pkg/announcements"
"github.com/openservicemesh/osm/pkg/certificate"
"github.com/openservicemesh/osm/pkg/envoy"
)

var _ = Describe("Test Announcement Handlers", func() {
var mc *MeshCatalog
var podUID string
var proxy *envoy.Proxy
var envoyCN certificate.CommonName

BeforeEach(func() {
mc = NewFakeMeshCatalog(testclient.NewSimpleClientset())
podUID = uuid.New().String()

envoyCN = "abcdefg"
_, err := mc.certManager.IssueCertificate(envoyCN, 5*time.Second)
Expect(err).ToNot(HaveOccurred())

proxy = envoy.NewProxy(envoyCN, nil)
proxy.PodMetadata = &envoy.PodMetadata{
UID: podUID,
}

mc.RegisterProxy(proxy)
})

Context("test releaseCertificate()", func() {
It("deletes certificate when Pod is terminated", func() {
// Ensure setup is correct
{
certs, err := mc.certManager.ListCertificates()
Expect(err).ToNot(HaveOccurred())
Expect(len(certs)).To(Equal(1))
}

ann := announcements.Announcement{
Type: announcements.PodDeleted,
ReferencedObjectID: types.UID(podUID),
}
err := mc.releaseCertificate(ann)
Expect(err).ToNot(HaveOccurred())

// Ensure certificate was deleted
{
certs, err := mc.certManager.ListCertificates()
Expect(err).ToNot(HaveOccurred())
Expect(len(certs)).To(Equal(0))
}
})

It("ignores events other than pod-deleted", func() {
ann := announcements.Announcement{
Type: announcements.IngressAdded,
}

var connectedProxies []envoy.Proxy
mc.connectedProxies.Range(func(key interface{}, value interface{}) bool {
connectedProxy := value.(connectedProxy)
connectedProxies = append(connectedProxies, *connectedProxy.proxy)
return true
})

Expect(len(connectedProxies)).To(Equal(1))
Expect(connectedProxies[0]).To(Equal(*proxy))

err := mc.releaseCertificate(ann)
Expect(err).To(HaveOccurred())
})
})

Context("test updateRelatedProxies()", func() {
It("ignores events other than pod-deleted", func() {
ann := announcements.Announcement{
Type: announcements.IngressAdded,
}

var connectedProxies []envoy.Proxy
mc.connectedProxies.Range(func(key interface{}, value interface{}) bool {
connectedProxy := value.(connectedProxy)
connectedProxies = append(connectedProxies, *connectedProxy.proxy)
return true
})

Expect(len(connectedProxies)).To(Equal(1))
Expect(connectedProxies[0]).To(Equal(*proxy))

err := mc.updateRelatedProxies(ann)
Expect(err).To(HaveOccurred())
})
})
})
14 changes: 14 additions & 0 deletions pkg/catalog/catalog.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,20 @@ func NewMeshCatalog(kubeController k8s.Controller, kubeClient kubernetes.Interfa
kubeController: kubeController,
}

// This map holds a list of Announcement handlers per type.
// The handlers will be evaluated sequentially in the order they appear here.
mc.announcementHandlerPerType = map[announcements.AnnouncementType][]func(ann announcements.Announcement) error{

// When a Kubernetes Pod is deleted
announcements.PodDeleted: {
// Delete the xDS Certificate that was issued to the Envoy fronting this pod.
mc.releaseCertificate,

// Remove the Endpoint (Pod) from relevant Envoys.
mc.updateRelatedProxies,
},
}

go mc.repeater()
return &mc
}
Expand Down
16 changes: 12 additions & 4 deletions pkg/catalog/proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ package catalog
import (
"time"

"k8s.io/apimachinery/pkg/types"

"github.com/openservicemesh/osm/pkg/certificate"
"github.com/openservicemesh/osm/pkg/envoy"
)
Expand All @@ -15,12 +17,18 @@ func (mc *MeshCatalog) ExpectProxy(cn certificate.CommonName) {
}

// RegisterProxy implements MeshCatalog and registers a newly connected proxy.
func (mc *MeshCatalog) RegisterProxy(p *envoy.Proxy) {
mc.connectedProxies.Store(p.CommonName, connectedProxy{
proxy: p,
func (mc *MeshCatalog) RegisterProxy(proxy *envoy.Proxy) {
mc.connectedProxies.Store(proxy.CommonName, connectedProxy{
proxy: proxy,
connectedAt: time.Now(),
})
log.Info().Msgf("Registered new proxy: CN=%v, ip=%v", p.GetCommonName(), p.GetIP())

// If this proxy object is on a Kubernetes Pod - it will have an UID
if proxy.HasPodMetadata() {
podUID := types.UID(proxy.PodMetadata.UID)
mc.podUIDToCN.Store(podUID, proxy.GetCommonName())
}
log.Info().Msgf("Registered new proxy: CN=%v, ip=%v", proxy.GetCommonName(), proxy.GetIP())
}

// UnregisterProxy unregisters the given proxy from the catalog.
Expand Down
25 changes: 20 additions & 5 deletions pkg/catalog/repeater.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ import (
"reflect"
"time"

mapset "github.com/deckarep/golang-set"

"github.com/openservicemesh/osm/pkg/announcements"
)

Expand All @@ -12,7 +14,13 @@ const (
updateAtLeastEvery = 1 * time.Minute
)

// repeater rebroadcasts announcements from SMI, Secrets, Endpoints providers etc. to all connected proxies.
// This is a set of announcement types that are explicitly handled
// and there is no need to call the legacy broadcastToAllProxies() for these
var alreadyHandled = mapset.NewSetFromSlice([]interface{}{
announcements.PodDeleted,
})

// repeater is a goroutine, which rebroadcasts announcements from SMI, Secrets, Endpoints providers etc. to all connected proxies.
func (mc *MeshCatalog) repeater() {
lastUpdateAt := time.Now().Add(-1 * updateAtMostEvery)
for {
Expand All @@ -30,7 +38,7 @@ func (mc *MeshCatalog) repeater() {
mc.handleAnnouncement(ann)

delta := time.Since(lastUpdateAt)
if delta >= updateAtMostEvery {
if !alreadyHandled.Contains(ann.Type) && delta >= updateAtMostEvery {
mc.broadcastToAllProxies(ann)
lastUpdateAt = time.Now()
}
Expand All @@ -40,9 +48,16 @@ func (mc *MeshCatalog) repeater() {
}

func (mc *MeshCatalog) handleAnnouncement(ann announcements.Announcement) {
if ann.Type == announcements.PodDeleted {
log.Trace().Msgf("Handling announcement: %+v", ann)
// TODO: implement (https://github.com/openservicemesh/osm/issues/1719)
handlers, ok := mc.announcementHandlerPerType[ann.Type]
if !ok {
log.Error().Msgf("No handler for announcement of type %+v", ann.Type)
return
}

for _, handler := range handlers {
if err := handler(ann); err != nil {
log.Error().Err(err).Msgf("Error handling announcement %s", ann.Type)
}
}
}

Expand Down
6 changes: 6 additions & 0 deletions pkg/catalog/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,12 @@ type MeshCatalog struct {
// calls through kubeClient and instead relies on background cache synchronization and local
// lookups
kubeController k8s.Controller

// Maintain a mapping of pod UID to CN of the Envoy on that pod
podUIDToCN sync.Map

// Functions able to handle announcements flowing through the system; grouped per type of announcement
announcementHandlerPerType map[announcements.AnnouncementType][]func(ann announcements.Announcement) error
}

// MeshCataloger is the mechanism by which the Service Mesh controller discovers all Envoy proxies connected to the catalog.
Expand Down
1 change: 1 addition & 0 deletions pkg/certificate/providers/tresor/certificate_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,7 @@ func (cm *CertManager) IssueCertificate(cn certificate.CommonName, validityPerio

// ReleaseCertificate is called when a cert will no longer be needed and should be removed from the system.
func (cm *CertManager) ReleaseCertificate(cn certificate.CommonName) {
log.Trace().Msgf("Releasing certificate %s", cn)
cm.deleteFromCache(cn)
}

Expand Down
16 changes: 12 additions & 4 deletions pkg/envoy/ads/grpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,11 @@ import (
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"

"github.com/openservicemesh/osm/pkg/catalog"
"github.com/openservicemesh/osm/pkg/envoy"
)

func receive(requests chan xds_discovery.DiscoveryRequest, server *xds_discovery.AggregatedDiscoveryService_StreamAggregatedResourcesServer, proxy *envoy.Proxy, quit chan struct{}) {
func receive(requests chan xds_discovery.DiscoveryRequest, server *xds_discovery.AggregatedDiscoveryService_StreamAggregatedResourcesServer, proxy *envoy.Proxy, quit chan struct{}, catalog catalog.MeshCataloger) {
defer close(requests)
defer close(quit)
for {
Expand All @@ -28,24 +29,31 @@ func receive(requests chan xds_discovery.DiscoveryRequest, server *xds_discovery
if request.TypeUrl != "" {
if !proxy.HasPodMetadata() {
// Set the Pod metadata on the given proxy only once. This could arrive with the first few XDS requests.
recordEnvoyPodMetadata(request, proxy)
recordEnvoyPodMetadata(request, proxy, catalog)
}
log.Trace().Msgf("[grpc] Received DiscoveryRequest from Envoy %s: %+v", proxy.GetCommonName(), request)
nodeID := ""
if request.Node != nil {
nodeID = request.Node.Id
}
log.Trace().Msgf("[grpc] Received DiscoveryRequest from Envoy with CN %s; Node ID: %s", proxy.GetCommonName(), nodeID)
requests <- *request
} else {
log.Warn().Msgf("[grpc] Unknown resource: %+v", request)
}
}
}

func recordEnvoyPodMetadata(request *xds_discovery.DiscoveryRequest, proxy *envoy.Proxy) {
func recordEnvoyPodMetadata(request *xds_discovery.DiscoveryRequest, proxy *envoy.Proxy, catalog catalog.MeshCataloger) {
if request != nil && request.Node != nil {
if podUID, podNamespace, podIP, serviceAccountName, envoyNodeID, err := envoy.ParseEnvoyServiceNodeID(request.Node.Id); err != nil {
log.Error().Err(err).Msgf("Error parsing Envoy Node ID: %s", request.Node.Id)
} else {
log.Trace().Msgf("Recorded metadata for Envoy %s: podUID=%s, podNamespace=%s, podIP=%s, serviceAccountName=%s, envoyNodeID=%s",
proxy.CommonName, podUID, podNamespace, podIP, serviceAccountName, envoyNodeID)
proxy.SetMetadata(podUID, podNamespace, podIP, serviceAccountName, envoyNodeID)

// We call RegisterProxy again on the MeshCatalog to update the index on pod metadata
catalog.RegisterProxy(proxy)
}
}
}
8 changes: 7 additions & 1 deletion pkg/envoy/ads/response.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,13 @@ func (s *Server) newAggregatedDiscoveryResponse(proxy *envoy.Proxy, request *xds
s.xdsLog[proxy.GetCommonName()][typeURL] = append(s.xdsLog[proxy.GetCommonName()][typeURL], time.Now())
}

log.Trace().Msgf("Invoking handler for %s with request: %+v", typeURL, request)
// request.Node is only available on the first Discovery Request; will be nil on the following
nodeID := ""
if request.Node != nil {
nodeID = request.Node.Id
}

log.Trace().Msgf("Invoking handler for type %s; request from Envoy with Node ID %s", typeURL, nodeID)
response, err := handler(s.catalog, proxy, request, cfg, s.certManager)
if err != nil {
log.Error().Msgf("Responder for TypeUrl %s is not implemented", request.TypeUrl)
Expand Down
5 changes: 3 additions & 2 deletions pkg/envoy/ads/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
)

// StreamAggregatedResources handles streaming of the clusters to the connected Envoy proxies
// This is evaluated once per new Envoy proxy connecting and remains running for the duration of the gRPC socket.
func (s *Server) StreamAggregatedResources(server xds_discovery.AggregatedDiscoveryService_StreamAggregatedResourcesServer) error {
// When a new Envoy proxy connects, ValidateClient would ensure that it has a valid certificate,
// and the Subject CN is in the allowedCommonNames set.
Expand Down Expand Up @@ -47,7 +48,7 @@ func (s *Server) StreamAggregatedResources(server xds_discovery.AggregatedDiscov

// This helper handles receiving messages from the connected Envoys
// and any gRPC error states.
go receive(requests, &server, proxy, quit)
go receive(requests, &server, proxy, quit, s.catalog)

for {
select {
Expand Down Expand Up @@ -145,7 +146,7 @@ func (s *Server) StreamAggregatedResources(server xds_discovery.AggregatedDiscov
}

case <-proxy.GetAnnouncementsChannel():
log.Info().Msgf("Change detected - update all Envoys.")
log.Info().Msgf("Announcement for Envoy proxy %s received: sending all xDS updates", proxy.GetCommonName())
s.sendAllResponses(proxy, &server, s.cfg)
}
}
Expand Down
6 changes: 3 additions & 3 deletions pkg/envoy/proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ type Proxy struct {
// This could be nil if the Envoy is not operating in a Kubernetes cluster (VM for example)
// NOTE: This field may be not be set at the time Proxy struct is initialized. This would
// eventually be set when the metadata arrives via the xDS protocol.
podMetadata *PodMetadata
PodMetadata *PodMetadata
}

// PodMetadata is a struct holding information on the Pod on which a given Envoy proxy is installed
Expand All @@ -43,12 +43,12 @@ type PodMetadata struct {

// HasPodMetadata answers the question - has the Pod metadata been recorded for the given Envoy proxy
func (p *Proxy) HasPodMetadata() bool {
return p.podMetadata != nil
return p.PodMetadata != nil
}

// SetMetadata sets the proxy metadata constructured from the given parameters
func (p *Proxy) SetMetadata(podUID, podNamespace, podIP, podServiceAccountName, envoyNodeID string) {
p.podMetadata = &PodMetadata{
p.PodMetadata = &PodMetadata{
UID: podUID,
Namespace: podNamespace,
IP: podIP,
Expand Down

0 comments on commit bf58d62

Please sign in to comment.