From a3aeacc903b56c4130060272cf4b83f7d69765bf Mon Sep 17 00:00:00 2001 From: Jason Madigan Date: Tue, 28 Jan 2025 12:46:42 +0000 Subject: [PATCH] initial PoC, add a websocket and pipe Signed-off-by: Jason Madigan --- cmd/topology.go | 316 ++++++++++++++++++++++++++++++++++++++++++------ go.mod | 1 + go.sum | 2 + 3 files changed, 284 insertions(+), 35 deletions(-) diff --git a/cmd/topology.go b/cmd/topology.go index 0cec6d4..028fcb3 100644 --- a/cmd/topology.go +++ b/cmd/topology.go @@ -2,49 +2,83 @@ package cmd import ( "bytes" + "context" "errors" + "net/http" "os" "os/exec" - "strings" + "os/signal" + "sync" + "syscall" "github.com/goccy/go-graphviz" + "github.com/gorilla/websocket" "github.com/spf13/cobra" corev1 "k8s.io/api/core/v1" "k8s.io/client-go/kubernetes/scheme" + ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/client/config" logf "sigs.k8s.io/controller-runtime/pkg/log" + "sigs.k8s.io/controller-runtime/pkg/manager" ) var ( topologyNS string topologySVGOutputFile string topologyDOTOutputFile string + watchFlag bool + websocketFlag bool ) +var ( + connections = make(map[*websocket.Conn]bool) + connectionsMu sync.Mutex + upgrader = websocket.Upgrader{ + CheckOrigin: func(r *http.Request) bool { + return true + }, + } +) + +type configMapWatcher struct { + client.Client + updateCh chan struct{} +} + func topologyCommand() *cobra.Command { cmd := &cobra.Command{ Use: "topology", - Short: "Export and visualize kuadrant topology", - Long: "Export and visualize kuadrant topology", + Short: "Export and visualize Kuadrant topology", + Long: "Export and visualize Kuadrant topology, optionally streaming updates via WebSocket", RunE: runTopology, } - cmd.Flags().StringVarP(&topologyNS, "namespace", "n", "kuadrant-system", "Topology's namespace") - cmd.Flags().StringVarP(&topologySVGOutputFile, "output", "o", "", "SVG image output file") + cmd.Flags().StringVarP(&topologyNS, "namespace", "n", "kuadrant-system", "Namespace of the topology ConfigMap") + cmd.Flags().StringVarP(&topologySVGOutputFile, "svg", "s", "", "SVG image output file") cmd.Flags().StringVarP(&topologyDOTOutputFile, "dot", "d", "", "Graphviz DOT output file") - err := cmd.MarkFlagRequired("output") - if err != nil { - panic(err) - } + cmd.Flags().BoolVar(&watchFlag, "watch", false, "Enable resource watching for continuous updates") + cmd.Flags().BoolVarP(&websocketFlag, "ws", "w", false, "Start a WebSocket server to stream topology updates") return cmd } func runTopology(cmd *cobra.Command, args []string) error { - if !strings.HasSuffix(topologySVGOutputFile, ".svg") { - return errors.New("output file must have .svg extension") + // Validate that at least one output flag is provided + if topologySVGOutputFile == "" && topologyDOTOutputFile == "" && !websocketFlag { + return errors.New("at least one of --svg, --dot, or --ws must be provided") + } + + // If --ws is set without --watch, enable --watch implicitly + if websocketFlag && !watchFlag { + watchFlag = true } - ctx := cmd.Context() + + ctx, cancel := context.WithCancel(cmd.Context()) + defer cancel() + + stop := make(chan os.Signal, 1) + signal.Notify(stop, syscall.SIGINT, syscall.SIGTERM) + configuration, err := config.GetConfig() if err != nil { return err @@ -55,81 +89,293 @@ func runTopology(cmd *cobra.Command, args []string) error { return err } + // Fetch the initial ConfigMap topologyKey := client.ObjectKey{Name: "topology", Namespace: topologyNS} topologyConfigMap := &corev1.ConfigMap{} err = k8sClient.Get(ctx, topologyKey, topologyConfigMap) - logf.Log.V(1).Info("reading topology", "object", client.ObjectKeyFromObject(topologyConfigMap), "error", err) + logf.Log.V(1).Info("Reading topology ConfigMap", "object", topologyKey, "error", err) if err != nil { return err } - if topologyDOTOutputFile != "" { - fDot, err := os.Create(topologyDOTOutputFile) + updateCh := make(chan struct{}, 1) + + var mgr manager.Manager + if watchFlag { + mgr, err = manager.New(configuration, manager.Options{ + Scheme: scheme.Scheme, + }) if err != nil { return err } - defer fDot.Close() - _, err = fDot.WriteString(topologyConfigMap.Data["topology"]) - logf.Log.V(1).Info("write topology in DOT format to file", "file", topologyDOTOutputFile, "error", err) - if err != nil { + watcher := &configMapWatcher{ + Client: k8sClient, + updateCh: updateCh, + } + if err := ctrl.NewControllerManagedBy(mgr). + For(&corev1.ConfigMap{}). + Complete(watcher); err != nil { + logf.Log.Error(err, "Failed to create ConfigMap watcher") return err } + + if websocketFlag { + http.HandleFunc("/ws", func(w http.ResponseWriter, r *http.Request) { + handleWebSocket(w, r, topologyConfigMap, mgr, ctx) + }) + + go func() { + logf.Log.Info("Starting WebSocket server on :4000") + if err := http.ListenAndServe(":4000", nil); err != nil { + logf.Log.Error(err, "Failed to start WebSocket server") + } + }() + } + + go func() { + if err := mgr.Start(ctx); err != nil { + logf.Log.Error(err, "Failed to start manager") + } + }() } + if topologyDOTOutputFile != "" { + if err := writeDOTFile(topologyDOTOutputFile, topologyConfigMap.Data["topology"]); err != nil { + return err + } + } + + if topologySVGOutputFile != "" { + if err := renderAndWriteSVG(ctx, topologyConfigMap.Data["topology"], topologySVGOutputFile); err != nil { + return err + } + + if err := openSVG(topologySVGOutputFile); err != nil { + logf.Log.Error(err, "Failed to open SVG file") + } + } + + if watchFlag { + // Goroutine to listen for ConfigMap updates and re-render the outputs + go func() { + for { + select { + case <-updateCh: + logf.Log.Info("Received update signal. Re-rendering outputs.") + // Re-fetch the updated ConfigMap + updatedConfigMap := &corev1.ConfigMap{} + err := k8sClient.Get(ctx, topologyKey, updatedConfigMap) + if err != nil { + logf.Log.Error(err, "Failed to re-fetch topology ConfigMap during update") + continue + } + + // Re-render the SVG if specified + if topologySVGOutputFile != "" { + if err := renderAndWriteSVG(ctx, updatedConfigMap.Data["topology"], topologySVGOutputFile); err != nil { + logf.Log.Error(err, "Failed to re-render SVG during update") + continue + } + } + + // update DOT file if specified + if topologyDOTOutputFile != "" { + if err := writeDOTFile(topologyDOTOutputFile, updatedConfigMap.Data["topology"]); err != nil { + logf.Log.Error(err, "Failed to update DOT file during update") + } + } + + // notify WebSocket clients + if websocketFlag { + broadcastTopologyUpdate(updatedConfigMap.Data["topology"]) + } + + logf.Log.Info("Successfully re-rendered outputs and notified clients.") + case <-ctx.Done(): + return + } + } + }() + } + + <-stop + logf.Log.Info("Shutting down gracefully") + + if websocketFlag { + closeAllWebSocketConnections() + } + + return nil +} + +func writeDOTFile(filePath, topologyData string) error { + fDot, err := os.Create(filePath) + if err != nil { + return err + } + defer fDot.Close() + + _, err = fDot.WriteString(topologyData) + logf.Log.V(1).Info("Wrote topology in DOT format to file", "file", filePath, "error", err) + if err != nil { + return err + } + return nil +} + +func renderAndWriteSVG(ctx context.Context, topologyData, outputFile string) error { g, err := graphviz.New(ctx) if err != nil { return err } + defer g.Close() - graph, err := graphviz.ParseBytes([]byte(topologyConfigMap.Data["topology"])) - logf.Log.V(1).Info("parse DOT graph", "graph empty", graph == nil, "error", err) + graph, err := graphviz.ParseBytes([]byte(topologyData)) + logf.Log.V(1).Info("Parsed DOT graph", "graph is nil", graph == nil, "error", err) if err != nil { return err } nodeNum, err := graph.NodeNum() - logf.Log.V(1).Info("info graph", "graph nodenum", nodeNum, "error", err) + logf.Log.V(1).Info("Graph info", "number of nodes", nodeNum, "error", err) if err != nil { return err } - // 1. write encoded PNG data to buffer var buf bytes.Buffer err = g.Render(ctx, graph, graphviz.SVG, &buf) - logf.Log.V(1).Info("render graph to SVG", "buf len", buf.Len(), "error", err) + logf.Log.V(1).Info("Rendered graph to SVG", "buffer length", buf.Len(), "error", err) if err != nil { return err } - // write to file - fSvg, err := os.Create(topologySVGOutputFile) + fSvg, err := os.Create(outputFile) if err != nil { return err } defer fSvg.Close() _, err = fSvg.Write(buf.Bytes()) - logf.Log.V(1).Info("write topology in SVG format to file", "file", topologySVGOutputFile, "error", err) + logf.Log.V(1).Info("Wrote topology in SVG format to file", "file", outputFile, "error", err) if err != nil { return err } + return nil +} + +func openSVG(filePath string) error { externalCommand := "xdg-open" if _, err := exec.LookPath("open"); err == nil { externalCommand = "open" } - openCmd := exec.Command(externalCommand, topologySVGOutputFile) - // pipe the commands output to the applications - // standard output + openCmd := exec.Command(externalCommand, filePath) openCmd.Stdout = os.Stdout + openCmd.Stderr = os.Stderr - // Run still runs the command and waits for completion - // but the output is instantly piped to Stdout - if err := openCmd.Run(); err != nil { - return err + return openCmd.Start() +} + +func handleWebSocket(w http.ResponseWriter, r *http.Request, topologyConfigMap *corev1.ConfigMap, mgr manager.Manager, ctx context.Context) { + conn, err := upgrader.Upgrade(w, r, nil) + if err != nil { + logf.Log.Error(err, "Failed to upgrade to WebSocket") + return } - return nil + connectionsMu.Lock() + connections[conn] = true + connectionsMu.Unlock() + + logf.Log.Info("New WebSocket connection established") + + if err := conn.WriteJSON(topologyConfigMap.Data["topology"]); err != nil { + logf.Log.Error(err, "Failed to send initial topology") + removeConnection(conn) + return + } + + go func() { + defer removeConnection(conn) + for { + _, _, err := conn.ReadMessage() + if err != nil { + if websocket.IsCloseError(err, websocket.CloseNormalClosure, websocket.CloseGoingAway) { + logf.Log.Info("WebSocket connection closed gracefully") + } else { + logf.Log.Error(err, "WebSocket read error") + } + break + } + } + }() +} + +func removeConnection(conn *websocket.Conn) { + connectionsMu.Lock() + defer connectionsMu.Unlock() + if _, exists := connections[conn]; exists { + conn.Close() + delete(connections, conn) + logf.Log.Info("WebSocket connection removed") + } +} + +func broadcastTopologyUpdate(topology string) { + connectionsMu.Lock() + defer connectionsMu.Unlock() + for conn := range connections { + if err := conn.WriteJSON(topology); err != nil { + logf.Log.Error(err, "Failed to send topology update to a client") + conn.Close() + delete(connections, conn) + } + } +} + +func (w *configMapWatcher) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) { + configMap := &corev1.ConfigMap{} + if err := w.Get(ctx, req.NamespacedName, configMap); err != nil { + logf.Log.Error(err, "Failed to get ConfigMap", "name", req.Name, "namespace", req.Namespace) + return ctrl.Result{}, err + } + + if configMap.Name != "topology" || configMap.Namespace != topologyNS { + return ctrl.Result{}, nil + } + + if _, exists := configMap.Data["topology"]; !exists { + logf.Log.Error(errors.New("topology data not found in ConfigMap"), "ConfigMap missing 'topology' key") + return ctrl.Result{}, nil + } + + select { + case w.updateCh <- struct{}{}: + logf.Log.V(1).Info("Sent update signal to main function") + default: + logf.Log.V(1).Info("Update signal already in queue") + } + + return ctrl.Result{}, nil +} + +func closeAllWebSocketConnections() { + connectionsMu.Lock() + defer connectionsMu.Unlock() + for conn := range connections { + err := conn.WriteMessage(websocket.CloseMessage, websocket.FormatCloseMessage(websocket.CloseNormalClosure, "Shutting down")) + if err != nil { + logf.Log.Error(err, "Failed to send close message to WebSocket client") + } + + err = conn.Close() + if err != nil { + logf.Log.Error(err, "Failed to close WebSocket connection") + } + + delete(connections, conn) + } + logf.Log.Info("All WebSocket connections have been closed") } diff --git a/go.mod b/go.mod index 86399dd..0276b03 100644 --- a/go.mod +++ b/go.mod @@ -48,6 +48,7 @@ require ( github.com/google/gofuzz v1.2.0 // indirect github.com/google/pprof v0.0.0-20231205033806-a5a03c77bf08 // indirect github.com/google/uuid v1.4.0 // indirect + github.com/gorilla/websocket v1.5.3 // indirect github.com/imdario/mergo v1.0.0 // indirect github.com/inconshreveable/mousetrap v1.1.0 // indirect github.com/invopop/yaml v0.2.0 // indirect diff --git a/go.sum b/go.sum index d1530ee..2f91d33 100644 --- a/go.sum +++ b/go.sum @@ -75,6 +75,8 @@ github.com/google/pprof v0.0.0-20231205033806-a5a03c77bf08 h1:PxlBVtIFHR/mtWk2i0 github.com/google/pprof v0.0.0-20231205033806-a5a03c77bf08/go.mod h1:czg5+yv1E0ZGTi6S6vVK1mke0fV+FaUhNGcd6VRS9Ik= github.com/google/uuid v1.4.0 h1:MtMxsa51/r9yyhkyLsVeVt0B+BGQZzpQiTQ4eHZ8bc4= github.com/google/uuid v1.4.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/gorilla/websocket v1.5.3 h1:saDtZ6Pbx/0u+bgYQ3q96pZgCzfhKXGPqt7kZ72aNNg= +github.com/gorilla/websocket v1.5.3/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE= github.com/inconshreveable/mousetrap v1.1.0 h1:wN+x4NVGpMsO7ErUn/mUI3vEoE6Jt13X2s0bqwp9tc8= github.com/inconshreveable/mousetrap v1.1.0/go.mod h1:vpF70FUmC8bwa3OWnCshd2FqLfsEA9PFc4w1p2J65bw= github.com/invopop/yaml v0.2.0 h1:7zky/qH+O0DwAyoobXUqvVBwgBFRxKoQ/3FjcVpjTMY=