Skip to content

Commit

Permalink
Resubmit block build job periodically (ethereum#22)
Browse files Browse the repository at this point in the history
  • Loading branch information
Ruteri authored and avalonche committed Mar 8, 2023
1 parent 5118498 commit 30381d5
Show file tree
Hide file tree
Showing 4 changed files with 124 additions and 6 deletions.
25 changes: 19 additions & 6 deletions builder/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package builder
import (
"errors"
_ "os"
"time"

"github.com/ethereum/go-ethereum/common/hexutil"
"github.com/ethereum/go-ethereum/core/beacon"
Expand Down Expand Up @@ -41,6 +42,7 @@ type Builder struct {
beaconClient IBeaconClient
relay IRelay
eth IEthereumService
resubmitter Resubmitter

builderSecretKey *bls.SecretKey
builderPublicKey boostTypes.PublicKey
Expand All @@ -56,6 +58,7 @@ func NewBuilder(sk *bls.SecretKey, bc IBeaconClient, relay IRelay, builderSignin
beaconClient: bc,
relay: relay,
eth: eth,
resubmitter: Resubmitter{},
builderSecretKey: sk,
builderPublicKey: pk,

Expand Down Expand Up @@ -140,13 +143,23 @@ func (b *Builder) OnPayloadAttribute(attrs *BuilderPayloadAttributes) error {
return errors.New("parent block not found in blocktree")
}

executableData, block := b.eth.BuildBlock(attrs)
if executableData == nil || block == nil {
log.Error("did not receive the payload")
return errors.New("could not build block")
}
firstBlockResult := b.resubmitter.newTask(12*time.Second, time.Second, func() error {
executableData, block := b.eth.BuildBlock(attrs)
if executableData == nil || block == nil {
log.Error("did not receive the payload")
return errors.New("did not receive the payload")
}

err := b.onSealedBlock(executableData, block, proposerPubkey, vd.FeeRecipient, attrs.Slot)
if err != nil {
log.Error("could not run block hook", "err", err)
return err
}

return nil
})

return b.onSealedBlock(executableData, block, proposerPubkey, vd.FeeRecipient, attrs.Slot)
return firstBlockResult
}

func executableDataToExecutionPayload(data *beacon.ExecutableDataV1) (*boostTypes.ExecutionPayload, error) {
Expand Down
7 changes: 7 additions & 0 deletions builder/builder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package builder
import (
"math/big"
"testing"
"time"

"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/common/hexutil"
Expand Down Expand Up @@ -120,4 +121,10 @@ func TestOnPayloadAttributes(t *testing.T) {
require.Equal(t, expectedSignature, testRelay.submittedMsg.Signature)

require.Equal(t, uint64(25), testRelay.requestedSlot)

// Clear the submitted message and check that the job will be ran again and a new message will be submitted
testRelay.submittedMsg = nil
time.Sleep(2 * time.Second)
require.NotNil(t, testRelay.submittedMsg)
require.Equal(t, expectedMessage, *testRelay.submittedMsg.Message)
}
42 changes: 42 additions & 0 deletions builder/resubmitter.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
package builder

import (
"context"
"sync"
"time"
)

type Resubmitter struct {
mu sync.Mutex
cancel context.CancelFunc
}

func (r *Resubmitter) newTask(repeatFor time.Duration, interval time.Duration, fn func() error) error {
repeatUntilCh := time.After(repeatFor)

r.mu.Lock()
if r.cancel != nil {
r.cancel()
}
ctx, cancel := context.WithCancel(context.Background())
r.cancel = cancel
r.mu.Unlock()

firstRunErr := fn()

go func() {
for ctx.Err() == nil {
select {
case <-ctx.Done():
return
case <-repeatUntilCh:
cancel()
return
case <-time.After(interval):
fn()
}
}
}()

return firstRunErr
}
56 changes: 56 additions & 0 deletions builder/resubmitter_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
package builder

import (
"errors"
"testing"
"time"

"github.com/stretchr/testify/require"
)

func TestResubmitter(t *testing.T) {

resubmitter := Resubmitter{}

pingCh := make(chan error)
go func() {
res := resubmitter.newTask(time.Second, 100*time.Millisecond, func() error {
return <-pingCh
})
require.ErrorContains(t, res, "xx")
}()

select {
case pingCh <- errors.New("xx"):
case <-time.After(time.Second):
t.Error("timeout waiting for the function")
}

select {
case pingCh <- nil:
t.Error("function restarted too soon")
default:
}

time.Sleep(200 * time.Millisecond)

select {
case pingCh <- nil:
default:
t.Error("function restarted too late")
}

time.Sleep(800 * time.Millisecond)

select {
case pingCh <- nil:
default:
t.Error("function restarted too late")
}

select {
case pingCh <- nil:
t.Error("function restarted after deadline")
case <-time.After(200 * time.Millisecond):
}
}

0 comments on commit 30381d5

Please sign in to comment.