Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

only-one-copy-in-graph-per-cluster #286

Merged
merged 11 commits into from
Nov 14, 2024
11 changes: 10 additions & 1 deletion pkg/ingestor/api/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,10 +54,13 @@ func NewIngestorAPI(cfg *config.KubehoundConfig, puller puller.DataPuller, notif
}
}

func (g *IngestorAPI) Close(ctx context.Context) {
g.providers.Close(ctx)
}

// RehydrateLatest is just a GRPC wrapper around the Ingest method from the API package
func (g *IngestorAPI) RehydrateLatest(ctx context.Context) ([]*grpc.IngestedCluster, error) {
l := log.Logger(ctx)
l.Error("id123")
// first level key are cluster names
directories, errRet := g.puller.ListFiles(ctx, "", false)
if errRet != nil {
Expand Down Expand Up @@ -204,6 +207,12 @@ func (g *IngestorAPI) Ingest(ctx context.Context, path string) error {
}
}

// Keeping only the latest dump for each cluster in memory
err = g.providers.GraphProvider.Clean(runCtx, clusterName) //nolint: contextcheck
if err != nil {
return err
}

err = g.providers.IngestBuildData(runCtx, runCfg) //nolint: contextcheck
if err != nil {
return err
Expand Down
23 changes: 18 additions & 5 deletions pkg/kubehound/core/core_grpc_api.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,13 @@ import (
"github.com/DataDog/KubeHound/pkg/ingestor/notifier/noop"
"github.com/DataDog/KubeHound/pkg/ingestor/puller/blob"
"github.com/DataDog/KubeHound/pkg/kubehound/providers"
"github.com/DataDog/KubeHound/pkg/telemetry/events"
"github.com/DataDog/KubeHound/pkg/telemetry/log"
"github.com/DataDog/KubeHound/pkg/telemetry/span"
"gopkg.in/DataDog/dd-trace-go.v1/ddtrace/tracer"
)

func CoreGrpcApi(ctx context.Context, khCfg *config.KubehoundConfig) error {
func initCoreGrpcApi(ctx context.Context, khCfg *config.KubehoundConfig) (*api.IngestorAPI, error) {
l := log.Logger(ctx)
l.Info("Starting KubeHound Distributed Ingestor Service")
span, ctx := span.SpanRunFromContext(ctx, span.IngestorLaunch)
Expand All @@ -28,22 +29,34 @@ func CoreGrpcApi(ctx context.Context, khCfg *config.KubehoundConfig) error {
l.Info("Initializing providers (graph, cache, store)")
p, err := providers.NewProvidersFactoryConfig(ctx, khCfg)
if err != nil {
return fmt.Errorf("factory config creation: %w", err)
return nil, fmt.Errorf("factory config creation: %w", err)
}
defer p.Close(ctx)

l.Info("Creating Blob Storage provider")
puller, err := blob.NewBlobStorage(khCfg, khCfg.Ingestor.Blob)
if err != nil {
return err
return nil, err
}

l.Info("Creating Noop Notifier")
noopNotifier := noop.NewNoopNotifier()

l.Info("Creating Ingestor API")
ingestorApi := api.NewIngestorAPI(khCfg, puller, noopNotifier, p)

return api.NewIngestorAPI(khCfg, puller, noopNotifier, p), nil
}

func CoreGrpcApi(ctx context.Context, khCfg *config.KubehoundConfig) error {
ingestorApi, err := initCoreGrpcApi(ctx, khCfg)
if err != nil {
_ = events.PushEvent(ctx, events.IngestorFailed, "")

return err
}
defer ingestorApi.Close(ctx)
_ = events.PushEvent(ctx, events.IngestorInit, "")

l := log.Logger(ctx)
l.Info("Starting Ingestor API")
err = grpc.Listen(ctx, ingestorApi)
if err != nil {
Expand Down
31 changes: 31 additions & 0 deletions pkg/kubehound/storage/graphdb/janusgraph_provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,10 @@ import (
"github.com/DataDog/KubeHound/pkg/kubehound/graph/vertex"
"github.com/DataDog/KubeHound/pkg/kubehound/storage/cache"
"github.com/DataDog/KubeHound/pkg/telemetry/log"
"github.com/DataDog/KubeHound/pkg/telemetry/span"
"github.com/DataDog/KubeHound/pkg/telemetry/tag"
gremlin "github.com/apache/tinkerpop/gremlin-go/v3/driver"
"gopkg.in/DataDog/dd-trace-go.v1/ddtrace/tracer"
)

const (
Expand Down Expand Up @@ -145,3 +147,32 @@ func (jgp *JanusGraphProvider) Close(ctx context.Context) error {

return nil
}

// Raw returns a handle to the underlying provider to allow implementation specific operations e.g graph queries.
func (jgp *JanusGraphProvider) Clean(ctx context.Context, cluster string) error {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit for later: it would be great to have a trace for this. I have no intuition on how long it takes.

var err error
span, ctx := span.SpanRunFromContext(ctx, span.IngestorClean)
defer func() { span.Finish(tracer.WithError(err)) }()
l := log.Trace(ctx)
l.Infof("Cleaning cluster", log.FieldClusterKey, cluster)
g := gremlin.Traversal_().WithRemote(jgp.drc)
tx := g.Tx()
defer tx.Close()

gtx, err := tx.Begin()
if err != nil {
return err
}

err = <-gtx.V().Has("cluster", cluster).Drop().Iterate()
if err != nil {
return err
}

err = tx.Commit()
if err != nil {
return err
}

return nil
}
43 changes: 43 additions & 0 deletions pkg/kubehound/storage/graphdb/mocks/graph_provider.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 3 additions & 0 deletions pkg/kubehound/storage/graphdb/provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,9 @@ type Provider interface {
// Raw returns a handle to the underlying provider to allow implementation specific operations e.g graph queries.
Raw() any

// Droping all assets from the graph database from a cluster name
Clean(ctx context.Context, cluster string) error

// VertexWriter creates a new AsyncVertexWriter instance to enable asynchronous bulk inserts of vertices.
VertexWriter(ctx context.Context, v vertex.Builder, c cache.CacheProvider, opts ...WriterOption) (AsyncVertexWriter, error)

Expand Down
1 change: 1 addition & 0 deletions pkg/telemetry/span/spans.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ const (
IngestorBlobPut = "kubehound.ingestor.blob.put"
IngestorBlobExtract = "kubehound.ingestor.blob.extract"
IngestorBlobClose = "kubehound.ingestor.blob.close"
IngestorClean = "kubehound.ingestor.clean"

DumperLaunch = "kubehound.dumper.launch"

Expand Down
Loading