Skip to content

Commit 9a2ad8a

Browse files
committed
Add periodic kubeconfig refresh functionality
This commit adds the ability to periodically re-read the kubeconfig file to reflect changes like token updates. The implementation includes: 1. Thread-safe client refresh mechanism 2. Periodic refresh with configurable interval 3. Command-line flag to enable and configure refresh interval 4. Comprehensive test coverage
1 parent 505e281 commit 9a2ad8a

File tree

3 files changed

+386
-9
lines changed

3 files changed

+386
-9
lines changed

cmd/server/main.go

Lines changed: 22 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ func main() {
1919
addr := flag.String("addr", ":8080", "Address to listen on")
2020
serveResources := flag.Bool("serve-resources", true, "Whether to serve cluster resources as MCP resources. Setting to false can reduce context size for LLMs when working with large clusters")
2121
readWrite := flag.Bool("read-write", false, "Whether to allow write operations on the cluster. When false, the server operates in read-only mode")
22+
kubeconfigRefreshInterval := flag.Duration("kubeconfig-refresh-interval", 0, "Interval to periodically re-read the kubeconfig (e.g., 5m for 5 minutes). If 0, no refresh will be performed")
2223
flag.Parse()
2324

2425
// Create a context that can be cancelled
@@ -39,6 +40,20 @@ func main() {
3940
if err != nil {
4041
log.Fatalf("Failed to create Kubernetes client: %v", err)
4142
}
43+
44+
// Start periodic refresh if interval is set
45+
if *kubeconfigRefreshInterval > 0 {
46+
log.Printf("Starting periodic kubeconfig refresh every %v", *kubeconfigRefreshInterval)
47+
if err := k8sClient.StartPeriodicRefresh(*kubeconfigRefreshInterval); err != nil {
48+
log.Fatalf("Failed to start periodic kubeconfig refresh: %v", err)
49+
}
50+
// Ensure we stop the refresh when shutting down
51+
defer func() {
52+
if err := k8sClient.StopPeriodicRefresh(); err != nil {
53+
log.Printf("Error stopping periodic kubeconfig refresh: %v", err)
54+
}
55+
}()
56+
}
4257

4358
// Create MCP server config
4459
config := &mcp.Config{
@@ -51,10 +66,10 @@ func main() {
5166

5267
// Create SSE server
5368
sseServer := mcp.CreateSSEServer(mcpServer)
54-
69+
5570
// Channel to receive server errors
5671
serverErrCh := make(chan error, 1)
57-
72+
5873
// Start the server in a goroutine
5974
go func() {
6075
log.Printf("Starting MCP server on %s", *addr)
@@ -63,19 +78,19 @@ func main() {
6378
serverErrCh <- err
6479
}
6580
}()
66-
81+
6782
// Wait for either a server error or a shutdown signal
6883
select {
6984
case err := <-serverErrCh:
7085
log.Fatalf("Server failed to start: %v", err)
7186
case <-ctx.Done():
7287
log.Println("Shutting down server...")
7388
}
74-
89+
7590
// Create a context with timeout for shutdown
7691
shutdownCtx, shutdownCancel := context.WithTimeout(context.Background(), 5*time.Second)
7792
defer shutdownCancel()
78-
93+
7994
// Attempt to shut down the server gracefully
8095
shutdownCh := make(chan error, 1)
8196
go func() {
@@ -87,7 +102,7 @@ func main() {
87102
shutdownCh <- err
88103
close(shutdownCh)
89104
}()
90-
105+
91106
// Wait for shutdown to complete or timeout
92107
select {
93108
case err, ok := <-shutdownCh:
@@ -103,7 +118,7 @@ func main() {
103118
// Force exit after timeout
104119
os.Exit(1)
105120
}
106-
121+
107122
log.Println("Server shutdown complete, exiting...")
108123
// Ensure we exit the program
109124
os.Exit(0)

pkg/k8s/client.go

Lines changed: 175 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,8 @@ import (
44
"context"
55
"fmt"
66
"path/filepath"
7+
"sync"
8+
"time"
79

810
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
911
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
@@ -25,6 +27,15 @@ type Client struct {
2527
dynamicClient dynamic.Interface
2628
clientset kubernetes.Interface
2729
getPodLogs PodLogsFunc
30+
kubeconfigPath string
31+
mu sync.RWMutex // Protects access to client components
32+
33+
// For periodic refresh
34+
refreshCtx context.Context
35+
refreshCancel context.CancelFunc
36+
refreshInterval time.Duration
37+
refreshing bool
38+
refreshMu sync.Mutex // Protects refreshing state
2839
}
2940

3041
// NewClient creates a new Kubernetes client
@@ -56,6 +67,7 @@ func NewClient(kubeconfigPath string) (*Client, error) {
5667
discoveryClient: discoveryClient,
5768
dynamicClient: dynamicClient,
5869
clientset: clientset,
70+
kubeconfigPath: kubeconfigPath,
5971
}
6072

6173
// Set the default implementation for getPodLogs
@@ -64,8 +76,11 @@ func NewClient(kubeconfigPath string) (*Client, error) {
6476
return client, nil
6577
}
6678

67-
// getConfig returns a Kubernetes client configuration
68-
func getConfig(kubeconfigPath string) (*rest.Config, error) {
79+
// ConfigGetter is a function type for getting Kubernetes client configuration
80+
type ConfigGetter func(kubeconfigPath string) (*rest.Config, error)
81+
82+
// defaultConfigGetter is the default implementation of ConfigGetter
83+
func defaultConfigGetter(kubeconfigPath string) (*rest.Config, error) {
6984
if kubeconfigPath == "" {
7085
// Try in-cluster config first
7186
config, err := rest.InClusterConfig()
@@ -83,8 +98,14 @@ func getConfig(kubeconfigPath string) (*rest.Config, error) {
8398
return clientcmd.BuildConfigFromFlags("", kubeconfigPath)
8499
}
85100

101+
// getConfig is the current ConfigGetter implementation
102+
var getConfig ConfigGetter = defaultConfigGetter
103+
86104
// ListAPIResources returns all API resources supported by the Kubernetes API server
87105
func (c *Client) ListAPIResources(ctx context.Context) ([]*metav1.APIResourceList, error) {
106+
c.mu.RLock()
107+
defer c.mu.RUnlock()
108+
88109
_, resourcesList, err := c.discoveryClient.ServerGroupsAndResources()
89110
if err != nil {
90111
return nil, fmt.Errorf("failed to get server resources: %w", err)
@@ -94,16 +115,25 @@ func (c *Client) ListAPIResources(ctx context.Context) ([]*metav1.APIResourceLis
94115

95116
// ListClusteredResources returns all clustered resources of the specified group/version/kind
96117
func (c *Client) ListClusteredResources(ctx context.Context, gvr schema.GroupVersionResource) (*unstructured.UnstructuredList, error) {
118+
c.mu.RLock()
119+
defer c.mu.RUnlock()
120+
97121
return c.dynamicClient.Resource(gvr).List(ctx, metav1.ListOptions{})
98122
}
99123

100124
// ListNamespacedResources returns all namespaced resources of the specified group/version/kind in the given namespace
101125
func (c *Client) ListNamespacedResources(ctx context.Context, gvr schema.GroupVersionResource, namespace string) (*unstructured.UnstructuredList, error) {
126+
c.mu.RLock()
127+
defer c.mu.RUnlock()
128+
102129
return c.dynamicClient.Resource(gvr).Namespace(namespace).List(ctx, metav1.ListOptions{})
103130
}
104131

105132
// ApplyClusteredResource creates or updates a clustered resource
106133
func (c *Client) ApplyClusteredResource(ctx context.Context, gvr schema.GroupVersionResource, obj *unstructured.Unstructured) (*unstructured.Unstructured, error) {
134+
c.mu.RLock()
135+
defer c.mu.RUnlock()
136+
107137
name := obj.GetName()
108138

109139
// Check if resource exists
@@ -122,16 +152,25 @@ func (c *Client) ApplyClusteredResource(ctx context.Context, gvr schema.GroupVer
122152

123153
// GetClusteredResource gets a clustered resource
124154
func (c *Client) GetClusteredResource(ctx context.Context, gvr schema.GroupVersionResource, name string) (interface{}, error) {
155+
c.mu.RLock()
156+
defer c.mu.RUnlock()
157+
125158
return c.dynamicClient.Resource(gvr).Get(ctx, name, metav1.GetOptions{})
126159
}
127160

128161
// GetNamespacedResource gets a namespaced resource
129162
func (c *Client) GetNamespacedResource(ctx context.Context, gvr schema.GroupVersionResource, namespace, name string) (interface{}, error) {
163+
c.mu.RLock()
164+
defer c.mu.RUnlock()
165+
130166
return c.dynamicClient.Resource(gvr).Namespace(namespace).Get(ctx, name, metav1.GetOptions{})
131167
}
132168

133169
// ApplyNamespacedResource creates or updates a namespaced resource
134170
func (c *Client) ApplyNamespacedResource(ctx context.Context, gvr schema.GroupVersionResource, namespace string, obj *unstructured.Unstructured) (*unstructured.Unstructured, error) {
171+
c.mu.RLock()
172+
defer c.mu.RUnlock()
173+
135174
name := obj.GetName()
136175

137176
// Check if resource exists
@@ -149,31 +188,165 @@ func (c *Client) ApplyNamespacedResource(ctx context.Context, gvr schema.GroupVe
149188

150189
// SetDynamicClient sets the dynamic client (for testing purposes)
151190
func (c *Client) SetDynamicClient(dynamicClient dynamic.Interface) {
191+
c.mu.Lock()
192+
defer c.mu.Unlock()
193+
152194
c.dynamicClient = dynamicClient
153195
}
154196

155197
// SetDiscoveryClient sets the discovery client (for testing purposes)
156198
func (c *Client) SetDiscoveryClient(discoveryClient discovery.DiscoveryInterface) {
199+
c.mu.Lock()
200+
defer c.mu.Unlock()
201+
157202
c.discoveryClient = discoveryClient
158203
}
159204

160205
// SetClientset sets the clientset (for testing purposes)
161206
func (c *Client) SetClientset(clientset kubernetes.Interface) {
207+
c.mu.Lock()
208+
defer c.mu.Unlock()
209+
162210
// Store the interface directly, we'll use it through the interface methods
163211
c.clientset = clientset
164212
}
165213

166214
// SetPodLogsFunc sets the function used to get pod logs (for testing purposes)
167215
func (c *Client) SetPodLogsFunc(getPodLogs PodLogsFunc) {
216+
c.mu.Lock()
217+
defer c.mu.Unlock()
218+
168219
c.getPodLogs = getPodLogs
169220
}
170221

171222
// GetPodLogs returns the current pod logs function
172223
func (c *Client) GetPodLogs() PodLogsFunc {
224+
c.mu.RLock()
225+
defer c.mu.RUnlock()
226+
173227
return c.getPodLogs
174228
}
175229

176230
// IsReady returns true if the client is ready to use
177231
func (c *Client) IsReady() bool {
232+
c.mu.RLock()
233+
defer c.mu.RUnlock()
234+
178235
return c.discoveryClient != nil && c.dynamicClient != nil && c.clientset != nil
236+
}
237+
238+
// RefreshClient re-reads the kubeconfig and updates the client's components
239+
func (c *Client) RefreshClient() error {
240+
// Get the updated config
241+
config, err := getConfig(c.kubeconfigPath)
242+
if err != nil {
243+
return fmt.Errorf("failed to get updated Kubernetes config: %w", err)
244+
}
245+
246+
// Create new discovery client
247+
discoveryClient, err := discovery.NewDiscoveryClientForConfig(config)
248+
if err != nil {
249+
return fmt.Errorf("failed to create discovery client: %w", err)
250+
}
251+
252+
// Create new dynamic client
253+
dynamicClient, err := dynamic.NewForConfig(config)
254+
if err != nil {
255+
return fmt.Errorf("failed to create dynamic client: %w", err)
256+
}
257+
258+
// Create new clientset for typed API access
259+
clientset, err := kubernetes.NewForConfig(config)
260+
if err != nil {
261+
return fmt.Errorf("failed to create clientset: %w", err)
262+
}
263+
264+
// Update the client's components with proper locking
265+
c.mu.Lock()
266+
defer c.mu.Unlock()
267+
268+
c.discoveryClient = discoveryClient
269+
c.dynamicClient = dynamicClient
270+
c.clientset = clientset
271+
272+
return nil
273+
}
274+
275+
// StartPeriodicRefresh starts a goroutine that periodically refreshes the client's configuration
276+
// The interval specifies how often to refresh the configuration
277+
// Returns an error if refresh is already running
278+
func (c *Client) StartPeriodicRefresh(interval time.Duration) error {
279+
c.refreshMu.Lock()
280+
defer c.refreshMu.Unlock()
281+
282+
if c.refreshing {
283+
return fmt.Errorf("periodic refresh is already running")
284+
}
285+
286+
// Create a cancellable context for the refresh goroutine
287+
ctx, cancel := context.WithCancel(context.Background())
288+
c.refreshCtx = ctx
289+
c.refreshCancel = cancel
290+
c.refreshInterval = interval
291+
c.refreshing = true
292+
293+
// Start the refresh goroutine
294+
go func() {
295+
ticker := time.NewTicker(interval)
296+
defer ticker.Stop()
297+
298+
for {
299+
select {
300+
case <-ticker.C:
301+
// Refresh the client
302+
if err := c.RefreshClient(); err != nil {
303+
// Log the error but continue refreshing
304+
fmt.Printf("Error refreshing Kubernetes client: %v\n", err)
305+
}
306+
case <-ctx.Done():
307+
// Context cancelled, stop refreshing
308+
return
309+
}
310+
}
311+
}()
312+
313+
return nil
314+
}
315+
316+
// StopPeriodicRefresh stops the periodic refresh goroutine
317+
// Returns an error if refresh is not running
318+
func (c *Client) StopPeriodicRefresh() error {
319+
c.refreshMu.Lock()
320+
defer c.refreshMu.Unlock()
321+
322+
if !c.refreshing {
323+
return fmt.Errorf("periodic refresh is not running")
324+
}
325+
326+
// Cancel the refresh context to stop the goroutine
327+
c.refreshCancel()
328+
c.refreshing = false
329+
330+
return nil
331+
}
332+
333+
// IsRefreshing returns true if the client is periodically refreshing
334+
func (c *Client) IsRefreshing() bool {
335+
c.refreshMu.Lock()
336+
defer c.refreshMu.Unlock()
337+
338+
return c.refreshing
339+
}
340+
341+
// GetRefreshInterval returns the current refresh interval
342+
// Returns 0 if refresh is not running
343+
func (c *Client) GetRefreshInterval() time.Duration {
344+
c.refreshMu.Lock()
345+
defer c.refreshMu.Unlock()
346+
347+
if !c.refreshing {
348+
return 0
349+
}
350+
351+
return c.refreshInterval
179352
}

0 commit comments

Comments
 (0)