diff --git a/pkg/blocker/blocker_test.go b/pkg/blocker/blocker_test.go index 855ef17a3d9..4b9a42390ff 100644 --- a/pkg/blocker/blocker_test.go +++ b/pkg/blocker/blocker_test.go @@ -19,6 +19,8 @@ import ( func TestBlocksAfterFlagTimeout(t *testing.T) { + *blocker.WakeupTime = time.Millisecond + mux := sync.Mutex{} blocked := make(map[string]time.Duration) @@ -64,10 +66,14 @@ func TestBlocksAfterFlagTimeout(t *testing.T) { if blockedTime != blockTime { t.Fatalf("block time: want %v, got %v", blockTime, blockedTime) } + + b.Close() } func TestUnflagBeforeBlock(t *testing.T) { + *blocker.WakeupTime = time.Millisecond + mux := sync.Mutex{} blocked := make(map[string]time.Duration) @@ -106,6 +112,8 @@ func TestUnflagBeforeBlock(t *testing.T) { if ok { t.Fatal("address should not be blocked") } + + b.Close() } type blocklister struct { diff --git a/pkg/blocker/blockler.go b/pkg/blocker/blockler.go index 83b2e39b7aa..b8d07b52646 100644 --- a/pkg/blocker/blockler.go +++ b/pkg/blocker/blockler.go @@ -14,11 +14,14 @@ import ( ) type peer struct { - flagged bool // indicates whether the peer is actively flagged blockAfter time.Time // timestamp of the point we've timed-out or got an error from a peer addr swarm.Address } +var ( + wakeupTime time.Duration = time.Second * 10 +) + type Blocker struct { mux sync.Mutex disconnector p2p.Blocklister @@ -53,31 +56,21 @@ func (b *Blocker) run() { select { case <-b.quit: return - case <-b.wakeupCh: - <-time.After(b.flagTimeout) + case <-time.After(wakeupTime): b.block() } } } -func (b *Blocker) wakeup() { - - select { - case b.wakeupCh <- struct{}{}: - default: - } -} - func (b *Blocker) block() { b.mux.Lock() defer b.mux.Unlock() for key, peer := range b.peers { - if peer.flagged && time.Now().After(peer.blockAfter) { + if !peer.blockAfter.IsZero() && time.Now().After(peer.blockAfter) { if err := b.disconnector.Blocklist(peer.addr, b.blockDuration, "blocker: flag timeout"); err != nil { b.logger.Warningf("blocker: blocking peer %s failed: %v", peer.addr, err) } - delete(b.peers, key) } } @@ -90,19 +83,15 @@ func (b *Blocker) Flag(addr swarm.Address) { p, ok := b.peers[addr.ByteString()] if ok { - if !p.flagged { + if p.blockAfter.IsZero() { p.blockAfter = time.Now().Add(b.flagTimeout) - p.flagged = true } } else { b.peers[addr.ByteString()] = &peer{ blockAfter: time.Now().Add(b.flagTimeout), - flagged: true, addr: addr, } } - - b.wakeup() } func (b *Blocker) Unflag(addr swarm.Address) { @@ -112,7 +101,9 @@ func (b *Blocker) Unflag(addr swarm.Address) { delete(b.peers, addr.ByteString()) } +// Closed will exit the worker loop. +// must be called only once. func (b *Blocker) Close() error { - close(b.quit) + b.quit <- struct{}{} return nil } diff --git a/pkg/blocker/export_test.go b/pkg/blocker/export_test.go new file mode 100644 index 00000000000..f96e4a83008 --- /dev/null +++ b/pkg/blocker/export_test.go @@ -0,0 +1,3 @@ +package blocker + +var WakeupTime = &wakeupTime diff --git a/pkg/p2p/p2p.go b/pkg/p2p/p2p.go index 0c11925ab7b..1537be96747 100644 --- a/pkg/p2p/p2p.go +++ b/pkg/p2p/p2p.go @@ -36,7 +36,7 @@ type Disconnecter interface { type Blocklister interface { // Blocklist will disconnect a peer and put it on a blocklist (blocking in & out connections) for provided duration - // duration 0 is treated as an infinite duration + // Duration 0 is treated as an infinite duration. Blocklist(overlay swarm.Address, duration time.Duration, reason string) error }