Skip to content

Commit

Permalink
feat(listers): Switch to informers + listers to reduce the number of …
Browse files Browse the repository at this point in the history
…API calls to the Kubernetes API server
  • Loading branch information
Zachary Seguin committed Aug 11, 2020
1 parent 0f3a9ba commit a8363a5
Show file tree
Hide file tree
Showing 15 changed files with 190 additions and 718 deletions.
11 changes: 8 additions & 3 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,21 +3,26 @@ module github.com/StatCan/jupyter-apis
go 1.14

require (
github.com/StatCan/kubeflow-controller v0.0.0-20200805150330-c19fa4b0fcb6
github.com/StatCan/kubeflow-controller v0.0.0-20200811133651-33215007413e
github.com/andanhm/go-prettytime v1.0.0
github.com/golang/groupcache v0.0.0-20190129154638-5b532d6fd5ef // indirect
github.com/golang/protobuf v1.4.2 // indirect
github.com/gorilla/handlers v1.4.2
github.com/gorilla/mux v1.7.4
github.com/hashicorp/vault/api v1.0.4 // indirect
github.com/hashicorp/golang-lru v0.5.4 // indirect
github.com/imdario/mergo v0.3.10 // indirect
github.com/json-iterator/go v1.1.10 // indirect
github.com/kr/pretty v0.2.0 // indirect
golang.org/x/crypto v0.0.0-20200709230013-948cd5f35899 // indirect
golang.org/x/net v0.0.0-20200707034311-ab3426394381 // indirect
golang.org/x/oauth2 v0.0.0-20200107190931-bf48bf16ab8d // indirect
golang.org/x/text v0.3.3 // indirect
golang.org/x/time v0.0.0-20200630173020-3af7569d3a1e // indirect
gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15 // indirect
k8s.io/api v0.18.6
k8s.io/apimachinery v0.18.6
k8s.io/client-go v11.0.1-0.20190409021438-1a26190bd76a+incompatible
sigs.k8s.io/controller-runtime v0.0.0-00010101000000-000000000000
k8s.io/utils v0.0.0-20200603063816-c1c6865ac451 // indirect
)

replace k8s.io/client-go => k8s.io/client-go v0.18.6
Expand Down
228 changes: 2 additions & 226 deletions go.sum

Large diffs are not rendered by default.

57 changes: 57 additions & 0 deletions listers.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
package main

import (
"context"
"fmt"
"log"
"time"

kubeflowinformers "github.com/StatCan/kubeflow-controller/pkg/generated/informers/externalversions"
"k8s.io/client-go/informers"
"k8s.io/client-go/tools/cache"
)

