From 3fd51b5a574eb09ad3833ea6f910a0d8e131f034 Mon Sep 17 00:00:00 2001 From: Luke Tucker Date: Fri, 24 Jan 2025 14:41:19 -0500 Subject: [PATCH 1/2] fix: add a placeholder changeset if diffing fails improves information available when presented as a deviation --- diode-server/reconciler/differ/differ.go | 22 ++ .../reconciler/ingestion_processor_test.go | 2 +- diode-server/reconciler/ops.go | 8 + diode-server/reconciler/ops_test.go | 218 ++++++++++++++++++ 4 files changed, 249 insertions(+), 1 deletion(-) create mode 100644 diode-server/reconciler/ops_test.go diff --git a/diode-server/reconciler/differ/differ.go b/diode-server/reconciler/differ/differ.go index 2cab7c0..7a66dc2 100644 --- a/diode-server/reconciler/differ/differ.go +++ b/diode-server/reconciler/differ/differ.go @@ -208,3 +208,25 @@ func genDeviationName(objects []netbox.ComparableData) *string { return &deviationName } + +func deviationNameForDiffFailure(entity IngestEntity) string { + e, err := extractIngestEntityData(entity) + if err != nil { + return fmt.Sprintf("Unknown %s discovered", entity.ObjectType) + } + + return fmt.Sprintf("%s %s discovered", e.ObjectTypeName(), e.ObjectPrimaryValue()) +} + +// FailedDiffChangeSet generates a placeholder change set for a failed diff +func FailedDiffChangeSet(entity IngestEntity, branchID string) *changeset.ChangeSet { + deviationName := deviationNameForDiffFailure(entity) + cs := &changeset.ChangeSet{ + ChangeSetID: uuid.NewString(), + DeviationName: &deviationName, + } + if branchID != "" { + cs.BranchID = &branchID + } + return cs +} diff --git a/diode-server/reconciler/ingestion_processor_test.go b/diode-server/reconciler/ingestion_processor_test.go index 668b9f0..86c3a69 100644 --- a/diode-server/reconciler/ingestion_processor_test.go +++ b/diode-server/reconciler/ingestion_processor_test.go @@ -235,7 +235,7 @@ func TestIngestionProcessorStart(t *testing.T) { mockRepository.On("CreateIngestionLog", ctx, mock.Anything, mock.Anything).Return(int32Ptr(1), nil) mockRepository.On("UpdateIngestionLogStateWithError", ctx, mock.Anything, mock.Anything, mock.Anything).Return(nil) - + mockRepository.On("CreateChangeSet", ctx, mock.Anything, mock.Anything).Return(int32Ptr(1), nil) redisClient := redis.NewClient(&redis.Options{ Addr: s.Addr(), DB: 1, diff --git a/diode-server/reconciler/ops.go b/diode-server/reconciler/ops.go index 3f42900..ee07177 100644 --- a/diode-server/reconciler/ops.go +++ b/diode-server/reconciler/ops.go @@ -59,6 +59,14 @@ func (o *Ops) GenerateChangeSet(ctx context.Context, ingestionLogID int32, inges if err2 := o.repository.UpdateIngestionLogStateWithError(ctx, ingestionLogID, reconcilerpb.State_FAILED, ingestionErr); err2 != nil { err = errors.Join(err, err2) } + + cs := differ.FailedDiffChangeSet(ingestEntity, branchID) + _, err1 := o.repository.CreateChangeSet(ctx, *cs, ingestionLogID) + if err1 != nil { + o.logger.Error("error generating diff failure placeholder change set") + return nil, nil, errors.Join(err, err1) + } + return nil, nil, err } diff --git a/diode-server/reconciler/ops_test.go b/diode-server/reconciler/ops_test.go new file mode 100644 index 0000000..3a742ab --- /dev/null +++ b/diode-server/reconciler/ops_test.go @@ -0,0 +1,218 @@ +package reconciler_test + +import ( + "context" + "fmt" + "log/slog" + "os" + "testing" + + "github.com/stretchr/testify/mock" + "github.com/stretchr/testify/require" + + "github.com/netboxlabs/diode/diode-server/gen/diode/v1/diodepb" + pb "github.com/netboxlabs/diode/diode-server/gen/diode/v1/reconcilerpb" + "github.com/netboxlabs/diode/diode-server/netbox" + "github.com/netboxlabs/diode/diode-server/netboxdiodeplugin" + pluginmocks "github.com/netboxlabs/diode/diode-server/netboxdiodeplugin/mocks" + "github.com/netboxlabs/diode/diode-server/reconciler" + "github.com/netboxlabs/diode/diode-server/reconciler/changeset" + "github.com/netboxlabs/diode/diode-server/reconciler/mocks" +) + +func TestProOpsGenerateChangeSet(t *testing.T) { + type mockRetrieveObjectState struct { + objectType string + objectID int + branchID string + queryParams map[string]string + objectChangeID int + object netbox.ComparableData + err error + } + + type mockCreateChangeSet struct { + ingestionLogDBID int32 + deviationName *string + id int32 + } + + type mockUpdateIngestionLogStateWithError struct { + ingestionLogDBID int32 + state pb.State + err error + } + + tests := []struct { + name string + + logDBID int32 + log *pb.IngestionLog + branchID string + + retrieveObjectStates []mockRetrieveObjectState + createChangeSets []mockCreateChangeSet + updateIngestionLogStateWithErrors []mockUpdateIngestionLogStateWithError + + errorMessage string + hasError bool + }{ + { + name: "diff failure generates a placeholder change with deviation type", + logDBID: 1234, + log: &pb.IngestionLog{ + Id: "8a8ae517-85b9-466e-890c-aadb0771cc9e", + ObjectType: netbox.DcimSiteObjectType, + State: pb.State_QUEUED, + RequestId: "1abf059c-496f-4037-83c2-0e9b1d021e85", + Entity: &diodepb.Entity{ + Entity: &diodepb.Entity_Site{ + Site: &diodepb.Site{ + Name: "test-site-1", + }, + }, + }, + }, + retrieveObjectStates: []mockRetrieveObjectState{ + { + objectType: "dcim.site", + objectID: 0, + queryParams: map[string]string{"q": "test-site-1"}, + err: fmt.Errorf("Client.Timeout exceeded while awaiting headers"), + }, + }, + updateIngestionLogStateWithErrors: []mockUpdateIngestionLogStateWithError{ + { + ingestionLogDBID: 1234, + state: pb.State_FAILED, + }, + }, + createChangeSets: []mockCreateChangeSet{ + { + ingestionLogDBID: 1234, + deviationName: strPtr("Site test-site-1 discovered"), + id: 1235, + }, + }, + hasError: true, + errorMessage: "Client.Timeout exceeded while awaiting headers", + }, + { + name: "placeholder change reflects branch", + branchID: "branch-1", + logDBID: 1234, + log: &pb.IngestionLog{ + Id: "8a8ae517-85b9-466e-890c-aadb0771cc9e", + ObjectType: netbox.DcimSiteObjectType, + State: pb.State_QUEUED, + RequestId: "1abf059c-496f-4037-83c2-0e9b1d021e85", + Entity: &diodepb.Entity{ + Entity: &diodepb.Entity_Site{ + Site: &diodepb.Site{ + Name: "test-site-1", + }, + }, + }, + }, + retrieveObjectStates: []mockRetrieveObjectState{ + { + objectType: "dcim.site", + objectID: 0, + branchID: "branch-1", + queryParams: map[string]string{"q": "test-site-1"}, + err: fmt.Errorf("Client.Timeout exceeded while awaiting headers"), + }, + }, + updateIngestionLogStateWithErrors: []mockUpdateIngestionLogStateWithError{ + { + ingestionLogDBID: 1234, + state: pb.State_FAILED, + }, + }, + createChangeSets: []mockCreateChangeSet{ + { + ingestionLogDBID: 1234, + deviationName: strPtr("Site test-site-1 discovered"), + id: 1235, + }, + }, + hasError: true, + errorMessage: "Client.Timeout exceeded while awaiting headers", + }, + } + + ctx := context.Background() + logger := slog.New(slog.NewJSONHandler(os.Stdout, &slog.HandlerOptions{Level: slog.LevelDebug, AddSource: false})) + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + mockRepository := mocks.NewRepository(t) + mockNetBoxClient := pluginmocks.NewNetBoxAPI(t) + ops := reconciler.NewOps(mockRepository, mockNetBoxClient, logger) + + for _, m := range tt.retrieveObjectStates { + if m.err == nil { + mockNetBoxClient.EXPECT().RetrieveObjectState(ctx, netboxdiodeplugin.RetrieveObjectStateQueryParams{ + ObjectType: m.objectType, + ObjectID: m.objectID, + BranchID: m.branchID, + Params: m.queryParams, + }).Return(&netboxdiodeplugin.ObjectState{ + ObjectID: m.objectID, + ObjectType: m.objectType, + ObjectChangeID: m.objectChangeID, + Object: m.object, + }, nil) + } else { + mockNetBoxClient.EXPECT().RetrieveObjectState(ctx, netboxdiodeplugin.RetrieveObjectStateQueryParams{ + ObjectType: m.objectType, + ObjectID: m.objectID, + BranchID: m.branchID, + Params: m.queryParams, + }).Return(nil, m.err) + } + } + for _, m := range tt.createChangeSets { + mockRepository.EXPECT().CreateChangeSet(ctx, mock.MatchedBy(func(c changeset.ChangeSet) bool { + if !strPtrEq(c.DeviationName, m.deviationName) { + return false + } + if tt.branchID != "" && !strPtrEq(c.BranchID, &tt.branchID) { + return false + } + return true + }), m.ingestionLogDBID).Return(&m.id, nil) + } + for _, m := range tt.updateIngestionLogStateWithErrors { + mockRepository.EXPECT().UpdateIngestionLogStateWithError(ctx, m.ingestionLogDBID, m.state, mock.Anything).Return(m.err) + } + + csid, cs, err := ops.GenerateChangeSet(ctx, tt.logDBID, tt.log, tt.branchID) + if tt.hasError { + require.Error(t, err) + require.Nil(t, csid) + require.Nil(t, cs) + require.Contains(t, err.Error(), tt.errorMessage) + } else { + require.NoError(t, err) + require.NotNil(t, csid) + require.NotNil(t, cs) + // TODO(ltucker): positive tests + } + }) + } +} + +func strPtrEq(a *string, b *string) bool { + if a == nil && b == nil { + return true + } + if a == nil || b == nil { + return false + } + return *a == *b +} + +func strPtr(s string) *string { + return &s +} From 65fabe11820b0635f913d87d7daaa4cd67568656 Mon Sep 17 00:00:00 2001 From: Luke Tucker Date: Fri, 24 Jan 2025 15:20:54 -0500 Subject: [PATCH 2/2] send the placeholder along if generated --- diode-server/reconciler/ops.go | 4 ++-- diode-server/reconciler/ops_test.go | 2 -- 2 files changed, 2 insertions(+), 4 deletions(-) diff --git a/diode-server/reconciler/ops.go b/diode-server/reconciler/ops.go index ee07177..6be48ae 100644 --- a/diode-server/reconciler/ops.go +++ b/diode-server/reconciler/ops.go @@ -61,13 +61,13 @@ func (o *Ops) GenerateChangeSet(ctx context.Context, ingestionLogID int32, inges } cs := differ.FailedDiffChangeSet(ingestEntity, branchID) - _, err1 := o.repository.CreateChangeSet(ctx, *cs, ingestionLogID) + id, err1 := o.repository.CreateChangeSet(ctx, *cs, ingestionLogID) if err1 != nil { o.logger.Error("error generating diff failure placeholder change set") return nil, nil, errors.Join(err, err1) } - return nil, nil, err + return id, cs, err } changeSetID, err := o.repository.CreateChangeSet(ctx, *changeSet, ingestionLogID) diff --git a/diode-server/reconciler/ops_test.go b/diode-server/reconciler/ops_test.go index 3a742ab..39b2677 100644 --- a/diode-server/reconciler/ops_test.go +++ b/diode-server/reconciler/ops_test.go @@ -190,8 +190,6 @@ func TestProOpsGenerateChangeSet(t *testing.T) { csid, cs, err := ops.GenerateChangeSet(ctx, tt.logDBID, tt.log, tt.branchID) if tt.hasError { require.Error(t, err) - require.Nil(t, csid) - require.Nil(t, cs) require.Contains(t, err.Error(), tt.errorMessage) } else { require.NoError(t, err)