Skip to content

Commit

Permalink
Add Signal-with-Start
Browse files Browse the repository at this point in the history
  • Loading branch information
stephanos committed Aug 15, 2024
1 parent dc7885b commit 13cf1fe
Show file tree
Hide file tree
Showing 10 changed files with 1,767 additions and 1,316 deletions.
11 changes: 10 additions & 1 deletion .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -183,7 +183,16 @@ jobs:
run: cargo build
- name: Check diff
run: |
[[ -z $(git status --porcelain loadgen/kitchensink/ workers/python/protos/) ]] || (git diff; echo "Protos changed"; false) 1>&2
git config --global core.safecrlf false
git diff > generator.diff
git diff --exit-code
- name: Upload generator diff
uses: actions/upload-artifact@v4
if: always()
with:
name: generator-diff
path: generator.diff
if-no-files-found: ignore

push-latest-docker-images:
uses: ./.github/workflows/all-docker-images.yml
Expand Down
13 changes: 13 additions & 0 deletions loadgen/kitchen-sink-gen/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -302,10 +302,21 @@ impl<'a> Arbitrary<'a> for TestInput {
fn arbitrary(u: &mut Unstructured<'a>) -> arbitrary::Result<Self> {
// We always want a client sequence
let mut client_sequence: ClientSequence = u.arbitrary()?;

// Sometimes we want a with_start_action
let with_start_action = if u.ratio(80, 100)? {
None
} else {
let mut signal_action: DoSignal = u.arbitrary()?;
signal_action.with_start = true;
Some(ClientAction { variant: Some(client_action::Variant::DoSignal(signal_action)) })
};

let mut ti = Self {
// Input may or may not be present
workflow_input: u.arbitrary()?,
client_sequence: None,
with_start_action: with_start_action,
};

// Finally, return at the end
Expand Down Expand Up @@ -419,6 +430,7 @@ impl<'a> Arbitrary<'a> for DoSignal {
};
Ok(Self {
variant: Some(variant),
with_start: u.arbitrary()?,
})
}
}
Expand Down Expand Up @@ -778,6 +790,7 @@ fn mk_client_signal_action(actions: impl IntoIterator<Item = action::Variant>) -
)))
.into(),
)),
with_start: false,
})),
}
}
Expand Down
72 changes: 54 additions & 18 deletions loadgen/kitchensink/helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,13 @@ import (
"context"
"errors"
"fmt"
"time"

"go.temporal.io/api/common/v1"
"go.temporal.io/sdk/client"
"go.temporal.io/sdk/workflow"
"golang.org/x/sync/errgroup"
"google.golang.org/protobuf/types/known/durationpb"
"time"
)

func NoOpSingleActivityActionSet() *ActionSet {
Expand Down Expand Up @@ -57,9 +58,31 @@ func ResourceConsumingActivity(bytesToAllocate uint64, cpuYieldEveryNIters uint3
}

type ClientActionsExecutor struct {
Client client.Client
WorkflowID string
RunID string
Client client.Client
StartOptions client.StartWorkflowOptions
WorkflowType string
WorkflowInput *WorkflowInput
Handle client.WorkflowRun
runID string
}

func (e *ClientActionsExecutor) Start(
ctx context.Context,
withStartAction *ClientAction,
) error {
var err error
if withStartAction == nil {
e.Handle, err = e.Client.ExecuteWorkflow(ctx, e.StartOptions, e.WorkflowType, e.WorkflowInput)
} else if sig := withStartAction.GetDoSignal(); sig != nil {
e.Handle, err = e.executeSignalAction(ctx, sig)
} else {
return fmt.Errorf("unsupported with_start_action: %v", withStartAction.String())
}
if err != nil {
return fmt.Errorf("failed to start kitchen sink workflow: %w", err)
}
e.runID = e.Handle.GetRunID()
return nil
}

func (e *ClientActionsExecutor) ExecuteClientSequence(ctx context.Context, clientSeq *ClientSequence) error {
Expand All @@ -68,7 +91,6 @@ func (e *ClientActionsExecutor) ExecuteClientSequence(ctx context.Context, clien
return err
}
}

return nil
}

