diff --git a/share/shwap/p2p/bitswap/bitswap.go b/share/shwap/p2p/bitswap/bitswap.go index 1949986d31..37f51e05ba 100644 --- a/share/shwap/p2p/bitswap/bitswap.go +++ b/share/shwap/p2p/bitswap/bitswap.go @@ -17,58 +17,91 @@ import ( "github.com/libp2p/go-libp2p/core/protocol" ) +// THESE VALUES HAVE TO BE REVISED ON EVERY SINGLE MAX BLOCK SIZE BUMP +// CURRENT BLOCK SIZE TARGET: 8MB (256 EDS) + // Client constants const ( - // simulateDontHaves emulates DONT_HAVE message from a peer after 5 second timeout. - // This protects us from unresponsive/slow peers. - // TODO(@Wondertan): PR to bitswap to make this timeout configurable - // Higher timeout increases the probability of successful reconstruction + // simulateDontHaves emulates DONT_HAVE message from a peer after dynamically estimated timeout. + // Simulating protects from malicious peers and ensures Bitswap tries new peers if the originally + // selected one is slow or not responding. + // Dynamic timeout, in our case, can be treated as an optimization allowing us to move on to the next peer faster. + // See simulateDontHaveConfig for more details. simulateDontHaves = true - // providerSearchDelay defines the initial delay before Bitswap client starts aggressive - // broadcasting of WANTs to all the peers. We offset this for longer than the default to minimize + // broadcastDelay defines the initial delay before Bitswap client starts aggressive + // broadcasting of live WANTs to all the peers. We offset this for longer than the default to minimize // unnecessary broadcasting as in most cases we already have peers connected with needed data on // a new request. - providerSearchDelay = time.Second * 10 - // rebroadcastDelay is similar to the providerSearchDelay, but it targets DHT/ContentRouting + broadcastDelay = time.Second * 10 + // disablePerPeerRetries disables rebroadcasting of WANTs with no response in peer message queue. 
+ // We keep rebroadcasting enabled to account for the case where maxServerWantListsPerPeer gets exceeded, losing WANTs. + disablePerPeerRetries = false + // provSearchDelay is similar to the broadcastDelay, but it targets DHT/ContentRouting // peer discovery and a gentle broadcast of a single random live WANT to all connected peers. - // Considering no DHT usage and broadcasting configured by providerSearchDelay, we set - // rebroadcastDelay to max value, effectively disabling it - rebroadcastDelay = 1<<63 - 1 + // Considering no DHT usage and broadcasting configured by broadcastDelay, we set + // provSearchDelay to max value, effectively disabling it + provSearchDelay = 1<<63 - 1 ) // Server constants const ( - // providesEnabled dictates Bitswap Server not to provide content to DHT/ContentRouting as we don't use it - providesEnabled = false - // sendDontHaves prevents Bitswap Server from sending DONT_HAVEs while keeping peers on hold instead: - // * Clients simulate DONT_HAVEs after timeout anyway - // * Servers may not have data immediately and this gives an opportunity to subscribe - // * This is necessary for reconstruction. See https://github.com/celestiaorg/celestia-node/issues/732 + // sendDontHaves dictates Bitswap Server to send DONT_HAVE messages to peers when the requested block is not + // available locally. This is useful for clients to quickly move on to another peer. + // This breaks reconstruction, unless we make reconstruction case detectable on the Server side blocking Bitswap + // from serving DONT_HAVE messages in Blockstore, which would be the goal. + // TODO(@Wondertan): enable once Blockstore handles recent blocks sendDontHaves = false // maxServerWantListsPerPeer defines the limit for maximum possible cached wants/requests per peer - // in the Bitswap. Exceeding this limit will cause Bitswap server to drop requested wants leaving - // client stuck for sometime. - // Thus, we make the limit a bit generous, so we minimize the chances of this happening. 
+ // in the Bitswap. Exceeding this limit will cause Bitswap server to drop requested WANTs leaving + // client stuck for some time. // This is relevant until https://github.com/ipfs/boxo/pull/629#discussion_r1653362485 is fixed. - maxServerWantListsPerPeer = 8096 - // targetMessageSize defines how much data Bitswap will aim to pack within a single message, before - // splitting it up in multiple. Bitswap first looks up the size of the requested data across - // multiple requests and only after reads up the data in portions one-by-one targeting the - // targetMessageSize. - // - // Bigger number will speed transfers up if reading data from disk is fast. In our case, the - // Bitswap's size lookup via [Blockstore] will already cause underlying cache to keep the data, - // so reading up data is fast, and we can aim to pack as much as we can. - targetMessageSize = 1 << 20 // 1MB - // outstandingBytesPerPeer limits number of bytes queued for work for a peer across multiple requests. - // We set it to be equal to targetMessageSize * N, so there can max N messages being prepared for - // a peer at once. - outstandingBytesPerPeer = targetMessageSize * 4 + // 1024 is 64 sampling requests of size 16 and 8 EDS requests with 8MB blocks + maxServerWantListsPerPeer = 1024 + // targetResponseSize defines soft-limit of how much data server packs into a response. + // More data means more compute and time spent while serving a single peer under load, thus increasing serving + // latency for other peers. + // We set it to 65KiB, which fits a Row of 8MB block with additional metadata. + targetResponseSize = 65 << 10 + // responseWorkersCount is the number of workers packing responses on the server. + // More workers mean more parallelism and faster responses, but also more memory usage. + // Default is 8. + responseWorkersCount = 32 + // maxWorkPerPeer sets maximum concurrent processing work allowed for a peer. 
+ // We set it to be equal to targetResponseSize * responseWorkersCount, so a single peer + // can utilize all workers. Note that when there are more peers, prioritization still ensures + // even spread across all peers. + maxWorkPerPeer = targetResponseSize * responseWorkersCount // replaceHasWithBlockMaxSize configures Bitswap to use Has method instead of GetSize to check existence // of a CID in Blockstore. replaceHasWithBlockMaxSize = 0 + // providesEnabled dictates Bitswap Server not to provide content to DHT/ContentRouting as we don't use it + providesEnabled = false ) +// simulateDontHaveConfig contains the configuration for Bitswap's DONT_HAVE simulation. +// This relies on latency to determine when to simulate a DONT_HAVE from a peer. +var simulateDontHaveConfig = &client.DontHaveTimeoutConfig{ + // MaxTimeout is the limit cutoff time for the dynamic timeout estimation. + MaxTimeout: 30 * time.Second, + // MessageLatencyMultiplier is the multiplier for the message latency to account for errors + // THIS IS THE MOST IMPORTANT KNOB and is tricky to get right due to high variance in latency + // and particularly request processing time. + MessageLatencyMultiplier: 4, + // MessageLatencyAlpha is the alpha value for the exponential moving average of message latency. + // It's a 0-1 value. More prefers recent latencies, less prefers older measurements. + MessageLatencyAlpha: 0.2, + + // Below are less important knobs and target initial timeouts for when there is no latency data yet + // + // DontHaveTimeout is default timeout used until ping latency is measured + DontHaveTimeout: 10 * time.Second, + // time estimate for how long it takes to process a WANT/Block message + // used for ping timeout calculation + MaxExpectedWantProcessTime: 7 * time.Second, + // multiplier to account for errors with ping latency estimates + PingLatencyMultiplier: 3, +} + // NewNetwork constructs Bitswap network for Shwap protocol composition. 
func NewNetwork(host host.Host, prefix protocol.ID) network.BitSwapNetwork { prefix = shwapProtocolID(prefix) @@ -90,11 +123,15 @@ func NewClient( ) *client.Client { opts := []client.Option{ client.SetSimulateDontHavesOnTimeout(simulateDontHaves), - client.ProviderSearchDelay(providerSearchDelay), - client.RebroadcastDelay(delay.Fixed(rebroadcastDelay)), + client.WithDontHaveTimeoutConfig(simulateDontHaveConfig), + client.WithDisabledMessageQueueRebroadcast(disablePerPeerRetries), // Prevents Has calls to Blockstore for metric that counts duplicates // Unnecessary for our use case, so we can save some disk lookups. client.WithoutDuplicatedBlockStats(), + + // These two options have mixed-up names: each one should carry the other's name. + client.ProviderSearchDelay(broadcastDelay), + client.RebroadcastDelay(delay.Fixed(provSearchDelay)), } return client.New( ctx, @@ -115,9 +152,10 @@ func NewServer( server.ProvideEnabled(providesEnabled), server.SetSendDontHaves(sendDontHaves), server.MaxQueuedWantlistEntriesPerPeer(maxServerWantListsPerPeer), - server.WithTargetMessageSize(targetMessageSize), - server.MaxOutstandingBytesPerPeer(outstandingBytesPerPeer), + server.WithTargetMessageSize(targetResponseSize), + server.MaxOutstandingBytesPerPeer(maxWorkPerPeer), server.WithWantHaveReplaceSize(replaceHasWithBlockMaxSize), + server.TaskWorkerCount(responseWorkersCount), } return server.New(ctx, net, bstore, opts...) }