Skip to content

Commit

Permalink
test: script improvements for recent testing (#4474)
Browse files Browse the repository at this point in the history
  • Loading branch information
brianolson authored Aug 29, 2022
1 parent 63ddd47 commit 4d862ab
Show file tree
Hide file tree
Showing 10 changed files with 1,073 additions and 60 deletions.
50 changes: 45 additions & 5 deletions test/heapwatch/README.md
Original file line number Diff line number Diff line change
@@ -1,14 +1,54 @@
# Heap Watch

Tools for checking if algod has memory leaks.
Collect RAM, bandwidth, and other stats over the course of a test cluster run.

Run a local private network of three nodes and two pingpongs.
Produce reports and plots from data.

Periodically sample pprof memory profiles.
## Scripts

Watch memory usage from `ps` and write to a CSV file for each algod.
* heapWatch.py
* collect data from algod
* heap profiling, /metrics, cpu profiling, block headers, goroutine profile
* capture from local algod by data dir or cluster from terraform-inventory.host
* convert profiles to svg or other reports

# Usage
* block_history.py
* Capture block headers every round from a running `algod`

* block_history_relays.py
* Capture block headers every round from one or more running `algod`
* Talk to a set of relays found in a terraform-inventory.host file.

* block_history_plot.py
* Plot the output of test/heapwatch/{block_history.py,block_history_relays.py}

* client_ram_report.py
* Process heap profiles (*.heap) collected from heapWatch.py
* Create a report on `algod` RAM usage

* plot_crr_csv.py
* Plot the output of test/heapwatch/client_ram_report.py --csv

* metrics_delta.py
* Process /metrics data captured by heapWatch.py
* Generate text report on bandwidth in and out of relays/PN/NPN
* optionally plot txn pool fullness

* start.sh stop.sh
* Run a local private network of three nodes and two pingpongs.
* Periodically sample pprof memory profiles.
* Watch memory usage from `ps` and write to a CSV file for each algod.

* bwstart.sh stop.sh
* Run a local private network of 3 relays and 8 leaf nodes
* Run 40 TPS of payment txns through it.
* Record metrics for bandwidth analysis.

* runNodeHost.py nodeHostTarget.py
* Run a new EC2 host with NPN and PN algod on it, pointed at one relay (no DNS needed)


## heapWatch.py local cluster usage

To start:

Expand Down
258 changes: 258 additions & 0 deletions test/heapwatch/block_history.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,258 @@
#!/usr/bin/env python3
# Copyright (C) 2019-2022 Algorand, Inc.
# This file is part of go-algorand
#
# go-algorand is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as
# published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.
#
# go-algorand is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with go-algorand. If not, see <https://www.gnu.org/licenses/>.
#
###
#
# Capture block headers every round from a running `algod`
#
# pip install py-algorand-sdk

import argparse
import base64
import logging
import os
import re
import signal
import sys
import time

import algosdk
from algosdk.encoding import msgpack
from algosdk.v2client.algod import AlgodClient

logger = logging.getLogger(__name__)

def addr_token_from_algod(algorand_data):
    """Read algod's listen address and API token from its data directory.

    Returns (addr, token); addr is prefixed with http:// if the file
    did not already carry a scheme.
    """
    net_path = os.path.join(algorand_data, 'algod.net')
    token_path = os.path.join(algorand_data, 'algod.token')
    with open(net_path) as netf:
        addr = netf.read().strip()
    with open(token_path) as tokf:
        token = tokf.read().strip()
    if not addr.startswith('http'):
        addr = 'http://' + addr
    return addr, token

def loads(blob):
    """Decode one output line: base64 text -> msgpack -> dict."""
    raw = base64.b64decode(blob)
    return msgpack.loads(raw, strict_map_key=False)

def dumps(blob):
    """Encode an object for output: msgpack -> base64 bytes."""
    packed = msgpack.dumps(blob)
    return base64.b64encode(packed)

class Fetcher:
    """Poll a running algod for block headers, appending one
    base64(msgpack) blob per line to an output file.
    """

    def __init__(self, algorand_data=None, token=None, addr=None, headers=None, prev_round=None, outpath=None):
        """
        algorand_data = path to algod data dir
        addr, token = algod URI and access token
        headers = dict of HTTP headers to send to algod
        prev_round = start with (prev_round + 1)
        outpath = path to append base64-msgpack-per-line data to
        """
        self.algorand_data = algorand_data
        self.token = token
        self.addr = addr
        self.headers = headers
        self._algod = None  # cached AlgodClient, built lazily by .algod()
        self.go = True  # cleared (e.g. by a signal handler) to stop loops
        self.prev_round = prev_round
        self.block_time = None  # wall-clock time the latest block was seen, when reliable
        self.outpath = outpath
        self._outf = None  # lazily opened append handle for outpath
        if outpath and ((prev_round is None) or (prev_round == -1)):
            # Resume support: scan existing output for the last recorded round.
            # Best effort only; a missing or unreadable file means start fresh.
            try:
                with open(outpath) as fin:
                    for line in fin:
                        line = line.strip()
                        if (not line) or (line[0] == '#'):
                            continue
                        ob = loads(line)
                        rnd = ob['block'].get('rnd', 0)
                        if (self.prev_round is None) or (rnd > self.prev_round):
                            self.prev_round = rnd
            except Exception as e:
                # was a bare `except: pass`, which also swallowed
                # KeyboardInterrupt/SystemExit; narrowed and logged
                logger.debug('could not scan %r for resume round: %s', outpath, e)
        return

    def algod(self):
        "return an open algosdk.v2client.algod.AlgodClient"
        if self._algod is None:
            if self.algorand_data:
                # Read the address and token straight from the node's data dir.
                addr, token = addr_token_from_algod(self.algorand_data)
                logger.debug('algod from %r, (%s %s)', self.algorand_data, addr, token)
            else:
                token = self.token
                addr = self.addr
                logger.debug('algod from args (%s %s)', self.addr, self.token)
            self._algod = AlgodClient(token, addr, headers=self.headers)
        return self._algod

    def outf(self):
        """Return the append-mode binary output file, opening it on first use."""
        if self._outf is None:
            self._outf = open(self.outpath, 'ab')
        return self._outf

    def nextblock(self, lastround=None, retries=30):
        """Fetch the block after `lastround`, retrying on errors.

        Each failed attempt drops the cached client so the next attempt
        reconnects. Raises after `retries` failures; returns None only if
        self.go is cleared before a block is fetched.
        """
        trycount = 0
        while (trycount < retries) and self.go:
            trycount += 1
            try:
                return self._nextblock_inner(lastround)
            except Exception as e:
                if trycount >= retries:
                    logger.error('too many errors in nextblock retries')
                    raise
                logger.warning('error in nextblock(%r) (retrying): %s', lastround, e)
                self._algod = None  # retry with a new connection
                time.sleep(1.2)
        return None

    def _nextblock_inner(self, lastround):
        """One attempt at fetching the next block; raises on failure."""
        self.block_time = None
        algod = self.algod()
        if lastround is None:
            # No caller state; start from the node's current round.
            status = algod.status()
            lastround = status['last-round']
            logger.debug('nextblock status last-round %s', lastround)
        else:
            try:
                blk = self.algod().block_info(lastround + 1, response_format='msgpack')
                if blk:
                    return blk
                logger.warning('null block %d, lastround=%r', lastround+1, lastround)
            except Exception as e:
                # Block is probably not available yet; wait for it below.
                logger.debug('could not get block %d: %s', lastround + 1, e)
        status = algod.status_after_block(lastround)
        block_time = time.time()  # the block has happened, don't count block data transit time
        nbr = status['last-round']
        retries = 30
        while (nbr > lastround + 1) and self.go:
            # if more than one block elapsed, we don't have a good time for either block
            block_time = None
            # keep trying lastround+1 so no round is skipped
            try:
                blk = self.algod().block_info(lastround + 1, response_format='msgpack')
                if blk:
                    return blk
                logger.warning('null block %d, lastround=%r, status.last-round=%d', lastround+1, lastround, nbr)
            except Exception:
                break
            time.sleep(1.1)
            retries -= 1
            if retries <= 0:
                # was raised *inside* the try above with logging-style %-args,
                # so it was immediately swallowed by the except; now it
                # propagates to nextblock()'s retry loop as intended
                raise Exception('too many null blocks for {}'.format(lastround + 1))
        blk = self.algod().block_info(nbr, response_format='msgpack')
        if blk:
            self.block_time = block_time
            return blk
        raise Exception('got None for blk {}'.format(nbr))

    def loop(self):
        """Start processing blocks and txns
        runs until error or bot.go=False
        """
        try:
            self._loop_inner(self.prev_round)
        finally:
            self.close()

    def _loop_inner(self, lastround):
        """Fetch blocks forever, handing each one to _block_handler."""
        while self.go:
            b = self.nextblock(lastround)
            if b is None:
                print("got None nextblock. exiting")
                return
            b = msgpack.loads(b, strict_map_key=False)
            nowround = b['block'].get('rnd', 0)
            if (lastround is not None) and (nowround != lastround + 1):
                # Gap or restart; note it but continue from the new round.
                logger.info('round jump %d to %d', lastround, nowround)
            self._block_handler(b)
            lastround = nowround

    def _block_handler(self, b):
        """Strip bulky fields and append the block header to the output file."""
        # throw away txns, count is kept in round differential ['block']['tc']
        b['block'].pop('txns', [])
        # throw away certs
        b.pop('cert', None)
        # Add fine grained time. This should be better than ['block']['ts']
        b['_time'] = self.block_time or time.time()
        self.outf().write(dumps(b) + b'\n')

    def close(self):
        """Drop the cached client and close the output file (if open).

        Previously the output handle was never closed — a resource leak.
        """
        self._algod = None
        if self._outf is not None:
            self._outf.close()
            self._outf = None

def header_list_to_dict(hlist):
    """Convert ["Name: value", ...] strings into a {name: value} dict.

    Returns None when the list is empty or missing.
    """
    if not hlist:
        return None
    splitter = re.compile(r':\s+')
    return dict(splitter.split(entry, 1) for entry in hlist)

def main():
    """Parse arguments, then capture block headers until interrupted."""
    ap = argparse.ArgumentParser()
    ap.add_argument('-d', '--algod', default=None, help='algod data dir')
    ap.add_argument('-a', '--addr', default=None, help='algod host:port address')
    ap.add_argument('-t', '--token', default=None, help='algod API access token')
    ap.add_argument('--header', dest='headers', nargs='*', help='"Name: value" HTTP header (repeatable)')
    ap.add_argument('--all', default=False, action='store_true', help='fetch all blocks from 0')
    ap.add_argument('--verbose', default=False, action='store_true')
    ap.add_argument('-o', '--out', default=None, help='file to append json lines to')
    args = ap.parse_args()

    logging.basicConfig(level=logging.DEBUG if args.verbose else logging.INFO)

    # Connection info comes either from a data dir or from addr+token args.
    algorand_data = args.algod or os.getenv('ALGORAND_DATA')
    if not algorand_data and not (args.token and args.addr):
        # fixed typo: the flag is -a/--addr, not --a/--addr
        sys.stderr.write('must specify algod data dir by $ALGORAND_DATA or -d/--algod; OR -a/--addr and -t/--token\n')
        sys.exit(1)

    # prev_round == -1 forces a restart from round 0 (ignores resume scan).
    prev_round = -1 if args.all else None
    bot = Fetcher(
        algorand_data,
        token=args.token,
        addr=args.addr,
        headers=header_list_to_dict(args.headers),
        outpath=args.out,
        prev_round=prev_round,
    )

    # `signal` is imported at module top; the redundant local import is gone.
    def do_graceful_stop(signum, frame):
        # A second signal forces an immediate exit.
        if not bot.go:
            sys.stderr.write("second signal, quitting\n")
            sys.exit(1)
        sys.stderr.write("graceful stop...\n")
        bot.go = False
    signal.signal(signal.SIGTERM, do_graceful_stop)
    signal.signal(signal.SIGINT, do_graceful_stop)

    bot.loop()

if __name__ == '__main__':
    main()
Loading

0 comments on commit 4d862ab

Please sign in to comment.