Skip to content

Commit

Permalink
chore: address comments and lint
Browse files Browse the repository at this point in the history
  • Loading branch information
EclesioMeloJunior committed Sep 17, 2024
1 parent ed6c755 commit 9dff75a
Show file tree
Hide file tree
Showing 6 changed files with 52 additions and 54 deletions.
2 changes: 2 additions & 0 deletions dot/network/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@ type Syncer interface {
// CreateBlockResponse is called upon receipt of a BlockRequestMessage to create the response
CreateBlockResponse(peer.ID, *messages.BlockRequestMessage) (*messages.BlockResponseMessage, error)

// OnConnectionClosed should be trigged whenever Gossamer closes a connection with another
// peer, normally used when the peer reputation is too low.
OnConnectionClosed(peer.ID)
}

Expand Down
9 changes: 4 additions & 5 deletions dot/services.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@ import (
wazero_runtime "github.com/ChainSafe/gossamer/lib/runtime/wazero"
)

const blockRequestTimeout = 20 * time.Second

// BlockProducer to produce blocks
type BlockProducer interface {
Pause() error
Expand Down Expand Up @@ -510,11 +512,8 @@ func (nodeBuilder) newSyncService(config *cfg.Config, st *state.Service, fg sync
return nil, err
}

const blockRequestTimeout = 20 * time.Second
requestMaker := net.GetRequestResponseProtocol(
network.SyncID,
blockRequestTimeout,
network.MaxBlockResponseSize)
requestMaker := net.GetRequestResponseProtocol(network.SyncID,
blockRequestTimeout, network.MaxBlockResponseSize)

syncCfg := &sync.FullSyncConfig{
BlockState: st.Block,
Expand Down
15 changes: 8 additions & 7 deletions dot/sync/fullsync.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ func (f *FullSyncStrategy) NextActions() ([]*syncTask, error) {
f.syncedBlocks = 0

reqsFromQueue := []*messages.BlockRequestMessage{}
for i := 0; i < int(f.numOfTasks); i++ {
for i := 0; i < f.numOfTasks; i++ {
msg, ok := f.requestQueue.PopFront()
if !ok {
break
Expand Down Expand Up @@ -277,9 +277,9 @@ func (f *FullSyncStrategy) IsFinished(results []*syncTaskResult) (bool, []Change
func (f *FullSyncStrategy) ShowMetrics() {
totalSyncAndImportSeconds := time.Since(f.startedAt).Seconds()
bps := float64(f.syncedBlocks) / totalSyncAndImportSeconds
logger.Infof("⛓️ synced %d blocks, disjoint fragments %d, incomplete blocks %d, "+
logger.Infof("⛓️ synced %d blocks, tasks on queue %d, disjoint fragments %d, incomplete blocks %d, "+
"took: %.2f seconds, bps: %.2f blocks/second, target block number #%d",
f.syncedBlocks, len(f.unreadyBlocks.disjointFragments), len(f.unreadyBlocks.incompleteBlocks),
f.syncedBlocks, f.requestQueue.Len(), len(f.unreadyBlocks.disjointFragments), len(f.unreadyBlocks.incompleteBlocks),
totalSyncAndImportSeconds, bps, f.peers.getTarget())
}

Expand Down Expand Up @@ -349,9 +349,9 @@ func (f *FullSyncStrategy) OnBlockAnnounce(from peer.ID, msg *network.BlockAnnou

// if we still far from aproaching the calculated target
// then we can ignore the block announce
ratioOfCompleteness := (bestBlockHeader.Number / uint(f.peers.getTarget())) * 100
logger.Infof("sync: ratio of completeness: %d", ratioOfCompleteness)
if ratioOfCompleteness < 80 {
mx := max(blockAnnounceHeader.Number, bestBlockHeader.Number)
mn := min(blockAnnounceHeader.Number, bestBlockHeader.Number)
if (mx - mn) > messages.MaxBlocksInResponse {
return true, nil, nil
}

Expand All @@ -368,9 +368,10 @@ func (f *FullSyncStrategy) OnBlockAnnounce(from peer.ID, msg *network.BlockAnnou
request := messages.NewBlockRequest(*variadic.Uint32OrHashFrom(blockAnnounceHeaderHash),
1, messages.RequestedDataBody+messages.RequestedDataJustification, messages.Ascending)
f.requestQueue.PushBack(request)
} else {
logger.Infof("announced block already exists #%d (%s)", blockAnnounceHeader.Number, blockAnnounceHeaderHash.Short())
}

logger.Infof("announced block already exists #%d (%s)", blockAnnounceHeader.Number, blockAnnounceHeaderHash.Short())
return true, &Change{
who: from,
rep: peerset.ReputationChange{
Expand Down
6 changes: 2 additions & 4 deletions dot/sync/request_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,10 +32,8 @@ func (r *requestsQueue[M]) PopFront() (value M, ok bool) {
return e.Value.(M), true
}

func (r *requestsQueue[M]) PushBack(message ...M) {
func (r *requestsQueue[M]) PushBack(message M) {
r.mu.Lock()
defer r.mu.Unlock()
for _, m := range message {
r.queue.PushBack(m)
}
r.queue.PushBack(message)
}
72 changes: 34 additions & 38 deletions dot/sync/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -154,8 +154,6 @@ func (s *SyncService) waitWorkers() {
}

func (s *SyncService) Start() error {
s.waitWorkers()

s.wg.Add(1)
go s.runSyncEngine()
return nil
Expand Down Expand Up @@ -219,17 +217,26 @@ func (s *SyncService) HighestBlock() uint {

func (s *SyncService) runSyncEngine() {
defer s.wg.Done()
s.waitWorkers()

logger.Infof("starting sync engine with strategy: %T", s.currentStrategy)

lockAndStart:
s.mu.Lock()
logger.Info("starting process to acquire more blocks")
for {
select {
case <-s.stopCh:
return
case <-time.After(s.slotDuration):
}

select {
case <-s.stopCh:
return
default:
s.runStrategy()
}
}

func (s *SyncService) runStrategy() {
s.mu.Lock()
defer s.mu.Unlock()

logger.Tracef("running strategy: %T", s.currentStrategy)

finalisedHeader, err := s.blockState.GetHighestFinalisedHeader()
if err != nil {
Expand Down Expand Up @@ -258,41 +265,30 @@ lockAndStart:
return
}

logger.Tracef("amount of tasks to process: %d", len(tasks))
if len(tasks) == 0 {
goto loopBack
return
}

{
results := s.workerPool.submitRequests(tasks)
done, repChanges, peersToIgnore, err := s.currentStrategy.IsFinished(results)
if err != nil {
logger.Criticalf("current sync strategy failed with: %s", err.Error())
return
}

for _, change := range repChanges {
s.network.ReportPeer(change.rep, change.who)
}
results := s.workerPool.submitRequests(tasks)
done, repChanges, peersToIgnore, err := s.currentStrategy.IsFinished(results)
if err != nil {
logger.Criticalf("current sync strategy failed with: %s", err.Error())
return
}

for _, block := range peersToIgnore {
s.workerPool.ignorePeerAsWorker(block)
}
for _, change := range repChanges {
s.network.ReportPeer(change.rep, change.who)
}

s.currentStrategy.ShowMetrics()
for _, block := range peersToIgnore {
s.workerPool.ignorePeerAsWorker(block)
}

if done {
if s.defaultStrategy == nil {
logger.Criticalf("nil default strategy")
return
}
s.currentStrategy.ShowMetrics()
logger.Trace("finish process to acquire more blocks")

s.currentStrategy = s.defaultStrategy
}
if done {
s.currentStrategy = s.defaultStrategy
}

loopBack:
logger.Info("finish process to acquire more blocks")
s.mu.Unlock()
time.Sleep(s.slotDuration)
goto lockAndStart
}
2 changes: 2 additions & 0 deletions dot/sync/worker_pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,7 @@ func (s *syncWorkerPool) submitRequests(tasks []*syncTask) []*syncTaskResult {
go func(expectedResults int) {
defer wg.Done()
var taskResults []*syncTaskResult

for result := range results {
taskResults = append(taskResults, result)
if len(taskResults) == expectedResults {
Expand Down Expand Up @@ -159,6 +160,7 @@ func executeTask(task *syncTask, workerPool chan peer.ID, failedTasks chan *sync
failedTasks <- task
} else {
logger.Infof("[FINISHED] worker %s, request: %s", worker, task.request)
workerPool <- worker
results <- &syncTaskResult{
who: worker,
completed: true,
Expand Down

0 comments on commit 9dff75a

Please sign in to comment.