Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

core, eth: announce based transaction propagation #20234

Merged
merged 2 commits into from
Feb 13, 2020

Conversation

rjl493456442
Copy link
Member

@rjl493456442 rjl493456442 commented Nov 4, 2019

Implements ethereum/EIPs#2464

Copy link
Member

@karalabe karalabe left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I like this PR (Still need to crunch through the fetcher, didn't go into that yet).

One thing I'm wondering about is whether we should only use hashes for the announcements, or whether we should add some extra metadata too (e.g. sender, nonce). It's a unique opportunity to move the eth protocol away from working purely on hashes.

E.g. If someone announces tx with hash X, I must retrieve it first and can only later realize it might be useless (e.g. old). If we were to include some extra infos that could allow the local node to pre-filter the thing, we might never need to retrieve it in the first place. Not sure what these infos could be and how useful they'd be, just thinking out aloud here.

eth/handler.go Outdated Show resolved Hide resolved
eth/handler.go Outdated
bytes int
txs []rlp.RawValue
)
for bytes < softResponseLimit && len(txs) < fetcher.MaxTransactionFetch {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Retrospectively, I'm not sure it's a good idea to use fixed number limits (this prevents us in the downloader to play with different request sizes). Rather than enforcing a global messages size limit bytes < softResponseLimit and a local item count limit len(txs) < fetcher.MaxTransactionFetch, we should imho have specific limits on data types. E.g. we could set a softTransactionResponseLimit to 128KB and only enforce that. It's small enough that you can't go wild with thousands of lookups, but more flexible than limiting to 64 txs only (e.g. if I only do transfers, might as well send back more than 64).

As for fetcher.MaxTransactionFetch, we can still keep that, it's fine to limit on the requester side, but separating the client and server limits allows us to play with them in the future.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm, but we need to avoid remote nodes requesting inifnite none existing txs, forcing DB reads. Though the original code doesn't handle that either.

eth/handler.go Outdated Show resolved Hide resolved
eth/fetcher/block_fetcher.go Outdated Show resolved Hide resolved
eth/handler.go Show resolved Hide resolved
eth/handler.go Outdated Show resolved Hide resolved
eth/peer.go Outdated Show resolved Hide resolved
eth/peer.go Outdated Show resolved Hide resolved
eth/protocol.go Outdated Show resolved Hide resolved
Copy link
Member

@karalabe karalabe left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Having chatted with @holiman a bit, we kind of think that we should limit GetTxMsg to GetPooledTxMsg. I.e. disallow requesting ancient data from the database, as that would be redundant with GetBlockBodies and might end up with other nodes doing weird requests, abusing this direct database access.

eth/handler.go Outdated Show resolved Hide resolved
eth/handler.go Outdated
return true
}
return rawdb.ReadTxLookupEntry(pm.chaindb, hash) != nil
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thinking about this, I think we should nuke this function altogether and only ever do txpool.Has.

There are two scenarios that can happen with tx propagation: a) an in-sync node sends an actually new tx and b) a stale node sends us something stale.

In a), which is the usual case, the transaction will either be in the pool already or not yet. Looking into the database is just an extra hit uselessly, since more real transaction propagation is live in-flight transactions.

In b), a stale node could indeed trigger us to download a tx we already have, but that will be caught by the txpool, and a properly implemented node won't keep reannouncing the same old tx over and over again, so apart from some initial wasted bandwidth, we're on track again.

The benefit of dropping this db check clause is that we avoid any potential - deliberate or accidental - DoS vector due to the allowed cheap database access. A malicious node could still feed us bad hashes and force a database lookup after retrieving them, but that also entails them having to incur the bandwidth cost of transferring an entire valid, signed transaction in the first place, so there's a lot less reads they can force us to do.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There is also scenario c) where a tx is announced, but by the time we request it it's already mined. In that case we should be able to detect that a block contained the tx and not request it or not be bothered by an empty response. This case should be kind of rare though as it's very improbably for a tx to be announced and immediately included in a block milliseconds later.

