Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

test: script improvements for recent testing #4474

Merged
merged 4 commits into from
Aug 29, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
50 changes: 45 additions & 5 deletions test/heapwatch/README.md
Original file line number Diff line number Diff line change
@@ -1,14 +1,54 @@
# Heap Watch

Tools for checking if algod has memory leaks.
Collect RAM, bandwidth, and other stats over the course of a test cluster run.

Run a local private network of three nodes and two pingpongs.
Produce reports and plots from data.

Periodically sample pprof memory profiles.
## Scripts

Watch memory usage from `ps` and write to a CSV file for each algod.
* heapWatch.py
* collect data from algod
* heap profiling, /metrics, cpu profiling, block headers, goroutine profile
* capture from local algod by data dir or cluster from terraform-inventory.host
* convert profiles to svg or other reports

# Usage
* block_history.py
* Capture block headers every round from a running `algod`

* block_history_relays.py
* Capture block headers every round from one or more running `algod`
* Talk to a set of relays found in a terraform-inventory.host file.

* block_history_plot.py
* Plot the output of test/heapwatch/{block_history.py,block_history_relays.py}

* client_ram_report.py
* Process heap profiles (*.heap) collected from heapWatch.py
* Create a report on `algod` RAM usage

* plot_crr_csv.py
* Plot the output of test/heapwatch/client_ram_report.py --csv

* metrics_delta.py
* Process /metrics data captured by heapWatch.py
* Generate text report on bandwidth in and out of relays/PN/NPN
* optionally plot txn pool fullness

* start.sh stop.sh
* Run a local private network of three nodes and two pingpongs.
* Periodically sample pprof memory profiles.
* Watch memory usage from `ps` and write to a CSV file for each algod.

* bwstart.sh stop.sh
* Run a local private network of 3 relays and 8 leafs
* Run 40 TPS of payment txns through it.
* Record metrics for bandwidth analysis.

* runNodeHost.py nodeHostTarget.py
* run new ec2 host with npn and pn algod on it pointed at one relay (no DNS needed)


## heapWatch.py local cluster usage

To start:

Expand Down
258 changes: 258 additions & 0 deletions test/heapwatch/block_history.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,258 @@
#!/usr/bin/env python3
# Copyright (C) 2019-2022 Algorand, Inc.
# This file is part of go-algorand
#
# go-algorand is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as
# published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.
#
# go-algorand is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with go-algorand. If not, see <https://www.gnu.org/licenses/>.
#
###
#
# Capture block headers every round from a running `algod`
#
# pip install py-algorand-sdk

import argparse
import base64
import logging
import os
import re
import signal
import sys
import time

import algosdk
from algosdk.encoding import msgpack
from algosdk.v2client.algod import AlgodClient

logger = logging.getLogger(__name__)

def addr_token_from_algod(algorand_data):
with open(os.path.join(algorand_data, 'algod.net')) as fin:
addr = fin.read().strip()
with open(os.path.join(algorand_data, 'algod.token')) as fin:
token = fin.read().strip()
if not addr.startswith('http'):
addr = 'http://' + addr
return addr, token

def loads(blob):
return msgpack.loads(base64.b64decode(blob), strict_map_key=False)

def dumps(blob):
return base64.b64encode(msgpack.dumps(blob))

class Fetcher:
def __init__(self, algorand_data=None, token=None, addr=None, headers=None, prev_round=None, outpath=None):
"""
algorand_data = path to algod data dir
addr, token = algod URI and access token
headers = dict of HTTP headers to send to algod
prev_round = start with (prev_round + 1)
outpath = path to append base64-msgpack-per-line data to
"""
self.algorand_data = algorand_data
self.token = token
self.addr = addr
self.headers = headers
self._algod = None
self.go = True
self.prev_round = prev_round
self.block_time = None
self.outpath = outpath
self._outf = None
if outpath and ((prev_round is None) or (prev_round == -1)):
# load data, find last known round in data
try:
with open(outpath) as fin:
for line in fin:
if not line:
continue
line = line.strip()
if not line:
continue
if line[0] == '#':
continue
ob = loads(line)
rnd = ob['block'].get('rnd', 0)
if (self.prev_round is None) or (rnd > self.prev_round):
self.prev_round = rnd
except:
pass # whatever
return

def algod(self):
"return an open algosdk.v2client.algod.AlgodClient"
if self._algod is None:
if self.algorand_data:
addr, token = addr_token_from_algod(self.algorand_data)
logger.debug('algod from %r, (%s %s)', self.algorand_data, addr, token)
else:
token = self.token
addr = self.addr
logger.debug('algod from args (%s %s)', self.addr, self.token)
self._algod = AlgodClient(token, addr, headers=self.headers)
return self._algod

