Skip to content

Conversation

@andrewwormald
Copy link
Collaborator

Adding a sqlite adapter for single host with disk access usage. Essentially like the in-mem adapters but has persistence to disk.

@coderabbitai
Copy link

coderabbitai bot commented Nov 19, 2025

Walkthrough

A complete SQLite adapter for a workflow library is introduced, comprising eleven new files. The adapter implements three core interfaces: RecordStore, TimeoutStore, and EventStreamer. Key components include database connection handling with optimised PRAGMA settings, a five-table SQLite schema (workflow_records, workflow_outbox, workflow_timeouts, workflow_events, workflow_cursors), full CRUD operations with transactional integrity, cursor-based event streaming with retry logic, and accompanying test coverage. Configuration via go.mod and documentation in README.md are also provided.

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~45 minutes

  • store.go: Dynamic SQL query building via whereBuilder with transaction management, filter composition, and data marshalling (JSON meta field); requires careful examination of SQL construction logic and transaction rollback paths.
  • streamer.go: Intricate retry logic with exponential-ish backoff (up to 5 retries), cursor position tracking across multiple tables, JSON header serialisation/deserialisation, and polling mechanisms; potential race conditions in concurrent cursor updates.
  • Concurrency considerations: EventReceiver and EventSender cursor management under concurrent access; StreamFromLatest flag handling and event position tracking.
  • Error propagation: Contextual error wrapping throughout; verify all DB operations include appropriate error checks and recovery paths.
  • Schema and query alignment: Verify that query predicates, indexing strategy, and data types align across schema.sql, store.go, streamer.go, and timeout.go.

Suggested reviewers

  • echarrod
  • ScaleneZA

Poem

🐰 A SQLite burrow, freshly dug,
Where workflow dreams find solid ground,
With cursors hopping, events logged,
Through transactions, timeouts round—
One adapter true, now safely bound! 🌾✨

Pre-merge checks and finishing touches

❌ Failed checks (1 warning)
Check name Status Explanation Resolution
Docstring Coverage ⚠️ Warning Docstring coverage is 14.29% which is insufficient. The required threshold is 80.00%. You can run @coderabbitai generate docstrings to improve docstring coverage.
✅ Passed checks (2 passed)
Check name Status Explanation
Title check ✅ Passed The title clearly and concisely describes the main change: adding a SQLite adapter to the adapters package.
Description check ✅ Passed The description is directly related to the changeset, explaining that a SQLite adapter is being added for single-host disk-persistent usage similar to in-memory adapters.
✨ Finishing touches
  • 📝 Generate docstrings
🧪 Generate unit tests (beta)
  • Create PR with unit tests
  • Post copyable unit tests in a comment
  • Commit unit tests in branch andreww-addSQLLiteAdapter

Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.

❤️ Share

Comment @coderabbitai help to get the list of available commands and usage tips.

@andrewwormald andrewwormald force-pushed the andreww-addSQLLiteAdapter branch from a7612ea to 866a2b9 Compare November 19, 2025 09:07
@sonarqubecloud
Copy link

@andrewwormald andrewwormald marked this pull request as ready for review November 20, 2025 16:21
@coderabbitai coderabbitai bot requested review from ScaleneZA and echarrod November 20, 2025 16:22
Copy link

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 1

🧹 Nitpick comments (2)
adapters/sqlite/store_test.go (1)

23-63: Consider using sqlite.Open and sqlite.InitSchema for consistency.

The connectForTesting helper duplicates logic from sqlite.Open and loads the schema from schema.sql, creating duplication with sqlite.InitSchema. Additionally, the busy_timeout is set to 10 seconds here (line 36) but 5 seconds in sqlite.Open (line 24 of sqlite.go).

