feat: add json bulk stream #701
Conversation
Walkthrough

This pull request introduces a new JSON stream bulk handler for processing bulk JSON streaming requests, along with its factory, termination, and channel management methods. It also refactors the existing script stream bulk handler into a text stream bulk handler, updating method names and constructors accordingly. Router configurations are modified to use a default bulk handler factory helper, and additional tests, including an end-to-end ledger bulk creation test using JSON streams, have been added to verify the new functionality.
Sequence Diagram(s)

```mermaid
sequenceDiagram
    participant Client
    participant Router
    participant Factory
    participant JSONHandler
    participant Goroutine1 as JSON Decoder
    participant Goroutine2 as Result Collector
    Client->>Router: Submit bulk JSON stream request
    Router->>Factory: Retrieve BulkHandlerFactory
    Factory-->>Router: Return JSONStreamBulkHandler
    Router->>JSONHandler: Invoke GetChannels
    JSONHandler->>Goroutine1: Spawn JSON decoding routine
    JSONHandler->>Goroutine2: Spawn result collection routine
    Client->>Router: Send termination signal
    Router->>JSONHandler: Invoke Terminate
    JSONHandler-->>Router: Return aggregated results
    Router-->>Client: Respond with JSON results
```
Actionable comments posted: 2
🔭 Outside diff range comments (1)
internal/api/bulking/handler_stream_text.go (1)
33-33: 💡 Verification agent 🧩 Analysis chain

Verify the ParseTextStream function implementation.

The ParseTextStream function is called but not visible in the provided code. Please ensure it's properly implemented and handles all edge cases.
🏁 Script executed:

```shell
#!/bin/bash
# Search for ParseTextStream implementation
ast-grep --pattern 'func ParseTextStream($$$) $$$'
```

Length of output: 8575
ParseTextStream Implementation Verified – Minor Header Format Check Suggested

The `ParseTextStream` function is implemented in internal/api/bulking/text_stream.go and appears to handle header processing, body reading, and scanner errors appropriately. However, please consider adding an explicit check that each header part (split on `"="`) produces both a key and a value before accessing `parts2[1]`, to avoid potential index-out-of-range panics when malformed headers are encountered.

- Location: internal/api/bulking/text_stream.go (lines 12–83)
- Suggestion: Validate the result of `strings.Split(part, "=")` to ensure it returns at least two elements before processing the header key.
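The suggested validation can be sketched in isolation as follows; `parseHeader` and its signature are illustrative helpers, not code from text_stream.go:

```go
package main

import (
	"fmt"
	"strings"
)

// parseHeader splits a single "key=value" header part, returning an error
// instead of panicking when the "=" separator is missing.
func parseHeader(part string) (string, string, error) {
	parts := strings.SplitN(part, "=", 2)
	if len(parts) < 2 {
		return "", "", fmt.Errorf("malformed header part: %q", part)
	}
	return parts[0], parts[1], nil
}

func main() {
	key, value, err := parseHeader("ik=some-key")
	fmt.Println(key, value, err) // ik some-key <nil>

	_, _, err = parseHeader("no-separator")
	fmt.Println(err) // malformed header part: "no-separator"
}
```

Using `strings.SplitN` with a limit of 2 also keeps values containing `=` intact, which plain `strings.Split` would fragment.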
🧹 Nitpick comments (5)
internal/api/bulking/handler_stream_json.go (1)
26-26: Consider configuring the JSON decoder.

The default decoder configuration might not be optimal for streaming use cases. Consider setting appropriate limits to prevent potential DoS attacks. Note that `encoding/json.Decoder` has no token-size setter; bounding the underlying reader achieves the same effect:

```diff
-dec := json.NewDecoder(r.Body)
+dec := json.NewDecoder(io.LimitReader(r.Body, 1024*1024)) // cap input at 1MB
```

internal/api/bulking/handler_stream_text_test.go (1)
72-73: Consider making the timeout duration configurable.

The hardcoded 100ms timeout might be too aggressive for slower systems or debug scenarios.

```diff
+const testTimeout = 100 * time.Millisecond
+
 select {
 case <-send:
-case <-time.After(100 * time.Millisecond):
+case <-time.After(testTimeout):
 	t.Fatal("should have received send channel")
 }
```

Also applies to: 81-82
internal/api/bulking/handler_stream_json_test.go (1)
83-85: Enhance response validation.

The test should validate the actual content of the response, not just its length.

```diff
 response, ok := api.DecodeSingleResponse[[]APIResult](t, w.Result().Body)
 require.True(t, ok)
 require.Len(t, response, testCase.expectScriptCount)
+for _, result := range response {
+	require.Equal(t, "CREATE_TRANSACTION", result.Action)
+	require.NotEmpty(t, result.Data)
+}
```

internal/api/v2/routes.go (1)
140-140: Consider extracting the magic number.

The bulk max size of 100 is hardcoded. Consider using the DefaultBulkMaxSize constant from router.go for consistency.

```diff
-	WithDefaultBulkHandlerFactories(100),
+	WithDefaultBulkHandlerFactories(DefaultBulkMaxSize),
```

test/e2e/api_bulk_test.go (1)
199-259: LGTM! Good test coverage for the new JSON stream functionality.

The test properly verifies the successful creation of transactions using the JSON stream bulk handler.
Consider adding test cases for:
- Error handling when JSON is malformed
- Behavior with empty stream
- Handling of the bulk max size limit
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (7)
- internal/api/bulking/handler_stream_json.go (1 hunks)
- internal/api/bulking/handler_stream_json_test.go (1 hunks)
- internal/api/bulking/handler_stream_text.go (3 hunks)
- internal/api/bulking/handler_stream_text_test.go (1 hunks)
- internal/api/router.go (1 hunks)
- internal/api/v2/routes.go (1 hunks)
- test/e2e/api_bulk_test.go (2 hunks)
⏰ Context from checks skipped due to timeout of 90000ms (1)
- GitHub Check: Tests
🔇 Additional comments (6)
internal/api/bulking/handler_stream_text.go (2)
8-15: LGTM! The struct definition is well-organized.

The TextStreamBulkHandler struct has a clear separation of concerns with fields for channel management, results tracking, and error handling.
23-48: LGTM! The goroutines are well-structured.

The implementation correctly:
- Handles context cancellation
- Uses proper channel closing with defer
- Manages error propagation
Also applies to: 49-63
internal/api/bulking/handler_stream_json.go (1)
8-15: LGTM! The struct and factory implementations are clean.

The JSONStreamBulkHandler and its factory follow the same pattern as TextStreamBulkHandler, maintaining consistency in the codebase.
Also applies to: 76-77
internal/api/bulking/handler_stream_text_test.go (1)
55-55: LGTM! The handler instantiation is correctly updated.

The change from NewScriptStreamBulkHandler to NewTextStreamBulkHandler is consistent with the refactoring.
internal/api/router.go (1)
82-82: LGTM! Good refactoring.

The change simplifies the bulk handler factory configuration by using a default implementation, making the code more maintainable and reducing duplication.
internal/api/v2/routes.go (1)
129-135: LGTM! Well-structured bulk handler factories configuration.

The implementation:
- Adds support for JSON stream content type
- Maintains backward compatibility
- Follows good practices by encapsulating factory logic
```go
nextElement := &BulkElement{}
err := dec.Decode(nextElement)
if err != nil {
	h.err = err
	return
}
```
🛠️ Refactor suggestion
Enhance JSON error handling.
The current implementation doesn't distinguish between EOF and other JSON decoding errors. Consider adding specific handling for io.EOF to gracefully terminate the stream.
```diff
 nextElement := &BulkElement{}
 err := dec.Decode(nextElement)
 if err != nil {
+	if err == io.EOF {
+		return
+	}
 	h.err = err
 	return
 }
```
```go
for _, testCase := range []testCase{
	{
		name: "nominal",
		stream: `
{"action": "CREATE_TRANSACTION", "data": {"postings": [{"source": "world", "amount": 100, "asset": "USD", "destination": "bank"}]}}
{"action": "CREATE_TRANSACTION", "data": {"postings": [{"source": "world", "amount": 200, "asset": "USD", "destination": "bank"}]}}
`,
		expectScriptCount: 2,
	},
} {
```
🛠️ Refactor suggestion
Add more test cases for error scenarios.
The test suite would benefit from additional test cases covering:
- Invalid JSON syntax
- Missing required fields
- Malformed actions
- Empty stream
```diff
 for _, testCase := range []testCase{
 	{
 		name: "nominal",
 		stream: `
 {"action": "CREATE_TRANSACTION", "data": {"postings": [{"source": "world", "amount": 100, "asset": "USD", "destination": "bank"}]}}
 {"action": "CREATE_TRANSACTION", "data": {"postings": [{"source": "world", "amount": 200, "asset": "USD", "destination": "bank"}]}}
 `,
 		expectScriptCount: 2,
 	},
+	{
+		name:               "invalid_json",
+		stream:             `{"invalid json`,
+		expectedError:      true,
+		expectedStatusCode: http.StatusBadRequest,
+	},
+	{
+		name:               "missing_action",
+		stream:             `{"data": {}}`,
+		expectedError:      true,
+		expectedStatusCode: http.StatusBadRequest,
+	},
 }
```
Force-pushed from 8f320b3 to 51cd04a.
Codecov Report

Attention: Patch coverage is

Additional details and impacted files:

```
@@           Coverage Diff           @@
##             main     #701   +/-   ##
=======================================
+ Coverage   81.55%   81.67%   +0.11%
=======================================
  Files         131      132       +1
  Lines        7086     7137      +51
=======================================
+ Hits         5779     5829      +50
- Misses       1004     1006       +2
+ Partials      303      302       -1
```

☔ View full report in Codecov by Sentry.
Actionable comments posted: 1
🧹 Nitpick comments (3)
internal/api/bulking/handler_stream_json.go (2)
8-15: Add a doc comment for clarity and maintainability.

Consider adding a top-level comment describing the concurrency model (channels, goroutines, etc.) that this struct implements, along with usage guidelines about reading from and closing channels. This helps future maintainers quickly grasp the intended usage.
64-70: Gracefully handle the early termination scenario.

Currently, if `r.Context()` is canceled before `<-h.terminated`, the function exits without writing a response, possibly leaving clients without a final status. Consider sending a partial or error response to notify clients that the stream ended prematurely.

internal/api/bulking/handler_stream_text_test.go (1)
87-91: Consider increasing the timeout or handling slow I/O gracefully.

Relying on a fixed 100ms limit for channel closure might cause flakiness in slower environments. Consider using a higher timeout or a more robust synchronization approach (like a WaitGroup).
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (8)
- internal/api/bulking/bulker_test.go (2 hunks)
- internal/api/bulking/handler_stream_json.go (1 hunks)
- internal/api/bulking/handler_stream_json_test.go (1 hunks)
- internal/api/bulking/handler_stream_text.go (3 hunks)
- internal/api/bulking/handler_stream_text_test.go (2 hunks)
- internal/api/router.go (1 hunks)
- internal/api/v2/routes.go (1 hunks)
- test/e2e/api_bulk_test.go (2 hunks)
🚧 Files skipped from review as they are similar to previous changes (5)
- internal/api/router.go
- test/e2e/api_bulk_test.go
- internal/api/v2/routes.go
- internal/api/bulking/handler_stream_json_test.go
- internal/api/bulking/handler_stream_text.go
⏰ Context from checks skipped due to timeout of 90000ms (1)
- GitHub Check: Tests
🔇 Additional comments (3)
internal/api/bulking/handler_stream_json.go (1)
33-38: Distinguish between EOF and other JSON decoding errors.

This mirrors a previously raised suggestion to differentiate between `io.EOF` (a normal stream shutdown) and other errors, e.g. partial decoding failures, to avoid marking an orderly end of stream as an error. For instance:

```diff
 if err != nil {
+	if errors.Is(err, io.EOF) {
+		return
+	}
 	h.err = err
 	return
 }
```

internal/api/bulking/handler_stream_text_test.go (1)
55-55: Constructor usage looks good.

The handler instantiation aligns with the updated naming convention.
internal/api/bulking/bulker_test.go (1)
108-110: Pointer usage for log ID is valid.

Using `pointer.For(1)` is a clear, explicit way to initialize the ID pointer. This ensures the test check is unambiguous about the expected ID value.
```go
go func() {
	defer close(h.channel)

	dec := json.NewDecoder(r.Body)

	for {
		select {
		case <-r.Context().Done():
			return
		default:
			nextElement := &BulkElement{}
			err := dec.Decode(nextElement)
			if err != nil {
				h.err = err
				return
			}

			h.actions = append(h.actions, nextElement.GetAction())
			h.channel <- *nextElement
		}
	}
}()
```
Address potential data race during concurrent writes to shared fields.
The fields `actions`, `results`, and `err` are appended or updated in separate goroutines. Meanwhile, `Terminate()` reads them after `<-h.terminated`. Although one goroutine closes `h.terminated`, there's no guarantee the decoding goroutine is also finished by that time. This can lead to a data race.

A common fix is to use synchronization (e.g., a `sync.WaitGroup`) to ensure all goroutines are done before reading shared state in `Terminate()`. For example:
```diff
 func (h *JSONStreamBulkHandler) GetChannels(...) {
+	var wg sync.WaitGroup
+	wg.Add(2) // for the two goroutines
 	go func() {
+		defer wg.Done()
 		// decode loop ...
 	}()
 	go func() {
+		defer wg.Done()
 		// results loop ...
 	}()
 	// later: store wg somewhere in h for use in Terminate()
 }

 func (h *JSONStreamBulkHandler) Terminate(...) {
+	// wait for goroutines to finish before writing the response
+	h.wg.Wait()
 	writeJSONResponse(...)
 }
```
Also applies to: 46-59
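The pattern described above can be demonstrated in isolation; the `handler` type and its fields here are illustrative stand-ins for the real stream bulk handler:

```go
package main

import (
	"fmt"
	"sync"
)

// handler mimics the shape of the stream bulk handler: two goroutines write
// to shared fields, and terminate must not read them until both are done.
type handler struct {
	wg      sync.WaitGroup
	actions []string
	results []int
}

func (h *handler) start(input []string) {
	processed := make(chan int)
	h.wg.Add(2)
	go func() { // decoder goroutine: records actions, feeds the pipeline
		defer h.wg.Done()
		defer close(processed)
		for i, action := range input {
			h.actions = append(h.actions, action)
			processed <- i
		}
	}()
	go func() { // collector goroutine: accumulates results
		defer h.wg.Done()
		for r := range processed {
			h.results = append(h.results, r)
		}
	}()
}

func (h *handler) terminate() ([]string, []int) {
	h.wg.Wait() // both goroutines have finished; reads below are race-free
	return h.actions, h.results
}

func main() {
	h := &handler{}
	h.start([]string{"CREATE_TRANSACTION", "ADD_METADATA"})
	actions, results := h.terminate()
	fmt.Println(actions, results) // [CREATE_TRANSACTION ADD_METADATA] [0 1]
}
```

`WaitGroup.Wait` establishes a happens-before edge with each `Done`, so the reads in `terminate` are guaranteed to observe every write the goroutines made; closing a channel alone only synchronizes with the goroutine that closed it.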
Force-pushed from 51cd04a to c991ecb.
Fixes LX-1