Skip to content

Commit

Permalink
lazyload: add labels in leading pods
Browse files Browse the repository at this point in the history
  • Loading branch information
MouceL committed Apr 25, 2023
1 parent 0e4a68a commit 6f8a644
Show file tree
Hide file tree
Showing 7 changed files with 151 additions and 15 deletions.
44 changes: 39 additions & 5 deletions boot/helm-charts/slimeboot/templates/deployment.yaml
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
{{ range .Values.module }}
{{- if .enable }}
{{- if not (eq (default "" .mode) "BundleItem") }}

---
apiVersion: v1
kind: ConfigMap
Expand Down Expand Up @@ -185,7 +184,42 @@ spec:
name: mcp-over-xds
selector:
app: {{.name}}
{{- end }}
{{- end }}
{{- end }}

{{- if or (eq (default "" .name) "lazyload") (eq (default "" .kind) "lazyload") }}
{{- if and .global .global.misc }}
{{- if eq .global.misc.enableLeaderElection "on" }}
---
apiVersion: v1
kind: Service
metadata:
name: {{ .name }}-leader
namespace: {{ $.Values.namespace}}
labels:
app: {{.name}}
spec:
type: {{ $.Values.service.type }}
ports:
- port: {{ $.Values.service.port }}
targetPort: http
protocol: TCP
name: http
- port: {{ $.Values.service.auxiliaryPort }}
targetPort: aux-port
protocol: TCP
name: aux-port
- port: {{ $.Values.service.logSourcePort }}
targetPort: log-source-port
protocol: TCP
name: log-source-port
- port: {{ $.Values.service.mcpOverXdsPort }}
targetPort: 16010
protocol: TCP
name: mcp-over-xds
selector:
app: {{.name}}
slime.io/leader: "true"
{{- end }}
{{- end }}
{{- end }}
{{- end }}
{{- end }}
{{- end }}
4 changes: 2 additions & 2 deletions doc/zh/slime-boot.md
Original file line number Diff line number Diff line change
Expand Up @@ -439,7 +439,7 @@ spec:

以limiter为例,安装带有两个副本的limiter模块。

我们需要设置`enable-leader-election: "on"`以及`replicaCount: 2` 可开启多副本模式。
我们需要设置`enableLeaderElection: "on"`以及`replicaCount: 2` 可开启多副本模式。

```yaml
apiVersion: config.netease.com/v1alpha1
Expand All @@ -463,7 +463,7 @@ spec:
disableInsertGlobalRateLimit: true
global:
misc:
enable-leader-election: "on"
enableLeaderElection: "on"
```


