feat: Adding Profiling for the Odiglets (#1685)
yodigos authored Nov 4, 2024
1 parent 362d502 commit f2e27ea
Showing 4 changed files with 404 additions and 233 deletions.
250 changes: 17 additions & 233 deletions cli/cmd/diagnose.go
@@ -5,80 +5,24 @@ import (
"compress/gzip"
"context"
"fmt"
"github.com/odigos-io/odigos/cli/cmd/resources"
"github.com/odigos-io/odigos/cli/cmd/diagnose_util"
"github.com/odigos-io/odigos/cli/pkg/kube"
"github.com/odigos-io/odigos/k8sutils/pkg/client"
"github.com/spf13/cobra"
"io"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime/schema"
"os"
"path/filepath"
"sigs.k8s.io/yaml"
"sync"
"time"
)

const (
logBufferSize = 1024 * 1024 // 1MB buffer size for reading logs in chunks
LogsDir = "Logs"
CRDsDir = "CRDs"
CRDName = "crdName"
CRDGroup = "crdGroup"
actionGroupName = "actions.odigos.io"
odigosGroupName = "odigos.io"
LogsDir = "Logs"
CRDsDir = "CRDs"
ProfileDir = "Profile"
)

var (
diagnoseDirs = []string{LogsDir, CRDsDir}
CRDsList = []map[string]string{
{
CRDName: "addclusterinfos",
CRDGroup: actionGroupName,
},
{
CRDName: "deleteattributes",
CRDGroup: actionGroupName,
},
{
CRDName: "renameattributes",
CRDGroup: actionGroupName,
},
{
CRDName: "probabilisticsamplers",
CRDGroup: actionGroupName,
},
{
CRDName: "piimaskings",
CRDGroup: actionGroupName,
},
{
CRDName: "latencysamplers",
CRDGroup: actionGroupName,
},
{
CRDName: "errorsamplers",
CRDGroup: actionGroupName,
},
{
CRDName: "instrumentedapplications",
CRDGroup: odigosGroupName,
},
{
CRDName: "instrumentationconfigs",
CRDGroup: odigosGroupName,
},
{
CRDName: "instrumentationrules",
CRDGroup: odigosGroupName,
},
{
CRDName: "instrumentationinstances",
CRDGroup: odigosGroupName,
},
}
diagnoseDirs = []string{LogsDir, CRDsDir, ProfileDir}
)

var diagnoseCmd = &cobra.Command{
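Without the +/- markers of the original diff view, the const and var hunks above read as if LogsDir, CRDsDir, and diagnoseDirs were declared twice. After this commit the two blocks reduce to the following, with ProfileDir and its entry in diagnoseDirs being the only additions:

const (
    LogsDir    = "Logs"
    CRDsDir    = "CRDs"
    ProfileDir = "Profile"
)

var (
    diagnoseDirs = []string{LogsDir, CRDsDir, ProfileDir}
)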
@@ -108,11 +52,11 @@ func startDiagnose(ctx context.Context, client *kube.Client) error {

var wg sync.WaitGroup

// Fetch Odigos components logs
//// Fetch Odigos components logs
wg.Add(1)
go func() {
defer wg.Done()
if err := fetchOdigosComponentsLogs(ctx, client, filepath.Join(mainTempDir, LogsDir)); err != nil {
if err := diagnose_util.FetchOdigosComponentsLogs(ctx, client, filepath.Join(mainTempDir, LogsDir)); err != nil {
fmt.Printf("Error fetching Odigos components logs: %v\n", err)
}
}()
@@ -121,11 +65,20 @@ func startDiagnose(ctx context.Context, client *kube.Client) error {
wg.Add(1)
go func() {
defer wg.Done()
if err = fetchOdigosCRs(ctx, client, filepath.Join(mainTempDir, CRDsDir)); err != nil {
if err = diagnose_util.FetchOdigosCRs(ctx, client, filepath.Join(mainTempDir, CRDsDir)); err != nil {
fmt.Printf("Error fetching Odigos CRDs: %v\n", err)
}
}()

// Fetch Odigos Profile
wg.Add(1)
go func() {
defer wg.Done()
if err = diagnose_util.FetchOdigosProfiles(ctx, client, filepath.Join(mainTempDir, ProfileDir)); err != nil {
fmt.Printf("Error calculating Odigos Profile: %v\n", err)
}
}()

wg.Wait()

// Package the results into a tar.gz file
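The three helpers invoked above live in the new cli/cmd/diagnose_util package, whose files are among the other changed files in this commit and are not rendered here. Judging only from these call sites, their signatures would look roughly like the sketch below; the bodies are placeholders, not the commit's actual implementation:

package diagnose_util

import (
    "context"

    "github.com/odigos-io/odigos/cli/pkg/kube"
)

// FetchOdigosComponentsLogs gathers gzip-compressed logs of the Odigos component pods into logDir.
func FetchOdigosComponentsLogs(ctx context.Context, client *kube.Client, logDir string) error {
    // moved out of cli/cmd/diagnose.go by this commit
    return nil
}

// FetchOdigosCRs dumps the Odigos custom resources as gzipped YAML into crdDir.
func FetchOdigosCRs(ctx context.Context, client *kube.Client, crdDir string) error {
    // moved out of cli/cmd/diagnose.go by this commit
    return nil
}

// FetchOdigosProfiles collects the new Odiglet profiling data into profileDir.
func FetchOdigosProfiles(ctx context.Context, client *kube.Client, profileDir string) error {
    // added by this commit; implementation not shown in this excerpt
    return nil
}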
@@ -153,175 +106,6 @@ func createAllDirs() (string, error) {
return mainTempDir, nil
}

func fetchOdigosComponentsLogs(ctx context.Context, client *kube.Client, logDir string) error {
odigosNamespace, err := resources.GetOdigosNamespace(client, ctx)
if err != nil {
return err
}

pods, err := client.CoreV1().Pods(odigosNamespace).List(context.TODO(), metav1.ListOptions{})
if err != nil {
return err
}

var wg sync.WaitGroup

for _, pod := range pods.Items {
wg.Add(1)
go func() {
defer wg.Done()
fetchPodLogs(ctx, client, odigosNamespace, pod, logDir)
}()
}

wg.Wait()

return nil
}

func fetchPodLogs(ctx context.Context, client *kube.Client, odigosNamespace string, pod v1.Pod, logDir string) {
for _, container := range pod.Spec.Containers {
fetchingContainerLogs(ctx, client, odigosNamespace, pod, container, logDir)

}
}

func fetchingContainerLogs(ctx context.Context, client *kube.Client, odigosNamespace string, pod v1.Pod, container v1.Container, logDir string) {
logPrefix := fmt.Sprintf("Fetching logs for Pod: %s, Container: %s, Node: %s", pod.Name, container.Name, pod.Spec.NodeName)
fmt.Printf(logPrefix + "\n")

// Define the log file path for saving compressed logs
logFilePath := filepath.Join(logDir, pod.Name+"_"+container.Name+"_"+pod.Spec.NodeName+".log.gz")
logFile, err := os.OpenFile(logFilePath, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0666)
if err != nil {
fmt.Printf(logPrefix+" - Failed - Error creating log file: %v\n", err)
return
}
defer logFile.Close()

req := client.CoreV1().Pods(odigosNamespace).GetLogs(pod.Name, &v1.PodLogOptions{})
logStream, err := req.Stream(ctx)
if err != nil {
fmt.Printf(logPrefix+" - Failed - Error creating log stream: %v\n", err)
return
}
defer logStream.Close()

if err = saveLogsToGzipFileInBatches(logFile, logStream, logBufferSize); err != nil {
fmt.Printf(logPrefix+" - Failed - Error saving logs to file: %v\n", err)
return
}
}

func saveLogsToGzipFileInBatches(logFile *os.File, logStream io.ReadCloser, bufferSize int) error {
// Create a gzip writer to compress the logs
gzipWriter := gzip.NewWriter(logFile)
defer gzipWriter.Close()

// Read logs in chunks and write them to the file
buffer := make([]byte, bufferSize)
for {
n, err := logStream.Read(buffer)
if n > 0 {
// Write the chunk to the gzip file
if _, err := gzipWriter.Write(buffer[:n]); err != nil {
return err
}
}

if err == io.EOF {
// End of the log stream; break out of the loop
break
}

if err != nil {
return err
}
}

return nil
}

func fetchOdigosCRs(ctx context.Context, kubeClient *kube.Client, crdDir string) error {
var wg sync.WaitGroup

for _, resourceData := range CRDsList {
crdDataDirPath := filepath.Join(crdDir, resourceData[CRDName])
err := os.Mkdir(crdDataDirPath, os.ModePerm) // os.ModePerm gives full permissions (0777)
if err != nil {
fmt.Printf("Error creating directory for CRD: %v, because: %v", resourceData, err)
continue
}

wg.Add(1)

go func() {
defer wg.Done()
err = fetchSingleResource(ctx, kubeClient, crdDataDirPath, resourceData)
if err != nil {
fmt.Printf("Error Getting CRDs of: %v, because: %v\n", resourceData[CRDName], err)
}
}()
}

wg.Wait()

return nil
}

func fetchSingleResource(ctx context.Context, kubeClient *kube.Client, crdDataDirPath string, resourceData map[string]string) error {
fmt.Printf("Fetching Resource: %s\n", resourceData[CRDName])

gvr := schema.GroupVersionResource{
Group: resourceData[CRDGroup], // The API group
Version: "v1alpha1", // The version of the resourceData
Resource: resourceData[CRDName], // The resourceData type
}

err := client.ListWithPages(client.DefaultPageSize, kubeClient.Dynamic.Resource(gvr).List, ctx, metav1.ListOptions{}, func(crds *unstructured.UnstructuredList) error {
for _, crd := range crds.Items {
if err := saveCrdToFile(crd, crdDataDirPath); err != nil {
fmt.Printf("Fetching Resource %s Failed because: %s\n", resourceData[CRDName], err)
}
}
return nil
},
)

if err != nil {
return err
}

return nil
}

func saveCrdToFile(crd unstructured.Unstructured, crdDataDirPath string) error {
crdDirPath := filepath.Join(crdDataDirPath, crd.GetName()+".yaml.gz")
crdFile, err := os.OpenFile(crdDirPath, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0666)
if err != nil {
return err
}
defer crdFile.Close()

gzipWriter := gzip.NewWriter(crdFile)
defer gzipWriter.Close()

crdYAML, err := yaml.Marshal(crd)
if err != nil {
return err
}

_, err = gzipWriter.Write(crdYAML)
if err != nil {
return err
}
if err = gzipWriter.Flush(); err != nil {
return err
}

return nil
}

func createTarGz(sourceDir string) error {
timestamp := time.Now().Format("02012006150405")
tarGzFileName := fmt.Sprintf("odigos_debug_%s.tar.gz", timestamp)
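The body of createTarGz is collapsed in this view; only the archive naming is visible: odigos_debug_<timestamp>.tar.gz, with the timestamp formatted as ddMMyyyyHHmmss. A minimal sketch of how such a packer could be written, assuming it simply walks sourceDir (Logs/, CRDs/, Profile/) and streams every file into a gzipped tar; the wiring below is an assumption, not the commit's actual code:

package main

import (
    "archive/tar"
    "compress/gzip"
    "fmt"
    "io"
    "os"
    "path/filepath"
    "time"
)

func createTarGzSketch(sourceDir string) error {
    timestamp := time.Now().Format("02012006150405")
    tarGzFileName := fmt.Sprintf("odigos_debug_%s.tar.gz", timestamp)

    out, err := os.Create(tarGzFileName)
    if err != nil {
        return err
    }
    defer out.Close()

    // tar stream wrapped in gzip, mirroring the per-file gzip used for logs and CRDs
    gzipWriter := gzip.NewWriter(out)
    defer gzipWriter.Close()
    tarWriter := tar.NewWriter(gzipWriter)
    defer tarWriter.Close()

    // Add every regular file under sourceDir, keeping paths relative to the archive root.
    return filepath.Walk(sourceDir, func(path string, info os.FileInfo, err error) error {
        if err != nil || info.IsDir() {
            return err
        }

        relPath, err := filepath.Rel(sourceDir, path)
        if err != nil {
            return err
        }

        header, err := tar.FileInfoHeader(info, "")
        if err != nil {
            return err
        }
        header.Name = relPath

        if err := tarWriter.WriteHeader(header); err != nil {
            return err
        }

        file, err := os.Open(path)
        if err != nil {
            return err
        }
        defer file.Close()

        _, err = io.Copy(tarWriter, file)
        return err
    })
}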
