Skip to content

Commit

Permalink
feat: refactor retrieving ingestion logs
Browse files Browse the repository at this point in the history
Signed-off-by: Michal Fiedorowicz <mfiedorowicz@netboxlabs.com>
  • Loading branch information
mfiedorowicz committed Dec 20, 2024
1 parent e8c32fc commit bb7fe25
Show file tree
Hide file tree
Showing 3 changed files with 92 additions and 87 deletions.
93 changes: 35 additions & 58 deletions diode-server/dbstore/postgres/repository.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,61 +111,9 @@ func (r *Repository) RetrieveIngestionLogs(ctx context.Context, filter *reconcil
return nil, err
}

changeSetsMap := make(map[int32]*changeset.ChangeSet)
for _, row := range rawIngestionLogs {
if !row.VIngestionLogsChangeSet.ID.Valid || !row.VChangeSetsChange.ChangeUuid.Valid {
continue
}

var changeData map[string]any
if row.VChangeSetsChange.Data != nil {
if err := json.Unmarshal(row.VChangeSetsChange.Data, &changeData); err != nil {
return nil, fmt.Errorf("failed to unmarshal change data: %w", err)
}
}

change := changeset.Change{
ChangeID: row.VChangeSetsChange.ChangeUuid.String,
ChangeType: row.VChangeSetsChange.ChangeType.String,
ObjectType: row.VChangeSetsChange.ObjectType.String,
Data: changeData,
}
objID := int(row.VChangeSetsChange.ObjectID.Int32)
if row.VChangeSetsChange.ObjectID.Valid {
change.ObjectID = &objID
}
objVersion := int(row.VChangeSetsChange.ObjectVersion.Int32)
if row.VChangeSetsChange.ObjectVersion.Valid {
change.ObjectVersion = &objVersion
}

changeSet, ok := changeSetsMap[row.VIngestionLogsChangeSet.ID.Int32]
if !ok {
changes := make([]changeset.Change, 0)
changes = append(changes, change)

changeSet = &changeset.ChangeSet{
ChangeSetID: row.VIngestionLogsChangeSet.ChangeSetUuid.String,
ChangeSet: changes,
}
if row.VIngestionLogsChangeSet.BranchID.Valid {
changeSet.BranchID = &row.VIngestionLogsChangeSet.BranchID.String
}
changeSetsMap[row.VIngestionLogsChangeSet.ID.Int32] = changeSet
continue
}

changeSet.ChangeSet = append(changeSet.ChangeSet, change)
}

