From d312a07ea9dc10b49ee002129498406da4fb5388 Mon Sep 17 00:00:00 2001 From: Arnaud Date: Tue, 24 Jun 2025 07:39:18 +0200 Subject: [PATCH 01/12] Use ERC20 subscriptions instead of eventually to check balances --- .../30_minutes/testmarketplace.nim | 60 ++++++++++++++----- 1 file changed, 44 insertions(+), 16 deletions(-) diff --git a/tests/integration/30_minutes/testmarketplace.nim b/tests/integration/30_minutes/testmarketplace.nim index b04626c49..5a36793ea 100644 --- a/tests/integration/30_minutes/testmarketplace.nim +++ b/tests/integration/30_minutes/testmarketplace.nim @@ -109,6 +109,16 @@ marketplacesuite(name = "Marketplace", stopOnRequestFail = true): discard await waitForRequestToStart() + var counter = 0 + var transferEvent = newAsyncEvent() + proc onTransfer(eventResult: ?!Transfer) = + assert not eventResult.isErr + counter += 1 + if counter == 6: + transferEvent.fire() + + let tokenSubscription = await token.subscribe(Transfer, onTransfer) + let purchase = (await client.getPurchase(id)).get check purchase.error == none string @@ -119,17 +129,18 @@ marketplacesuite(name = "Marketplace", stopOnRequestFail = true): # only with new transaction await ethProvider.advanceTime(duration.u256) + await transferEvent.wait().wait(timeout = chronos.seconds(60)) + # Checking that the hosting node received reward for at least the time between let slotSize = slotSize(blocks, ecNodes, ecTolerance) let pricePerSlotPerSecond = minPricePerBytePerSecond * slotSize - check eventually (await token.balanceOf(hostAccount)) - startBalanceHost >= + check (await token.balanceOf(hostAccount)) - startBalanceHost >= (duration - 5 * 60).u256 * pricePerSlotPerSecond * ecNodes.u256 # Checking that client node receives some funds back that were not used for the host nodes - check eventually( - (await token.balanceOf(clientAccount)) - clientBalanceBeforeFinished > 0, - timeout = 10 * 1000, # give client a bit of time to withdraw its funds - ) + check ((await token.balanceOf(clientAccount)) - clientBalanceBeforeFinished > 0) + + await tokenSubscription.unsubscribe() test "SP are able to process slots after workers were busy with other slots and ignored them", NodeConfigs( @@ -286,6 +297,18 @@ marketplacesuite(name = "Marketplace payouts", stopOnRequestFail = true): check eventually(slotIdxFilled.isSome, timeout = expiry.int * 1000) let slotId = slotId(!(await clientApi.requestId(id)), !slotIdxFilled) + var counter = 0 + var transferEvent = newAsyncEvent() + proc onTransfer(eventResult: ?!Transfer) = + assert not eventResult.isErr + counter += 1 + if counter == 3: + transferEvent.fire() + + let tokenAddress = await marketplace.token() + let token = Erc20Token.new(tokenAddress, ethProvider.getSigner()) + let tokenSubscription = await token.subscribe(Transfer, onTransfer) + # wait until sale is cancelled await ethProvider.advanceTime(expiry.u256) @@ -293,26 +316,28 @@ marketplacesuite(name = "Marketplace payouts", stopOnRequestFail = true): await advanceToNextPeriod() + await transferEvent.wait().wait(timeout = chronos.seconds(60)) + let slotSize = slotSize(blocks, ecNodes, ecTolerance) let pricePerSlotPerSecond = minPricePerBytePerSecond * slotSize - check eventually ( + check ( let endBalanceProvider = (await token.balanceOf(provider.ethAccount)) endBalanceProvider > startBalanceProvider and endBalanceProvider < startBalanceProvider + expiry.u256 * pricePerSlotPerSecond ) - check eventually( + check( ( let endBalanceClient = (await token.balanceOf(client.ethAccount)) let endBalanceProvider = (await token.balanceOf(provider.ethAccount)) (startBalanceClient - endBalanceClient) == (endBalanceProvider - startBalanceProvider) - ), - timeout = 10 * 1000, # give client a bit of time to withdraw its funds + ) ) await slotFilledSubscription.unsubscribe() await requestCancelledSubscription.unsubscribe() + await tokenSubscription.unsubscribe() test "the collateral is returned after a sale is ignored", NodeConfigs( @@ -386,12 +411,15 @@ marketplacesuite(name = "Marketplace payouts", stopOnRequestFail = true): let client = provider.client check eventually( block: - let availabilities = (await client.getAvailabilities()).get - let availability = availabilities[0] - let slots = (await client.getSlots()).get - let availableSlots = (3 - slots.len).u256 - - availability.totalRemainingCollateral == - availableSlots * slotSize * minPricePerBytePerSecond, + try: + let availabilities = (await client.getAvailabilities()).get + let availability = availabilities[0] + let slots = (await client.getSlots()).get + let availableSlots = (3 - slots.len).u256 + + availability.totalRemainingCollateral == + availableSlots * slotSize * minPricePerBytePerSecond + except HttpConnectionError: + return false, timeout = 30 * 1000, ) From bf5ae2eb09533d7b84d6c7b87e53cd8769a0890e Mon Sep 17 00:00:00 2001 From: Arnaud Date: Wed, 25 Jun 2025 15:00:42 +0200 Subject: [PATCH 02/12] Enable logs because the test seems to fail occasionally. --- .../integration/30_minutes/testslotrepair.nim | 19 ++++++++++--------- 1 file changed, 10 insertions(+), 9 deletions(-) diff --git a/tests/integration/30_minutes/testslotrepair.nim b/tests/integration/30_minutes/testslotrepair.nim index f7d8dba7b..2586c1f5d 100644 --- a/tests/integration/30_minutes/testslotrepair.nim +++ b/tests/integration/30_minutes/testslotrepair.nim @@ -162,15 +162,16 @@ marketplacesuite(name = "SP Slot Repair", stopOnRequestFail = true): test "repair from local and remote store", NodeConfigs( - clients: CodexConfigs.init(nodes = 1) - # .debug() - # .withLogTopics("node", "erasure") - .some, - providers: CodexConfigs.init(nodes = 3) - # .debug() - # .withLogFile() - # .withLogTopics("marketplace", "sales", "statemachine", "reservations") - .some, + clients: CodexConfigs + .init(nodes = 1) + # .debug() + .withLogFile() + .withLogTopics("node", "erasure").some, + providers: CodexConfigs + .init(nodes = 3) + # .debug() + .withLogFile() + .withLogTopics("marketplace", "sales", "statemachine", "reservations").some, ): let client0 = clients()[0] let provider0 = providers()[0] From 3d7a4b790ad4be008bd58cecfe8508b469081cf4 Mon Sep 17 00:00:00 2001 From: Arnaud Date: Tue, 1 Jul 2025 09:58:44 +0200 Subject: [PATCH 03/12] Check exact balances using ERC20 transfer subscription --- .../30_minutes/testmarketplace.nim | 88 ++++++++++++++----- 1 file changed, 65 insertions(+), 23 deletions(-) diff --git a/tests/integration/30_minutes/testmarketplace.nim b/tests/integration/30_minutes/testmarketplace.nim index 5a36793ea..9569e3bb9 100644 --- a/tests/integration/30_minutes/testmarketplace.nim +++ b/tests/integration/30_minutes/testmarketplace.nim @@ -85,6 +85,7 @@ marketplacesuite(name = "Marketplace", stopOnRequestFail = true): # host makes storage available let startBalanceHost = await token.balanceOf(hostAccount) + let startBalanceClient = await token.balanceOf(clientAccount) discard ( await host.postAvailability( totalSize = size, @@ -94,8 +95,65 @@ marketplacesuite(name = "Marketplace", stopOnRequestFail = true): ) ).get + let slotSize = slotSize(blocks, ecNodes, ecTolerance) + + var filledAtPerSlot: seq[UInt256] = @[] + + proc storeFilledAtTimestamps() {.async.} = + let filledAt = await ethProvider.blockTime(BlockTag.latest) + filledAtPerSlot.add(filledAt) + + proc onSlotFilled(eventResult: ?!SlotFilled) = + assert not eventResult.isErr + let event = !eventResult + asyncSpawn storeFilledAtTimestamps() + + let filledSubscription = await marketplace.subscribe(SlotFilled, onSlotFilled) + # client requests storage let cid = (await client.upload(data)).get + + let pricePerSlotPerSecond = minPricePerBytePerSecond * slotSize + var requestEnd = 0.SecondsSince1970 + var hostRewardEvent = newAsyncEvent() + + proc checkHostRewards() {.async.} = + var rewards = 0.u256 + + for filledAt in filledAtPerSlot: + rewards += (requestEnd.u256 - filledAt) * pricePerSlotPerSecond + + let endBalanceHost = await token.balanceOf(hostAccount) + + if rewards + startBalanceHost == endBalanceHost: + hostRewardEvent.fire() + + var clientFundsEvent = newAsyncEvent() + + proc checkHostFunds() {.async.} = + var hostRewards = 0.u256 + + for filledAt in filledAtPerSlot: + hostRewards += (requestEnd.u256 - filledAt) * pricePerSlotPerSecond + + let requestPrice = minPricePerBytePerSecond * slotSize * duration.u256 * 3 + let fundsBackToClient = requestPrice - hostRewards + let endBalanceClient = await token.balanceOf(clientAccount) + + if startBalanceClient + fundsBackToClient - requestPrice == endBalanceClient: + clientFundsEvent.fire() + + var transferEvent = newAsyncEvent() + + proc onTransfer(eventResult: ?!Transfer) = + assert not eventResult.isErr + + let data = eventResult.get() + if data.receiver == hostAccount: + asyncSpawn checkHostRewards() + if data.receiver == clientAccount: + asyncSpawn checkHostFunds() + let id = await client.requestStorage( cid, duration = duration, @@ -107,40 +165,24 @@ marketplacesuite(name = "Marketplace", stopOnRequestFail = true): tolerance = ecTolerance, ) - discard await waitForRequestToStart() - - var counter = 0 - var transferEvent = newAsyncEvent() - proc onTransfer(eventResult: ?!Transfer) = - assert not eventResult.isErr - counter += 1 - if counter == 6: - transferEvent.fire() - - let tokenSubscription = await token.subscribe(Transfer, onTransfer) - - let purchase = (await client.getPurchase(id)).get - check purchase.error == none string + let requestId = (await client.requestId(id)).get - let clientBalanceBeforeFinished = await token.balanceOf(clientAccount) + discard await waitForRequestToStart() # Proving mechanism uses blockchain clock to do proving/collect/cleanup round # hence we must use `advanceTime` over `sleepAsync` as Hardhat does mine new blocks # only with new transaction await ethProvider.advanceTime(duration.u256) - await transferEvent.wait().wait(timeout = chronos.seconds(60)) + requestEnd = await marketplace.requestEnd(requestId) - # Checking that the hosting node received reward for at least the time between - let slotSize = slotSize(blocks, ecNodes, ecTolerance) - let pricePerSlotPerSecond = minPricePerBytePerSecond * slotSize - check (await token.balanceOf(hostAccount)) - startBalanceHost >= - (duration - 5 * 60).u256 * pricePerSlotPerSecond * ecNodes.u256 + let tokenSubscription = await token.subscribe(Transfer, onTransfer) - # Checking that client node receives some funds back that were not used for the host nodes - check ((await token.balanceOf(clientAccount)) - clientBalanceBeforeFinished > 0) + await clientFundsEvent.wait().wait(timeout = chronos.seconds(60)) + await hostRewardEvent.wait().wait(timeout = chronos.seconds(60)) await tokenSubscription.unsubscribe() + await filledSubscription.unsubscribe() test "SP are able to process slots after workers were busy with other slots and ignored them", NodeConfigs( From f3311cca30b5be3b10038847616a7d25b6968ae4 Mon Sep 17 00:00:00 2001 From: Arnaud Date: Wed, 2 Jul 2025 15:26:09 +0200 Subject: [PATCH 04/12] Fail test immediately on first check failure --- tests/integration/1_minute/testecbug.nim | 4 +--- .../30_minutes/testmarketplace.nim | 16 ++++++++------- tests/integration/30_minutes/testproofs.nim | 4 ++-- .../integration/30_minutes/testslotrepair.nim | 16 +++++++-------- .../integration/30_minutes/testvalidator.nim | 2 +- tests/integration/5_minutes/testsales.nim | 9 ++++----- tests/integration/marketplacesuite.nim | 20 +++++++++++++++---- tests/integration/multinodes.nim | 13 +++++++----- 8 files changed, 49 insertions(+), 35 deletions(-) diff --git a/tests/integration/1_minute/testecbug.nim b/tests/integration/1_minute/testecbug.nim index a5bfa832e..4136adbc3 100644 --- a/tests/integration/1_minute/testecbug.nim +++ b/tests/integration/1_minute/testecbug.nim @@ -4,9 +4,7 @@ import ../marketplacesuite import ../nodeconfigs import ../hardhatconfig -marketplacesuite( - name = "Bug #821 - node crashes during erasure coding", stopOnRequestFail = true -): +marketplacesuite(name = "Bug #821 - node crashes during erasure coding"): test "should be able to create storage request and download dataset", NodeConfigs( clients: CodexConfigs diff --git a/tests/integration/30_minutes/testmarketplace.nim b/tests/integration/30_minutes/testmarketplace.nim index 9569e3bb9..0639fed36 100644 --- a/tests/integration/30_minutes/testmarketplace.nim +++ b/tests/integration/30_minutes/testmarketplace.nim @@ -7,7 +7,7 @@ import ./../marketplacesuite import ../twonodes import ../nodeconfigs -marketplacesuite(name = "Marketplace", stopOnRequestFail = true): +marketplacesuite(name = "Marketplace"): let marketplaceConfig = NodeConfigs( clients: CodexConfigs.init(nodes = 1).some, providers: CodexConfigs.init(nodes = 1).some, @@ -259,7 +259,7 @@ marketplacesuite(name = "Marketplace", stopOnRequestFail = true): # Double check, verify that our second SP hosts the 3 slots check ((await provider1.client.getSlots()).get).len == 3 -marketplacesuite(name = "Marketplace payouts", stopOnRequestFail = true): +marketplacesuite(name = "Marketplace payouts"): const minPricePerBytePerSecond = 1.u256 const collateralPerByte = 1.u256 const blocks = 8 @@ -363,17 +363,19 @@ marketplacesuite(name = "Marketplace payouts", stopOnRequestFail = true): let slotSize = slotSize(blocks, ecNodes, ecTolerance) let pricePerSlotPerSecond = minPricePerBytePerSecond * slotSize + let endBalanceProvider = (await token.balanceOf(provider.ethAccount)) + check ( - let endBalanceProvider = (await token.balanceOf(provider.ethAccount)) endBalanceProvider > startBalanceProvider and - endBalanceProvider < startBalanceProvider + expiry.u256 * pricePerSlotPerSecond + endBalanceProvider < startBalanceProvider + expiry.u256 * pricePerSlotPerSecond ) + + let endBalanceClient = (await token.balanceOf(client.ethAccount)) + check( ( - let endBalanceClient = (await token.balanceOf(client.ethAccount)) - let endBalanceProvider = (await token.balanceOf(provider.ethAccount)) (startBalanceClient - endBalanceClient) == - (endBalanceProvider - startBalanceProvider) + (endBalanceProvider - startBalanceProvider) ) ) diff --git a/tests/integration/30_minutes/testproofs.nim b/tests/integration/30_minutes/testproofs.nim index b06e4d824..46829c967 100644 --- a/tests/integration/30_minutes/testproofs.nim +++ b/tests/integration/30_minutes/testproofs.nim @@ -13,7 +13,7 @@ export logutils logScope: topics = "integration test proofs" -marketplacesuite(name = "Hosts submit regular proofs", stopOnRequestFail = false): +marketplacesuite(name = "Hosts submit regular proofs"): const minPricePerBytePerSecond = 1.u256 const collateralPerByte = 1.u256 const blocks = 8 @@ -76,7 +76,7 @@ marketplacesuite(name = "Hosts submit regular proofs", stopOnRequestFail = false await subscription.unsubscribe() -marketplacesuite(name = "Simulate invalid proofs", stopOnRequestFail = false): +marketplacesuite(name = "Simulate invalid proofs"): # TODO: these are very loose tests in that they are not testing EXACTLY how # proofs were marked as missed by the validator. These tests should be # tightened so that they are showing, as an integration test, that specific diff --git a/tests/integration/30_minutes/testslotrepair.nim b/tests/integration/30_minutes/testslotrepair.nim index 2586c1f5d..070843388 100644 --- a/tests/integration/30_minutes/testslotrepair.nim +++ b/tests/integration/30_minutes/testslotrepair.nim @@ -12,7 +12,7 @@ export logutils logScope: topics = "integration test slot repair" -marketplacesuite(name = "SP Slot Repair", stopOnRequestFail = true): +marketplacesuite(name = "SP Slot Repair"): const minPricePerBytePerSecond = 1.u256 const collateralPerByte = 1.u256 const blocks = 3 @@ -153,9 +153,9 @@ marketplacesuite(name = "SP Slot Repair", stopOnRequestFail = true): # We expect that the freed slot is added in the filled slot id list, # meaning that the slot was repaired locally by SP 1. - check eventually( - freedSlotId.get in filledSlotIds, timeout = (duration - expiry).int * 1000 - ) + # check eventually( + # freedSlotId.get in filledSlotIds, timeout = (duration - expiry).int * 1000 + # ) await filledSubscription.unsubscribe() await slotFreedsubscription.unsubscribe() @@ -232,8 +232,8 @@ marketplacesuite(name = "SP Slot Repair", stopOnRequestFail = true): # We expect that the freed slot is added in the filled slot id list, # meaning that the slot was repaired locally and remotely (using SP 3) by SP 1. - check eventually(freedSlotId.isSome, timeout = expiry.int * 1000) - check eventually(freedSlotId.get in filledSlotIds, timeout = expiry.int * 1000) + # check eventually(freedSlotId.isSome, timeout = expiry.int * 1000) + # check eventually(freedSlotId.get in filledSlotIds, timeout = expiry.int * 1000) await filledSubscription.unsubscribe() await slotFreedsubscription.unsubscribe() @@ -303,8 +303,8 @@ marketplacesuite(name = "SP Slot Repair", stopOnRequestFail = true): await freeSlot(provider1.client) # At this point, SP 3 should repair the slot from SP 1 and host it. - check eventually(freedSlotId.isSome, timeout = expiry.int * 1000) - check eventually(freedSlotId.get in filledSlotIds, timeout = expiry.int * 1000) + # check eventually(freedSlotId.isSome, timeout = expiry.int * 1000) + # check eventually(freedSlotId.get in filledSlotIds, timeout = expiry.int * 1000) await filledSubscription.unsubscribe() await slotFreedsubscription.unsubscribe() diff --git a/tests/integration/30_minutes/testvalidator.nim b/tests/integration/30_minutes/testvalidator.nim index ed67b5d07..247733989 100644 --- a/tests/integration/30_minutes/testvalidator.nim +++ b/tests/integration/30_minutes/testvalidator.nim @@ -15,7 +15,7 @@ export logutils logScope: topics = "integration test validation" -marketplacesuite(name = "Validation", stopOnRequestFail = false): +marketplacesuite(name = "Validation"): const blocks = 8 const ecNodes = 3 const ecTolerance = 1 diff --git a/tests/integration/5_minutes/testsales.nim b/tests/integration/5_minutes/testsales.nim index 246d8fc7d..6de522022 100644 --- a/tests/integration/5_minutes/testsales.nim +++ b/tests/integration/5_minutes/testsales.nim @@ -17,7 +17,7 @@ proc findItem[T](items: seq[T], item: T): ?!T = return failure("Not found") -marketplacesuite(name = "Sales", stopOnRequestFail = true): +marketplacesuite(name = "Sales"): let salesConfig = NodeConfigs( clients: CodexConfigs.init(nodes = 1).some, providers: CodexConfigs.init(nodes = 1) @@ -227,7 +227,6 @@ marketplacesuite(name = "Sales", stopOnRequestFail = true): availabilityId = availability.id, until = until.some ) - check: - response.status == 422 - (await response.body) == - "Until parameter must be greater or equal to the longest currently hosted slot" + check response.status == 422 + check (await response.body) == + "Until parameter must be greater or equal to the longest currently hosted slot" diff --git a/tests/integration/marketplacesuite.nim b/tests/integration/marketplacesuite.nim index 5a0a11a64..de59e866b 100644 --- a/tests/integration/marketplacesuite.nim +++ b/tests/integration/marketplacesuite.nim @@ -1,3 +1,7 @@ +import macros +import std/strutils +import std/unittest + import pkg/chronos import pkg/ethers/erc20 from pkg/libp2p import Cid @@ -12,7 +16,7 @@ import ../contracts/deployment export mp export multinodes -template marketplacesuite*(name: string, stopOnRequestFail: bool, body: untyped) = +template marketplacesuite*(name: string, body: untyped) = multinodesuite name: var marketplace {.inject, used.}: Marketplace var period: uint64 @@ -23,20 +27,27 @@ template marketplacesuite*(name: string, stopOnRequestFail: bool, body: untyped) var requestFailedEvent: AsyncEvent var requestFailedSubscription: Subscription + template fail(reason: string) = + raise newException(TestFailedError, reason) + + proc check(cond: bool, reason = "Check failed"): void = + if not cond: + fail(reason) + proc onRequestStarted(eventResult: ?!RequestFulfilled) {.raises: [].} = requestStartedEvent.fire() proc onRequestFailed(eventResult: ?!RequestFailed) {.raises: [].} = requestFailedEvent.fire() - if stopOnRequestFail: - fail() proc getCurrentPeriod(): Future[Period] {.async.} = return periodicity.periodOf((await ethProvider.currentTime()).truncate(uint64)) proc waitForRequestToStart( seconds = 10 * 60 + 10 - ): Future[Period] {.async: (raises: [CancelledError, AsyncTimeoutError]).} = + ): Future[Period] {. + async: (raises: [CancelledError, AsyncTimeoutError, TestFailedError]) + .} = await requestStartedEvent.wait().wait(timeout = chronos.seconds(seconds)) # Recreate a new future if we need to wait for another request requestStartedEvent = newAsyncEvent() @@ -53,6 +64,7 @@ template marketplacesuite*(name: string, stopOnRequestFail: bool, body: untyped) let currentTime = (await ethProvider.currentTime()).truncate(uint64) let currentPeriod = periodicity.periodOf(currentTime) let endOfPeriod = periodicity.periodEnd(currentPeriod) + await ethProvider.advanceTimeTo(endOfPeriod.u256 + 1) template eventuallyP(condition: untyped, finalPeriod: Period): bool = diff --git a/tests/integration/multinodes.nim b/tests/integration/multinodes.nim index 42fff1576..51c5ccb41 100644 --- a/tests/integration/multinodes.nim +++ b/tests/integration/multinodes.nim @@ -36,6 +36,7 @@ type Hardhat MultiNodeSuiteError = object of CatchableError + TestFailedError* = object of CatchableError const jsonRpcProviderUrl* = "ws://localhost:8545" @@ -106,7 +107,7 @@ template multinodesuite*(name: string, body: untyped) = currentTestName = tname nodeConfigs = startNodeConfigs test tname: - tbody + failAndTeardownOnError("test failed", tbody) proc sanitize(pathSegment: string): string = var sanitized = pathSegment @@ -276,13 +277,15 @@ template multinodesuite*(name: string, body: untyped) = try: tryBody except CatchableError as er: - fatal message, error = er.msg - echo "[FATAL] ", message, ": ", er.msg + if er of TestFailedError: + info "[FAILED] ", reason = er.msg + else: + fatal message, error = er.msg + echo "[FATAL] ", message, ": ", er.msg await teardownImpl() when declared(teardownAllIMPL): teardownAllIMPL() - fail() - quit(1) + raise er proc updateBootstrapNodes( node: CodexProcess From 69b0bf3b8ad8c02fd5b3dd87e75b7632df3747c1 Mon Sep 17 00:00:00 2001 From: Arnaud Date: Wed, 2 Jul 2025 16:43:08 +0200 Subject: [PATCH 05/12] Add a template to stop the test when a storage request fails --- .../30_minutes/testmarketplace.nim | 41 ++++++++++--------- tests/integration/marketplacesuite.nim | 30 ++++++++++++++ 2 files changed, 51 insertions(+), 20 deletions(-) diff --git a/tests/integration/30_minutes/testmarketplace.nim b/tests/integration/30_minutes/testmarketplace.nim index 0639fed36..b4d551405 100644 --- a/tests/integration/30_minutes/testmarketplace.nim +++ b/tests/integration/30_minutes/testmarketplace.nim @@ -447,23 +447,24 @@ marketplacesuite(name = "Marketplace payouts"): discard await waitForRequestToStart() - # Here we will check that for each provider, the total remaining collateral - # will match the available slots. - # So if a SP hosts 1 slot, it should have enough total remaining collateral - # to host 2 more slots. - for provider in providers(): - let client = provider.client - check eventually( - block: - try: - let availabilities = (await client.getAvailabilities()).get - let availability = availabilities[0] - let slots = (await client.getSlots()).get - let availableSlots = (3 - slots.len).u256 - - availability.totalRemainingCollateral == - availableSlots * slotSize * minPricePerBytePerSecond - except HttpConnectionError: - return false, - timeout = 30 * 1000, - ) + stopOnRequestFailed: + # Here we will check that for each provider, the total remaining collateral + # will match the available slots. + # So if a SP hosts 1 slot, it should have enough total remaining collateral + # to host 2 more slots. + for provider in providers(): + let client = provider.client + check eventually( + block: + try: + let availabilities = (await client.getAvailabilities()).get + let availability = availabilities[0] + let slots = (await client.getSlots()).get + let availableSlots = (3 - slots.len).u256 + + availability.totalRemainingCollateral == + availableSlots * slotSize * minPricePerBytePerSecond + except HttpConnectionError: + return false, + timeout = 30 * 1000, + ) diff --git a/tests/integration/marketplacesuite.nim b/tests/integration/marketplacesuite.nim index de59e866b..dbb77540f 100644 --- a/tests/integration/marketplacesuite.nim +++ b/tests/integration/marketplacesuite.nim @@ -34,6 +34,36 @@ template marketplacesuite*(name: string, body: untyped) = if not cond: fail(reason) + template stopOnRequestFailed(tbody: untyped) = + let completed = newAsyncEvent() + + let mainFut = ( + proc(): Future[void] {.async.} = + tbody + completed.fire() + )() + + let fastFailFut = ( + proc(): Future[void] {.async.} = + try: + await requestFailedEvent.wait().wait(timeout = chronos.seconds(60)) + completed.fire() + raise newException(TestFailedError, "storage request has failed") + except AsyncTimeoutError: + discard + )() + + await completed.wait().wait(timeout = chronos.seconds(60 * 30)) + + if not fastFailFut.completed: + await fastFailFut.cancelAndWait() + + if mainFut.failed: + raise mainFut.error + + if fastFailFut.failed: + raise fastFailFut.error + proc onRequestStarted(eventResult: ?!RequestFulfilled) {.raises: [].} = requestStartedEvent.fire() From ff0de4d3e0155857df9505159319d57aeca09aa2 Mon Sep 17 00:00:00 2001 From: Arnaud Date: Wed, 2 Jul 2025 19:05:33 +0200 Subject: [PATCH 06/12] Verify that the provider is hosting the request by checking the slots from the smart contracts instead of looking into the reservations --- tests/integration/30_minutes/testmarketplace.nim | 16 +++++++++++++--- 1 file changed, 13 insertions(+), 3 deletions(-) diff --git a/tests/integration/30_minutes/testmarketplace.nim b/tests/integration/30_minutes/testmarketplace.nim index b4d551405..4084e952b 100644 --- a/tests/integration/30_minutes/testmarketplace.nim +++ b/tests/integration/30_minutes/testmarketplace.nim @@ -65,14 +65,24 @@ marketplacesuite(name = "Marketplace"): let purchase = (await client.getPurchase(id)).get check purchase.error == none string + + let state = await marketplace.requestState(purchase.requestId) + check state == RequestState.Started + let availabilities = (await host.getAvailabilities()).get check availabilities.len == 1 + let newSize = availabilities[0].freeSize check newSize > 0 and newSize < size - let reservations = (await host.getAvailabilityReservations(availability.id)).get - check reservations.len == 3 - check reservations[0].requestId == purchase.requestId + let signer = ethProvider.getSigner(hostAccount) + let marketplaceWithProviderSigner = marketplace.connect(signer) + let slots = await marketplaceWithProviderSigner.mySlots() + check slots.len == 3 + + for slotId in slots: + let slot = await marketplaceWithProviderSigner.getActiveSlot(slotId) + check slot.request.id == purchase.requestId test "node slots gets paid out and rest of tokens are returned to client", marketplaceConfig: From 8f415be2c36675fbb4e57bb0bbb19210678afcb5 Mon Sep 17 00:00:00 2001 From: Arnaud Date: Thu, 3 Jul 2025 18:56:51 +0200 Subject: [PATCH 07/12] Check provider precise balance --- .../30_minutes/testmarketplace.nim | 125 +++++++++++------- 1 file changed, 76 insertions(+), 49 deletions(-) diff --git a/tests/integration/30_minutes/testmarketplace.nim b/tests/integration/30_minutes/testmarketplace.nim index 4084e952b..4250af489 100644 --- a/tests/integration/30_minutes/testmarketplace.nim +++ b/tests/integration/30_minutes/testmarketplace.nim @@ -1,5 +1,4 @@ import std/times -import std/httpclient import ../../examples import ../../contracts/time import ../../contracts/deployment @@ -72,8 +71,13 @@ marketplacesuite(name = "Marketplace"): let availabilities = (await host.getAvailabilities()).get check availabilities.len == 1 + let datasetSize = datasetSize(blocks, ecNodes, ecTolerance) let newSize = availabilities[0].freeSize - check newSize > 0 and newSize < size + check newSize > 0 and newSize + datasetSize == size + + let reservations = (await host.getAvailabilityReservations(availability.id)).get + check reservations.len == 3 + check reservations[0].requestId == purchase.requestId let signer = ethProvider.getSigner(hostAccount) let marketplaceWithProviderSigner = marketplace.connect(signer) @@ -124,19 +128,17 @@ marketplacesuite(name = "Marketplace"): let cid = (await client.upload(data)).get let pricePerSlotPerSecond = minPricePerBytePerSecond * slotSize - var requestEnd = 0.SecondsSince1970 - var hostRewardEvent = newAsyncEvent() - - proc checkHostRewards() {.async.} = - var rewards = 0.u256 - - for filledAt in filledAtPerSlot: - rewards += (requestEnd.u256 - filledAt) * pricePerSlotPerSecond + var providerRewardEvent = newAsyncEvent() + proc checkProviderRewards() {.async.} = let endBalanceHost = await token.balanceOf(hostAccount) + let requestEnd = await marketplace.requestEnd(requestId) + let rewards = filledAtPerSlot + .mapIt((requestEnd.u256 - it) * pricePerSlotPerSecond) + .foldl(a + b, 0.u256) if rewards + startBalanceHost == endBalanceHost: - hostRewardEvent.fire() + providerRewardEvent.fire() var clientFundsEvent = newAsyncEvent() @@ -160,7 +162,7 @@ marketplacesuite(name = "Marketplace"): let data = eventResult.get() if data.receiver == hostAccount: - asyncSpawn checkHostRewards() + asyncSpawn checkProviderRewards() if data.receiver == clientAccount: asyncSpawn checkHostFunds() @@ -179,20 +181,19 @@ marketplacesuite(name = "Marketplace"): discard await waitForRequestToStart() - # Proving mechanism uses blockchain clock to do proving/collect/cleanup round - # hence we must use `advanceTime` over `sleepAsync` as Hardhat does mine new blocks - # only with new transaction - await ethProvider.advanceTime(duration.u256) - - requestEnd = await marketplace.requestEnd(requestId) + stopOnRequestFailed: + # Proving mechanism uses blockchain clock to do proving/collect/cleanup round + # hence we must use `advanceTime` over `sleepAsync` as Hardhat does mine new blocks + # only with new transaction + await ethProvider.advanceTime(duration.u256) - let tokenSubscription = await token.subscribe(Transfer, onTransfer) + let tokenSubscription = await token.subscribe(Transfer, onTransfer) - await clientFundsEvent.wait().wait(timeout = chronos.seconds(60)) - await hostRewardEvent.wait().wait(timeout = chronos.seconds(60)) + await clientFundsEvent.wait().wait(timeout = chronos.seconds(60)) + await providerRewardEvent.wait().wait(timeout = chronos.seconds(60)) - await tokenSubscription.unsubscribe() - await filledSubscription.unsubscribe() + await tokenSubscription.unsubscribe() + await filledSubscription.unsubscribe() test "SP are able to process slots after workers were busy with other slots and ignored them", NodeConfigs( @@ -267,7 +268,14 @@ marketplacesuite(name = "Marketplace"): discard await waitForRequestToStart() # Double check, verify that our second SP hosts the 3 slots - check ((await provider1.client.getSlots()).get).len == 3 + let signer = ethProvider.getSigner(hostAccount) + let marketplaceWithProviderSigner = marketplace.connect(signer) + let slots = await marketplaceWithProviderSigner.mySlots() + check slots.len == 3 + + for slotId in slots: + let slot = await marketplaceWithProviderSigner.getActiveSlot(slotId) + check slot.request.id == purchase.requestId marketplacesuite(name = "Marketplace payouts"): const minPricePerBytePerSecond = 1.u256 @@ -304,6 +312,8 @@ marketplacesuite(name = "Marketplace payouts"): let providerApi = provider.client let startBalanceProvider = await token.balanceOf(provider.ethAccount) let startBalanceClient = await token.balanceOf(client.ethAccount) + let hostAccount = providers()[0].ethAccount + let clientAccount = clients()[0].ethAccount # provider makes storage available let datasetSize = datasetSize(blocks, ecNodes, ecTolerance) @@ -319,10 +329,20 @@ marketplacesuite(name = "Marketplace payouts"): let cid = (await clientApi.upload(data)).get - var slotIdxFilled = none uint64 + var filledAtPerSlot: seq[UInt256] = @[] + var slotIndex = 0.uint64 + var slotFilledEvent = newAsyncEvent() + + proc storeFilledAtTimestamps() {.async.} = + let filledAt = await ethProvider.blockTime(BlockTag.latest) + filledAtPerSlot.add(filledAt) + slotFilledEvent.fire() + proc onSlotFilled(eventResult: ?!SlotFilled) = assert not eventResult.isErr - slotIdxFilled = some (!eventResult).slotIndex + let event = !eventResult + slotIndex = event.slotIndex + asyncSpawn storeFilledAtTimestamps() let slotFilledSubscription = await marketplace.subscribe(SlotFilled, onSlotFilled) @@ -344,18 +364,32 @@ marketplacesuite(name = "Marketplace payouts"): nodes = ecNodes, tolerance = ecTolerance, ) + let requestId = (await clientApi.requestId(id)).get # wait until one slot is filled - check eventually(slotIdxFilled.isSome, timeout = expiry.int * 1000) - let slotId = slotId(!(await clientApi.requestId(id)), !slotIdxFilled) + await slotFilledEvent.wait().wait(timeout = chronos.seconds(expiry.int)) + let slotId = slotId(!(await clientApi.requestId(id)), slotIndex) + let slotSize = slotSize(blocks, ecNodes, ecTolerance) + + let pricePerSlotPerSecond = minPricePerBytePerSecond * slotSize + var providerRewardEvent = newAsyncEvent() + + proc checkProviderRewards() {.async.} = + let requestEnd = await marketplace.requestEnd(requestId) + let rewards = filledAtPerSlot + .mapIt((requestEnd.u256 - it) * pricePerSlotPerSecond) + .foldl(a + b, 0.u256) + let endBalanceHost = await token.balanceOf(hostAccount) + + if rewards + startBalanceProvider == endBalanceHost: + providerRewardEvent.fire() - var counter = 0 - var transferEvent = newAsyncEvent() proc onTransfer(eventResult: ?!Transfer) = assert not eventResult.isErr - counter += 1 - if counter == 3: - transferEvent.fire() + + let data = eventResult.get() + if data.receiver == hostAccount: + asyncSpawn checkProviderRewards() let tokenAddress = await marketplace.token() let token = Erc20Token.new(tokenAddress, ethProvider.getSigner()) @@ -363,30 +397,23 @@ marketplacesuite(name = "Marketplace payouts"): # wait until sale is cancelled await ethProvider.advanceTime(expiry.u256) - await requestCancelledEvent.wait().wait(timeout = chronos.seconds(5)) - await advanceToNextPeriod() - await transferEvent.wait().wait(timeout = chronos.seconds(60)) + await providerRewardEvent.wait().wait(timeout = chronos.seconds(60)) - let slotSize = slotSize(blocks, ecNodes, ecTolerance) - let pricePerSlotPerSecond = minPricePerBytePerSecond * slotSize + # Ensure that total rewards stay within the payout limit + # determined by the expiry date. + let requestEnd = await marketplace.requestEnd(requestId) + let rewards = filledAtPerSlot + .mapIt((requestEnd.u256 - it) * pricePerSlotPerSecond) + .foldl(a + b, 0.u256) + check expiry.u256 * pricePerSlotPerSecond >= rewards let endBalanceProvider = (await token.balanceOf(provider.ethAccount)) - - check ( - endBalanceProvider > startBalanceProvider and - endBalanceProvider < startBalanceProvider + expiry.u256 * pricePerSlotPerSecond - ) - let endBalanceClient = (await token.balanceOf(client.ethAccount)) - check( - ( - (startBalanceClient - endBalanceClient) == - (endBalanceProvider - startBalanceProvider) - ) + startBalanceClient - endBalanceClient == endBalanceProvider - startBalanceProvider ) await slotFilledSubscription.unsubscribe() From 5f8a6a9817a0409bc31917b11a0bdf3dde1d4086 Mon Sep 17 00:00:00 2001 From: Arnaud Date: Mon, 7 Jul 2025 10:39:27 +0200 Subject: [PATCH 08/12] Refactor and adjust test timeout durations --- tests/integration/1_minute/testecbug.nim | 33 +- .../30_minutes/testmarketplace.nim | 368 +++++++----------- tests/integration/30_minutes/testproofs.nim | 131 +++---- .../integration/30_minutes/testslotrepair.nim | 123 +++--- .../integration/30_minutes/testvalidator.nim | 49 +-- tests/integration/5_minutes/testsales.nim | 42 +- tests/integration/marketplacesuite.nim | 160 +++++--- tests/integration/multinodes.nim | 34 +- 8 files changed, 402 insertions(+), 538 deletions(-) diff --git a/tests/integration/1_minute/testecbug.nim b/tests/integration/1_minute/testecbug.nim index 4136adbc3..d1980c5a5 100644 --- a/tests/integration/1_minute/testecbug.nim +++ b/tests/integration/1_minute/testecbug.nim @@ -16,42 +16,29 @@ marketplacesuite(name = "Bug #821 - node crashes during erasure coding"): providers: CodexConfigs.init(nodes = 0).some, ): let - pricePerBytePerSecond = 1.u256 duration = 20.periods - collateralPerByte = 1.u256 expiry = 10.periods - data = await RandomChunker.example(blocks = 8) client = clients()[0] clientApi = client.client + data = await RandomChunker.example(blocks = 8) + + let (purchaseId, requestId) = await requestStorage( + clientApi, duration = duration, expiry = expiry, data = data.some + ) - let cid = (await clientApi.upload(data)).get + let storageRequestedEvent = newAsyncEvent() - var requestId = none RequestId proc onStorageRequested(eventResult: ?!StorageRequested) = assert not eventResult.isErr - requestId = some (!eventResult).requestId + storageRequestedEvent.fire() - let subscription = await marketplace.subscribe(StorageRequested, onStorageRequested) - - # client requests storage but requires multiple slots to host the content - let id = await clientApi.requestStorage( - cid, - duration = duration, - pricePerBytePerSecond = pricePerBytePerSecond, - expiry = expiry, - collateralPerByte = collateralPerByte, - nodes = 3, - tolerance = 1, - ) - - check eventually(requestId.isSome, timeout = expiry.int * 1000) + await marketplaceSubscribe(StorageRequested, onStorageRequested) + await storageRequestedEvent.wait().wait(timeout = chronos.seconds(expiry.int64)) let - request = await marketplace.getRequest(requestId.get) + request = await marketplace.getRequest(requestId) cidFromRequest = request.content.cid downloaded = await clientApi.downloadBytes(cidFromRequest, local = true) check downloaded.isOk check downloaded.get.toHex == data.toHex - - await subscription.unsubscribe() diff --git a/tests/integration/30_minutes/testmarketplace.nim b/tests/integration/30_minutes/testmarketplace.nim index 4250af489..235abcddf 100644 --- a/tests/integration/30_minutes/testmarketplace.nim +++ b/tests/integration/30_minutes/testmarketplace.nim @@ -2,9 +2,10 @@ import std/times import ../../examples import ../../contracts/time import ../../contracts/deployment -import ./../marketplacesuite -import ../twonodes +import ./../marketplacesuite except Subscription +import ../twonodes except Subscription import ../nodeconfigs +from pkg/ethers import Subscription marketplacesuite(name = "Marketplace"): let marketplaceConfig = NodeConfigs( @@ -22,6 +23,11 @@ marketplacesuite(name = "Marketplace"): const blocks = 8 const ecNodes = 3 const ecTolerance = 1 + const size = 0xFFFFFF.uint64 + const slotBytes = slotSize(blocks, ecNodes, ecTolerance) + const duration = 20 * 60.uint64 + const expiry = 10 * 60.uint64 + const pricePerSlotPerSecond = minPricePerBytePerSecond * slotBytes setup: host = providers()[0].client @@ -35,45 +41,33 @@ marketplacesuite(name = "Marketplace"): await ethProvider.advanceTime(1.u256) test "nodes negotiate contracts on the marketplace", marketplaceConfig: - let size = 0xFFFFFF.uint64 - let data = await RandomChunker.example(blocks = blocks) # host makes storage available let availability = ( await host.postAvailability( totalSize = size, - duration = 20 * 60.uint64, + duration = duration, minPricePerBytePerSecond = minPricePerBytePerSecond, totalCollateral = size.u256 * minPricePerBytePerSecond, ) ).get # client requests storage - let cid = (await client.upload(data)).get - let id = await client.requestStorage( - cid, - duration = 20 * 60.uint64, - pricePerBytePerSecond = minPricePerBytePerSecond, - proofProbability = 3.u256, - expiry = 10 * 60.uint64, - collateralPerByte = collateralPerByte, - nodes = ecNodes, - tolerance = ecTolerance, - ) + let (purchaseId, requestId) = await requestStorage(client) - discard await waitForRequestToStart() + await waitForRequestToStart(requestId, expiry.int64) - let purchase = (await client.getPurchase(id)).get + let purchase = (await client.getPurchase(purchaseId)).get check purchase.error == none string - let state = await marketplace.requestState(purchase.requestId) + let state = await marketplace.requestState(requestId) check state == RequestState.Started let availabilities = (await host.getAvailabilities()).get check availabilities.len == 1 - let datasetSize = datasetSize(blocks, ecNodes, ecTolerance) let newSize = availabilities[0].freeSize - check newSize > 0 and newSize + datasetSize == size + let datasetSize = datasetSize(blocks, ecNodes, ecTolerance) + check newSize > 0 and newSize.u256 + datasetSize == size.u256 let reservations = (await host.getAvailabilityReservations(availability.id)).get check reservations.len == 3 @@ -90,46 +84,25 @@ marketplacesuite(name = "Marketplace"): test "node slots gets paid out and rest of tokens are returned to client", marketplaceConfig: - let size = 0xFFFFFF.uint64 - let data = await RandomChunker.example(blocks = blocks) - let marketplace = Marketplace.new(Marketplace.address, ethProvider.getSigner()) - let tokenAddress = await marketplace.token() - let token = Erc20Token.new(tokenAddress, ethProvider.getSigner()) - let duration = 20 * 60.uint64 + var providerRewardEvent = newAsyncEvent() + var clientFundsEvent = newAsyncEvent() + var transferEvent = newAsyncEvent() + var filledAtPerSlot: seq[UInt256] = @[] + var requestId: RequestId # host makes storage available let startBalanceHost = await token.balanceOf(hostAccount) let startBalanceClient = await token.balanceOf(clientAccount) - discard ( - await host.postAvailability( - totalSize = size, - duration = 20 * 60.uint64, - minPricePerBytePerSecond = minPricePerBytePerSecond, - totalCollateral = size.u256 * minPricePerBytePerSecond, - ) - ).get - - let slotSize = slotSize(blocks, ecNodes, ecTolerance) - - var filledAtPerSlot: seq[UInt256] = @[] proc storeFilledAtTimestamps() {.async.} = let filledAt = await ethProvider.blockTime(BlockTag.latest) filledAtPerSlot.add(filledAt) - proc onSlotFilled(eventResult: ?!SlotFilled) = + proc onSlotFilled(eventResult: ?!SlotFilled) {.raises: [].} = assert not eventResult.isErr let event = !eventResult asyncSpawn storeFilledAtTimestamps() - let filledSubscription = await marketplace.subscribe(SlotFilled, onSlotFilled) - - # client requests storage - let cid = (await client.upload(data)).get - - let pricePerSlotPerSecond = minPricePerBytePerSecond * slotSize - var providerRewardEvent = newAsyncEvent() - proc checkProviderRewards() {.async.} = let endBalanceHost = await token.balanceOf(hostAccount) let requestEnd = await marketplace.requestEnd(requestId) @@ -140,23 +113,19 @@ marketplacesuite(name = "Marketplace"): if rewards + startBalanceHost == endBalanceHost: providerRewardEvent.fire() - var clientFundsEvent = newAsyncEvent() - - proc checkHostFunds() {.async.} = - var hostRewards = 0.u256 - - for filledAt in filledAtPerSlot: - hostRewards += (requestEnd.u256 - filledAt) * pricePerSlotPerSecond + proc checkClientFunds() {.async.} = + let requestEnd = await marketplace.requestEnd(requestId) + let hostRewards = filledAtPerSlot + .mapIt((requestEnd.u256 - it) * pricePerSlotPerSecond) + .foldl(a + b, 0.u256) - let requestPrice = minPricePerBytePerSecond * slotSize * duration.u256 * 3 + let requestPrice = pricePerSlotPerSecond * duration.u256 * 3 let fundsBackToClient = requestPrice - hostRewards let endBalanceClient = await token.balanceOf(clientAccount) if startBalanceClient + fundsBackToClient - requestPrice == endBalanceClient: clientFundsEvent.fire() - var transferEvent = newAsyncEvent() - proc onTransfer(eventResult: ?!Transfer) = assert not eventResult.isErr @@ -164,36 +133,39 @@ marketplacesuite(name = "Marketplace"): if data.receiver == hostAccount: asyncSpawn checkProviderRewards() if data.receiver == clientAccount: - asyncSpawn checkHostFunds() + asyncSpawn checkClientFunds() - let id = await client.requestStorage( - cid, - duration = duration, - pricePerBytePerSecond = minPricePerBytePerSecond, - proofProbability = 3.u256, - expiry = 10 * 60.uint64, - collateralPerByte = collateralPerByte, - nodes = ecNodes, - tolerance = ecTolerance, - ) + discard ( + await host.postAvailability( + totalSize = size, + duration = duration, + minPricePerBytePerSecond = minPricePerBytePerSecond, + totalCollateral = size.u256 * minPricePerBytePerSecond, + ) + ).get - let requestId = (await client.requestId(id)).get + # client requests storage + let (_, id) = await requestStorage(client) + requestId = id - discard await waitForRequestToStart() + # Subscribe SlotFilled event to receive the filledAt timestamp + # and calculate the provider reward + await marketplaceSubscribe(SlotFilled, onSlotFilled) - stopOnRequestFailed: - # Proving mechanism uses blockchain clock to do proving/collect/cleanup round - # hence we must use `advanceTime` over `sleepAsync` as Hardhat does mine new blocks - # only with new transaction - await ethProvider.advanceTime(duration.u256) + await waitForRequestToStart(requestId, expiry.int64) - let tokenSubscription = await token.subscribe(Transfer, onTransfer) + # Proving mechanism uses blockchain clock to do proving/collect/cleanup round + # hence we must use `advanceTime` over `sleepAsync` as Hardhat does mine new blocks + # only with new transaction + await ethProvider.advanceTime(duration.u256) - await clientFundsEvent.wait().wait(timeout = chronos.seconds(60)) - await providerRewardEvent.wait().wait(timeout = chronos.seconds(60)) + await tokenSubscribe(onTransfer) - await tokenSubscription.unsubscribe() - await filledSubscription.unsubscribe() + # Wait for the exact expected balances. + # The timeout is 60 seconds because the event should occur quickly, + # thanks to `advanceTime` moving to the end of the request duration. + await clientFundsEvent.wait().wait(timeout = chronos.seconds(60)) + await providerRewardEvent.wait().wait(timeout = chronos.seconds(60)) test "SP are able to process slots after workers were busy with other slots and ignored them", NodeConfigs( @@ -206,76 +178,52 @@ marketplacesuite(name = "Marketplace"): # .withLogTopics("marketplace", "sales", "statemachine","slotqueue", "reservations") .some, ): - let client0 = clients()[0] - let provider0 = providers()[0] - let provider1 = providers()[1] - let duration = 20 * 60.uint64 - - let data = await RandomChunker.example(blocks = blocks) - let slotSize = slotSize(blocks, ecNodes, ecTolerance) + var requestId: RequestId # We create an avavilability allowing the first SP to host the 3 slots. # So the second SP will not have any availability so it will just process # the slots and ignore them. - discard await provider0.client.postAvailability( - totalSize = 3 * slotSize.truncate(uint64), + discard await host.postAvailability( + totalSize = 3 * slotBytes.truncate(uint64), duration = duration, minPricePerBytePerSecond = minPricePerBytePerSecond, - totalCollateral = 3 * slotSize * minPricePerBytePerSecond, - ) - - let cid = (await client0.client.upload(data)).get - - let purchaseId = await client0.client.requestStorage( - cid, - duration = duration, - pricePerBytePerSecond = minPricePerBytePerSecond, - proofProbability = 1.u256, - expiry = 10 * 60.uint64, - collateralPerByte = collateralPerByte, - nodes = ecNodes, - tolerance = ecTolerance, + totalCollateral = 3 * slotBytes * minPricePerBytePerSecond, ) - let requestId = (await client0.client.requestId(purchaseId)).get + let (_, id) = await requestStorage(client) + requestId = id # We wait that the 3 slots are filled by the first SP - discard await waitForRequestToStart() + await waitForRequestToStart(requestId, expiry.int64) # Here we create the same availability as previously but for the second SP. # Meaning that, after ignoring all the slots for the first request, the second SP will process # and host the slots for the second request. - discard await provider1.client.postAvailability( - totalSize = 3 * slotSize.truncate(uint64), + let host1 = providers()[1].client + + discard await host1.postAvailability( + totalSize = 3 * slotBytes.truncate(uint64), duration = duration, minPricePerBytePerSecond = minPricePerBytePerSecond, - totalCollateral = 3 * slotSize * collateralPerByte, + totalCollateral = 3 * slotBytes * collateralPerByte, ) - let purchaseId2 = await client0.client.requestStorage( - cid, - duration = duration, - pricePerBytePerSecond = minPricePerBytePerSecond, - proofProbability = 3.u256, - expiry = 10 * 60.uint64, - collateralPerByte = collateralPerByte, - nodes = ecNodes, - tolerance = ecTolerance, - ) - let requestId2 = (await client0.client.requestId(purchaseId2)).get + let (_, id2) = await requestStorage(client) + requestId = id2 # Wait that the slots of the second request are filled - discard await waitForRequestToStart() + await waitForRequestToStart(requestId, expiry.int64) # Double check, verify that our second SP hosts the 3 slots - let signer = ethProvider.getSigner(hostAccount) + let host1Account = providers()[1].ethAccount + let signer = ethProvider.getSigner(host1Account) let marketplaceWithProviderSigner = marketplace.connect(signer) let slots = await marketplaceWithProviderSigner.mySlots() check slots.len == 3 for slotId in slots: let slot = await marketplaceWithProviderSigner.getActiveSlot(slotId) - check slot.request.id == purchase.requestId + check slot.request.id == requestId marketplacesuite(name = "Marketplace payouts"): const minPricePerBytePerSecond = 1.u256 @@ -283,6 +231,10 @@ marketplacesuite(name = "Marketplace payouts"): const blocks = 8 const ecNodes = 3 const ecTolerance = 1 + const slotBytes = slotSize(blocks, ecNodes, ecTolerance) + const duration = 20 * 60.uint64 + const expiry = 10 * 60.uint64 + const pricePerSlotPerSecond = minPricePerBytePerSecond * slotBytes test "expired request partially pays out for stored time", NodeConfigs( @@ -303,85 +255,46 @@ marketplacesuite(name = "Marketplace payouts"): # ) .some, ): - let duration = 20.periods - let expiry = 10.periods - let data = await RandomChunker.example(blocks = blocks) let client = clients()[0] let provider = providers()[0] let clientApi = client.client let providerApi = provider.client - let startBalanceProvider = await token.balanceOf(provider.ethAccount) - let startBalanceClient = await token.balanceOf(client.ethAccount) let hostAccount = providers()[0].ethAccount let clientAccount = clients()[0].ethAccount - # provider makes storage available - let datasetSize = datasetSize(blocks, ecNodes, ecTolerance) - let totalAvailabilitySize = (datasetSize div 2).truncate(uint64) - discard await providerApi.postAvailability( - # make availability size small enough that we can't fill all the slots, - # thus causing a cancellation - totalSize = totalAvailabilitySize, - duration = duration.uint64, - minPricePerBytePerSecond = minPricePerBytePerSecond, - totalCollateral = collateralPerByte * totalAvailabilitySize.u256, - ) - - let cid = (await clientApi.upload(data)).get - - var filledAtPerSlot: seq[UInt256] = @[] var slotIndex = 0.uint64 var slotFilledEvent = newAsyncEvent() + var requestCancelledEvent = newAsyncEvent() + var providerRewardEvent = newAsyncEvent() + var filledAtPerSlot: seq[UInt256] = @[] + var requestId: RequestId + + let startBalanceClient = await token.balanceOf(client.ethAccount) + let startBalanceProvider = await token.balanceOf(hostAccount) proc storeFilledAtTimestamps() {.async.} = let filledAt = await ethProvider.blockTime(BlockTag.latest) filledAtPerSlot.add(filledAt) - slotFilledEvent.fire() - proc onSlotFilled(eventResult: ?!SlotFilled) = + proc onSlotFilled(eventResult: ?!SlotFilled) {.raises: [].} = assert not eventResult.isErr let event = !eventResult slotIndex = event.slotIndex asyncSpawn storeFilledAtTimestamps() + slotFilledEvent.fire() - let slotFilledSubscription = await marketplace.subscribe(SlotFilled, onSlotFilled) - - var requestCancelledEvent = newAsyncEvent() proc onRequestCancelled(eventResult: ?!RequestCancelled) = assert not eventResult.isErr requestCancelledEvent.fire() - let requestCancelledSubscription = - await marketplace.subscribe(RequestCancelled, onRequestCancelled) - - # client requests storage but requires multiple slots to host the content - let id = await clientApi.requestStorage( - cid, - duration = duration, - pricePerBytePerSecond = minPricePerBytePerSecond, - expiry = expiry, - collateralPerByte = collateralPerByte, - nodes = ecNodes, - tolerance = ecTolerance, - ) - let requestId = (await clientApi.requestId(id)).get - - # wait until one slot is filled - await slotFilledEvent.wait().wait(timeout = chronos.seconds(expiry.int)) - let slotId = slotId(!(await clientApi.requestId(id)), slotIndex) - let slotSize = slotSize(blocks, ecNodes, ecTolerance) - - let pricePerSlotPerSecond = minPricePerBytePerSecond * slotSize - var providerRewardEvent = newAsyncEvent() - proc checkProviderRewards() {.async.} = + let endBalanceProvider = await token.balanceOf(hostAccount) let requestEnd = await marketplace.requestEnd(requestId) let rewards = filledAtPerSlot .mapIt((requestEnd.u256 - it) * pricePerSlotPerSecond) .foldl(a + b, 0.u256) - let endBalanceHost = await token.balanceOf(hostAccount) - if rewards + startBalanceProvider == endBalanceHost: + if rewards + startBalanceProvider == endBalanceProvider: providerRewardEvent.fire() proc onTransfer(eventResult: ?!Transfer) = @@ -391,15 +304,37 @@ marketplacesuite(name = "Marketplace payouts"): if data.receiver == hostAccount: asyncSpawn checkProviderRewards() - let tokenAddress = await marketplace.token() - let token = Erc20Token.new(tokenAddress, ethProvider.getSigner()) - let tokenSubscription = await token.subscribe(Transfer, onTransfer) + # provider makes storage available + let datasetSize = datasetSize(blocks, ecNodes, ecTolerance) + let totalAvailabilitySize = (datasetSize div 2).truncate(uint64) + + discard await providerApi.postAvailability( + # make availability size small enough that we can't fill all the slots, + # thus causing a cancellation + totalSize = totalAvailabilitySize, + duration = duration, + minPricePerBytePerSecond = minPricePerBytePerSecond, + totalCollateral = collateralPerByte * totalAvailabilitySize.u256, + ) + + let (_, id) = await requestStorage(clientApi) + requestId = id + + await marketplaceSubscribe(SlotFilled, onSlotFilled) + await marketplaceSubscribe(RequestCancelled, onRequestCancelled) + + # wait until one slot is filled + await slotFilledEvent.wait().wait(timeout = chronos.seconds(expiry.int)) + let slotId = slotId(requestId, slotIndex) + + await tokenSubscribe(onTransfer) # wait until sale is cancelled await ethProvider.advanceTime(expiry.u256) await requestCancelledEvent.wait().wait(timeout = chronos.seconds(5)) await advanceToNextPeriod() + # Wait for the expected balance for the provider await providerRewardEvent.wait().wait(timeout = chronos.seconds(60)) # Ensure that total rewards stay within the payout limit @@ -412,14 +347,11 @@ marketplacesuite(name = "Marketplace payouts"): let endBalanceProvider = (await token.balanceOf(provider.ethAccount)) let endBalanceClient = (await token.balanceOf(client.ethAccount)) + check( startBalanceClient - endBalanceClient == endBalanceProvider - startBalanceProvider ) - await slotFilledSubscription.unsubscribe() - await requestCancelledSubscription.unsubscribe() - await tokenSubscription.unsubscribe() - test "the collateral is returned after a sale is ignored", NodeConfigs( hardhat: HardhatConfig.none, @@ -434,13 +366,10 @@ marketplacesuite(name = "Marketplace payouts"): # ) .some, ): - let data = await RandomChunker.example(blocks = blocks) let client0 = clients()[0] let provider0 = providers()[0] let provider1 = providers()[1] let provider2 = providers()[2] - let duration = 20 * 60.uint64 - let slotSize = slotSize(blocks, ecNodes, ecTolerance) # Here we create 3 SP which can host 3 slot. # While they will process the slot, each SP will @@ -449,59 +378,44 @@ marketplacesuite(name = "Marketplace payouts"): # will be ignored. In that case, the collateral assigned for # the reservation should return to the availability. discard await provider0.client.postAvailability( - totalSize = 3 * slotSize.truncate(uint64), + totalSize = 3 * slotBytes.truncate(uint64), duration = duration, minPricePerBytePerSecond = minPricePerBytePerSecond, - totalCollateral = 3 * slotSize * minPricePerBytePerSecond, + totalCollateral = 3 * slotBytes * minPricePerBytePerSecond, ) discard await provider1.client.postAvailability( - totalSize = 3 * slotSize.truncate(uint64), + totalSize = 3 * slotBytes.truncate(uint64), duration = duration, minPricePerBytePerSecond = minPricePerBytePerSecond, - totalCollateral = 3 * slotSize * minPricePerBytePerSecond, + totalCollateral = 3 * slotBytes * minPricePerBytePerSecond, ) discard await provider2.client.postAvailability( - totalSize = 3 * slotSize.truncate(uint64), + totalSize = 3 * slotBytes.truncate(uint64), duration = duration, minPricePerBytePerSecond = minPricePerBytePerSecond, - totalCollateral = 3 * slotSize * minPricePerBytePerSecond, + totalCollateral = 3 * slotBytes * minPricePerBytePerSecond, ) - let cid = (await client0.client.upload(data)).get - - let purchaseId = await client0.client.requestStorage( - cid, - duration = duration, - pricePerBytePerSecond = minPricePerBytePerSecond, - proofProbability = 1.u256, - expiry = 10 * 60.uint64, - collateralPerByte = collateralPerByte, - nodes = ecNodes, - tolerance = ecTolerance, - ) - - let requestId = (await client0.client.requestId(purchaseId)).get - - discard await waitForRequestToStart() - - stopOnRequestFailed: - # Here we will check that for each provider, the total remaining collateral - # will match the available slots. - # So if a SP hosts 1 slot, it should have enough total remaining collateral - # to host 2 more slots. - for provider in providers(): - let client = provider.client - check eventually( - block: - try: - let availabilities = (await client.getAvailabilities()).get - let availability = availabilities[0] - let slots = (await client.getSlots()).get - let availableSlots = (3 - slots.len).u256 - - availability.totalRemainingCollateral == - availableSlots * slotSize * minPricePerBytePerSecond - except HttpConnectionError: - return false, - timeout = 30 * 1000, - ) + let (_, requestId) = await requestStorage(client0.client) + await waitForRequestToStart(requestId, expiry.int64) + + # Here we will check that for each provider, the total remaining collateral + # will match the available slots. + # So if a SP hosts 1 slot, it should have enough total remaining collateral + # to host 2 more slots. + for provider in providers(): + let client = provider.client + check eventually( + block: + try: + let availabilities = (await client.getAvailabilities()).get + let availability = availabilities[0] + let slots = (await client.getSlots()).get + let availableSlots = (3 - slots.len).u256 + + availability.totalRemainingCollateral == + availableSlots * slotBytes * minPricePerBytePerSecond + except HttpConnectionError: + return false, + timeout = 30 * 1000, + ) diff --git a/tests/integration/30_minutes/testproofs.nim b/tests/integration/30_minutes/testproofs.nim index 46829c967..d77727746 100644 --- a/tests/integration/30_minutes/testproofs.nim +++ b/tests/integration/30_minutes/testproofs.nim @@ -38,8 +38,12 @@ marketplacesuite(name = "Hosts submit regular proofs"): let client0 = clients()[0].client let expiry = 10.periods let duration = expiry + 5.periods + let proofSubmittedEvent = newAsyncEvent() + + proc onProofSubmitted(event: ?!ProofSubmitted) = + if event.isOk: + proofSubmittedEvent.fire() - let data = await RandomChunker.example(blocks = blocks) let datasetSize = datasetSize(blocks = blocks, nodes = ecNodes, tolerance = ecTolerance) await createAvailabilities( @@ -49,32 +53,20 @@ marketplacesuite(name = "Hosts submit regular proofs"): minPricePerBytePerSecond, ) - let cid = (await client0.upload(data)).get - - let purchaseId = await client0.requestStorage( - cid, - expiry = expiry, - duration = duration, - nodes = ecNodes, - tolerance = ecTolerance, - ) + let (purchaseId, requestId) = + await requestStorage(client0, duration = duration, expiry = expiry) let purchase = (await client0.getPurchase(purchaseId)).get check purchase.error == none string - let slotSize = slotSize(blocks, ecNodes, ecTolerance) + await marketplaceSubscribe(ProofSubmitted, onProofSubmitted) - discard await waitForRequestToStart(expiry.int) + await waitForRequestToStart(requestId, expiry.int64) - var proofWasSubmitted = false - proc onProofSubmitted(event: ?!ProofSubmitted) = - proofWasSubmitted = event.isOk - - let subscription = await marketplace.subscribe(ProofSubmitted, onProofSubmitted) - - check eventually(proofWasSubmitted, timeout = (duration - expiry).int * 1000) - - await subscription.unsubscribe() + let secondsTillRequestEnd = await getSecondsTillRequestEnd(requestId) + await proofSubmittedEvent.wait().wait( + timeout = chronos.seconds(secondsTillRequestEnd) + ) marketplacesuite(name = "Simulate invalid proofs"): # TODO: these are very loose tests in that they are not testing EXACTLY how @@ -88,6 +80,17 @@ marketplacesuite(name = "Simulate invalid proofs"): const ecNodes = 3 const ecTolerance = 1 + var slotWasFreedEvent: AsyncEvent + var requestId: RequestId + + proc onSlotFreed(event: ?!SlotFreed) = + if event.isOk and event.value.requestId == requestId: + slotWasFreedEvent.fire() + + setup: + requestId = RequestId.default() + slotWasFreedEvent = newAsyncEvent() + test "slot is freed after too many invalid proofs submitted", NodeConfigs( # Uncomment to start Hardhat automatically, typically so logs can be inspected locally @@ -120,7 +123,6 @@ marketplacesuite(name = "Simulate invalid proofs"): let expiry = 10.periods let duration = expiry + 10.periods - let data = await RandomChunker.example(blocks = blocks) let datasetSize = datasetSize(blocks = blocks, nodes = ecNodes, tolerance = ecTolerance) await createAvailabilities( @@ -130,32 +132,14 @@ marketplacesuite(name = "Simulate invalid proofs"): minPricePerBytePerSecond, ) - let cid = (await client0.upload(data)).get - - let purchaseId = ( - await client0.requestStorage( - cid, - expiry = expiry, - duration = duration, - nodes = ecNodes, - tolerance = ecTolerance, - proofProbability = 1.u256, - ) + let (purchaseId, id) = await requestStorage( + client0, duration = duration, proofProbability = 1.u256, expiry = expiry ) - let requestId = (await client0.requestId(purchaseId)).get + requestId = id - discard await waitForRequestToStart(expiry.int) + await marketplaceSubscribe(SlotFreed, onSlotFreed) - var slotWasFreed = false - proc onSlotFreed(event: ?!SlotFreed) = - if event.isOk and event.value.requestId == requestId: - slotWasFreed = true - - let subscription = await marketplace.subscribe(SlotFreed, onSlotFreed) - - check eventually(slotWasFreed, timeout = (duration - expiry).int * 1000) - - await subscription.unsubscribe() + await slotWasFreedEvent.wait().wait(timeout = chronos.seconds(duration.int64)) test "slot is not freed when not enough invalid proofs submitted", NodeConfigs( @@ -182,8 +166,15 @@ marketplacesuite(name = "Simulate invalid proofs"): let client0 = clients()[0].client let expiry = 10.periods let duration = expiry + 10.periods + let slotWasFilledEvent = newAsyncEvent() + + proc onSlotFilled(eventResult: ?!SlotFilled) = + assert not eventResult.isErr + let event = !eventResult + + if event.requestId == requestId: + slotWasFilledEvent.fire() - let data = await RandomChunker.example(blocks = blocks) let datasetSize = datasetSize(blocks = blocks, nodes = ecNodes, tolerance = ecTolerance) await createAvailabilities( @@ -193,44 +184,24 @@ marketplacesuite(name = "Simulate invalid proofs"): minPricePerBytePerSecond, ) - let cid = (await client0.upload(data)).get + let (purchaseId, id) = + await requestStorage(client0, duration = duration, expiry = expiry) + requestId = id - let purchaseId = await client0.requestStorage( - cid, - expiry = expiry, - duration = duration, - nodes = ecNodes, - tolerance = ecTolerance, - proofProbability = 1.u256, - ) - let requestId = (await client0.requestId(purchaseId)).get - - var slotWasFilled = false - proc onSlotFilled(eventResult: ?!SlotFilled) = - assert not eventResult.isErr - let event = !eventResult - - if event.requestId == requestId: - slotWasFilled = true - - let filledSubscription = await marketplace.subscribe(SlotFilled, onSlotFilled) + await marketplaceSubscribe(SlotFilled, onSlotFilled) + await marketplaceSubscribe(SlotFreed, onSlotFreed) # wait for the first slot to be filled - check eventually(slotWasFilled, timeout = expiry.int * 1000) + await slotWasFilledEvent.wait().wait(timeout = chronos.seconds(expiry.int64)) - var slotWasFreed = false - proc onSlotFreed(event: ?!SlotFreed) = - if event.isOk and event.value.requestId == requestId: - slotWasFreed = true - - let freedSubscription = await marketplace.subscribe(SlotFreed, onSlotFreed) - - # In 2 periods you cannot have enough invalid proofs submitted: - await sleepAsync(2.periods.int.seconds) - check not slotWasFreed - - await filledSubscription.unsubscribe() - await freedSubscription.unsubscribe() + try: + # In 2 periods you cannot have enough invalid proofs submitted: + await slotWasFreedEvent.wait().wait( + timeout = chronos.seconds(2.periods.int.seconds) + ) + fail("invalid proofs were not expected in 2 periods") + except AsyncTimeoutError: + discard # TODO: uncomment once fixed # WARNING: in the meantime minPrice has changed to minPricePerBytePerSecond diff --git a/tests/integration/30_minutes/testslotrepair.nim b/tests/integration/30_minutes/testslotrepair.nim index 070843388..32a95959f 100644 --- a/tests/integration/30_minutes/testslotrepair.nim +++ b/tests/integration/30_minutes/testslotrepair.nim @@ -20,18 +20,17 @@ marketplacesuite(name = "SP Slot Repair"): const ecTolerance = 1 const size = slotSize(blocks, ecNodes, ecTolerance) - var filledSlotIds: seq[SlotId] = @[] - var freedSlotId = none SlotId + var freedSlotIndex = none uint64 var requestId: RequestId + var slotFilledEvent: AsyncEvent # Here we are keeping track of the slot filled using their ids. proc onSlotFilled(eventResult: ?!SlotFilled) = assert not eventResult.isErr let event = !eventResult - if event.requestId == requestId: - let slotId = slotId(event.requestId, event.slotIndex) - filledSlotIds.add slotId + if event.requestId == requestId and event.slotIndex == freedSlotIndex.get: + slotFilledEvent.fire() # Here we are retrieving the slot id freed. # When the event is triggered, the slot id is removed @@ -42,44 +41,23 @@ marketplacesuite(name = "SP Slot Repair"): let slotId = slotId(event.requestId, event.slotIndex) if event.requestId == requestId: - assert slotId in filledSlotIds - filledSlotIds.del(filledSlotIds.find(slotId)) - freedSlotId = some(slotId) - - proc createPurchase(client: CodexClient): Future[PurchaseId] {.async.} = - let data = await RandomChunker.example(blocks = blocks) - let cid = (await client.upload(data)).get - - let purchaseId = await client.requestStorage( - cid, - expiry = 10.periods, - duration = 20.periods, - nodes = ecNodes, - tolerance = ecTolerance, - collateralPerByte = 1.u256, - pricePerBytePerSecond = minPricePerBytePerSecond, - proofProbability = 1.u256, - ) - requestId = (await client.requestId(purchaseId)).get + freedSlotIndex = some event.slotIndex - return purchaseId - - proc freeSlot(provider: CodexClient): Future[void] {.async.} = + proc freeSlot(provider: CodexProcess): Future[void] {.async.} = # Get the second provider signer. - let signer = ethProvider.getSigner(accounts[2]) + let signer = ethProvider.getSigner(provider.ethAccount) let marketplaceWithSecondProviderSigner = marketplace.connect(signer) # Call freeSlot to speed up the process. # It accelerates the test by skipping validator # proof verification and not waiting for the full period. # The downside is that this doesn't reflect the real slot freed process. - let slots = (await provider.getSlots()).get() + let slots = (await provider.client.getSlots()).get() let slotId = slotId(requestId, slots[0].slotIndex) discard await marketplaceWithSecondProviderSigner.freeSlot(slotId) setup: - filledSlotIds = @[] - freedSlotId = none SlotId + slotFilledEvent = newAsyncEvent() test "repair from local store", NodeConfigs( @@ -124,13 +102,17 @@ marketplacesuite(name = "SP Slot Repair"): ) ).get - let purchaseId = await createPurchase(client0.client) - - let filledSubscription = await marketplace.subscribe(SlotFilled, onSlotFilled) - let slotFreedsubscription = await marketplace.subscribe(SlotFreed, onSlotFreed) + let (purchaseId, id) = await requestStorage( + client0.client, + blocks = blocks, + expiry = expiry, + duration = duration, + proofProbability = 1.u256, + ) + requestId = id # Wait for purchase starts, meaning that the slots are filled. - discard await waitForRequestToStart(expiry.int) + await waitForRequestToStart(requestId, expiry.int64) # stop client so it doesn't serve any blocks anymore await client0.stop() @@ -148,17 +130,16 @@ marketplacesuite(name = "SP Slot Repair"): totalSize = (3 * size.truncate(uint64)).uint64.some, ) + await marketplaceSubscribe(SlotFilled, onSlotFilled) + await marketplaceSubscribe(SlotFreed, onSlotFreed) + # Let's free the slot to speed up the process - await freeSlot(provider1.client) + await freeSlot(provider1) - # We expect that the freed slot is added in the filled slot id list, + # We expect that the freed slot is filled again, # meaning that the slot was repaired locally by SP 1. - # check eventually( - # freedSlotId.get in filledSlotIds, timeout = (duration - expiry).int * 1000 - # ) - - await filledSubscription.unsubscribe() - await slotFreedsubscription.unsubscribe() + let secondsTillRequestEnd = await getSecondsTillRequestEnd(requestId) + await slotFilledEvent.wait().wait(timeout = chronos.seconds(secondsTillRequestEnd)) test "repair from local and remote store", NodeConfigs( @@ -204,13 +185,17 @@ marketplacesuite(name = "SP Slot Repair"): totalCollateral = size * collateralPerByte, ) - let purchaseId = await createPurchase(client0.client) - - let filledSubscription = await marketplace.subscribe(SlotFilled, onSlotFilled) - let slotFreedsubscription = await marketplace.subscribe(SlotFreed, onSlotFreed) + let (purchaseId, id) = await requestStorage( + client0.client, + blocks = blocks, + expiry = expiry, + duration = duration, + proofProbability = 1.u256, + ) + requestId = id # Wait for purchase starts, meaning that the slots are filled. - discard await waitForRequestToStart(expiry.int) + await waitForRequestToStart(requestId, expiry.int64) # stop client so it doesn't serve any blocks anymore await client0.stop() @@ -227,16 +212,16 @@ marketplacesuite(name = "SP Slot Repair"): totalCollateral = (2 * size * collateralPerByte).some, ) + await marketplaceSubscribe(SlotFilled, onSlotFilled) + await marketplaceSubscribe(SlotFreed, onSlotFreed) + # Let's free the slot to speed up the process - await freeSlot(provider1.client) + await freeSlot(provider1) - # We expect that the freed slot is added in the filled slot id list, + # We expect that the freed slot is filled again, # meaning that the slot was repaired locally and remotely (using SP 3) by SP 1. - # check eventually(freedSlotId.isSome, timeout = expiry.int * 1000) - # check eventually(freedSlotId.get in filledSlotIds, timeout = expiry.int * 1000) - - await filledSubscription.unsubscribe() - await slotFreedsubscription.unsubscribe() + let secondsTillRequestEnd = await getSecondsTillRequestEnd(requestId) + await slotFilledEvent.wait().wait(timeout = chronos.seconds(secondsTillRequestEnd)) test "repair from remote store only", NodeConfigs( @@ -275,13 +260,17 @@ marketplacesuite(name = "SP Slot Repair"): ) ).get - let purchaseId = await createPurchase(client0.client) - - let filledSubscription = await marketplace.subscribe(SlotFilled, onSlotFilled) - let slotFreedsubscription = await marketplace.subscribe(SlotFreed, onSlotFreed) + let (purchaseId, id) = await requestStorage( + client0.client, + blocks = blocks, + expiry = expiry, + duration = duration, + proofProbability = 1.u256, + ) + requestId = id # Wait for purchase starts, meaning that the slots are filled. - discard await waitForRequestToStart(expiry.int) + await waitForRequestToStart(requestId, expiry.int64) # stop client so it doesn't serve any blocks anymore await client0.stop() @@ -299,12 +288,12 @@ marketplacesuite(name = "SP Slot Repair"): # SP 2 will not pick the slot again. await provider1.client.patchAvailability(availability1.id, enabled = false.some) + await marketplaceSubscribe(SlotFilled, onSlotFilled) + await marketplaceSubscribe(SlotFreed, onSlotFreed) + # Let's free the slot to speed up the process - await freeSlot(provider1.client) + await freeSlot(provider1) # At this point, SP 3 should repair the slot from SP 1 and host it. - # check eventually(freedSlotId.isSome, timeout = expiry.int * 1000) - # check eventually(freedSlotId.get in filledSlotIds, timeout = expiry.int * 1000) - - await filledSubscription.unsubscribe() - await slotFreedsubscription.unsubscribe() + let secondsTillRequestEnd = await getSecondsTillRequestEnd(requestId) + await slotFilledEvent.wait().wait(timeout = chronos.seconds(secondsTillRequestEnd)) diff --git a/tests/integration/30_minutes/testvalidator.nim b/tests/integration/30_minutes/testvalidator.nim index 247733989..b60f048bd 100644 --- a/tests/integration/30_minutes/testvalidator.nim +++ b/tests/integration/30_minutes/testvalidator.nim @@ -20,7 +20,6 @@ marketplacesuite(name = "Validation"): const ecNodes = 3 const ecTolerance = 1 const proofProbability = 1.u256 - const collateralPerByte = 1.u256 const minPricePerBytePerSecond = 1.u256 @@ -60,9 +59,6 @@ marketplacesuite(name = "Validation"): # let mine a block to sync the blocktime with the current clock discard await ethProvider.send("evm_mine") - var currentTime = await ethProvider.currentTime() - let requestEndTime = currentTime.truncate(uint64) + duration - let data = await RandomChunker.example(blocks = blocks) let datasetSize = datasetSize(blocks = blocks, nodes = ecNodes, tolerance = ecTolerance) @@ -73,28 +69,19 @@ marketplacesuite(name = "Validation"): minPricePerBytePerSecond, ) - let cid = (await client0.upload(data)).get - let purchaseId = await client0.requestStorage( - cid, - expiry = expiry, - duration = duration, - nodes = ecNodes, - tolerance = ecTolerance, - proofProbability = proofProbability, + let (purchaseId, requestId) = await requestStorage( + client0, duration = duration, expiry = expiry, proofProbability = proofProbability ) - let requestId = (await client0.requestId(purchaseId)).get debug "validation suite", purchaseId = purchaseId.toHex, requestId = requestId - discard await waitForRequestToStart(expiry.int + 60) + await waitForRequestToStart(requestId, expiry.int64) discard await ethProvider.send("evm_mine") - currentTime = await ethProvider.currentTime() - let secondsTillRequestEnd = (requestEndTime - currentTime.truncate(uint64)).int - - debug "validation suite", secondsTillRequestEnd = secondsTillRequestEnd.seconds - discard await waitForRequestToFail(secondsTillRequestEnd + 60) + let secondsTillRequestEnd = await getSecondsTillRequestEnd(requestId) + debug "validation suite", secondsTillRequestEnd = secondsTillRequestEnd + await waitForRequestToFail(requestId, secondsTillRequestEnd.int64) test "validator uses historical state to mark missing proofs", NodeConfigs( @@ -124,7 +111,6 @@ marketplacesuite(name = "Validation"): var currentTime = await ethProvider.currentTime() let requestEndTime = currentTime.truncate(uint64) + duration - let data = await RandomChunker.example(blocks = blocks) let datasetSize = datasetSize(blocks = blocks, nodes = ecNodes, tolerance = ecTolerance) await createAvailabilities( @@ -134,20 +120,13 @@ marketplacesuite(name = "Validation"): minPricePerBytePerSecond, ) - let cid = (await client0.upload(data)).get - let purchaseId = await client0.requestStorage( - cid, - expiry = expiry, - duration = duration, - nodes = ecNodes, - tolerance = ecTolerance, - proofProbability = proofProbability, + let (purchaseId, requestId) = await requestStorage( + client0, duration = duration, expiry = expiry, proofProbability = proofProbability ) - let requestId = (await client0.requestId(purchaseId)).get debug "validation suite", purchaseId = purchaseId.toHex, requestId = requestId - discard await waitForRequestToStart(expiry.int + 60) + await waitForRequestToStart(requestId, expiry.int64) # extra block just to make sure we have one that separates us # from the block containing the last (past) SlotFilled event @@ -168,10 +147,6 @@ marketplacesuite(name = "Validation"): let node = await startValidatorNode(config) running.add RunningNode(role: Role.Validator, node: node) - discard await ethProvider.send("evm_mine") - currentTime = await ethProvider.currentTime() - let secondsTillRequestEnd = (requestEndTime - currentTime.truncate(uint64)).int - - debug "validation suite", secondsTillRequestEnd = secondsTillRequestEnd.seconds - - discard await waitForRequestToFail(secondsTillRequestEnd + 60) + let secondsTillRequestEnd = await getSecondsTillRequestEnd(requestId) + debug "validation suite", secondsTillRequestEnd = secondsTillRequestEnd + await waitForRequestToFail(requestId, secondsTillRequestEnd.int64) diff --git a/tests/integration/5_minutes/testsales.nim b/tests/integration/5_minutes/testsales.nim index 6de522022..d8d5dcca6 100644 --- a/tests/integration/5_minutes/testsales.nim +++ b/tests/integration/5_minutes/testsales.nim @@ -114,10 +114,11 @@ marketplacesuite(name = "Sales"): test "updating availability - updating totalSize does not allow bellow utilized", salesConfig: let originalSize = 0xFFFFFF.uint64 - let data = await RandomChunker.example(blocks = 8) let minPricePerBytePerSecond = 3.u256 let collateralPerByte = 1.u256 let totalCollateral = originalSize.u256 * collateralPerByte + let expiry = 10 * 60.uint64 + let availability = ( await host.postAvailability( totalSize = originalSize, @@ -128,19 +129,11 @@ marketplacesuite(name = "Sales"): ).get # Lets create storage request that will utilize some of the availability's space - let cid = (await client.upload(data)).get - let id = await client.requestStorage( - cid, - duration = 20 * 60.uint64, - pricePerBytePerSecond = minPricePerBytePerSecond, - proofProbability = 3.u256, - expiry = (10 * 60).uint64, - collateralPerByte = collateralPerByte, - nodes = 3, - tolerance = 1, + let (purchaseId, requestId) = await requestStorage( + client = client, pricePerBytePerSecond = minPricePerBytePerSecond ) - discard await waitForRequestToStart() + await waitForRequestToStart(requestId, expiry.int64) let updatedAvailability = ((await host.getAvailabilities()).get).findItem(availability).get @@ -183,12 +176,9 @@ marketplacesuite(name = "Sales"): test "returns an error when trying to update the until date before an existing a request is finished", salesConfig: let size = 0xFFFFFF.uint64 - let data = await RandomChunker.example(blocks = 8) let duration = 20 * 60.uint64 + let expiry = 10 * 60.uint64 let minPricePerBytePerSecond = 3.u256 - let collateralPerByte = 1.u256 - let ecNodes = 3.uint - let ecTolerance = 1.uint # host makes storage available let availability = ( @@ -200,24 +190,12 @@ marketplacesuite(name = "Sales"): ) ).get - # client requests storage - let cid = (await client.upload(data)).get - let id = ( - await client.requestStorage( - cid, - duration = duration, - pricePerBytePerSecond = minPricePerBytePerSecond, - proofProbability = 3.u256, - expiry = 10 * 60.uint64, - collateralPerByte = collateralPerByte, - nodes = ecNodes, - tolerance = ecTolerance, - ) - ).get + let (purchaseId, requestId) = + await requestStorage(client, pricePerBytePerSecond = minPricePerBytePerSecond) - discard await waitForRequestToStart() + await waitForRequestToStart(requestId, expiry.int64) - let purchase = (await client.getPurchase(id)).get + let purchase = (await client.getPurchase(purchaseId)).get check purchase.error == none string let unixNow = getTime().toUnix() diff --git a/tests/integration/marketplacesuite.nim b/tests/integration/marketplacesuite.nim index dbb77540f..e3b010a09 100644 --- a/tests/integration/marketplacesuite.nim +++ b/tests/integration/marketplacesuite.nim @@ -1,5 +1,4 @@ import macros -import std/strutils import std/unittest import pkg/chronos @@ -9,7 +8,7 @@ import pkg/codex/contracts/marketplace as mp import pkg/codex/periods import pkg/codex/utils/json from pkg/codex/utils import roundUp, divUp -import ./multinodes except Subscription +import ./multinodes except Subscription, Event import ../contracts/time import ../contracts/deployment @@ -22,72 +21,83 @@ template marketplacesuite*(name: string, body: untyped) = var period: uint64 var periodicity: Periodicity var token {.inject, used.}: Erc20Token - var requestStartedEvent: AsyncEvent - var requestStartedSubscription: Subscription - var requestFailedEvent: AsyncEvent - var requestFailedSubscription: Subscription - - template fail(reason: string) = - raise newException(TestFailedError, reason) + var subscriptions: seq[Subscription] = @[] + var tokenSubscription: Subscription proc check(cond: bool, reason = "Check failed"): void = if not cond: fail(reason) - template stopOnRequestFailed(tbody: untyped) = - let completed = newAsyncEvent() - - let mainFut = ( - proc(): Future[void] {.async.} = - tbody - completed.fire() - )() - - let fastFailFut = ( - proc(): Future[void] {.async.} = - try: - await requestFailedEvent.wait().wait(timeout = chronos.seconds(60)) - completed.fire() - raise newException(TestFailedError, "storage request has failed") - except AsyncTimeoutError: - discard - )() + proc marketplaceSubscribe[E: Event]( + event: type E, handler: EventHandler[E] + ) {.async.} = + let sub = await marketplace.subscribe(event, handler) + subscriptions.add(sub) - await completed.wait().wait(timeout = chronos.seconds(60 * 30)) + proc tokenSubscribe( + handler: proc(event: ?!Transfer) {.gcsafe, raises: [].} + ) {.async.} = + let sub = await token.subscribe(Transfer, handler) + tokenSubscription = sub - if not fastFailFut.completed: - await fastFailFut.cancelAndWait() + proc subscribeOnRequestFulfilled( + requestId: RequestId + ): Future[AsyncEvent] {.async.} = + let event = newAsyncEvent() - if mainFut.failed: - raise mainFut.error + proc onRequestFulfilled(eventResult: ?!RequestFulfilled) {.raises: [].} = + assert not eventResult.isErr + let er = !eventResult - if fastFailFut.failed: - raise fastFailFut.error + if er.requestId == requestId: + event.fire() - proc onRequestStarted(eventResult: ?!RequestFulfilled) {.raises: [].} = - requestStartedEvent.fire() + let sub = await marketplace.subscribe(RequestFulfilled, onRequestFulfilled) + subscriptions.add(sub) - proc onRequestFailed(eventResult: ?!RequestFailed) {.raises: [].} = - requestFailedEvent.fire() + return event proc getCurrentPeriod(): Future[Period] {.async.} = return periodicity.periodOf((await ethProvider.currentTime()).truncate(uint64)) proc waitForRequestToStart( - seconds = 10 * 60 + 10 - ): Future[Period] {. - async: (raises: [CancelledError, AsyncTimeoutError, TestFailedError]) - .} = - await requestStartedEvent.wait().wait(timeout = chronos.seconds(seconds)) - # Recreate a new future if we need to wait for another request - requestStartedEvent = newAsyncEvent() + requestId: RequestId, seconds = 10 * 60 + 10 + ): Future[void] {.async.} = + let event = newAsyncEvent() + + proc onRequestFulfilled(eventResult: ?!RequestFulfilled) {.raises: [].} = + assert not eventResult.isErr + let er = !eventResult + + if er.requestId == requestId: + event.fire() + + let sub = await marketplace.subscribe(RequestFulfilled, onRequestFulfilled) + subscriptions.add(sub) + + await event.wait().wait(timeout = chronos.seconds(seconds)) + + proc getSecondsTillRequestEnd(requestId: RequestId): Future[int64] {.async.} = + let currentTime = await ethProvider.currentTime() + let requestEnd = await marketplace.requestEnd(requestId) + return requestEnd.int64 - currentTime.truncate(int64) proc waitForRequestToFail( - seconds = (5 * 60) + 10 - ): Future[Period] {.async: (raises: [CancelledError, AsyncTimeoutError]).} = - await requestFailedEvent.wait().wait(timeout = chronos.seconds(seconds)) - # Recreate a new future if we need to wait for another request - requestFailedEvent = newAsyncEvent() + requestId: RequestId, seconds = (5 * 60) + 10 + ): Future[void] {.async.} = + let event = newAsyncEvent() + + proc onRequestFailed(eventResult: ?!RequestFailed) {.raises: [].} = + assert not eventResult.isErr + let er = !eventResult + + if er.requestId == requestId: + event.fire() + + let sub = await marketplace.subscribe(RequestFailed, onRequestFailed) + subscriptions.add(sub) + + await event.wait().wait(timeout = chronos.seconds(seconds)) proc advanceToNextPeriod() {.async.} = let periodicity = Periodicity(seconds: period) @@ -145,7 +155,7 @@ template marketplacesuite*(name: string, body: untyped) = client: CodexClient, cid: Cid, proofProbability = 1.u256, - duration: uint64 = 12.periods, + duration: uint64 = 20 * 60.uint64, pricePerBytePerSecond = 1.u256, collateralPerByte = 1.u256, expiry: uint64 = 4.periods, @@ -167,6 +177,36 @@ template marketplacesuite*(name: string, body: untyped) = return id + proc requestStorage( + client: CodexClient, + proofProbability = 3.u256, + duration = 20 * 60.uint64, + pricePerBytePerSecond = 1.u256, + collateralPerByte = 1.u256, + expiry = 10 * 60.uint64, + nodes = 3, + tolerance = 1, + blocks = 8, + data = seq[byte].none, + ): Future[(PurchaseId, RequestId)] {.async.} = + let bytes = data |? await RandomChunker.example(blocks = blocks) + let cid = (await client.upload(bytes)).get + + let purchaseId = await client.requestStorage( + cid, + duration = duration, + pricePerBytePerSecond = pricePerBytePerSecond, + proofProbability = proofProbability, + expiry = expiry, + collateralPerByte = collateralPerByte, + nodes = nodes, + tolerance = tolerance, + ) + + let requestId = (await client.requestId(purchaseId)).get + + return (purchaseId, requestId) + setup: marketplace = Marketplace.new(Marketplace.address, ethProvider.getSigner()) let tokenAddress = await marketplace.token() @@ -174,18 +214,12 @@ template marketplacesuite*(name: string, body: untyped) = let config = await marketplace.configuration() period = config.proofs.period periodicity = Periodicity(seconds: period) - - requestStartedEvent = newAsyncEvent() - requestFailedEvent = newAsyncEvent() - - requestStartedSubscription = - await marketplace.subscribe(RequestFulfilled, onRequestStarted) - - requestFailedSubscription = - await marketplace.subscribe(RequestFailed, onRequestFailed) - + subscriptions = @[] teardown: - await requestStartedSubscription.unsubscribe() - await requestFailedSubscription.unsubscribe() + for subscription in subscriptions: + await subscription.unsubscribe() + + if not tokenSubscription.isNil: + await tokenSubscription.unsubscribe() body diff --git a/tests/integration/multinodes.nim b/tests/integration/multinodes.nim index 51c5ccb41..808275510 100644 --- a/tests/integration/multinodes.nim +++ b/tests/integration/multinodes.nim @@ -107,7 +107,10 @@ template multinodesuite*(name: string, body: untyped) = currentTestName = tname nodeConfigs = startNodeConfigs test tname: - failAndTeardownOnError("test failed", tbody) + tbody + + template fail(reason: string) = + raise newException(TestFailedError, reason) proc sanitize(pathSegment: string): string = var sanitized = pathSegment @@ -255,21 +258,34 @@ template multinodesuite*(name: string, body: untyped) = return await newCodexProcess(validatorIdx, config, Role.Validator) - proc teardownImpl() {.async.} = + proc teardownImpl() {.async: (raises: [CancelledError]).} = for nodes in @[validators(), clients(), providers()]: for node in nodes: - await node.stop() # also stops rest client - node.removeDataDir() + try: + await node.stop() # also stops rest client + node.removeDataDir() + except CancelledError as e: + raise e + # Raised by removeDataDir + except Exception as e: + error "error when trying to stop the node", error = e.msg # if hardhat was started in the test, kill the node # otherwise revert the snapshot taken in the test setup let hardhat = hardhat() if not hardhat.isNil: - await hardhat.stop() + try: + await hardhat.stop() + except CancelledError as e: + raise e + except CatchableError as e: + error "error when trying to stop hardhat", error = e.msg else: - discard await send(ethProvider, "evm_revert", @[snapshot]) - - await ethProvider.close() + try: + discard await send(ethProvider, "evm_revert", @[snapshot]) + await ethProvider.close() + except ProviderError as e: + error "error when trying to cleanup the evm", error = e.msg running = @[] @@ -285,7 +301,7 @@ template multinodesuite*(name: string, body: untyped) = await teardownImpl() when declared(teardownAllIMPL): teardownAllIMPL() - raise er + fail(er.msg) proc updateBootstrapNodes( node: CodexProcess From 42422e5844fb3ea523fbe997b0f98a1a3aefc4b4 Mon Sep 17 00:00:00 2001 From: Arnaud Date: Mon, 7 Jul 2025 12:30:41 +0200 Subject: [PATCH 09/12] Reduce the colleral for the first availability --- tests/integration/30_minutes/testslotrepair.nim | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/tests/integration/30_minutes/testslotrepair.nim b/tests/integration/30_minutes/testslotrepair.nim index 32a95959f..9e1949d81 100644 --- a/tests/integration/30_minutes/testslotrepair.nim +++ b/tests/integration/30_minutes/testslotrepair.nim @@ -90,7 +90,7 @@ marketplacesuite(name = "SP Slot Repair"): totalSize = 2 * size.truncate(uint64), duration = duration, minPricePerBytePerSecond = minPricePerBytePerSecond, - totalCollateral = 3 * size * collateralPerByte, + totalCollateral = 2 * size * collateralPerByte, ) ).get let availability1 = ( @@ -128,6 +128,7 @@ marketplacesuite(name = "SP Slot Repair"): await provider0.client.patchAvailability( availabilityId = availability0.id, totalSize = (3 * size.truncate(uint64)).uint64.some, + totalCollateral = (3.u256 * size * collateralPerByte).some, ) await marketplaceSubscribe(SlotFilled, onSlotFilled) From 24f9454b3846cc18ee7a3f6c5dec102a57212c32 Mon Sep 17 00:00:00 2001 From: Arnaud Date: Mon, 7 Jul 2025 12:31:05 +0200 Subject: [PATCH 10/12] Add more delay to give more time to get the request failed event --- tests/integration/30_minutes/testvalidator.nim | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/integration/30_minutes/testvalidator.nim b/tests/integration/30_minutes/testvalidator.nim index b60f048bd..c65a0597f 100644 --- a/tests/integration/30_minutes/testvalidator.nim +++ b/tests/integration/30_minutes/testvalidator.nim @@ -81,7 +81,7 @@ marketplacesuite(name = "Validation"): let secondsTillRequestEnd = await getSecondsTillRequestEnd(requestId) debug "validation suite", secondsTillRequestEnd = secondsTillRequestEnd - await waitForRequestToFail(requestId, secondsTillRequestEnd.int64) + await waitForRequestToFail(requestId, secondsTillRequestEnd.int64 + 60) test "validator uses historical state to mark missing proofs", NodeConfigs( @@ -149,4 +149,4 @@ marketplacesuite(name = "Validation"): let secondsTillRequestEnd = await getSecondsTillRequestEnd(requestId) debug "validation suite", secondsTillRequestEnd = secondsTillRequestEnd - await waitForRequestToFail(requestId, secondsTillRequestEnd.int64) + await waitForRequestToFail(requestId, secondsTillRequestEnd.int64 + 60) From 6a8445d7af78b86e33ae357c6a53d54f3bf5e33a Mon Sep 17 00:00:00 2001 From: Arnaud Date: Mon, 7 Jul 2025 12:31:15 +0200 Subject: [PATCH 11/12] Restore duration value --- tests/integration/marketplacesuite.nim | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/integration/marketplacesuite.nim b/tests/integration/marketplacesuite.nim index e3b010a09..6923f2f80 100644 --- a/tests/integration/marketplacesuite.nim +++ b/tests/integration/marketplacesuite.nim @@ -155,7 +155,7 @@ template marketplacesuite*(name: string, body: untyped) = client: CodexClient, cid: Cid, proofProbability = 1.u256, - duration: uint64 = 20 * 60.uint64, + duration: uint64 = 12.periods, pricePerBytePerSecond = 1.u256, collateralPerByte = 1.u256, expiry: uint64 = 4.periods, From 881a32f929d05969fcd5c18fb291adaca530efeb Mon Sep 17 00:00:00 2001 From: Arnaud Date: Mon, 7 Jul 2025 18:41:08 +0200 Subject: [PATCH 12/12] Stop the test when the request fails --- .../30_minutes/testmarketplace.nim | 14 +++-- tests/integration/30_minutes/testproofs.nim | 9 ++-- .../integration/30_minutes/testslotrepair.nim | 11 ++-- tests/integration/5_minutes/testsales.nim | 16 +++--- tests/integration/marketplacesuite.nim | 51 +++++++++++++++++++ 5 files changed, 82 insertions(+), 19 deletions(-) diff --git a/tests/integration/30_minutes/testmarketplace.nim b/tests/integration/30_minutes/testmarketplace.nim index 235abcddf..5afd627d0 100644 --- a/tests/integration/30_minutes/testmarketplace.nim +++ b/tests/integration/30_minutes/testmarketplace.nim @@ -40,7 +40,8 @@ marketplacesuite(name = "Marketplace"): # As we use in tests ethProvider.currentTime() which uses block timestamp this can lead to synchronization issues. await ethProvider.advanceTime(1.u256) - test "nodes negotiate contracts on the marketplace", marketplaceConfig: + test "nodes negotiate contracts on the marketplace", + marketplaceConfig, stopOnRequestFail = true: # host makes storage available let availability = ( await host.postAvailability( @@ -83,7 +84,7 @@ marketplacesuite(name = "Marketplace"): check slot.request.id == purchase.requestId test "node slots gets paid out and rest of tokens are returned to client", - marketplaceConfig: + marketplaceConfig, stopOnRequestFail = true: var providerRewardEvent = newAsyncEvent() var clientFundsEvent = newAsyncEvent() var transferEvent = newAsyncEvent() @@ -177,7 +178,8 @@ marketplacesuite(name = "Marketplace"): # .withLogFile() # .withLogTopics("marketplace", "sales", "statemachine","slotqueue", "reservations") .some, - ): + ), + stopOnRequestFail = true: var requestId: RequestId # We create an avavilability allowing the first SP to host the 3 slots. @@ -254,7 +256,8 @@ marketplacesuite(name = "Marketplace payouts"): # "node", "marketplace", "sales", "reservations", "node", "statemachine" # ) .some, - ): + ), + stopOnRequestFail = true: let client = clients()[0] let provider = providers()[0] let clientApi = client.client @@ -365,7 +368,8 @@ marketplacesuite(name = "Marketplace payouts"): # "node", "marketplace", "sales", "reservations", "statemachine" # ) .some, - ): + ), + stopOnRequestFail = true: let client0 = clients()[0] let provider0 = providers()[0] let provider1 = providers()[1] diff --git a/tests/integration/30_minutes/testproofs.nim b/tests/integration/30_minutes/testproofs.nim index d77727746..8b91050d7 100644 --- a/tests/integration/30_minutes/testproofs.nim +++ b/tests/integration/30_minutes/testproofs.nim @@ -34,7 +34,8 @@ marketplacesuite(name = "Hosts submit regular proofs"): # .withLogFile() # uncomment to output log file to tests/integration/logs/ //_.log # .withLogTopics("marketplace", "sales", "reservations", "node", "clock") .some, - ): + ), + stopOnRequestFail = true: let client0 = clients()[0].client let expiry = 10.periods let duration = expiry + 5.periods @@ -118,7 +119,8 @@ marketplacesuite(name = "Simulate invalid proofs"): # uncomment to output log file to tests/integration/logs/ //_.log # .withLogTopics("validator", "onchain", "ethers", "clock") .some, - ): + ), + stopOnRequestFail = true: let client0 = clients()[0].client let expiry = 10.periods let duration = expiry + 10.periods @@ -162,7 +164,8 @@ marketplacesuite(name = "Simulate invalid proofs"): # .withLogFile() # uncomment to output log file to tests/integration/logs/ //_.log # .withLogTopics("validator", "onchain", "ethers", "clock") .some, - ): + ), + stopOnRequestFail = true: let client0 = clients()[0].client let expiry = 10.periods let duration = expiry + 10.periods diff --git a/tests/integration/30_minutes/testslotrepair.nim b/tests/integration/30_minutes/testslotrepair.nim index 9e1949d81..69e45ff72 100644 --- a/tests/integration/30_minutes/testslotrepair.nim +++ b/tests/integration/30_minutes/testslotrepair.nim @@ -4,7 +4,7 @@ import ../../contracts/time import ../../contracts/deployment import ../../codex/helpers import ../../examples -import ../marketplacesuite +import ../marketplacesuite as marketplacesuite import ../nodeconfigs export logutils @@ -75,7 +75,8 @@ marketplacesuite(name = "SP Slot Repair"): # .debug() # .withLogFile() # .withLogTopics("validator").some, - ): + ), + stopOnRequestFail = true: let client0 = clients()[0] let provider0 = providers()[0] let provider1 = providers()[1] @@ -154,7 +155,8 @@ marketplacesuite(name = "SP Slot Repair"): # .debug() .withLogFile() .withLogTopics("marketplace", "sales", "statemachine", "reservations").some, - ): + ), + stopOnRequestFail = true: let client0 = clients()[0] let provider0 = providers()[0] let provider1 = providers()[1] @@ -236,7 +238,8 @@ marketplacesuite(name = "SP Slot Repair"): # .withLogFile() # .withLogTopics("marketplace", "sales", "statemachine", "reservations") .some, - ): + ), + stopOnRequestFail = true: let client0 = clients()[0] let provider0 = providers()[0] let provider1 = providers()[1] diff --git a/tests/integration/5_minutes/testsales.nim b/tests/integration/5_minutes/testsales.nim index d8d5dcca6..bd70685b8 100644 --- a/tests/integration/5_minutes/testsales.nim +++ b/tests/integration/5_minutes/testsales.nim @@ -34,7 +34,7 @@ marketplacesuite(name = "Sales"): host = providers()[0].client client = clients()[0].client - test "node handles new storage availability", salesConfig: + test "node handles new storage availability", salesConfig, stopOnRequestFail = true: let availability1 = ( await host.postAvailability( totalSize = 1.uint64, @@ -53,7 +53,7 @@ marketplacesuite(name = "Sales"): ).get check availability1 != availability2 - test "node lists storage that is for sale", salesConfig: + test "node lists storage that is for sale", salesConfig, stopOnRequestFail = true: let availability = ( await host.postAvailability( totalSize = 1.uint64, @@ -64,7 +64,7 @@ marketplacesuite(name = "Sales"): ).get check availability in (await host.getAvailabilities()).get - test "updating availability", salesConfig: + test "updating availability", salesConfig, stopOnRequestFail = true: let availability = ( await host.postAvailability( totalSize = 140000.uint64, @@ -95,7 +95,8 @@ marketplacesuite(name = "Sales"): check updatedAvailability.enabled == false check updatedAvailability.until == until - test "updating availability - updating totalSize", salesConfig: + test "updating availability - updating totalSize", + salesConfig, stopOnRequestFail = true: let availability = ( await host.postAvailability( totalSize = 140000.uint64, @@ -112,7 +113,7 @@ marketplacesuite(name = "Sales"): check updatedAvailability.freeSize == 100000 test "updating availability - updating totalSize does not allow bellow utilized", - salesConfig: + salesConfig, stopOnRequestFail = true: let originalSize = 0xFFFFFF.uint64 let minPricePerBytePerSecond = 3.u256 let collateralPerByte = 1.u256 @@ -157,7 +158,8 @@ marketplacesuite(name = "Sales"): check newUpdatedAvailability.totalSize == originalSize + 20000 check newUpdatedAvailability.freeSize - updatedAvailability.freeSize == 20000 - test "updating availability fails with until negative", salesConfig: + test "updating availability fails with until negative", + salesConfig, stopOnRequestFail = true: let availability = ( await host.postAvailability( totalSize = 140000.uint64, @@ -174,7 +176,7 @@ marketplacesuite(name = "Sales"): (await response.body) == "Cannot set until to a negative value" test "returns an error when trying to update the until date before an existing a request is finished", - salesConfig: + salesConfig, stopOnRequestFail = true: let size = 0xFFFFFF.uint64 let duration = 20 * 60.uint64 let expiry = 10 * 60.uint64 diff --git a/tests/integration/marketplacesuite.nim b/tests/integration/marketplacesuite.nim index 6923f2f80..f226387a1 100644 --- a/tests/integration/marketplacesuite.nim +++ b/tests/integration/marketplacesuite.nim @@ -23,6 +23,56 @@ template marketplacesuite*(name: string, body: untyped) = var token {.inject, used.}: Erc20Token var subscriptions: seq[Subscription] = @[] var tokenSubscription: Subscription + var requestFailedEvent: AsyncEvent + + template test(tname, startNodeConfigs, stopOnRequestFail, tbody) = + test tname, startNodeConfigs: + stopOnRequestFailed: + tbody + + template stopOnRequestFailed(tbody: untyped) = + let completed = newAsyncEvent() + let requestFailedEvent = newAsyncEvent() + + proc onRequestFailed(eventResult: ?!RequestFailed) {.raises: [].} = + assert not eventResult.isErr + requestFailedEvent.fire() + + let sub = await marketplace.subscribe(RequestFailed, onRequestFailed) + subscriptions.add(sub) + + let mainFut = ( + proc(): Future[void] {.async.} = + try: + tbody + completed.fire() + except CancelledError as e: + raise e + except CatchableError as e: + completed.fire() + raise e + )() + + let fastFailFut = ( + proc(): Future[void] {.async.} = + try: + await requestFailedEvent.wait().wait(timeout = chronos.seconds(60)) + completed.fire() + raise newException(TestFailedError, "storage request has failed") + except AsyncTimeoutError: + discard + )() + + await completed.wait().wait(timeout = chronos.seconds(60 * 30)) + + if not fastFailFut.completed: + await fastFailFut.cancelAndWait() + + if mainFut.failed: + raise mainFut.error + + if fastFailFut.failed: + raise fastFailFut.error proc check(cond: bool, reason = "Check failed"): void = if not cond: @@ -215,6 +265,7 @@ template marketplacesuite*(name: string, body: untyped) = period = config.proofs.period periodicity = Periodicity(seconds: period) subscriptions = @[] + requestFailedEvent = newAsyncEvent() teardown: for subscription in subscriptions: await subscription.unsubscribe()