Skip to content

Commit

Permalink
Merge branch 'feature/serverless' of https://github.com/CloudOS-Group…
Browse files Browse the repository at this point in the history
…3/minik8s into feature/serverless
  • Loading branch information
illyaks committed May 29, 2024
2 parents 74dac94 + 17f8a13 commit 16b0f0e
Show file tree
Hide file tree
Showing 3 changed files with 101 additions and 43 deletions.
7 changes: 7 additions & 0 deletions pkg/api/msg_type/message.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ const (
TriggerTopic = "trigger"
// job topic
JobTopic = "job"
// function topic
FunctionTopic = "function"
// endpoint topic
EndpointTopic = "endpoint"
// pod operation
Expand Down Expand Up @@ -78,3 +80,8 @@ type JobMsg struct {
NewJob api.Job `json:"new_job,omitempty"`
OldJob api.Job `json:"old_job,omitempty"`
}

type FunctionMsg struct {
Opt string `json:"opt"`
OldFunctionName string `json:"old_function_name,omitempty"`
}
126 changes: 84 additions & 42 deletions pkg/controller/controllers/serverlesscontroller.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,19 +29,19 @@ type PodWithStatus struct {
}

type ServerlessController struct {
freePods []PodWithStatus //uuid of pod -> isFree
subscriber *kafka.Subscriber
ready chan bool
done chan bool
functionFreePods map[string][]PodWithStatus
subscriber *kafka.Subscriber
ready chan bool
done chan bool
}

func NewServerlessController() *ServerlessController {
group := "serverless-controller"
Controller := &ServerlessController{
ready: make(chan bool),
done: make(chan bool),
freePods: []PodWithStatus{},
subscriber: kafka.NewSubscriber(group),
ready: make(chan bool),
done: make(chan bool),
functionFreePods: make(map[string][]PodWithStatus),
subscriber: kafka.NewSubscriber(group),
}
return Controller
}
Expand All @@ -57,13 +57,18 @@ func (this *ServerlessController) Cleanup(_ sarama.ConsumerGroupSession) error {

func (this *ServerlessController) ConsumeClaim(sess sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
for msg := range claim.Messages() {
if msg.Topic == msg_type.TriggerTopic {
log.Info("trigger received")
switch msg.Topic {
case msg_type.TriggerTopic:
sess.MarkMessage(msg, "")
this.triggerNewJob(msg.Value)
} else if msg.Topic == msg_type.JobTopic {
case msg_type.JobTopic:
sess.MarkMessage(msg, "")
this.updateJob(msg.Value)
case msg_type.FunctionTopic:
sess.MarkMessage(msg, "")
this.DeleteFunction(msg.Value)
default:
log.Warn("Unknown msg type in serverless controller")
}
}
return nil
Expand All @@ -79,17 +84,20 @@ func (this *ServerlessController) triggerNewJob(content []byte) {

var freePod *api.Pod
found := false
for index, podWithStatus := range this.freePods {
if podWithStatus.isFree {
functionName := triggerMsg.Function.Metadata.Name
for index, functionFreePod := range this.functionFreePods[functionName] {
if functionFreePod.isFree {
found = true
freePod = podWithStatus.pod
this.freePods[index].isFree = false
freePod = functionFreePod.pod
this.functionFreePods[functionName][index].isFree = false
this.functionFreePods[functionName][index].freeTime = 0
break
}
}
if !found {
freePod = function.CreatePodFromFunction(&triggerMsg.Function)
newPodWithStatus := PodWithStatus{pod: freePod, isFree: false, freeTime: 0}
this.freePods = append(this.freePods, newPodWithStatus)
this.functionFreePods[functionName] = append(this.functionFreePods[functionName], newPodWithStatus)
}
if freePod == nil {
log.Error("freePod shouldn't be nil")
Expand Down Expand Up @@ -129,48 +137,82 @@ func (this *ServerlessController) updateJob(content []byte) {
case msg_type.Add:
log.Warn("why will there be add msg of job in a place other than serverless controller?")
case msg_type.Update:
for index, podWithStatus := range this.freePods {
if podWithStatus.pod.Metadata.Name == jobMsg.OldJob.Instance.Metadata.Name {
switch jobMsg.NewJob.Status {
case api.JOB_ENDED:
this.freePods[index].isFree = true
this.freePods[index].freeTime = 0
case api.JOB_RUNNING:
this.freePods[index].isFree = false
this.freePods[index].freeTime = 0
for functionName, freePods := range this.functionFreePods {
for index, freePod := range freePods {
if freePod.pod.Metadata.Name == jobMsg.OldJob.Instance.Metadata.Name {
switch jobMsg.NewJob.Status {
case api.JOB_ENDED:
this.functionFreePods[functionName][index].isFree = true
this.functionFreePods[functionName][index].freeTime = 0
case api.JOB_RUNNING:
this.functionFreePods[functionName][index].isFree = false
this.functionFreePods[functionName][index].freeTime = 0
}
}
}
}
case msg_type.Delete:
for index, podWithStatus := range this.freePods {
if podWithStatus.pod.Metadata.Name == jobMsg.OldJob.Instance.Metadata.Name {
this.freePods[index].isFree = true
this.freePods[index].freeTime = 0
for functionName, freePods := range this.functionFreePods {
for index, freePod := range freePods {
if freePod.pod.Metadata.Name == jobMsg.OldJob.Instance.Metadata.Name {
this.functionFreePods[functionName][index].isFree = true
this.functionFreePods[functionName][index].freeTime = 0
}
}
}
default:
log.Warn("unknown operation %v", jobMsg.Opt)
}
}

func (this *ServerlessController) DeleteFunction(content []byte) {
var functionMsg msg_type.FunctionMsg
err := json.Unmarshal(content, &functionMsg)
if err != nil {
log.Error("json unmarshal err %v", err)
return
}

if functionMsg.Opt != msg_type.Delete {
log.Error("we don't support operations other than delete")
return
}

functionName := functionMsg.OldFunctionName
for _, functionFreePod := range this.functionFreePods[functionName] {
URL := config.GetUrlPrefix() + config.FunctionURL
URL = strings.Replace(URL, config.NamespacePlaceholder, "default", -1)
URL = strings.Replace(URL, config.NamePlaceholder, functionFreePod.pod.Metadata.Name, -1)

err := httputil.Delete(URL)
if err != nil {
log.Error("delete err %v", err)
return
}
}
delete(this.functionFreePods, functionName)
log.Info("successfully delete function in serverless controller")
}

func (this *ServerlessController) clearExpirePod() {
for {
<-time.After(CheckInterval)
for index, podWithStatus := range this.freePods {
if podWithStatus.freeTime >= MaxFreeTime {
URL := config.GetUrlPrefix() + config.PodURL
URL = strings.Replace(URL, config.NamespacePlaceholder, "default", -1)
URL = strings.Replace(URL, config.NamePlaceholder, podWithStatus.pod.Metadata.Name, -1)

err := httputil.Delete(URL)
if err != nil {
log.Error("delete pod err %v", err)
return
for functionName, freePods := range this.functionFreePods {
for index, freePod := range freePods {
if freePod.freeTime >= MaxFreeTime {
URL := config.GetUrlPrefix() + config.PodURL
URL = strings.Replace(URL, config.NamespacePlaceholder, "default", -1)
URL = strings.Replace(URL, config.NamePlaceholder, freePod.pod.Metadata.Name, -1)

err := httputil.Delete(URL)
if err != nil {
log.Error("delete pod err %v", err)
return
}
this.functionFreePods[functionName] = append(this.functionFreePods[functionName][:index], this.functionFreePods[functionName][index+1:]...)
}
this.freePods = append(this.freePods[:index], this.freePods[index+1:]...)
continue
this.functionFreePods[functionName][index].freeTime += CheckInterval
}
this.freePods[index].freeTime += CheckInterval
}
}
}
Expand Down
11 changes: 10 additions & 1 deletion pkg/serverless/function/function.go
Original file line number Diff line number Diff line change
@@ -1,12 +1,17 @@
package function

import (
"encoding/json"
"github.com/google/uuid"
"minik8s/pkg/api"
"minik8s/pkg/api/msg_type"
"minik8s/pkg/kafka"
"minik8s/pkg/serverless/function/function_util"
"minik8s/util/log"
)

var publisher kafka.Publisher

// This file handles:
// 1. create pod from function

Expand Down Expand Up @@ -66,7 +71,11 @@ func DeleteFunction(name string, namespace string) {
// Delete function
log.Info("Delete function")
// Step 1: delete all pods(replicas)

var functionMsg msg_type.FunctionMsg
functionMsg.Opt = msg_type.Delete
functionMsg.OldFunctionName = name
byteArr, _ := json.Marshal(functionMsg)
publisher.Publish(msg_type.FunctionTopic, string(byteArr))
// Step 2: delete images
err := function_util.DeleteFunctionImage(name, namespace)
if err != nil {
Expand Down

0 comments on commit 16b0f0e

Please sign in to comment.