Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Log events #1905

Merged
merged 12 commits into from
Mar 23, 2023
47 changes: 41 additions & 6 deletions pool/pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (

"github.com/0xPolygonHermez/zkevm-node/log"
"github.com/0xPolygonHermez/zkevm-node/state"
"github.com/0xPolygonHermez/zkevm-node/state/runtime/executor"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/common/math"
"github.com/ethereum/go-ethereum/core/types"
Expand Down Expand Up @@ -40,6 +41,12 @@ type Pool struct {
minGasPrice *big.Int
}

type preexecutionResponse struct {
usedZkCounters state.ZKCounters
isOOC bool
isOOG bool
}

// NewPool creates and initializes an instance of Pool
func NewPool(cfg Config, s storage, st stateInterface, l2BridgeAddr common.Address, chainID uint64) *Pool {
return &Pool{
Expand Down Expand Up @@ -75,36 +82,64 @@ func (p *Pool) StoreTx(ctx context.Context, tx types.Transaction, ip string, isW
poolTx.IsClaims = poolTx.IsClaimTx(p.l2BridgeAddr, p.cfg.FreeClaimGasLimit)

// Execute transaction to calculate its zkCounters
zkCounters, err, isOOC := p.PreExecuteTx(ctx, tx)
preexecutionResponse, err := p.PreExecuteTx(ctx, tx)
if err != nil {
log.Debugf("PreExecuteTx error (this can be ignored): %v", err)

if isOOC {
if preexecutionResponse.isOOC {
event := &state.Event{
EventType: state.EventType_Prexecution_OOC,
Timestamp: time.Now(),
IP: ip,
TxHash: tx.Hash(),
}

err := p.state.AddEvent(ctx, event, nil)
if err != nil {
log.Errorf("Error adding event: %v", err)
}
// Do not add tx to the pool
return fmt.Errorf("out of counters")
} else if preexecutionResponse.isOOG {
event := &state.Event{
EventType: state.EventType_Prexecution_OOG,
Timestamp: time.Now(),
IP: ip,
TxHash: tx.Hash(),
}

err := p.state.AddEvent(ctx, event, nil)
if err != nil {
log.Errorf("Error adding event: %v", err)
}
}
}
poolTx.ZKCounters = zkCounters
poolTx.ZKCounters = preexecutionResponse.usedZkCounters

return p.storage.AddTx(ctx, poolTx)
}

// PreExecuteTx executes a transaction to calculate its zkCounters
func (p *Pool) PreExecuteTx(ctx context.Context, tx types.Transaction) (state.ZKCounters, error, bool) {
func (p *Pool) PreExecuteTx(ctx context.Context, tx types.Transaction) (preexecutionResponse, error) {
response := preexecutionResponse{usedZkCounters: state.ZKCounters{}, isOOC: false, isOOG: false}

processBatchResponse, err := p.state.PreProcessTransaction(ctx, &tx, nil)
if err != nil {
return state.ZKCounters{}, err, false
return response, err
}
return processBatchResponse.UsedZkCounters, processBatchResponse.ExecutorError, !processBatchResponse.IsBatchProcessed

response.usedZkCounters = processBatchResponse.UsedZkCounters

if processBatchResponse.IsBatchProcessed {
if processBatchResponse.Responses != nil && len(processBatchResponse.Responses) > 0 &&
executor.IsROMOutOfGasError(executor.RomErrorCode(processBatchResponse.Responses[0].RomError)) {
response.isOOC = true
}
} else {
response.isOOG = !processBatchResponse.IsBatchProcessed
}

return response, nil
}

// GetPendingTxs from the pool
Expand Down
7 changes: 6 additions & 1 deletion sequencer/dbmanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,7 @@ func (d *dbManager) loadFromPool() {
}

func (d *dbManager) addTxToWorker(tx pool.Transaction, isClaim bool) error {
txTracker, err := d.worker.NewTxTracker(tx.Transaction, isClaim, tx.ZKCounters)
txTracker, err := d.worker.NewTxTracker(tx.Transaction, isClaim, tx.ZKCounters, tx.IP)
if err != nil {
return err
}
Expand Down Expand Up @@ -556,3 +556,8 @@ func (d *dbManager) FlushMerkleTree(ctx context.Context) error {
func (d *dbManager) AddDebugInfo(ctx context.Context, info *state.DebugInfo, dbTx pgx.Tx) error {
return d.state.AddDebugInfo(ctx, info, dbTx)
}

// AddEvent is used to store and event in the database
func (d *dbManager) AddEvent(ctx context.Context, event *state.Event, dbTx pgx.Tx) error {
return d.state.AddEvent(ctx, event, dbTx)
}
50 changes: 48 additions & 2 deletions sequencer/finalizer.go
Original file line number Diff line number Diff line change
Expand Up @@ -398,7 +398,7 @@ func (f *finalizer) handleTxProcessResp(ctx context.Context, tx *TxTracker, resu
}

// Check remaining resources
err := f.checkRemainingResources(result, tx)
err := f.checkRemainingResources(ctx, result, tx)
if err != nil {
return err
}
Expand Down Expand Up @@ -810,12 +810,15 @@ func (f *finalizer) isDeadlineEncountered() bool {
}

// checkRemainingResources checks if the transaction uses less resources than the remaining ones in the batch.
func (f *finalizer) checkRemainingResources(result *state.ProcessBatchResponse, tx *TxTracker) error {
func (f *finalizer) checkRemainingResources(ctx context.Context, result *state.ProcessBatchResponse, tx *TxTracker) error {
usedResources := batchResources{
zKCounters: result.UsedZkCounters,
bytes: uint64(len(tx.RawTx)),
}

// Log an event in case the TX consumed more than the double of the expected for a zkCounter
f.checkZKCounterConsumption(ctx, result.UsedZkCounters, tx)

err := f.batch.remainingResources.sub(usedResources)
if err != nil {
log.Infof("current transaction exceeds the batch limit, updating metadata for tx in worker and continuing")
Expand All @@ -828,6 +831,49 @@ func (f *finalizer) checkRemainingResources(result *state.ProcessBatchResponse,
return nil
}

func (f *finalizer) checkZKCounterConsumption(ctx context.Context, zkCounters state.ZKCounters, tx *TxTracker) {
events := ""

if zkCounters.CumulativeGasUsed > tx.BatchResources.zKCounters.CumulativeGasUsed*2 {
events += "CumulativeGasUsed "
}
if zkCounters.UsedKeccakHashes > tx.BatchResources.zKCounters.UsedKeccakHashes*2 {
events += "UsedKeccakHashes "
}
if zkCounters.UsedPoseidonHashes > tx.BatchResources.zKCounters.UsedPoseidonHashes*2 {
events += "UsedPoseidonHashes "
}
if zkCounters.UsedPoseidonPaddings > tx.BatchResources.zKCounters.UsedPoseidonPaddings*2 {
events += "UsedPoseidonPaddings "
}
if zkCounters.UsedMemAligns > tx.BatchResources.zKCounters.UsedMemAligns*2 {
events += "UsedMemAligns "
}
if zkCounters.UsedArithmetics > tx.BatchResources.zKCounters.UsedArithmetics*2 {
events += "UsedArithmetics "
}
if zkCounters.UsedBinaries > tx.BatchResources.zKCounters.UsedBinaries*2 {
events += "UsedBinaries "
}
if zkCounters.UsedSteps > tx.BatchResources.zKCounters.UsedSteps*2 {
events += "UsedSteps "
}

if events != "" {
event := &state.Event{
EventType: state.EventType_ZKCounters_Diff + " " + events,
Timestamp: time.Now(),
IP: tx.IP,
TxHash: tx.Hash,
}

err := f.dbManager.AddEvent(ctx, event, nil)
if err != nil {
log.Errorf("Error adding event: %v", err)
}
}
}

// isBatchAlmostFull checks if the current batch remaining resources are under the constraints threshold for most efficient moment to close a batch
func (f *finalizer) isBatchAlmostFull() bool {
resources := f.batch.remainingResources
Expand Down
4 changes: 3 additions & 1 deletion sequencer/finalizer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -896,6 +896,7 @@ func TestFinalizer_isDeadlineEncountered(t *testing.T) {
func TestFinalizer_checkRemainingResources(t *testing.T) {
// arrange
f = setupFinalizer(true)
ctx := context.Background()
txResponse := &state.ProcessTransactionResponse{TxHash: oldHash}
result := &state.ProcessBatchResponse{
UsedZkCounters: state.ZKCounters{CumulativeGasUsed: 1000},
Expand Down Expand Up @@ -944,12 +945,13 @@ func TestFinalizer_checkRemainingResources(t *testing.T) {
t.Run(tc.name, func(t *testing.T) {
// arrange
f.batch.remainingResources = tc.remaining
dbManagerMock.On("AddEvent", ctx, mock.Anything, nil).Return(nil)
if tc.expectedWorkerUpdate {
workerMock.On("UpdateTx", txResponse.TxHash, tc.expectedTxTracker.From, result.UsedZkCounters).Return().Once()
}

// act
err := f.checkRemainingResources(result, tc.expectedTxTracker)
err := f.checkRemainingResources(ctx, result, tc.expectedTxTracker)

// assert
if tc.expectedErr != nil {
Expand Down
5 changes: 4 additions & 1 deletion sequencer/interfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ type stateInterface interface {
GetLatestGer(ctx context.Context, maxBlockNumber uint64) (state.GlobalExitRoot, time.Time, error)
FlushMerkleTree(ctx context.Context) error
AddDebugInfo(ctx context.Context, info *state.DebugInfo, dbTx pgx.Tx) error
AddEvent(ctx context.Context, event *state.Event, dbTx pgx.Tx) error
}

type workerInterface interface {
Expand All @@ -86,7 +87,7 @@ type workerInterface interface {
MoveTxToNotReady(txHash common.Hash, from common.Address, actualNonce *uint64, actualBalance *big.Int) []*TxTracker
DeleteTx(txHash common.Hash, from common.Address)
HandleL2Reorg(txHashes []common.Hash)
NewTxTracker(tx types.Transaction, isClaim bool, counters state.ZKCounters) (*TxTracker, error)
NewTxTracker(tx types.Transaction, isClaim bool, counters state.ZKCounters, ip string) (*TxTracker, error)
}

// The dbManager will need to handle the errors inside the functions which don't return error as they will be used async in the other abstractions.
Expand Down Expand Up @@ -118,6 +119,7 @@ type dbManagerInterface interface {
CountReorgs(ctx context.Context, dbTx pgx.Tx) (uint64, error)
FlushMerkleTree(ctx context.Context) error
AddDebugInfo(ctx context.Context, info *state.DebugInfo, dbTx pgx.Tx) error
AddEvent(ctx context.Context, event *state.Event, dbTx pgx.Tx) error
}

type dbManagerStateInterface interface {
Expand Down Expand Up @@ -149,6 +151,7 @@ type dbManagerStateInterface interface {
GetLatestGer(ctx context.Context, maxBlockNumber uint64) (state.GlobalExitRoot, time.Time, error)
FlushMerkleTree(ctx context.Context) error
AddDebugInfo(ctx context.Context, info *state.DebugInfo, dbTx pgx.Tx) error
AddEvent(ctx context.Context, event *state.Event, dbTx pgx.Tx) error
}

type ethTxManager interface {
Expand Down
14 changes: 14 additions & 0 deletions sequencer/mock_db_manager.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

14 changes: 14 additions & 0 deletions sequencer/mock_state.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

18 changes: 9 additions & 9 deletions sequencer/mock_worker.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 3 additions & 1 deletion sequencer/txtracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,11 @@ type TxTracker struct {
Efficiency float64
RawTx []byte
ReceivedAt time.Time // To check if it has been in the efficiency list for too long
IP string // IP of the tx sender
}

// newTxTracker creates and inits a TxTracker
func newTxTracker(tx types.Transaction, isClaim bool, counters state.ZKCounters, constraints batchConstraints, weights batchResourceWeights) (*TxTracker, error) {
func newTxTracker(tx types.Transaction, isClaim bool, counters state.ZKCounters, constraints batchConstraints, weights batchResourceWeights, ip string) (*TxTracker, error) {
addr, err := state.GetSender(tx)
if err != nil {
return nil, err
Expand All @@ -44,6 +45,7 @@ func newTxTracker(tx types.Transaction, isClaim bool, counters state.ZKCounters,
GasPrice: tx.GasPrice(),
Cost: tx.Cost(),
ReceivedAt: time.Now(),
IP: ip,
}

txTracker.IsClaim = isClaim
Expand Down
4 changes: 2 additions & 2 deletions sequencer/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,8 @@ func NewWorker(state stateInterface, constraints batchConstraints, weights batch
}

// NewTxTracker creates and inits a TxTracker
func (w *Worker) NewTxTracker(tx types.Transaction, isClaim bool, counters state.ZKCounters) (*TxTracker, error) {
return newTxTracker(tx, isClaim, counters, w.batchConstraints, w.batchResourceWeights)
func (w *Worker) NewTxTracker(tx types.Transaction, isClaim bool, counters state.ZKCounters, ip string) (*TxTracker, error) {
return newTxTracker(tx, isClaim, counters, w.batchConstraints, w.batchResourceWeights, ip)
}

// AddTxTracker adds a new Tx to the Worker
Expand Down
5 changes: 5 additions & 0 deletions state/runtime/executor/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -221,6 +221,11 @@ func IsROMOutOfCountersError(error pb.RomError) bool {
return int32(error) >= ROM_ERROR_OUT_OF_COUNTERS_STEP && int32(error) <= ROM_ERROR_OUT_OF_COUNTERS_POSEIDON
}

// IsROMOutOfGasError indicates if the error is an ROM OOG
func IsROMOutOfGasError(error pb.RomError) bool {
return int32(error) == ROM_ERROR_OUT_OF_GAS
}

// IsExecutorOutOfCountersError indicates if the error is an ROM OOC
func IsExecutorOutOfCountersError(error pb.ExecutorError) bool {
return int32(error) >= EXECUTOR_ERROR_COUNTERS_OVERFLOW_KECCAK && int32(error) <= ROM_ERROR_OUT_OF_COUNTERS_POSEIDON
Expand Down
2 changes: 2 additions & 0 deletions state/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,8 @@ type TrustedReorg struct {
const (
// EventType_Prexecution_OOC indicates a preexecution out of couters error
EventType_Prexecution_OOC = "PREEXECUTION OOC"
// EventType_Prexecution_OOG indicates a preexecution out of gas error
EventType_Prexecution_OOG = "PREEXECUTION OOG"
// EventType_ZKCounters_Diff indicates big different in preexecution and execution regarding ZKCounters
EventType_ZKCounters_Diff = "ZK COUNTERS DIFF"
)
Expand Down