Skip to content

Conversation

@andrewwormald
Copy link
Collaborator

@andrewwormald andrewwormald commented Nov 12, 2025

This MR allows users to call SaveAndRepeat which saves the object and inserts another event to ensure the consumer processes it again. It will go to the back of the queue by design. This has benefits such as store reader replication timing for those using record stores that have replication. This also allows long running processes to not block shorter ones and forces the user to consider this due to the design limit. Hoping the user would opt for batching it and calling SaveAndRepeat every N iterations instead of every iteration.

This can help with workloads that are large and need breaking down with progress being saved.

A boy scout here is the fixing of context deadline exceeded. Workflow handles context cancelled but was not properly handling deadlines for the context and now does so including the testing functions. Previously it would infinitely spin if it could not find something even when a timeout is set. This is fixed here.

@coderabbitai
Copy link

coderabbitai bot commented Nov 12, 2025

Walkthrough

The PR adds a SaveAndRepeat path to Run, introduces an unexported skipType and new sentinel skipTypeSaveAndRepeat, and updates code to use the new skipType values. The updater gains an early-return branch for save-and-repeat that persists the current status without transition validation. Graph gains reserved-node support via a new New(reservedNodes ...int) constructor and reservedNodes field with panicking checks. Tests and test helpers are extended for SaveAndRepeat and updated error propagation; DeadlineExceeded is treated like cancellation in workflow run logic. Several constants and test call sites were adjusted to the new unexported names.

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~60 minutes

  • status.go / run.go: rename of exported SkipType → skipType and added skipTypeSaveAndRepeat — review API surface and call-site correctness.
  • update.go: new early-return save-and-repeat path that bypasses transition validation — verify state/descriptor preservation and persistence correctness.
  • internal/graph/graph.go: new constructor and reserved-node enforcement — verify panic messages and backward compatibility for graph.New usages.
  • testing.go and tests: waitFor signature change and new testingRunStateController SaveAndRepeat — check all test call sites and error propagation.
  • callback.go: suspicious change to skip logging expression (skipUpdate(next).) — review for a stray token/typo.

Possibly related PRs

  • outbox: Allow disabling of outbox #148 — introduces graph.New(reservedNodes...) and reserved-node enforcement; highly related to the graph constructor and reserved-node behaviour added here.

Suggested reviewers

  • echarrod

Poem

"I nibble keys and hop through code,
I save, I repeat, I lighten the load,
Reserved nodes fenced with care,
Loops converge with steady flair,
A rabbit's cheer for changes bold 🐇"

Pre-merge checks and finishing touches

❌ Failed checks (1 warning)
Check name Status Explanation Resolution
Docstring Coverage ⚠️ Warning Docstring coverage is 5.41% which is insufficient. The required threshold is 80.00%. You can run @coderabbitai generate docstrings to improve docstring coverage.
✅ Passed checks (2 passed)
Check name Status Explanation
Title check ✅ Passed The title clearly and concisely summarises the main feature addition—enabling SaveAndRepeat functionality for standard workflow steps—which aligns with the primary changeset objective.
Description check ✅ Passed The description comprehensively explains the SaveAndRepeat feature, its benefits, and the context deadline bug fix, all of which are reflected in the changeset across multiple files.
✨ Finishing touches
  • 📝 Generate docstrings
🧪 Generate unit tests (beta)
  • Create PR with unit tests
  • Post copyable unit tests in a comment
  • Commit unit tests in branch andreww-allowSaveAndContinue

Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.

❤️ Share

Comment @coderabbitai help to get the list of available commands and usage tips.

@coderabbitai coderabbitai bot requested review from ScaleneZA and echarrod November 12, 2025 17:17
Copy link

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 0

🧹 Nitpick comments (1)
record.go (1)

82-105: Initialise Meta.UpdateType when persisting

We add Meta.UpdateType to describe what kind of write just happened, but nothing in the persistence path sets it. As written the field will always stay at the zero value (or worse, carry a stale value forward after a state-only update), so consumers cannot rely on it. Please set it explicitly when we store records—default to UpdateTypeDefault for normal writes and override to UpdateTypeStateOnly on run-state-only transitions—so downstream readers get the intended signal.