Expand Down
15 changes: 8 additions & 7 deletions framework/bootstrap/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,12 +37,12 @@ var defaultModuleConfig = &bootconfig.Config{
},
},
Misc: map[string]string{
"metrics-addr": ":8080",
"aux-addr": ":8081",
"enable-leader-election": "off",
"globalSidecarMode": "namespace",
"metricSourceType": "prometheus", // can be prometheus or accesslog
"logSourcePort": ":8082",
"metrics-addr": ":8080",
"aux-addr": ":8081",
"enableLeaderElection": "off",
"globalSidecarMode": "namespace",
"metricSourceType": "prometheus", // can be prometheus or accesslog
"logSourcePort": ":8082",
// which label keys of serviceEntry select endpoints
// will take effect when serviceEntry does not have workloadSelector field
"seLabelSelectorKeys": "app",
Expand Down Expand Up @@ -203,7 +203,8 @@ func (f ReadyManagerFunc) AddReadyChecker(name string, checker func() error) {

type Environment struct {
Config *bootconfig.Config
// clientSet, not support crd

// clientSet, not support crd, it can use in anytime anywhere
K8SClient *kubernetes.Clientset
// dynamic client, support any resource
DynamicClient dynamic.Interface
Expand Down
2 changes: 1 addition & 1 deletion framework/model/module/module.go
Original file line number Diff line number Diff line change
Expand Up @@ -311,7 +311,7 @@ func Main(bundle string, modules []Module) {
}

// setup for leaderelection
if mainModConfig.Global.Misc["enable-leader-election"] == "on" {
if mainModConfig.Global.Misc["enableLeaderElection"] == "on" {
// create a resource lock in the same namespace as the workload instance
rl, err := leaderelection.NewKubeResourceLock(conf, os.Getenv("WATCH_NAMESPACE"), bundle)
if err != nil {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -505,7 +505,11 @@ data:
"endpoint": {
"address": {
"socket_address": {
{{- if (eq $g.misc.enableLeaderElection "on") }}
"address": "{{ .name }}-leader.{{ $g.slimeNamespace }}",
{{- else }}
"address": "{{ .name }}.{{ $g.slimeNamespace }}",
{{- end }}
"port_value": {{ $.Values.service.logSourcePort }}
}
}
Expand Down
5 changes: 5 additions & 0 deletions staging/src/slime.io/slime/modules/lazyload/model/const.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
package model

const (
SlimeLeader = "slime.io/leader"
)
92 changes: 92 additions & 0 deletions staging/src/slime.io/slime/modules/lazyload/module/module.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,12 @@ import (
"fmt"
"github.com/golang/protobuf/proto"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/client-go/kubernetes"
clientgoscheme "k8s.io/client-go/kubernetes/scheme"
"os"
"sigs.k8s.io/controller-runtime/pkg/reconcile"
istioapi "slime.io/slime/framework/apis"
basecontroller "slime.io/slime/framework/controllers"
Expand All @@ -17,6 +21,7 @@ import (
"slime.io/slime/modules/lazyload/controllers"
modmodel "slime.io/slime/modules/lazyload/model"
"slime.io/slime/modules/lazyload/pkg/server"
"time"
)

var log = modmodel.ModuleLog
Expand Down Expand Up @@ -163,10 +168,97 @@ func (m *Module) Setup(opts module.ModuleOptions) error {
log.Warningf("watching metric is not running")
}

podNs := os.Getenv("WATCH_NAMESPACE")
podName := os.Getenv("POD_NAME")

le.AddOnStartedLeading(func(ctx context.Context) {
retry := time.After(2 * time.Second)
go func() {
for {
select {
case <-ctx.Done():
log.Infof("ctx is done, retrun")
return
case <-retry:
retry = nil
}
if err = addPodLabel(ctx, env.K8SClient, podNs, podName); err != nil {
log.Errorf("add leader labels error %s, retry", err)
retry = time.After(2 * time.Second)
} else {
log.Infof("add leader labels succeed")
return
}
}
}()
})

le.AddOnStoppedLeading(func() {
retry := time.After(2 * time.Second)
go func() {
for {
select {
case <-retry:
retry = nil
}
if err = deletePodLabel(context.TODO(), env.K8SClient, podNs, podName); err != nil {
log.Errorf("delete leader labels error %s", err)
retry = time.After(2 * time.Second)
} else {
log.Infof("delete leader labels succeed")
return
}

}
}()
})

le.AddOnStoppedLeading(sfReconciler.Clear)
return nil
}

func svfResetRegister(handler *server.Handler) {
handler.HandleFunc("/debug/svfReset", handler.SvfResetSetting)
}

func addPodLabel(ctx context.Context, client *kubernetes.Clientset, podNs, podName string) error {
po, err := getDeepCopyPod(ctx, client, podNs, podName)
if err != nil {
return err
}

po.Labels[modmodel.SlimeLeader] = "true"
_, err = client.CoreV1().Pods(podNs).Update(ctx, po, metav1.UpdateOptions{})
if err != nil {
return fmt.Errorf("update pod namespace/name: %s/%s err %s", podNs, podName, err)
}
return nil
}

func deletePodLabel(ctx context.Context, client *kubernetes.Clientset, podNs, podName string) error {
po, err := getDeepCopyPod(ctx, client, podNs, podName)
if err != nil {
return err
}

delete(po.Labels, modmodel.SlimeLeader)
_, err = client.CoreV1().Pods(podNs).Update(ctx, po, metav1.UpdateOptions{})
if err != nil {
return err
}
return nil
}

func getDeepCopyPod(ctx context.Context, client *kubernetes.Clientset, podNs, podName string) (*corev1.Pod, error) {

pod, err := client.CoreV1().Pods(podNs).Get(ctx, podName, metav1.GetOptions{})
if err != nil {
if !errors.IsNotFound(err) {
err = fmt.Errorf("pod %s/%s is not found", podNs, podName)
} else {
err = fmt.Errorf("get pod %s/%s err %s", podNs, podName, err)
}
return nil, err
}
return pod.DeepCopy(), nil
}

0 comments on commit 6f8a644

Please sign in to comment.