diff --git a/cli/cmd/diagnose.go b/cli/cmd/diagnose.go index 74117f485..965075153 100644 --- a/cli/cmd/diagnose.go +++ b/cli/cmd/diagnose.go @@ -5,80 +5,24 @@ import ( "compress/gzip" "context" "fmt" - "github.com/odigos-io/odigos/cli/cmd/resources" + "github.com/odigos-io/odigos/cli/cmd/diagnose_util" "github.com/odigos-io/odigos/cli/pkg/kube" - "github.com/odigos-io/odigos/k8sutils/pkg/client" "github.com/spf13/cobra" "io" - v1 "k8s.io/api/core/v1" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" - "k8s.io/apimachinery/pkg/runtime/schema" "os" "path/filepath" - "sigs.k8s.io/yaml" "sync" "time" ) const ( - logBufferSize = 1024 * 1024 // 1MB buffer size for reading logs in chunks - LogsDir = "Logs" - CRDsDir = "CRDs" - CRDName = "crdName" - CRDGroup = "crdGroup" - actionGroupName = "actions.odigos.io" - odigosGroupName = "odigos.io" + LogsDir = "Logs" + CRDsDir = "CRDs" + ProfileDir = "Profile" ) var ( - diagnoseDirs = []string{LogsDir, CRDsDir} - CRDsList = []map[string]string{ - { - CRDName: "addclusterinfos", - CRDGroup: actionGroupName, - }, - { - CRDName: "deleteattributes", - CRDGroup: actionGroupName, - }, - { - CRDName: "renameattributes", - CRDGroup: actionGroupName, - }, - { - CRDName: "probabilisticsamplers", - CRDGroup: actionGroupName, - }, - { - CRDName: "piimaskings", - CRDGroup: actionGroupName, - }, - { - CRDName: "latencysamplers", - CRDGroup: actionGroupName, - }, - { - CRDName: "errorsamplers", - CRDGroup: actionGroupName, - }, - { - CRDName: "instrumentedapplications", - CRDGroup: odigosGroupName, - }, - { - CRDName: "instrumentationconfigs", - CRDGroup: odigosGroupName, - }, - { - CRDName: "instrumentationrules", - CRDGroup: odigosGroupName, - }, - { - CRDName: "instrumentationinstances", - CRDGroup: odigosGroupName, - }, - } + diagnoseDirs = []string{LogsDir, CRDsDir, ProfileDir} ) var diagnoseCmd = &cobra.Command{ @@ -108,11 +52,11 @@ func startDiagnose(ctx context.Context, client *kube.Client) error { var wg sync.WaitGroup - // Fetch Odigos components logs + //// Fetch Odigos components logs wg.Add(1) go func() { defer wg.Done() - if err := fetchOdigosComponentsLogs(ctx, client, filepath.Join(mainTempDir, LogsDir)); err != nil { + if err := diagnose_util.FetchOdigosComponentsLogs(ctx, client, filepath.Join(mainTempDir, LogsDir)); err != nil { fmt.Printf("Error fetching Odigos components logs: %v\n", err) } }() @@ -121,11 +65,20 @@ func startDiagnose(ctx context.Context, client *kube.Client) error { wg.Add(1) go func() { defer wg.Done() - if err = fetchOdigosCRs(ctx, client, filepath.Join(mainTempDir, CRDsDir)); err != nil { + if err = diagnose_util.FetchOdigosCRs(ctx, client, filepath.Join(mainTempDir, CRDsDir)); err != nil { fmt.Printf("Error fetching Odigos CRDs: %v\n", err) } }() + // Fetch Odigos Profile + wg.Add(1) + go func() { + defer wg.Done() + if err = diagnose_util.FetchOdigosProfiles(ctx, client, filepath.Join(mainTempDir, ProfileDir)); err != nil { + fmt.Printf("Error calculating Odigos Profile: %v\n", err) + } + }() + wg.Wait() // Package the results into a tar.gz file @@ -153,175 +106,6 @@ func createAllDirs() (string, error) { return mainTempDir, nil } -func fetchOdigosComponentsLogs(ctx context.Context, client *kube.Client, logDir string) error { - odigosNamespace, err := resources.GetOdigosNamespace(client, ctx) - if err != nil { - return err - } - - pods, err := client.CoreV1().Pods(odigosNamespace).List(context.TODO(), metav1.ListOptions{}) - if err != nil { - return err - } - - var wg sync.WaitGroup - - for _, pod := range pods.Items { - wg.Add(1) - go func() { - defer wg.Done() - fetchPodLogs(ctx, client, odigosNamespace, pod, logDir) - }() - } - - wg.Wait() - - return nil -} - -func fetchPodLogs(ctx context.Context, client *kube.Client, odigosNamespace string, pod v1.Pod, logDir string) { - for _, container := range pod.Spec.Containers { - fetchingContainerLogs(ctx, client, odigosNamespace, pod, container, logDir) - - } -} - -func fetchingContainerLogs(ctx context.Context, client *kube.Client, odigosNamespace string, pod v1.Pod, container v1.Container, logDir string) { - logPrefix := fmt.Sprintf("Fetching logs for Pod: %s, Container: %s, Node: %s", pod.Name, container.Name, pod.Spec.NodeName) - fmt.Printf(logPrefix + "\n") - - // Define the log file path for saving compressed logs - logFilePath := filepath.Join(logDir, pod.Name+"_"+container.Name+"_"+pod.Spec.NodeName+".log.gz") - logFile, err := os.OpenFile(logFilePath, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0666) - if err != nil { - fmt.Printf(logPrefix+" - Failed - Error creating log file: %v\n", err) - return - } - defer logFile.Close() - - req := client.CoreV1().Pods(odigosNamespace).GetLogs(pod.Name, &v1.PodLogOptions{}) - logStream, err := req.Stream(ctx) - if err != nil { - fmt.Printf(logPrefix+" - Failed - Error creating log stream: %v\n", err) - return - } - defer logStream.Close() - - if err = saveLogsToGzipFileInBatches(logFile, logStream, logBufferSize); err != nil { - fmt.Printf(logPrefix+" - Failed - Error saving logs to file: %v\n", err) - return - } -} - -func saveLogsToGzipFileInBatches(logFile *os.File, logStream io.ReadCloser, bufferSize int) error { - // Create a gzip writer to compress the logs - gzipWriter := gzip.NewWriter(logFile) - defer gzipWriter.Close() - - // Read logs in chunks and write them to the file - buffer := make([]byte, bufferSize) - for { - n, err := logStream.Read(buffer) - if n > 0 { - // Write the chunk to the gzip file - if _, err := gzipWriter.Write(buffer[:n]); err != nil { - return err - } - } - - if err == io.EOF { - // End of the log stream; break out of the loop - break - } - - if err != nil { - return err - } - } - - return nil -} - -func fetchOdigosCRs(ctx context.Context, kubeClient *kube.Client, crdDir string) error { - var wg sync.WaitGroup - - for _, resourceData := range CRDsList { - crdDataDirPath := filepath.Join(crdDir, resourceData[CRDName]) - err := os.Mkdir(crdDataDirPath, os.ModePerm) // os.ModePerm gives full permissions (0777) - if err != nil { - fmt.Printf("Error creating directory for CRD: %v, because: %v", resourceData, err) - continue - } - - wg.Add(1) - - go func() { - defer wg.Done() - err = fetchSingleResource(ctx, kubeClient, crdDataDirPath, resourceData) - if err != nil { - fmt.Printf("Error Getting CRDs of: %v, because: %v\n", resourceData[CRDName], err) - } - }() - } - - wg.Wait() - - return nil -} - -func fetchSingleResource(ctx context.Context, kubeClient *kube.Client, crdDataDirPath string, resourceData map[string]string) error { - fmt.Printf("Fetching Resource: %s\n", resourceData[CRDName]) - - gvr := schema.GroupVersionResource{ - Group: resourceData[CRDGroup], // The API group - Version: "v1alpha1", // The version of the resourceData - Resource: resourceData[CRDName], // The resourceData type - } - - err := client.ListWithPages(client.DefaultPageSize, kubeClient.Dynamic.Resource(gvr).List, ctx, metav1.ListOptions{}, func(crds *unstructured.UnstructuredList) error { - for _, crd := range crds.Items { - if err := saveCrdToFile(crd, crdDataDirPath); err != nil { - fmt.Printf("Fetching Resource %s Failed because: %s\n", resourceData[CRDName], err) - } - } - return nil - }, - ) - - if err != nil { - return err - } - - return nil -} - -func saveCrdToFile(crd unstructured.Unstructured, crdDataDirPath string) error { - crdDirPath := filepath.Join(crdDataDirPath, crd.GetName()+".yaml.gz") - crdFile, err := os.OpenFile(crdDirPath, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0666) - if err != nil { - return err - } - defer crdFile.Close() - - gzipWriter := gzip.NewWriter(crdFile) - defer gzipWriter.Close() - - crdYAML, err := yaml.Marshal(crd) - if err != nil { - return err - } - - _, err = gzipWriter.Write(crdYAML) - if err != nil { - return err - } - if err = gzipWriter.Flush(); err != nil { - return err - } - - return nil -} - func createTarGz(sourceDir string) error { timestamp := time.Now().Format("02012006150405") tarGzFileName := fmt.Sprintf("odigos_debug_%s.tar.gz", timestamp) diff --git a/cli/cmd/diagnose_util/crd_utils.go b/cli/cmd/diagnose_util/crd_utils.go new file mode 100644 index 000000000..7170273d1 --- /dev/null +++ b/cli/cmd/diagnose_util/crd_utils.go @@ -0,0 +1,146 @@ +package diagnose_util + +import ( + "context" + "fmt" + "github.com/odigos-io/odigos/cli/pkg/kube" + "github.com/odigos-io/odigos/k8sutils/pkg/client" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" + "k8s.io/apimachinery/pkg/runtime/schema" + "os" + "path/filepath" + "sigs.k8s.io/yaml" + "sync" +) + +const ( + CRDName = "crdName" + CRDGroup = "crdGroup" + actionGroupName = "actions.odigos.io" + odigosGroupName = "odigos.io" +) + +var CRDsList = []map[string]string{ + { + CRDName: "addclusterinfos", + CRDGroup: actionGroupName, + }, + { + CRDName: "deleteattributes", + CRDGroup: actionGroupName, + }, + { + CRDName: "renameattributes", + CRDGroup: actionGroupName, + }, + { + CRDName: "probabilisticsamplers", + CRDGroup: actionGroupName, + }, + { + CRDName: "piimaskings", + CRDGroup: actionGroupName, + }, + { + CRDName: "latencysamplers", + CRDGroup: actionGroupName, + }, + { + CRDName: "errorsamplers", + CRDGroup: actionGroupName, + }, + { + CRDName: "instrumentedapplications", + CRDGroup: odigosGroupName, + }, + { + CRDName: "instrumentationconfigs", + CRDGroup: odigosGroupName, + }, + { + CRDName: "instrumentationrules", + CRDGroup: odigosGroupName, + }, + { + CRDName: "instrumentationinstances", + CRDGroup: odigosGroupName, + }, +} + +func FetchOdigosCRs(ctx context.Context, kubeClient *kube.Client, crdDir string) error { + var wg sync.WaitGroup + + for _, resourceData := range CRDsList { + crdDataDirPath := filepath.Join(crdDir, resourceData[CRDName]) + err := os.Mkdir(crdDataDirPath, os.ModePerm) // os.ModePerm gives full permissions (0777) + if err != nil { + fmt.Printf("Error creating directory for CRD: %v, because: %v", resourceData, err) + continue + } + + wg.Add(1) + + go func() { + defer wg.Done() + err = fetchSingleResource(ctx, kubeClient, crdDataDirPath, resourceData) + if err != nil { + fmt.Printf("Error Getting CRDs of: %v, because: %v\n", resourceData[CRDName], err) + } + }() + } + + wg.Wait() + + return nil +} + +func fetchSingleResource(ctx context.Context, kubeClient *kube.Client, crdDataDirPath string, resourceData map[string]string) error { + fmt.Printf("Fetching Resource: %s\n", resourceData[CRDName]) + + gvr := schema.GroupVersionResource{ + Group: resourceData[CRDGroup], // The API group + Version: "v1alpha1", // The version of the resourceData + Resource: resourceData[CRDName], // The resourceData type + } + + err := client.ListWithPages(client.DefaultPageSize, kubeClient.Dynamic.Resource(gvr).List, ctx, metav1.ListOptions{}, func(crds *unstructured.UnstructuredList) error { + for _, crd := range crds.Items { + if err := saveCrdToFile(crd, crdDataDirPath); err != nil { + fmt.Printf("Fetching Resource %s Failed because: %s\n", resourceData[CRDName], err) + } + } + return nil + }, + ) + + if err != nil { + return err + } + + return nil +} + +func saveCrdToFile(crd unstructured.Unstructured, crdDataDirPath string) error { + crdDirPath := filepath.Join(crdDataDirPath, crd.GetName()+".yaml.gz") + crdFile, err := os.OpenFile(crdDirPath, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0666) + if err != nil { + return err + } + defer crdFile.Close() + + crdYAML, err := yaml.Marshal(crd) + if err != nil { + return err + } + + _, err = crdFile.Write(crdYAML) + if err != nil { + return err + } + if err = crdFile.Sync(); err != nil { + return err + } + + return nil +} diff --git a/cli/cmd/diagnose_util/logs_util.go b/cli/cmd/diagnose_util/logs_util.go new file mode 100644 index 000000000..b91b6b73d --- /dev/null +++ b/cli/cmd/diagnose_util/logs_util.go @@ -0,0 +1,106 @@ +package diagnose_util + +import ( + "compress/gzip" + "context" + "fmt" + "github.com/odigos-io/odigos/cli/cmd/resources" + "github.com/odigos-io/odigos/cli/pkg/kube" + "io" + v1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "os" + "path/filepath" + "sync" +) + +const logBufferSize = 1024 * 1024 // 1MB buffer size for reading logs in chunks + +func FetchOdigosComponentsLogs(ctx context.Context, client *kube.Client, logDir string) error { + odigosNamespace, err := resources.GetOdigosNamespace(client, ctx) + if err != nil { + return err + } + + pods, err := client.CoreV1().Pods(odigosNamespace).List(ctx, metav1.ListOptions{}) + if err != nil { + return err + } + + var wg sync.WaitGroup + + for _, pod := range pods.Items { + wg.Add(1) + go func() { + defer wg.Done() + fetchPodLogs(ctx, client, odigosNamespace, pod, logDir) + }() + } + + wg.Wait() + + return nil +} + +func fetchPodLogs(ctx context.Context, client *kube.Client, odigosNamespace string, pod v1.Pod, logDir string) { + for _, container := range pod.Spec.Containers { + fetchingContainerLogs(ctx, client, odigosNamespace, pod, container, logDir) + + } +} + +func fetchingContainerLogs(ctx context.Context, client *kube.Client, odigosNamespace string, pod v1.Pod, container v1.Container, logDir string) { + logPrefix := fmt.Sprintf("Fetching logs for Pod: %s, Container: %s, Node: %s", pod.Name, container.Name, pod.Spec.NodeName) + fmt.Printf(logPrefix + "\n") + + // Define the log file path for saving compressed logs + logFilePath := filepath.Join(logDir, pod.Name+"_"+container.Name+"_"+pod.Spec.NodeName+".log.gz") + logFile, err := os.OpenFile(logFilePath, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0666) + if err != nil { + fmt.Printf(logPrefix+" - Failed - Error creating log file: %v\n", err) + return + } + defer logFile.Close() + + req := client.CoreV1().Pods(odigosNamespace).GetLogs(pod.Name, &v1.PodLogOptions{}) + logStream, err := req.Stream(ctx) + if err != nil { + fmt.Printf(logPrefix+" - Failed - Error creating log stream: %v\n", err) + return + } + defer logStream.Close() + + if err = saveLogsToGzipFileInBatches(logFile, logStream, logBufferSize); err != nil { + fmt.Printf(logPrefix+" - Failed - Error saving logs to file: %v\n", err) + return + } +} + +func saveLogsToGzipFileInBatches(logFile *os.File, logStream io.ReadCloser, bufferSize int) error { + // Create a gzip writer to compress the logs + gzipWriter := gzip.NewWriter(logFile) + defer gzipWriter.Close() + + // Read logs in chunks and write them to the file + buffer := make([]byte, bufferSize) + for { + n, err := logStream.Read(buffer) + if n > 0 { + // Write the chunk to the gzip file + if _, err := gzipWriter.Write(buffer[:n]); err != nil { + return err + } + } + + if err == io.EOF { + // End of the log stream; break out of the loop + break + } + + if err != nil { + return err + } + } + + return nil +} diff --git a/cli/cmd/diagnose_util/profiling_util.go b/cli/cmd/diagnose_util/profiling_util.go new file mode 100644 index 000000000..b4d74ea06 --- /dev/null +++ b/cli/cmd/diagnose_util/profiling_util.go @@ -0,0 +1,135 @@ +package diagnose_util + +import ( + "bytes" + "context" + "fmt" + "github.com/odigos-io/odigos/cli/cmd/resources" + "github.com/odigos-io/odigos/cli/pkg/kube" + "io" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "os" + "path/filepath" + "sync" +) + +var ProfilingMetricsFunctions = []ProfileInterface{CPUProfiler{}, HeapProfiler{}, GoRoutineProfiler{}, AllocsProfiler{}} + +type ProfileInterface interface { + GetFileName() string + GetUrlSuffix() string +} + +type CPUProfiler struct{} + +func (c CPUProfiler) GetFileName() string { + return "cpu_profile.prof" +} + +func (c CPUProfiler) GetUrlSuffix() string { + return "/profile" +} + +type HeapProfiler struct{} + +func (h HeapProfiler) GetFileName() string { + return "heap_profile.prof" +} + +func (h HeapProfiler) GetUrlSuffix() string { + return "/heap" +} + +type GoRoutineProfiler struct{} + +func (h GoRoutineProfiler) GetFileName() string { + return "goroutine_profile.prof" +} + +func (h GoRoutineProfiler) GetUrlSuffix() string { + return "/goroutine" +} + +type AllocsProfiler struct{} + +func (h AllocsProfiler) GetFileName() string { + return "allocs_profile.prof" +} + +func (h AllocsProfiler) GetUrlSuffix() string { + return "/allocs" +} + +func FetchOdigosProfiles(ctx context.Context, client *kube.Client, profileDir string) error { + + odigosNamespace, err := resources.GetOdigosNamespace(client, ctx) + if err != nil { + return nil + } + + odigletPods, err := client.CoreV1().Pods(odigosNamespace).List(ctx, metav1.ListOptions{ + LabelSelector: "app.kubernetes.io/name=odiglet", + }) + if err != nil { + return err + } + + var wg sync.WaitGroup + + for _, odigletPod := range odigletPods.Items { + fmt.Printf("Fetching profile for node: %v", odigletPod.Spec.NodeName) + nodeFilePath := filepath.Join(profileDir, odigletPod.Spec.NodeName) + err := os.Mkdir(nodeFilePath, os.ModePerm) + if err != nil { + fmt.Printf("Error creating directory for node: %v, because: %v", nodeFilePath, err) + continue + } + + for _, profileMetricFunction := range ProfilingMetricsFunctions { + metricFilePath := filepath.Join(nodeFilePath, profileMetricFunction.GetFileName()) + metricFile, err := os.OpenFile(metricFilePath, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0666) + if err != nil { + fmt.Printf("Error creating file: %v, because: %v", metricFilePath, err) + continue + } + defer metricFile.Close() + + wg.Add(1) + + go func() { + defer wg.Done() + podName := odigletPod.Name + err = captureProfile(ctx, client, podName, odigosNamespace, metricFile, profileMetricFunction) + if err != nil { + fmt.Printf("Error Getting Profile Data of: %v, because: %v\n", profileMetricFunction, err) + } + }() + } + + wg.Wait() + } + + return nil +} + +func captureProfile(ctx context.Context, client *kube.Client, podName string, namespace string, metricFile *os.File, profileInterface ProfileInterface) error { + proxyURL := fmt.Sprintf("/api/v1/namespaces/%s/pods/%s:6060/proxy/debug/pprof%s", namespace, podName, profileInterface.GetUrlSuffix()) + + // Make the HTTP GET request via the API server proxy + request := client.Clientset.CoreV1().RESTClient(). + Get(). + AbsPath(proxyURL). + Do(ctx) + + response, err := request.Raw() + if err != nil { + return err + } + + _, err = io.Copy(metricFile, bytes.NewReader(response)) + if err != nil { + return err + } + + return nil +}