Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Leader Election support #874

Merged
merged 7 commits into from
Oct 15, 2021
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions helm/minio-operator/templates/cluster-role.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ rules:
verbs:
- get
- watch
- patch
- create
- list
- delete
Expand Down Expand Up @@ -120,3 +121,11 @@ rules:
- get
- create
- list
- apiGroups:
- "coordination.k8s.io"
resources:
- leases
verbs:
- get
- update
- create
5 changes: 5 additions & 0 deletions helm/minio-operator/templates/operator-deployment.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,11 @@ spec:
imagePullPolicy: {{ .Values.operator.image.pullPolicy }}
{{- if or .Values.operator.clusterDomain .Values.operator.nsToWatch }}
env:
- name: POD_NAME
valueFrom:
fieldRef:
apiVersion: v1
fieldPath: metadata.name
{{- if .Values.operator.clusterDomain }}
- name: CLUSTER_DOMAIN
value: {{ .Values.operator.clusterDomain }}
Expand Down
1 change: 1 addition & 0 deletions helm/minio-operator/templates/operator-service.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -11,4 +11,5 @@ spec:
- port: 4222
name: https
selector:
operator: leader
{{- include "minio-operator.selectorLabels" . | nindent 4 }}
13 changes: 9 additions & 4 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,12 +29,14 @@ import (
"syscall"
"time"

"k8s.io/client-go/tools/clientcmd"

"k8s.io/client-go/rest"

miniov2 "github.com/minio/operator/pkg/apis/minio.min.io/v2"

metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

"k8s.io/client-go/tools/clientcmd"

"k8s.io/klog/v2"

clientset "github.com/minio/operator/pkg/client/clientset/versioned"
Expand All @@ -45,7 +47,6 @@ import (
apiextension "k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset"
kubeinformers "k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
)

// version provides the version of this operator
Expand Down Expand Up @@ -156,8 +157,12 @@ func main() {
minioInformerFactory = informers.NewSharedInformerFactory(controllerClient, time.Second*30)
promInformerFactory = prominformers.NewSharedInformerFactory(promClient, time.Second*30)
}
podName := os.Getenv("POD_NAME")
dvaldivia marked this conversation as resolved.
Show resolved Hide resolved
if podName == "" {
podName = "operator-pod"
}

mainController := cluster.NewController(kubeClient, controllerClient, promClient,
mainController := cluster.NewController(podName, kubeClient, controllerClient, promClient,
kubeInformerFactory.Apps().V1().StatefulSets(),
kubeInformerFactory.Apps().V1().Deployments(),
kubeInformerFactory.Core().V1().Pods(),
Expand Down
2 changes: 1 addition & 1 deletion manifests/minio-operator.v4.1.2.clusterserviceversion.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ metadata:
},
"podManagementPolicy": "Parallel",
"console": {
"image": "minio/console:v0.10.3",
"image": "minio/console:v0.10.4",
"replicas": 1,
"consoleSecret": {
"name": "console-secret"
Expand Down
30 changes: 0 additions & 30 deletions pkg/apis/minio.min.io/v2/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,6 @@ import (
// MinIOCRDResourceKind is the Kind of a Cluster.
const MinIOCRDResourceKind = "Tenant"

// OperatorCRDResourceKind is the Kind of a Cluster.
const OperatorCRDResourceKind = "Operator"

// DefaultPodManagementPolicy specifies default pod management policy as expllained here
// https://kubernetes.io/docs/concepts/workloads/controllers/statefulset/#pod-management-policies
const DefaultPodManagementPolicy = appsv1.ParallelPodManagement
Expand Down Expand Up @@ -100,20 +97,8 @@ const DefaultMinIOUpdateURL = "https://dl.min.io/server/minio/release/" + runtim
// MinIOHLSvcNameSuffix specifies the suffix added to Tenant name to create a headless service
const MinIOHLSvcNameSuffix = "-hl"

// DefaultServers specifies the default MinIO replicas to use for distributed deployment if not specified explicitly by user
const DefaultServers = 1

// DefaultVolumesPerServer specifies the default number of volumes per MinIO Tenant
const DefaultVolumesPerServer = 1

// DefaultPoolName specifies the default pool name
const DefaultPoolName = "pool-0"

// Console Related Constants

// DefaultConsoleImage specifies the latest Console Docker hub image
const DefaultConsoleImage = "minio/console:v0.10.3"

// ConsoleTenantLabel is applied to the Console pods of a Tenant cluster
const ConsoleTenantLabel = "v1.min.io/console"

Expand All @@ -138,19 +123,6 @@ const ConsoleName = "-console"
// ConsoleAdminPolicyName denotes the policy name for Console user
const ConsoleAdminPolicyName = "consoleAdmin"

// ConsoleRestartPolicy defines the default restart policy for Console Containers
const ConsoleRestartPolicy = corev1.RestartPolicyAlways

// ConsoleConfigMountPath specifies the path where Console config file and all secrets are mounted
// We keep this to /tmp so it doesn't require any special permissions
const ConsoleConfigMountPath = "/tmp/console"

// DefaultConsoleReplicas specifies the default number of Console pods to be created if not specified
const DefaultConsoleReplicas = 2

// ConsoleCertPath is the path where all Console certs are mounted
const ConsoleCertPath = "/tmp/certs"

// Prometheus related constants

// PrometheusImage specifies the container image for prometheus server
Expand Down Expand Up @@ -316,8 +288,6 @@ const MinIOPrometheusScrapeTimeout = 2 * time.Second

const tenantMinIOImageEnv = "TENANT_MINIO_IMAGE"

const tenantConsoleImageEnv = "TENANT_CONSOLE_IMAGE"

const tenantKesImageEnv = "TENANT_KES_IMAGE"

const monitoringIntervalEnv = "MONITORING_INTERVAL"
Expand Down
8 changes: 0 additions & 8 deletions pkg/apis/minio.min.io/v2/helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -876,14 +876,6 @@ func GetTenantMinIOImage() string {
return tenantMinIOImage
}

// GetTenantConsoleImage returns the default Console Image for a tenant
func GetTenantConsoleImage() string {
tenantConsoleImageOnce.Do(func() {
tenantConsoleImage = envGet(tenantConsoleImageEnv, DefaultConsoleImage)
})
return tenantConsoleImage
}

// GetTenantKesImage returns the default KES Image for a tenant
func GetTenantKesImage() string {
tenantKesImageOnce.Do(func() {
Expand Down
131 changes: 118 additions & 13 deletions pkg/controller/cluster/main-controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,19 @@ package cluster

import (
"context"
"encoding/json"
"errors"
"fmt"
"net/http"
"os"
"os/signal"
"strings"
"syscall"
"time"

"k8s.io/client-go/tools/leaderelection"
"k8s.io/client-go/tools/leaderelection/resourcelock"

miniov1 "github.com/minio/operator/pkg/apis/minio.min.io/v1"

"github.com/minio/madmin-go"
Expand Down Expand Up @@ -125,6 +132,8 @@ var ErrLogSearchNotReady = fmt.Errorf("Log Search is not ready")

// Controller struct watches the Kubernetes API for changes to Tenant resources
type Controller struct {
// podName is the identifier of this instance
podName string
// kubeClientSet is a standard kubernetes clientset
kubeClientSet kubernetes.Interface
// minioClientSet is a clientset for our own API group
Expand Down Expand Up @@ -201,7 +210,7 @@ type Controller struct {
}

// NewController returns a new sample controller
func NewController(kubeClientSet kubernetes.Interface, minioClientSet clientset.Interface, promClient promclientset.Interface, statefulSetInformer appsinformers.StatefulSetInformer, deploymentInformer appsinformers.DeploymentInformer, podInformer coreinformers.PodInformer, jobInformer batchinformers.JobInformer, tenantInformer informers.TenantInformer, serviceInformer coreinformers.ServiceInformer, serviceMonitorInformer prominformers.ServiceMonitorInformer, hostsTemplate, operatorVersion string) *Controller {
func NewController(podName string, kubeClientSet kubernetes.Interface, minioClientSet clientset.Interface, promClient promclientset.Interface, statefulSetInformer appsinformers.StatefulSetInformer, deploymentInformer appsinformers.DeploymentInformer, podInformer coreinformers.PodInformer, jobInformer batchinformers.JobInformer, tenantInformer informers.TenantInformer, serviceInformer coreinformers.ServiceInformer, serviceMonitorInformer prominformers.ServiceMonitorInformer, hostsTemplate, operatorVersion string) *Controller {

// Create event broadcaster
// Add minio-controller types to the default Kubernetes Scheme so Events can be
Expand All @@ -214,6 +223,7 @@ func NewController(kubeClientSet kubernetes.Interface, minioClientSet clientset.
recorder := eventBroadcaster.NewRecorder(scheme.Scheme, corev1.EventSource{Component: controllerAgentName})

controller := &Controller{
podName: podName,
kubeClientSet: kubeClientSet,
minioClientSet: minioClientSet,
promClient: promClient,
Expand Down Expand Up @@ -370,23 +380,112 @@ func (c *Controller) Start(threadiness int, stopCh <-chan struct{}) error {
// Start the informer factories to begin populating the informer caches
klog.Info("Starting Tenant controller")

// Wait for the caches to be synced before starting workers
klog.Info("Waiting for informer caches to sync")
if ok := cache.WaitForCacheSync(stopCh, c.statefulSetListerSynced, c.deploymentListerSynced, c.tenantsSynced); !ok {
return fmt.Errorf("failed to wait for caches to sync")
run := func(ctx context.Context) {
klog.Info("Controller loop...")

// Wait for the caches to be synced before starting workers
klog.Info("Waiting for informer caches to sync")
if ok := cache.WaitForCacheSync(stopCh, c.statefulSetListerSynced, c.deploymentListerSynced, c.tenantsSynced); !ok {
panic("failed to wait for caches to sync")
}

klog.Info("Starting workers")
// Launch two workers to process Tenant resources
for i := 0; i < threadiness; i++ {
go wait.Until(c.runWorker, time.Second, stopCh)
}

// Launch a single worker for Health Check reacting to Pod Changes
go wait.Until(c.runHealthCheckWorker, time.Second, stopCh)

// Launch a goroutine to monitor all Tenants
go c.recurrentTenantStatusMonitor(stopCh)

select {}
}

klog.Info("Starting workers")
// Launch two workers to process Tenant resources
for i := 0; i < threadiness; i++ {
go wait.Until(c.runWorker, time.Second, stopCh)
// use a Go context so we can tell the leaderelection code when we
// want to step down
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

// listen for interrupts or the Linux SIGTERM signal and cancel
// our context, which the leader election code will observe and
// step down
ch := make(chan os.Signal, 1)
signal.Notify(ch, os.Interrupt, syscall.SIGTERM)
go func() {
<-ch
klog.Info("Received termination, signaling shutdown")
cancel()
}()

leaseLockName := "minio-operator-lock"
leaseLockNamespace := miniov2.GetNSFromFile()

// we use the Lease lock type since edits to Leases are less common
// and fewer objects in the cluster watch "all Leases".
lock := &resourcelock.LeaseLock{
LeaseMeta: metav1.ObjectMeta{
Name: leaseLockName,
Namespace: leaseLockNamespace,
},
Client: c.kubeClientSet.CoordinationV1(),
LockConfig: resourcelock.ResourceLockConfig{
Identity: c.podName,
},
}

// Launcha single worker for Health Check reacting to Pod Changes
go wait.Until(c.runHealthCheckWorker, time.Second, stopCh)
// start the leader election code loop
leaderelection.RunOrDie(ctx, leaderelection.LeaderElectionConfig{
Lock: lock,
// IMPORTANT: you MUST ensure that any code you have that
// is protected by the lease must terminate **before**
// you call cancel. Otherwise, you could have a background
// loop still running and another process could
// get elected before your background loop finished, violating
// the stated goal of the lease.
ReleaseOnCancel: true,
LeaseDuration: 60 * time.Second,
RenewDeadline: 15 * time.Second,
RetryPeriod: 5 * time.Second,
Callbacks: leaderelection.LeaderCallbacks{
OnStartedLeading: func(ctx context.Context) {
// we're notified when we start - this is where you would
// usually put your code
run(ctx)
},
OnStoppedLeading: func() {
// we can do cleanup here
klog.Infof("leader lost: %s", c.podName)
os.Exit(0)
},
OnNewLeader: func(identity string) {
// we're notified when new leader elected
if identity == c.podName {
klog.Infof("%s: I've become the leader", c.podName)
// Patch this pod so the main service uses it
p := []patchAnnotation{{
Op: "add",
Path: fmt.Sprintf("/metadata/labels/%s", strings.Replace("operator", "/", "~1", -1)),
Value: "leader",
}}

payloadBytes, err := json.Marshal(p)
if err != nil {
klog.Errorf("failed to marshal patch: %+v", err)
}
_, err = c.kubeClientSet.CoreV1().Pods(leaseLockNamespace).Patch(ctx, c.podName, types.JSONPatchType, payloadBytes, metav1.PatchOptions{})
if err != nil {
klog.Errorf("failed to patch operator leader pod: %+v", err)
}

// Launch a goroutine to monitor all Tenants
go c.recurrentTenantStatusMonitor(stopCh)
return
}
klog.Infof("new leader elected: %s", identity)
},
},
})

return nil
}
Expand Down Expand Up @@ -1459,3 +1558,9 @@ func (c *Controller) checkAndCreatePrometheusServiceMonitor(ctx context.Context,
_, err = c.promClient.MonitoringV1().ServiceMonitors(tenant.Namespace).Create(ctx, prometheusSM, metav1.CreateOptions{})
return err
}

type patchAnnotation struct {
Op string `json:"op"`
Path string `json:"path"`
Value string `json:"value"`
}
12 changes: 6 additions & 6 deletions pkg/controller/cluster/monitoring.go
Original file line number Diff line number Diff line change
Expand Up @@ -249,12 +249,12 @@ func (c *Controller) updateHealthStatusForTenant(tenant *miniov2.Tenant) error {
metrics, err := getPrometheusMetricsForTenant(tenant, bearerToken)
if err != nil {
klog.Infof("'%s/%s' Can't generate tenant prometheus token: %v", tenant.Namespace, tenant.Name, err)
}
tenant.Status.Usage.Usage = metrics.Usage
tenant.Status.Usage.Capacity = metrics.UsableCapacity

if tenant, err = c.updatePoolStatus(context.Background(), tenant); err != nil {
klog.Infof("'%s/%s' Can't update tenant status for usage: %v", tenant.Namespace, tenant.Name, err)
} else {
tenant.Status.Usage.Usage = metrics.Usage
tenant.Status.Usage.Capacity = metrics.UsableCapacity
if tenant, err = c.updatePoolStatus(context.Background(), tenant); err != nil {
klog.Infof("'%s/%s' Can't update tenant status for usage: %v", tenant.Namespace, tenant.Name, err)
}
}

return nil
Expand Down
Loading