@rjl493456442 rjl493456442 force-pushed the newtxhashes_2 branch 2 times, most recently from 78bacc4 to 54f20f1 Compare January 14, 2020 07:06
@rjl493456442 rjl493456442 changed the title [wip] core, eth: implement tx fetcher core, eth: implement tx fetcher Jan 14, 2020
@rjl493456442 rjl493456442 force-pushed the newtxhashes_2 branch 3 times, most recently from 0f23313 to 89cb949 Compare January 14, 2020 12:17
eth/protocol.go Outdated Show resolved Hide resolved
@karalabe karalabe changed the title core, eth: implement tx fetcher core, eth: announce based transaction propagation Jan 15, 2020
// valid transaction is 128KB right now. Worst case scenario, a packet
// can contain 16 transactions, but normally 2MB is totally enough for
// containing 64 transactions.
MaxTransactionFetch = 64
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we should bump this up. If we overshoot, the extra hashes are wasted, but since a hash is tiny, the cost is negligible. However, the "cost" of extra network RTTs can be impactful if we need to do many requests.

Looking at a few recent blocks on Etherscan, they seem to contain transactions weighing <200B on average. If we'd like to have meaningful reply messages of 128KB, that's 640 transactions of this size. Maybe that's a bit high to waste if a few txs are large, but let's try a middle ground. Let's set this limit to 256. That should allow packing up 50KB of useful data if tx are small and wastes 8KB if txs are gigantic (1 per reply).

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Then again, I can be convinced otherwise too. CC @holiman @AlexeyAkhunov ?

eth/peer.go Outdated Show resolved Hide resolved
eth/fetcher/tx_fetcher.go Outdated Show resolved Hide resolved
eth/fetcher/tx_fetcher.go Outdated Show resolved Hide resolved
eth/fetcher/tx_fetcher.go Outdated Show resolved Hide resolved
eth/fetcher/tx_fetcher.go Outdated Show resolved Hide resolved
eth/fetcher/tx_fetcher.go Outdated Show resolved Hide resolved
eth/fetcher/tx_fetcher.go Outdated Show resolved Hide resolved
eth/fetcher/tx_fetcher.go Outdated Show resolved Hide resolved
eth/fetcher/tx_fetcher.go Outdated Show resolved Hide resolved
eth/fetcher/tx_fetcher.go Outdated Show resolved Hide resolved
eth/peer.go Outdated
queuedTxAnns: make(chan []common.Hash, maxQueuedTxAnns),
queuedBlocks: make(chan *propEvent, maxQueuedBlocks),
queuedBlockAnns: make(chan *types.Block, maxQueuedBlockAnns),
txRetrievals: make(chan []common.Hash, maxQueuedTxRetrieval),
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We've been thinking that this might need a bit more expanded approach. For block announcements and broadcasts channels are good, because we expect these events to be relatively rare (i.e. the channels are mostly DoS protection).

I've tried to mimic the same for txs, but since there we can have much much more, I used slices to avoid spinning a channel too hard. The downside however is that now we have no control over the size (e.g. we limit to 128 slices, but those can be arbitrary large). This is a problem because we can't guarantee the memory used for queued up txs (especially if peers have different bandwidths and thus ens up queuing different txs). If we're reworking this part, it might be a good opportunity to fix this.

My alternative proposal would be to replace all 3 queues (tx broadcasts, annoucements and requests) with a []common.Hash (and a lock to protect them). Then scheduling either of the 3 operations would just append Nx32 bytes at the end (we would exactly know how much memory we waste per peer max).

A nice thing we can do for the announcements and broadcasts here is to sanity check them before sending out. This way if a tx gets included in a block (or dropped) then we can sanity check against th epool before pushing to the peer, thus reducing useless traffic for slow peers.

This seems an easy enough solution that will make our memory usage smaller, would enable us to have a guaranteed memory cap and also reduce network traffic for peers that don't read everything. Win-win-win :)

