diff --git a/diode-server/dbstore/postgres/repository.go b/diode-server/dbstore/postgres/repository.go index 687fd653..e33c9dc8 100644 --- a/diode-server/dbstore/postgres/repository.go +++ b/diode-server/dbstore/postgres/repository.go @@ -111,61 +111,9 @@ func (r *Repository) RetrieveIngestionLogs(ctx context.Context, filter *reconcil return nil, err } - changeSetsMap := make(map[int32]*changeset.ChangeSet) - for _, row := range rawIngestionLogs { - if !row.VIngestionLogsChangeSet.ID.Valid || !row.VChangeSetsChange.ChangeUuid.Valid { - continue - } - - var changeData map[string]any - if row.VChangeSetsChange.Data != nil { - if err := json.Unmarshal(row.VChangeSetsChange.Data, &changeData); err != nil { - return nil, fmt.Errorf("failed to unmarshal change data: %w", err) - } - } - - change := changeset.Change{ - ChangeID: row.VChangeSetsChange.ChangeUuid.String, - ChangeType: row.VChangeSetsChange.ChangeType.String, - ObjectType: row.VChangeSetsChange.ObjectType.String, - Data: changeData, - } - objID := int(row.VChangeSetsChange.ObjectID.Int32) - if row.VChangeSetsChange.ObjectID.Valid { - change.ObjectID = &objID - } - objVersion := int(row.VChangeSetsChange.ObjectVersion.Int32) - if row.VChangeSetsChange.ObjectVersion.Valid { - change.ObjectVersion = &objVersion - } - - changeSet, ok := changeSetsMap[row.VIngestionLogsChangeSet.ID.Int32] - if !ok { - changes := make([]changeset.Change, 0) - changes = append(changes, change) - - changeSet = &changeset.ChangeSet{ - ChangeSetID: row.VIngestionLogsChangeSet.ChangeSetUuid.String, - ChangeSet: changes, - } - if row.VIngestionLogsChangeSet.BranchID.Valid { - changeSet.BranchID = &row.VIngestionLogsChangeSet.BranchID.String - } - changeSetsMap[row.VIngestionLogsChangeSet.ID.Int32] = changeSet - continue - } - - changeSet.ChangeSet = append(changeSet.ChangeSet, change) - } - ingestionLogs := make([]*reconcilerpb.IngestionLog, 0, len(rawIngestionLogs)) - ingestionLogsMap := make(map[int32]*reconcilerpb.IngestionLog) for _, row := range rawIngestionLogs { - if _, ok := ingestionLogsMap[row.IngestionLog.ID]; ok { - continue - } - - ingestionLog := row.IngestionLog + ingestionLog := row entity := &diodepb.Entity{} if err := protojson.Unmarshal(ingestionLog.Entity, entity); err != nil { return nil, fmt.Errorf("failed to unmarshal entity: %w", err) @@ -191,10 +139,40 @@ func (r *Repository) RetrieveIngestionLogs(ctx context.Context, filter *reconcil Error: &ingestionErr, } - changeSet, ok := changeSetsMap[row.VIngestionLogsChangeSet.ID.Int32] - if ok { + if row.Changes != nil { + var dbChanges []postgres.Change + if err := json.Unmarshal(row.Changes, &dbChanges); err != nil { + return nil, fmt.Errorf("failed to unmarshal changes: %w", err) + } + + changes := make([]changeset.Change, 0, len(dbChanges)) + for _, dbChange := range dbChanges { + change := changeset.Change{ + ChangeID: dbChange.ChangeUuid, + ChangeType: dbChange.ChangeType, + ObjectType: dbChange.ObjectType, + Data: dbChange.Data, + } + + objID := int(dbChange.ObjectID.Int32) + if dbChange.ObjectID.Valid { + change.ObjectID = &objID + } + objVersion := int(dbChange.ObjectVersion.Int32) + if dbChange.ObjectVersion.Valid { + change.ObjectVersion = &objVersion + } + + changes = append(changes, change) + } + + changeSet := &changeset.ChangeSet{ + ChangeSetID: row.ChangeSet.ChangeSetUuid, + ChangeSet: changes, + } + var compressedChangeSet []byte - if len(changeSet.ChangeSet) > 0 { + if len(changes) > 0 { b, err := changeset.CompressChangeSet(changeSet) if err != nil { return nil, fmt.Errorf("failed to compress change set: %w", err) @@ -203,12 +181,11 @@ func (r *Repository) RetrieveIngestionLogs(ctx context.Context, filter *reconcil } log.ChangeSet = &reconcilerpb.ChangeSet{ - Id: row.VIngestionLogsChangeSet.ChangeSetUuid.String, + Id: row.ChangeSet.ChangeSetUuid, Data: compressedChangeSet, } } - ingestionLogsMap[ingestionLog.ID] = log ingestionLogs = append(ingestionLogs, log) } diff --git a/diode-server/reconciler/logs_retriever.go b/diode-server/reconciler/logs_retriever.go index f677fa57..6abffe25 100644 --- a/diode-server/reconciler/logs_retriever.go +++ b/diode-server/reconciler/logs_retriever.go @@ -41,9 +41,14 @@ func retrieveIngestionMetrics(ctx context.Context, repository Repository) (*reco } func retrieveIngestionLogs(ctx context.Context, logger *slog.Logger, repository Repository, in *reconcilerpb.RetrieveIngestionLogsRequest) (*reconcilerpb.RetrieveIngestionLogsResponse, error) { + ingestionLogsMetricsResponse, err := retrieveIngestionMetrics(ctx, repository) + if err != nil { + return nil, err + } + if in.GetOnlyMetrics() { logger.Debug("retrieving only ingestion metrics") - return retrieveIngestionMetrics(ctx, repository) + return ingestionLogsMetricsResponse, nil } pageSize := in.GetPageSize() @@ -73,25 +78,9 @@ func retrieveIngestionLogs(ctx context.Context, logger *slog.Logger, repository } // Fill metrics - var metrics reconcilerpb.IngestionMetrics - total := int32(len(logs)) - if in.State != nil { - if in.GetState() == reconcilerpb.State_UNSPECIFIED { - metrics.Total = total - } else if in.GetState() == reconcilerpb.State_QUEUED { - metrics.Queued = total - } else if in.GetState() == reconcilerpb.State_RECONCILED { - metrics.Reconciled = total - } else if in.GetState() == reconcilerpb.State_FAILED { - metrics.Failed = total - } else if in.GetState() == reconcilerpb.State_NO_CHANGES { - metrics.NoChanges = total - } - } else { - metrics.Total = total - } + metrics := ingestionLogsMetricsResponse.GetMetrics() - return &reconcilerpb.RetrieveIngestionLogsResponse{Logs: logs, Metrics: &metrics, NextPageToken: nextPageToken}, nil + return &reconcilerpb.RetrieveIngestionLogsResponse{Logs: logs, Metrics: metrics, NextPageToken: nextPageToken}, nil } func decodeBase64ToInt(encoded string) (int32, error) { diff --git a/diode-server/reconciler/server_internal_test.go b/diode-server/reconciler/server_internal_test.go index e34b8e96..f80f438c 100644 --- a/diode-server/reconciler/server_internal_test.go +++ b/diode-server/reconciler/server_internal_test.go @@ -108,15 +108,19 @@ func TestIsAuthenticated(t *testing.T) { func TestRetrieveLogs(t *testing.T) { tests := []struct { - name string - in reconcilerpb.RetrieveIngestionLogsRequest - ingestionLogs []*reconcilerpb.IngestionLog - response *reconcilerpb.RetrieveIngestionLogsResponse - hasError bool + name string + in reconcilerpb.RetrieveIngestionLogsRequest + ingestionLogsPerState map[reconcilerpb.State]int32 + ingestionLogs []*reconcilerpb.IngestionLog + response *reconcilerpb.RetrieveIngestionLogsResponse + hasError bool }{ { name: "valid request", in: reconcilerpb.RetrieveIngestionLogsRequest{}, + ingestionLogsPerState: map[reconcilerpb.State]int32{ + reconcilerpb.State_RECONCILED: 2, + }, ingestionLogs: []*reconcilerpb.IngestionLog{ { Id: "2mAT7vZ38H4ttI0i5dBebwJbSnZ", @@ -220,7 +224,8 @@ func TestRetrieveLogs(t *testing.T) { }, }, Metrics: &reconcilerpb.IngestionMetrics{ - Total: 2, + Total: 2, + Reconciled: 2, }, NextPageToken: "F/Jk/zc08gA=", }, @@ -229,6 +234,9 @@ func TestRetrieveLogs(t *testing.T) { { name: "request with reconciliation error", in: reconcilerpb.RetrieveIngestionLogsRequest{}, + ingestionLogsPerState: map[reconcilerpb.State]int32{ + reconcilerpb.State_FAILED: 1, + }, ingestionLogs: []*reconcilerpb.IngestionLog{ { DataType: "ipam.ipaddress", @@ -299,7 +307,8 @@ func TestRetrieveLogs(t *testing.T) { }, }, Metrics: &reconcilerpb.IngestionMetrics{ - Total: 1, + Total: 1, + Failed: 1, }, NextPageToken: "AAAFlw==", }, @@ -308,6 +317,9 @@ func TestRetrieveLogs(t *testing.T) { { name: "filter by new state", in: reconcilerpb.RetrieveIngestionLogsRequest{State: reconcilerpb.State_QUEUED.Enum()}, + ingestionLogsPerState: map[reconcilerpb.State]int32{ + reconcilerpb.State_QUEUED: 1, + }, ingestionLogs: []*reconcilerpb.IngestionLog{ { DataType: "dcim.interface", @@ -356,6 +368,7 @@ func TestRetrieveLogs(t *testing.T) { }, }, Metrics: &reconcilerpb.IngestionMetrics{ + Total: 1, Queued: 1, }, NextPageToken: "AAAFlw==", @@ -365,6 +378,9 @@ func TestRetrieveLogs(t *testing.T) { { name: "filter by reconciled state", in: reconcilerpb.RetrieveIngestionLogsRequest{State: reconcilerpb.State_RECONCILED.Enum()}, + ingestionLogsPerState: map[reconcilerpb.State]int32{ + reconcilerpb.State_RECONCILED: 1, + }, ingestionLogs: []*reconcilerpb.IngestionLog{ { Id: "2mAT7vZ38H4ttI0i5dBebwJbSnZ", @@ -415,6 +431,7 @@ func TestRetrieveLogs(t *testing.T) { }, }, Metrics: &reconcilerpb.IngestionMetrics{ + Total: 1, Reconciled: 1, }, NextPageToken: "AAAFlw==", @@ -424,6 +441,9 @@ func TestRetrieveLogs(t *testing.T) { { name: "filter by failed state", in: reconcilerpb.RetrieveIngestionLogsRequest{State: reconcilerpb.State_FAILED.Enum()}, + ingestionLogsPerState: map[reconcilerpb.State]int32{ + reconcilerpb.State_FAILED: 1, + }, ingestionLogs: []*reconcilerpb.IngestionLog{ { Id: "2mAT7vZ38H4ttI0i5dBebwJbSnZ", @@ -474,6 +494,7 @@ func TestRetrieveLogs(t *testing.T) { }, }, Metrics: &reconcilerpb.IngestionMetrics{ + Total: 1, Failed: 1, }, NextPageToken: "AAAFlw==", @@ -483,6 +504,9 @@ func TestRetrieveLogs(t *testing.T) { { name: "filter by no changes state", in: reconcilerpb.RetrieveIngestionLogsRequest{State: reconcilerpb.State_NO_CHANGES.Enum()}, + ingestionLogsPerState: map[reconcilerpb.State]int32{ + reconcilerpb.State_NO_CHANGES: 1, + }, ingestionLogs: []*reconcilerpb.IngestionLog{ { Id: "2mAT7vZ38H4ttI0i5dBebwJbSnZ", @@ -533,6 +557,7 @@ func TestRetrieveLogs(t *testing.T) { }, }, Metrics: &reconcilerpb.IngestionMetrics{ + Total: 1, NoChanges: 1, }, NextPageToken: "AAAFlw==", @@ -542,6 +567,9 @@ func TestRetrieveLogs(t *testing.T) { { name: "filter by data type", in: reconcilerpb.RetrieveIngestionLogsRequest{DataType: "dcim.interface"}, + ingestionLogsPerState: map[reconcilerpb.State]int32{ + reconcilerpb.State_RECONCILED: 1, + }, ingestionLogs: []*reconcilerpb.IngestionLog{ { Id: "2mAT7vZ38H4ttI0i5dBebwJbSnZ", @@ -592,7 +620,8 @@ func TestRetrieveLogs(t *testing.T) { }, }, Metrics: &reconcilerpb.IngestionMetrics{ - Total: 1, + Total: 1, + Reconciled: 1, }, NextPageToken: "AAAFlw==", }, @@ -601,6 +630,9 @@ func TestRetrieveLogs(t *testing.T) { { name: "filter by timestamp", in: reconcilerpb.RetrieveIngestionLogsRequest{IngestionTsStart: 1725552914392208639}, + ingestionLogsPerState: map[reconcilerpb.State]int32{ + reconcilerpb.State_RECONCILED: 1, + }, ingestionLogs: []*reconcilerpb.IngestionLog{ { Id: "2mAT7vZ38H4ttI0i5dBebwJbSnZ", @@ -651,7 +683,8 @@ func TestRetrieveLogs(t *testing.T) { }, }, Metrics: &reconcilerpb.IngestionMetrics{ - Total: 1, + Total: 1, + Reconciled: 1, }, NextPageToken: "AAAFlw==", }, @@ -660,6 +693,9 @@ func TestRetrieveLogs(t *testing.T) { { name: "pagination check", in: reconcilerpb.RetrieveIngestionLogsRequest{PageToken: "AAAFlg=="}, + ingestionLogsPerState: map[reconcilerpb.State]int32{ + reconcilerpb.State_RECONCILED: 1, + }, ingestionLogs: []*reconcilerpb.IngestionLog{ { Id: "2mAT7vZ38H4ttI0i5dBebwJbSnZ", @@ -710,7 +746,8 @@ func TestRetrieveLogs(t *testing.T) { }, }, Metrics: &reconcilerpb.IngestionMetrics{ - Total: 1, + Total: 1, + Reconciled: 1, }, NextPageToken: "AAAFlw==", }, @@ -741,6 +778,8 @@ func TestRetrieveLogs(t *testing.T) { retrieveErr = errors.New("failed to retrieve ingestion logs") } + mockRepository.On("CountIngestionLogsPerState", ctx).Return(tt.ingestionLogsPerState, nil) + if !tt.hasError { mockRepository.On("RetrieveIngestionLogs", ctx, mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(tt.ingestionLogs, retrieveErr) }