diff --git a/cmd/kubehound/backend.go b/cmd/kubehound/backend.go
index afb8349bd..e44f5a554 100644
--- a/cmd/kubehound/backend.go
+++ b/cmd/kubehound/backend.go
@@ -1,7 +1,6 @@
 package main
 
 import (
-	"github.com/DataDog/KubeHound/pkg/backend"
 	docker "github.com/DataDog/KubeHound/pkg/backend"
 	"github.com/spf13/cobra"
 )
@@ -11,7 +10,7 @@ var (
 	hard        bool
 	composePath []string
 
-	uiProfile = backend.DefaultUIProfile
+	uiProfile = docker.DefaultUIProfile
 	uiInvana  bool
 )
 
diff --git a/cmd/kubehound/dumper.go b/cmd/kubehound/dumper.go
index 732abbd12..1b7eec01b 100644
--- a/cmd/kubehound/dumper.go
+++ b/cmd/kubehound/dumper.go
@@ -4,6 +4,7 @@ import (
 	"fmt"
 	"os"
 
+	docker "github.com/DataDog/KubeHound/pkg/backend"
 	"github.com/DataDog/KubeHound/pkg/cmd"
 	"github.com/DataDog/KubeHound/pkg/config"
 	"github.com/DataDog/KubeHound/pkg/kubehound/core"
@@ -12,6 +13,11 @@ import (
 	"github.com/spf13/viper"
 )
 
+var (
+	runLocalIngest bool
+	startBackend   bool
+)
+
 var (
 	dumpCmd = &cobra.Command{
 		Use: "dump",
@@ -79,7 +85,25 @@ var (
 			if err != nil {
 				return fmt.Errorf("get config: %w", err)
 			}
-			_, err = core.DumpCore(cobraCmd.Context(), khCfg, false)
+			resultPath, err := core.DumpCore(cobraCmd.Context(), khCfg, false)
+			if err != nil {
+				return fmt.Errorf("dump core: %w", err)
+			}
+
+			if startBackend {
+				err = docker.NewBackend(cobraCmd.Context(), composePath, docker.DefaultUIProfile)
+				if err != nil {
+					return fmt.Errorf("new backend: %w", err)
+				}
+				err = docker.Up(cobraCmd.Context())
+				if err != nil {
+					return fmt.Errorf("docker up: %w", err)
+				}
+			}
+
+			if runLocalIngest {
+				err = core.CoreLocalIngest(cobraCmd.Context(), khCfg, resultPath)
+			}
 
 			return err
 		},
@@ -92,6 +116,9 @@ func init() {
 	cmd.InitRemoteDumpCmd(dumpRemoteCmd)
 	cmd.InitRemoteIngestCmd(dumpRemoteCmd, false)
 
+	dumpLocalCmd.Flags().BoolVar(&runLocalIngest, "ingest", false, "Run the ingestion after the dump")
+	dumpLocalCmd.Flags().BoolVar(&startBackend, "backend", false, "Start the backend after the dump")
+
 	dumpCmd.AddCommand(dumpRemoteCmd)
 	dumpCmd.AddCommand(dumpLocalCmd)
 	rootCmd.AddCommand(dumpCmd)
diff --git a/pkg/collector/k8s_api.go b/pkg/collector/k8s_api.go
index 4efb68129..7cf13fe0a 100644
--- a/pkg/collector/k8s_api.go
+++ b/pkg/collector/k8s_api.go
@@ -31,14 +31,15 @@ import (
 
 // FileCollector implements a collector based on local K8s API json files generated outside the KubeHound application via e.g kubectl.
 type k8sAPICollector struct {
-	clientset kubernetes.Interface
-	log       *log.KubehoundLogger
-	rl        ratelimit.Limiter
-	cfg       *config.K8SAPICollectorConfig
-	tags      collectorTags
-	waitTime  map[string]time.Duration
-	startTime time.Time
-	mu        *sync.Mutex
+	clientset   kubernetes.Interface
+	log         *log.KubehoundLogger
+	rl          ratelimit.Limiter
+	cfg         *config.K8SAPICollectorConfig
+	tags        collectorTags
+	waitTime    map[string]time.Duration
+	startTime   time.Time
+	mu          *sync.Mutex
+	isStreaming bool
 }
 
 const (
@@ -138,6 +139,12 @@ func (c *k8sAPICollector) wait(_ context.Context, resourceType string, tags []st
 	defer c.mu.Unlock()
 	c.waitTime[resourceType] += waitTime
 
+	// Display a message to tell the user the streaming has started (only once after the approval has been made)
+	if !c.isStreaming {
+		log.I.Info("Streaming data from the K8s API")
+		c.isStreaming = true
+	}
+
 	// entity := tag.Entity(resourceType)
 	err := statsd.Gauge(metric.CollectorWait, float64(c.waitTime[resourceType]), tags, 1)
 	if err != nil {
diff --git a/pkg/dump/pipeline/pipeline.go b/pkg/dump/pipeline/pipeline.go
index 15b6fd03c..d42d83790 100644
--- a/pkg/dump/pipeline/pipeline.go
+++ b/pkg/dump/pipeline/pipeline.go
@@ -149,6 +149,7 @@ func dumpK8sObjs(ctx context.Context, operationName string, entity string, strea
 	var err error
 	defer func() { span.Finish(tracer.WithError(err)) }()
 	err = streamFunc(ctx)
+	log.I.Infof("Dumping %s done", entity)
 
 	return ctx, err
 }
diff --git a/pkg/kubehound/core/core_grpc_client.go b/pkg/kubehound/core/core_grpc_client.go
index 40e387e5d..89f3f2da3 100644
--- a/pkg/kubehound/core/core_grpc_client.go
+++ b/pkg/kubehound/core/core_grpc_client.go
@@ -41,6 +41,8 @@ func CoreClientGRPCIngest(ingestorConfig config.IngestorConfig, clusteName strin
 	defer conn.Close()
 	client := pb.NewAPIClient(conn)
 
+	log.I.Infof("Launching ingestion on %s [runID: %s]", ingestorConfig.API.Endpoint, runID)
+
 	_, err = client.Ingest(context.Background(), &pb.IngestRequest{
 		RunId:       runID,
 		ClusterName: clusteName,
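
The new `--ingest` and `--backend` flags let a single `kubehound dump local` invocation dump the cluster, bring up the backend, and ingest the result. Below is a minimal, hedged sketch of the flag-gated post-step pattern the diff wires into `dumpLocalCmd`; it is not the KubeHound implementation, and the dump, backend start, and ingestion calls are replaced with hypothetical print placeholders so the control flow stands alone:

```go
package main

// Sketch only: the real command calls core.DumpCore, docker.NewBackend/docker.Up
// and core.CoreLocalIngest; here those steps are stubbed with prints.

import (
	"fmt"
	"os"

	"github.com/spf13/cobra"
)

func main() {
	var runLocalIngest, startBackend bool

	dumpLocal := &cobra.Command{
		Use:   "local",
		Short: "Dump locally, then optionally start the backend and ingest",
		RunE: func(cmd *cobra.Command, _ []string) error {
			// Always perform the dump (placeholder for the dump step).
			fmt.Println("dumping cluster resources...")

			// Optionally bring up the backend (placeholder for the backend step).
			if startBackend {
				fmt.Println("starting backend containers...")
			}

			// Optionally ingest the fresh dump (placeholder for the ingestion step).
			if runLocalIngest {
				fmt.Println("ingesting dump into the graph backend...")
			}

			return nil
		},
	}

	dumpLocal.Flags().BoolVar(&runLocalIngest, "ingest", false, "Run the ingestion after the dump")
	dumpLocal.Flags().BoolVar(&startBackend, "backend", false, "Start the backend after the dump")

	if err := dumpLocal.Execute(); err != nil {
		fmt.Fprintln(os.Stderr, err)
		os.Exit(1)
	}
}
```

Keeping the optional steps behind boolean flags preserves the default `dump local` behaviour while allowing one command to cover the full local workflow.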