forked from oliwave/snowflake-id
-
Notifications
You must be signed in to change notification settings - Fork 0
/
operation.go
147 lines (122 loc) · 3 KB
/
operation.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
package main
import (
"encoding/json"
"fmt"
"log"
"strconv"
"sync"
coreV1 "k8s.io/api/core/v1"
)
var (
// mu serializes the scheduling state updates performed in this file:
// addEnvToPod holds it around schedulePod, and removePod holds it around
// populateApp/saveApp. It is an in-process lock only, so it protects that
// state only while a single copy of this controller is running.
mu sync.Mutex
)
// HandlePod processes a pod admission request.
//
// The pod is decoded from Request.Object for CREATE operations and from
// Request.OldObject for every other operation. Pods that do not carry the
// snowflake annotation (see isSnowflakeApp) are passed through with an empty
// patch.
//
// For annotated pods:
//   - CREATE returns the JSON patch produced by addEnvToPod.
//   - DELETE starts removePod in the background and returns an empty patch.
//   - any other operation returns (nil, nil).
//
// An error is returned when the pod cannot be decoded or when addEnvToPod
// fails.
func HandlePod(ad *admission) ([]byte, error) {
	// 1. Get pod annotation
	op := ad.review.Request.Operation

	raw := ad.review.Request.OldObject.Raw
	if op == "CREATE" {
		raw = ad.review.Request.Object.Raw
	}

	pod := &coreV1.Pod{}
	if err := json.Unmarshal(raw, pod); err != nil {
		// Previously this error was constructed and then dropped, so a
		// malformed request was silently treated as an empty pod.
		return nil, fmt.Errorf("could not unmarshal pod on admission request: %w", err)
	}

	if !isSnowflakeApp(pod) {
		return []byte{}, nil
	}

	switch op {
	case "CREATE":
		return addEnvToPod(ad, pod)
	case "DELETE":
		// The bookkeeping update is fire-and-forget; the admission response
		// does not wait for it.
		go removePod(ad, pod)
		return []byte{}, nil
	default: // Not in the case
		return nil, nil
	}
}
// isSnowflakeApp reports whether the pod opts in to snowflake ID assignment,
// i.e. whether its "snowflake-id.io/enabled" annotation is present and parses
// as a true boolean (per strconv.ParseBool). A missing or unparsable value
// yields false.
func isSnowflakeApp(p *coreV1.Pod) bool {
	// Look the key up directly instead of scanning (and bool-parsing) every
	// annotation on the pod.
	value, ok := p.Annotations["snowflake-id.io/enabled"]
	if !ok {
		return false
	}
	enabled, err := strconv.ParseBool(value)
	return err == nil && enabled
}
// addEnvToPod builds the JSON patch that injects the snowflake environment
// variables into the pod's first container.
//
// The outcome depends on which of SNOWFLAKE_DATA_CENTER_ID and
// SNOWFLAKE_WORKER_ID the first container already defines:
//   - both set (2.c): nothing to do, an empty patch is returned;
//   - neither set (2.a): the pod is scheduled via AppScheduler.schedulePod and
//     the marshalled result of ad.createPatch is returned;
//   - exactly one set (2.b): an error is returned, since the two variables
//     must be provided as a pair.
//
// An error is also returned when the pod has no containers, when scheduling
// fails, or when the patch cannot be marshalled.
func addEnvToPod(ad *admission, pod *coreV1.Pod) ([]byte, error) {
	// 2. Verify ENV
	if len(pod.Spec.Containers) == 0 {
		return nil, fmt.Errorf("pod has no containers to receive SNOWFLAKE_DATA_CENTER_ID and SNOWFLAKE_WORKER_ID")
	}
	envs := pod.Spec.Containers[0].Env

	// Avoid duplicate ENV fields
	var dataCenterIDIsSet, workerIDIsSet bool
	for _, env := range envs {
		switch env.Name {
		case "SNOWFLAKE_DATA_CENTER_ID":
			dataCenterIDIsSet = true
		case "SNOWFLAKE_WORKER_ID":
			workerIDIsSet = true
		}
	}

	switch {
	case dataCenterIDIsSet && workerIDIsSet: // 2.c
		return []byte{}, nil
	case dataCenterIDIsSet != workerIDIsSet: // 2.b
		return nil, fmt.Errorf("SNOWFLAKE_DATA_CENTER_ID and SNOWFLAKE_WORKER_ID should be set as a pair")
	}

	// 2.a
	// 3. Get the `replicaSet` of the pod
	s := AppScheduler{
		pod: pod,
	}

	// ---WARNING---
	// Mutex only works if there is only one copy of controller itself.
	//
	// TODO - The architecture should be refactored to use a distributed lock.
	// ---WARNING---
	//
	// The lock is released right after the call on every path; previously an
	// error from schedulePod returned with the mutex still held, blocking all
	// later requests.
	mu.Lock()
	sf, err := s.schedulePod(ad.review.Request.Name)
	mu.Unlock()
	if err != nil {
		return nil, err
	}
	// Only dereference the result once the error has been ruled out.
	log.Printf("scheduled pod is %+v", *sf)

	patches := ad.createPatch(envs, sf)
	patchesBytes, err := json.Marshal(patches)
	if err != nil {
		return nil, fmt.Errorf("could not marshal JSON patch: %w", err)
	}
	return patchesBytes, nil
}
// removePod drops a deleted pod from the stored application state.
//
// The application is looked up with populateApp using the name of the pod's
// first owner reference. The pod is identified by the node it ran on
// (pod.Spec.NodeName) and by the SNOWFLAKE_WORKER_ID value of its first
// container. The matching entry is removed from that node's pod list (order
// is not preserved) and the state is written back with saveApp.
//
// The function is started as a goroutine by HandlePod, so it must not panic:
// pods without an owner reference, without containers, or without a valid
// SNOWFLAKE_WORKER_ID are logged and skipped without touching the state.
func removePod(ad *admission, pod *coreV1.Pod) {
	if len(pod.OwnerReferences) == 0 {
		log.Printf("skip removing pod %q: no owner reference", pod.Name)
		return
	}
	if len(pod.Spec.Containers) == 0 {
		log.Printf("skip removing pod %q: no containers", pod.Name)
		return
	}
	rsName := pod.OwnerReferences[0].Name

	// Without a valid worker ID the pod cannot be identified; falling back to
	// the zero value would evict whichever pod is registered with ID 0.
	workerID, found := 0, false
	for _, env := range pod.Spec.Containers[0].Env {
		if env.Name != "SNOWFLAKE_WORKER_ID" {
			continue
		}
		id, err := strconv.Atoi(env.Value)
		if err != nil {
			log.Printf("skip removing pod %q: invalid SNOWFLAKE_WORKER_ID %q: %v", pod.Name, env.Value, err)
			return
		}
		workerID, found = id, true
		break
	}
	if !found {
		log.Printf("skip removing pod %q: SNOWFLAKE_WORKER_ID is not set", pod.Name)
		return
	}

	mu.Lock()
	defer mu.Unlock()

	pa := populateApp(rsName)

nodes:
	for i := range pa.Nodes {
		if pa.Nodes[i].Name != pod.Spec.NodeName {
			continue
		}
		pods := pa.Nodes[i].Pods
		for j := range pods {
			if pods[j].ID != workerID {
				continue
			}
			// Swap-remove: overwrite slot j with the last element, then
			// truncate the slice by one.
			last := len(pods) - 1
			pods[j] = pods[last]
			pa.Nodes[i].Pods = pods[:last]
			// Log the ID that was removed, not the element that now
			// occupies slot j.
			log.Println("Deleted pod is {Node:", pa.Nodes[i].Name, pa.Nodes[i].ID, ", Pod:", workerID, "}")
			break nodes
		}
	}

	pa.saveApp()
}