Skip to content

Commit

Permalink
fix: added time based loop
Browse files Browse the repository at this point in the history
  • Loading branch information
istae committed Sep 27, 2021
1 parent be5db7b commit 3aadd28
Show file tree
Hide file tree
Showing 4 changed files with 22 additions and 20 deletions.
8 changes: 8 additions & 0 deletions pkg/blocker/blocker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ import (

func TestBlocksAfterFlagTimeout(t *testing.T) {

*blocker.WakeupTime = time.Millisecond

mux := sync.Mutex{}
blocked := make(map[string]time.Duration)

Expand Down Expand Up @@ -64,10 +66,14 @@ func TestBlocksAfterFlagTimeout(t *testing.T) {
if blockedTime != blockTime {
t.Fatalf("block time: want %v, got %v", blockTime, blockedTime)
}

b.Close()
}

func TestUnflagBeforeBlock(t *testing.T) {

*blocker.WakeupTime = time.Millisecond

mux := sync.Mutex{}
blocked := make(map[string]time.Duration)

Expand Down Expand Up @@ -106,6 +112,8 @@ func TestUnflagBeforeBlock(t *testing.T) {
if ok {
t.Fatal("address should not be blocked")
}

b.Close()
}

type blocklister struct {
Expand Down
29 changes: 10 additions & 19 deletions pkg/blocker/blockler.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,14 @@ import (
)

type peer struct {
flagged bool // indicates whether the peer is actively flagged
blockAfter time.Time // timestamp of the point we've timed-out or got an error from a peer
addr swarm.Address
}

var (
wakeupTime time.Duration = time.Second * 10
)

type Blocker struct {
mux sync.Mutex
disconnector p2p.Blocklister
Expand Down Expand Up @@ -53,31 +56,21 @@ func (b *Blocker) run() {
select {
case <-b.quit:
return
case <-b.wakeupCh:
<-time.After(b.flagTimeout)
case <-time.After(wakeupTime):
b.block()
}
}
}

func (b *Blocker) wakeup() {

select {
case b.wakeupCh <- struct{}{}:
default:
}
}

func (b *Blocker) block() {
b.mux.Lock()
defer b.mux.Unlock()

for key, peer := range b.peers {
if peer.flagged && time.Now().After(peer.blockAfter) {
if !peer.blockAfter.IsZero() && time.Now().After(peer.blockAfter) {
if err := b.disconnector.Blocklist(peer.addr, b.blockDuration, "blocker: flag timeout"); err != nil {
b.logger.Warningf("blocker: blocking peer %s failed: %v", peer.addr, err)
}

delete(b.peers, key)
}
}
Expand All @@ -90,19 +83,15 @@ func (b *Blocker) Flag(addr swarm.Address) {
p, ok := b.peers[addr.ByteString()]

if ok {
if !p.flagged {
if p.blockAfter.IsZero() {
p.blockAfter = time.Now().Add(b.flagTimeout)
p.flagged = true
}
} else {
b.peers[addr.ByteString()] = &peer{
blockAfter: time.Now().Add(b.flagTimeout),
flagged: true,
addr: addr,
}
}

b.wakeup()
}

func (b *Blocker) Unflag(addr swarm.Address) {
Expand All @@ -112,7 +101,9 @@ func (b *Blocker) Unflag(addr swarm.Address) {
delete(b.peers, addr.ByteString())
}

// Closed will exit the worker loop.
// must be called only once.
func (b *Blocker) Close() error {
close(b.quit)
b.quit <- struct{}{}
return nil
}
3 changes: 3 additions & 0 deletions pkg/blocker/export_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
package blocker

var WakeupTime = &wakeupTime
2 changes: 1 addition & 1 deletion pkg/p2p/p2p.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ type Disconnecter interface {

type Blocklister interface {
// Blocklist will disconnect a peer and put it on a blocklist (blocking in & out connections) for provided duration
// duration 0 is treated as an infinite duration
// Duration 0 is treated as an infinite duration.
Blocklist(overlay swarm.Address, duration time.Duration, reason string) error
}

Expand Down

0 comments on commit 3aadd28

Please sign in to comment.