// setupListers creates shared informers for the resources the server reads
// (Events, StorageClasses, PersistentVolumeClaims, PodDefaults and Notebooks),
// starts each of them in its own goroutine and stores the matching listers on
// s.listers.
//
// The informers run until ctx is cancelled. The call blocks until every
// informer cache has synced, and returns an error if that has not happened
// within one minute or if ctx is cancelled first.
func (s *server) setupListers(ctx context.Context) error {
	// Both factories use a five minute resync period.
	factory := informers.NewSharedInformerFactory(s.clientsets.kubernetes, 5*time.Minute)
	kubeflowFactory := kubeflowinformers.NewSharedInformerFactory(s.clientsets.kubeflow, 5*time.Minute)

	// Events
	eventsInformer := factory.Core().V1().Events()
	go eventsInformer.Informer().Run(ctx.Done())

	s.listers.events = eventsInformer.Lister()

	// StorageClasses
	storageClassesInformer := factory.Storage().V1().StorageClasses()
	go storageClassesInformer.Informer().Run(ctx.Done())

	s.listers.storageClasses = storageClassesInformer.Lister()

	// PersistentVolumeClaims
	pvcInformer := factory.Core().V1().PersistentVolumeClaims()
	go pvcInformer.Informer().Run(ctx.Done())

	s.listers.persistentVolumeClaims = pvcInformer.Lister()

	// PodDefaults
	podDefaultsInformer := kubeflowFactory.Kubeflow().V1alpha1().PodDefaults()
	go podDefaultsInformer.Informer().Run(ctx.Done())

	s.listers.podDefaults = podDefaultsInformer.Lister()

	// Notebooks
	notebooksInformer := kubeflowFactory.Kubeflow().V1().Notebooks()
	go notebooksInformer.Informer().Run(ctx.Done())

	s.listers.notebooks = notebooksInformer.Lister()

	// Wait until sync
	log.Printf("synching caches...")

	// Bound the wait to one minute. The cancel func must be called so the
	// timer behind the derived context is released as soon as we return,
	// rather than lingering until the deadline or until ctx is cancelled.
	tctx, cancel := context.WithTimeout(ctx, time.Minute)
	defer cancel()

	synced := []cache.InformerSynced{
		eventsInformer.Informer().HasSynced,
		storageClassesInformer.Informer().HasSynced,
		pvcInformer.Informer().HasSynced,
		podDefaultsInformer.Informer().HasSynced,
		notebooksInformer.Informer().HasSynced,
	}
	if !cache.WaitForCacheSync(tctx.Done(), synced...) {
		return fmt.Errorf("timeout synching caches")
	}
	log.Printf("done synching caches")

	return nil
}
39 changes: 25 additions & 14 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,36 +11,48 @@ import (
"sync"
"time"

notebooksclient "github.com/StatCan/jupyter-apis/notebooks"
notebooksv1 "github.com/StatCan/jupyter-apis/notebooks/api/v1"
kubeflowv1 "github.com/StatCan/kubeflow-controller/pkg/apis/kubeflowcontroller/v1"
kubeflowv1alpha1 "github.com/StatCan/kubeflow-controller/pkg/apis/kubeflowcontroller/v1alpha1"
kubeflow "github.com/StatCan/kubeflow-controller/pkg/generated/clientset/versioned"
kubeflowv1listers "github.com/StatCan/kubeflow-controller/pkg/generated/listers/kubeflowcontroller/v1"
kubeflowv1alpha1listers "github.com/StatCan/kubeflow-controller/pkg/generated/listers/kubeflowcontroller/v1alpha1"
"github.com/gorilla/handlers"
"github.com/gorilla/mux"
authorizationv1 "k8s.io/api/authorization/v1"
corev1 "k8s.io/api/core/v1"
"k8s.io/client-go/kubernetes"
v1listers "k8s.io/client-go/listers/core/v1"
storagev1listers "k8s.io/client-go/listers/storage/v1"
"k8s.io/client-go/tools/clientcmd"
"k8s.io/client-go/util/homedir"
)

var kubeconfig string
var userIDHeader string

// listers groups the informer-backed listers the server reads from.
// Every field is assigned by setupListers from the corresponding informer.
type listers struct {
// events lists core/v1 Events.
events v1listers.EventLister
// storageClasses lists storage/v1 StorageClasses.
storageClasses storagev1listers.StorageClassLister
// persistentVolumeClaims lists core/v1 PersistentVolumeClaims.
persistentVolumeClaims v1listers.PersistentVolumeClaimLister
// podDefaults lists Kubeflow v1alpha1 PodDefaults.
podDefaults kubeflowv1alpha1listers.PodDefaultLister
// notebooks lists Kubeflow v1 Notebooks.
notebooks kubeflowv1listers.NotebookLister
}

// clientsets groups the API clientsets held by the server.
type clientsets struct {
kubernetes *kubernetes.Clientset
kubeflow *kubeflow.Clientset
notebooks *notebooksclient.Clientset
}

// server is the receiver for the HTTP handlers registered in main and for
// setupListers; it carries the clientsets and listers they use.
type server struct {
// NOTE(review): what mux guards is not visible in this view — confirm.
mux sync.Mutex

// clientsets holds the API clients.
clientsets clientsets
// listers holds the informer-backed listers populated by setupListers.
listers listers
}

