Skip to content

Commit

Permalink
Add methods on Worker to get registered workflows and activities (#1342)
Browse files Browse the repository at this point in the history
Introduce the following methods to the WorkflowRegistry and ActivityRegistry interfaces:

GetRegisteredWorkflows
GetWorkflowAlias
GetWorkflowFn
GetRegisteredActivities
GetActivityAlias
GetActivityFn
The logic already exists in the internal implementation of the registry but not exposed to the public API.
Also implement these methods for WorkflowReplayer and WorkflowShadower.
Update unit tests so that they now call the top level methods.

Why?
To expose on Uber internal debug page

How did you test it?
Unit tests
Tested on staging environment

Potential risks
Worst case: these methods return unexpected result
  • Loading branch information
ketsiambaku authored Jun 12, 2024
1 parent bf68484 commit 13c2821
Show file tree
Hide file tree
Showing 9 changed files with 172 additions and 11 deletions.
16 changes: 16 additions & 0 deletions internal/internal_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -776,6 +776,22 @@ type aggregatedWorker struct {
registry *registry
}

func (aw *aggregatedWorker) GetRegisteredWorkflows() []string {
return aw.registry.GetRegisteredWorkflows()
}

func (aw *aggregatedWorker) GetWorkflowFunc(registerName string) (interface{}, bool) {
return aw.registry.GetWorkflowFunc(registerName)
}

func (aw *aggregatedWorker) GetRegisteredActivities() []string {
return aw.registry.GetRegisteredActivities()
}

func (aw *aggregatedWorker) GetActivityFunc(registerName string) (interface{}, bool) {
return aw.registry.GetActivityFunc(registerName)
}

func (aw *aggregatedWorker) RegisterWorkflow(w interface{}) {
aw.registry.RegisterWorkflow(w)
}
Expand Down
51 changes: 43 additions & 8 deletions internal/internal_worker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"errors"
"os"
"reflect"
"runtime"
"sync"
"testing"
"time"
Expand Down Expand Up @@ -831,14 +832,48 @@ func testWorkflowReturnStructPtrPtr(ctx Context, arg1 int) (result **testWorkflo

func TestRegisterVariousWorkflowTypes(t *testing.T) {
r := newRegistry()
r.RegisterWorkflow(testWorkflowSample)
r.RegisterWorkflow(testWorkflowMultipleArgs)
r.RegisterWorkflow(testWorkflowNoArgs)
r.RegisterWorkflow(testWorkflowReturnInt)
r.RegisterWorkflow(testWorkflowReturnString)
r.RegisterWorkflow(testWorkflowReturnStruct)
r.RegisterWorkflow(testWorkflowReturnStructPtr)
r.RegisterWorkflow(testWorkflowReturnStructPtrPtr)
w := &aggregatedWorker{registry: r}
w.RegisterWorkflowWithOptions(testWorkflowSample, RegisterWorkflowOptions{EnableShortName: true})
w.RegisterWorkflowWithOptions(testWorkflowMultipleArgs, RegisterWorkflowOptions{EnableShortName: true})
w.RegisterWorkflowWithOptions(testWorkflowNoArgs, RegisterWorkflowOptions{EnableShortName: true})
w.RegisterWorkflowWithOptions(testWorkflowReturnInt, RegisterWorkflowOptions{EnableShortName: true})
w.RegisterWorkflowWithOptions(testWorkflowReturnString, RegisterWorkflowOptions{EnableShortName: true})
w.RegisterWorkflowWithOptions(testWorkflowReturnStruct, RegisterWorkflowOptions{EnableShortName: true})
w.RegisterWorkflowWithOptions(testWorkflowReturnStructPtr, RegisterWorkflowOptions{EnableShortName: true})
w.RegisterWorkflowWithOptions(testWorkflowReturnStructPtrPtr, RegisterWorkflowOptions{EnableShortName: true})

wfs := w.GetRegisteredWorkflows()
assert.Equal(t, 8, len(wfs))
assert.Contains(t, wfs, "testWorkflowSample")
assert.Contains(t, wfs, "testWorkflowMultipleArgs")
assert.Contains(t, wfs, "testWorkflowNoArgs")
assert.Contains(t, wfs, "testWorkflowReturnInt")
assert.Contains(t, wfs, "testWorkflowReturnString")
assert.Contains(t, wfs, "testWorkflowReturnString")
assert.Contains(t, wfs, "testWorkflowReturnStructPtr")
assert.Contains(t, wfs, "testWorkflowReturnStructPtrPtr")

// sample assertion on workflow func
fn, ok := w.GetWorkflowFunc("testWorkflowSample")
assert.True(t, ok)
assert.Equal(t, reflect.Func, reflect.ValueOf(fn).Kind())
assert.Equal(t, getFunctionName(testWorkflowSample), runtime.FuncForPC(reflect.ValueOf(fn).Pointer()).Name())
}

func TestRegisterActivityWithOptions(t *testing.T) {
r := newRegistry()
w := &aggregatedWorker{registry: r}
w.RegisterActivityWithOptions(testActivityMultipleArgs, RegisterActivityOptions{EnableShortName: true})

wfs := w.GetRegisteredActivities()
assert.Equal(t, 1, len(wfs))
assert.Contains(t, wfs, "testActivityMultipleArgs")

// assert activity function
fn, ok := w.GetActivityFunc("testActivityMultipleArgs")
assert.True(t, ok)
assert.Equal(t, reflect.Func, reflect.ValueOf(fn).Kind())
assert.Equal(t, getFunctionName(testActivityMultipleArgs), runtime.FuncForPC(reflect.ValueOf(fn).Pointer()).Name())
}

type testErrorDetails struct {
Expand Down
25 changes: 25 additions & 0 deletions internal/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,31 @@ func (r *registry) RegisterActivityWithOptions(af interface{}, options RegisterA
}
}

func (r *registry) GetRegisteredWorkflows() []string {
return r.GetRegisteredWorkflowTypes()
}

func (r *registry) GetWorkflowFunc(registerName string) (interface{}, bool) {
return r.getWorkflowFn(registerName)
}

func (r *registry) GetRegisteredActivities() []string {
activities := r.getRegisteredActivities()
activityNames := make([]string, 0, len(activities))
for _, a := range activities {
activityNames = append(activityNames, a.ActivityType().Name)
}
return activityNames
}

func (r *registry) GetActivityFunc(registerName string) (interface{}, bool) {
a, ok := r.GetActivity(registerName)
if !ok {
return nil, false
}
return a.GetFunction(), ok
}

func (r *registry) registerActivityFunction(af interface{}, options RegisterActivityOptions) error {
fnType := reflect.TypeOf(af)
if err := validateFnFormat(fnType, false); err != nil {
Expand Down
9 changes: 6 additions & 3 deletions internal/registry_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,16 +105,16 @@ func TestWorkflowRegistration(t *testing.T) {
tt.register(r)

// Verify registered workflow type
workflowType := r.GetRegisteredWorkflowTypes()[0]
workflowType := r.GetRegisteredWorkflows()[0]
require.Equal(t, tt.workflowType, workflowType)

// Verify workflow is resolved from workflow type
_, ok := r.getWorkflowFn(tt.workflowType)
_, ok := r.GetWorkflowFunc(tt.workflowType)
require.True(t, ok)

// Verify workflow is resolved from alternative (backwards compatible) workflow type
if len(tt.altWorkflowType) > 0 {
_, ok = r.getWorkflowFn(tt.altWorkflowType)
_, ok = r.GetWorkflowFunc(tt.altWorkflowType)
require.True(t, ok)
}

Expand Down Expand Up @@ -228,10 +228,13 @@ func TestActivityRegistration(t *testing.T) {
// Verify registered activity type
activityType := r.getRegisteredActivities()[0].ActivityType().Name
require.Equal(t, tt.activityType, activityType, "activity type")
require.Equal(t, tt.activityType, r.GetRegisteredActivities()[0])

// Verify activity is resolved from activity type
_, ok := r.GetActivity(tt.activityType)
require.True(t, ok)
_, ok = r.GetActivityFunc(tt.activityType)
require.True(t, ok)

// Verify activity is resolved from alternative (backwards compatible) activity type
if len(tt.altActivityType) > 0 {
Expand Down
16 changes: 16 additions & 0 deletions internal/workflow_replayer.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,22 @@ func (r *WorkflowReplayer) RegisterActivityWithOptions(a interface{}, options Re
r.registry.RegisterActivityWithOptions(a, options)
}

func (r *WorkflowReplayer) GetRegisteredWorkflows() []string {
return r.registry.GetRegisteredWorkflows()
}

func (r *WorkflowReplayer) GetWorkflowFunc(registerName string) (interface{}, bool) {
return r.registry.GetWorkflowFunc(registerName)
}

func (r *WorkflowReplayer) GetRegisteredActivities() []string {
return r.registry.GetRegisteredActivities()
}

func (r *WorkflowReplayer) GetActivityFunc(registerName string) (interface{}, bool) {
return r.registry.GetActivityFunc(registerName)
}

// ReplayWorkflowHistory executes a single decision task for the given history.
// Use for testing backwards compatibility of code changes and troubleshooting workflows in a debugger.
// The logger is an optional parameter. Defaults to the noop logger.
Expand Down
14 changes: 14 additions & 0 deletions internal/workflow_replayer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ import (
"context"
"errors"
"fmt"
"reflect"
"runtime"
"testing"
"time"

Expand Down Expand Up @@ -139,6 +141,18 @@ func (s *workflowReplayerSuite) TestReplayWorkflowHistoryFromFile() {
s.NoError(err)
}

func (s *workflowReplayerSuite) TestActivityRegistration() {
name := "test-Activity"
s.replayer.RegisterActivityWithOptions(testActivityFunction, RegisterActivityOptions{Name: name})
a := s.replayer.GetRegisteredActivities()[0]
s.Equal(name, a)

fn, ok := s.replayer.GetActivityFunc(a)
s.True(ok)
s.Equal(reflect.Func, reflect.ValueOf(fn).Kind())
s.Equal(getFunctionName(testActivityFunction), runtime.FuncForPC(reflect.ValueOf(fn).Pointer()).Name())
}

func testReplayWorkflow(ctx Context) error {
ao := ActivityOptions{
ScheduleToStartTimeout: time.Second,
Expand Down
10 changes: 10 additions & 0 deletions internal/workflow_shadower.go
Original file line number Diff line number Diff line change
Expand Up @@ -228,6 +228,16 @@ func (s *WorkflowShadower) Stop() {
}
}

// GetRegisteredWorkflows retrieves the list of workflows registered on the worker
func (s *WorkflowShadower) GetRegisteredWorkflows() []string {
return s.replayer.GetRegisteredWorkflows()
}

// GetWorkflowFn returns the workflow function corresponding to the provided registerName
func (s *WorkflowShadower) GetWorkflowFunc(registerName string) (interface{}, bool) {
return s.replayer.GetWorkflowFunc(registerName)
}

func (s *WorkflowShadower) shadowWorker() error {
s.shutdownWG.Add(1)
defer s.shutdownWG.Done()
Expand Down
12 changes: 12 additions & 0 deletions internal/workflow_shadower_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ package internal
import (
"context"
"fmt"
"reflect"
"runtime"
"sync"
"testing"
"time"
Expand Down Expand Up @@ -437,6 +439,16 @@ func (s *workflowShadowerSuite) TestShadowWorker_ExpectedReplayError() {
}
}

func (s *workflowShadowerSuite) TestWorkflowRegistration() {
wfName := s.testShadower.GetRegisteredWorkflows()[0]
fnName := getFunctionName(testReplayWorkflow)
s.Equal(fnName, wfName)

fn, ok := s.testShadower.GetWorkflowFunc(wfName)
s.True(ok)
s.Equal(fnName, runtime.FuncForPC(reflect.ValueOf(fn).Pointer()).Name())
}

func newTestWorkflowExecutions(size int) []*shared.WorkflowExecutionInfo {
executions := make([]*shared.WorkflowExecutionInfo, size)
for i := 0; i != size; i++ {
Expand Down
30 changes: 30 additions & 0 deletions worker/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,20 @@ type (
// This method panics if workflowFunc doesn't comply with the expected format or tries to register the same workflow
// type name twice. Use workflow.RegisterOptions.DisableAlreadyRegisteredCheck to allow multiple registrations.
RegisterWorkflowWithOptions(w interface{}, options workflow.RegisterOptions)

// GetRegisteredWorkflows returns a list of all workflows registered on the worker.
// The returned workflow name is by default the method name. However, if the workflow was registered
// with options (see Worker.RegisterWorkflowWithOptions), the workflow may have customized name.
// For chained registries, this returns a combined list of all registered activities from the current
// instance to the global registry. In this case, the list may contain duplicate names.
GetRegisteredWorkflows() []string

// GetWorkflowFunc takes a name and returns the corresponding workflow
// function and a boolean value indicating whether the activity was found.
// It returns nil, false when no workflow was registered with the provided name.
// The registerName is the resolved name recorded on the registry after all options
// from workflow.RegisterOptions{} are applied.
GetWorkflowFunc(registerName string) (interface{}, bool)
}

// ActivityRegistry exposes activity registration functions to consumers.
Expand Down Expand Up @@ -131,6 +145,22 @@ type (
// which might be useful for integration tests.
// worker.RegisterActivityWithOptions(barActivity, RegisterActivityOptions{DisableAlreadyRegisteredCheck: true})
RegisterActivityWithOptions(a interface{}, options activity.RegisterOptions)

// GetRegisteredActivities returns the names of all activities registered on the worker.
// The activity name is by default the method name. However, if the activity was registered
// with options (see Worker.RegisterActivityWithOptions), the activity may have customized name.
// For example, struct pointer activities that were registered with the Name option activity.RegisterOptions{Name: ...}
// will have their method names prepended with the provided name option.
// For chained registries, this returns a combined list of all registered activities from the current
// instance to the global registry. In this case, the list may contain duplicate names.
GetRegisteredActivities() []string

// GetActivityFunc takes a name and returns the corresponding activity
// function and a boolean value indicating whether the activity was found.
// It returns nil, false when no activity was registered with the provided name.
// The registerName is the resolved name recorded on the registry after all options
// from activity.RegisterOptions{} are applied.
GetActivityFunc(registerName string) (interface{}, bool)
}

// WorkflowReplayer supports replaying a workflow from its event history.
Expand Down

0 comments on commit 13c2821

Please sign in to comment.