Expand Down Expand Up @@ -103,13 +125,13 @@ func (e *ClientActionsExecutor) executeClientActionSet(ctx context.Context, acti
}
}
if actionSet.GetWaitForCurrentRunToFinishAtEnd() {
err := e.Client.GetWorkflow(ctx, e.WorkflowID, e.RunID).
err := e.Client.GetWorkflow(ctx, e.StartOptions.ID, e.runID).
GetWithOptions(ctx, nil, client.WorkflowRunGetOptions{DisableFollowingRuns: true})
var canErr *workflow.ContinueAsNewError
if err != nil && !errors.As(err, &canErr) {
return err
}
e.RunID = e.Client.GetWorkflow(ctx, e.WorkflowID, "").GetRunID()
e.runID = e.Client.GetWorkflow(ctx, e.StartOptions.ID, "").GetRunID()
}
return nil
}
Expand All @@ -122,26 +144,20 @@ func (e *ClientActionsExecutor) executeClientAction(ctx context.Context, action

var err error
if sig := action.GetDoSignal(); sig != nil {
if sigActions := sig.GetDoSignalActions(); sigActions != nil {
err = e.Client.SignalWorkflow(ctx, e.WorkflowID, "", "do_actions_signal", sigActions)
} else if handler := sig.GetCustom(); handler != nil {
err = e.Client.SignalWorkflow(ctx, e.WorkflowID, "", handler.Name, handler.Args)
} else {
return fmt.Errorf("do_signal must recognizable variant")
}
_, err = e.executeSignalAction(ctx, sig)
return err
} else if update := action.GetDoUpdate(); update != nil {
var handle client.WorkflowUpdateHandle
if actionsUpdate := update.GetDoActions(); actionsUpdate != nil {
handle, err = e.Client.UpdateWorkflow(ctx, client.UpdateWorkflowOptions{
WorkflowID: e.WorkflowID,
WorkflowID: e.StartOptions.ID,
UpdateName: "do_actions_update",
WaitForStage: client.WorkflowUpdateStageCompleted,
Args: []any{actionsUpdate},
})
} else if handler := update.GetCustom(); handler != nil {
handle, err = e.Client.UpdateWorkflow(ctx, client.UpdateWorkflowOptions{
WorkflowID: e.WorkflowID,
WorkflowID: e.StartOptions.ID,
UpdateName: handler.Name,
WaitForStage: client.WorkflowUpdateStageCompleted,
Args: []any{handler.Args},
Expand All @@ -159,9 +175,9 @@ func (e *ClientActionsExecutor) executeClientAction(ctx context.Context, action
} else if query := action.GetDoQuery(); query != nil {
if query.GetReportState() != nil {
// TODO: Use args
_, err = e.Client.QueryWorkflow(ctx, e.WorkflowID, "", "report_state", nil)
_, err = e.Client.QueryWorkflow(ctx, e.StartOptions.ID, "", "report_state", nil)
} else if handler := query.GetCustom(); handler != nil {
_, err = e.Client.QueryWorkflow(ctx, e.WorkflowID, "", handler.Name, handler.Args)
_, err = e.Client.QueryWorkflow(ctx, e.StartOptions.ID, "", handler.Name, handler.Args)
} else {
return fmt.Errorf("do_query must recognizable variant")
}
Expand All @@ -176,3 +192,23 @@ func (e *ClientActionsExecutor) executeClientAction(ctx context.Context, action
return fmt.Errorf("client action must be set")
}
}

func (e *ClientActionsExecutor) executeSignalAction(ctx context.Context, sig *DoSignal) (client.WorkflowRun, error) {
var signalName string
var signalArgs any
if sigActions := sig.GetDoSignalActions(); sigActions != nil {
signalName = "do_actions_signal"
signalArgs = sigActions
} else if handler := sig.GetCustom(); handler != nil {
signalName = handler.Name
signalArgs = handler.Args
} else {
return nil, fmt.Errorf("do_signal must recognizable variant")
}

if sig.WithStart {
return e.Client.SignalWithStartWorkflow(
ctx, e.StartOptions.ID, signalName, signalArgs, e.StartOptions, e.WorkflowType, e.WorkflowInput)
}
return nil, e.Client.SignalWorkflow(ctx, e.StartOptions.ID, "", signalName, signalArgs)
}
Loading

0 comments on commit 13cf1fe

Please sign in to comment.