Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Basic working implementation of executor component #19

Merged
merged 6 commits into from
Jul 15, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 21 additions & 4 deletions cmd/executor/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,12 @@ package main

import (
"fmt"
config "github.com/G-Research/k8s-batch/internal/executor/configuration"
"github.com/G-Research/k8s-batch/internal/executor/reporter"
"github.com/G-Research/k8s-batch/internal/executor/service"
"github.com/G-Research/k8s-batch/internal/executor/startup"
"github.com/G-Research/k8s-batch/internal/executor/submitter"
"github.com/spf13/viper"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/client-go/informers"
Expand All @@ -16,6 +18,21 @@ import (
)

func main() {
viper.SetConfigName("config")
viper.AddConfigPath("./config/executor")
var configuration config.Configuration

if err := viper.ReadInConfig(); err != nil {
fmt.Println(err)
os.Exit(-1)
}

err := viper.Unmarshal(&configuration)
if err != nil {
fmt.Println(err)
os.Exit(-1)
}

kubernetesClient, err := startup.LoadDefaultKubernetesClient()
if err != nil {
fmt.Println(err)
Expand All @@ -28,7 +45,7 @@ func main() {
//tweakOptions := informers.WithTweakListOptions(tweakOptionsFunc)
//factory := informers.NewSharedInformerFactoryWithOptions(clientset, 0, tweakOptions)

podEventReporter := reporter.PodEventReporter{ KubernetesClient: kubernetesClient }
podEventReporter := reporter.PodEventReporter{KubernetesClient: kubernetesClient}

factory := informers.NewSharedInformerFactoryWithOptions(kubernetesClient, 0)
podWatcher := initializePodWatcher(factory, podEventReporter)
Expand All @@ -41,11 +58,11 @@ func main() {
defer close(stopper)
factory.Start(stopper)

jobSubmitter := submitter.JobSubmitter{KubernetesClient:kubernetesClient}
jobSubmitter := submitter.JobSubmitter{KubernetesClient: kubernetesClient}

kubernetesAllocationService := service.KubernetesAllocationService{
PodLister: podWatcher.Lister(),
NodeLister: nodeLister,
PodLister: podWatcher.Lister(),
NodeLister: nodeLister,
JobSubmitter: jobSubmitter,
}

Expand Down
9 changes: 9 additions & 0 deletions config/executor/config.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
application:
clusterId : "Cluster1"
inClusterDeployment: false
task:
utilisationReportingInterval: 30s
forgottenCompletedPodReportingInterval: 30s
jobLeaseRenewalInterval: 10s
podDeletionInterval: 60s
requestNewJobsInterval: 10s
4 changes: 2 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,11 @@ require (
github.com/go-redis/redis v6.15.2+incompatible
github.com/imdario/mergo v0.3.7 // indirect
github.com/kjk/betterguid v0.0.0-20170621091430-c442874ba63a
github.com/oklog/ulid v1.3.1
github.com/spf13/viper v1.4.0
github.com/stretchr/testify v1.3.0
golang.org/x/net v0.0.0-20190620200207-3b0461eec859 // indirect
golang.org/x/oauth2 v0.0.0-20190604053449-0f29369cfe45 // indirect
golang.org/x/sys v0.0.0-20190626221950-04f50cda93cb // indirect
golang.org/x/time v0.0.0-20190308202827-9d24e82272b4 // indirect
k8s.io/api v0.0.0-20190620084959-7cf5895f2711
k8s.io/apimachinery v0.0.0-20190612205821-1799e75a0719
k8s.io/client-go v0.0.0-20190620085101-78d2af792bab
Expand Down
116 changes: 116 additions & 0 deletions go.sum

Large diffs are not rendered by default.

21 changes: 21 additions & 0 deletions internal/executor/configuration/types.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
package configuration

import "time"

type ApplicationConfiguration struct {
ClusterId string
InClusterDeployment bool
}

type TaskConfiguration struct {
UtilisationReportingInterval time.Duration
ForgottenCompletedPodReportingInterval time.Duration
JobLeaseRenewalInterval time.Duration
PodDeletionInterval time.Duration
RequestNewJobsInterval time.Duration
}

type Configuration struct {
Application ApplicationConfiguration
Task TaskConfiguration
}
8 changes: 8 additions & 0 deletions internal/executor/domain/pod_labels.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
package domain

const (
JobId = "job_id"
JobSetId = "jobset_id"
Queue = "queue_id"
ReadyForCleanup = "ready_for_cleanup"
)
16 changes: 16 additions & 0 deletions internal/executor/reporter/job_event_reporter.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
package reporter

import v1 "k8s.io/api/core/v1"

type EventReporter interface {
ReportEvent(pod *v1.Pod)
}

type JobEventReporter struct {
podEvents []*v1.Pod
//TODO API CLIENT
}

func (jobEventReporter JobEventReporter) ReportEvent(pod v1.Pod) {

}
21 changes: 7 additions & 14 deletions internal/executor/reporter/pod_event_reporter_test.go
Original file line number Diff line number Diff line change
@@ -1,48 +1,41 @@
package reporter

import (
"github.com/stretchr/testify/assert"
v1 "k8s.io/api/core/v1"
"testing"
)

func TestIsInTerminalState_ShouldReturnTrueWhenPodInSucceededPhase(t *testing.T) {
pod := v1.Pod{
Status: v1.PodStatus {
Status: v1.PodStatus{
Phase: v1.PodSucceeded,
},
}

inTerminatedState := IsInTerminalState(&pod)

if !inTerminatedState {
t.Errorf("InTerminatedState was incorrect, got: %t want: %t", inTerminatedState, true)
}
assert.True(t, inTerminatedState)
}

func TestIsInTerminalState_ShouldReturnTrueWhenPodInFailedPhase(t *testing.T) {
pod := v1.Pod{
Status: v1.PodStatus {
Status: v1.PodStatus{
Phase: v1.PodFailed,
},
}

inTerminatedState := IsInTerminalState(&pod)

if !inTerminatedState {
t.Errorf("InTerminatedState was incorrect, got: %t want: %t", inTerminatedState, true)
}
assert.True(t, inTerminatedState)
}

func TestIsInTerminalState_ShouldReturnFalseWhenPodInNonTerminalState(t *testing.T) {
pod := v1.Pod{
Status: v1.PodStatus {
Status: v1.PodStatus{
Phase: v1.PodPending,
},
}

inTerminatedState := IsInTerminalState(&pod)

if inTerminatedState {
t.Errorf("InTerminatedState was incorrect, got: %t want: %t", inTerminatedState, false)
}
assert.False(t, inTerminatedState)
}
20 changes: 16 additions & 4 deletions internal/executor/service/kubernetes_allocation_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package service
import (
"fmt"
"github.com/G-Research/k8s-batch/internal/executor/submitter"
"github.com/G-Research/k8s-batch/internal/model"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
"k8s.io/apimachinery/pkg/labels"
Expand Down Expand Up @@ -42,13 +43,24 @@ func (allocationService KubernetesAllocationService) FillInSpareClusterCapacity(
freeMemory := totalNodeMemory.DeepCopy()
freeMemory.Sub(totalPodMemoryLimit)

//newJobs := jobRequest.RequestJobs(freeCpu, freeMemory)
//for _, job := range newJobs {
// jobSubmitter.SubmitJob(job, "default")
//}
newJobs := requestJobs(&freeCpu, &freeMemory)
for _, job := range newJobs {
allocationService.JobSubmitter.SubmitJob(job, "default")
}

}

func requestJobs(freeCpu *resource.Quantity, freeMemory *resource.Quantity) []*model.Job {
//leaseRequest := api.LeaseRequest{
// ClusterID: "ClusterID",
// AvailableResource: map[v1.ResourceName]resource.Quantity {
// v1.ResourceCPU: *freeCpu,
// v1.ResourceMemory: *freeMemory,
// },
//}
return make([]*model.Job, 0)
}

func getAllAvailableProcessingNodes(nodes []*v1.Node) []*v1.Node {
processingNodes := make([]*v1.Node, 0, len(nodes))

Expand Down
Loading