diff --git a/builder/block_submission_rate_limiter.go b/builder/block_submission_rate_limiter.go new file mode 100644 index 000000000000..ec83cc90b3a8 --- /dev/null +++ b/builder/block_submission_rate_limiter.go @@ -0,0 +1,92 @@ +package builder + +import ( + "context" + "sync/atomic" + "time" + + "github.com/ethereum/go-ethereum/core/types" +) + +type blockRateLimitSubmission struct { + resultCh chan bool + block *types.Block +} + +type BlockSubmissionRateLimiter struct { + submissionsCh chan blockRateLimitSubmission + started uint32 + ctx context.Context + cancel context.CancelFunc +} + +func NewBlockSubmissionRateLimiter() *BlockSubmissionRateLimiter { + ctx, cancel := context.WithCancel(context.Background()) + r := &BlockSubmissionRateLimiter{ + submissionsCh: make(chan blockRateLimitSubmission), + started: uint32(0), + ctx: ctx, + cancel: cancel, + } + + return r +} +func (r *BlockSubmissionRateLimiter) Limit(block *types.Block) chan bool { + resultCh := make(chan bool, 1) + if atomic.LoadUint32(&r.started) != 1 { + resultCh <- true + return resultCh + } + + select { + case r.submissionsCh <- blockRateLimitSubmission{ + resultCh: resultCh, + block: block, + }: + case <-r.ctx.Done(): + resultCh <- true + } + return resultCh +} + +func (r *BlockSubmissionRateLimiter) Start() { + if !atomic.CompareAndSwapUint32(&r.started, 0, 1) { + return + } + + go r.rateLimit() +} + +func (r *BlockSubmissionRateLimiter) rateLimit() { + for r.ctx.Err() == nil { + // Beginning of the rate limit bucket + bestSubmission := <-r.submissionsCh + + bucketCutoffCh := time.After(100 * time.Millisecond) + + bucketClosed := false + for !bucketClosed { + select { + case <-r.ctx.Done(): + bucketClosed = true + break + case <-bucketCutoffCh: + bucketClosed = true + break + case newSubmission := <-r.submissionsCh: + if bestSubmission.block.Profit.Cmp(newSubmission.block.Profit) < 0 { + bestSubmission.resultCh <- false + bestSubmission = newSubmission + } else { + newSubmission.resultCh <- false + } + } + } + + bestSubmission.resultCh <- true + } +} + +func (r *BlockSubmissionRateLimiter) Stop() { + r.cancel() +} diff --git a/builder/block_submission_rate_limiter_test.go b/builder/block_submission_rate_limiter_test.go new file mode 100644 index 000000000000..9a0ce5b55d3c --- /dev/null +++ b/builder/block_submission_rate_limiter_test.go @@ -0,0 +1,72 @@ +package builder + +import ( + "math/big" + "testing" + "time" + + "github.com/ethereum/go-ethereum/core/types" + "github.com/stretchr/testify/require" +) + +func TestLimit(t *testing.T) { + rl := NewBlockSubmissionRateLimiter() + + // Check that before starting requests are passed through + ch1 := rl.Limit(&types.Block{Profit: new(big.Int)}) + ch2 := rl.Limit(&types.Block{Profit: new(big.Int)}) + ch3 := rl.Limit(&types.Block{Profit: new(big.Int)}) + + time.Sleep(200 * time.Millisecond) + + for _, ch := range []chan bool{ch1, ch2, ch3} { + select { + case shouldSubmit := <-ch: + require.True(t, shouldSubmit) + default: + t.Error("chan was not ready") + } + } + + // Check that after starting requests are rate limited + rl.Start() + + // Check that before starting requests are passed through + ch1 = rl.Limit(&types.Block{Profit: new(big.Int)}) + ch2 = rl.Limit(&types.Block{Profit: new(big.Int)}) + ch3 = rl.Limit(&types.Block{Profit: big.NewInt(1)}) + + time.Sleep(200 * time.Millisecond) + + for _, ch := range []chan bool{ch1, ch2, ch3} { + select { + case shouldSubmit := <-ch: + if ch == ch3 { + require.True(t, shouldSubmit) + } else { + require.False(t, shouldSubmit) + } + default: + t.Error("chan was not ready") + } + } + + // Check that after stopping requests are passed through + rl.Stop() + + ch1 = rl.Limit(&types.Block{Profit: new(big.Int)}) + ch2 = rl.Limit(&types.Block{Profit: new(big.Int)}) + ch3 = rl.Limit(&types.Block{Profit: new(big.Int)}) + + time.Sleep(200 * time.Millisecond) + + for _, ch := range []chan bool{ch1, ch2, ch3} { + select { + case shouldSubmit := <-ch: + require.True(t, shouldSubmit) + default: + t.Error("chan was not ready") + } + } + +} diff --git a/builder/builder.go b/builder/builder.go index 1aa197d90040..b7287ecaac86 100644 --- a/builder/builder.go +++ b/builder/builder.go @@ -38,14 +38,17 @@ type IRelay interface { type IBuilder interface { OnPayloadAttribute(attrs *BuilderPayloadAttributes) error + Start() error + Stop() error } type Builder struct { - ds IDatabaseService - beaconClient IBeaconClient - relay IRelay - eth IEthereumService - resubmitter Resubmitter + ds IDatabaseService + beaconClient IBeaconClient + relay IRelay + eth IEthereumService + resubmitter Resubmitter + blockSubmissionRateLimiter *BlockSubmissionRateLimiter builderSecretKey *bls.SecretKey builderPublicKey boostTypes.PublicKey @@ -62,19 +65,30 @@ func NewBuilder(sk *bls.SecretKey, ds IDatabaseService, bc IBeaconClient, relay pk.FromSlice(pkBytes) return &Builder{ - ds: ds, - beaconClient: bc, - relay: relay, - eth: eth, - resubmitter: Resubmitter{}, - builderSecretKey: sk, - builderPublicKey: pk, + ds: ds, + beaconClient: bc, + relay: relay, + eth: eth, + resubmitter: Resubmitter{}, + blockSubmissionRateLimiter: NewBlockSubmissionRateLimiter(), + builderSecretKey: sk, + builderPublicKey: pk, builderSigningDomain: builderSigningDomain, bestBlockProfit: big.NewInt(0), } } +func (b *Builder) Start() error { + b.blockSubmissionRateLimiter.Start() + return nil +} + +func (b *Builder) Stop() error { + b.blockSubmissionRateLimiter.Stop() + return nil +} + func (b *Builder) onSealedBlock(block *types.Block, bundles []types.SimulatedBundle, proposerPubkey boostTypes.PublicKey, proposerFeeRecipient boostTypes.Address, attrs *BuilderPayloadAttributes) error { b.bestMu.Lock() defer b.bestMu.Unlock() @@ -116,6 +130,8 @@ func (b *Builder) onSealedBlock(block *types.Block, bundles []types.SimulatedBun Value: *value, } + go b.ds.ConsumeBuiltBlock(block, bundles, &blockBidMsg) + signature, err := boostTypes.SignMessage(&blockBidMsg, b.builderSigningDomain, b.builderSecretKey) if err != nil { log.Error("could not sign builder bid", "err", err) @@ -134,9 +150,9 @@ func (b *Builder) onSealedBlock(block *types.Block, bundles []types.SimulatedBun return err } - b.bestBlockProfit.Set(block.Profit) + log.Info("submitted block", "header", block.Header(), "bid", blockBidMsg) - go b.ds.ConsumeBuiltBlock(block, bundles, &blockBidMsg) + b.bestBlockProfit.Set(block.Profit) return nil } @@ -171,6 +187,16 @@ func (b *Builder) OnPayloadAttribute(attrs *BuilderPayloadAttributes) error { } blockHook := func(block *types.Block, bundles []types.SimulatedBundle) { + select { + case shouldSubmit := <-b.blockSubmissionRateLimiter.Limit(block): + if !shouldSubmit { + log.Info("Block rate limited", "blochHash", block.Hash()) + return + } + case <-time.After(200 * time.Millisecond): + log.Info("Block rate limit timeout, submitting the block anyway") + } + err := b.onSealedBlock(block, bundles, proposerPubkey, vd.FeeRecipient, attrs) if err != nil { log.Error("could not run sealed block hook", "err", err) @@ -178,6 +204,7 @@ func (b *Builder) OnPayloadAttribute(attrs *BuilderPayloadAttributes) error { } firstBlockResult := b.resubmitter.newTask(12*time.Second, time.Second, func() error { + log.Info("Resubmitting build job") return b.eth.BuildBlock(attrs, blockHook) }) diff --git a/builder/builder_test.go b/builder/builder_test.go index 8bd98aab7b1f..2f143bf44626 100644 --- a/builder/builder_test.go +++ b/builder/builder_test.go @@ -75,8 +75,11 @@ func TestOnPayloadAttributes(t *testing.T) { testEthService := &testEthereumService{synced: true, testExecutableData: testExecutableData, testBlock: testBlock} builder := NewBuilder(sk, NilDbService{}, &testBeacon, &testRelay, bDomain, testEthService) + builder.Start() + defer builder.Stop() - builder.OnPayloadAttribute(testPayloadAttributes) + err = builder.OnPayloadAttribute(testPayloadAttributes) + require.NoError(t, err) require.NotNil(t, testRelay.submittedMsg) expectedProposerPubkey, err := boostTypes.HexToPubkey(testBeacon.validator.Pk.String()) diff --git a/builder/relay.go b/builder/relay.go index 432c03bfd024..4d9e192c4cc6 100644 --- a/builder/relay.go +++ b/builder/relay.go @@ -150,8 +150,6 @@ func (r *RemoteRelay) SubmitBlock(msg *boostTypes.BuilderSubmitBlockRequest) err return fmt.Errorf("non-ok response code %d from relay ", code) } - log.Info("submitted block", "msg", msg) - if r.localRelay != nil { r.localRelay.SubmitBlock(msg) } diff --git a/builder/service.go b/builder/service.go index 131b8d5640e9..0d26cd6e48ae 100644 --- a/builder/service.go +++ b/builder/service.go @@ -41,11 +41,23 @@ type Service struct { builder IBuilder } -func (s *Service) Start() { +func (s *Service) Start() error { if s.srv != nil { log.Info("Service started") go s.srv.ListenAndServe() } + + s.builder.Start() + + return nil +} + +func (s *Service) Stop() error { + if s.srv != nil { + s.srv.Close() + } + s.builder.Stop() + return nil } func (s *Service) PayloadAttributes(payloadAttributes *BuilderPayloadAttributes) error { @@ -170,7 +182,6 @@ func Register(stack *node.Node, backend *eth.Ethereum, cfg *BuilderConfig) error builderBackend := NewBuilder(builderSk, ds, beaconClient, relay, builderSigningDomain, ethereumService) builderService := NewService(cfg.ListenAddr, localRelay, builderBackend) - builderService.Start() stack.RegisterAPIs([]rpc.API{ { @@ -181,5 +192,8 @@ func Register(stack *node.Node, backend *eth.Ethereum, cfg *BuilderConfig) error Authenticated: true, }, }) + + stack.RegisterLifecycle(builderService) + return nil }