func main() {
var err error
gctx, gcancel := context.WithCancel(context.Background())

// Set up the default path to the kubeconfig file.
// TODO: This breaks the in-cluster config and needs to be commented out in those instances. Need to find a fix.
Expand Down Expand Up @@ -73,11 +85,7 @@ func main() {
log.Fatal(err)
}

// Generate the Notebooks clientset
s.clientsets.notebooks, err = notebooksclient.NewForConfig(config)
if err != nil {
log.Fatal(err)
}
err = s.setupListers(gctx)

// Generate the Gorilla Mux router
router := mux.NewRouter()
Expand All @@ -88,30 +96,30 @@ func main() {
router.HandleFunc("/api/namespaces/{namespace}/notebooks", s.checkAccess(authorizationv1.SubjectAccessReview{
Spec: authorizationv1.SubjectAccessReviewSpec{
ResourceAttributes: &authorizationv1.ResourceAttributes{
Group: notebooksv1.GroupVersion.Group,
Group: kubeflowv1.SchemeGroupVersion.Group,
Verb: "list",
Resource: "notebooks",
Version: notebooksv1.GroupVersion.Version,
Version: kubeflowv1.SchemeGroupVersion.Version,
},
},
}, s.GetNotebooks)).Methods("GET")
router.HandleFunc("/api/namespaces/{namespace}/notebooks", s.checkAccess(authorizationv1.SubjectAccessReview{
Spec: authorizationv1.SubjectAccessReviewSpec{
ResourceAttributes: &authorizationv1.ResourceAttributes{
Group: notebooksv1.GroupVersion.Group,
Group: kubeflowv1.SchemeGroupVersion.Group,
Verb: "create",
Resource: "notebooks",
Version: notebooksv1.GroupVersion.Version,
Version: kubeflowv1.SchemeGroupVersion.Version,
},
},
}, s.NewNotebook)).Headers("Content-Type", "application/json").Methods("POST")
router.HandleFunc("/api/namespaces/{namespace}/notebooks/{notebook}", s.checkAccess(authorizationv1.SubjectAccessReview{
Spec: authorizationv1.SubjectAccessReviewSpec{
ResourceAttributes: &authorizationv1.ResourceAttributes{
Group: notebooksv1.GroupVersion.Group,
Group: kubeflowv1.SchemeGroupVersion.Group,
Verb: "delete",
Resource: "notebooks",
Version: notebooksv1.GroupVersion.Version,
Version: kubeflowv1.SchemeGroupVersion.Version,
},
},
}, s.DeleteNotebook)).Methods("DELETE")
Expand Down Expand Up @@ -166,6 +174,9 @@ func main() {
// Block until we receive our signal
<-c

// Cancel global context
gcancel()

// Create a deadline to wait for
ctx, cancel := context.WithTimeout(context.Background(), 15*time.Second)
defer cancel()
Expand Down
71 changes: 33 additions & 38 deletions notebooks.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,13 @@ import (
"sort"
"strings"

notebooksv1 "github.com/StatCan/jupyter-apis/notebooks/api/v1"
kubeflowv1 "github.com/StatCan/kubeflow-controller/pkg/apis/kubeflowcontroller/v1"
"github.com/andanhm/go-prettytime"
"github.com/gorilla/mux"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
)

const DefaultServiceAccountName string = "default-editor"
Expand All @@ -42,8 +43,8 @@ type volumerequest struct {
}

type gpurequest struct {
Quantity resource.Quantity `json:"num"`
Vendor string `json:"vendor"`
Quantity string `json:"num"`
Vendor string `json:"vendor"`
}

type newnotebookrequest struct {
Expand Down Expand Up @@ -105,21 +106,7 @@ const (
GPUVendorAMD GPUVendor = "amd"
)

type EventsByTimestamp []corev1.Event

func (e EventsByTimestamp) Len() int {
return len(e)
}

func (e EventsByTimestamp) Less(a, b int) bool {
return e[b].CreationTimestamp.Before(&e[a].CreationTimestamp)
}

func (e EventsByTimestamp) Swap(a, b int) {
e[a], e[b] = e[b], e[a]
}

func processStatus(notebook notebooksv1.Notebook, events []corev1.Event) (Status, string) {
func processStatus(notebook *kubeflowv1.Notebook, events []*corev1.Event) (Status, string) {
// Notebook is being deleted
if notebook.DeletionTimestamp != nil {
return StatusWaiting, "Deleting Notebook Server"
Expand Down Expand Up @@ -155,7 +142,7 @@ func processStatus(notebook notebooksv1.Notebook, events []corev1.Event) (Status
return "", ""
}

func processGPU(notebook notebooksv1.Notebook) (resource.Quantity, GPUVendor) {
func processGPU(notebook *kubeflowv1.Notebook) (resource.Quantity, GPUVendor) {
if limit, ok := notebook.Spec.Template.Spec.Containers[0].Resources.Limits["nvidia.com/gpu"]; ok {
return limit, GPUVendorNvidia
} else if limit, ok := notebook.Spec.Template.Spec.Containers[0].Resources.Limits["amd.com/gpu"]; ok {
Expand All @@ -171,36 +158,38 @@ func (s *server) GetNotebooks(w http.ResponseWriter, r *http.Request) {

log.Printf("loading notebooks for %q", namespace)

notebooks, err := s.clientsets.notebooks.V1().Notebooks(namespace).List(r.Context())
notebooks, err := s.listers.notebooks.Notebooks(namespace).List(labels.Everything())
if err != nil {
s.error(w, r, err)
return
}

sort.Sort(notebooksByName(notebooks))

resp := notebooksresponse{
APIResponse: APIResponse{
Success: true,
},
Notebooks: make([]notebookresponse, 0),
}

for _, notebook := range notebooks.Items {
for _, notebook := range notebooks {
// Load events
allevents, err := s.clientsets.kubernetes.CoreV1().Events(notebook.Namespace).List(r.Context(), v1.ListOptions{
FieldSelector: fmt.Sprintf("involvedObject.kind=Notebook,involvedObject.name=%s", notebook.Name),
})
allevents, err := s.listers.events.Events(notebook.Namespace).List(labels.Everything())
if err != nil {
log.Printf("failed to load events for %s/%s: %v", notebook.Namespace, notebook.Name, err)
}

// Filter past events
events := make([]corev1.Event, 0)
for _, event := range allevents.Items {
if !event.CreationTimestamp.Before(&notebook.CreationTimestamp) {
events = append(events, event)
events := make([]*corev1.Event, 0)
for _, event := range allevents {
if event.InvolvedObject.Kind != "Notebook" || event.InvolvedObject.Name != notebook.Name || event.CreationTimestamp.Before(&notebook.CreationTimestamp) {
continue
}

events = append(events, event)
}
sort.Sort(EventsByTimestamp(events))
sort.Sort(eventsByTimestamp(events))

imageparts := strings.SplitAfter(notebook.Spec.Template.Spec.Containers[0].Image, "/")

Expand Down Expand Up @@ -234,7 +223,7 @@ func (s *server) GetNotebooks(w http.ResponseWriter, r *http.Request) {
s.respond(w, r, resp)
}

func (s *server) handleVolume(ctx context.Context, req volumerequest, notebook *notebooksv1.Notebook) error {
func (s *server) handleVolume(ctx context.Context, req volumerequest, notebook *kubeflowv1.Notebook) error {
if req.Type == VolumeTypeNew {
// Create the PVC
pvc := corev1.PersistentVolumeClaim{
Expand Down Expand Up @@ -308,13 +297,13 @@ func (s *server) NewNotebook(w http.ResponseWriter, r *http.Request) {

// Setup the notebook
// TODO: Work with default CPU/memory limits from config
notebook := notebooksv1.Notebook{
notebook := kubeflowv1.Notebook{
ObjectMeta: v1.ObjectMeta{
Name: req.Name,
Namespace: namespace,
},
Spec: notebooksv1.NotebookSpec{
Template: notebooksv1.NotebookTemplateSpec{
Spec: kubeflowv1.NotebookSpec{
Template: kubeflowv1.NotebookTemplateSpec{
Spec: corev1.PodSpec{
ServiceAccountName: DefaultServiceAccountName,
Containers: []corev1.Container{
Expand Down Expand Up @@ -374,15 +363,21 @@ func (s *server) NewNotebook(w http.ResponseWriter, r *http.Request) {
}

// Add GPU
if !req.GPUs.Quantity.IsZero() {
notebook.Spec.Template.Spec.Containers[0].Resources.Requests[corev1.ResourceName(req.GPUs.Vendor)] = req.GPUs.Quantity
notebook.Spec.Template.Spec.Containers[0].Resources.Limits[corev1.ResourceName(req.GPUs.Vendor)] = req.GPUs.Quantity
if req.GPUs.Quantity != "none" {
qty, err := resource.ParseQuantity(req.GPUs.Quantity)
if err != nil {
s.error(w, r, err)
return
}

notebook.Spec.Template.Spec.Containers[0].Resources.Requests[corev1.ResourceName(req.GPUs.Vendor)] = qty
notebook.Spec.Template.Spec.Containers[0].Resources.Limits[corev1.ResourceName(req.GPUs.Vendor)] = qty
}

log.Printf("creating notebook %q for %q", notebook.ObjectMeta.Name, namespace)

// Submit the notebook to the API server
_, err = s.clientsets.notebooks.V1().Notebooks(namespace).Create(r.Context(), &notebook)
_, err = s.clientsets.kubeflow.KubeflowV1().Notebooks(namespace).Create(r.Context(), &notebook, v1.CreateOptions{})
if err != nil {
s.error(w, r, err)
return
Expand All @@ -401,7 +396,7 @@ func (s *server) DeleteNotebook(w http.ResponseWriter, r *http.Request) {
log.Printf("deleting notebook %q for %q", notebook, namespace)

propagation := v1.DeletePropagationForeground
err := s.clientsets.notebooks.V1().Notebooks(namespace).Delete(r.Context(), notebook, &v1.DeleteOptions{
err := s.clientsets.kubeflow.KubeflowV1().Notebooks(namespace).Delete(r.Context(), notebook, v1.DeleteOptions{
PropagationPolicy: &propagation,
})
if err != nil {
Expand Down
Loading

0 comments on commit a8363a5

Please sign in to comment.