Skip to content

Commit

Permalink
fix: syncs dashboard (#4937)
Browse files Browse the repository at this point in the history
  • Loading branch information
achettyiitr authored Jul 29, 2024
1 parent 2a654cf commit d22720e
Show file tree
Hide file tree
Showing 6 changed files with 190 additions and 46 deletions.
53 changes: 36 additions & 17 deletions warehouse/api/grpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (
"github.com/rudderlabs/rudder-go-kit/filemanager"
"github.com/rudderlabs/rudder-go-kit/logger"
obskit "github.com/rudderlabs/rudder-observability-kit/go/labels"

backendconfig "github.com/rudderlabs/rudder-server/backend-config"
"github.com/rudderlabs/rudder-server/controlplane"
proto "github.com/rudderlabs/rudder-server/proto/warehouse"
Expand Down Expand Up @@ -198,7 +199,12 @@ func (g *GRPC) GetWHUploads(ctx context.Context, request *proto.WHUploadsRequest
offset = 0
}

sourceIDs := g.bcManager.SourceIDsByWorkspace()[request.WorkspaceId]
var sourceIDs []string
if request.SourceId != "" {
sourceIDs = []string{request.SourceId}
} else {
sourceIDs = g.bcManager.SourceIDsByWorkspace()[request.WorkspaceId]
}
if len(sourceIDs) == 0 {
return &proto.WHUploadsResponse{},
status.Errorf(codes.Code(code.Code_UNAUTHENTICATED), "no sources found for workspace: %v", request.WorkspaceId)
Expand Down Expand Up @@ -237,7 +243,7 @@ func (g *GRPC) GetWHUploads(ctx context.Context, request *proto.WHUploadsRequest
}

uploads := lo.Map(uploadInfos, func(item model.UploadInfo, index int) *proto.WHUploadResponse {
return &proto.WHUploadResponse{
ur := &proto.WHUploadResponse{
Id: item.ID,
SourceId: item.SourceID,
DestinationId: item.DestinationID,
Expand All @@ -249,12 +255,17 @@ func (g *GRPC) GetWHUploads(ctx context.Context, request *proto.WHUploadsRequest
CreatedAt: timestamppb.New(item.CreatedAt),
FirstEventAt: timestamppb.New(item.FirstEventAt),
LastEventAt: timestamppb.New(item.LastEventAt),
LastExecAt: timestamppb.New(item.LastExecAt),
NextRetryTime: timestamppb.New(item.NextRetryTime),
Duration: int32(item.Duration),
Tables: []*proto.WHTable{},
IsArchivedUpload: item.IsArchivedUpload,
}
if !item.NextRetryTime.IsZero() {
ur.NextRetryTime = timestamppb.New(item.NextRetryTime)
}
if !item.LastExecAt.IsZero() {
ur.LastExecAt = timestamppb.New(item.LastExecAt)
}
return ur
})

response := &proto.WHUploadsResponse{Uploads: uploads, Pagination: &proto.Pagination{
Expand Down Expand Up @@ -309,19 +320,22 @@ func (g *GRPC) GetWHUpload(ctx context.Context, request *proto.WHUploadRequest)
}

tables := lo.Map(syncTableInfos, func(item model.TableUploadInfo, index int) *proto.WHTable {
return &proto.WHTable{
Id: item.ID,
UploadId: item.UploadID,
Name: item.Name,
Status: item.Status,
Error: item.Error,
LastExecAt: timestamppb.New(syncUploadInfo.LastExecAt),
Count: int32(item.Count),
Duration: int32(item.Duration),
wht := &proto.WHTable{
Id: item.ID,
UploadId: item.UploadID,
Name: item.Name,
Status: item.Status,
Error: item.Error,
Count: int32(item.Count),
Duration: int32(item.Duration),
}
if !item.LastExecAt.IsZero() {
wht.LastExecAt = timestamppb.New(item.LastExecAt)
}
return wht
})

return &proto.WHUploadResponse{
ur := &proto.WHUploadResponse{
Id: syncUploadInfo.ID,
SourceId: syncUploadInfo.SourceID,
DestinationId: syncUploadInfo.DestinationID,
Expand All @@ -333,12 +347,17 @@ func (g *GRPC) GetWHUpload(ctx context.Context, request *proto.WHUploadRequest)
CreatedAt: timestamppb.New(syncUploadInfo.CreatedAt),
FirstEventAt: timestamppb.New(syncUploadInfo.FirstEventAt),
LastEventAt: timestamppb.New(syncUploadInfo.LastEventAt),
LastExecAt: timestamppb.New(syncUploadInfo.LastExecAt),
NextRetryTime: timestamppb.New(syncUploadInfo.NextRetryTime),
Duration: int32(syncUploadInfo.Duration),
Tables: tables,
IsArchivedUpload: syncUploadInfo.IsArchivedUpload,
}, nil
}
if !syncUploadInfo.NextRetryTime.IsZero() {
ur.NextRetryTime = timestamppb.New(syncUploadInfo.NextRetryTime)
}
if !syncUploadInfo.LastExecAt.IsZero() {
ur.LastExecAt = timestamppb.New(syncUploadInfo.LastExecAt)
}
return ur, nil
}

func (g *GRPC) TriggerWHUploads(ctx context.Context, request *proto.WHUploadsRequest) (*proto.TriggerWhUploadsResponse, error) {
Expand Down
136 changes: 121 additions & 15 deletions warehouse/api/grpc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -237,6 +237,7 @@ func TestGRPC(t *testing.T) {

totalUploads := 100
firstEventAt, lastEventAt := now.Add(-2*time.Hour), now.Add(-1*time.Hour)
lastExecAt := now.Add(-1 * time.Minute)

for i := 0; i < totalUploads; i++ {
fid, err := repoStaging.Insert(ctx, &model.StagingFileWithSchema{})
Expand Down Expand Up @@ -271,6 +272,23 @@ func TestGRPC(t *testing.T) {

err = repoTableUploads.Insert(ctx, uploadID, tables)
require.NoError(t, err)

if (i+1)%2 == 0 {
err = repoUpload.Update(ctx, uploadID, []repo.UpdateKeyValue{
repo.UploadFieldStatus(model.ExportedData),
repo.UploadFieldLastExecAt(lastExecAt),
})
require.NoError(t, err)

for _, table := range tables {
err = repoTableUploads.Set(ctx, uploadID, table, repo.TableUploadSetOptions{
Status: lo.ToPtr(model.TableUploadExported),
LastExecTime: lo.ToPtr(lastExecAt),
TotalEvents: lo.ToPtr(int64(10)),
})
require.NoError(t, err)
}
}
}

t.Run("GetWHUpload", func(t *testing.T) {
Expand All @@ -288,7 +306,7 @@ func TestGRPC(t *testing.T) {
require.Equal(t, "upload_id should be greater than 0", statusError.Message())
})

t.Run("no sources", func(t *testing.T) {
t.Run("unknown workspace", func(t *testing.T) {
res, err := grpcClient.GetWHUpload(ctx, &proto.WHUploadRequest{
UploadId: 1,
WorkspaceId: "unknown_workspace_id",
Expand All @@ -315,7 +333,8 @@ func TestGRPC(t *testing.T) {
require.Equal(t, codes.NotFound, statusError.Code())
require.Equal(t, "no sync found for id 1001", statusError.Message())
})
t.Run("success", func(t *testing.T) {

t.Run("success (waiting)", func(t *testing.T) {
res, err := grpcClient.GetWHUpload(ctx, &proto.WHUploadRequest{
UploadId: 1,
WorkspaceId: workspaceID,
Expand All @@ -333,7 +352,8 @@ func TestGRPC(t *testing.T) {
require.EqualValues(t, firstEventAt.UTC(), res.GetFirstEventAt().AsTime().UTC())
require.EqualValues(t, lastEventAt.UTC(), res.GetLastEventAt().AsTime().UTC())
require.EqualValues(t, now.UTC(), res.GetNextRetryTime().AsTime().UTC())
require.EqualValues(t, int32(now.Sub(time.Time{})/time.Second), res.GetDuration())
require.Zero(t, res.GetDuration())
require.Zero(t, res.GetLastExecAt())
require.NotEmpty(t, res.GetTables())
require.False(t, res.GetIsArchivedUpload())
require.EqualValues(t, tables, lo.Map(res.GetTables(), func(item *proto.WHTable, index int) string {
Expand All @@ -344,21 +364,55 @@ func TestGRPC(t *testing.T) {
require.EqualValues(t, 1, table.GetUploadId())
require.EqualValues(t, model.TableUploadWaiting, table.GetStatus())
require.EqualValues(t, "{}", table.GetError())
require.Empty(t, table.GetLastExecAt().AsTime().UTC())
require.Zero(t, table.GetLastExecAt())
require.Zero(t, table.GetCount())
require.EqualValues(t, int32(now.Sub(time.Time{})/time.Second), table.GetDuration())
require.Zero(t, table.GetDuration())
}
})

t.Run("success (exported)", func(t *testing.T) {
res, err := grpcClient.GetWHUpload(ctx, &proto.WHUploadRequest{
UploadId: 2,
WorkspaceId: workspaceID,
})
require.NoError(t, err)
require.NotNil(t, res)

require.EqualValues(t, sourceID, res.GetSourceId())
require.EqualValues(t, destinationID, res.GetDestinationId())
require.EqualValues(t, destinationType, res.GetDestinationType())
require.Equal(t, "{}", res.GetError())
require.Zero(t, res.GetAttempt())
require.EqualValues(t, model.ExportedData, res.GetStatus())
require.EqualValues(t, now.UTC(), res.GetCreatedAt().AsTime().UTC())
require.EqualValues(t, firstEventAt.UTC(), res.GetFirstEventAt().AsTime().UTC())
require.EqualValues(t, lastEventAt.UTC(), res.GetLastEventAt().AsTime().UTC())
require.Zero(t, res.GetNextRetryTime())
require.EqualValues(t, now.Sub(lastExecAt)/time.Second, res.GetDuration())
require.EqualValues(t, lastExecAt, res.GetLastExecAt().AsTime().UTC())
require.NotEmpty(t, res.GetTables())
require.False(t, res.GetIsArchivedUpload())
require.EqualValues(t, tables, lo.Map(res.GetTables(), func(item *proto.WHTable, index int) string {
return item.GetName()
}))

for _, table := range res.GetTables() {
require.EqualValues(t, 2, table.GetUploadId())
require.EqualValues(t, model.TableUploadExported, table.GetStatus())
require.EqualValues(t, "{}", table.GetError())
require.EqualValues(t, lastExecAt, table.GetLastExecAt().AsTime().UTC())
require.EqualValues(t, 10, table.GetCount())
require.EqualValues(t, now.Sub(lastExecAt)/time.Second, table.GetDuration())
}
})
})

t.Run("GetWHUploads", func(t *testing.T) {
t.Run("no sources", func(t *testing.T) {
t.Run("unknown workspace", func(t *testing.T) {
res, err := grpcClient.GetWHUploads(ctx, &proto.WHUploadsRequest{
Limit: 10,
Offset: 0,
WorkspaceId: "unknown_workspace_id",
SourceId: "unknown_source_id",
DestinationId: "unknown_destination_id",
Limit: 10,
Offset: 0,
WorkspaceId: "unknown_workspace_id",
})
require.Error(t, err)
require.Empty(t, res)
Expand All @@ -368,6 +422,22 @@ func TestGRPC(t *testing.T) {
require.Equal(t, codes.Unauthenticated, statusError.Code())
require.Equal(t, "no sources found for workspace: unknown_workspace_id", statusError.Message())
})
t.Run("unknown source", func(t *testing.T) {
res, err := grpcClient.GetWHUploads(ctx, &proto.WHUploadsRequest{
Limit: 10,
Offset: 0,
WorkspaceId: workspaceID,
SourceId: "unknown_source_id",
DestinationId: destinationID,
})
require.NoError(t, err)
require.NotNil(t, res)
require.Len(t, res.GetUploads(), 0)
require.NotNil(t, res.GetPagination())
require.EqualValues(t, int64(0), res.GetPagination().GetTotal())
require.EqualValues(t, int64(10), res.GetPagination().GetLimit())
require.EqualValues(t, int64(0), res.GetPagination().GetOffset())
})
t.Run("invalid limit and offset", func(t *testing.T) {
res, err := grpcClient.GetWHUploads(ctx, &proto.WHUploadsRequest{
Limit: -1,
Expand All @@ -382,21 +452,21 @@ func TestGRPC(t *testing.T) {
require.EqualValues(t, int64(10), res.GetPagination().GetLimit())
require.EqualValues(t, int64(0), res.GetPagination().GetOffset())
})
t.Run("success", func(t *testing.T) {
t.Run("success (waiting)", func(t *testing.T) {
res, err := grpcClient.GetWHUploads(ctx, &proto.WHUploadsRequest{
Limit: 25,
Offset: 6,
WorkspaceId: workspaceID,
SourceId: sourceID,
DestinationId: destinationID,
DestinationType: destinationType,
Status: model.Waiting,
Status: "waiting",
})
require.NoError(t, err)
require.NotNil(t, res)
require.Len(t, res.GetUploads(), 25)
require.NotNil(t, res.GetPagination())
require.EqualValues(t, int64(100), res.GetPagination().GetTotal())
require.EqualValues(t, int64(50), res.GetPagination().GetTotal())
require.EqualValues(t, int64(25), res.GetPagination().GetLimit())
require.EqualValues(t, int64(6), res.GetPagination().GetOffset())

Expand All @@ -411,7 +481,43 @@ func TestGRPC(t *testing.T) {
require.EqualValues(t, firstEventAt.UTC(), upload.GetFirstEventAt().AsTime().UTC())
require.EqualValues(t, lastEventAt.UTC(), upload.GetLastEventAt().AsTime().UTC())
require.EqualValues(t, now.UTC(), upload.GetNextRetryTime().AsTime().UTC())
require.EqualValues(t, int32(now.Sub(time.Time{})/time.Second), upload.GetDuration())
require.Zero(t, upload.GetLastExecAt())
require.Zero(t, upload.GetDuration())
require.Empty(t, upload.GetTables())
require.False(t, upload.GetIsArchivedUpload())
}
})
t.Run("success (exported)", func(t *testing.T) {
res, err := grpcClient.GetWHUploads(ctx, &proto.WHUploadsRequest{
Limit: 25,
Offset: 6,
WorkspaceId: workspaceID,
SourceId: sourceID,
DestinationId: destinationID,
DestinationType: destinationType,
Status: "success",
})
require.NoError(t, err)
require.NotNil(t, res)
require.Len(t, res.GetUploads(), 25)
require.NotNil(t, res.GetPagination())
require.EqualValues(t, int64(50), res.GetPagination().GetTotal())
require.EqualValues(t, int64(25), res.GetPagination().GetLimit())
require.EqualValues(t, int64(6), res.GetPagination().GetOffset())

for _, upload := range res.GetUploads() {
require.EqualValues(t, sourceID, upload.GetSourceId())
require.EqualValues(t, destinationID, upload.GetDestinationId())
require.EqualValues(t, destinationType, upload.GetDestinationType())
require.Equal(t, "{}", upload.GetError())
require.Zero(t, upload.GetAttempt())
require.EqualValues(t, model.ExportedData, upload.GetStatus())
require.EqualValues(t, now.UTC(), upload.GetCreatedAt().AsTime().UTC())
require.EqualValues(t, firstEventAt.UTC(), upload.GetFirstEventAt().AsTime().UTC())
require.EqualValues(t, lastEventAt.UTC(), upload.GetLastEventAt().AsTime().UTC())
require.Zero(t, upload.GetNextRetryTime())
require.EqualValues(t, now.Sub(lastExecAt)/time.Second, upload.GetDuration())
require.EqualValues(t, lastExecAt, upload.GetLastExecAt().AsTime().UTC())
require.Empty(t, upload.GetTables())
require.False(t, upload.GetIsArchivedUpload())
}
Expand Down
7 changes: 5 additions & 2 deletions warehouse/internal/repo/table_upload.go
Original file line number Diff line number Diff line change
Expand Up @@ -381,16 +381,19 @@ func (tu *TableUploads) SyncsInfo(ctx context.Context, uploadID int64) ([]model.
}

tableUploadInfos := lo.Map(tableUploads, func(item model.TableUpload, index int) model.TableUploadInfo {
return model.TableUploadInfo{
tuf := model.TableUploadInfo{
ID: item.ID,
UploadID: item.UploadID,
Name: item.TableName,
Status: item.Status,
Error: item.Error,
LastExecAt: item.LastExecTime,
Count: item.TotalEvents,
Duration: int64(item.UpdatedAt.Sub(item.LastExecTime) / time.Second),
}
if !item.LastExecTime.IsZero() {
tuf.Duration = int64(item.UpdatedAt.Sub(item.LastExecTime) / time.Second)
}
return tuf
})
return tableUploadInfos, nil
}
Expand Down
15 changes: 15 additions & 0 deletions warehouse/internal/repo/table_upload_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,21 @@ func TestTableUploadRepo(t *testing.T) {
}
})

t.Run("last exec time is nil", func(t *testing.T) {
uploadID := int64(2)

err := r.Insert(ctx, uploadID, tables)
require.NoError(t, err)

syncsInfos, err := r.SyncsInfo(ctx, uploadID)
require.NoError(t, err)
require.Len(t, syncsInfos, len(tables))
for i := range syncsInfos {
require.Zero(t, syncsInfos[i].LastExecAt)
require.Zero(t, syncsInfos[i].Duration)
}
})

t.Run("invalid sync id", func(t *testing.T) {
syncsInfos, err := r.SyncsInfo(ctx, int64(-1))
require.NoError(t, err)
Expand Down
Loading

0 comments on commit d22720e

Please sign in to comment.