def outf(self):
if self._outf is None:
self._outf = open(self.outpath, 'ab')
return self._outf

def nextblock(self, lastround=None, retries=30):
trycount = 0
while (trycount < retries) and self.go:
trycount += 1
try:
return self._nextblock_inner(lastround)
except Exception as e:
if trycount >= retries:
logger.error('too many errors in nextblock retries')
raise
else:
logger.warning('error in nextblock(%r) (retrying): %s', lastround, e)
self._algod = None # retry with a new connection
time.sleep(1.2)
return None

def _nextblock_inner(self, lastround):
self.block_time = None
algod = self.algod()
if lastround is None:
status = algod.status()
lastround = status['last-round']
logger.debug('nextblock status last-round %s', lastround)
else:
try:
blk = self.algod().block_info(lastround + 1, response_format='msgpack')
if blk:
return blk
logger.warning('null block %d, lastround=%r', lastround+1, lastround)
except Exception as e:
pass
#logger.debug('could not get block %d: %s', lastround + 1, e, exc_info=True)
status = algod.status_after_block(lastround)
block_time = time.time() # the block has happened, don't count block data transit time
nbr = status['last-round']
retries = 30
while (nbr > lastround + 1) and self.go:
# if more than one block elapsed, we don't have a good time for either block
block_time = None
# try lastround+1 one last time
try:
blk = self.algod().block_info(lastround + 1, response_format='msgpack')
if blk:
return blk
logger.warning('null block %d, lastround=%r, status.last-round=%d', lastround+1, lastround, nbr)
time.sleep(1.1)
retries -= 1
if retries <= 0:
raise Exception("too many null block for %d", lastround+1)
except:
break
blk = self.algod().block_info(nbr, response_format='msgpack')
if blk:
self.block_time = block_time
return blk
raise Exception('got None for blk {}'.format(nbr))

def loop(self):
"""Start processing blocks and txns
runs until error or bot.go=False
"""
try:
self._loop_inner(self.prev_round)
finally:
self.close()

def _loop_inner(self, lastround):
while self.go:
b = self.nextblock(lastround)
if b is None:
print("got None nextblock. exiting")
return
b = msgpack.loads(b, strict_map_key=False)
nowround = b['block'].get('rnd', 0)
if (lastround is not None) and (nowround != lastround + 1):
logger.info('round jump %d to %d', lastround, nowround)
self._block_handler(b)
lastround = nowround

def _block_handler(self, b):
# throw away txns, count is kept in round differential ['block']['tc']
b['block'].pop('txns', [])
# throw away certs
b.pop('cert', None)
# Add fine grained time. This should be better than ['block']['ts']
b['_time'] = self.block_time or time.time()
self.outf().write(dumps(b) + b'\n')

def close(self):
self._algod = None

def header_list_to_dict(hlist):
if not hlist:
return None
p = re.compile(r':\s+')
out = {}
for x in hlist:
a, b = p.split(x, 1)
out[a] = b
return out

def main():
ap = argparse.ArgumentParser()
ap.add_argument('-d', '--algod', default=None, help='algod data dir')
ap.add_argument('-a', '--addr', default=None, help='algod host:port address')
ap.add_argument('-t', '--token', default=None, help='algod API access token')
ap.add_argument('--header', dest='headers', nargs='*', help='"Name: value" HTTP header (repeatable)')
ap.add_argument('--all', default=False, action='store_true', help='fetch all blocks from 0')
ap.add_argument('--verbose', default=False, action='store_true')
ap.add_argument('-o', '--out', default=None, help='file to append json lines to')
args = ap.parse_args()

if args.verbose:
logging.basicConfig(level=logging.DEBUG)
else:
logging.basicConfig(level=logging.INFO)

algorand_data = args.algod or os.getenv('ALGORAND_DATA')
if not algorand_data and not (args.token and args.addr):
sys.stderr.write('must specify algod data dir by $ALGORAND_DATA or -d/--algod; OR --a/--addr and -t/--token\n')
sys.exit(1)

prev_round = None
if args.all:
prev_round = -1
bot = Fetcher(
algorand_data,
token=args.token,
addr=args.addr,
headers=header_list_to_dict(args.headers),
outpath=args.out,
prev_round=prev_round,
)

import signal
def do_graceful_stop(signum, frame):
if bot.go == False:
sys.stderr.write("second signal, quitting\n")
sys.exit(1)
sys.stderr.write("graceful stop...\n")
bot.go = False
signal.signal(signal.SIGTERM, do_graceful_stop)
signal.signal(signal.SIGINT, do_graceful_stop)

bot.loop()

if __name__ == '__main__':
main()
Loading