Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: syncs dashboard #4937

Merged
merged 1 commit into from
Jul 29, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
53 changes: 36 additions & 17 deletions warehouse/api/grpc.go
Original file line number | Diff line number | Diff line change
Expand Up @@ -32,6 +32,7 @@ import (
"github.com/rudderlabs/rudder-go-kit/filemanager"
"github.com/rudderlabs/rudder-go-kit/logger"
obskit "github.com/rudderlabs/rudder-observability-kit/go/labels"

backendconfig "github.com/rudderlabs/rudder-server/backend-config"
"github.com/rudderlabs/rudder-server/controlplane"
proto "github.com/rudderlabs/rudder-server/proto/warehouse"
Expand Down Expand Up @@ -198,7 +199,12 @@ func (g *GRPC) GetWHUploads(ctx context.Context, request *proto.WHUploadsRequest
offset = 0
}

sourceIDs := g.bcManager.SourceIDsByWorkspace()[request.WorkspaceId]
var sourceIDs []string
if request.SourceId != "" {
sourceIDs = []string{request.SourceId}
} else {
sourceIDs = g.bcManager.SourceIDsByWorkspace()[request.WorkspaceId]
}
if len(sourceIDs) == 0 {
return &proto.WHUploadsResponse{},
status.Errorf(codes.Code(code.Code_UNAUTHENTICATED), "no sources found for workspace: %v", request.WorkspaceId)
Expand Down Expand Up @@ -237,7 +243,7 @@ func (g *GRPC) GetWHUploads(ctx context.Context, request *proto.WHUploadsRequest
}

uploads := lo.Map(uploadInfos, func(item model.UploadInfo, index int) *proto.WHUploadResponse {
return &proto.WHUploadResponse{
ur := &proto.WHUploadResponse{
Id: item.ID,
SourceId: item.SourceID,
DestinationId: item.DestinationID,
Expand All @@ -249,12 +255,17 @@ func (g *GRPC) GetWHUploads(ctx context.Context, request *proto.WHUploadsRequest
CreatedAt: timestamppb.New(item.CreatedAt),
FirstEventAt: timestamppb.New(item.FirstEventAt),
LastEventAt: timestamppb.New(item.LastEventAt),
LastExecAt: timestamppb.New(item.LastExecAt),
NextRetryTime: timestamppb.New(item.NextRetryTime),
Duration: int32(item.Duration),
Tables: []*proto.WHTable{},
IsArchivedUpload: item.IsArchivedUpload,
}
if !item.NextRetryTime.IsZero() {
ur.NextRetryTime = timestamppb.New(item.NextRetryTime)
}
if !item.LastExecAt.IsZero() {
ur.LastExecAt = timestamppb.New(item.LastExecAt)
}
return ur
})

response := &proto.WHUploadsResponse{Uploads: uploads, Pagination: &proto.Pagination{
Expand Down Expand Up @@ -309,19 +320,22 @@ func (g *GRPC) GetWHUpload(ctx context.Context, request *proto.WHUploadRequest)
}

tables := lo.Map(syncTableInfos, func(item model.TableUploadInfo, index int) *proto.WHTable {
return &proto.WHTable{
Id: item.ID,
UploadId: item.UploadID,
Name: item.Name,
Status: item.Status,
Error: item.Error,
LastExecAt: timestamppb.New(syncUploadInfo.LastExecAt),
Count: int32(item.Count),
Duration: int32(item.Duration),
wht := &proto.WHTable{
Id: item.ID,
UploadId: item.UploadID,
Name: item.Name,
Status: item.Status,
Error: item.Error,
Count: int32(item.Count),
Duration: int32(item.Duration),
}
if !item.LastExecAt.IsZero() {
wht.LastExecAt = timestamppb.New(item.LastExecAt)
}
return wht
})

return &proto.WHUploadResponse{
ur := &proto.WHUploadResponse{
Id: syncUploadInfo.ID,
SourceId: syncUploadInfo.SourceID,
DestinationId: syncUploadInfo.DestinationID,
Expand All @@ -333,12 +347,17 @@ func (g *GRPC) GetWHUpload(ctx context.Context, request *proto.WHUploadRequest)
CreatedAt: timestamppb.New(syncUploadInfo.CreatedAt),
FirstEventAt: timestamppb.New(syncUploadInfo.FirstEventAt),
LastEventAt: timestamppb.New(syncUploadInfo.LastEventAt),
LastExecAt: timestamppb.New(syncUploadInfo.LastExecAt),
NextRetryTime: timestamppb.New(syncUploadInfo.NextRetryTime),
Duration: int32(syncUploadInfo.Duration),
Tables: tables,
IsArchivedUpload: syncUploadInfo.IsArchivedUpload,
}, nil
}
if !syncUploadInfo.NextRetryTime.IsZero() {
ur.NextRetryTime = timestamppb.New(syncUploadInfo.NextRetryTime)
}
if !syncUploadInfo.LastExecAt.IsZero() {
ur.LastExecAt = timestamppb.New(syncUploadInfo.LastExecAt)
}
return ur, nil
}

func (g *GRPC) TriggerWHUploads(ctx context.Context, request *proto.WHUploadsRequest) (*proto.TriggerWhUploadsResponse, error) {
Expand Down
136 changes: 121 additions & 15 deletions warehouse/api/grpc_test.go
Original file line number | Diff line number | Diff line change
Expand Up @@ -237,6 +237,7 @@ func TestGRPC(t *testing.T) {

totalUploads := 100
firstEventAt, lastEventAt := now.Add(-2*time.Hour), now.Add(-1*time.Hour)
lastExecAt := now.Add(-1 * time.Minute)

for i := 0; i < totalUploads; i++ {
fid, err := repoStaging.Insert(ctx, &model.StagingFileWithSchema{})
Expand Down Expand Up @@ -271,6 +272,23 @@ func TestGRPC(t *testing.T) {

err = repoTableUploads.Insert(ctx, uploadID, tables)
require.NoError(t, err)

if (i+1)%2 == 0 {
err = repoUpload.Update(ctx, uploadID, []repo.UpdateKeyValue{
repo.UploadFieldStatus(model.ExportedData),
repo.UploadFieldLastExecAt(lastExecAt),
})
require.NoError(t, err)

for _, table := range tables {
err = repoTableUploads.Set(ctx, uploadID, table, repo.TableUploadSetOptions{
Status: lo.ToPtr(model.TableUploadExported),
LastExecTime: lo.ToPtr(lastExecAt),
TotalEvents: lo.ToPtr(int64(10)),
})
require.NoError(t, err)
}
}
}

t.Run("GetWHUpload", func(t *testing.T) {
Expand All @@ -288,7 +306,7 @@ func TestGRPC(t *testing.T) {
require.Equal(t, "upload_id should be greater than 0", statusError.Message())
})

t.Run("no sources", func(t *testing.T) {
t.Run("unknown workspace", func(t *testing.T) {
res, err := grpcClient.GetWHUpload(ctx, &proto.WHUploadRequest{
UploadId: 1,
WorkspaceId: "unknown_workspace_id",
Expand All @@ -315,7 +333,8 @@ func TestGRPC(t *testing.T) {
require.Equal(t, codes.NotFound, statusError.Code())
require.Equal(t, "no sync found for id 1001", statusError.Message())
})
t.Run("success", func(t *testing.T) {

t.Run("success (waiting)", func(t *testing.T) {
res, err := grpcClient.GetWHUpload(ctx, &proto.WHUploadRequest{
UploadId: 1,
WorkspaceId: workspaceID,
Expand All @@ -333,7 +352,8 @@ func TestGRPC(t *testing.T) {
require.EqualValues(t, firstEventAt.UTC(), res.GetFirstEventAt().AsTime().UTC())
require.EqualValues(t, lastEventAt.UTC(), res.GetLastEventAt().AsTime().UTC())
require.EqualValues(t, now.UTC(), res.GetNextRetryTime().AsTime().UTC())
require.EqualValues(t, int32(now.Sub(time.Time{})/time.Second), res.GetDuration())
require.Zero(t, res.GetDuration())
require.Zero(t, res.GetLastExecAt())
require.NotEmpty(t, res.GetTables())
require.False(t, res.GetIsArchivedUpload())
require.EqualValues(t, tables, lo.Map(res.GetTables(), func(item *proto.WHTable, index int) string {
Expand All @@ -344,21 +364,55 @@ func TestGRPC(t *testing.T) {
require.EqualValues(t, 1, table.GetUploadId())
require.EqualValues(t, model.TableUploadWaiting, table.GetStatus())
require.EqualValues(t, "{}", table.GetError())
require.Empty(t, table.GetLastExecAt().AsTime().UTC())
require.Zero(t, table.GetLastExecAt())
require.Zero(t, table.GetCount())
require.EqualValues(t, int32(now.Sub(time.Time{})/time.Second), table.GetDuration())
require.Zero(t, table.GetDuration())
}
})

t.Run("success (exported)", func(t *testing.T) {
res, err := grpcClient.GetWHUpload(ctx, &proto.WHUploadRequest{
UploadId: 2,
WorkspaceId: workspaceID,
})
require.NoError(t, err)
require.NotNil(t, res)

require.EqualValues(t, sourceID, res.GetSourceId())
require.EqualValues(t, destinationID, res.GetDestinationId())
require.EqualValues(t, destinationType, res.GetDestinationType())
require.Equal(t, "{}", res.GetError())
require.Zero(t, res.GetAttempt())
require.EqualValues(t, model.ExportedData, res.GetStatus())
require.EqualValues(t, now.UTC(), res.GetCreatedAt().AsTime().UTC())
require.EqualValues(t, firstEventAt.UTC(), res.GetFirstEventAt().AsTime().UTC())
require.EqualValues(t, lastEventAt.UTC(), res.GetLastEventAt().AsTime().UTC())
require.Zero(t, res.GetNextRetryTime())
require.EqualValues(t, now.Sub(lastExecAt)/time.Second, res.GetDuration())
require.EqualValues(t, lastExecAt, res.GetLastExecAt().AsTime().UTC())
require.NotEmpty(t, res.GetTables())
require.False(t, res.GetIsArchivedUpload())
require.EqualValues(t, tables, lo.Map(res.GetTables(), func(item *proto.WHTable, index int) string {
return item.GetName()
}))

for _, table := range res.GetTables() {
require.EqualValues(t, 2, table.GetUploadId())
require.EqualValues(t, model.TableUploadExported, table.GetStatus())
require.EqualValues(t, "{}", table.GetError())
require.EqualValues(t, lastExecAt, table.GetLastExecAt().AsTime().UTC())
require.EqualValues(t, 10, table.GetCount())
require.EqualValues(t, now.Sub(lastExecAt)/time.Second, table.GetDuration())
}
})
})

t.Run("GetWHUploads", func(t *testing.T) {
t.Run("no sources", func(t *testing.T) {
t.Run("unknown workspace", func(t *testing.T) {
res, err := grpcClient.GetWHUploads(ctx, &proto.WHUploadsRequest{
Limit: 10,
Offset: 0,
WorkspaceId: "unknown_workspace_id",
SourceId: "unknown_source_id",
DestinationId: "unknown_destination_id",
Limit: 10,
Offset: 0,
WorkspaceId: "unknown_workspace_id",
})
require.Error(t, err)
require.Empty(t, res)
Expand All @@ -368,6 +422,22 @@ func TestGRPC(t *testing.T) {
require.Equal(t, codes.Unauthenticated, statusError.Code())
require.Equal(t, "no sources found for workspace: unknown_workspace_id", statusError.Message())
})
t.Run("unknown source", func(t *testing.T) {
res, err := grpcClient.GetWHUploads(ctx, &proto.WHUploadsRequest{
Limit: 10,
Offset: 0,
WorkspaceId: workspaceID,
SourceId: "unknown_source_id",
DestinationId: destinationID,
})
require.NoError(t, err)
require.NotNil(t, res)
require.Len(t, res.GetUploads(), 0)
require.NotNil(t, res.GetPagination())
require.EqualValues(t, int64(0), res.GetPagination().GetTotal())
require.EqualValues(t, int64(10), res.GetPagination().GetLimit())
require.EqualValues(t, int64(0), res.GetPagination().GetOffset())
})
t.Run("invalid limit and offset", func(t *testing.T) {
res, err := grpcClient.GetWHUploads(ctx, &proto.WHUploadsRequest{
Limit: -1,
Expand All @@ -382,21 +452,21 @@ func TestGRPC(t *testing.T) {
require.EqualValues(t, int64(10), res.GetPagination().GetLimit())
require.EqualValues(t, int64(0), res.GetPagination().GetOffset())
})
t.Run("success", func(t *testing.T) {
t.Run("success (waiting)", func(t *testing.T) {
res, err := grpcClient.GetWHUploads(ctx, &proto.WHUploadsRequest{
Limit: 25,
Offset: 6,
WorkspaceId: workspaceID,
SourceId: sourceID,
DestinationId: destinationID,
DestinationType: destinationType,
Status: model.Waiting,
Status: "waiting",
})
require.NoError(t, err)
require.NotNil(t, res)
require.Len(t, res.GetUploads(), 25)
require.NotNil(t, res.GetPagination())
require.EqualValues(t, int64(100), res.GetPagination().GetTotal())
require.EqualValues(t, int64(50), res.GetPagination().GetTotal())
require.EqualValues(t, int64(25), res.GetPagination().GetLimit())
require.EqualValues(t, int64(6), res.GetPagination().GetOffset())

Expand All @@ -411,7 +481,43 @@ func TestGRPC(t *testing.T) {
require.EqualValues(t, firstEventAt.UTC(), upload.GetFirstEventAt().AsTime().UTC())
require.EqualValues(t, lastEventAt.UTC(), upload.GetLastEventAt().AsTime().UTC())
require.EqualValues(t, now.UTC(), upload.GetNextRetryTime().AsTime().UTC())
require.EqualValues(t, int32(now.Sub(time.Time{})/time.Second), upload.GetDuration())
require.Zero(t, upload.GetLastExecAt())
require.Zero(t, upload.GetDuration())
require.Empty(t, upload.GetTables())
require.False(t, upload.GetIsArchivedUpload())
}
})
t.Run("success (exported)", func(t *testing.T) {
res, err := grpcClient.GetWHUploads(ctx, &proto.WHUploadsRequest{
Limit: 25,
Offset: 6,
WorkspaceId: workspaceID,
SourceId: sourceID,
DestinationId: destinationID,
DestinationType: destinationType,
Status: "success",
})
require.NoError(t, err)
require.NotNil(t, res)
require.Len(t, res.GetUploads(), 25)
require.NotNil(t, res.GetPagination())
require.EqualValues(t, int64(50), res.GetPagination().GetTotal())
require.EqualValues(t, int64(25), res.GetPagination().GetLimit())
require.EqualValues(t, int64(6), res.GetPagination().GetOffset())

for _, upload := range res.GetUploads() {
require.EqualValues(t, sourceID, upload.GetSourceId())
require.EqualValues(t, destinationID, upload.GetDestinationId())
require.EqualValues(t, destinationType, upload.GetDestinationType())
require.Equal(t, "{}", upload.GetError())
require.Zero(t, upload.GetAttempt())
require.EqualValues(t, model.ExportedData, upload.GetStatus())
require.EqualValues(t, now.UTC(), upload.GetCreatedAt().AsTime().UTC())
require.EqualValues(t, firstEventAt.UTC(), upload.GetFirstEventAt().AsTime().UTC())
require.EqualValues(t, lastEventAt.UTC(), upload.GetLastEventAt().AsTime().UTC())
require.Zero(t, upload.GetNextRetryTime())
require.EqualValues(t, now.Sub(lastExecAt)/time.Second, upload.GetDuration())
require.EqualValues(t, lastExecAt, upload.GetLastExecAt().AsTime().UTC())
require.Empty(t, upload.GetTables())
require.False(t, upload.GetIsArchivedUpload())
}
Expand Down
7 changes: 5 additions & 2 deletions warehouse/internal/repo/table_upload.go
Original file line number | Diff line number | Diff line change
Expand Up @@ -381,16 +381,19 @@ func (tu *TableUploads) SyncsInfo(ctx context.Context, uploadID int64) ([]model.
}

tableUploadInfos := lo.Map(tableUploads, func(item model.TableUpload, index int) model.TableUploadInfo {
return model.TableUploadInfo{
tuf := model.TableUploadInfo{
ID: item.ID,
UploadID: item.UploadID,
Name: item.TableName,
Status: item.Status,
Error: item.Error,
LastExecAt: item.LastExecTime,
Count: item.TotalEvents,
Duration: int64(item.UpdatedAt.Sub(item.LastExecTime) / time.Second),
}
if !item.LastExecTime.IsZero() {
tuf.Duration = int64(item.UpdatedAt.Sub(item.LastExecTime) / time.Second)
}
return tuf
})
return tableUploadInfos, nil
}
Expand Down
15 changes: 15 additions & 0 deletions warehouse/internal/repo/table_upload_test.go
Original file line number | Diff line number | Diff line change
Expand Up @@ -111,6 +111,21 @@ func TestTableUploadRepo(t *testing.T) {
}
})

t.Run("last exec time is nil", func(t *testing.T) {
uploadID := int64(2)

err := r.Insert(ctx, uploadID, tables)
require.NoError(t, err)

syncsInfos, err := r.SyncsInfo(ctx, uploadID)
require.NoError(t, err)
require.Len(t, syncsInfos, len(tables))
for i := range syncsInfos {
require.Zero(t, syncsInfos[i].LastExecAt)
require.Zero(t, syncsInfos[i].Duration)
}
})

t.Run("invalid sync id", func(t *testing.T) {
syncsInfos, err := r.SyncsInfo(ctx, int64(-1))
require.NoError(t, err)
Expand Down
Loading