ingestionLogs := make([]*reconcilerpb.IngestionLog, 0, len(rawIngestionLogs))
ingestionLogsMap := make(map[int32]*reconcilerpb.IngestionLog)
for _, row := range rawIngestionLogs {
if _, ok := ingestionLogsMap[row.IngestionLog.ID]; ok {
continue
}

ingestionLog := row.IngestionLog
ingestionLog := row
entity := &diodepb.Entity{}
if err := protojson.Unmarshal(ingestionLog.Entity, entity); err != nil {
return nil, fmt.Errorf("failed to unmarshal entity: %w", err)
Expand All @@ -191,10 +139,40 @@ func (r *Repository) RetrieveIngestionLogs(ctx context.Context, filter *reconcil
Error: &ingestionErr,
}

changeSet, ok := changeSetsMap[row.VIngestionLogsChangeSet.ID.Int32]
if ok {
if row.Changes != nil {
var dbChanges []postgres.Change
if err := json.Unmarshal(row.Changes, &dbChanges); err != nil {
return nil, fmt.Errorf("failed to unmarshal changes: %w", err)
}

changes := make([]changeset.Change, 0, len(dbChanges))
for _, dbChange := range dbChanges {
change := changeset.Change{
ChangeID: dbChange.ChangeUuid,
ChangeType: dbChange.ChangeType,
ObjectType: dbChange.ObjectType,
Data: dbChange.Data,
}

objID := int(dbChange.ObjectID.Int32)
if dbChange.ObjectID.Valid {
change.ObjectID = &objID
}
objVersion := int(dbChange.ObjectVersion.Int32)
if dbChange.ObjectVersion.Valid {
change.ObjectVersion = &objVersion
}

changes = append(changes, change)
}

changeSet := &changeset.ChangeSet{
ChangeSetID: row.ChangeSet.ChangeSetUuid,
ChangeSet: changes,
}

var compressedChangeSet []byte
if len(changeSet.ChangeSet) > 0 {
if len(changes) > 0 {
b, err := changeset.CompressChangeSet(changeSet)
if err != nil {
return nil, fmt.Errorf("failed to compress change set: %w", err)
Expand All @@ -203,12 +181,11 @@ func (r *Repository) RetrieveIngestionLogs(ctx context.Context, filter *reconcil
}

log.ChangeSet = &reconcilerpb.ChangeSet{
Id: row.VIngestionLogsChangeSet.ChangeSetUuid.String,
Id: row.ChangeSet.ChangeSetUuid,
Data: compressedChangeSet,
}
}

ingestionLogsMap[ingestionLog.ID] = log
ingestionLogs = append(ingestionLogs, log)
}

Expand Down
27 changes: 8 additions & 19 deletions diode-server/reconciler/logs_retriever.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,9 +41,14 @@ func retrieveIngestionMetrics(ctx context.Context, repository Repository) (*reco
}

func retrieveIngestionLogs(ctx context.Context, logger *slog.Logger, repository Repository, in *reconcilerpb.RetrieveIngestionLogsRequest) (*reconcilerpb.RetrieveIngestionLogsResponse, error) {
ingestionLogsMetricsResponse, err := retrieveIngestionMetrics(ctx, repository)
if err != nil {
return nil, err
}

if in.GetOnlyMetrics() {
logger.Debug("retrieving only ingestion metrics")
return retrieveIngestionMetrics(ctx, repository)
return ingestionLogsMetricsResponse, nil
}

pageSize := in.GetPageSize()
Expand Down Expand Up @@ -73,25 +78,9 @@ func retrieveIngestionLogs(ctx context.Context, logger *slog.Logger, repository
}

// Fill metrics
var metrics reconcilerpb.IngestionMetrics
total := int32(len(logs))
if in.State != nil {
if in.GetState() == reconcilerpb.State_UNSPECIFIED {
metrics.Total = total
} else if in.GetState() == reconcilerpb.State_QUEUED {
metrics.Queued = total
} else if in.GetState() == reconcilerpb.State_RECONCILED {
metrics.Reconciled = total
} else if in.GetState() == reconcilerpb.State_FAILED {
metrics.Failed = total
} else if in.GetState() == reconcilerpb.State_NO_CHANGES {
metrics.NoChanges = total
}
} else {
metrics.Total = total
}
metrics := ingestionLogsMetricsResponse.GetMetrics()

return &reconcilerpb.RetrieveIngestionLogsResponse{Logs: logs, Metrics: &metrics, NextPageToken: nextPageToken}, nil
return &reconcilerpb.RetrieveIngestionLogsResponse{Logs: logs, Metrics: metrics, NextPageToken: nextPageToken}, nil
}

func decodeBase64ToInt(encoded string) (int32, error) {
Expand Down
59 changes: 49 additions & 10 deletions diode-server/reconciler/server_internal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,15 +108,19 @@ func TestIsAuthenticated(t *testing.T) {

func TestRetrieveLogs(t *testing.T) {
tests := []struct {
name string
in reconcilerpb.RetrieveIngestionLogsRequest
ingestionLogs []*reconcilerpb.IngestionLog
response *reconcilerpb.RetrieveIngestionLogsResponse
hasError bool
name string
in reconcilerpb.RetrieveIngestionLogsRequest
ingestionLogsPerState map[reconcilerpb.State]int32
ingestionLogs []*reconcilerpb.IngestionLog
response *reconcilerpb.RetrieveIngestionLogsResponse
hasError bool
}{
{
name: "valid request",
in: reconcilerpb.RetrieveIngestionLogsRequest{},
ingestionLogsPerState: map[reconcilerpb.State]int32{
reconcilerpb.State_RECONCILED: 2,
},
ingestionLogs: []*reconcilerpb.IngestionLog{
{
Id: "2mAT7vZ38H4ttI0i5dBebwJbSnZ",
Expand Down Expand Up @@ -220,7 +224,8 @@ func TestRetrieveLogs(t *testing.T) {
},
},
Metrics: &reconcilerpb.IngestionMetrics{
Total: 2,
Total: 2,
Reconciled: 2,
},
NextPageToken: "F/Jk/zc08gA=",
},
Expand All @@ -229,6 +234,9 @@ func TestRetrieveLogs(t *testing.T) {
{
name: "request with reconciliation error",
in: reconcilerpb.RetrieveIngestionLogsRequest{},
ingestionLogsPerState: map[reconcilerpb.State]int32{
reconcilerpb.State_FAILED: 1,
},
ingestionLogs: []*reconcilerpb.IngestionLog{
{
DataType: "ipam.ipaddress",
Expand Down Expand Up @@ -299,7 +307,8 @@ func TestRetrieveLogs(t *testing.T) {
},
},
Metrics: &reconcilerpb.IngestionMetrics{
Total: 1,
Total: 1,
Failed: 1,
},
NextPageToken: "AAAFlw==",
},
Expand All @@ -308,6 +317,9 @@ func TestRetrieveLogs(t *testing.T) {
{
name: "filter by new state",
in: reconcilerpb.RetrieveIngestionLogsRequest{State: reconcilerpb.State_QUEUED.Enum()},
ingestionLogsPerState: map[reconcilerpb.State]int32{
reconcilerpb.State_QUEUED: 1,
},
ingestionLogs: []*reconcilerpb.IngestionLog{
{
DataType: "dcim.interface",
Expand Down Expand Up @@ -356,6 +368,7 @@ func TestRetrieveLogs(t *testing.T) {
},
},
Metrics: &reconcilerpb.IngestionMetrics{
Total: 1,
Queued: 1,
},
NextPageToken: "AAAFlw==",
Expand All @@ -365,6 +378,9 @@ func TestRetrieveLogs(t *testing.T) {
{
name: "filter by reconciled state",
in: reconcilerpb.RetrieveIngestionLogsRequest{State: reconcilerpb.State_RECONCILED.Enum()},
ingestionLogsPerState: map[reconcilerpb.State]int32{
reconcilerpb.State_RECONCILED: 1,
},
ingestionLogs: []*reconcilerpb.IngestionLog{
{
Id: "2mAT7vZ38H4ttI0i5dBebwJbSnZ",
Expand Down Expand Up @@ -415,6 +431,7 @@ func TestRetrieveLogs(t *testing.T) {
},
},
Metrics: &reconcilerpb.IngestionMetrics{
Total: 1,
Reconciled: 1,
},
NextPageToken: "AAAFlw==",
Expand All @@ -424,6 +441,9 @@ func TestRetrieveLogs(t *testing.T) {
{
name: "filter by failed state",
in: reconcilerpb.RetrieveIngestionLogsRequest{State: reconcilerpb.State_FAILED.Enum()},
ingestionLogsPerState: map[reconcilerpb.State]int32{
reconcilerpb.State_FAILED: 1,
},
ingestionLogs: []*reconcilerpb.IngestionLog{
{
Id: "2mAT7vZ38H4ttI0i5dBebwJbSnZ",
Expand Down Expand Up @@ -474,6 +494,7 @@ func TestRetrieveLogs(t *testing.T) {
},
},
Metrics: &reconcilerpb.IngestionMetrics{
Total: 1,
Failed: 1,
},
NextPageToken: "AAAFlw==",
Expand All @@ -483,6 +504,9 @@ func TestRetrieveLogs(t *testing.T) {
{
name: "filter by no changes state",
in: reconcilerpb.RetrieveIngestionLogsRequest{State: reconcilerpb.State_NO_CHANGES.Enum()},
ingestionLogsPerState: map[reconcilerpb.State]int32{
reconcilerpb.State_NO_CHANGES: 1,
},
ingestionLogs: []*reconcilerpb.IngestionLog{
{
Id: "2mAT7vZ38H4ttI0i5dBebwJbSnZ",
Expand Down Expand Up @@ -533,6 +557,7 @@ func TestRetrieveLogs(t *testing.T) {
},
},
Metrics: &reconcilerpb.IngestionMetrics{
Total: 1,
NoChanges: 1,
},
NextPageToken: "AAAFlw==",
Expand All @@ -542,6 +567,9 @@ func TestRetrieveLogs(t *testing.T) {
{
name: "filter by data type",
in: reconcilerpb.RetrieveIngestionLogsRequest{DataType: "dcim.interface"},
ingestionLogsPerState: map[reconcilerpb.State]int32{
reconcilerpb.State_RECONCILED: 1,
},
ingestionLogs: []*reconcilerpb.IngestionLog{
{
Id: "2mAT7vZ38H4ttI0i5dBebwJbSnZ",
Expand Down Expand Up @@ -592,7 +620,8 @@ func TestRetrieveLogs(t *testing.T) {
},
},
Metrics: &reconcilerpb.IngestionMetrics{
Total: 1,
Total: 1,
Reconciled: 1,
},
NextPageToken: "AAAFlw==",
},
Expand All @@ -601,6 +630,9 @@ func TestRetrieveLogs(t *testing.T) {
{
name: "filter by timestamp",
in: reconcilerpb.RetrieveIngestionLogsRequest{IngestionTsStart: 1725552914392208639},
ingestionLogsPerState: map[reconcilerpb.State]int32{
reconcilerpb.State_RECONCILED: 1,
},
ingestionLogs: []*reconcilerpb.IngestionLog{
{
Id: "2mAT7vZ38H4ttI0i5dBebwJbSnZ",
Expand Down Expand Up @@ -651,7 +683,8 @@ func TestRetrieveLogs(t *testing.T) {
},
},
Metrics: &reconcilerpb.IngestionMetrics{
Total: 1,
Total: 1,
Reconciled: 1,
},
NextPageToken: "AAAFlw==",
},
Expand All @@ -660,6 +693,9 @@ func TestRetrieveLogs(t *testing.T) {
{
name: "pagination check",
in: reconcilerpb.RetrieveIngestionLogsRequest{PageToken: "AAAFlg=="},
ingestionLogsPerState: map[reconcilerpb.State]int32{
reconcilerpb.State_RECONCILED: 1,
},
ingestionLogs: []*reconcilerpb.IngestionLog{
{
Id: "2mAT7vZ38H4ttI0i5dBebwJbSnZ",
Expand Down Expand Up @@ -710,7 +746,8 @@ func TestRetrieveLogs(t *testing.T) {
},
},
Metrics: &reconcilerpb.IngestionMetrics{
Total: 1,
Total: 1,
Reconciled: 1,
},
NextPageToken: "AAAFlw==",
},
Expand Down Expand Up @@ -741,6 +778,8 @@ func TestRetrieveLogs(t *testing.T) {
retrieveErr = errors.New("failed to retrieve ingestion logs")
}

mockRepository.On("CountIngestionLogsPerState", ctx).Return(tt.ingestionLogsPerState, nil)

if !tt.hasError {
mockRepository.On("RetrieveIngestionLogs", ctx, mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(tt.ingestionLogs, retrieveErr)
}
Expand Down

0 comments on commit bb7fe25

Please sign in to comment.