Consider refactoring to use the existing functions:

 func connectForTesting(t *testing.T) *sql.DB {
 	tempDir := t.TempDir()
 	dbPath := filepath.Join(tempDir, "test.db")
 
-	db, err := sql.Open("sqlite", dbPath)
+	db, err := sqlite.Open(dbPath)
 	if err != nil {
-		t.Fatalf("failed to open database: %v", err)
+		t.Fatalf("failed to open database: %v", err)
 	}
 
-	// Configure SQLite for better concurrency
-	pragmas := []string{
-		"PRAGMA journal_mode=WAL",   // Enable Write-Ahead Logging
-		"PRAGMA synchronous=NORMAL", // Good balance of safety and performance
-		"PRAGMA busy_timeout=10000", // Wait up to 10 seconds for locks
-		"PRAGMA cache_size=10000",   // Increase cache size
-		"PRAGMA temp_store=MEMORY",  // Store temporary tables in memory
-	}
-
-	for _, pragma := range pragmas {
-		if _, err := db.Exec(pragma); err != nil {
-			t.Fatalf("failed to set pragma %s: %v", pragma, err)
-		}
-	}
-
-	// Create schema
-	schemaPath := filepath.Join(getPackageDir(t), "schema.sql")
-	schemaSQL, err := os.ReadFile(schemaPath)
-	if err != nil {
-		t.Fatalf("failed to read schema: %v", err)
-	}
-
-	if _, err := db.Exec(string(schemaSQL)); err != nil {
+	if err := sqlite.InitSchema(db); err != nil {
 		t.Fatalf("failed to create schema: %v", err)
 	}
 
 	t.Cleanup(func() {
 		db.Close()
 	})
 
 	return db
 }
-
-func getPackageDir(t *testing.T) string {
-	wd, err := os.Getwd()
-	if err != nil {
-		t.Fatalf("failed to get working directory: %v", err)
-	}
-	return wd
-}

This eliminates duplication and ensures tests use the same configuration as production code.

adapters/sqlite/schema.sql (1)

1-78: Schema duplication with sqlite.go.

The schema defined in this file is duplicated in sqlite.go lines 43-119 within the InitSchema function. This duplication creates a maintenance burden and risks inconsistency between the two definitions.

Consider one of these approaches:

  1. Embed the schema file (recommended): Use //go:embed to embed schema.sql into the binary and read it in InitSchema:
//go:embed schema.sql
var schemaSQL string

func InitSchema(db *sql.DB) error {
	if _, err := db.Exec(schemaSQL); err != nil {
		return fmt.Errorf("init schema: %w", err)
	}
	return nil
}
  1. Remove schema.sql: Keep only the inline schema in sqlite.go if the separate file isn't needed for external tooling.

The first approach maintains a single source of truth whilst keeping the schema available as a separate file for documentation or external database tools.

📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 1ca7779 and 866a2b9.

⛔ Files ignored due to path filters (1)
  • adapters/sqlite/go.sum is excluded by !**/*.sum
📒 Files selected for processing (11)
  • adapters/sqlite/README.md (1 hunks)
  • adapters/sqlite/example_test.go (1 hunks)
  • adapters/sqlite/go.mod (1 hunks)
  • adapters/sqlite/schema.sql (1 hunks)
  • adapters/sqlite/sqlite.go (1 hunks)
  • adapters/sqlite/store.go (1 hunks)
  • adapters/sqlite/store_test.go (1 hunks)
  • adapters/sqlite/streamer.go (1 hunks)
  • adapters/sqlite/streamer_test.go (1 hunks)
  • adapters/sqlite/timeout.go (1 hunks)
  • adapters/sqlite/timeout_test.go (1 hunks)
🧰 Additional context used
🧬 Code graph analysis (7)
adapters/sqlite/timeout_test.go (2)
adapters/adaptertest/timeoutstore.go (1)
  • RunTimeoutStoreTest (14-23)
adapters/sqlite/timeout.go (2)
  • TimeoutStore (12-14)
  • NewTimeoutStore (16-18)
adapters/sqlite/example_test.go (7)
adapters/sqlite/sqlite.go (2)
  • Open (11-39)
  • InitSchema (42-126)
adapters/sqlite/store.go (1)
  • NewRecordStore (20-22)
adapters/sqlite/timeout.go (1)
  • NewTimeoutStore (16-18)
adapters/sqlite/streamer.go (1)
  • NewEventStreamer (17-19)
runstate.go (2)
  • RunState (9-9)
  • RunStateInitiated (13-13)
record.go (1)
  • Meta (51-77)
eventstreamer.go (4)
  • Header (34-34)
  • HeaderTopic (39-39)
  • HeaderWorkflowName (37-37)
  • HeaderForeignID (38-38)
adapters/sqlite/store_test.go (2)
adapters/adaptertest/recordstore.go (1)
  • RunRecordStoreTest (19-32)
adapters/sqlite/store.go (2)
  • RecordStore (16-18)
  • NewRecordStore (20-22)
adapters/sqlite/timeout.go (2)
timeout.go (1)
  • TimeoutRecord (12-21)
errors.go (1)
  • ErrTimeoutNotFound (7-7)
adapters/sqlite/streamer_test.go (2)
adapters/adaptertest/eventstreaming.go (1)
  • RunEventStreamerTest (44-187)
adapters/sqlite/streamer.go (2)
  • EventStreamer (13-15)
  • NewEventStreamer (17-19)
adapters/sqlite/streamer.go (2)
eventstreamer.go (5)
  • ReceiverOption (51-51)
  • ReceiverOptions (46-49)
  • Header (34-34)
  • Ack (32-32)
  • StreamFromLatest (62-66)
event.go (1)
  • Event (13-28)
adapters/sqlite/store.go (6)
errors.go (2)
  • ErrRecordNotFound (6-6)
  • ErrOutboxRecordNotFound (9-9)
runstate.go (1)
  • RunState (9-9)
record.go (1)
  • Meta (51-77)
event.go (2)
  • MakeOutboxEventData (75-125)
  • OutboxEvent (47-60)
order.go (1)
  • OrderType (3-3)
filter.go (2)
  • RecordFilter (107-107)
  • MakeFilter (11-18)
🪛 LanguageTool
adapters/sqlite/README.md

[uncategorized] ~76-~76: Loose punctuation mark.
Context: ...les automatically: - workflow_records: Store workflow execution records - `wor...

(UNLIKELY_OPENING_PUNCTUATION)


[grammar] ~120-~120: The word “timeout” is a noun. The verb is spelled with a space.
Context: ...**: The full RunEventStreamerTest may timeout in high-concurrency scenarios due to SQ...

(NOUN_VERB_CONFUSION)


[locale-violation] ~157-~157: License must be spelled with a “c” when used as a noun in British English. Use “licence”.
Context: ... documentation for any new features ## License Same as the workflow library.

(LICENCE_LICENSE_NOUN_SINGULAR)

🔇 Additional comments (11)
adapters/sqlite/streamer_test.go (1)

12-17: LGTM!

The test correctly uses the shared test harness pattern and properly constructs the SQLite-backed EventStreamer. This approach ensures consistency across adapter implementations.

adapters/sqlite/example_test.go (3)

15-34: LGTM!

The setup and initialization sequence correctly demonstrates the adapter usage pattern: opening the database with optimised settings, initialising the schema, and constructing the adapters.


37-53: LGTM!

The RecordStore usage correctly demonstrates storing and retrieving workflow records with appropriate field values.


55-76: LGTM!

The TimeoutStore and EventStreamer examples correctly demonstrate the key operations with proper resource management.

adapters/sqlite/README.md (1)

1-159: LGTM! Documentation is comprehensive and clear.

The README effectively documents the SQLite adapter, including its features, limitations, and appropriate use cases. The warnings about SQLite's concurrency limitations are particularly important for users.

The static analysis tool flagged a few minor language issues (line 76 punctuation, line 120 "timeout" vs "time out", line 157 "license" vs "licence"), but these are optional refinements and don't affect the clarity of the documentation.

adapters/sqlite/store_test.go (1)

16-21: LGTM!

The test correctly uses the shared test harness pattern.

adapters/sqlite/timeout_test.go (1)

12-17: LGTM!

The test correctly uses the shared test harness pattern and properly constructs the SQLite-backed TimeoutStore.

adapters/sqlite/sqlite.go (3)

34-36: LGTM! Single connection pool is appropriate for SQLite.

The single connection configuration (SetMaxOpenConns(1) and SetMaxIdleConns(1)) correctly enforces SQLite's single-writer limitation and prevents connection pool contention.


17-32: LGTM! PRAGMA configuration is well-optimised.

The PRAGMA settings appropriately configure SQLite for workflow usage with WAL mode, balanced synchronisation, and reasonable timeout values. Enabling foreign keys (line 22) is good practice even though the current schema doesn't define foreign key relationships.


41-126: Schema duplication noted.

The inline schema here duplicates schema.sql. This was already flagged in the schema.sql review. The schema definition itself is correct with appropriate indexes and IF NOT EXISTS guards.

adapters/sqlite/go.mod (1)

3-3: The original review comment is incorrect.

Go 1.25.4 is the latest stable release as of November 2025. However, the original concern about the version format is based on flawed reasoning. Go fully supports semantic versioning with patch versions (e.g., 1.24.2), so this format is entirely valid. The version 1.24.2 represents a legitimate Go release.

While using Go 1.24.2 instead of the newer 1.25.4 may warrant a separate discussion about version currency or compatibility, the original review's premise that the format itself is unusual or non-standard is incorrect.

Likely an incorrect or invalid review comment.

Comment on lines +174 to +176
ack := func() error {
return r.setCursor(ctx, event.ID)
}
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

Avoid closing over a cancelling context in the ack closure.

If the caller invokes Recv with a short-lived or cancellable context, that context may expire before they call Ack. Because the closure reuses ctx, setCursor then returns ctx.Err(), so the cursor never advances and the same event is replayed indefinitely. Please decouple the acknowledgement path from the receive context—e.g. spin up a fresh, bounded context before writing the cursor.

-	ack := func() error {
-		return r.setCursor(ctx, event.ID)
-	}
+	ack := func() error {
+		ackCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
+		defer cancel()
+		return r.setCursor(ackCtx, event.ID)
+	}
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
ack := func() error {
return r.setCursor(ctx, event.ID)
}
ack := func() error {
ackCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
return r.setCursor(ackCtx, event.ID)
}
🤖 Prompt for AI Agents
In adapters/sqlite/streamer.go around lines 174 to 176, the ack closure
currently captures and uses the caller-provided ctx which may be cancelled
before Ack is invoked; change the closure to create and use a fresh context
(e.g. context.WithTimeout(context.Background(), shortTimeout) or
context.Background() with appropriate timeout) for the call to r.setCursor,
defer-cancel that context, and return any error from setCursor; this decouples
ack from the receive context and ensures the cursor write is attempted even if
the original ctx has expired.

@andrewwormald andrewwormald marked this pull request as draft November 20, 2025 16:55
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants