Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix Tenant PodInformer for Tenant Status and remove recurrent job #2150

Merged
merged 4 commits into from
Jun 7, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions api/pod-handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -169,7 +169,7 @@ func getDeletePodResponse(session *models.Principal, params operator_api.DeleteP
return ErrorWithContext(ctx, err)
}
listOpts := metav1.ListOptions{
LabelSelector: fmt.Sprintf("v1.min.io/tenant=%s", params.Tenant),
LabelSelector: fmt.Sprintf("%s=%s", miniov2.TenantLabel, params.Tenant),
FieldSelector: fmt.Sprintf("metadata.name=%s%s", params.Tenant, params.PodName[len(params.Tenant):]),
}
if err = clientset.CoreV1().Pods(params.Namespace).DeleteCollection(ctx, metav1.DeleteOptions{}, listOpts); err != nil {
Expand Down Expand Up @@ -531,7 +531,7 @@ func generateTenantLogReport(ctx context.Context, coreInterface v1.CoreV1Interfa
return []byte{}, ErrorWithContext(ctx, errors.New("Namespace and Tenant name cannot be empty"))
}
podListOpts := metav1.ListOptions{
LabelSelector: fmt.Sprintf("v1.min.io/tenant=%s", tenantName),
LabelSelector: fmt.Sprintf("%s=%s", miniov2.TenantLabel, tenantName),
}
pods, err := coreInterface.Pods(namespace).List(ctx, podListOpts)
if err != nil {
Expand Down
10 changes: 5 additions & 5 deletions api/volumes.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ func getPVCsResponse(session *models.Principal, params operator_api.ListPVCsPara
Status: status,
StorageClass: *pvc.Spec.StorageClassName,
Volume: pvc.Spec.VolumeName,
Tenant: pvc.Labels["v1.min.io/tenant"],
Tenant: pvc.Labels[miniov2.TenantLabel],
}
ListPVCs = append(ListPVCs, &pvcResponse)
}
Expand All @@ -142,7 +142,7 @@ func getPVCsForTenantResponse(session *models.Principal, params operator_api.Lis

	// Filter Tenant PVCs. They keep their v1 tenant label
listOpts := metav1.ListOptions{
LabelSelector: fmt.Sprintf("v1.min.io/tenant=%s", params.Tenant),
LabelSelector: fmt.Sprintf("%s=%s", miniov2.TenantLabel, params.Tenant),
}

// List all PVCs
Expand All @@ -167,7 +167,7 @@ func getPVCsForTenantResponse(session *models.Principal, params operator_api.Lis
Status: status,
StorageClass: *pvc.Spec.StorageClassName,
Volume: pvc.Spec.VolumeName,
Tenant: pvc.Labels["v1.min.io/tenant"],
Tenant: pvc.Labels[miniov2.TenantLabel],
}
ListPVCs = append(ListPVCs, &pvcResponse)
}
Expand All @@ -188,7 +188,7 @@ func getDeletePVCResponse(session *models.Principal, params operator_api.DeleteP
return ErrorWithContext(ctx, err)
}
listOpts := metav1.ListOptions{
LabelSelector: fmt.Sprintf("v1.min.io/tenant=%s", params.Tenant),
LabelSelector: fmt.Sprintf("%s=%s", miniov2.TenantLabel, params.Tenant),
FieldSelector: fmt.Sprintf("metadata.name=%s", params.PVCName),
}
if err = clientset.CoreV1().PersistentVolumeClaims(params.Namespace).DeleteCollection(ctx, metav1.DeleteOptions{}, listOpts); err != nil {
Expand Down Expand Up @@ -237,7 +237,7 @@ func getTenantCSResponse(session *models.Principal, params operator_api.ListTena
}

	// Get CSRs selected by the tenant label (miniov2.TenantLabel) equal to params.Tenant
listByTenantLabel := metav1.ListOptions{LabelSelector: "v1.min.io/tenant=" + params.Tenant}
listByTenantLabel := metav1.ListOptions{LabelSelector: fmt.Sprintf("%s=%s", miniov2.TenantLabel, params.Tenant)}
listResult, listError := clientset.CertificatesV1().CertificateSigningRequests().List(ctx, listByTenantLabel)
if listError != nil {
return nil, ErrorWithContext(ctx, listError)
Expand Down
5 changes: 3 additions & 2 deletions operator-integration/tenant_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ import (
"github.com/go-openapi/loads"
"github.com/minio/operator/api/operations"
"github.com/minio/operator/models"
miniov2 "github.com/minio/operator/pkg/apis/minio.min.io/v2"
"github.com/stretchr/testify/assert"
)

Expand Down Expand Up @@ -350,7 +351,7 @@ func TestCreateTenant(t *testing.T) {
pools := make([]map[string]interface{}, 1)
matchExpressions := make([]map[string]interface{}, 2)
matchExpressions[0] = map[string]interface{}{
"key": "v1.min.io/tenant",
"key": miniov2.TenantLabel,
"operator": "In",
"values": values,
}
Expand Down Expand Up @@ -489,7 +490,7 @@ func TestDeleteTenant(t *testing.T) {
pools := make([]map[string]interface{}, 1)
matchExpressions := make([]map[string]interface{}, 2)
matchExpressions[0] = map[string]interface{}{
"key": "v1.min.io/tenant",
"key": miniov2.TenantLabel,
"operator": "In",
"values": values,
}
Expand Down
47 changes: 32 additions & 15 deletions pkg/controller/main-controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -236,7 +236,6 @@ func NewController(
) *Controller {
statefulSetInformer := kubeInformerFactory.Apps().V1().StatefulSets()
deploymentInformer := kubeInformerFactory.Apps().V1().Deployments()
podInformer := kubeInformerFactory.Core().V1().Pods()
serviceInformer := kubeInformerFactory.Core().V1().Services()
jobInformer := kubeInformerFactory.Batch().V1().Jobs()
secretInformer := kubeInformerFactoryInOperatorNamespace.Core().V1().Secrets()
Expand All @@ -251,6 +250,22 @@ func NewController(
eventBroadcaster.StartRecordingToSink(&typedcorev1.EventSinkImpl{Interface: kubeClientSet.CoreV1().Events("")})
recorder := eventBroadcaster.NewRecorder(scheme.Scheme, corev1.EventSource{Component: controllerAgentName})

// Create PodInformer for Tenant Pods
labelSelector := metav1.LabelSelector{
MatchExpressions: []metav1.LabelSelectorRequirement{
{
Key: miniov2.TenantLabel,
Operator: metav1.LabelSelectorOpExists,
},
},
}
labelSelectorString, err := utils.LabelSelectorToString(labelSelector)
if err != nil {
klog.Errorf("bad label: %s for podInformer", labelSelectorString)
}

podInformer := utils.NewPodInformer(kubeClientSet, labelSelectorString)

controller := &Controller{
podName: podName,
namespacesToWatch: namespacesToWatch,
Expand All @@ -260,7 +275,7 @@ func NewController(
promClient: promClient,
statefulSetLister: statefulSetInformer.Lister(),
statefulSetListerSynced: statefulSetInformer.Informer().HasSynced,
podInformer: podInformer.Informer(),
podInformer: podInformer,
deploymentLister: deploymentInformer.Lister(),
deploymentListerSynced: deploymentInformer.Informer().HasSynced,
tenantsSynced: tenantInformer.Informer().HasSynced,
Expand Down Expand Up @@ -345,14 +360,13 @@ func NewController(
DeleteFunc: controller.handleObject,
})

podInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
podInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: controller.handlePodChange,
UpdateFunc: func(old, new interface{}) {
newDepl := new.(*corev1.Pod)
oldDepl := old.(*corev1.Pod)
if newDepl.ResourceVersion == oldDepl.ResourceVersion {
// Periodic resync will send update events for all known Deployments.
// Two different versions of the same Pods will always have different RVs.
newPod := new.(*corev1.Pod)
oldPod := old.(*corev1.Pod)
// Ignore Pod changes if same ResourceVersion
if newPod.ResourceVersion == oldPod.ResourceVersion {
return
}
controller.handlePodChange(new)
Expand All @@ -379,8 +393,13 @@ func NewController(
return controller
}

// StartPodInformer runs the Tenant Pod informer and blocks until stopCh is
// closed. It is launched in its own goroutine by leaderRun; the informer is
// pre-filtered to Tenant pods (miniov2.TenantLabel) when it is constructed
// in NewController.
func (c *Controller) StartPodInformer(stopCh <-chan struct{}) {
	c.podInformer.Run(stopCh)
}

// startUpgradeServer Starts the Upgrade tenant API server and notifies the start and stop via notificationChannel returned
func (c *Controller) startUpgradeServer(ctx context.Context) <-chan error {
func (c *Controller) startUpgradeServer() <-chan error {
notificationChannel := make(chan error)
go func() {
defer close(notificationChannel)
Expand Down Expand Up @@ -429,7 +448,7 @@ func leaderRun(ctx context.Context, c *Controller, threadiness int, stopCh <-cha
var upgradeServerChannel <-chan error

klog.Info("Waiting for Upgrade Server to start")
upgradeServerChannel = c.startUpgradeServer(ctx)
upgradeServerChannel = c.startUpgradeServer()

// Start the informer factories to begin populating the informer caches
klog.Info("Starting Tenant controller")
Expand All @@ -448,7 +467,6 @@ func leaderRun(ctx context.Context, c *Controller, threadiness int, stopCh <-cha

klog.Info("Starting workers and Job workers")
JobController := c.controllers[0]
// fmt.Println(controller.SyncHandler())
// Launch two workers to process Job resources
for i := 0; i < threadiness; i++ {
go wait.Until(JobController.runJobWorker, time.Second, stopCh)
Expand All @@ -458,8 +476,7 @@ func leaderRun(ctx context.Context, c *Controller, threadiness int, stopCh <-cha
// Launch a single worker for Health Check reacting to Pod Changes
go wait.Until(c.runHealthCheckWorker, time.Second, stopCh)

// Launch a goroutine to monitor all Tenants
go c.recurrentTenantStatusMonitor(stopCh)
go c.StartPodInformer(stopCh)

// 1) we need to make sure we have console TLS certificates (if enabled)
if isOperatorConsoleTLS() {
Expand Down Expand Up @@ -502,7 +519,7 @@ func leaderRun(ctx context.Context, c *Controller, threadiness int, stopCh <-cha
case err := <-upgradeServerChannel:
if err != http.ErrServerClosed {
klog.Errorf("Upgrade Server stopped: %v, going to restart", err)
upgradeServerChannel = c.startUpgradeServer(ctx)
upgradeServerChannel = c.startUpgradeServer()
}
// webserver was instructed to stop, do not attempt to restart
continue
Expand Down Expand Up @@ -740,7 +757,7 @@ func (c *Controller) syncHandler(key string) (Result, error) {
Namespace: namespace,
}
client.MatchingLabels{
"v1.min.io/tenant": tenantName,
miniov2.TenantLabel: tenantName,
}.ApplyToList(&listOpt)
err := c.k8sClient.List(ctx, &pvcList, &listOpt)
if err != nil {
Expand Down
41 changes: 0 additions & 41 deletions pkg/controller/monitoring.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ package controller
import (
"context"
"fmt"
"log"
"time"

"github.com/minio/madmin-go/v3"
Expand All @@ -40,46 +39,6 @@ const (
HealthReduceAvailabilityMessage = "Reduced Availability"
)

// recurrentTenantStatusMonitor runs tenant health sweeps until stopCh is
// closed. It performs one sweep immediately, then repeats on the interval
// (in minutes) configured via miniov2.GetMonitoringInterval.
func (c *Controller) recurrentTenantStatusMonitor(stopCh <-chan struct{}) {
	// Initial sweep so tenants get a status without waiting a full interval.
	if err := c.tenantsHealthMonitor(); err != nil {
		log.Println(err)
	}
	ticker := time.NewTicker(time.Duration(miniov2.GetMonitoringInterval()) * time.Minute)
	defer log.Println("recurrent pod status monitor closed")
	defer ticker.Stop()
	for {
		select {
		case <-stopCh:
			return
		case <-ticker.C:
			if err := c.tenantsHealthMonitor(); err != nil {
				klog.Infof("%v", err)
			}
		}
	}
}

// tenantsHealthMonitor performs a single health sweep: it lists every tenant
// across all namespaces and refreshes each one's health status. The first
// per-tenant failure is logged and returned, aborting the sweep.
func (c *Controller) tenantsHealthMonitor() error {
	tenants, err := c.minioClientSet.MinioV2().Tenants("").List(context.Background(), metav1.ListOptions{})
	if err != nil {
		return err
	}
	for i := range tenants.Items {
		if updateErr := c.updateHealthStatusForTenant(&tenants.Items[i]); updateErr != nil {
			klog.Errorf("%v", updateErr)
			return updateErr
		}
	}
	return nil
}

func (c *Controller) updateHealthStatusForTenant(tenant *miniov2.Tenant) error {
// don't get the tenant cluster health if it doesn't have at least 1 pool initialized
oneInitialized := false
Expand Down
46 changes: 14 additions & 32 deletions pkg/controller/pods.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,46 +17,28 @@
package controller

import (
"context"
"fmt"

miniov2 "github.com/minio/operator/pkg/apis/minio.min.io/v2"

metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/client-go/tools/cache"
"k8s.io/klog/v2"
"github.com/minio/operator/pkg/utils"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
)

// handlePodChange will handle changes in pods and see if they are a MinIO pod, if so, queue it for processing
// handlePodChange will handle changes in pods and queue it for processing, pods are already filtered by PodInformer
func (c *Controller) handlePodChange(obj interface{}) {
var object metav1.Object
var ok bool
if object, ok = obj.(metav1.Object); !ok {
tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
if !ok {
runtime.HandleError(fmt.Errorf("error decoding object, invalid type"))
return
}
object, ok = tombstone.Obj.(metav1.Object)
if !ok {
runtime.HandleError(fmt.Errorf("error decoding object tombstone, invalid type"))
return
}
klog.V(4).Infof("Recovered deleted object '%s' from tombstone", object.GetName())
// NOTE: currently only Tenant pods are being monitored by the Pod Informer
object, err := utils.CastObjectToMetaV1(obj)
if err != nil {
utilruntime.HandleError(err)
return
}
// if the pod is a MinIO pod, we do process it
if tenantName, ok := object.GetLabels()[miniov2.TenantLabel]; ok {
// check this is still a valid tenant
_, err := c.minioClientSet.MinioV2().Tenants(object.GetNamespace()).Get(context.Background(), tenantName, metav1.GetOptions{})
if err != nil {
klog.V(4).Infof("ignoring orphaned object '%s' of tenant '%s'", object.GetSelfLink(), tenantName)
return
}
key := fmt.Sprintf("%s/%s", object.GetNamespace(), tenantName)
// check if already in queue before re-queueing

c.healthCheckQueue.Add(key)
instanceName, ok := object.GetLabels()[miniov2.TenantLabel]
if !ok {
utilruntime.HandleError(fmt.Errorf("label: %s not found in %s", miniov2.TenantLabel, object.GetName()))
return
}

key := fmt.Sprintf("%s/%s", object.GetNamespace(), instanceName)
c.healthCheckQueue.Add(key)
}
Loading
Loading