Skip to content

Commit

Permalink
lazyload: add labels in leading pods
Browse files Browse the repository at this point in the history
  • Loading branch information
MouceL authored and YonkaFang committed May 12, 2023
1 parent ce35656 commit 67380da
Show file tree
Hide file tree
Showing 7 changed files with 175 additions and 16 deletions.
44 changes: 39 additions & 5 deletions boot/helm-charts/slimeboot/templates/deployment.yaml
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
{{ range .Values.module }}
{{- if .enable }}
{{- if not (eq (default "" .mode) "BundleItem") }}

---
apiVersion: v1
kind: ConfigMap
Expand Down Expand Up @@ -185,7 +184,42 @@ spec:
name: mcp-over-xds
selector:
app: {{.name}}
{{- end }}
{{- end }}
{{- end }}

{{- if or (eq (default "" .name) "lazyload") (eq (default "" .kind) "lazyload") }}
{{- if and .global .global.misc }}
{{- if eq .global.misc.enableLeaderElection "on" }}
---
apiVersion: v1
kind: Service
metadata:
name: {{ .name }}-leader
namespace: {{ $.Values.namespace}}
labels:
app: {{.name}}
spec:
type: {{ $.Values.service.type }}
ports:
- port: {{ $.Values.service.port }}
targetPort: http
protocol: TCP
name: http
- port: {{ $.Values.service.auxiliaryPort }}
targetPort: aux-port
protocol: TCP
name: aux-port
- port: {{ $.Values.service.logSourcePort }}
targetPort: log-source-port
protocol: TCP
name: log-source-port
- port: {{ $.Values.service.mcpOverXdsPort }}
targetPort: 16010
protocol: TCP
name: mcp-over-xds
selector:
app: {{.name}}
slime.io/leader: "true"
{{- end }}
{{- end }}
{{- end }}
{{- end }}
{{- end }}
{{- end }}
4 changes: 2 additions & 2 deletions doc/zh/slime-boot.md
Original file line number Diff line number Diff line change
Expand Up @@ -439,7 +439,7 @@ spec:

以limiter为例,安装带有两个副本的limiter模块。

我们需要设置`enable-leader-election: "on"`以及`replicaCount: 2` 可开启多副本模式。
我们需要设置`enableLeaderElection: "on"`以及`replicaCount: 2`,以开启多副本模式。

```yaml
apiVersion: config.netease.com/v1alpha1
Expand All @@ -463,7 +463,7 @@ spec:
disableInsertGlobalRateLimit: true
global:
misc:
enable-leader-election: "on"
enableLeaderElection: "on"
```


