async execution handling #4683
Conversation
Walkthrough

The changes in this pull request involve significant modifications across various files related to bidding strategies, compute nodes, and event handling. Key updates include the simplification of response structures by removing unnecessary fields, adjustments to method signatures, and the introduction of new components for handling execution events. The removal of certain strategies and constants reflects a shift in how events and bids are processed. Additionally, enhancements in error handling and logging levels have been made, particularly in the context of NATS messaging and the compute node's functionality.
Actionable comments posted: 26
🧹 Outside diff range and nitpick comments (33)
pkg/bidstrategy/fixed.go (1)
10-14: LGTM! The simplification aligns with the async execution changes.

The removal of the `wait` parameter and `ShouldWait` field streamlines the bid strategy response structure, making it more focused and maintainable. Consider extracting the common response to reduce duplication:

```diff
 func NewFixedBidStrategy(response bool) *CallbackBidStrategy {
+	fixedResponse := BidStrategyResponse{ShouldBid: response}
 	return &CallbackBidStrategy{
 		OnShouldBid: func(_ context.Context, _ BidStrategyRequest) (BidStrategyResponse, error) {
-			return BidStrategyResponse{ShouldBid: response}, nil
+			return fixedResponse, nil
 		},
 		OnShouldBidBasedOnUsage: func(
 			context.Context, BidStrategyRequest, models.Resources) (BidStrategyResponse, error) {
-			return BidStrategyResponse{ShouldBid: response}, nil
+			return fixedResponse, nil
 		},
 	}
 }
```

Also applies to: 16-18
pkg/bidstrategy/type.go (1)
16-17: Consider documenting the removal of ShouldWait.

The removal of the `ShouldWait` field suggests a change in the bid strategy logic. Consider adding a comment explaining why waiting is no longer a valid state and how async execution is now handled.

```diff
 type BidStrategyResponse struct {
+	// ShouldBid indicates whether the node should bid on the job.
+	// Note: As of PR #4683, waiting is no longer a valid state due to
+	// the introduction of async execution handling.
 	ShouldBid bool   `json:"ShouldBid"`
 	Reason    string `json:"Reason"`
 }
```

pkg/nats/proxy/callback_handler.go (1)
Line range hint 68-82: Consider adding context timeout for async callback processing.

The `processCallback` function launches goroutines without timeout control. While this provides good async processing, it could lead to goroutine leaks if callbacks hang. Consider adding context timeout control:

```diff
 func processCallback[Request any](
 	ctx context.Context,
 	msg *nats.Msg,
 	f callbackHandler[Request]) {
 	request := new(Request)
 	err := json.Unmarshal(msg.Data, request)
 	if err != nil {
 		log.Ctx(ctx).Error().Msgf("error decoding %s: %s", reflect.TypeOf(request), err)
 		return
 	}
-	go f(ctx, *request)
+	go func() {
+		// Create timeout context for the callback
+		callbackCtx, cancel := context.WithTimeout(ctx, 30*time.Second)
+		defer cancel()
+
+		f(callbackCtx, *request)
+	}()
 }
```

pkg/bidstrategy/semantic/external_exec.go (1)
Line range hint 58-62: Avoid redundant data transmission.

The same JSON data is being passed both via environment variable and stdin. This is redundant and might cause confusion for command implementations. Choose one method of data transmission. Since stdin is more standard for passing data to commands, consider this diff:

```diff
 cmd := exec.Command("bash", "-c", s.command) //nolint:gosec
 cmd.Env = []string{
-	"BACALHAU_JOB_SELECTION_PROBE_DATA=" + string(jsonData),
 	"PATH=" + os.Getenv("PATH"),
 }
 cmd.Stdin = strings.NewReader(string(jsonData))
```
pkg/compute/logstream/server.go (2)
29-34: Add documentation for the server struct.

While the fields are well documented in ServerParams, consider adding a doc comment for the server struct itself to maintain consistency with Go's documentation practices.

```diff
+// server implements the Server interface and handles log streaming operations
 type server struct {
```

Line range hint 49-75: Enhance error messages for better debugging.

While the error handling is comprehensive, consider making the error messages more detailed to aid in debugging.

```diff
-	return nil, fmt.Errorf("can't stream logs for completed execution: %s", request.ExecutionID)
+	return nil, fmt.Errorf("can't stream logs for execution %s in terminal state: %s", request.ExecutionID, execution.ComputeState)
```

pkg/bidstrategy/semantic/external_http_test.go (1)
71-71: LGTM! Consider adding more assertions for the new type.

The change from `JobSelectionPolicyProbeData` to `BidStrategyRequest` aligns with the codebase simplification. However, we're only validating the Job ID field of the request payload. Consider adding assertions for other relevant fields in the `BidStrategyRequest` structure to ensure complete payload validation:

```diff
 // this makes sure that the http payload was given to the http endpoint
 require.Equal(t, request.Job.ID, requestPayload.Job.ID)
+// validate other relevant fields
+require.Equal(t, request.Job.Spec, requestPayload.Job.Spec)
+require.Equal(t, request.Resources, requestPayload.Resources)
```

pkg/test/compute/ask_for_bid_test.go (2)
98-103: Consider using %v for enum string formatting.

The state verification logic is good, but when formatting enum values in error messages, `%v` is generally preferred over `%s` in Go: both use a custom String() method when one exists, but `%v` still produces useful output for values that don't implement it.

```diff
-	s.Equal(expectedState, execution.ComputeState.StateType,
-		"expected execution state %s but got %s", expectedState, execution.ComputeState.StateType)
+	s.Equal(expectedState, execution.ComputeState.StateType,
+		"expected execution state %v but got %v", expectedState, execution.ComputeState.StateType)
```
96-103: Consider standardizing error handling across test suites.

These improvements to error handling and state verification are valuable. Consider applying similar patterns across other test suites in the codebase for consistency, especially:

- Using `Require().NoError()` for critical setup operations
- Explicit state verification with detailed error messages
- Clear separation of expected vs actual values in assertions
pkg/bidstrategy/chained.go (1)
88-91: Consider enhancing error context in resource-based bidding.

The logic simplification looks good. However, we could improve the error context by including the resource usage details in the log message when a strategy decides not to bid.

```diff
 if !response.ShouldBid {
 	status = "should not bid"
+	log.Ctx(ctx).Debug().
+		Interface("usage", usage).
+		Msgf("resource usage contributed to no-bid decision")
 }
```

pkg/bidstrategy/semantic/external_http.go (2)
Line range hint 56-60: Fix error handling in NewRequestWithContext.

The error from `NewRequestWithContext` is logged but not properly propagated, which could mask HTTP request creation failures. Apply this fix:

```diff
 req, err := http.NewRequestWithContext(ctx, http.MethodPost, s.url, body)
-req.Header.Add("Content-Type", "application/json")
 if err != nil {
-	log.Ctx(ctx).Error().Msgf("could not create http request with context: %s", s.url)
+	return bidstrategy.BidStrategyResponse{}, fmt.Errorf("ExternalHTTPStrategy: failed to create request for %s: %w", s.url, err)
 }
+req.Header.Add("Content-Type", "application/json")
```
Line range hint 82-89: Potential race condition in content length validation.

The content length check followed by the read operation is not atomic. The response body could grow between these operations, potentially bypassing the size limit. Consider using `io.LimitReader` to enforce the size limit during read:

```diff
-buf := make([]byte, resp.ContentLength)
-read, err := resp.Body.Read(buf)
+limitedReader := io.LimitReader(resp.Body, int64(marshaller.MaxSerializedStringInput))
+buf, err := io.ReadAll(limitedReader)
 if err != nil {
-	if !errors.Is(err, io.EOF) {
-		return bidstrategy.BidStrategyResponse{}, errors.Wrap(err, "error reading http response")
-	}
-} else if int64(read) < resp.ContentLength {
-	return bidstrategy.BidStrategyResponse{}, fmt.Errorf("only read %d, expecting %d", read, resp.ContentLength)
+	return bidstrategy.BidStrategyResponse{}, fmt.Errorf("error reading response from %s: %w", s.url, err)
 }
```

pkg/compute/endpoint.go (2)
50-52: Consider adding telemetry for state transitions.

The removal of telemetry spans reduces observability. Consider adding structured logging or metrics for state transitions, especially for the WaitForApproval path.

```diff
 if !request.WaitForApproval {
+	log.Ctx(ctx).Debug().
+		Str("execution_id", request.Execution.ID).
+		Msg("Setting execution state to running - no approval required")
 	request.Execution.DesiredState.StateType = models.ExecutionDesiredStateRunning
 }
```

54-62: Enhance error handling specificity.

The error handling could be more specific about what type of store error occurred. Consider wrapping the error with additional context.

```diff
 if err := s.executionStore.CreateExecution(ctx, *request.Execution); err != nil {
-	log.Ctx(ctx).Error().Err(err).Msg("Error creating execution")
+	log.Ctx(ctx).Error().
+		Err(err).
+		Str("execution_id", request.Execution.ID).
+		Str("job_id", request.Execution.JobID).
+		Msg("Failed to create execution in store")
 	return messages.AskForBidResponse{ExecutionMetadata: messages.ExecutionMetadata{
 		ExecutionID: request.Execution.ID,
 		JobID:       request.Execution.JobID,
-	}}, err
+	}}, fmt.Errorf("failed to create execution in store: %w", err)
 }
```

pkg/compute/bidder_test.go (2)
125-128: Consider enhancing the assertion error message.

The current error message could be more descriptive to help debug test failures.

```diff
-	s.Equal(tt.expectedExecutionState, updatedExecution.ComputeState.StateType,
-		"expected execution state %s but got %s", tt.expectedExecutionState, updatedExecution.ComputeState.StateType)
+	s.Equal(tt.expectedExecutionState, updatedExecution.ComputeState.StateType,
+		"[%s] expected execution state to be %s but got %s", tt.name, tt.expectedExecutionState, updatedExecution.ComputeState.StateType)
```

133-151: Consider adding more comprehensive assertions.

While the test verifies the basic state transition, it could be enhanced to provide better coverage:

```diff
 s.Equal(models.ExecutionStateAskForBidAccepted, updatedExecution.ComputeState.StateType)
+// Verify that the pending desired state is preserved
+s.Equal(models.ExecutionDesiredStatePending, updatedExecution.DesiredState.StateType)
+// Verify that no error state is set
+s.Empty(updatedExecution.ComputeState.Error)
```

pkg/compute/watchers/callback_forwarder_test.go (1)
19-51: Add documentation to the test suite struct and setup method.

While the implementation is clean, adding documentation would improve maintainability:

```diff
+// CallbackForwarderTestSuite tests the CallbackForwarder's event handling capabilities
+// by mocking the compute callbacks and tracking their invocations.
 type CallbackForwarderTestSuite struct {
 	suite.Suite
 	forwarder     *CallbackForwarder
 	mock          compute.CallbackMock
 	bidResult     messages.BidResult
 	runResult     messages.RunResult
 	computeError  messages.ComputeError
 	bidCalled     bool
 	runCalled     bool
 	failureCalled bool
 }

+// SetupTest initializes the test suite before each test by resetting callback flags
+// and configuring mock handlers for bid completion, run completion, and compute failure.
 func (suite *CallbackForwarderTestSuite) SetupTest() {
```

pkg/test/compute/setup_test.go (1)
57-87: Consider enhancing cleanup error handling.

While the cleanup sequence is well-structured, consider these improvements:

- Reuse the context from the test suite instead of creating a new one
- Add error handling for cleanup operations that might fail

Here's a suggested improvement:

```diff
-func (s *ComputeSuite) TearDownTest() {
-	ctx := context.Background()
+func (s *ComputeSuite) TearDownTest() {
+	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
+	defer cancel()
+
+	var errs []error

 	// Clean up channels
 	if s.bidChannel != nil {
 		close(s.bidChannel)
 	}

 	// Clean up node
 	if s.node != nil {
-		s.node.Cleanup(ctx)
+		if err := s.node.Cleanup(ctx); err != nil {
+			errs = append(errs, fmt.Errorf("node cleanup: %w", err))
+		}
 	}

 	// Clean up NATS connections
 	if s.natsClient != nil {
 		s.natsClient.Close()
 	}
 	if s.natsServer != nil {
 		s.natsServer.Shutdown()
 	}

 	// Clean up system resources
 	if s.cm != nil {
-		s.cm.Cleanup(ctx)
+		if err := s.cm.Cleanup(ctx); err != nil {
+			errs = append(errs, fmt.Errorf("system cleanup: %w", err))
+		}
 	}

 	// Clean up config data directory
 	if s.config.BacalhauConfig.DataDir != "" {
-		os.RemoveAll(s.config.BacalhauConfig.DataDir)
+		if err := os.RemoveAll(s.config.BacalhauConfig.DataDir); err != nil {
+			errs = append(errs, fmt.Errorf("config cleanup: %w", err))
+		}
 	}
+
+	for _, err := range errs {
+		s.T().Logf("Cleanup error: %v", err)
+	}
 }
```
Line range hint
132-139
: Consider enhancing error observability for async resource updates.While the logging change is good, the async nature of resource updates warrants additional observability considerations:
- The error handling could benefit from more context about the failed resource update
- Consider adding metrics to track failed resource updates for better monitoring
Example enhancement:
func (m *ManagementClient) updateResources(ctx context.Context) { request := messages.UpdateResourcesRequest{ NodeID: m.nodeID, AvailableCapacity: m.availableCapacityTracker.GetAvailableCapacity(ctx), QueueUsedCapacity: m.queueUsageTracker.GetUsedCapacity(ctx), } log.Ctx(ctx).Trace().Msgf("Sending updated resources: %+v", request) _, err := m.managementProxy.UpdateResources(ctx, request) if err != nil { - log.Ctx(ctx).Warn().Err(err).Msg("failed to send resource update to requester node") + log.Ctx(ctx).Warn(). + Err(err). + Str("node_id", m.nodeID). + Interface("capacity", request.AvailableCapacity). + Interface("queue_usage", request.QueueUsedCapacity). + Msg("failed to send resource update to requester node") + // TODO: Add metric for failed resource updates + // metrics.IncCounter("resource_update_failures_total") } }pkg/lib/watcher/watcher.go (2)
62-82: Consider handling edge case for empty event store.

The implementation is well-structured with proper error handling. However, consider handling the edge case where `GetLatestEventNum` returns 0, which could be ambiguous between an empty store and a valid sequence number. Consider adding explicit handling:

```diff
 if initial.Type == EventIteratorLatest {
 	latestSeqNum, err := w.store.GetLatestEventNum(ctx)
 	if err != nil {
 		return EventIterator{}, err
 	}
+	// Handle empty store case explicitly
+	if latestSeqNum == 0 {
+		return initial, nil
+	}
 	return AfterSequenceNumberIterator(latestSeqNum), nil
 }
```

245-245: Consider adding final state information to stop log.

While the log message is clear, it could be more informative by including the final state of the watcher (last processed sequence number, etc.). Consider enhancing the log message:

```diff
-log.Ctx(ctx).Debug().Str("watcher_id", w.id).Msg("watcher stopped")
+log.Ctx(ctx).Debug().
+	Str("watcher_id", w.id).
+	Uint64("last_processed_seq", w.lastProcessedSeqNum).
+	Time("last_processed_time", w.lastProcessedEventTime).
+	Msg("watcher stopped")
```

pkg/executor/docker/handler.go (2)
Line range hint 124-133: Enhance OOM error message with memory limit details.

While the OOM error handling is good and includes a helpful documentation link, it could be more informative by including the actual memory limits that were exceeded. Consider enhancing the error message with container memory limits:

```diff
-	containerError = errors.New(`memory limit exceeded. Please refer to https://docs.bacalhau.org/getting-started/resources/#docker-executor for more information`) //nolint:lll
+	containerError = fmt.Errorf("memory limit of %d MB exceeded. Please refer to https://docs.bacalhau.org/getting-started/resources/#docker-executor for more information", containerJSON.HostConfig.Memory/1024/1024) //nolint:lll
```

204-207: Define constant for the default since value.

The magic number "1" used for the since parameter could be replaced with a named constant for better maintainability and clarity. Consider adding a constant at the package level:

```diff
+const defaultLogsSinceTimestamp = "1" // Unix timestamp representing the earliest possible log time

 func (h *executionHandler) outputStream(ctx context.Context, request messages.ExecutionLogsRequest) (io.ReadCloser, error) {
-	since := "1"
+	since := defaultLogsSinceTimestamp
```
Consider adding a constant at the package level:
+const defaultLogsSinceTimestamp = "1" // Unix timestamp representing the earliest possible log time func (h *executionHandler) outputStream(ctx context.Context, request messages.ExecutionLogsRequest) (io.ReadCloser, error) { - since := "1" + since := defaultLogsSinceTimestamppkg/executor/docker/executor_test.go (1)
Line range hint
546-550
: Consider enhancing the streaming test coverage.While the test correctly uses the new ExecutionLogsRequest type and verifies basic streaming functionality, consider enhancing it to verify multiple log lines to ensure continuous streaming works as expected.
Example enhancement:
- WithEntrypoint("sh", "-c", "echo hello && sleep 20"). + WithEntrypoint("sh", "-c", "echo hello && echo world && echo testing && sleep 20").Then verify all three log lines are received in order:
require.Equal(s.T(), string(executionLog.Line), "hello\n") require.Equal(s.T(), executionLog.Type, models.ExecutionLogTypeSTDOUT) + res, ok = <-ch + require.True(s.T(), ok) + executionLog = res.Value + require.Equal(s.T(), string(executionLog.Line), "world\n") + require.Equal(s.T(), executionLog.Type, models.ExecutionLogTypeSTDOUT) + + res, ok = <-ch + require.True(s.T(), ok) + executionLog = res.Value + require.Equal(s.T(), string(executionLog.Line), "testing\n") + require.Equal(s.T(), executionLog.Type, models.ExecutionLogTypeSTDOUT)pkg/executor/docker/executor.go (1)
Line range hint
248-292
: Consider adding request validation.The implementation assumes the new request type has compatible fields. Consider adding validation for the ExecutionID field at the start of the method to fail fast if the request is invalid.
func (e *Executor) GetLogStream(ctx context.Context, request messages.ExecutionLogsRequest) (io.ReadCloser, error) { + if request.ExecutionID == "" { + return nil, executor.NewExecutorError(executor.InvalidRequest, "execution ID is required") + } + // It's possible we've recorded the execution as running, but have not yet added the handler to // the handler map because we're still waiting for the container to start. We will try and wait // for a few seconds to see if the handler is added to the map.pkg/lib/watcher/boltdb/boltdb_test.go (2)
397-424: Good concurrent testing implementation, consider additional test cases.

The test effectively verifies concurrent event delivery. Consider enhancing it with:

- Subscribers starting at different iterator positions
- Multiple event deliveries to verify ordering consistency
- Different event types and operations

Here's a suggested enhancement:
func (s *BoltDBEventStoreTestSuite) TestConcurrentSubscribers() { - // Start 3 concurrent subscribers at different positions + // Start 3 concurrent subscribers at different positions respCh1, errCh1 := s.getEventsAsync(watcher.AfterSequenceNumberIterator(0), 1, watcher.EventFilter{}) - respCh2, errCh2 := s.getEventsAsync(watcher.AfterSequenceNumberIterator(0), 1, watcher.EventFilter{}) - respCh3, errCh3 := s.getEventsAsync(watcher.AfterSequenceNumberIterator(0), 1, watcher.EventFilter{}) + respCh2, errCh2 := s.getEventsAsync(watcher.LatestIterator(), 1, watcher.EventFilter{}) + respCh3, errCh3 := s.getEventsAsync(watcher.TrimHorizonIterator(), 1, watcher.EventFilter{}) // Verify all are waiting s.assertChannelsEmpty(respCh1, errCh1) s.assertChannelsEmpty(respCh2, errCh2) s.assertChannelsEmpty(respCh3, errCh3) - // Store an event - s.Require().NoError(s.store.StoreEvent(s.ctx, watcher.StoreEventRequest{ - Operation: watcher.OperationCreate, - ObjectType: "TestObject", - Object: watchertest.TestObject{Value: 1}, - })) + // Store multiple events with different operations + events := []struct { + op watcher.Operation + value int + }{ + {watcher.OperationCreate, 1}, + {watcher.OperationUpdate, 2}, + {watcher.OperationDelete, 3}, + } + + for _, e := range events { + s.Require().NoError(s.store.StoreEvent(s.ctx, watcher.StoreEventRequest{ + Operation: e.op, + ObjectType: "TestObject", + Object: watchertest.TestObject{Value: e.value}, + })) + } // All subscribers should get the event resp1 := s.assertResponseReceived(respCh1, errCh1) resp2 := s.assertResponseReceived(respCh2, errCh2) resp3 := s.assertResponseReceived(respCh3, errCh3) - // Verify all got the same event - s.assertEventsResponse(resp1, 1, watcher.AfterSequenceNumberIterator(1)) - s.assertEventsResponse(resp2, 1, watcher.AfterSequenceNumberIterator(1)) - s.assertEventsResponse(resp3, 1, watcher.AfterSequenceNumberIterator(1)) + // Verify events and ordering + s.assertEventsResponse(resp1, 3, watcher.AfterSequenceNumberIterator(3)) + s.assertEventsResponse(resp2, 3, watcher.AfterSequenceNumberIterator(3)) + s.assertEventsResponse(resp3, 3, watcher.AfterSequenceNumberIterator(3)) + + // Verify event ordering consistency across subscribers + for i := 0; i < 3; i++ { + s.Equal(resp1.Events[i], resp2.Events[i]) + s.Equal(resp2.Events[i], resp3.Events[i]) + } }
426-459: Good long polling test implementation, consider timeout edge cases.

The test effectively verifies long polling with multiple subscribers. Consider enhancing it with:

- Timeout verification (ensure subscribers are notified when timeout occurs)
- Cleanup verification (ensure subscribers are removed after timeout/completion)
- Different filter combinations

Here's a suggested enhancement:
func (s *BoltDBEventStoreTestSuite) TestLongPollingWithMultipleSubscribers() { longPollingTimeout := 100 * time.Millisecond s.store.options.longPollingTimeout = longPollingTimeout // Start multiple subscribers subscriberCount := 5 respChs := make([]<-chan *watcher.GetEventsResponse, subscriberCount) errChs := make([]<-chan error, subscriberCount) + // Test different filter combinations + filters := []watcher.EventFilter{ + {}, // no filter + {ObjectTypes: []string{"TestObject"}}, + {Operations: []watcher.Operation{watcher.OperationCreate}}, + {ObjectTypes: []string{"TestObject"}, Operations: []watcher.Operation{watcher.OperationCreate}}, + {ObjectTypes: []string{"OtherObject"}}, // should not receive events + } for i := 0; i < subscriberCount; i++ { - respCh, errCh := s.getEventsAsync(watcher.AfterSequenceNumberIterator(0), 1, watcher.EventFilter{}) + respCh, errCh := s.getEventsAsync(watcher.AfterSequenceNumberIterator(0), 1, filters[i]) respChs[i] = respCh errChs[i] = errCh } // Verify all subscribers are waiting (no responses yet) for i := 0; i < subscriberCount; i++ { s.assertChannelsEmpty(respChs[i], errChs[i]) } + // Verify timeout behavior + s.clock.Add(longPollingTimeout + time.Millisecond) + + // First 4 subscribers should timeout with no events + for i := 0; i < subscriberCount-1; i++ { + resp := s.assertResponseReceived(respChs[i], errChs[i]) + s.assertEventsResponse(resp, 0, watcher.AfterSequenceNumberIterator(0)) + } // Store an event s.Require().NoError(s.store.StoreEvent(s.ctx, watcher.StoreEventRequest{ Operation: watcher.OperationCreate, ObjectType: "TestObject", Object: watchertest.TestObject{Value: 1}, })) - // All subscribers should receive the event + // Start new subscribers and verify they receive the event based on their filters + for i := 0; i < subscriberCount; i++ { + respCh, errCh := s.getEventsAsync(watcher.AfterSequenceNumberIterator(0), 1, filters[i]) + resp := s.assertResponseReceived(respCh, errCh) + + if i < 4 { // First 4 filters should match + s.assertEventsResponse(resp, 1, watcher.AfterSequenceNumberIterator(1)) + s.assertEventEquals(resp.Events[0], 1, watcher.OperationCreate, "TestObject", &watchertest.TestObject{Value: 1}) + } else { // Last filter (OtherObject) should not match + s.assertEventsResponse(resp, 0, watcher.AfterSequenceNumberIterator(0)) + } + } + + // Verify subscriber cleanup + s.Equal(0, s.store.GetSubscriberCount()) // Add this helper method to EventStore }pkg/lib/watcher/watcher_test.go (1)
71-188: Consider enhancing test readability with documentation and grouping.

The test cases are comprehensive but could benefit from additional documentation and organization:

- Add comments explaining the purpose and expected behavior of each test case group
- Group related test cases using descriptive comments (e.g., "Basic Iterator Tests", "Error Handling Tests", "Edge Cases")

Apply this diff to improve readability:
func (s *WatcherTestSuite) TestDetermineStartingIterator() { ctx := context.Background() testCases := []struct { name string setupCheckpoint *uint64 // pointer to handle nil case initialIter watcher.EventIterator setupLatestEvent *uint64 // what should be the latest event in store before test expectedIter watcher.EventIterator expectedError bool checkpointErr error latestErr error }{ + // Basic Iterator Tests + // Tests the fundamental behavior of different iterator types { name: "No checkpoint, non-latest iterator", initialIter: watcher.AfterSequenceNumberIterator(5), expectedIter: watcher.AfterSequenceNumberIterator(5), }, { name: "With checkpoint, non-latest iterator", setupCheckpoint: ptr(uint64(10)), initialIter: watcher.AfterSequenceNumberIterator(5), expectedIter: watcher.AfterSequenceNumberIterator(10), }, + // Latest Iterator Tests + // Verifies behavior when using LatestIterator with different configurations { name: "No checkpoint, latest iterator", initialIter: watcher.LatestIterator(), setupLatestEvent: ptr(uint64(15)), // Store event up to seq 15 expectedIter: watcher.AfterSequenceNumberIterator(15), }, { name: "With checkpoint, latest iterator", setupCheckpoint: ptr(uint64(10)), initialIter: watcher.LatestIterator(), setupLatestEvent: ptr(uint64(15)), expectedIter: watcher.AfterSequenceNumberIterator(10), }, + // Error Handling Tests + // Verifies behavior when errors occur during iterator determination { name: "Checkpoint error", initialIter: watcher.AfterSequenceNumberIterator(5), checkpointErr: errors.New("db error"), expectedError: true, },pkg/test/compute/ask_for_bid_pre_approved_test.go (1)
95-96: Avoid hardcoding timeout durations; define a constant instead.

The timeout duration `5 * time.Second` is hardcoded multiple times at lines 95-96 and 108-109. Defining a constant for the timeout duration improves maintainability and makes it easier to adjust the timeout in the future. Apply this diff to define a constant and replace the hardcoded values:

```diff
+const bidResponseTimeout = 5 * time.Second
 ...
 case bid := <-s.bidChannel:
 	s.Equal(!testCase.rejected, bid.Accepted, "unexpected bid acceptance state")
 case <-s.failureChannel:
 	s.Fail("Got unexpected failure")
-case <-time.After(5 * time.Second):
+case <-time.After(bidResponseTimeout):
 	s.Fail("Timeout waiting for bid")
 }
 ...
 if !testCase.rejected {
 	select {
 	case <-s.completedChannel:
 		s.T().Log("Received expected completion")
 	case <-s.failureChannel:
 		s.Fail("Got unexpected failure")
 	case <-s.bidChannel:
 		s.Fail("Got unexpected second bid")
-	case <-time.After(5 * time.Second):
+	case <-time.After(bidResponseTimeout):
 		s.Fail("Timeout waiting for completion")
 	}
 }
```

Also applies to: 108-109
pkg/compute/watchers/callback_forwarder.go (2)
39-39: Fix possessive apostrophe in comment.

The comment should read "bidder's nodeID" instead of "bidders nodeID" for correct possessive usage. Apply this diff:

```diff
-	// the source of this response is the bidders nodeID.
+	// the source of this response is the bidder's nodeID.
```
94-96: Log unhandled execution states in default case.

For better observability and debugging, consider logging any unhandled execution states in the default case. Apply this diff:

```diff
 default:
 	// No message created for other states
+	log.Ctx(ctx).Warn().Msgf("Unhandled execution state: %s", execution.ComputeState.StateType)
```
pkg/compute/bidder.go (1)
70-70: Typo in comment: 'allways' should be 'always'.

There's a small typo in the comment on line 70: "allways" should be corrected to "always" for clarity. Apply this diff to correct the typo:

```diff
-// NB(forrest): allways run semantic bidding before resource bidding since generally there isn't much point in
+// NB(forrest): always run semantic bidding before resource bidding since generally there isn't much point in
```

pkg/node/compute.go (1)
254-254: Address the TODO: Add checkpointing to avoid missing events.

The TODO comment indicates that checkpointing needs to be added to ensure events are not missed during event handling. Implementing checkpointing is essential for reliable event processing. Would you like assistance in implementing the checkpointing mechanism, or should we open a new GitHub issue to track this task?
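A minimal sketch of the checkpointing idea discussed above, assuming a hypothetical CheckpointStore interface and Event type (this is not the actual lib/watcher API): persist the sequence number only after an event has been handled, so a restarted watcher resumes after the last processed event instead of skipping ahead.

```go
package watchersketch

import "context"

// Event and CheckpointStore are illustrative stand-ins, not the real lib/watcher types.
type Event struct {
	SeqNum uint64
}

type CheckpointStore interface {
	SaveCheckpoint(ctx context.Context, watcherID string, seqNum uint64) error
	LoadCheckpoint(ctx context.Context, watcherID string) (uint64, error)
}

// handleWithCheckpoint processes events in order and records progress after each
// successful handle, so failed events are redelivered rather than silently lost.
func handleWithCheckpoint(
	ctx context.Context,
	store CheckpointStore,
	watcherID string,
	events <-chan Event,
	handle func(context.Context, Event) error,
) error {
	for ev := range events {
		if err := handle(ctx, ev); err != nil {
			// Do not checkpoint a failed event; it should be redelivered on restart.
			return err
		}
		if err := store.SaveCheckpoint(ctx, watcherID, ev.SeqNum); err != nil {
			return err
		}
	}
	return nil
}
```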
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
📒 Files selected for processing (53)
- pkg/bidstrategy/chained.go (3 hunks)
- pkg/bidstrategy/fixed.go (1 hunks)
- pkg/bidstrategy/resource/capacity_max_strategy.go (1 hunks)
- pkg/bidstrategy/semantic/export_test.go (1 hunks)
- pkg/bidstrategy/semantic/external_exec.go (2 hunks)
- pkg/bidstrategy/semantic/external_http.go (2 hunks)
- pkg/bidstrategy/semantic/external_http_test.go (1 hunks)
- pkg/bidstrategy/type.go (1 hunks)
- pkg/bidstrategy/waiting.go (0 hunks)
- pkg/bidstrategy/waiting_test.go (0 hunks)
- pkg/compute/bidder.go (5 hunks)
- pkg/compute/bidder_test.go (1 hunks)
- pkg/compute/callback_chain.go (0 hunks)
- pkg/compute/callback_mock.go (0 hunks)
- pkg/compute/endpoint.go (8 hunks)
- pkg/compute/events.go (0 hunks)
- pkg/compute/executor.go (1 hunks)
- pkg/compute/executor_buffer.go (0 hunks)
- pkg/compute/logstream/server.go (3 hunks)
- pkg/compute/logstream/types.go (1 hunks)
- pkg/compute/management_client.go (1 hunks)
- pkg/compute/store/boltdb/store.go (1 hunks)
- pkg/compute/types.go (0 hunks)
- pkg/compute/watchers/callback_forwarder.go (1 hunks)
- pkg/compute/watchers/callback_forwarder_test.go (1 hunks)
- pkg/compute/watchers/event_logger.go (1 hunks)
- pkg/compute/watchers/executor_watcher.go (1 hunks)
- pkg/executor/docker/executor.go (2 hunks)
- pkg/executor/docker/executor_test.go (3 hunks)
- pkg/executor/docker/handler.go (2 hunks)
- pkg/executor/noop/executor.go (2 hunks)
- pkg/executor/types.go (2 hunks)
- pkg/executor/wasm/executor.go (2 hunks)
- pkg/executor/wasm/handler.go (2 hunks)
- pkg/lib/watcher/boltdb/boltdb.go (9 hunks)
- pkg/lib/watcher/boltdb/boltdb_test.go (1 hunks)
- pkg/lib/watcher/options.go (0 hunks)
- pkg/lib/watcher/registry.go (3 hunks)
- pkg/lib/watcher/types.go (1 hunks)
- pkg/lib/watcher/watcher.go (5 hunks)
- pkg/lib/watcher/watcher_test.go (2 hunks)
- pkg/nats/proxy/callback_handler.go (1 hunks)
- pkg/nats/proxy/callback_proxy.go (0 hunks)
- pkg/nats/proxy/constants.go (0 hunks)
- pkg/node/compute.go (5 hunks)
- pkg/node/constants.go (1 hunks)
- pkg/node/manager/node_manager.go (1 hunks)
- pkg/node/requester.go (0 hunks)
- pkg/test/compute/ask_for_bid_pre_approved_test.go (1 hunks)
- pkg/test/compute/ask_for_bid_test.go (1 hunks)
- pkg/test/compute/cancel_test.go (1 hunks)
- pkg/test/compute/setup_test.go (4 hunks)
- pkg/test/requester/retries_test.go (1 hunks)
💤 Files with no reviewable changes (11)
- pkg/bidstrategy/waiting.go
- pkg/bidstrategy/waiting_test.go
- pkg/compute/callback_chain.go
- pkg/compute/callback_mock.go
- pkg/compute/events.go
- pkg/compute/executor_buffer.go
- pkg/compute/types.go
- pkg/lib/watcher/options.go
- pkg/nats/proxy/callback_proxy.go
- pkg/nats/proxy/constants.go
- pkg/node/requester.go
🧰 Additional context used
🪛 GitHub Check: lint / go-lint (ubuntu-latest)
pkg/compute/watchers/event_logger.go
[failure] 50-50:
Magic number: 1e6, in detected (mnd)
pkg/node/compute.go
[failure] 22-22:
ST1019: package "github.com/bacalhau-project/bacalhau/pkg/compute/watchers" is being imported more than once (stylecheck)
[failure] 23-23:
ST1019(related information): other import of "github.com/bacalhau-project/bacalhau/pkg/compute/watchers" (stylecheck)
🔇 Additional comments (70)
pkg/bidstrategy/semantic/export_test.go (1)
16-16: LGTM! Verify test coverage.

The removal of the NodeID field from BidStrategyRequest aligns with the broader refactoring efforts to simplify request structures.
Let's verify that the test coverage remains comprehensive:
✅ Verification successful
Changes are consistent with codebase structure
The verification confirms that:
- The BidStrategyRequest struct in pkg/bidstrategy/type.go only contains the Job field, confirming the NodeID field has been consistently removed
- The only remaining NodeID reference in tests is in pkg/util/idgen/node_test.go, which is unrelated to BidStrategyRequest
- All files using BidStrategyRequest are aligned with the new structure
🏁 Scripts executed
The following scripts were executed for the analysis:
Script:
#!/bin/bash
# Description: Check if there are any tests that might still expect NodeID
# Test 1: Look for any remaining references to NodeID in test files
rg -g '**/*_test.go' 'NodeID.*BidStrategyRequest'
# Test 2: Look for any assertions or validations related to NodeID
rg -g '**/*_test.go' 'assert.*NodeID|require.*NodeID'
Length of output: 243
Script:
#!/bin/bash
# Let's verify if the NodeID field removal is consistent across the codebase
# Check for any remaining NodeID references in the bid strategy related files
rg -g '!**/*_test.go' 'NodeID.*BidStrategyRequest'
# Check the BidStrategyRequest struct definition
ast-grep --pattern 'type BidStrategyRequest struct {
$$$
}'
# Check if there are any other test files using BidStrategyRequest
fd -e go -E '**/vendor/**' --exec grep -l "BidStrategyRequest" {}
Length of output: 1531
pkg/compute/logstream/types.go (2)
8-8
: LGTM: Import addition is necessary and properly structured.
The new import is required for the ExecutionLogsRequest type used in the Server interface.
11-16
: LGTM: Well-designed interface with proper async handling.
The Server interface is well-structured with:
- Clear documentation
- Proper use of context for cancellation
- Type-safe async results using generics
- Read-only channel for stream consumers
Let's verify the consistency with other execution-related interfaces:
✅ Verification successful
Interface consistency verified across execution-related components
The verification shows consistent patterns across the codebase:
- The
GetLogStream
method signature inpkg/executor/types.go
follows similar patterns but returnsio.ReadCloser
instead of channels, which is appropriate as executors handle the raw streams - The compute layer (
pkg/compute/types.go
andpkg/compute/endpoint.go
) properly wraps these streams into async channels - The request type
ExecutionLogsRequest
is used consistently throughout the execution stack - All implementations properly handle context and error returns
🏁 Scripts executed
The following scripts were executed for the analysis:
Script:
#!/bin/bash
# Description: Check for similar execution-related interfaces and their methods
# to ensure consistent patterns across the codebase
# Search for other execution-related interfaces
echo "Checking for related interfaces:"
ast-grep --pattern 'type $_ interface {
$$$
Get$_Log$_($$$) $$$
$$$
}'
# Search for ExecutionLogsRequest usage patterns
echo -e "\nChecking ExecutionLogsRequest usage patterns:"
rg -A 3 "ExecutionLogsRequest"
Length of output: 6020
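For readers without the file open, the interface shape being described above looks roughly like the following sketch; the names, generics, and field types are illustrative assumptions, not the actual definitions in pkg/compute/logstream/types.go.

```go
package logstreamsketch

import "context"

// AsyncResult pairs a streamed value with an error, so a single channel can
// carry both successful log lines and stream failures.
type AsyncResult[T any] struct {
	Value T
	Err   error
}

// ExecutionLog is a stand-in for the real models type.
type ExecutionLog struct {
	Line []byte
	Type string // e.g. stdout or stderr
}

// Server exposes log streaming behind an interface: the context cancels the
// stream, and the receive-only channel keeps consumers from writing into it.
type Server interface {
	GetLogStream(ctx context.Context, executionID string) (<-chan *AsyncResult[ExecutionLog], error)
}
```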
pkg/node/constants.go (2)
1-2
: LGTM! Clean package structure.
The package declaration follows Go conventions and matches the directory structure.
3-19
: Well-structured constants with clear documentation.
The constants follow good practices:
- Clear, descriptive comments explaining each watcher's purpose
- Consistent naming convention
- Logical grouping in a single const block
- Human-readable string values that aid debugging
- Appropriate use of unexported constants for package-level encapsulation
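As an illustration of the pattern being praised here (the identifiers below are hypothetical, not the actual contents of pkg/node/constants.go):

```go
package node

const (
	// computeExecutionsWatcherID identifies the watcher that reacts to
	// execution state changes on the compute node.
	computeExecutionsWatcherID = "compute-executions-watcher"

	// computeCallbackForwarderWatcherID identifies the watcher that forwards
	// execution events back to the orchestrator as callbacks.
	computeCallbackForwarderWatcherID = "compute-callback-forwarder-watcher"
)
```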
pkg/bidstrategy/resource/capacity_max_strategy.go (2)
29-30
: LGTM! Clean response structure simplification.
The simplified response structure with just ShouldBid
and Reason
fields makes the code cleaner and more maintainable.
34-35
: Verify callers handle simplified response.
The response structure has been simplified by removing the ShouldWait
field. Let's verify that all callers have been updated accordingly.
✅ Verification successful
Based on the search results, I can now generate the final response:
The simplified response structure is correctly handled by all callers
The code search shows that:
- All callers only check the
ShouldBid
field of the response - No references to the removed
ShouldWait
field were found - The main caller in
pkg/compute/bidder.go
usesif err != nil || !resp.ShouldBid
pattern - Test files also only assert against
ShouldBid
field
🏁 Scripts executed
The following scripts were executed for the analysis:
Script:
#!/bin/bash
# Description: Find all callers of ShouldBidBasedOnUsage to verify they handle the simplified response
# Search for method calls
echo "=== Direct method calls ==="
rg -A 3 "ShouldBidBasedOnUsage\("
# Search for response field access
echo "=== Response field access patterns ==="
rg "response\.(ShouldWait|ShouldBid)"
Length of output: 8130
pkg/compute/watchers/executor_watcher.go (2)
1-11
: LGTM!
The package structure and imports are well-organized and follow Go conventions.
26-26
: Address the TODO comment regarding old events.
The comment indicates a potential race condition with old events during node startup. This could lead to processing stale state changes.
Let's check if there are any existing event filtering mechanisms:
Would you like me to help implement an event filtering mechanism based on timestamps or sequence numbers?
pkg/bidstrategy/type.go (2)
16-17: Verify clients handling the JSON response format change.

The capitalization of JSON field names (shouldBid → ShouldBid, reason → Reason) is a breaking change for API clients. Ensure all consumers are updated accordingly.
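To make the compatibility impact concrete (a sketch; only the two field names come from the review): Go's encoding/json matches keys case-insensitively when decoding, so Go clients usually keep working, but case-sensitive consumers such as clients in other languages or JSON schema validators will no longer see the lowercase keys.

```go
// Old wire format: {"shouldBid": true, "reason": "..."}
// New wire format: {"ShouldBid": true, "Reason": "..."}
type BidStrategyResponse struct {
	ShouldBid bool   `json:"ShouldBid"` // was `json:"shouldBid"`
	Reason    string `json:"Reason"`    // was `json:"reason"`
}
```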
12-12
: Verify the impact of NodeID field removal.
The removal of the NodeID
field from BidStrategyRequest
could be a breaking change. Let's verify its usage across the codebase.
✅ Verification successful
NodeID field removal is safe to proceed
The removal of NodeID
from BidStrategyRequest
is safe as there are no direct usages of this field in the codebase. The search results show that while NodeID
is used in many other contexts (node management, execution tracking, etc.), it is not used within the bid strategy context. The only references to BidStrategyRequest
show it being used with just the Job
field.
🏁 Scripts executed
The following scripts were executed for the analysis:
Script:
#!/bin/bash
# Description: Check for any remaining references to NodeID in BidStrategyRequest
# to ensure all callers have been updated.
# Search for NodeID usage in relation to BidStrategyRequest
rg -A 5 "BidStrategyRequest.*NodeID|NodeID.*BidStrategyRequest"
# Search for any test files that might be constructing BidStrategyRequest
# with NodeID
fd -e test.go -x rg -l "BidStrategyRequest"
Length of output: 109
Script:
#!/bin/bash
# Let's try a broader search to understand the usage and changes
# Check git history for the removed NodeID field
git log -p pkg/bidstrategy/type.go | rg -B 3 -A 3 "NodeID"
# Search for any BidStrategyRequest usage patterns
rg -B 3 -A 3 "BidStrategyRequest"
# Look for any test files or mocks that might be affected
fd ".*test.*\.go|.*mock.*\.go" -x rg -l "BidStrategyRequest"
# Check if NodeID is used in other related structs or interfaces
rg "NodeID"
Length of output: 54452
pkg/nats/proxy/callback_handler.go (2)
11-12
: LGTM: Import reordering
The import reordering maintains correct grouping of standard library and external imports.
Line range hint 52-67
: Verify cancellation handling after OnCancelComplete removal
The removal of OnCancelComplete
case from the handler suggests a significant change in how cancellation events are processed. This could impact resource cleanup and error handling for cancelled computations.
Let's verify the new cancellation handling approach:
Consider documenting the new cancellation handling approach in the package documentation to clarify:
- How cancellation events are now processed
- Where cleanup responsibilities now lie
- Impact on error handling
✅ Verification successful
Based on the search results, I can now generate the final response:
Cancellation handling has been properly refactored and is working as expected
The removal of OnCancelComplete
case from the callback handler is intentional and part of a larger refactoring where cancellation is now handled through the compute node's state management system. The code shows:
- Cancellation is now handled directly through the
CancelExecution
endpoint which updates execution state toExecutionStateCancelled
- The compute executor properly handles cancellation through its
Cancel()
method - Resource cleanup is managed through the compute node's state transitions
- Proper error handling exists for cancelled executions via
ExecutionAlreadyCancelled
error type
🏁 Scripts executed
The following scripts were executed for the analysis:
Script:
#!/bin/bash
# Description: Analyze the new cancellation handling implementation
# Expected: Find alternative cancellation handling mechanisms
# Check for any remaining references to OnCancelComplete
echo "Checking for remaining OnCancelComplete references:"
rg "OnCancelComplete" -l
# Look for alternative cancellation handling
echo "Checking for alternative cancellation patterns:"
rg -A 5 "cancel|Cancel" --type go
# Check compute package for cancellation-related changes
echo "Checking compute package interface:"
ast-grep --pattern 'type Callback interface {
$$$
}'
Length of output: 145340
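As a generic illustration of the pattern the verification describes (context-driven cancellation plus cleanup on exit), and not the project's actual CancelExecution or Executor.Cancel code:

```go
package cancelsketch

import "context"

// runUntilCancelled performs units of work until the execution's context is
// cancelled; the deferred cleanup always runs, which is where resources such
// as containers or temp directories would be released.
func runUntilCancelled(ctx context.Context, work <-chan func() error, cleanup func()) error {
	defer cleanup()
	for {
		select {
		case <-ctx.Done():
			return ctx.Err() // cancelled or timed out
		case fn, ok := <-work:
			if !ok {
				return nil // no more work
			}
			if err := fn(); err != nil {
				return err
			}
		}
	}
}
```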
pkg/bidstrategy/semantic/external_exec.go (2)
13-14
: LGTM!
The import reorganization improves readability while maintaining functionality.
50-50
:
Update error message to reflect current implementation.
The error message still refers to "job selection policy probe data" but we're now marshaling the entire request object.
- fmt.Errorf("ExternalCommandStrategy: error marshaling job selection policy probe data: %w", err)
+ fmt.Errorf("ExternalCommandStrategy: error marshaling bid strategy request: %w", err)
Likely invalid or redundant comment.
pkg/compute/logstream/server.go (4)
11-11
: LGTM: Import addition aligns with type changes
The addition of the messages package import is necessary for the new ExecutionLogsRequest type usage.
37-46
: LGTM: Constructor follows good practices
The changes improve encapsulation by returning the interface type and include proper parameter validation.
77-78
: LGTM: Good practice adding compile-time interface check
The addition of the compile-time interface compliance check helps catch any interface implementation issues early.
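The idiom being referred to is the standard Go compile-time assertion; assuming the types are named Server and server as described above, it looks like this:

```go
// Fails to compile if *server ever stops implementing Server.
var _ Server = (*server)(nil)
```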
29-29
: Verify all external usage is through the interface
The change from exported Server
to unexported server
type requires all external usage to be through the interface.
Also applies to: 37-37
✅ Verification successful
Let's check the actual usage in those files to determine if they're referring to our Server
type or a different one.
Based on the results, I can now generate the final response since we have sufficient information. The search results show that while there are *Server
references in other files, they are different server types (webui.Server and publicapi.Server) and not the logstream.Server we're concerned about. This is confirmed by:
- The context of the usages shows different server implementations
- No imports of the logstream package were found in these files
- The method signatures and structures are completely different from our logstream server
The change from Server
to server
is safe
The unexported change appears safe as there are no external usages of the concrete logstream Server type. All matches found are unrelated server implementations in different packages.
🏁 Scripts executed
The following scripts were executed for the analysis:
Script:
#!/bin/bash
# Search for direct usage of the concrete Server type that might need updating
rg -l "\\*Server\\b" --type go | grep -v "pkg/compute/logstream/server.go"
Length of output: 140
Script:
#!/bin/bash
# Check the actual Server type usage in the identified files
echo "=== webui/webui.go ==="
rg -C 3 "\*Server\b" webui/webui.go
echo -e "\n=== pkg/publicapi/server_test.go ==="
rg -C 3 "\*Server\b" pkg/publicapi/server_test.go
echo -e "\n=== pkg/publicapi/server.go ==="
rg -C 3 "\*Server\b" pkg/publicapi/server.go
# Also check imports to see if they import the logstream package
echo -e "\n=== Checking imports ==="
rg "\".*logstream\"" webui/webui.go pkg/publicapi/server_test.go pkg/publicapi/server.go
Length of output: 3093
pkg/bidstrategy/semantic/external_http_test.go (1)
Line range hint 74-82
: Validate the complete request structure being sent to the HTTP endpoint.
The test decodes the request body but doesn't verify if the encoded structure matches the expected format that real clients would send.
Let's verify the structure of BidStrategyRequest
to ensure we're testing with the correct payload format:
✅ Verification successful
Based on the shell script results, I can now generate the final response:
The test correctly validates the request structure being sent to the HTTP endpoint
The test is properly validating the request structure since:
BidStrategyRequest
has a simple, well-defined structure containing only aJob
field- The test uses
getBidStrategyRequest(t)
helper which creates a valid request with mock data - Multiple test files demonstrate consistent usage of this structure
- The HTTP endpoint test decodes into this exact structure and verifies the payload matches
🏁 Scripts executed
The following scripts were executed for the analysis:
Script:
#!/bin/bash
# Find the definition and usage of BidStrategyRequest to understand its structure
ast-grep --pattern 'type BidStrategyRequest struct {
$$$
}'
# Look for other test files that might show example usage
rg -g '**/*_test.go' -A 5 'BidStrategyRequest'
Length of output: 12768
pkg/test/compute/ask_for_bid_test.go (1)
96-97
: LGTM! Good error handling improvement.
Using Require().NoError()
here is the right choice as it provides a clear failure point and prevents nil pointer dereferences in subsequent assertions.
pkg/bidstrategy/chained.go (2)
9-10
: LGTM: Clean import organization
The import organization follows good practices by separating external and internal dependencies with a blank line.
61-64
: Verify removal of ShouldWait handling across the codebase
The simplification of the bidding logic by removing ShouldWait
checks looks good, but we should ensure this change is consistent across all bid strategy implementations.
✅ Verification successful
The results show that ShouldWait
is not found in any files under pkg/bidstrategy/
, and the BidStrategyResponse
struct only contains ShouldBid
and Reason
fields. Let me verify the changes in the actual bid strategy implementations to ensure they're all consistent with this simplified approach.
Based on the comprehensive review of the codebase, I can now generate the final response:
Removal of ShouldWait is consistent across the codebase
The verification confirms that:
- The
BidStrategyResponse
struct only containsShouldBid
andReason
fields - No references to
ShouldWait
exist in any bid strategy implementations - All bid strategy implementations consistently use the simplified response model
🏁 Scripts executed
The following scripts were executed for the analysis:
Script:
#!/bin/bash
# Description: Check for any remaining ShouldWait references in bid strategies
# Expected: No occurrences of ShouldWait in bid strategy implementations
echo "Checking for remaining ShouldWait references in bid strategies..."
rg -l "ShouldWait" pkg/bidstrategy/
echo "Checking for any BidStrategyResponse struct definitions..."
ast-grep --pattern 'type BidStrategyResponse struct { $$$ }'
Length of output: 581
Script:
#!/bin/bash
# Check bid strategy implementations for their response handling
rg -A 5 "func.*Bid\(" pkg/bidstrategy/
Length of output: 5819
pkg/bidstrategy/semantic/external_http.go (2)
13-14
: LGTM: Import addition is properly placed
The new marshaller import is correctly grouped with other internal packages.
48-48
: Verify external HTTP endpoints compatibility
The change from GetJobSelectionPolicyProbeData
to direct request marshaling modifies the payload structure sent to external endpoints. This could potentially break existing integrations.
Let's check for any external HTTP endpoint configurations in the codebase:
✅ Verification successful
Let me gather more information about the payload structure and tests to verify the compatibility.
Let me check the actual implementation of the external HTTP strategy to understand the payload structure change.
No breaking changes in external HTTP endpoints
The change from GetJobSelectionPolicyProbeData
to direct request marshaling is safe because:
- The test file shows that the same
BidStrategyRequest
structure was already being used and verified in tests - The payload structure remains unchanged as it simply contains the Job model
- The HTTP endpoint contract (request/response format, content types, status codes) is well-tested and remains the same
- The only change is using
JSONMarshalWithMax
which adds a safety limit to prevent oversized payloads
🏁 Scripts executed
The following scripts were executed for the analysis:
Script:
#!/bin/bash
# Search for HTTP endpoint configurations that might be affected by the payload change
rg -l "ExternalHTTPStrategy|ExternalHTTPStrategyParams"
# Look for any test files that might need updates
rg -l "TestJobSelectionHttp|external.*http.*test"
Length of output: 265
Script:
#!/bin/bash
# Check the test file to understand payload structure and assertions
cat pkg/bidstrategy/semantic/external_http_test.go
# Look for the previous implementation to compare
ast-grep --pattern 'GetJobSelectionPolicyProbeData'
Length of output: 2678
Script:
#!/bin/bash
# Check the implementation file
cat pkg/bidstrategy/semantic/external_http.go
# Also check the BidStrategyRequest structure
rg -A 5 "type BidStrategyRequest struct" pkg/bidstrategy/
Length of output: 3591
pkg/executor/types.go (1)
11-11
: LGTM: Import addition is correct
The new import is necessary for the updated GetLogStream method signature and is properly placed.
pkg/test/compute/cancel_test.go (2)
116-117
: LGTM! Good test isolation practice.
Adding setup and teardown calls for each test iteration ensures a clean state and prevents test interference.
119-121
: Verify watcher cleanup in other test cases.
Good addition of watcher cleanup to prevent unexpected state changes. However, we should verify if other test cases in this file need similar watcher cleanup.
✅ Verification successful
Let me analyze the test cases shown in the output to determine if they need watcher cleanup. I'll search for the execution state modifications in these tests.
Watcher cleanup is only needed in TestStates
The watcher cleanup is specifically needed in TestStates
because it explicitly sets and verifies execution states. Other test cases (TestCancel
, TestCancelDocker
, TestDoesntExist
) don't require this cleanup as they:
- Either wait for natural state transitions through
stateResolver.Wait()
- Or perform simple error checks without state manipulation
The current placement of s.node.Watchers.Stop(ctx)
in TestStates
is correct and sufficient.
🏁 Scripts executed
The following scripts were executed for the analysis:
Script:
#!/bin/bash
# Search for other test cases in the file that might need watcher cleanup
rg -A 10 "func.*Test.*\(" pkg/test/compute/cancel_test.go
Length of output: 1485
Script:
#!/bin/bash
# Search for execution state modifications and watcher usage in the test file
rg -B 5 -A 5 "ExecutionState|Watchers" pkg/test/compute/cancel_test.go
Length of output: 3070
pkg/compute/endpoint.go (3)
Line range hint 145-149
: LGTM!
The ExecutionLogs
implementation correctly forwards the request to the log server with all necessary parameters.
Line range hint 124-135
: Verify state transition validation.
The removal of state validation checks could allow invalid state transitions. Ensure that the ExecutionStore
implementation properly validates state transitions to prevent inconsistent states.
#!/bin/bash
# Check ExecutionStore implementation for state validation
rg -A 5 "UpdateExecutionState.*Condition"
11-13
: Verify architectural changes around bidder and executor removal.
The removal of Bidder
and Executor
fields from BaseEndpoint
along with the change of LogServer
to a value type suggests a significant architectural shift. This could impact how executions are handled across the system.
Also applies to: 20-20, 30-30
pkg/compute/bidder_test.go (2)
69-74
: LGTM! Clean simplification of bid response.
The removal of unnecessary fields and focus on just ShouldBid
makes the test cases clearer and more maintainable.
58-151
: Verify alignment with bidding strategy implementations.
The test changes reflect a simplified bidding process. Let's verify that this aligns with the actual implementations.
✅ Verification successful
Let me gather more information about the bidding strategy implementations and state handling.
Based on the gathered information, I can now provide the final response:
Test changes correctly reflect the bidding implementation
The test changes align perfectly with the actual implementation:
- BidStrategyResponse only contains
ShouldBid
andReason
fields, matching the simplified structure in tests - The execution states (AskForBidAccepted/Rejected) used in tests match the defined model states
- The RunBidding implementation in bidder.go follows the same flow being tested
🏁 Scripts executed
The following scripts were executed for the analysis:
Script:
#!/bin/bash
# Description: Verify that all bid strategy implementations have been updated
# to match the simplified response structure (removing ShouldWait).
# Test 1: Check for any remaining uses of ShouldWait in bid strategy responses
echo "Checking for remaining ShouldWait usage in bid strategies..."
rg -l "ShouldWait.*BidStrategyResponse" pkg/bidstrategy/
# Test 2: Verify execution state handling in compute package
echo "Verifying execution state handling..."
ast-grep --pattern 'ExecutionStateAskForBid(Accepted|Rejected)' pkg/compute/
Length of output: 353
Script:
#!/bin/bash
# Check the actual implementation of bidding strategies and their response structure
echo "Checking bid strategy implementations..."
rg -A 5 "type BidStrategyResponse struct" pkg/bidstrategy/
# Look for the actual RunBidding implementation
echo "Checking RunBidding implementation..."
ast-grep --pattern 'func ($_) RunBidding(ctx context.Context, execution *models.Execution) {
$$$
}'
# Check for execution state type definitions
echo "Checking execution state definitions..."
rg "ExecutionStateType =" -A 10 pkg/models/
Length of output: 2132
pkg/compute/watchers/callback_forwarder_test.go (2)
1-18
: LGTM! Build tags and imports are well organized.
The file structure follows Go best practices with appropriate build tags and necessary imports.
143-145
: LGTM! Test runner follows standard practice.
The test runner function correctly uses the testify suite package.
pkg/test/compute/setup_test.go (3)
11-12
: LGTM: NATS integration setup looks good.
The addition of NATS server and client dependencies is well-structured and follows Go conventions.
Also applies to: 48-49
166-167
: LGTM: Clean NATS setup implementation.
The assignment of NATS server and client to struct fields aligns well with the cleanup strategy in TearDownTest and follows good testing practices.
139-141
: Verify if the increased buffer size is sufficient.
The increase in channel buffer sizes from 1 to 10 is a good improvement to prevent potential deadlocks. However, let's verify if this size is sufficient for all test scenarios.
✅ Verification successful
Buffer size of 10 is sufficient for the test scenarios
Based on the code analysis:
- Channels are used in a simple select pattern with single message consumption
- No parallel test execution is detected
- No loops or goroutines that could queue multiple messages
- Each callback sends only one message at a time
- Channels are properly cleaned up in TearDownTest
The increased buffer size of 10 provides more than enough capacity for the current test patterns.
🏁 Scripts executed
The following scripts were executed for the analysis:
Script:
#!/bin/bash
# Description: Check test files for patterns that might require larger buffer sizes
# Look for test cases that might queue up more than 10 messages
rg -A 5 "for.*range" "pkg/test/compute/*_test.go" | grep -B 5 "channel.*<-"
# Look for parallel test execution that might need larger buffers
rg "t.Parallel" "pkg/test/compute/*_test.go"
Length of output: 256
Script:
#!/bin/bash
# Let's first find the correct test file path
fd "setup_test.go" --type f
# Then search for patterns in the correct path
rg -A 5 "for.*range" $(fd "setup_test.go" --type f) | grep -B 5 "channel.*<-"
# Check for parallel test execution
rg "t.Parallel" $(fd "setup_test.go" --type f)
# Look for test cases that might send multiple messages
rg "(bid|completed|failure)Channel.*<-" -B 2 $(fd "setup_test.go" --type f)
Length of output: 1131
Script:
#!/bin/bash
# Let's check how these channels are consumed in tests
rg -B 5 "<-\s*s\.(bid|completed|failure)Channel" pkg/test/compute/setup_test.go
# Look for test functions that might use these channels
rg "func.*Test" -A 10 pkg/test/compute/setup_test.go
# Check for any loops or goroutines that might send multiple messages
rg "(for|go\s+func)" -A 5 pkg/test/compute/setup_test.go
Length of output: 751
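To illustrate why a buffer of 10 is comfortable here, a self-contained sketch of the pattern the suite relies on: a callback pushes one message per event into a buffered channel and the test consumes it with a timeout. Names are illustrative, not the actual Bacalhau test code.

```go
package compute_test

import (
	"testing"
	"time"
)

// fakeCallback mimics a compute callback that reports a bid result.
type fakeCallback struct {
	bidChannel chan string
}

func (c *fakeCallback) OnBidComplete(result string) {
	// A buffered channel lets the callback return immediately even if the
	// test goroutine is not yet blocked on the receive.
	c.bidChannel <- result
}

func TestBidCallback(t *testing.T) {
	cb := &fakeCallback{bidChannel: make(chan string, 10)}

	go cb.OnBidComplete("accepted")

	select {
	case got := <-cb.bidChannel:
		if got != "accepted" {
			t.Fatalf("expected accepted, got %s", got)
		}
	case <-time.After(5 * time.Second):
		t.Fatal("timed out waiting for bid result")
	}
}
```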
pkg/compute/management_client.go (2)
134-134
: LGTM! Appropriate logging level adjustment.
Changing to Trace
level is appropriate for this frequently occurring resource update operation, as it helps reduce log noise while still maintaining observability when needed.
Line range hint 153-196
: Verify async event handling in Start method.
The Start method implements a select-based event loop for handling multiple async operations. Let's verify the robustness of this implementation:
✅ Verification successful
Based on the extensive output from the shell scripts, I can now generate the final response:
Event loop implementation in Start method is robust and well-structured
The event loop implementation follows Go's best practices for concurrent event handling:
- Proper cleanup with deferred Stop() calls for all tickers
- Correct channel handling with select statement covering all cases
- Well-ordered shutdown sequence through context cancellation and done channel
- Consistent error handling pattern across similar implementations in the codebase
- Appropriate ticker intervals with clear documentation explaining the timing relationships

The implementation aligns with similar patterns used throughout the codebase in packages like pkg/orchestrator, pkg/node/heartbeat, and others, demonstrating consistent and reliable async event handling.
🏁 Scripts executed
The following scripts were executed for the analysis:
Script:
#!/bin/bash
# Description: Check for potential race conditions or deadlocks in event handling
# Test 1: Look for other similar event loops to ensure consistent patterns
echo "Checking for similar event loop patterns:"
rg -A 10 "select\s*{.*case.*<-ctx\.Done\(\):"
# Test 2: Check for potential deadlocks in channel operations
echo "Checking for potential deadlock scenarios:"
ast-grep --pattern 'select { $$$ }'
# Test 3: Look for ticker usage patterns
echo "Checking ticker usage patterns:"
rg "time\.NewTicker|\.Stop\(\)"
Length of output: 65175
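The pattern being verified is roughly the following generic sketch (not the actual management client): tickers for periodic work, a select covering every case, and ctx.Done() for a clean shutdown. Ticker names and intervals are illustrative.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// runLoop sketches the select-based event loop pattern: deferred ticker
// cleanup, a select covering every case, and shutdown via context cancellation.
func runLoop(ctx context.Context) {
	heartbeat := time.NewTicker(1 * time.Second)
	defer heartbeat.Stop()

	resourceUpdate := time.NewTicker(5 * time.Second)
	defer resourceUpdate.Stop()

	for {
		select {
		case <-ctx.Done():
			fmt.Println("shutting down event loop")
			return
		case <-heartbeat.C:
			fmt.Println("send heartbeat")
		case <-resourceUpdate.C:
			fmt.Println("send resource update")
		}
	}
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 12*time.Second)
	defer cancel()
	runLoop(ctx)
}
```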
pkg/executor/noop/executor.go (2)
14-14
: LGTM!
Clean addition of the required import for the new message types.
233-235
: LGTM!
The GetLogStream method signature has been correctly updated to use the new messages.ExecutionLogsRequest type. The "not implemented" error is appropriate for a NoopExecutor implementation.
Let's verify that other executor implementations have been updated consistently:
✅ Verification successful
All executor implementations correctly use the messages.ExecutionLogsRequest type
The verification shows that all executor implementations (docker, wasm, and noop) have consistent method signatures using messages.ExecutionLogsRequest:
- Interface definition in pkg/executor/types.go
- Implementation in pkg/executor/docker/executor.go
- Implementation in pkg/executor/wasm/executor.go
- Implementation in pkg/executor/noop/executor.go
🏁 Scripts executed
The following scripts were executed for the analysis:
Script:
#!/bin/bash
# Description: Verify GetLogStream signature consistency across executors
# Expected: All GetLogStream implementations should use messages.ExecutionLogsRequest
# Search for GetLogStream implementations
ast-grep --pattern 'func $_($_ *$_) GetLogStream(ctx context.Context, request messages.ExecutionLogsRequest) (io.ReadCloser, error)'
Length of output: 134
Script:
#!/bin/bash
# Let's try a broader search for GetLogStream implementations
# First, find all executor implementations
fd executor.go
# Then use ripgrep to find GetLogStream implementations with context
rg "GetLogStream" -A 3 --type go
Length of output: 4582
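For readers who want the shape of that signature in one place, a sketch of the noop variant. The real request type is messages.ExecutionLogsRequest; the stand-in struct and its field types here are assumptions for illustration only.

```go
package executor

import (
	"context"
	"errors"
	"io"
)

// ExecutionLogsRequest stands in for messages.ExecutionLogsRequest; the
// field names follow the review, their types are assumed.
type ExecutionLogsRequest struct {
	ExecutionID string
	Tail        bool
	Follow      bool
}

// NoopExecutor sketches an executor that does not support log streaming.
type NoopExecutor struct{}

// GetLogStream mirrors the standardized signature shared by the docker,
// wasm, and noop executors.
func (e *NoopExecutor) GetLogStream(ctx context.Context, request ExecutionLogsRequest) (io.ReadCloser, error) {
	return nil, errors.New("not implemented for NoopExecutor")
}
```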
pkg/lib/watcher/watcher.go (4)
45-57
: Clean initialization with improved error handling.
The constructor has been simplified while maintaining proper error handling and state initialization. The delegation of iterator determination to a separate method improves code organization.
171-175
: Improved error handling for graceful shutdown.
The changes properly distinguish between context cancellation and other errors, reducing log noise during normal shutdown scenarios.
201-204
: Consistent error handling for context cancellation.
The changes align with the error handling pattern in fetchWithBackoff, ensuring proper shutdown behavior by preventing unnecessary retries.
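A minimal sketch of the cancellation-aware pattern these two comments describe, assuming a retry loop with backoff; this is illustrative, not the watcher's actual code.

```go
package watcherutil

import (
	"context"
	"errors"
	"log"
	"time"
)

// fetchWithBackoff retries a fetch until it succeeds, backs off on real
// errors, and treats context cancellation as a normal shutdown signal
// rather than a retryable failure.
func fetchWithBackoff(ctx context.Context, fetch func(context.Context) error) error {
	backoff := 100 * time.Millisecond
	for {
		err := fetch(ctx)
		if err == nil {
			return nil
		}
		if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) {
			// Graceful shutdown: no retry, no noisy error log.
			return err
		}
		log.Printf("fetch failed, retrying in %v: %v", backoff, err)
		select {
		case <-ctx.Done():
			return ctx.Err()
		case <-time.After(backoff):
			backoff *= 2
		}
	}
}
```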
17-17
: Verify the removal of event channel.
The removal of the ch
channel from the watcher struct suggests a significant change in the event propagation pattern. This could impact how events are handled across the system.
Let's verify the usage and ensure no references remain:
✅ Verification successful
Event channel removal is part of architectural change
The removal of the ch channel from the watcher struct is a deliberate architectural change. The git history shows that the event channel was previously used for buffered event propagation (ch chan Event), but has been replaced with a direct handler-based approach. The remaining channel usages in the codebase are for different purposes (stopping signals, notifications, test coordination) and not for event propagation.
Key findings:
- The event channel removal is paired with increased reliance on the EventHandler interface
- The change simplifies the concurrency model by removing buffered event channels
- All other channel usages in the codebase serve different purposes and are unaffected
🏁 Scripts executed
The following scripts were executed for the analysis:
Script:
#!/bin/bash
# Search for any remaining channel-based event handling
rg -g '!vendor/*' -g '*.go' 'ch\s*<-\s*event' .
Length of output: 49
Script:
#!/bin/bash
# Let's do a broader search for channel-related patterns and event handling
# 1. Search for channel declarations and usages in watcher package
rg -g '!vendor/*' -g '*.go' 'chan\s+\w+' pkg/lib/watcher/
# 2. Search for event sending/receiving patterns
rg -g '!vendor/*' -g '*.go' '<-' pkg/lib/watcher/
# 3. Look for the old implementation in git history
git log -p pkg/lib/watcher/watcher.go | grep -A 5 -B 5 'ch.*chan'
# 4. Search for any references to event channels in tests
rg -g '!vendor/*' -g '*_test.go' 'chan' pkg/lib/watcher/
Length of output: 7040
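The handler-based approach the findings describe can be sketched as follows: events go straight to a handler interface instead of through a buffered channel. The EventHandler name comes from the review; the event fields and dispatch helper are illustrative.

```go
package watcher

import "context"

// Event is a stand-in for the watcher's event payload.
type Event struct {
	SeqNum uint64
	Object any
}

// EventHandler is the interface events are dispatched to directly,
// replacing the old buffered `ch chan Event` field.
type EventHandler interface {
	HandleEvent(ctx context.Context, event Event) error
}

// dispatch delivers each event synchronously to the handler; backpressure
// is now the handler's responsibility rather than a channel buffer's.
func dispatch(ctx context.Context, handler EventHandler, events []Event) error {
	for _, e := range events {
		if err := handler.HandleEvent(ctx, e); err != nil {
			return err
		}
	}
	return nil
}
```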
pkg/executor/docker/handler.go (2)
21-21
: LGTM: Import addition aligns with the signature changes.
The new import for the messages package is necessary for the updated ExecutionLogsRequest
type used in the outputStream
method.
Line range hint 208-215
: Potential race condition in select statement.
The current implementation might miss context cancellation if it occurs exactly when activeCh
is closed. Consider checking the context before entering the select statement.
Consider this safer approach:
+ if err := ctx.Err(); err != nil {
+ return nil, err
+ }
select {
case <-ctx.Done():
return nil, ctx.Err()
case <-h.activeCh:
}
Let's verify if this race condition has been reported before:
pkg/executor/wasm/handler.go (2)
23-23
: LGTM: Import addition aligns with type changes.
The new import of messages package is correctly placed and supports the parameter type change in the outputStream method.
237-239
: LGTM: Method signature update maintains consistency.
The parameter type change from executor.LogStreamRequest
to messages.ExecutionLogsRequest
aligns with the standardization of log streaming requests across different executors. The implementation remains compatible as it only uses the Follow
field which is present in both types.
Let's verify that other executors have been updated consistently:
✅ Verification successful
Method signature update verified across all executor implementations
The verification confirms that both existing executor implementations (Docker and WASM) use the standardized messages.ExecutionLogsRequest
type in their outputStream
method signatures. No inconsistencies were found.
🏁 Scripts executed
The following scripts were executed for the analysis:
Script:
#!/bin/bash
# Description: Verify that all executor implementations use the same parameter type
# for their log streaming methods
# Test: Search for outputStream method signatures across executor implementations
# Expect: All implementations should use messages.ExecutionLogsRequest
rg -t go "func.*outputStream.*\(.*context\.Context.*ExecutionLogsRequest\)" pkg/executor/
Length of output: 401
Script:
#!/bin/bash
# Let's verify if there are any other executor implementations that might have a different signature
# or haven't been updated yet
# First, find all potential executor implementations
fd "handler.go" pkg/executor/
# Then search for any outputStream method signatures to ensure we haven't missed any
rg -t go "func.*outputStream.*\(.*context\.Context.*\)" pkg/executor/
Length of output: 471
pkg/node/manager/node_manager.go (2)
167-167
: LGTM! Appropriate logging level adjustment.
The change from Debug
to Trace
level is appropriate as resource updates are frequent operational events that should be logged at the most granular level.
Let's verify the logging configuration to ensure this change doesn't impact observability:
✅ Verification successful
Based on the search results, I can now make a determination about the logging configuration and usage:
Appropriate logging level change confirmed
The change from Debug
to Trace
level for resource updates is appropriate because:
- The codebase shows a clear logging hierarchy where:
  - Trace is used for frequent operational events and detailed tracing
  - Debug is used for less frequent operational events
  - Info and above are used for significant events
- Resource updates are high-frequency operational events that should be at Trace level to avoid log spam, while still being available for detailed debugging when needed.
- The logging configuration is properly handled through the global zerolog configuration, so this change will be respected across the system.
🏁 Scripts executed
The following scripts were executed for the analysis:
Script:
#!/bin/bash
# Description: Check logging configuration and usage patterns
# Expected: Confirm that TRACE level is properly configured in the logging setup
# Search for logging configuration
rg -A 5 'zerolog.*SetGlobalLevel|zerolog.*Level'
# Check for other TRACE level usage for consistency
rg '\.Trace\(\)\.Msg'
Length of output: 247024
Script:
#!/bin/bash
# Search for logging configuration and usage patterns
rg -A 5 'zerolog.*SetGlobalLevel|zerolog.*Level|log\..*\.Trace|log\..*\.Debug'
Length of output: 72807
Line range hint 167-173
: Verify async resource update handling.
While the code is thread-safe due to StripedMap
, let's ensure it properly handles concurrent resource updates in the context of async execution:
✅ Verification successful
Resource updates are properly handled with thread-safe mechanisms
The implementation is thread-safe for concurrent resource updates:
- Uses StripedMap, which provides fine-grained locking through multiple stripes:
  - Each stripe has its own sync.RWMutex for read/write synchronization
  - Uses atomic counters for thread-safe counting operations
  - Properly handles concurrent Put/Get/Delete operations
- The UpdateResources implementation in the node manager:
  - Uses StripedMap.Put(), which handles locking internally
  - Performs atomic updates within the locked stripe

The concurrent access patterns are well tested through:
- Fuzz testing with multiple goroutines in TestConcurrent
- Per-stripe locking that reduces contention
- Atomic operations for counters
🏁 Scripts executed
The following scripts were executed for the analysis:
Script:
#!/bin/bash
# Description: Verify concurrent resource update handling
# Expected: Find potential race conditions or concurrent access patterns
# Check for concurrent access patterns
rg -A 10 'go func|goroutine|async|concurrent'
# Look for other resource update calls that might conflict
ast-grep --pattern 'UpdateResources($$$)'
# Check for related tests
fd -e go -E 'vendor|testdata' -x rg -l 'TestUpdateResources|TestConcurrent'
Length of output: 247024
Script:
#!/bin/bash
# Let's check the implementation of UpdateResources and related code to verify thread safety
# Look for UpdateResources implementation
rg -A 10 "func.*UpdateResources" pkg/node/manager/node_manager.go
# Check StripedMap implementation for thread safety
rg -A 10 "type.*StripedMap" pkg/lib/concurrency/striped_map.go
# Look for any tests verifying concurrent access
rg -A 10 "TestConcurrent.*StripedMap" pkg/lib/concurrency/striped_map_test.go
Length of output: 997
Script:
#!/bin/bash
# Let's check the StripedMap tests and implementation more thoroughly
# Look for concurrent test cases in striped_map_test.go
rg -A 20 "func.*Test.*Concurrent" pkg/lib/concurrency/striped_map_test.go
# Check the full StripedMap implementation for thread-safety mechanisms
cat pkg/lib/concurrency/striped_map.go
# Look for any race condition tests
rg -A 10 "race|Race" pkg/lib/concurrency/striped_map_test.go
Length of output: 2436
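A compact sketch of the striped-locking idea summarized above (not the project's StripedMap): keys hash to one of N stripes, each guarded by its own RWMutex, so writers on different stripes do not contend.

```go
package concurrency

import (
	"hash/fnv"
	"sync"
)

const numStripes = 16

type stripe[V any] struct {
	mu   sync.RWMutex
	data map[string]V
}

// StripedMap spreads keys across independently locked stripes.
type StripedMap[V any] struct {
	stripes [numStripes]stripe[V]
}

func NewStripedMap[V any]() *StripedMap[V] {
	m := &StripedMap[V]{}
	for i := range m.stripes {
		m.stripes[i].data = make(map[string]V)
	}
	return m
}

func (m *StripedMap[V]) stripeFor(key string) *stripe[V] {
	h := fnv.New32a()
	h.Write([]byte(key))
	return &m.stripes[h.Sum32()%numStripes]
}

// Put locks only the stripe that owns the key.
func (m *StripedMap[V]) Put(key string, value V) {
	s := m.stripeFor(key)
	s.mu.Lock()
	defer s.mu.Unlock()
	s.data[key] = value
}

// Get takes a read lock, so concurrent readers of the same stripe don't block each other.
func (m *StripedMap[V]) Get(key string) (V, bool) {
	s := m.stripeFor(key)
	s.mu.RLock()
	defer s.mu.RUnlock()
	v, ok := s.data[key]
	return v, ok
}
```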
pkg/executor/wasm/executor.go (2)
17-17
: LGTM!
The addition of the messages package import aligns with the method signature change.
198-198
: LGTM!
The signature change from executor.LogStreamRequest
to messages.ExecutionLogsRequest
is consistent with the broader refactoring effort to standardize log stream request handling across executor implementations.
Let's verify that this change is consistent across other executor implementations:
✅ Verification successful
The signature change is consistently implemented across all executor implementations
All executor implementations (wasm, docker, and noop) are using the standardized messages.ExecutionLogsRequest type in their GetLogStream method signatures, confirming that the refactoring has been applied uniformly across the codebase.
🏁 Scripts executed
The following scripts were executed for the analysis:
Script:
#!/bin/bash
# Description: Verify that all executor implementations use the new ExecutionLogsRequest type
# Test: Search for GetLogStream method signatures in executor implementations
rg -A 2 'func.*GetLogStream.*Context' pkg/executor/
Length of output: 1014
pkg/test/requester/retries_test.go (1)
68-69
: LGTM: Bid strategy initialization simplified.
The changes correctly align with the simplified NewFixedBidStrategy
constructor signature, removing the redundant boolean parameter while maintaining the same functionality.
Let's verify that this change is consistent across the codebase:
✅ Verification successful
Bid strategy initialization is correctly updated
The verification confirms that:
- The function signature in pkg/bidstrategy/fixed.go accepts only one boolean parameter
- All usages in the codebase (in pkg/test/requester/retries_test.go) correctly use the single-parameter version
- No instances of the old two-parameter constructor remain
🏁 Scripts executed
The following scripts were executed for the analysis:
Script:
#!/bin/bash
# Description: Check for any remaining instances of the old two-parameter constructor
# to ensure all calls have been updated.
# Search for any remaining calls to NewFixedBidStrategy with two parameters
rg "NewFixedBidStrategy\([^)]+,[^)]+\)"
# Search for the strategy's implementation to verify the signature
ast-grep --pattern 'func NewFixedBidStrategy($_) $_'
Length of output: 838
Script:
#!/bin/bash
# Let's verify all usages of NewFixedBidStrategy in the codebase
rg "NewFixedBidStrategy" -A 2
Length of output: 573
pkg/compute/store/boltdb/store.go (1)
437-443
: LGTM! Simplified error handling looks good.
The direct return of s.eventStore.StoreEventTx
result is appropriate here since:
- It's the last operation in the transaction
- Any error will trigger a transaction rollback, maintaining data consistency
- The error handling is simplified without losing any functionality
Let's verify the transaction rollback behavior:
✅ Verification successful
Simplified error handling is safe and appropriate
The code changes are verified to be safe because:
- The database.Update function handles transaction management, ensuring automatic rollback on any error
- StoreEventTx is properly integrated into the transaction flow, as shown in the BoltDB implementation
- All operations in the transaction (validation, state checks, merging, updating indexes) properly return errors that will trigger rollback
- The event store's transaction-aware implementation (StoreEventTx) is correctly used as the final operation
The evidence from the codebase shows:
- BoltDB's transaction management is consistently used via database.Update
- The EventStore interface and its BoltDB implementation properly handle transaction-scoped operations
- Error propagation is maintained throughout the transaction chain
- The simplified error handling doesn't compromise the transaction's atomicity
🏁 Scripts executed
The following scripts were executed for the analysis:
Script:
#!/bin/bash
# Description: Verify that StoreEventTx errors are properly handled and trigger rollbacks
# Test: Search for transaction rollback tests
rg -A 5 "TestUpdateExecutionState.*rollback|TestStore.*rollback"
# Test: Search for StoreEventTx error handling in tests
ast-grep --pattern 'func Test$_($_, $_) {
$$$
eventStore.$$$StoreEventTx$$$
$$$
}'
Length of output: 157
Script:
#!/bin/bash
# Let's try a different approach to verify transaction and error handling
# Check for transaction handling in the store implementation
rg -B 2 -A 5 "func.*Tx\(" pkg/compute/store/boltdb/store.go
# Look for error handling patterns in UpdateExecutionState
rg -B 5 -A 10 "UpdateExecutionState" pkg/compute/store/boltdb/store.go
# Check for test cases related to UpdateExecutionState
rg -B 2 -A 5 "TestUpdateExecutionState" --type go
# Look for mock implementations of eventStore for testing
rg "type.*EventStore.*interface" --type go
Length of output: 1534
Script:
#!/bin/bash
# Let's examine the transaction and error handling implementation more thoroughly
# Check the complete UpdateExecutionState implementation
rg -B 20 -A 30 "func.*UpdateExecutionState" pkg/compute/store/boltdb/store.go
# Look for the EventStore interface and its implementation
rg -A 10 "type.*EventStore.*interface" pkg/lib/watcher/types.go
# Check for StoreEventTx implementation
rg -B 5 -A 10 "StoreEventTx" pkg/lib/watcher/
# Look for transaction commit/rollback handling
rg -B 2 -A 5 "database.Update" pkg/compute/store/boltdb/
Length of output: 6677
pkg/executor/docker/executor_test.go (2)
29-29
: LGTM!
The new import for messages package is correctly placed and necessary for the updated log stream request type.
Line range hint 522-527
: LGTM!
The test correctly uses the new ExecutionLogsRequest type with appropriate parameters to verify error handling for completed executions.
pkg/executor/docker/executor.go (3)
25-25
: LGTM!
The new import is correctly placed and necessary for the updated GetLogStream method signature.
248-248
: LGTM! Method signature updated correctly.
The change aligns with the broader refactoring of the messaging system while maintaining the existing functionality.
248-248
: Verify consistent implementation across executor types.
Let's verify that all executor implementations have been updated to use the new request type consistently.
✅ Verification successful
All executor implementations are consistent with the new request type
The verification shows that all executor implementations (docker, wasm, and noop) have been properly updated to use the new messages.ExecutionLogsRequest type in their GetLogStream method signatures:
- pkg/executor/docker/executor.go
- pkg/executor/wasm/executor.go
- pkg/executor/noop/executor.go
The search results confirm that there are no remaining references to any old request type, and the new ExecutionLogsRequest type is consistently used across the codebase, including tests and related components.
🏁 Scripts executed
The following scripts were executed for the analysis:
Script:
#!/bin/bash
# Description: Check if all executor implementations use the new request type
# Test 1: Search for GetLogStream implementations
echo "=== GetLogStream implementations ==="
ast-grep --pattern 'func $$$GetLogStream(ctx context.Context, request $_) $_'
# Test 2: Search for any remaining references to the old type
echo -e "\n=== Old type references ==="
rg "LogStreamRequest"
Length of output: 255
Script:
#!/bin/bash
# Let's try a more comprehensive search approach
# 1. Find all executor implementation files
echo "=== Executor Implementation Files ==="
fd "executor.*\.go$"
# 2. Search for GetLogStream method with more context
echo -e "\n=== GetLogStream Method Implementations ==="
rg "func.*GetLogStream" -A 2
# 3. Search for the new ExecutionLogsRequest type usage
echo -e "\n=== ExecutionLogsRequest Usage ==="
rg "ExecutionLogsRequest"
# 4. Check the messages package for request types
echo -e "\n=== Request Types in Messages Package ==="
fd "messages.*\.go$" --exec cat {}
Length of output: 4522
pkg/lib/watcher/registry.go (6)
6-6
: Import "time" package for timeout functionality
The addition of import "time"
is necessary for handling timeouts in the shutdown process.
11-13
: Introduce DefaultShutdownTimeout
constant for configurable shutdown durations
Defining DefaultShutdownTimeout
improves code readability and allows for easy adjustments to the default shutdown timeout.
80-85
: Implement timeout context for graceful shutdown
Creating a timeout context when the parent context lacks a deadline ensures that the Stop
method doesn't hang indefinitely. This enhancement improves the robustness of the shutdown process.
88-96
: Safely snapshot watchers under read lock
Taking a snapshot of the current watchers while holding a read lock prevents race conditions during concurrent access. This approach ensures thread-safe operations when accessing shared resources.
98-106
: Concurrently stop all watchers using goroutines and WaitGroup
Starting a goroutine for each watcher and using a WaitGroup
to synchronize their completion is an efficient way to stop all watchers concurrently without blocking the main thread.
Line range hint 108-121
: Handle shutdown completion or timeout appropriately
Using a select
statement to wait for either the completion signal or the timeout ensures that the Stop
method handles timeouts gracefully. Logging a warning when the shutdown times out provides visibility into potential issues during the shutdown process.
pkg/test/compute/ask_for_bid_pre_approved_test.go (1)
120-121
:
Use Equalf
instead of Equal
for formatted messages
At lines 120-121, the s.Equal
method is used with a formatted string that includes format specifiers %s
. The s.Equal
method does not support formatting the message; it treats the message as a plain string. To include formatted messages in your assertion, you should use s.Equalf
.
Apply this diff to fix the issue:
-s.Equal(expectedState, retrievedExecution.ComputeState.StateType,
- "expected execution state %s but got %s", expectedState, retrievedExecution.ComputeState.StateType)
+s.Equalf(expectedState, retrievedExecution.ComputeState.StateType,
+ "expected execution state %s but got %s", expectedState, retrievedExecution.ComputeState.StateType)
Likely invalid or redundant comment.
pkg/node/compute.go (1)
267-267
: Address the repeated TODO: Add checkpointing to avoid missing events
This TODO comment is similar to the one on line 254. Ensure that checkpointing is implemented here as well to prevent missing events in this watcher registration.
As mentioned earlier, would you like assistance with implementing checkpointing for these watchers?
pkg/compute/executor.go (1)
452-452
: Delegation to executor's Cancel
method is appropriate
The Cancel method now directly calls exe.Cancel(ctx, execution.ID), which streamlines the cancellation process by delegating it to the specific executor implementation.
Actionable comments posted: 1
🧹 Outside diff range and nitpick comments (3)
pkg/compute/watchers/event_logger.go (2)
14-24
: Consider adding thread-safety documentation. While the implementation is clean, it would be helpful to document whether this type is safe for concurrent use, as logging is often accessed from multiple goroutines.
Add a comment like:
 // ExecutionLogger handles logging of execution-related events with detailed state transition information
+// This type is safe for concurrent use.
 type ExecutionLogger struct {
41-83
: Consider standardizing log levels for better observability. The code uses different log levels inconsistently:
- Base execution information uses Debug level
- Event details use Trace level
This split might make it difficult to follow the execution flow. Consider using the same log level (Debug) for both to maintain consistency, or document why different levels are used.
- eventLogger := e.logger.Trace().
+ eventLogger := e.logger.Debug().
pkg/compute/endpoint.go (1)
Line range hint 140-144
: Simplify log request forwarding. The current implementation creates a new request with the same fields. Consider simplifying by directly forwarding the request:
- return s.logServer.GetLogStream(ctx, messages.ExecutionLogsRequest{
-     ExecutionID: request.ExecutionID,
-     Tail: request.Tail,
-     Follow: request.Follow,
- })
+ return s.logServer.GetLogStream(ctx, request)
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
📒 Files selected for processing (4)
pkg/compute/endpoint.go (7 hunks)
pkg/compute/metrics.go (0 hunks)
pkg/compute/watchers/event_logger.go (1 hunks)
pkg/node/compute.go (6 hunks)
💤 Files with no reviewable changes (1)
- pkg/compute/metrics.go
🔇 Additional comments (10)
pkg/compute/watchers/event_logger.go (2)
1-12
: LGTM: Clean package structure and imports.
The package organization and imports are well-structured and appropriate for the functionality.
72-83
: Fix event logging to properly log each event.
This issue was previously identified and remains unfixed. The current implementation accumulates data within the loop but logs only once after the loop completes, meaning only the last event's data is included.
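The fix being asked for is to emit one log entry per event inside the loop, rather than accumulating fields and logging once at the end. A hedged sketch with zerolog follows; the ExecutionLogger name and logger field come from the review, while the event type and its fields are illustrative.

```go
package watchers

import (
	"github.com/rs/zerolog"
)

// executionEvent is a stand-in for the event payload the logger receives.
type executionEvent struct {
	Name    string
	Message string
}

type ExecutionLogger struct {
	logger zerolog.Logger
}

// logEvents emits one log line per event instead of accumulating fields
// across the loop and logging only the last event's data.
func (e *ExecutionLogger) logEvents(executionID string, events []executionEvent) {
	for _, ev := range events {
		e.logger.Debug().
			Str("execution_id", executionID).
			Str("event", ev.Name).
			Str("message", ev.Message).
			Msg("execution event")
	}
}
```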
pkg/compute/endpoint.go (4)
45-51
: Clean simplification of bid request handling.
The changes improve clarity by:
- Simplifying the WaitForApproval logic
- Adding contextual error logging
- Making the execution state flow more explicit
70-71
: LGTM: Consistent state handling across bid responses.
The addition of ExecutionStateAskForBidAccepted
to expected states is consistent across both bid acceptance and rejection flows.
Also applies to: 97-98
Line range hint 119-130
: Verify safety of unconditional cancellation.
The removal of state checks before cancellation could potentially allow invalid state transitions. Please verify that:
- All states can safely transition to
ExecutionStateCancelled
- No race conditions exist in concurrent cancellation scenarios
✅ Verification successful
Let me gather more information about the state transitions and execution model to verify the safety.
Let me check one more aspect about the execution store implementation to understand the concurrency guarantees at the storage level.
Based on the gathered information, I can now provide a final response:
Unconditional cancellation is safe in this context
The implementation is safe because:
- ExecutionStateCancelled is defined as a terminal state in the model, making it a valid final state for any execution
- The execution store uses BoltDB with transactions, preventing race conditions during state updates
- The state model explicitly defines ExecutionStateCancelled as a valid terminal state through IsTerminalState() checks
The removal of state checks before cancellation is actually an improvement as it:
- Simplifies the cancellation logic
- Allows cancellation from any state, which is a desirable behavior for a cancel operation
- Maintains data consistency through the underlying transactional store
🏁 Scripts executed
The following scripts were executed for the analysis:
Script:
#!/bin/bash
# Search for state transition constraints and cancellation handling
ast-grep --pattern 'ExecutionStateCancelled'
Length of output: 643
Script:
#!/bin/bash
# Check execution state model and transitions
ast-grep --pattern 'type ExecutionState'
# Look for any state validation or transition checks
rg "func.*State" pkg/models/execution.go -A 10
# Check for concurrent access patterns
rg "sync\." pkg/compute -A 5
Length of output: 2940
Script:
#!/bin/bash
# Check execution store implementation for transaction/locking patterns
rg "type.*ExecutionStore" -A 10
rg "UpdateExecutionState" -A 5
Length of output: 18662
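To make the terminal-state argument concrete, a small sketch. The ExecutionStateCancelled and AskForBid states appear in this review; the remaining state names and exactly which states count as terminal are illustrative assumptions, not the project's actual model.

```go
package models

type ExecutionStateType int

const (
	ExecutionStateNew ExecutionStateType = iota
	ExecutionStateAskForBid
	ExecutionStateAskForBidAccepted
	ExecutionStateAskForBidRejected
	ExecutionStateCompleted
	ExecutionStateCancelled
)

// IsTerminalState reports whether the execution can no longer transition;
// cancellation is always a valid final state, which is why the endpoint can
// cancel unconditionally.
func (s ExecutionStateType) IsTerminalState() bool {
	switch s {
	case ExecutionStateCompleted, ExecutionStateCancelled, ExecutionStateAskForBidRejected:
		return true
	default:
		return false
	}
}
```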
18-18
: Verify LogServer value type usage.
The change from pointer to value type for LogServer
could impact memory usage and nil-safety checks. Ensure that:
- The logstream.Server type is designed to be safely copied
- All callers are updated to handle the non-pointer type
Also applies to: 25-25
✅ Verification successful
Let me gather more information about the logstream.Server
type and its usage.
Let me try to find the logstream package definition and usage patterns.
Let me check the server implementation to understand if it's safe to use as a value type.
Let me check the Server interface definition to complete the analysis.
Change from pointer to value type for LogServer is safe
The change is safe because:
- Server is an interface type, not a concrete type
- The actual implementation (server struct) is still used as a pointer when returned by NewServer
- Interface values in Go already contain internal pointers to the implementation
- No direct state is stored in the interface itself
🏁 Scripts executed
The following scripts were executed for the analysis:
Script:
#!/bin/bash
# Search for LogServer pointer usage that might need updates
rg -t go "\\*logstream\\.Server"
Length of output: 643
Script:
#!/bin/bash
# Check the definition and implementation of logstream.Server
ast-grep --pattern 'type Server'
# Search for any LogServer field assignments or usage
rg -t go "LogServer"
# Check if logstream.Server implements any interfaces
rg -t go "interface.*Server"
Length of output: 252
Script:
#!/bin/bash
# Find logstream package files
fd -e go -x grep -l "package logstream" {}
# Look for logstream.Server instantiation
rg -t go "logstream\\.Server"
# Check for any method calls on LogServer
rg -t go "LogServer\\." -A 2
Length of output: 645
Script:
#!/bin/bash
# Check the server implementation
cat pkg/compute/logstream/server.go
# Look for any mutex or internal state in the Server type
rg -t go "type Server struct" -A 10 pkg/compute/logstream/server.go
# Check how NewServer is implemented
ast-grep --pattern 'func NewServer'
Length of output: 2661
Script:
#!/bin/bash
# Find Server interface definition
rg -t go "type Server interface" -A 5
# Check for any other files that might contain Server interface
fd -e go -x grep -l "type Server interface" {}
# Look for the types.go file in logstream package which might contain the interface
cat pkg/compute/logstream/types.go
Length of output: 1305
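Why the value type is safe can be seen in a short sketch: copying an interface value copies the internal pointer, not the underlying server state. Names here are illustrative, not the project's logstream package.

```go
package main

import "fmt"

// Server is an interface, so a field of this type holds a pointer to the
// concrete implementation even when the field itself is not a pointer.
type Server interface {
	Count() int
}

type server struct {
	calls int
}

func NewServer() Server { return &server{} } // concrete value is still a pointer

func (s *server) Count() int {
	s.calls++
	return s.calls
}

type Endpoint struct {
	logServer Server // value-typed field, no *Server needed
}

func main() {
	srv := NewServer()
	a := Endpoint{logServer: srv}
	b := Endpoint{logServer: srv} // copying the interface copies the pointer inside it

	fmt.Println(a.logServer.Count()) // 1
	fmt.Println(b.logServer.Count()) // 2 -- both endpoints share the same underlying server
}
```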
pkg/node/compute.go (4)
39-46
: LGTM: Well-structured additions to Compute struct
The new fields LogstreamServer
and Watchers
are properly typed and align well with the enhanced logging and event watching capabilities.
173-175
: LGTM: Proper initialization of new components
The initialization of LogServer and WatcherRegistry is well-placed in the initialization sequence and includes proper error handling.
Also applies to: 245-248
352-352
: LGTM: Clean bidder parameter updates
The addition of the Store field to BidderParams provides necessary access to execution state, while the simplified function signature better reflects the event-driven architecture.
366-366
: Verify watcher ID constants
The watcher IDs (computeExecutionLoggerWatcherID, computeCallbackForwarderWatcherID, computeExecutionHandlerWatcherID) are used but not visible in the provided code. Ensure these constants are properly defined and exported if needed by other packages.
Also applies to: 374-374, 388-388
✅ Verification successful
Let me try a different search strategy to locate these constants.
The search results show that the watcher ID constants are defined in pkg/node/constants.go
. Let me verify their exact definitions.
Watcher ID constants properly defined and used
The watcher ID constants are properly defined in pkg/node/constants.go
as string constants with clear documentation:
computeExecutionLoggerWatcherID = "compute-execution-logger"
computeCallbackForwarderWatcherID = "compute-callback-forwarder"
computeExecutionHandlerWatcherID = "compute-execution-handler"
🏁 Scripts executed
The following scripts were executed for the analysis:
Script:
#!/bin/bash
# Search for watcher ID constant definitions
rg -l "const.*computeExecutionLoggerWatcherID|const.*computeCallbackForwarderWatcherID|const.*computeExecutionHandlerWatcherID"
Length of output: 129
Script:
#!/bin/bash
# Search for watcher ID variables with broader context
rg "computeExecutionLoggerWatcherID|computeCallbackForwarderWatcherID|computeExecutionHandlerWatcherID" -A 2 -B 2
# Also search for any variable declarations containing these names
ast-grep --pattern 'var $_ = $_'
Length of output: 205500
Script:
#!/bin/bash
# Get the context around the watcher ID constants
rg -A 2 -B 2 "computeCallbackForwarderWatcherID|computeExecutionHandlerWatcherID|computeExecutionLoggerWatcherID" pkg/node/constants.go
Length of output: 875
Co-authored-by: coderabbitai[bot] <136622811+coderabbitai[bot]@users.noreply.github.com>
This PR introduces async handling of executions in the compute node by decoupling the different components and fully relying on the execution store and watcher library to process executions.
In this implementation we are still using the compute Endpoint and Callbacks to interface with the orchestrator node to maintain backward compatibility. A follow-up PR will introduce messaging through the ncl library for full async communication.
The current flow is as follows:
Summary by CodeRabbit
New Features
- Introduced the CallbackForwarder.
Bug Fixes
- Fixes in executionHandler and BaseExecutor.
Refactor
- Reworked the RunBidding method and its associated error handling.
- Removed the NodeID field from bid strategy requests to simplify the data structure.
Tests
Chores