Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add command for workflow update #200

Merged
Merged
Show file tree
Hide file tree
Changes from 14 commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
2bb5a7a
Add update command
samanbarghi Apr 18, 2023
882a98a
Use sdkclient instead of frontend client
samanbarghi Apr 19, 2023
ae2d812
Add e2e test for update
samanbarghi Apr 19, 2023
8d784f8
Update tests/workflows/update/update.go
samanbarghi Apr 19, 2023
a63058d
Update workflow/workflow.go
samanbarghi Apr 19, 2023
b5630ac
Use high level sdk client APIs and remove WaitPolicy for now
samanbarghi Apr 20, 2023
15388af
Enable update in dev server and uncomment test
samanbarghi Apr 20, 2023
1e12db5
Handle result from update comand and print it out
samanbarghi Apr 21, 2023
0cf8748
Add new tests and modify e2e suite
samanbarghi Apr 21, 2023
ffd221f
Stop server after suite is done
samanbarghi Apr 21, 2023
3ccfa6e
Simplify workflow update test
samanbarghi Apr 21, 2023
c3b8637
Add update-id flag to get result of updates
samanbarghi Apr 21, 2023
60f35e8
PR comments
samanbarghi Apr 25, 2023
fcd10ce
Fix lint issues
samanbarghi Apr 26, 2023
d4c3aaf
Remove update-id flag
samanbarghi Apr 26, 2023
de5d539
fix unbound variable (#215)
samanbarghi Apr 26, 2023
a8bc242
Use camel case
samanbarghi Apr 26, 2023
1850598
Add lint to makefile (#218)
samanbarghi Apr 26, 2023
acdee89
Update tests/workflows/update/update.go
samanbarghi Apr 26, 2023
d24034f
Remove extra error checks from test
samanbarghi Apr 26, 2023
18bc9ad
PR comments
samanbarghi Apr 26, 2023
b40878c
Remove success message
samanbarghi Apr 26, 2023
fa6695e
Use local app and server for each test
samanbarghi Apr 26, 2023
859b7cc
Merge branch 'main' into features/add-update-workflow-command
samanbarghi Apr 26, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 6 additions & 2 deletions common/defs-cmds.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ const (
DeleteWorkflowDefinition = "Deletes a Workflow Execution."
ResetWorkflowDefinition = "Resets a Workflow Execution by Event ID or reset type."
TraceWorkflowDefinition = "Trace progress of a Workflow Execution and its children."
UpdateWorkflowDefinition = "Updates a running workflow synchronously."

// Activity subcommand definitions
CompleteActivityDefinition = "Completes an Activity Execution."
Expand Down Expand Up @@ -197,7 +198,6 @@ Workflow terminations require a valid [Workflow ID](/concepts/what-is-a-workflow

Use the options listed below to change termination behavior.`


const ResetWorkflowUsageText = `The ` + "`" + `temporal workflow reset` + "`" + ` command resets a [Workflow Execution](/concepts/what-is-a-workflow-execution).
A reset allows the Workflow to resume from a certain point without losing its parameters or [Event History](/concepts/what-is-an-event-history).

Expand Down Expand Up @@ -547,6 +547,10 @@ const WorkflowTraceUsageText = `The ` + "`" + `temporal workflow trace` + "`" +

Use the options listed below to change the command's behavior.`

const WorkflowUpdateUsageText = `The ` + "`" + `temporal workflow update` + "`" + ` command synchronously updates a running [Workflow Execution](/concepts/what-is-a-workflow-execution).
samanbarghi marked this conversation as resolved.
Show resolved Hide resolved

Use the options listed below to change the command's behavior.`

const ServerUsageText = `Server commands allow you to start and manage the [Temporal Server](/concepts/what-is-the-temporal-server) from the command line.

Currently, ` + "`" + `cli` + "`" + ` server functionality extends to starting the Server.
Expand All @@ -571,4 +575,4 @@ USAGE:

DISPLAY OPTIONS:
{{template "visibleFlagTemplate" .}}{{end}}
`
`
180 changes: 92 additions & 88 deletions common/defs-flags.go

Large diffs are not rendered by default.

3 changes: 3 additions & 0 deletions common/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,9 @@ var (
FlagUICodecEndpoint = "ui-codec-endpoint"
FlagUIPort = "ui-port"
FlagUnpause = "unpause"
FlagUpdateWaitPolicy = "wait-policy"
FlagUpdateFirstExecutionRunID = "first-execution-run-id"
FlagUpdateID = "update-id"
FlagVisibilityArchivalState = "visibility-archival-state"
FlagVisibilityArchivalURI = "visibility-uri"
FlagWorkflowExecutionTimeout = "execution-timeout"
Expand Down
16 changes: 14 additions & 2 deletions tests/e2e_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,12 +35,22 @@ func TestClientIntegrationSuite(t *testing.T) {

func (s *e2eSuite) SetupSuite() {
s.app = app.BuildApp()
server, err := testsuite.StartDevServer(context.Background(), testsuite.DevServerOptions{})
server, err := testsuite.StartDevServer(context.Background(), testsuite.DevServerOptions{
ExtraArgs: []string{
// server logs are too noisy, limit server logs
"--log-level", "error",
//TODO: remove this flag when update workflow is enabled in the server by default
"--dynamic-config-value", "frontend.enableUpdateWorkflowExecution=true",
Comment on lines +42 to +43
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think this should be enabled here. Let it rely on default value.

},
})
s.NoError(err)
s.ts = server
}

func (s *e2eSuite) TearDownSuite() {
err := s.ts.Stop()
s.NoError(err)

}

func (s *e2eSuite) SetupTest() {
Expand All @@ -51,10 +61,12 @@ func (s *e2eSuite) SetupTest() {
})
s.writer = &MemWriter{}
s.app.Writer = s.writer

// noop exiter to prevent the app from exiting mid test
samanbarghi marked this conversation as resolved.
Show resolved Hide resolved
cli.OsExiter = func(code int) { return }
}

func (s *e2eSuite) TearDownTest() {
s.ts.Stop()
}

func (s *e2eSuite) NewWorker(taskQueue string, registerFunc func(registry worker.Registry)) worker.Worker {
Expand Down
57 changes: 55 additions & 2 deletions tests/workflow_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,16 @@ package tests

import (
"context"
"fmt"
"io/ioutil"
"math/rand"
"os"
"strconv"

"github.com/pborman/uuid"
"github.com/temporalio/cli/tests/workflows/helloworld"
"go.temporal.io/sdk/client"
"github.com/temporalio/cli/tests/workflows/update"
sdkclient "go.temporal.io/sdk/client"
"go.temporal.io/sdk/worker"
)

Expand All @@ -25,7 +29,7 @@ func (s *e2eSuite) TestWorkflowShow_ReplayableHistory() {

wfr, err := c.ExecuteWorkflow(
context.Background(),
client.StartWorkflowOptions{TaskQueue: testTq},
sdkclient.StartWorkflowOptions{TaskQueue: testTq},
helloworld.Workflow,
"world",
)
Expand All @@ -51,3 +55,52 @@ func (s *e2eSuite) TestWorkflowShow_ReplayableHistory() {
err = replayer.ReplayWorkflowHistoryFromJSONFile(nil, historyFile)
s.NoError(err)
}

func (s *e2eSuite) TestWorkflowUpdate() {
c := s.ts.Client()

s.NewWorker(testTq, func(r worker.Registry) {
r.RegisterWorkflow(update.Counter)
})
randomInt := rand.Intn(100)
wfr, err := c.ExecuteWorkflow(
context.Background(),
sdkclient.StartWorkflowOptions{TaskQueue: testTq},
update.Counter,
randomInt,
)
s.NoError(err)
signalWorkflow := func() {
// send a Signal to stop the workflow
err = c.SignalWorkflow(context.Background(), wfr.GetID(), wfr.GetRunID(), update.Done, nil)
s.NoError(err)
}

defer signalWorkflow()

// successful update with wait policy Completed, should show the result
err = s.app.Run([]string{"", "workflow", "update", "--workflow-id", wfr.GetID(), "--run-id", wfr.GetRunID(), "--name", update.FetchAndAdd, "-i", strconv.Itoa(randomInt)})
samanbarghi marked this conversation as resolved.
Show resolved Hide resolved
s.NoError(err)
want := fmt.Sprintf(": %v", randomInt)
s.Contains(s.writer.GetContent(), want)

// successful update with wait policy Completed, passing first-execution-run-id
err = s.app.Run([]string{"", "workflow", "update", "--workflow-id", wfr.GetID(), "--run-id", wfr.GetRunID(), "--name", update.FetchAndAdd, "-i", "1", "--first-execution-run-id", wfr.GetRunID()})
s.NoError(err)

// update rejected, when name is not available
err = s.app.Run([]string{"", "workflow", "update", "--workflow-id", "non-existent-ID", "--run-id", wfr.GetRunID(), "-i", "1"})
s.Error(err)
samanbarghi marked this conversation as resolved.
Show resolved Hide resolved
s.ErrorContains(err, "Required flag \"name\" not set")

// update rejected, wrong workflowID
err = s.app.Run([]string{"", "workflow", "update", "--workflow-id", "non-existent-ID", "--run-id", wfr.GetRunID(), "--name", update.FetchAndAdd, "-i", "1"})
s.Error(err)
s.ErrorContains(err, "update workflow failed")

// update rejected, wrong update name
err = s.app.Run([]string{"", "workflow", "update", "--workflow-id", wfr.GetID(), "--run-id", wfr.GetRunID(), "--name", "non-existent-name", "-i", "1"})
s.Error(err)
s.ErrorContains(err, "update workflow failed: unknown update")

}
44 changes: 44 additions & 0 deletions tests/workflows/update/update.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
package update

import (
"fmt"

"go.temporal.io/sdk/workflow"
)

const (
FetchAndAdd = "fetch_and_add"
Done = "done"
)

func Counter(ctx workflow.Context, val int) (int, error) {
log := workflow.GetLogger(ctx)
counter := val

if err := workflow.SetUpdateHandlerWithOptions(
ctx,
FetchAndAdd,
func(ctx workflow.Context, i int) (int, error) {
tmp := counter
counter += i
log.Info("counter updated", "added", i, "new-value", counter)
samanbarghi marked this conversation as resolved.
Show resolved Hide resolved
return tmp, nil
},
workflow.UpdateHandlerOptions{Validator: nonNegative},
); err != nil {
return 0, err
}

_ = workflow.GetSignalChannel(ctx, Done).Receive(ctx, nil)
return counter, ctx.Err()
samanbarghi marked this conversation as resolved.
Show resolved Hide resolved
}

func nonNegative(ctx workflow.Context, i int) error {
log := workflow.GetLogger(ctx)
if i < 0 {
log.Debug("Rejecting negative update", "addend", i)
return fmt.Errorf("addend must be non-negative (%v)", i)
}
log.Debug("Accepting update", "addend", i)
return nil
}
Loading