diff --git a/cmd/executor/main.go b/cmd/executor/main.go index 6973a9edd3f..1172ee848f0 100644 --- a/cmd/executor/main.go +++ b/cmd/executor/main.go @@ -2,10 +2,12 @@ package main import ( "fmt" + config "github.com/G-Research/k8s-batch/internal/executor/configuration" "github.com/G-Research/k8s-batch/internal/executor/reporter" "github.com/G-Research/k8s-batch/internal/executor/service" "github.com/G-Research/k8s-batch/internal/executor/startup" "github.com/G-Research/k8s-batch/internal/executor/submitter" + "github.com/spf13/viper" v1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/util/runtime" "k8s.io/client-go/informers" @@ -16,6 +18,21 @@ import ( ) func main() { + viper.SetConfigName("config") + viper.AddConfigPath("./config/executor") + var configuration config.Configuration + + if err := viper.ReadInConfig(); err != nil { + fmt.Println(err) + os.Exit(-1) + } + + err := viper.Unmarshal(&configuration) + if err != nil { + fmt.Println(err) + os.Exit(-1) + } + kubernetesClient, err := startup.LoadDefaultKubernetesClient() if err != nil { fmt.Println(err) @@ -28,7 +45,7 @@ func main() { //tweakOptions := informers.WithTweakListOptions(tweakOptionsFunc) //factory := informers.NewSharedInformerFactoryWithOptions(clientset, 0, tweakOptions) - podEventReporter := reporter.PodEventReporter{ KubernetesClient: kubernetesClient } + podEventReporter := reporter.PodEventReporter{KubernetesClient: kubernetesClient} factory := informers.NewSharedInformerFactoryWithOptions(kubernetesClient, 0) podWatcher := initializePodWatcher(factory, podEventReporter) @@ -41,11 +58,11 @@ func main() { defer close(stopper) factory.Start(stopper) - jobSubmitter := submitter.JobSubmitter{KubernetesClient:kubernetesClient} + jobSubmitter := submitter.JobSubmitter{KubernetesClient: kubernetesClient} kubernetesAllocationService := service.KubernetesAllocationService{ - PodLister: podWatcher.Lister(), - NodeLister: nodeLister, + PodLister: podWatcher.Lister(), + NodeLister: nodeLister, JobSubmitter: jobSubmitter, } diff --git a/config/executor/config.yaml b/config/executor/config.yaml new file mode 100644 index 00000000000..3963d74272d --- /dev/null +++ b/config/executor/config.yaml @@ -0,0 +1,9 @@ +application: + clusterId : "Cluster1" + inClusterDeployment: false +task: + utilisationReportingInterval: 30s + forgottenCompletedPodReportingInterval: 30s + jobLeaseRenewalInterval: 10s + podDeletionInterval: 60s + requestNewJobsInterval: 10s \ No newline at end of file diff --git a/go.mod b/go.mod index bee0dd19e53..34d3454e885 100644 --- a/go.mod +++ b/go.mod @@ -7,11 +7,11 @@ require ( github.com/go-redis/redis v6.15.2+incompatible github.com/imdario/mergo v0.3.7 // indirect github.com/kjk/betterguid v0.0.0-20170621091430-c442874ba63a - github.com/oklog/ulid v1.3.1 + github.com/spf13/viper v1.4.0 + github.com/stretchr/testify v1.3.0 golang.org/x/net v0.0.0-20190620200207-3b0461eec859 // indirect golang.org/x/oauth2 v0.0.0-20190604053449-0f29369cfe45 // indirect golang.org/x/sys v0.0.0-20190626221950-04f50cda93cb // indirect - golang.org/x/time v0.0.0-20190308202827-9d24e82272b4 // indirect k8s.io/api v0.0.0-20190620084959-7cf5895f2711 k8s.io/apimachinery v0.0.0-20190612205821-1799e75a0719 k8s.io/client-go v0.0.0-20190620085101-78d2af792bab diff --git a/go.sum b/go.sum index ee0fa5dcb88..40f9137e19a 100644 --- a/go.sum +++ b/go.sum @@ -1,28 +1,60 @@ +cloud.google.com/go v0.26.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw= cloud.google.com/go v0.34.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw= github.com/Azure/go-autorest v11.1.2+incompatible/go.mod h1:r+4oMnoxhatjLLJ6zxSWATqVooLgysK6ZNox3g/xq24= +github.com/BurntSushi/toml v0.3.1 h1:WXkYYl6Yr3qBf1K79EBnL4mak0OimBfB0XUf9Vl28OQ= +github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= +github.com/OneOfOne/xxhash v1.2.2/go.mod h1:HSdplMjZKSmBqAxg5vPj2TmRDmfkzw+cTzAElWljhcU= +github.com/alecthomas/template v0.0.0-20160405071501-a0175ee3bccc/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc= +github.com/alecthomas/units v0.0.0-20151022065526-2efee857e7cf/go.mod h1:ybxpYRFXyAe+OPACYpWeL0wqObRcbAqCMya13uyzqw0= +github.com/armon/consul-api v0.0.0-20180202201655-eb2c6b5be1b6/go.mod h1:grANhF5doyWs3UAsr3K4I6qtAmlQcZDesFNEHPZAzj8= +github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973/go.mod h1:Dwedo/Wpr24TaqPxmxbtue+5NUziq4I4S80YR8gNf3Q= +github.com/beorn7/perks v1.0.0/go.mod h1:KWe93zE9D1o94FZ5RNwFwVgaQK1VOXiVxmqh+CedLV8= +github.com/cespare/xxhash v1.1.0/go.mod h1:XrSqR1VqqWfGrhpAt58auRo0WTKS1nRRg3ghfAqPWnc= +github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw= +github.com/coreos/bbolt v1.3.2/go.mod h1:iRUV2dpdMOn7Bo10OQBFzIJO9kkE559Wcmn+qkEiiKk= +github.com/coreos/etcd v3.3.10+incompatible/go.mod h1:uF7uidLiAD3TWHmW31ZFd/JWoc32PjwdhPthX9715RE= +github.com/coreos/go-semver v0.2.0/go.mod h1:nnelYz7RCh+5ahJtPPxZlU+153eP4D4r3EedlOD2RNk= +github.com/coreos/go-systemd v0.0.0-20190321100706-95778dfbb74e/go.mod h1:F5haX7vjVVG0kc13fIWeqUViNPyEJxv/OmvnBo0Yme4= +github.com/coreos/pkg v0.0.0-20180928190104-399ea9e2e55f/go.mod h1:E3G3o1h8I7cfcXa63jLwjI0eiQQMgzzUDFVpN/nH/eA= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/dgrijalva/jwt-go v0.0.0-20160705203006-01aeca54ebda/go.mod h1:E3ru+11k8xSBh+hMPgOLZmtrrCbhqsmaPHjLKYnJCaQ= +github.com/dgrijalva/jwt-go v3.2.0+incompatible/go.mod h1:E3ru+11k8xSBh+hMPgOLZmtrrCbhqsmaPHjLKYnJCaQ= +github.com/dgryski/go-sip13 v0.0.0-20181026042036-e10d5fee7954/go.mod h1:vAd38F8PWV+bWy6jNmig1y/TA+kYO4g3RSRF0IAv0no= github.com/docker/spdystream v0.0.0-20160310174837-449fdfce4d96/go.mod h1:Qh8CwZgvJUkLughtfhJv5dyTYa91l1fOUCrgjqmcifM= github.com/elazarl/goproxy v0.0.0-20170405201442-c4fc26588b6e/go.mod h1:/Zj4wYkgs4iZTTu3o/KG3Itv/qCCa8VVMlb3i9OVuzc= github.com/evanphx/json-patch v0.0.0-20190203023257-5858425f7550/go.mod h1:50XU6AFN0ol/bzJsmQLiYLvXMP4fmwYFNcr97nuDLSk= github.com/fsnotify/fsnotify v1.4.7 h1:IXs+QLmnXW2CcXuY+8Mzv/fWEsPGWxqefPtCP5CnV9I= github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo= +github.com/ghodss/yaml v1.0.0/go.mod h1:4dBDuWmgqj2HViK6kFavaiC9ZROes6MMH2rRYeMEF04= github.com/gin-contrib/sse v0.0.0-20190301062529-5545eab6dad3 h1:t8FVkw33L+wilf2QiWkw0UV77qRpcH/JHPKGpKa2E8g= github.com/gin-contrib/sse v0.0.0-20190301062529-5545eab6dad3/go.mod h1:VJ0WA2NBN22VlZ2dKZQPAPnyWw5XTlK1KymzLKsr59s= github.com/gin-gonic/gin v1.4.0 h1:3tMoCCfM7ppqsR0ptz/wi1impNpT7/9wQtMZ8lr1mCQ= github.com/gin-gonic/gin v1.4.0/go.mod h1:OW2EZn3DO8Ln9oIKOvM++LBO+5UPHJJDH72/q/3rZdM= +github.com/go-kit/kit v0.8.0/go.mod h1:xBxKIO96dXMWWy0MnWVtmwkA9/13aqxPnvrjFYMA2as= +github.com/go-logfmt/logfmt v0.3.0/go.mod h1:Qt1PoO58o5twSAckw1HlFXLmHsOX5/0LbT9GBnD5lWE= +github.com/go-logfmt/logfmt v0.4.0/go.mod h1:3RMwSq7FuexP4Kalkev3ejPJsZTpXXBr9+V4qmtdjCk= github.com/go-redis/redis v6.15.2+incompatible h1:9SpNVG76gr6InJGxoZ6IuuxaCOQwDAhzyXg+Bs+0Sb4= github.com/go-redis/redis v6.15.2+incompatible/go.mod h1:NAIEuMOZ/fxfXJIrKDQDz8wamY7mA7PouImQ2Jvg6kA= +github.com/go-stack/stack v1.8.0/go.mod h1:v0f6uXyyMGvRgIKkXu+yp6POWl0qKG85gN/melR3HDY= github.com/gogo/protobuf v0.0.0-20171007142547-342cbe0a0415 h1:WSBJMqJbLxsn+bTCPyPYZfqHdJmc8MK4wrBjMft6BAM= github.com/gogo/protobuf v0.0.0-20171007142547-342cbe0a0415/go.mod h1:r8qH/GZQm5c6nD/R0oafs1akxWv10x8SbQlK7atdtwQ= +github.com/gogo/protobuf v1.1.1/go.mod h1:r8qH/GZQm5c6nD/R0oafs1akxWv10x8SbQlK7atdtwQ= +github.com/gogo/protobuf v1.2.1 h1:/s5zKNz0uPFCZ5hddgPdo2TK2TVrUNMn0OOX8/aZMTE= +github.com/gogo/protobuf v1.2.1/go.mod h1:hp+jE20tsWTFYpLwKvXlhS1hjn+gTNwPg2I6zVXpSg4= +github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b/go.mod h1:SBH7ygxi8pfUlaOkMMuAQtPIUF8ecWP5IEl/CR7VP2Q= github.com/golang/groupcache v0.0.0-20160516000752-02826c3e7903 h1:LbsanbbD6LieFkXbj9YNNBupiGHJgFeLpO0j0Fza1h8= github.com/golang/groupcache v0.0.0-20160516000752-02826c3e7903/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc= +github.com/golang/groupcache v0.0.0-20190129154638-5b532d6fd5ef h1:veQD95Isof8w9/WXiA+pa3tz3fJXkt5B7QaRBrM62gk= +github.com/golang/groupcache v0.0.0-20190129154638-5b532d6fd5ef/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc= +github.com/golang/mock v1.1.1/go.mod h1:oTYuIxOrZwtPieC+H1uAHpcLFnEyAGVDL/k47Jfbm0A= github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= github.com/golang/protobuf v1.3.1 h1:YF8+flBXS5eO826T4nzqPrxfhQThhXl0YzfuUPu4SBg= github.com/golang/protobuf v1.3.1/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= github.com/google/btree v0.0.0-20160524151835-7d79101e329e/go.mod h1:lNA+9X1NB3Zf8V7Ke586lFgjr2dZNuvo3lPJSGZ5JPQ= +github.com/google/btree v1.0.0/go.mod h1:lNA+9X1NB3Zf8V7Ke586lFgjr2dZNuvo3lPJSGZ5JPQ= +github.com/google/go-cmp v0.2.0/go.mod h1:oXzfMopK8JAjlY9xF4vHSVASa0yLyX7SntLO5aqRK0M= github.com/google/go-cmp v0.3.0 h1:crn/baboCvb5fXaQ0IJ1SGTsTVrWpDsCWC8EGETZijY= github.com/google/go-cmp v0.3.0/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU= github.com/google/gofuzz v0.0.0-20170612174753-24818f796faf h1:+RRA9JqSOZFfKrOeqr2z77+8R2RKyh8PG66dcu1V0ck= @@ -31,25 +63,48 @@ github.com/google/uuid v1.0.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+ github.com/googleapis/gnostic v0.0.0-20170729233727-0c5108395e2d h1:7XGaL1e6bYS1yIonGp9761ExpPPV1ui0SAC59Yube9k= github.com/googleapis/gnostic v0.0.0-20170729233727-0c5108395e2d/go.mod h1:sJBsCZ4ayReDTBIg8b9dl28c5xFWyhBTVRp3pOg5EKY= github.com/gophercloud/gophercloud v0.0.0-20190126172459-c818fa66e4c8/go.mod h1:3WdhXV3rUYy9p6AUW8d94kr+HS62Y4VL9mBnFxsD8q4= +github.com/gorilla/websocket v1.4.0/go.mod h1:E7qHFY5m1UJ88s3WnNqhKjPHQ0heANvMoAMk2YaljkQ= github.com/gregjones/httpcache v0.0.0-20170728041850-787624de3eb7/go.mod h1:FecbI9+v66THATjSRHfNgh1IVFe/9kFxbXtjV0ctIMA= +github.com/grpc-ecosystem/go-grpc-middleware v1.0.0/go.mod h1:FiyG127CGDf3tlThmgyCl78X/SZQqEOJBCDaAfeWzPs= +github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0/go.mod h1:8NvIoxWQoOIhqOTXgfV/d3M/q6VIi02HzZEHgUlZvzk= +github.com/grpc-ecosystem/grpc-gateway v1.9.0/go.mod h1:vNeuVxBJEsws4ogUvrchl83t/GYV9WGTSLVdBhOQFDY= github.com/hashicorp/golang-lru v0.5.0 h1:CL2msUPvZTLb5O648aiLNJw3hnBxN2+1Jq8rCOH9wdo= github.com/hashicorp/golang-lru v0.5.0/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8= +github.com/hashicorp/hcl v1.0.0 h1:0Anlzjpi4vEasTeNFn2mLJgTSwt0+6sfsiTG8qcWGx4= +github.com/hashicorp/hcl v1.0.0/go.mod h1:E5yfLk+7swimpb2L/Alb/PJmXilQ/rhwaUYs4T20WEQ= github.com/hpcloud/tail v1.0.0 h1:nfCOvKYfkgYP8hkirhJocXT2+zOD8yUNjXaWfTlyFKI= github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpOxQnU= github.com/imdario/mergo v0.3.5/go.mod h1:2EnlNZ0deacrJVfApfmtdGgDfMuh/nq6Ok1EcJh5FfA= github.com/imdario/mergo v0.3.7 h1:Y+UAYTZ7gDEuOfhxKWy+dvb5dRQ6rJjFSdX2HZY1/gI= github.com/imdario/mergo v0.3.7/go.mod h1:2EnlNZ0deacrJVfApfmtdGgDfMuh/nq6Ok1EcJh5FfA= +github.com/jonboulle/clockwork v0.1.0/go.mod h1:Ii8DK3G1RaLaWxj9trq07+26W01tbo22gdxWY5EU2bo= github.com/json-iterator/go v0.0.0-20180701071628-ab8a2e0c74be/go.mod h1:+SdeFBvtyEkXs7REEP0seUULqWtbJapLOCVDaaPEHmU= github.com/json-iterator/go v1.1.6 h1:MrUvLMLTMxbqFJ9kzlvat/rYZqZnW3u4wkLzWTaFwKs= github.com/json-iterator/go v1.1.6/go.mod h1:+SdeFBvtyEkXs7REEP0seUULqWtbJapLOCVDaaPEHmU= +github.com/julienschmidt/httprouter v1.2.0/go.mod h1:SYymIcj16QtmaHHD7aYtjjsJG7VTCxuUUipMqKk8s4w= +github.com/kisielk/errcheck v1.1.0/go.mod h1:EZBBE59ingxPouuu3KfxchcWSUPOHkagtvWXihfKN4Q= +github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck= github.com/kjk/betterguid v0.0.0-20170621091430-c442874ba63a h1:b+Gt8sQs//Sl5Dcem5zP9Qc2FgEUAygREa2AAa2Vmcw= github.com/kjk/betterguid v0.0.0-20170621091430-c442874ba63a/go.mod h1:uxRAhHE1nl34DpWgfe0CYbNYbCnYplaB6rZH9ReWtUk= +github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ= +github.com/kr/logfmt v0.0.0-20140226030751-b84e30acd515/go.mod h1:+0opPa2QZZtGFBFZlji/RkVcI2GknAs/DXo4wKdlNEc= +github.com/kr/pretty v0.1.0 h1:L/CwN0zerZDmRFUapSPitk6f+Q3+0za1rQkzVuMiMFI= +github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo= +github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= +github.com/kr/text v0.1.0 h1:45sCR5RtlFHMR4UwH9sdQ5TC8v0qDQCHnXt+kaKSTVE= +github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= +github.com/magiconair/properties v1.8.0 h1:LLgXmsheXeRoUOBOjtwPQCWIYqM/LU1ayDtDePerRcY= +github.com/magiconair/properties v1.8.0/go.mod h1:PppfXfuXeibc/6YijjN8zIbojt8czPbwD3XqdrwzmxQ= github.com/mattn/go-isatty v0.0.7 h1:UvyT9uN+3r7yLEYSlJsbQGdsaB/a0DlgWP3pql6iwOc= github.com/mattn/go-isatty v0.0.7/go.mod h1:Iq45c/XA43vh69/j3iqttzPXn0bhXyGjM0Hdxcsrc5s= +github.com/matttproud/golang_protobuf_extensions v1.0.1/go.mod h1:D8He9yQNgCq6Z5Ld7szi9bcBfOoFv/3dc6xSMkL2PC0= +github.com/mitchellh/mapstructure v1.1.2 h1:fmNYVwqnSfB9mZU6OS2O6GsXM+wcskZDuKQzvN1EDeE= +github.com/mitchellh/mapstructure v1.1.2/go.mod h1:FVVH3fgwuzCH5S8UJGiWEs2h04kUh9fWfEaFds41c1Y= github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd h1:TRLaZ9cD/w8PVh93nsPXa1VrQ6jlwL5oN8l14QlcNfg= github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q= github.com/modern-go/reflect2 v1.0.1 h1:9f412s+6RmYXLWZSEzVVgPGK7C2PphHj5RJrvfx9AWI= github.com/modern-go/reflect2 v1.0.1/go.mod h1:bx2lNnkwVCuqBIxFjflWJWanXIb3RllmbCylyMrvgv0= +github.com/mwitkow/go-conntrack v0.0.0-20161129095857-cc309e4a2223/go.mod h1:qRWi+5nqEBWmkhHvq77mSJWrCKwh8bxhgT7d/eI7P4U= github.com/mxk/go-flowrate v0.0.0-20140419014527-cca7078d478f/go.mod h1:ZdcZmHo+o7JKHSa8/e818NopupXU1YMK5fe1lsApnBw= github.com/oklog/ulid v1.3.1 h1:EGfNDEx6MqHz8B3uNV6QAib1UR2Lm97sHi3ocA6ESJ4= github.com/oklog/ulid v1.3.1/go.mod h1:CirwcVhetQ6Lv90oh/F+FBtV6XMibvdAFo93nm5qn4U= @@ -57,35 +112,83 @@ github.com/onsi/ginkgo v1.6.0 h1:Ix8l273rp3QzYgXSR+c8d1fTG7UPgYkOSELPhiY/YGw= github.com/onsi/ginkgo v1.6.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE= github.com/onsi/gomega v0.0.0-20190113212917-5533ce8a0da3 h1:EooPXg51Tn+xmWPXJUGCnJhJSpeuMlBmfJVcqIRmmv8= github.com/onsi/gomega v0.0.0-20190113212917-5533ce8a0da3/go.mod h1:ex+gbHU/CVuBBDIJjb2X0qEXbFg53c61hWP/1CpauHY= +github.com/pelletier/go-toml v1.2.0 h1:T5zMGML61Wp+FlcbWjRDT7yAxhJNAiPPLOFECq181zc= +github.com/pelletier/go-toml v1.2.0/go.mod h1:5z9KED0ma1S8pY6P1sdut58dfprrGBbd/94hg7ilaic= github.com/peterbourgon/diskv v2.0.1+incompatible/go.mod h1:uqqh8zWWbv1HBMNONnaR/tNboyR3/BZd58JJSHlUSCU= +github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/prometheus/client_golang v0.9.1/go.mod h1:7SWBe2y4D6OKWSNQJUaRYU/AaXPKyh/dDVn+NZz0KFw= +github.com/prometheus/client_golang v0.9.3/go.mod h1:/TN21ttK/J9q6uSwhBd54HahCDft0ttaMvbicHlPoso= +github.com/prometheus/client_model v0.0.0-20180712105110-5c3871d89910/go.mod h1:MbSGuTsp3dbXC40dX6PRTWyKYBIrTGTE9sqQNg2J8bo= +github.com/prometheus/client_model v0.0.0-20190129233127-fd36f4220a90/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA= +github.com/prometheus/common v0.0.0-20181113130724-41aa239b4cce/go.mod h1:daVV7qP5qjZbuso7PdcryaAu0sAZbrN9i7WWcTMWvro= +github.com/prometheus/common v0.4.0/go.mod h1:TNfzLD0ON7rHzMJeJkieUDPYmFC7Snx/y86RQel1bk4= +github.com/prometheus/procfs v0.0.0-20181005140218-185b4288413d/go.mod h1:c3At6R/oaqEKCNdg8wHV1ftS6bRYblBhIjjI8uT2IGk= +github.com/prometheus/procfs v0.0.0-20190507164030-5867b95ac084/go.mod h1:TjEm7ze935MbeOT/UhFTIMYKhuLP4wbCsTZCD3I8kEA= +github.com/prometheus/tsdb v0.7.1/go.mod h1:qhTCs0VvXwvX/y3TZrWD7rabWM+ijKTux40TwIPHuXU= +github.com/rogpeppe/fastuuid v0.0.0-20150106093220-6724a57986af/go.mod h1:XWv6SoW27p1b0cqNHllgS5HIMJraePCO15w5zCzIWYg= +github.com/sirupsen/logrus v1.2.0/go.mod h1:LxeOpSwHxABJmUn/MG1IvRgCAasNZTLOkJPxbbu5VWo= +github.com/soheilhy/cmux v0.1.4/go.mod h1:IM3LyeVVIOuxMH7sFAkER9+bJ4dT7Ms6E4xg4kGIyLM= +github.com/spaolacci/murmur3 v0.0.0-20180118202830-f09979ecbc72/go.mod h1:JwIasOWyU6f++ZhiEuf87xNszmSA2myDM2Kzu9HwQUA= +github.com/spf13/afero v1.1.2/go.mod h1:j4pytiNVoe2o6bmDsKpLACNPDBIoEAkihy7loJ1B0CQ= +github.com/spf13/afero v1.2.2 h1:5jhuqJyZCZf2JRofRvN/nIFgIWNzPa3/Vz8mYylgbWc= github.com/spf13/afero v1.2.2/go.mod h1:9ZxEEn6pIJ8Rxe320qSDBk6AsU0r9pR7Q4OcevTdifk= +github.com/spf13/cast v1.3.0 h1:oget//CVOEoFewqQxwr0Ej5yjygnqGkvggSE/gB35Q8= +github.com/spf13/cast v1.3.0/go.mod h1:Qx5cxh0v+4UWYiBimWS+eyWzqEqokIECu5etghLkUJE= +github.com/spf13/jwalterweatherman v1.0.0 h1:XHEdyB+EcvlqZamSM4ZOMGlc93t6AcsBEu9Gc1vn7yk= +github.com/spf13/jwalterweatherman v1.0.0/go.mod h1:cQK4TGJAtQXfYWX+Ddv3mKDzgVb68N+wFjFa4jdeBTo= github.com/spf13/pflag v1.0.1 h1:aCvUg6QPl3ibpQUxyLkrEkCHtPqYJL4x9AuhqVqFis4= github.com/spf13/pflag v1.0.1/go.mod h1:DYY7MBk1bdzusC3SYhjObp+wFpr4gzcvqqNjLnInEg4= +github.com/spf13/pflag v1.0.3 h1:zPAT6CGy6wXeQ7NtTnaTerfKOsV6V6F8agHXFiazDkg= +github.com/spf13/pflag v1.0.3/go.mod h1:DYY7MBk1bdzusC3SYhjObp+wFpr4gzcvqqNjLnInEg4= +github.com/spf13/viper v1.4.0 h1:yXHLWeravcrgGyFSyCgdYpXQ9dR9c/WED3pg1RhxqEU= +github.com/spf13/viper v1.4.0/go.mod h1:PTJ7Z/lr49W6bUbkmS1V3by4uWynFiR9p7+dSq/yZzE= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/objx v0.1.1/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs= github.com/stretchr/testify v1.3.0 h1:TivCn/peBQ7UY8ooIcPgZFpTNSz0Q2U6UrFlUfqbe0Q= github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= +github.com/tmc/grpc-websocket-proxy v0.0.0-20190109142713-0ad062ec5ee5/go.mod h1:ncp9v5uamzpCO7NfCPTXjqaC+bZgJeR0sMTm6dMHP7U= github.com/ugorji/go v1.1.4 h1:j4s+tAvLfL3bZyefP2SEWmhBzmuIlH/eqNuPdFPgngw= github.com/ugorji/go v1.1.4/go.mod h1:uQMGLiO92mf5W77hV/PUCpI3pbzQx3CRekS0kk+RGrc= +github.com/xiang90/probing v0.0.0-20190116061207-43a291ad63a2/go.mod h1:UETIi67q53MR2AWcXfiuqkDkRtnGDLqkBTpCHuJHxtU= +github.com/xordataexchange/crypt v0.0.3-0.20170626215501-b2862e3d0a77/go.mod h1:aYKd//L2LvnjZzWKhF00oedf4jCCReLcmhLdhm1A27Q= +go.etcd.io/bbolt v1.3.2/go.mod h1:IbVyRI1SCnLcuJnV2u8VeU0CEYM7e686BmAb1XKL+uU= +go.uber.org/atomic v1.4.0/go.mod h1:gD2HeocX3+yG+ygLZcrzQJaqmWj9AIm7n08wl/qW/PE= +go.uber.org/multierr v1.1.0/go.mod h1:wR5kodmAFQ0UK8QlbwjlSNy0Z68gJhDJUG5sjR94q/0= +go.uber.org/zap v1.10.0/go.mod h1:vwi/ZaCAaUcBkycHslxD9B2zi4UTXhF60s6SWpuDF0Q= +golang.org/x/crypto v0.0.0-20180904163835-0709b304e793/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4= golang.org/x/crypto v0.0.0-20181025213731-e84da0312774/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2 h1:VklqNMn3ovrHsnt90PveolxSbWFaJdECFbxSq0Mqo2M= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= +golang.org/x/lint v0.0.0-20181026193005-c67002cb31c3/go.mod h1:UVdnD1Gm6xHRNCYTkRU2/jEulfH38KcIWyp/GAMgvoE= +golang.org/x/lint v0.0.0-20190313153728-d0100b6bd8b3/go.mod h1:6SW0HCj/g11FgYtHlgUYUwCkIfeOF89ocIRzGO/8vkc= golang.org/x/net v0.0.0-20180724234803-3673e40ba225/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= +golang.org/x/net v0.0.0-20180826012351-8a410e7b638d/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= golang.org/x/net v0.0.0-20180906233101-161cd47e91fd/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= +golang.org/x/net v0.0.0-20181114220301-adae6a3d119a/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= +golang.org/x/net v0.0.0-20181220203305-927f97764cc3/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= golang.org/x/net v0.0.0-20190108225652-1e06a53dbb7e/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= golang.org/x/net v0.0.0-20190206173232-65e2d4e15006/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= +golang.org/x/net v0.0.0-20190311183353-d8887717615a/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= golang.org/x/net v0.0.0-20190503192946-f4e77d36d62c h1:uOCk1iQW6Vc18bnC13MfzScl+wdKBmM9Y9kU7Z83/lw= golang.org/x/net v0.0.0-20190503192946-f4e77d36d62c/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= +golang.org/x/net v0.0.0-20190522155817-f3200d17e092/go.mod h1:HSz+uSET+XFnRR8LxR5pz3Of3rY3CfYBVs4xY44aLks= golang.org/x/net v0.0.0-20190620200207-3b0461eec859 h1:R/3boaszxrf1GEUWTVDzSKVwLmSJpwZ1yqXm8j0v2QI= golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= +golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U= golang.org/x/oauth2 v0.0.0-20190402181905-9f3314589c9a/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw= golang.org/x/oauth2 v0.0.0-20190604053449-0f29369cfe45 h1:SVwTIAaPC2U/AvvLNZ2a7OVsmBpC8L5BlwK1whH3hm0= golang.org/x/oauth2 v0.0.0-20190604053449-0f29369cfe45/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw= golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.0.0-20181108010431-42b317875d0f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20181221193216-37e7f081c4d4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20180909124046-d0be0721c37e/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/sys v0.0.0-20181107165924-66b7b1311ac8/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/sys v0.0.0-20181116152217-5ac8a444bdc5/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190222072716-a9d3bda3a223/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190312061237-fead79001313/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= @@ -97,13 +200,23 @@ golang.org/x/text v0.3.1-0.20181227161524-e6919f6577db/go.mod h1:bEr9sfX3Q8Zfm5f golang.org/x/time v0.0.0-20161028155119-f51c12702a4d/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= golang.org/x/time v0.0.0-20190308202827-9d24e82272b4 h1:SvFZT6jyqRaOeXpc5h/JSfZenJ2O330aBsf7JfSUXmQ= golang.org/x/time v0.0.0-20190308202827-9d24e82272b4/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= +golang.org/x/tools v0.0.0-20180221164845-07fd8470d635/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= +golang.org/x/tools v0.0.0-20190114222345-bf090417da8b/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= +golang.org/x/tools v0.0.0-20190311212946-11955173bddd/go.mod h1:LCzVGOaR6xXOjkQ3onu1FJEFr0SW1gC7cKk1uF8kGRs= +google.golang.org/appengine v1.1.0/go.mod h1:EbEs0AVv82hx2wNQdGPgUI5lhzA/G0D9YwlJXL52JkM= google.golang.org/appengine v1.4.0 h1:/wp5JvzpHIxhs/dumFmF7BXTf3Z+dd4uXta4kVyO508= google.golang.org/appengine v1.4.0/go.mod h1:xpcJRLb0r/rnEns0DIKYYv+WjYCduHsrkT7/EB5XEv4= google.golang.org/appengine v1.5.0 h1:KxkO13IPW4Lslp2bz+KHP2E3gtFlrIGNThxkZQ3g+4c= google.golang.org/appengine v1.5.0/go.mod h1:xpcJRLb0r/rnEns0DIKYYv+WjYCduHsrkT7/EB5XEv4= +google.golang.org/genproto v0.0.0-20180817151627-c66870c02cf8/go.mod h1:JiN7NxoALGmiZfu7CAH4rXhgtRTLTxftemlI0sWmxmc= +google.golang.org/grpc v1.19.0/go.mod h1:mqu4LbDTu4XGKhr4mRzUsmM4RtVoemTSY81AxZiDr8c= +google.golang.org/grpc v1.21.0/go.mod h1:oYelfM1adQP15Ek0mdvEgi9Df8B9CZIaU1084ijfRaM= +gopkg.in/alecthomas/kingpin.v2 v2.2.6/go.mod h1:FMv+mEhP44yOT+4EoQTLFTRgOQ1FBLkstjWtayDeSgw= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127 h1:qIbj1fsPNlZgppZ+VLlY7N33q108Sa+fhmuc+sWQYwY= +gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/fsnotify.v1 v1.4.7 h1:xOHLXZwVvI9hhs+cLKq5+I5onOuwQLhQwiu63xxlHs4= gopkg.in/fsnotify.v1 v1.4.7/go.mod h1:Tz8NjZHkW78fSQdbUxIjBTcgA1z1m8ZHf0WmKUhAMys= gopkg.in/go-playground/assert.v1 v1.2.1 h1:xoYuJVE7KT85PYWrN730RguIQO0ePzVRfFMXadIrXTM= @@ -112,11 +225,14 @@ gopkg.in/go-playground/validator.v8 v8.18.2 h1:lFB4DoMU6B626w8ny76MV7VX6W2VHct2G gopkg.in/go-playground/validator.v8 v8.18.2/go.mod h1:RX2a/7Ha8BgOhfk7j780h4/u/RRjR0eouCJSH80/M2Y= gopkg.in/inf.v0 v0.9.0 h1:3zYtXIO92bvsdS3ggAdA8Gb4Azj0YU+TVY1uGYNFA8o= gopkg.in/inf.v0 v0.9.0/go.mod h1:cWUDdTG/fYaXco+Dcufb5Vnc6Gp2YChqWtbxRZE0mXw= +gopkg.in/resty.v1 v1.12.0/go.mod h1:mDo4pnntr5jdWRML875a/NmxYqAlA73dVijT2AXvQQo= gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7 h1:uRGJdciOHaEIrze2W8Q3AKkepLTh2hOroT7a+7czfdQ= gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7/go.mod h1:dt/ZhP58zS4L8KSrWDmTeBkI65Dw0HsyUHuEVlX15mw= +gopkg.in/yaml.v2 v2.0.0-20170812160011-eb3733d160e7/go.mod h1:JAlM8MvJe8wmxCU4Bli9HhUf9+ttbYbLASfIpnQbh74= gopkg.in/yaml.v2 v2.2.1/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v2 v2.2.2 h1:ZCJp+EgiOT7lHqUV2J862kp8Qj64Jo6az82+3Td9dZw= gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= +honnef.co/go/tools v0.0.0-20190102054323-c2f93a96b099/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4= k8s.io/api v0.0.0-20190620084959-7cf5895f2711 h1:BblVYz/wE5WtBsD/Gvu54KyBUTJMflolzc5I2DTvh50= k8s.io/api v0.0.0-20190620084959-7cf5895f2711/go.mod h1:TBhBqb1AWbBQbW3XRusr7n7E4v2+5ZY8r8sAMnyFC5A= k8s.io/apimachinery v0.0.0-20190612205821-1799e75a0719 h1:uV4S5IB5g4Nvi+TBVNf3e9L4wrirlwYJ6w88jUQxTUw= diff --git a/internal/executor/configuration/types.go b/internal/executor/configuration/types.go new file mode 100644 index 00000000000..87e4e5da9b8 --- /dev/null +++ b/internal/executor/configuration/types.go @@ -0,0 +1,21 @@ +package configuration + +import "time" + +type ApplicationConfiguration struct { + ClusterId string + InClusterDeployment bool +} + +type TaskConfiguration struct { + UtilisationReportingInterval time.Duration + ForgottenCompletedPodReportingInterval time.Duration + JobLeaseRenewalInterval time.Duration + PodDeletionInterval time.Duration + RequestNewJobsInterval time.Duration +} + +type Configuration struct { + Application ApplicationConfiguration + Task TaskConfiguration +} diff --git a/internal/executor/domain/pod_labels.go b/internal/executor/domain/pod_labels.go new file mode 100644 index 00000000000..c3e1665e374 --- /dev/null +++ b/internal/executor/domain/pod_labels.go @@ -0,0 +1,8 @@ +package domain + +const ( + JobId = "job_id" + JobSetId = "jobset_id" + Queue = "queue_id" + ReadyForCleanup = "ready_for_cleanup" +) diff --git a/internal/executor/reporter/job_event_reporter.go b/internal/executor/reporter/job_event_reporter.go new file mode 100644 index 00000000000..bb25fe0cc53 --- /dev/null +++ b/internal/executor/reporter/job_event_reporter.go @@ -0,0 +1,16 @@ +package reporter + +import v1 "k8s.io/api/core/v1" + +type EventReporter interface { + ReportEvent(pod *v1.Pod) +} + +type JobEventReporter struct { + podEvents []*v1.Pod + //TODO API CLIENT +} + +func (jobEventReporter JobEventReporter) ReportEvent(pod v1.Pod) { + +} diff --git a/internal/executor/reporter/pod_event_reporter_test.go b/internal/executor/reporter/pod_event_reporter_test.go index 9a0926a8930..2d68e05c425 100644 --- a/internal/executor/reporter/pod_event_reporter_test.go +++ b/internal/executor/reporter/pod_event_reporter_test.go @@ -1,48 +1,41 @@ package reporter import ( + "github.com/stretchr/testify/assert" v1 "k8s.io/api/core/v1" "testing" ) func TestIsInTerminalState_ShouldReturnTrueWhenPodInSucceededPhase(t *testing.T) { pod := v1.Pod{ - Status: v1.PodStatus { + Status: v1.PodStatus{ Phase: v1.PodSucceeded, }, } inTerminatedState := IsInTerminalState(&pod) - - if !inTerminatedState { - t.Errorf("InTerminatedState was incorrect, got: %t want: %t", inTerminatedState, true) - } + assert.True(t, inTerminatedState) } func TestIsInTerminalState_ShouldReturnTrueWhenPodInFailedPhase(t *testing.T) { pod := v1.Pod{ - Status: v1.PodStatus { + Status: v1.PodStatus{ Phase: v1.PodFailed, }, } inTerminatedState := IsInTerminalState(&pod) - - if !inTerminatedState { - t.Errorf("InTerminatedState was incorrect, got: %t want: %t", inTerminatedState, true) - } + assert.True(t, inTerminatedState) } func TestIsInTerminalState_ShouldReturnFalseWhenPodInNonTerminalState(t *testing.T) { pod := v1.Pod{ - Status: v1.PodStatus { + Status: v1.PodStatus{ Phase: v1.PodPending, }, } inTerminatedState := IsInTerminalState(&pod) - if inTerminatedState { - t.Errorf("InTerminatedState was incorrect, got: %t want: %t", inTerminatedState, false) - } + assert.False(t, inTerminatedState) } diff --git a/internal/executor/service/kubernetes_allocation_service.go b/internal/executor/service/kubernetes_allocation_service.go index e1b6c83aa1b..9064fdb2d08 100644 --- a/internal/executor/service/kubernetes_allocation_service.go +++ b/internal/executor/service/kubernetes_allocation_service.go @@ -3,6 +3,7 @@ package service import ( "fmt" "github.com/G-Research/k8s-batch/internal/executor/submitter" + "github.com/G-Research/k8s-batch/internal/model" v1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/resource" "k8s.io/apimachinery/pkg/labels" @@ -42,13 +43,24 @@ func (allocationService KubernetesAllocationService) FillInSpareClusterCapacity( freeMemory := totalNodeMemory.DeepCopy() freeMemory.Sub(totalPodMemoryLimit) - //newJobs := jobRequest.RequestJobs(freeCpu, freeMemory) - //for _, job := range newJobs { - // jobSubmitter.SubmitJob(job, "default") - //} + newJobs := requestJobs(&freeCpu, &freeMemory) + for _, job := range newJobs { + allocationService.JobSubmitter.SubmitJob(job, "default") + } } +func requestJobs(freeCpu *resource.Quantity, freeMemory *resource.Quantity) []*model.Job { + //leaseRequest := api.LeaseRequest{ + // ClusterID: "ClusterID", + // AvailableResource: map[v1.ResourceName]resource.Quantity { + // v1.ResourceCPU: *freeCpu, + // v1.ResourceMemory: *freeMemory, + // }, + //} + return make([]*model.Job, 0) +} + func getAllAvailableProcessingNodes(nodes []*v1.Node) []*v1.Node { processingNodes := make([]*v1.Node, 0, len(nodes)) diff --git a/internal/executor/service/kubernetes_allocation_service_test.go b/internal/executor/service/kubernetes_allocation_service_test.go index 30f3b8cf7cf..369100a3547 100644 --- a/internal/executor/service/kubernetes_allocation_service_test.go +++ b/internal/executor/service/kubernetes_allocation_service_test.go @@ -1,6 +1,7 @@ package service import ( + "github.com/stretchr/testify/assert" v1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/resource" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -9,53 +10,47 @@ import ( func TestGetAllAvailableProcessingNodes_ShouldReturnAvailableProcessingNodes(t *testing.T) { node := v1.Node{ - Spec: v1.NodeSpec { + Spec: v1.NodeSpec{ Unschedulable: false, - Taints:nil, + Taints: nil, }, } nodes := []*v1.Node{&node} result := getAllAvailableProcessingNodes(nodes) - if len(result) != 1 { - t.Errorf("GetAllAvailableProcessingNodes was incorrect, got: %d want: %d", len(result), 1) - } + assert.Equal(t, len(result), 1) } func TestGetAllAvailableProcessingNodes_ShouldFilterUnschedulableNodes(t *testing.T) { node := v1.Node{ - Spec: v1.NodeSpec { + Spec: v1.NodeSpec{ Unschedulable: true, - Taints:nil, + Taints: nil, }, } nodes := []*v1.Node{&node} result := getAllAvailableProcessingNodes(nodes) - if len(result) != 0 { - t.Errorf("GetAllAvailableProcessingNodes was incorrect, got: %d want: %d", len(result), 0) - } + assert.Equal(t, len(result), 0) } func TestGetAllAvailableProcessingNodes_ShouldFilterNodesWithNoScheduleTaint(t *testing.T) { - taint := v1.Taint { - Effect:v1.TaintEffectNoSchedule, + taint := v1.Taint{ + Effect: v1.TaintEffectNoSchedule, } node := v1.Node{ - Spec: v1.NodeSpec { + Spec: v1.NodeSpec{ Unschedulable: false, - Taints: []v1.Taint{taint}, + Taints: []v1.Taint{taint}, }, } nodes := []*v1.Node{&node} result := getAllAvailableProcessingNodes(nodes) - if len(result) != 0 { - t.Errorf("GetAllAvailableProcessingNodes was incorrect, got: %d want: %d", len(result), 0) - } + assert.Equal(t, len(result), 0) } func TestGetAllPodsOnNodes_ShouldExcludePodsNoOnGivenNodes(t *testing.T) { @@ -72,18 +67,17 @@ func TestGetAllPodsOnNodes_ShouldExcludePodsNoOnGivenNodes(t *testing.T) { } node := v1.Node{ - ObjectMeta : metav1.ObjectMeta { + ObjectMeta: metav1.ObjectMeta{ Name: presentNodeName, }, } - pods := []*v1.Pod {&podOnNode, &podNotOnNode} - nodes := []*v1.Node {&node} + pods := []*v1.Pod{&podOnNode, &podNotOnNode} + nodes := []*v1.Node{&node} result := getAllPodsOnNodes(pods, nodes) - if len(result) != 1 || result[0].Spec.NodeName != presentNodeName { - t.Errorf("GetAllPodsOnNodes was incorrect, got: %d want: %d (%s)", len(result), 1, presentNodeName) - } + assert.Equal(t, len(result), 1) + assert.Equal(t, result[0].Spec.NodeName, presentNodeName) } func TestGetAllPodsOnNodes_ShouldHandleNoNodesProvided(t *testing.T) { @@ -93,19 +87,17 @@ func TestGetAllPodsOnNodes_ShouldHandleNoNodesProvided(t *testing.T) { }, } - pods := []*v1.Pod {&podOnNode} + pods := []*v1.Pod{&podOnNode} var nodes []*v1.Node result := getAllPodsOnNodes(pods, nodes) - if len(result) != 0 { - t.Errorf("GetAllPodsOnNodes was incorrect, got: %d want: %d", len(result), 0) - } + assert.Equal(t, len(result), 0) } func TestCalculateTotalCpu(t *testing.T) { resources := resource.NewMilliQuantity(100, resource.DecimalSI) - resourceMap := map[v1.ResourceName]resource.Quantity { v1.ResourceCPU: *resources } + resourceMap := map[v1.ResourceName]resource.Quantity{v1.ResourceCPU: *resources} node1 := makeNodeWithResource(resourceMap) node2 := makeNodeWithResource(resourceMap) @@ -113,14 +105,12 @@ func TestCalculateTotalCpu(t *testing.T) { //Expected is resources *2 resources.Add(*resources) - if !result.Equal(*resources) { - t.Errorf("CalculateTotalCpu was incorrect, got: %#v want %#v", result, resources) - } + assert.Equal(t, result, *resources) } func TestCalculateTotalMemory(t *testing.T) { resources := resource.NewMilliQuantity(50*1024*1024, resource.DecimalSI) - resourceMap := map[v1.ResourceName]resource.Quantity { v1.ResourceMemory: *resources } + resourceMap := map[v1.ResourceName]resource.Quantity{v1.ResourceMemory: *resources} node1 := makeNodeWithResource(resourceMap) node2 := makeNodeWithResource(resourceMap) @@ -128,10 +118,7 @@ func TestCalculateTotalMemory(t *testing.T) { //Expected is resources *2 resources.Add(*resources) - - if !result.Equal(*resources) { - t.Errorf("CalculateTotalMemory was incorrect, got: %#v want %#v", result, resources) - } + assert.Equal(t, result, *resources) } func TestCalculateTotalCpuLimit_ShouldSumAllContainers(t *testing.T) { @@ -142,10 +129,7 @@ func TestCalculateTotalCpuLimit_ShouldSumAllContainers(t *testing.T) { //Expected is resources * 2 containers resources.Add(*resources) - - if !result.Equal(*resources) { - t.Errorf("CalculateTotalCpuLimit was incorrect, got: %#v want %#v", result, resources) - } + assert.Equal(t, result, *resources) } func TestCalculateTotalCpuLimit_ShouldSumAllPods(t *testing.T) { @@ -157,10 +141,7 @@ func TestCalculateTotalCpuLimit_ShouldSumAllPods(t *testing.T) { //Expected is resources * 2 pods resources.Add(*resources) - - if !result.Equal(*resources) { - t.Errorf("CalculateTotalCpuLimit was incorrect, got: %#v want %#v", result, resources) - } + assert.Equal(t, result, *resources) } func TestCalculateTotalMemoryLimit_ShouldSumAllContainers(t *testing.T) { @@ -171,10 +152,7 @@ func TestCalculateTotalMemoryLimit_ShouldSumAllContainers(t *testing.T) { //Expected is resources * 2 containers resources.Add(*resources) - - if !result.Equal(*resources) { - t.Errorf("CalculateTotalMemoryLimit was incorrect, got: %#v want %#v", result, resources) - } + assert.Equal(t, result, *resources) } func TestCalculateTotalMemoryLimit__ShouldSumAllPods(t *testing.T) { @@ -185,24 +163,20 @@ func TestCalculateTotalMemoryLimit__ShouldSumAllPods(t *testing.T) { //Expected is resources * 2 containers resources.Add(*resources) - - if !result.Equal(*resources) { - t.Errorf("CalculateTotalMemoryLimit was incorrect, got: %#v want %#v", result, resources) - } + assert.Equal(t, result, *resources) } - func makePodWthResource(resourceName v1.ResourceName, resources []*resource.Quantity) v1.Pod { containers := make([]v1.Container, len(resources)) for i, res := range resources { containers[i] = v1.Container{ Resources: v1.ResourceRequirements{ - Limits: map[v1.ResourceName]resource.Quantity { resourceName: *res }, + Limits: map[v1.ResourceName]resource.Quantity{resourceName: *res}, }, } } pod := v1.Pod{ - Spec: v1.PodSpec { + Spec: v1.PodSpec{ Containers: containers, }, } @@ -210,7 +184,6 @@ func makePodWthResource(resourceName v1.ResourceName, resources []*resource.Quan return pod } - func makeNodeWithResource(resources map[v1.ResourceName]resource.Quantity) v1.Node { node := v1.Node{ Status: v1.NodeStatus{ diff --git a/internal/executor/submitter/job_submitter.go b/internal/executor/submitter/job_submitter.go index 5a8f8706b99..93ac1c8eef2 100644 --- a/internal/executor/submitter/job_submitter.go +++ b/internal/executor/submitter/job_submitter.go @@ -1,13 +1,47 @@ package submitter import ( + "github.com/G-Research/k8s-batch/internal/executor/domain" "github.com/G-Research/k8s-batch/internal/model" + v1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/client-go/kubernetes" + "strconv" ) +const PodNamePrefix string = "batch_" + type JobSubmitter struct { KubernetesClient kubernetes.Interface } -func (submitter JobSubmitter) SubmitJob(job *model.Job, namespace string) { +func (submitter JobSubmitter) SubmitJob(job *model.Job, namespace string) (*v1.Pod, error) { + pod := createPod(job) + + return submitter.KubernetesClient.CoreV1().Pods(namespace).Create(pod) +} + +func createPod(job *model.Job) *v1.Pod { + labels := createLabels(job) + + pod := v1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: PodNamePrefix + job.Id, + Labels: labels, + }, + Spec: *job.PodSpec, + } + + return &pod +} + +func createLabels(job *model.Job) map[string]string { + labels := make(map[string]string) + + labels[domain.JobId] = job.Id + labels[domain.JobSetId] = job.JobSetId + labels[domain.Queue] = job.Queue + labels[domain.ReadyForCleanup] = strconv.FormatBool(false) + + return labels } diff --git a/internal/executor/submitter/job_submitter_test.go b/internal/executor/submitter/job_submitter_test.go new file mode 100644 index 00000000000..25022a5bbf9 --- /dev/null +++ b/internal/executor/submitter/job_submitter_test.go @@ -0,0 +1,67 @@ +package submitter + +import ( + "github.com/G-Research/k8s-batch/internal/executor/domain" + "github.com/G-Research/k8s-batch/internal/model" + "github.com/stretchr/testify/assert" + v1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "testing" +) + +func TestCreateLabels_CreatesExpectedLabels(t *testing.T) { + job := model.Job{ + Id: "Id", + JobSetId: "JobSetId", + Queue: "Queue1", + } + + expectedOutput := map[string]string{ + domain.JobId: job.Id, + domain.JobSetId: job.JobSetId, + domain.Queue: job.Queue, + domain.ReadyForCleanup: "false", + } + + result := createLabels(&job) + + assert.Equal(t, result, expectedOutput) +} + +func TestCreatePod_CreatesExpectedPod(t *testing.T) { + job := model.Job{ + Id: "Id", + JobSetId: "JobSetId", + Queue: "Queue1", + PodSpec: makePodSpec(), + } + + labels := createLabels(&job) + + expectedOutput := v1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: PodNamePrefix + job.Id, + Labels: labels, + }, + Spec: *job.PodSpec, + } + + result := createPod(&job) + + assert.Equal(t, result, &expectedOutput) +} + +func makePodSpec() *v1.PodSpec { + containers := make([]v1.Container, 1) + containers[0] = v1.Container{ + Name: "Container1", + Image: "index.docker.io/library/ubuntu:latest", + Args: []string{"sleep", "10s"}, + } + spec := v1.PodSpec{ + NodeName: "NodeName", + Containers: containers, + } + + return &spec +} diff --git a/internal/executor/task/background_task.go b/internal/executor/task/background_task.go new file mode 100644 index 00000000000..40fefee9237 --- /dev/null +++ b/internal/executor/task/background_task.go @@ -0,0 +1,5 @@ +package task + +type BackgroundTask interface { + Run() +} diff --git a/internal/executor/task/cluster_utilisation_reporter_task.go b/internal/executor/task/cluster_utilisation_reporter_task.go new file mode 100644 index 00000000000..93b4684e495 --- /dev/null +++ b/internal/executor/task/cluster_utilisation_reporter_task.go @@ -0,0 +1,65 @@ +package task + +import ( + "github.com/G-Research/k8s-batch/internal/executor/domain" + v1 "k8s.io/api/core/v1" + lister "k8s.io/client-go/listers/core/v1" +) + +type ClusterUtilisationReporterTask struct { + podLister lister.PodLister + //TODO API +} + +func (clusterUtilisationReporter ClusterUtilisationReporterTask) Run() { + allActivePods := getAllActivePods(clusterUtilisationReporter.podLister) + getUtilisationByQueue(allActivePods) + +} + +func getAllActivePods(podLister lister.PodLister) []*v1.Pod { + runningPodsSelector, err := createRunningPodLabelSelector() + if err != nil { + //TODO Handle error case + } + + allActiveBatchPods, err := podLister.List(runningPodsSelector) + allActiveBatchPods = removePodsInTerminalState(allActiveBatchPods) + return allActiveBatchPods +} + +func removePodsInTerminalState(pods []*v1.Pod) []*v1.Pod { + activePods := make([]*v1.Pod, 0) + + for _, pod := range pods { + if !isInTerminalState(pod) { + activePods = append(activePods, pod) + } + } + + return activePods +} + +func getUtilisationByQueue(pods []*v1.Pod) map[string]v1.ResourceList { + utilisationByQueue := make(map[string]v1.ResourceList) + + for _, pod := range pods { + queue := pod.Labels[domain.Queue] + + if _, ok := utilisationByQueue[queue]; ok { + //TODO Once we have decided upon which resource struct to use + manipulation methods implemented, add assignment code here + } else { + //TODO Once we have decided upon which resource struct to use + manipulation methods implemented, add assignment code here + } + } + + return utilisationByQueue +} + +func isInTerminalState(pod *v1.Pod) bool { + podPhase := pod.Status.Phase + if podPhase == v1.PodSucceeded || podPhase == v1.PodFailed { + return true + } + return false +} diff --git a/internal/executor/task/forgotten_completed_pod_reporter_task.go b/internal/executor/task/forgotten_completed_pod_reporter_task.go new file mode 100644 index 00000000000..b1ccd249b17 --- /dev/null +++ b/internal/executor/task/forgotten_completed_pod_reporter_task.go @@ -0,0 +1,89 @@ +package task + +import ( + "errors" + "github.com/G-Research/k8s-batch/internal/executor/domain" + "github.com/G-Research/k8s-batch/internal/executor/reporter" + v1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/labels" + "k8s.io/apimachinery/pkg/selection" + listers "k8s.io/client-go/listers/core/v1" + "time" +) + +type ForgottenCompletedPodReporterTask struct { + PodLister listers.PodLister + EventReporter reporter.EventReporter +} + +func (podCleanup ForgottenCompletedPodReporterTask) Run() { + podsToBeReported := getAllPodsRequiringCompletionEvent(podCleanup.PodLister) + + for _, pod := range podsToBeReported { + podCleanup.EventReporter.ReportEvent(pod) + } +} + +func getAllPodsRequiringCompletionEvent(podLister listers.PodLister) []*v1.Pod { + requirement, err := labels.NewRequirement(domain.JobId, selection.Exists, []string{}) + if err != nil { + return nil + //TODO Handle error case + } + + selector := labels.NewSelector().Add(*requirement) + allBatchPodsNotMarkedForCleanup, err := podLister.List(selector) + + if err != nil { + //TODO Do something in case of error + } + + completedBatchPodsNotMarkedForCleanup := filterCompletedPods(allBatchPodsNotMarkedForCleanup) + completedBatchPodsToBeReported := filterPodsInStateForLongerThanGivenDuration(completedBatchPodsNotMarkedForCleanup, time.Minute*2) + + return completedBatchPodsToBeReported +} + +func filterCompletedPods(pods []*v1.Pod) []*v1.Pod { + completedPods := make([]*v1.Pod, 0, len(pods)) + + for _, pod := range pods { + if isInTerminalState(pod) { + completedPods = append(completedPods, pod) + } + } + + return completedPods +} + +func filterPodsInStateForLongerThanGivenDuration(pods []*v1.Pod, duration time.Duration) []*v1.Pod { + podsInStateForLongerThanDuration := make([]*v1.Pod, 0) + + expiryTime := time.Now().Add(-duration) + for _, pod := range pods { + lastStatusChange, err := lastStatusChange(pod) + if err != nil || lastStatusChange.Before(expiryTime) { + podsInStateForLongerThanDuration = append(podsInStateForLongerThanDuration, pod) + } + } + + return podsInStateForLongerThanDuration +} + +func lastStatusChange(pod *v1.Pod) (time.Time, error) { + conditions := pod.Status.Conditions + + if len(conditions) <= 0 { + return *new(time.Time), errors.New("no state changes found, cannot determine last status change") + } + + var maxStatusChange time.Time + + for _, condition := range conditions { + if condition.LastTransitionTime.Time.After(maxStatusChange) { + maxStatusChange = condition.LastTransitionTime.Time + } + } + + return maxStatusChange, nil +} diff --git a/internal/executor/task/forgotten_completed_pod_reporter_task_test.go b/internal/executor/task/forgotten_completed_pod_reporter_task_test.go new file mode 100644 index 00000000000..ccf80a3baef --- /dev/null +++ b/internal/executor/task/forgotten_completed_pod_reporter_task_test.go @@ -0,0 +1,131 @@ +package task + +import ( + "github.com/stretchr/testify/assert" + v1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "testing" + "time" +) + +func TestLastStatusChange_ReturnsExpectedValue(t *testing.T) { + earliest := time.Date(2019, 11, 20, 9, 30, 3, 0, time.UTC) + middle := time.Date(2019, 11, 20, 9, 31, 4, 0, time.UTC) + latest := time.Date(2019, 11, 20, 9, 31, 5, 0, time.UTC) + conditions := []v1.PodCondition{ + { + LastTransitionTime: metav1.NewTime(earliest), + }, + { + LastTransitionTime: metav1.NewTime(latest), + }, + { + LastTransitionTime: metav1.NewTime(middle), + }, + } + + pod := v1.Pod{ + Status: v1.PodStatus{ + Conditions: conditions, + }, + } + + result, _ := lastStatusChange(&pod) + + assert.Equal(t, result, latest) +} + +func TestLastStatusChange_ReturnsErrorWhenNoStateChangesFound(t *testing.T) { + pod := v1.Pod{ + Status: v1.PodStatus{ + Conditions: make([]v1.PodCondition, 0), + }, + } + + _, err := lastStatusChange(&pod) + + assert.True(t, err != nil) +} + +func TestFilterPodsInStateForLongerThanGivenDuration(t *testing.T) { + now := time.Now() + threeSecondsAgo := now.Add(-3 * time.Second) + sixSecondsAgo := now.Add(-6 * time.Second) + + pod1 := v1.Pod{ + Status: v1.PodStatus{ + Conditions: []v1.PodCondition{{LastTransitionTime: metav1.NewTime(threeSecondsAgo)}}, + }, + } + + pod2 := v1.Pod{ + Status: v1.PodStatus{ + Conditions: []v1.PodCondition{{LastTransitionTime: metav1.NewTime(sixSecondsAgo)}}, + }, + } + + result := filterPodsInStateForLongerThanGivenDuration([]*v1.Pod{&pod1, &pod2}, 5*time.Second) + + assert.Equal(t, len(result), 1) + assert.Equal(t, result[0], &pod2) + +} + +func TestFilterPodsInStateForLongerThanGivenDuration_ReturnsEmptyIfNoPodsInStateForLongerThanGivenDuration(t *testing.T) { + now := time.Now() + threeSecondsAgo := now.Add(-3 * time.Second) + fourSecondsAgo := now.Add(-4 * time.Second) + + pod1 := v1.Pod{ + Status: v1.PodStatus{ + Conditions: []v1.PodCondition{{LastTransitionTime: metav1.NewTime(threeSecondsAgo)}}, + }, + } + + pod2 := v1.Pod{ + Status: v1.PodStatus{ + Conditions: []v1.PodCondition{{LastTransitionTime: metav1.NewTime(fourSecondsAgo)}}, + }, + } + + result := filterPodsInStateForLongerThanGivenDuration([]*v1.Pod{&pod1, &pod2}, 5*time.Second) + + assert.Equal(t, len(result), 0) +} + +func TestFilterCompletedPods(t *testing.T) { + runningPod := v1.Pod{ + Status: v1.PodStatus{ + Phase: v1.PodRunning, + }, + } + + completedPod := v1.Pod{ + Status: v1.PodStatus{ + Phase: v1.PodSucceeded, + }, + } + + result := filterCompletedPods([]*v1.Pod{&runningPod, &completedPod}) + + assert.Equal(t, len(result), 1) + assert.Equal(t, result[0], &completedPod) +} + +func TestFilterCompletedPods_ShouldReturnEmptyIfNoCompletedPods(t *testing.T) { + runningPod := v1.Pod{ + Status: v1.PodStatus{ + Phase: v1.PodRunning, + }, + } + + pendingPod := v1.Pod{ + Status: v1.PodStatus{ + Phase: v1.PodPending, + }, + } + + result := filterCompletedPods([]*v1.Pod{&runningPod, &pendingPod}) + + assert.Equal(t, len(result), 0) +} diff --git a/internal/executor/task/job_lease_renewal_task.go b/internal/executor/task/job_lease_renewal_task.go new file mode 100644 index 00000000000..a752ec07504 --- /dev/null +++ b/internal/executor/task/job_lease_renewal_task.go @@ -0,0 +1,58 @@ +package task + +import ( + "github.com/G-Research/k8s-batch/internal/executor/domain" + v1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/labels" + "k8s.io/apimachinery/pkg/selection" + lister "k8s.io/client-go/listers/core/v1" + "strconv" +) + +type JobLeaseRenewalTask struct { + PodLister lister.PodLister + //TODO API +} + +func (jobLeaseRenewal JobLeaseRenewalTask) Run() { + runningPodsSelector, err := createRunningPodLabelSelector() + if err != nil { + //TODO Handle error case + } + + allPodsEligibleForRenewal, err := jobLeaseRenewal.PodLister.List(runningPodsSelector) + if err != nil { + //TODO Handle error case + } + if len(allPodsEligibleForRenewal) > 0 { + //extractJobIds(allPodsEligibleForRenewal) + } +} + +func createRunningPodLabelSelector() (labels.Selector, error) { + jobIdExistsRequirement, err := labels.NewRequirement(domain.JobId, selection.Exists, []string{}) + if err != nil { + return labels.NewSelector(), err + //TODO Handle error case + } + + notReadyCleanupRequirement, err := labels.NewRequirement(domain.ReadyForCleanup, selection.Equals, []string{strconv.FormatBool(false)}) + if err != nil { + return labels.NewSelector(), err + //TODO Handle error case + } + + selector := labels.NewSelector().Add(*jobIdExistsRequirement, *notReadyCleanupRequirement) + return selector, nil +} + +func extractJobIds(pods []*v1.Pod) []string { + jobIds := make([]string, 0, len(pods)) + + for _, pod := range pods { + jobId := pod.Labels[domain.JobId] + jobIds = append(jobIds, jobId) + } + + return jobIds +} diff --git a/internal/executor/task/pod_deletion_task.go b/internal/executor/task/pod_deletion_task.go new file mode 100644 index 00000000000..2561b1dbbb1 --- /dev/null +++ b/internal/executor/task/pod_deletion_task.go @@ -0,0 +1,46 @@ +package task + +import ( + "github.com/G-Research/k8s-batch/internal/executor/domain" + v1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/client-go/kubernetes" + "strconv" + "strings" +) + +type PodDeletionTask struct { + KubernetesClient kubernetes.Clientset +} + +func (deletionTask PodDeletionTask) Run() { + deleteOptions := createPodDeletionDeleteOptions() + listOptions := createPodDeletionListOptions() + + err := deletionTask.KubernetesClient.CoreV1().Pods(v1.NamespaceAll).DeleteCollection(&deleteOptions, listOptions) + if err != nil { + //TODO handle error + } +} + +func createPodDeletionDeleteOptions() metav1.DeleteOptions { + gracePeriod := int64(0) + deleteOptions := metav1.DeleteOptions{ + GracePeriodSeconds: &gracePeriod, + } + return deleteOptions +} + +func createPodDeletionListOptions() metav1.ListOptions { + markedForDeletionJobLabels := make([]string, 2) + //Only delete batch jobs + markedForDeletionJobLabels = append(markedForDeletionJobLabels, domain.JobId) + //Only delete jobs with ready for cleanup = true + markedForDeletionJobLabels = append(markedForDeletionJobLabels, domain.ReadyForCleanup+"="+strconv.FormatBool(true)) + + listOptions := metav1.ListOptions{ + LabelSelector: strings.Join(markedForDeletionJobLabels, ","), + } + + return listOptions +} diff --git a/internal/executor/task/request_jobs_task.go b/internal/executor/task/request_jobs_task.go new file mode 100644 index 00000000000..4ab8c9b3d1a --- /dev/null +++ b/internal/executor/task/request_jobs_task.go @@ -0,0 +1,11 @@ +package task + +import "github.com/G-Research/k8s-batch/internal/executor/service" + +type RequestJobsTask struct { + AllocationService service.KubernetesAllocationService +} + +func (requestJobs RequestJobsTask) Run() { + requestJobs.AllocationService.FillInSpareClusterCapacity() +}