Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

les: implement new les fetcher #20692

Merged
merged 15 commits into from
Jul 28, 2020
92 changes: 58 additions & 34 deletions les/fetcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,9 +35,9 @@ import (
)

const (
blockDelayTimeout = 10 * time.Second // Timeout for retrieving the headers from the peer
gatherSlack = 100 * time.Millisecond // Interval used to collate almost-expired requests
trustedItemsThreshold = 64 // The maximum queued trusted announcements
blockDelayTimeout = 10 * time.Second // Timeout for retrieving the headers from the peer
gatherSlack = 100 * time.Millisecond // Interval used to collate almost-expired requests
cachedAnnosThreshold = 64 // The maximum queued announcements
)

// announce represents an new block announcement from the les server.
Expand Down Expand Up @@ -65,42 +65,49 @@ type response struct {

// fetcherPeer holds the fetcher-specific information for each active peer
type fetcherPeer struct {
latest *announceData // The latest announcement sent from the peer
trustedMap map[common.Hash]uint64 // Trusted announces map
trustedQueue *prque.Prque // Trusted announces queue
latest *announceData // The latest announcement sent from the peer

// These following two fields can track the latest announces
// from the peer with limited size for caching.
announces map[common.Hash]*announce // Announcement map
announceQueue *prque.Prque // Announcement queue
}

// addTrustedAnno enqueues an new trusted announcement. If the queued
// announces overflow, evict from the oldest.
func (fp *fetcherPeer) addTrustedAnno(number uint64, hash common.Hash) {
// addAnno enqueues an new trusted announcement. If the queued announces overflow,
// evict from the oldest.
func (fp *fetcherPeer) addAnno(announce *announce) {
// Short circuit if the announce already exists. In normal case it should
// never happen since only monotonic announce is accepted. But the adversary
// may feed us fake announces with higher td but same hash. In this case,
// ignore the announce anyway.
if _, exist := fp.trustedMap[hash]; exist {
hash, number := announce.data.Hash, announce.data.Number
if _, exist := fp.announces[hash]; exist {
return
}
fp.trustedMap[hash] = number
fp.trustedQueue.Push(hash, -int64(number))
fp.announces[hash] = announce
fp.announceQueue.Push(hash, -int64(number))

// Evict oldest if the announces are oversized.
for fp.trustedQueue.Size() > trustedItemsThreshold {
item, _ := fp.trustedQueue.Pop()
delete(fp.trustedMap, item.(common.Hash))
for fp.announceQueue.Size() > cachedAnnosThreshold {
item, _ := fp.announceQueue.Pop()
delete(fp.announces, item.(common.Hash))
}
}

// forwardTrustedAnno removes all announces from the map with a number lower than
// forwardAnno removes all announces from the map with a number lower than
rjl493456442 marked this conversation as resolved.
Show resolved Hide resolved
// the provided threshold.
func (fp *fetcherPeer) forwardTrustedAnno(number uint64) {
for !fp.trustedQueue.Empty() {
item, priority := fp.trustedQueue.Pop()
func (fp *fetcherPeer) forwardAnno(number uint64) []*announce {
var ret []*announce
for !fp.announceQueue.Empty() {
item, priority := fp.announceQueue.Pop()
if uint64(-priority) > number {
fp.trustedQueue.Push(item, priority)
return
fp.announceQueue.Push(item, priority)
break
}
delete(fp.trustedMap, item.(common.Hash))
ret = append(ret, fp.announces[item.(common.Hash)])
delete(fp.announces, item.(common.Hash))
}
return ret
}

// lightFetcher implements retrieval of newly announced headers. It reuses
Expand Down Expand Up @@ -191,8 +198,8 @@ func (f *lightFetcher) registerPeer(p *serverPeer) {
defer f.plock.Unlock()

f.peers[p.ID()] = &fetcherPeer{
trustedMap: make(map[common.Hash]uint64),
trustedQueue: prque.New(nil),
announces: make(map[common.Hash]*announce),
announceQueue: prque.New(nil),
}
}

Expand Down Expand Up @@ -271,7 +278,7 @@ func (f *lightFetcher) mainloop() {
trusted bool
)
f.forEachPeer(func(id enode.ID, p *fetcherPeer) bool {
if n := p.trustedMap[hash]; n == number {
if anno := p.announces[hash]; anno != nil && anno.trust && anno.data.Number == number {
agreed = append(agreed, id)
if 100*len(agreed)/len(f.ulc.keys) >= f.ulc.fraction {
trusted = true
Expand All @@ -294,7 +301,7 @@ func (f *lightFetcher) mainloop() {
continue
}
// Announced tds should be strictly monotonic, drop the peer if
// there are too many out-of-order announces accumulated.
// the announce is out-of-order.
if peer.latest != nil && data.Td.Cmp(peer.latest.Td) <= 0 {
f.peerset.unregister(peerid.String())
log.Debug("Non-monotonic td", "peer", peerid, "current", data.Td, "previous", peer.latest.Td)
Expand All @@ -306,6 +313,8 @@ func (f *lightFetcher) mainloop() {
if localTd != nil && data.Td.Cmp(localTd) <= 0 {
continue
}
peer.addAnno(anno)

// If we are not syncing, try to trigger a single retrieval or re-sync
if !ulc && !syncing {
// Two scenarios lead to re-sync:
Expand All @@ -326,8 +335,6 @@ func (f *lightFetcher) mainloop() {
}
// Keep collecting announces from trusted server even we are syncing.
if ulc && anno.trust {
peer.addTrustedAnno(data.Number, data.Hash)

// Notify underlying fetcher to retrieve header or trigger a resync if
// we have receive enough announcements from trusted server.
trusted, agreed := trustedHeader(data.Hash, data.Number)
Expand Down Expand Up @@ -381,12 +388,29 @@ func (f *lightFetcher) mainloop() {
reset(ev.Block.Header())
number := localHead.Number.Uint64()

// Clean stale announcements from trusted server.
if ulc {
f.forEachPeer(func(id enode.ID, p *fetcherPeer) bool {
p.forwardTrustedAnno(number)
return true
})
// Clean stale announcements from les-servers.
var droplist []enode.ID
f.forEachPeer(func(id enode.ID, p *fetcherPeer) bool {
removed := p.forwardAnno(number)
for _, anno := range removed {
if header := f.chain.GetHeaderByHash(anno.data.Hash); header != nil {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

At first I thought that this check is going to be simpler and we just need to add the check after delivering a header to the block fetcher by f.fetcher.FilterHeaders. Unfortunately that function returns before actually inserting the header to the chain so we still don't have the Td calculated. I guess this is why you are checking after evicting the announcements from the queue. This might work too but then I think you should also do this check in the other case when you are evicting the announcement because the queue is full (this is actually the more likely case when an attacker is spamming with bad announcements). Then we can be sure that causing the client to request useless headers by fake announcements is always punished. Either the hash is non-existent (in which case the retrieval will fail) or the hash exists but the number/Td does not match (in which case this check is always going to catch it when it gets out of the queue one way or the other).

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Think about it. We have two cases to evict announces: (a) local chain has inserted some headers, to evict stale or useless announces (b) the announce queue is full.

In the latter one, we can't do any meaningful check since they are all "future" announces.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is not necessarily true if the server is lying about the Td (which is the primary attack vector). If the headers exist but it is a worthless sidechain (or a fork like ETC) and the attacking server just keeps feeding announcements of it with fake high Tds then it is never evicted by forwardAnno since the Td appearing in the FIFO list is high (higher than the real chain). It will be evicted by addAnno where there is no check.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actually it's still ok. Image the client keeps feeding super high td all the time, it will finally trigger a syncing. And the downloader will detect this announcement is invalid and drop this peer.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

And also it's very hard to differentiate "fake announcement with high td" and "valid announcement because local client is out of sync". The main difference is for the latter one, we can finally sync to this point, but for the former, we will never reach the announcement point(it can be dropped by the downloader)

if header.Number.Uint64() != anno.data.Number {
droplist = append(droplist, id)
break
}
// In theory td should exists.
td := f.chain.GetTd(anno.data.Hash, anno.data.Number)
if td != nil && td.Cmp(anno.data.Td) != 0 {
droplist = append(droplist, id)
break
}
}
}
return true
})
for _, id := range droplist {
f.peerset.unregister(id.String())
log.Debug("Kicked out peer for invalid announcement")
}
if f.newHeadHook != nil {
f.newHeadHook(localHead)
Expand Down
35 changes: 35 additions & 0 deletions les/fetcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package les

import (
"math/big"
"testing"
"time"

Expand Down Expand Up @@ -231,3 +232,37 @@ func testTrustedAnnouncement(t *testing.T, protocol int) {
check([]uint64{4}, 4, func() { <-newHead }) // ULC-style light syncing, rollback untrusted headers
check([]uint64{10}, 10, func() { <-newHead }) // Sync the whole chain.
}

func TestInvalidAnnounces(t *testing.T) {
s, c, teardown := newClientServerEnv(t, 4, lpv3, nil, nil, 0, false, false)
defer teardown()

// Create connected peer pair.
c.handler.fetcher.noAnnounce = true // Ignore the first announce from peer which can trigger a resync.
peer, _, err := newTestPeerPair("peer", lpv3, s.handler, c.handler)
if err != nil {
t.Fatalf("Failed to create peer pair %v", err)
}
c.handler.fetcher.noAnnounce = false

done := make(chan *types.Header, 1)
c.handler.fetcher.newHeadHook = func(header *types.Header) { done <- header }

// Prepare announcement by latest header.
headerOne := s.backend.Blockchain().GetHeaderByNumber(1)
hash, number := headerOne.Hash(), headerOne.Number.Uint64()
td := big.NewInt(200) // bad td

// Sign the announcement if necessary.
announce := announceData{hash, number, td, 0, nil}
if peer.cpeer.announceType == announceTypeSigned {
announce.sign(s.handler.server.privateKey)
}
peer.cpeer.sendAnnounce(announce)
<-done // Wait syncing

// Ensure the bad peer is evicited
if c.handler.backend.peers.len() != 0 {
t.Fatalf("Failed to evict invalid peer")
}
}