Skip to content
This repository has been archived by the owner on Feb 17, 2025. It is now read-only.

new fields in stream #3149

Merged
merged 12 commits into from
Jan 26, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion docs/config-file/node-config-doc.html

Large diffs are not rendered by default.

31 changes: 23 additions & 8 deletions docs/config-file/node-config-doc.md
Original file line number Diff line number Diff line change
Expand Up @@ -1992,14 +1992,15 @@ SequentialProcessL2Block=true
**Type:** : `object`
**Description:** StreamServerCfg is the config for the stream server

| Property | Pattern | Type | Deprecated | Definition | Title/Description |
| ----------------------------------------------- | ------- | ------- | ---------- | ---------- | ----------------------------------------------------- |
| - [Port](#Sequencer_StreamServer_Port ) | No | integer | No | - | Port to listen on |
| - [Filename](#Sequencer_StreamServer_Filename ) | No | string | No | - | Filename of the binary data file |
| - [Version](#Sequencer_StreamServer_Version ) | No | integer | No | - | Version of the binary data file |
| - [ChainID](#Sequencer_StreamServer_ChainID ) | No | integer | No | - | ChainID is the chain ID |
| - [Enabled](#Sequencer_StreamServer_Enabled ) | No | boolean | No | - | Enabled is a flag to enable/disable the data streamer |
| - [Log](#Sequencer_StreamServer_Log ) | No | object | No | - | Log is the log configuration |
| Property | Pattern | Type | Deprecated | Definition | Title/Description |
| ----------------------------------------------------------------------------- | ------- | ------- | ---------- | ---------- | ---------------------------------------------------------------- |
| - [Port](#Sequencer_StreamServer_Port ) | No | integer | No | - | Port to listen on |
| - [Filename](#Sequencer_StreamServer_Filename ) | No | string | No | - | Filename of the binary data file |
| - [Version](#Sequencer_StreamServer_Version ) | No | integer | No | - | Version of the binary data file |
| - [ChainID](#Sequencer_StreamServer_ChainID ) | No | integer | No | - | ChainID is the chain ID |
| - [Enabled](#Sequencer_StreamServer_Enabled ) | No | boolean | No | - | Enabled is a flag to enable/disable the data streamer |
| - [Log](#Sequencer_StreamServer_Log ) | No | object | No | - | Log is the log configuration |
| - [UpgradeEtrogBatchNumber](#Sequencer_StreamServer_UpgradeEtrogBatchNumber ) | No | integer | No | - | UpgradeEtrogBatchNumber is the batch number of the upgrade etrog |

#### <a name="Sequencer_StreamServer_Port"></a>10.8.1. `Sequencer.StreamServer.Port`

Expand Down Expand Up @@ -2123,6 +2124,20 @@ Must be one of:

**Type:** : `array of string`

#### <a name="Sequencer_StreamServer_UpgradeEtrogBatchNumber"></a>10.8.7. `Sequencer.StreamServer.UpgradeEtrogBatchNumber`

**Type:** : `integer`

**Default:** `0`

**Description:** UpgradeEtrogBatchNumber is the batch number of the upgrade etrog

**Example setting the default value** (0):
```
[Sequencer.StreamServer]
UpgradeEtrogBatchNumber=0
```

## <a name="SequenceSender"></a>11. `[SequenceSender]`

**Type:** : `object`
Expand Down
5 changes: 5 additions & 0 deletions docs/config-file/node-config-schema.json
Original file line number Diff line number Diff line change
Expand Up @@ -805,6 +805,11 @@
"additionalProperties": false,
"type": "object",
"description": "Log is the log configuration"
},
"UpgradeEtrogBatchNumber": {
"type": "integer",
"description": "UpgradeEtrogBatchNumber is the batch number of the upgrade etrog",
"default": 0
}
},
"additionalProperties": false,
Expand Down
2 changes: 2 additions & 0 deletions sequencer/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,8 @@ type StreamServerCfg struct {
Enabled bool `mapstructure:"Enabled"`
// Log is the log configuration
Log log.Config `mapstructure:"Log"`
// UpgradeEtrogBatchNumber is the batch number of the upgrade etrog
UpgradeEtrogBatchNumber uint64 `mapstructure:"UpgradeEtrogBatchNumber"`
}

// FinalizerCfg contains the finalizer's configuration properties
Expand Down
21 changes: 11 additions & 10 deletions sequencer/datastreamer.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,21 +4,22 @@ import (
"github.com/0xPolygonHermez/zkevm-node/state"
)

func (f *finalizer) DSSendL2Block(batchNumber uint64, blockResponse *state.ProcessBlockResponse) error {
func (f *finalizer) DSSendL2Block(batchNumber uint64, blockResponse *state.ProcessBlockResponse, l1InfoTreeIndex uint32) error {
forkID := f.stateIntf.GetForkIDByBatchNumber(batchNumber)

// Send data to streamer
if f.streamServer != nil {
l2Block := state.DSL2Block{
BatchNumber: batchNumber,
L2BlockNumber: blockResponse.BlockNumber,
Timestamp: int64(blockResponse.Timestamp),
L1BlockHash: blockResponse.BlockHashL1,
GlobalExitRoot: blockResponse.GlobalExitRoot,
Coinbase: f.sequencerAddress,
ForkID: uint16(forkID),
BlockHash: blockResponse.BlockHash,
StateRoot: blockResponse.BlockHash, //From etrog, the blockhash is the block root
BatchNumber: batchNumber,
L2BlockNumber: blockResponse.BlockNumber,
Timestamp: int64(blockResponse.Timestamp),
L1InfoTreeIndex: l1InfoTreeIndex,
L1BlockHash: blockResponse.BlockHashL1,
GlobalExitRoot: blockResponse.GlobalExitRoot,
Coinbase: f.sequencerAddress,
ForkID: uint16(forkID),
BlockHash: blockResponse.BlockHash,
StateRoot: blockResponse.BlockHash, //From etrog, the blockhash is the block root
}

l2Transactions := []state.DSL2Transaction{}
Expand Down
2 changes: 1 addition & 1 deletion sequencer/forcedbatch.go
Original file line number Diff line number Diff line change
Expand Up @@ -195,7 +195,7 @@ func (f *finalizer) handleProcessForcedBatchResponse(ctx context.Context, batchR
}

// Send L2 block to data streamer
err = f.DSSendL2Block(batchResponse.NewBatchNumber, forcedL2BlockResponse)
err = f.DSSendL2Block(batchResponse.NewBatchNumber, forcedL2BlockResponse, 0)
if err != nil {
//TODO: we need to halt/rollback the L2 block if we had an error sending to the data streamer?
log.Errorf("error sending L2 block %d to data streamer, error: %v", forcedL2BlockResponse.BlockNumber, err)
Expand Down
2 changes: 1 addition & 1 deletion sequencer/l2block.go
Original file line number Diff line number Diff line change
Expand Up @@ -390,7 +390,7 @@ func (f *finalizer) storeL2Block(ctx context.Context, l2Block *L2Block) error {
}

// Send L2 block to data streamer
err = f.DSSendL2Block(f.wipBatch.batchNumber, blockResponse)
err = f.DSSendL2Block(f.wipBatch.batchNumber, blockResponse, l2Block.getL1InfoTreeIndex())
if err != nil {
//TODO: we need to halt/rollback the L2 block if we had an error sending to the data streamer?
log.Errorf("error sending L2 block %d [%d] to data streamer, error: %v", blockResponse.BlockNumber, l2Block.trackingNum, err)
Expand Down
44 changes: 32 additions & 12 deletions sequencer/sequencer.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,13 +90,13 @@ func (s *Sequencer) Start(ctx context.Context) {
log.Fatalf("failed to start stream server, error: %v", err)
}

s.updateDataStreamerFile(ctx)
s.updateDataStreamerFile(ctx, s.cfg.StreamServer.ChainID)
}

go s.loadFromPool(ctx)

if s.streamServer != nil {
go s.sendDataToStreamer()
go s.sendDataToStreamer(s.cfg.StreamServer.ChainID)
}

s.worker = NewWorker(s.stateIntf, s.batchCfg.Constraints)
Expand Down Expand Up @@ -129,8 +129,8 @@ func (s *Sequencer) checkStateInconsistency(ctx context.Context) {
}
}

func (s *Sequencer) updateDataStreamerFile(ctx context.Context) {
err := state.GenerateDataStreamerFile(ctx, s.streamServer, s.stateIntf, true, nil)
func (s *Sequencer) updateDataStreamerFile(ctx context.Context, chainID uint64) {
err := state.GenerateDataStreamerFile(ctx, s.streamServer, s.stateIntf, true, nil, chainID, s.cfg.StreamServer.UpgradeEtrogBatchNumber)
if err != nil {
log.Fatalf("failed to generate data streamer file, error: %v", err)
}
Expand Down Expand Up @@ -221,7 +221,7 @@ func (s *Sequencer) addTxToWorker(ctx context.Context, tx pool.Transaction) erro
}

// sendDataToStreamer sends data to the data stream server
func (s *Sequencer) sendDataToStreamer() {
func (s *Sequencer) sendDataToStreamer(chainID uint64) {
var err error
for {
// Read error from previous iteration
Expand Down Expand Up @@ -259,14 +259,34 @@ func (s *Sequencer) sendDataToStreamer() {
continue
}

// Get previous block timestamp to calculate delta timestamp
previousL2Block := state.DSL2BlockStart{}
if l2Block.L2BlockNumber > 0 {
bookMark = state.DSBookMark{
Type: state.BookMarkTypeL2Block,
Value: l2Block.L2BlockNumber - 1,
}

previousL2BlockEntry, err := s.streamServer.GetFirstEventAfterBookmark(bookMark.Encode())
if err != nil {
log.Errorf("failed to get previous l2block %d, error: %v", l2Block.L2BlockNumber-1, err)
continue
}

previousL2Block = state.DSL2BlockStart{}.Decode(previousL2BlockEntry.Data)
}

blockStart := state.DSL2BlockStart{
BatchNumber: l2Block.BatchNumber,
L2BlockNumber: l2Block.L2BlockNumber,
Timestamp: l2Block.Timestamp,
L1BlockHash: l2Block.L1BlockHash,
GlobalExitRoot: l2Block.GlobalExitRoot,
Coinbase: l2Block.Coinbase,
ForkID: l2Block.ForkID,
BatchNumber: l2Block.BatchNumber,
L2BlockNumber: l2Block.L2BlockNumber,
Timestamp: l2Block.Timestamp,
DeltaTimestamp: uint32(l2Block.Timestamp - previousL2Block.Timestamp),
L1InfoTreeIndex: l2Block.L1InfoTreeIndex,
L1BlockHash: l2Block.L1BlockHash,
GlobalExitRoot: l2Block.GlobalExitRoot,
Coinbase: l2Block.Coinbase,
ForkID: l2Block.ForkID,
ChainID: uint32(chainID),
}

_, err = s.streamServer.AddStreamEntry(state.EntryTypeL2BlockStart, blockStart.Encode())
Expand Down
Loading
Loading