diff --git a/synchronizer/l1_parallel_sync/l1_rollup_info_producer.go b/synchronizer/l1_parallel_sync/l1_rollup_info_producer.go index dd2fb58ef6..a297ffe04d 100644 --- a/synchronizer/l1_parallel_sync/l1_rollup_info_producer.go +++ b/synchronizer/l1_parallel_sync/l1_rollup_info_producer.go @@ -21,7 +21,6 @@ import ( "sync/atomic" "time" - "github.com/0xPolygonHermez/zkevm-node" "github.com/0xPolygonHermez/zkevm-node/log" "github.com/0xPolygonHermez/zkevm-node/synchronizer/common" ) @@ -90,6 +89,7 @@ type workersInterface interface { requestLastBlockWithRetries(ctx context.Context, timeout time.Duration, maxPermittedRetries int) responseL1LastBlock getResponseChannelForRollupInfo() chan responseRollupInfoByBlockRange String() string + ToStringBrief() string howManyRunningWorkers() int } @@ -229,7 +229,7 @@ func (l *L1RollupInfoProducer) Reset(startingBlockNumber uint64) { } func (l *L1RollupInfoProducer) resetUnsafe(startingBlockNumber uint64) { - log.Infof("producer: Reset L1 sync process to blockNumber %d st=%s", startingBlockNumber, l.toStringBrief()) + log.Debugf("producer: Reset L1 sync process to blockNumber %d st=%s", startingBlockNumber, l.toStringBrief()) l.setStatusReseting() log.Debugf("producer: Reset(%d): stop previous run (state=%s)", startingBlockNumber, l.getStatus().String()) log.Debugf("producer: Reset(%d): syncStatus.reset", startingBlockNumber) @@ -243,7 +243,7 @@ func (l *L1RollupInfoProducer) resetUnsafe(startingBlockNumber uint64) { log.Debugf("producer: Reset(%d): reset Filter", startingBlockNumber) l.filterToSendOrdererResultsToConsumer.Reset(startingBlockNumber) l.setStatus(producerIdle) - log.Infof("producer: Reset(%d): reset done!", startingBlockNumber) + log.Infof("producer: Reset(%d): reset producer done!", startingBlockNumber) } func (l *L1RollupInfoProducer) isProducerRunning() bool { @@ -351,13 +351,13 @@ func (l *L1RollupInfoProducer) step(waitDuration *time.Duration) bool { if atomic.CompareAndSwapInt32((*int32)(&l.status), int32(producerNoRunning), int32(producerIdle)) { // l.getStatus() == producerNoRunning log.Info("producer: step: status is no running, changing to idle %s", l.getStatus().String()) } - log.Infof("producer: build_time:%s step: status:%s", zkevm.BuildDate, l.toStringBrief()) + log.Debugf("producer: step: status:%s", l.toStringBrief()) select { case <-l.ctxWithCancel.Done(): log.Debugf("producer: context canceled") return false case cmd := <-l.channelCmds: - log.Infof("producer: received a command") + log.Debugf("producer: received a command") res := l.executeCmd(cmd) if !res { log.Info("producer: cmd %s stop the process", cmd.cmd.String()) @@ -438,7 +438,7 @@ func (l *L1RollupInfoProducer) step(waitDuration *time.Duration) bool { func (l *L1RollupInfoProducer) executeCmd(cmd producerCmd) bool { switch cmd.cmd { case producerStop: - log.Infof("producer: received a stop, so it stops processing") + log.Infof("producer: received a stop, so it stops requesting new rollup info and stop current requests") l.stopUnsafe() return false case producerReset: @@ -534,7 +534,7 @@ func (l *L1RollupInfoProducer) launchWork() (int, error) { blockRangeMsg := br.String() + unsafeAreaMsg _, err := l.workers.asyncRequestRollupInfoByBlockRange(l.ctxWithCancel.ctx, request) if err != nil { - if errors.Is(err, errAllWorkersBusy) { + if !errors.Is(err, errAllWorkersBusy) { accDebugStr += fmt.Sprintf(" segment %s -> [Error:%s] ", blockRangeMsg, err.Error()) } break @@ -545,7 +545,10 @@ func (l *L1RollupInfoProducer) launchWork() (int, error) { log.Debugf("producer: launch_worker: Launched worker for segment %s, num_workers_in_this_iteration: %d", blockRangeMsg, launchedWorker) l.syncStatus.OnStartedNewWorker(*br) } - log.Infof("producer: launch_worker: num of launched workers: %d result: %s status_comm:%s", launchedWorker, accDebugStr, l.outgoingPackageStatusDebugString()) + if launchedWorker > 0 { + log.Infof("producer: launch_worker: num of launched workers: %d (%s) result: %s ", launchedWorker, l.workers.ToStringBrief(), accDebugStr) + } + log.Debugf("producer: launch_worker: num of launched workers: %d result: %s status_comm:%s", launchedWorker, accDebugStr, l.outgoingPackageStatusDebugString()) return launchedWorker, nil } @@ -559,13 +562,13 @@ func (l *L1RollupInfoProducer) renewLastBlockOnL1IfNeeded(reason string) { ttl := l.ttlOfLastBlockOnL1() oldBlock := l.syncStatus.GetLastBlockOnL1() if elapsed > ttl { - log.Infof("producer: Need a new value for Last Block On L1, doing the request reason:%s", reason) + log.Debugf("producer: Need a new value for Last Block On L1, doing the request reason:%s", reason) result := l.workers.requestLastBlockWithRetries(l.ctxWithCancel.ctx, l.cfg.TimeoutForRequestLastBlockOnL1, l.cfg.NumOfAllowedRetriesForRequestLastBlockOnL1) - log.Infof("producer: Need a new value for Last Block On L1, doing the request old_block:%v -> new block:%v", oldBlock, result.result.block) if result.generic.err != nil { - log.Error(result.generic.err) return } + log.Infof("producer: Need a new value for Last Block On L1, doing the request old_block:%v -> new block:%v", oldBlock, result.result.block) + l.onNewLastBlock(result.result.block) } } @@ -588,7 +591,12 @@ func (l *L1RollupInfoProducer) onResponseRollupInfo(result responseRollupInfoByB } if isOk { outgoingPackages := l.filterToSendOrdererResultsToConsumer.Filter(*newL1SyncMessageData(result.result)) - log.Infof("producer: filtered Br[%s/%d], outgoing %d filter_status:%s", result.result.blockRange.String(), result.result.getHighestBlockNumberInResponse(), len(outgoingPackages), l.filterToSendOrdererResultsToConsumer.ToStringBrief()) + log.Debugf("producer: filtered Br[%s/%d], outgoing %d filter_status:%s", result.result.blockRange.String(), result.result.getHighestBlockNumberInResponse(), len(outgoingPackages), l.filterToSendOrdererResultsToConsumer.ToStringBrief()) + if len(outgoingPackages) > 0 { + for idx, msg := range outgoingPackages { + log.Infof("producer: sendind data to consumer: [%d/%d] -> range:[%s] Sending results [data] to consumer:%s ", idx, len(outgoingPackages), result.result.blockRange.String(), msg.toStringBrief()) + } + } l.sendPackages(outgoingPackages) } else { if errors.Is(result.generic.err, context.Canceled) { @@ -601,7 +609,7 @@ func (l *L1RollupInfoProducer) onResponseRollupInfo(result responseRollupInfoByB func (l *L1RollupInfoProducer) sendPackages(outgoingPackages []L1SyncMessage) { for _, pkg := range outgoingPackages { - log.Infof("producer: Sending results [data] to consumer:%s: status_comm:%s", pkg.toStringBrief(), l.outgoingPackageStatusDebugString()) + log.Debugf("producer: Sending results [data] to consumer:%s: status_comm:%s", pkg.toStringBrief(), l.outgoingPackageStatusDebugString()) l.outgoingChannel <- pkg } } diff --git a/synchronizer/l1_parallel_sync/l1_workers.go b/synchronizer/l1_parallel_sync/l1_workers.go index a55951434f..4f2e65421a 100644 --- a/synchronizer/l1_parallel_sync/l1_workers.go +++ b/synchronizer/l1_parallel_sync/l1_workers.go @@ -67,6 +67,10 @@ func (w *workers) String() string { return result } +func (w *workers) ToStringBrief() string { + return fmt.Sprintf(" working: %d of %d ", w.howManyRunningWorkers(), len(w.workers)) +} + func newWorkers(ethermans []L1ParallelEthermanInterface, cfg workersConfig) *workers { result := workers{chIncommingRollupInfo: make(chan responseRollupInfoByBlockRange, len(ethermans)+1), cfg: cfg} diff --git a/synchronizer/l1_parallel_sync/mock_workers_interface.go b/synchronizer/l1_parallel_sync/mock_workers_interface.go index 46327d594d..af162aba67 100644 --- a/synchronizer/l1_parallel_sync/mock_workers_interface.go +++ b/synchronizer/l1_parallel_sync/mock_workers_interface.go @@ -67,6 +67,51 @@ func (_c *workersInterfaceMock_String_Call) RunAndReturn(run func() string) *wor return _c } +// ToStringBrief provides a mock function with given fields: +func (_m *workersInterfaceMock) ToStringBrief() string { + ret := _m.Called() + + if len(ret) == 0 { + panic("no return value specified for ToStringBrief") + } + + var r0 string + if rf, ok := ret.Get(0).(func() string); ok { + r0 = rf() + } else { + r0 = ret.Get(0).(string) + } + + return r0 +} + +// workersInterfaceMock_ToStringBrief_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'ToStringBrief' +type workersInterfaceMock_ToStringBrief_Call struct { + *mock.Call +} + +// ToStringBrief is a helper method to define mock.On call +func (_e *workersInterfaceMock_Expecter) ToStringBrief() *workersInterfaceMock_ToStringBrief_Call { + return &workersInterfaceMock_ToStringBrief_Call{Call: _e.mock.On("ToStringBrief")} +} + +func (_c *workersInterfaceMock_ToStringBrief_Call) Run(run func()) *workersInterfaceMock_ToStringBrief_Call { + _c.Call.Run(func(args mock.Arguments) { + run() + }) + return _c +} + +func (_c *workersInterfaceMock_ToStringBrief_Call) Return(_a0 string) *workersInterfaceMock_ToStringBrief_Call { + _c.Call.Return(_a0) + return _c +} + +func (_c *workersInterfaceMock_ToStringBrief_Call) RunAndReturn(run func() string) *workersInterfaceMock_ToStringBrief_Call { + _c.Call.Return(run) + return _c +} + // asyncRequestRollupInfoByBlockRange provides a mock function with given fields: ctx, request func (_m *workersInterfaceMock) asyncRequestRollupInfoByBlockRange(ctx context.Context, request requestRollupInfoByBlockRange) (chan responseRollupInfoByBlockRange, error) { ret := _m.Called(ctx, request)