Skip to content

Commit

Permalink
chore(rln-relay): remove websocket from OnchainGroupManager (#2364)
Browse files Browse the repository at this point in the history
* chore(rln-relay): remove websocket from OnchainGroupManager

* fix: swap ws for http
  • Loading branch information
rymnc authored Jan 23, 2024
1 parent 7de91d9 commit efdc524
Show file tree
Hide file tree
Showing 6 changed files with 73 additions and 43 deletions.
4 changes: 2 additions & 2 deletions apps/chat2/config_chat2.nim
Original file line number Diff line number Diff line change
Expand Up @@ -252,8 +252,8 @@ type
name: "rln-relay-id-commitment-key" }: string

rlnRelayEthClientAddress* {.
desc: "WebSocket address of an Ethereum testnet client e.g., ws://localhost:8540/",
defaultValue: "ws://localhost:8540/"
desc: "WebSocket address of an Ethereum testnet client e.g., http://localhost:8540/",
defaultValue: "http://localhost:8540/"
name: "rln-relay-eth-client-address" }: string

rlnRelayEthContractAddress* {.
Expand Down
4 changes: 2 additions & 2 deletions apps/wakunode2/external_config.nim
Original file line number Diff line number Diff line change
Expand Up @@ -61,8 +61,8 @@ type
name: "rln-relay-cred-path" }: string

rlnRelayEthClientAddress* {.
desc: "WebSocket address of an Ethereum testnet client e.g., ws://localhost:8540/",
defaultValue: "ws://localhost:8540/",
desc: "WebSocket address of an Ethereum testnet client e.g., http://localhost:8540/",
defaultValue: "http://localhost:8540/",
name: "rln-relay-eth-client-address" }: string

rlnRelayEthContractAddress* {.
Expand Down
19 changes: 9 additions & 10 deletions tests/waku_rln_relay/test_rln_group_manager_onchain.nim
Original file line number Diff line number Diff line change
Expand Up @@ -232,9 +232,9 @@ suite "Onchain group manager":

try:
await manager.startGroupSync()
except ValueError:
except CatchableError:
assert true
except Exception, CatchableError:
except Exception:
assert false, "exception raised when calling startGroupSync: " & getCurrentExceptionMsg()

await manager.stop()
Expand Down Expand Up @@ -330,9 +330,9 @@ suite "Onchain group manager":

try:
await manager.register(dummyCommitment)
except ValueError:
except CatchableError:
assert true
except Exception, CatchableError:
except Exception:
assert false, "exception raised: " & getCurrentExceptionMsg()

await manager.stop()
Expand Down Expand Up @@ -399,9 +399,9 @@ suite "Onchain group manager":

try:
await manager.withdraw(idSecretHash)
except ValueError:
except CatchableError:
assert true
except Exception, CatchableError:
except Exception:
assert false, "exception raised: " & getCurrentExceptionMsg()

await manager.stop()
Expand Down Expand Up @@ -627,7 +627,7 @@ suite "Onchain group manager":
await manager.stop()

asyncTest "isReady should return false if ethRpc is none":
var manager = await setup()
let manager = await setup()
await manager.init()

manager.ethRpc = none(Web3)
Expand All @@ -644,7 +644,7 @@ suite "Onchain group manager":
await manager.stop()

asyncTest "isReady should return false if lastSeenBlockHead > lastProcessed":
var manager = await setup()
let manager = await setup()
await manager.init()

var isReady = true
Expand All @@ -659,14 +659,13 @@ suite "Onchain group manager":
await manager.stop()

asyncTest "isReady should return true if ethRpc is ready":
var manager = await setup()
let manager = await setup()
await manager.init()
# node can only be ready after group sync is done
try:
await manager.startGroupSync()
except Exception, CatchableError:
assert false, "exception raised when calling startGroupSync: " & getCurrentExceptionMsg()

var isReady = false
try:
isReady = await manager.isReady()
Expand Down
2 changes: 1 addition & 1 deletion waku/waku_rln_relay/constants.nim
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ const
MembershipFee* = 1000000000000000.u256
# the current implementation of the rln lib supports a circuit for Merkle tree with depth 20
MerkleTreeDepth* = 20
EthClient* = "ws://127.0.0.1:8540"
EthClient* = "http://127.0.0.1:8540"

const
# the size of poseidon hash output in bits
Expand Down
82 changes: 55 additions & 27 deletions waku/waku_rln_relay/group_manager/on_chain/group_manager.nim
Original file line number Diff line number Diff line change
Expand Up @@ -74,13 +74,17 @@ type
# in event of a reorg. we store 5 in the buffer. Maybe need to revisit this,
# because the average reorg depth is 1 to 2 blocks.
validRootBuffer*: Deque[MerkleNode]
# interval loop to shut down gracefully
blockFetchingActive*: bool

const DefaultKeyStorePath* = "rlnKeystore.json"
const DefaultKeyStorePassword* = "password"

const DefaultBlockPollRate* = 6.seconds

template initializedGuard(g: OnchainGroupManager): untyped =
if not g.initialized:
raise newException(ValueError, "OnchainGroupManager is not initialized")
raise newException(CatchableError, "OnchainGroupManager is not initialized")


proc setMetadata*(g: OnchainGroupManager): RlnRelayResult[void] =
Expand Down Expand Up @@ -316,12 +320,15 @@ proc handleRemovedEvents(g: OnchainGroupManager, blockTable: BlockTable):

proc getAndHandleEvents(g: OnchainGroupManager,
fromBlock: BlockNumber,
toBlock: BlockNumber): Future[void] {.async: (raises: [Exception]).} =
toBlock: BlockNumber): Future[bool] {.async: (raises: [Exception]).} =
initializedGuard(g)

