diff --git a/tests/integration/1_minute/testecbug.nim b/tests/integration/1_minute/testecbug.nim index a5bfa832e..d1980c5a5 100644 --- a/tests/integration/1_minute/testecbug.nim +++ b/tests/integration/1_minute/testecbug.nim @@ -4,9 +4,7 @@ import ../marketplacesuite import ../nodeconfigs import ../hardhatconfig -marketplacesuite( - name = "Bug #821 - node crashes during erasure coding", stopOnRequestFail = true -): +marketplacesuite(name = "Bug #821 - node crashes during erasure coding"): test "should be able to create storage request and download dataset", NodeConfigs( clients: CodexConfigs @@ -18,42 +16,29 @@ marketplacesuite( providers: CodexConfigs.init(nodes = 0).some, ): let - pricePerBytePerSecond = 1.u256 duration = 20.periods - collateralPerByte = 1.u256 expiry = 10.periods - data = await RandomChunker.example(blocks = 8) client = clients()[0] clientApi = client.client + data = await RandomChunker.example(blocks = 8) + + let (purchaseId, requestId) = await requestStorage( + clientApi, duration = duration, expiry = expiry, data = data.some + ) - let cid = (await clientApi.upload(data)).get + let storageRequestedEvent = newAsyncEvent() - var requestId = none RequestId proc onStorageRequested(eventResult: ?!StorageRequested) = assert not eventResult.isErr - requestId = some (!eventResult).requestId + storageRequestedEvent.fire() - let subscription = await marketplace.subscribe(StorageRequested, onStorageRequested) - - # client requests storage but requires multiple slots to host the content - let id = await clientApi.requestStorage( - cid, - duration = duration, - pricePerBytePerSecond = pricePerBytePerSecond, - expiry = expiry, - collateralPerByte = collateralPerByte, - nodes = 3, - tolerance = 1, - ) - - check eventually(requestId.isSome, timeout = expiry.int * 1000) + await marketplaceSubscribe(StorageRequested, onStorageRequested) + await storageRequestedEvent.wait().wait(timeout = chronos.seconds(expiry.int64)) let - request = await marketplace.getRequest(requestId.get) + request = await marketplace.getRequest(requestId) cidFromRequest = request.content.cid downloaded = await clientApi.downloadBytes(cidFromRequest, local = true) check downloaded.isOk check downloaded.get.toHex == data.toHex - - await subscription.unsubscribe() diff --git a/tests/integration/30_minutes/testmarketplace.nim b/tests/integration/30_minutes/testmarketplace.nim index b04626c49..5afd627d0 100644 --- a/tests/integration/30_minutes/testmarketplace.nim +++ b/tests/integration/30_minutes/testmarketplace.nim @@ -1,13 +1,13 @@ import std/times -import std/httpclient import ../../examples import ../../contracts/time import ../../contracts/deployment -import ./../marketplacesuite -import ../twonodes +import ./../marketplacesuite except Subscription +import ../twonodes except Subscription import ../nodeconfigs +from pkg/ethers import Subscription -marketplacesuite(name = "Marketplace", stopOnRequestFail = true): +marketplacesuite(name = "Marketplace"): let marketplaceConfig = NodeConfigs( clients: CodexConfigs.init(nodes = 1).some, providers: CodexConfigs.init(nodes = 1).some, @@ -23,6 +23,11 @@ marketplacesuite(name = "Marketplace", stopOnRequestFail = true): const blocks = 8 const ecNodes = 3 const ecTolerance = 1 + const size = 0xFFFFFF.uint64 + const slotBytes = slotSize(blocks, ecNodes, ecTolerance) + const duration = 20 * 60.uint64 + const expiry = 10 * 60.uint64 + const pricePerSlotPerSecond = minPricePerBytePerSecond * slotBytes setup: host = providers()[0].client @@ -35,101 +40,133 @@ marketplacesuite(name = "Marketplace", stopOnRequestFail = true): # As we use in tests ethProvider.currentTime() which uses block timestamp this can lead to synchronization issues. await ethProvider.advanceTime(1.u256) - test "nodes negotiate contracts on the marketplace", marketplaceConfig: - let size = 0xFFFFFF.uint64 - let data = await RandomChunker.example(blocks = blocks) + test "nodes negotiate contracts on the marketplace", + marketplaceConfig, stopOnRequestFail = true: # host makes storage available let availability = ( await host.postAvailability( totalSize = size, - duration = 20 * 60.uint64, + duration = duration, minPricePerBytePerSecond = minPricePerBytePerSecond, totalCollateral = size.u256 * minPricePerBytePerSecond, ) ).get # client requests storage - let cid = (await client.upload(data)).get - let id = await client.requestStorage( - cid, - duration = 20 * 60.uint64, - pricePerBytePerSecond = minPricePerBytePerSecond, - proofProbability = 3.u256, - expiry = 10 * 60.uint64, - collateralPerByte = collateralPerByte, - nodes = ecNodes, - tolerance = ecTolerance, - ) + let (purchaseId, requestId) = await requestStorage(client) - discard await waitForRequestToStart() + await waitForRequestToStart(requestId, expiry.int64) - let purchase = (await client.getPurchase(id)).get + let purchase = (await client.getPurchase(purchaseId)).get check purchase.error == none string + + let state = await marketplace.requestState(requestId) + check state == RequestState.Started + let availabilities = (await host.getAvailabilities()).get check availabilities.len == 1 + let newSize = availabilities[0].freeSize - check newSize > 0 and newSize < size + let datasetSize = datasetSize(blocks, ecNodes, ecTolerance) + check newSize > 0 and newSize.u256 + datasetSize == size.u256 let reservations = (await host.getAvailabilityReservations(availability.id)).get check reservations.len == 3 check reservations[0].requestId == purchase.requestId + let signer = ethProvider.getSigner(hostAccount) + let marketplaceWithProviderSigner = marketplace.connect(signer) + let slots = await marketplaceWithProviderSigner.mySlots() + check slots.len == 3 + + for slotId in slots: + let slot = await marketplaceWithProviderSigner.getActiveSlot(slotId) + check slot.request.id == purchase.requestId + test "node slots gets paid out and rest of tokens are returned to client", - marketplaceConfig: - let size = 0xFFFFFF.uint64 - let data = await RandomChunker.example(blocks = blocks) - let marketplace = Marketplace.new(Marketplace.address, ethProvider.getSigner()) - let tokenAddress = await marketplace.token() - let token = Erc20Token.new(tokenAddress, ethProvider.getSigner()) - let duration = 20 * 60.uint64 + marketplaceConfig, stopOnRequestFail = true: + var providerRewardEvent = newAsyncEvent() + var clientFundsEvent = newAsyncEvent() + var transferEvent = newAsyncEvent() + var filledAtPerSlot: seq[UInt256] = @[] + var requestId: RequestId # host makes storage available let startBalanceHost = await token.balanceOf(hostAccount) + let startBalanceClient = await token.balanceOf(clientAccount) + + proc storeFilledAtTimestamps() {.async.} = + let filledAt = await ethProvider.blockTime(BlockTag.latest) + filledAtPerSlot.add(filledAt) + + proc onSlotFilled(eventResult: ?!SlotFilled) {.raises: [].} = + assert not eventResult.isErr + let event = !eventResult + asyncSpawn storeFilledAtTimestamps() + + proc checkProviderRewards() {.async.} = + let endBalanceHost = await token.balanceOf(hostAccount) + let requestEnd = await marketplace.requestEnd(requestId) + let rewards = filledAtPerSlot + .mapIt((requestEnd.u256 - it) * pricePerSlotPerSecond) + .foldl(a + b, 0.u256) + + if rewards + startBalanceHost == endBalanceHost: + providerRewardEvent.fire() + + proc checkClientFunds() {.async.} = + let requestEnd = await marketplace.requestEnd(requestId) + let hostRewards = filledAtPerSlot + .mapIt((requestEnd.u256 - it) * pricePerSlotPerSecond) + .foldl(a + b, 0.u256) + + let requestPrice = pricePerSlotPerSecond * duration.u256 * 3 + let fundsBackToClient = requestPrice - hostRewards + let endBalanceClient = await token.balanceOf(clientAccount) + + if startBalanceClient + fundsBackToClient - requestPrice == endBalanceClient: + clientFundsEvent.fire() + + proc onTransfer(eventResult: ?!Transfer) = + assert not eventResult.isErr + + let data = eventResult.get() + if data.receiver == hostAccount: + asyncSpawn checkProviderRewards() + if data.receiver == clientAccount: + asyncSpawn checkClientFunds() + discard ( await host.postAvailability( totalSize = size, - duration = 20 * 60.uint64, + duration = duration, minPricePerBytePerSecond = minPricePerBytePerSecond, totalCollateral = size.u256 * minPricePerBytePerSecond, ) ).get # client requests storage - let cid = (await client.upload(data)).get - let id = await client.requestStorage( - cid, - duration = duration, - pricePerBytePerSecond = minPricePerBytePerSecond, - proofProbability = 3.u256, - expiry = 10 * 60.uint64, - collateralPerByte = collateralPerByte, - nodes = ecNodes, - tolerance = ecTolerance, - ) - - discard await waitForRequestToStart() + let (_, id) = await requestStorage(client) + requestId = id - let purchase = (await client.getPurchase(id)).get - check purchase.error == none string + # Subscribe SlotFilled event to receive the filledAt timestamp + # and calculate the provider reward + await marketplaceSubscribe(SlotFilled, onSlotFilled) - let clientBalanceBeforeFinished = await token.balanceOf(clientAccount) + await waitForRequestToStart(requestId, expiry.int64) # Proving mechanism uses blockchain clock to do proving/collect/cleanup round # hence we must use `advanceTime` over `sleepAsync` as Hardhat does mine new blocks # only with new transaction await ethProvider.advanceTime(duration.u256) - # Checking that the hosting node received reward for at least the time between - let slotSize = slotSize(blocks, ecNodes, ecTolerance) - let pricePerSlotPerSecond = minPricePerBytePerSecond * slotSize - check eventually (await token.balanceOf(hostAccount)) - startBalanceHost >= - (duration - 5 * 60).u256 * pricePerSlotPerSecond * ecNodes.u256 + await tokenSubscribe(onTransfer) - # Checking that client node receives some funds back that were not used for the host nodes - check eventually( - (await token.balanceOf(clientAccount)) - clientBalanceBeforeFinished > 0, - timeout = 10 * 1000, # give client a bit of time to withdraw its funds - ) + # Wait for the exact expected balances. + # The timeout is 60 seconds because the event should occur quickly, + # thanks to `advanceTime` moving to the end of the request duration. + await clientFundsEvent.wait().wait(timeout = chronos.seconds(60)) + await providerRewardEvent.wait().wait(timeout = chronos.seconds(60)) test "SP are able to process slots after workers were busy with other slots and ignored them", NodeConfigs( @@ -141,77 +178,65 @@ marketplacesuite(name = "Marketplace", stopOnRequestFail = true): # .withLogFile() # .withLogTopics("marketplace", "sales", "statemachine","slotqueue", "reservations") .some, - ): - let client0 = clients()[0] - let provider0 = providers()[0] - let provider1 = providers()[1] - let duration = 20 * 60.uint64 - - let data = await RandomChunker.example(blocks = blocks) - let slotSize = slotSize(blocks, ecNodes, ecTolerance) + ), + stopOnRequestFail = true: + var requestId: RequestId # We create an avavilability allowing the first SP to host the 3 slots. # So the second SP will not have any availability so it will just process # the slots and ignore them. - discard await provider0.client.postAvailability( - totalSize = 3 * slotSize.truncate(uint64), + discard await host.postAvailability( + totalSize = 3 * slotBytes.truncate(uint64), duration = duration, minPricePerBytePerSecond = minPricePerBytePerSecond, - totalCollateral = 3 * slotSize * minPricePerBytePerSecond, - ) - - let cid = (await client0.client.upload(data)).get - - let purchaseId = await client0.client.requestStorage( - cid, - duration = duration, - pricePerBytePerSecond = minPricePerBytePerSecond, - proofProbability = 1.u256, - expiry = 10 * 60.uint64, - collateralPerByte = collateralPerByte, - nodes = ecNodes, - tolerance = ecTolerance, + totalCollateral = 3 * slotBytes * minPricePerBytePerSecond, ) - let requestId = (await client0.client.requestId(purchaseId)).get + let (_, id) = await requestStorage(client) + requestId = id # We wait that the 3 slots are filled by the first SP - discard await waitForRequestToStart() + await waitForRequestToStart(requestId, expiry.int64) # Here we create the same availability as previously but for the second SP. # Meaning that, after ignoring all the slots for the first request, the second SP will process # and host the slots for the second request. - discard await provider1.client.postAvailability( - totalSize = 3 * slotSize.truncate(uint64), + let host1 = providers()[1].client + + discard await host1.postAvailability( + totalSize = 3 * slotBytes.truncate(uint64), duration = duration, minPricePerBytePerSecond = minPricePerBytePerSecond, - totalCollateral = 3 * slotSize * collateralPerByte, + totalCollateral = 3 * slotBytes * collateralPerByte, ) - let purchaseId2 = await client0.client.requestStorage( - cid, - duration = duration, - pricePerBytePerSecond = minPricePerBytePerSecond, - proofProbability = 3.u256, - expiry = 10 * 60.uint64, - collateralPerByte = collateralPerByte, - nodes = ecNodes, - tolerance = ecTolerance, - ) - let requestId2 = (await client0.client.requestId(purchaseId2)).get + let (_, id2) = await requestStorage(client) + requestId = id2 # Wait that the slots of the second request are filled - discard await waitForRequestToStart() + await waitForRequestToStart(requestId, expiry.int64) # Double check, verify that our second SP hosts the 3 slots - check ((await provider1.client.getSlots()).get).len == 3 + let host1Account = providers()[1].ethAccount + let signer = ethProvider.getSigner(host1Account) + let marketplaceWithProviderSigner = marketplace.connect(signer) + let slots = await marketplaceWithProviderSigner.mySlots() + check slots.len == 3 + + for slotId in slots: + let slot = await marketplaceWithProviderSigner.getActiveSlot(slotId) + check slot.request.id == requestId -marketplacesuite(name = "Marketplace payouts", stopOnRequestFail = true): +marketplacesuite(name = "Marketplace payouts"): const minPricePerBytePerSecond = 1.u256 const collateralPerByte = 1.u256 const blocks = 8 const ecNodes = 3 const ecTolerance = 1 + const slotBytes = slotSize(blocks, ecNodes, ecTolerance) + const duration = 20 * 60.uint64 + const expiry = 10 * 60.uint64 + const pricePerSlotPerSecond = minPricePerBytePerSecond * slotBytes test "expired request partially pays out for stored time", NodeConfigs( @@ -231,88 +256,104 @@ marketplacesuite(name = "Marketplace payouts", stopOnRequestFail = true): # "node", "marketplace", "sales", "reservations", "node", "statemachine" # ) .some, - ): - let duration = 20.periods - let expiry = 10.periods - let data = await RandomChunker.example(blocks = blocks) + ), + stopOnRequestFail = true: let client = clients()[0] let provider = providers()[0] let clientApi = client.client let providerApi = provider.client - let startBalanceProvider = await token.balanceOf(provider.ethAccount) + let hostAccount = providers()[0].ethAccount + let clientAccount = clients()[0].ethAccount + + var slotIndex = 0.uint64 + var slotFilledEvent = newAsyncEvent() + var requestCancelledEvent = newAsyncEvent() + var providerRewardEvent = newAsyncEvent() + var filledAtPerSlot: seq[UInt256] = @[] + var requestId: RequestId + let startBalanceClient = await token.balanceOf(client.ethAccount) + let startBalanceProvider = await token.balanceOf(hostAccount) + + proc storeFilledAtTimestamps() {.async.} = + let filledAt = await ethProvider.blockTime(BlockTag.latest) + filledAtPerSlot.add(filledAt) + + proc onSlotFilled(eventResult: ?!SlotFilled) {.raises: [].} = + assert not eventResult.isErr + let event = !eventResult + slotIndex = event.slotIndex + asyncSpawn storeFilledAtTimestamps() + slotFilledEvent.fire() + + proc onRequestCancelled(eventResult: ?!RequestCancelled) = + assert not eventResult.isErr + requestCancelledEvent.fire() + + proc checkProviderRewards() {.async.} = + let endBalanceProvider = await token.balanceOf(hostAccount) + let requestEnd = await marketplace.requestEnd(requestId) + let rewards = filledAtPerSlot + .mapIt((requestEnd.u256 - it) * pricePerSlotPerSecond) + .foldl(a + b, 0.u256) + + if rewards + startBalanceProvider == endBalanceProvider: + providerRewardEvent.fire() + + proc onTransfer(eventResult: ?!Transfer) = + assert not eventResult.isErr + + let data = eventResult.get() + if data.receiver == hostAccount: + asyncSpawn checkProviderRewards() # provider makes storage available let datasetSize = datasetSize(blocks, ecNodes, ecTolerance) let totalAvailabilitySize = (datasetSize div 2).truncate(uint64) + discard await providerApi.postAvailability( # make availability size small enough that we can't fill all the slots, # thus causing a cancellation totalSize = totalAvailabilitySize, - duration = duration.uint64, + duration = duration, minPricePerBytePerSecond = minPricePerBytePerSecond, totalCollateral = collateralPerByte * totalAvailabilitySize.u256, ) - let cid = (await clientApi.upload(data)).get + let (_, id) = await requestStorage(clientApi) + requestId = id - var slotIdxFilled = none uint64 - proc onSlotFilled(eventResult: ?!SlotFilled) = - assert not eventResult.isErr - slotIdxFilled = some (!eventResult).slotIndex - - let slotFilledSubscription = await marketplace.subscribe(SlotFilled, onSlotFilled) - - var requestCancelledEvent = newAsyncEvent() - proc onRequestCancelled(eventResult: ?!RequestCancelled) = - assert not eventResult.isErr - requestCancelledEvent.fire() - - let requestCancelledSubscription = - await marketplace.subscribe(RequestCancelled, onRequestCancelled) - - # client requests storage but requires multiple slots to host the content - let id = await clientApi.requestStorage( - cid, - duration = duration, - pricePerBytePerSecond = minPricePerBytePerSecond, - expiry = expiry, - collateralPerByte = collateralPerByte, - nodes = ecNodes, - tolerance = ecTolerance, - ) + await marketplaceSubscribe(SlotFilled, onSlotFilled) + await marketplaceSubscribe(RequestCancelled, onRequestCancelled) # wait until one slot is filled - check eventually(slotIdxFilled.isSome, timeout = expiry.int * 1000) - let slotId = slotId(!(await clientApi.requestId(id)), !slotIdxFilled) + await slotFilledEvent.wait().wait(timeout = chronos.seconds(expiry.int)) + let slotId = slotId(requestId, slotIndex) + + await tokenSubscribe(onTransfer) # wait until sale is cancelled await ethProvider.advanceTime(expiry.u256) - await requestCancelledEvent.wait().wait(timeout = chronos.seconds(5)) - await advanceToNextPeriod() - let slotSize = slotSize(blocks, ecNodes, ecTolerance) - let pricePerSlotPerSecond = minPricePerBytePerSecond * slotSize + # Wait for the expected balance for the provider + await providerRewardEvent.wait().wait(timeout = chronos.seconds(60)) - check eventually ( - let endBalanceProvider = (await token.balanceOf(provider.ethAccount)) - endBalanceProvider > startBalanceProvider and - endBalanceProvider < startBalanceProvider + expiry.u256 * pricePerSlotPerSecond - ) - check eventually( - ( - let endBalanceClient = (await token.balanceOf(client.ethAccount)) - let endBalanceProvider = (await token.balanceOf(provider.ethAccount)) - (startBalanceClient - endBalanceClient) == - (endBalanceProvider - startBalanceProvider) - ), - timeout = 10 * 1000, # give client a bit of time to withdraw its funds - ) + # Ensure that total rewards stay within the payout limit + # determined by the expiry date. + let requestEnd = await marketplace.requestEnd(requestId) + let rewards = filledAtPerSlot + .mapIt((requestEnd.u256 - it) * pricePerSlotPerSecond) + .foldl(a + b, 0.u256) + check expiry.u256 * pricePerSlotPerSecond >= rewards + + let endBalanceProvider = (await token.balanceOf(provider.ethAccount)) + let endBalanceClient = (await token.balanceOf(client.ethAccount)) - await slotFilledSubscription.unsubscribe() - await requestCancelledSubscription.unsubscribe() + check( + startBalanceClient - endBalanceClient == endBalanceProvider - startBalanceProvider + ) test "the collateral is returned after a sale is ignored", NodeConfigs( @@ -327,14 +368,12 @@ marketplacesuite(name = "Marketplace payouts", stopOnRequestFail = true): # "node", "marketplace", "sales", "reservations", "statemachine" # ) .some, - ): - let data = await RandomChunker.example(blocks = blocks) + ), + stopOnRequestFail = true: let client0 = clients()[0] let provider0 = providers()[0] let provider1 = providers()[1] let provider2 = providers()[2] - let duration = 20 * 60.uint64 - let slotSize = slotSize(blocks, ecNodes, ecTolerance) # Here we create 3 SP which can host 3 slot. # While they will process the slot, each SP will @@ -343,40 +382,26 @@ marketplacesuite(name = "Marketplace payouts", stopOnRequestFail = true): # will be ignored. In that case, the collateral assigned for # the reservation should return to the availability. discard await provider0.client.postAvailability( - totalSize = 3 * slotSize.truncate(uint64), + totalSize = 3 * slotBytes.truncate(uint64), duration = duration, minPricePerBytePerSecond = minPricePerBytePerSecond, - totalCollateral = 3 * slotSize * minPricePerBytePerSecond, + totalCollateral = 3 * slotBytes * minPricePerBytePerSecond, ) discard await provider1.client.postAvailability( - totalSize = 3 * slotSize.truncate(uint64), + totalSize = 3 * slotBytes.truncate(uint64), duration = duration, minPricePerBytePerSecond = minPricePerBytePerSecond, - totalCollateral = 3 * slotSize * minPricePerBytePerSecond, + totalCollateral = 3 * slotBytes * minPricePerBytePerSecond, ) discard await provider2.client.postAvailability( - totalSize = 3 * slotSize.truncate(uint64), + totalSize = 3 * slotBytes.truncate(uint64), duration = duration, minPricePerBytePerSecond = minPricePerBytePerSecond, - totalCollateral = 3 * slotSize * minPricePerBytePerSecond, + totalCollateral = 3 * slotBytes * minPricePerBytePerSecond, ) - let cid = (await client0.client.upload(data)).get - - let purchaseId = await client0.client.requestStorage( - cid, - duration = duration, - pricePerBytePerSecond = minPricePerBytePerSecond, - proofProbability = 1.u256, - expiry = 10 * 60.uint64, - collateralPerByte = collateralPerByte, - nodes = ecNodes, - tolerance = ecTolerance, - ) - - let requestId = (await client0.client.requestId(purchaseId)).get - - discard await waitForRequestToStart() + let (_, requestId) = await requestStorage(client0.client) + await waitForRequestToStart(requestId, expiry.int64) # Here we will check that for each provider, the total remaining collateral # will match the available slots. @@ -386,12 +411,15 @@ marketplacesuite(name = "Marketplace payouts", stopOnRequestFail = true): let client = provider.client check eventually( block: - let availabilities = (await client.getAvailabilities()).get - let availability = availabilities[0] - let slots = (await client.getSlots()).get - let availableSlots = (3 - slots.len).u256 - - availability.totalRemainingCollateral == - availableSlots * slotSize * minPricePerBytePerSecond, + try: + let availabilities = (await client.getAvailabilities()).get + let availability = availabilities[0] + let slots = (await client.getSlots()).get + let availableSlots = (3 - slots.len).u256 + + availability.totalRemainingCollateral == + availableSlots * slotBytes * minPricePerBytePerSecond + except HttpConnectionError: + return false, timeout = 30 * 1000, ) diff --git a/tests/integration/30_minutes/testproofs.nim b/tests/integration/30_minutes/testproofs.nim index b06e4d824..8b91050d7 100644 --- a/tests/integration/30_minutes/testproofs.nim +++ b/tests/integration/30_minutes/testproofs.nim @@ -13,7 +13,7 @@ export logutils logScope: topics = "integration test proofs" -marketplacesuite(name = "Hosts submit regular proofs", stopOnRequestFail = false): +marketplacesuite(name = "Hosts submit regular proofs"): const minPricePerBytePerSecond = 1.u256 const collateralPerByte = 1.u256 const blocks = 8 @@ -34,12 +34,17 @@ marketplacesuite(name = "Hosts submit regular proofs", stopOnRequestFail = false # .withLogFile() # uncomment to output log file to tests/integration/logs/ //_.log # .withLogTopics("marketplace", "sales", "reservations", "node", "clock") .some, - ): + ), + stopOnRequestFail = true: let client0 = clients()[0].client let expiry = 10.periods let duration = expiry + 5.periods + let proofSubmittedEvent = newAsyncEvent() + + proc onProofSubmitted(event: ?!ProofSubmitted) = + if event.isOk: + proofSubmittedEvent.fire() - let data = await RandomChunker.example(blocks = blocks) let datasetSize = datasetSize(blocks = blocks, nodes = ecNodes, tolerance = ecTolerance) await createAvailabilities( @@ -49,34 +54,22 @@ marketplacesuite(name = "Hosts submit regular proofs", stopOnRequestFail = false minPricePerBytePerSecond, ) - let cid = (await client0.upload(data)).get - - let purchaseId = await client0.requestStorage( - cid, - expiry = expiry, - duration = duration, - nodes = ecNodes, - tolerance = ecTolerance, - ) + let (purchaseId, requestId) = + await requestStorage(client0, duration = duration, expiry = expiry) let purchase = (await client0.getPurchase(purchaseId)).get check purchase.error == none string - let slotSize = slotSize(blocks, ecNodes, ecTolerance) + await marketplaceSubscribe(ProofSubmitted, onProofSubmitted) - discard await waitForRequestToStart(expiry.int) + await waitForRequestToStart(requestId, expiry.int64) - var proofWasSubmitted = false - proc onProofSubmitted(event: ?!ProofSubmitted) = - proofWasSubmitted = event.isOk - - let subscription = await marketplace.subscribe(ProofSubmitted, onProofSubmitted) - - check eventually(proofWasSubmitted, timeout = (duration - expiry).int * 1000) - - await subscription.unsubscribe() + let secondsTillRequestEnd = await getSecondsTillRequestEnd(requestId) + await proofSubmittedEvent.wait().wait( + timeout = chronos.seconds(secondsTillRequestEnd) + ) -marketplacesuite(name = "Simulate invalid proofs", stopOnRequestFail = false): +marketplacesuite(name = "Simulate invalid proofs"): # TODO: these are very loose tests in that they are not testing EXACTLY how # proofs were marked as missed by the validator. These tests should be # tightened so that they are showing, as an integration test, that specific @@ -88,6 +81,17 @@ marketplacesuite(name = "Simulate invalid proofs", stopOnRequestFail = false): const ecNodes = 3 const ecTolerance = 1 + var slotWasFreedEvent: AsyncEvent + var requestId: RequestId + + proc onSlotFreed(event: ?!SlotFreed) = + if event.isOk and event.value.requestId == requestId: + slotWasFreedEvent.fire() + + setup: + requestId = RequestId.default() + slotWasFreedEvent = newAsyncEvent() + test "slot is freed after too many invalid proofs submitted", NodeConfigs( # Uncomment to start Hardhat automatically, typically so logs can be inspected locally @@ -115,12 +119,12 @@ marketplacesuite(name = "Simulate invalid proofs", stopOnRequestFail = false): # uncomment to output log file to tests/integration/logs/ //_.log # .withLogTopics("validator", "onchain", "ethers", "clock") .some, - ): + ), + stopOnRequestFail = true: let client0 = clients()[0].client let expiry = 10.periods let duration = expiry + 10.periods - let data = await RandomChunker.example(blocks = blocks) let datasetSize = datasetSize(blocks = blocks, nodes = ecNodes, tolerance = ecTolerance) await createAvailabilities( @@ -130,32 +134,14 @@ marketplacesuite(name = "Simulate invalid proofs", stopOnRequestFail = false): minPricePerBytePerSecond, ) - let cid = (await client0.upload(data)).get - - let purchaseId = ( - await client0.requestStorage( - cid, - expiry = expiry, - duration = duration, - nodes = ecNodes, - tolerance = ecTolerance, - proofProbability = 1.u256, - ) + let (purchaseId, id) = await requestStorage( + client0, duration = duration, proofProbability = 1.u256, expiry = expiry ) - let requestId = (await client0.requestId(purchaseId)).get + requestId = id - discard await waitForRequestToStart(expiry.int) + await marketplaceSubscribe(SlotFreed, onSlotFreed) - var slotWasFreed = false - proc onSlotFreed(event: ?!SlotFreed) = - if event.isOk and event.value.requestId == requestId: - slotWasFreed = true - - let subscription = await marketplace.subscribe(SlotFreed, onSlotFreed) - - check eventually(slotWasFreed, timeout = (duration - expiry).int * 1000) - - await subscription.unsubscribe() + await slotWasFreedEvent.wait().wait(timeout = chronos.seconds(duration.int64)) test "slot is not freed when not enough invalid proofs submitted", NodeConfigs( @@ -178,12 +164,20 @@ marketplacesuite(name = "Simulate invalid proofs", stopOnRequestFail = false): # .withLogFile() # uncomment to output log file to tests/integration/logs/ //_.log # .withLogTopics("validator", "onchain", "ethers", "clock") .some, - ): + ), + stopOnRequestFail = true: let client0 = clients()[0].client let expiry = 10.periods let duration = expiry + 10.periods + let slotWasFilledEvent = newAsyncEvent() + + proc onSlotFilled(eventResult: ?!SlotFilled) = + assert not eventResult.isErr + let event = !eventResult + + if event.requestId == requestId: + slotWasFilledEvent.fire() - let data = await RandomChunker.example(blocks = blocks) let datasetSize = datasetSize(blocks = blocks, nodes = ecNodes, tolerance = ecTolerance) await createAvailabilities( @@ -193,44 +187,24 @@ marketplacesuite(name = "Simulate invalid proofs", stopOnRequestFail = false): minPricePerBytePerSecond, ) - let cid = (await client0.upload(data)).get + let (purchaseId, id) = + await requestStorage(client0, duration = duration, expiry = expiry) + requestId = id - let purchaseId = await client0.requestStorage( - cid, - expiry = expiry, - duration = duration, - nodes = ecNodes, - tolerance = ecTolerance, - proofProbability = 1.u256, - ) - let requestId = (await client0.requestId(purchaseId)).get - - var slotWasFilled = false - proc onSlotFilled(eventResult: ?!SlotFilled) = - assert not eventResult.isErr - let event = !eventResult - - if event.requestId == requestId: - slotWasFilled = true - - let filledSubscription = await marketplace.subscribe(SlotFilled, onSlotFilled) + await marketplaceSubscribe(SlotFilled, onSlotFilled) + await marketplaceSubscribe(SlotFreed, onSlotFreed) # wait for the first slot to be filled - check eventually(slotWasFilled, timeout = expiry.int * 1000) + await slotWasFilledEvent.wait().wait(timeout = chronos.seconds(expiry.int64)) - var slotWasFreed = false - proc onSlotFreed(event: ?!SlotFreed) = - if event.isOk and event.value.requestId == requestId: - slotWasFreed = true - - let freedSubscription = await marketplace.subscribe(SlotFreed, onSlotFreed) - - # In 2 periods you cannot have enough invalid proofs submitted: - await sleepAsync(2.periods.int.seconds) - check not slotWasFreed - - await filledSubscription.unsubscribe() - await freedSubscription.unsubscribe() + try: + # In 2 periods you cannot have enough invalid proofs submitted: + await slotWasFreedEvent.wait().wait( + timeout = chronos.seconds(2.periods.int.seconds) + ) + fail("invalid proofs were not expected in 2 periods") + except AsyncTimeoutError: + discard # TODO: uncomment once fixed # WARNING: in the meantime minPrice has changed to minPricePerBytePerSecond diff --git a/tests/integration/30_minutes/testslotrepair.nim b/tests/integration/30_minutes/testslotrepair.nim index f7d8dba7b..69e45ff72 100644 --- a/tests/integration/30_minutes/testslotrepair.nim +++ b/tests/integration/30_minutes/testslotrepair.nim @@ -4,7 +4,7 @@ import ../../contracts/time import ../../contracts/deployment import ../../codex/helpers import ../../examples -import ../marketplacesuite +import ../marketplacesuite as marketplacesuite import ../nodeconfigs export logutils @@ -12,7 +12,7 @@ export logutils logScope: topics = "integration test slot repair" -marketplacesuite(name = "SP Slot Repair", stopOnRequestFail = true): +marketplacesuite(name = "SP Slot Repair"): const minPricePerBytePerSecond = 1.u256 const collateralPerByte = 1.u256 const blocks = 3 @@ -20,18 +20,17 @@ marketplacesuite(name = "SP Slot Repair", stopOnRequestFail = true): const ecTolerance = 1 const size = slotSize(blocks, ecNodes, ecTolerance) - var filledSlotIds: seq[SlotId] = @[] - var freedSlotId = none SlotId + var freedSlotIndex = none uint64 var requestId: RequestId + var slotFilledEvent: AsyncEvent # Here we are keeping track of the slot filled using their ids. proc onSlotFilled(eventResult: ?!SlotFilled) = assert not eventResult.isErr let event = !eventResult - if event.requestId == requestId: - let slotId = slotId(event.requestId, event.slotIndex) - filledSlotIds.add slotId + if event.requestId == requestId and event.slotIndex == freedSlotIndex.get: + slotFilledEvent.fire() # Here we are retrieving the slot id freed. # When the event is triggered, the slot id is removed @@ -42,44 +41,23 @@ marketplacesuite(name = "SP Slot Repair", stopOnRequestFail = true): let slotId = slotId(event.requestId, event.slotIndex) if event.requestId == requestId: - assert slotId in filledSlotIds - filledSlotIds.del(filledSlotIds.find(slotId)) - freedSlotId = some(slotId) - - proc createPurchase(client: CodexClient): Future[PurchaseId] {.async.} = - let data = await RandomChunker.example(blocks = blocks) - let cid = (await client.upload(data)).get - - let purchaseId = await client.requestStorage( - cid, - expiry = 10.periods, - duration = 20.periods, - nodes = ecNodes, - tolerance = ecTolerance, - collateralPerByte = 1.u256, - pricePerBytePerSecond = minPricePerBytePerSecond, - proofProbability = 1.u256, - ) - requestId = (await client.requestId(purchaseId)).get + freedSlotIndex = some event.slotIndex - return purchaseId - - proc freeSlot(provider: CodexClient): Future[void] {.async.} = + proc freeSlot(provider: CodexProcess): Future[void] {.async.} = # Get the second provider signer. - let signer = ethProvider.getSigner(accounts[2]) + let signer = ethProvider.getSigner(provider.ethAccount) let marketplaceWithSecondProviderSigner = marketplace.connect(signer) # Call freeSlot to speed up the process. # It accelerates the test by skipping validator # proof verification and not waiting for the full period. # The downside is that this doesn't reflect the real slot freed process. - let slots = (await provider.getSlots()).get() + let slots = (await provider.client.getSlots()).get() let slotId = slotId(requestId, slots[0].slotIndex) discard await marketplaceWithSecondProviderSigner.freeSlot(slotId) setup: - filledSlotIds = @[] - freedSlotId = none SlotId + slotFilledEvent = newAsyncEvent() test "repair from local store", NodeConfigs( @@ -97,7 +75,8 @@ marketplacesuite(name = "SP Slot Repair", stopOnRequestFail = true): # .debug() # .withLogFile() # .withLogTopics("validator").some, - ): + ), + stopOnRequestFail = true: let client0 = clients()[0] let provider0 = providers()[0] let provider1 = providers()[1] @@ -112,7 +91,7 @@ marketplacesuite(name = "SP Slot Repair", stopOnRequestFail = true): totalSize = 2 * size.truncate(uint64), duration = duration, minPricePerBytePerSecond = minPricePerBytePerSecond, - totalCollateral = 3 * size * collateralPerByte, + totalCollateral = 2 * size * collateralPerByte, ) ).get let availability1 = ( @@ -124,13 +103,17 @@ marketplacesuite(name = "SP Slot Repair", stopOnRequestFail = true): ) ).get - let purchaseId = await createPurchase(client0.client) - - let filledSubscription = await marketplace.subscribe(SlotFilled, onSlotFilled) - let slotFreedsubscription = await marketplace.subscribe(SlotFreed, onSlotFreed) + let (purchaseId, id) = await requestStorage( + client0.client, + blocks = blocks, + expiry = expiry, + duration = duration, + proofProbability = 1.u256, + ) + requestId = id # Wait for purchase starts, meaning that the slots are filled. - discard await waitForRequestToStart(expiry.int) + await waitForRequestToStart(requestId, expiry.int64) # stop client so it doesn't serve any blocks anymore await client0.stop() @@ -146,32 +129,34 @@ marketplacesuite(name = "SP Slot Repair", stopOnRequestFail = true): await provider0.client.patchAvailability( availabilityId = availability0.id, totalSize = (3 * size.truncate(uint64)).uint64.some, + totalCollateral = (3.u256 * size * collateralPerByte).some, ) + await marketplaceSubscribe(SlotFilled, onSlotFilled) + await marketplaceSubscribe(SlotFreed, onSlotFreed) + # Let's free the slot to speed up the process - await freeSlot(provider1.client) + await freeSlot(provider1) - # We expect that the freed slot is added in the filled slot id list, + # We expect that the freed slot is filled again, # meaning that the slot was repaired locally by SP 1. - check eventually( - freedSlotId.get in filledSlotIds, timeout = (duration - expiry).int * 1000 - ) - - await filledSubscription.unsubscribe() - await slotFreedsubscription.unsubscribe() + let secondsTillRequestEnd = await getSecondsTillRequestEnd(requestId) + await slotFilledEvent.wait().wait(timeout = chronos.seconds(secondsTillRequestEnd)) test "repair from local and remote store", NodeConfigs( - clients: CodexConfigs.init(nodes = 1) - # .debug() - # .withLogTopics("node", "erasure") - .some, - providers: CodexConfigs.init(nodes = 3) - # .debug() - # .withLogFile() - # .withLogTopics("marketplace", "sales", "statemachine", "reservations") - .some, - ): + clients: CodexConfigs + .init(nodes = 1) + # .debug() + .withLogFile() + .withLogTopics("node", "erasure").some, + providers: CodexConfigs + .init(nodes = 3) + # .debug() + .withLogFile() + .withLogTopics("marketplace", "sales", "statemachine", "reservations").some, + ), + stopOnRequestFail = true: let client0 = clients()[0] let provider0 = providers()[0] let provider1 = providers()[1] @@ -203,13 +188,17 @@ marketplacesuite(name = "SP Slot Repair", stopOnRequestFail = true): totalCollateral = size * collateralPerByte, ) - let purchaseId = await createPurchase(client0.client) - - let filledSubscription = await marketplace.subscribe(SlotFilled, onSlotFilled) - let slotFreedsubscription = await marketplace.subscribe(SlotFreed, onSlotFreed) + let (purchaseId, id) = await requestStorage( + client0.client, + blocks = blocks, + expiry = expiry, + duration = duration, + proofProbability = 1.u256, + ) + requestId = id # Wait for purchase starts, meaning that the slots are filled. - discard await waitForRequestToStart(expiry.int) + await waitForRequestToStart(requestId, expiry.int64) # stop client so it doesn't serve any blocks anymore await client0.stop() @@ -226,16 +215,16 @@ marketplacesuite(name = "SP Slot Repair", stopOnRequestFail = true): totalCollateral = (2 * size * collateralPerByte).some, ) + await marketplaceSubscribe(SlotFilled, onSlotFilled) + await marketplaceSubscribe(SlotFreed, onSlotFreed) + # Let's free the slot to speed up the process - await freeSlot(provider1.client) + await freeSlot(provider1) - # We expect that the freed slot is added in the filled slot id list, + # We expect that the freed slot is filled again, # meaning that the slot was repaired locally and remotely (using SP 3) by SP 1. - check eventually(freedSlotId.isSome, timeout = expiry.int * 1000) - check eventually(freedSlotId.get in filledSlotIds, timeout = expiry.int * 1000) - - await filledSubscription.unsubscribe() - await slotFreedsubscription.unsubscribe() + let secondsTillRequestEnd = await getSecondsTillRequestEnd(requestId) + await slotFilledEvent.wait().wait(timeout = chronos.seconds(secondsTillRequestEnd)) test "repair from remote store only", NodeConfigs( @@ -249,7 +238,8 @@ marketplacesuite(name = "SP Slot Repair", stopOnRequestFail = true): # .withLogFile() # .withLogTopics("marketplace", "sales", "statemachine", "reservations") .some, - ): + ), + stopOnRequestFail = true: let client0 = clients()[0] let provider0 = providers()[0] let provider1 = providers()[1] @@ -274,13 +264,17 @@ marketplacesuite(name = "SP Slot Repair", stopOnRequestFail = true): ) ).get - let purchaseId = await createPurchase(client0.client) - - let filledSubscription = await marketplace.subscribe(SlotFilled, onSlotFilled) - let slotFreedsubscription = await marketplace.subscribe(SlotFreed, onSlotFreed) + let (purchaseId, id) = await requestStorage( + client0.client, + blocks = blocks, + expiry = expiry, + duration = duration, + proofProbability = 1.u256, + ) + requestId = id # Wait for purchase starts, meaning that the slots are filled. - discard await waitForRequestToStart(expiry.int) + await waitForRequestToStart(requestId, expiry.int64) # stop client so it doesn't serve any blocks anymore await client0.stop() @@ -298,12 +292,12 @@ marketplacesuite(name = "SP Slot Repair", stopOnRequestFail = true): # SP 2 will not pick the slot again. await provider1.client.patchAvailability(availability1.id, enabled = false.some) + await marketplaceSubscribe(SlotFilled, onSlotFilled) + await marketplaceSubscribe(SlotFreed, onSlotFreed) + # Let's free the slot to speed up the process - await freeSlot(provider1.client) + await freeSlot(provider1) # At this point, SP 3 should repair the slot from SP 1 and host it. - check eventually(freedSlotId.isSome, timeout = expiry.int * 1000) - check eventually(freedSlotId.get in filledSlotIds, timeout = expiry.int * 1000) - - await filledSubscription.unsubscribe() - await slotFreedsubscription.unsubscribe() + let secondsTillRequestEnd = await getSecondsTillRequestEnd(requestId) + await slotFilledEvent.wait().wait(timeout = chronos.seconds(secondsTillRequestEnd)) diff --git a/tests/integration/30_minutes/testvalidator.nim b/tests/integration/30_minutes/testvalidator.nim index ed67b5d07..c65a0597f 100644 --- a/tests/integration/30_minutes/testvalidator.nim +++ b/tests/integration/30_minutes/testvalidator.nim @@ -15,12 +15,11 @@ export logutils logScope: topics = "integration test validation" -marketplacesuite(name = "Validation", stopOnRequestFail = false): +marketplacesuite(name = "Validation"): const blocks = 8 const ecNodes = 3 const ecTolerance = 1 const proofProbability = 1.u256 - const collateralPerByte = 1.u256 const minPricePerBytePerSecond = 1.u256 @@ -60,9 +59,6 @@ marketplacesuite(name = "Validation", stopOnRequestFail = false): # let mine a block to sync the blocktime with the current clock discard await ethProvider.send("evm_mine") - var currentTime = await ethProvider.currentTime() - let requestEndTime = currentTime.truncate(uint64) + duration - let data = await RandomChunker.example(blocks = blocks) let datasetSize = datasetSize(blocks = blocks, nodes = ecNodes, tolerance = ecTolerance) @@ -73,28 +69,19 @@ marketplacesuite(name = "Validation", stopOnRequestFail = false): minPricePerBytePerSecond, ) - let cid = (await client0.upload(data)).get - let purchaseId = await client0.requestStorage( - cid, - expiry = expiry, - duration = duration, - nodes = ecNodes, - tolerance = ecTolerance, - proofProbability = proofProbability, + let (purchaseId, requestId) = await requestStorage( + client0, duration = duration, expiry = expiry, proofProbability = proofProbability ) - let requestId = (await client0.requestId(purchaseId)).get debug "validation suite", purchaseId = purchaseId.toHex, requestId = requestId - discard await waitForRequestToStart(expiry.int + 60) + await waitForRequestToStart(requestId, expiry.int64) discard await ethProvider.send("evm_mine") - currentTime = await ethProvider.currentTime() - let secondsTillRequestEnd = (requestEndTime - currentTime.truncate(uint64)).int - - debug "validation suite", secondsTillRequestEnd = secondsTillRequestEnd.seconds - discard await waitForRequestToFail(secondsTillRequestEnd + 60) + let secondsTillRequestEnd = await getSecondsTillRequestEnd(requestId) + debug "validation suite", secondsTillRequestEnd = secondsTillRequestEnd + await waitForRequestToFail(requestId, secondsTillRequestEnd.int64 + 60) test "validator uses historical state to mark missing proofs", NodeConfigs( @@ -124,7 +111,6 @@ marketplacesuite(name = "Validation", stopOnRequestFail = false): var currentTime = await ethProvider.currentTime() let requestEndTime = currentTime.truncate(uint64) + duration - let data = await RandomChunker.example(blocks = blocks) let datasetSize = datasetSize(blocks = blocks, nodes = ecNodes, tolerance = ecTolerance) await createAvailabilities( @@ -134,20 +120,13 @@ marketplacesuite(name = "Validation", stopOnRequestFail = false): minPricePerBytePerSecond, ) - let cid = (await client0.upload(data)).get - let purchaseId = await client0.requestStorage( - cid, - expiry = expiry, - duration = duration, - nodes = ecNodes, - tolerance = ecTolerance, - proofProbability = proofProbability, + let (purchaseId, requestId) = await requestStorage( + client0, duration = duration, expiry = expiry, proofProbability = proofProbability ) - let requestId = (await client0.requestId(purchaseId)).get debug "validation suite", purchaseId = purchaseId.toHex, requestId = requestId - discard await waitForRequestToStart(expiry.int + 60) + await waitForRequestToStart(requestId, expiry.int64) # extra block just to make sure we have one that separates us # from the block containing the last (past) SlotFilled event @@ -168,10 +147,6 @@ marketplacesuite(name = "Validation", stopOnRequestFail = false): let node = await startValidatorNode(config) running.add RunningNode(role: Role.Validator, node: node) - discard await ethProvider.send("evm_mine") - currentTime = await ethProvider.currentTime() - let secondsTillRequestEnd = (requestEndTime - currentTime.truncate(uint64)).int - - debug "validation suite", secondsTillRequestEnd = secondsTillRequestEnd.seconds - - discard await waitForRequestToFail(secondsTillRequestEnd + 60) + let secondsTillRequestEnd = await getSecondsTillRequestEnd(requestId) + debug "validation suite", secondsTillRequestEnd = secondsTillRequestEnd + await waitForRequestToFail(requestId, secondsTillRequestEnd.int64 + 60) diff --git a/tests/integration/5_minutes/testsales.nim b/tests/integration/5_minutes/testsales.nim index 246d8fc7d..bd70685b8 100644 --- a/tests/integration/5_minutes/testsales.nim +++ b/tests/integration/5_minutes/testsales.nim @@ -17,7 +17,7 @@ proc findItem[T](items: seq[T], item: T): ?!T = return failure("Not found") -marketplacesuite(name = "Sales", stopOnRequestFail = true): +marketplacesuite(name = "Sales"): let salesConfig = NodeConfigs( clients: CodexConfigs.init(nodes = 1).some, providers: CodexConfigs.init(nodes = 1) @@ -34,7 +34,7 @@ marketplacesuite(name = "Sales", stopOnRequestFail = true): host = providers()[0].client client = clients()[0].client - test "node handles new storage availability", salesConfig: + test "node handles new storage availability", salesConfig, stopOnRequestFail = true: let availability1 = ( await host.postAvailability( totalSize = 1.uint64, @@ -53,7 +53,7 @@ marketplacesuite(name = "Sales", stopOnRequestFail = true): ).get check availability1 != availability2 - test "node lists storage that is for sale", salesConfig: + test "node lists storage that is for sale", salesConfig, stopOnRequestFail = true: let availability = ( await host.postAvailability( totalSize = 1.uint64, @@ -64,7 +64,7 @@ marketplacesuite(name = "Sales", stopOnRequestFail = true): ).get check availability in (await host.getAvailabilities()).get - test "updating availability", salesConfig: + test "updating availability", salesConfig, stopOnRequestFail = true: let availability = ( await host.postAvailability( totalSize = 140000.uint64, @@ -95,7 +95,8 @@ marketplacesuite(name = "Sales", stopOnRequestFail = true): check updatedAvailability.enabled == false check updatedAvailability.until == until - test "updating availability - updating totalSize", salesConfig: + test "updating availability - updating totalSize", + salesConfig, stopOnRequestFail = true: let availability = ( await host.postAvailability( totalSize = 140000.uint64, @@ -112,12 +113,13 @@ marketplacesuite(name = "Sales", stopOnRequestFail = true): check updatedAvailability.freeSize == 100000 test "updating availability - updating totalSize does not allow bellow utilized", - salesConfig: + salesConfig, stopOnRequestFail = true: let originalSize = 0xFFFFFF.uint64 - let data = await RandomChunker.example(blocks = 8) let minPricePerBytePerSecond = 3.u256 let collateralPerByte = 1.u256 let totalCollateral = originalSize.u256 * collateralPerByte + let expiry = 10 * 60.uint64 + let availability = ( await host.postAvailability( totalSize = originalSize, @@ -128,19 +130,11 @@ marketplacesuite(name = "Sales", stopOnRequestFail = true): ).get # Lets create storage request that will utilize some of the availability's space - let cid = (await client.upload(data)).get - let id = await client.requestStorage( - cid, - duration = 20 * 60.uint64, - pricePerBytePerSecond = minPricePerBytePerSecond, - proofProbability = 3.u256, - expiry = (10 * 60).uint64, - collateralPerByte = collateralPerByte, - nodes = 3, - tolerance = 1, + let (purchaseId, requestId) = await requestStorage( + client = client, pricePerBytePerSecond = minPricePerBytePerSecond ) - discard await waitForRequestToStart() + await waitForRequestToStart(requestId, expiry.int64) let updatedAvailability = ((await host.getAvailabilities()).get).findItem(availability).get @@ -164,7 +158,8 @@ marketplacesuite(name = "Sales", stopOnRequestFail = true): check newUpdatedAvailability.totalSize == originalSize + 20000 check newUpdatedAvailability.freeSize - updatedAvailability.freeSize == 20000 - test "updating availability fails with until negative", salesConfig: + test "updating availability fails with until negative", + salesConfig, stopOnRequestFail = true: let availability = ( await host.postAvailability( totalSize = 140000.uint64, @@ -181,14 +176,11 @@ marketplacesuite(name = "Sales", stopOnRequestFail = true): (await response.body) == "Cannot set until to a negative value" test "returns an error when trying to update the until date before an existing a request is finished", - salesConfig: + salesConfig, stopOnRequestFail = true: let size = 0xFFFFFF.uint64 - let data = await RandomChunker.example(blocks = 8) let duration = 20 * 60.uint64 + let expiry = 10 * 60.uint64 let minPricePerBytePerSecond = 3.u256 - let collateralPerByte = 1.u256 - let ecNodes = 3.uint - let ecTolerance = 1.uint # host makes storage available let availability = ( @@ -200,24 +192,12 @@ marketplacesuite(name = "Sales", stopOnRequestFail = true): ) ).get - # client requests storage - let cid = (await client.upload(data)).get - let id = ( - await client.requestStorage( - cid, - duration = duration, - pricePerBytePerSecond = minPricePerBytePerSecond, - proofProbability = 3.u256, - expiry = 10 * 60.uint64, - collateralPerByte = collateralPerByte, - nodes = ecNodes, - tolerance = ecTolerance, - ) - ).get + let (purchaseId, requestId) = + await requestStorage(client, pricePerBytePerSecond = minPricePerBytePerSecond) - discard await waitForRequestToStart() + await waitForRequestToStart(requestId, expiry.int64) - let purchase = (await client.getPurchase(id)).get + let purchase = (await client.getPurchase(purchaseId)).get check purchase.error == none string let unixNow = getTime().toUnix() @@ -227,7 +207,6 @@ marketplacesuite(name = "Sales", stopOnRequestFail = true): availabilityId = availability.id, until = until.some ) - check: - response.status == 422 - (await response.body) == - "Until parameter must be greater or equal to the longest currently hosted slot" + check response.status == 422 + check (await response.body) == + "Until parameter must be greater or equal to the longest currently hosted slot" diff --git a/tests/integration/marketplacesuite.nim b/tests/integration/marketplacesuite.nim index 5a0a11a64..f226387a1 100644 --- a/tests/integration/marketplacesuite.nim +++ b/tests/integration/marketplacesuite.nim @@ -1,3 +1,6 @@ +import macros +import std/unittest + import pkg/chronos import pkg/ethers/erc20 from pkg/libp2p import Cid @@ -5,54 +8,153 @@ import pkg/codex/contracts/marketplace as mp import pkg/codex/periods import pkg/codex/utils/json from pkg/codex/utils import roundUp, divUp -import ./multinodes except Subscription +import ./multinodes except Subscription, Event import ../contracts/time import ../contracts/deployment export mp export multinodes -template marketplacesuite*(name: string, stopOnRequestFail: bool, body: untyped) = +template marketplacesuite*(name: string, body: untyped) = multinodesuite name: var marketplace {.inject, used.}: Marketplace var period: uint64 var periodicity: Periodicity var token {.inject, used.}: Erc20Token - var requestStartedEvent: AsyncEvent - var requestStartedSubscription: Subscription + var subscriptions: seq[Subscription] = @[] + var tokenSubscription: Subscription var requestFailedEvent: AsyncEvent - var requestFailedSubscription: Subscription - proc onRequestStarted(eventResult: ?!RequestFulfilled) {.raises: [].} = - requestStartedEvent.fire() + template test(tname, startNodeConfigs, stopOnRequestFail, tbody) = + test tname, startNodeConfigs: + stopOnRequestFailed: + tbody + + template stopOnRequestFailed(tbody: untyped) = + let completed = newAsyncEvent() + let requestFailedEvent = newAsyncEvent() + + proc onRequestFailed(eventResult: ?!RequestFailed) {.raises: [].} = + assert not eventResult.isErr + requestFailedEvent.fire() + + let sub = await marketplace.subscribe(RequestFailed, onRequestFailed) + subscriptions.add(sub) + + let mainFut = ( + proc(): Future[void] {.async.} = + try: + tbody + completed.fire() + except CancelledError as e: + raise e + except CatchableError as e: + completed.fire() + raise e + )() + + let fastFailFut = ( + proc(): Future[void] {.async.} = + try: + await requestFailedEvent.wait().wait(timeout = chronos.seconds(60)) + completed.fire() + raise newException(TestFailedError, "storage request has failed") + except AsyncTimeoutError: + discard + )() + + await completed.wait().wait(timeout = chronos.seconds(60 * 30)) + + if not fastFailFut.completed: + await fastFailFut.cancelAndWait() + + if mainFut.failed: + raise mainFut.error + + if fastFailFut.failed: + raise fastFailFut.error + + proc check(cond: bool, reason = "Check failed"): void = + if not cond: + fail(reason) - proc onRequestFailed(eventResult: ?!RequestFailed) {.raises: [].} = - requestFailedEvent.fire() - if stopOnRequestFail: - fail() + proc marketplaceSubscribe[E: Event]( + event: type E, handler: EventHandler[E] + ) {.async.} = + let sub = await marketplace.subscribe(event, handler) + subscriptions.add(sub) + + proc tokenSubscribe( + handler: proc(event: ?!Transfer) {.gcsafe, raises: [].} + ) {.async.} = + let sub = await token.subscribe(Transfer, handler) + tokenSubscription = sub + + proc subscribeOnRequestFulfilled( + requestId: RequestId + ): Future[AsyncEvent] {.async.} = + let event = newAsyncEvent() + + proc onRequestFulfilled(eventResult: ?!RequestFulfilled) {.raises: [].} = + assert not eventResult.isErr + let er = !eventResult + + if er.requestId == requestId: + event.fire() + + let sub = await marketplace.subscribe(RequestFulfilled, onRequestFulfilled) + subscriptions.add(sub) + + return event proc getCurrentPeriod(): Future[Period] {.async.} = return periodicity.periodOf((await ethProvider.currentTime()).truncate(uint64)) proc waitForRequestToStart( - seconds = 10 * 60 + 10 - ): Future[Period] {.async: (raises: [CancelledError, AsyncTimeoutError]).} = - await requestStartedEvent.wait().wait(timeout = chronos.seconds(seconds)) - # Recreate a new future if we need to wait for another request - requestStartedEvent = newAsyncEvent() + requestId: RequestId, seconds = 10 * 60 + 10 + ): Future[void] {.async.} = + let event = newAsyncEvent() + + proc onRequestFulfilled(eventResult: ?!RequestFulfilled) {.raises: [].} = + assert not eventResult.isErr + let er = !eventResult + + if er.requestId == requestId: + event.fire() + + let sub = await marketplace.subscribe(RequestFulfilled, onRequestFulfilled) + subscriptions.add(sub) + + await event.wait().wait(timeout = chronos.seconds(seconds)) + + proc getSecondsTillRequestEnd(requestId: RequestId): Future[int64] {.async.} = + let currentTime = await ethProvider.currentTime() + let requestEnd = await marketplace.requestEnd(requestId) + return requestEnd.int64 - currentTime.truncate(int64) proc waitForRequestToFail( - seconds = (5 * 60) + 10 - ): Future[Period] {.async: (raises: [CancelledError, AsyncTimeoutError]).} = - await requestFailedEvent.wait().wait(timeout = chronos.seconds(seconds)) - # Recreate a new future if we need to wait for another request - requestFailedEvent = newAsyncEvent() + requestId: RequestId, seconds = (5 * 60) + 10 + ): Future[void] {.async.} = + let event = newAsyncEvent() + + proc onRequestFailed(eventResult: ?!RequestFailed) {.raises: [].} = + assert not eventResult.isErr + let er = !eventResult + + if er.requestId == requestId: + event.fire() + + let sub = await marketplace.subscribe(RequestFailed, onRequestFailed) + subscriptions.add(sub) + + await event.wait().wait(timeout = chronos.seconds(seconds)) proc advanceToNextPeriod() {.async.} = let periodicity = Periodicity(seconds: period) let currentTime = (await ethProvider.currentTime()).truncate(uint64) let currentPeriod = periodicity.periodOf(currentTime) let endOfPeriod = periodicity.periodEnd(currentPeriod) + await ethProvider.advanceTimeTo(endOfPeriod.u256 + 1) template eventuallyP(condition: untyped, finalPeriod: Period): bool = @@ -125,6 +227,36 @@ template marketplacesuite*(name: string, stopOnRequestFail: bool, body: untyped) return id + proc requestStorage( + client: CodexClient, + proofProbability = 3.u256, + duration = 20 * 60.uint64, + pricePerBytePerSecond = 1.u256, + collateralPerByte = 1.u256, + expiry = 10 * 60.uint64, + nodes = 3, + tolerance = 1, + blocks = 8, + data = seq[byte].none, + ): Future[(PurchaseId, RequestId)] {.async.} = + let bytes = data |? await RandomChunker.example(blocks = blocks) + let cid = (await client.upload(bytes)).get + + let purchaseId = await client.requestStorage( + cid, + duration = duration, + pricePerBytePerSecond = pricePerBytePerSecond, + proofProbability = proofProbability, + expiry = expiry, + collateralPerByte = collateralPerByte, + nodes = nodes, + tolerance = tolerance, + ) + + let requestId = (await client.requestId(purchaseId)).get + + return (purchaseId, requestId) + setup: marketplace = Marketplace.new(Marketplace.address, ethProvider.getSigner()) let tokenAddress = await marketplace.token() @@ -132,18 +264,13 @@ template marketplacesuite*(name: string, stopOnRequestFail: bool, body: untyped) let config = await marketplace.configuration() period = config.proofs.period periodicity = Periodicity(seconds: period) - - requestStartedEvent = newAsyncEvent() + subscriptions = @[] requestFailedEvent = newAsyncEvent() - - requestStartedSubscription = - await marketplace.subscribe(RequestFulfilled, onRequestStarted) - - requestFailedSubscription = - await marketplace.subscribe(RequestFailed, onRequestFailed) - teardown: - await requestStartedSubscription.unsubscribe() - await requestFailedSubscription.unsubscribe() + for subscription in subscriptions: + await subscription.unsubscribe() + + if not tokenSubscription.isNil: + await tokenSubscription.unsubscribe() body diff --git a/tests/integration/multinodes.nim b/tests/integration/multinodes.nim index 42fff1576..808275510 100644 --- a/tests/integration/multinodes.nim +++ b/tests/integration/multinodes.nim @@ -36,6 +36,7 @@ type Hardhat MultiNodeSuiteError = object of CatchableError + TestFailedError* = object of CatchableError const jsonRpcProviderUrl* = "ws://localhost:8545" @@ -108,6 +109,9 @@ template multinodesuite*(name: string, body: untyped) = test tname: tbody + template fail(reason: string) = + raise newException(TestFailedError, reason) + proc sanitize(pathSegment: string): string = var sanitized = pathSegment for invalid in invalidFilenameChars.items: @@ -254,21 +258,34 @@ template multinodesuite*(name: string, body: untyped) = return await newCodexProcess(validatorIdx, config, Role.Validator) - proc teardownImpl() {.async.} = + proc teardownImpl() {.async: (raises: [CancelledError]).} = for nodes in @[validators(), clients(), providers()]: for node in nodes: - await node.stop() # also stops rest client - node.removeDataDir() + try: + await node.stop() # also stops rest client + node.removeDataDir() + except CancelledError as e: + raise e + # Raised by removeDataDir + except Exception as e: + error "error when trying to stop the node", error = e.msg # if hardhat was started in the test, kill the node # otherwise revert the snapshot taken in the test setup let hardhat = hardhat() if not hardhat.isNil: - await hardhat.stop() + try: + await hardhat.stop() + except CancelledError as e: + raise e + except CatchableError as e: + error "error when trying to stop hardhat", error = e.msg else: - discard await send(ethProvider, "evm_revert", @[snapshot]) - - await ethProvider.close() + try: + discard await send(ethProvider, "evm_revert", @[snapshot]) + await ethProvider.close() + except ProviderError as e: + error "error when trying to cleanup the evm", error = e.msg running = @[] @@ -276,13 +293,15 @@ template multinodesuite*(name: string, body: untyped) = try: tryBody except CatchableError as er: - fatal message, error = er.msg - echo "[FATAL] ", message, ": ", er.msg + if er of TestFailedError: + info "[FAILED] ", reason = er.msg + else: + fatal message, error = er.msg + echo "[FATAL] ", message, ": ", er.msg await teardownImpl() when declared(teardownAllIMPL): teardownAllIMPL() - fail() - quit(1) + fail(er.msg) proc updateBootstrapNodes( node: CodexProcess