Skip to content

Commit

Permalink
fix(validator): add real-time update logic for terminate index of Val…
Browse files Browse the repository at this point in the history
…idatorPool
  • Loading branch information
sm-stack committed Apr 21, 2024
1 parent b97ee89 commit 9f98e21
Show file tree
Hide file tree
Showing 2 changed files with 64 additions and 4 deletions.
16 changes: 15 additions & 1 deletion kroma-validator/challenger.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,9 @@ type Challenger struct {
l2BlockTime *big.Int
checkpoint *big.Int
requiredBondAmount *big.Int
isValManagerEnabled bool

isValManagerEnabled bool
terminationIndex *big.Int

l2OutputSubmittedSub ethereum.Subscription
challengeCreatedSub ethereum.Subscription
Expand Down Expand Up @@ -176,6 +178,14 @@ func (c *Challenger) InitConfig(ctx context.Context) error {
}
c.isValManagerEnabled = isTerminated

cCtx, cCancel = context.WithTimeout(ctx, c.cfg.NetworkTimeout)
defer cCancel()
terminationIndex, err := c.valpoolContract.TERMINATEOUTPUTINDEX(optsutils.NewSimpleCallOpts(cCtx))
if err != nil {
return fmt.Errorf("failed to get termination index: %w", err)
}
c.terminationIndex = terminationIndex

return nil
})
if err != nil {
Expand Down Expand Up @@ -370,6 +380,10 @@ func (c *Challenger) subscribeL2OutputSubmitted() {
select {
case ev := <-c.l2OutputSubmittedEventChan:
c.log.Info("watched output submitted event", "l2BlockNumber", ev.L2BlockNumber, "outputRoot", ev.OutputRoot, "outputIndex", ev.L2OutputIndex)
// if the emitted output index is greater than the termination output index, set the config to use the ValidatorManager
if ev.L2OutputIndex.Cmp(c.terminationIndex) > 0 {
c.isValManagerEnabled = true
}
// if the emitted output index is less than or equal to the checkpoint, it is considered reorg occurred.
if ev.L2OutputIndex.Cmp(c.checkpoint) <= 0 {
c.wg.Add(1)
Expand Down
52 changes: 49 additions & 3 deletions kroma-validator/l2_output_submitter.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"errors"
"fmt"
"github.com/ethereum/go-ethereum/accounts/abi/bind"
"github.com/ethereum/go-ethereum/event"
"math/big"
_ "net/http/pprof"
"sync"
Expand Down Expand Up @@ -42,7 +43,7 @@ type L2OutputSubmitter struct {
log log.Logger
metr metrics.Metricer

l2ooContract *bindings.L2OutputOracleCaller
l2ooContract *bindings.L2OutputOracle
l2ooABI *abi.ABI
valpoolContract *bindings.ValidatorPoolCaller
valManagerContract *bindings.ValidatorManagerCaller
Expand All @@ -53,15 +54,19 @@ type L2OutputSubmitter struct {
requiredBondAmount *big.Int

isValManagerEnabled bool
terminationIndex *big.Int

submitChan chan struct{}
outputSubmittedSub ethereum.Subscription

submitChan chan struct{}
l2OutputSubmittedEventChan chan *bindings.L2OutputOracleOutputSubmitted

wg sync.WaitGroup
}

// NewL2OutputSubmitter creates a new L2OutputSubmitter.
func NewL2OutputSubmitter(cfg Config, l log.Logger, m metrics.Metricer) (*L2OutputSubmitter, error) {
l2ooContract, err := bindings.NewL2OutputOracleCaller(cfg.L2OutputOracleAddr, cfg.L1Client)
l2ooContract, err := bindings.NewL2OutputOracle(cfg.L2OutputOracleAddr, cfg.L1Client)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -152,6 +157,14 @@ func (l *L2OutputSubmitter) InitConfig(ctx context.Context) error {
}
l.isValManagerEnabled = isTerminated

cCtx, cCancel = context.WithTimeout(ctx, l.cfg.NetworkTimeout)
defer cCancel()
terminationIndex, err := l.valpoolContract.TERMINATEOUTPUTINDEX(optsutils.NewSimpleCallOpts(cCtx))
if err != nil {
return fmt.Errorf("failed to get termination index: %w", err)
}
l.terminationIndex = terminationIndex

return nil
})
if err != nil {
Expand All @@ -161,13 +174,29 @@ func (l *L2OutputSubmitter) InitConfig(ctx context.Context) error {
return nil
}

func (l *L2OutputSubmitter) initSub() {
opts := optsutils.NewSimpleWatchOpts(l.ctx)

l.l2OutputSubmittedEventChan = make(chan *bindings.L2OutputOracleOutputSubmitted)
l.outputSubmittedSub = event.ResubscribeErr(time.Second*10, func(ctx context.Context, err error) (event.Subscription, error) {
if err != nil {
l.log.Warn("resubscribing after failed OutputSubmitted event", "err", err)
}
return l.l2ooContract.WatchOutputSubmitted(opts, l.l2OutputSubmittedEventChan, nil, nil, nil)
})
}

func (l *L2OutputSubmitter) Start(ctx context.Context) error {
l.ctx, l.cancel = context.WithCancel(ctx)
l.submitChan = make(chan struct{}, 1)

if err := l.InitConfig(l.ctx); err != nil {
return err
}
l.initSub()

l.wg.Add(1)
go l.subscribeL2OutputSubmitted()

l.wg.Add(1)
go l.loop()
Expand Down Expand Up @@ -196,6 +225,23 @@ func (l *L2OutputSubmitter) loop() {
}
}

func (l *L2OutputSubmitter) subscribeL2OutputSubmitted() {
defer l.wg.Done()

for {
select {
case ev := <-l.l2OutputSubmittedEventChan:
l.log.Info("watched output submitted event", "l2BlockNumber", ev.L2BlockNumber, "outputRoot", ev.OutputRoot, "outputIndex", ev.L2OutputIndex)
// if the emitted output index is greater than the termination output index, set the config to use the ValidatorManager
if ev.L2OutputIndex.Cmp(l.terminationIndex) > 0 {
l.isValManagerEnabled = true
}
case <-l.ctx.Done():
return
}
}
}

func (l *L2OutputSubmitter) retryAfter(d time.Duration) {
l.wg.Add(1)

Expand Down

0 comments on commit 9f98e21

Please sign in to comment.