let blockTable = await g.getBlockTable(fromBlock, toBlock)
await g.handleEvents(blockTable)
await g.handleRemovedEvents(blockTable)
try:
await g.handleEvents(blockTable)
await g.handleRemovedEvents(blockTable)
except CatchableError:
error "failed to handle events", error=getCurrentExceptionMsg()
raise newException(ValueError, "failed to handle events")

g.latestProcessedBlock = toBlock
let metadataSetRes = g.setMetadata()
Expand All @@ -330,32 +337,49 @@ proc getAndHandleEvents(g: OnchainGroupManager,
warn "failed to persist rln metadata", error=metadataSetRes.error()
else:
trace "rln metadata persisted", blockNumber = g.latestProcessedBlock

return true

proc runInInterval(g: OnchainGroupManager, cb: proc, interval: Duration): void =
g.blockFetchingActive = false

proc getNewHeadCallback(g: OnchainGroupManager): BlockHeaderHandler =
proc newHeadCallback(blockheader: BlockHeader) {.gcsafe.} =
let latestBlock = BlockNumber(blockheader.number)
trace "block received", blockNumber = latestBlock
# get logs from the last block
try:
# inc by 1 to prevent double processing
let fromBlock = g.latestProcessedBlock + 1
asyncSpawn g.getAndHandleEvents(fromBlock, latestBlock)
except CatchableError:
warn "failed to handle log: ", error=getCurrentExceptionMsg()
return newHeadCallback

proc newHeadErrCallback(error: CatchableError) =
warn "failed to get new head", error=error.msg
proc runIntervalLoop() {.async, gcsafe.} =
g.blockFetchingActive = true

while g.blockFetchingActive:
var retCb: bool
retryWrapper(retCb, RetryStrategy.new(), "Failed to run the interval loop"):
await cb()
await sleepAsync(interval)

asyncSpawn runIntervalLoop()


proc getNewBlockCallback(g: OnchainGroupManager): proc =
let ethRpc = g.ethRpc.get()
proc wrappedCb(): Future[bool] {.async, gcsafe.} =
var latestBlock: BlockNumber
retryWrapper(latestBlock, RetryStrategy.new(), "Failed to get the latest block number"):
cast[BlockNumber](await ethRpc.provider.eth_blockNumber())

if latestBlock <= g.latestProcessedBlock:
return
# get logs from the last block
# inc by 1 to prevent double processing
let fromBlock = g.latestProcessedBlock + 1
var handleBlockRes: bool
retryWrapper(handleBlockRes, RetryStrategy.new(), "Failed to handle new block"):
await g.getAndHandleEvents(fromBlock, latestBlock)
return true
return wrappedCb

proc startListeningToEvents(g: OnchainGroupManager):
Future[void] {.async: (raises: [Exception]).} =
initializedGuard(g)

let ethRpc = g.ethRpc.get()
let newHeadCallback = g.getNewHeadCallback()
var blockHeaderSub: Subscription
retryWrapper(blockHeaderSub, RetryStrategy.new(), "Failed to subscribe to block headers"):
await ethRpc.subscribeForBlockHeaders(newHeadCallback, newHeadErrCallback)
let newBlockCallback = g.getNewBlockCallback()
g.runInInterval(newBlockCallback, DefaultBlockPollRate)

proc startOnchainSync(g: OnchainGroupManager):
Future[void] {.async: (raises: [Exception]).} =
Expand Down Expand Up @@ -385,7 +409,9 @@ proc startOnchainSync(g: OnchainGroupManager):

let toBlock = min(fromBlock + BlockNumber(blockChunkSize), currentLatestBlock)
debug "fetching events", fromBlock = fromBlock, toBlock = toBlock
await g.getAndHandleEvents(fromBlock, toBlock)
var handleBlockRes: bool
retryWrapper(handleBlockRes, RetryStrategy.new(), "Failed to handle old blocks"):
await g.getAndHandleEvents(fromBlock, toBlock)
fromBlock = toBlock + 1

except CatchableError:
Expand Down Expand Up @@ -523,13 +549,15 @@ method init*(g: OnchainGroupManager): Future[void] {.async.} =
error "failed to restart group sync", error = getCurrentExceptionMsg()

ethRpc.ondisconnect = proc() =
asyncCheck onDisconnect()
asyncSpawn onDisconnect()


waku_rln_number_registered_memberships.set(int64(g.rlnInstance.leavesSet()))
g.initialized = true

method stop*(g: OnchainGroupManager): Future[void] {.async.} =
method stop*(g: OnchainGroupManager): Future[void] {.async,gcsafe.} =
g.blockFetchingActive = false

if g.ethRpc.isSome():
g.ethRpc.get().ondisconnect = nil
await g.ethRpc.get().close()
Expand Down
5 changes: 4 additions & 1 deletion waku/waku_rln_relay/group_manager/on_chain/retry_wrapper.nim
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,15 @@ template retryWrapper*(res: auto,
body: untyped): auto =
var retryCount = retryStrategy.retryCount
var shouldRetry = retryStrategy.shouldRetry
var exceptionMessage = ""

while shouldRetry and retryCount > 0:
try:
res = body
shouldRetry = false
except:
retryCount -= 1
exceptionMessage = getCurrentExceptionMsg()
await sleepAsync(retryStrategy.retryDelay)
if shouldRetry:
raise newException(CatchableError, errStr & ": " & $getCurrentExceptionMsg())
raise newException(CatchableError, errStr & ": " & exceptionMessage)

0 comments on commit efdc524

Please sign in to comment.