Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Update-with-Start #114

Merged
merged 1 commit into from
Sep 9, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,9 @@ jobs:
- name: Run local scenario with worker
run: ./temporal-omes run-scenario-with-worker --scenario workflow_with_single_noop_activity --log-level debug --language go --embedded-server --iterations 5
- name: Build worker image
run: ./temporal-omes build-worker-image --language go --version v1.28.1 --tag-as-latest
run: ./temporal-omes build-worker-image --language go --version v1.29.0 --tag-as-latest
- name: Run worker image
run: docker run --rm --detach -i -p 10233:10233 omes:go-1.28.1 --scenario workflow_with_single_noop_activity --log-level debug --language go --run-id {{ github.run_id }} --embedded-server-address 0.0.0.0:10233
run: docker run --rm --detach -i -p 10233:10233 omes:go-1.29.0 --scenario workflow_with_single_noop_activity --log-level debug --language go --run-id {{ github.run_id }} --embedded-server-address 0.0.0.0:10233
- name: Run scenario against image
run: ./temporal-omes run-scenario --scenario workflow_with_single_noop_activity --log-level debug --server-address 127.0.0.1:10233 --run-id {{ github.run_id }} --connect-timeout 1m --iterations 5

Expand Down Expand Up @@ -201,7 +201,7 @@ jobs:
with:
do-push: true
as-latest: true
go-version: 'v1.28.1'
go-version: 'v1.29.0'
ts-version: 'v1.10.2'
java-version: 'v1.24.2'
py-version: 'v1.7.0'
Expand Down
14 changes: 7 additions & 7 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,12 @@ require (
github.com/spf13/pflag v1.0.5
github.com/stretchr/testify v1.9.0
github.com/temporalio/features v0.0.0-20240806202554-bdfe567c9d89
go.temporal.io/api v1.36.0
go.temporal.io/sdk v1.28.1
go.temporal.io/api v1.38.0
go.temporal.io/sdk v1.29.0
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

New Go SDK is required.

go.uber.org/zap v1.27.0
golang.org/x/mod v0.20.0
golang.org/x/sync v0.8.0
golang.org/x/sys v0.23.0
golang.org/x/sys v0.24.0
google.golang.org/protobuf v1.34.2
)

