From c423fc8d06abac900c87f97c49d4bf1cba866890 Mon Sep 17 00:00:00 2001 From: Quinn Klassen Date: Wed, 1 Feb 2023 14:50:09 -0800 Subject: [PATCH] Expose session state (#1024) Expose session state --- internal/session.go | 67 ++++++++++++++++++------------------- internal/session_test.go | 30 ++++++++--------- internal/workflow.go | 4 +-- test/integration_test.go | 33 +++++++++++++++++++ test/workflow_test.go | 41 +++++++++++++++++++++++ workflow/session.go | 71 ++++++++++++++++++++++++++-------------- 6 files changed, 172 insertions(+), 74 deletions(-) diff --git a/internal/session.go b/internal/session.go index 292d9e828..34177a35c 100644 --- a/internal/session.go +++ b/internal/session.go @@ -46,9 +46,9 @@ type ( SessionInfo struct { SessionID string HostName string - resourceID string // hide from user for now - taskqueue string // resource specific taskqueue - sessionState sessionState + SessionState SessionState + resourceID string // hide from user for now + taskqueue string // resource specific taskqueue sessionCancelFunc CancelFunc // cancel func for the session context, used by both creation activity and user activities completionCtx Context // context for executing the completion activity } @@ -71,7 +71,7 @@ type ( Taskqueue string } - sessionState int + SessionState int sessionTokenBucket struct { *sync.Cond @@ -104,9 +104,9 @@ type ( // Session State enum const ( - sessionStateOpen sessionState = iota - sessionStateFailed - sessionStateClosed + SessionStateOpen SessionState = iota + SessionStateFailed + SessionStateClosed ) const ( @@ -143,11 +143,11 @@ var ( // ActivityOptions. If none is specified, the default one will be used. // // CreationSession will fail in the following situations: -// 1. The context passed in already contains a session which is still open -// (not closed and failed). -// 2. 
All the workers are busy (number of sessions currently running on all the workers have reached -// MaxConcurrentSessionExecutionSize, which is specified when starting the workers) and session -// cannot be created within a specified timeout. +// 1. The context passed in already contains a session which is still open +// (not closed and failed). +// 2. All the workers are busy (number of sessions currently running on all the workers have reached +// MaxConcurrentSessionExecutionSize, which is specified when starting the workers) and session +// cannot be created within a specified timeout. // // If an activity is executed using the returned context, it's regarded as part of the // session. All activities within the same session will be executed by the same worker. @@ -164,22 +164,23 @@ var ( // New session can be created if necessary to retry the whole session. // // Example: -// so := &SessionOptions{ -// ExecutionTimeout: time.Minute, -// CreationTimeout: time.Minute, -// } -// sessionCtx, err := CreateSession(ctx, so) -// if err != nil { -// // Creation failed. Wrong ctx or too many outstanding sessions. -// } -// defer CompleteSession(sessionCtx) -// err = ExecuteActivity(sessionCtx, someActivityFunc, activityInput).Get(sessionCtx, nil) -// if err == ErrSessionFailed { -// // Session has failed -// } else { -// // Handle activity error -// } -// ... // execute more activities using sessionCtx +// +// so := &SessionOptions{ +// ExecutionTimeout: time.Minute, +// CreationTimeout: time.Minute, +// } +// sessionCtx, err := CreateSession(ctx, so) +// if err != nil { +// // Creation failed. Wrong ctx or too many outstanding sessions. +// } +// defer CompleteSession(sessionCtx) +// err = ExecuteActivity(sessionCtx, someActivityFunc, activityInput).Get(sessionCtx, nil) +// if err == ErrSessionFailed { +// // Session has failed +// } else { +// // Handle activity error +// } +// ... 
// execute more activities using sessionCtx func CreateSession(ctx Context, sessionOptions *SessionOptions) (Context, error) { options := getActivityOptions(ctx) baseTaskqueue := options.TaskQueueName @@ -214,7 +215,7 @@ func RecreateSession(ctx Context, recreateToken []byte, sessionOptions *SessionO // it's not in a session. func CompleteSession(ctx Context) { sessionInfo := getSessionInfo(ctx) - if sessionInfo == nil || sessionInfo.sessionState != sessionStateOpen { + if sessionInfo == nil || sessionInfo.SessionState != SessionStateOpen { return } @@ -237,7 +238,7 @@ func CompleteSession(ctx Context) { GetLogger(completionCtx).Warn("Complete session activity failed", tagError, err) } - sessionInfo.sessionState = sessionStateClosed + sessionInfo.SessionState = SessionStateClosed getWorkflowEnvironment(ctx).RemoveSession(sessionInfo.SessionID) GetLogger(ctx).Debug("Completed session", "sessionID", sessionInfo.SessionID) } @@ -279,7 +280,7 @@ func setSessionInfo(ctx Context, sessionInfo *SessionInfo) Context { func createSession(ctx Context, creationTaskqueue string, options *SessionOptions, retryable bool) (Context, error) { logger := GetLogger(ctx) logger.Debug("Start creating session") - if prevSessionInfo := getSessionInfo(ctx); prevSessionInfo != nil && prevSessionInfo.sessionState == sessionStateOpen { + if prevSessionInfo := getSessionInfo(ctx); prevSessionInfo != nil && prevSessionInfo.SessionState == SessionStateOpen { return nil, errFoundExistingOpenSession } sessionID, err := generateSessionID(ctx) @@ -316,7 +317,7 @@ func createSession(ctx Context, creationTaskqueue string, options *SessionOption sessionInfo := &SessionInfo{ SessionID: sessionID, - sessionState: sessionStateOpen, + SessionState: SessionStateOpen, } completionCtx := setSessionInfo(ctx, sessionInfo) sessionInfo.completionCtx = completionCtx @@ -361,7 +362,7 @@ func createSession(ctx Context, creationTaskqueue string, options *SessionOption if !errors.As(err, &canceledErr) { 
getWorkflowEnvironment(creationCtx).RemoveSession(sessionID) GetLogger(creationCtx).Debug("Session failed", "sessionID", sessionID, tagError, err) - sessionInfo.sessionState = sessionStateFailed + sessionInfo.SessionState = SessionStateFailed sessionCancelFunc() } }) diff --git a/internal/session_test.go b/internal/session_test.go index 1adfb8eae..e14836f6a 100644 --- a/internal/session_test.go +++ b/internal/session_test.go @@ -72,14 +72,14 @@ func (s *SessionTestSuite) TestCreationCompletion() { return err } info := GetSessionInfo(sessionCtx) - if info == nil || info.sessionState != sessionStateOpen { + if info == nil || info.SessionState != SessionStateOpen { return errors.New("session state should be open after creation") } CompleteSession(sessionCtx) info = GetSessionInfo(sessionCtx) - if info == nil || info.sessionState != sessionStateClosed { + if info == nil || info.SessionState != SessionStateClosed { return errors.New("session state should be closed after completion") } return nil @@ -102,7 +102,7 @@ func (s *SessionTestSuite) TestCreationWithOpenSessionContext() { sessionCtx := setSessionInfo(ctx, &SessionInfo{ SessionID: "some random sessionID", taskqueue: "some random taskqueue", - sessionState: sessionStateOpen, + SessionState: SessionStateOpen, }) _, err := CreateSession(sessionCtx, s.sessionOptions) return err @@ -137,7 +137,7 @@ func (s *SessionTestSuite) TestCreationWithClosedSessionContext() { sessionCtx := setSessionInfo(ctx, &SessionInfo{ SessionID: "some random sessionID", taskqueue: "some random taskqueue", - sessionState: sessionStateClosed, + SessionState: SessionStateClosed, }) sessionCtx, err := CreateSession(sessionCtx, s.sessionOptions) @@ -171,7 +171,7 @@ func (s *SessionTestSuite) TestCreationWithFailedSessionContext() { sessionCtx := setSessionInfo(ctx, &SessionInfo{ SessionID: "some random sessionID", taskqueue: "some random taskqueue", - sessionState: sessionStateFailed, + SessionState: SessionStateFailed, }) sessionCtx, err := 
CreateSession(sessionCtx, s.sessionOptions) @@ -199,7 +199,7 @@ func (s *SessionTestSuite) TestCompletionWithClosedSessionContext() { sessionCtx := setSessionInfo(ctx, &SessionInfo{ SessionID: "some random sessionID", taskqueue: "some random taskqueue", - sessionState: sessionStateClosed, + SessionState: SessionStateClosed, }) CompleteSession(sessionCtx) return nil @@ -219,7 +219,7 @@ func (s *SessionTestSuite) TestCompletionWithFailedSessionContext() { sessionCtx := setSessionInfo(ctx, &SessionInfo{ SessionID: "some random sessionID", taskqueue: "some random taskqueue", - sessionState: sessionStateFailed, + SessionState: SessionStateFailed, }) CompleteSession(sessionCtx) return nil @@ -244,7 +244,7 @@ func (s *SessionTestSuite) TestGetSessionInfo() { sessionCtx := setSessionInfo(ctx, &SessionInfo{ SessionID: "some random sessionID", taskqueue: "some random taskqueue", - sessionState: sessionStateFailed, + SessionState: SessionStateFailed, }) info = GetSessionInfo(sessionCtx) if info == nil { @@ -254,7 +254,7 @@ func (s *SessionTestSuite) TestGetSessionInfo() { newSessionInfo := &SessionInfo{ SessionID: "another sessionID", taskqueue: "another taskqueue", - sessionState: sessionStateClosed, + SessionState: SessionStateClosed, } sessionCtx = setSessionInfo(ctx, newSessionInfo) info = GetSessionInfo(sessionCtx) @@ -286,7 +286,7 @@ func (s *SessionTestSuite) TestRecreation() { sessionInfo := &SessionInfo{ SessionID: "some random sessionID", taskqueue: "some random taskqueue", - sessionState: sessionStateFailed, + SessionState: SessionStateFailed, } sessionCtx, err := RecreateSession(ctx, sessionInfo.GetRecreateToken(), s.sessionOptions) @@ -461,7 +461,7 @@ func (s *SessionTestSuite) TestSessionRecreationTaskQueue() { sessionInfo := &SessionInfo{ SessionID: "testSessionID", taskqueue: resourceSpecificTaskQueue, - sessionState: sessionStateClosed, + SessionState: SessionStateClosed, } sessionCtx, err := RecreateSession(ctx, sessionInfo.GetRecreateToken(), 
s.sessionOptions) if err != nil { @@ -509,7 +509,7 @@ func (s *SessionTestSuite) TestExecuteActivityInFailedSession() { sessionCtx := setSessionInfo(ctx, &SessionInfo{ SessionID: "random sessionID", taskqueue: "random taskqueue", - sessionState: sessionStateFailed, + SessionState: SessionStateFailed, }) return ExecuteActivity(sessionCtx, testSessionActivity, "a random name").Get(sessionCtx, nil) @@ -543,7 +543,7 @@ func (s *SessionTestSuite) TestExecuteActivityInClosedSession() { sessionCtx := setSessionInfo(ctx, &SessionInfo{ SessionID: "random sessionID", taskqueue: "random taskqueue", - sessionState: sessionStateClosed, + SessionState: SessionStateClosed, }) return ExecuteActivity(sessionCtx, testSessionActivity, "some random message").Get(sessionCtx, nil) @@ -569,7 +569,7 @@ func (s *SessionTestSuite) TestSessionRecreateToken() { sessionInfo := &SessionInfo{ SessionID: "testSessionID", taskqueue: taskqueue, - sessionState: sessionStateClosed, + SessionState: SessionStateClosed, } token := sessionInfo.GetRecreateToken() params, err := deserializeRecreateToken(token) @@ -600,7 +600,7 @@ func (s *SessionTestSuite) TestCompletionFailed() { CompleteSession(sessionCtx) info := GetSessionInfo(sessionCtx) - if info == nil || info.sessionState != sessionStateClosed { + if info == nil || info.SessionState != SessionStateClosed { return errors.New("session state should be closed after completion even when completion activity failed") } return nil diff --git a/internal/workflow.go b/internal/workflow.go index 4d7ea47a9..645e7218d 100644 --- a/internal/workflow.go +++ b/internal/workflow.go @@ -566,11 +566,11 @@ func (wc *workflowEnvironmentInterceptor) ExecuteActivity(ctx Context, typeName // Validate session state. 
if sessionInfo := getSessionInfo(ctx); sessionInfo != nil { isCreationActivity := isSessionCreationActivity(typeName) - if sessionInfo.sessionState == sessionStateFailed && !isCreationActivity { + if sessionInfo.SessionState == SessionStateFailed && !isCreationActivity { settable.Set(nil, ErrSessionFailed) return future } - if sessionInfo.sessionState == sessionStateOpen && !isCreationActivity { + if sessionInfo.SessionState == SessionStateOpen && !isCreationActivity { // Use session taskqueue oldTaskQueueName := options.TaskQueueName options.TaskQueueName = sessionInfo.taskqueue diff --git a/test/integration_test.go b/test/integration_test.go index 8e3a94cba..726e8509f 100644 --- a/test/integration_test.go +++ b/test/integration_test.go @@ -1159,6 +1159,39 @@ func (ts *IntegrationTestSuite) TestBasicSession() { ts.tracer.GetTrace("BasicSession")) } +func (ts *IntegrationTestSuite) TestSessionStateFailedWorkerFailed() { + ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) + defer cancel() + ts.activities.manualStopContext = ctx + // We want to start a single long-running activity in a session + run, err := ts.client.ExecuteWorkflow(ctx, + ts.startWorkflowOptions("test-session-worker-failure"), + ts.workflows.SessionFailedStateWorkflow, + &AdvancedSessionParams{ + SessionCount: 1, + SessionCreationTimeout: 10 * time.Second, + }) + ts.NoError(err) + + // Wait until sessions started + ts.waitForQueryTrue(run, "sessions-created-equals", 1) + + // Kill the worker, this should cause the session to timeout. 
+ ts.worker.Stop() + ts.workerStopped = true + + // Now create a new worker on that same task queue to resume the work of the + // workflow + nextWorker := worker.New(ts.client, ts.taskQueueName, worker.Options{DisableStickyExecution: true}) + ts.registerWorkflowsAndActivities(nextWorker) + ts.NoError(nextWorker.Start()) + defer nextWorker.Stop() + + // Get the result of the workflow run now + err = run.Get(ctx, nil) + ts.NoError(err) +} + func (ts *IntegrationTestSuite) TestAsyncActivityCompletion() { workflowID := "test-async-activity-completion" ctx, cancel := context.WithTimeout(context.Background(), ctxTimeout) diff --git a/test/workflow_test.go b/test/workflow_test.go index 6e3b47d28..c60542211 100644 --- a/test/workflow_test.go +++ b/test/workflow_test.go @@ -1275,6 +1275,46 @@ func (w Workflows) AdvancedSession(ctx workflow.Context, params *AdvancedSession return nil } +func (w Workflows) SessionFailedStateWorkflow(ctx workflow.Context, params *AdvancedSessionParams) error { + ctx = workflow.WithActivityOptions(ctx, workflow.ActivityOptions{ + StartToCloseTimeout: 10 * time.Second, + // No retry on activities + RetryPolicy: &temporal.RetryPolicy{MaximumAttempts: 1}, + }) + + // Create a query to know sessions pending or started + var sessionsCreated int + err := workflow.SetQueryHandler(ctx, "sessions-created-equals", func(expected int) (bool, error) { + return sessionsCreated == expected, nil + }) + if err != nil { + return err + } + + opts := &workflow.SessionOptions{ + CreationTimeout: params.SessionCreationTimeout, + ExecutionTimeout: 20 * time.Second, + // Note the heartbeat timeout is less than half the activity timeout. + HeartbeatTimeout: 1 * time.Second, + } + sessionCtx, err := workflow.CreateSession(ctx, opts) + if err != nil { + return err + } + sessionsCreated += 1 + var act Activities + // The test should kill the worker and the session should fail. 
+ err = workflow.ExecuteActivity(sessionCtx, act.WaitForManualStop).Get(sessionCtx, nil) + var canceledErr *temporal.CanceledError + if !errors.As(err, &canceledErr) { + return errors.New("Expected activity to be canceled") + } + if workflow.GetSessionInfo(sessionCtx).SessionState != workflow.SessionStateFailed { + return errors.New("Session not in correct state") + } + return nil +} + func (w *Workflows) ActivityCompletionUsingID(ctx workflow.Context) ([]string, error) { activityAOptions := workflow.ActivityOptions{ ActivityID: "A", @@ -2055,6 +2095,7 @@ func (w *Workflows) register(worker worker.Worker) { worker.RegisterWorkflow(w.HistoryLengths) worker.RegisterWorkflow(w.HeartbeatSpecificCount) worker.RegisterWorkflow(w.UpsertMemo) + worker.RegisterWorkflow(w.SessionFailedStateWorkflow) worker.RegisterWorkflow(w.child) worker.RegisterWorkflow(w.childForMemoAndSearchAttr) diff --git a/workflow/session.go b/workflow/session.go index ba9baf619..dfd605c58 100644 --- a/workflow/session.go +++ b/workflow/session.go @@ -31,9 +31,17 @@ import ( type ( // SessionInfo contains information of a created session. For now, exported // fields are SessionID and HostName. + // // SessionID is a uuid generated when CreateSession() or RecreateSession() // is called and can be used to uniquely identify a session. + // // HostName specifies which host is executing the session + // + // SessionState specifies the current known state of the session. + // + // Note: Sessions have an inherently stale view of the worker they are running on. Session + // state may be stale up to the SessionOptions.HeartbeatTimeout. SessionOptions.HeartbeatTimeout + // should be less than half the activity timeout for the state to be accurate when checking after activity failure. SessionInfo = internal.SessionInfo // SessionOptions specifies metadata for a session. @@ -45,11 +53,25 @@ type ( // Specifies the heartbeat timeout. 
If heartbeat is not received by server // within the timeout, the session will be declared as failed SessionOptions = internal.SessionOptions + + // SessionState specifies the state of the session. + SessionState = internal.SessionState ) -// ErrSessionFailed is the error returned when user tries to execute an activity but the -// session it belongs to has already failed -var ErrSessionFailed = internal.ErrSessionFailed +var ( + // ErrSessionFailed is the error returned when user tries to execute an activity but the + // session it belongs to has already failed + ErrSessionFailed = internal.ErrSessionFailed + + // SessionStateOpen means the session worker is heartbeating and new activities will be scheduled on the session host. + SessionStateOpen = internal.SessionStateOpen + + // SessionStateClosed means the session was closed by the workflow and new activities will not be scheduled on the session host. + SessionStateClosed = internal.SessionStateClosed + + // SessionStateFailed means the session worker was detected to be down and the session cannot be used to schedule new activities. + SessionStateFailed = internal.SessionStateFailed +) // Note: Worker should be configured to process session. To do this, set the following // fields in WorkerOptions: @@ -62,11 +84,11 @@ var ErrSessionFailed = internal.ErrSessionFailed // ActivityOptions. If none is specified, the default one will be used. // // CreationSession will fail in the following situations: -// 1. The context passed in already contains a session which is still open -// (not closed and failed). -// 2. All the workers are busy (number of sessions currently running on all the workers have reached -// MaxConcurrentSessionExecutionSize, which is specified when starting the workers) and session -// cannot be created within a specified timeout. +// 1. The context passed in already contains a session which is still open +// (not closed and failed). +// 2. 
All the workers are busy (number of sessions currently running on all the workers have reached +// MaxConcurrentSessionExecutionSize, which is specified when starting the workers) and session +// cannot be created within a specified timeout. // // If an activity is executed using the returned context, it's regarded as part of the // session. All activities within the same session will be executed by the same worker. @@ -83,22 +105,23 @@ var ErrSessionFailed = internal.ErrSessionFailed // New session can be created if necessary to retry the whole session. // // Example: -// so := &SessionOptions{ -// ExecutionTimeout: time.Minute, -// CreationTimeout: time.Minute, -// } -// sessionCtx, err := CreateSession(ctx, so) -// if err != nil { -// // Creation failed. Wrong ctx or too many outstanding sessions. -// } -// defer CompleteSession(sessionCtx) -// err = ExecuteActivity(sessionCtx, someActivityFunc, activityInput).Get(sessionCtx, nil) -// if err == ErrSessionFailed { -// // Session has failed -// } else { -// // Handle activity error -// } -// ... // execute more activities using sessionCtx +// +// so := &SessionOptions{ +// ExecutionTimeout: time.Minute, +// CreationTimeout: time.Minute, +// } +// sessionCtx, err := CreateSession(ctx, so) +// if err != nil { +// // Creation failed. Wrong ctx or too many outstanding sessions. +// } +// defer CompleteSession(sessionCtx) +// err = ExecuteActivity(sessionCtx, someActivityFunc, activityInput).Get(sessionCtx, nil) +// if err == ErrSessionFailed { +// // Session has failed +// } else { +// // Handle activity error +// } +// ... // execute more activities using sessionCtx // // NOTE: Session recreation via RecreateSession may not work properly across worker fail/crash before Temporal server // version v1.15.1.