Skip to content

Commit

Permalink
Merge branch 'dev' into feature/serverless
Browse files Browse the repository at this point in the history
  • Loading branch information
illyaks committed May 28, 2024
2 parents 528fe75 + 7184cf8 commit 56e562f
Show file tree
Hide file tree
Showing 12 changed files with 72 additions and 17 deletions.
12 changes: 10 additions & 2 deletions cmd/kubelet/kubelet.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,17 @@ package main

import (
"minik8s/pkg/kubelet/subscriber"
"os"
)

func main() {
server := subscriber.NewKubeletSubscriber()
server.Run()
if len(os.Args) < 3 {
server := subscriber.NewKubeletSubscriber("")
server.Run()
} else {
if os.Args[1] == "--name" {
server := subscriber.NewKubeletSubscriber(os.Args[2])
server.Run()
}
}
}
17 changes: 14 additions & 3 deletions cmd/kubeproxy/kubeproxy.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,19 @@
package main

import "minik8s/pkg/kubeproxy"
import (
"minik8s/pkg/kubeproxy"
"os"
)

func main() {
proxy := kubeproxy.NewKubeProxy()
proxy.Run()
if len(os.Args) < 3 {
proxy := kubeproxy.NewKubeProxy("")
proxy.Run()
} else {
if os.Args[1] == "--name" {
proxy := kubeproxy.NewKubeProxy(os.Args[2])
proxy.Run()
}
}

}
2 changes: 1 addition & 1 deletion pkg/api/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ type Node struct {
}

