From 95c42e5435871a73bacaaf751b92222dcc0eaa71 Mon Sep 17 00:00:00 2001 From: jt-dd Date: Thu, 18 Jul 2024 16:17:49 +0200 Subject: [PATCH 01/22] adding rehydrate command --- cmd/kubehound/ingest.go | 23 +++++++++++++++++++++++ pkg/cmd/dump.go | 4 ++-- 2 files changed, 25 insertions(+), 2 deletions(-) diff --git a/cmd/kubehound/ingest.go b/cmd/kubehound/ingest.go index 8e5e94337..47a56c7a0 100644 --- a/cmd/kubehound/ingest.go +++ b/cmd/kubehound/ingest.go @@ -59,6 +59,27 @@ var ( return core.CoreClientGRPCIngest(khCfg.Ingestor, khCfg.Ingestor.ClusterName, khCfg.Ingestor.RunID) }, } + + remoteIngestRehydrateCmd = &cobra.Command{ + Use: "rehydrate", + Short: "Rehydrate snapshot previously dumped on a KHaaS instance", + Long: `Run an rehydratation on KHaaS from a bucket to build the attack path`, + PreRunE: func(cobraCmd *cobra.Command, args []string) error { + viper.BindPFlag(config.IngestorAPIEndpoint, cobraCmd.Flags().Lookup("khaas-server")) //nolint: errcheck + viper.BindPFlag(config.IngestorAPIInsecure, cobraCmd.Flags().Lookup("insecure")) //nolint: errcheck + + return cmd.InitializeKubehoundConfig(cobraCmd.Context(), "", false, true) + }, + RunE: func(cobraCmd *cobra.Command, args []string) error { + // Passing the Kubehound config from viper + khCfg, err := cmd.GetConfig() + if err != nil { + return fmt.Errorf("get config: %w", err) + } + + return core.CoreClientGRPCRehydrateLatest(khCfg.Ingestor) + }, + } ) func init() { @@ -70,5 +91,7 @@ func init() { ingestCmd.AddCommand(remoteIngestCmd) cmd.InitRemoteIngestCmd(remoteIngestCmd, true) + remoteIngestCmd.AddCommand(remoteIngestRehydrateCmd) + rootCmd.AddCommand(ingestCmd) } diff --git a/pkg/cmd/dump.go b/pkg/cmd/dump.go index 3e756f05b..76e5ef8fc 100644 --- a/pkg/cmd/dump.go +++ b/pkg/cmd/dump.go @@ -63,8 +63,8 @@ func InitRemoteDumpCmd(cmd *cobra.Command) { func InitRemoteIngestCmd(cmd *cobra.Command, standalone bool) { - cmd.Flags().String("khaas-server", "", "GRPC endpoint exposed by KubeHound as a Service 
(KHaaS) server (e.g.: localhost:9000)") - cmd.Flags().Bool("insecure", config.DefaultIngestorAPIInsecure, "Allow insecure connection to the KHaaS server grpc endpoint") + cmd.PersistentFlags().String("khaas-server", "", "GRPC endpoint exposed by KubeHound as a Service (KHaaS) server (e.g.: localhost:9000)") + cmd.PersistentFlags().Bool("insecure", config.DefaultIngestorAPIInsecure, "Allow insecure connection to the KHaaS server grpc endpoint") // IngestorAPIEndpoint if standalone { From 2a0b6c9bc0cc4b3b108de81fb66e3f5526f3b43b Mon Sep 17 00:00:00 2001 From: jt-dd Date: Thu, 18 Jul 2024 16:20:02 +0200 Subject: [PATCH 02/22] adding list method in blob/puller --- pkg/ingestor/puller/blob/blob.go | 36 ++++++++++++++++++++++++++++++++ pkg/ingestor/puller/puller.go | 8 +++++++ 2 files changed, 44 insertions(+) diff --git a/pkg/ingestor/puller/blob/blob.go b/pkg/ingestor/puller/blob/blob.go index 1b8775239..7890ab19b 100644 --- a/pkg/ingestor/puller/blob/blob.go +++ b/pkg/ingestor/puller/blob/blob.go @@ -5,6 +5,7 @@ import ( "context" "errors" "fmt" + "io" "net/url" "os" "path/filepath" @@ -97,6 +98,41 @@ func (bs *BlobStore) openBucket(ctx context.Context) (*blob.Bucket, error) { return bucket, nil } +func (bs *BlobStore) listFiles(ctx context.Context, b *blob.Bucket, prefix string, recursive bool) ([]*puller.ListObject, error) { + objs := []*puller.ListObject{} + iter := b.List(&blob.ListOptions{ + Delimiter: "/", + Prefix: prefix, + }) + for { + obj, err := iter.Next(ctx) + if errors.Is(err, io.EOF) { + break + } + if err != nil { + return nil, fmt.Errorf("listing objects: %w", err) + } + if obj.IsDir && recursive { + objs, _ = bs.listFiles(ctx, b, obj.Key, true) + } + objs = append(objs, &puller.ListObject{ + Key: obj.Key, + ModTime: obj.ModTime, + }) + } + + return objs, nil +} + +func (bs *BlobStore) ListFiles(ctx context.Context, prefix string, recursive bool) ([]*puller.ListObject, error) { + b, err := bs.openBucket(ctx) + if err != nil { + return nil, err + } + + 
return bs.listFiles(ctx, b, prefix, recursive) +} + // Pull pulls the data from the blob store (e.g: s3) and returns the path of the folder containing the archive func (bs *BlobStore) Put(outer context.Context, archivePath string, clusterName string, runID string) error { log.I.Infof("Pulling data from blob store bucket %s, %s, %s", bs.bucketName, clusterName, runID) diff --git a/pkg/ingestor/puller/puller.go b/pkg/ingestor/puller/puller.go index 54c0be13a..33746ceec 100644 --- a/pkg/ingestor/puller/puller.go +++ b/pkg/ingestor/puller/puller.go @@ -10,6 +10,7 @@ import ( "os" "path/filepath" "strings" + "time" "github.com/DataDog/KubeHound/pkg/telemetry/log" ) @@ -19,6 +20,13 @@ type DataPuller interface { Pull(ctx context.Context, clusterName string, runID string) (string, error) Extract(ctx context.Context, archivePath string) error Close(ctx context.Context, basePath string) error + ListFiles(ctx context.Context, prefix string, recursive bool) ([]*ListObject, error) +} + +type ListObject struct { + Key string + // ModTime is the time the blob was last modified. 
+ ModTime time.Time } func FormatArchiveKey(clusterName string, runID string, archiveName string) string { From 50c117246e441c65322744d70b927c71cdaeeb47 Mon Sep 17 00:00:00 2001 From: jt-dd Date: Thu, 18 Jul 2024 16:21:53 +0200 Subject: [PATCH 03/22] grpc rehydratelatest method --- pkg/ingestor/api/api.go | 52 +++++++++++++++++++++++++- pkg/ingestor/api/grpc/README.md | 5 +++ pkg/ingestor/api/grpc/grpc.go | 14 +++++++ pkg/kubehound/core/core_grpc_client.go | 36 ++++++++++++++++-- 4 files changed, 102 insertions(+), 5 deletions(-) diff --git a/pkg/ingestor/api/api.go b/pkg/ingestor/api/api.go index d9eb05ce3..a1b8dd43d 100644 --- a/pkg/ingestor/api/api.go +++ b/pkg/ingestor/api/api.go @@ -5,10 +5,14 @@ import ( "errors" "fmt" "path/filepath" + "sort" + "strings" "github.com/DataDog/KubeHound/pkg/collector" "github.com/DataDog/KubeHound/pkg/config" + "github.com/DataDog/KubeHound/pkg/dump/writer" "github.com/DataDog/KubeHound/pkg/ingestor" + grpc "github.com/DataDog/KubeHound/pkg/ingestor/api/grpc/pb" "github.com/DataDog/KubeHound/pkg/ingestor/notifier" "github.com/DataDog/KubeHound/pkg/ingestor/puller" "github.com/DataDog/KubeHound/pkg/kubehound/graph" @@ -21,6 +25,7 @@ import ( "github.com/DataDog/KubeHound/pkg/telemetry/tag" gremlingo "github.com/apache/tinkerpop/gremlin-go/v3/driver" "go.mongodb.org/mongo-driver/bson" + "google.golang.org/protobuf/types/known/timestamppb" "gopkg.in/DataDog/dd-trace-go.v1/ddtrace/tracer" ) @@ -29,7 +34,6 @@ type API interface { Notify(ctx context.Context, clusterName string, runID string) error } -//go:generate protoc --go_out=./pb --go_opt=paths=source_relative --go-grpc_out=./pb --go-grpc_opt=paths=source_relative api.proto type IngestorAPI struct { puller puller.DataPuller notifier notifier.Notifier @@ -52,6 +56,52 @@ func NewIngestorAPI(cfg *config.KubehoundConfig, puller puller.DataPuller, notif } } +// RehydrateLatest lists the clusters found in the puller's store and ingests the most recently modified dump of each one +func (g *IngestorAPI) 
RehydrateLatest(ctx context.Context) ([]*grpc.IngestedCluster, error) { + // first level key are cluster names + directories, errRet := g.puller.ListFiles(ctx, "", false) + if errRet != nil { + return nil, errRet + } + + res := []*grpc.IngestedCluster{} + + for _, dir := range directories { + clusterName := strings.TrimSuffix(dir.Key, "/") + + dumpKeys, err := g.puller.ListFiles(ctx, clusterName, true) + if err != nil { + return nil, err + } + + if l := len(dumpKeys); l > 0 { + // extracting the latest runID + sort.Slice(dumpKeys, func(a, b int) bool { + return dumpKeys[a].ModTime.Before(dumpKeys[b].ModTime) + }) + ingestTime := dumpKeys[l-1].ModTime + prefix := fmt.Sprintf("%s/kubehound_%s_", clusterName, clusterName) + runID := strings.TrimPrefix(dumpKeys[l-1].Key, prefix) + runID = strings.TrimSuffix(runID, writer.TarWriterExtension) + + if clusterErr := g.Ingest(ctx, clusterName, runID); clusterErr != nil { + errRet = errors.Join(errRet, fmt.Errorf("ingesting cluster %s/%s: %w", clusterName, runID, clusterErr)) + continue /* a failed ingest must not be logged or returned as rehydrated */ + } + log.I.Infof("Rehydrated cluster: %s, date: %s, run_id: %s", clusterName, ingestTime.Format("01-02-2006 15:04:05"), runID) + ingestedCluster := &grpc.IngestedCluster{ + ClusterName: clusterName, + RunId: runID, + Date: timestamppb.New(ingestTime), + } + res = append(res, ingestedCluster) + + } + } + + return res, errRet +} + func (g *IngestorAPI) Ingest(_ context.Context, clusterName string, runID string) error { events.PushEvent( fmt.Sprintf("Ingesting cluster %s with runID %s", clusterName, runID), diff --git a/pkg/ingestor/api/grpc/README.md b/pkg/ingestor/api/grpc/README.md index 8708cc0dd..a353493db 100644 --- a/pkg/ingestor/api/grpc/README.md +++ b/pkg/ingestor/api/grpc/README.md @@ -5,4 +5,9 @@ You can trigger a gRPC call by doing this: ```bash grpcurl -plaintext -format text -d 'cluster_name: "test", run_id: "id"' 127.0.0.1:9000 grpc.API.Ingest +``` + +Testing rehydrating of all latest scans: +```bash +grpcurl -plaintext -format text 
127.0.0.1:9000 grpc.API.RehydrateLatest ``` \ No newline at end of file diff --git a/pkg/ingestor/api/grpc/grpc.go b/pkg/ingestor/api/grpc/grpc.go index 049184131..3e7c4b26d 100644 --- a/pkg/ingestor/api/grpc/grpc.go +++ b/pkg/ingestor/api/grpc/grpc.go @@ -38,6 +38,20 @@ func (s *server) Ingest(ctx context.Context, in *pb.IngestRequest) (*pb.IngestRe return &pb.IngestResponse{}, nil } +// RehydrateLatest is just a GRPC wrapper around the RehydrateLatest method from the API package +func (s *server) RehydrateLatest(ctx context.Context, in *pb.RehydrateLatestRequest) (*pb.RehydrateLatestResponse, error) { + res, err := s.api.RehydrateLatest(ctx) + if err != nil { + log.I.Errorf("Ingest failed: %v", err) + + return nil, err + } + + return &pb.RehydrateLatestResponse{ + IngestedCluster: res, + }, nil +} + // Listen starts the GRPC server with the generic api implementation // It uses the config from the passed API for address and ports func Listen(ctx context.Context, api *api.IngestorAPI) error { diff --git a/pkg/kubehound/core/core_grpc_client.go b/pkg/kubehound/core/core_grpc_client.go index 8790d428b..40e387e5d 100644 --- a/pkg/kubehound/core/core_grpc_client.go +++ b/pkg/kubehound/core/core_grpc_client.go @@ -13,7 +13,7 @@ import ( "google.golang.org/grpc/credentials/insecure" ) -func CoreClientGRPCIngest(ingestorConfig config.IngestorConfig, clusteName string, runID string) error { +func getGrpcConn(ingestorConfig config.IngestorConfig) (*grpc.ClientConn, error) { var dialOpt grpc.DialOption if ingestorConfig.API.Insecure { dialOpt = grpc.WithTransportCredentials(insecure.NewCredentials()) @@ -25,13 +25,20 @@ func CoreClientGRPCIngest(ingestorConfig config.IngestorConfig, clusteName strin dialOpt = grpc.WithTransportCredentials(credentials.NewTLS(tlsConfig)) } - log.I.Infof("Launching ingestion on %s [%s:%s]", ingestorConfig.API.Endpoint, clusteName, runID) conn, err := grpc.NewClient(ingestorConfig.API.Endpoint, dialOpt) if err != nil { - return 
fmt.Errorf("connect %s: %w", ingestorConfig.API.Endpoint, err) + return nil, fmt.Errorf("connect %s: %w", ingestorConfig.API.Endpoint, err) } - defer conn.Close() + return conn, nil +} + +func CoreClientGRPCIngest(ingestorConfig config.IngestorConfig, clusteName string, runID string) error { + conn, err := getGrpcConn(ingestorConfig) + if err != nil { + return fmt.Errorf("getGrpcClient: %w", err) + } + defer conn.Close() client := pb.NewAPIClient(conn) _, err = client.Ingest(context.Background(), &pb.IngestRequest{ @@ -44,3 +51,24 @@ func CoreClientGRPCIngest(ingestorConfig config.IngestorConfig, clusteName strin return nil } + +func CoreClientGRPCRehydrateLatest(ingestorConfig config.IngestorConfig) error { + conn, err := getGrpcConn(ingestorConfig) + if err != nil { + return fmt.Errorf("getGrpcClient: %w", err) + } + defer conn.Close() + client := pb.NewAPIClient(conn) + + log.I.Infof("Launching rehydratation on %s [latest]", ingestorConfig.API.Endpoint) + results, err := client.RehydrateLatest(context.Background(), &pb.RehydrateLatestRequest{}) + if err != nil { + return fmt.Errorf("call rehydratation (latest): %w", err) + } + + for _, res := range results.IngestedCluster { + log.I.Infof("Rehydrated cluster: %s, date: %s, run_id: %s", res.ClusterName, res.Date.AsTime().Format("01-02-2006 15:04:05"), res.RunId) + } + + return nil +} From d87714fb588811f6f4cdf1c0874c5837c79f2d80 Mon Sep 17 00:00:00 2001 From: jt-dd Date: Thu, 18 Jul 2024 16:22:14 +0200 Subject: [PATCH 04/22] rehydrate new proto --- pkg/ingestor/api/grpc/api.proto | 16 +++++++++++++++- 1 file changed, 15 insertions(+), 1 deletion(-) diff --git a/pkg/ingestor/api/grpc/api.proto b/pkg/ingestor/api/grpc/api.proto index 713a81113..f456f5a45 100644 --- a/pkg/ingestor/api/grpc/api.proto +++ b/pkg/ingestor/api/grpc/api.proto @@ -1,14 +1,28 @@ syntax = "proto3"; +import "google/protobuf/timestamp.proto"; + package grpc; +option go_package = "./grpc"; message IngestRequest { string run_id = 1; string 
cluster_name = 2; } - message IngestResponse {} +message RehydrateLatestRequest {} +message IngestedCluster { + string cluster_name = 1; + string run_id = 2; + google.protobuf.Timestamp date = 3 ; +} +message RehydrateLatestResponse { + repeated IngestedCluster ingested_cluster = 1; +} + + service API { rpc Ingest (IngestRequest) returns (IngestResponse); + rpc RehydrateLatest (RehydrateLatestRequest) returns (RehydrateLatestResponse); } \ No newline at end of file From 4d34244cf3aac0047b0f070b45926a7f8f04b720 Mon Sep 17 00:00:00 2001 From: jt-dd Date: Thu, 18 Jul 2024 16:22:40 +0200 Subject: [PATCH 05/22] generated grpc --- pkg/ingestor/api/grpc/grpc.go | 4 + pkg/ingestor/api/grpc/pb/api.pb.go | 268 +++++++++++++++++++++--- pkg/ingestor/api/grpc/pb/api_grpc.pb.go | 55 ++++- 3 files changed, 297 insertions(+), 30 deletions(-) diff --git a/pkg/ingestor/api/grpc/grpc.go b/pkg/ingestor/api/grpc/grpc.go index 3e7c4b26d..0584a983c 100644 --- a/pkg/ingestor/api/grpc/grpc.go +++ b/pkg/ingestor/api/grpc/grpc.go @@ -14,6 +14,10 @@ import ( "google.golang.org/grpc/reflection" ) +// On macOS you need to install protobuf (`brew install protobuf`) +// Need to install: go install google.golang.org/grpc/cmd/protoc-gen-go-grpc@latest +//go:generate protoc --go_out=./pb --go_opt=paths=source_relative --go-grpc_out=./pb --go-grpc_opt=paths=source_relative ./api.proto + // server is used to implement the GRPC api type server struct { // grpc related embeds diff --git a/pkg/ingestor/api/grpc/pb/api.pb.go b/pkg/ingestor/api/grpc/pb/api.pb.go index 7059176cb..446a1174d 100644 --- a/pkg/ingestor/api/grpc/pb/api.pb.go +++ b/pkg/ingestor/api/grpc/pb/api.pb.go @@ -1,7 +1,7 @@ // Code generated by protoc-gen-go. DO NOT EDIT. 
// versions: -// protoc-gen-go v1.25.0-devel -// protoc v3.14.0 +// protoc-gen-go v1.34.2 +// protoc v5.27.1 // source: api.proto package grpc @@ -9,6 +9,7 @@ package grpc import ( protoreflect "google.golang.org/protobuf/reflect/protoreflect" protoimpl "google.golang.org/protobuf/runtime/protoimpl" + timestamppb "google.golang.org/protobuf/types/known/timestamppb" reflect "reflect" sync "sync" ) @@ -113,20 +114,191 @@ func (*IngestResponse) Descriptor() ([]byte, []int) { return file_api_proto_rawDescGZIP(), []int{1} } +type RehydrateLatestRequest struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields +} + +func (x *RehydrateLatestRequest) Reset() { + *x = RehydrateLatestRequest{} + if protoimpl.UnsafeEnabled { + mi := &file_api_proto_msgTypes[2] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *RehydrateLatestRequest) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*RehydrateLatestRequest) ProtoMessage() {} + +func (x *RehydrateLatestRequest) ProtoReflect() protoreflect.Message { + mi := &file_api_proto_msgTypes[2] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use RehydrateLatestRequest.ProtoReflect.Descriptor instead. 
+func (*RehydrateLatestRequest) Descriptor() ([]byte, []int) { + return file_api_proto_rawDescGZIP(), []int{2} +} + +type IngestedCluster struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + ClusterName string `protobuf:"bytes,1,opt,name=cluster_name,json=clusterName,proto3" json:"cluster_name,omitempty"` + RunId string `protobuf:"bytes,2,opt,name=run_id,json=runId,proto3" json:"run_id,omitempty"` + Date *timestamppb.Timestamp `protobuf:"bytes,3,opt,name=date,proto3" json:"date,omitempty"` +} + +func (x *IngestedCluster) Reset() { + *x = IngestedCluster{} + if protoimpl.UnsafeEnabled { + mi := &file_api_proto_msgTypes[3] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *IngestedCluster) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*IngestedCluster) ProtoMessage() {} + +func (x *IngestedCluster) ProtoReflect() protoreflect.Message { + mi := &file_api_proto_msgTypes[3] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use IngestedCluster.ProtoReflect.Descriptor instead. 
+func (*IngestedCluster) Descriptor() ([]byte, []int) { + return file_api_proto_rawDescGZIP(), []int{3} +} + +func (x *IngestedCluster) GetClusterName() string { + if x != nil { + return x.ClusterName + } + return "" +} + +func (x *IngestedCluster) GetRunId() string { + if x != nil { + return x.RunId + } + return "" +} + +func (x *IngestedCluster) GetDate() *timestamppb.Timestamp { + if x != nil { + return x.Date + } + return nil +} + +type RehydrateLatestResponse struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + IngestedCluster []*IngestedCluster `protobuf:"bytes,1,rep,name=ingested_cluster,json=ingestedCluster,proto3" json:"ingested_cluster,omitempty"` +} + +func (x *RehydrateLatestResponse) Reset() { + *x = RehydrateLatestResponse{} + if protoimpl.UnsafeEnabled { + mi := &file_api_proto_msgTypes[4] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *RehydrateLatestResponse) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*RehydrateLatestResponse) ProtoMessage() {} + +func (x *RehydrateLatestResponse) ProtoReflect() protoreflect.Message { + mi := &file_api_proto_msgTypes[4] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use RehydrateLatestResponse.ProtoReflect.Descriptor instead. 
+func (*RehydrateLatestResponse) Descriptor() ([]byte, []int) { + return file_api_proto_rawDescGZIP(), []int{4} +} + +func (x *RehydrateLatestResponse) GetIngestedCluster() []*IngestedCluster { + if x != nil { + return x.IngestedCluster + } + return nil +} + var File_api_proto protoreflect.FileDescriptor var file_api_proto_rawDesc = []byte{ 0x0a, 0x09, 0x61, 0x70, 0x69, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x12, 0x04, 0x67, 0x72, 0x70, - 0x63, 0x22, 0x49, 0x0a, 0x0d, 0x49, 0x6e, 0x67, 0x65, 0x73, 0x74, 0x52, 0x65, 0x71, 0x75, 0x65, - 0x73, 0x74, 0x12, 0x15, 0x0a, 0x06, 0x72, 0x75, 0x6e, 0x5f, 0x69, 0x64, 0x18, 0x01, 0x20, 0x01, - 0x28, 0x09, 0x52, 0x05, 0x72, 0x75, 0x6e, 0x49, 0x64, 0x12, 0x21, 0x0a, 0x0c, 0x63, 0x6c, 0x75, - 0x73, 0x74, 0x65, 0x72, 0x5f, 0x6e, 0x61, 0x6d, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, - 0x0b, 0x63, 0x6c, 0x75, 0x73, 0x74, 0x65, 0x72, 0x4e, 0x61, 0x6d, 0x65, 0x22, 0x10, 0x0a, 0x0e, - 0x49, 0x6e, 0x67, 0x65, 0x73, 0x74, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x32, 0x3a, - 0x0a, 0x03, 0x41, 0x50, 0x49, 0x12, 0x33, 0x0a, 0x06, 0x49, 0x6e, 0x67, 0x65, 0x73, 0x74, 0x12, - 0x13, 0x2e, 0x67, 0x72, 0x70, 0x63, 0x2e, 0x49, 0x6e, 0x67, 0x65, 0x73, 0x74, 0x52, 0x65, 0x71, - 0x75, 0x65, 0x73, 0x74, 0x1a, 0x14, 0x2e, 0x67, 0x72, 0x70, 0x63, 0x2e, 0x49, 0x6e, 0x67, 0x65, - 0x73, 0x74, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, + 0x63, 0x1a, 0x1f, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, + 0x75, 0x66, 0x2f, 0x74, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x2e, 0x70, 0x72, 0x6f, + 0x74, 0x6f, 0x22, 0x49, 0x0a, 0x0d, 0x49, 0x6e, 0x67, 0x65, 0x73, 0x74, 0x52, 0x65, 0x71, 0x75, + 0x65, 0x73, 0x74, 0x12, 0x15, 0x0a, 0x06, 0x72, 0x75, 0x6e, 0x5f, 0x69, 0x64, 0x18, 0x01, 0x20, + 0x01, 0x28, 0x09, 0x52, 0x05, 0x72, 0x75, 0x6e, 0x49, 0x64, 0x12, 0x21, 0x0a, 0x0c, 0x63, 0x6c, + 0x75, 0x73, 0x74, 0x65, 0x72, 0x5f, 0x6e, 0x61, 0x6d, 0x65, 0x18, 0x02, 0x20, 0x01, 
0x28, 0x09, + 0x52, 0x0b, 0x63, 0x6c, 0x75, 0x73, 0x74, 0x65, 0x72, 0x4e, 0x61, 0x6d, 0x65, 0x22, 0x10, 0x0a, + 0x0e, 0x49, 0x6e, 0x67, 0x65, 0x73, 0x74, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, + 0x18, 0x0a, 0x16, 0x52, 0x65, 0x68, 0x79, 0x64, 0x72, 0x61, 0x74, 0x65, 0x4c, 0x61, 0x74, 0x65, + 0x73, 0x74, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x22, 0x7b, 0x0a, 0x0f, 0x49, 0x6e, 0x67, + 0x65, 0x73, 0x74, 0x65, 0x64, 0x43, 0x6c, 0x75, 0x73, 0x74, 0x65, 0x72, 0x12, 0x21, 0x0a, 0x0c, + 0x63, 0x6c, 0x75, 0x73, 0x74, 0x65, 0x72, 0x5f, 0x6e, 0x61, 0x6d, 0x65, 0x18, 0x01, 0x20, 0x01, + 0x28, 0x09, 0x52, 0x0b, 0x63, 0x6c, 0x75, 0x73, 0x74, 0x65, 0x72, 0x4e, 0x61, 0x6d, 0x65, 0x12, + 0x15, 0x0a, 0x06, 0x72, 0x75, 0x6e, 0x5f, 0x69, 0x64, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, + 0x05, 0x72, 0x75, 0x6e, 0x49, 0x64, 0x12, 0x2e, 0x0a, 0x04, 0x64, 0x61, 0x74, 0x65, 0x18, 0x03, + 0x20, 0x01, 0x28, 0x0b, 0x32, 0x1a, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, + 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x54, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, + 0x52, 0x04, 0x64, 0x61, 0x74, 0x65, 0x22, 0x5b, 0x0a, 0x17, 0x52, 0x65, 0x68, 0x79, 0x64, 0x72, + 0x61, 0x74, 0x65, 0x4c, 0x61, 0x74, 0x65, 0x73, 0x74, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, + 0x65, 0x12, 0x40, 0x0a, 0x10, 0x69, 0x6e, 0x67, 0x65, 0x73, 0x74, 0x65, 0x64, 0x5f, 0x63, 0x6c, + 0x75, 0x73, 0x74, 0x65, 0x72, 0x18, 0x01, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x15, 0x2e, 0x67, 0x72, + 0x70, 0x63, 0x2e, 0x49, 0x6e, 0x67, 0x65, 0x73, 0x74, 0x65, 0x64, 0x43, 0x6c, 0x75, 0x73, 0x74, + 0x65, 0x72, 0x52, 0x0f, 0x69, 0x6e, 0x67, 0x65, 0x73, 0x74, 0x65, 0x64, 0x43, 0x6c, 0x75, 0x73, + 0x74, 0x65, 0x72, 0x32, 0x8a, 0x01, 0x0a, 0x03, 0x41, 0x50, 0x49, 0x12, 0x33, 0x0a, 0x06, 0x49, + 0x6e, 0x67, 0x65, 0x73, 0x74, 0x12, 0x13, 0x2e, 0x67, 0x72, 0x70, 0x63, 0x2e, 0x49, 0x6e, 0x67, + 0x65, 0x73, 0x74, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x14, 0x2e, 0x67, 0x72, 0x70, + 0x63, 0x2e, 0x49, 0x6e, 
0x67, 0x65, 0x73, 0x74, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, + 0x12, 0x4e, 0x0a, 0x0f, 0x52, 0x65, 0x68, 0x79, 0x64, 0x72, 0x61, 0x74, 0x65, 0x4c, 0x61, 0x74, + 0x65, 0x73, 0x74, 0x12, 0x1c, 0x2e, 0x67, 0x72, 0x70, 0x63, 0x2e, 0x52, 0x65, 0x68, 0x79, 0x64, + 0x72, 0x61, 0x74, 0x65, 0x4c, 0x61, 0x74, 0x65, 0x73, 0x74, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, + 0x74, 0x1a, 0x1d, 0x2e, 0x67, 0x72, 0x70, 0x63, 0x2e, 0x52, 0x65, 0x68, 0x79, 0x64, 0x72, 0x61, + 0x74, 0x65, 0x4c, 0x61, 0x74, 0x65, 0x73, 0x74, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, + 0x42, 0x08, 0x5a, 0x06, 0x2e, 0x2f, 0x67, 0x72, 0x70, 0x63, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, } @@ -142,19 +314,27 @@ func file_api_proto_rawDescGZIP() []byte { return file_api_proto_rawDescData } -var file_api_proto_msgTypes = make([]protoimpl.MessageInfo, 2) -var file_api_proto_goTypes = []interface{}{ - (*IngestRequest)(nil), // 0: grpc.IngestRequest - (*IngestResponse)(nil), // 1: grpc.IngestResponse +var file_api_proto_msgTypes = make([]protoimpl.MessageInfo, 5) +var file_api_proto_goTypes = []any{ + (*IngestRequest)(nil), // 0: grpc.IngestRequest + (*IngestResponse)(nil), // 1: grpc.IngestResponse + (*RehydrateLatestRequest)(nil), // 2: grpc.RehydrateLatestRequest + (*IngestedCluster)(nil), // 3: grpc.IngestedCluster + (*RehydrateLatestResponse)(nil), // 4: grpc.RehydrateLatestResponse + (*timestamppb.Timestamp)(nil), // 5: google.protobuf.Timestamp } var file_api_proto_depIdxs = []int32{ - 0, // 0: grpc.API.Ingest:input_type -> grpc.IngestRequest - 1, // 1: grpc.API.Ingest:output_type -> grpc.IngestResponse - 1, // [1:2] is the sub-list for method output_type - 0, // [0:1] is the sub-list for method input_type - 0, // [0:0] is the sub-list for extension type_name - 0, // [0:0] is the sub-list for extension extendee - 0, // [0:0] is the sub-list for field type_name + 5, // 0: grpc.IngestedCluster.date:type_name -> google.protobuf.Timestamp + 3, // 1: 
grpc.RehydrateLatestResponse.ingested_cluster:type_name -> grpc.IngestedCluster + 0, // 2: grpc.API.Ingest:input_type -> grpc.IngestRequest + 2, // 3: grpc.API.RehydrateLatest:input_type -> grpc.RehydrateLatestRequest + 1, // 4: grpc.API.Ingest:output_type -> grpc.IngestResponse + 4, // 5: grpc.API.RehydrateLatest:output_type -> grpc.RehydrateLatestResponse + 4, // [4:6] is the sub-list for method output_type + 2, // [2:4] is the sub-list for method input_type + 2, // [2:2] is the sub-list for extension type_name + 2, // [2:2] is the sub-list for extension extendee + 0, // [0:2] is the sub-list for field type_name } func init() { file_api_proto_init() } @@ -163,7 +343,7 @@ func file_api_proto_init() { return } if !protoimpl.UnsafeEnabled { - file_api_proto_msgTypes[0].Exporter = func(v interface{}, i int) interface{} { + file_api_proto_msgTypes[0].Exporter = func(v any, i int) any { switch v := v.(*IngestRequest); i { case 0: return &v.state @@ -175,7 +355,7 @@ func file_api_proto_init() { return nil } } - file_api_proto_msgTypes[1].Exporter = func(v interface{}, i int) interface{} { + file_api_proto_msgTypes[1].Exporter = func(v any, i int) any { switch v := v.(*IngestResponse); i { case 0: return &v.state @@ -187,6 +367,42 @@ func file_api_proto_init() { return nil } } + file_api_proto_msgTypes[2].Exporter = func(v any, i int) any { + switch v := v.(*RehydrateLatestRequest); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_api_proto_msgTypes[3].Exporter = func(v any, i int) any { + switch v := v.(*IngestedCluster); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_api_proto_msgTypes[4].Exporter = func(v any, i int) any { + switch v := v.(*RehydrateLatestResponse); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + 
} + } } type x struct{} out := protoimpl.TypeBuilder{ @@ -194,7 +410,7 @@ func file_api_proto_init() { GoPackagePath: reflect.TypeOf(x{}).PkgPath(), RawDescriptor: file_api_proto_rawDesc, NumEnums: 0, - NumMessages: 2, + NumMessages: 5, NumExtensions: 0, NumServices: 1, }, diff --git a/pkg/ingestor/api/grpc/pb/api_grpc.pb.go b/pkg/ingestor/api/grpc/pb/api_grpc.pb.go index b0c227a15..951210160 100644 --- a/pkg/ingestor/api/grpc/pb/api_grpc.pb.go +++ b/pkg/ingestor/api/grpc/pb/api_grpc.pb.go @@ -1,4 +1,8 @@ // Code generated by protoc-gen-go-grpc. DO NOT EDIT. +// versions: +// - protoc-gen-go-grpc v1.4.0 +// - protoc v5.27.1 +// source: api.proto package grpc @@ -11,14 +15,20 @@ import ( // This is a compile-time assertion to ensure that this generated file // is compatible with the grpc package it is being compiled against. -// Requires gRPC-Go v1.32.0 or later. -const _ = grpc.SupportPackageIsVersion7 +// Requires gRPC-Go v1.62.0 or later. +const _ = grpc.SupportPackageIsVersion8 + +const ( + API_Ingest_FullMethodName = "/grpc.API/Ingest" + API_RehydrateLatest_FullMethodName = "/grpc.API/RehydrateLatest" +) // APIClient is the client API for API service. // // For semantics around ctx use and closing/ending streaming RPCs, please refer to https://pkg.go.dev/google.golang.org/grpc/?tab=doc#ClientConn.NewStream. type APIClient interface { Ingest(ctx context.Context, in *IngestRequest, opts ...grpc.CallOption) (*IngestResponse, error) + RehydrateLatest(ctx context.Context, in *RehydrateLatestRequest, opts ...grpc.CallOption) (*RehydrateLatestResponse, error) } type aPIClient struct { @@ -30,8 +40,19 @@ func NewAPIClient(cc grpc.ClientConnInterface) APIClient { } func (c *aPIClient) Ingest(ctx context.Context, in *IngestRequest, opts ...grpc.CallOption) (*IngestResponse, error) { + cOpts := append([]grpc.CallOption{grpc.StaticMethod()}, opts...) out := new(IngestResponse) - err := c.cc.Invoke(ctx, "/grpc.API/Ingest", in, out, opts...) 
+ err := c.cc.Invoke(ctx, API_Ingest_FullMethodName, in, out, cOpts...) + if err != nil { + return nil, err + } + return out, nil +} + +func (c *aPIClient) RehydrateLatest(ctx context.Context, in *RehydrateLatestRequest, opts ...grpc.CallOption) (*RehydrateLatestResponse, error) { + cOpts := append([]grpc.CallOption{grpc.StaticMethod()}, opts...) + out := new(RehydrateLatestResponse) + err := c.cc.Invoke(ctx, API_RehydrateLatest_FullMethodName, in, out, cOpts...) if err != nil { return nil, err } @@ -43,6 +64,7 @@ func (c *aPIClient) Ingest(ctx context.Context, in *IngestRequest, opts ...grpc. // for forward compatibility type APIServer interface { Ingest(context.Context, *IngestRequest) (*IngestResponse, error) + RehydrateLatest(context.Context, *RehydrateLatestRequest) (*RehydrateLatestResponse, error) mustEmbedUnimplementedAPIServer() } @@ -53,6 +75,9 @@ type UnimplementedAPIServer struct { func (UnimplementedAPIServer) Ingest(context.Context, *IngestRequest) (*IngestResponse, error) { return nil, status.Errorf(codes.Unimplemented, "method Ingest not implemented") } +func (UnimplementedAPIServer) RehydrateLatest(context.Context, *RehydrateLatestRequest) (*RehydrateLatestResponse, error) { + return nil, status.Errorf(codes.Unimplemented, "method RehydrateLatest not implemented") +} func (UnimplementedAPIServer) mustEmbedUnimplementedAPIServer() {} // UnsafeAPIServer may be embedded to opt out of forward compatibility for this service. 
@@ -76,7 +101,7 @@ func _API_Ingest_Handler(srv interface{}, ctx context.Context, dec func(interfac } info := &grpc.UnaryServerInfo{ Server: srv, - FullMethod: "/grpc.API/Ingest", + FullMethod: API_Ingest_FullMethodName, } handler := func(ctx context.Context, req interface{}) (interface{}, error) { return srv.(APIServer).Ingest(ctx, req.(*IngestRequest)) @@ -84,6 +109,24 @@ func _API_Ingest_Handler(srv interface{}, ctx context.Context, dec func(interfac return interceptor(ctx, in, info, handler) } +func _API_RehydrateLatest_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { + in := new(RehydrateLatestRequest) + if err := dec(in); err != nil { + return nil, err + } + if interceptor == nil { + return srv.(APIServer).RehydrateLatest(ctx, in) + } + info := &grpc.UnaryServerInfo{ + Server: srv, + FullMethod: API_RehydrateLatest_FullMethodName, + } + handler := func(ctx context.Context, req interface{}) (interface{}, error) { + return srv.(APIServer).RehydrateLatest(ctx, req.(*RehydrateLatestRequest)) + } + return interceptor(ctx, in, info, handler) +} + // API_ServiceDesc is the grpc.ServiceDesc for API service. 
// It's only intended for direct use with grpc.RegisterService, // and not to be introspected or modified (even as a copy) @@ -95,6 +138,10 @@ var API_ServiceDesc = grpc.ServiceDesc{ MethodName: "Ingest", Handler: _API_Ingest_Handler, }, + { + MethodName: "RehydrateLatest", + Handler: _API_RehydrateLatest_Handler, + }, }, Streams: []grpc.StreamDesc{}, Metadata: "api.proto", From ad62e155c0934fa8eb74c3ce7e5473eb316c03fb Mon Sep 17 00:00:00 2001 From: jt-dd Date: Thu, 18 Jul 2024 16:23:04 +0200 Subject: [PATCH 06/22] generated puller mock --- pkg/ingestor/puller/mocks/mock_puller.go | 57 ++++++++++++++++++++++++ 1 file changed, 57 insertions(+) diff --git a/pkg/ingestor/puller/mocks/mock_puller.go b/pkg/ingestor/puller/mocks/mock_puller.go index fafb3074c..6599ffa29 100644 --- a/pkg/ingestor/puller/mocks/mock_puller.go +++ b/pkg/ingestor/puller/mocks/mock_puller.go @@ -5,6 +5,7 @@ package mocks import ( context "context" + puller "github.com/DataDog/KubeHound/pkg/ingestor/puller" mock "github.com/stretchr/testify/mock" ) @@ -107,6 +108,62 @@ func (_c *DataPuller_Extract_Call) RunAndReturn(run func(context.Context, string return _c } +// ListFiles provides a mock function with given fields: ctx, prefix, recursive +func (_m *DataPuller) ListFiles(ctx context.Context, prefix string, recursive bool) ([]*puller.ListObject, error) { + ret := _m.Called(ctx, prefix, recursive) + + var r0 []*puller.ListObject + var r1 error + if rf, ok := ret.Get(0).(func(context.Context, string, bool) ([]*puller.ListObject, error)); ok { + return rf(ctx, prefix, recursive) + } + if rf, ok := ret.Get(0).(func(context.Context, string, bool) []*puller.ListObject); ok { + r0 = rf(ctx, prefix, recursive) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).([]*puller.ListObject) + } + } + + if rf, ok := ret.Get(1).(func(context.Context, string, bool) error); ok { + r1 = rf(ctx, prefix, recursive) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// DataPuller_ListFiles_Call is a 
*mock.Call that shadows Run/Return methods with type explicit version for method 'ListFiles' +type DataPuller_ListFiles_Call struct { + *mock.Call +} + +// ListFiles is a helper method to define mock.On call +// - ctx context.Context +// - prefix string +// - recursive bool +func (_e *DataPuller_Expecter) ListFiles(ctx interface{}, prefix interface{}, recursive interface{}) *DataPuller_ListFiles_Call { + return &DataPuller_ListFiles_Call{Call: _e.mock.On("ListFiles", ctx, prefix, recursive)} +} + +func (_c *DataPuller_ListFiles_Call) Run(run func(ctx context.Context, prefix string, recursive bool)) *DataPuller_ListFiles_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(context.Context), args[1].(string), args[2].(bool)) + }) + return _c +} + +func (_c *DataPuller_ListFiles_Call) Return(_a0 []*puller.ListObject, _a1 error) *DataPuller_ListFiles_Call { + _c.Call.Return(_a0, _a1) + return _c +} + +func (_c *DataPuller_ListFiles_Call) RunAndReturn(run func(context.Context, string, bool) ([]*puller.ListObject, error)) *DataPuller_ListFiles_Call { + _c.Call.Return(run) + return _c +} + // Pull provides a mock function with given fields: ctx, clusterName, runID func (_m *DataPuller) Pull(ctx context.Context, clusterName string, runID string) (string, error) { ret := _m.Called(ctx, clusterName, runID) From 4e37f3943c5183dd27be4ee590d6e5e50bf137dc Mon Sep 17 00:00:00 2001 From: jt-dd Date: Thu, 18 Jul 2024 16:23:21 +0200 Subject: [PATCH 07/22] update protobuff --- go.mod | 2 +- go.sum | 2 ++ 2 files changed, 3 insertions(+), 1 deletion(-) diff --git a/go.mod b/go.mod index 32c6b14e8..019254ecb 100644 --- a/go.mod +++ b/go.mod @@ -24,7 +24,7 @@ require ( gocloud.dev v0.37.0 golang.org/x/exp v0.0.0-20240604190554-fc45aab8b7f8 google.golang.org/grpc v1.64.1 - google.golang.org/protobuf v1.34.1 + google.golang.org/protobuf v1.34.2 gopkg.in/DataDog/dd-trace-go.v1 v1.64.1 gopkg.in/yaml.v2 v2.4.0 gopkg.in/yaml.v3 v3.0.1 diff --git a/go.sum b/go.sum index 
2a5ae6de6..7331ec213 100644 --- a/go.sum +++ b/go.sum @@ -916,6 +916,8 @@ google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp0 google.golang.org/protobuf v1.27.1/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc= google.golang.org/protobuf v1.34.1 h1:9ddQBjfCyZPOHPUiPxpYESBLc+T8P3E+Vo4IbKZgFWg= google.golang.org/protobuf v1.34.1/go.mod h1:c6P6GXX6sHbq/GpV6MGZEdwhWPcYBgnhAHhKbcUYpos= +google.golang.org/protobuf v1.34.2 h1:6xV6lTsCfpGD21XK49h7MhtcApnLqkfYgPcdHftf6hg= +google.golang.org/protobuf v1.34.2/go.mod h1:qYOHts0dSfpeUzUFpOMr/WGzszTmLH+DiWniOlNbLDw= gopkg.in/DataDog/dd-trace-go.v1 v1.64.1 h1:HN/zoIV8FvrLKA1ZBkbyo4E1MnPh9hPc2Q0C/ojom3I= gopkg.in/DataDog/dd-trace-go.v1 v1.64.1/go.mod h1:qzwVu8Qr8CqzQNw2oKEXRdD+fMnjYatjYMGE0tdCVG4= gopkg.in/airbrake/gobrake.v2 v2.0.9/go.mod h1:/h5ZAUhDkGaJfjzjKLSjv6zCL6O0LLBxU4K+aSYdM/U= From 0ba9b6ad31812fa277659e4c8b20ad7901a935ce Mon Sep 17 00:00:00 2001 From: jt-dd Date: Tue, 23 Jul 2024 20:18:50 +0200 Subject: [PATCH 08/22] adding DumpResult to handle all path --- pkg/dump/ingestor.go | 31 +++------ pkg/dump/ingestor_test.go | 9 +-- pkg/dump/result.go | 113 +++++++++++++++++++++++++++++++ pkg/dump/writer/file_writer.go | 2 +- pkg/dump/writer/tar_writer.go | 9 +-- pkg/dump/writer/writer.go | 7 +- pkg/ingestor/puller/blob/blob.go | 16 +++-- pkg/ingestor/puller/puller.go | 6 +- 8 files changed, 143 insertions(+), 50 deletions(-) create mode 100644 pkg/dump/result.go diff --git a/pkg/dump/ingestor.go b/pkg/dump/ingestor.go index 4af9a3340..6765c9086 100644 --- a/pkg/dump/ingestor.go +++ b/pkg/dump/ingestor.go @@ -3,7 +3,6 @@ package dump import ( "context" "fmt" - "path" "github.com/DataDog/KubeHound/pkg/collector" "github.com/DataDog/KubeHound/pkg/config" @@ -15,23 +14,10 @@ import ( ) type DumpIngestor struct { - directoryOutput string - ResultName string - collector collector.CollectorClient - writer writer.DumperWriter + collector collector.CollectorClient + writer writer.DumperWriter } -const 
( - OfflineDumpDateFormat = "2006-01-02-15-04-05" - OfflineDumpPrefix = "kubehound_" -) - -// .//kubehound__ -func DumpIngestorResultName(clusterName string, runID string) string { - return path.Join(clusterName, fmt.Sprintf("%s%s_%s", OfflineDumpPrefix, clusterName, runID)) -} - -// func NewDumpIngestor(ctx context.Context, collector collector.CollectorClient, compression bool, directoryOutput string) (*DumpIngestor, error) { func NewDumpIngestor(ctx context.Context, collector collector.CollectorClient, compression bool, directoryOutput string, runID *config.RunID) (*DumpIngestor, error) { // Generate path for the dump clusterName, err := getClusterName(ctx, collector) @@ -39,18 +25,19 @@ func NewDumpIngestor(ctx context.Context, collector collector.CollectorClient, c return nil, err } - resultName := DumpIngestorResultName(clusterName, runID.String()) + dumpResult, err := NewDumpResult(clusterName, runID.String(), compression) + if err != nil { + return nil, fmt.Errorf("create dump result: %w", err) + } - dumpWriter, err := writer.DumperWriterFactory(ctx, compression, directoryOutput, resultName) + dumpWriter, err := writer.DumperWriterFactory(ctx, compression, directoryOutput, dumpResult.GetFullPath()) if err != nil { return nil, fmt.Errorf("create collector writer: %w", err) } return &DumpIngestor{ - directoryOutput: directoryOutput, - collector: collector, - writer: dumpWriter, - ResultName: resultName, + collector: collector, + writer: dumpWriter, }, nil } diff --git a/pkg/dump/ingestor_test.go b/pkg/dump/ingestor_test.go index cbc296cf1..5f28cc681 100644 --- a/pkg/dump/ingestor_test.go +++ b/pkg/dump/ingestor_test.go @@ -48,8 +48,6 @@ func TestNewDumpIngestor(t *testing.T) { runID: config.NewRunID(), }, want: &DumpIngestor{ - directoryOutput: mockDirectoryOutput, - writer: &writer.FileWriter{}, }, wantErr: false, @@ -63,8 +61,7 @@ func TestNewDumpIngestor(t *testing.T) { runID: config.NewRunID(), }, want: &DumpIngestor{ - directoryOutput: 
mockDirectoryOutput, - writer: &writer.TarWriter{}, + writer: &writer.TarWriter{}, }, wantErr: false, }, @@ -83,10 +80,6 @@ func TestNewDumpIngestor(t *testing.T) { if !assert.Equal(t, reflect.TypeOf(got.writer), reflect.TypeOf(tt.want.writer)) { t.Errorf("NewDumpIngestor() = %v, want %v", reflect.TypeOf(got.writer), reflect.TypeOf(tt.want.writer)) } - - if !assert.Equal(t, got.directoryOutput, tt.want.directoryOutput) { - t.Errorf("NewDumpIngestor() = %v, want %v", got.directoryOutput, tt.want.directoryOutput) - } }) } } diff --git a/pkg/dump/result.go b/pkg/dump/result.go new file mode 100644 index 000000000..805cadd18 --- /dev/null +++ b/pkg/dump/result.go @@ -0,0 +1,113 @@ +package dump + +import ( + "fmt" + "path" + "regexp" +) + +type DumpResult struct { + clusterName string + RunID string + isDir bool + extension string +} + +const ( + DumpResultClusterNameRegex = `([A-Za-z0-9\.\-_]+)` + DumpResultRunIDRegex = `([a-z0-9]{26})` + DumpResultExtensionRegex = `\.?([a-z0-9\.]+)?` + DumpResultPrefix = "kubehound_" + DumpResultFilenameRegex = DumpResultPrefix + DumpResultClusterNameRegex + "_" + DumpResultRunIDRegex + DumpResultExtensionRegex + DumpResultPathRegex = DumpResultClusterNameRegex + "/" + DumpResultFilenameRegex + + DumpResultTarWriterExtension = "tar.gz" +) + +func NewDumpResult(clusterName, runID string, compressed bool) (*DumpResult, error) { + dumpResult := &DumpResult{ + clusterName: clusterName, + RunID: runID, + isDir: true, + } + if compressed { + dumpResult.Compressed() + } + + err := dumpResult.Validate() + if err != nil { + return nil, err + } + + return dumpResult, nil +} + +func (i *DumpResult) Validate() error { + re := regexp.MustCompile(DumpResultClusterNameRegex) + if !re.MatchString(i.clusterName) { + return fmt.Errorf("Invalid clustername: %q", i.clusterName) + } + + matches := re.FindStringSubmatch(i.clusterName) + if len(matches) == 2 && matches[1] != i.clusterName { + return fmt.Errorf("Invalid clustername: %q", i.clusterName) + } 
+ + re = regexp.MustCompile(DumpResultRunIDRegex) + if !re.MatchString(i.RunID) { + return fmt.Errorf("Invalid runID: %q", i.RunID) + } + + return nil +} + +func (i *DumpResult) Compressed() { + i.isDir = false + i.extension = DumpResultTarWriterExtension +} + +// .//kubehound__ +func (i *DumpResult) GetFullPath() string { + filename := i.GetFilename() + + return path.Join(i.clusterName, filename) +} + +func (i *DumpResult) GetFilename() string { + filename := fmt.Sprintf("%s%s_%s", DumpResultPrefix, i.clusterName, i.RunID) + if i.isDir { + return filename + } + + return fmt.Sprintf("%s.%s", filename, i.extension) +} + +func ParsePath(path string) (*DumpResult, error) { + // .//kubehound__[.tar.gz] + // re := regexp.MustCompile(`([a-z0-9\.\-_]+)/kubehound_([a-z0-9\.-_]+)_([a-z0-9]{26})\.?([a-z0-9\.]+)?`) + re := regexp.MustCompile(DumpResultPathRegex) + if !re.MatchString(path) { + return nil, fmt.Errorf("Invalid path provided: %q", path) + } + + matches := re.FindStringSubmatch(path) + // The cluster name should match (parent dir and in the filename) + if matches[1] != matches[2] { + return nil, fmt.Errorf("Cluster name does not match in the path provided: %q", path) + } + + clusterName := matches[1] + runID := matches[3] + extension := matches[4] + + compressed := false + if extension != "" { + compressed = true + } + result, err := NewDumpResult(clusterName, runID, compressed) + if err != nil { + return nil, err + } + + return result, nil +} diff --git a/pkg/dump/writer/file_writer.go b/pkg/dump/writer/file_writer.go index c67bbcd5f..09bea82ee 100644 --- a/pkg/dump/writer/file_writer.go +++ b/pkg/dump/writer/file_writer.go @@ -35,7 +35,7 @@ type FileWriter struct { fsWriter *FSWriter } -func NewFileWriter(ctx context.Context, directoryOutput string, resName string) (*FileWriter, error) { +func NewFileWriter(ctx context.Context, directoryOutput string) (*FileWriter, error) { fsWriter, err := NewFSWriter(ctx) if err != nil { return nil, fmt.Errorf("creating fs 
writer: %w", err) diff --git a/pkg/dump/writer/tar_writer.go b/pkg/dump/writer/tar_writer.go index 48e67911f..89107a128 100644 --- a/pkg/dump/writer/tar_writer.go +++ b/pkg/dump/writer/tar_writer.go @@ -6,7 +6,6 @@ import ( "context" "fmt" "os" - "path" "path/filepath" "sync" @@ -18,9 +17,8 @@ import ( ) const ( - TarWriterExtension = ".tar.gz" - TarWriterChmod = 0600 - TarTypeTag = "tar" + TarWriterChmod = 0600 + TarTypeTag = "tar" // Multi-threading the dump with one worker for each types // The number of workers is set to the number of differents entities (roles, pods, ...) @@ -40,8 +38,7 @@ type TarWriter struct { fsWriter *FSWriter } -func NewTarWriter(ctx context.Context, directoryPath string, resName string) (*TarWriter, error) { - tarPath := path.Join(directoryPath, fmt.Sprintf("%s%s", resName, TarWriterExtension)) +func NewTarWriter(ctx context.Context, tarPath string) (*TarWriter, error) { tarFile, err := createTarFile(tarPath) if err != nil { return nil, fmt.Errorf("failed to create tar file: %w", err) diff --git a/pkg/dump/writer/writer.go b/pkg/dump/writer/writer.go index 58aba95bf..36535ad5c 100644 --- a/pkg/dump/writer/writer.go +++ b/pkg/dump/writer/writer.go @@ -2,6 +2,7 @@ package writer import ( "context" + "path" "github.com/DataDog/KubeHound/pkg/telemetry/log" ) @@ -31,9 +32,11 @@ func DumperWriterFactory(ctx context.Context, compression bool, directoryPath st // if compression is enabled, create the tar.gz file if compression { log.I.Infof("Compression enabled") + tarPath := path.Join(directoryPath, resultName) - return NewTarWriter(ctx, directoryPath, resultName) + return NewTarWriter(ctx, tarPath) } - return NewFileWriter(ctx, directoryPath, resultName) + // Output the result directly in the directory provided by the user + return NewFileWriter(ctx, directoryPath) } diff --git a/pkg/ingestor/puller/blob/blob.go b/pkg/ingestor/puller/blob/blob.go index 7890ab19b..22b935810 100644 --- a/pkg/ingestor/puller/blob/blob.go +++ 
b/pkg/ingestor/puller/blob/blob.go @@ -51,10 +51,6 @@ func NewBlobStorage(cfg *config.KubehoundConfig, blobConfig *config.BlobConfig) }, nil } -func getKeyPath(clusterName, runID string) string { - return fmt.Sprintf("%s%s", dump.DumpIngestorResultName(clusterName, runID), writer.TarWriterExtension) -} - func (bs *BlobStore) openBucket(ctx context.Context) (*blob.Bucket, error) { urlStruct, err := url.Parse(bs.bucketName) if err != nil { @@ -140,7 +136,11 @@ func (bs *BlobStore) Put(outer context.Context, archivePath string, clusterName var err error defer func() { spanPut.Finish(tracer.WithError(err)) }() - key := getKeyPath(clusterName, runID) + dumpResult, err := dump.NewDumpResult(clusterName, runID, true) + if err != nil { + return err + } + key := dumpResult.GetFullPath() log.I.Infof("Downloading archive (%s) from blob store", key) b, err := bs.openBucket(ctx) if err != nil { @@ -178,7 +178,11 @@ func (bs *BlobStore) Pull(outer context.Context, clusterName string, runID strin var err error defer func() { spanPull.Finish(tracer.WithError(err)) }() - key := getKeyPath(clusterName, runID) + dumpResult, err := dump.NewDumpResult(clusterName, runID, true) + if err != nil { + return "", err + } + key := dumpResult.GetFullPath() log.I.Infof("Downloading archive (%s) from blob store", key) b, err := bs.openBucket(ctx) if err != nil { diff --git a/pkg/ingestor/puller/puller.go b/pkg/ingestor/puller/puller.go index 33746ceec..f95216b9a 100644 --- a/pkg/ingestor/puller/puller.go +++ b/pkg/ingestor/puller/puller.go @@ -29,14 +29,10 @@ type ListObject struct { ModTime time.Time } -func FormatArchiveKey(clusterName string, runID string, archiveName string) string { - return strings.Join([]string{clusterName, runID, archiveName}, "/") -} - // checkSanePath just to make sure we don't delete or overwrite somewhere where we are not supposed to func CheckSanePath(path string, baseFolder string) error { if path == "/" || path == "" || !strings.HasPrefix(path, baseFolder) { - 
return fmt.Errorf("Invalid path provided: %q", path) + return fmt.Errorf("Invalid path provided: %q / base: %q", path, baseFolder) } return nil From 188b51d484617a64bfdb60d6529f2f0e3e2e5ede Mon Sep 17 00:00:00 2001 From: jt-dd Date: Tue, 23 Jul 2024 20:20:51 +0200 Subject: [PATCH 09/22] typo logs --- pkg/ingestor/puller/blob/blob.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/pkg/ingestor/puller/blob/blob.go b/pkg/ingestor/puller/blob/blob.go index 22b935810..2f56bbafa 100644 --- a/pkg/ingestor/puller/blob/blob.go +++ b/pkg/ingestor/puller/blob/blob.go @@ -141,7 +141,7 @@ func (bs *BlobStore) Put(outer context.Context, archivePath string, clusterName return err } key := dumpResult.GetFullPath() - log.I.Infof("Downloading archive (%s) from blob store", key) + log.I.Infof("Opening bucket: %s", bs.bucketName) b, err := bs.openBucket(ctx) if err != nil { return err @@ -154,7 +154,7 @@ func (bs *BlobStore) Put(outer context.Context, archivePath string, clusterName } defer f.Close() - log.I.Infof("Downloading archive (%q) from blob store", key) + log.I.Infof("Uploading archive (%q) from blob store", key) w := bufio.NewReader(f) err = b.Upload(ctx, key, w, &blob.WriterOptions{ ContentType: "application/gzip", @@ -183,7 +183,7 @@ func (bs *BlobStore) Pull(outer context.Context, clusterName string, runID strin return "", err } key := dumpResult.GetFullPath() - log.I.Infof("Downloading archive (%s) from blob store", key) + log.I.Infof("Opening bucket: %s", bs.bucketName) b, err := bs.openBucket(ctx) if err != nil { return "", err From 6a063f7373127994d1f4977c49a45632a954fadb Mon Sep 17 00:00:00 2001 From: jt-dd Date: Tue, 23 Jul 2024 20:21:16 +0200 Subject: [PATCH 10/22] fixing listFiles function --- pkg/ingestor/puller/blob/blob.go | 13 +++++++------ 1 file changed, 7 insertions(+), 6 deletions(-) diff --git a/pkg/ingestor/puller/blob/blob.go b/pkg/ingestor/puller/blob/blob.go index 2f56bbafa..dd29e33dd 100644 --- 
a/pkg/ingestor/puller/blob/blob.go +++ b/pkg/ingestor/puller/blob/blob.go @@ -94,8 +94,7 @@ func (bs *BlobStore) openBucket(ctx context.Context) (*blob.Bucket, error) { return bucket, nil } -func (bs *BlobStore) listFiles(ctx context.Context, b *blob.Bucket, prefix string, recursive bool) ([]*puller.ListObject, error) { - objs := []*puller.ListObject{} +func (bs *BlobStore) listFiles(ctx context.Context, b *blob.Bucket, prefix string, recursive bool, listObjects []*puller.ListObject) ([]*puller.ListObject, error) { iter := b.List(&blob.ListOptions{ Delimiter: "/", Prefix: prefix, @@ -108,16 +107,17 @@ func (bs *BlobStore) listFiles(ctx context.Context, b *blob.Bucket, prefix strin if err != nil { return nil, fmt.Errorf("listing objects: %w", err) } + if obj.IsDir && recursive { - objs, _ = bs.listFiles(ctx, b, obj.Key, true) + listObjects, _ = bs.listFiles(ctx, b, obj.Key, true, listObjects) } - objs = append(objs, &puller.ListObject{ + listObjects = append(listObjects, &puller.ListObject{ Key: obj.Key, ModTime: obj.ModTime, }) } - return objs, nil + return listObjects, nil } func (bs *BlobStore) ListFiles(ctx context.Context, prefix string, recursive bool) ([]*puller.ListObject, error) { @@ -125,8 +125,9 @@ func (bs *BlobStore) ListFiles(ctx context.Context, prefix string, recursive boo if err != nil { return nil, err } + listObjects := []*puller.ListObject{} - return bs.listFiles(ctx, b, prefix, recursive) + return bs.listFiles(ctx, b, prefix, recursive, listObjects) } // Pull pulls the data from the blob store (e.g: s3) and returns the path of the folder containing the archive From c21ba034b97481b8561c95b993eae176fef83ede Mon Sep 17 00:00:00 2001 From: jt-dd Date: Tue, 23 Jul 2024 20:22:02 +0200 Subject: [PATCH 11/22] cleaning --- pkg/ingestor/puller/blob/blob.go | 1 - 1 file changed, 1 deletion(-) diff --git a/pkg/ingestor/puller/blob/blob.go b/pkg/ingestor/puller/blob/blob.go index dd29e33dd..e0dfe7727 100644 --- a/pkg/ingestor/puller/blob/blob.go +++ 
b/pkg/ingestor/puller/blob/blob.go @@ -12,7 +12,6 @@ import ( "github.com/DataDog/KubeHound/pkg/config" "github.com/DataDog/KubeHound/pkg/dump" - "github.com/DataDog/KubeHound/pkg/dump/writer" "github.com/DataDog/KubeHound/pkg/ingestor/puller" "github.com/DataDog/KubeHound/pkg/telemetry/log" "github.com/DataDog/KubeHound/pkg/telemetry/span" From 642533906be7b3082e7539c542416c3b14fb5e2c Mon Sep 17 00:00:00 2001 From: jt-dd Date: Tue, 23 Jul 2024 20:22:33 +0200 Subject: [PATCH 12/22] adding unit test for the DumpResult --- pkg/dump/result_test.go | 298 ++++++++++++++++++++++++++++++++++++++++ 1 file changed, 298 insertions(+) create mode 100644 pkg/dump/result_test.go diff --git a/pkg/dump/result_test.go b/pkg/dump/result_test.go new file mode 100644 index 000000000..33eb4285f --- /dev/null +++ b/pkg/dump/result_test.go @@ -0,0 +1,298 @@ +package dump + +import ( + "fmt" + "path" + "reflect" + "testing" +) + +const ( + validClusterName = "cluster1.k8s.local" + validRunID = "01j2qs8th6yarr5hkafysekn0j" + // cluster name with invalid characters (for instance /) + nonValidClusterName = "cluster1.k8s.local/" + // RunID with capital letters + nonValidRunID = "01j2qs8TH6yarr5hkafysekn0j" +) + +func TestParsePath(t *testing.T) { + t.Parallel() + type args struct { + path string + } + tests := []struct { + name string + args args + want *DumpResult + wantErr bool + }{ + { + name: "valid path with no compression", + args: args{ + path: "/tmp/cluster1.k8s.local/kubehound_cluster1.k8s.local_01j2qs8th6yarr5hkafysekn0j", + }, + want: &DumpResult{ + clusterName: validClusterName, + RunID: validRunID, + isDir: true, + extension: "", + }, + wantErr: false, + }, + { + name: "valid path with no compression", + args: args{ + path: "/tmp/cluster1.k8s.local/kubehound_cluster1.k8s.local_01j2qs8th6yarr5hkafysekn0j", + }, + want: &DumpResult{ + clusterName: validClusterName, + RunID: validRunID, + isDir: true, + extension: "", + }, + wantErr: false, + }, + { + name: "valid path with 
compressed data", + args: args{ + path: "/tmp/cluster1.k8s.local/kubehound_cluster1.k8s.local_01j2qs8th6yarr5hkafysekn0j.tar.gz", + }, + want: &DumpResult{ + clusterName: validClusterName, + RunID: validRunID, + isDir: false, + extension: "tar.gz", + }, + wantErr: false, + }, + { + name: "invalid path", + args: args{ + path: "/tmp/cluster1.k8s.local/cluster1.k8s.local_01j2qs8th6yarr5hkafysekn0j", + }, + want: nil, + wantErr: true, + }, + { + name: "not matching clustername ", + args: args{ + path: "/tmp/cluster1.k8s.local/kubehound_cluster2.k8s.local_01j2qs8th6yarr5hkafysekn0j", + }, + want: nil, + wantErr: true, + }, + { + name: "invalid runID", + args: args{ + path: "/tmp/cluster1.k8s.local/kubehound_cluster1.k8s.local_01j2qs8TH6yarr5hkafysekn0j", + }, + want: nil, + wantErr: true, + }, + } + for _, tt := range tests { + tt := tt + t.Run(tt.name, func(t *testing.T) { + t.Parallel() + got, err := ParsePath(tt.args.path) + if (err != nil) != tt.wantErr { + t.Errorf("ParsePath() error = %v, wantErr %v", err, tt.wantErr) + + return + } + if !reflect.DeepEqual(got, tt.want) { + t.Errorf("ParsePath() = %v, want %v", got, tt.want) + } + }) + } +} + +func TestDumpResult_GetFilename(t *testing.T) { + t.Parallel() + + type fields struct { + ClusterName string + RunID string + IsDir bool + Extension string + } + tests := []struct { + name string + fields fields + want string + }{ + { + name: "valide dump result object no compression", + fields: fields{ + ClusterName: validClusterName, + RunID: validRunID, + IsDir: true, + Extension: "", + }, + want: fmt.Sprintf("%s%s", "kubehound_", "cluster1.k8s.local_01j2qs8th6yarr5hkafysekn0j"), + }, + { + name: "valide dump result object compressed", + fields: fields{ + ClusterName: validClusterName, + RunID: validRunID, + IsDir: false, + Extension: "tar.gz", + }, + want: fmt.Sprintf("%s%s", "kubehound_", "cluster1.k8s.local_01j2qs8th6yarr5hkafysekn0j.tar.gz"), + }, + } + for _, tt := range tests { + tt := tt + t.Run(tt.name, func(t 
*testing.T) { + t.Parallel() + i := &DumpResult{ + clusterName: tt.fields.ClusterName, + RunID: tt.fields.RunID, + isDir: tt.fields.IsDir, + extension: tt.fields.Extension, + } + if got := i.GetFilename(); got != tt.want { + t.Errorf("DumpResult.GetFilename() = %v, want %v", got, tt.want) + } + }) + } +} + +func TestDumpResult_GetFullPath(t *testing.T) { + t.Parallel() + + type fields struct { + ClusterName string + RunID string + IsDir bool + Extension string + } + tests := []struct { + name string + fields fields + want string + }{ + { + name: "valide dump result object no compression", + fields: fields{ + ClusterName: validClusterName, + RunID: validRunID, + IsDir: true, + Extension: "", + }, + want: path.Join(validClusterName, fmt.Sprintf("%s%s", "kubehound_", "cluster1.k8s.local_01j2qs8th6yarr5hkafysekn0j")), + }, + { + name: "valide dump result object compressed", + fields: fields{ + ClusterName: validClusterName, + RunID: validRunID, + IsDir: false, + Extension: "tar.gz", + }, + want: path.Join(validClusterName, fmt.Sprintf("%s%s", "kubehound_", "cluster1.k8s.local_01j2qs8th6yarr5hkafysekn0j.tar.gz")), + }, + } + for _, tt := range tests { + tt := tt + t.Run(tt.name, func(t *testing.T) { + t.Parallel() + i := &DumpResult{ + clusterName: tt.fields.ClusterName, + RunID: tt.fields.RunID, + isDir: tt.fields.IsDir, + extension: tt.fields.Extension, + } + if got := i.GetFullPath(); got != tt.want { + t.Errorf("DumpResult.GetFullPath() = %v, want %v", got, tt.want) + } + }) + } +} + +func TestNewDumpResult(t *testing.T) { + t.Parallel() + + type args struct { + clusterName string + runID string + compressed bool + } + tests := []struct { + name string + args args + want *DumpResult + wantErr bool + }{ + { + name: "valid entry", + args: args{ + clusterName: validClusterName, + runID: validRunID, + compressed: false, + }, + want: &DumpResult{ + clusterName: validClusterName, + RunID: validRunID, + isDir: true, + }, + wantErr: false, + }, + { + name: "invalid 
clustername", + args: args{ + clusterName: nonValidClusterName, + runID: validRunID, + compressed: false, + }, + want: nil, + wantErr: true, + }, + { + name: "emtpy clustername", + args: args{ + clusterName: "", + runID: validRunID, + compressed: false, + }, + want: nil, + wantErr: true, + }, + { + name: "invalid runID", + args: args{ + clusterName: validClusterName, + runID: nonValidRunID, + compressed: false, + }, + want: nil, + wantErr: true, + }, + { + name: "invalid runID", + args: args{ + clusterName: validClusterName, + runID: "", + compressed: false, + }, + want: nil, + wantErr: true, + }, + } + for _, tt := range tests { + tt := tt + t.Run(tt.name, func(t *testing.T) { + t.Parallel() + got, err := NewDumpResult(tt.args.clusterName, tt.args.runID, tt.args.compressed) + if (err != nil) != tt.wantErr { + t.Errorf("NewDumpResult() error = %v, wantErr %v", err, tt.wantErr) + return + } + if !reflect.DeepEqual(got, tt.want) { + t.Errorf("NewDumpResult() = %v, want %v", got, tt.want) + } + }) + } +} From a5533dd740a834b70f77b4f35a2e8a049f4614c5 Mon Sep 17 00:00:00 2001 From: jt-dd Date: Tue, 23 Jul 2024 20:22:50 +0200 Subject: [PATCH 13/22] adding unit test to IsTarGz --- pkg/ingestor/puller/puller_test.go | 65 ++++++++++++++++++++++++++++++ 1 file changed, 65 insertions(+) diff --git a/pkg/ingestor/puller/puller_test.go b/pkg/ingestor/puller/puller_test.go index 0d36977df..a6dc12f0c 100644 --- a/pkg/ingestor/puller/puller_test.go +++ b/pkg/ingestor/puller/puller_test.go @@ -142,3 +142,68 @@ func TestExtractTarGz(t *testing.T) { }) } } + +func TestIsTarGz(t *testing.T) { + t.Parallel() + type args struct { + filePath string + maxArchiveSize int64 + } + tests := []struct { + name string + args args + want bool + wantErr bool + }{ + { + name: "dump result compressed", + args: args{ + maxArchiveSize: 10000000, + filePath: "./testdata/archive.tar.gz", + }, + want: true, + wantErr: false, + }, + { + name: "Unsupported file type", + args: args{ + maxArchiveSize: 100, + 
filePath: "./testdata/regenerate-testdata.sh", + }, + want: false, + wantErr: true, + }, + { + name: "wrong path", + args: args{ + maxArchiveSize: 100, + filePath: "./testdata/doesnotexist.tar.gz", + }, + want: false, + wantErr: true, + }, + { + name: "dump result not compressed - directory", + args: args{ + maxArchiveSize: 100, + filePath: "./testdata/", + }, + want: false, + wantErr: false, + }, + } + for _, tt := range tests { + tt := tt + t.Run(tt.name, func(t *testing.T) { + t.Parallel() + got, err := IsTarGz(tt.args.filePath, tt.args.maxArchiveSize) + if (err != nil) != tt.wantErr { + t.Errorf("IsTarGz() error = %v, wantErr %v", err, tt.wantErr) + return + } + if got != tt.want { + t.Errorf("IsTarGz() = %v, want %v", got, tt.want) + } + }) + } +} From ceb323e31fb793d7ba8047c1304184bb209bd577 Mon Sep 17 00:00:00 2001 From: jt-dd Date: Tue, 23 Jul 2024 20:23:55 +0200 Subject: [PATCH 14/22] adding unit tests for blob storage (local only) --- pkg/ingestor/puller/blob/blob.go | 4 +- pkg/ingestor/puller/blob/blob_test.go | 525 ++++++++++++++++++++++++++ 2 files changed, 527 insertions(+), 2 deletions(-) create mode 100644 pkg/ingestor/puller/blob/blob_test.go diff --git a/pkg/ingestor/puller/blob/blob.go b/pkg/ingestor/puller/blob/blob.go index e0dfe7727..f87a3c9c5 100644 --- a/pkg/ingestor/puller/blob/blob.go +++ b/pkg/ingestor/puller/blob/blob.go @@ -229,7 +229,7 @@ func (bs *BlobStore) Extract(ctx context.Context, archivePath string) error { defer func() { spanExtract.Finish(tracer.WithError(err)) }() basePath := filepath.Dir(archivePath) - err = puller.CheckSanePath(archivePath, bs.cfg.Ingestor.TempDir) + err = puller.CheckSanePath(archivePath, basePath) if err != nil { return fmt.Errorf("Dangerous file path used during extraction, aborting: %w", err) } @@ -250,7 +250,7 @@ func (bs *BlobStore) Close(ctx context.Context, archivePath string) error { var err error defer func() { spanClose.Finish(tracer.WithError(err)) }() - path := filepath.Base(archivePath) + 
path := filepath.Dir(archivePath) err = puller.CheckSanePath(archivePath, bs.cfg.Ingestor.TempDir) if err != nil { return fmt.Errorf("Dangerous file path used while closing, aborting: %w", err) diff --git a/pkg/ingestor/puller/blob/blob_test.go b/pkg/ingestor/puller/blob/blob_test.go new file mode 100644 index 000000000..c41e170ec --- /dev/null +++ b/pkg/ingestor/puller/blob/blob_test.go @@ -0,0 +1,525 @@ +package blob + +import ( + "context" + "fmt" + "os" + "path" + "reflect" + "regexp" + "testing" + "time" + + "github.com/DataDog/KubeHound/pkg/config" + "github.com/DataDog/KubeHound/pkg/dump" + "github.com/DataDog/KubeHound/pkg/ingestor/puller" + _ "gocloud.dev/blob/azureblob" + _ "gocloud.dev/blob/fileblob" + _ "gocloud.dev/blob/gcsblob" + _ "gocloud.dev/blob/memblob" +) + +const ( + bucketName = "file://./testdata/fakeBlobStorage" + tempFileRegexp = `.*/testdata/tmpdir/kh-[0-9]+/archive.tar.gz` + validRunID = "01j2qs8th6yarr5hkafysekn0j" + validClusterName = "cluster1.k8s.local" +) + +func getTempDir(t *testing.T) string { + t.Helper() + + return getAbsPath(t, "testdata/tmpdir") +} + +func getAbsPath(t *testing.T, filepath string) string { + t.Helper() + // Get current working directory to pass SaneCheckPath() + pwd, err := os.Getwd() + if err != nil { + t.Errorf("Error getting current working directory: %v", err) + + return "" + } + + return path.Join(pwd, filepath) +} + +func dummyKubehoundConfig(t *testing.T) *config.KubehoundConfig { + t.Helper() + + return &config.KubehoundConfig{ + Ingestor: config.IngestorConfig{ + TempDir: getTempDir(t), + }, + } +} + +func TestBlobStore_ListFiles(t *testing.T) { + t.Parallel() + type fields struct { + bucketName string + cfg *config.KubehoundConfig + region string + } + type args struct { + prefix string + recursive bool + } + tests := []struct { + name string + fields fields + args args + want []*puller.ListObject + wantErr bool + }{ + { + name: "Sanitize path", + fields: fields{ + bucketName: bucketName, + }, + 
args: args{ + recursive: true, + prefix: "", + }, + want: []*puller.ListObject{ + { + Key: "cluster1.k8s.local/kubehound_cluster1.k8s.local_01j2qs8th6yarr5hkafysekn0j.tar.gz", + }, + { + Key: "cluster1.k8s.local/kubehound_cluster1.k8s.local_11j2qs8th6yarr5hkafysekn0j.tar.gz", + }, + { + Key: "cluster1.k8s.local/kubehound_cluster1.k8s.local_21j2qs8th6yarr5hkafysekn0j.tar.gz", + }, + { + Key: "cluster1.k8s.local/", + }, + { + Key: "cluster2.k8s.local/kubehound_cluster2.k8s.local_01j2qs8th6yarr5hkafysekn0j.tar.gz", + }, + { + Key: "cluster2.k8s.local/kubehound_cluster2.k8s.local_11j2qs8th6yarr5hkafysekn0j.tar.gz", + }, + { + Key: "cluster2.k8s.local/", + }, + }, + wantErr: false, + }, + { + name: "Sanitize path", + fields: fields{ + bucketName: bucketName, + }, + args: args{ + recursive: true, + prefix: "cluster1.k8s.local", + }, + want: []*puller.ListObject{ + { + Key: "cluster1.k8s.local/kubehound_cluster1.k8s.local_01j2qs8th6yarr5hkafysekn0j.tar.gz", + }, + { + Key: "cluster1.k8s.local/kubehound_cluster1.k8s.local_11j2qs8th6yarr5hkafysekn0j.tar.gz", + }, + { + Key: "cluster1.k8s.local/kubehound_cluster1.k8s.local_21j2qs8th6yarr5hkafysekn0j.tar.gz", + }, + { + Key: "cluster1.k8s.local/", + }, + }, + wantErr: false, + }, + { + name: "Sanitize path", + fields: fields{ + bucketName: bucketName, + }, + args: args{ + recursive: true, + prefix: "cluster1.k8s.local/", + }, + want: []*puller.ListObject{ + { + Key: "cluster1.k8s.local/kubehound_cluster1.k8s.local_01j2qs8th6yarr5hkafysekn0j.tar.gz", + }, + { + Key: "cluster1.k8s.local/kubehound_cluster1.k8s.local_11j2qs8th6yarr5hkafysekn0j.tar.gz", + }, + { + Key: "cluster1.k8s.local/kubehound_cluster1.k8s.local_21j2qs8th6yarr5hkafysekn0j.tar.gz", + }, + }, + wantErr: false, + }, + { + name: "Sanitize path", + fields: fields{ + bucketName: bucketName, + }, + args: args{ + recursive: false, + prefix: "", + }, + want: []*puller.ListObject{ + { + Key: "cluster1.k8s.local/", + }, + { + Key: "cluster2.k8s.local/", + }, + }, + 
wantErr: false, + }, + } + for _, tt := range tests { + tt := tt + t.Run(tt.name, func(t *testing.T) { + t.Parallel() + ctx := context.Background() + bs := &BlobStore{ + bucketName: tt.fields.bucketName, + cfg: tt.fields.cfg, + region: tt.fields.region, + } + got, err := bs.ListFiles(ctx, tt.args.prefix, tt.args.recursive) + if (err != nil) != tt.wantErr { + t.Errorf("BlobStore.ListFiles() error = %v, wantErr %v", err, tt.wantErr) + + return + } + + // Reset modtime to avoid comparison issues + for _, v := range got { + v.ModTime = time.Time{} + } + + if !reflect.DeepEqual(got, tt.want) { + for i, v := range got { + t.Logf("Got: %d: %s", i, v.Key) + } + t.Errorf("BlobStore.ListFiles() = %v, want %v", got, tt.want) + } + }) + } +} + +func TestBlobStore_Pull(t *testing.T) { + t.Parallel() + + // Get current working directory to pass SaneCheckPath() + pwd, err := os.Getwd() + if err != nil { + t.Errorf("Error getting current working directory: %v", err) + + return + } + tmpDir := path.Join(pwd, "testdata/tmpdir") + type fields struct { + bucketName string + cfg *config.KubehoundConfig + region string + } + type args struct { + clusterName string + runID string + } + tests := []struct { + name string + fields fields + args args + wantErr bool + }{ + { + name: "Pulling file successfully", + fields: fields{ + bucketName: bucketName, + cfg: &config.KubehoundConfig{ + Ingestor: config.IngestorConfig{ + TempDir: tmpDir, + }, + }, + }, + args: args{ + clusterName: validClusterName, + runID: validRunID, + }, + wantErr: false, + }, + { + name: "Empty tmp dir", + fields: fields{ + bucketName: bucketName, + cfg: &config.KubehoundConfig{ + Ingestor: config.IngestorConfig{}, + }, + }, + args: args{ + clusterName: validClusterName, + runID: validRunID, + }, + wantErr: true, + }, + { + name: "Wrong cluster name", + fields: fields{ + bucketName: bucketName, + cfg: &config.KubehoundConfig{ + Ingestor: config.IngestorConfig{ + TempDir: tmpDir, + }, + }, + }, + args: args{ + 
clusterName: "cluster4.k8s.local", + runID: validRunID, + }, + wantErr: true, + }, + } + for _, tt := range tests { + tt := tt + t.Run(tt.name, func(t *testing.T) { + t.Parallel() + ctx := context.Background() + bs := &BlobStore{ + bucketName: tt.fields.bucketName, + cfg: tt.fields.cfg, + region: tt.fields.region, + } + got, err := bs.Pull(ctx, tt.args.clusterName, tt.args.runID) + if (err != nil) != tt.wantErr { + t.Errorf("BlobStore.Pull() error = %v, wantErr %v", err, tt.wantErr) + + return + } + + // No path was returned so no need to go further + if got == "" { + return + } + + re := regexp.MustCompile(tempFileRegexp) + if !re.MatchString(got) { + t.Errorf("Path is not valid() = %q, should respect %v", got, tempFileRegexp) + + return + } + + err = bs.Close(ctx, got) + if err != nil { + t.Errorf("bs.Close() error = %v", err) + } + }) + } +} + +func TestNewBlobStorage(t *testing.T) { + t.Parallel() + type args struct { + cfg *config.KubehoundConfig + blobConfig *config.BlobConfig + } + tests := []struct { + name string + args args + want *BlobStore + wantErr bool + }{ + { + name: "empty bucket name", + args: args{ + blobConfig: &config.BlobConfig{ + Bucket: "", + }, + cfg: &config.KubehoundConfig{ + Ingestor: config.IngestorConfig{ + TempDir: getTempDir(t), + }, + }, + }, + wantErr: true, + }, + { + name: "valid blob storage", + args: args{ + blobConfig: &config.BlobConfig{ + Bucket: "fakeBlobStorage", + }, + cfg: &config.KubehoundConfig{ + Ingestor: config.IngestorConfig{ + TempDir: getTempDir(t), + }, + }, + }, + want: &BlobStore{ + bucketName: "fakeBlobStorage", + cfg: &config.KubehoundConfig{ + Ingestor: config.IngestorConfig{ + TempDir: getTempDir(t), + }, + }, + region: "", + }, + wantErr: false, + }, + } + for _, tt := range tests { + tt := tt + t.Run(tt.name, func(t *testing.T) { + t.Parallel() + got, err := NewBlobStorage(tt.args.cfg, tt.args.blobConfig) + if (err != nil) != tt.wantErr { + t.Errorf("NewBlobStorage() error = %v, wantErr %v", err, 
tt.wantErr) + + return + } + if !reflect.DeepEqual(got, tt.want) { + t.Errorf("NewBlobStorage() = %v, want %v", got, tt.want) + } + }) + } +} + +func TestBlobStore_Put(t *testing.T) { + t.Parallel() + fakeBlobStoragePut := "./testdata/fakeBlobStoragePut" + bucketNamePut := fmt.Sprintf("file://%s", fakeBlobStoragePut) + type fields struct { + bucketName string + cfg *config.KubehoundConfig + region string + } + type args struct { + archivePath string + clusterName string + runID string + } + tests := []struct { + name string + fields fields + args args + wantErr bool + }{ + { + name: "Push new dump", + fields: fields{ + bucketName: bucketNamePut, + cfg: dummyKubehoundConfig(t), + }, + args: args{ + archivePath: "./testdata/archive.tar.gz", + clusterName: validClusterName, + runID: "91j2qs8th6yarr5hkafysekn0j", + }, + wantErr: false, + }, + { + name: "non existing filepath", + fields: fields{ + bucketName: bucketNamePut, + cfg: dummyKubehoundConfig(t), + }, + args: args{ + archivePath: "./testdata/archive2.tar.gz", + clusterName: validClusterName, + runID: "91j2qs8th6yarr5hkafysekn0j", + }, + wantErr: true, + }, + { + name: "invalid runID", + fields: fields{ + bucketName: bucketNamePut, + cfg: dummyKubehoundConfig(t), + }, + args: args{ + archivePath: "./testdata/archive2.tar.gz", + clusterName: validClusterName, + runID: "91j2qs8th6yarr5hkafysekn0T", + }, + wantErr: true, + }, + } + for _, tt := range tests { + tt := tt + t.Run(tt.name, func(t *testing.T) { + t.Parallel() + ctx := context.Background() + bs := &BlobStore{ + bucketName: tt.fields.bucketName, + cfg: tt.fields.cfg, + region: tt.fields.region, + } + var err error + if err = bs.Put(ctx, tt.args.archivePath, tt.args.clusterName, tt.args.runID); (err != nil) != tt.wantErr { + t.Errorf("BlobStore.Put() error = %v, wantErr %v", err, tt.wantErr) + } + + if err != nil { + return + } + + // Building result path to clean up + dumpResult, err := dump.NewDumpResult(tt.args.clusterName, tt.args.runID, true) + if err 
!= nil { + t.Errorf("NewDumpResult cluster:%s runID:%s", tt.args.clusterName, tt.args.runID) + } + key := path.Join(bs.cfg.Ingestor.TempDir, dumpResult.GetFullPath()) + + err = os.RemoveAll(path.Join(fakeBlobStoragePut, tt.args.clusterName)) + if err != nil { + t.Errorf("Error removing file %s.attrs: %v", key, err) + } + }) + } +} + +func TestBlobStore_Extract(t *testing.T) { + t.Parallel() + fakeBlobStoragePut := "./testdata/fakeBlobStorageExtract" + bucketNameExtract := fmt.Sprintf("file://%s", fakeBlobStoragePut) + type fields struct { + bucketName string + cfg *config.KubehoundConfig + region string + } + type args struct { + archivePath string + } + tests := []struct { + name string + fields fields + args args + wantErr bool + }{ + { + name: "invalid runID", + fields: fields{ + bucketName: bucketNameExtract, + cfg: dummyKubehoundConfig(t), + }, + args: args{ + archivePath: getAbsPath(t, "testdata/archive.tar.gz"), + }, + wantErr: false, + }, + } + for _, tt := range tests { + tt := tt + t.Run(tt.name, func(t *testing.T) { + t.Parallel() + ctx := context.Background() + bs := &BlobStore{ + bucketName: tt.fields.bucketName, + cfg: tt.fields.cfg, + region: tt.fields.region, + } + if err := bs.Extract(ctx, tt.args.archivePath); (err != nil) != tt.wantErr { + t.Errorf("BlobStore.Extract() error = %v, wantErr %v", err, tt.wantErr) + } + }) + } +} From d24531865a80c4a8333bae701442546bf8558ac9 Mon Sep 17 00:00:00 2001 From: jt-dd Date: Tue, 23 Jul 2024 20:24:54 +0200 Subject: [PATCH 15/22] adding files for unit tests --- pkg/ingestor/puller/blob/testdata/archive.tar.gz | Bin 0 -> 2067 bytes ...1.k8s.local_01j2qs8th6yarr5hkafysekn0j.tar.gz | Bin 0 -> 2067 bytes ...1.k8s.local_11j2qs8th6yarr5hkafysekn0j.tar.gz | Bin 0 -> 2067 bytes ...1.k8s.local_21j2qs8th6yarr5hkafysekn0j.tar.gz | Bin 0 -> 2067 bytes ...2.k8s.local_01j2qs8th6yarr5hkafysekn0j.tar.gz | Bin 0 -> 2067 bytes ...2.k8s.local_11j2qs8th6yarr5hkafysekn0j.tar.gz | Bin 0 -> 2067 bytes 
.../blob/testdata/fakeBlobStoragePut/.keep | 0 pkg/ingestor/puller/blob/testdata/tmpdir/.keep | 0 8 files changed, 0 insertions(+), 0 deletions(-) create mode 100644 pkg/ingestor/puller/blob/testdata/archive.tar.gz create mode 100644 pkg/ingestor/puller/blob/testdata/fakeBlobStorage/cluster1.k8s.local/kubehound_cluster1.k8s.local_01j2qs8th6yarr5hkafysekn0j.tar.gz create mode 100644 pkg/ingestor/puller/blob/testdata/fakeBlobStorage/cluster1.k8s.local/kubehound_cluster1.k8s.local_11j2qs8th6yarr5hkafysekn0j.tar.gz create mode 100644 pkg/ingestor/puller/blob/testdata/fakeBlobStorage/cluster1.k8s.local/kubehound_cluster1.k8s.local_21j2qs8th6yarr5hkafysekn0j.tar.gz create mode 100644 pkg/ingestor/puller/blob/testdata/fakeBlobStorage/cluster2.k8s.local/kubehound_cluster2.k8s.local_01j2qs8th6yarr5hkafysekn0j.tar.gz create mode 100644 pkg/ingestor/puller/blob/testdata/fakeBlobStorage/cluster2.k8s.local/kubehound_cluster2.k8s.local_11j2qs8th6yarr5hkafysekn0j.tar.gz create mode 100644 pkg/ingestor/puller/blob/testdata/fakeBlobStoragePut/.keep create mode 100644 pkg/ingestor/puller/blob/testdata/tmpdir/.keep diff --git a/pkg/ingestor/puller/blob/testdata/archive.tar.gz b/pkg/ingestor/puller/blob/testdata/archive.tar.gz new file mode 100644 index 0000000000000000000000000000000000000000..5b159e310d0f46355f6d00c7a32007b18819c13a GIT binary patch literal 2067 zcmV+u2<-PCiwFP!000001MON%bK5o$_SwI}!*fKFq9i{q9oJ1A&(w}4QF=%xlYvNB zLJSfN06K9x{_h3AhX@j+XxFjpDYgeU!~$6CE_T0Nh(pFlAkgV6QV^u!LBbtk6u_F+~k?2;yp7{iDIaAiY7gZOeae&c9pJf+Mfs zJ9yPlU5d^q$lO6GS0 zAKli`PbG@8sOwCbhVeHLP`(tPOd_%Zj0GcaI`XFr-;-d>^FEg8p-2aXL?DVp+G22v zK<9-L&*$~Mi^=M&M`PEY9lE~jd(}>_x)*#y7*}>RFPEFzh2I6b6eWx>YD0W!q}{vS z=e}+KJ<{U80E|B&{BMc>{$w_3)qh@K32+Zs)wa(xE_WAPU;`RIa$?0oD1yN170XT!dQxdl{)?&Rv5J%14W+!)t zM{eI?4)|J5DgD&ln7ns|yUtPb$emfBfy#aV`rV)Br>{SsoSwe@aJhJyDwa;_GOf$2 zuJYsRx%Ksvp`t}`0TLq;UPHRhmI~TIiFZ)Q1mX$T#~8?~XDW!*^m&HyoM03xICg@+ zf!k(BS}~V))D|}}ri5~HVJ@l1Q>wc@7b}Pbj>ewX;~{H-j)rlTqOUn`hA>@nN;nA# 
zRuJ-ZZno$!%gE-PI0|EPN|vzGGo34da7N)`EkNCxV3~Ht9Y;~(pF!U1=1y98>Hg;u z->f0PeEm!B!3U|4g=?5A^37t=rJWAhkw{hu;NOJz+`fkr;!76^x{jJcib%9!^t%{Y z7KTF0b^ZArtXw2%lJVN2Ek1+vTEnWth#19)caJNcq6YH%%uuU}g0uVSm%^`^xaChI z#3029--}VA@X>`ZkqkAZZ$|KDzcxI=WMzsZh83zn^*hoY`KmcL1ZOe4kWu+TK`53p zD?x7n-KSZy&!B*Mob6v{OHtxM5m4p3K#>H1@u^Q+wTwV=z5r5a1eDv3Cf|>)e)LV0 zapY}+<&kA0U}!|zHILUKl@lfGR5g*?s83-BaW)g`QVfT#`meu1OVVZ$zoCwcR5dXo z!c9p?F)3sqxu$LJH2C3`t%|6ybu-_vnST5Yrq#8)@!3m~Xtg{{z!Jma!T1l9T(iQ= zH6EGePA~$@&@ne4;C0b5==S{$L|hTF!q6>yKkWk=AxD~bZI0aWbmA6&odNzPGr*}C z&DYsvQ{%G!45r$%FdT2>tZ}Cl!dUs)M$7a3+aKhcKoTO2P}z;r<`mzK_i9;1#(7Z_ z^ByRq5@3{JNJ{}Dcm*;n%9?UO1xnkw-8+@;scpVXSPS^)=3yWc+ zcj+j!=XV8;0k8IO*8=Yu6*a`Xk&c79xJAevK0jFLet`cT*G8o(E-1wh$?;7lsNMa2GkxUpyO({hZY*Mpq{?K><(qRWh z9LD`B9=^A>@HJGIG6Zf&`%-OdVyVAk=Aen)iY&LwQ1>m_uOWH|2=BAp}m^USIg+?|EhkS{`jN5 z8J}-*G9(1qOi(3q@hX#K7@MB%1>1sxgrTzCYV5R*XCF;Tr1N2@;@CXls4^A(5D7=u zz^kV`$x)+c{D70>uVXL!3>&!k?5AmV@gH!Sxj;|_PHZ}t5eZ8ue?C`e+mla*uCJlk zwI~Ucp|d2}{0uoCBat?7+!`EoHw&m=t}#I5uY{cnbt-lIno*&frT$Y~1+jZooW~dY xI~+@vELpN-$&w{YmMmGaWXX~xOO`BIvSi7UB}M{sB*Kn0f$E004M131k2O literal 0 HcmV?d00001 diff --git a/pkg/ingestor/puller/blob/testdata/fakeBlobStorage/cluster1.k8s.local/kubehound_cluster1.k8s.local_01j2qs8th6yarr5hkafysekn0j.tar.gz b/pkg/ingestor/puller/blob/testdata/fakeBlobStorage/cluster1.k8s.local/kubehound_cluster1.k8s.local_01j2qs8th6yarr5hkafysekn0j.tar.gz new file mode 100644 index 0000000000000000000000000000000000000000..5b159e310d0f46355f6d00c7a32007b18819c13a GIT binary patch literal 2067 zcmV+u2<-PCiwFP!000001MON%bK5o$_SwI}!*fKFq9i{q9oJ1A&(w}4QF=%xlYvNB zLJSfN06K9x{_h3AhX@j+XxFjpDYgeU!~$6CE_T0Nh(pFlAkgV6QV^u!LBbtk6u_F+~k?2;yp7{iDIaAiY7gZOeae&c9pJf+Mfs zJ9yPlU5d^q$lO6GS0 zAKli`PbG@8sOwCbhVeHLP`(tPOd_%Zj0GcaI`XFr-;-d>^FEg8p-2aXL?DVp+G22v zK<9-L&*$~Mi^=M&M`PEY9lE~jd(}>_x)*#y7*}>RFPEFzh2I6b6eWx>YD0W!q}{vS z=e}+KJ<{U80E|B&{BMc>{$w_3)qh@K32+Zs)wa(xE_WAPU;`RIa$?0oD1yN170XT!dQxdl{)?&Rv5J%14W+!)t 
zM{eI?4)|J5DgD&ln7ns|yUtPb$emfBfy#aV`rV)Br>{SsoSwe@aJhJyDwa;_GOf$2 zuJYsRx%Ksvp`t}`0TLq;UPHRhmI~TIiFZ)Q1mX$T#~8?~XDW!*^m&HyoM03xICg@+ zf!k(BS}~V))D|}}ri5~HVJ@l1Q>wc@7b}Pbj>ewX;~{H-j)rlTqOUn`hA>@nN;nA# zRuJ-ZZno$!%gE-PI0|EPN|vzGGo34da7N)`EkNCxV3~Ht9Y;~(pF!U1=1y98>Hg;u z->f0PeEm!B!3U|4g=?5A^37t=rJWAhkw{hu;NOJz+`fkr;!76^x{jJcib%9!^t%{Y z7KTF0b^ZArtXw2%lJVN2Ek1+vTEnWth#19)caJNcq6YH%%uuU}g0uVSm%^`^xaChI z#3029--}VA@X>`ZkqkAZZ$|KDzcxI=WMzsZh83zn^*hoY`KmcL1ZOe4kWu+TK`53p zD?x7n-KSZy&!B*Mob6v{OHtxM5m4p3K#>H1@u^Q+wTwV=z5r5a1eDv3Cf|>)e)LV0 zapY}+<&kA0U}!|zHILUKl@lfGR5g*?s83-BaW)g`QVfT#`meu1OVVZ$zoCwcR5dXo z!c9p?F)3sqxu$LJH2C3`t%|6ybu-_vnST5Yrq#8)@!3m~Xtg{{z!Jma!T1l9T(iQ= zH6EGePA~$@&@ne4;C0b5==S{$L|hTF!q6>yKkWk=AxD~bZI0aWbmA6&odNzPGr*}C z&DYsvQ{%G!45r$%FdT2>tZ}Cl!dUs)M$7a3+aKhcKoTO2P}z;r<`mzK_i9;1#(7Z_ z^ByRq5@3{JNJ{}Dcm*;n%9?UO1xnkw-8+@;scpVXSPS^)=3yWc+ zcj+j!=XV8;0k8IO*8=Yu6*a`Xk&c79xJAevK0jFLet`cT*G8o(E-1wh$?;7lsNMa2GkxUpyO({hZY*Mpq{?K><(qRWh z9LD`B9=^A>@HJGIG6Zf&`%-OdVyVAk=Aen)iY&LwQ1>m_uOWH|2=BAp}m^USIg+?|EhkS{`jN5 z8J}-*G9(1qOi(3q@hX#K7@MB%1>1sxgrTzCYV5R*XCF;Tr1N2@;@CXls4^A(5D7=u zz^kV`$x)+c{D70>uVXL!3>&!k?5AmV@gH!Sxj;|_PHZ}t5eZ8ue?C`e+mla*uCJlk zwI~Ucp|d2}{0uoCBat?7+!`EoHw&m=t}#I5uY{cnbt-lIno*&frT$Y~1+jZooW~dY xI~+@vELpN-$&w{YmMmGaWXX~xOO`BIvSi7UB}M{sB*Kn0f$E004M131k2O literal 0 HcmV?d00001 diff --git a/pkg/ingestor/puller/blob/testdata/fakeBlobStorage/cluster1.k8s.local/kubehound_cluster1.k8s.local_11j2qs8th6yarr5hkafysekn0j.tar.gz b/pkg/ingestor/puller/blob/testdata/fakeBlobStorage/cluster1.k8s.local/kubehound_cluster1.k8s.local_11j2qs8th6yarr5hkafysekn0j.tar.gz new file mode 100644 index 0000000000000000000000000000000000000000..5b159e310d0f46355f6d00c7a32007b18819c13a GIT binary patch literal 2067 zcmV+u2<-PCiwFP!000001MON%bK5o$_SwI}!*fKFq9i{q9oJ1A&(w}4QF=%xlYvNB zLJSfN06K9x{_h3AhX@j+XxFjpDYgeU!~$6CE_T0Nh(pFlAkgV6QV^u!LBbtk6u_F+~k?2;yp7{iDIaAiY7gZOeae&c9pJf+Mfs zJ9yPlU5d^q$lO6GS0 zAKli`PbG@8sOwCbhVeHLP`(tPOd_%Zj0GcaI`XFr-;-d>^FEg8p-2aXL?DVp+G22v 
zK<9-L&*$~Mi^=M&M`PEY9lE~jd(}>_x)*#y7*}>RFPEFzh2I6b6eWx>YD0W!q}{vS z=e}+KJ<{U80E|B&{BMc>{$w_3)qh@K32+Zs)wa(xE_WAPU;`RIa$?0oD1yN170XT!dQxdl{)?&Rv5J%14W+!)t zM{eI?4)|J5DgD&ln7ns|yUtPb$emfBfy#aV`rV)Br>{SsoSwe@aJhJyDwa;_GOf$2 zuJYsRx%Ksvp`t}`0TLq;UPHRhmI~TIiFZ)Q1mX$T#~8?~XDW!*^m&HyoM03xICg@+ zf!k(BS}~V))D|}}ri5~HVJ@l1Q>wc@7b}Pbj>ewX;~{H-j)rlTqOUn`hA>@nN;nA# zRuJ-ZZno$!%gE-PI0|EPN|vzGGo34da7N)`EkNCxV3~Ht9Y;~(pF!U1=1y98>Hg;u z->f0PeEm!B!3U|4g=?5A^37t=rJWAhkw{hu;NOJz+`fkr;!76^x{jJcib%9!^t%{Y z7KTF0b^ZArtXw2%lJVN2Ek1+vTEnWth#19)caJNcq6YH%%uuU}g0uVSm%^`^xaChI z#3029--}VA@X>`ZkqkAZZ$|KDzcxI=WMzsZh83zn^*hoY`KmcL1ZOe4kWu+TK`53p zD?x7n-KSZy&!B*Mob6v{OHtxM5m4p3K#>H1@u^Q+wTwV=z5r5a1eDv3Cf|>)e)LV0 zapY}+<&kA0U}!|zHILUKl@lfGR5g*?s83-BaW)g`QVfT#`meu1OVVZ$zoCwcR5dXo z!c9p?F)3sqxu$LJH2C3`t%|6ybu-_vnST5Yrq#8)@!3m~Xtg{{z!Jma!T1l9T(iQ= zH6EGePA~$@&@ne4;C0b5==S{$L|hTF!q6>yKkWk=AxD~bZI0aWbmA6&odNzPGr*}C z&DYsvQ{%G!45r$%FdT2>tZ}Cl!dUs)M$7a3+aKhcKoTO2P}z;r<`mzK_i9;1#(7Z_ z^ByRq5@3{JNJ{}Dcm*;n%9?UO1xnkw-8+@;scpVXSPS^)=3yWc+ zcj+j!=XV8;0k8IO*8=Yu6*a`Xk&c79xJAevK0jFLet`cT*G8o(E-1wh$?;7lsNMa2GkxUpyO({hZY*Mpq{?K><(qRWh z9LD`B9=^A>@HJGIG6Zf&`%-OdVyVAk=Aen)iY&LwQ1>m_uOWH|2=BAp}m^USIg+?|EhkS{`jN5 z8J}-*G9(1qOi(3q@hX#K7@MB%1>1sxgrTzCYV5R*XCF;Tr1N2@;@CXls4^A(5D7=u zz^kV`$x)+c{D70>uVXL!3>&!k?5AmV@gH!Sxj;|_PHZ}t5eZ8ue?C`e+mla*uCJlk zwI~Ucp|d2}{0uoCBat?7+!`EoHw&m=t}#I5uY{cnbt-lIno*&frT$Y~1+jZooW~dY xI~+@vELpN-$&w{YmMmGaWXX~xOO`BIvSi7UB}M{sB*Kn0f$E004M131k2O literal 0 HcmV?d00001 diff --git a/pkg/ingestor/puller/blob/testdata/fakeBlobStorage/cluster1.k8s.local/kubehound_cluster1.k8s.local_21j2qs8th6yarr5hkafysekn0j.tar.gz b/pkg/ingestor/puller/blob/testdata/fakeBlobStorage/cluster1.k8s.local/kubehound_cluster1.k8s.local_21j2qs8th6yarr5hkafysekn0j.tar.gz new file mode 100644 index 0000000000000000000000000000000000000000..5b159e310d0f46355f6d00c7a32007b18819c13a GIT binary patch literal 2067 zcmV+u2<-PCiwFP!000001MON%bK5o$_SwI}!*fKFq9i{q9oJ1A&(w}4QF=%xlYvNB 
zLJSfN06K9x{_h3AhX@j+XxFjpDYgeU!~$6CE_T0Nh(pFlAkgV6QV^u!LBbtk6u_F+~k?2;yp7{iDIaAiY7gZOeae&c9pJf+Mfs zJ9yPlU5d^q$lO6GS0 zAKli`PbG@8sOwCbhVeHLP`(tPOd_%Zj0GcaI`XFr-;-d>^FEg8p-2aXL?DVp+G22v zK<9-L&*$~Mi^=M&M`PEY9lE~jd(}>_x)*#y7*}>RFPEFzh2I6b6eWx>YD0W!q}{vS z=e}+KJ<{U80E|B&{BMc>{$w_3)qh@K32+Zs)wa(xE_WAPU;`RIa$?0oD1yN170XT!dQxdl{)?&Rv5J%14W+!)t zM{eI?4)|J5DgD&ln7ns|yUtPb$emfBfy#aV`rV)Br>{SsoSwe@aJhJyDwa;_GOf$2 zuJYsRx%Ksvp`t}`0TLq;UPHRhmI~TIiFZ)Q1mX$T#~8?~XDW!*^m&HyoM03xICg@+ zf!k(BS}~V))D|}}ri5~HVJ@l1Q>wc@7b}Pbj>ewX;~{H-j)rlTqOUn`hA>@nN;nA# zRuJ-ZZno$!%gE-PI0|EPN|vzGGo34da7N)`EkNCxV3~Ht9Y;~(pF!U1=1y98>Hg;u z->f0PeEm!B!3U|4g=?5A^37t=rJWAhkw{hu;NOJz+`fkr;!76^x{jJcib%9!^t%{Y z7KTF0b^ZArtXw2%lJVN2Ek1+vTEnWth#19)caJNcq6YH%%uuU}g0uVSm%^`^xaChI z#3029--}VA@X>`ZkqkAZZ$|KDzcxI=WMzsZh83zn^*hoY`KmcL1ZOe4kWu+TK`53p zD?x7n-KSZy&!B*Mob6v{OHtxM5m4p3K#>H1@u^Q+wTwV=z5r5a1eDv3Cf|>)e)LV0 zapY}+<&kA0U}!|zHILUKl@lfGR5g*?s83-BaW)g`QVfT#`meu1OVVZ$zoCwcR5dXo z!c9p?F)3sqxu$LJH2C3`t%|6ybu-_vnST5Yrq#8)@!3m~Xtg{{z!Jma!T1l9T(iQ= zH6EGePA~$@&@ne4;C0b5==S{$L|hTF!q6>yKkWk=AxD~bZI0aWbmA6&odNzPGr*}C z&DYsvQ{%G!45r$%FdT2>tZ}Cl!dUs)M$7a3+aKhcKoTO2P}z;r<`mzK_i9;1#(7Z_ z^ByRq5@3{JNJ{}Dcm*;n%9?UO1xnkw-8+@;scpVXSPS^)=3yWc+ zcj+j!=XV8;0k8IO*8=Yu6*a`Xk&c79xJAevK0jFLet`cT*G8o(E-1wh$?;7lsNMa2GkxUpyO({hZY*Mpq{?K><(qRWh z9LD`B9=^A>@HJGIG6Zf&`%-OdVyVAk=Aen)iY&LwQ1>m_uOWH|2=BAp}m^USIg+?|EhkS{`jN5 z8J}-*G9(1qOi(3q@hX#K7@MB%1>1sxgrTzCYV5R*XCF;Tr1N2@;@CXls4^A(5D7=u zz^kV`$x)+c{D70>uVXL!3>&!k?5AmV@gH!Sxj;|_PHZ}t5eZ8ue?C`e+mla*uCJlk zwI~Ucp|d2}{0uoCBat?7+!`EoHw&m=t}#I5uY{cnbt-lIno*&frT$Y~1+jZooW~dY xI~+@vELpN-$&w{YmMmGaWXX~xOO`BIvSi7UB}M{sB*Kn0f$E004M131k2O literal 0 HcmV?d00001 diff --git a/pkg/ingestor/puller/blob/testdata/fakeBlobStorage/cluster2.k8s.local/kubehound_cluster2.k8s.local_01j2qs8th6yarr5hkafysekn0j.tar.gz b/pkg/ingestor/puller/blob/testdata/fakeBlobStorage/cluster2.k8s.local/kubehound_cluster2.k8s.local_01j2qs8th6yarr5hkafysekn0j.tar.gz new file mode 100644 index 
0000000000000000000000000000000000000000..5b159e310d0f46355f6d00c7a32007b18819c13a GIT binary patch literal 2067 zcmV+u2<-PCiwFP!000001MON%bK5o$_SwI}!*fKFq9i{q9oJ1A&(w}4QF=%xlYvNB zLJSfN06K9x{_h3AhX@j+XxFjpDYgeU!~$6CE_T0Nh(pFlAkgV6QV^u!LBbtk6u_F+~k?2;yp7{iDIaAiY7gZOeae&c9pJf+Mfs zJ9yPlU5d^q$lO6GS0 zAKli`PbG@8sOwCbhVeHLP`(tPOd_%Zj0GcaI`XFr-;-d>^FEg8p-2aXL?DVp+G22v zK<9-L&*$~Mi^=M&M`PEY9lE~jd(}>_x)*#y7*}>RFPEFzh2I6b6eWx>YD0W!q}{vS z=e}+KJ<{U80E|B&{BMc>{$w_3)qh@K32+Zs)wa(xE_WAPU;`RIa$?0oD1yN170XT!dQxdl{)?&Rv5J%14W+!)t zM{eI?4)|J5DgD&ln7ns|yUtPb$emfBfy#aV`rV)Br>{SsoSwe@aJhJyDwa;_GOf$2 zuJYsRx%Ksvp`t}`0TLq;UPHRhmI~TIiFZ)Q1mX$T#~8?~XDW!*^m&HyoM03xICg@+ zf!k(BS}~V))D|}}ri5~HVJ@l1Q>wc@7b}Pbj>ewX;~{H-j)rlTqOUn`hA>@nN;nA# zRuJ-ZZno$!%gE-PI0|EPN|vzGGo34da7N)`EkNCxV3~Ht9Y;~(pF!U1=1y98>Hg;u z->f0PeEm!B!3U|4g=?5A^37t=rJWAhkw{hu;NOJz+`fkr;!76^x{jJcib%9!^t%{Y z7KTF0b^ZArtXw2%lJVN2Ek1+vTEnWth#19)caJNcq6YH%%uuU}g0uVSm%^`^xaChI z#3029--}VA@X>`ZkqkAZZ$|KDzcxI=WMzsZh83zn^*hoY`KmcL1ZOe4kWu+TK`53p zD?x7n-KSZy&!B*Mob6v{OHtxM5m4p3K#>H1@u^Q+wTwV=z5r5a1eDv3Cf|>)e)LV0 zapY}+<&kA0U}!|zHILUKl@lfGR5g*?s83-BaW)g`QVfT#`meu1OVVZ$zoCwcR5dXo z!c9p?F)3sqxu$LJH2C3`t%|6ybu-_vnST5Yrq#8)@!3m~Xtg{{z!Jma!T1l9T(iQ= zH6EGePA~$@&@ne4;C0b5==S{$L|hTF!q6>yKkWk=AxD~bZI0aWbmA6&odNzPGr*}C z&DYsvQ{%G!45r$%FdT2>tZ}Cl!dUs)M$7a3+aKhcKoTO2P}z;r<`mzK_i9;1#(7Z_ z^ByRq5@3{JNJ{}Dcm*;n%9?UO1xnkw-8+@;scpVXSPS^)=3yWc+ zcj+j!=XV8;0k8IO*8=Yu6*a`Xk&c79xJAevK0jFLet`cT*G8o(E-1wh$?;7lsNMa2GkxUpyO({hZY*Mpq{?K><(qRWh z9LD`B9=^A>@HJGIG6Zf&`%-OdVyVAk=Aen)iY&LwQ1>m_uOWH|2=BAp}m^USIg+?|EhkS{`jN5 z8J}-*G9(1qOi(3q@hX#K7@MB%1>1sxgrTzCYV5R*XCF;Tr1N2@;@CXls4^A(5D7=u zz^kV`$x)+c{D70>uVXL!3>&!k?5AmV@gH!Sxj;|_PHZ}t5eZ8ue?C`e+mla*uCJlk zwI~Ucp|d2}{0uoCBat?7+!`EoHw&m=t}#I5uY{cnbt-lIno*&frT$Y~1+jZooW~dY xI~+@vELpN-$&w{YmMmGaWXX~xOO`BIvSi7UB}M{sB*Kn0f$E004M131k2O literal 0 HcmV?d00001 diff --git a/pkg/ingestor/puller/blob/testdata/fakeBlobStorage/cluster2.k8s.local/kubehound_cluster2.k8s.local_11j2qs8th6yarr5hkafysekn0j.tar.gz 
b/pkg/ingestor/puller/blob/testdata/fakeBlobStorage/cluster2.k8s.local/kubehound_cluster2.k8s.local_11j2qs8th6yarr5hkafysekn0j.tar.gz new file mode 100644 index 0000000000000000000000000000000000000000..5b159e310d0f46355f6d00c7a32007b18819c13a GIT binary patch literal 2067 zcmV+u2<-PCiwFP!000001MON%bK5o$_SwI}!*fKFq9i{q9oJ1A&(w}4QF=%xlYvNB zLJSfN06K9x{_h3AhX@j+XxFjpDYgeU!~$6CE_T0Nh(pFlAkgV6QV^u!LBbtk6u_F+~k?2;yp7{iDIaAiY7gZOeae&c9pJf+Mfs zJ9yPlU5d^q$lO6GS0 zAKli`PbG@8sOwCbhVeHLP`(tPOd_%Zj0GcaI`XFr-;-d>^FEg8p-2aXL?DVp+G22v zK<9-L&*$~Mi^=M&M`PEY9lE~jd(}>_x)*#y7*}>RFPEFzh2I6b6eWx>YD0W!q}{vS z=e}+KJ<{U80E|B&{BMc>{$w_3)qh@K32+Zs)wa(xE_WAPU;`RIa$?0oD1yN170XT!dQxdl{)?&Rv5J%14W+!)t zM{eI?4)|J5DgD&ln7ns|yUtPb$emfBfy#aV`rV)Br>{SsoSwe@aJhJyDwa;_GOf$2 zuJYsRx%Ksvp`t}`0TLq;UPHRhmI~TIiFZ)Q1mX$T#~8?~XDW!*^m&HyoM03xICg@+ zf!k(BS}~V))D|}}ri5~HVJ@l1Q>wc@7b}Pbj>ewX;~{H-j)rlTqOUn`hA>@nN;nA# zRuJ-ZZno$!%gE-PI0|EPN|vzGGo34da7N)`EkNCxV3~Ht9Y;~(pF!U1=1y98>Hg;u z->f0PeEm!B!3U|4g=?5A^37t=rJWAhkw{hu;NOJz+`fkr;!76^x{jJcib%9!^t%{Y z7KTF0b^ZArtXw2%lJVN2Ek1+vTEnWth#19)caJNcq6YH%%uuU}g0uVSm%^`^xaChI z#3029--}VA@X>`ZkqkAZZ$|KDzcxI=WMzsZh83zn^*hoY`KmcL1ZOe4kWu+TK`53p zD?x7n-KSZy&!B*Mob6v{OHtxM5m4p3K#>H1@u^Q+wTwV=z5r5a1eDv3Cf|>)e)LV0 zapY}+<&kA0U}!|zHILUKl@lfGR5g*?s83-BaW)g`QVfT#`meu1OVVZ$zoCwcR5dXo z!c9p?F)3sqxu$LJH2C3`t%|6ybu-_vnST5Yrq#8)@!3m~Xtg{{z!Jma!T1l9T(iQ= zH6EGePA~$@&@ne4;C0b5==S{$L|hTF!q6>yKkWk=AxD~bZI0aWbmA6&odNzPGr*}C z&DYsvQ{%G!45r$%FdT2>tZ}Cl!dUs)M$7a3+aKhcKoTO2P}z;r<`mzK_i9;1#(7Z_ z^ByRq5@3{JNJ{}Dcm*;n%9?UO1xnkw-8+@;scpVXSPS^)=3yWc+ zcj+j!=XV8;0k8IO*8=Yu6*a`Xk&c79xJAevK0jFLet`cT*G8o(E-1wh$?;7lsNMa2GkxUpyO({hZY*Mpq{?K><(qRWh z9LD`B9=^A>@HJGIG6Zf&`%-OdVyVAk=Aen)iY&LwQ1>m_uOWH|2=BAp}m^USIg+?|EhkS{`jN5 z8J}-*G9(1qOi(3q@hX#K7@MB%1>1sxgrTzCYV5R*XCF;Tr1N2@;@CXls4^A(5D7=u zz^kV`$x)+c{D70>uVXL!3>&!k?5AmV@gH!Sxj;|_PHZ}t5eZ8ue?C`e+mla*uCJlk zwI~Ucp|d2}{0uoCBat?7+!`EoHw&m=t}#I5uY{cnbt-lIno*&frT$Y~1+jZooW~dY xI~+@vELpN-$&w{YmMmGaWXX~xOO`BIvSi7UB}M{sB*Kn0f$E004M131k2O literal 0 HcmV?d00001 diff 
--git a/pkg/ingestor/puller/blob/testdata/fakeBlobStoragePut/.keep b/pkg/ingestor/puller/blob/testdata/fakeBlobStoragePut/.keep new file mode 100644 index 000000000..e69de29bb diff --git a/pkg/ingestor/puller/blob/testdata/tmpdir/.keep b/pkg/ingestor/puller/blob/testdata/tmpdir/.keep new file mode 100644 index 000000000..e69de29bb From 68d175e4492389bf028114589e380c24e7388f68 Mon Sep 17 00:00:00 2001 From: jt-dd Date: Tue, 23 Jul 2024 20:33:25 +0200 Subject: [PATCH 16/22] PR comments --- pkg/ingestor/api/api.go | 48 +++++++++++++++++++++++++---------------- 1 file changed, 30 insertions(+), 18 deletions(-) diff --git a/pkg/ingestor/api/api.go b/pkg/ingestor/api/api.go index a1b8dd43d..d59a180a3 100644 --- a/pkg/ingestor/api/api.go +++ b/pkg/ingestor/api/api.go @@ -5,12 +5,12 @@ import ( "errors" "fmt" "path/filepath" - "sort" + "slices" "strings" "github.com/DataDog/KubeHound/pkg/collector" "github.com/DataDog/KubeHound/pkg/config" - "github.com/DataDog/KubeHound/pkg/dump/writer" + "github.com/DataDog/KubeHound/pkg/dump" "github.com/DataDog/KubeHound/pkg/ingestor" grpc "github.com/DataDog/KubeHound/pkg/ingestor/api/grpc/pb" "github.com/DataDog/KubeHound/pkg/ingestor/notifier" @@ -76,23 +76,30 @@ func (g *IngestorAPI) RehydrateLatest(ctx context.Context) ([]*grpc.IngestedClus if l := len(dumpKeys); l > 0 { // extracting the latest runID - sort.Slice(dumpKeys, func(a, b int) bool { - return dumpKeys[a].ModTime.Before(dumpKeys[b].ModTime) + latestDump := slices.MaxFunc(dumpKeys, func(a, b *puller.ListObject) int { + // return dumpKeys[a].ModTime.Before(dumpKeys[b].ModTime) + return a.ModTime.Compare(b.ModTime) }) - ingestTime := dumpKeys[l-1].ModTime - prefix := fmt.Sprintf("%s/kubehound_%s_", clusterName, clusterName) - runID := strings.TrimPrefix(dumpKeys[l-1].Key, prefix) - runID = strings.TrimSuffix(runID, writer.TarWriterExtension) + latestDumpIngestTime := latestDump.ModTime + latestDumpKey := latestDump.Key - clusterErr := g.Ingest(ctx, clusterName, runID) + 
dumpResult, err := dump.ParsePath(latestDumpKey) if err != nil { + errRet = errors.Join(errRet, fmt.Errorf("parsing dump path %s: %w", latestDumpKey, err)) + + continue + } + runID := dumpResult.RunID + + clusterErr := g.Ingest(ctx, clusterName, runID) + if clusterErr != nil { errRet = errors.Join(errRet, fmt.Errorf("ingesting cluster %s/%s: %w", clusterName, runID, clusterErr)) } - log.I.Infof("Rehydrated cluster: %s, date: %s, run_id: %s", clusterName, ingestTime.Format("01-02-2006 15:04:05"), runID) + log.I.Infof("Rehydrated cluster: %s, date: %s, run_id: %s", clusterName, latestDumpIngestTime.Format("01-02-2006 15:04:05"), runID) ingestedCluster := &grpc.IngestedCluster{ ClusterName: clusterName, RunId: runID, - Date: timestamppb.New(ingestTime), + Date: timestamppb.New(latestDumpIngestTime), } res = append(res, ingestedCluster) @@ -132,10 +139,11 @@ func (g *IngestorAPI) Ingest(_ context.Context, clusterName string, runID string if err != nil { return err } - err = g.puller.Close(runCtx, archivePath) //nolint: contextcheck - if err != nil { - return err - } + + defer func() { + err = errors.Join(err, g.puller.Close(runCtx, archivePath)) + }() + err = g.puller.Extract(runCtx, archivePath) //nolint: contextcheck if err != nil { return err @@ -164,7 +172,10 @@ func (g *IngestorAPI) Ingest(_ context.Context, clusterName string, runID string if err != nil { return fmt.Errorf("collector client creation: %w", err) } - defer collect.Close(runCtx) //nolint: contextcheck + + defer func() { + err = errors.Join(err, collect.Close(runCtx)) + }() log.I.Infof("Loaded %s collector client", collect.Name()) err = g.Cfg.ComputeDynamic(config.WithClusterName(clusterName), config.WithRunID(runID)) @@ -198,10 +209,11 @@ func (g *IngestorAPI) Ingest(_ context.Context, clusterName string, runID string } err = g.notifier.Notify(runCtx, clusterName, runID) //nolint: contextcheck if err != nil { - return err + return fmt.Errorf("notifying: %w", err) } - return nil + // returning err 
from the defer functions + return err } func (g *IngestorAPI) isAlreadyIngestedInGraph(_ context.Context, clusterName string, runID string) (bool, error) { From 11c216dc3c69a48f3a3a2c4a21fb076a87fc43b1 Mon Sep 17 00:00:00 2001 From: jt-dd Date: Tue, 23 Jul 2024 22:32:03 +0200 Subject: [PATCH 17/22] fix unit tests --- pkg/dump/writer/file_writer_test.go | 3 ++- pkg/dump/writer/tar_writer_test.go | 3 ++- 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/pkg/dump/writer/file_writer_test.go b/pkg/dump/writer/file_writer_test.go index 05960696a..f60b0a435 100644 --- a/pkg/dump/writer/file_writer_test.go +++ b/pkg/dump/writer/file_writer_test.go @@ -33,7 +33,8 @@ func TestFileWriter_Write(t *testing.T) { collector.FakeEndpoint("name2", dummyNamespace, []int32{int32(443)}), } - writer, err := NewFileWriter(ctx, tmpDir, fileNameK8sObject) + directoryOutput := path.Join(tmpDir, fileNameK8sObject) + writer, err := NewFileWriter(ctx, directoryOutput) if err != nil { t.Fatalf("failed to create file writer: %v", err) } diff --git a/pkg/dump/writer/tar_writer_test.go b/pkg/dump/writer/tar_writer_test.go index 4b14a94f8..57c02aeac 100644 --- a/pkg/dump/writer/tar_writer_test.go +++ b/pkg/dump/writer/tar_writer_test.go @@ -52,7 +52,8 @@ func TestTarWriter_Write(t *testing.T) { vfsResourcePath2 := path.Join(dummyNamespace2, fileNameK8sObject) tarBundle[vfsResourcePath2] = dummyK8sObject2 - writer, err := NewTarWriter(ctx, tmpTarFileDir, fileNameK8sObject) + tarPath := path.Join(tmpTarFileDir, fileNameK8sObject) + writer, err := NewTarWriter(ctx, tarPath) if err != nil { t.Fatalf("failed to create file writer: %v", err) } From e438f5a9efb41fdadb8998fb8d4f4d05c1a1fa22 Mon Sep 17 00:00:00 2001 From: jt-dd Date: Tue, 23 Jul 2024 22:34:15 +0200 Subject: [PATCH 18/22] fix linter --- pkg/dump/result_test.go | 3 ++- pkg/ingestor/puller/puller_test.go | 1 + 2 files changed, 3 insertions(+), 1 deletion(-) diff --git a/pkg/dump/result_test.go b/pkg/dump/result_test.go index 
33eb4285f..ed3f73ab5 100644 --- a/pkg/dump/result_test.go +++ b/pkg/dump/result_test.go @@ -251,7 +251,7 @@ func TestNewDumpResult(t *testing.T) { wantErr: true, }, { - name: "emtpy clustername", + name: "empty clustername", args: args{ clusterName: "", runID: validRunID, @@ -288,6 +288,7 @@ func TestNewDumpResult(t *testing.T) { got, err := NewDumpResult(tt.args.clusterName, tt.args.runID, tt.args.compressed) if (err != nil) != tt.wantErr { t.Errorf("NewDumpResult() error = %v, wantErr %v", err, tt.wantErr) + return } if !reflect.DeepEqual(got, tt.want) { diff --git a/pkg/ingestor/puller/puller_test.go b/pkg/ingestor/puller/puller_test.go index a6dc12f0c..0d47aeef6 100644 --- a/pkg/ingestor/puller/puller_test.go +++ b/pkg/ingestor/puller/puller_test.go @@ -199,6 +199,7 @@ func TestIsTarGz(t *testing.T) { got, err := IsTarGz(tt.args.filePath, tt.args.maxArchiveSize) if (err != nil) != tt.wantErr { t.Errorf("IsTarGz() error = %v, wantErr %v", err, tt.wantErr) + return } if got != tt.want { From aac2e805d57256551fb2befc806eb692be153a72 Mon Sep 17 00:00:00 2001 From: jt-dd Date: Wed, 24 Jul 2024 11:08:47 +0200 Subject: [PATCH 19/22] fixing unit test --- pkg/dump/ingestor_test.go | 3 +-- pkg/dump/testdata/kube-config | 20 ++++++++++++++++++++ 2 files changed, 21 insertions(+), 2 deletions(-) create mode 100644 pkg/dump/testdata/kube-config diff --git a/pkg/dump/ingestor_test.go b/pkg/dump/ingestor_test.go index 5f28cc681..149146c75 100644 --- a/pkg/dump/ingestor_test.go +++ b/pkg/dump/ingestor_test.go @@ -20,9 +20,9 @@ const ( ) func TestNewDumpIngestor(t *testing.T) { - t.Parallel() ctx := context.Background() + t.Setenv("KUBECONFIG", "./testdata/kube-config") clientset := fake.NewSimpleClientset() collectorClient := collector.NewTestK8sAPICollector(ctx, clientset) @@ -69,7 +69,6 @@ func TestNewDumpIngestor(t *testing.T) { for _, tt := range tests { tt := tt t.Run(tt.name, func(t *testing.T) { - t.Parallel() got, err := NewDumpIngestor(ctx, tt.args.collectorClient, 
tt.args.compression, tt.args.directoryOutput, tt.args.runID) if (err != nil) != tt.wantErr { t.Errorf("NewDumpIngestorsss() error = %v, wantErr %v", err, tt.wantErr) diff --git a/pkg/dump/testdata/kube-config b/pkg/dump/testdata/kube-config new file mode 100644 index 000000000..bf569401f --- /dev/null +++ b/pkg/dump/testdata/kube-config @@ -0,0 +1,20 @@ +apiVersion: v1 +clusters: +- cluster: + certificate-authority-data: AA== + server: https://127.0.0.1:59959 + name: cluster.k8s.local +contexts: +- context: + cluster: cluster.k8s.local + user: cluster.k8s.local + name: cluster.k8s.local +current-context: cluster.k8s.local +kind: Config +preferences: {} +users: +- name: cluster.k8s.local + user: + client-certificate-data: AA== + client-key-data: AA== + From c6c1933587c878bb543890737658384efe32e76d Mon Sep 17 00:00:00 2001 From: jt-dd Date: Wed, 24 Jul 2024 11:23:51 +0200 Subject: [PATCH 20/22] PR comment --- pkg/dump/result.go | 10 ++++---- pkg/dump/result_test.go | 51 +++++++++++++++-------------------------- 2 files changed, 24 insertions(+), 37 deletions(-) diff --git a/pkg/dump/result.go b/pkg/dump/result.go index 805cadd18..3c0955983 100644 --- a/pkg/dump/result.go +++ b/pkg/dump/result.go @@ -24,13 +24,13 @@ const ( DumpResultTarWriterExtension = "tar.gz" ) -func NewDumpResult(clusterName, runID string, compressed bool) (*DumpResult, error) { +func NewDumpResult(clusterName, runID string, isCompressed bool) (*DumpResult, error) { dumpResult := &DumpResult{ clusterName: clusterName, RunID: runID, isDir: true, } - if compressed { + if isCompressed { dumpResult.Compressed() } @@ -100,11 +100,11 @@ func ParsePath(path string) (*DumpResult, error) { runID := matches[3] extension := matches[4] - compressed := false + isCompressed := false if extension != "" { - compressed = true + isCompressed = true } - result, err := NewDumpResult(clusterName, runID, compressed) + result, err := NewDumpResult(clusterName, runID, isCompressed) if err != nil { return nil, err } diff 
--git a/pkg/dump/result_test.go b/pkg/dump/result_test.go index ed3f73ab5..e084711d5 100644 --- a/pkg/dump/result_test.go +++ b/pkg/dump/result_test.go @@ -27,19 +27,6 @@ func TestParsePath(t *testing.T) { want *DumpResult wantErr bool }{ - { - name: "valid path with no compression", - args: args{ - path: "/tmp/cluster1.k8s.local/kubehound_cluster1.k8s.local_01j2qs8th6yarr5hkafysekn0j", - }, - want: &DumpResult{ - clusterName: validClusterName, - RunID: validRunID, - isDir: true, - extension: "", - }, - wantErr: false, - }, { name: "valid path with no compression", args: args{ @@ -216,9 +203,9 @@ func TestNewDumpResult(t *testing.T) { t.Parallel() type args struct { - clusterName string - runID string - compressed bool + clusterName string + runID string + isCompressed bool } tests := []struct { name string @@ -229,9 +216,9 @@ func TestNewDumpResult(t *testing.T) { { name: "valid entry", args: args{ - clusterName: validClusterName, - runID: validRunID, - compressed: false, + clusterName: validClusterName, + runID: validRunID, + isCompressed: false, }, want: &DumpResult{ clusterName: validClusterName, @@ -243,9 +230,9 @@ func TestNewDumpResult(t *testing.T) { { name: "invalid clustername", args: args{ - clusterName: nonValidClusterName, - runID: validRunID, - compressed: false, + clusterName: nonValidClusterName, + runID: validRunID, + isCompressed: false, }, want: nil, wantErr: true, @@ -253,9 +240,9 @@ func TestNewDumpResult(t *testing.T) { { name: "empty clustername", args: args{ - clusterName: "", - runID: validRunID, - compressed: false, + clusterName: "", + runID: validRunID, + isCompressed: false, }, want: nil, wantErr: true, @@ -263,9 +250,9 @@ func TestNewDumpResult(t *testing.T) { { name: "invalid runID", args: args{ - clusterName: validClusterName, - runID: nonValidRunID, - compressed: false, + clusterName: validClusterName, + runID: nonValidRunID, + isCompressed: false, }, want: nil, wantErr: true, @@ -273,9 +260,9 @@ func TestNewDumpResult(t *testing.T) 
{ { name: "invalid runID", args: args{ - clusterName: validClusterName, - runID: "", - compressed: false, + clusterName: validClusterName, + runID: "", + isCompressed: false, }, want: nil, wantErr: true, @@ -285,7 +272,7 @@ func TestNewDumpResult(t *testing.T) { tt := tt t.Run(tt.name, func(t *testing.T) { t.Parallel() - got, err := NewDumpResult(tt.args.clusterName, tt.args.runID, tt.args.compressed) + got, err := NewDumpResult(tt.args.clusterName, tt.args.runID, tt.args.isCompressed) if (err != nil) != tt.wantErr { t.Errorf("NewDumpResult() error = %v, wantErr %v", err, tt.wantErr) From 04657f1f1535fe26b2ddaf57164e611c05e2e969 Mon Sep 17 00:00:00 2001 From: jt-dd Date: Wed, 24 Jul 2024 11:25:08 +0200 Subject: [PATCH 21/22] linter fix --- pkg/dump/ingestor_test.go | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/pkg/dump/ingestor_test.go b/pkg/dump/ingestor_test.go index 149146c75..d0000a254 100644 --- a/pkg/dump/ingestor_test.go +++ b/pkg/dump/ingestor_test.go @@ -66,7 +66,9 @@ func TestNewDumpIngestor(t *testing.T) { wantErr: false, }, } - for _, tt := range tests { + // Can not run parallel tests as the environment variable KUBECONFIG is set + // t.Setenv is not compatible with parallel tests + for _, tt := range tests { //nolint:paralleltest tt := tt t.Run(tt.name, func(t *testing.T) { got, err := NewDumpIngestor(ctx, tt.args.collectorClient, tt.args.compression, tt.args.directoryOutput, tt.args.runID) From 60abdd221218a27ca3209a765a106b5ad4fb7ead Mon Sep 17 00:00:00 2001 From: jt-dd Date: Wed, 24 Jul 2024 15:54:13 +0200 Subject: [PATCH 22/22] merging rehydrate to ingest command --- cmd/kubehound/ingest.go | 40 +++++++++++++++++----------------------- pkg/cmd/dump.go | 5 ----- 2 files changed, 17 insertions(+), 28 deletions(-) diff --git a/cmd/kubehound/ingest.go b/cmd/kubehound/ingest.go index 47a56c7a0..58bffda9f 100644 --- a/cmd/kubehound/ingest.go +++ b/cmd/kubehound/ingest.go @@ -42,32 +42,17 @@ var ( remoteIngestCmd = &cobra.Command{ 
Use: "remote", Short: "Ingest data remotely on a KHaaS instance", - Long: `Run an ingestion on KHaaS from a bucket to build the attack path`, + Long: `Run an ingestion on KHaaS from a bucket to build the attack path, by default it will rehydrate the latest snapshot previously dumped on a KHaaS instance from all clusters`, PreRunE: func(cobraCmd *cobra.Command, args []string) error { viper.BindPFlag(config.IngestorAPIEndpoint, cobraCmd.Flags().Lookup("khaas-server")) //nolint: errcheck + cobraCmd.MarkFlagRequired("khaas-server") //nolint: errcheck viper.BindPFlag(config.IngestorAPIInsecure, cobraCmd.Flags().Lookup("insecure")) //nolint: errcheck - return cmd.InitializeKubehoundConfig(cobraCmd.Context(), "", false, true) - }, - RunE: func(cobraCmd *cobra.Command, args []string) error { - // Passing the Kubehound config from viper - khCfg, err := cmd.GetConfig() - if err != nil { - return fmt.Errorf("get config: %w", err) + if !isIngestRemoteDefault() { + cobraCmd.MarkFlagRequired("run_id") //nolint: errcheck + cobraCmd.MarkFlagRequired("cluster") //nolint: errcheck } - return core.CoreClientGRPCIngest(khCfg.Ingestor, khCfg.Ingestor.ClusterName, khCfg.Ingestor.RunID) - }, - } - - remoteIngestRehydrateCmd = &cobra.Command{ - Use: "rehydrate", - Short: "Rehydrate snapshot previously dumped on a KHaaS instance", - Long: `Run an rehydratation on KHaaS from a bucket to build the attack path`, - PreRunE: func(cobraCmd *cobra.Command, args []string) error { - viper.BindPFlag(config.IngestorAPIEndpoint, cobraCmd.Flags().Lookup("khaas-server")) //nolint: errcheck - viper.BindPFlag(config.IngestorAPIInsecure, cobraCmd.Flags().Lookup("insecure")) //nolint: errcheck - return cmd.InitializeKubehoundConfig(cobraCmd.Context(), "", false, true) }, RunE: func(cobraCmd *cobra.Command, args []string) error { @@ -77,11 +62,22 @@ var ( return fmt.Errorf("get config: %w", err) } - return core.CoreClientGRPCRehydrateLatest(khCfg.Ingestor) + if isIngestRemoteDefault() { + return 
core.CoreClientGRPCRehydrateLatest(khCfg.Ingestor) + } + + return core.CoreClientGRPCIngest(khCfg.Ingestor, khCfg.Ingestor.ClusterName, khCfg.Ingestor.RunID) }, } ) +func isIngestRemoteDefault() bool { + runID := viper.GetString(config.IngestorRunID) + clusterName := viper.GetString(config.IngestorClusterName) + + return runID == "" && clusterName == "" +} + func init() { ingestCmd.AddCommand(localIngestCmd) @@ -91,7 +87,5 @@ func init() { ingestCmd.AddCommand(remoteIngestCmd) cmd.InitRemoteIngestCmd(remoteIngestCmd, true) - remoteIngestCmd.AddCommand(remoteIngestRehydrateCmd) - rootCmd.AddCommand(ingestCmd) } diff --git a/pkg/cmd/dump.go b/pkg/cmd/dump.go index 76e5ef8fc..6ad06e755 100644 --- a/pkg/cmd/dump.go +++ b/pkg/cmd/dump.go @@ -70,14 +70,9 @@ func InitRemoteIngestCmd(cmd *cobra.Command, standalone bool) { if standalone { cmd.Flags().String("run_id", "", "KubeHound run id to ingest (e.g.: 01htdgjj34mcmrrksw4bjy2e94)") viper.BindPFlag(config.IngestorRunID, cmd.Flags().Lookup("run_id")) //nolint: errcheck - cmd.MarkFlagRequired("run_id") //nolint: errcheck cmd.Flags().String("cluster", "", "Cluster name to ingest (e.g.: my-cluster-1)") viper.BindPFlag(config.IngestorClusterName, cmd.Flags().Lookup("cluster")) //nolint: errcheck - cmd.MarkFlagRequired("cluster") //nolint: errcheck - - // Reusing the same flags for the dump cloud and ingest command - cmd.MarkFlagRequired("khaas-server") //nolint: errcheck } }