From e73b3f30e85bc537848668930aa4a681c6f0b6ba Mon Sep 17 00:00:00 2001 From: Tim Barry Date: Tue, 10 Dec 2024 14:19:40 -0800 Subject: [PATCH 01/10] WIP - Initial refactor of AsyncUploader Replace engine.Unit with component.Component - Arbitrarily set number of workers to 3 Uses a fifoqueue to buffer tasks (planned to replace with channel, since we would rather wait to upload than let a block be not uploaded). Does not correctly propagate errors, or calculate any new metrics yet. --- .../execution/ingestion/uploader/uploader.go | 131 +++++++++++++----- 1 file changed, 98 insertions(+), 33 deletions(-) diff --git a/engine/execution/ingestion/uploader/uploader.go b/engine/execution/ingestion/uploader/uploader.go index f9486ea99d8..094de50314e 100644 --- a/engine/execution/ingestion/uploader/uploader.go +++ b/engine/execution/ingestion/uploader/uploader.go @@ -2,13 +2,17 @@ package uploader import ( "context" + "fmt" "time" "github.com/rs/zerolog" "github.com/onflow/flow-go/engine" + "github.com/onflow/flow-go/engine/common/fifoqueue" "github.com/onflow/flow-go/engine/execution" "github.com/onflow/flow-go/module" + "github.com/onflow/flow-go/module/component" + "github.com/onflow/flow-go/module/irrecoverable" "github.com/onflow/flow-go/utils/logging" "github.com/sethvargo/go-retry" @@ -26,74 +30,135 @@ func NewAsyncUploader(uploader Uploader, maxRetryNumber uint64, log zerolog.Logger, metrics module.ExecutionMetrics) *AsyncUploader { - return &AsyncUploader{ - unit: engine.NewUnit(), + // TODO queue size, add length metrics and check error + queue, _ := fifoqueue.NewFifoQueue(1000) + a := &AsyncUploader{ uploader: uploader, log: log.With().Str("component", "block_data_uploader").Logger(), metrics: metrics, retryInitialTimeout: retryInitialTimeout, maxRetryNumber: maxRetryNumber, + queue: queue, + notifier: engine.NewNotifier(), } + builder := component.NewComponentManagerBuilder() + for range 3 { + builder.AddWorker(a.UploadWorker) + } + a.cm = builder.Build() + a.Component = a.cm + return a } // AsyncUploader wraps up another Uploader instance and make its upload asynchronous type AsyncUploader struct { - module.ReadyDoneAware - unit *engine.Unit uploader Uploader log zerolog.Logger metrics module.ExecutionMetrics retryInitialTimeout time.Duration maxRetryNumber uint64 onComplete OnCompleteFunc // callback function called after Upload is completed + queue *fifoqueue.FifoQueue + notifier engine.Notifier + cm *component.ComponentManager + component.Component + // TODO Replace fifoqueue with channel, and make Upload() blocking } -func (a *AsyncUploader) Ready() <-chan struct{} { - return a.unit.Ready() +// UploadWorker implements a component worker which asynchronously uploads computation results +// from the execution node (after a block is executed) to storage such as a GCP bucket or S3 bucket. +func (a *AsyncUploader) UploadWorker(ctx irrecoverable.SignalerContext, ready component.ReadyFunc) { + ready() + + done := ctx.Done() + wake := a.notifier.Channel() + for { + select { + case <-done: + return + case <-wake: + err := a.processUploadTasks(ctx) + if err != nil { + ctx.Throw(err) + } + } + } } -func (a *AsyncUploader) Done() <-chan struct{} { - return a.unit.Done() +// processUploadTasks processes any available tasks from the queue. +// Only returns when the queue is empty (or the component is terminated). +// No errors expected during normal operation. +func (a *AsyncUploader) processUploadTasks(ctx context.Context) error { + for { + item, ok := a.queue.Pop() + if !ok { + return nil + } + + computationResult, ok := item.(*execution.ComputationResult) + if !ok { + return fmt.Errorf("invalid type in AsyncUploader queue") + } + + a.UploadTask(ctx, computationResult) + select { + case <-ctx.Done(): + return nil + default: + } + } } func (a *AsyncUploader) SetOnCompleteCallback(onComplete OnCompleteFunc) { a.onComplete = onComplete } +// Upload adds the computation result to a queue to be processed asynchronously by workers, +// ensuring that multiple uploads can be run in parallel. +// No errors expected during normal operation. func (a *AsyncUploader) Upload(computationResult *execution.ComputationResult) error { + if a.queue.Push(computationResult) { + a.notifier.Notify() + } else { + // TODO record in metrics + } + return nil +} +// UploadTask implements retrying for uploading computation results. +// When the upload is complete, the callback will be called with the result (for example, +// to record that the upload was successful) and any error. +// No errors expected during normal operation. +func (a *AsyncUploader) UploadTask(ctx context.Context, computationResult *execution.ComputationResult) { backoff := retry.NewFibonacci(a.retryInitialTimeout) backoff = retry.WithMaxRetries(a.maxRetryNumber, backoff) - a.unit.Launch(func() { - a.metrics.ExecutionBlockDataUploadStarted() - start := time.Now() + a.metrics.ExecutionBlockDataUploadStarted() + start := time.Now() - a.log.Debug().Msgf("computation result of block %s is being uploaded", - computationResult.ExecutableBlock.ID().String()) - - err := retry.Do(a.unit.Ctx(), backoff, func(ctx context.Context) error { - err := a.uploader.Upload(computationResult) - if err != nil { - a.log.Warn().Err(err).Msg("error while uploading block data, retrying") - } - return retry.RetryableError(err) - }) + a.log.Debug().Msgf("computation result of block %s is being uploaded", + computationResult.ExecutableBlock.ID().String()) + err := retry.Do(ctx, backoff, func(ctx context.Context) error { + err := a.uploader.Upload(computationResult) if err != nil { - a.log.Error().Err(err). - Hex("block_id", logging.Entity(computationResult.ExecutableBlock)). - Msg("failed to upload block data") - } else { - a.log.Debug().Msgf("computation result of block %s was successfully uploaded", - computationResult.ExecutableBlock.ID().String()) + a.log.Warn().Err(err).Msg("error while uploading block data, retrying") } + return retry.RetryableError(err) + }) - a.metrics.ExecutionBlockDataUploadFinished(time.Since(start)) + if err != nil { + a.log.Error().Err(err). + Hex("block_id", logging.Entity(computationResult.ExecutableBlock)). + Msg("failed to upload block data") + } else { + a.log.Debug().Msgf("computation result of block %s was successfully uploaded", + computationResult.ExecutableBlock.ID().String()) + } - if a.onComplete != nil { - a.onComplete(computationResult, err) - } - }) - return nil + a.metrics.ExecutionBlockDataUploadFinished(time.Since(start)) + + if a.onComplete != nil { + a.onComplete(computationResult, err) + } } From 9c57c77c1772cb258e0be1dda91cb133e2b5e473 Mon Sep 17 00:00:00 2001 From: Tim Barry Date: Thu, 12 Dec 2024 10:02:02 -0800 Subject: [PATCH 02/10] Convert BadgerRetryableUploadWrapper to Component BadgerRetryableUploadWrapper wraps an AsyncUploader. Now that the AsyncUploader is a Component, it needs to be `Start()`ed. The retryable wrapper did not itself do anything special on ready/done, so the Component functionality is directly delegated to the wrapped AsyncUploader. --- .../uploader/retryable_uploader_wrapper.go | 11 +++-------- .../uploader/retryable_uploader_wrapper_test.go | 14 +++++++++++--- 2 files changed, 14 insertions(+), 11 deletions(-) diff --git a/engine/execution/ingestion/uploader/retryable_uploader_wrapper.go b/engine/execution/ingestion/uploader/retryable_uploader_wrapper.go index ecad4801741..418177b4dc8 100644 --- a/engine/execution/ingestion/uploader/retryable_uploader_wrapper.go +++ b/engine/execution/ingestion/uploader/retryable_uploader_wrapper.go @@ -10,6 +10,7 @@ import ( "github.com/onflow/flow-go/engine/execution" "github.com/onflow/flow-go/model/flow" "github.com/onflow/flow-go/module" + "github.com/onflow/flow-go/module/component" "github.com/onflow/flow-go/module/executiondatasync/execution_data" "github.com/onflow/flow-go/module/mempool/entity" "github.com/onflow/flow-go/storage" @@ -34,6 +35,7 @@ type BadgerRetryableUploaderWrapper struct { results storage.ExecutionResults transactionResults storage.TransactionResults uploadStatusStore storage.ComputationResultUploadStatus + component.Component } func NewBadgerRetryableUploaderWrapper( @@ -99,17 +101,10 @@ func NewBadgerRetryableUploaderWrapper( results: results, transactionResults: transactionResults, uploadStatusStore: uploadStatusStore, + Component: uploader, // delegate to the AsyncUploader } } -func (b *BadgerRetryableUploaderWrapper) Ready() <-chan struct{} { - return b.uploader.Ready() -} - -func (b *BadgerRetryableUploaderWrapper) Done() <-chan struct{} { - return b.uploader.Done() -} - func (b *BadgerRetryableUploaderWrapper) Upload(computationResult *execution.ComputationResult) error { if computationResult == nil || computationResult.ExecutableBlock == nil || computationResult.ExecutableBlock.Block == nil { diff --git a/engine/execution/ingestion/uploader/retryable_uploader_wrapper_test.go b/engine/execution/ingestion/uploader/retryable_uploader_wrapper_test.go index 491307705eb..adff343f61f 100644 --- a/engine/execution/ingestion/uploader/retryable_uploader_wrapper_test.go +++ b/engine/execution/ingestion/uploader/retryable_uploader_wrapper_test.go @@ -1,6 +1,7 @@ package uploader import ( + "context" "sync" "testing" "time" @@ -11,6 +12,7 @@ import ( "github.com/onflow/flow-go/model/flow" "github.com/onflow/flow-go/module/executiondatasync/execution_data" executionDataMock "github.com/onflow/flow-go/module/executiondatasync/execution_data/mock" + "github.com/onflow/flow-go/module/irrecoverable" "github.com/onflow/flow-go/module/mempool/entity" "github.com/onflow/flow-go/module/metrics" @@ -26,6 +28,8 @@ import ( ) func Test_Upload_invoke(t *testing.T) { + ctx, cancel := irrecoverable.NewMockSignalerContextWithCancel(t, context.Background()) + defer cancel() wg := sync.WaitGroup{} uploaderCalled := false @@ -40,7 +44,7 @@ func Test_Upload_invoke(t *testing.T) { 1*time.Nanosecond, 1, zerolog.Nop(), &metrics.NoopCollector{}) testRetryableUploaderWrapper := createTestBadgerRetryableUploaderWrapper(asyncUploader) - defer testRetryableUploaderWrapper.Done() + testRetryableUploaderWrapper.Start(ctx) // nil input - no call to Upload() err := testRetryableUploaderWrapper.Upload(nil) @@ -58,6 +62,8 @@ func Test_Upload_invoke(t *testing.T) { } func Test_RetryUpload(t *testing.T) { + ctx, cancel := irrecoverable.NewMockSignalerContextWithCancel(t, context.Background()) + defer cancel() wg := sync.WaitGroup{} wg.Add(1) uploaderCalled := false @@ -72,7 +78,7 @@ func Test_RetryUpload(t *testing.T) { 1*time.Nanosecond, 1, zerolog.Nop(), &metrics.NoopCollector{}) testRetryableUploaderWrapper := createTestBadgerRetryableUploaderWrapper(asyncUploader) - defer testRetryableUploaderWrapper.Done() + testRetryableUploaderWrapper.Start(ctx) err := testRetryableUploaderWrapper.RetryUpload() wg.Wait() @@ -82,6 +88,8 @@ func Test_RetryUpload(t *testing.T) { } func Test_AsyncUploaderCallback(t *testing.T) { + ctx, cancel := irrecoverable.NewMockSignalerContextWithCancel(t, context.Background()) + defer cancel() wgUploadCalleded := sync.WaitGroup{} wgUploadCalleded.Add(1) @@ -95,7 +103,7 @@ func Test_AsyncUploaderCallback(t *testing.T) { 1*time.Nanosecond, 1, zerolog.Nop(), &metrics.NoopCollector{}) testRetryableUploaderWrapper := createTestBadgerRetryableUploaderWrapper(asyncUploader) - defer testRetryableUploaderWrapper.Done() + testRetryableUploaderWrapper.Start(ctx) testComputationResult := createTestComputationResult() err := testRetryableUploaderWrapper.Upload(testComputationResult) From f7709c8df59534dadfd8c2b6880c8ae4cab474e5 Mon Sep 17 00:00:00 2001 From: Tim Barry Date: Thu, 12 Dec 2024 10:41:08 -0800 Subject: [PATCH 03/10] Update AsyncUploader tests Since AsyncUploader now implements Component instead of ReadyDoneAware, update the methods used to start and end the AsyncUploader (using a context and its cancel() function). Test "uploads are run in parallel" currently failing (due to only one worker taking tasks from the queue?) --- .../ingestion/uploader/uploader_test.go | 31 +++++++++++++------ 1 file changed, 22 insertions(+), 9 deletions(-) diff --git a/engine/execution/ingestion/uploader/uploader_test.go b/engine/execution/ingestion/uploader/uploader_test.go index c14fa683715..f4ee13883a4 100644 --- a/engine/execution/ingestion/uploader/uploader_test.go +++ b/engine/execution/ingestion/uploader/uploader_test.go @@ -2,6 +2,7 @@ package uploader import ( "bytes" + "context" "fmt" "runtime/debug" "sync" @@ -13,17 +14,17 @@ import ( "go.uber.org/atomic" "github.com/onflow/flow-go/engine/execution" - "github.com/onflow/flow-go/engine/execution/state/unittest" + exeunittest "github.com/onflow/flow-go/engine/execution/state/unittest" + "github.com/onflow/flow-go/module/irrecoverable" "github.com/onflow/flow-go/module/metrics" - testutils "github.com/onflow/flow-go/utils/unittest" - unittest2 "github.com/onflow/flow-go/utils/unittest" + "github.com/onflow/flow-go/utils/unittest" ) func Test_AsyncUploader(t *testing.T) { - computationResult := unittest.ComputationResultFixture( + computationResult := exeunittest.ComputationResultFixture( t, - testutils.IdentifierFixture(), + unittest.IdentifierFixture(), nil) t.Run("uploads are run in parallel and emit metrics", func(t *testing.T) { @@ -46,6 +47,8 @@ func Test_AsyncUploader(t *testing.T) { metrics := &DummyCollector{} async := NewAsyncUploader(uploader, 1*time.Nanosecond, 1, zerolog.Nop(), metrics) + ctx, cancel := irrecoverable.NewMockSignalerContextWithCancel(t, context.Background()) + async.Start(ctx) err := async.Upload(computationResult) require.NoError(t, err) @@ -63,6 +66,8 @@ func Test_AsyncUploader(t *testing.T) { wgContinueUpload.Done() //release all // shut down component + cancel() + unittest.AssertClosesBefore(t, async.Done(), 1*time.Second, "async uploader did not finish in time") <-async.Done() require.Equal(t, int64(0), metrics.Counter.Load()) @@ -89,6 +94,9 @@ func Test_AsyncUploader(t *testing.T) { } async := NewAsyncUploader(uploader, 1*time.Nanosecond, 5, zerolog.Nop(), &metrics.NoopCollector{}) + ctx, cancel := irrecoverable.NewMockSignalerContextWithCancel(t, context.Background()) + async.Start(ctx) + defer cancel() err := async.Upload(computationResult) require.NoError(t, err) @@ -107,7 +115,7 @@ func Test_AsyncUploader(t *testing.T) { // 2. shut down async uploader right after upload initiated (not completed) // 3. assert that upload called only once even when trying to use retry mechanism t.Run("stopping component stops retrying", func(t *testing.T) { - testutils.SkipUnless(t, testutils.TEST_FLAKY, "flaky") + unittest.SkipUnless(t, unittest.TEST_FLAKY, "flaky") callCount := 0 t.Log("test started grID:", string(bytes.Fields(debug.Stack())[1])) @@ -151,6 +159,8 @@ func Test_AsyncUploader(t *testing.T) { } t.Log("about to create NewAsyncUploader grID:", string(bytes.Fields(debug.Stack())[1])) async := NewAsyncUploader(uploader, 1*time.Nanosecond, 5, zerolog.Nop(), &metrics.NoopCollector{}) + ctx, cancel := irrecoverable.NewMockSignalerContextWithCancel(t, context.Background()) + async.Start(ctx) t.Log("about to call async.Upload() grID:", string(bytes.Fields(debug.Stack())[1])) err := async.Upload(computationResult) // doesn't matter what we upload require.NoError(t, err) @@ -163,11 +173,11 @@ func Test_AsyncUploader(t *testing.T) { // stop component and check that it's fully stopped t.Log("about to initiate shutdown grID: ", string(bytes.Fields(debug.Stack())[1])) - c := async.Done() + cancel() t.Log("about to notify upload() that shutdown started and can continue uploading grID:", string(bytes.Fields(debug.Stack())[1])) wgShutdownStarted.Done() t.Log("about to check async done channel is closed grID:", string(bytes.Fields(debug.Stack())[1])) - unittest2.RequireCloseBefore(t, c, 1*time.Second, "async uploader not closed in time") + unittest.RequireCloseBefore(t, async.Done(), 1*time.Second, "async uploader not closed in time") t.Log("about to check if callCount is 1 grID:", string(bytes.Fields(debug.Stack())[1])) require.Equal(t, 1, callCount) @@ -190,12 +200,15 @@ func Test_AsyncUploader(t *testing.T) { async.SetOnCompleteCallback(func(computationResult *execution.ComputationResult, err error) { onCompleteCallbackCalled = true }) + ctx, cancel := irrecoverable.NewMockSignalerContextWithCancel(t, context.Background()) + async.Start(ctx) err := async.Upload(computationResult) require.NoError(t, err) wgUploadCalleded.Wait() - <-async.Done() + cancel() + unittest.AssertClosesBefore(t, async.Done(), 1*time.Second, "async uploader not done in time") require.True(t, onCompleteCallbackCalled) }) From 69c99b059d9be9969b8df8cc39346ee10867ca6c Mon Sep 17 00:00:00 2001 From: Tim Barry Date: Thu, 12 Dec 2024 10:49:39 -0800 Subject: [PATCH 04/10] Replace AsyncUploader fifoqueue with channel Instead of using a notifier and fifoqueue with metrics, use a buffered channel. Block when the channel is full. (Reasoning: we want to ensure no uploads get dropped.) Buffer size of 100 was chosen arbitrarily. All AsyncUploader tests now pass. --- .../execution/ingestion/uploader/uploader.go | 49 ++----------------- 1 file changed, 5 insertions(+), 44 deletions(-) diff --git a/engine/execution/ingestion/uploader/uploader.go b/engine/execution/ingestion/uploader/uploader.go index 094de50314e..d00eb7a2c9e 100644 --- a/engine/execution/ingestion/uploader/uploader.go +++ b/engine/execution/ingestion/uploader/uploader.go @@ -2,13 +2,10 @@ package uploader import ( "context" - "fmt" "time" "github.com/rs/zerolog" - "github.com/onflow/flow-go/engine" - "github.com/onflow/flow-go/engine/common/fifoqueue" "github.com/onflow/flow-go/engine/execution" "github.com/onflow/flow-go/module" "github.com/onflow/flow-go/module/component" @@ -30,16 +27,13 @@ func NewAsyncUploader(uploader Uploader, maxRetryNumber uint64, log zerolog.Logger, metrics module.ExecutionMetrics) *AsyncUploader { - // TODO queue size, add length metrics and check error - queue, _ := fifoqueue.NewFifoQueue(1000) a := &AsyncUploader{ uploader: uploader, log: log.With().Str("component", "block_data_uploader").Logger(), metrics: metrics, retryInitialTimeout: retryInitialTimeout, maxRetryNumber: maxRetryNumber, - queue: queue, - notifier: engine.NewNotifier(), + queue: make(chan *execution.ComputationResult, 100), } builder := component.NewComponentManagerBuilder() for range 3 { @@ -58,8 +52,7 @@ type AsyncUploader struct { retryInitialTimeout time.Duration maxRetryNumber uint64 onComplete OnCompleteFunc // callback function called after Upload is completed - queue *fifoqueue.FifoQueue - notifier engine.Notifier + queue chan *execution.ComputationResult cm *component.ComponentManager component.Component // TODO Replace fifoqueue with channel, and make Upload() blocking @@ -71,40 +64,12 @@ func (a *AsyncUploader) UploadWorker(ctx irrecoverable.SignalerContext, ready co ready() done := ctx.Done() - wake := a.notifier.Channel() for { select { case <-done: return - case <-wake: - err := a.processUploadTasks(ctx) - if err != nil { - ctx.Throw(err) - } - } - } -} - -// processUploadTasks processes any available tasks from the queue. -// Only returns when the queue is empty (or the component is terminated). -// No errors expected during normal operation. -func (a *AsyncUploader) processUploadTasks(ctx context.Context) error { - for { - item, ok := a.queue.Pop() - if !ok { - return nil - } - - computationResult, ok := item.(*execution.ComputationResult) - if !ok { - return fmt.Errorf("invalid type in AsyncUploader queue") - } - - a.UploadTask(ctx, computationResult) - select { - case <-ctx.Done(): - return nil - default: + case computationResult := <-a.queue: + a.UploadTask(ctx, computationResult) } } } @@ -117,11 +82,7 @@ func (a *AsyncUploader) SetOnCompleteCallback(onComplete OnCompleteFunc) { // ensuring that multiple uploads can be run in parallel. // No errors expected during normal operation. func (a *AsyncUploader) Upload(computationResult *execution.ComputationResult) error { - if a.queue.Push(computationResult) { - a.notifier.Notify() - } else { - // TODO record in metrics - } + a.queue <- computationResult return nil } From 72e87038f5b2ced753833e91d7449ad4d6c3439a Mon Sep 17 00:00:00 2001 From: Tim Barry Date: Thu, 12 Dec 2024 11:28:37 -0800 Subject: [PATCH 05/10] Update Component interface documentation Because Component's `Ready()` and `Done()` methods work differently from ReadyDoneAware's, include them in Component with Component-specific comments. Namely, specify that Components should be started with Start() and shutdown by canceling the context they were started with. --- module/component/component.go | 18 +++++++++++++++++- 1 file changed, 17 insertions(+), 1 deletion(-) diff --git a/module/component/component.go b/module/component/component.go index 34f8f61cf14..3f813ad0612 100644 --- a/module/component/component.go +++ b/module/component/component.go @@ -19,9 +19,25 @@ var ErrComponentShutdown = fmt.Errorf("component has already shut down") // channels that close when startup and shutdown have completed. // Once Start has been called, the channel returned by Done must close eventually, // whether that be because of a graceful shutdown or an irrecoverable error. +// See also ComponentManager below. type Component interface { module.Startable - module.ReadyDoneAware + // Ready returns a ready channel that is closed once startup has completed. + // Unlike the previous ReadyDoneAware interface, it has no effect on the state of the component, + // and only exposes information about the component's state. + // To start the component, instead use the Start() method. + // Note that the ready channel may never close if errors are encountered during startup, + // or if shutdown has already commenced before startup is complete. + // This should be an idempotent method. + Ready() <-chan struct{} + + // Done returns a done channel that is closed once shutdown has completed. + // Unlike the previous ReadyDoneAware interface, it has no effect on the state of the component, + // and only exposes information about the component's state. + // To shutdown the component, instead cancel the context that was passed to Start(). + // Note that the done channel should be closed even if errors are encountered during shutdown. + // This should be an idempotent method. + Done() <-chan struct{} } type ComponentFactory func() (Component, error) From 75a6b280c12a33c4dc78e4f026f2f577c642cb5e Mon Sep 17 00:00:00 2001 From: Tim Barry Date: Thu, 12 Dec 2024 12:36:05 -0800 Subject: [PATCH 06/10] fix Lint for golang 1.22 --- engine/execution/ingestion/uploader/uploader.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/engine/execution/ingestion/uploader/uploader.go b/engine/execution/ingestion/uploader/uploader.go index d00eb7a2c9e..e888d9589aa 100644 --- a/engine/execution/ingestion/uploader/uploader.go +++ b/engine/execution/ingestion/uploader/uploader.go @@ -36,7 +36,7 @@ func NewAsyncUploader(uploader Uploader, queue: make(chan *execution.ComputationResult, 100), } builder := component.NewComponentManagerBuilder() - for range 3 { + for i := 0; i < 3; i++ { builder.AddWorker(a.UploadWorker) } a.cm = builder.Build() From c7a85f73dadd600e6ab8166efbdd224889457124 Mon Sep 17 00:00:00 2001 From: Tim Barry <21149133+tim-barry@users.noreply.github.com> Date: Mon, 16 Dec 2024 08:55:51 -0800 Subject: [PATCH 07/10] Apply suggestions from code review Update/clarify doc comments Co-authored-by: Jordan Schalm --- engine/execution/ingestion/uploader/uploader.go | 3 ++- module/component/component.go | 6 +++--- 2 files changed, 5 insertions(+), 4 deletions(-) diff --git a/engine/execution/ingestion/uploader/uploader.go b/engine/execution/ingestion/uploader/uploader.go index e888d9589aa..db8bbe93189 100644 --- a/engine/execution/ingestion/uploader/uploader.go +++ b/engine/execution/ingestion/uploader/uploader.go @@ -55,7 +55,6 @@ type AsyncUploader struct { queue chan *execution.ComputationResult cm *component.ComponentManager component.Component - // TODO Replace fifoqueue with channel, and make Upload() blocking } // UploadWorker implements a component worker which asynchronously uploads computation results @@ -108,6 +107,8 @@ func (a *AsyncUploader) UploadTask(ctx context.Context, computationResult *execu return retry.RetryableError(err) }) + // We only log upload errors here because the errors originate from an external cloud provider + // and the upload success is not critical to correct continued operation of the node if err != nil { a.log.Error().Err(err). Hex("block_id", logging.Entity(computationResult.ExecutableBlock)). diff --git a/module/component/component.go b/module/component/component.go index 3f813ad0612..e78b24fffec 100644 --- a/module/component/component.go +++ b/module/component/component.go @@ -23,7 +23,7 @@ var ErrComponentShutdown = fmt.Errorf("component has already shut down") type Component interface { module.Startable // Ready returns a ready channel that is closed once startup has completed. - // Unlike the previous ReadyDoneAware interface, it has no effect on the state of the component, + // Unlike the previous [module.ReadyDoneAware] interface, it has no effect on the state of the component, // and only exposes information about the component's state. // To start the component, instead use the Start() method. // Note that the ready channel may never close if errors are encountered during startup, @@ -32,10 +32,10 @@ type Component interface { Ready() <-chan struct{} // Done returns a done channel that is closed once shutdown has completed. - // Unlike the previous ReadyDoneAware interface, it has no effect on the state of the component, + // Unlike the previous [module.ReadyDoneAware] interface, it has no effect on the state of the component, // and only exposes information about the component's state. // To shutdown the component, instead cancel the context that was passed to Start(). - // Note that the done channel should be closed even if errors are encountered during shutdown. + // Implementations must close the done channel even if errors are encountered during shutdown. // This should be an idempotent method. Done() <-chan struct{} } From d3a1438462f96fc126cd50ba739c37ebb14d875b Mon Sep 17 00:00:00 2001 From: Tim Barry <21149133+tim-barry@users.noreply.github.com> Date: Fri, 20 Dec 2024 10:03:45 -0800 Subject: [PATCH 08/10] Apply suggestions from code review Co-authored-by: Jordan Schalm --- engine/execution/ingestion/uploader/uploader.go | 2 ++ engine/execution/ingestion/uploader/uploader_test.go | 2 +- 2 files changed, 3 insertions(+), 1 deletion(-) diff --git a/engine/execution/ingestion/uploader/uploader.go b/engine/execution/ingestion/uploader/uploader.go index db8bbe93189..d1dcabb0cbc 100644 --- a/engine/execution/ingestion/uploader/uploader.go +++ b/engine/execution/ingestion/uploader/uploader.go @@ -33,6 +33,8 @@ func NewAsyncUploader(uploader Uploader, metrics: metrics, retryInitialTimeout: retryInitialTimeout, maxRetryNumber: maxRetryNumber, + // we use a channel rather than a Fifoqueue here because a Fifoqueue might drop items when full, + // but it is not acceptable to skip uploading an execution result queue: make(chan *execution.ComputationResult, 100), } builder := component.NewComponentManagerBuilder() diff --git a/engine/execution/ingestion/uploader/uploader_test.go b/engine/execution/ingestion/uploader/uploader_test.go index f4ee13883a4..73a72b91507 100644 --- a/engine/execution/ingestion/uploader/uploader_test.go +++ b/engine/execution/ingestion/uploader/uploader_test.go @@ -206,7 +206,7 @@ func Test_AsyncUploader(t *testing.T) { err := async.Upload(computationResult) require.NoError(t, err) - wgUploadCalleded.Wait() + unittest.AssertReturnsBefore(t, wgUploadCalleded.Wait, time.Second) cancel() unittest.AssertClosesBefore(t, async.Done(), 1*time.Second, "async uploader not done in time") From 602a4cc71fa2b3db17c209950d5f7f38044c0ec4 Mon Sep 17 00:00:00 2001 From: Tim Barry <21149133+tim-barry@users.noreply.github.com> Date: Fri, 20 Dec 2024 10:04:49 -0800 Subject: [PATCH 09/10] Apply suggestions from code review Channel size of 20000 and worker count of 100 suggested by Leo. 20000 is approximately equal to 4 hours of execution results. Co-authored-by: Leo Zhang --- engine/execution/ingestion/uploader/uploader.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/engine/execution/ingestion/uploader/uploader.go b/engine/execution/ingestion/uploader/uploader.go index d1dcabb0cbc..d29a0d3a69c 100644 --- a/engine/execution/ingestion/uploader/uploader.go +++ b/engine/execution/ingestion/uploader/uploader.go @@ -35,10 +35,10 @@ func NewAsyncUploader(uploader Uploader, maxRetryNumber: maxRetryNumber, // we use a channel rather than a Fifoqueue here because a Fifoqueue might drop items when full, // but it is not acceptable to skip uploading an execution result - queue: make(chan *execution.ComputationResult, 100), + queue: make(chan *execution.ComputationResult, 20000), } builder := component.NewComponentManagerBuilder() - for i := 0; i < 3; i++ { + for i := 0; i < 10; i++ { builder.AddWorker(a.UploadWorker) } a.cm = builder.Build() From cc03317752ce0993d856fae4caa78f7a1d4ca14f Mon Sep 17 00:00:00 2001 From: Tim Barry Date: Fri, 20 Dec 2024 10:40:15 -0800 Subject: [PATCH 10/10] Update Component comments and fix lint --- engine/execution/ingestion/uploader/uploader.go | 2 +- module/component/component.go | 8 ++++---- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/engine/execution/ingestion/uploader/uploader.go b/engine/execution/ingestion/uploader/uploader.go index d29a0d3a69c..f5abca7e490 100644 --- a/engine/execution/ingestion/uploader/uploader.go +++ b/engine/execution/ingestion/uploader/uploader.go @@ -35,7 +35,7 @@ func NewAsyncUploader(uploader Uploader, maxRetryNumber: maxRetryNumber, // we use a channel rather than a Fifoqueue here because a Fifoqueue might drop items when full, // but it is not acceptable to skip uploading an execution result - queue: make(chan *execution.ComputationResult, 20000), + queue: make(chan *execution.ComputationResult, 20000), } builder := component.NewComponentManagerBuilder() for i := 0; i < 10; i++ { diff --git a/module/component/component.go b/module/component/component.go index e78b24fffec..11b543f239f 100644 --- a/module/component/component.go +++ b/module/component/component.go @@ -23,8 +23,8 @@ var ErrComponentShutdown = fmt.Errorf("component has already shut down") type Component interface { module.Startable // Ready returns a ready channel that is closed once startup has completed. - // Unlike the previous [module.ReadyDoneAware] interface, it has no effect on the state of the component, - // and only exposes information about the component's state. + // Unlike the previous [module.ReadyDoneAware] interface, Ready does not start the component, + // but only exposes information about whether the component has completed startup. // To start the component, instead use the Start() method. // Note that the ready channel may never close if errors are encountered during startup, // or if shutdown has already commenced before startup is complete. @@ -32,8 +32,8 @@ type Component interface { Ready() <-chan struct{} // Done returns a done channel that is closed once shutdown has completed. - // Unlike the previous [module.ReadyDoneAware] interface, it has no effect on the state of the component, - // and only exposes information about the component's state. + // Unlike the previous [module.ReadyDoneAware] interface, Done does not shut down the component, + // but only exposes information about whether the component has shut down yet. // To shutdown the component, instead cancel the context that was passed to Start(). // Implementations must close the done channel even if errors are encountered during shutdown. // This should be an idempotent method.