From 96f3537490437036f41c2405a5f248ec64ff1b83 Mon Sep 17 00:00:00 2001 From: Chen Yinghao <1216334362@qq.com> Date: Sun, 26 May 2024 15:17:12 +0800 Subject: [PATCH] fix: fix bug in integration test --- cmd/kubelet/kubelet.go | 12 ++++++++++-- cmd/kubeproxy/kubeproxy.go | 17 ++++++++++++++--- pkg/api/node.go | 2 +- pkg/apiserver/handlers/podhandlers.go | 1 + pkg/config/ipconfig.go | 3 ++- pkg/controller/controllers/dns_controller.go | 4 ++-- pkg/kubelet/host/host_manager.go | 3 --- pkg/kubelet/node/heartbeat.go | 13 +++++++++++++ pkg/kubelet/subscriber/subscriber.go | 10 +++++++++- pkg/kubeproxy/proxier.go | 9 ++++++++- pkg/scheduler/scheduler.go | 11 ++++++++--- yaml/node.yaml | 4 ++++ 12 files changed, 72 insertions(+), 17 deletions(-) create mode 100644 yaml/node.yaml diff --git a/cmd/kubelet/kubelet.go b/cmd/kubelet/kubelet.go index 0c5a63c..a47f844 100644 --- a/cmd/kubelet/kubelet.go +++ b/cmd/kubelet/kubelet.go @@ -2,9 +2,17 @@ package main import ( "minik8s/pkg/kubelet/subscriber" + "os" ) func main() { - server := subscriber.NewKubeletSubscriber() - server.Run() + if len(os.Args) < 3 { + server := subscriber.NewKubeletSubscriber("") + server.Run() + } else { + if os.Args[1] == "--name" { + server := subscriber.NewKubeletSubscriber(os.Args[2]) + server.Run() + } + } } diff --git a/cmd/kubeproxy/kubeproxy.go b/cmd/kubeproxy/kubeproxy.go index 05785ba..7eece08 100644 --- a/cmd/kubeproxy/kubeproxy.go +++ b/cmd/kubeproxy/kubeproxy.go @@ -1,8 +1,19 @@ package main -import "minik8s/pkg/kubeproxy" +import ( + "minik8s/pkg/kubeproxy" + "os" +) func main() { - proxy := kubeproxy.NewKubeProxy() - proxy.Run() + if len(os.Args) < 3 { + proxy := kubeproxy.NewKubeProxy("") + proxy.Run() + } else { + if os.Args[1] == "--name" { + proxy := kubeproxy.NewKubeProxy(os.Args[2]) + proxy.Run() + } + } + } diff --git a/pkg/api/node.go b/pkg/api/node.go index fdd54f5..c96bca7 100644 --- a/pkg/api/node.go +++ b/pkg/api/node.go @@ -11,6 +11,7 @@ type Node struct { } type NodeSpec struct { + Hostname string `yaml:"hostname,omitempty" json:"hostname,omitempty"` ExternalID string `yaml:"externalID,omitempty" json:"externalID,omitempty"` PodCIDR string `yaml:"podCIDR,omitempty" json:"podCIDR,omitempty"` PodCIDRs []string `yaml:"podCIDRs,omitempty" json:"podCIDRs,omitempty"` @@ -29,7 +30,6 @@ const ( ) type NodeStatus struct { - Hostname string `yaml:"hostname,omitempty" json:"hostname,omitempty"` Condition NodeCondition `yaml:"condition,omitempty" json:"condition,omitempty"` PodsNumber int `yaml:"podsNumber,omitempty" json:"podsNumber,omitempty"` Pods []Pod `yaml:"pods,omitempty" json:"pods,omitempty"` diff --git a/pkg/apiserver/handlers/podhandlers.go b/pkg/apiserver/handlers/podhandlers.go index 36517d3..15193f2 100644 --- a/pkg/apiserver/handlers/podhandlers.go +++ b/pkg/apiserver/handlers/podhandlers.go @@ -44,6 +44,7 @@ func AddPod(context *gin.Context) { newPod.Status.StartTime = time.Now() newPod.Metadata.UUID = uuid.NewString() + newPod.Status.Phase = "Pending" // check if the pod already exists oldPod, exited := etcdClient.GetPod(newPod.Metadata.NameSpace, newPod.Metadata.Name) diff --git a/pkg/config/ipconfig.go b/pkg/config/ipconfig.go index 941a1ed..333e53d 100644 --- a/pkg/config/ipconfig.go +++ b/pkg/config/ipconfig.go @@ -6,11 +6,12 @@ const ( Local = false localhost = "localhost" Remotehost = "192.168.3.8" // IP of master - Nodename = "node1" port = 6443 protocol = "http://" ) +var Nodename = "" + const ( EtcdDefaultPort = 9092 ) diff --git a/pkg/controller/controllers/dns_controller.go b/pkg/controller/controllers/dns_controller.go index f6d5962..8ed9652 100644 --- a/pkg/controller/controllers/dns_controller.go +++ b/pkg/controller/controllers/dns_controller.go @@ -152,12 +152,12 @@ func (s *DNSController) WriteDNS() { os.WriteFile("/etc/hosts", []byte(str), 0644) for _, host := range s.RegisteredDNS { NginxStr := "server {\n\tlisten 80;\n" - hostStr := fmt.Sprintf("\tserver_name %s\n", host.Host) + hostStr := fmt.Sprintf("\tserver_name %s;\n", host.Host) NginxStr += hostStr for _, path := range host.Paths { pathStr := fmt.Sprintf("\tlocation %s {\n", path.Path) NginxStr += pathStr - proxyStr := fmt.Sprintf("\t\tproxy_pass %s:%s\n", path.ServiceIP, path.ServicePort) + proxyStr := fmt.Sprintf("\t\tproxy_pass %s:%s;\n", path.ServiceIP, path.ServicePort) NginxStr += proxyStr NginxStr += "\t}\n" } diff --git a/pkg/kubelet/host/host_manager.go b/pkg/kubelet/host/host_manager.go index feab5b3..c58afea 100644 --- a/pkg/kubelet/host/host_manager.go +++ b/pkg/kubelet/host/host_manager.go @@ -40,9 +40,6 @@ func (m *KubeletHostManager) RemoveHost(host string) { } func (m *KubeletHostManager) WriteHost() { - if config.Local { - return - } str := "127.0.0.1 localhost\n# The following lines are desirable for IPv6 capable hosts\n::1 ip6-localhost ip6-loopback\nfe00::0 ip6-localnet\nff00::0 ip6-mcastprefix\nff02::1 ip6-allnodes\nff02::2 ip6-allrouters\nff02::3 ip6-allhosts" for _, host := range m.Hosts { hostStr := fmt.Sprintf("%s %s\n", config.Remotehost, host) diff --git a/pkg/kubelet/node/heartbeat.go b/pkg/kubelet/node/heartbeat.go index 77cfe2a..564ea5f 100644 --- a/pkg/kubelet/node/heartbeat.go +++ b/pkg/kubelet/node/heartbeat.go @@ -24,6 +24,16 @@ func init() { NewNode.Metadata.Name = config.Nodename NewNode.Status.Pods = make([]api.Pod, 0) NewNode.Status.PodsNumber = 0 + URL := config.GetUrlPrefix() + config.PodsURL + URL = strings.Replace(URL, config.NamespacePlaceholder, "default", -1) + pods := []api.Pod{} + _ = httputil.Get(URL, &pods, "data") + for _, pod := range pods { + if pod.Spec.NodeName == config.Nodename { + pods = append(NewNode.Status.Pods, pod) + NewNode.Status.PodsNumber++ + } + } Heartbeat = &NodeInfo{NewNode} } @@ -60,11 +70,14 @@ func DoHeartBeat() { Metrics, err := GetPodMetrics(&PodInList) if err != nil { log.Error("Get Pod Metrics Error: %s", err.Error()) + PodInList.Status.Phase = "Unknown" + Heartbeat.Node.Status.Pods[index] = PodInList continue } PodInList.Status.Metrics = *Metrics PodInList.Status.CPUPercentage = (Metrics.CpuUsage - Heartbeat.Node.Status.Pods[index].Status.Metrics.CpuUsage) / float64(30*time.Second) PodInList.Status.MemoryPercentage = Metrics.MemoryUsage / (2 * 1024 * 1024 * 1024) // total: 2G + PodInList.Status.Phase = "Running" URL := config.GetUrlPrefix() + config.PodURL URL = strings.Replace(URL, config.NamespacePlaceholder, PodInList.Metadata.NameSpace, -1) URL = strings.Replace(URL, config.NamePlaceholder, PodInList.Metadata.Name, -1) diff --git a/pkg/kubelet/subscriber/subscriber.go b/pkg/kubelet/subscriber/subscriber.go index aba3dc6..bbda1cb 100644 --- a/pkg/kubelet/subscriber/subscriber.go +++ b/pkg/kubelet/subscriber/subscriber.go @@ -11,6 +11,7 @@ import ( "minik8s/pkg/kubelet/node" "minik8s/pkg/kubelet/pod" "minik8s/util/log" + "os" "sync" ) @@ -21,7 +22,13 @@ type KubeletSubscriber struct { done chan bool } -func NewKubeletSubscriber() *KubeletSubscriber { +func NewKubeletSubscriber(name string) *KubeletSubscriber { + if name == "" { + content, _ := os.ReadFile("/etc/hostname") + config.Nodename = string(content) + } else { + config.Nodename = name + } group := "kubelet" + "-" + config.Nodename return &KubeletSubscriber{ ready: make(chan bool), @@ -37,6 +44,7 @@ func (k *KubeletSubscriber) Setup(_ sarama.ConsumerGroupSession) error { } func (k *KubeletSubscriber) Cleanup(_ sarama.ConsumerGroupSession) error { + k.ready = make(chan bool) return nil } diff --git a/pkg/kubeproxy/proxier.go b/pkg/kubeproxy/proxier.go index 1ef807b..c3d2269 100644 --- a/pkg/kubeproxy/proxier.go +++ b/pkg/kubeproxy/proxier.go @@ -10,6 +10,7 @@ import ( "minik8s/pkg/kafka" "minik8s/pkg/kubeproxy/ipvs" "minik8s/util/log" + "os" "sync" ) @@ -96,7 +97,13 @@ func (e *KubeProxy) ConsumeClaim(session sarama.ConsumerGroupSession, claim sara return nil } -func NewKubeProxy() *KubeProxy { +func NewKubeProxy(name string) *KubeProxy { + if name == "" { + content, _ := os.ReadFile("/etc/hostname") + config.Nodename = string(content) + } else { + config.Nodename = name + } group := "kube-proxy" + "-" + config.Nodename return &KubeProxy{ ready: make(chan bool), diff --git a/pkg/scheduler/scheduler.go b/pkg/scheduler/scheduler.go index 8248b28..b2c3238 100644 --- a/pkg/scheduler/scheduler.go +++ b/pkg/scheduler/scheduler.go @@ -69,9 +69,14 @@ func (s *Scheduler) PodHandler(msg []byte) { if pod.Spec.NodeName != "" { return } else { - index := s.count % len(s.nodes) - s.count = s.count + 1 - pod.Spec.NodeName = s.nodes[index].Metadata.Name + for { + index := s.count % len(s.nodes) + s.count = s.count + 1 + if s.nodes[index].Status.Condition.Status == api.NodeReady { + pod.Spec.NodeName = s.nodes[index].Metadata.Name + break + } + } } fmt.Printf("pod %s has assigned to node %s\n", pod.Metadata.Name, pod.Spec.NodeName) diff --git a/yaml/node.yaml b/yaml/node.yaml new file mode 100644 index 0000000..c4b4ec4 --- /dev/null +++ b/yaml/node.yaml @@ -0,0 +1,4 @@ +apiVersion: v1 +kind: Node +spec: + hostname: master \ No newline at end of file