Apply this diff to set the default during normal updates:

 func updateRecord(
 	ctx context.Context,
 	store storeFunc,
 	record *Record,
 	previousRunState RunState,
 	statusDescription string,
 ) error {
 	// Push run state changes for observability
 	metrics.RunStateChanges.WithLabelValues(record.WorkflowName, previousRunState.String(), record.RunState.String()).
 		Inc()

+	record.Meta.UpdateType = UpdateTypeDefault
 	record.Meta.StatusDescription = statusDescription
 	// Increment the version by 1.
 	record.Meta.Version++

And mark run-state-only changes before we store them:

 func (rsc *runStateControllerImpl) update(ctx context.Context, rs RunState, reason string) error {
 	valid, ok := runStateTransitions[rsc.record.RunState]
 	if !ok || !valid[rs] {
 		return fmt.Errorf("invalid RunState: from %s | to %s", rsc.record.RunState, rs)
 	}

 	previousRunState := rsc.record.RunState
 	rsc.record.RunState = rs
 	rsc.record.Meta.RunStateReason = reason
+	rsc.record.Meta.UpdateType = UpdateTypeStateOnly
 	return updateRecord(ctx, rsc.store, rsc.record, previousRunState, rsc.record.Meta.StatusDescription)
 }
📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 12ec56b and 2c540c9.

📒 Files selected for processing (8)
  • record.go (2 hunks)
  • run.go (1 hunks)
  • runstate.go (1 hunks)
  • status.go (2 hunks)
  • testing.go (7 hunks)
  • update.go (1 hunks)
  • workflow.go (2 hunks)
  • workflow_test.go (2 hunks)
🧰 Additional context used
🧬 Code graph analysis (3)
run.go (1)
status.go (1)
  • SkipTypeSaveAndRepeat (32-32)
workflow_test.go (7)
builder.go (4)
  • NewBuilder (25-43)
  • WithClock (307-311)
  • WithDefaultOptions (329-338)
  • WithDebugMode (315-319)
adapters/memrecordstore/memrecordstore.go (2)
  • New (17-64)
  • WithClock (86-90)
adapters/memrolescheduler/memrolescheduler.go (1)
  • New (41-45)
adapters/memstreamer/memstreamer.go (2)
  • New (12-33)
  • WithClock (41-45)
options.go (1)
  • PollingFrequency (40-44)
outbox.go (2)
  • WithOutboxOptions (65-74)
  • OutboxPollingFrequency (78-82)
testing.go (1)
  • Require (90-135)
testing.go (4)
record.go (1)
  • Record (10-48)
unmarshal.go (1)
  • Unmarshal (6-8)
workflow.go (1)
  • Workflow (60-103)
store.go (1)
  • TestingRecordStore (33-37)

Copy link

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 1

📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 2c540c9 and fcbc80f.

📒 Files selected for processing (7)
  • record.go (2 hunks)
  • record_test.go (1 hunks)
  • status_test.go (1 hunks)
  • testing.go (7 hunks)
  • testing_test.go (1 hunks)
  • update_test.go (1 hunks)
  • workflow_test.go (2 hunks)
✅ Files skipped from review due to trivial changes (1)
  • testing_test.go
🧰 Additional context used
🧬 Code graph analysis (4)
testing.go (5)
record.go (1)
  • Record (10-48)
unmarshal.go (1)
  • Unmarshal (6-8)
adapters/memrecordstore/memrecordstore.go (1)
  • Store (94-109)
workflow.go (1)
  • Workflow (60-103)
store.go (1)
  • TestingRecordStore (33-37)
status_test.go (1)
status.go (4)
  • SkipType (27-27)
  • SkipTypeDefault (30-30)
  • SkipTypeRunStateUpdate (31-31)
  • SkipTypeSaveAndRepeat (32-32)
workflow_test.go (5)
builder.go (4)
  • NewBuilder (25-43)
  • WithClock (307-311)
  • WithDefaultOptions (329-338)
  • WithDebugMode (315-319)
adapters/memrecordstore/memrecordstore.go (2)
  • New (17-64)
  • WithClock (86-90)
adapters/memstreamer/memstreamer.go (2)
  • New (12-33)
  • WithClock (41-45)
outbox.go (2)
  • WithOutboxOptions (65-74)
  • OutboxPollingFrequency (78-82)
testing.go (1)
  • Require (90-135)
record_test.go (1)
record.go (3)
  • UpdateType (87-87)
  • UpdateTypeDefault (92-92)
  • UpdateTypeStateOnly (94-94)

Copy link

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 1

🧹 Nitpick comments (1)
status_internal_test.go (1)

9-13: Consider expanding test coverage for skip helper functions.

Whilst the test validates the numeric mappings correctly, it doesn't test the helper functions introduced in status_internal.go (skipUpdate, isSaveAndRepeat, skipUpdateDescription). Consider adding test cases to verify:

  • skipUpdate returns true for all skip types and false for non-skip values
  • isSaveAndRepeat correctly identifies the SaveAndRepeat condition
  • skipUpdateDescription returns appropriate descriptions for each skip type

Example test structure:

func TestSkipHelpers(t *testing.T) {
	// Test skipUpdate
	require.True(t, skipUpdate(testStatus(skipTypeDefault)))
	require.True(t, skipUpdate(testStatus(skipTypeRunStateUpdate)))
	require.True(t, skipUpdate(testStatus(skipTypeSaveAndRepeat)))
	require.False(t, skipUpdate(testStatus(statusStart)))
	
	// Test isSaveAndRepeat
	require.True(t, isSaveAndRepeat(testStatus(skipTypeSaveAndRepeat)))
	require.False(t, isSaveAndRepeat(testStatus(skipTypeDefault)))
	
	// Test skipUpdateDescription
	desc := skipUpdateDescription(testStatus(skipTypeDefault))
	require.Contains(t, desc, "skip")
}
📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 411404f and 933e05f.

📒 Files selected for processing (7)
  • callback_internal_test.go (1 hunks)
  • run.go (2 hunks)
  • run_internal_test.go (1 hunks)
  • status_internal.go (1 hunks)
  • status_internal_test.go (1 hunks)
  • step_internal_test.go (1 hunks)
  • timeout_internal_test.go (1 hunks)
🚧 Files skipped from review as they are similar to previous changes (1)
  • run.go
🧰 Additional context used
🧬 Code graph analysis (1)
run_internal_test.go (1)
testing.go (1)
  • NewTestingRun (213-236)
🔇 Additional comments (7)
timeout_internal_test.go (1)

132-132: LGTM – Updated to use internal skip constant.

The change correctly updates the test to use the new camelCase internal constant skipTypeDefault, aligning with the broader refactoring of skip type handling in this PR.

step_internal_test.go (1)

164-164: LGTM – Consistent with internal constant refactoring.

The change correctly updates to the new camelCase internal constant, consistent with the pattern applied across test files.

callback_internal_test.go (1)

139-139: LGTM – Aligned with new skip constant naming.

The change is consistent with the refactoring to use internal camelCase skip constants.

run_internal_test.go (1)

1-29: LGTM – Refactored to use internal types and white-box testing.

The changes correctly update the test to:

  • Use the internal workflow package scope (white-box testing) to access unexported types
  • Reference local NewTestingRun and testStatus types directly
  • Use internal skipTypeRunStateUpdate constant

This approach is appropriate for internal testing of the workflow package.

status_internal.go (3)

3-7: LGTM – Well-designed generic interface for status types.

The StatusType interface provides appropriate type constraints for integer-based status enums whilst requiring a String() method for descriptive output. This enables type-safe generic functions whilst maintaining flexibility across different integer sizes.


27-33: LGTM – Clear skip type constants.

The skipType constants are well-defined with distinct negative values to avoid conflicts with normal status values. The use of negative integers is a sensible design choice for sentinel values.


9-12: The review comment is incorrect; no changes needed.

The design intentionally separates concerns: skipUpdate checks for statuses that should skip the entire update cycle (held in skipConfig), whilst SaveAndRepeat is handled separately in update.go where it skips transition validation but still updates the record. SaveAndRepeat is not included in skipConfig because it must proceed through the update logic to receive special handling. When any handler returns SaveAndRepeat, skipUpdate(next) correctly returns false, execution continues to update.go, and isSaveAndRepeat(next) correctly identifies and processes it.

Copy link

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 1

📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 7521913 and a4271b7.

📒 Files selected for processing (2)
  • callback.go (1 hunks)
  • status.go (1 hunks)
🧰 Additional context used
🪛 GitHub Actions: Run
callback.go

[error] 84-84: syntax error: unexpected comma, expected name or (.

🪛 GitHub Check: core (1.24)
callback.go

[failure] 84-84:
syntax error: unexpected comma, expected name or (


[failure] 84-84:
syntax error: unexpected comma, expected name or (

🪛 GitHub Check: core (1)
callback.go

[failure] 84-84:
syntax error: unexpected comma, expected name or (


[failure] 84-84:
syntax error: unexpected comma, expected name or (

🔇 Additional comments (7)
status.go (7)

9-11: LGTM!

The refactored logic correctly identifies traditional skip cases (default and run state update) whilst explicitly excluding SaveAndRepeat, which is appropriate since SaveAndRepeat is a distinct operation that still requires record persistence.


13-15: LGTM!

Clean helper function that provides clear semantic meaning for identifying SaveAndRepeat operations.


17-24: LGTM!

The function correctly uses the unexported skipType and provides appropriate fallback messaging for unknown skip types.


26-26: LGTM!

Making skipType unexported is good encapsulation that prevents external code from creating arbitrary skip type values.


28-33: LGTM!

The skip type constants are well-defined with distinct values. Using negative values avoids collision with actual status values, and the sentinel pattern at -3 provides a clear boundary marker.


35-42: LGTM!

The iteration correctly enumerates all actual skip type values (0, -1, -2) whilst excluding the sentinel boundary marker.


44-49: LGTM!

The skip configuration map now includes all three skip types with clear, descriptive entries. The SaveAndRepeat entry addresses the previous review comment and provides appropriate documentation for the new operation.

@sonarqubecloud
Copy link

@andrewwormald andrewwormald merged commit 97e3cbb into main Nov 13, 2025
7 checks passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants