diff --git a/debezium/demo/docker-compose.yml b/debezium/demo/docker-compose.yml index 5927f7a2ace..e1abe7df828 100644 --- a/debezium/demo/docker-compose.yml +++ b/debezium/demo/docker-compose.yml @@ -10,6 +10,7 @@ services: service: server volumes: - ../scripts:/scripts + - ./logs:/logs web: extends: diff --git a/debezium/perf/.gitignore b/debezium/perf/.gitignore new file mode 100644 index 00000000000..98d8a5a6304 --- /dev/null +++ b/debezium/perf/.gitignore @@ -0,0 +1 @@ +logs diff --git a/debezium/perf/README.md b/debezium/perf/README.md index 909975caaf9..3b5e2d475ec 100644 --- a/debezium/perf/README.md +++ b/debezium/perf/README.md @@ -1,5 +1,4 @@ -Debezium - Kafka Perf -===================== +# Debezium - Kafka Perf The docker compose file in this directory is similar to the one in the ../demo directory, with additional @@ -19,8 +18,130 @@ analysis oriented, it has considerably larger requirements than our other feature-oriented demos. -Once the compose is running -=========================== +## Additional building steps + +On top of what is required for `../demo` (see +`../demo/README.md`), the automated testing +requires building the Deephaven Java client examples. +At the toplevel directory of your git clone (`../..`), run: + +``` +./gradlew java-client-session-examples:installDist +``` + +## Automated testing + +The script `run_experiment.sh` in this directory performs a +full test for one engine (either Deephaven or Materialize). +It will: + +- Start the containers required for a particular run (and only those). +- Ensure container logs are preserved for the run. +- Load the demo code in the respective engine and sample update delays to a log file. +- Set the given pageviews per second rate, and wait a fixed amount of time thereafter for processing to settle. +- Take multiple samples for CPU and memory utilization over a defined period. + Output from top in batch mode is sent to a log file and later post-processed. +- Stop and "reset" the containers. + +The example + +``` +cd debezium/perf +./run_experiment.sh dh 5000 20 10 1.0 +``` + +will run an experiment for Deephaven (tag `dh`; use tag `mz` for Materialize) with a target rate of 5,000 pageviews per second. +It will wait 20 seconds after setting the target rate to begin sampling CPU and memory utilization using `top` in batch mode. +10 samples will be obtained, with a delay between samples of 1.0 seconds. + +Example output from a run: + +``` +cfs@erke 12:18:20 ~/dh/oss3/deephaven-core/debezium/perf +$ ./run_experiment.sh dh 5000 20 10 1.0 +About to run an experiment for dh with 5000 pageviews/s. + +Actions that will be performed in this run: +1. Start compose services required for for dh. +2. Execute demo in dh and setup update delay logging. +3. Set 5000 pageviews per second rate. +4. Wait 20 seconds. +5. Take 10 samples for mem and CPU utilization, 1.0 seconds between samples. +6. Stop and 'reset' (down) compose. + +Running experiment. + +1. Starting compose. +PERF_TAG=2022.03.22.16.18.41_UTC_dh_5000 + +Logs are being saved to logs/2022.03.22.16.18.41_UTC_dh_5000. + +2. Running demo in dh and sampling delays. +1 compiler directives added +Table users = +Table items = +Table purchases = +Table pageviews = +Table pageviews_stg = +Table purchases_by_item = +Table pageviews_by_item = +Table item_summary = +Table top_viewed_items = +Table top_converting_items = +Table profile_views_per_minute_last_10 = +Table profile_views = +Table profile_views_enriched = +Table dd_flagged_profiles = +Table dd_flagged_profile_view = +Table high_value_users = +Table hvu_test = +Table pageviews_summary = + +1 compiler directives added +No displayable variables updated + + +3. Setting pageviews per second +LOADGEN Connected. +Setting pageviews_per_second: old value was 50, new value is 5000. +Goodbye. + +4. Waiting for 20 seconds. + +5. Sampling top. +name=redpanda, tag=CPU_PCT, mean=84.14, samples=80.0, 84.2, 85.0, 87.0, 85.0, 82.0, 85.0, 84.0, 84.2, 85.0 +name=redpanda, tag=RES_GiB, mean=0.77, samples=0.7678, 0.7698, 0.7698, 0.7698, 0.7718, 0.7718, 0.7718, 0.7718, 0.7718, 0.7776 +name=deephaven, tag=CPU_PCT, mean=35.21, samples=66.7, 31.7, 28.0, 31.0, 27.0, 23.0, 46.0, 47.0, 25.7, 26.0 +name=deephaven, tag=RES_GiB, mean=2.40, samples=2.4, 2.4, 2.4, 2.4, 2.4, 2.4, 2.4, 2.4, 2.4, 2.4 + +6. Stopping and 'reset' (down) compose. +Stopping core-debezium-perf_envoy_1 ... done +Stopping core-debezium-perf_grpc-proxy_1 ... done +Stopping core-debezium-perf_loadgen_1 ... done +Stopping core-debezium-perf_debezium_1 ... done +Stopping core-debezium-perf_server_1 ... done +Stopping core-debezium-perf_redpanda_1 ... done +Stopping core-debezium-perf_mysql_1 ... done +Stopping core-debezium-perf_web_1 ... done +Removing core-debezium-perf_envoy_1 ... done +Removing core-debezium-perf_grpc-proxy_1 ... done +Removing core-debezium-perf_loadgen_1 ... done +Removing core-debezium-perf_debezium_1 ... done +Removing core-debezium-perf_server_1 ... done +Removing core-debezium-perf_redpanda_1 ... done +Removing core-debezium-perf_mysql_1 ... done +Removing core-debezium-perf_web_1 ... done +Removing network core-debezium-perf_default + +Experiment finished. +``` + +The CPU and memory utilization samples are shown on stdout and also saved to a file in the +new directory under `logs/`, in this case `logs/2022.03.22.16.18.41_UTC_dh_5000.` + +## Manual testing + +### Once the compose is running Both Materialize and Deephaven are running. We now can make them execute their respective demo scripts. @@ -54,8 +175,7 @@ a command socket interface for loadgen; see `../demo/README.md` for instructions. -Tracking the last processed pageview timestamp -============================================== +### Tracking the last processed pageview timestamp * In DH, the `pageviews_summary` table can help track the last pageview seen. @@ -72,8 +192,7 @@ Tracking the last processed pageview timestamp FROM pageviews_summary;' -U materialize -h localhost -p 6875 ``` -Memory and CPU requirements -=========================== +## Memory and CPU requirements The parameters used for images in the docker compose file in this directory are geared towards high message throughput. While Deephaven diff --git a/debezium/perf/dh_run_demo.sh b/debezium/perf/dh_run_demo.sh new file mode 100755 index 00000000000..f312e41274c --- /dev/null +++ b/debezium/perf/dh_run_demo.sh @@ -0,0 +1,5 @@ +#!/bin/bash + +set -eu + +exec ../../java-client/session-examples/build/install/java-client-session-examples/bin/execute-script --python ../scripts/demo.py diff --git a/debezium/perf/dh_sample_dt.sh b/debezium/perf/dh_sample_dt.sh new file mode 100755 index 00000000000..a37cb27ea67 --- /dev/null +++ b/debezium/perf/dh_sample_dt.sh @@ -0,0 +1,5 @@ +#!/bin/bash + +set -eu + +exec ../../java-client/session-examples/build/install/java-client-session-examples/bin/execute-script --python ../scripts/sample_dt.py diff --git a/debezium/perf/docker-compose.yml b/debezium/perf/docker-compose.yml index 1fede60aba8..be66c91cbc1 100644 --- a/debezium/perf/docker-compose.yml +++ b/debezium/perf/docker-compose.yml @@ -12,11 +12,13 @@ services: # build: ../../jprofiler-server environment: - JAVA_TOOL_OPTIONS=-Xmx${DEEPHAVEN_HEAP} -Ddeephaven.console.type=${DEEPHAVEN_CONSOLE_TYPE} -Ddeephaven.application.dir=${DEEPHAVEN_APPLICATION_DIR} + - PERF_TAG # Used to specify a subdirectory under ./logs where to store perf samples logs # For jprofiler sessions (if you tweaked the jprofiler version in jprofiler-server/Dockerfile you need to tweak the path below): # Then use this startup options: # - JAVA_TOOL_OPTIONS=-agentpath:/opt/jprofiler13.0/bin/linux-x64/libjprofilerti.so=port=8849,nowait -Xmx4g -Ddeephaven.console.type=${DEEPHAVEN_CONSOLE_TYPE} -Ddeephaven.application.dir=${DEEPHAVEN_APPLICATION_DIR} volumes: - ../scripts:/scripts + - ./logs:/logs # For jprofiler sessions: (change if using different port) # ports: # - '8849:8849' diff --git a/debezium/perf/mz_run_demo.sh b/debezium/perf/mz_run_demo.sh new file mode 100755 index 00000000000..607769c9e17 --- /dev/null +++ b/debezium/perf/mz_run_demo.sh @@ -0,0 +1,5 @@ +#!/bin/bash + +set -eu + +exec docker-compose run -T mzcli -f /scripts/demo.sql diff --git a/debezium/perf/mz_sample_dt.sh b/debezium/perf/mz_sample_dt.sh new file mode 100755 index 00000000000..019e016f1ce --- /dev/null +++ b/debezium/perf/mz_sample_dt.sh @@ -0,0 +1,25 @@ +#!/bin/bash + +set -eu + +if [ -z "$PERF_TAG" ]; then + echo "$0: PERF_TAG environment variable is not defined, aborting." 1>&2 + exit 1 +fi + +DATA_TAG="mz_sample_dt" +OUT=logs/${PERF_TAG}/${DATA_TAG}.log + +SCRIPT=$(cat <<'EOF' +while true; do + DATE_TAG=$(date -u '+%Y-%m-%d %H:%M:%S%z') + echo -n "$DATE_TAG|" + psql --csv -A -t -f /scripts/sample_dt.sql -U materialize -h materialized -p 6875 + sleep 1 +done +EOF +) + +(nohup docker-compose run -T --entrypoint /bin/bash mzcli -c "$SCRIPT" < /dev/null >& $OUT &) + +exit 0 diff --git a/debezium/perf/pid_from_cmdline.py b/debezium/perf/pid_from_cmdline.py new file mode 100644 index 00000000000..fc64d7b3af4 --- /dev/null +++ b/debezium/perf/pid_from_cmdline.py @@ -0,0 +1,55 @@ +import argparse +import datetime as dt +import os +import re +import subprocess +import sys + +parser = argparse.ArgumentParser(description='Match process command line regex to pid') +parser.add_argument( + 'proc_specs_strs', + metavar='PROCSPEC', + type=str, nargs='+', + help='a string of the form "name:regex" where regex should only match one process in `ps -o command` output') + +args = parser.parse_args() + +proc_specs = {} +for proc_spec_str in args.proc_specs_strs: + name, regex_str = proc_spec_str.split(':', maxsplit=1) + proc_specs[name] = re.compile(regex_str) + +ps_lines = subprocess.run( + ['ps', '-ahxww', '-o', 'pid,command' ], + stdout=subprocess.PIPE).stdout.decode('utf-8').splitlines() + +matches = {} +nmatches = 0 +my_pid = f'{os.getpid()}' + +for ps_line in ps_lines: + pid, cmd = ps_line.split(maxsplit=1) + if pid == my_pid: + continue + for name, regex in proc_specs.items(): + if re.search(regex, cmd) is not None: + prev = matches.get(name, None) + if prev is not None: + print(f"{sys.argv[0]}: found more than one match for '{name}': {prev}, {pid}, aborting", + file=sys.stderr) + sys.exit(1) + matches[name] = pid + +for name in proc_specs.keys(): + if matches.get(name, None) is None: + print(f"{sys.argv[0]}: couldn't find a match for {name}, aborting", file=sys.stderr) + sys.exit(1) + +first = True +for name, pid in matches.items(): + s = f'{name}:{pid}' + if not first: + s = ' ' + s + print(s, end='') + first = False +print() diff --git a/debezium/perf/run_experiment.sh b/debezium/perf/run_experiment.sh new file mode 100755 index 00000000000..0f846f0fdac --- /dev/null +++ b/debezium/perf/run_experiment.sh @@ -0,0 +1,66 @@ +#!/bin/sh + +set -eu + +if [ $# -ne 5 -o \( "$1" != 'dh' -a "$1" != 'mz' \) ]; then + echo "Usage: $0 dh|mz per_second_rate wait_seconds top_samples top_delay_seconds" 1>&2 + exit 1 +fi + +engine="$1" +rate_per_s="$2" +wait_s="$3" +top_samples="$4" +top_delay="$5" + +echo "About to run an experiment for ${engine} with ${rate_per_s} pageviews/s." +echo +echo "Actions that will be performed in this run:" +echo "1. Start compose services required for for ${engine}." +echo "2. Execute demo in ${engine} and setup update delay logging." +echo "3. Set ${rate_per_s} pageviews per second rate." +echo "4. Wait ${wait_s} seconds." +echo "5. Take ${top_samples} samples for mem and CPU utilization, ${top_delay} seconds between samples." +echo "6. Stop and 'reset' (down) compose." +echo +echo "Running experiment." +echo +echo "1. Starting compose." +export PERF_TAG=$(./start_perf_run.sh "$engine" "$rate_per_s") +echo "PERF_TAG=${PERF_TAG}" +echo +echo "Logs are being saved to logs/$PERF_TAG." +echo + +echo "2. Running demo in ${engine} and sampling delays." +if [ "$engine" = "mz" ]; then + ./mz_run_demo.sh + ./mz_sample_dt.sh +elif [ "$engine" = "dh" ]; then + ./dh_run_demo.sh + ./dh_sample_dt.sh +else + echo "$0: Internal error, aborting." 1>&2 + exit 1 +fi +echo + +echo "3. Setting pageviews per second" +./set_pageviews_per_second.sh $rate_per_s +echo + +echo "4. Waiting for $wait_s seconds." +sleep "$wait_s" +echo + +echo "5. Sampling top." +./sample_top.sh "$engine" "$top_samples" "$top_delay" +echo + +echo "6. Stopping and 'reset' (down) compose." + +./stop_all.sh +echo +echo "Experiment finished." + +exit 0 diff --git a/debezium/perf/sample_top.py b/debezium/perf/sample_top.py new file mode 100644 index 00000000000..3cfdcdc9a62 --- /dev/null +++ b/debezium/perf/sample_top.py @@ -0,0 +1,95 @@ +import argparse +import datetime as dt +import re +import os +import subprocess +import statistics as stats +import sys + +now_str = dt.datetime.utcnow().astimezone().strftime('%Y.%m.%d.%H.%M.%S_%Z') + +parser = argparse.ArgumentParser(description='Sample cpu utilization and memory consumption with top for given processes') + +perf_tag = os.environ.get('PERF_TAG', None) +if perf_tag is None: + print(f'{os.argv[0]}: PERF_TAG environment variable is not defined, aborting.', file=sys.stderr) + sys.exit(1) + +parser.add_argument('nsamples', metavar='NSAMPLES', type=int, + help='number of samples') +parser.add_argument('delay_s', metavar='DELAY_MS', type=str, + help='delay between samples, in seconds') +parser.add_argument('proc_specs_strs', metavar='PROCSPEC', type=str, nargs='+', + help='a string of the form "name:pid"') + +args = parser.parse_args() + +proc_specs = {} +pids = [] +pids_args = [] +name_pidre = {} +for proc_spec_str in args.proc_specs_strs: + name, pid = proc_spec_str.split(':', maxsplit=1) + pids.append(pid) + pids_args.append('-p') + pids_args.append(pid) + name_pidre[name] = re.compile(f'^{pid} ') + proc_specs[name] = pid + +top_out = subprocess.run( + ['top', '-Eg', '-n', f'{args.nsamples}', '-b', '-c', '-d', f'{args.delay_s}'] + pids_args, + stdout=subprocess.PIPE).stdout.decode('utf-8').splitlines() + +out_dir = f'./logs/{perf_tag}' +with open(f'{out_dir}/{now_str}_top_details.log', 'w') as f: + for line in top_out: + f.write(line) + f.write('\n') + +cputag = 'CPU_PCT' # CPU utilization in percentage +restag = 'RES_GiB' # Resident size in GiB + +kib_2_gib = 1.0/(1024 * 1024) + +results={} +for line in top_out: + for name, regex in name_pidre.items(): + if re.search(regex, line) is not None: + cols = line.split(maxsplit=12) + pid = proc_specs[name] + d = results.get(name, None) + if d is None: + d = results[name] = { cputag : [], restag : [] } + cpu = float(cols[8]) + d[cputag].append(cpu) + res_str = cols[5] + if res_str.endswith('g'): + res_kib = float(res_str[:-1])*1024*1024 + elif res_str.endswith('m'): + res_kib = float(res_str[:-1])*1024 + else: + res_kib = int(res_str) + d[restag].append(res_kib * kib_2_gib) + +def format_samples(precision : int, samples): + first = True + s = '' + for sample in samples: + if not first: + s += ', ' + if precision != -1: + s += f'{sample:.{precision}}' + else: + s += f'{sample}' + first = False + return s + +with open(f'{out_dir}/{now_str}_top_samples.log', 'w') as f: + for name, result in results.items(): + for tag, samples in result.items(): + mean = stats.mean(samples) + precision = 4 if tag == restag else -1 + samples_str = format_samples(precision, samples) + line = f'name={name}, tag={tag}, mean={mean:.2f}, samples={samples_str}' + print(line) + f.write(line + '\n') diff --git a/debezium/perf/sample_top.sh b/debezium/perf/sample_top.sh new file mode 100755 index 00000000000..671b6ce2efb --- /dev/null +++ b/debezium/perf/sample_top.sh @@ -0,0 +1,23 @@ +#!/bin/bash + +set -eu + +if [ $# -ne 3 -o \( "$1" != 'dh' -a "$1" != 'mz' \) ]; then + echo "Usage: $0 mz|dh rate nsamples delay_sec" +fi + +engine=$1 +nsamples=$2 +delay_s=$3 + +if [ "$engine" = "mz" ]; then + proc_spec='materialize:^materialized redpanda:^/opt/redpanda/bin/redpanda' +elif [ "$engine" = "dh" ]; then + proc_spec='deephaven:java.*deephaven redpanda:^/opt/redpanda/bin/redpanda' +else + echo "$0: Internal error, aborting." 1>&2 + exit 1 +fi + +PROC_SPECS=$(python3 ./pid_from_cmdline.py $proc_spec) +exec python3 ./sample_top.py "$nsamples" "$delay_s" $PROC_SPECS diff --git a/debezium/perf/set_pageviews_per_second.sh b/debezium/perf/set_pageviews_per_second.sh new file mode 100755 index 00000000000..0d7d95cb98b --- /dev/null +++ b/debezium/perf/set_pageviews_per_second.sh @@ -0,0 +1,11 @@ +#!/bin/bash + +set -eu + +if [ $# -ne 1 ]; then + echo "Usage: $0 pageviews_per_second" 1>&2 + exit 1 +fi + +(echo "set pageviews_per_second $1"; echo quit) | nc localhost 8090 +exit 0 diff --git a/debezium/perf/start_perf_run.sh b/debezium/perf/start_perf_run.sh new file mode 100755 index 00000000000..60f46e1f95c --- /dev/null +++ b/debezium/perf/start_perf_run.sh @@ -0,0 +1,55 @@ +#!/bin/bash + +set -eu + +if [ $(docker ps -q | wc -l) -gt 0 ]; then + echo "$0: some docker containers seem to be running, aborting." 1>&2 + exit 1 +fi + +if [ $# -ne 2 -o \( "$1" != 'dh' -a "$1" != 'mz' \) ]; then + echo "Usage: $0 dh|mz per_second_rate" 1>&2 + exit 1 +fi + +engine=$1 +rate=$2 + +PERF_TAG=$(date -u '+%Y.%m.%d.%H.%M.%S_%Z')_${engine}_${rate} +export PERF_TAG + +LOGS_DIR=./logs +if [ ! -d "$LOGS_DIR" ]; then + mkdir $LOGS_DIR +fi + +OUT_DIR="${LOGS_DIR}/${PERF_TAG}" +mkdir -p "$OUT_DIR" + +LOG="$OUT_DIR/start.log" +rm -f $LOG + +if [ "$engine" = "mz" ]; then + docker-compose up -d mysql redpanda debezium loadgen materialized mzcli >> $LOG 2>&1 +elif [ "$engine" = "dh" ]; then + docker-compose up -d mysql redpanda debezium loadgen server grpc-proxy envoy web >> $LOG 2>&1 +else + echo "$0: Internal error, aborting." 1>&2 + exit 1 +fi + +# fire and forget; will stop when compose stops. +COMPOSE_LOG="${OUT_DIR}/docker-compose.log" +(nohup docker-compose logs -f >& $COMPOSE_LOG < /dev/null &) + +# avoid race with creation of log file above +sleep 0.2 + +# Wait till loadgen simulation started +echo -n "Waiting for loadgen simulation to start... " >> $LOG 2>&1 +tail -f "$COMPOSE_LOG" | sed -n '/loadgen.*Simulated [0-9]* pageview actions in the last/ q' +echo "done." >> $LOG 2>&1 + +echo "${PERF_TAG}" + +exit 0 diff --git a/debezium/perf/stop_all.sh b/debezium/perf/stop_all.sh new file mode 100755 index 00000000000..4a258d82866 --- /dev/null +++ b/debezium/perf/stop_all.sh @@ -0,0 +1,5 @@ +#!/bin/bash + +set -eu + +docker-compose stop && docker-compose down -v diff --git a/debezium/scripts/sample_dt.py b/debezium/scripts/sample_dt.py new file mode 100644 index 00000000000..c9dcd1e4ab9 --- /dev/null +++ b/debezium/scripts/sample_dt.py @@ -0,0 +1,19 @@ +from deephaven import PythonListenerAdapter +import datetime as dt +import os +import sys + +def onUpdate(added_unused, modified_unused, deleted_unused): + timestamp = dt.datetime.now().astimezone().isoformat() + total = pageviews_summary.getColumn('total').get(0) + max_received_at = pageviews_summary.getColumn('max_received_at').get(0) + dt_ms = pageviews_summary.getColumn('dt_ms').get(0) + log.write(f'timestamp={timestamp}, total={total}, max_received_at={max_received_at}, dt_ms={dt_ms}\n') + log.flush() + +now_str = dt.datetime.utcnow().astimezone().strftime('%Y.%m.%d.%H.%M.%S_%Z') + +perf_tag = os.environ.get('PERF_TAG', None) +if perf_tag is not None: + log = open(f'/logs/{perf_tag}/{now_str}_dh_sample_dt.log', 'w') + PythonListenerAdapter(pageviews_summary, onUpdate, replayInitialImage=False) diff --git a/debezium/scripts/sample_dt.sql b/debezium/scripts/sample_dt.sql new file mode 100644 index 00000000000..a482b392d02 --- /dev/null +++ b/debezium/scripts/sample_dt.sql @@ -0,0 +1,5 @@ +SELECT + total, + to_timestamp(max_received_at) max_received_ts, + mz_logical_timestamp() - max_received_at*1000 AS dt_ms +FROM pageviews_summary;