@@ -260,20 +259,9 @@ func (f *TxFetcher) loop() {
// Send out all block header requests
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

not block header, should be transaction

Suggested change
// Send out all block header requests
// Send out all transaction requests

@@ -260,20 +259,9 @@ func (f *TxFetcher) loop() {
// Send out all block header requests
for peer, hashes := range request {
log.Trace("Fetching scheduled transactions", "peer", peer, "txs", hashes)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the hashes here can be quite huge, right? It's not very nice to fill the console with thousands (or even hundreds) of hashes in case the trace level is used

Comment on lines 267 to 268
fetchTxs := f.fetching[hashes[0]].fetchTxs
fetchTxs(hashes)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why not just

f.fetching[hashes[0]].fetchTxs(hashes)

?

txs []rlp.RawValue
)
for bytes < softResponseLimit {
// Retrieve the hash of the next block
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
// Retrieve the hash of the next block

log.Error("Failed to encode transaction", "err", err)
} else {
txs = append(txs, encoded)
bytes += len(encoded)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would there be any sense in doing p.MarkTransaction here too? If a node requests a certain hash from several peers, (not requesting explicitly from the announcer)... ?

Just a thought

@karalabe
Copy link
Member

I've made a number of changes to this PR:

  • Removed the metrics from the eth package as all network packets are already covered by p2p generically, so it's better not to have to maintain this code.
  • Split out PooledTransaction (0x0a) per the update in the EIP ethereum/EIPs@b33fa99.
  • Reworked initial transaction exchange so on eth/65+ we don't broadcast the list of transactions any more on that weird throttled txsyncer, rather just schedule our entire list of pooled tx hashes to be sent async. This is light-years better than what we had before and with the introduction of the announcements, there's no need for the throttling any more. The old mechanism was left in for eth/64-.
  • Reworked the tx broadcast and announcement sends so that these can only be schedules async by eth/handler instead of having a public direct sender (ok, made the method private and added a bunch of comments). This should ensure that we don't accidentally cause memory blowups.
  • The broadcastTxs was broken as it scheduled 2 goroutines, but used a single done channel in both, one overwriting the other. This could have caused either goroutine leaks or double closes, causing a panic. Fixed it by splitting the method into broadcastTransactions and announceTransactions. This way the code is simpler too.
  • Fixed broadcast/announce scheduling to discard old entries if the queue is full instead of new ones.
  • Added known hash tracking when sending PooledTransactionsMsg. Thinking about it a bit more, this might not have been needed since the remote peer should only request whatever we announced (and announcing sets the known hash). Might need to revert this and add a comment accordingly.

TODO:

  • Transaction and PooledTransaction messages are now handled the exact same way in the tx fetcher. We need to differentiate the two, since the former should not cancel pending requests whereas the latter should. I think we don't track pending requests now, but still, should rethink it a bit.
  • We currently have the txRetrieval mechanism that piles requests similar to broadcasts. We added this to avoid many concurrent 1-tx requests, which we do now because we couldn't figure out if a request exists or not. With the split between Transaction and PooledTransaction messages, we can actually track if a peer is waiting for a response or if that arrived, so we should probably simplify this code a bit (at the expense of the fetcher).

@karalabe
Copy link
Member

Another idea I'd been pondering about. The knownTxs set size and the tx propagation queue sizes should maybe be proportional to the node's txpool cap. E.g. if I bump my cap to 10000 txs, then I must increase my prop queue to at least that much, otherwise I will never propagate all my txs to my peers. Wrt knowntx set, similarly if it's smaller/close to my tx limit, a single transfer will overflow it (though this might not be an issue, since I don't try to rebcast my txs currently)

@karalabe
Copy link
Member

I've pushed another massive commit:

  • Drop the background async per-peer tx fetchers since we lose too much control over the request chunking and the replies, so we cannot know what tx are missing and what haven't been requested yet. The old fetcher relied of 5 second timeouts and rerequests to fill is not-delivered txs, but we'd bee operating too blindly.
  • The fetcher dragged along origin peer, fetcher method, and time instance for every single annoncement contained, which was just way too nasty to shuffle around. I replaced this complexity with a 2 phase operation: unknown transaction announcements go into the waiting list, and after being there for 500ms and not getting nuked by a broadcast are moved into the download queue. In the second phase the fetcher will schedule the to a suitable peer when one is idling.
  • The internals were mostly rewritten so scheduling doesn't operate on hashes, rather on peers (i.e. previously when there was a hash to be retrieved, it was shoved to a peer, independent if that is idle or not). Now we properly track which peer is executing a request and if everyone's busy, hashes wait until they free up. This way the tight loop is iterating when peers deliver (small number of events), not when hashes trigger (2 orders of magnitude more txs than peers, specially churn wise).
  • The loop also had an initial boilerplate section which ran on every event and tried to time out requests. This is again very wasteful, because with tx announcements arriving by the thousands per sec, we'd check for retrieval expirations thousands of times per sec. I've replaced with with a similar timer mechanism that te fetch had previously (now wait list), so the timeouts are also globally tracked by 1 timer, and we only iterate the pool when we surely know something timed out.

TODO:

  • The DOS tracker is currently a separate counter. That is super brittle with the complex map shuffling, so we should drop the counter and try to detect DoS in a "stateless" way based on the contents of the announcement queues. Maybe we'll need an extra helper, but that still seems simpler than ensuring all code paths correctly account everything.
  • Currently (and the old code too) we don't take into account disconnects, rather rely on 5 second timeout to detect these. That is nasty, so we need to forward delivery failures too into the loop. This should be doable either via a new channel or extending txDelivery with an error (possibly simpler). This would allos us to immediately reschedule txs blocked by a disconnected peer.
  • I commented out the tests due to the API change, will fix these tomorrow.

f.underpriced.Add(txs[i].Hash())
}
}
added = append(added, txs[i].Hash())
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this statement is executed even when err != nil -- I suppose there should be a continue in the if-clause above?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No, it's correct. The fetcher requested these, so someone announced them. Even though they are junk, we need to push the delivery into the fetcher so it cleans them out.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Though the name is probably bad being added, should revert it to hashes

