Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: state for uploads #4057

Merged
merged 8 commits into from
Nov 6, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion warehouse/router/identities.go
Original file line number Diff line number Diff line change
Expand Up @@ -448,7 +448,7 @@
return
}

_ = job.setUploadStatus(UploadStatusOpts{Status: getInProgressState(model.ExportedData)})
_ = job.setUploadStatus(UploadStatusOpts{Status: inProgressState(model.ExportedData)})

Check warning on line 451 in warehouse/router/identities.go

View check run for this annotation

Codecov / codecov/patch

warehouse/router/identities.go#L451

Added line #L451 was not covered by tests
loadErrors, err := job.loadIdentityTables(true)
if err != nil {
r.logger.Errorf(`[WH]: Identity table upload errors: %v`, err)
Expand Down
103 changes: 103 additions & 0 deletions warehouse/router/state.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
package router

import (
"fmt"

"github.com/rudderlabs/rudder-server/warehouse/internal/model"
)

// state describes one step of the upload job lifecycle: the status strings
// recorded for the step and the step that follows it.
type state struct {
	// inProgress, failed and completed are the status strings associated with
	// this step. inProgress and failed stay empty for steps that are only
	// registered with a completed status.
	inProgress, failed, completed string

	// nextState is the step that follows this one, or nil when there is none.
	nextState *state
}

// stateTransitions indexes the known steps by their completed status.
var stateTransitions map[string]*state

// init builds the upload state machine: every step is registered in
// stateTransitions under its completed status and linked to its successor.
func init() {
	// pipeline holds the chained steps in order; the last entry has no
	// successor, so its nextState stays nil.
	pipeline := []*state{
		{completed: model.Waiting},
		{
			inProgress: "generating_upload_schema",
			failed:     "generating_upload_schema_failed",
			completed:  model.GeneratedUploadSchema,
		},
		{
			inProgress: "creating_table_uploads",
			failed:     "creating_table_uploads_failed",
			completed:  model.CreatedTableUploads,
		},
		{
			inProgress: "generating_load_files",
			failed:     "generating_load_files_failed",
			completed:  model.GeneratedLoadFiles,
		},
		{
			inProgress: "updating_table_uploads_counts",
			failed:     "updating_table_uploads_counts_failed",
			completed:  model.UpdatedTableUploadsCounts,
		},
		{
			inProgress: "creating_remote_schema",
			failed:     "creating_remote_schema_failed",
			completed:  model.CreatedRemoteSchema,
		},
		{
			inProgress: "exporting_data",
			failed:     "exporting_data_failed",
			completed:  model.ExportedData,
		},
	}

	// aborted is registered for lookup but is not part of the chain.
	aborted := &state{completed: model.Aborted}

	stateTransitions = make(map[string]*state, 8)
	for i, step := range pipeline {
		stateTransitions[step.completed] = step
		if i+1 < len(pipeline) {
			step.nextState = pipeline[i+1]
		}
	}
	stateTransitions[aborted.completed] = aborted
}

// inProgressState returns the in-progress status string of the step that is
// registered in stateTransitions under currentState.
//
// It panics with an error when no step is registered under currentState.
func inProgressState(currentState string) string {
	if step, known := stateTransitions[currentState]; known {
		return step.inProgress
	}
	panic(fmt.Errorf("invalid state: %s", currentState))
}

// nextState resolves the step that should run for an upload whose recorded
// status is currentState.
//
//   - If currentState is the completed status of a registered step, the step
//     that follows it is returned (nil for the last step in the chain).
//   - If currentState is the in-progress or failed status of a step, that same
//     step is returned.
//   - Otherwise, including when currentState is empty, nil is returned.
func nextState(currentState string) *state {
	if current, ok := stateTransitions[currentState]; ok {
		return current.nextState
	}

	// Steps that only have a completed status leave inProgress and failed
	// empty. Without this guard an empty status would match one of them in
	// the scan below, and which one would depend on map iteration order.
	if currentState == "" {
		return nil
	}

	for _, candidate := range stateTransitions {
		if currentState == candidate.inProgress || currentState == candidate.failed {
			return candidate
		}
	}
	return nil
}
73 changes: 73 additions & 0 deletions warehouse/router/state_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
package router

import (
"testing"

"github.com/rudderlabs/rudder-server/warehouse/internal/model"

"github.com/stretchr/testify/require"
)

func TestState(t *testing.T) {
t.Run("inProgressState", func(t *testing.T) {
t.Run("invalid state", func(t *testing.T) {
require.Panics(t, func() {
inProgressState("unknown")
})
})
t.Run("valid state", func(t *testing.T) {
testcases := []struct {
current string
inProgressState string
}{
{current: model.GeneratedUploadSchema, inProgressState: "generating_upload_schema"},
{current: model.CreatedTableUploads, inProgressState: "creating_table_uploads"},
{current: model.GeneratedLoadFiles, inProgressState: "generating_load_files"},
{current: model.UpdatedTableUploadsCounts, inProgressState: "updating_table_uploads_counts"},
{current: model.CreatedRemoteSchema, inProgressState: "creating_remote_schema"},
{current: model.ExportedData, inProgressState: "exporting_data"},
}

for index, tc := range testcases {
require.Equal(t, tc.inProgressState, inProgressState(tc.current), "test case %d", index)
}
})
})
t.Run("nextState", func(t *testing.T) {
testCases := []struct {
current string
next *state
}{
{current: "unknown", next: nil},

// completed states
{current: model.Waiting, next: stateTransitions[model.GeneratedUploadSchema]},
{current: model.GeneratedUploadSchema, next: stateTransitions[model.CreatedTableUploads]},
{current: model.CreatedTableUploads, next: stateTransitions[model.GeneratedLoadFiles]},
{current: model.GeneratedLoadFiles, next: stateTransitions[model.UpdatedTableUploadsCounts]},
{current: model.UpdatedTableUploadsCounts, next: stateTransitions[model.CreatedRemoteSchema]},
{current: model.CreatedRemoteSchema, next: stateTransitions[model.ExportedData]},
{current: model.ExportedData, next: nil},
{current: model.Aborted, next: nil},

// in progress states
{current: "generating_upload_schema", next: stateTransitions[model.GeneratedUploadSchema]},
{current: "creating_table_uploads", next: stateTransitions[model.CreatedTableUploads]},
{current: "generating_load_files", next: stateTransitions[model.GeneratedLoadFiles]},
{current: "updating_table_uploads_counts", next: stateTransitions[model.UpdatedTableUploadsCounts]},
{current: "creating_remote_schema", next: stateTransitions[model.CreatedRemoteSchema]},
{current: "exporting_data", next: stateTransitions[model.ExportedData]},

// failed states
{current: "generating_upload_schema_failed", next: stateTransitions[model.GeneratedUploadSchema]},
{current: "creating_table_uploads_failed", next: stateTransitions[model.CreatedTableUploads]},
{current: "generating_load_files_failed", next: stateTransitions[model.GeneratedLoadFiles]},
{current: "updating_table_uploads_counts_failed", next: stateTransitions[model.UpdatedTableUploadsCounts]},
{current: "creating_remote_schema_failed", next: stateTransitions[model.CreatedRemoteSchema]},
{current: "exporting_data_failed", next: stateTransitions[model.ExportedData]},
}
for index, tc := range testCases {
require.Equal(t, tc.next, nextState(tc.current), "test case %d", index)
}
})
}
108 changes: 3 additions & 105 deletions warehouse/router/upload.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,15 +54,6 @@ const (
singerProtocolSourceCategory = "singer-protocol"
)

// stateTransitions indexes the known upload steps by their completed status.
var stateTransitions map[string]*uploadState

// uploadState describes one step of the upload job lifecycle: the status
// strings recorded for the step and the step that follows it.
type uploadState struct {
	// inProgress, failed and completed are the status strings associated with
	// this step. inProgress and failed stay empty for steps that are only
	// registered with a completed status.
	inProgress, failed, completed string

	// nextState is the step that follows this one, or nil when there is none.
	nextState *uploadState
}

// tableNameT is a distinct string type; presumably it carries a warehouse
// table name — confirm against its uses elsewhere in the package.
type tableNameT string

type UploadJobFactory struct {
Expand Down Expand Up @@ -162,10 +153,6 @@ var (
}
)

// init builds the upload state machine during package initialisation by
// calling initializeStateMachine.
func init() {
initializeStateMachine()
}

func (f *UploadJobFactory) NewUploadJob(ctx context.Context, dto *model.UploadJob, whManager manager.Manager) *UploadJob {
ujCtx := whutils.CtxWithUploadID(ctx, dto.Upload.ID)

Expand Down Expand Up @@ -430,12 +417,12 @@ func (job *UploadJob) run() (err error) {

var (
newStatus string
nextUploadState *uploadState
nextUploadState *state
)

// do not set nextUploadState if hasSchemaChanged to make it start from 1st step again
if !hasSchemaChanged {
nextUploadState = getNextUploadState(job.upload.Status)
nextUploadState = nextState(job.upload.Status)
}
if nextUploadState == nil {
nextUploadState = stateTransitions[model.GeneratedUploadSchema]
Expand Down Expand Up @@ -665,7 +652,7 @@ func (job *UploadJob) run() (err error) {
break
}

nextUploadState = getNextUploadState(newStatus)
nextUploadState = nextState(newStatus)
}

if newStatus != model.ExportedData {
Expand Down Expand Up @@ -1897,95 +1884,6 @@ func (job *UploadJob) DTO() *model.UploadJob {
}
}

/*
* State Machine for upload job lifecycle
*/

// getNextUploadState resolves the step that should run for an upload whose
// recorded status is dbStatus.
//
//   - If dbStatus is the in-progress or failed status of a step, that same
//     step is returned.
//   - If dbStatus is the completed status of a step, the step that follows it
//     is returned (nil for the last step in the chain).
//   - Otherwise, including when dbStatus is empty, nil is returned.
func getNextUploadState(dbStatus string) *uploadState {
	// Steps that only have a completed status leave inProgress and failed
	// empty. Without this guard an empty status would match one of them, and
	// which one would depend on map iteration order.
	if dbStatus == "" {
		return nil
	}

	for _, step := range stateTransitions {
		if dbStatus == step.inProgress || dbStatus == step.failed {
			return step
		}
		if dbStatus == step.completed {
			return step.nextState
		}
	}
	return nil
}

// getInProgressState returns the in-progress status string of the step that
// is registered in stateTransitions under state.
//
// It panics with an error when no step is registered under state.
func getInProgressState(state string) string {
	if step, known := stateTransitions[state]; known {
		return step.inProgress
	}
	panic(fmt.Errorf("invalid Upload state: %s", state))
}

// initializeStateMachine builds the upload state machine: every step is
// registered in stateTransitions under its completed status and linked to
// its successor. Any previous contents of stateTransitions are replaced.
func initializeStateMachine() {
	// pipeline holds the chained steps in order; the last entry has no
	// successor, so its nextState stays nil.
	pipeline := []*uploadState{
		{completed: model.Waiting},
		{
			inProgress: "generating_upload_schema",
			failed:     "generating_upload_schema_failed",
			completed:  model.GeneratedUploadSchema,
		},
		{
			inProgress: "creating_table_uploads",
			failed:     "creating_table_uploads_failed",
			completed:  model.CreatedTableUploads,
		},
		{
			inProgress: "generating_load_files",
			failed:     "generating_load_files_failed",
			completed:  model.GeneratedLoadFiles,
		},
		{
			inProgress: "updating_table_uploads_counts",
			failed:     "updating_table_uploads_counts_failed",
			completed:  model.UpdatedTableUploadsCounts,
		},
		{
			inProgress: "creating_remote_schema",
			failed:     "creating_remote_schema_failed",
			completed:  model.CreatedRemoteSchema,
		},
		{
			inProgress: "exporting_data",
			failed:     "exporting_data_failed",
			completed:  model.ExportedData,
		},
	}

	// aborted is registered for lookup but is not part of the chain.
	aborted := &uploadState{completed: model.Aborted}

	stateTransitions = make(map[string]*uploadState)
	for i, step := range pipeline {
		stateTransitions[step.completed] = step
		if i+1 < len(pipeline) {
			step.nextState = pipeline[i+1]
		}
	}
	stateTransitions[aborted.completed] = aborted
}

// GetLocalSchema delegates to the job's schemaHandle, passing ctx through,
// and returns that call's schema and error unchanged.
func (job *UploadJob) GetLocalSchema(ctx context.Context) (model.Schema, error) {
return job.schemaHandle.GetLocalSchema(ctx)
}
Expand Down
Loading