Skip to content

Commit

Permalink
Merge pull request #34 from CloudOS-Group3/feature/kubeproxy
Browse files Browse the repository at this point in the history
Feature/kubeproxy
  • Loading branch information
Doris-xm authored May 23, 2024
2 parents f96e77d + 108a96e commit 206d7e2
Show file tree
Hide file tree
Showing 8 changed files with 181 additions and 144 deletions.
20 changes: 18 additions & 2 deletions pkg/apiserver/handlers/labelIndex_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,16 +6,32 @@ import (
"minik8s/pkg/api"
"minik8s/pkg/config"
"minik8s/pkg/util"
"minik8s/util/log"
"net/http"
)

func GetLabelIndex(context *gin.Context) {
label := context.Param(config.LabelParam)
URL := config.LabelIndexPath + label
labelIndex := etcdClient.PrefixGet(URL)
res := etcdClient.GetEtcdPair(URL)
log.Info("Get URL %s", URL)

labelIndex := &api.LabelIndex{}
if len(res) != 0 {
err := json.Unmarshal([]byte(res), labelIndex)
if err != nil {
log.Error("Error unmarshalling labelIndex json %v", err)
return
}
}
byteArr, err := json.Marshal(labelIndex)
if err != nil {
log.Error("Error marshal labelIndex: %s", err.Error())
return
}

context.JSON(http.StatusOK, gin.H{
"data": labelIndex,
"data": string(byteArr),
})
}
func AddLabelIndex(context *gin.Context) {
Expand Down
1 change: 1 addition & 0 deletions pkg/config/urlconfig.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ const (
NamespacePlaceholder = ":namespace"
NameParam = "name"
NamespaceParam = "namespace"
LabelPlaceholder = ":label"
LabelParam = "label"
)

Expand Down
8 changes: 4 additions & 4 deletions pkg/controller/controllermanager/controllermanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,10 +30,10 @@ func NewControllerManager() *ControllerManager {
}

func (CM *ControllerManager) Run(stop chan bool) {
//go CM.DeploymentController.Run()
//go CM.EndpointController.Run()
//go CM.HPACcntroller.Run()
//go CM.NodeController.Run()
go CM.DeploymentController.Run()
go CM.EndpointController.Run()
go CM.HPACcntroller.Run()
go CM.NodeController.Run()
go CM.DNSController.Run()

_, ok := <-stop
Expand Down
159 changes: 117 additions & 42 deletions pkg/controller/controllers/endpoint_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,18 +22,17 @@ type EndPointController struct {
done chan bool
}

func (e EndPointController) Setup(session sarama.ConsumerGroupSession) error {
func (e *EndPointController) Setup(session sarama.ConsumerGroupSession) error {
close(e.ready)
return nil
}

func (e EndPointController) Cleanup(session sarama.ConsumerGroupSession) error {
func (e *EndPointController) Cleanup(session sarama.ConsumerGroupSession) error {
return nil
}

func (e EndPointController) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
func (e *EndPointController) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
for msg := range claim.Messages() {
log.Info("Watch msg: %s\n", string(msg.Value))
if msg.Topic == msg_type.PodTopic {
session.MarkMessage(msg, "")
podMsg := &msg_type.PodMsg{}
Expand All @@ -46,19 +45,23 @@ func (e EndPointController) ConsumeClaim(session sarama.ConsumerGroupSession, cl
case msg_type.Update:
// discard pod without pod ip
if podMsg.NewPod.Status.PodIP == "" {
return nil
break
}
if !util.IsLabelEqual(podMsg.NewPod.Spec.NodeSelector, podMsg.OldPod.Spec.NodeSelector) {
OnPodUpdate(&podMsg.NewPod, podMsg.OldPod.Spec.NodeSelector)
// 1. the pod first created by kubelet and the pod ip is assigned
// 2. the pod's label is changed
if podMsg.OldPod.Status.PodIP == "" {
OnPodAdd(&podMsg.NewPod)
} else if !util.IsLabelEqual(podMsg.NewPod.Metadata.Labels, podMsg.OldPod.Metadata.Labels) {
OnPodUpdate(&podMsg.NewPod, podMsg.OldPod.Metadata.Labels)
}
break
case msg_type.Delete:
OnPodDelete(&podMsg.NewPod)
OnPodDelete(&podMsg.OldPod)
break
case msg_type.Add:
// discard pod without pod ip
if podMsg.NewPod.Status.PodIP == "" {
return nil
break
}
OnPodUpdate(&podMsg.NewPod, nil)
break
Expand All @@ -78,7 +81,7 @@ func (e EndPointController) ConsumeClaim(session sarama.ConsumerGroupSession, cl
}
break
case msg_type.Delete:
OnServiceDelete(&serviceMsg.NewService)
OnServiceDelete(&serviceMsg.OldService)
break
case msg_type.Add:
OnServiceUpdate(&serviceMsg.NewService, nil)
Expand All @@ -98,20 +101,72 @@ func NewEndPointController() *EndPointController {
subscriber: kafka.NewSubscriber(brokers, group),
}
}
func OnPodAdd(pod *api.Pod) {
log.Info("OnPodAdd")
labelIndex, _ := GetLabelIndex(pod.Metadata.Labels)
log.Info("GetLabelIndex: %v", labelIndex)
// Step 1: Deal with new label
if labelIndex == nil || len(labelIndex.Labels) == 0 {
// a new label
// create a new label index
log.Info("create a new label index, %v", pod.Metadata.Labels)
labelIndex = &api.LabelIndex{
Labels: pod.Metadata.Labels,
PodName: []string{util.GetUniqueName(pod.Metadata.NameSpace, pod.Metadata.Name)},
}

// no service need to be updated, since the label is new
} else {
// update the label index
labelIndex.PodName = append(labelIndex.PodName, util.GetUniqueName(pod.Metadata.NameSpace, pod.Metadata.Name))

// need to update service
for _, serviceName := range labelIndex.ServiceName {
svc_namespace, name := util.GetNamespaceAndName(serviceName)
svc, _ := GetService(svc_namespace, name)
if svc != nil {
// update service
// add the new endpoint to the service
// var all_ports []api.ContainerPort
for _, container := range pod.Spec.Containers {
//all_ports = append(all_ports, container.Ports...)
svc.Status.EndPoints = append(svc.Status.EndPoints,
api.EndPoint{
IP: pod.Status.PodIP,
Ports: matchTargetPort(svc, container.Ports),
})
}
log.Info("update service: %v", svc)
err := UpdateService(svc)
if err != nil {
return
}
}
}
}
// Step 2: store the new label index
err := UpdateLabelIndex(labelIndex)
if err != nil {
log.Fatal("add label index error")
return
}
}

func OnPodUpdate(pod *api.Pod, oldLabel map[string]string) {
if util.IsLabelEqual(pod.Spec.NodeSelector, oldLabel) {
log.Info("OnPodUpdate")
if util.IsLabelEqual(pod.Metadata.Labels, oldLabel) {
// no need to update
return
}
labelIndex, _ := GetLabelIndex(pod.Spec.NodeSelector)

labelIndex, _ := GetLabelIndex(pod.Metadata.Labels)
log.Info("GetLabelIndex: %v", labelIndex)
// Step 1: Deal with new label
if labelIndex == nil {
if labelIndex == nil || len(labelIndex.Labels) == 0 {
// a new label
// create a new label index
log.Info("create a new label index, %v", pod.Metadata.Labels)
labelIndex = &api.LabelIndex{
Labels: pod.Spec.NodeSelector,
Labels: pod.Metadata.Labels,
PodName: []string{util.GetUniqueName(pod.Metadata.NameSpace, pod.Metadata.Name)},
}

Expand All @@ -127,15 +182,16 @@ func OnPodUpdate(pod *api.Pod, oldLabel map[string]string) {
if svc != nil {
// update service
// add the new endpoint to the service
var all_ports []api.ContainerPort
// var all_ports []api.ContainerPort
for _, container := range pod.Spec.Containers {
all_ports = append(all_ports, container.Ports...)
//all_ports = append(all_ports, container.Ports...)
svc.Status.EndPoints = append(svc.Status.EndPoints,
api.EndPoint{
IP: pod.Status.PodIP,
Ports: matchTargetPort(svc, container.Ports),
})
}
svc.Status.EndPoints = append(svc.Status.EndPoints,
api.EndPoint{
IP: pod.Status.PodIP,
Ports: all_ports,
})
log.Info("update service: %v", svc)
err := UpdateService(svc)
if err != nil {
return
Expand All @@ -155,7 +211,7 @@ func OnPodUpdate(pod *api.Pod, oldLabel map[string]string) {
return
}
oldLabelIndex, _ := GetLabelIndex(oldLabel)
if oldLabelIndex == nil {
if oldLabelIndex == nil || len(oldLabelIndex.Labels) == 0 {
// Can't be here
return
}
Expand Down Expand Up @@ -188,7 +244,7 @@ func OnPodUpdate(pod *api.Pod, oldLabel map[string]string) {
// store the old label index
// check if to delete the label index
if len(labelIndex.PodName) == 0 && len(labelIndex.ServiceName) == 0 {
err := DeleteLabelIndex(pod.Spec.NodeSelector)
err := DeleteLabelIndex(pod.Metadata.Labels)
if err != nil {
log.Fatal("delete label index error")
}
Expand All @@ -201,9 +257,11 @@ func OnPodUpdate(pod *api.Pod, oldLabel map[string]string) {
}

func OnPodDelete(pod *api.Pod) {
labelIndex, _ := GetLabelIndex(pod.Spec.NodeSelector)
if labelIndex == nil {
log.Info("OnPodDelete")
labelIndex, _ := GetLabelIndex(pod.Metadata.Labels)
if labelIndex == nil || len(labelIndex.Labels) == 0 {
// Can't be here
log.Error("Can't find labelIndex. %v", labelIndex)
return
}
// remove the pod name from the label index
Expand Down Expand Up @@ -235,7 +293,7 @@ func OnPodDelete(pod *api.Pod) {
// store the label index
// check if to delete the label index
if len(labelIndex.PodName) == 0 && len(labelIndex.ServiceName) == 0 {
err := DeleteLabelIndex(pod.Spec.NodeSelector)
err := DeleteLabelIndex(pod.Metadata.Labels)
if err != nil {
log.Fatal("delete label index error")
}
Expand All @@ -255,26 +313,31 @@ func OnServiceUpdate(svc *api.Service, oldLabel map[string]string) {

// Step 1: Deal with new label
labelIndex, _ := GetLabelIndex(svc.Metadata.Labels)
if labelIndex == nil {
// Can't be here
return
if labelIndex == nil || len(labelIndex.Labels) == 0 {
// a new label
// create a new label index
labelIndex = &api.LabelIndex{
Labels: svc.Metadata.Labels,
ServiceName: []string{util.GetUniqueName(svc.Metadata.NameSpace, svc.Metadata.Name)},
}
} else {
// update the label index
labelIndex.ServiceName = append(labelIndex.ServiceName, util.GetUniqueName(svc.Metadata.NameSpace, svc.Metadata.Name))
// update service's endpoint
}
// update the label index
labelIndex.ServiceName = append(labelIndex.ServiceName, util.GetUniqueName(svc.Metadata.NameSpace, svc.Metadata.Name))
// update service's endpoint
for _, podName := range labelIndex.PodName {
namespace, name := util.GetNamespaceAndName(podName)
pod, _ := GetPod(namespace, name)
if pod != nil {
var all_ports []api.ContainerPort
//var all_ports []api.ContainerPort
for _, container := range pod.Spec.Containers {
all_ports = append(all_ports, container.Ports...)
//all_ports = append(all_ports, container.Ports...)
svc.Status.EndPoints = append(svc.Status.EndPoints,
api.EndPoint{
IP: pod.Status.PodIP,
Ports: matchTargetPort(svc, container.Ports),
})
}
svc.Status.EndPoints = append(svc.Status.EndPoints,
api.EndPoint{
IP: pod.Status.PodIP,
Ports: all_ports,
})
}
}
// store service
Expand All @@ -293,7 +356,7 @@ func OnServiceUpdate(svc *api.Service, oldLabel map[string]string) {
return
}
oldLabelIndex, _ := GetLabelIndex(oldLabel)
if oldLabelIndex == nil {
if oldLabelIndex == nil || len(oldLabelIndex.Labels) == 0 {
// Can't be here
return
}
Expand Down Expand Up @@ -322,7 +385,7 @@ func OnServiceUpdate(svc *api.Service, oldLabel map[string]string) {

func OnServiceDelete(svc *api.Service) {
labelIndex, _ := GetLabelIndex(svc.Metadata.Labels)
if labelIndex == nil {
if labelIndex == nil || len(labelIndex.Labels) == 0 {
// Can't be here
return
}
Expand Down Expand Up @@ -481,3 +544,15 @@ func DeleteService(namespace string, name string) error {

return nil
}

func matchTargetPort(svc *api.Service, ports []api.ContainerPort) []api.ContainerPort {
targetPorts := []api.ContainerPort{}
for _, target := range svc.Spec.Ports {
for _, container := range ports {
if uint16(target.TargetPort) == uint16(container.ContainerPort) {
targetPorts = append(targetPorts, container)
}
}
}
return targetPorts
}
Loading

0 comments on commit 206d7e2

Please sign in to comment.