Comment on lines 471 to 482
hashes = append(hashes, hash)
if len(hashes) >= maxTransactionFetch {
break
}
}
}
// If any hashes were allocated, mark those as fetching
if len(hashes) > 0 {
for _, hash := range hashes {
if _, ok := f.fetching[hash]; ok {
panic("fetching tracker already contains fetching item")
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can't you simply delete this? Why loop twice here?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Indeed, we construct the hashes based on fetching not containing them, so this panic is a noop.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I meant delete the entire segment between L471:482, basically (yes, including the panic, and obviously keep the exit-condition). But mash it into one loop

Comment on lines 403 to 404
if _, ok := f.announced[hash]; ok {
panic("announced tracker already contains alternate item")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I find this odd. Let's say req.hashes is [h1,h2]. I deliver [h3, h4]. You will purge f.announced[h3] and f.announced[h4] on line 363, but afaict this panic will hit, because now you're iterating over req and not delivery

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The panic here really is a sanity check that every hash exists only either in the waitlist (waiting to time out to be fetched); either in the announced list (waiting to be shceudled for fetching); or the fetching sets.

Re your example, if I request A and B, and you deliver C and D, I'll nuke out all notions of C and D, but since A and B is missing, those muse only exist in f.alternates (the fethcing sets), which I move back into the queues.

@karalabe karalabe added this to the 1.9.11 milestone Feb 11, 2020
Copy link
Contributor

@holiman holiman left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks good to me

eth/fetcher/tx_fetcher.go Show resolved Hide resolved
eth/fetcher/tx_fetcher.go Show resolved Hide resolved
eth/fetcher/tx_fetcher.go Outdated Show resolved Hide resolved
eth/fetcher/tx_fetcher.go Outdated Show resolved Hide resolved
eth/fetcher/tx_fetcher.go Outdated Show resolved Hide resolved
@karalabe karalabe merged commit eddcecc into ethereum:master Feb 13, 2020
@adamschmideg adamschmideg mentioned this pull request Feb 27, 2020
20 tasks
@gzliudan gzliudan mentioned this pull request May 11, 2024
20 tasks
gzliudan added a commit to gzliudan/XDPoSChain that referenced this pull request May 13, 2024
@wanwiset25 wanwiset25 mentioned this pull request Jun 3, 2024
19 tasks
wanwiset25 pushed a commit to XinFinOrg/XDPoSChain that referenced this pull request Jun 19, 2024
wanwiset25 pushed a commit to XinFinOrg/XDPoSChain that referenced this pull request Jun 28, 2024
wanwiset25 added a commit to XinFinOrg/XDPoSChain that referenced this pull request Aug 23, 2024
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

4 participants