Expand All @@ -36,9 +36,9 @@ require (
github.com/facebookgo/clock v0.0.0-20150410010913-600d898af40a // indirect
github.com/golang/mock v1.6.0 // indirect
github.com/grpc-ecosystem/go-grpc-middleware v1.4.0 // indirect
github.com/grpc-ecosystem/grpc-gateway/v2 v2.21.0 // indirect
github.com/grpc-ecosystem/grpc-gateway/v2 v2.22.0 // indirect
github.com/inconshreveable/mousetrap v1.1.0 // indirect
github.com/nexus-rpc/sdk-go v0.0.9 // indirect
github.com/nexus-rpc/sdk-go v0.0.10 // indirect
github.com/pborman/uuid v1.2.1 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/robfig/cron v1.2.0 // indirect
Expand All @@ -48,8 +48,8 @@ require (
golang.org/x/net v0.28.0 // indirect
golang.org/x/text v0.17.0 // indirect
golang.org/x/time v0.6.0 // indirect
google.golang.org/genproto/googleapis/api v0.0.0-20240805194559-2c9e96a0b5d4 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20240805194559-2c9e96a0b5d4 // indirect
google.golang.org/genproto/googleapis/api v0.0.0-20240822170219-fc7c04adadcd // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20240822170219-fc7c04adadcd // indirect
google.golang.org/grpc v1.65.0 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
)
Expand Down
201 changes: 14 additions & 187 deletions go.sum

Large diffs are not rendered by default.

21 changes: 16 additions & 5 deletions loadgen/kitchen-sink-gen/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -410,11 +410,21 @@ impl<'a> Arbitrary<'a> for ClientAction {

impl<'a> Arbitrary<'a> for WithStartClientAction {
fn arbitrary(u: &mut Unstructured<'a>) -> arbitrary::Result<Self> {
let mut signal_action: DoSignal = u.arbitrary()?;
signal_action.with_start = true;
Ok(Self {
variant: Some(with_start_client_action::Variant::DoSignal(signal_action)),
})
let action_kind = u.int_in_range(0..=1)?;
let variant = match action_kind {
0 => with_start_client_action::Variant::DoSignal({
let mut signal_action: DoSignal = u.arbitrary()?;
signal_action.with_start = true;
signal_action
}),
1 => with_start_client_action::Variant::DoUpdate({
let mut update_action: DoUpdate = u.arbitrary()?;
update_action.with_start = true;
update_action
}),
_ => unreachable!(),
};
Ok(Self { variant: Some(variant) })
}
}

Expand Down Expand Up @@ -481,6 +491,7 @@ impl<'a> Arbitrary<'a> for DoUpdate {
Ok(Self {
variant: Some(variant),
failure_expected,
with_start: u.arbitrary()?,
})
}
}
Expand Down
71 changes: 47 additions & 24 deletions loadgen/kitchensink/helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"time"

"go.temporal.io/api/common/v1"
enumspb "go.temporal.io/api/enums/v1"
"go.temporal.io/sdk/client"
"go.temporal.io/sdk/workflow"
"golang.org/x/sync/errgroup"
Expand Down Expand Up @@ -75,6 +76,8 @@ func (e *ClientActionsExecutor) Start(
e.Handle, err = e.Client.ExecuteWorkflow(ctx, e.StartOptions, e.WorkflowType, e.WorkflowInput)
} else if sig := withStartAction.GetDoSignal(); sig != nil {
e.Handle, err = e.executeSignalAction(ctx, sig)
} else if upd := withStartAction.GetDoUpdate(); upd != nil {
e.Handle, err = e.executeUpdateAction(ctx, upd)
} else {
return fmt.Errorf("unsupported with_start_action: %v", withStartAction.String())
}
Expand Down Expand Up @@ -147,30 +150,7 @@ func (e *ClientActionsExecutor) executeClientAction(ctx context.Context, action
_, err = e.executeSignalAction(ctx, sig)
return err
} else if update := action.GetDoUpdate(); update != nil {
var handle client.WorkflowUpdateHandle
if actionsUpdate := update.GetDoActions(); actionsUpdate != nil {
handle, err = e.Client.UpdateWorkflow(ctx, client.UpdateWorkflowOptions{
WorkflowID: e.StartOptions.ID,
UpdateName: "do_actions_update",
WaitForStage: client.WorkflowUpdateStageCompleted,
Args: []any{actionsUpdate},
})
} else if handler := update.GetCustom(); handler != nil {
handle, err = e.Client.UpdateWorkflow(ctx, client.UpdateWorkflowOptions{
WorkflowID: e.StartOptions.ID,
UpdateName: handler.Name,
WaitForStage: client.WorkflowUpdateStageCompleted,
Args: []any{handler.Args},
})
} else {
return fmt.Errorf("do_update must recognizable variant")
}
if err == nil {
err = handle.Get(ctx, nil)
}
if update.FailureExpected {
err = nil
}
_, err = e.executeUpdateAction(ctx, update)
return err
} else if query := action.GetDoQuery(); query != nil {
if query.GetReportState() != nil {
Expand Down Expand Up @@ -212,3 +192,46 @@ func (e *ClientActionsExecutor) executeSignalAction(ctx context.Context, sig *Do
}
return nil, e.Client.SignalWorkflow(ctx, e.StartOptions.ID, "", signalName, signalArgs)
}

func (e *ClientActionsExecutor) executeUpdateAction(ctx context.Context, upd *DoUpdate) (run client.WorkflowRun, err error) {
var opts client.UpdateWorkflowOptions
if actionsUpdate := upd.GetDoActions(); actionsUpdate != nil {
opts = client.UpdateWorkflowOptions{
WorkflowID: e.StartOptions.ID,
UpdateName: "do_actions_update",
WaitForStage: client.WorkflowUpdateStageCompleted,
Args: []any{actionsUpdate},
}
} else if handler := upd.GetCustom(); handler != nil {
opts = client.UpdateWorkflowOptions{
WorkflowID: e.StartOptions.ID,
UpdateName: handler.Name,
WaitForStage: client.WorkflowUpdateStageCompleted,
Args: []any{handler.Args},
}
} else {
return nil, fmt.Errorf("do_update must recognizable variant")
}

var handle client.WorkflowUpdateHandle
if upd.WithStart {
op := client.NewUpdateWithStartWorkflowOperation(opts)
startOpts := e.StartOptions
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Making a copy to avoid changing the shared options (since they are re-used).

startOpts.WithStartOperation = op
startOpts.WorkflowIDConflictPolicy = enumspb.WORKFLOW_ID_CONFLICT_POLICY_USE_EXISTING
run, err = e.Client.ExecuteWorkflow(ctx, startOpts, e.WorkflowType, e.WorkflowInput)
if err == nil {
handle, err = op.Get(ctx)
}
} else {
handle, err = e.Client.UpdateWorkflow(ctx, opts)
}

if err == nil {
err = handle.Get(ctx, nil)
}
if upd.FailureExpected {
err = nil
}
return run, err
}
Loading
Loading