Skip to content
This repository has been archived by the owner on Jul 11, 2023. It is now read-only.

Commit

Permalink
endpoint/kube: Cleanup and move remaining caches to kubecontroller (#…
Browse files Browse the repository at this point in the history
…2191)

* endpoint/kube: Move caches to kubecontroller layer

This commit removes a repeated cache (pods) and moves the endpoint cache
to kubecontroller in an attempt to unify code and the interfaces
that handles kubernetes objects.

This commit also removes the mocks on provider tests that were trying
to dynamically simulate kubecontroller logic (which is highly error-prone),
in favour of using the real kubecontroller object instead. Specific information
for each category has been added on their respective ginkgo `Defines`.

- Pod cache from provider has been removed. Kubecontroller held one
all along already.
- Endpoint cache has been moved from provider/kube to Kubecontroller.
- API to get endpoints for a service was added (GetEndpoints).
- Removed the Announcement channel for provider's interface and implementation,
  subscription/announcement capabilities are still available through pubsub.

Tests:
- Moved the tests that were using dynamic mocks to use a real kubecontroller object
instead.
- Added some more synchronization as there were still some races lurking around.
  • Loading branch information
eduser25 authored Dec 11, 2020
1 parent ea43cfd commit 9907bcf
Show file tree
Hide file tree
Showing 10 changed files with 192 additions and 319 deletions.
6 changes: 0 additions & 6 deletions pkg/catalog/catalog.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,12 +60,6 @@ func (mc *MeshCatalog) getAnnouncementChannels() []announcementChannel {
{"Services", mc.kubeController.GetAnnouncementsChannel(k8s.Services)},
}

// There could be many Endpoint Providers - iterate over all of them!
for _, ep := range mc.endpointsProviders {
annCh := announcementChannel{ep.GetID(), ep.GetAnnouncementsChannel()}
announcementChannels = append(announcementChannels, annCh)
}

if updateAtLeastEvery > 0 {
go func() {
ticker := time.NewTicker(updateAtLeastEvery)
Expand Down
8 changes: 6 additions & 2 deletions pkg/catalog/catalog_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,13 @@ var _ = Describe("Test catalog functions", func() {
It("provides the SMI Spec component via Mesh Catalog", func() {
chans := mc.getAnnouncementChannels()

// Currently returns len(Channels), see getAnnouncementChannels implementation for details.
expectedNumberOfChannels := 6
// Current channels are MeshSpec, CertManager, IngressMonitor, Ticker, Services
expectedNumberOfChannels := 5
Expect(len(chans)).To(Equal(expectedNumberOfChannels))
for _, aChannel := range chans {
Expect(len(aChannel.announcer)).ToNot(BeZero())
Expect(aChannel.channel).ToNot(BeNil())
}
})
})
})
141 changes: 20 additions & 121 deletions pkg/endpoint/providers/kube/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,18 +2,13 @@ package kube

