Skip to content

Commit

Permalink
Merge d67f70d into 103d398
Browse files Browse the repository at this point in the history
  • Loading branch information
rymnc authored Dec 6, 2023
2 parents 103d398 + d67f70d commit 90074c2
Show file tree
Hide file tree
Showing 2 changed files with 92 additions and 60 deletions.
120 changes: 60 additions & 60 deletions waku/waku_rln_relay/group_manager/on_chain/group_manager.nim
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,8 @@ import
../../../waku_keystore,
../../rln,
../../conversion_utils,
../group_manager_base
../group_manager_base,
./retry_wrapper

from strutils import parseHexInt

Expand Down Expand Up @@ -142,20 +143,21 @@ method register*(g: OnchainGroupManager, identityCredentials: IdentityCredential
let registryContract = g.registryContract.get()
let membershipFee = g.membershipFee.get()

let gasPrice = int(await ethRpc.provider.eth_gasPrice()) * 2
var gasPrice: int
retryWrapper(gasPrice, RetryStrategy.new(), "Failed to get gas price"):
int(await ethRpc.provider.eth_gasPrice()) * 2
let idCommitment = identityCredentials.idCommitment.toUInt256()

var txHash: TxHash
try: # send the registration transaction and check if any error occurs
let storageIndex = g.usingStorageIndex.get()
debug "registering the member", idCommitment = idCommitment, storageIndex = storageIndex
txHash = await registryContract.register(storageIndex, idCommitment).send(gasPrice = gasPrice)
except CatchableError:
error "error while registering the member", msg = getCurrentExceptionMsg()
raise newException(CatchableError, "could not register the member: " & getCurrentExceptionMsg())
let storageIndex = g.usingStorageIndex.get()
debug "registering the member", idCommitment = idCommitment, storageIndex = storageIndex
retryWrapper(txHash, RetryStrategy.new(), "Failed to register the member"):
await registryContract.register(storageIndex, idCommitment).send(gasPrice = gasPrice)

# wait for the transaction to be mined
let tsReceipt = await ethRpc.getMinedTransactionReceipt(txHash)
var tsReceipt: ReceiptObject
retryWrapper(tsReceipt, RetryStrategy.new(), "Failed to get the transaction receipt"):
await ethRpc.getMinedTransactionReceipt(txHash)
debug "registration transaction mined", txHash = txHash
g.registrationTxHash = some(txHash)
# the receipt topic holds the hash of signature of the raised events
Expand Down Expand Up @@ -241,9 +243,11 @@ proc getRawEvents(g: OnchainGroupManager,
let ethRpc = g.ethRpc.get()
let rlnContract = g.rlnContract.get()

let events = await rlnContract.getJsonLogs(MemberRegistered,
fromBlock = some(fromBlock.blockId()),
toBlock = some(toBlock.blockId()))
var events: JsonNode
retryWrapper(events, RetryStrategy.new(), "Failed to get the events"):
await rlnContract.getJsonLogs(MemberRegistered,
fromBlock = some(fromBlock.blockId()),
toBlock = some(toBlock.blockId()))
return events

proc getBlockTable(g: OnchainGroupManager,
Expand Down Expand Up @@ -340,10 +344,9 @@ proc startListeningToEvents(g: OnchainGroupManager): Future[void] {.async.} =

let ethRpc = g.ethRpc.get()
let newHeadCallback = g.getNewHeadCallback()
try:
discard await ethRpc.subscribeForBlockHeaders(newHeadCallback, newHeadErrCallback)
except CatchableError:
raise newException(ValueError, "failed to subscribe to block headers: " & getCurrentExceptionMsg())
var blockHeaderSub: Subscription
retryWrapper(blockHeaderSub, RetryStrategy.new(), "Failed to subscribe to block headers"):
await ethRpc.subscribeForBlockHeaders(newHeadCallback, newHeadErrCallback)

proc startOnchainSync(g: OnchainGroupManager): Future[void] {.async.} =
initializedGuard(g)
Expand All @@ -364,7 +367,9 @@ proc startOnchainSync(g: OnchainGroupManager): Future[void] {.async.} =
# we always want to sync from last processed block => latest
# chunk events
while true:
let currentLatestBlock = cast[BlockNumber](await ethRpc.provider.eth_blockNumber())
var currentLatestBlock: BlockNumber
retryWrapper(currentLatestBlock, RetryStrategy.new(), "Failed to get the latest block number"):
cast[BlockNumber](await ethRpc.provider.eth_blockNumber())
if fromBlock >= currentLatestBlock:
break

Expand Down Expand Up @@ -400,14 +405,12 @@ method onWithdraw*(g: OnchainGroupManager, cb: OnWithdrawCallback) {.gcsafe.} =
method init*(g: OnchainGroupManager): Future[void] {.async.} =
var ethRpc: Web3
# check if the Ethereum client is reachable
try:
ethRpc = await newWeb3(g.ethClientUrl)
except CatchableError:
let errMsg = "could not connect to the Ethereum client: " & getCurrentExceptionMsg()
raise newException(ValueError, errMsg)

retryWrapper(ethRpc, RetryStrategy.new(), "Failed to connect to the Ethereum client"):
await newWeb3(g.ethClientUrl)
# Set the chain id
let chainId = await ethRpc.provider.eth_chainId()
var chainId: Quantity
retryWrapper(chainId, RetryStrategy.new(), "Failed to get the chain id"):
await ethRpc.provider.eth_chainId()
g.chainId = some(chainId)

if g.ethPrivateKey.isSome():
Expand All @@ -422,9 +425,14 @@ method init*(g: OnchainGroupManager): Future[void] {.async.} =
let registryContract = ethRpc.contractSender(WakuRlnRegistry, registryAddress)

# get the current storage index
let usingStorageIndex = await registryContract.usingStorageIndex().call()
var usingStorageIndex: Uint16
retryWrapper(usingStorageIndex, RetryStrategy.new(), "Failed to get the storage index"):
await registryContract.usingStorageIndex().call()

g.usingStorageIndex = some(usingStorageIndex)
let rlnContractAddress = await registryContract.storages(usingStorageIndex).call()
var rlnContractAddress: Address
retryWrapper(rlnContractAddress, RetryStrategy.new(), "Failed to get the rln contract address"):
await registryContract.storages(usingStorageIndex).call()
let rlnContract = ethRpc.contractSender(RlnStorage, rlnContractAddress)

g.ethRpc = some(ethRpc)
Expand Down Expand Up @@ -477,39 +485,36 @@ method init*(g: OnchainGroupManager): Future[void] {.async.} =

# check if the contract exists by calling a static function
var membershipFee: Uint256
try:
membershipFee = await rlnContract.MEMBERSHIP_DEPOSIT().call()
except CatchableError:
raise newException(ValueError,
"could not get the membership deposit: " & getCurrentExceptionMsg())
retryWrapper(membershipFee, RetryStrategy.new(), "Failed to get the membership deposit"):
await rlnContract.MEMBERSHIP_DEPOSIT().call()
g.membershipFee = some(membershipFee)

var deployedBlockNumber: Uint256
try:
deployedBlockNumber = await rlnContract.deployedBlockNumber().call()
debug "using rln storage", deployedBlockNumber, rlnContractAddress
except CatchableError:
raise newException(ValueError,
"could not get the deployed block number: " & getCurrentExceptionMsg())
retryWrapper(deployedBlockNumber, RetryStrategy.new(), "Failed to get the deployed block number"):
await rlnContract.deployedBlockNumber().call()
debug "using rln storage", deployedBlockNumber, rlnContractAddress
g.rlnContractDeployedBlockNumber = cast[BlockNumber](deployedBlockNumber)
g.latestProcessedBlock = max(g.latestProcessedBlock, g.rlnContractDeployedBlockNumber)

ethRpc.ondisconnect = proc() =
proc onDisconnect() {.async.} =
error "Ethereum client disconnected"
let fromBlock = max(g.latestProcessedBlock, g.rlnContractDeployedBlockNumber)
info "reconnecting with the Ethereum client, and restarting group sync", fromBlock = fromBlock
try:
let newEthRpc = waitFor newWeb3(g.ethClientUrl)
newEthRpc.ondisconnect = ethRpc.ondisconnect
g.ethRpc = some(newEthRpc)
except CatchableError:
error "failed to reconnect with the Ethereum client", error = getCurrentExceptionMsg()
return
var newEthRpc: Web3
retryWrapper(newEthRpc, RetryStrategy.new(), "Failed to reconnect with the Ethereum client"):
await newWeb3(g.ethClientUrl)
newEthRpc.ondisconnect = ethRpc.ondisconnect
g.ethRpc = some(newEthRpc)

try:
asyncSpawn g.startOnchainSync()
except CatchableError:
error "failed to restart group sync", error = getCurrentExceptionMsg()

ethRpc.ondisconnect = proc() =
asyncCheck onDisconnect()


waku_rln_number_registered_memberships.set(int64(g.rlnInstance.leavesSet()))
g.initialized = true

Expand All @@ -526,12 +531,10 @@ method stop*(g: OnchainGroupManager): Future[void] {.async.} =
proc isSyncing*(g: OnchainGroupManager): Future[bool] {.async,gcsafe.} =
let ethRpc = g.ethRpc.get()

try:
let syncing = await ethRpc.provider.eth_syncing()
return syncing.getBool()
except CatchableError:
error "failed to get the syncing status", error = getCurrentExceptionMsg()
return false
var syncing: JsonNode
retryWrapper(syncing, RetryStrategy.new(), "Failed to get the syncing status"):
await ethRpc.provider.eth_syncing()
return syncing.getBool()

method isReady*(g: OnchainGroupManager): Future[bool] {.async,gcsafe.} =
initializedGuard(g)
Expand All @@ -540,14 +543,11 @@ method isReady*(g: OnchainGroupManager): Future[bool] {.async,gcsafe.} =
return false

var currentBlock: BlockNumber
try:
currentBlock = cast[BlockNumber](await g.ethRpc
.get()
.provider
.eth_blockNumber())
except CatchableError:
error "failed to get the current block number", error = getCurrentExceptionMsg()
return false
retryWrapper(currentBlock, RetryStrategy.new(), "Failed to get the current block number"):
cast[BlockNumber](await g.ethRpc
.get()
.provider
.eth_blockNumber())

if g.latestProcessedBlock < currentBlock:
return false
Expand Down
32 changes: 32 additions & 0 deletions waku/waku_rln_relay/group_manager/on_chain/retry_wrapper.nim
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
import
chronos


type RetryStrategy* = object
shouldRetry*: bool
retryDelay*: Duration
retryCount*: uint

proc new*(T: type RetryStrategy): RetryStrategy =
return RetryStrategy(
shouldRetry: true,
retryDelay: 1000.millis,
retryCount: 3
)


template retryWrapper*(res: auto,
retryStrategy: RetryStrategy,
errStr: string,
body: untyped): auto =
var retryCount = retryStrategy.retryCount
var shouldRetry = retryStrategy.shouldRetry
while shouldRetry and retryCount > 0:
try:
res = body
shouldRetry = false
except:
retryCount -= 1
await sleepAsync(retryStrategy.retryDelay)
if shouldRetry:
raise newException(CatchableError, errStr & ": " & $getCurrentExceptionMsg())

0 comments on commit 90074c2

Please sign in to comment.