Skip to content

Commit

Permalink
Add Signal-with-Start
Browse files Browse the repository at this point in the history
  • Loading branch information
stephanos committed Aug 15, 2024
1 parent dc7885b commit 78e8093
Show file tree
Hide file tree
Showing 8 changed files with 1,346 additions and 1,239 deletions.
13 changes: 13 additions & 0 deletions loadgen/kitchen-sink-gen/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -302,10 +302,21 @@ impl<'a> Arbitrary<'a> for TestInput {
fn arbitrary(u: &mut Unstructured<'a>) -> arbitrary::Result<Self> {
// We always want a client sequence
let mut client_sequence: ClientSequence = u.arbitrary()?;

// Sometimes we want a with_start_action
let with_start_action = if u.ratio(80, 100)? {
None
} else {
let mut signal_action: DoSignal = u.arbitrary()?;
signal_action.with_start = true;
Some(ClientAction { variant: Some(client_action::Variant::DoSignal(signal_action)) })
};

let mut ti = Self {
// Input may or may not be present
workflow_input: u.arbitrary()?,
client_sequence: None,
with_start_action: with_start_action,
};

// Finally, return at the end
Expand Down Expand Up @@ -419,6 +430,7 @@ impl<'a> Arbitrary<'a> for DoSignal {
};
Ok(Self {
variant: Some(variant),
with_start: u.arbitrary()?,
})
}
}
Expand Down Expand Up @@ -778,6 +790,7 @@ fn mk_client_signal_action(actions: impl IntoIterator<Item = action::Variant>) -
)))
.into(),
)),
with_start: false,
})),
}
}
Expand Down
72 changes: 54 additions & 18 deletions loadgen/kitchensink/helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,13 @@ import (
"context"
"errors"
"fmt"
"time"

"go.temporal.io/api/common/v1"
"go.temporal.io/sdk/client"
"go.temporal.io/sdk/workflow"
"golang.org/x/sync/errgroup"
"google.golang.org/protobuf/types/known/durationpb"
"time"
)

func NoOpSingleActivityActionSet() *ActionSet {
Expand Down Expand Up @@ -57,9 +58,31 @@ func ResourceConsumingActivity(bytesToAllocate uint64, cpuYieldEveryNIters uint3
}

type ClientActionsExecutor struct {
Client client.Client
WorkflowID string
RunID string
Client client.Client
StartOptions client.StartWorkflowOptions
WorkflowType string
WorkflowInput *WorkflowInput
Handle client.WorkflowRun
runID string
}

func (e *ClientActionsExecutor) Start(
ctx context.Context,
withStartAction *ClientAction,
) error {
var err error
if withStartAction == nil {
e.Handle, err = e.Client.ExecuteWorkflow(ctx, e.StartOptions, e.WorkflowType, e.WorkflowInput)
} else if sig := withStartAction.GetDoSignal(); sig != nil {
e.Handle, err = e.executeSignalAction(ctx, sig)
} else {
return fmt.Errorf("unsupported with_start_action: %v", withStartAction.String())
}
if err != nil {
return fmt.Errorf("failed to start kitchen sink workflow: %w", err)
}
e.runID = e.Handle.GetRunID()
return nil
}

func (e *ClientActionsExecutor) ExecuteClientSequence(ctx context.Context, clientSeq *ClientSequence) error {
Expand All @@ -68,7 +91,6 @@ func (e *ClientActionsExecutor) ExecuteClientSequence(ctx context.Context, clien
return err
}
}

return nil
}

