diff --git a/internal/internal_worker.go b/internal/internal_worker.go index 40f825b56..dd1973024 100644 --- a/internal/internal_worker.go +++ b/internal/internal_worker.go @@ -776,6 +776,22 @@ type aggregatedWorker struct { registry *registry } +func (aw *aggregatedWorker) GetRegisteredWorkflows() []string { + return aw.registry.GetRegisteredWorkflows() +} + +func (aw *aggregatedWorker) GetWorkflowFunc(registerName string) (interface{}, bool) { + return aw.registry.GetWorkflowFunc(registerName) +} + +func (aw *aggregatedWorker) GetRegisteredActivities() []string { + return aw.registry.GetRegisteredActivities() +} + +func (aw *aggregatedWorker) GetActivityFunc(registerName string) (interface{}, bool) { + return aw.registry.GetActivityFunc(registerName) +} + func (aw *aggregatedWorker) RegisterWorkflow(w interface{}) { aw.registry.RegisterWorkflow(w) } diff --git a/internal/internal_worker_test.go b/internal/internal_worker_test.go index fde69d7e1..4e54ac4c4 100644 --- a/internal/internal_worker_test.go +++ b/internal/internal_worker_test.go @@ -26,6 +26,7 @@ import ( "errors" "os" "reflect" + "runtime" "sync" "testing" "time" @@ -831,14 +832,48 @@ func testWorkflowReturnStructPtrPtr(ctx Context, arg1 int) (result **testWorkflo func TestRegisterVariousWorkflowTypes(t *testing.T) { r := newRegistry() - r.RegisterWorkflow(testWorkflowSample) - r.RegisterWorkflow(testWorkflowMultipleArgs) - r.RegisterWorkflow(testWorkflowNoArgs) - r.RegisterWorkflow(testWorkflowReturnInt) - r.RegisterWorkflow(testWorkflowReturnString) - r.RegisterWorkflow(testWorkflowReturnStruct) - r.RegisterWorkflow(testWorkflowReturnStructPtr) - r.RegisterWorkflow(testWorkflowReturnStructPtrPtr) + w := &aggregatedWorker{registry: r} + w.RegisterWorkflowWithOptions(testWorkflowSample, RegisterWorkflowOptions{EnableShortName: true}) + w.RegisterWorkflowWithOptions(testWorkflowMultipleArgs, RegisterWorkflowOptions{EnableShortName: true}) + w.RegisterWorkflowWithOptions(testWorkflowNoArgs, RegisterWorkflowOptions{EnableShortName: true}) + w.RegisterWorkflowWithOptions(testWorkflowReturnInt, RegisterWorkflowOptions{EnableShortName: true}) + w.RegisterWorkflowWithOptions(testWorkflowReturnString, RegisterWorkflowOptions{EnableShortName: true}) + w.RegisterWorkflowWithOptions(testWorkflowReturnStruct, RegisterWorkflowOptions{EnableShortName: true}) + w.RegisterWorkflowWithOptions(testWorkflowReturnStructPtr, RegisterWorkflowOptions{EnableShortName: true}) + w.RegisterWorkflowWithOptions(testWorkflowReturnStructPtrPtr, RegisterWorkflowOptions{EnableShortName: true}) + + wfs := w.GetRegisteredWorkflows() + assert.Equal(t, 8, len(wfs)) + assert.Contains(t, wfs, "testWorkflowSample") + assert.Contains(t, wfs, "testWorkflowMultipleArgs") + assert.Contains(t, wfs, "testWorkflowNoArgs") + assert.Contains(t, wfs, "testWorkflowReturnInt") + assert.Contains(t, wfs, "testWorkflowReturnString") + assert.Contains(t, wfs, "testWorkflowReturnString") + assert.Contains(t, wfs, "testWorkflowReturnStructPtr") + assert.Contains(t, wfs, "testWorkflowReturnStructPtrPtr") + + // sample assertion on workflow func + fn, ok := w.GetWorkflowFunc("testWorkflowSample") + assert.True(t, ok) + assert.Equal(t, reflect.Func, reflect.ValueOf(fn).Kind()) + assert.Equal(t, getFunctionName(testWorkflowSample), runtime.FuncForPC(reflect.ValueOf(fn).Pointer()).Name()) +} + +func TestRegisterActivityWithOptions(t *testing.T) { + r := newRegistry() + w := &aggregatedWorker{registry: r} + w.RegisterActivityWithOptions(testActivityMultipleArgs, RegisterActivityOptions{EnableShortName: true}) + + wfs := w.GetRegisteredActivities() + assert.Equal(t, 1, len(wfs)) + assert.Contains(t, wfs, "testActivityMultipleArgs") + + // assert activity function + fn, ok := w.GetActivityFunc("testActivityMultipleArgs") + assert.True(t, ok) + assert.Equal(t, reflect.Func, reflect.ValueOf(fn).Kind()) + assert.Equal(t, getFunctionName(testActivityMultipleArgs), runtime.FuncForPC(reflect.ValueOf(fn).Pointer()).Name()) } type testErrorDetails struct { diff --git a/internal/registry.go b/internal/registry.go index 546f5ef53..64fa609a8 100644 --- a/internal/registry.go +++ b/internal/registry.go @@ -124,6 +124,31 @@ func (r *registry) RegisterActivityWithOptions(af interface{}, options RegisterA } } +func (r *registry) GetRegisteredWorkflows() []string { + return r.GetRegisteredWorkflowTypes() +} + +func (r *registry) GetWorkflowFunc(registerName string) (interface{}, bool) { + return r.getWorkflowFn(registerName) +} + +func (r *registry) GetRegisteredActivities() []string { + activities := r.getRegisteredActivities() + activityNames := make([]string, 0, len(activities)) + for _, a := range activities { + activityNames = append(activityNames, a.ActivityType().Name) + } + return activityNames +} + +func (r *registry) GetActivityFunc(registerName string) (interface{}, bool) { + a, ok := r.GetActivity(registerName) + if !ok { + return nil, false + } + return a.GetFunction(), ok +} + func (r *registry) registerActivityFunction(af interface{}, options RegisterActivityOptions) error { fnType := reflect.TypeOf(af) if err := validateFnFormat(fnType, false); err != nil { diff --git a/internal/registry_test.go b/internal/registry_test.go index 7e5f1b8d7..2e6693193 100644 --- a/internal/registry_test.go +++ b/internal/registry_test.go @@ -105,16 +105,16 @@ func TestWorkflowRegistration(t *testing.T) { tt.register(r) // Verify registered workflow type - workflowType := r.GetRegisteredWorkflowTypes()[0] + workflowType := r.GetRegisteredWorkflows()[0] require.Equal(t, tt.workflowType, workflowType) // Verify workflow is resolved from workflow type - _, ok := r.getWorkflowFn(tt.workflowType) + _, ok := r.GetWorkflowFunc(tt.workflowType) require.True(t, ok) // Verify workflow is resolved from alternative (backwards compatible) workflow type if len(tt.altWorkflowType) > 0 { - _, ok = r.getWorkflowFn(tt.altWorkflowType) + _, ok = r.GetWorkflowFunc(tt.altWorkflowType) require.True(t, ok) } @@ -228,10 +228,13 @@ func TestActivityRegistration(t *testing.T) { // Verify registered activity type activityType := r.getRegisteredActivities()[0].ActivityType().Name require.Equal(t, tt.activityType, activityType, "activity type") + require.Equal(t, tt.activityType, r.GetRegisteredActivities()[0]) // Verify activity is resolved from activity type _, ok := r.GetActivity(tt.activityType) require.True(t, ok) + _, ok = r.GetActivityFunc(tt.activityType) + require.True(t, ok) // Verify activity is resolved from alternative (backwards compatible) activity type if len(tt.altActivityType) > 0 { diff --git a/internal/workflow_replayer.go b/internal/workflow_replayer.go index 07612a01a..6f13745c6 100644 --- a/internal/workflow_replayer.go +++ b/internal/workflow_replayer.go @@ -131,6 +131,22 @@ func (r *WorkflowReplayer) RegisterActivityWithOptions(a interface{}, options Re r.registry.RegisterActivityWithOptions(a, options) } +func (r *WorkflowReplayer) GetRegisteredWorkflows() []string { + return r.registry.GetRegisteredWorkflows() +} + +func (r *WorkflowReplayer) GetWorkflowFunc(registerName string) (interface{}, bool) { + return r.registry.GetWorkflowFunc(registerName) +} + +func (r *WorkflowReplayer) GetRegisteredActivities() []string { + return r.registry.GetRegisteredActivities() +} + +func (r *WorkflowReplayer) GetActivityFunc(registerName string) (interface{}, bool) { + return r.registry.GetActivityFunc(registerName) +} + // ReplayWorkflowHistory executes a single decision task for the given history. // Use for testing backwards compatibility of code changes and troubleshooting workflows in a debugger. // The logger is an optional parameter. Defaults to the noop logger. diff --git a/internal/workflow_replayer_test.go b/internal/workflow_replayer_test.go index 90c7c52e6..4a7979a2f 100644 --- a/internal/workflow_replayer_test.go +++ b/internal/workflow_replayer_test.go @@ -24,6 +24,8 @@ import ( "context" "errors" "fmt" + "reflect" + "runtime" "testing" "time" @@ -139,6 +141,18 @@ func (s *workflowReplayerSuite) TestReplayWorkflowHistoryFromFile() { s.NoError(err) } +func (s *workflowReplayerSuite) TestActivityRegistration() { + name := "test-Activity" + s.replayer.RegisterActivityWithOptions(testActivityFunction, RegisterActivityOptions{Name: name}) + a := s.replayer.GetRegisteredActivities()[0] + s.Equal(name, a) + + fn, ok := s.replayer.GetActivityFunc(a) + s.True(ok) + s.Equal(reflect.Func, reflect.ValueOf(fn).Kind()) + s.Equal(getFunctionName(testActivityFunction), runtime.FuncForPC(reflect.ValueOf(fn).Pointer()).Name()) +} + func testReplayWorkflow(ctx Context) error { ao := ActivityOptions{ ScheduleToStartTimeout: time.Second, diff --git a/internal/workflow_shadower.go b/internal/workflow_shadower.go index 9c2257bc4..a6e9b1430 100644 --- a/internal/workflow_shadower.go +++ b/internal/workflow_shadower.go @@ -228,6 +228,16 @@ func (s *WorkflowShadower) Stop() { } } +// GetRegisteredWorkflows retrieves the list of workflows registered on the worker +func (s *WorkflowShadower) GetRegisteredWorkflows() []string { + return s.replayer.GetRegisteredWorkflows() +} + +// GetWorkflowFn returns the workflow function corresponding to the provided registerName +func (s *WorkflowShadower) GetWorkflowFunc(registerName string) (interface{}, bool) { + return s.replayer.GetWorkflowFunc(registerName) +} + func (s *WorkflowShadower) shadowWorker() error { s.shutdownWG.Add(1) defer s.shutdownWG.Done() diff --git a/internal/workflow_shadower_test.go b/internal/workflow_shadower_test.go index 849ad9e2b..0b0c89928 100644 --- a/internal/workflow_shadower_test.go +++ b/internal/workflow_shadower_test.go @@ -23,6 +23,8 @@ package internal import ( "context" "fmt" + "reflect" + "runtime" "sync" "testing" "time" @@ -437,6 +439,16 @@ func (s *workflowShadowerSuite) TestShadowWorker_ExpectedReplayError() { } } +func (s *workflowShadowerSuite) TestWorkflowRegistration() { + wfName := s.testShadower.GetRegisteredWorkflows()[0] + fnName := getFunctionName(testReplayWorkflow) + s.Equal(fnName, wfName) + + fn, ok := s.testShadower.GetWorkflowFunc(wfName) + s.True(ok) + s.Equal(fnName, runtime.FuncForPC(reflect.ValueOf(fn).Pointer()).Name()) +} + func newTestWorkflowExecutions(size int) []*shared.WorkflowExecutionInfo { executions := make([]*shared.WorkflowExecutionInfo, size) for i := 0; i != size; i++ { diff --git a/worker/worker.go b/worker/worker.go index 83f4dd6c7..742179f57 100644 --- a/worker/worker.go +++ b/worker/worker.go @@ -79,6 +79,20 @@ type ( // This method panics if workflowFunc doesn't comply with the expected format or tries to register the same workflow // type name twice. Use workflow.RegisterOptions.DisableAlreadyRegisteredCheck to allow multiple registrations. RegisterWorkflowWithOptions(w interface{}, options workflow.RegisterOptions) + + // GetRegisteredWorkflows returns a list of all workflows registered on the worker. + // The returned workflow name is by default the method name. However, if the workflow was registered + // with options (see Worker.RegisterWorkflowWithOptions), the workflow may have customized name. + // For chained registries, this returns a combined list of all registered activities from the current + // instance to the global registry. In this case, the list may contain duplicate names. + GetRegisteredWorkflows() []string + + // GetWorkflowFunc takes a name and returns the corresponding workflow + // function and a boolean value indicating whether the activity was found. + // It returns nil, false when no workflow was registered with the provided name. + // The registerName is the resolved name recorded on the registry after all options + // from workflow.RegisterOptions{} are applied. + GetWorkflowFunc(registerName string) (interface{}, bool) } // ActivityRegistry exposes activity registration functions to consumers. @@ -131,6 +145,22 @@ type ( // which might be useful for integration tests. // worker.RegisterActivityWithOptions(barActivity, RegisterActivityOptions{DisableAlreadyRegisteredCheck: true}) RegisterActivityWithOptions(a interface{}, options activity.RegisterOptions) + + // GetRegisteredActivities returns the names of all activities registered on the worker. + // The activity name is by default the method name. However, if the activity was registered + // with options (see Worker.RegisterActivityWithOptions), the activity may have customized name. + // For example, struct pointer activities that were registered with the Name option activity.RegisterOptions{Name: ...} + // will have their method names prepended with the provided name option. + // For chained registries, this returns a combined list of all registered activities from the current + // instance to the global registry. In this case, the list may contain duplicate names. + GetRegisteredActivities() []string + + // GetActivityFunc takes a name and returns the corresponding activity + // function and a boolean value indicating whether the activity was found. + // It returns nil, false when no activity was registered with the provided name. + // The registerName is the resolved name recorded on the registry after all options + // from activity.RegisterOptions{} are applied. + GetActivityFunc(registerName string) (interface{}, bool) } // WorkflowReplayer supports replaying a workflow from its event history.