diff --git a/warehouse/api/grpc.go b/warehouse/api/grpc.go
index 50d9e45fed..68c9a72375 100644
--- a/warehouse/api/grpc.go
+++ b/warehouse/api/grpc.go
@@ -32,6 +32,7 @@ import (
 	"github.com/rudderlabs/rudder-go-kit/filemanager"
 	"github.com/rudderlabs/rudder-go-kit/logger"
 	obskit "github.com/rudderlabs/rudder-observability-kit/go/labels"
+	backendconfig "github.com/rudderlabs/rudder-server/backend-config"
 	"github.com/rudderlabs/rudder-server/controlplane"

 	proto "github.com/rudderlabs/rudder-server/proto/warehouse"
@@ -198,7 +199,12 @@ func (g *GRPC) GetWHUploads(ctx context.Context, request *proto.WHUploadsRequest
 		offset = 0
 	}

-	sourceIDs := g.bcManager.SourceIDsByWorkspace()[request.WorkspaceId]
+	var sourceIDs []string
+	if request.SourceId != "" {
+		sourceIDs = []string{request.SourceId}
+	} else {
+		sourceIDs = g.bcManager.SourceIDsByWorkspace()[request.WorkspaceId]
+	}
 	if len(sourceIDs) == 0 {
 		return &proto.WHUploadsResponse{}, status.Errorf(codes.Code(code.Code_UNAUTHENTICATED), "no sources found for workspace: %v", request.WorkspaceId)
@@ -237,7 +243,7 @@ func (g *GRPC) GetWHUploads(ctx context.Context, request *proto.WHUploadsRequest
 	}

 	uploads := lo.Map(uploadInfos, func(item model.UploadInfo, index int) *proto.WHUploadResponse {
-		return &proto.WHUploadResponse{
+		ur := &proto.WHUploadResponse{
 			Id:               item.ID,
 			SourceId:         item.SourceID,
 			DestinationId:    item.DestinationID,
@@ -249,12 +255,17 @@ func (g *GRPC) GetWHUploads(ctx context.Context, request *proto.WHUploadsRequest
 			CreatedAt:        timestamppb.New(item.CreatedAt),
 			FirstEventAt:     timestamppb.New(item.FirstEventAt),
 			LastEventAt:      timestamppb.New(item.LastEventAt),
-			LastExecAt:       timestamppb.New(item.LastExecAt),
-			NextRetryTime:    timestamppb.New(item.NextRetryTime),
 			Duration:         int32(item.Duration),
 			Tables:           []*proto.WHTable{},
 			IsArchivedUpload: item.IsArchivedUpload,
 		}
+		if !item.NextRetryTime.IsZero() {
+			ur.NextRetryTime = timestamppb.New(item.NextRetryTime)
+		}
+		if !item.LastExecAt.IsZero() {
+			ur.LastExecAt = timestamppb.New(item.LastExecAt)
+		}
+		return ur
 	})

 	response := &proto.WHUploadsResponse{Uploads: uploads, Pagination: &proto.Pagination{
@@ -309,19 +320,22 @@ func (g *GRPC) GetWHUpload(ctx context.Context, request *proto.WHUploadRequest)
 	}

 	tables := lo.Map(syncTableInfos, func(item model.TableUploadInfo, index int) *proto.WHTable {
-		return &proto.WHTable{
-			Id:         item.ID,
-			UploadId:   item.UploadID,
-			Name:       item.Name,
-			Status:     item.Status,
-			Error:      item.Error,
-			LastExecAt: timestamppb.New(syncUploadInfo.LastExecAt),
-			Count:      int32(item.Count),
-			Duration:   int32(item.Duration),
+		wht := &proto.WHTable{
+			Id:       item.ID,
+			UploadId: item.UploadID,
+			Name:     item.Name,
+			Status:   item.Status,
+			Error:    item.Error,
+			Count:    int32(item.Count),
+			Duration: int32(item.Duration),
+		}
+		if !item.LastExecAt.IsZero() {
+			wht.LastExecAt = timestamppb.New(item.LastExecAt)
 		}
+		return wht
 	})

-	return &proto.WHUploadResponse{
+	ur := &proto.WHUploadResponse{
 		Id:               syncUploadInfo.ID,
 		SourceId:         syncUploadInfo.SourceID,
 		DestinationId:    syncUploadInfo.DestinationID,
@@ -333,12 +347,17 @@ func (g *GRPC) GetWHUpload(ctx context.Context, request *proto.WHUploadRequest)
 		CreatedAt:        timestamppb.New(syncUploadInfo.CreatedAt),
 		FirstEventAt:     timestamppb.New(syncUploadInfo.FirstEventAt),
 		LastEventAt:      timestamppb.New(syncUploadInfo.LastEventAt),
-		LastExecAt:       timestamppb.New(syncUploadInfo.LastExecAt),
-		NextRetryTime:    timestamppb.New(syncUploadInfo.NextRetryTime),
 		Duration:         int32(syncUploadInfo.Duration),
 		Tables:           tables,
 		IsArchivedUpload: syncUploadInfo.IsArchivedUpload,
-	}, nil
+	}
+	if !syncUploadInfo.NextRetryTime.IsZero() {
+		ur.NextRetryTime = timestamppb.New(syncUploadInfo.NextRetryTime)
+	}
+	if !syncUploadInfo.LastExecAt.IsZero() {
+		ur.LastExecAt = timestamppb.New(syncUploadInfo.LastExecAt)
+	}
+	return ur, nil
 }

 func (g *GRPC) TriggerWHUploads(ctx context.Context, request *proto.WHUploadsRequest) (*proto.TriggerWhUploadsResponse, error) {
diff --git a/warehouse/api/grpc_test.go b/warehouse/api/grpc_test.go
index 609ffec180..9d9dbed0d8 100644
--- a/warehouse/api/grpc_test.go
+++ b/warehouse/api/grpc_test.go
@@ -237,6 +237,7 @@ func TestGRPC(t *testing.T) {
 			totalUploads := 100

 			firstEventAt, lastEventAt := now.Add(-2*time.Hour), now.Add(-1*time.Hour)
+			lastExecAt := now.Add(-1 * time.Minute)

 			for i := 0; i < totalUploads; i++ {
 				fid, err := repoStaging.Insert(ctx, &model.StagingFileWithSchema{})
@@ -271,6 +272,23 @@ func TestGRPC(t *testing.T) {

 				err = repoTableUploads.Insert(ctx, uploadID, tables)
 				require.NoError(t, err)
+
+				if (i+1)%2 == 0 {
+					err = repoUpload.Update(ctx, uploadID, []repo.UpdateKeyValue{
+						repo.UploadFieldStatus(model.ExportedData),
+						repo.UploadFieldLastExecAt(lastExecAt),
+					})
+					require.NoError(t, err)
+
+					for _, table := range tables {
+						err = repoTableUploads.Set(ctx, uploadID, table, repo.TableUploadSetOptions{
+							Status:       lo.ToPtr(model.TableUploadExported),
+							LastExecTime: lo.ToPtr(lastExecAt),
+							TotalEvents:  lo.ToPtr(int64(10)),
+						})
+						require.NoError(t, err)
+					}
+				}
 			}

 			t.Run("GetWHUpload", func(t *testing.T) {
@@ -288,7 +306,7 @@ func TestGRPC(t *testing.T) {
 					require.Equal(t, "upload_id should be greater than 0", statusError.Message())
 				})

-				t.Run("no sources", func(t *testing.T) {
+				t.Run("unknown workspace", func(t *testing.T) {
 					res, err := grpcClient.GetWHUpload(ctx, &proto.WHUploadRequest{
 						UploadId:    1,
 						WorkspaceId: "unknown_workspace_id",
@@ -315,7 +333,8 @@ func TestGRPC(t *testing.T) {
 					require.Equal(t, codes.NotFound, statusError.Code())
 					require.Equal(t, "no sync found for id 1001", statusError.Message())
 				})
-				t.Run("success", func(t *testing.T) {
+
+				t.Run("success (waiting)", func(t *testing.T) {
 					res, err := grpcClient.GetWHUpload(ctx, &proto.WHUploadRequest{
 						UploadId:    1,
 						WorkspaceId: workspaceID,
@@ -333,7 +352,8 @@ func TestGRPC(t *testing.T) {
 					require.EqualValues(t, firstEventAt.UTC(), res.GetFirstEventAt().AsTime().UTC())
 					require.EqualValues(t, lastEventAt.UTC(), res.GetLastEventAt().AsTime().UTC())
 					require.EqualValues(t, now.UTC(), res.GetNextRetryTime().AsTime().UTC())
-					require.EqualValues(t, int32(now.Sub(time.Time{})/time.Second), res.GetDuration())
+					require.Zero(t, res.GetDuration())
+					require.Zero(t, res.GetLastExecAt())
 					require.NotEmpty(t, res.GetTables())
 					require.False(t, res.GetIsArchivedUpload())
 					require.EqualValues(t, tables, lo.Map(res.GetTables(), func(item *proto.WHTable, index int) string {
@@ -344,21 +364,55 @@ func TestGRPC(t *testing.T) {
 						require.EqualValues(t, 1, table.GetUploadId())
 						require.EqualValues(t, model.TableUploadWaiting, table.GetStatus())
 						require.EqualValues(t, "{}", table.GetError())
-						require.Empty(t, table.GetLastExecAt().AsTime().UTC())
+						require.Zero(t, table.GetLastExecAt())
 						require.Zero(t, table.GetCount())
-						require.EqualValues(t, int32(now.Sub(time.Time{})/time.Second), table.GetDuration())
+						require.Zero(t, table.GetDuration())
+					}
+				})
+
+				t.Run("success (exported)", func(t *testing.T) {
+					res, err := grpcClient.GetWHUpload(ctx, &proto.WHUploadRequest{
+						UploadId:    2,
+						WorkspaceId: workspaceID,
+					})
+					require.NoError(t, err)
+					require.NotNil(t, res)
+
+					require.EqualValues(t, sourceID, res.GetSourceId())
+					require.EqualValues(t, destinationID, res.GetDestinationId())
+					require.EqualValues(t, destinationType, res.GetDestinationType())
+					require.Equal(t, "{}", res.GetError())
+					require.Zero(t, res.GetAttempt())
+					require.EqualValues(t, model.ExportedData, res.GetStatus())
+					require.EqualValues(t, now.UTC(), res.GetCreatedAt().AsTime().UTC())
+					require.EqualValues(t, firstEventAt.UTC(), res.GetFirstEventAt().AsTime().UTC())
+					require.EqualValues(t, lastEventAt.UTC(), res.GetLastEventAt().AsTime().UTC())
+					require.Zero(t, res.GetNextRetryTime())
+					require.EqualValues(t, now.Sub(lastExecAt)/time.Second, res.GetDuration())
+					require.EqualValues(t, lastExecAt, res.GetLastExecAt().AsTime().UTC())
+					require.NotEmpty(t, res.GetTables())
+					require.False(t, res.GetIsArchivedUpload())
+					require.EqualValues(t, tables, lo.Map(res.GetTables(), func(item *proto.WHTable, index int) string {
+						return item.GetName()
+					}))
+
+					for _, table := range res.GetTables() {
+						require.EqualValues(t, 2, table.GetUploadId())
+						require.EqualValues(t, model.TableUploadExported, table.GetStatus())
+						require.EqualValues(t, "{}", table.GetError())
+						require.EqualValues(t, lastExecAt, table.GetLastExecAt().AsTime().UTC())
+						require.EqualValues(t, 10, table.GetCount())
+						require.EqualValues(t, now.Sub(lastExecAt)/time.Second, table.GetDuration())
 					}
 				})
 			})

 			t.Run("GetWHUploads", func(t *testing.T) {
-				t.Run("no sources", func(t *testing.T) {
+				t.Run("unknown workspace", func(t *testing.T) {
 					res, err := grpcClient.GetWHUploads(ctx, &proto.WHUploadsRequest{
-						Limit:         10,
-						Offset:        0,
-						WorkspaceId:   "unknown_workspace_id",
-						SourceId:      "unknown_source_id",
-						DestinationId: "unknown_destination_id",
+						Limit:       10,
+						Offset:      0,
+						WorkspaceId: "unknown_workspace_id",
 					})
 					require.Error(t, err)
 					require.Empty(t, res)
@@ -368,6 +422,22 @@ func TestGRPC(t *testing.T) {
 					require.Equal(t, codes.Unauthenticated, statusError.Code())
 					require.Equal(t, "no sources found for workspace: unknown_workspace_id", statusError.Message())
 				})
+				t.Run("unknown source", func(t *testing.T) {
+					res, err := grpcClient.GetWHUploads(ctx, &proto.WHUploadsRequest{
+						Limit:         10,
+						Offset:        0,
+						WorkspaceId:   workspaceID,
+						SourceId:      "unknown_source_id",
+						DestinationId: destinationID,
+					})
+					require.NoError(t, err)
+					require.NotNil(t, res)
+					require.Len(t, res.GetUploads(), 0)
+					require.NotNil(t, res.GetPagination())
+					require.EqualValues(t, int64(0), res.GetPagination().GetTotal())
+					require.EqualValues(t, int64(10), res.GetPagination().GetLimit())
+					require.EqualValues(t, int64(0), res.GetPagination().GetOffset())
+				})
 				t.Run("invalid limit and offset", func(t *testing.T) {
 					res, err := grpcClient.GetWHUploads(ctx, &proto.WHUploadsRequest{
 						Limit:  -1,
@@ -382,7 +452,7 @@ func TestGRPC(t *testing.T) {
 					require.EqualValues(t, int64(10), res.GetPagination().GetLimit())
 					require.EqualValues(t, int64(0), res.GetPagination().GetOffset())
 				})
-				t.Run("success", func(t *testing.T) {
+				t.Run("success (waiting)", func(t *testing.T) {
 					res, err := grpcClient.GetWHUploads(ctx, &proto.WHUploadsRequest{
 						Limit:           25,
 						Offset:          6,
@@ -390,13 +460,13 @@ func TestGRPC(t *testing.T) {
 						WorkspaceId:     workspaceID,
 						SourceId:        sourceID,
 						DestinationId:   destinationID,
 						DestinationType: destinationType,
-						Status:          model.Waiting,
+						Status:          "waiting",
 					})
 					require.NoError(t, err)
 					require.NotNil(t, res)
 					require.Len(t, res.GetUploads(), 25)
 					require.NotNil(t, res.GetPagination())
-					require.EqualValues(t, int64(100), res.GetPagination().GetTotal())
+					require.EqualValues(t, int64(50), res.GetPagination().GetTotal())
 					require.EqualValues(t, int64(25), res.GetPagination().GetLimit())
 					require.EqualValues(t, int64(6), res.GetPagination().GetOffset())
@@ -411,7 +481,43 @@ func TestGRPC(t *testing.T) {
 						require.EqualValues(t, firstEventAt.UTC(), upload.GetFirstEventAt().AsTime().UTC())
 						require.EqualValues(t, lastEventAt.UTC(), upload.GetLastEventAt().AsTime().UTC())
 						require.EqualValues(t, now.UTC(), upload.GetNextRetryTime().AsTime().UTC())
-						require.EqualValues(t, int32(now.Sub(time.Time{})/time.Second), upload.GetDuration())
+						require.Zero(t, upload.GetLastExecAt())
+						require.Zero(t, upload.GetDuration())
 						require.Empty(t, upload.GetTables())
 						require.False(t, upload.GetIsArchivedUpload())
 					}
 				})
+				t.Run("success (exported)", func(t *testing.T) {
+					res, err := grpcClient.GetWHUploads(ctx, &proto.WHUploadsRequest{
+						Limit:           25,
+						Offset:          6,
+						WorkspaceId:     workspaceID,
+						SourceId:        sourceID,
+						DestinationId:   destinationID,
+						DestinationType: destinationType,
+						Status:          "success",
+					})
+					require.NoError(t, err)
+					require.NotNil(t, res)
+					require.Len(t, res.GetUploads(), 25)
+					require.NotNil(t, res.GetPagination())
+					require.EqualValues(t, int64(50), res.GetPagination().GetTotal())
+					require.EqualValues(t, int64(25), res.GetPagination().GetLimit())
+					require.EqualValues(t, int64(6), res.GetPagination().GetOffset())
+
+					for _, upload := range res.GetUploads() {
+						require.EqualValues(t, sourceID, upload.GetSourceId())
+						require.EqualValues(t, destinationID, upload.GetDestinationId())
+						require.EqualValues(t, destinationType, upload.GetDestinationType())
+						require.Equal(t, "{}", upload.GetError())
+						require.Zero(t, upload.GetAttempt())
+						require.EqualValues(t, model.ExportedData, upload.GetStatus())
+						require.EqualValues(t, now.UTC(), upload.GetCreatedAt().AsTime().UTC())
+						require.EqualValues(t, firstEventAt.UTC(), upload.GetFirstEventAt().AsTime().UTC())
+						require.EqualValues(t, lastEventAt.UTC(), upload.GetLastEventAt().AsTime().UTC())
+						require.Zero(t, upload.GetNextRetryTime())
+						require.EqualValues(t, now.Sub(lastExecAt)/time.Second, upload.GetDuration())
+						require.EqualValues(t, lastExecAt, upload.GetLastExecAt().AsTime().UTC())
+						require.Empty(t, upload.GetTables())
+						require.False(t, upload.GetIsArchivedUpload())
+					}
+				})
diff --git a/warehouse/internal/repo/table_upload.go b/warehouse/internal/repo/table_upload.go
index bdf43f01d4..31c049c38d 100644
--- a/warehouse/internal/repo/table_upload.go
+++ b/warehouse/internal/repo/table_upload.go
@@ -381,7 +381,7 @@ func (tu *TableUploads) SyncsInfo(ctx context.Context, uploadID int64) ([]model.
 	}

 	tableUploadInfos := lo.Map(tableUploads, func(item model.TableUpload, index int) model.TableUploadInfo {
-		return model.TableUploadInfo{
+		tuf := model.TableUploadInfo{
 			ID:         item.ID,
 			UploadID:   item.UploadID,
 			Name:       item.TableName,
@@ -389,8 +389,11 @@ func (tu *TableUploads) SyncsInfo(ctx context.Context, uploadID int64) ([]model.
 			Error:      item.Error,
 			LastExecAt: item.LastExecTime,
 			Count:      item.TotalEvents,
-			Duration:   int64(item.UpdatedAt.Sub(item.LastExecTime) / time.Second),
 		}
+		if !item.LastExecTime.IsZero() {
+			tuf.Duration = int64(item.UpdatedAt.Sub(item.LastExecTime) / time.Second)
+		}
+		return tuf
 	})
 	return tableUploadInfos, nil
 }
diff --git a/warehouse/internal/repo/table_upload_test.go b/warehouse/internal/repo/table_upload_test.go
index d1dfa1e04f..2e9fe79c8a 100644
--- a/warehouse/internal/repo/table_upload_test.go
+++ b/warehouse/internal/repo/table_upload_test.go
@@ -111,6 +111,21 @@ func TestTableUploadRepo(t *testing.T) {
 		}
 	})

+	t.Run("last exec time is nil", func(t *testing.T) {
+		uploadID := int64(2)
+
+		err := r.Insert(ctx, uploadID, tables)
+		require.NoError(t, err)
+
+		syncsInfos, err := r.SyncsInfo(ctx, uploadID)
+		require.NoError(t, err)
+		require.Len(t, syncsInfos, len(tables))
+		for i := range syncsInfos {
+			require.Zero(t, syncsInfos[i].LastExecAt)
+			require.Zero(t, syncsInfos[i].Duration)
+		}
+	})
+
 	t.Run("invalid sync id", func(t *testing.T) {
 		syncsInfos, err := r.SyncsInfo(ctx, int64(-1))
 		require.NoError(t, err)
diff --git a/warehouse/internal/repo/upload.go b/warehouse/internal/repo/upload.go
index 6efa81a338..c840146f11 100644
--- a/warehouse/internal/repo/upload.go
+++ b/warehouse/internal/repo/upload.go
@@ -846,13 +846,14 @@ func (u *Uploads) syncsInfo(ctx context.Context, limit, offset int, opts model.S
 				uploadInfo.NextRetryTime = nextRetryTime
 			}
 		}
-
-		// set duration as time between updatedAt and lastExec recorded timings for ongoing/retrying uploads
-		// set diff between lastExec and current time
-		if uploadInfo.Status == model.ExportedData || uploadInfo.Status == model.Aborted {
-			uploadInfo.Duration = uploadInfo.UpdatedAt.Sub(uploadInfo.LastExecAt) / time.Second
-		} else {
-			uploadInfo.Duration = u.now().Sub(uploadInfo.LastExecAt) / time.Second
+		if lastExecAt.Valid {
+			// set duration as time between updatedAt and lastExec recorded timings for ongoing/retrying uploads
+			// set diff between lastExec and current time
+			if uploadInfo.Status == model.ExportedData || uploadInfo.Status == model.Aborted {
+				uploadInfo.Duration = uploadInfo.UpdatedAt.Sub(lastExecAt.Time) / time.Second
+			} else {
+				uploadInfo.Duration = u.now().Sub(lastExecAt.Time) / time.Second
+			}
 		}

 		// set error only for failed uploads. skip for retried and then successful uploads
@@ -1363,18 +1364,18 @@ func (u *Uploads) GetFirstAbortedUploadInContinuousAbortsByDestination(ctx conte
 	stmt := fmt.Sprintf(`
 		WITH wh_uploads_with_last_successful_upload AS (
-			SELECT 
+			SELECT
 				%[1]s,
 				status,
 				MAX(CASE WHEN status = 'exported_data' THEN created_at ELSE NULL END) OVER (PARTITION BY destination_id) AS last_successful_upload
-			FROM 
+			FROM
 				`+uploadsTableName+`
 			WHERE
 				workspace_id = $1 AND created_at >= $2
 		)
 		SELECT
 			%[1]s
 		FROM (
-			SELECT 
+			SELECT
 				*,
 				ROW_NUMBER() OVER (PARTITION BY destination_id ORDER BY created_at) AS row_number
 			FROM wh_uploads_with_last_successful_upload
diff --git a/warehouse/internal/repo/upload_test.go b/warehouse/internal/repo/upload_test.go
index 1cc5dd5d83..7d1f1a1077 100644
--- a/warehouse/internal/repo/upload_test.go
+++ b/warehouse/internal/repo/upload_test.go
@@ -1563,7 +1563,7 @@ func TestUploads_SyncsInfo(t *testing.T) {
 			require.Equal(t, uploadInfo.Error, "{}")
 			require.Zero(t, uploadInfo.LastExecAt)
 			require.Equal(t, uploadInfo.NextRetryTime.UTC(), now.UTC())
-			require.Equal(t, uploadInfo.Duration, now.Sub(time.Time{})/time.Second)
+			require.Zero(t, uploadInfo.Duration)
 			require.Zero(t, uploadInfo.Attempt)
 			require.False(t, uploadInfo.IsArchivedUpload)
 		}
@@ -1590,7 +1590,7 @@ func TestUploads_SyncsInfo(t *testing.T) {
 			require.Equal(t, uploadInfo.Error, "{}")
 			require.Zero(t, uploadInfo.LastExecAt)
 			require.Zero(t, uploadInfo.NextRetryTime)
-			require.Equal(t, uploadInfo.Duration, now.Sub(time.Time{})/time.Second)
+			require.Zero(t, uploadInfo.Duration)
 			require.Zero(t, uploadInfo.Attempt)
 			require.True(t, uploadInfo.IsArchivedUpload)
 		}