Expand Down Expand Up @@ -103,13 +125,13 @@ func (e *ClientActionsExecutor) executeClientActionSet(ctx context.Context, acti
}
}
if actionSet.GetWaitForCurrentRunToFinishAtEnd() {
err := e.Client.GetWorkflow(ctx, e.WorkflowID, e.RunID).
err := e.Client.GetWorkflow(ctx, e.StartOptions.ID, e.runID).
GetWithOptions(ctx, nil, client.WorkflowRunGetOptions{DisableFollowingRuns: true})
var canErr *workflow.ContinueAsNewError
if err != nil && !errors.As(err, &canErr) {
return err
}
e.RunID = e.Client.GetWorkflow(ctx, e.WorkflowID, "").GetRunID()
e.runID = e.Client.GetWorkflow(ctx, e.StartOptions.ID, "").GetRunID()
}
return nil
}
Expand All @@ -122,26 +144,20 @@ func (e *ClientActionsExecutor) executeClientAction(ctx context.Context, action

var err error
if sig := action.GetDoSignal(); sig != nil {
if sigActions := sig.GetDoSignalActions(); sigActions != nil {
err = e.Client.SignalWorkflow(ctx, e.WorkflowID, "", "do_actions_signal", sigActions)
} else if handler := sig.GetCustom(); handler != nil {
err = e.Client.SignalWorkflow(ctx, e.WorkflowID, "", handler.Name, handler.Args)
} else {
return fmt.Errorf("do_signal must recognizable variant")
}
_, err = e.executeSignalAction(ctx, sig)
return err
} else if update := action.GetDoUpdate(); update != nil {
var handle client.WorkflowUpdateHandle
if actionsUpdate := update.GetDoActions(); actionsUpdate != nil {
handle, err = e.Client.UpdateWorkflow(ctx, client.UpdateWorkflowOptions{
WorkflowID: e.WorkflowID,
WorkflowID: e.StartOptions.ID,
UpdateName: "do_actions_update",
WaitForStage: client.WorkflowUpdateStageCompleted,
Args: []any{actionsUpdate},
})
} else if handler := update.GetCustom(); handler != nil {
handle, err = e.Client.UpdateWorkflow(ctx, client.UpdateWorkflowOptions{
WorkflowID: e.WorkflowID,
WorkflowID: e.StartOptions.ID,
UpdateName: handler.Name,
WaitForStage: client.WorkflowUpdateStageCompleted,
Args: []any{handler.Args},
Expand All @@ -159,9 +175,9 @@ func (e *ClientActionsExecutor) executeClientAction(ctx context.Context, action
} else if query := action.GetDoQuery(); query != nil {
if query.GetReportState() != nil {
// TODO: Use args
_, err = e.Client.QueryWorkflow(ctx, e.WorkflowID, "", "report_state", nil)
_, err = e.Client.QueryWorkflow(ctx, e.StartOptions.ID, "", "report_state", nil)
} else if handler := query.GetCustom(); handler != nil {
_, err = e.Client.QueryWorkflow(ctx, e.WorkflowID, "", handler.Name, handler.Args)
_, err = e.Client.QueryWorkflow(ctx, e.StartOptions.ID, "", handler.Name, handler.Args)
} else {
return fmt.Errorf("do_query must recognizable variant")
}
Expand All @@ -176,3 +192,23 @@ func (e *ClientActionsExecutor) executeClientAction(ctx context.Context, action
return fmt.Errorf("client action must be set")
}
}

func (e *ClientActionsExecutor) executeSignalAction(ctx context.Context, sig *DoSignal) (client.WorkflowRun, error) {
var signalName string
var signalArgs any
if sigActions := sig.GetDoSignalActions(); sigActions != nil {
signalName = "do_actions_signal"
signalArgs = sigActions
} else if handler := sig.GetCustom(); handler != nil {
signalName = handler.Name
signalArgs = handler.Args
} else {
return nil, fmt.Errorf("do_signal must recognizable variant")
}

if sig.WithStart {

Check failure on line 209 in loadgen/kitchensink/helpers.go

View workflow job for this annotation

GitHub Actions / build-lint-test-dotnet

sig.WithStart undefined (type *DoSignal has no field or method WithStart)

Check failure on line 209 in loadgen/kitchensink/helpers.go

View workflow job for this annotation

GitHub Actions / build-lint-test-go

sig.WithStart undefined (type *DoSignal has no field or method WithStart)

Check failure on line 209 in loadgen/kitchensink/helpers.go

View workflow job for this annotation

GitHub Actions / build-lint-test-java

sig.WithStart undefined (type *DoSignal has no field or method WithStart)

Check failure on line 209 in loadgen/kitchensink/helpers.go

View workflow job for this annotation

GitHub Actions / build-lint-test-typescript

sig.WithStart undefined (type *DoSignal has no field or method WithStart)
return e.Client.SignalWithStartWorkflow(
ctx, e.StartOptions.ID, signalName, signalArgs, e.StartOptions, e.WorkflowType, e.WorkflowInput)
}
return nil, e.Client.SignalWorkflow(ctx, e.StartOptions.ID, "", signalName, signalArgs)
}
38 changes: 20 additions & 18 deletions loadgen/scenario.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,17 +3,19 @@ package loadgen
import (
"context"
"fmt"
"go.temporal.io/api/enums/v1"
"go.temporal.io/api/operatorservice/v1"
"path/filepath"
"runtime"
"strconv"
"strings"
"time"

"github.com/temporalio/omes/loadgen/kitchensink"
"go.temporal.io/api/enums/v1"
"go.temporal.io/api/operatorservice/v1"

"go.temporal.io/sdk/client"
"go.uber.org/zap"

"github.com/temporalio/omes/loadgen/kitchensink"
)

type Scenario struct {
Expand Down Expand Up @@ -225,40 +227,40 @@ type KitchenSinkWorkflowOptions struct {
// completion ignoring its result. Concurrently it will perform any client actions specified in
// kitchensink.TestInput.ClientSequence
func (r *Run) ExecuteKitchenSinkWorkflow(ctx context.Context, options *KitchenSinkWorkflowOptions) error {
// Start the workflow
r.Logger.Debugf("Executing kitchen sink workflow with options: %v", options)
handle, err := r.Client.ExecuteWorkflow(
ctx, options.StartOptions, "kitchenSink", options.Params.WorkflowInput)
if err != nil {
return fmt.Errorf("failed to start kitchen sink workflow: %w", err)
}

clientSeq := options.Params.ClientSequence
cancelCtx, cancel := context.WithCancel(ctx)
defer cancel()

executor := &kitchensink.ClientActionsExecutor{
Client: r.Client,
StartOptions: options.StartOptions,
WorkflowType: "kitchenSink",
WorkflowInput: options.Params.GetWorkflowInput(),
}
startErr := executor.Start(ctx, options.Params.WithStartAction)
if startErr != nil {
return fmt.Errorf("failed to start kitchen sink workflow: %w", startErr)
}

var clientActionsErr error
clientSeq := options.Params.ClientSequence
if clientSeq != nil && len(clientSeq.ActionSets) > 0 {
executor := &kitchensink.ClientActionsExecutor{
Client: r.Client,
WorkflowID: handle.GetID(),
RunID: handle.GetRunID(),
}
go func() {
clientActionsErr = executor.ExecuteClientSequence(cancelCtx, clientSeq)
if clientActionsErr != nil {
r.Logger.Error("Client actions failed: ", clientActionsErr)
cancel()
// TODO: Remove or change to "always terminate when exiting early" flag
err := r.Client.TerminateWorkflow(
ctx, handle.GetID(), "", "client actions failed", nil)
ctx, options.StartOptions.ID, "", "client actions failed", nil)
if err != nil {
return
}
}
}()
}

executeErr := handle.Get(cancelCtx, nil)
executeErr := executor.Handle.Get(cancelCtx, nil)
if executeErr != nil {
return fmt.Errorf("failed to execute kitchen sink workflow: %w", executeErr)
}
Expand Down
Loading

0 comments on commit 78e8093

Please sign in to comment.