diff --git a/cmd/kubehound/ingest.go b/cmd/kubehound/ingest.go index 8e5e94337..58bffda9f 100644 --- a/cmd/kubehound/ingest.go +++ b/cmd/kubehound/ingest.go @@ -42,11 +42,17 @@ var ( remoteIngestCmd = &cobra.Command{ Use: "remote", Short: "Ingest data remotely on a KHaaS instance", - Long: `Run an ingestion on KHaaS from a bucket to build the attack path`, + Long: `Run an ingestion on KHaaS from a bucket to build the attack path, by default it will rehydrate the latest snapshot previously dumped on a KHaaS instance from all clusters`, PreRunE: func(cobraCmd *cobra.Command, args []string) error { viper.BindPFlag(config.IngestorAPIEndpoint, cobraCmd.Flags().Lookup("khaas-server")) //nolint: errcheck + cobraCmd.MarkFlagRequired("khaas-server") //nolint: errcheck viper.BindPFlag(config.IngestorAPIInsecure, cobraCmd.Flags().Lookup("insecure")) //nolint: errcheck + if !isIngestRemoteDefault() { + cobraCmd.MarkFlagRequired("run_id") //nolint: errcheck + cobraCmd.MarkFlagRequired("cluster") //nolint: errcheck + } + return cmd.InitializeKubehoundConfig(cobraCmd.Context(), "", false, true) }, RunE: func(cobraCmd *cobra.Command, args []string) error { @@ -56,11 +62,22 @@ var ( return fmt.Errorf("get config: %w", err) } + if isIngestRemoteDefault() { + return core.CoreClientGRPCRehydrateLatest(khCfg.Ingestor) + } + return core.CoreClientGRPCIngest(khCfg.Ingestor, khCfg.Ingestor.ClusterName, khCfg.Ingestor.RunID) }, } ) +func isIngestRemoteDefault() bool { + runID := viper.GetString(config.IngestorRunID) + clusterName := viper.GetString(config.IngestorClusterName) + + return runID == "" && clusterName == "" +} + func init() { ingestCmd.AddCommand(localIngestCmd) diff --git a/go.mod b/go.mod index 32c6b14e8..019254ecb 100644 --- a/go.mod +++ b/go.mod @@ -24,7 +24,7 @@ require ( gocloud.dev v0.37.0 golang.org/x/exp v0.0.0-20240604190554-fc45aab8b7f8 google.golang.org/grpc v1.64.1 - google.golang.org/protobuf v1.34.1 + google.golang.org/protobuf v1.34.2 gopkg.in/DataDog/dd-trace-go.v1 v1.64.1 gopkg.in/yaml.v2 v2.4.0 gopkg.in/yaml.v3 v3.0.1 diff --git a/go.sum b/go.sum index 2a5ae6de6..7331ec213 100644 --- a/go.sum +++ b/go.sum @@ -916,6 +916,8 @@ google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp0 google.golang.org/protobuf v1.27.1/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc= google.golang.org/protobuf v1.34.1 h1:9ddQBjfCyZPOHPUiPxpYESBLc+T8P3E+Vo4IbKZgFWg= google.golang.org/protobuf v1.34.1/go.mod h1:c6P6GXX6sHbq/GpV6MGZEdwhWPcYBgnhAHhKbcUYpos= +google.golang.org/protobuf v1.34.2 h1:6xV6lTsCfpGD21XK49h7MhtcApnLqkfYgPcdHftf6hg= +google.golang.org/protobuf v1.34.2/go.mod h1:qYOHts0dSfpeUzUFpOMr/WGzszTmLH+DiWniOlNbLDw= gopkg.in/DataDog/dd-trace-go.v1 v1.64.1 h1:HN/zoIV8FvrLKA1ZBkbyo4E1MnPh9hPc2Q0C/ojom3I= gopkg.in/DataDog/dd-trace-go.v1 v1.64.1/go.mod h1:qzwVu8Qr8CqzQNw2oKEXRdD+fMnjYatjYMGE0tdCVG4= gopkg.in/airbrake/gobrake.v2 v2.0.9/go.mod h1:/h5ZAUhDkGaJfjzjKLSjv6zCL6O0LLBxU4K+aSYdM/U= diff --git a/pkg/cmd/dump.go b/pkg/cmd/dump.go index 3e756f05b..6ad06e755 100644 --- a/pkg/cmd/dump.go +++ b/pkg/cmd/dump.go @@ -63,21 +63,16 @@ func InitRemoteDumpCmd(cmd *cobra.Command) { func InitRemoteIngestCmd(cmd *cobra.Command, standalone bool) { - cmd.Flags().String("khaas-server", "", "GRPC endpoint exposed by KubeHound as a Service (KHaaS) server (e.g.: localhost:9000)") - cmd.Flags().Bool("insecure", config.DefaultIngestorAPIInsecure, "Allow insecure connection to the KHaaS server grpc endpoint") + cmd.PersistentFlags().String("khaas-server", "", "GRPC endpoint exposed by KubeHound as a Service (KHaaS) server (e.g.: localhost:9000)") + cmd.PersistentFlags().Bool("insecure", config.DefaultIngestorAPIInsecure, "Allow insecure connection to the KHaaS server grpc endpoint") // IngestorAPIEndpoint if standalone { cmd.Flags().String("run_id", "", "KubeHound run id to ingest (e.g.: 01htdgjj34mcmrrksw4bjy2e94)") viper.BindPFlag(config.IngestorRunID, cmd.Flags().Lookup("run_id")) //nolint: errcheck - cmd.MarkFlagRequired("run_id") //nolint: errcheck cmd.Flags().String("cluster", "", "Cluster name to ingest (e.g.: my-cluster-1)") viper.BindPFlag(config.IngestorClusterName, cmd.Flags().Lookup("cluster")) //nolint: errcheck - cmd.MarkFlagRequired("cluster") //nolint: errcheck - - // Reusing the same flags for the dump cloud and ingest command - cmd.MarkFlagRequired("khaas-server") //nolint: errcheck } } diff --git a/pkg/dump/ingestor.go b/pkg/dump/ingestor.go index 4af9a3340..6765c9086 100644 --- a/pkg/dump/ingestor.go +++ b/pkg/dump/ingestor.go @@ -3,7 +3,6 @@ package dump import ( "context" "fmt" - "path" "github.com/DataDog/KubeHound/pkg/collector" "github.com/DataDog/KubeHound/pkg/config" @@ -15,23 +14,10 @@ import ( ) type DumpIngestor struct { - directoryOutput string - ResultName string - collector collector.CollectorClient - writer writer.DumperWriter + collector collector.CollectorClient + writer writer.DumperWriter } -const ( - OfflineDumpDateFormat = "2006-01-02-15-04-05" - OfflineDumpPrefix = "kubehound_" -) - -// .//kubehound__ -func DumpIngestorResultName(clusterName string, runID string) string { - return path.Join(clusterName, fmt.Sprintf("%s%s_%s", OfflineDumpPrefix, clusterName, runID)) -} - -// func NewDumpIngestor(ctx context.Context, collector collector.CollectorClient, compression bool, directoryOutput string) (*DumpIngestor, error) { func NewDumpIngestor(ctx context.Context, collector collector.CollectorClient, compression bool, directoryOutput string, runID *config.RunID) (*DumpIngestor, error) { // Generate path for the dump clusterName, err := getClusterName(ctx, collector) @@ -39,18 +25,19 @@ func NewDumpIngestor(ctx context.Context, collector collector.CollectorClient, c return nil, err } - resultName := DumpIngestorResultName(clusterName, runID.String()) + dumpResult, err := NewDumpResult(clusterName, runID.String(), compression) + if err != nil { + return nil, fmt.Errorf("create dump result: %w", err) + } - dumpWriter, err := writer.DumperWriterFactory(ctx, compression, directoryOutput, resultName) + dumpWriter, err := writer.DumperWriterFactory(ctx, compression, directoryOutput, dumpResult.GetFullPath()) if err != nil { return nil, fmt.Errorf("create collector writer: %w", err) } return &DumpIngestor{ - directoryOutput: directoryOutput, - collector: collector, - writer: dumpWriter, - ResultName: resultName, + collector: collector, + writer: dumpWriter, }, nil } diff --git a/pkg/dump/ingestor_test.go b/pkg/dump/ingestor_test.go index cbc296cf1..d0000a254 100644 --- a/pkg/dump/ingestor_test.go +++ b/pkg/dump/ingestor_test.go @@ -20,9 +20,9 @@ const ( ) func TestNewDumpIngestor(t *testing.T) { - t.Parallel() ctx := context.Background() + t.Setenv("KUBECONFIG", "./testdata/kube-config") clientset := fake.NewSimpleClientset() collectorClient := collector.NewTestK8sAPICollector(ctx, clientset) @@ -48,8 +48,6 @@ func TestNewDumpIngestor(t *testing.T) { runID: config.NewRunID(), }, want: &DumpIngestor{ - directoryOutput: mockDirectoryOutput, - writer: &writer.FileWriter{}, }, wantErr: false, @@ -63,16 +61,16 @@ func TestNewDumpIngestor(t *testing.T) { runID: config.NewRunID(), }, want: &DumpIngestor{ - directoryOutput: mockDirectoryOutput, - writer: &writer.TarWriter{}, + writer: &writer.TarWriter{}, }, wantErr: false, }, } - for _, tt := range tests { + // Can not run parallel tests as the environment variable KUBECONFIG is set + // t.Setenv is not compatible with parallel tests + for _, tt := range tests { //nolint:paralleltest tt := tt t.Run(tt.name, func(t *testing.T) { - t.Parallel() got, err := NewDumpIngestor(ctx, tt.args.collectorClient, tt.args.compression, tt.args.directoryOutput, tt.args.runID) if (err != nil) != tt.wantErr { t.Errorf("NewDumpIngestorsss() error = %v, wantErr %v", err, tt.wantErr) @@ -83,10 +81,6 @@ func TestNewDumpIngestor(t *testing.T) { if !assert.Equal(t, reflect.TypeOf(got.writer), reflect.TypeOf(tt.want.writer)) { t.Errorf("NewDumpIngestor() = %v, want %v", reflect.TypeOf(got.writer), reflect.TypeOf(tt.want.writer)) } - - if !assert.Equal(t, got.directoryOutput, tt.want.directoryOutput) { - t.Errorf("NewDumpIngestor() = %v, want %v", got.directoryOutput, tt.want.directoryOutput) - } }) } } diff --git a/pkg/dump/result.go b/pkg/dump/result.go new file mode 100644 index 000000000..3c0955983 --- /dev/null +++ b/pkg/dump/result.go @@ -0,0 +1,113 @@ +package dump + +import ( + "fmt" + "path" + "regexp" +) + +type DumpResult struct { + clusterName string + RunID string + isDir bool + extension string +} + +const ( + DumpResultClusterNameRegex = `([A-Za-z0-9\.\-_]+)` + DumpResultRunIDRegex = `([a-z0-9]{26})` + DumpResultExtensionRegex = `\.?([a-z0-9\.]+)?` + DumpResultPrefix = "kubehound_" + DumpResultFilenameRegex = DumpResultPrefix + DumpResultClusterNameRegex + "_" + DumpResultRunIDRegex + DumpResultExtensionRegex + DumpResultPathRegex = DumpResultClusterNameRegex + "/" + DumpResultFilenameRegex + + DumpResultTarWriterExtension = "tar.gz" +) + +func NewDumpResult(clusterName, runID string, isCompressed bool) (*DumpResult, error) { + dumpResult := &DumpResult{ + clusterName: clusterName, + RunID: runID, + isDir: true, + } + if isCompressed { + dumpResult.Compressed() + } + + err := dumpResult.Validate() + if err != nil { + return nil, err + } + + return dumpResult, nil +} + +func (i *DumpResult) Validate() error { + re := regexp.MustCompile(DumpResultClusterNameRegex) + if !re.MatchString(i.clusterName) { + return fmt.Errorf("Invalid clustername: %q", i.clusterName) + } + + matches := re.FindStringSubmatch(i.clusterName) + if len(matches) == 2 && matches[1] != i.clusterName { + return fmt.Errorf("Invalid clustername: %q", i.clusterName) + } + + re = regexp.MustCompile(DumpResultRunIDRegex) + if !re.MatchString(i.RunID) { + return fmt.Errorf("Invalid runID: %q", i.RunID) + } + + return nil +} + +func (i *DumpResult) Compressed() { + i.isDir = false + i.extension = DumpResultTarWriterExtension +} + +// .//kubehound__ +func (i *DumpResult) GetFullPath() string { + filename := i.GetFilename() + + return path.Join(i.clusterName, filename) +} + +func (i *DumpResult) GetFilename() string { + filename := fmt.Sprintf("%s%s_%s", DumpResultPrefix, i.clusterName, i.RunID) + if i.isDir { + return filename + } + + return fmt.Sprintf("%s.%s", filename, i.extension) +} + +func ParsePath(path string) (*DumpResult, error) { + // .//kubehound__[.tar.gz] + // re := regexp.MustCompile(`([a-z0-9\.\-_]+)/kubehound_([a-z0-9\.-_]+)_([a-z0-9]{26})\.?([a-z0-9\.]+)?`) + re := regexp.MustCompile(DumpResultPathRegex) + if !re.MatchString(path) { + return nil, fmt.Errorf("Invalid path provided: %q", path) + } + + matches := re.FindStringSubmatch(path) + // The cluster name should match (parent dir and in the filename) + if matches[1] != matches[2] { + return nil, fmt.Errorf("Cluster name does not match in the path provided: %q", path) + } + + clusterName := matches[1] + runID := matches[3] + extension := matches[4] + + isCompressed := false + if extension != "" { + isCompressed = true + } + result, err := NewDumpResult(clusterName, runID, isCompressed) + if err != nil { + return nil, err + } + + return result, nil +} diff --git a/pkg/dump/result_test.go b/pkg/dump/result_test.go new file mode 100644 index 000000000..e084711d5 --- /dev/null +++ b/pkg/dump/result_test.go @@ -0,0 +1,286 @@ +package dump + +import ( + "fmt" + "path" + "reflect" + "testing" +) + +const ( + validClusterName = "cluster1.k8s.local" + validRunID = "01j2qs8th6yarr5hkafysekn0j" + // cluster name with invalid characters (for instance /) + nonValidClusterName = "cluster1.k8s.local/" + // RunID with capital letters + nonValidRunID = "01j2qs8TH6yarr5hkafysekn0j" +) + +func TestParsePath(t *testing.T) { + t.Parallel() + type args struct { + path string + } + tests := []struct { + name string + args args + want *DumpResult + wantErr bool + }{ + { + name: "valid path with no compression", + args: args{ + path: "/tmp/cluster1.k8s.local/kubehound_cluster1.k8s.local_01j2qs8th6yarr5hkafysekn0j", + }, + want: &DumpResult{ + clusterName: validClusterName, + RunID: validRunID, + isDir: true, + extension: "", + }, + wantErr: false, + }, + { + name: "valid path with compressed data", + args: args{ + path: "/tmp/cluster1.k8s.local/kubehound_cluster1.k8s.local_01j2qs8th6yarr5hkafysekn0j.tar.gz", + }, + want: &DumpResult{ + clusterName: validClusterName, + RunID: validRunID, + isDir: false, + extension: "tar.gz", + }, + wantErr: false, + }, + { + name: "invalid path", + args: args{ + path: "/tmp/cluster1.k8s.local/cluster1.k8s.local_01j2qs8th6yarr5hkafysekn0j", + }, + want: nil, + wantErr: true, + }, + { + name: "not matching clustername ", + args: args{ + path: "/tmp/cluster1.k8s.local/kubehound_cluster2.k8s.local_01j2qs8th6yarr5hkafysekn0j", + }, + want: nil, + wantErr: true, + }, + { + name: "invalid runID", + args: args{ + path: "/tmp/cluster1.k8s.local/kubehound_cluster1.k8s.local_01j2qs8TH6yarr5hkafysekn0j", + }, + want: nil, + wantErr: true, + }, + } + for _, tt := range tests { + tt := tt + t.Run(tt.name, func(t *testing.T) { + t.Parallel() + got, err := ParsePath(tt.args.path) + if (err != nil) != tt.wantErr { + t.Errorf("ParsePath() error = %v, wantErr %v", err, tt.wantErr) + + return + } + if !reflect.DeepEqual(got, tt.want) { + t.Errorf("ParsePath() = %v, want %v", got, tt.want) + } + }) + } +} + +func TestDumpResult_GetFilename(t *testing.T) { + t.Parallel() + + type fields struct { + ClusterName string + RunID string + IsDir bool + Extension string + } + tests := []struct { + name string + fields fields + want string + }{ + { + name: "valide dump result object no compression", + fields: fields{ + ClusterName: validClusterName, + RunID: validRunID, + IsDir: true, + Extension: "", + }, + want: fmt.Sprintf("%s%s", "kubehound_", "cluster1.k8s.local_01j2qs8th6yarr5hkafysekn0j"), + }, + { + name: "valide dump result object compressed", + fields: fields{ + ClusterName: validClusterName, + RunID: validRunID, + IsDir: false, + Extension: "tar.gz", + }, + want: fmt.Sprintf("%s%s", "kubehound_", "cluster1.k8s.local_01j2qs8th6yarr5hkafysekn0j.tar.gz"), + }, + } + for _, tt := range tests { + tt := tt + t.Run(tt.name, func(t *testing.T) { + t.Parallel() + i := &DumpResult{ + clusterName: tt.fields.ClusterName, + RunID: tt.fields.RunID, + isDir: tt.fields.IsDir, + extension: tt.fields.Extension, + } + if got := i.GetFilename(); got != tt.want { + t.Errorf("DumpResult.GetFilename() = %v, want %v", got, tt.want) + } + }) + } +} + +func TestDumpResult_GetFullPath(t *testing.T) { + t.Parallel() + + type fields struct { + ClusterName string + RunID string + IsDir bool + Extension string + } + tests := []struct { + name string + fields fields + want string + }{ + { + name: "valide dump result object no compression", + fields: fields{ + ClusterName: validClusterName, + RunID: validRunID, + IsDir: true, + Extension: "", + }, + want: path.Join(validClusterName, fmt.Sprintf("%s%s", "kubehound_", "cluster1.k8s.local_01j2qs8th6yarr5hkafysekn0j")), + }, + { + name: "valide dump result object compressed", + fields: fields{ + ClusterName: validClusterName, + RunID: validRunID, + IsDir: false, + Extension: "tar.gz", + }, + want: path.Join(validClusterName, fmt.Sprintf("%s%s", "kubehound_", "cluster1.k8s.local_01j2qs8th6yarr5hkafysekn0j.tar.gz")), + }, + } + for _, tt := range tests { + tt := tt + t.Run(tt.name, func(t *testing.T) { + t.Parallel() + i := &DumpResult{ + clusterName: tt.fields.ClusterName, + RunID: tt.fields.RunID, + isDir: tt.fields.IsDir, + extension: tt.fields.Extension, + } + if got := i.GetFullPath(); got != tt.want { + t.Errorf("DumpResult.GetFullPath() = %v, want %v", got, tt.want) + } + }) + } +} + +func TestNewDumpResult(t *testing.T) { + t.Parallel() + + type args struct { + clusterName string + runID string + isCompressed bool + } + tests := []struct { + name string + args args + want *DumpResult + wantErr bool + }{ + { + name: "valid entry", + args: args{ + clusterName: validClusterName, + runID: validRunID, + isCompressed: false, + }, + want: &DumpResult{ + clusterName: validClusterName, + RunID: validRunID, + isDir: true, + }, + wantErr: false, + }, + { + name: "invalid clustername", + args: args{ + clusterName: nonValidClusterName, + runID: validRunID, + isCompressed: false, + }, + want: nil, + wantErr: true, + }, + { + name: "empty clustername", + args: args{ + clusterName: "", + runID: validRunID, + isCompressed: false, + }, + want: nil, + wantErr: true, + }, + { + name: "invalid runID", + args: args{ + clusterName: validClusterName, + runID: nonValidRunID, + isCompressed: false, + }, + want: nil, + wantErr: true, + }, + { + name: "invalid runID", + args: args{ + clusterName: validClusterName, + runID: "", + isCompressed: false, + }, + want: nil, + wantErr: true, + }, + } + for _, tt := range tests { + tt := tt + t.Run(tt.name, func(t *testing.T) { + t.Parallel() + got, err := NewDumpResult(tt.args.clusterName, tt.args.runID, tt.args.isCompressed) + if (err != nil) != tt.wantErr { + t.Errorf("NewDumpResult() error = %v, wantErr %v", err, tt.wantErr) + + return + } + if !reflect.DeepEqual(got, tt.want) { + t.Errorf("NewDumpResult() = %v, want %v", got, tt.want) + } + }) + } +} diff --git a/pkg/dump/testdata/kube-config b/pkg/dump/testdata/kube-config new file mode 100644 index 000000000..bf569401f --- /dev/null +++ b/pkg/dump/testdata/kube-config @@ -0,0 +1,20 @@ +apiVersion: v1 +clusters: +- cluster: + certificate-authority-data: AA== + server: https://127.0.0.1:59959 + name: cluster.k8s.local +contexts: +- context: + cluster: cluster.k8s.local + user: cluster.k8s.local + name: cluster.k8s.local +current-context: cluster.k8s.local +kind: Config +preferences: {} +users: +- name: cluster.k8s.local + user: + client-certificate-data: AA== + client-key-data: AA== + diff --git a/pkg/dump/writer/file_writer.go b/pkg/dump/writer/file_writer.go index c67bbcd5f..09bea82ee 100644 --- a/pkg/dump/writer/file_writer.go +++ b/pkg/dump/writer/file_writer.go @@ -35,7 +35,7 @@ type FileWriter struct { fsWriter *FSWriter } -func NewFileWriter(ctx context.Context, directoryOutput string, resName string) (*FileWriter, error) { +func NewFileWriter(ctx context.Context, directoryOutput string) (*FileWriter, error) { fsWriter, err := NewFSWriter(ctx) if err != nil { return nil, fmt.Errorf("creating fs writer: %w", err) diff --git a/pkg/dump/writer/file_writer_test.go b/pkg/dump/writer/file_writer_test.go index 05960696a..f60b0a435 100644 --- a/pkg/dump/writer/file_writer_test.go +++ b/pkg/dump/writer/file_writer_test.go @@ -33,7 +33,8 @@ func TestFileWriter_Write(t *testing.T) { collector.FakeEndpoint("name2", dummyNamespace, []int32{int32(443)}), } - writer, err := NewFileWriter(ctx, tmpDir, fileNameK8sObject) + directoryOutput := path.Join(tmpDir, fileNameK8sObject) + writer, err := NewFileWriter(ctx, directoryOutput) if err != nil { t.Fatalf("failed to create file writer: %v", err) } diff --git a/pkg/dump/writer/tar_writer.go b/pkg/dump/writer/tar_writer.go index 48e67911f..89107a128 100644 --- a/pkg/dump/writer/tar_writer.go +++ b/pkg/dump/writer/tar_writer.go @@ -6,7 +6,6 @@ import ( "context" "fmt" "os" - "path" "path/filepath" "sync" @@ -18,9 +17,8 @@ import ( ) const ( - TarWriterExtension = ".tar.gz" - TarWriterChmod = 0600 - TarTypeTag = "tar" + TarWriterChmod = 0600 + TarTypeTag = "tar" // Multi-threading the dump with one worker for each types // The number of workers is set to the number of differents entities (roles, pods, ...) @@ -40,8 +38,7 @@ type TarWriter struct { fsWriter *FSWriter } -func NewTarWriter(ctx context.Context, directoryPath string, resName string) (*TarWriter, error) { - tarPath := path.Join(directoryPath, fmt.Sprintf("%s%s", resName, TarWriterExtension)) +func NewTarWriter(ctx context.Context, tarPath string) (*TarWriter, error) { tarFile, err := createTarFile(tarPath) if err != nil { return nil, fmt.Errorf("failed to create tar file: %w", err) diff --git a/pkg/dump/writer/tar_writer_test.go b/pkg/dump/writer/tar_writer_test.go index 4b14a94f8..57c02aeac 100644 --- a/pkg/dump/writer/tar_writer_test.go +++ b/pkg/dump/writer/tar_writer_test.go @@ -52,7 +52,8 @@ func TestTarWriter_Write(t *testing.T) { vfsResourcePath2 := path.Join(dummyNamespace2, fileNameK8sObject) tarBundle[vfsResourcePath2] = dummyK8sObject2 - writer, err := NewTarWriter(ctx, tmpTarFileDir, fileNameK8sObject) + tarPath := path.Join(tmpTarFileDir, fileNameK8sObject) + writer, err := NewTarWriter(ctx, tarPath) if err != nil { t.Fatalf("failed to create file writer: %v", err) } diff --git a/pkg/dump/writer/writer.go b/pkg/dump/writer/writer.go index 58aba95bf..36535ad5c 100644 --- a/pkg/dump/writer/writer.go +++ b/pkg/dump/writer/writer.go @@ -2,6 +2,7 @@ package writer import ( "context" + "path" "github.com/DataDog/KubeHound/pkg/telemetry/log" ) @@ -31,9 +32,11 @@ func DumperWriterFactory(ctx context.Context, compression bool, directoryPath st // if compression is enabled, create the tar.gz file if compression { log.I.Infof("Compression enabled") + tarPath := path.Join(directoryPath, resultName) - return NewTarWriter(ctx, directoryPath, resultName) + return NewTarWriter(ctx, tarPath) } - return NewFileWriter(ctx, directoryPath, resultName) + // Output the result directly in the directory provided by the user + return NewFileWriter(ctx, directoryPath) } diff --git a/pkg/ingestor/api/api.go b/pkg/ingestor/api/api.go index d9eb05ce3..d59a180a3 100644 --- a/pkg/ingestor/api/api.go +++ b/pkg/ingestor/api/api.go @@ -5,10 +5,14 @@ import ( "errors" "fmt" "path/filepath" + "slices" + "strings" "github.com/DataDog/KubeHound/pkg/collector" "github.com/DataDog/KubeHound/pkg/config" + "github.com/DataDog/KubeHound/pkg/dump" "github.com/DataDog/KubeHound/pkg/ingestor" + grpc "github.com/DataDog/KubeHound/pkg/ingestor/api/grpc/pb" "github.com/DataDog/KubeHound/pkg/ingestor/notifier" "github.com/DataDog/KubeHound/pkg/ingestor/puller" "github.com/DataDog/KubeHound/pkg/kubehound/graph" @@ -21,6 +25,7 @@ import ( "github.com/DataDog/KubeHound/pkg/telemetry/tag" gremlingo "github.com/apache/tinkerpop/gremlin-go/v3/driver" "go.mongodb.org/mongo-driver/bson" + "google.golang.org/protobuf/types/known/timestamppb" "gopkg.in/DataDog/dd-trace-go.v1/ddtrace/tracer" ) @@ -29,7 +34,6 @@ type API interface { Notify(ctx context.Context, clusterName string, runID string) error } -//go:generate protoc --go_out=./pb --go_opt=paths=source_relative --go-grpc_out=./pb --go-grpc_opt=paths=source_relative api.proto type IngestorAPI struct { puller puller.DataPuller notifier notifier.Notifier @@ -52,6 +56,59 @@ func NewIngestorAPI(cfg *config.KubehoundConfig, puller puller.DataPuller, notif } } +// RehydrateLatest is just a GRPC wrapper around the Ingest method from the API package +func (g *IngestorAPI) RehydrateLatest(ctx context.Context) ([]*grpc.IngestedCluster, error) { + // first level key are cluster names + directories, errRet := g.puller.ListFiles(ctx, "", false) + if errRet != nil { + return nil, errRet + } + + res := []*grpc.IngestedCluster{} + + for _, dir := range directories { + clusterName := strings.TrimSuffix(dir.Key, "/") + + dumpKeys, err := g.puller.ListFiles(ctx, clusterName, true) + if err != nil { + return nil, err + } + + if l := len(dumpKeys); l > 0 { + // extracting the latest runID + latestDump := slices.MaxFunc(dumpKeys, func(a, b *puller.ListObject) int { + // return dumpKeys[a].ModTime.Before(dumpKeys[b].ModTime) + return a.ModTime.Compare(b.ModTime) + }) + latestDumpIngestTime := latestDump.ModTime + latestDumpKey := latestDump.Key + + dumpResult, err := dump.ParsePath(latestDumpKey) + if err != nil { + errRet = errors.Join(errRet, fmt.Errorf("parsing dump path %s: %w", latestDumpKey, err)) + + continue + } + runID := dumpResult.RunID + + clusterErr := g.Ingest(ctx, clusterName, runID) + if clusterErr != nil { + errRet = errors.Join(errRet, fmt.Errorf("ingesting cluster %s/%s: %w", clusterName, runID, clusterErr)) + } + log.I.Infof("Rehydrated cluster: %s, date: %s, run_id: %s", clusterName, latestDumpIngestTime.Format("01-02-2006 15:04:05"), runID) + ingestedCluster := &grpc.IngestedCluster{ + ClusterName: clusterName, + RunId: runID, + Date: timestamppb.New(latestDumpIngestTime), + } + res = append(res, ingestedCluster) + + } + } + + return res, errRet +} + func (g *IngestorAPI) Ingest(_ context.Context, clusterName string, runID string) error { events.PushEvent( fmt.Sprintf("Ingesting cluster %s with runID %s", clusterName, runID), @@ -82,10 +139,11 @@ func (g *IngestorAPI) Ingest(_ context.Context, clusterName string, runID string if err != nil { return err } - err = g.puller.Close(runCtx, archivePath) //nolint: contextcheck - if err != nil { - return err - } + + defer func() { + err = errors.Join(err, g.puller.Close(runCtx, archivePath)) + }() + err = g.puller.Extract(runCtx, archivePath) //nolint: contextcheck if err != nil { return err @@ -114,7 +172,10 @@ func (g *IngestorAPI) Ingest(_ context.Context, clusterName string, runID string if err != nil { return fmt.Errorf("collector client creation: %w", err) } - defer collect.Close(runCtx) //nolint: contextcheck + + defer func() { + err = errors.Join(err, collect.Close(runCtx)) + }() log.I.Infof("Loaded %s collector client", collect.Name()) err = g.Cfg.ComputeDynamic(config.WithClusterName(clusterName), config.WithRunID(runID)) @@ -148,10 +209,11 @@ func (g *IngestorAPI) Ingest(_ context.Context, clusterName string, runID string } err = g.notifier.Notify(runCtx, clusterName, runID) //nolint: contextcheck if err != nil { - return err + return fmt.Errorf("notifying: %w", err) } - return nil + // returning err from the defer functions + return err } func (g *IngestorAPI) isAlreadyIngestedInGraph(_ context.Context, clusterName string, runID string) (bool, error) { diff --git a/pkg/ingestor/api/grpc/README.md b/pkg/ingestor/api/grpc/README.md index 8708cc0dd..a353493db 100644 --- a/pkg/ingestor/api/grpc/README.md +++ b/pkg/ingestor/api/grpc/README.md @@ -5,4 +5,9 @@ You can trigger a gRPC call by doing this: ```bash grpcurl -plaintext -format text -d 'cluster_name: "test", run_id: "id"' 127.0.0.1:9000 grpc.API.Ingest +``` + +Testing rehydrating of all latest scans: +```bash +grpcurl -plaintext -format text 127.0.0.1:9000 grpc.API.RehydrateLatest ``` \ No newline at end of file diff --git a/pkg/ingestor/api/grpc/api.proto b/pkg/ingestor/api/grpc/api.proto index 713a81113..f456f5a45 100644 --- a/pkg/ingestor/api/grpc/api.proto +++ b/pkg/ingestor/api/grpc/api.proto @@ -1,14 +1,28 @@ syntax = "proto3"; +import "google/protobuf/timestamp.proto"; + package grpc; +option go_package = "./grpc"; message IngestRequest { string run_id = 1; string cluster_name = 2; } - message IngestResponse {} +message RehydrateLatestRequest {} +message IngestedCluster { + string cluster_name = 1; + string run_id = 2; + google.protobuf.Timestamp date = 3 ; +} +message RehydrateLatestResponse { + repeated IngestedCluster ingested_cluster = 1; +} + + service API { rpc Ingest (IngestRequest) returns (IngestResponse); + rpc RehydrateLatest (RehydrateLatestRequest) returns (RehydrateLatestResponse); } \ No newline at end of file diff --git a/pkg/ingestor/api/grpc/grpc.go b/pkg/ingestor/api/grpc/grpc.go index 049184131..0584a983c 100644 --- a/pkg/ingestor/api/grpc/grpc.go +++ b/pkg/ingestor/api/grpc/grpc.go @@ -14,6 +14,10 @@ import ( "google.golang.org/grpc/reflection" ) +// On macOS you need to install protobuf (`brew install protobuf`) +// Need to install: go install google.golang.org/grpc/cmd/protoc-gen-go-grpc@latest +//go:generate protoc --go_out=./pb --go_opt=paths=source_relative --go-grpc_out=./pb --go-grpc_opt=paths=source_relative ./api.proto + // server is used to implement the GRPC api type server struct { // grpc related embeds @@ -38,6 +42,20 @@ func (s *server) Ingest(ctx context.Context, in *pb.IngestRequest) (*pb.IngestRe return &pb.IngestResponse{}, nil } +// RehydrateLatest is just a GRPC wrapper around the RehydrateLatest method from the API package +func (s *server) RehydrateLatest(ctx context.Context, in *pb.RehydrateLatestRequest) (*pb.RehydrateLatestResponse, error) { + res, err := s.api.RehydrateLatest(ctx) + if err != nil { + log.I.Errorf("Ingest failed: %v", err) + + return nil, err + } + + return &pb.RehydrateLatestResponse{ + IngestedCluster: res, + }, nil +} + // Listen starts the GRPC server with the generic api implementation // It uses the config from the passed API for address and ports func Listen(ctx context.Context, api *api.IngestorAPI) error { diff --git a/pkg/ingestor/api/grpc/pb/api.pb.go b/pkg/ingestor/api/grpc/pb/api.pb.go index 7059176cb..446a1174d 100644 --- a/pkg/ingestor/api/grpc/pb/api.pb.go +++ b/pkg/ingestor/api/grpc/pb/api.pb.go @@ -1,7 +1,7 @@ // Code generated by protoc-gen-go. DO NOT EDIT. // versions: -// protoc-gen-go v1.25.0-devel -// protoc v3.14.0 +// protoc-gen-go v1.34.2 +// protoc v5.27.1 // source: api.proto package grpc @@ -9,6 +9,7 @@ package grpc import ( protoreflect "google.golang.org/protobuf/reflect/protoreflect" protoimpl "google.golang.org/protobuf/runtime/protoimpl" + timestamppb "google.golang.org/protobuf/types/known/timestamppb" reflect "reflect" sync "sync" ) @@ -113,20 +114,191 @@ func (*IngestResponse) Descriptor() ([]byte, []int) { return file_api_proto_rawDescGZIP(), []int{1} } +type RehydrateLatestRequest struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields +} + +func (x *RehydrateLatestRequest) Reset() { + *x = RehydrateLatestRequest{} + if protoimpl.UnsafeEnabled { + mi := &file_api_proto_msgTypes[2] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *RehydrateLatestRequest) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*RehydrateLatestRequest) ProtoMessage() {} + +func (x *RehydrateLatestRequest) ProtoReflect() protoreflect.Message { + mi := &file_api_proto_msgTypes[2] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use RehydrateLatestRequest.ProtoReflect.Descriptor instead. +func (*RehydrateLatestRequest) Descriptor() ([]byte, []int) { + return file_api_proto_rawDescGZIP(), []int{2} +} + +type IngestedCluster struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + ClusterName string `protobuf:"bytes,1,opt,name=cluster_name,json=clusterName,proto3" json:"cluster_name,omitempty"` + RunId string `protobuf:"bytes,2,opt,name=run_id,json=runId,proto3" json:"run_id,omitempty"` + Date *timestamppb.Timestamp `protobuf:"bytes,3,opt,name=date,proto3" json:"date,omitempty"` +} + +func (x *IngestedCluster) Reset() { + *x = IngestedCluster{} + if protoimpl.UnsafeEnabled { + mi := &file_api_proto_msgTypes[3] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *IngestedCluster) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*IngestedCluster) ProtoMessage() {} + +func (x *IngestedCluster) ProtoReflect() protoreflect.Message { + mi := &file_api_proto_msgTypes[3] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use IngestedCluster.ProtoReflect.Descriptor instead. +func (*IngestedCluster) Descriptor() ([]byte, []int) { + return file_api_proto_rawDescGZIP(), []int{3} +} + +func (x *IngestedCluster) GetClusterName() string { + if x != nil { + return x.ClusterName + } + return "" +} + +func (x *IngestedCluster) GetRunId() string { + if x != nil { + return x.RunId + } + return "" +} + +func (x *IngestedCluster) GetDate() *timestamppb.Timestamp { + if x != nil { + return x.Date + } + return nil +} + +type RehydrateLatestResponse struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + IngestedCluster []*IngestedCluster `protobuf:"bytes,1,rep,name=ingested_cluster,json=ingestedCluster,proto3" json:"ingested_cluster,omitempty"` +} + +func (x *RehydrateLatestResponse) Reset() { + *x = RehydrateLatestResponse{} + if protoimpl.UnsafeEnabled { + mi := &file_api_proto_msgTypes[4] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *RehydrateLatestResponse) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*RehydrateLatestResponse) ProtoMessage() {} + +func (x *RehydrateLatestResponse) ProtoReflect() protoreflect.Message { + mi := &file_api_proto_msgTypes[4] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use RehydrateLatestResponse.ProtoReflect.Descriptor instead. +func (*RehydrateLatestResponse) Descriptor() ([]byte, []int) { + return file_api_proto_rawDescGZIP(), []int{4} +} + +func (x *RehydrateLatestResponse) GetIngestedCluster() []*IngestedCluster { + if x != nil { + return x.IngestedCluster + } + return nil +} + var File_api_proto protoreflect.FileDescriptor var file_api_proto_rawDesc = []byte{ 0x0a, 0x09, 0x61, 0x70, 0x69, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x12, 0x04, 0x67, 0x72, 0x70, - 0x63, 0x22, 0x49, 0x0a, 0x0d, 0x49, 0x6e, 0x67, 0x65, 0x73, 0x74, 0x52, 0x65, 0x71, 0x75, 0x65, - 0x73, 0x74, 0x12, 0x15, 0x0a, 0x06, 0x72, 0x75, 0x6e, 0x5f, 0x69, 0x64, 0x18, 0x01, 0x20, 0x01, - 0x28, 0x09, 0x52, 0x05, 0x72, 0x75, 0x6e, 0x49, 0x64, 0x12, 0x21, 0x0a, 0x0c, 0x63, 0x6c, 0x75, - 0x73, 0x74, 0x65, 0x72, 0x5f, 0x6e, 0x61, 0x6d, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, - 0x0b, 0x63, 0x6c, 0x75, 0x73, 0x74, 0x65, 0x72, 0x4e, 0x61, 0x6d, 0x65, 0x22, 0x10, 0x0a, 0x0e, - 0x49, 0x6e, 0x67, 0x65, 0x73, 0x74, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x32, 0x3a, - 0x0a, 0x03, 0x41, 0x50, 0x49, 0x12, 0x33, 0x0a, 0x06, 0x49, 0x6e, 0x67, 0x65, 0x73, 0x74, 0x12, - 0x13, 0x2e, 0x67, 0x72, 0x70, 0x63, 0x2e, 0x49, 0x6e, 0x67, 0x65, 0x73, 0x74, 0x52, 0x65, 0x71, - 0x75, 0x65, 0x73, 0x74, 0x1a, 0x14, 0x2e, 0x67, 0x72, 0x70, 0x63, 0x2e, 0x49, 0x6e, 0x67, 0x65, - 0x73, 0x74, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, + 0x63, 0x1a, 0x1f, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, + 0x75, 0x66, 0x2f, 0x74, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x2e, 0x70, 0x72, 0x6f, + 0x74, 0x6f, 0x22, 0x49, 0x0a, 0x0d, 0x49, 0x6e, 0x67, 0x65, 0x73, 0x74, 0x52, 0x65, 0x71, 0x75, + 0x65, 0x73, 0x74, 0x12, 0x15, 0x0a, 0x06, 0x72, 0x75, 0x6e, 0x5f, 0x69, 0x64, 0x18, 0x01, 0x20, + 0x01, 0x28, 0x09, 0x52, 0x05, 0x72, 0x75, 0x6e, 0x49, 0x64, 0x12, 0x21, 0x0a, 0x0c, 0x63, 0x6c, + 0x75, 0x73, 0x74, 0x65, 0x72, 0x5f, 0x6e, 0x61, 0x6d, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, + 0x52, 0x0b, 0x63, 0x6c, 0x75, 0x73, 0x74, 0x65, 0x72, 0x4e, 0x61, 0x6d, 0x65, 0x22, 0x10, 0x0a, + 0x0e, 0x49, 0x6e, 0x67, 0x65, 0x73, 0x74, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, + 0x18, 0x0a, 0x16, 0x52, 0x65, 0x68, 0x79, 0x64, 0x72, 0x61, 0x74, 0x65, 0x4c, 0x61, 0x74, 0x65, + 0x73, 0x74, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x22, 0x7b, 0x0a, 0x0f, 0x49, 0x6e, 0x67, + 0x65, 0x73, 0x74, 0x65, 0x64, 0x43, 0x6c, 0x75, 0x73, 0x74, 0x65, 0x72, 0x12, 0x21, 0x0a, 0x0c, + 0x63, 0x6c, 0x75, 0x73, 0x74, 0x65, 0x72, 0x5f, 0x6e, 0x61, 0x6d, 0x65, 0x18, 0x01, 0x20, 0x01, + 0x28, 0x09, 0x52, 0x0b, 0x63, 0x6c, 0x75, 0x73, 0x74, 0x65, 0x72, 0x4e, 0x61, 0x6d, 0x65, 0x12, + 0x15, 0x0a, 0x06, 0x72, 0x75, 0x6e, 0x5f, 0x69, 0x64, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, + 0x05, 0x72, 0x75, 0x6e, 0x49, 0x64, 0x12, 0x2e, 0x0a, 0x04, 0x64, 0x61, 0x74, 0x65, 0x18, 0x03, + 0x20, 0x01, 0x28, 0x0b, 0x32, 0x1a, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, + 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x54, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, + 0x52, 0x04, 0x64, 0x61, 0x74, 0x65, 0x22, 0x5b, 0x0a, 0x17, 0x52, 0x65, 0x68, 0x79, 0x64, 0x72, + 0x61, 0x74, 0x65, 0x4c, 0x61, 0x74, 0x65, 0x73, 0x74, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, + 0x65, 0x12, 0x40, 0x0a, 0x10, 0x69, 0x6e, 0x67, 0x65, 0x73, 0x74, 0x65, 0x64, 0x5f, 0x63, 0x6c, + 0x75, 0x73, 0x74, 0x65, 0x72, 0x18, 0x01, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x15, 0x2e, 0x67, 0x72, + 0x70, 0x63, 0x2e, 0x49, 0x6e, 0x67, 0x65, 0x73, 0x74, 0x65, 0x64, 0x43, 0x6c, 0x75, 0x73, 0x74, + 0x65, 0x72, 0x52, 0x0f, 0x69, 0x6e, 0x67, 0x65, 0x73, 0x74, 0x65, 0x64, 0x43, 0x6c, 0x75, 0x73, + 0x74, 0x65, 0x72, 0x32, 0x8a, 0x01, 0x0a, 0x03, 0x41, 0x50, 0x49, 0x12, 0x33, 0x0a, 0x06, 0x49, + 0x6e, 0x67, 0x65, 0x73, 0x74, 0x12, 0x13, 0x2e, 0x67, 0x72, 0x70, 0x63, 0x2e, 0x49, 0x6e, 0x67, + 0x65, 0x73, 0x74, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x14, 0x2e, 0x67, 0x72, 0x70, + 0x63, 0x2e, 0x49, 0x6e, 0x67, 0x65, 0x73, 0x74, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, + 0x12, 0x4e, 0x0a, 0x0f, 0x52, 0x65, 0x68, 0x79, 0x64, 0x72, 0x61, 0x74, 0x65, 0x4c, 0x61, 0x74, + 0x65, 0x73, 0x74, 0x12, 0x1c, 0x2e, 0x67, 0x72, 0x70, 0x63, 0x2e, 0x52, 0x65, 0x68, 0x79, 0x64, + 0x72, 0x61, 0x74, 0x65, 0x4c, 0x61, 0x74, 0x65, 0x73, 0x74, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, + 0x74, 0x1a, 0x1d, 0x2e, 0x67, 0x72, 0x70, 0x63, 0x2e, 0x52, 0x65, 0x68, 0x79, 0x64, 0x72, 0x61, + 0x74, 0x65, 0x4c, 0x61, 0x74, 0x65, 0x73, 0x74, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, + 0x42, 0x08, 0x5a, 0x06, 0x2e, 0x2f, 0x67, 0x72, 0x70, 0x63, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, } @@ -142,19 +314,27 @@ func file_api_proto_rawDescGZIP() []byte { return file_api_proto_rawDescData } -var file_api_proto_msgTypes = make([]protoimpl.MessageInfo, 2) -var file_api_proto_goTypes = []interface{}{ - (*IngestRequest)(nil), // 0: grpc.IngestRequest - (*IngestResponse)(nil), // 1: grpc.IngestResponse +var file_api_proto_msgTypes = make([]protoimpl.MessageInfo, 5) +var file_api_proto_goTypes = []any{ + (*IngestRequest)(nil), // 0: grpc.IngestRequest + (*IngestResponse)(nil), // 1: grpc.IngestResponse + (*RehydrateLatestRequest)(nil), // 2: grpc.RehydrateLatestRequest + (*IngestedCluster)(nil), // 3: grpc.IngestedCluster + (*RehydrateLatestResponse)(nil), // 4: grpc.RehydrateLatestResponse + (*timestamppb.Timestamp)(nil), // 5: google.protobuf.Timestamp } var file_api_proto_depIdxs = []int32{ - 0, // 0: grpc.API.Ingest:input_type -> grpc.IngestRequest - 1, // 1: grpc.API.Ingest:output_type -> grpc.IngestResponse - 1, // [1:2] is the sub-list for method output_type - 0, // [0:1] is the sub-list for method input_type - 0, // [0:0] is the sub-list for extension type_name - 0, // [0:0] is the sub-list for extension extendee - 0, // [0:0] is the sub-list for field type_name + 5, // 0: grpc.IngestedCluster.date:type_name -> google.protobuf.Timestamp + 3, // 1: grpc.RehydrateLatestResponse.ingested_cluster:type_name -> grpc.IngestedCluster + 0, // 2: grpc.API.Ingest:input_type -> grpc.IngestRequest + 2, // 3: grpc.API.RehydrateLatest:input_type -> grpc.RehydrateLatestRequest + 1, // 4: grpc.API.Ingest:output_type -> grpc.IngestResponse + 4, // 5: grpc.API.RehydrateLatest:output_type -> grpc.RehydrateLatestResponse + 4, // [4:6] is the sub-list for method output_type + 2, // [2:4] is the sub-list for method input_type + 2, // [2:2] is the sub-list for extension type_name + 2, // [2:2] is the sub-list for extension extendee + 0, // [0:2] is the sub-list for field type_name } func init() { file_api_proto_init() } @@ -163,7 +343,7 @@ func file_api_proto_init() { return } if !protoimpl.UnsafeEnabled { - file_api_proto_msgTypes[0].Exporter = func(v interface{}, i int) interface{} { + file_api_proto_msgTypes[0].Exporter = func(v any, i int) any { switch v := v.(*IngestRequest); i { case 0: return &v.state @@ -175,7 +355,7 @@ func file_api_proto_init() { return nil } } - file_api_proto_msgTypes[1].Exporter = func(v interface{}, i int) interface{} { + file_api_proto_msgTypes[1].Exporter = func(v any, i int) any { switch v := v.(*IngestResponse); i { case 0: return &v.state @@ -187,6 +367,42 @@ func file_api_proto_init() { return nil } } + file_api_proto_msgTypes[2].Exporter = func(v any, i int) any { + switch v := v.(*RehydrateLatestRequest); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_api_proto_msgTypes[3].Exporter = func(v any, i int) any { + switch v := v.(*IngestedCluster); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_api_proto_msgTypes[4].Exporter = func(v any, i int) any { + switch v := v.(*RehydrateLatestResponse); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } } type x struct{} out := protoimpl.TypeBuilder{ @@ -194,7 +410,7 @@ func file_api_proto_init() { GoPackagePath: reflect.TypeOf(x{}).PkgPath(), RawDescriptor: file_api_proto_rawDesc, NumEnums: 0, - NumMessages: 2, + NumMessages: 5, NumExtensions: 0, NumServices: 1, }, diff --git a/pkg/ingestor/api/grpc/pb/api_grpc.pb.go b/pkg/ingestor/api/grpc/pb/api_grpc.pb.go index b0c227a15..951210160 100644 --- a/pkg/ingestor/api/grpc/pb/api_grpc.pb.go +++ b/pkg/ingestor/api/grpc/pb/api_grpc.pb.go @@ -1,4 +1,8 @@ // Code generated by protoc-gen-go-grpc. DO NOT EDIT. +// versions: +// - protoc-gen-go-grpc v1.4.0 +// - protoc v5.27.1 +// source: api.proto package grpc @@ -11,14 +15,20 @@ import ( // This is a compile-time assertion to ensure that this generated file // is compatible with the grpc package it is being compiled against. -// Requires gRPC-Go v1.32.0 or later. -const _ = grpc.SupportPackageIsVersion7 +// Requires gRPC-Go v1.62.0 or later. +const _ = grpc.SupportPackageIsVersion8 + +const ( + API_Ingest_FullMethodName = "/grpc.API/Ingest" + API_RehydrateLatest_FullMethodName = "/grpc.API/RehydrateLatest" +) // APIClient is the client API for API service. // // For semantics around ctx use and closing/ending streaming RPCs, please refer to https://pkg.go.dev/google.golang.org/grpc/?tab=doc#ClientConn.NewStream. type APIClient interface { Ingest(ctx context.Context, in *IngestRequest, opts ...grpc.CallOption) (*IngestResponse, error) + RehydrateLatest(ctx context.Context, in *RehydrateLatestRequest, opts ...grpc.CallOption) (*RehydrateLatestResponse, error) } type aPIClient struct { @@ -30,8 +40,19 @@ func NewAPIClient(cc grpc.ClientConnInterface) APIClient { } func (c *aPIClient) Ingest(ctx context.Context, in *IngestRequest, opts ...grpc.CallOption) (*IngestResponse, error) { + cOpts := append([]grpc.CallOption{grpc.StaticMethod()}, opts...) out := new(IngestResponse) - err := c.cc.Invoke(ctx, "/grpc.API/Ingest", in, out, opts...) + err := c.cc.Invoke(ctx, API_Ingest_FullMethodName, in, out, cOpts...) + if err != nil { + return nil, err + } + return out, nil +} + +func (c *aPIClient) RehydrateLatest(ctx context.Context, in *RehydrateLatestRequest, opts ...grpc.CallOption) (*RehydrateLatestResponse, error) { + cOpts := append([]grpc.CallOption{grpc.StaticMethod()}, opts...) + out := new(RehydrateLatestResponse) + err := c.cc.Invoke(ctx, API_RehydrateLatest_FullMethodName, in, out, cOpts...) if err != nil { return nil, err } @@ -43,6 +64,7 @@ func (c *aPIClient) Ingest(ctx context.Context, in *IngestRequest, opts ...grpc. // for forward compatibility type APIServer interface { Ingest(context.Context, *IngestRequest) (*IngestResponse, error) + RehydrateLatest(context.Context, *RehydrateLatestRequest) (*RehydrateLatestResponse, error) mustEmbedUnimplementedAPIServer() } @@ -53,6 +75,9 @@ type UnimplementedAPIServer struct { func (UnimplementedAPIServer) Ingest(context.Context, *IngestRequest) (*IngestResponse, error) { return nil, status.Errorf(codes.Unimplemented, "method Ingest not implemented") } +func (UnimplementedAPIServer) RehydrateLatest(context.Context, *RehydrateLatestRequest) (*RehydrateLatestResponse, error) { + return nil, status.Errorf(codes.Unimplemented, "method RehydrateLatest not implemented") +} func (UnimplementedAPIServer) mustEmbedUnimplementedAPIServer() {} // UnsafeAPIServer may be embedded to opt out of forward compatibility for this service. @@ -76,7 +101,7 @@ func _API_Ingest_Handler(srv interface{}, ctx context.Context, dec func(interfac } info := &grpc.UnaryServerInfo{ Server: srv, - FullMethod: "/grpc.API/Ingest", + FullMethod: API_Ingest_FullMethodName, } handler := func(ctx context.Context, req interface{}) (interface{}, error) { return srv.(APIServer).Ingest(ctx, req.(*IngestRequest)) @@ -84,6 +109,24 @@ func _API_Ingest_Handler(srv interface{}, ctx context.Context, dec func(interfac return interceptor(ctx, in, info, handler) } +func _API_RehydrateLatest_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { + in := new(RehydrateLatestRequest) + if err := dec(in); err != nil { + return nil, err + } + if interceptor == nil { + return srv.(APIServer).RehydrateLatest(ctx, in) + } + info := &grpc.UnaryServerInfo{ + Server: srv, + FullMethod: API_RehydrateLatest_FullMethodName, + } + handler := func(ctx context.Context, req interface{}) (interface{}, error) { + return srv.(APIServer).RehydrateLatest(ctx, req.(*RehydrateLatestRequest)) + } + return interceptor(ctx, in, info, handler) +} + // API_ServiceDesc is the grpc.ServiceDesc for API service. // It's only intended for direct use with grpc.RegisterService, // and not to be introspected or modified (even as a copy) @@ -95,6 +138,10 @@ var API_ServiceDesc = grpc.ServiceDesc{ MethodName: "Ingest", Handler: _API_Ingest_Handler, }, + { + MethodName: "RehydrateLatest", + Handler: _API_RehydrateLatest_Handler, + }, }, Streams: []grpc.StreamDesc{}, Metadata: "api.proto", diff --git a/pkg/ingestor/puller/blob/blob.go b/pkg/ingestor/puller/blob/blob.go index 1b8775239..f87a3c9c5 100644 --- a/pkg/ingestor/puller/blob/blob.go +++ b/pkg/ingestor/puller/blob/blob.go @@ -5,13 +5,13 @@ import ( "context" "errors" "fmt" + "io" "net/url" "os" "path/filepath" "github.com/DataDog/KubeHound/pkg/config" "github.com/DataDog/KubeHound/pkg/dump" - "github.com/DataDog/KubeHound/pkg/dump/writer" "github.com/DataDog/KubeHound/pkg/ingestor/puller" "github.com/DataDog/KubeHound/pkg/telemetry/log" "github.com/DataDog/KubeHound/pkg/telemetry/span" @@ -50,10 +50,6 @@ func NewBlobStorage(cfg *config.KubehoundConfig, blobConfig *config.BlobConfig) }, nil } -func getKeyPath(clusterName, runID string) string { - return fmt.Sprintf("%s%s", dump.DumpIngestorResultName(clusterName, runID), writer.TarWriterExtension) -} - func (bs *BlobStore) openBucket(ctx context.Context) (*blob.Bucket, error) { urlStruct, err := url.Parse(bs.bucketName) if err != nil { @@ -97,6 +93,42 @@ func (bs *BlobStore) openBucket(ctx context.Context) (*blob.Bucket, error) { return bucket, nil } +func (bs *BlobStore) listFiles(ctx context.Context, b *blob.Bucket, prefix string, recursive bool, listObjects []*puller.ListObject) ([]*puller.ListObject, error) { + iter := b.List(&blob.ListOptions{ + Delimiter: "/", + Prefix: prefix, + }) + for { + obj, err := iter.Next(ctx) + if errors.Is(err, io.EOF) { + break + } + if err != nil { + return nil, fmt.Errorf("listing objects: %w", err) + } + + if obj.IsDir && recursive { + listObjects, _ = bs.listFiles(ctx, b, obj.Key, true, listObjects) + } + listObjects = append(listObjects, &puller.ListObject{ + Key: obj.Key, + ModTime: obj.ModTime, + }) + } + + return listObjects, nil +} + +func (bs *BlobStore) ListFiles(ctx context.Context, prefix string, recursive bool) ([]*puller.ListObject, error) { + b, err := bs.openBucket(ctx) + if err != nil { + return nil, err + } + listObjects := []*puller.ListObject{} + + return bs.listFiles(ctx, b, prefix, recursive, listObjects) +} + // Pull pulls the data from the blob store (e.g: s3) and returns the path of the folder containing the archive func (bs *BlobStore) Put(outer context.Context, archivePath string, clusterName string, runID string) error { log.I.Infof("Pulling data from blob store bucket %s, %s, %s", bs.bucketName, clusterName, runID) @@ -104,8 +136,12 @@ func (bs *BlobStore) Put(outer context.Context, archivePath string, clusterName var err error defer func() { spanPut.Finish(tracer.WithError(err)) }() - key := getKeyPath(clusterName, runID) - log.I.Infof("Downloading archive (%s) from blob store", key) + dumpResult, err := dump.NewDumpResult(clusterName, runID, true) + if err != nil { + return err + } + key := dumpResult.GetFullPath() + log.I.Infof("Opening bucket: %s", bs.bucketName) b, err := bs.openBucket(ctx) if err != nil { return err @@ -118,7 +154,7 @@ func (bs *BlobStore) Put(outer context.Context, archivePath string, clusterName } defer f.Close() - log.I.Infof("Downloading archive (%q) from blob store", key) + log.I.Infof("Uploading archive (%q) from blob store", key) w := bufio.NewReader(f) err = b.Upload(ctx, key, w, &blob.WriterOptions{ ContentType: "application/gzip", @@ -142,8 +178,12 @@ func (bs *BlobStore) Pull(outer context.Context, clusterName string, runID strin var err error defer func() { spanPull.Finish(tracer.WithError(err)) }() - key := getKeyPath(clusterName, runID) - log.I.Infof("Downloading archive (%s) from blob store", key) + dumpResult, err := dump.NewDumpResult(clusterName, runID, true) + if err != nil { + return "", err + } + key := dumpResult.GetFullPath() + log.I.Infof("Opening bucket: %s", bs.bucketName) b, err := bs.openBucket(ctx) if err != nil { return "", err @@ -189,7 +229,7 @@ func (bs *BlobStore) Extract(ctx context.Context, archivePath string) error { defer func() { spanExtract.Finish(tracer.WithError(err)) }() basePath := filepath.Dir(archivePath) - err = puller.CheckSanePath(archivePath, bs.cfg.Ingestor.TempDir) + err = puller.CheckSanePath(archivePath, basePath) if err != nil { return fmt.Errorf("Dangerous file path used during extraction, aborting: %w", err) } @@ -210,7 +250,7 @@ func (bs *BlobStore) Close(ctx context.Context, archivePath string) error { var err error defer func() { spanClose.Finish(tracer.WithError(err)) }() - path := filepath.Base(archivePath) + path := filepath.Dir(archivePath) err = puller.CheckSanePath(archivePath, bs.cfg.Ingestor.TempDir) if err != nil { return fmt.Errorf("Dangerous file path used while closing, aborting: %w", err) diff --git a/pkg/ingestor/puller/blob/blob_test.go b/pkg/ingestor/puller/blob/blob_test.go new file mode 100644 index 000000000..c41e170ec --- /dev/null +++ b/pkg/ingestor/puller/blob/blob_test.go @@ -0,0 +1,525 @@ +package blob + +import ( + "context" + "fmt" + "os" + "path" + "reflect" + "regexp" + "testing" + "time" + + "github.com/DataDog/KubeHound/pkg/config" + "github.com/DataDog/KubeHound/pkg/dump" + "github.com/DataDog/KubeHound/pkg/ingestor/puller" + _ "gocloud.dev/blob/azureblob" + _ "gocloud.dev/blob/fileblob" + _ "gocloud.dev/blob/gcsblob" + _ "gocloud.dev/blob/memblob" +) + +const ( + bucketName = "file://./testdata/fakeBlobStorage" + tempFileRegexp = `.*/testdata/tmpdir/kh-[0-9]+/archive.tar.gz` + validRunID = "01j2qs8th6yarr5hkafysekn0j" + validClusterName = "cluster1.k8s.local" +) + +func getTempDir(t *testing.T) string { + t.Helper() + + return getAbsPath(t, "testdata/tmpdir") +} + +func getAbsPath(t *testing.T, filepath string) string { + t.Helper() + // Get current working directory to pass SaneCheckPath() + pwd, err := os.Getwd() + if err != nil { + t.Errorf("Error getting current working directory: %v", err) + + return "" + } + + return path.Join(pwd, filepath) +} + +func dummyKubehoundConfig(t *testing.T) *config.KubehoundConfig { + t.Helper() + + return &config.KubehoundConfig{ + Ingestor: config.IngestorConfig{ + TempDir: getTempDir(t), + }, + } +} + +func TestBlobStore_ListFiles(t *testing.T) { + t.Parallel() + type fields struct { + bucketName string + cfg *config.KubehoundConfig + region string + } + type args struct { + prefix string + recursive bool + } + tests := []struct { + name string + fields fields + args args + want []*puller.ListObject + wantErr bool + }{ + { + name: "Sanitize path", + fields: fields{ + bucketName: bucketName, + }, + args: args{ + recursive: true, + prefix: "", + }, + want: []*puller.ListObject{ + { + Key: "cluster1.k8s.local/kubehound_cluster1.k8s.local_01j2qs8th6yarr5hkafysekn0j.tar.gz", + }, + { + Key: "cluster1.k8s.local/kubehound_cluster1.k8s.local_11j2qs8th6yarr5hkafysekn0j.tar.gz", + }, + { + Key: "cluster1.k8s.local/kubehound_cluster1.k8s.local_21j2qs8th6yarr5hkafysekn0j.tar.gz", + }, + { + Key: "cluster1.k8s.local/", + }, + { + Key: "cluster2.k8s.local/kubehound_cluster2.k8s.local_01j2qs8th6yarr5hkafysekn0j.tar.gz", + }, + { + Key: "cluster2.k8s.local/kubehound_cluster2.k8s.local_11j2qs8th6yarr5hkafysekn0j.tar.gz", + }, + { + Key: "cluster2.k8s.local/", + }, + }, + wantErr: false, + }, + { + name: "Sanitize path", + fields: fields{ + bucketName: bucketName, + }, + args: args{ + recursive: true, + prefix: "cluster1.k8s.local", + }, + want: []*puller.ListObject{ + { + Key: "cluster1.k8s.local/kubehound_cluster1.k8s.local_01j2qs8th6yarr5hkafysekn0j.tar.gz", + }, + { + Key: "cluster1.k8s.local/kubehound_cluster1.k8s.local_11j2qs8th6yarr5hkafysekn0j.tar.gz", + }, + { + Key: "cluster1.k8s.local/kubehound_cluster1.k8s.local_21j2qs8th6yarr5hkafysekn0j.tar.gz", + }, + { + Key: "cluster1.k8s.local/", + }, + }, + wantErr: false, + }, + { + name: "Sanitize path", + fields: fields{ + bucketName: bucketName, + }, + args: args{ + recursive: true, + prefix: "cluster1.k8s.local/", + }, + want: []*puller.ListObject{ + { + Key: "cluster1.k8s.local/kubehound_cluster1.k8s.local_01j2qs8th6yarr5hkafysekn0j.tar.gz", + }, + { + Key: "cluster1.k8s.local/kubehound_cluster1.k8s.local_11j2qs8th6yarr5hkafysekn0j.tar.gz", + }, + { + Key: "cluster1.k8s.local/kubehound_cluster1.k8s.local_21j2qs8th6yarr5hkafysekn0j.tar.gz", + }, + }, + wantErr: false, + }, + { + name: "Sanitize path", + fields: fields{ + bucketName: bucketName, + }, + args: args{ + recursive: false, + prefix: "", + }, + want: []*puller.ListObject{ + { + Key: "cluster1.k8s.local/", + }, + { + Key: "cluster2.k8s.local/", + }, + }, + wantErr: false, + }, + } + for _, tt := range tests { + tt := tt + t.Run(tt.name, func(t *testing.T) { + t.Parallel() + ctx := context.Background() + bs := &BlobStore{ + bucketName: tt.fields.bucketName, + cfg: tt.fields.cfg, + region: tt.fields.region, + } + got, err := bs.ListFiles(ctx, tt.args.prefix, tt.args.recursive) + if (err != nil) != tt.wantErr { + t.Errorf("BlobStore.ListFiles() error = %v, wantErr %v", err, tt.wantErr) + + return + } + + // Reset modtime to avoid comparison issues + for _, v := range got { + v.ModTime = time.Time{} + } + + if !reflect.DeepEqual(got, tt.want) { + for i, v := range got { + t.Logf("Got: %d: %s", i, v.Key) + } + t.Errorf("BlobStore.ListFiles() = %v, want %v", got, tt.want) + } + }) + } +} + +func TestBlobStore_Pull(t *testing.T) { + t.Parallel() + + // Get current working directory to pass SaneCheckPath() + pwd, err := os.Getwd() + if err != nil { + t.Errorf("Error getting current working directory: %v", err) + + return + } + tmpDir := path.Join(pwd, "testdata/tmpdir") + type fields struct { + bucketName string + cfg *config.KubehoundConfig + region string + } + type args struct { + clusterName string + runID string + } + tests := []struct { + name string + fields fields + args args + wantErr bool + }{ + { + name: "Pulling file successfully", + fields: fields{ + bucketName: bucketName, + cfg: &config.KubehoundConfig{ + Ingestor: config.IngestorConfig{ + TempDir: tmpDir, + }, + }, + }, + args: args{ + clusterName: validClusterName, + runID: validRunID, + }, + wantErr: false, + }, + { + name: "Empty tmp dir", + fields: fields{ + bucketName: bucketName, + cfg: &config.KubehoundConfig{ + Ingestor: config.IngestorConfig{}, + }, + }, + args: args{ + clusterName: validClusterName, + runID: validRunID, + }, + wantErr: true, + }, + { + name: "Wrong cluster name", + fields: fields{ + bucketName: bucketName, + cfg: &config.KubehoundConfig{ + Ingestor: config.IngestorConfig{ + TempDir: tmpDir, + }, + }, + }, + args: args{ + clusterName: "cluster4.k8s.local", + runID: validRunID, + }, + wantErr: true, + }, + } + for _, tt := range tests { + tt := tt + t.Run(tt.name, func(t *testing.T) { + t.Parallel() + ctx := context.Background() + bs := &BlobStore{ + bucketName: tt.fields.bucketName, + cfg: tt.fields.cfg, + region: tt.fields.region, + } + got, err := bs.Pull(ctx, tt.args.clusterName, tt.args.runID) + if (err != nil) != tt.wantErr { + t.Errorf("BlobStore.Pull() error = %v, wantErr %v", err, tt.wantErr) + + return + } + + // No path was returned so no need to go further + if got == "" { + return + } + + re := regexp.MustCompile(tempFileRegexp) + if !re.MatchString(got) { + t.Errorf("Path is not valid() = %q, should respect %v", got, tempFileRegexp) + + return + } + + err = bs.Close(ctx, got) + if err != nil { + t.Errorf("bs.Close() error = %v", err) + } + }) + } +} + +func TestNewBlobStorage(t *testing.T) { + t.Parallel() + type args struct { + cfg *config.KubehoundConfig + blobConfig *config.BlobConfig + } + tests := []struct { + name string + args args + want *BlobStore + wantErr bool + }{ + { + name: "empty bucket name", + args: args{ + blobConfig: &config.BlobConfig{ + Bucket: "", + }, + cfg: &config.KubehoundConfig{ + Ingestor: config.IngestorConfig{ + TempDir: getTempDir(t), + }, + }, + }, + wantErr: true, + }, + { + name: "valid blob storage", + args: args{ + blobConfig: &config.BlobConfig{ + Bucket: "fakeBlobStorage", + }, + cfg: &config.KubehoundConfig{ + Ingestor: config.IngestorConfig{ + TempDir: getTempDir(t), + }, + }, + }, + want: &BlobStore{ + bucketName: "fakeBlobStorage", + cfg: &config.KubehoundConfig{ + Ingestor: config.IngestorConfig{ + TempDir: getTempDir(t), + }, + }, + region: "", + }, + wantErr: false, + }, + } + for _, tt := range tests { + tt := tt + t.Run(tt.name, func(t *testing.T) { + t.Parallel() + got, err := NewBlobStorage(tt.args.cfg, tt.args.blobConfig) + if (err != nil) != tt.wantErr { + t.Errorf("NewBlobStorage() error = %v, wantErr %v", err, tt.wantErr) + + return + } + if !reflect.DeepEqual(got, tt.want) { + t.Errorf("NewBlobStorage() = %v, want %v", got, tt.want) + } + }) + } +} + +func TestBlobStore_Put(t *testing.T) { + t.Parallel() + fakeBlobStoragePut := "./testdata/fakeBlobStoragePut" + bucketNamePut := fmt.Sprintf("file://%s", fakeBlobStoragePut) + type fields struct { + bucketName string + cfg *config.KubehoundConfig + region string + } + type args struct { + archivePath string + clusterName string + runID string + } + tests := []struct { + name string + fields fields + args args + wantErr bool + }{ + { + name: "Push new dump", + fields: fields{ + bucketName: bucketNamePut, + cfg: dummyKubehoundConfig(t), + }, + args: args{ + archivePath: "./testdata/archive.tar.gz", + clusterName: validClusterName, + runID: "91j2qs8th6yarr5hkafysekn0j", + }, + wantErr: false, + }, + { + name: "non existing filepath", + fields: fields{ + bucketName: bucketNamePut, + cfg: dummyKubehoundConfig(t), + }, + args: args{ + archivePath: "./testdata/archive2.tar.gz", + clusterName: validClusterName, + runID: "91j2qs8th6yarr5hkafysekn0j", + }, + wantErr: true, + }, + { + name: "invalid runID", + fields: fields{ + bucketName: bucketNamePut, + cfg: dummyKubehoundConfig(t), + }, + args: args{ + archivePath: "./testdata/archive2.tar.gz", + clusterName: validClusterName, + runID: "91j2qs8th6yarr5hkafysekn0T", + }, + wantErr: true, + }, + } + for _, tt := range tests { + tt := tt + t.Run(tt.name, func(t *testing.T) { + t.Parallel() + ctx := context.Background() + bs := &BlobStore{ + bucketName: tt.fields.bucketName, + cfg: tt.fields.cfg, + region: tt.fields.region, + } + var err error + if err = bs.Put(ctx, tt.args.archivePath, tt.args.clusterName, tt.args.runID); (err != nil) != tt.wantErr { + t.Errorf("BlobStore.Put() error = %v, wantErr %v", err, tt.wantErr) + } + + if err != nil { + return + } + + // Building result path to clean up + dumpResult, err := dump.NewDumpResult(tt.args.clusterName, tt.args.runID, true) + if err != nil { + t.Errorf("NewDumpResult cluster:%s runID:%s", tt.args.clusterName, tt.args.runID) + } + key := path.Join(bs.cfg.Ingestor.TempDir, dumpResult.GetFullPath()) + + err = os.RemoveAll(path.Join(fakeBlobStoragePut, tt.args.clusterName)) + if err != nil { + t.Errorf("Error removing file %s.attrs: %v", key, err) + } + }) + } +} + +func TestBlobStore_Extract(t *testing.T) { + t.Parallel() + fakeBlobStoragePut := "./testdata/fakeBlobStorageExtract" + bucketNameExtract := fmt.Sprintf("file://%s", fakeBlobStoragePut) + type fields struct { + bucketName string + cfg *config.KubehoundConfig + region string + } + type args struct { + archivePath string + } + tests := []struct { + name string + fields fields + args args + wantErr bool + }{ + { + name: "invalid runID", + fields: fields{ + bucketName: bucketNameExtract, + cfg: dummyKubehoundConfig(t), + }, + args: args{ + archivePath: getAbsPath(t, "testdata/archive.tar.gz"), + }, + wantErr: false, + }, + } + for _, tt := range tests { + tt := tt + t.Run(tt.name, func(t *testing.T) { + t.Parallel() + ctx := context.Background() + bs := &BlobStore{ + bucketName: tt.fields.bucketName, + cfg: tt.fields.cfg, + region: tt.fields.region, + } + if err := bs.Extract(ctx, tt.args.archivePath); (err != nil) != tt.wantErr { + t.Errorf("BlobStore.Extract() error = %v, wantErr %v", err, tt.wantErr) + } + }) + } +} diff --git a/pkg/ingestor/puller/blob/testdata/archive.tar.gz b/pkg/ingestor/puller/blob/testdata/archive.tar.gz new file mode 100644 index 000000000..5b159e310 Binary files /dev/null and b/pkg/ingestor/puller/blob/testdata/archive.tar.gz differ diff --git a/pkg/ingestor/puller/blob/testdata/fakeBlobStorage/cluster1.k8s.local/kubehound_cluster1.k8s.local_01j2qs8th6yarr5hkafysekn0j.tar.gz b/pkg/ingestor/puller/blob/testdata/fakeBlobStorage/cluster1.k8s.local/kubehound_cluster1.k8s.local_01j2qs8th6yarr5hkafysekn0j.tar.gz new file mode 100644 index 000000000..5b159e310 Binary files /dev/null and b/pkg/ingestor/puller/blob/testdata/fakeBlobStorage/cluster1.k8s.local/kubehound_cluster1.k8s.local_01j2qs8th6yarr5hkafysekn0j.tar.gz differ diff --git a/pkg/ingestor/puller/blob/testdata/fakeBlobStorage/cluster1.k8s.local/kubehound_cluster1.k8s.local_11j2qs8th6yarr5hkafysekn0j.tar.gz b/pkg/ingestor/puller/blob/testdata/fakeBlobStorage/cluster1.k8s.local/kubehound_cluster1.k8s.local_11j2qs8th6yarr5hkafysekn0j.tar.gz new file mode 100644 index 000000000..5b159e310 Binary files /dev/null and b/pkg/ingestor/puller/blob/testdata/fakeBlobStorage/cluster1.k8s.local/kubehound_cluster1.k8s.local_11j2qs8th6yarr5hkafysekn0j.tar.gz differ diff --git a/pkg/ingestor/puller/blob/testdata/fakeBlobStorage/cluster1.k8s.local/kubehound_cluster1.k8s.local_21j2qs8th6yarr5hkafysekn0j.tar.gz b/pkg/ingestor/puller/blob/testdata/fakeBlobStorage/cluster1.k8s.local/kubehound_cluster1.k8s.local_21j2qs8th6yarr5hkafysekn0j.tar.gz new file mode 100644 index 000000000..5b159e310 Binary files /dev/null and b/pkg/ingestor/puller/blob/testdata/fakeBlobStorage/cluster1.k8s.local/kubehound_cluster1.k8s.local_21j2qs8th6yarr5hkafysekn0j.tar.gz differ diff --git a/pkg/ingestor/puller/blob/testdata/fakeBlobStorage/cluster2.k8s.local/kubehound_cluster2.k8s.local_01j2qs8th6yarr5hkafysekn0j.tar.gz b/pkg/ingestor/puller/blob/testdata/fakeBlobStorage/cluster2.k8s.local/kubehound_cluster2.k8s.local_01j2qs8th6yarr5hkafysekn0j.tar.gz new file mode 100644 index 000000000..5b159e310 Binary files /dev/null and b/pkg/ingestor/puller/blob/testdata/fakeBlobStorage/cluster2.k8s.local/kubehound_cluster2.k8s.local_01j2qs8th6yarr5hkafysekn0j.tar.gz differ diff --git a/pkg/ingestor/puller/blob/testdata/fakeBlobStorage/cluster2.k8s.local/kubehound_cluster2.k8s.local_11j2qs8th6yarr5hkafysekn0j.tar.gz b/pkg/ingestor/puller/blob/testdata/fakeBlobStorage/cluster2.k8s.local/kubehound_cluster2.k8s.local_11j2qs8th6yarr5hkafysekn0j.tar.gz new file mode 100644 index 000000000..5b159e310 Binary files /dev/null and b/pkg/ingestor/puller/blob/testdata/fakeBlobStorage/cluster2.k8s.local/kubehound_cluster2.k8s.local_11j2qs8th6yarr5hkafysekn0j.tar.gz differ diff --git a/pkg/ingestor/puller/blob/testdata/fakeBlobStoragePut/.keep b/pkg/ingestor/puller/blob/testdata/fakeBlobStoragePut/.keep new file mode 100644 index 000000000..e69de29bb diff --git a/pkg/ingestor/puller/blob/testdata/tmpdir/.keep b/pkg/ingestor/puller/blob/testdata/tmpdir/.keep new file mode 100644 index 000000000..e69de29bb diff --git a/pkg/ingestor/puller/mocks/mock_puller.go b/pkg/ingestor/puller/mocks/mock_puller.go index fafb3074c..6599ffa29 100644 --- a/pkg/ingestor/puller/mocks/mock_puller.go +++ b/pkg/ingestor/puller/mocks/mock_puller.go @@ -5,6 +5,7 @@ package mocks import ( context "context" + puller "github.com/DataDog/KubeHound/pkg/ingestor/puller" mock "github.com/stretchr/testify/mock" ) @@ -107,6 +108,62 @@ func (_c *DataPuller_Extract_Call) RunAndReturn(run func(context.Context, string return _c } +// ListFiles provides a mock function with given fields: ctx, prefix, recursive +func (_m *DataPuller) ListFiles(ctx context.Context, prefix string, recursive bool) ([]*puller.ListObject, error) { + ret := _m.Called(ctx, prefix, recursive) + + var r0 []*puller.ListObject + var r1 error + if rf, ok := ret.Get(0).(func(context.Context, string, bool) ([]*puller.ListObject, error)); ok { + return rf(ctx, prefix, recursive) + } + if rf, ok := ret.Get(0).(func(context.Context, string, bool) []*puller.ListObject); ok { + r0 = rf(ctx, prefix, recursive) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).([]*puller.ListObject) + } + } + + if rf, ok := ret.Get(1).(func(context.Context, string, bool) error); ok { + r1 = rf(ctx, prefix, recursive) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// DataPuller_ListFiles_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'ListFiles' +type DataPuller_ListFiles_Call struct { + *mock.Call +} + +// ListFiles is a helper method to define mock.On call +// - ctx context.Context +// - prefix string +// - recursive bool +func (_e *DataPuller_Expecter) ListFiles(ctx interface{}, prefix interface{}, recursive interface{}) *DataPuller_ListFiles_Call { + return &DataPuller_ListFiles_Call{Call: _e.mock.On("ListFiles", ctx, prefix, recursive)} +} + +func (_c *DataPuller_ListFiles_Call) Run(run func(ctx context.Context, prefix string, recursive bool)) *DataPuller_ListFiles_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(context.Context), args[1].(string), args[2].(bool)) + }) + return _c +} + +func (_c *DataPuller_ListFiles_Call) Return(_a0 []*puller.ListObject, _a1 error) *DataPuller_ListFiles_Call { + _c.Call.Return(_a0, _a1) + return _c +} + +func (_c *DataPuller_ListFiles_Call) RunAndReturn(run func(context.Context, string, bool) ([]*puller.ListObject, error)) *DataPuller_ListFiles_Call { + _c.Call.Return(run) + return _c +} + // Pull provides a mock function with given fields: ctx, clusterName, runID func (_m *DataPuller) Pull(ctx context.Context, clusterName string, runID string) (string, error) { ret := _m.Called(ctx, clusterName, runID) diff --git a/pkg/ingestor/puller/puller.go b/pkg/ingestor/puller/puller.go index 54c0be13a..f95216b9a 100644 --- a/pkg/ingestor/puller/puller.go +++ b/pkg/ingestor/puller/puller.go @@ -10,6 +10,7 @@ import ( "os" "path/filepath" "strings" + "time" "github.com/DataDog/KubeHound/pkg/telemetry/log" ) @@ -19,16 +20,19 @@ type DataPuller interface { Pull(ctx context.Context, clusterName string, runID string) (string, error) Extract(ctx context.Context, archivePath string) error Close(ctx context.Context, basePath string) error + ListFiles(ctx context.Context, prefix string, recursive bool) ([]*ListObject, error) } -func FormatArchiveKey(clusterName string, runID string, archiveName string) string { - return strings.Join([]string{clusterName, runID, archiveName}, "/") +type ListObject struct { + Key string + // ModTime is the time the blob was last modified. + ModTime time.Time } // checkSanePath just to make sure we don't delete or overwrite somewhere where we are not supposed to func CheckSanePath(path string, baseFolder string) error { if path == "/" || path == "" || !strings.HasPrefix(path, baseFolder) { - return fmt.Errorf("Invalid path provided: %q", path) + return fmt.Errorf("Invalid path provided: %q / base: %q", path, baseFolder) } return nil diff --git a/pkg/ingestor/puller/puller_test.go b/pkg/ingestor/puller/puller_test.go index 0d36977df..0d47aeef6 100644 --- a/pkg/ingestor/puller/puller_test.go +++ b/pkg/ingestor/puller/puller_test.go @@ -142,3 +142,69 @@ func TestExtractTarGz(t *testing.T) { }) } } + +func TestIsTarGz(t *testing.T) { + t.Parallel() + type args struct { + filePath string + maxArchiveSize int64 + } + tests := []struct { + name string + args args + want bool + wantErr bool + }{ + { + name: "dump result compressed", + args: args{ + maxArchiveSize: 10000000, + filePath: "./testdata/archive.tar.gz", + }, + want: true, + wantErr: false, + }, + { + name: "Unsupported file type", + args: args{ + maxArchiveSize: 100, + filePath: "./testdata/regenerate-testdata.sh", + }, + want: false, + wantErr: true, + }, + { + name: "wrong path", + args: args{ + maxArchiveSize: 100, + filePath: "./testdata/doesnotexist.tar.gz", + }, + want: false, + wantErr: true, + }, + { + name: "dump result not compressed - directory", + args: args{ + maxArchiveSize: 100, + filePath: "./testdata/", + }, + want: false, + wantErr: false, + }, + } + for _, tt := range tests { + tt := tt + t.Run(tt.name, func(t *testing.T) { + t.Parallel() + got, err := IsTarGz(tt.args.filePath, tt.args.maxArchiveSize) + if (err != nil) != tt.wantErr { + t.Errorf("IsTarGz() error = %v, wantErr %v", err, tt.wantErr) + + return + } + if got != tt.want { + t.Errorf("IsTarGz() = %v, want %v", got, tt.want) + } + }) + } +} diff --git a/pkg/kubehound/core/core_grpc_client.go b/pkg/kubehound/core/core_grpc_client.go index 8790d428b..40e387e5d 100644 --- a/pkg/kubehound/core/core_grpc_client.go +++ b/pkg/kubehound/core/core_grpc_client.go @@ -13,7 +13,7 @@ import ( "google.golang.org/grpc/credentials/insecure" ) -func CoreClientGRPCIngest(ingestorConfig config.IngestorConfig, clusteName string, runID string) error { +func getGrpcConn(ingestorConfig config.IngestorConfig) (*grpc.ClientConn, error) { var dialOpt grpc.DialOption if ingestorConfig.API.Insecure { dialOpt = grpc.WithTransportCredentials(insecure.NewCredentials()) @@ -25,13 +25,20 @@ func CoreClientGRPCIngest(ingestorConfig config.IngestorConfig, clusteName strin dialOpt = grpc.WithTransportCredentials(credentials.NewTLS(tlsConfig)) } - log.I.Infof("Launching ingestion on %s [%s:%s]", ingestorConfig.API.Endpoint, clusteName, runID) conn, err := grpc.NewClient(ingestorConfig.API.Endpoint, dialOpt) if err != nil { - return fmt.Errorf("connect %s: %w", ingestorConfig.API.Endpoint, err) + return nil, fmt.Errorf("connect %s: %w", ingestorConfig.API.Endpoint, err) } - defer conn.Close() + return conn, nil +} + +func CoreClientGRPCIngest(ingestorConfig config.IngestorConfig, clusteName string, runID string) error { + conn, err := getGrpcConn(ingestorConfig) + if err != nil { + return fmt.Errorf("getGrpcClient: %w", err) + } + defer conn.Close() client := pb.NewAPIClient(conn) _, err = client.Ingest(context.Background(), &pb.IngestRequest{ @@ -44,3 +51,24 @@ func CoreClientGRPCIngest(ingestorConfig config.IngestorConfig, clusteName strin return nil } + +func CoreClientGRPCRehydrateLatest(ingestorConfig config.IngestorConfig) error { + conn, err := getGrpcConn(ingestorConfig) + if err != nil { + return fmt.Errorf("getGrpcClient: %w", err) + } + defer conn.Close() + client := pb.NewAPIClient(conn) + + log.I.Infof("Launching rehydratation on %s [latest]", ingestorConfig.API.Endpoint) + results, err := client.RehydrateLatest(context.Background(), &pb.RehydrateLatestRequest{}) + if err != nil { + return fmt.Errorf("call rehydratation (latest): %w", err) + } + + for _, res := range results.IngestedCluster { + log.I.Infof("Rehydrated cluster: %s, date: %s, run_id: %s", res.ClusterName, res.Date.AsTime().Format("01-02-2006 15:04:05"), res.RunId) + } + + return nil +}