Skip to content

Commit

Permalink
Do fatal when datastream channel is full (workaround to fix datastrea…
Browse files Browse the repository at this point in the history
…m blocking issue) (#3650)

* Do fatal when datastream channel is full (this will restart sequencer automatically)

* update datastream library (more ds-debug logs)
  • Loading branch information
agnusmor authored May 27, 2024
1 parent 1b091ec commit eb155ae
Show file tree
Hide file tree
Showing 8 changed files with 37 additions and 43 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ module github.com/0xPolygonHermez/zkevm-node
go 1.21

require (
github.com/0xPolygonHermez/zkevm-data-streamer v0.2.3-0.20240426122934-6f47d2485fc1
github.com/0xPolygonHermez/zkevm-data-streamer v0.2.3-0.20240527085154-ca3561dd370b
github.com/didip/tollbooth/v6 v6.1.2
github.com/dop251/goja v0.0.0-20230806174421-c933cf95e127
github.com/ethereum/go-ethereum v1.13.11
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,8 @@ cloud.google.com/go/storage v1.10.0/go.mod h1:FLPqc6j+Ki4BU591ie1oL6qBQGu2Bl/tZ9
dario.cat/mergo v1.0.0 h1:AGCNq9Evsj31mOgNPcLyXc+4PNABt905YmuqPYYpBWk=
dario.cat/mergo v1.0.0/go.mod h1:uNxQE+84aUszobStD9th8a29P2fMDhsBdgRYvZOxGmk=
dmitri.shuralyov.com/gpu/mtl v0.0.0-20190408044501-666a987793e9/go.mod h1:H6x//7gZCb22OMCxBHrMx7a5I7Hp++hsVxbQ4BYO7hU=
github.com/0xPolygonHermez/zkevm-data-streamer v0.2.3-0.20240426122934-6f47d2485fc1 h1:4wbCJOGcZ8BTuOfNFrcZ1cAVfTWaX1W9EYHaDx3imLc=
github.com/0xPolygonHermez/zkevm-data-streamer v0.2.3-0.20240426122934-6f47d2485fc1/go.mod h1:0QkAXcFa92mFJrCbN3UPUJGJYes851yEgYHLONnaosE=
github.com/0xPolygonHermez/zkevm-data-streamer v0.2.3-0.20240527085154-ca3561dd370b h1:BzQRXbSnW7BsFvJrnZbCgnxD5+nCGyrYUgqH+3vsnrM=
github.com/0xPolygonHermez/zkevm-data-streamer v0.2.3-0.20240527085154-ca3561dd370b/go.mod h1:0QkAXcFa92mFJrCbN3UPUJGJYes851yEgYHLONnaosE=
github.com/AndreasBriese/bbloom v0.0.0-20190306092124-e2d15f34fcf9/go.mod h1:bOvUY6CB00SOBii9/FifXqc0awNKxLFCL/+pkDPuyl8=
github.com/Azure/go-ansiterm v0.0.0-20170929234023-d6e3b3328b78 h1:w+iIsaOQNcT7OZ575w+acHgRric5iCyQh+xv+KJ4HB8=
github.com/Azure/go-ansiterm v0.0.0-20170929234023-d6e3b3328b78/go.mod h1:LmzpDX56iTiv29bbRTIsUNlaFfuhWRQBWjQdVyAevI8=
Expand Down
2 changes: 1 addition & 1 deletion sequencer/batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -335,7 +335,7 @@ func (f *finalizer) insertSIPBatch(ctx context.Context, batchNumber uint64, stat
}

// Send batch bookmark to the datastream
f.DSSendBatchBookmark(batchNumber)
f.DSSendBatchBookmark(ctx, batchNumber)

// Check if synchronizer is up-to-date
//TODO: review if this is needed
Expand Down
23 changes: 19 additions & 4 deletions sequencer/datastreamer.go
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
package sequencer

import (
"github.com/0xPolygonHermez/zkevm-node/log"
"context"
"fmt"

"github.com/0xPolygonHermez/zkevm-node/state"
)

func (f *finalizer) DSSendL2Block(batchNumber uint64, blockResponse *state.ProcessBlockResponse, l1InfoTreeIndex uint32) error {
func (f *finalizer) DSSendL2Block(ctx context.Context, batchNumber uint64, blockResponse *state.ProcessBlockResponse, l1InfoTreeIndex uint32) error {
forkID := f.stateIntf.GetForkIDByBatchNumber(batchNumber)

// Send data to streamer
Expand Down Expand Up @@ -43,23 +45,36 @@ func (f *finalizer) DSSendL2Block(batchNumber uint64, blockResponse *state.Proce
l2Transactions = append(l2Transactions, l2Transaction)
}

log.Infof("[ds-debug] sending l2block %d to datastream channel", blockResponse.BlockNumber)
f.checkDSBufferIsFull(ctx)

f.dataToStream <- state.DSL2FullBlock{
DSL2Block: l2Block,
Txs: l2Transactions,
}

f.dataToStreamCount.Add(1)
}

return nil
}

func (f *finalizer) DSSendBatchBookmark(batchNumber uint64) {
func (f *finalizer) DSSendBatchBookmark(ctx context.Context, batchNumber uint64) {
// Check if stream server enabled
if f.streamServer != nil {
f.checkDSBufferIsFull(ctx)

// Send batch bookmark to the streamer
f.dataToStream <- state.DSBookMark{
Type: state.BookMarkTypeBatch,
Value: batchNumber,
}

f.dataToStreamCount.Add(1)
}
}

func (f *finalizer) checkDSBufferIsFull(ctx context.Context) {
if f.dataToStreamCount.Load() == datastreamChannelBufferSize {
f.Halt(ctx, fmt.Errorf("datastream channel buffer full"), true)
}
}
10 changes: 8 additions & 2 deletions sequencer/finalizer.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,8 +83,9 @@ type finalizer struct {
// interval metrics
metrics *intervalMetrics
// stream server
streamServer *datastreamer.StreamServer
dataToStream chan interface{}
streamServer *datastreamer.StreamServer
dataToStream chan interface{}
dataToStreamCount atomic.Int32
}

// newFinalizer returns a new instance of Finalizer.
Expand Down Expand Up @@ -885,6 +886,11 @@ func (f *finalizer) logZKCounters(counters state.ZKCounters) string {
counters.Binaries, counters.Sha256Hashes_V2, counters.Steps)
}

// Decrease datastreamChannelCount variable
func (f *finalizer) DatastreamChannelCountAdd(ct int32) {
f.dataToStreamCount.Add(ct)
}

// Halt halts the finalizer
func (f *finalizer) Halt(ctx context.Context, err error, isFatal bool) {
f.haltFinalizer.Store(true)
Expand Down
2 changes: 1 addition & 1 deletion sequencer/forcedbatch.go
Original file line number Diff line number Diff line change
Expand Up @@ -198,7 +198,7 @@ func (f *finalizer) handleProcessForcedBatchResponse(ctx context.Context, newBat
}

// Send L2 block to data streamer
err = f.DSSendL2Block(newBatchNumber, forcedL2BlockResponse, 0)
err = f.DSSendL2Block(ctx, newBatchNumber, forcedL2BlockResponse, 0)
if err != nil {
//TODO: we need to halt/rollback the L2 block if we had an error sending to the data streamer?
log.Errorf("error sending L2 block %d to data streamer, error: %v", forcedL2BlockResponse.BlockNumber, err)
Expand Down
11 changes: 1 addition & 10 deletions sequencer/l2block.go
Original file line number Diff line number Diff line change
Expand Up @@ -479,9 +479,6 @@ func (f *finalizer) storeL2Block(ctx context.Context, l2Block *L2Block) error {
return err
}

//TODO: remove this Log
log.Infof("[ds-debug] l2 block %d [%d] stored in statedb", blockResponse.BlockNumber, l2Block.trackingNum)

// Update txs status in the pool
for _, txResponse := range blockResponse.TransactionResponses {
// Change Tx status to selected
Expand All @@ -491,19 +488,13 @@ func (f *finalizer) storeL2Block(ctx context.Context, l2Block *L2Block) error {
}
}

//TODO: remove this log
log.Infof("[ds-debug] l2 block %d [%d] transactions updated as selected in the pooldb", blockResponse.BlockNumber, l2Block.trackingNum)

// Send L2 block to data streamer
err = f.DSSendL2Block(l2Block.batch.batchNumber, blockResponse, l2Block.getL1InfoTreeIndex())
err = f.DSSendL2Block(ctx, l2Block.batch.batchNumber, blockResponse, l2Block.getL1InfoTreeIndex())
if err != nil {
//TODO: we need to halt/rollback the L2 block if we had an error sending to the data streamer?
log.Errorf("error sending L2 block %d [%d] to data streamer, error: %v", blockResponse.BlockNumber, l2Block.trackingNum, err)
}

//TODO: remove this log
log.Infof("[ds-debug] l2 block %d [%d] sent to datastream", blockResponse.BlockNumber, l2Block.trackingNum)

for _, tx := range l2Block.transactions {
// Delete the tx from the pending list in the worker (addrQueue)
f.workerIntf.DeleteTxPendingToStore(tx.Hash, tx.From)
Expand Down
26 changes: 4 additions & 22 deletions sequencer/sequencer.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ import (
)

const (
datastreamChannelMultiplier = 2
datastreamChannelBufferSize = 10
)

// Sequencer represents a sequencer
Expand Down Expand Up @@ -59,9 +59,7 @@ func New(cfg Config, batchCfg state.BatchConfig, poolCfg pool.Config, txPool txP
eventLog: eventLog,
}

// TODO: Make configurable
channelBufferSize := 200 * datastreamChannelMultiplier // nolint:gomnd
sequencer.dataToStream = make(chan interface{}, channelBufferSize)
sequencer.dataToStream = make(chan interface{}, datastreamChannelBufferSize)

return sequencer, nil
}
Expand Down Expand Up @@ -270,8 +268,6 @@ func (s *Sequencer) sendDataToStreamer(chainID uint64) {
case state.DSL2FullBlock:
l2Block := data

//TODO: remove this log
log.Infof("[ds-debug] start atomic op for l2block %d", l2Block.L2BlockNumber)
err = s.streamServer.StartAtomicOp()
if err != nil {
log.Errorf("failed to start atomic op for l2block %d, error: %v ", l2Block.L2BlockNumber, err)
Expand All @@ -283,8 +279,6 @@ func (s *Sequencer) sendDataToStreamer(chainID uint64) {
Value: l2Block.L2BlockNumber,
}

//TODO: remove this log
log.Infof("[ds-debug] add stream bookmark for l2block %d", l2Block.L2BlockNumber)
_, err = s.streamServer.AddStreamBookmark(bookMark.Encode())
if err != nil {
log.Errorf("failed to add stream bookmark for l2block %d, error: %v", l2Block.L2BlockNumber, err)
Expand All @@ -299,8 +293,6 @@ func (s *Sequencer) sendDataToStreamer(chainID uint64) {
Value: l2Block.L2BlockNumber - 1,
}

//TODO: remove this log
log.Infof("[ds-debug] get previous l2block %d", l2Block.L2BlockNumber-1)
previousL2BlockEntry, err := s.streamServer.GetFirstEventAfterBookmark(bookMark.Encode())
if err != nil {
log.Errorf("failed to get previous l2block %d, error: %v", l2Block.L2BlockNumber-1, err)
Expand All @@ -323,16 +315,12 @@ func (s *Sequencer) sendDataToStreamer(chainID uint64) {
ChainID: uint32(chainID),
}

//TODO: remove this log
log.Infof("[ds-debug] add l2blockStart stream entry for l2block %d", l2Block.L2BlockNumber)
_, err = s.streamServer.AddStreamEntry(state.EntryTypeL2BlockStart, blockStart.Encode())
if err != nil {
log.Errorf("failed to add stream entry for l2block %d, error: %v", l2Block.L2BlockNumber, err)
continue
}

//TODO: remove this log
log.Infof("[ds-debug] adding l2tx stream entries for l2block %d", l2Block.L2BlockNumber)
for _, l2Transaction := range l2Block.Txs {
_, err = s.streamServer.AddStreamEntry(state.EntryTypeL2Tx, l2Transaction.Encode())
if err != nil {
Expand All @@ -347,25 +335,17 @@ func (s *Sequencer) sendDataToStreamer(chainID uint64) {
StateRoot: l2Block.StateRoot,
}

//TODO: remove this log
log.Infof("[ds-debug] add l2blockEnd stream entry for l2block %d", l2Block.L2BlockNumber)
_, err = s.streamServer.AddStreamEntry(state.EntryTypeL2BlockEnd, blockEnd.Encode())
if err != nil {
log.Errorf("failed to add stream entry for l2block %d, error: %v", l2Block.L2BlockNumber, err)
continue
}

//TODO: remove this log
log.Infof("[ds-debug] commit atomic op for l2block %d", l2Block.L2BlockNumber)
err = s.streamServer.CommitAtomicOp()
if err != nil {
log.Errorf("failed to commit atomic op for l2block %d, error: %v ", l2Block.L2BlockNumber, err)
continue
}

//TODO: remove this log
log.Infof("[ds-debug] l2block %d sent to datastream", l2Block.L2BlockNumber)

// Stream a bookmark
case state.DSBookMark:
bookmark := data
Expand All @@ -392,6 +372,8 @@ func (s *Sequencer) sendDataToStreamer(chainID uint64) {
log.Errorf("invalid stream message type received")
}
}

s.finalizer.DatastreamChannelCountAdd(-1)
}
}

Expand Down

0 comments on commit eb155ae

Please sign in to comment.