type NodeSpec struct {
Hostname string `yaml:"hostname,omitempty" json:"hostname,omitempty"`
ExternalID string `yaml:"externalID,omitempty" json:"externalID,omitempty"`
PodCIDR string `yaml:"podCIDR,omitempty" json:"podCIDR,omitempty"`
PodCIDRs []string `yaml:"podCIDRs,omitempty" json:"podCIDRs,omitempty"`
Expand All @@ -29,7 +30,6 @@ const (
)

type NodeStatus struct {
Hostname string `yaml:"hostname,omitempty" json:"hostname,omitempty"`
Condition NodeCondition `yaml:"condition,omitempty" json:"condition,omitempty"`
PodsNumber int `yaml:"podsNumber,omitempty" json:"podsNumber,omitempty"`
Pods []Pod `yaml:"pods,omitempty" json:"pods,omitempty"`
Expand Down
1 change: 1 addition & 0 deletions pkg/apiserver/handlers/podhandlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ func AddPod(context *gin.Context) {

newPod.Status.StartTime = time.Now()
newPod.Metadata.UUID = uuid.NewString()
newPod.Status.Phase = "Pending"

// check if the pod already exists
oldPod, exited := etcdClient.GetPod(newPod.Metadata.NameSpace, newPod.Metadata.Name)
Expand Down
3 changes: 2 additions & 1 deletion pkg/config/ipconfig.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,12 @@ const (
Local = false
localhost = "localhost"
Remotehost = "192.168.3.8" // IP of master
Nodename = "node1"
port = 6443
protocol = "http://"
)

var Nodename = ""

const (
EtcdDefaultPort = 9092
)
Expand Down
4 changes: 2 additions & 2 deletions pkg/controller/controllers/dns_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -152,12 +152,12 @@ func (s *DNSController) WriteDNS() {
os.WriteFile("/etc/hosts", []byte(str), 0644)
for _, host := range s.RegisteredDNS {
NginxStr := "server {\n\tlisten 80;\n"
hostStr := fmt.Sprintf("\tserver_name %s\n", host.Host)
hostStr := fmt.Sprintf("\tserver_name %s;\n", host.Host)
NginxStr += hostStr
for _, path := range host.Paths {
pathStr := fmt.Sprintf("\tlocation %s {\n", path.Path)
NginxStr += pathStr
proxyStr := fmt.Sprintf("\t\tproxy_pass %s:%s\n", path.ServiceIP, path.ServicePort)
proxyStr := fmt.Sprintf("\t\tproxy_pass %s:%s;\n", path.ServiceIP, path.ServicePort)
NginxStr += proxyStr
NginxStr += "\t}\n"
}
Expand Down
3 changes: 0 additions & 3 deletions pkg/kubelet/host/host_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,9 +40,6 @@ func (m *KubeletHostManager) RemoveHost(host string) {
}

func (m *KubeletHostManager) WriteHost() {
if config.Local {
return
}
str := "127.0.0.1 localhost\n# The following lines are desirable for IPv6 capable hosts\n::1 ip6-localhost ip6-loopback\nfe00::0 ip6-localnet\nff00::0 ip6-mcastprefix\nff02::1 ip6-allnodes\nff02::2 ip6-allrouters\nff02::3 ip6-allhosts"
for _, host := range m.Hosts {
hostStr := fmt.Sprintf("%s %s\n", config.Remotehost, host)
Expand Down
13 changes: 13 additions & 0 deletions pkg/kubelet/node/heartbeat.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,16 @@ func init() {
NewNode.Metadata.Name = config.Nodename
NewNode.Status.Pods = make([]api.Pod, 0)
NewNode.Status.PodsNumber = 0
URL := config.GetUrlPrefix() + config.PodsURL
URL = strings.Replace(URL, config.NamespacePlaceholder, "default", -1)
pods := []api.Pod{}
_ = httputil.Get(URL, &pods, "data")
for _, pod := range pods {
if pod.Spec.NodeName == config.Nodename {
pods = append(NewNode.Status.Pods, pod)
NewNode.Status.PodsNumber++
}
}
Heartbeat = &NodeInfo{NewNode}
}

Expand Down Expand Up @@ -60,11 +70,14 @@ func DoHeartBeat() {
Metrics, err := GetPodMetrics(&PodInList)
if err != nil {
log.Error("Get Pod Metrics Error: %s", err.Error())
PodInList.Status.Phase = "Unknown"
Heartbeat.Node.Status.Pods[index] = PodInList
continue
}
PodInList.Status.Metrics = *Metrics
PodInList.Status.CPUPercentage = (Metrics.CpuUsage - Heartbeat.Node.Status.Pods[index].Status.Metrics.CpuUsage) / float64(30*time.Second)
PodInList.Status.MemoryPercentage = Metrics.MemoryUsage / (2 * 1024 * 1024 * 1024) // total: 2G
PodInList.Status.Phase = "Running"
URL := config.GetUrlPrefix() + config.PodURL
URL = strings.Replace(URL, config.NamespacePlaceholder, PodInList.Metadata.NameSpace, -1)
URL = strings.Replace(URL, config.NamePlaceholder, PodInList.Metadata.Name, -1)
Expand Down
10 changes: 9 additions & 1 deletion pkg/kubelet/subscriber/subscriber.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"minik8s/pkg/kubelet/node"
"minik8s/pkg/kubelet/pod"
"minik8s/util/log"
"os"
"sync"
)

Expand All @@ -21,7 +22,13 @@ type KubeletSubscriber struct {
done chan bool
}

func NewKubeletSubscriber() *KubeletSubscriber {
func NewKubeletSubscriber(name string) *KubeletSubscriber {
if name == "" {
content, _ := os.ReadFile("/etc/hostname")
config.Nodename = string(content)
} else {
config.Nodename = name
}
group := "kubelet" + "-" + config.Nodename
return &KubeletSubscriber{
ready: make(chan bool),
Expand All @@ -37,6 +44,7 @@ func (k *KubeletSubscriber) Setup(_ sarama.ConsumerGroupSession) error {
}

func (k *KubeletSubscriber) Cleanup(_ sarama.ConsumerGroupSession) error {
k.ready = make(chan bool)
return nil
}

Expand Down
9 changes: 8 additions & 1 deletion pkg/kubeproxy/proxier.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"minik8s/pkg/kafka"
"minik8s/pkg/kubeproxy/ipvs"
"minik8s/util/log"
"os"
"sync"
)

Expand Down Expand Up @@ -96,7 +97,13 @@ func (e *KubeProxy) ConsumeClaim(session sarama.ConsumerGroupSession, claim sara
return nil
}

func NewKubeProxy() *KubeProxy {
func NewKubeProxy(name string) *KubeProxy {
if name == "" {
content, _ := os.ReadFile("/etc/hostname")
config.Nodename = string(content)
} else {
config.Nodename = name
}
group := "kube-proxy" + "-" + config.Nodename
return &KubeProxy{
ready: make(chan bool),
Expand Down
11 changes: 8 additions & 3 deletions pkg/scheduler/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,9 +69,14 @@ func (s *Scheduler) PodHandler(msg []byte) {
if pod.Spec.NodeName != "" {
return
} else {
index := s.count % len(s.nodes)
s.count = s.count + 1
pod.Spec.NodeName = s.nodes[index].Metadata.Name
for {
index := s.count % len(s.nodes)
s.count = s.count + 1
if s.nodes[index].Status.Condition.Status == api.NodeReady {
pod.Spec.NodeName = s.nodes[index].Metadata.Name
break
}
}
}
fmt.Printf("pod %s has assigned to node %s\n", pod.Metadata.Name, pod.Spec.NodeName)

Expand Down
4 changes: 4 additions & 0 deletions yaml/node.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
apiVersion: v1
kind: Node
spec:
hostname: master

0 comments on commit 56e562f

Please sign in to comment.