fix: race condition when requesting the same block twice #214
Conversation
Fixes ipfs/js-ipfs#2814
src/index.js
Outdated
self.network.provide(block.cid).catch((err) => {
  self._log.error('Failed to provide: %s', err.message)
})

self._sendHaveBlockNotifications(block)
It's my understanding that the async function asyncFn passed to blockstore.putMany(asyncFn) supplies the blocks to be put, but doesn't make any guarantee about when those blocks will be committed to the blockstore.
So I think it may make more sense to send notifications after blockstore.putMany() has completed.
I thought something similar, but you'd have to iterate over the list of blocks twice and they may not be available from the source any more (e.g. it may not be an array).
const iter = function * () {
  yield 'cid'
}

function main (source) {
  for (const cid of source) {
    console.info(cid)
  }

  for (const cid of source) {
    console.info(cid)
  }
}

main(iter())
// prints 'cid' once, not twice
Either that or you store the blocks in a list for processing after putMany, which may exhaust available memory, or you store just the CIDs and retrieve the blocks after the putMany, though they may not be available and the list may be unreasonably long, etc.
The solution I am using in the new message types PR is to store them in an array:
https://github.com/ipfs/js-ipfs-bitswap/pull/211/files/f2b6be5ffbae25bdd911496a356b02b4490714ce#diff-1fdf421c05c1140f6d71444ea2b27638R285
I don't think it will use much more memory, because it's just a list of pointers, right?
It's going to be proportional to the size of the added blocks - if you add a 10GB .iso file to IPFS, sticking all of its blocks in an array before sending notifications will cause the process to consume 10GB of memory.
If instead each block is processed one at a time (or in small batches) from reading the file to writing it out to disk, it'll consume a lot less.
This isn't the only problem site, obviously - if our final blockstore batch size is unbounded it'll crash there too.
Also, I’m not sure bitswap is the right place to solve this problem. The blockstore has created it by making the behaviour of putMany non-obvious (to me at least); it should make the caller’s life a bit easier by limiting batch sizes, caching things that have been batched up, etc., though this is way out of scope for this PR.
IIUC the blocks are a parameter to bitswap.putMany(blocks), so they're not going to get cleaned up until the function returns. If we create an array of pointers to the blocks, that will use a negligible amount of memory compared to the blocks themselves.
I agree that the blockstore is probably a better place to tackle memory issues.
Just to clarify, when we call yield on line 292 I don't believe that means that the block has been committed to the blockstore. That's why I'm suggesting we only send out notifications after blockstore.putMany() completes.
bitswap.putMany(blocks) takes an iterable as an argument, which might be an array, in which case yes, they are all in memory at once so storing references to them in an array does not hurt; but it also might be a readable stream or a generator, in which case they are pulled one block at a time. It's for this case that we do not want to stick them all in an array.
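For illustration, here is a minimal sketch of the two shapes the source can take - all names below are made up for the example, not the real bitswap/blockstore API:

// Illustrative only - not the real bitswap/blockstore API
const blocks = [
  { cid: 'QmA', data: new Uint8Array(1) },
  { cid: 'QmB', data: new Uint8Array(1) }
]

// An async generator source yields blocks one at a time - nothing is
// buffered, and once a block has been consumed it can be garbage collected
async function * streamingSource () {
  for (const block of blocks) {
    yield block
  }
}

// A putMany-style consumer that handles each block as it arrives keeps
// memory usage flat no matter how many blocks flow through it
async function putMany (source) {
  for await (const block of source) {
    console.info('put', block.cid)
  }
}

putMany(streamingSource()) // pulls one block at a time
// putMany(blocks)         // an array also works, but is fully in memory up front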
Ahh I see, you're right, blocks is an async iterable.
For correctness I believe we need to send notifications after the block has been committed, i.e. at the end of blockstore.putMany().
In order to be able to send notifications we could check the type of blocks and then:
- if it's an array: store a list of refs to each block that was put
- if it's an async iterable: store a list of CIDs, then call datastore.getBlocks() for notifications. Probably most datastore implementations will have some caching that will make this perform reasonably well (a rough sketch of this follows below)
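For what it's worth, a rough sketch of that idea - all names are illustrative, and getBlocks() stands in for the hypothetical bulk get mentioned above rather than an existing API:

// Illustrative sketch only - these method names are not necessarily the real API
async function putManyAndNotify (blocks, blockstore, notify) {
  if (Array.isArray(blocks)) {
    // Everything is already in memory, so keeping references costs little -
    // put the batch, then notify once the call has completed
    await blockstore.putMany(blocks)
    for (const block of blocks) {
      notify(block)
    }
    return
  }

  // Async iterable: remember only the CIDs as the blocks stream past...
  const cids = []
  await blockstore.putMany((async function * () {
    for await (const block of blocks) {
      cids.push(block.cid)
      yield block
    }
  })())

  // ...then fetch them back for notification after the put has completed.
  // getBlocks() is the hypothetical bulk get discussed above - most datastore
  // implementations would likely serve this from cache
  for await (const block of blockstore.getBlocks(cids)) {
    notify(block)
  }
}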
Looking back on this, blockstore.putMany() conflates batching with streaming, which I think is a mistake.
The intention (as I see it) of putMany is to allow streaming blocks into the blockstore, but it can actually end up batching those blocks, worst case storing them until the stream ends - and in our case forcing the caller to store something to do with the whole stream (e.g. the list of CIDs to notify of).
I think we should be explicit in our interfaces and remove the batching semantic from blockstore.putMany() and instead expose a blockstore.putBatch() or similar method for when that behaviour is desirable.
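To illustrate, one way that split could look - this is a sketch of a possible interface, not the actual blockstore code, and the datastore put/batch/commit calls are assumed:

// Sketch of a possible interface split - not the actual blockstore implementation
class Blockstore {
  constructor (datastore) {
    this.datastore = datastore
  }

  async put (block) {
    await this.datastore.put(block.cid.toString(), block.data) // assumed datastore API
  }

  // Streaming: each block is visible to has()/get() as soon as the loop
  // reaches it - no hidden buffering, bounded memory use
  async putMany (source) {
    for await (const block of source) {
      await this.put(block)
    }
  }

  // Batching: the caller explicitly opts in to buffering a bounded set of
  // blocks and committing them in a single underlying write
  async putBatch (blocks) {
    const batch = this.datastore.batch() // assumed datastore batch API
    for (const block of blocks) {
      batch.put(block.cid.toString(), block.data)
    }
    await batch.commit()
  }
}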
The batching is done at the repo level, I'm more than happy to pull that out.
It's been pulled out. ipfs-repo@3.x.x's .putMany() now streams to the datastore instead of using batches.
@achingbrain what do you think about this approach? #216 I still need to fix the test but want to get your thoughts first.
2d8c892 to 57185ea (Compare)
When we call `blockstore.putMany`, some implementations will batch up all the `put`s and write them at once. This means that `blockstore.has` might not return `true` for a little while - if another request for a given block comes in before `blockstore.has` returns `true`, it'll get added to the want list. If the block then finishes its batch and finally a remote peer supplies the wanted block, the notifications that complete the second block request will never get sent and the process will hang indefinitely. The change made here is to separate the sending of notifications out from putting things into the blockstore. If the blockstore has a block, but the block is still in the wantlist, send notifications that we now have the block.
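A simplified sketch of that separation (illustrative only, not the exact code in this change) - notifications are driven by what the blockstore and wantlist currently report, rather than by when `putMany` was called:

// Simplified sketch of the idea - not the exact code in this PR
async function receiveBlock (block, { blockstore, wantlist, notifications }) {
  const has = await blockstore.has(block.cid)

  if (!has) {
    // only write blocks we don't already have
    await blockstore.put(block)
  }

  // Send notifications whenever the block is still wanted, even if an
  // earlier (possibly still-batching) put already stored it - this is what
  // unblocks the second request for the same block
  if (wantlist.contains(block.cid)) {
    notifications.hasBlock(block)
  }
}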
57185ea to 1fc09ed (Compare)
We weren't using 1.2.0 in the protocol negotiation. Now we do.
Further improvements in this branch:
LGTM but I would keep .has and would try to find another way to surface the issues that come with relying on the .has output without a transaction.
- Key notifications by CID multihashes so one block can service multiple wants if they are for the same data but requested with different CIDs
- Pass in an AbortSignal to notifications that tears down only the listeners set up for that invocation and rejects the current promise only when aborted
- notifications.unwant will now reject all outstanding promises for that CID

A follow-up commit will handle removing things from the want list.
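A rough sketch of the multihash keying and abort handling (illustrative only, not the actual notifications code; the shape of the cid object is assumed):

// Illustrative sketch - not the actual Notifications implementation
const toKey = (cid) => cid.multihash.toString('base64') // assumes multihash is a Buffer

class Notifications {
  constructor () {
    this.listeners = new Map() // key -> array of resolve functions
  }

  // Wait for a block, keyed by multihash so the same data requested under a
  // different CID version still resolves this promise
  wantBlock (cid, { signal } = {}) {
    return new Promise((resolve, reject) => {
      const key = toKey(cid)
      const list = this.listeners.get(key) || []
      list.push(resolve)
      this.listeners.set(key, list)

      // Abort tears down only this invocation's listener and rejects its promise
      signal?.addEventListener('abort', () => {
        this.listeners.set(key, (this.listeners.get(key) || []).filter(fn => fn !== resolve))
        reject(new Error('wantBlock aborted'))
      })
    })
  }

  // A received block resolves every outstanding want for its multihash
  hasBlock (block) {
    const key = toKey(block.cid)
    for (const resolve of this.listeners.get(key) || []) {
      resolve(block)
    }
    this.listeners.delete(key)
  }
}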
LGTM great work!!
@dirkmc can this be merged & released?
networkA.start()
networkB.start()

// FIXME: have to already be connected as sendMessage only accepts a peer id, not a PeerInfo
This gets fixed in #217. The peer address just needs to be added to the AddressBook before the operation.
Could you create an issue to track this please?
I already updated #217 to add the peer multiaddrs to the AddressBook before the sendMessage and removed the dial operation from the test.
We are removing PeerInfo in libp2p@0.28. The multiaddrs for the peer should be in the AddressBook before any attempt to dial using a PeerId. In this case, the nodes needed to "discover" each other in order to send a message; "discover" here means adding the multiaddrs to the AddressBook and then using sendMessage with the PeerId.
So the goal here is to use the new libp2p and not to accept PeerInfo in sendMessage. I can create an issue to track this, but I consider it part of updating libp2p in bitswap.
it('dials to peer using Bitswap 1.2.0', async () => {
  networkA = new Network(p2pA, bitswapMockA)

  // only supports 1.2.0
  networkB = new Network(p2pB, bitswapMockB)
  networkB.protocols = ['/ipfs/bitswap/1.2.0']

  networkA.start()
  networkB.start()

  const deferred = pDefer()

  bitswapMockB._receiveMessage = () => {
    deferred.resolve()
  }
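  // add B's multiaddrs to A's AddressBook so sendMessage can dial using just the PeerId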
  p2pA.peerStore.addressBook.set(p2pB.peerId, p2pB.multiaddrs)
  await networkA.sendMessage(p2pB.peerId, new Message(true))

  return deferred.promise
})
Some good cleanup in here as well, thanks! 👍
Co-authored-by: dirkmc <dirkmdev@gmail.com>