chore: add size retention policy #2093

Merged · 15 commits · Sep 30, 2023
Changes from all commits
2 changes: 1 addition & 1 deletion .github/workflows/pre-release.yml
@@ -114,7 +114,7 @@ jobs:

create-release-candidate:
runs-on: ubuntu-latest
-    needs: [ tag-name, build-and-publish, js-waku ]
+    needs: [ tag-name, build-and-publish, js-waku-node, js-waku-node-optional ]
steps:
- name: Checkout code
uses: actions/checkout@v3
7 changes: 7 additions & 0 deletions .gitignore
@@ -57,3 +57,10 @@ nimbus-build-system.paths
*.sqlite3-wal

/examples/nodejs/build/

+# Coverage
+coverage_html_report/
+*.info
+
+# Wildcard
+*.ignore.*
14 changes: 13 additions & 1 deletion Makefile
@@ -229,7 +229,7 @@ DOCKER_IMAGE_NIMFLAGS := $(DOCKER_IMAGE_NIMFLAGS) $(HEAPTRACK_PARAMS)
# build a docker image for the fleet
docker-image: MAKE_TARGET ?= wakunode2
docker-image: DOCKER_IMAGE_TAG ?= $(MAKE_TARGET)-$(GIT_VERSION)
-docker-image: DOCKER_IMAGE_NAME ?= statusteam/nim-waku:$(DOCKER_IMAGE_TAG)
+docker-image: DOCKER_IMAGE_NAME ?= wakuorg/nwaku:$(DOCKER_IMAGE_TAG)
docker-image:
docker build \
--build-arg="MAKE_TARGET=$(MAKE_TARGET)" \
@@ -273,6 +273,18 @@ cwaku_example: | build libwaku
vendor/nim-libbacktrace/libbacktrace_wrapper.o \
vendor/nim-libbacktrace/install/usr/lib/libbacktrace.a

+cppwaku_example: | build libwaku
+	echo -e $(BUILD_MSG) "build/$@" && \
+		g++ -o "build/$@" \
+		./examples/cpp/waku.cpp \
+		./examples/cpp/base64.cpp \
+		-lwaku -Lbuild/ \
+		-pthread -ldl -lm \
+		-lminiupnpc -Lvendor/nim-nat-traversal/vendor/miniupnp/miniupnpc/build/ \
+		-lnatpmp -Lvendor/nim-nat-traversal/vendor/libnatpmp-upstream/ \
+		vendor/nim-libbacktrace/libbacktrace_wrapper.o \
+		vendor/nim-libbacktrace/install/usr/lib/libbacktrace.a

nodejswaku: | build deps
echo -e $(BUILD_MSG) "build/$@" && \
node-gyp build --directory=examples/nodejs/
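Review note: to smoke-test the new C++ example locally, the invocation below should be all that is needed (a sketch: it assumes the toolchain and the vendored miniupnpc and libnatpmp static libraries build cleanly on your platform; the target order-depends on `build` and `libwaku`, so those are made first):

```console
make cppwaku_example
./build/cppwaku_example
```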
4 changes: 2 additions & 2 deletions apps/chat2/chat2.nim
@@ -374,10 +374,10 @@ proc processInput(rfd: AsyncFD, rng: ref HmacDrbgContext) {.async.} =
echo "Connecting to " & $conf.fleet & " fleet using DNS discovery..."

if conf.fleet == Fleet.test:
dnsDiscoveryUrl = some("enrtree://AOGECG2SPND25EEFMAJ5WF3KSGJNSGV356DSTL2YVLLZWIV6SAYBM@test.waku.nodes.status.im")
dnsDiscoveryUrl = some("enrtree://AO47IDOLBKH72HIZZOXQP6NMRESAN7CHYWIBNXDXWRJRZWLODKII6@test.wakuv2.nodes.status.im")
else:
# Connect to prod by default
dnsDiscoveryUrl = some("enrtree://AOGECG2SPND25EEFMAJ5WF3KSGJNSGV356DSTL2YVLLZWIV6SAYBM@prod.waku.nodes.status.im")
dnsDiscoveryUrl = some("enrtree://ANEDLO25QVUGJOUTQFRYKWX6P4Z4GKVESBMHML7DZ6YK4LGS5FC5O@prod.wakuv2.nodes.status.im")

elif conf.dnsDiscovery and conf.dnsDiscoveryUrl != "":
# No pre-selected fleet. Discover nodes via DNS using user config
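For reference, the fleet selection this hunk updates distills to the following standalone sketch (the `Fleet` enum and the proc are simplified stand-ins; the URLs are exactly the new ones introduced above):

```nim
import std/options

type Fleet = enum
  prod, test

proc discoveryUrl(fleet: Fleet): Option[string] =
  # Map each fleet to its DNS discovery ENR tree (EIP-1459 URL).
  case fleet
  of Fleet.test:
    some("enrtree://AO47IDOLBKH72HIZZOXQP6NMRESAN7CHYWIBNXDXWRJRZWLODKII6@test.wakuv2.nodes.status.im")
  of Fleet.prod:
    some("enrtree://ANEDLO25QVUGJOUTQFRYKWX6P4Z4GKVESBMHML7DZ6YK4LGS5FC5O@prod.wakuv2.nodes.status.im")
```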
2 changes: 1 addition & 1 deletion apps/networkmonitor/README.md
@@ -39,7 +39,7 @@ Connect to the network through a given bootstrap node, with default parameters.
```

```console
-./build/networkmonitor --log-level=INFO --dns-discovery-url=enrtree://AOGECG2SPND25EEFMAJ5WF3KSGJNSGV356DSTL2YVLLZWIV6SAYBM@prod.nodes.status.im
+./build/networkmonitor --log-level=INFO --dns-discovery-url=enrtree://AL65EKLJAUXKKPG43HVTML5EFFWEZ7L4LOKTLZCLJASG4DSESQZEC@prod.status.nodes.status.im
```

## Metrics
223 changes: 143 additions & 80 deletions apps/networkmonitor/networkmonitor.nim
@@ -35,15 +35,71 @@ logScope:
topics = "networkmonitor"

const ReconnectTime = 60
-const MaxConnectionRetries = 10
+const MaxConnectionRetries = 5
const ResetRetriesAfter = 1200
const AvgPingWindow = 10.0

const git_version* {.strdefine.} = "n/a"

proc setDiscoveredPeersCapabilities(
routingTableNodes: seq[Node]) =
for capability in @[Relay, Store, Filter, Lightpush]:
let nOfNodesWithCapability = routingTableNodes.countIt(it.record.supportsCapability(capability))
info "capabilities as per ENR waku flag", capability=capability, amount=nOfNodesWithCapability
-    peer_type_as_per_enr.set(int64(nOfNodesWithCapability), labelValues = [$capability])
+    networkmonitor_peer_type_as_per_enr.set(int64(nOfNodesWithCapability), labelValues = [$capability])

+proc analyzePeer(
+    customPeerInfo: CustomPeerInfoRef,
+    peerInfo: RemotePeerInfo,
+    node: WakuNode,
+    timeout: chronos.Duration
+  ): Future[Result[string, string]] {.async.} =
+  var pingDelay: chronos.Duration
+
+  proc ping(): Future[Result[void, string]] {.async, gcsafe.} =
+    try:
+      let conn = await node.switch.dial(peerInfo.peerId, peerInfo.addrs, PingCodec)
+      pingDelay = await node.libp2pPing.ping(conn)
+      return ok()
+
+    except CatchableError:
+      var msg = getCurrentExceptionMsg()
+      if msg == "Future operation cancelled!":
+        msg = "timedout"
+      warn "failed to ping the peer", peer=peerInfo, err=msg
+
+      customPeerInfo.connError = msg
+      return err("could not ping peer: " & msg)
+
+  let timedOut = not await ping().withTimeout(timeout)
+  # need this check for pingDelay == 0 because there may be a conn error before timeout
+  if timedOut or pingDelay == 0.millis:
+    customPeerInfo.retries += 1
+    return err(customPeerInfo.connError)
+
+  customPeerInfo.connError = ""
+  info "successfully pinged peer", peer=peerInfo, duration=pingDelay.millis
+  networkmonitor_peer_ping.observe(pingDelay.millis)
+
+  if customPeerInfo.avgPingDuration == 0.millis:
+    customPeerInfo.avgPingDuration = pingDelay
+
+  # TODO: check why the calculation ends up losing precision
+  customPeerInfo.avgPingDuration = int64((float64(customPeerInfo.avgPingDuration.millis) * (AvgPingWindow - 1.0) + float64(pingDelay.millis)) / AvgPingWindow).millis
+  customPeerInfo.lastPingDuration = pingDelay
+
+  return ok(customPeerInfo.peerId)
+
+proc shouldReconnect(customPeerInfo: CustomPeerInfoRef): bool =
+  let reconnetIntervalCheck = getTime().toUnix() >= customPeerInfo.lastTimeConnected + ReconnectTime
+  var retriesCheck = customPeerInfo.retries < MaxConnectionRetries
+
+  if not retriesCheck and getTime().toUnix() >= customPeerInfo.lastTimeConnected + ResetRetriesAfter:
+    customPeerInfo.retries = 0
+    retriesCheck = true
+    info "resetting retries counter", peerId=customPeerInfo.peerId
+
+  return reconnetIntervalCheck and retriesCheck
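The `avgPingDuration` update in `analyzePeer` is an exponential moving average over `AvgPingWindow = 10` samples. A minimal standalone sketch of the arithmetic, plain floats only, including the truncation the TODO comment refers to:

```nim
const AvgPingWindow = 10.0

proc updateAvg(avgMs, sampleMs: float64): float64 =
  # Old average keeps (N-1)/N of the weight; the new sample contributes 1/N.
  (avgMs * (AvgPingWindow - 1.0) + sampleMs) / AvgPingWindow

when isMainModule:
  var avg = 100.0              # previous average ping, ms
  avg = updateAvg(avg, 50.0)   # (100*9 + 50) / 10 = 95.0 ms
  echo avg
  echo int64(avg)              # converting back to whole millis, as the code
                               # above does, is where the precision is lost
```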

# TODO: Split in discover, connect
proc setConnectedPeersMetrics(discoveredNodes: seq[Node],
@@ -54,11 +110,10 @@ proc setConnectedPeersMetrics(discoveredNodes: seq[Node],

let currentTime = getTime().toUnix()

-  # Protocols and agent string and its count
-  var allProtocols: Table[string, int]
-  var allAgentStrings: Table[string, int]

var newPeers = 0
+  var successfulConnections = 0

+  var analyzeFuts: seq[Future[Result[string, string]]]

# iterate all newly discovered nodes
for discNode in discoveredNodes:
@@ -80,103 +135,104 @@ proc setConnectedPeersMetrics(discoveredNodes: seq[Node],

# create new entry if new peerId found
let peerId = $peerInfo.peerId
-    let customPeerInfo = CustomPeerInfo(peerId: peerId)

if not allPeers.hasKey(peerId):
-      allPeers[peerId] = customPeerInfo
+      allPeers[peerId] = CustomPeerInfoRef(peerId: peerId)
newPeers += 1
else:
info "already seen", peerId=peerId

-    allPeers[peerId].lastTimeDiscovered = currentTime
-    allPeers[peerId].enr = discNode.record.toURI()
-    allPeers[peerId].enrCapabilities = discNode.record.getCapabilities().mapIt($it)
-    allPeers[peerId].discovered += 1
+    let customPeerInfo = allPeers[peerId]
+
+    customPeerInfo.lastTimeDiscovered = currentTime
+    customPeerInfo.enr = discNode.record.toURI()
+    customPeerInfo.enrCapabilities = discNode.record.getCapabilities().mapIt($it)
+    customPeerInfo.discovered += 1

if not typedRecord.get().ip.isSome():
warn "ip field is not set", record=typedRecord.get()
continue

let ip = $typedRecord.get().ip.get().join(".")
-    allPeers[peerId].ip = ip
+    customPeerInfo.ip = ip

-    # try to ping the peer
-    if getTime().toUnix() >= allPeers[peerId].lastTimeConnected + ReconnectTime and allPeers[peerId].retries < MaxConnectionRetries:
-      if allPeers[peerId].retries > 0:
-        warn "trying to dial failed peer again", peerId=peerId, retry=allPeers[peerId].retries
-
-      var pingDelay:chronos.Duration
-
-      proc ping(): Future[Result[void, string]] {.async, gcsafe.} =
-        try:
-          let conn = await node.switch.dial(peerInfo.peerId, peerInfo.addrs, PingCodec)
-          pingDelay = await node.libp2pPing.ping(conn)
-          return ok()
-
-        except CatchableError:
-          var msg = getCurrentExceptionMsg()
-          if msg == "Future operation cancelled!":
-            msg = "timedout"
-          warn "failed to ping the peer", peer=peerInfo, err=msg
-
-          allPeers[peerId].connError = msg
-          return err("could not ping peer: " & msg)
-
-      let timedOut = not await ping().withTimeout(timeout)
-      # need this check for pingDelat == 0 because there may be a conn error before timeout
-      if timedOut or pingDelay == 0.millis:
-        allPeers[peerId].retries += 1
-        continue
-
-      info "successfully pinged peer", peer=peerInfo, duration=pingDelay.millis
-      peer_ping.observe(pingDelay.millis)
-
-      if allPeers[peerId].avgPingDuration == 0.millis:
-        allPeers[peerId].avgPingDuration = pingDelay
-
-      # TODO: check why the calculation ends up losing precision
-      allPeers[peerId].avgPingDuration = int64((float64(allPeers[peerId].avgPingDuration.millis) * (AvgPingWindow - 1.0) + float64(pingDelay.millis)) / AvgPingWindow).millis
-      allPeers[peerId].lastPingDuration = pingDelay
-
-      # after connection, get supported protocols
-      let lp2pPeerStore = node.switch.peerStore
-      let nodeProtocols = lp2pPeerStore[ProtoBook][peerInfo.peerId]
-      allPeers[peerId].supportedProtocols = nodeProtocols
-      allPeers[peerId].lastTimeConnected = currentTime
-
-      # after connection, get user-agent
-      let nodeUserAgent = lp2pPeerStore[AgentBook][peerInfo.peerId]
-      allPeers[peerId].userAgent = nodeUserAgent
-
-      # store avaiable protocols in the network
-      for protocol in nodeProtocols:
-        if not allProtocols.hasKey(protocol):
-          allProtocols[protocol] = 0
-        allProtocols[protocol] += 1
-
-      # store available user-agents in the network
-      if not allAgentStrings.hasKey(nodeUserAgent):
-        allAgentStrings[nodeUserAgent] = 0
-      allAgentStrings[nodeUserAgent] += 1
-
-      debug "connected to peer", peer=allPeers[customPeerInfo.peerId]
+    if shouldReconnect(customPeerInfo):
+      if customPeerInfo.retries > 0:
+        warn "trying to dial failed peer again", peerId=peerId, retry=customPeerInfo.retries
+      analyzeFuts.add(analyzePeer(customPeerInfo, peerInfo, node, timeout))

+  # Wait for all connection attempts to finish
+  let analyzedPeers = await allFinished(analyzeFuts)
+
+  for peerIdFut in analyzedPeers:
+    let peerIdRes = await peerIdFut
+    let peerIdStr = peerIdRes.valueOr():
+      continue
+
+    successfulConnections += 1
+    let peerId = PeerId.init(peerIdStr).valueOr():
+      warn "failed to parse peerId", peerId=peerIdStr
+      continue
+    var customPeerInfo = allPeers[peerIdStr]
+
+    debug "connected to peer", peer=customPeerInfo[]
+
+    # after connection, get supported protocols
+    let lp2pPeerStore = node.switch.peerStore
+    let nodeProtocols = lp2pPeerStore[ProtoBook][peerId]
+    customPeerInfo.supportedProtocols = nodeProtocols
+    customPeerInfo.lastTimeConnected = currentTime
+
+    # after connection, get user-agent
+    let nodeUserAgent = lp2pPeerStore[AgentBook][peerId]
+    customPeerInfo.userAgent = nodeUserAgent

info "number of newly discovered peers", amount=newPeers
# inform the total connections that we did in this round
-  let nOfOkConnections = allProtocols.len()
-  info "number of successful connections", amount=nOfOkConnections
+  info "number of successful connections", amount=successfulConnections
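The core concurrency change in this refactor: dials are now dispatched up front and collected with chronos' `allFinished`, rather than awaiting each peer in turn inside the discovery loop. A minimal sketch of the pattern (the `probe` proc is a hypothetical stand-in for `analyzePeer`):

```nim
import chronos

proc probe(n: int): Future[int] {.async.} =
  # Stand-in for analyzePeer: completes after n milliseconds.
  await sleepAsync(n.millis)
  return n

proc main() {.async.} =
  var futs: seq[Future[int]]
  for n in [30, 10, 20]:
    futs.add(probe(n))                    # dispatch everything first...
  let finished = await allFinished(futs)  # ...then await all of them at once
  for fut in finished:
    echo fut.read()                       # fine here: every future completed
                                          # and none of them fails

waitFor main()
```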

+proc updateMetrics(allPeersRef: CustomPeersTableRef) {.gcsafe.} =
+  var allProtocols: Table[string, int]
+  var allAgentStrings: Table[string, int]
+  var countries: Table[string, int]
+  var connectedPeers = 0
+  var failedPeers = 0
+
+  for peerInfo in allPeersRef.values:
+    if peerInfo.connError == "":
+      for protocol in peerInfo.supportedProtocols:
+        allProtocols[protocol] = allProtocols.mgetOrPut(protocol, 0) + 1
+
+      # store available user-agents in the network
+      allAgentStrings[peerInfo.userAgent] = allAgentStrings.mgetOrPut(peerInfo.userAgent, 0) + 1
+
+      if peerInfo.country != "":
+        countries[peerInfo.country] = countries.mgetOrPut(peerInfo.country, 0) + 1
+
+      # update count on each protocol
+      connectedPeers += 1
+    else:
+      failedPeers += 1
+
+  networkmonitor_peer_count.set(int64(connectedPeers), labelValues = ["true"])
+  networkmonitor_peer_count.set(int64(failedPeers), labelValues = ["false"])
# update count on each protocol
for protocol in allProtocols.keys():
-    let countOfProtocols = allProtocols[protocol]
-    peer_type_as_per_protocol.set(int64(countOfProtocols), labelValues = [protocol])
+    let countOfProtocols = allProtocols.mgetOrPut(protocol, 0)
+    networkmonitor_peer_type_as_per_protocol.set(int64(countOfProtocols), labelValues = [protocol])
info "supported protocols in the network", protocol=protocol, count=countOfProtocols

# update count on each user-agent
for userAgent in allAgentStrings.keys():
-    let countOfUserAgent = allAgentStrings[userAgent]
-    peer_user_agents.set(int64(countOfUserAgent), labelValues = [userAgent])
+    let countOfUserAgent = allAgentStrings.mgetOrPut(userAgent, 0)
+    networkmonitor_peer_user_agents.set(int64(countOfUserAgent), labelValues = [userAgent])
info "user agents participating in the network", userAgent=userAgent, count=countOfUserAgent

+  for country in countries.keys():
+    let peerCount = countries.mgetOrPut(country, 0)
+    networkmonitor_peer_country_count.set(int64(peerCount), labelValues = [country])
+    info "number of peers per country", country=country, count=peerCount
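`updateMetrics` does all of its counting with `std/tables.mgetOrPut`, which inserts the default on first sight and returns the stored value, so each tally is a one-liner. A tiny self-contained example of the idiom:

```nim
import std/tables

var counts: Table[string, int]
for agent in ["nwaku", "go-waku", "nwaku"]:
  # inserts 0 the first time a key is seen, then counts up
  counts[agent] = counts.mgetOrPut(agent, 0) + 1

assert counts["nwaku"] == 2
assert counts["go-waku"] == 1
```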

proc populateInfoFromIp(allPeersRef: CustomPeersTableRef,
restClient: RestClientRef) {.async.} =
for peer in allPeersRef.keys():
@@ -209,6 +265,7 @@ proc crawlNetwork(node: WakuNode,

let crawlInterval = conf.refreshInterval * 1000
while true:
+    let startTime = Moment.now()
# discover new random nodes
let discoveredNodes = await wakuDiscv5.protocol.queryRandom()

@@ -223,6 +280,8 @@
# note random discovered nodes can be already known
await setConnectedPeersMetrics(discoveredNodes, node, conf.timeout, restClient, allPeersRef)

+    updateMetrics(allPeersRef)

# populate info from ip addresses
await populateInfoFromIp(allPeersRef, restClient)

@@ -234,8 +293,12 @@
# Notes:
# we dont run ipMajorityLoop
# we dont run revalidateLoop
+    let endTime = Moment.now()
+    let elapsed = (endTime - startTime).nanos
+
+    info "crawl duration", time=elapsed.millis

-    await sleepAsync(crawlInterval.millis)
+    await sleepAsync(crawlInterval.millis - elapsed.millis)
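One possible follow-up, sketched here but not part of this diff: if a crawl ever runs longer than `crawlInterval`, the subtraction above goes negative. A clamped variant would keep the loop well-behaved:

```nim
# hypothetical hardening, not in this PR
let remaining = crawlInterval.millis - elapsed.millis
await sleepAsync(max(remaining, 0.millis))
```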

proc retrieveDynamicBootstrapNodes(dnsDiscovery: bool, dnsDiscoveryUrl: string, dnsDiscoveryNameServers: seq[ValidIpAddress]): Result[seq[RemotePeerInfo], string] =
if dnsDiscovery and dnsDiscoveryUrl != "":
3 changes: 1 addition & 2 deletions apps/networkmonitor/networkmonitor_config.nim
@@ -64,7 +64,6 @@ type
defaultValue: 8009,
name: "metrics-rest-port" }: uint16


proc parseCmdArg*(T: type ValidIpAddress, p: string): T =
try:
result = ValidIpAddress.init(p)
@@ -85,7 +84,7 @@ proc completeCmdArg*(T: type chronos.Duration, val: string): seq[string] =

proc loadConfig*(T: type NetworkMonitorConf): Result[T, string] =
try:
-    let conf = NetworkMonitorConf.load()
+    let conf = NetworkMonitorConf.load(version=git_version)
ok(conf)
except CatchableError:
err(getCurrentExceptionMsg())
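A note on the `load(version=git_version)` change: confutils uses the `version` argument for its built-in version handling, so (assuming the standard confutils behaviour) the binary can now report the revision it was built from:

```console
./build/networkmonitor --version
# expected to print the compile-time git_version string
```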