import (
"net"
"reflect"
"strings"

mapset "github.com/deckarep/golang-set"
"github.com/pkg/errors"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/cache"

a "github.com/openservicemesh/osm/pkg/announcements"
"github.com/openservicemesh/osm/pkg/configurator"
"github.com/openservicemesh/osm/pkg/endpoint"
k8s "github.com/openservicemesh/osm/pkg/kubernetes"
Expand All @@ -27,50 +22,12 @@ const (

// NewProvider implements mesh.EndpointsProvider, which creates a new Kubernetes cluster/compute provider.
func NewProvider(kubeClient kubernetes.Interface, kubeController k8s.Controller, stop chan struct{}, providerIdent string, cfg configurator.Configurator) (endpoint.Provider, error) {
informerFactory := informers.NewSharedInformerFactory(kubeClient, k8s.DefaultKubeEventResyncInterval)

informerCollection := InformerCollection{
Endpoints: informerFactory.Core().V1().Endpoints().Informer(),
Pods: informerFactory.Core().V1().Pods().Informer(),
}

cacheCollection := CacheCollection{
Endpoints: informerCollection.Endpoints.GetStore(),
Pods: informerCollection.Pods.GetStore(),
}

client := Client{
providerIdent: providerIdent,
kubeClient: kubeClient,
informers: &informerCollection,
caches: &cacheCollection,
cacheSynced: make(chan interface{}),
announcements: make(chan a.Announcement),
kubeController: kubeController,
}

shouldObserve := func(obj interface{}) bool {
ns := reflect.ValueOf(obj).Elem().FieldByName("ObjectMeta").FieldByName("Namespace").String()
return kubeController.IsMonitoredNamespace(ns)
}
eptEventTypes := k8s.EventTypes{
Add: a.EndpointAdded,
Update: a.EndpointUpdated,
Delete: a.EndpointDeleted,
}
informerCollection.Endpoints.AddEventHandler(k8s.GetKubernetesEventHandlers("Endpoints", "Kubernetes", client.announcements, shouldObserve, nil, eptEventTypes))

podEventTypes := k8s.EventTypes{
Add: a.PodAdded,
Update: a.PodUpdated,
Delete: a.PodDeleted,
}
informerCollection.Pods.AddEventHandler(k8s.GetKubernetesEventHandlers("Pods", "Kubernetes", client.announcements, shouldObserve, getPodUID, podEventTypes))

if err := client.run(stop); err != nil {
return nil, errors.Errorf("Failed to start Kubernetes EndpointProvider client: %+v", err)
}

return &client, nil
}

Expand All @@ -84,37 +41,31 @@ func (c *Client) GetID() string {
func (c Client) ListEndpointsForService(svc service.MeshService) []endpoint.Endpoint {
log.Trace().Msgf("[%s] Getting Endpoints for service %s on Kubernetes", c.providerIdent, svc)
var endpoints []endpoint.Endpoint
endpointsInterface, exist, err := c.caches.Endpoints.GetByKey(svc.String())
if err != nil {

kubernetesEndpoints, err := c.kubeController.GetEndpoints(svc)
if err != nil || kubernetesEndpoints == nil {
log.Error().Err(err).Msgf("[%s] Error fetching Kubernetes Endpoints from cache", c.providerIdent)
return endpoints
}

if !exist {
log.Error().Msgf("[%s] Error fetching Kubernetes Endpoints from cache: MeshService %s does not exist", c.providerIdent, svc)
if !c.kubeController.IsMonitoredNamespace(kubernetesEndpoints.Namespace) {
// Doesn't belong to namespaces we are observing
return endpoints
}

kubernetesEndpoints := endpointsInterface.(*corev1.Endpoints)
if kubernetesEndpoints != nil {
if !c.kubeController.IsMonitoredNamespace(kubernetesEndpoints.Namespace) {
// Doesn't belong to namespaces we are observing
return endpoints
}
for _, kubernetesEndpoint := range kubernetesEndpoints.Subsets {
for _, address := range kubernetesEndpoint.Addresses {
for _, port := range kubernetesEndpoint.Ports {
ip := net.ParseIP(address.IP)
if ip == nil {
log.Error().Msgf("[%s] Error parsing IP address %s", c.providerIdent, address.IP)
break
}
ept := endpoint.Endpoint{
IP: ip,
Port: endpoint.Port(port.Port),
}
endpoints = append(endpoints, ept)
for _, kubernetesEndpoint := range kubernetesEndpoints.Subsets {
for _, address := range kubernetesEndpoint.Addresses {
for _, port := range kubernetesEndpoint.Ports {
ip := net.ParseIP(address.IP)
if ip == nil {
log.Error().Msgf("[%s] Error parsing IP address %s", c.providerIdent, address.IP)
break
}
ept := endpoint.Endpoint{
IP: ip,
Port: endpoint.Port(port.Port),
}
endpoints = append(endpoints, ept)
}
}
}
Expand All @@ -125,12 +76,7 @@ func (c Client) ListEndpointsForService(svc service.MeshService) []endpoint.Endp
func (c Client) GetServicesForServiceAccount(svcAccount service.K8sServiceAccount) ([]service.MeshService, error) {
services := mapset.NewSet()

for _, podInterface := range c.caches.Pods.List() {
pod := podInterface.(*corev1.Pod)
if pod == nil {
continue
}

for _, pod := range c.kubeController.ListPods() {
if pod.Namespace != svcAccount.Namespace {
continue
}
Expand Down Expand Up @@ -177,18 +123,12 @@ func (c Client) GetServicesForServiceAccount(svcAccount service.K8sServiceAccoun
func (c Client) GetPortToProtocolMappingForService(svc service.MeshService) (map[uint32]string, error) {
portToProtocolMap := make(map[uint32]string)

endpointsInterface, exist, err := c.caches.Endpoints.GetByKey(svc.String())
if err != nil {
endpoints, err := c.kubeController.GetEndpoints(svc)
if err != nil || endpoints == nil {
log.Error().Err(err).Msgf("[%s] Error fetching Kubernetes Endpoints from cache", c.providerIdent)
return nil, err
}

if !exist {
log.Error().Msgf("[%s] Error fetching Kubernetes Endpoints from cache: MeshService %s does not exist", c.providerIdent, svc)
return nil, errServiceNotFound
}

endpoints := endpointsInterface.(*corev1.Endpoints)
if !c.kubeController.IsMonitoredNamespace(endpoints.Namespace) {
return nil, errors.Errorf("Error fetching endpoints for service %s, namespace %s is not monitored", svc, endpoints.Namespace)
}
Expand All @@ -212,47 +152,6 @@ func (c Client) GetPortToProtocolMappingForService(svc service.MeshService) (map
return portToProtocolMap, nil
}

// GetAnnouncementsChannel returns the announcement channel for the Kubernetes endpoints provider.
func (c Client) GetAnnouncementsChannel() <-chan a.Announcement {
return c.announcements
}

func (c *Client) run(stop <-chan struct{}) error {
var hasSynced []cache.InformerSynced

if c.informers == nil {
return errInitInformers
}

sharedInformers := map[string]cache.SharedInformer{
"Endpoints": c.informers.Endpoints,
"Pods": c.informers.Pods,
}

var names []string
for name, informer := range sharedInformers {
// Depending on the use-case, some Informers from the collection may not have been initialized.
if informer == nil {
continue
}
names = append(names, name)
log.Info().Msgf("[%s] Starting informer %s", c.providerIdent, name)
go informer.Run(stop)
hasSynced = append(hasSynced, informer.HasSynced)
}

log.Info().Msgf("[%s] Waiting for informer's cache to sync: %+v", c.providerIdent, strings.Join(names, ", "))
if !cache.WaitForCacheSync(stop, hasSynced...) {
return errSyncingCaches
}

// Closing the cacheSynced channel signals to the rest of the system that... caches have been synced.
close(c.cacheSynced)

log.Info().Msgf("[%s] Cache sync finished for %+v", c.providerIdent, names)
return nil
}

// getServicesByLabels gets Kubernetes services whose selectors match the given labels
func (c *Client) getServicesByLabels(podLabels map[string]string, namespace string) ([]corev1.Service, error) {
var finalList []corev1.Service
Expand Down
Loading

0 comments on commit 9907bcf

Please sign in to comment.