From ab6956b8d6f4895ec92702cd107c76bdef243a1d Mon Sep 17 00:00:00 2001 From: Eric Chiang Date: Wed, 12 Dec 2018 09:09:37 -0800 Subject: [PATCH] *: allow watches on individual resources --- resource.go | 16 ++++++- resource_test.go | 110 +++++++++++++++++++++++++++++++++++++++++++++++ watch.go | 16 +++++++ watch_test.go | 49 ++++++++++++++++----- 4 files changed, 179 insertions(+), 12 deletions(-) diff --git a/resource.go b/resource.go index 3cf32f5..fdb95c0 100644 --- a/resource.go +++ b/resource.go @@ -260,11 +260,25 @@ func resourceWatchURL(endpoint, namespace string, r Resource, options ...Option) return "", fmt.Errorf("unregistered type %T", r) } + // Hack to let watch work on individual resources + name := "" + if meta := r.GetMetadata(); meta != nil && meta.Name != nil { + name = *meta.Name + if meta.Namespace != nil { + // Ensure that namespaces aren't different. + ns := *meta.Namespace + if namespace != "" && ns != namespace { + return "", fmt.Errorf("different namespace provided on resource than to watch call") + } + namespace = ns + } + } + if !t.namespaced && namespace != "" { return "", fmt.Errorf("type not namespaced") } - url := urlFor(endpoint, t.apiGroup, t.apiVersion, namespace, t.name, "", options...) + url := urlFor(endpoint, t.apiGroup, t.apiVersion, namespace, t.name, name, options...) if strings.Contains(url, "?") { url = url + "&watch=true" } else { diff --git a/resource_test.go b/resource_test.go index 8da7cf8..d6435dd 100644 --- a/resource_test.go +++ b/resource_test.go @@ -161,3 +161,113 @@ func TestResourceURL(t *testing.T) { }) } } + +func TestResourceWatchURL(t *testing.T) { + tests := []struct { + name string + endpoint string + namespace string + resource Resource + options []Option + want string + wantErr bool + }{ + { + name: "watch_pods", + namespace: "my-namespace", + endpoint: "https://k8s.example.com/foo/", + resource: &Pod{}, + want: "https://k8s.example.com/foo/api/v1/namespaces/my-namespace/pods?watch=true", + }, + { + name: "watch_all_pods", + endpoint: "https://k8s.example.com/foo/", + resource: &Pod{}, + want: "https://k8s.example.com/foo/api/v1/pods?watch=true", + }, + { + name: "watch_deployments", + namespace: "my-namespace", + endpoint: "https://k8s.example.com/foo/", + resource: &Deployment{}, + want: "https://k8s.example.com/foo/apis/apps/v1beta2/namespaces/my-namespace/deployments?watch=true", + }, + { + name: "watch_with_options", + namespace: "my-namespace", + endpoint: "https://k8s.example.com/foo/", + resource: &Deployment{}, + options: []Option{ + Timeout(time.Minute), + }, + want: "https://k8s.example.com/foo/apis/apps/v1beta2/namespaces/my-namespace/deployments?timeoutSeconds=60&watch=true", + }, + { + name: "watch_non_namespaced", + endpoint: "https://k8s.example.com/foo/", + resource: &ClusterRole{}, + want: "https://k8s.example.com/foo/apis/rbac.authorization.k8s.io/v1/clusterroles?watch=true", + }, + { + name: "watch_non_namespaced_with_namespace", + namespace: "my-namespace", + endpoint: "https://k8s.example.com/foo/", + resource: &ClusterRole{}, + wantErr: true, // can't provide a namespace for a non-namespaced resource + }, + { + name: "watch_deployment", + endpoint: "https://k8s.example.com/foo/", + resource: &Deployment{ + Metadata: &metav1.ObjectMeta{ + Namespace: String("my-namespace"), + Name: String("my-deployment"), + }, + }, + want: "https://k8s.example.com/foo/apis/apps/v1beta2/namespaces/my-namespace/deployments/my-deployment?watch=true", + }, + { + name: "watch_deployment_ns_in_call", + endpoint: "https://k8s.example.com/foo/", + namespace: "my-namespace", + resource: &Deployment{ + Metadata: &metav1.ObjectMeta{ + Name: String("my-deployment"), + }, + }, + want: "https://k8s.example.com/foo/apis/apps/v1beta2/namespaces/my-namespace/deployments/my-deployment?watch=true", + }, + { + name: "watch_deployment_mismatched_ns", + endpoint: "https://k8s.example.com/foo/", + namespace: "my-other-namespace", + resource: &Deployment{ + Metadata: &metav1.ObjectMeta{ + Namespace: String("my-namespace"), + Name: String("my-deployment"), + }, + }, + wantErr: true, + }, + } + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + got, err := resourceWatchURL( + test.endpoint, + test.namespace, + test.resource, + test.options..., + ) + if err != nil { + if !test.wantErr { + t.Fatalf("resourceWatchURL: %v", err) + } + return + } + if got != test.want { + t.Errorf("want: %q", test.want) + t.Errorf("got : %q", got) + } + }) + } +} diff --git a/watch.go b/watch.go index 91c82dc..8b145c0 100644 --- a/watch.go +++ b/watch.go @@ -160,6 +160,22 @@ func parseUnknown(b []byte) (*runtime.Unknown, error) { // fmt.Println(eventType, *cm.Metadata.Name) // } // +// To watch an individual resource, provide a resource with pre-populated +// metadata: +// +// // Watch "my-configmap" in "my-namespace" +// configMap := corev1.ConfigMap{ +// Metadata: &metav1.ObjectMeta{ +// Namespace: String("my-namespace"), +// Name: String("my-configmap"), +// }, +// } +// watcher, err := client.Watch(ctx, "", &configMap) +// if err != nil { +// // handle error +// } +// defer watcher.Close() // Always close the returned watcher. +// func (c *Client) Watch(ctx context.Context, namespace string, r Resource, options ...Option) (*Watcher, error) { url, err := resourceWatchURL(c.Endpoint, namespace, r, options...) if err != nil { diff --git a/watch_test.go b/watch_test.go index f1e7dc5..2f41284 100644 --- a/watch_test.go +++ b/watch_test.go @@ -24,14 +24,21 @@ func init() { k8s.Register("", "v1", "configmaps", true, &configMapJSON{}) } -func testWatch(t *testing.T, client *k8s.Client, namespace string, newCM func() k8s.Resource, update func(cm k8s.Resource)) { - w, err := client.Watch(context.TODO(), namespace, newCM()) +func testWatch(t *testing.T, client *k8s.Client, namespace string, r k8s.Resource, newCM func() k8s.Resource, update func(cm k8s.Resource)) { + cm := newCM() + + if r.GetMetadata() != nil { + // Individual watch must created beforehand + if err := client.Create(context.TODO(), cm); err != nil { + t.Errorf("create configmap: %v", err) + return + } + } + w, err := client.Watch(context.TODO(), namespace, r) if err != nil { - t.Errorf("watch configmaps: %v", err) + t.Fatalf("watch configmaps: %v", err) } defer w.Close() - - cm := newCM() want := func(eventType string) { got := newCM() eT, err := w.Next(got) @@ -51,11 +58,13 @@ func testWatch(t *testing.T, client *k8s.Client, namespace string, newCM func() } } - if err := client.Create(context.TODO(), cm); err != nil { - t.Errorf("create configmap: %v", err) - return + if r.GetMetadata() == nil { + if err := client.Create(context.TODO(), cm); err != nil { + t.Errorf("create configmap: %v", err) + return + } + want(k8s.EventAdded) } - want(k8s.EventAdded) update(cm) @@ -86,7 +95,7 @@ func TestWatchConfigMapJSON(t *testing.T) { updateCM := func(cm k8s.Resource) { (cm.(*configMapJSON)).Data = map[string]string{"hello": "world"} } - testWatch(t, client, namespace, newCM, updateCM) + testWatch(t, client, namespace, &configMapJSON{}, newCM, updateCM) }) } @@ -104,6 +113,24 @@ func TestWatchConfigMapProto(t *testing.T) { updateCM := func(cm k8s.Resource) { (cm.(*corev1.ConfigMap)).Data = map[string]string{"hello": "world"} } - testWatch(t, client, namespace, newCM, updateCM) + testWatch(t, client, namespace, &corev1.ConfigMap{}, newCM, updateCM) + }) +} + +func TestWatchIndividualConfigMap(t *testing.T) { + withNamespace(t, func(client *k8s.Client, namespace string) { + newCM := func() k8s.Resource { + return &corev1.ConfigMap{ + Metadata: &metav1.ObjectMeta{ + Name: k8s.String("my-configmap"), + Namespace: &namespace, + }, + } + } + + updateCM := func(cm k8s.Resource) { + (cm.(*corev1.ConfigMap)).Data = map[string]string{"hello": "world"} + } + testWatch(t, client, namespace, newCM(), newCM, updateCM) }) }