-
Notifications
You must be signed in to change notification settings - Fork 158
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Clusters Client #1814
Clusters Client #1814
Changes from 18 commits
4c08caa
2836dde
0a90aac
af89a74
a3e571f
97507d2
4b483e0
c497fd9
369cde7
9d814e1
71b883e
a990431
856d7ab
b521a72
1ac8240
0a62e59
134005a
7456493
35c571c
f00885a
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,171 @@ | ||
package clustersmngr | ||
|
||
import ( | ||
"context" | ||
"fmt" | ||
"sync" | ||
|
||
"sigs.k8s.io/controller-runtime/pkg/client" | ||
) | ||
|
||
// Client thin wrapper to controller-runtime/client adding multi clusters context. | ||
type Client interface { | ||
// Get retrieves an obj for the given object key. | ||
Get(ctx context.Context, cluster string, key client.ObjectKey, obj client.Object) error | ||
// List retrieves list of objects for a given namespace and list options. | ||
List(ctx context.Context, cluster string, list client.ObjectList, opts ...client.ListOption) error | ||
|
||
// Create saves the object obj. | ||
Create(ctx context.Context, cluster string, obj client.Object, opts ...client.CreateOption) error | ||
// Delete deletes the given obj | ||
Delete(ctx context.Context, cluster string, obj client.Object, opts ...client.DeleteOption) error | ||
// Update updates the given obj. | ||
Update(ctx context.Context, cluster string, obj client.Object, opts ...client.UpdateOption) error | ||
// Patch patches the given obj | ||
Patch(ctx context.Context, cluster string, obj client.Object, patch client.Patch, opts ...client.PatchOption) error | ||
|
||
// ClusteredList retrieves list of objects for all clusters. | ||
ClusteredList(ctx context.Context, clist ClusteredObjectList, opts ...client.ListOption) error | ||
|
||
// ClientsPool returns the clients pool. | ||
ClientsPool() ClientsPool | ||
} | ||
|
||
type ClusterNotFoundError struct { | ||
Cluster string | ||
} | ||
|
||
func (e ClusterNotFoundError) Error() string { | ||
return fmt.Sprintf("cluster=%s not found", e.Cluster) | ||
} | ||
|
||
type clustersClient struct { | ||
pool ClientsPool | ||
} | ||
|
||
func NewClient(clientsPool ClientsPool) Client { | ||
return &clustersClient{ | ||
pool: clientsPool, | ||
} | ||
} | ||
|
||
func (c *clustersClient) ClientsPool() ClientsPool { | ||
return c.pool | ||
} | ||
|
||
func (c *clustersClient) Get(ctx context.Context, cluster string, key client.ObjectKey, obj client.Object) error { | ||
client := c.pool.Clients()[cluster] | ||
if client == nil { | ||
return ClusterNotFoundError{Cluster: cluster} | ||
} | ||
|
||
return client.Get(ctx, key, obj) | ||
} | ||
|
||
func (c *clustersClient) List(ctx context.Context, cluster string, list client.ObjectList, opts ...client.ListOption) error { | ||
client := c.pool.Clients()[cluster] | ||
if client == nil { | ||
return ClusterNotFoundError{Cluster: cluster} | ||
} | ||
|
||
return client.List(ctx, list, opts...) | ||
} | ||
|
||
func (c *clustersClient) ClusteredList(ctx context.Context, clist ClusteredObjectList, opts ...client.ListOption) error { | ||
wg := sync.WaitGroup{} | ||
|
||
var errs []error | ||
|
||
for clusterName, c := range c.pool.Clients() { | ||
wg.Add(1) | ||
|
||
go func(clusterName string, c client.Client) { | ||
defer wg.Done() | ||
|
||
list := clist.ObjectList(clusterName) | ||
|
||
if err := c.List(ctx, list, opts...); err != nil { | ||
errs = append(errs, fmt.Errorf("cluster=\"%s\" err=\"%s\"", clusterName, err)) | ||
} | ||
}(clusterName, c) | ||
} | ||
|
||
wg.Wait() | ||
|
||
if len(errs) > 0 { | ||
return fmt.Errorf("failed to list resources: %s", errs) | ||
} | ||
Comment on lines
+87
to
+89
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. To clarify: if one of these Lists fails we will return the whole thing as a failure? Is this what we want? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I made myself the same question, and I don't think the end goal would be this, but we need to coordinate with the UI to settle down the best way to handle this "partial success" response. |
||
|
||
return nil | ||
} | ||
|
||
func (c *clustersClient) Create(ctx context.Context, cluster string, obj client.Object, opts ...client.CreateOption) error { | ||
client := c.pool.Clients()[cluster] | ||
if client == nil { | ||
return ClusterNotFoundError{Cluster: cluster} | ||
} | ||
|
||
return client.Create(ctx, obj, opts...) | ||
} | ||
|
||
func (c *clustersClient) Delete(ctx context.Context, cluster string, obj client.Object, opts ...client.DeleteOption) error { | ||
client := c.pool.Clients()[cluster] | ||
if client == nil { | ||
return ClusterNotFoundError{Cluster: cluster} | ||
} | ||
|
||
return client.Delete(ctx, obj, opts...) | ||
} | ||
|
||
func (c *clustersClient) Update(ctx context.Context, cluster string, obj client.Object, opts ...client.UpdateOption) error { | ||
client := c.pool.Clients()[cluster] | ||
if client == nil { | ||
return ClusterNotFoundError{Cluster: cluster} | ||
} | ||
|
||
return client.Update(ctx, obj, opts...) | ||
} | ||
|
||
func (c *clustersClient) Patch(ctx context.Context, cluster string, obj client.Object, patch client.Patch, opts ...client.PatchOption) error { | ||
client := c.pool.Clients()[cluster] | ||
if client == nil { | ||
return ClusterNotFoundError{Cluster: cluster} | ||
} | ||
|
||
return client.Patch(ctx, obj, patch, opts...) | ||
} | ||
|
||
type ClusteredObjectList interface { | ||
ObjectList(cluster string) client.ObjectList | ||
Lists() map[string]client.ObjectList | ||
} | ||
|
||
type ClusteredList struct { | ||
sync.Mutex | ||
|
||
listFactory func() client.ObjectList | ||
lists map[string]client.ObjectList | ||
} | ||
|
||
func NewClusteredList(listFactory func() client.ObjectList) ClusteredObjectList { | ||
return &ClusteredList{ | ||
listFactory: listFactory, | ||
lists: make(map[string]client.ObjectList), | ||
} | ||
} | ||
|
||
func (cl *ClusteredList) ObjectList(cluster string) client.ObjectList { | ||
cl.Lock() | ||
defer cl.Unlock() | ||
|
||
cl.lists[cluster] = cl.listFactory() | ||
|
||
return cl.lists[cluster] | ||
} | ||
|
||
func (cl *ClusteredList) Lists() map[string]client.ObjectList { | ||
cl.Lock() | ||
defer cl.Unlock() | ||
|
||
return cl.lists | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
that
c.pool.Clients()[cluster]
hurts my eyes a bit, totally personal taste. I know it does not fail, but my brain constantly says "it should fail if the key is not defined".Not a blocker at all. That's only extra sugar.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
agree, much better, pushing the change