Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
93 changes: 48 additions & 45 deletions pkg/k8s/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,13 +29,13 @@ type Client struct {
getPodLogs PodLogsFunc
kubeconfigPath string
mu sync.RWMutex // Protects access to client components

// For periodic refresh
refreshCtx context.Context
refreshCancel context.CancelFunc
refreshInterval time.Duration
refreshing bool
refreshMu sync.Mutex // Protects refreshing state
refreshCtx context.Context
refreshCancel context.CancelFunc
refreshInterval time.Duration
refreshing bool
refreshMu sync.Mutex // Protects refreshing state
}

// NewClient creates a new Kubernetes client
Expand Down Expand Up @@ -69,10 +69,10 @@ func NewClient(kubeconfigPath string) (*Client, error) {
clientset: clientset,
kubeconfigPath: kubeconfigPath,
}

// Set the default implementation for getPodLogs
client.getPodLogs = client.defaultGetPodLogs

return client, nil
}

Expand Down Expand Up @@ -105,7 +105,7 @@ var getConfig ConfigGetter = defaultConfigGetter
func (c *Client) ListAPIResources(ctx context.Context) ([]*metav1.APIResourceList, error) {
c.mu.RLock()
defer c.mu.RUnlock()

_, resourcesList, err := c.discoveryClient.ServerGroupsAndResources()
if err != nil {
return nil, fmt.Errorf("failed to get server resources: %w", err)
Expand All @@ -114,28 +114,32 @@ func (c *Client) ListAPIResources(ctx context.Context) ([]*metav1.APIResourceLis
}

// ListClusteredResources returns all clustered resources of the specified group/version/kind
func (c *Client) ListClusteredResources(ctx context.Context, gvr schema.GroupVersionResource) (*unstructured.UnstructuredList, error) {
func (c *Client) ListClusteredResources(ctx context.Context, gvr schema.GroupVersionResource, labelSelector string) (*unstructured.UnstructuredList, error) {
c.mu.RLock()
defer c.mu.RUnlock()

return c.dynamicClient.Resource(gvr).List(ctx, metav1.ListOptions{})

return c.dynamicClient.Resource(gvr).List(ctx, metav1.ListOptions{
LabelSelector: labelSelector,
})
}

// ListNamespacedResources returns all namespaced resources of the specified group/version/kind in the given namespace
func (c *Client) ListNamespacedResources(ctx context.Context, gvr schema.GroupVersionResource, namespace string) (*unstructured.UnstructuredList, error) {
func (c *Client) ListNamespacedResources(ctx context.Context, gvr schema.GroupVersionResource, namespace string, labelSelector string) (*unstructured.UnstructuredList, error) {
c.mu.RLock()
defer c.mu.RUnlock()

return c.dynamicClient.Resource(gvr).Namespace(namespace).List(ctx, metav1.ListOptions{})

return c.dynamicClient.Resource(gvr).Namespace(namespace).List(ctx, metav1.ListOptions{
LabelSelector: labelSelector,
})
}

// ApplyClusteredResource creates or updates a clustered resource
func (c *Client) ApplyClusteredResource(ctx context.Context, gvr schema.GroupVersionResource, obj *unstructured.Unstructured) (*unstructured.Unstructured, error) {
c.mu.RLock()
defer c.mu.RUnlock()

name := obj.GetName()

// Check if resource exists
existing, err := c.dynamicClient.Resource(gvr).Get(ctx, name, metav1.GetOptions{})
if err == nil {
Expand All @@ -144,8 +148,7 @@ func (c *Client) ApplyClusteredResource(ctx context.Context, gvr schema.GroupVer
obj.SetResourceVersion(existing.GetResourceVersion())
return c.dynamicClient.Resource(gvr).Update(ctx, obj, metav1.UpdateOptions{})
}



// Resource doesn't exist or error occurred, create it
return c.dynamicClient.Resource(gvr).Create(ctx, obj, metav1.CreateOptions{})
}
Expand All @@ -154,25 +157,25 @@ func (c *Client) ApplyClusteredResource(ctx context.Context, gvr schema.GroupVer
func (c *Client) GetClusteredResource(ctx context.Context, gvr schema.GroupVersionResource, name string) (interface{}, error) {
c.mu.RLock()
defer c.mu.RUnlock()

return c.dynamicClient.Resource(gvr).Get(ctx, name, metav1.GetOptions{})
}

// GetNamespacedResource gets a namespaced resource
func (c *Client) GetNamespacedResource(ctx context.Context, gvr schema.GroupVersionResource, namespace, name string) (interface{}, error) {
c.mu.RLock()
defer c.mu.RUnlock()

return c.dynamicClient.Resource(gvr).Namespace(namespace).Get(ctx, name, metav1.GetOptions{})
}

// ApplyNamespacedResource creates or updates a namespaced resource
func (c *Client) ApplyNamespacedResource(ctx context.Context, gvr schema.GroupVersionResource, namespace string, obj *unstructured.Unstructured) (*unstructured.Unstructured, error) {
c.mu.RLock()
defer c.mu.RUnlock()

name := obj.GetName()

// Check if resource exists
existing, err := c.dynamicClient.Resource(gvr).Namespace(namespace).Get(ctx, name, metav1.GetOptions{})
if err == nil {
Expand All @@ -181,7 +184,7 @@ func (c *Client) ApplyNamespacedResource(ctx context.Context, gvr schema.GroupVe
obj.SetResourceVersion(existing.GetResourceVersion())
return c.dynamicClient.Resource(gvr).Namespace(namespace).Update(ctx, obj, metav1.UpdateOptions{})
}

// Resource doesn't exist or error occurred, create it
return c.dynamicClient.Resource(gvr).Namespace(namespace).Create(ctx, obj, metav1.CreateOptions{})
}
Expand All @@ -190,39 +193,39 @@ func (c *Client) ApplyNamespacedResource(ctx context.Context, gvr schema.GroupVe
func (c *Client) DeleteClusteredResource(ctx context.Context, gvr schema.GroupVersionResource, name string) error {
c.mu.RLock()
defer c.mu.RUnlock()

return c.dynamicClient.Resource(gvr).Delete(ctx, name, metav1.DeleteOptions{})
}

// DeleteNamespacedResource deletes a namespaced resource
func (c *Client) DeleteNamespacedResource(ctx context.Context, gvr schema.GroupVersionResource, namespace, name string) error {
c.mu.RLock()
defer c.mu.RUnlock()

return c.dynamicClient.Resource(gvr).Namespace(namespace).Delete(ctx, name, metav1.DeleteOptions{})
}

// SetDynamicClient sets the dynamic client (for testing purposes)
func (c *Client) SetDynamicClient(dynamicClient dynamic.Interface) {
c.mu.Lock()
defer c.mu.Unlock()

c.dynamicClient = dynamicClient
}

// SetDiscoveryClient sets the discovery client (for testing purposes)
func (c *Client) SetDiscoveryClient(discoveryClient discovery.DiscoveryInterface) {
c.mu.Lock()
defer c.mu.Unlock()

c.discoveryClient = discoveryClient
}

// SetClientset sets the clientset (for testing purposes)
func (c *Client) SetClientset(clientset kubernetes.Interface) {
c.mu.Lock()
defer c.mu.Unlock()

// Store the interface directly, we'll use it through the interface methods
c.clientset = clientset
}
Expand All @@ -231,23 +234,23 @@ func (c *Client) SetClientset(clientset kubernetes.Interface) {
func (c *Client) SetPodLogsFunc(getPodLogs PodLogsFunc) {
c.mu.Lock()
defer c.mu.Unlock()

c.getPodLogs = getPodLogs
}

// GetPodLogs returns the current pod logs function
func (c *Client) GetPodLogs() PodLogsFunc {
c.mu.RLock()
defer c.mu.RUnlock()

return c.getPodLogs
}

// IsReady returns true if the client is ready to use
func (c *Client) IsReady() bool {
c.mu.RLock()
defer c.mu.RUnlock()

return c.discoveryClient != nil && c.dynamicClient != nil && c.clientset != nil
}

Expand Down Expand Up @@ -280,7 +283,7 @@ func (c *Client) RefreshClient() error {
// Update the client's components with proper locking
c.mu.Lock()
defer c.mu.Unlock()

c.discoveryClient = discoveryClient
c.dynamicClient = dynamicClient
c.clientset = clientset
Expand All @@ -294,23 +297,23 @@ func (c *Client) RefreshClient() error {
func (c *Client) StartPeriodicRefresh(interval time.Duration) error {
c.refreshMu.Lock()
defer c.refreshMu.Unlock()

if c.refreshing {
return fmt.Errorf("periodic refresh is already running")
}

// Create a cancellable context for the refresh goroutine
ctx, cancel := context.WithCancel(context.Background())
c.refreshCtx = ctx
c.refreshCancel = cancel
c.refreshInterval = interval
c.refreshing = true

// Start the refresh goroutine
go func() {
ticker := time.NewTicker(interval)
defer ticker.Stop()

for {
select {
case <-ticker.C:
Expand All @@ -325,7 +328,7 @@ func (c *Client) StartPeriodicRefresh(interval time.Duration) error {
}
}
}()

return nil
}

Expand All @@ -334,23 +337,23 @@ func (c *Client) StartPeriodicRefresh(interval time.Duration) error {
func (c *Client) StopPeriodicRefresh() error {
c.refreshMu.Lock()
defer c.refreshMu.Unlock()

if !c.refreshing {
return fmt.Errorf("periodic refresh is not running")
}

// Cancel the refresh context to stop the goroutine
c.refreshCancel()
c.refreshing = false

return nil
}

// IsRefreshing returns true if the client is periodically refreshing
func (c *Client) IsRefreshing() bool {
c.refreshMu.Lock()
defer c.refreshMu.Unlock()

return c.refreshing
}

Expand All @@ -359,10 +362,10 @@ func (c *Client) IsRefreshing() bool {
func (c *Client) GetRefreshInterval() time.Duration {
c.refreshMu.Lock()
defer c.refreshMu.Unlock()

if !c.refreshing {
return 0
}

return c.refreshInterval
}
}
Loading