Expand Down
15 changes: 8 additions & 7 deletions framework/bootstrap/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,12 +37,12 @@ var defaultModuleConfig = &bootconfig.Config{
},
},
Misc: map[string]string{
"metrics-addr": ":8080",
"aux-addr": ":8081",
"enable-leader-election": "off",
"globalSidecarMode": "namespace",
"metricSourceType": "prometheus", // can be prometheus or accesslog
"logSourcePort": ":8082",
"metrics-addr": ":8080",
"aux-addr": ":8081",
"enableLeaderElection": "off",
"globalSidecarMode": "namespace",
"metricSourceType": "prometheus", // can be prometheus or accesslog
"logSourcePort": ":8082",
// which label keys of serviceEntry select endpoints
// will take effect when serviceEntry does not have workloadSelector field
"seLabelSelectorKeys": "app",
Expand Down Expand Up @@ -203,7 +203,8 @@ func (f ReadyManagerFunc) AddReadyChecker(name string, checker func() error) {

type Environment struct {
Config *bootconfig.Config
// clientSet, not support crd

// clientSet, not support crd, it can use in anytime anywhere
K8SClient *kubernetes.Clientset
// dynamic client, support any resource
DynamicClient dynamic.Interface
Expand Down
3 changes: 1 addition & 2 deletions framework/model/module/module.go
Original file line number Diff line number Diff line change
Expand Up @@ -311,7 +311,7 @@ func Main(bundle string, modules []Module) {
}

// setup for leaderelection
if mainModConfig.Global.Misc["enable-leader-election"] == "on" {
if mainModConfig.Global.Misc["enableLeaderElection"] == "on" {
// create a resource lock in the same namespace as the workload instance
rl, err := leaderelection.NewKubeResourceLock(conf, os.Getenv("WATCH_NAMESPACE"), bundle)
if err != nil {
Expand Down Expand Up @@ -487,7 +487,6 @@ func Main(bundle string, modules []Module) {
log.Errorf("problem running, %+v", err)
}
}()

wg.Wait()
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -505,7 +505,11 @@ data:
"endpoint": {
"address": {
"socket_address": {
{{- if (eq $g.misc.enableLeaderElection "on") }}
"address": "{{ .name }}-leader.{{ $g.slimeNamespace }}",
{{- else }}
"address": "{{ .name }}.{{ $g.slimeNamespace }}",
{{- end }}
"port_value": {{ $.Values.service.logSourcePort }}
}
}
Expand Down
5 changes: 5 additions & 0 deletions staging/src/slime.io/slime/modules/lazyload/model/const.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
package model

// SlimeLeader is the pod label key used to mark the lazyload pod that
// currently holds leadership.
const SlimeLeader = "slime.io/leader"
116 changes: 116 additions & 0 deletions staging/src/slime.io/slime/modules/lazyload/module/module.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,12 @@ import (
"fmt"
"github.com/golang/protobuf/proto"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/client-go/kubernetes"
clientgoscheme "k8s.io/client-go/kubernetes/scheme"
"os"
"sigs.k8s.io/controller-runtime/pkg/reconcile"
istioapi "slime.io/slime/framework/apis"
basecontroller "slime.io/slime/framework/controllers"
Expand All @@ -17,6 +21,7 @@ import (
"slime.io/slime/modules/lazyload/controllers"
modmodel "slime.io/slime/modules/lazyload/model"
"slime.io/slime/modules/lazyload/pkg/server"
"time"
)

var log = modmodel.ModuleLog
Expand Down Expand Up @@ -72,9 +77,16 @@ func (m *Module) Setup(opts module.ModuleOptions) error {
sfReconciler.RegisterSeHandler()
}

podNs := os.Getenv("WATCH_NAMESPACE")
podName := os.Getenv("POD_NAME")

opts.InitCbs.AddStartup(func(ctx context.Context) {
sfReconciler.StartSvcCache(ctx)
sfReconciler.StartIpToSvcCache(ctx)
if env.Config.Global != nil && env.Config.Global.Misc["enableLeaderElection"] == "on" {
log.Infof("delete leader labels before working")
deleteLeaderLabelUntilSucceed(env.K8SClient, podNs, podName)
}
})

// build metric source
Expand Down Expand Up @@ -163,10 +175,114 @@ func (m *Module) Setup(opts module.ModuleOptions) error {
log.Warningf("watching metric is not running")
}

if env.Config.Global != nil && env.Config.Global.Misc["enableLeaderElection"] == "on" {

log.Infof("add/delete leader label in StartedLeading/stoppedLeading")

le.AddOnStartedLeading(func(ctx context.Context) {
first := make(chan struct{}, 1)
first <- struct{}{}
var retry <-chan time.Time

go func() {
for {
select {
case <-ctx.Done():
log.Infof("ctx is done, retrun")
return
case <-first:
case <-retry:
retry = nil
}
if err = addPodLabel(ctx, env.K8SClient, podNs, podName); err != nil {
log.Errorf("add leader labels error %s, retry", err)
retry = time.After(1 * time.Second)
} else {
log.Infof("add leader labels succeed")
return
}
}
}()
})

le.AddOnStoppedLeading(func() {
go deleteLeaderLabelUntilSucceed(env.K8SClient, podNs, podName)
})
}

le.AddOnStoppedLeading(sfReconciler.Clear)
return nil
}

// svfResetRegister mounts the handler's SvfResetSetting method on the
// /debug/svfReset path of the same handler.
func svfResetRegister(handler *server.Handler) {
	const resetPath = "/debug/svfReset"
	handler.HandleFunc(resetPath, handler.SvfResetSetting)
}

// deleteLeaderLabelUntilSucceed removes the leader label from the given pod,
// blocking until the removal succeeds.
//
// The first attempt is made immediately; after every failure the error is
// logged and the next attempt is made one second later. There is no
// cancellation: the function only returns once deletePodLabel reports success.
func deleteLeaderLabelUntilSucceed(client *kubernetes.Clientset, podNs, podName string) {
	for {
		err := deletePodLabel(context.TODO(), client, podNs, podName)
		if err == nil {
			log.Infof("delete leader labels succeed")
			return
		}

		log.Errorf("delete leader labels error %s", err)
		// Back off before the next attempt.
		time.Sleep(1 * time.Second)
	}
}

// addPodLabel marks the given pod as leader by setting the slime.io/leader
// label to "true" and writing the pod back through the API server.
//
// The pod is fetched with getPod and any lookup error is returned unchanged.
// A failed update is returned wrapped with the pod's namespace and name.
func addPodLabel(ctx context.Context, client *kubernetes.Clientset, podNs, podName string) error {
	po, err := getPod(ctx, client, podNs, podName)
	if err != nil {
		return err
	}

	// A pod without any labels carries a nil map; assigning into it would panic.
	if po.Labels == nil {
		po.Labels = make(map[string]string, 1)
	}
	po.Labels[modmodel.SlimeLeader] = "true"

	if _, err = client.CoreV1().Pods(podNs).Update(ctx, po, metav1.UpdateOptions{}); err != nil {
		// %w keeps the message text identical while preserving the cause for callers.
		return fmt.Errorf("update pod namespace/name: %s/%s err %w", podNs, podName, err)
	}
	return nil
}

// deletePodLabel removes the slime.io/leader label from the given pod.
//
// The pod is fetched with getPod; if the label is absent nothing is written
// and nil is returned. Otherwise the label is dropped and the pod is updated,
// with any lookup or update error returned unchanged.
func deletePodLabel(ctx context.Context, client *kubernetes.Clientset, podNs, podName string) error {
	po, err := getPod(ctx, client, podNs, podName)
	if err != nil {
		return err
	}

	// Nothing to do when the pod does not carry the leader label.
	if _, labeled := po.Labels[modmodel.SlimeLeader]; !labeled {
		log.Infof("label slime.io/leader is not found, skip")
		return nil
	}

	delete(po.Labels, modmodel.SlimeLeader)
	_, err = client.CoreV1().Pods(podNs).Update(ctx, po, metav1.UpdateOptions{})
	return err
}

// getPod fetches the pod podNs/podName through the given clientset.
//
// On failure it returns a nil pod and an error: a NotFound response from the
// API server yields "pod <ns>/<name> is not found", any other failure is
// wrapped with the pod's namespace and name.
func getPod(ctx context.Context, client *kubernetes.Clientset, podNs, podName string) (*corev1.Pod, error) {
	pod, err := client.CoreV1().Pods(podNs).Get(ctx, podName, metav1.GetOptions{})
	if err != nil {
		// The original condition was inverted: it reported "not found" for
		// every error except an actual NotFound, hiding the real cause.
		if errors.IsNotFound(err) {
			return nil, fmt.Errorf("pod %s/%s is not found", podNs, podName)
		}
		return nil, fmt.Errorf("get pod %s/%s err %w", podNs, podName, err)
	}
	return pod, nil
}

0 comments on commit 67380da

Please sign in to comment.