From 7211a519dc3a3a631bb167b4c7c8040dec0a64a5 Mon Sep 17 00:00:00 2001 From: SHIHONGHAO <13820618441@163.com> Date: Thu, 3 Aug 2023 09:48:25 +0800 Subject: [PATCH] Inference frame (#136) * upd ign * init inference * fix trtexec * fix trtexec * fix * upd pipe * rm secret * fix * add 5time 4perf and summary in run_inference * update monitor (#1) * finish logdir * finish merge * format * fix * lic & rdm * ur * Update README.md * fix log output * fix cal perf * fix sync * fix output * fix * fixbug * fix frame * ur * add skip validation * fix * fix kunlun * fix --------- Co-authored-by: uuup <55571217+upvenly@users.noreply.github.com> --- .gitignore | 2 + inference/benchmarks/resnet50/README.md | 86 +++ .../benchmarks/resnet50/pytorch/__init__.py | 5 + .../benchmarks/resnet50/pytorch/dataloader.py | 49 ++ .../benchmarks/resnet50/pytorch/evaluator.py | 10 + .../benchmarks/resnet50/pytorch/export.py | 34 + .../benchmarks/resnet50/pytorch/forward.py | 103 +++ .../benchmarks/resnet50/pytorch/model.py | 15 + inference/configs/host.yaml | 17 + .../configs/resnet50/configurations.yaml | 14 + inference/configs/resnet50/parameters.yaml | 2 + .../vendor_config/nvidia_configurations.yaml | 3 + .../docker_images/nvidia/nvidia_analysis.py | 14 + .../docker_images/nvidia/nvidia_monitor.py | 256 +++++++ .../nvidia/pytorch_1.13/Dockerfile | 9 + .../pytorch_1.13/pytorch1.13_install.sh | 1 + .../nvidia/pytorch_2.1/Dockerfile | 9 + .../nvidia/pytorch_2.1/pytorch2.1_install.sh | 1 + inference/inference_engine/nvidia/tensorrt.py | 129 ++++ inference/inference_engine/nvidia/torchtrt.py | 35 + inference/run.py | 628 ++++++++++++++++++ inference/run_inference.py | 161 +++++ inference/tools/__init__.py | 3 + inference/tools/config_manager.py | 73 ++ inference/tools/init_logger.py | 28 + inference/tools/torch_sync.py | 6 + inference/utils/__init__.py | 0 inference/utils/cluster_manager.py | 212 ++++++ inference/utils/container_manager.py | 223 +++++++ inference/utils/image_manager.py | 203 ++++++ inference/utils/prepare_in_container.py | 101 +++ inference/utils/run_cmd.py | 27 + inference/utils/sys_monitor.py | 286 ++++++++ 33 files changed, 2745 insertions(+) create mode 100644 inference/benchmarks/resnet50/README.md create mode 100644 inference/benchmarks/resnet50/pytorch/__init__.py create mode 100644 inference/benchmarks/resnet50/pytorch/dataloader.py create mode 100644 inference/benchmarks/resnet50/pytorch/evaluator.py create mode 100644 inference/benchmarks/resnet50/pytorch/export.py create mode 100644 inference/benchmarks/resnet50/pytorch/forward.py create mode 100644 inference/benchmarks/resnet50/pytorch/model.py create mode 100644 inference/configs/host.yaml create mode 100644 inference/configs/resnet50/configurations.yaml create mode 100644 inference/configs/resnet50/parameters.yaml create mode 100644 inference/configs/resnet50/vendor_config/nvidia_configurations.yaml create mode 100644 inference/docker_images/nvidia/nvidia_analysis.py create mode 100644 inference/docker_images/nvidia/nvidia_monitor.py create mode 100644 inference/docker_images/nvidia/pytorch_1.13/Dockerfile create mode 100644 inference/docker_images/nvidia/pytorch_1.13/pytorch1.13_install.sh create mode 100644 inference/docker_images/nvidia/pytorch_2.1/Dockerfile create mode 100644 inference/docker_images/nvidia/pytorch_2.1/pytorch2.1_install.sh create mode 100644 inference/inference_engine/nvidia/tensorrt.py create mode 100644 inference/inference_engine/nvidia/torchtrt.py create mode 100644 inference/run.py create mode 100644 
inference/run_inference.py
 create mode 100644 inference/tools/__init__.py
 create mode 100644 inference/tools/config_manager.py
 create mode 100644 inference/tools/init_logger.py
 create mode 100644 inference/tools/torch_sync.py
 create mode 100644 inference/utils/__init__.py
 create mode 100644 inference/utils/cluster_manager.py
 create mode 100644 inference/utils/container_manager.py
 create mode 100644 inference/utils/image_manager.py
 create mode 100644 inference/utils/prepare_in_container.py
 create mode 100644 inference/utils/run_cmd.py
 create mode 100644 inference/utils/sys_monitor.py

diff --git a/.gitignore b/.gitignore
index 51bae239c..c361d5b64 100644
--- a/.gitignore
+++ b/.gitignore
@@ -5,3 +5,5 @@ __pycache__/
 .pytest_cache
 training/result/*
+inference/result/*
+inference/onnxs/*
diff --git a/inference/benchmarks/resnet50/README.md b/inference/benchmarks/resnet50/README.md
new file mode 100644
index 000000000..88b2f9aae
--- /dev/null
+++ b/inference/benchmarks/resnet50/README.md
@@ -0,0 +1,86 @@
+### 1. Inference Dataset
+> Download site: https://image-net.org/
+
+We use the ImageNet2012 validation images:
+
+| Dataset                       | FileName               | Size  | Checksum                              |
+| ----------------------------- | ---------------------- | ----- | ------------------------------------- |
+| Validation images (all tasks) | ILSVRC2012_img_val.tar | 6.3GB | MD5: 29b22e2961454d5413ddabcf34fc5622 |
+
+Dataset format conversion script:
+https://github.com/pytorch/examples/blob/main/imagenet/extract_ILSVRC.sh
+
+Make sure ILSVRC2012_img_train.tar and ILSVRC2012_img_val.tar are in the same directory as extract_ILSVRC.sh, then run:
+```bash
+sh extract_ILSVRC.sh
+```
+
+Preview the directory structure of the extracted dataset:
+
+```bash
+tree -d -L 1
+```
+
+```
+.
+├── train
+└── val
+```
+
+Count the validation samples:
+
+```bash
+find ./val -name "*JPEG" | wc -l
+50000
+```
+
+### 2. Model & Weights
+
+* Model implementation
+  * pytorch: torchvision.models.resnet50
+* Weight download
+  * pytorch: https://download.pytorch.org/models/resnet50-0676ba61.pth
+
+### 3. Software & Hardware Configuration and Run Info Reference
+
+#### 3.1 Nvidia A100
+
+- ##### Hardware environment
+  - Machine and accelerator model: NVIDIA_A100-SXM4-40GB
+  - Multi-node network type and bandwidth: InfiniBand, 200Gb/s
+
+- ##### Software environment
+  - OS version: Ubuntu 20.04
+  - OS kernel version: 5.4.0-113-generic
+  - Accelerator driver version: 470.129.06
+  - Docker version: 20.10.16
+  - Inference framework version: pytorch-1.13.0a0+937e930
+  - Dependency versions:
+    - cuda: 11.8
+
+- Inference toolkits
+
+  - TensorRT 8.5.1.7
+  - torch_tensorrt 1.3.0
+
+### 4. Benchmark Results
+
+* Metric list
+
+| Metric name                      | Result key         | Notes                                              |
+| -------------------------------- | ------------------ | -------------------------------------------------- |
+| data precision                   | precision          | fp32 or fp16                                        |
+| batch size                       | bs                 |                                                     |
+| device memory usage              | mem                | commonly called "VRAM", in GiB                      |
+| end-to-end time                  | e2e_time           | total time, including Perf initialization, etc.     |
+| whole validation throughput      | p_val_whole        | images validated divided by total validation time   |
+| validation compute throughput    | \*p_val_core       | excludes IO time                                    |
+| whole inference throughput       | p_infer_whole      | images inferred divided by total inference time     |
+| **inference compute throughput** | **\*p_infer_core** | excludes IO time                                    |
+| inference accuracy               | acc(infer/val)     | top-1 classification accuracy (acc1)                |
+
+* Metric values
+
+| Inference tool | precision | bs   | e2e_time | p_val_whole | \*p_val_core | p_infer_whole | \*p_infer_core | acc         | mem        |
+| -------------- | --------- | ---- | -------- | ----------- | ------------ | ------------- | -------------- | ----------- | ---------- |
+| tensorrt       | fp16      | 256  | 613.4    | 1358.9      | 4263.3       | 1391.4        | 12406.0        | 76.2/76.2   | 19.7/40.0  |
+| tensorrt       | fp32      | 256  | 474.4    | 1487.3      | 2653.2       | 1560.3        | 6091.6         | 76.2/76.2   | 28.86/40.0 |
+| torchtrt       | fp16      | 256  | 716.4    | 1370.4      | 4282.6       | 1320.0        | 4723.0         | 76.2/76.2   | 9.42/40.0  |
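+
+As a quick sanity check before a full run, the extracted dataset and the
+pretrained weights can be verified from Python. This is an illustrative
+sketch, not part of the benchmark; the validation path mirrors the sample
+entry in configs/host.yaml and is an assumption about the local layout:
+
+```python
+import torchvision as tv
+from torchvision.models import resnet50, ResNet50_Weights
+
+# Hypothetical local path; adjust to wherever extract_ILSVRC.sh placed "val".
+val_dir = "/raid/dataset/ImageNet/imagenet/val"
+
+dataset = tv.datasets.ImageFolder(val_dir)
+assert len(dataset) == 50000, "expected 50,000 validation images"
+
+# Downloads resnet50-0676ba61.pth (IMAGENET1K_V1) into the torch cache.
+model = resnet50(weights=ResNet50_Weights.IMAGENET1K_V1).eval()
+```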
diff --git a/inference/benchmarks/resnet50/pytorch/__init__.py b/inference/benchmarks/resnet50/pytorch/__init__.py
new file mode 100644
index 000000000..1f6cdf49b
--- /dev/null
+++ b/inference/benchmarks/resnet50/pytorch/__init__.py
@@ -0,0 +1,5 @@
+from .dataloader import build_dataloader
+from .model import create_model
+from .export import export_model
+from .evaluator import evaluator
+from .forward import model_forward, engine_forward
diff --git a/inference/benchmarks/resnet50/pytorch/dataloader.py b/inference/benchmarks/resnet50/pytorch/dataloader.py
new file mode 100644
index 000000000..d08453f1e
--- /dev/null
+++ b/inference/benchmarks/resnet50/pytorch/dataloader.py
@@ -0,0 +1,49 @@
+import torchvision as tv
+from torch.utils.data import DataLoader as dl
+import torch
+import tqdm
+
+
+def build_dataset(config):
+    # Standard ImageNet eval preprocessing: resize to 256, center-crop to 224.
+    crop = 256
+    c_crop = 224
+    mean = (0.485, 0.456, 0.406)
+    std = (0.229, 0.224, 0.225)
+
+    if config.fp16:
+
+        class ToFloat16(object):
+
+            def __call__(self, tensor):
+                return tensor.to(dtype=torch.float16)
+
+        tx = tv.transforms.Compose([
+            tv.transforms.Resize(crop),
+            tv.transforms.CenterCrop(c_crop),
+            tv.transforms.ToTensor(),
+            ToFloat16(),
+            tv.transforms.Normalize(mean=mean, std=std),
+        ])
+        dataset = tv.datasets.ImageFolder(config.data_dir, tx)
+    else:
+        tx = tv.transforms.Compose([
+            tv.transforms.Resize(crop),
+            tv.transforms.CenterCrop(c_crop),
+            tv.transforms.ToTensor(),
+            tv.transforms.Normalize(mean=mean, std=std),
+        ])
+        dataset = tv.datasets.ImageFolder(config.data_dir, tx)
+
+    return dataset
+
+
+def build_dataloader(config):
+    dataset = build_dataset(config)
+    loader = dl(dataset,
+                batch_size=config.batch_size,
+                shuffle=False,
+                drop_last=True,
+                num_workers=config.num_workers,
+                pin_memory=True)
+
+    return loader
diff --git a/inference/benchmarks/resnet50/pytorch/evaluator.py b/inference/benchmarks/resnet50/pytorch/evaluator.py
new file mode 100644
index 000000000..5481c5e5b
--- /dev/null
+++ b/inference/benchmarks/resnet50/pytorch/evaluator.py
@@ -0,0 +1,10 @@
+def topk(output, target, ks=(1, )):
+    _, pred = output.topk(max(ks), 1, True, True)
+    pred = pred.t()
+    correct = pred.eq(target.view(1, -1).expand_as(pred))
+    return [correct[:k].max(0)[0] for k in ks]
+
+
+def evaluator(pred, ground_truth):
+    # top5 is computed for completeness, but only top1 is reported.
+    top1, top5 = topk(pred, ground_truth, ks=(1, 5))
+    return top1
diff --git a/inference/benchmarks/resnet50/pytorch/export.py b/inference/benchmarks/resnet50/pytorch/export.py
new file mode 100644
index 000000000..3df1a821b
--- /dev/null
+++ 
b/inference/benchmarks/resnet50/pytorch/export.py @@ -0,0 +1,34 @@ +import torch +import os + + +def export_model(model, config): + if config.exist_onnx_path is not None: + return config.exist_onnx_path + + filename = config.case + "_bs" + str(config.batch_size) + filename = filename + "_" + str(config.framework) + filename = filename + "_fp16" + str(config.fp16) + filename = "onnxs/" + filename + ".onnx" + onnx_path = config.perf_dir + "/" + filename + + dummy_input = torch.randn(config.batch_size, 3, 224, 224) + + if config.fp16: + dummy_input = dummy_input.half() + dummy_input = dummy_input.cuda() + + dir_onnx_path = os.path.dirname(onnx_path) + os.makedirs(dir_onnx_path, exist_ok=True) + + with torch.no_grad(): + torch.onnx.export(model, + dummy_input, + onnx_path, + verbose=False, + input_names=["input"], + output_names=["output"], + training=torch.onnx.TrainingMode.EVAL, + do_constant_folding=True) + + return onnx_path diff --git a/inference/benchmarks/resnet50/pytorch/forward.py b/inference/benchmarks/resnet50/pytorch/forward.py new file mode 100644 index 000000000..5619760df --- /dev/null +++ b/inference/benchmarks/resnet50/pytorch/forward.py @@ -0,0 +1,103 @@ +from loguru import logger +import torch +import numpy as np +import time +from tools import torch_sync + + +def cal_perf(config, dataloader_len, duration, core_time, str_prefix): + model_forward_perf = config.repeat * dataloader_len * config.batch_size / duration + logger.info(str_prefix + "(" + config.framework + ") Perf: " + + str(model_forward_perf) + " ips") + model_forward_core_perf = config.repeat * dataloader_len * config.batch_size / core_time + logger.info(str_prefix + "(" + config.framework + ") core Perf: " + + str(model_forward_core_perf) + " ips") + return round(model_forward_perf, 3), round(model_forward_core_perf, 3) + + +def model_forward(model, dataloader, evaluator, config): + if config.no_validation: + return None, None, None + start = time.time() + core_time = 0.0 + acc = [] + + for times in range(config.repeat): + + logger.debug("Repeat: " + str(times + 1)) + + all_top1 = [] + for step, (x, y) in enumerate(dataloader): + torch_sync(config) + core_time_start = time.time() + + if step % config.log_freq == 0: + logger.debug("Step: " + str(step) + " / " + + str(len(dataloader))) + + with torch.no_grad(): + + x = x.cuda() + y = y.cuda() + pred = model(x) + torch_sync(config) + core_time += time.time() - core_time_start + + top1 = evaluator(pred, y) + + all_top1.extend(top1.cpu()) + + acc.append(np.mean(all_top1)) + + logger.info("Top1 Acc: " + str(acc)) + + duration = time.time() - start + model_forward_perf, model_forward_core_perf = cal_perf( + config, len(dataloader), duration, core_time, "Validation") + + return model_forward_perf, model_forward_core_perf, round( + float(np.mean(acc)), 3) + + +def engine_forward(model, dataloader, evaluator, config): + start = time.time() + core_time = 0.0 + foo_time = 0.0 + acc = [] + + for times in range(config.repeat): + + logger.debug("Repeat: " + str(times + 1)) + + all_top1 = [] + for step, (x, y) in enumerate(dataloader): + torch_sync(config) + core_time_start = time.time() + + if step % config.log_freq == 0: + logger.debug("Step: " + str(step) + " / " + + str(len(dataloader))) + + with torch.no_grad(): + + outputs = model([x]) + pred = outputs[0][0] + foo_time += outputs[1] + pred = pred.float() + torch_sync(config) + core_time += time.time() - core_time_start + + top1 = evaluator(pred, y) + + all_top1.extend(top1.cpu()) + + acc.append(np.mean(all_top1)) + + 
logger.info("Top1 Acc: " + str(acc)) + + duration = time.time() - start - foo_time + model_forward_perf, model_forward_core_perf = cal_perf( + config, len(dataloader), duration, core_time - foo_time, "Inference") + + return model_forward_perf, model_forward_core_perf, round( + float(np.mean(acc)), 3) diff --git a/inference/benchmarks/resnet50/pytorch/model.py b/inference/benchmarks/resnet50/pytorch/model.py new file mode 100644 index 000000000..a7ae29b0a --- /dev/null +++ b/inference/benchmarks/resnet50/pytorch/model.py @@ -0,0 +1,15 @@ +from torchvision.models import resnet50 +from torchvision.models import ResNet50_Weights as w + + +def create_model(config): + if config.no_validation: + assert config.exist_onnx_path is not None + return None + model = resnet50(weights=w.IMAGENET1K_V1) + model.cuda() + model.eval() + if config.fp16: + model.half() + + return model diff --git a/inference/configs/host.yaml b/inference/configs/host.yaml new file mode 100644 index 000000000..feaa4f7ee --- /dev/null +++ b/inference/configs/host.yaml @@ -0,0 +1,17 @@ +FLAGPERF_PATH: "/home/FlagPerf/inference" +FLAGPERF_LOG_PATH: "result" +VENDOR: "nvidia" +MODEL: "resnet50" +FLAGPERF_LOG_LEVEL: "INFO" +LOG_CALL_INFORMATION: True +HOSTS: ["127.0.0.1"] +SSH_PORT: "22" +HOSTS_PORTS: ["2222"] +MASTER_PORT: "29501" +SHM_SIZE: "32G" +ACCE_CONTAINER_OPT: " --gpus all" +PIP_SOURCE: "https://mirror.baidu.com/pypi/simple" +CLEAR_CACHES: True +ACCE_VISIBLE_DEVICE_ENV_NAME: "CUDA_VISIBLE_DEVICES" +CASES: + "resnet50:pytorch_1.13": "/raid/dataset/ImageNet/imagenet/val" \ No newline at end of file diff --git a/inference/configs/resnet50/configurations.yaml b/inference/configs/resnet50/configurations.yaml new file mode 100644 index 000000000..fa1739983 --- /dev/null +++ b/inference/configs/resnet50/configurations.yaml @@ -0,0 +1,14 @@ +batch_size: 256 +# 3*224*224(1 item in x) +input_size: 150528 +fp16: true +compiler: tensorrt +num_workers: 8 +log_freq: 30 +repeat: 5 +# skip validation(will also skip create_model, export onnx). Assert exist_onnx_path != null +no_validation: false +# set a real onnx_path to use exist, or set it to anything but null to avoid export onnx manually(like torch-tensorrt) +exist_onnx_path: null +# set a exist path of engine file like resnet50.trt/resnet50.plan/resnet50.engine +exist_compiler_path: null \ No newline at end of file diff --git a/inference/configs/resnet50/parameters.yaml b/inference/configs/resnet50/parameters.yaml new file mode 100644 index 000000000..b9a42d4c2 --- /dev/null +++ b/inference/configs/resnet50/parameters.yaml @@ -0,0 +1,2 @@ +# contain case-specified parameters, like max_seq_length in BERT. +# There is no parameters for resnet50 case. 
\ No newline at end of file diff --git a/inference/configs/resnet50/vendor_config/nvidia_configurations.yaml b/inference/configs/resnet50/vendor_config/nvidia_configurations.yaml new file mode 100644 index 000000000..d7c19087f --- /dev/null +++ b/inference/configs/resnet50/vendor_config/nvidia_configurations.yaml @@ -0,0 +1,3 @@ +trt_tmp_path: nvidia_tmp/resnet50.trt +has_dynamic_axis: false +torchtrt_full_compile: true \ No newline at end of file diff --git a/inference/docker_images/nvidia/nvidia_analysis.py b/inference/docker_images/nvidia/nvidia_analysis.py new file mode 100644 index 000000000..26132d19d --- /dev/null +++ b/inference/docker_images/nvidia/nvidia_analysis.py @@ -0,0 +1,14 @@ +def analysis_log(logpath): + logfile = open(logpath) + + max_usage = 0.0 + max_mem = 0.0 + for line in logfile.readlines(): + if "MiB" in line: + usage = line.split(" ")[2] + usage = float(usage[:-3]) + max_usage = max(max_usage, usage) + max_mem = line.split(" ")[3] + max_mem = float(max_mem[:-3]) + + return round(max_usage / 1024.0, 2), round(max_mem / 1024.0, 2) diff --git a/inference/docker_images/nvidia/nvidia_monitor.py b/inference/docker_images/nvidia/nvidia_monitor.py new file mode 100644 index 000000000..f679fc4d0 --- /dev/null +++ b/inference/docker_images/nvidia/nvidia_monitor.py @@ -0,0 +1,256 @@ +# !/usr/bin/env python3 +# encoding: utf-8 +''' +Usage: python3 sys-monitor.py -o operation -l [log_path] + -o, --operation start|stop|restart|status + -l, --log log path , ./logs/ default +''' + +import os +import sys +import time +import signal +import atexit +import argparse +import datetime +from multiprocessing import Process +import subprocess +import schedule + + +class Daemon: + ''' + daemon subprocess class. + usage: subclass this daemon and override the run() method. + sys-monitor.pid: in the /tmp/, auto del when unexpected exit. + verbose: debug mode, disabled default. 
+    '''
+
+    def __init__(self,
+                 pid_file,
+                 log_file,
+                 err_file,
+                 gpu_log,
+                 log_path,
+                 rate=5,
+                 stdin=os.devnull,
+                 stdout=os.devnull,
+                 stderr=os.devnull,
+                 home_dir='.',
+                 umask=0o22,
+                 verbose=0):
+        self.stdin = stdin
+        self.stdout = stdout
+        self.stderr = stderr
+        self.home_dir = home_dir
+        self.pidfile = pid_file
+        self.logfile = log_file
+        self.errfile = err_file
+        self.gpufile = gpu_log
+        self.logpath = log_path
+        self.rate = rate
+        self.umask = umask
+        self.verbose = verbose
+        self.daemon_alive = True
+
+    def get_pid(self):
+        try:
+            with open(self.pidfile, 'r') as pf:
+                pid = int(pf.read().strip())
+        except IOError:
+            pid = None
+        except SystemExit:
+            pid = None
+        return pid
+
+    def del_pid(self):
+        if os.path.exists(self.pidfile):
+            os.remove(self.pidfile)
+
+    def run(self):
+        '''
+        NOTE: override the method in subclass
+        '''
+
+        def gpu_mon(file):
+            TIMESTAMP = datetime.datetime.now().strftime('%Y-%m-%d-%H:%M:%S')
+            cmd = "nvidia-smi |grep 'Default'|awk '{print $3,$5,$9,$11,$13}'"
+            process = subprocess.Popen(cmd,
+                                       shell=True,
+                                       stdout=subprocess.PIPE,
+                                       stderr=subprocess.STDOUT,
+                                       encoding='utf-8')
+            try:
+                out = process.communicate(timeout=10)
+            except subprocess.TimeoutExpired:
+                process.kill()
+                out = process.communicate()
+
+            # Write an explicit error marker when nvidia-smi fails instead of
+            # silently overwriting it with the (empty) command output.
+            if process.returncode != 0:
+                result = TIMESTAMP + "\n" + "error" + "\n"
+            else:
+                result = TIMESTAMP + "\n" + out[0] + "\n"
+            with open(file, 'a') as f:
+                f.write(result)
+
+        def timer_gpu_mon():
+            gpu_process = Process(target=gpu_mon, args=(self.gpufile, ))
+            gpu_process.start()
+
+        schedule.every(self.rate).seconds.do(timer_gpu_mon)
+        while True:
+            schedule.run_pending()
+            time.sleep(5)
+
+    def daemonize(self):
+        if self.verbose >= 1:
+            print('daemon process starting ...')
+        try:
+            pid = os.fork()
+            if pid > 0:
+                sys.exit(0)
+        except OSError as e:
+            sys.stderr.write('fork #1 failed: %d (%s)\n' %
+                             (e.errno, e.strerror))
+            sys.exit(1)
+        os.chdir(self.home_dir)
+        os.setsid()
+        os.umask(self.umask)
+        try:
+            pid = os.fork()
+            if pid > 0:
+                sys.exit(0)
+        except OSError as e:
+            sys.stderr.write('fork #2 failed: %d (%s)\n' %
+                             (e.errno, e.strerror))
+            sys.exit(1)
+        sys.stdout.flush()
+        sys.stderr.flush()
+        si = open(self.stdin, 'r')
+        so = open(self.stdout, 'a+')
+        if self.stderr:
+            se = open(self.stderr, 'a+')
+        else:
+            se = so
+        os.dup2(si.fileno(), sys.stdin.fileno())
+        os.dup2(so.fileno(), sys.stdout.fileno())
+        os.dup2(se.fileno(), sys.stderr.fileno())
+        atexit.register(self.del_pid)
+        pid = str(os.getpid())
+        with open(self.pidfile, 'w+') as f:
+            f.write('%s\n' % pid)
+
+    def start(self):
+        if not os.path.exists(self.logpath):
+            os.makedirs(self.logpath)
+        elif os.path.exists(self.gpufile):
+            os.remove(self.gpufile)
+        if self.verbose >= 1:
+            print('ready to start ......')
+        # check for a pid file to see if the daemon already runs
+        pid = self.get_pid()
+        if pid:
+            msg = 'pid file %s already exists, is it already running?\n'
+            sys.stderr.write(msg % self.pidfile)
+            sys.exit(1)
+        # start the daemon
+        self.daemonize()
+        self.run()
+
+    def stop(self):
+        if self.verbose >= 1:
+            print('stopping ...')
+        pid = self.get_pid()
+        if not pid:
+            msg = 'pid file [%s] does not exist. 
Not running?\n' % self.pidfile + sys.stderr.write(msg) + if os.path.exists(self.pidfile): + os.remove(self.pidfile) + return + # try to kill the daemon process + try: + i = 0 + while 1: + os.kill(pid, signal.SIGTERM) + time.sleep(1) + i = i + 1 + if i % 10 == 0: + os.kill(pid, signal.SIGHUP) + except OSError as err: + err = str(err) + if err.find('No such process') > 0: + if os.path.exists(self.pidfile): + os.remove(self.pidfile) + else: + print(str(err)) + sys.exit(1) + if self.verbose >= 1: + print('Stopped!') + + def restart(self): + self.stop() + self.start() + + def status(self): + pid = self.get_pid() + if pid: + if os.path.exists('/proc/%d' % pid): + return pid + return False + + +def parse_args(): + ''' Check script input parameter. ''' + parse = argparse.ArgumentParser(description='Sys monitor script') + parse.add_argument('-o', + type=str, + metavar='[operation]', + required=True, + help='start|stop|restart|status') + parse.add_argument('-l', + type=str, + metavar='[log_path]', + required=False, + default='./logs/', + help='log path') + args = parse.parse_args() + return args + + +def main(): + sample_rate1 = 5 + args = parse_args() + operation = args.o + log_path = args.l + pid_fn = str('/tmp/gpu_monitor.pid') + log_fn = str(log_path + '/nvidia_monitor.log') + err_fn = str(log_path + '/nvidia_monitor.err') + # result for gpu + gpu_fn = str(log_path + '/nvidia_monitor.log') + + subdaemon = Daemon(pid_fn, + log_fn, + err_fn, + gpu_fn, + log_path, + verbose=1, + rate=sample_rate1) + if operation == 'start': + subdaemon.start() + elif operation == 'stop': + subdaemon.stop() + elif operation == 'restart': + subdaemon.restart() + elif operation == 'status': + pid = subdaemon.status() + if pid: + print('process [%s] is running ......' % pid) + else: + print('daemon process [%s] stopped' % pid) + else: + print("invalid argument!") + sys.exit(1) + + +if __name__ == '__main__': + main() diff --git a/inference/docker_images/nvidia/pytorch_1.13/Dockerfile b/inference/docker_images/nvidia/pytorch_1.13/Dockerfile new file mode 100644 index 000000000..c22e047c0 --- /dev/null +++ b/inference/docker_images/nvidia/pytorch_1.13/Dockerfile @@ -0,0 +1,9 @@ +FROM nvcr.io/nvidia/pytorch:22.11-py3 +RUN /bin/bash -c "pip config set global.index-url https://mirror.baidu.com/pypi/simple" +RUN /bin/bash -c "uname -a" +RUN /bin/bash -c alias python3=python +RUN apt-get update +RUN pip3 install loguru +RUN pip3 install pycuda +RUN pip3 install schedule +RUN pip3 install munch diff --git a/inference/docker_images/nvidia/pytorch_1.13/pytorch1.13_install.sh b/inference/docker_images/nvidia/pytorch_1.13/pytorch1.13_install.sh new file mode 100644 index 000000000..cc1f786e8 --- /dev/null +++ b/inference/docker_images/nvidia/pytorch_1.13/pytorch1.13_install.sh @@ -0,0 +1 @@ +#!/bin/bash \ No newline at end of file diff --git a/inference/docker_images/nvidia/pytorch_2.1/Dockerfile b/inference/docker_images/nvidia/pytorch_2.1/Dockerfile new file mode 100644 index 000000000..59f335ee8 --- /dev/null +++ b/inference/docker_images/nvidia/pytorch_2.1/Dockerfile @@ -0,0 +1,9 @@ +FROM nvcr.io/nvidia/pytorch:23.06-py3 +RUN /bin/bash -c "pip config set global.index-url https://mirror.baidu.com/pypi/simple" +RUN /bin/bash -c "uname -a" +RUN /bin/bash -c alias python3=python +RUN apt-get update +RUN pip3 install loguru +RUN pip3 install pycuda +RUN pip3 install schedule +RUN pip3 install munch diff --git a/inference/docker_images/nvidia/pytorch_2.1/pytorch2.1_install.sh 
b/inference/docker_images/nvidia/pytorch_2.1/pytorch2.1_install.sh new file mode 100644 index 000000000..cc1f786e8 --- /dev/null +++ b/inference/docker_images/nvidia/pytorch_2.1/pytorch2.1_install.sh @@ -0,0 +1 @@ +#!/bin/bash \ No newline at end of file diff --git a/inference/inference_engine/nvidia/tensorrt.py b/inference/inference_engine/nvidia/tensorrt.py new file mode 100644 index 000000000..fb215ba17 --- /dev/null +++ b/inference/inference_engine/nvidia/tensorrt.py @@ -0,0 +1,129 @@ +import os +import torch +from torch import autocast +import tensorrt as trt + +trt.init_libnvinfer_plugins(None, "") +import numpy as np +import pycuda.driver as cuda +import pycuda.autoinit +import time +import subprocess + + +class InferModel: + + class HostDeviceMem(object): + + def __init__(self, host_mem, device_mem): + self.host = host_mem + self.device = device_mem + + def __str__(self): + return "Host:\n" + str(self.host) + "\nDevice:\n" + str( + self.device) + + def __repr__(self): + return self.__str__() + + def __init__(self, config, onnx_path, model): + self.logger = trt.Logger(trt.Logger.WARNING) + self.runtime = trt.Runtime(self.logger) + + self.engine = self.build_engine(config, onnx_path) + + self.inputs, self.outputs, self.bindings, self.stream = self.allocate_buffers( + self.engine) + + self.context = self.engine.create_execution_context() + self.numpy_to_torch_dtype_dict = { + bool: torch.bool, + np.uint8: torch.uint8, + np.int8: torch.int8, + np.int16: torch.int16, + np.int32: torch.int32, + np.int64: torch.int64, + np.float16: torch.float16, + np.float32: torch.float32, + np.float64: torch.float64, + np.complex64: torch.complex64, + np.complex128: torch.complex128, + } + + def build_engine(self, config, onnx_path): + if config.exist_compiler_path is None: + trt_path = config.log_dir + "/" + config.trt_tmp_path + + dir_trt_path = os.path.dirname(trt_path) + os.makedirs(dir_trt_path, exist_ok=True) + + time.sleep(10) + + trtexec_cmd = "trtexec --onnx=" + onnx_path + " --saveEngine=" + trt_path + if config.fp16: + trtexec_cmd += " --fp16" + if config.has_dynamic_axis: + trtexec_cmd += " --minShapes=" + config.minShapes + trtexec_cmd += " --optShapes=" + config.optShapes + trtexec_cmd += " --maxShapes=" + config.maxShapes + + p = subprocess.Popen(trtexec_cmd, shell=True) + p.wait() + else: + trt_path = config.exist_compiler_path + + with open(trt_path, "rb") as f: + return self.runtime.deserialize_cuda_engine(f.read()) + + def allocate_buffers(self, engine): + inputs = [] + outputs = [] + bindings = [] + stream = cuda.Stream() + + for binding in engine: + size = trt.volume( + engine.get_binding_shape(binding)) * engine.max_batch_size + dtype = trt.nptype(engine.get_binding_dtype(binding)) + + host_mem = cuda.pagelocked_empty(size, dtype) + device_mem = cuda.mem_alloc(host_mem.nbytes) + bindings.append(int(device_mem)) + + if engine.binding_is_input(binding): + inputs.append(self.HostDeviceMem(host_mem, device_mem)) + else: + outputs.append(self.HostDeviceMem(host_mem, device_mem)) + + return inputs, outputs, bindings, stream + + def __call__(self, model_inputs: list): + + batch_size = np.unique(np.array([i.size(dim=0) for i in model_inputs])) + batch_size = batch_size[0] + + for i, model_input in enumerate(model_inputs): + binding_name = self.engine[i] + binding_dtype = trt.nptype( + self.engine.get_binding_dtype(binding_name)) + model_input = model_input.to( + self.numpy_to_torch_dtype_dict[binding_dtype]) + + cuda.memcpy_dtod_async( + self.inputs[i].device, + model_input.data_ptr(), + 
model_input.element_size() * model_input.nelement(), + self.stream, + ) + + self.context.execute_async_v2(bindings=self.bindings, + stream_handle=self.stream.handle) + for out in self.outputs: + cuda.memcpy_dtoh_async(out.host, out.device, self.stream) + + self.stream.synchronize() + + return [ + torch.from_numpy(out.host.reshape(batch_size, -1)) + for out in self.outputs + ], 0 diff --git a/inference/inference_engine/nvidia/torchtrt.py b/inference/inference_engine/nvidia/torchtrt.py new file mode 100644 index 000000000..9500dbfcf --- /dev/null +++ b/inference/inference_engine/nvidia/torchtrt.py @@ -0,0 +1,35 @@ +import os +import torch +import torch_tensorrt as torchtrt +import time + + +class InferModel: + + def __init__(self, config, onnx_path, model): + self.config = config + self.origin_model = model + self.traced_model = None + self.trt_model = None + self.full_compile = config.torchtrt_full_compile + + def __call__(self, model_inputs: list): + start = time.time() + model_cuda_inputs = [] + for item in model_inputs: + model_cuda_inputs.append(item.cuda()) + + if self.traced_model is None: + self.traced_model = torch.jit.trace(self.origin_model, + model_cuda_inputs) + self.trt_model = torchtrt.compile( + self.traced_model, + inputs=model_cuda_inputs, + truncate_long_and_double=True, + enabled_precisions={torch.float32, torch.float16}, + require_full_compilation=self.full_compile) + + compile_foo_time = time.time() - start + + model_outputs = self.trt_model(*model_cuda_inputs) + return [model_outputs], compile_foo_time diff --git a/inference/run.py b/inference/run.py new file mode 100644 index 000000000..d452d83e6 --- /dev/null +++ b/inference/run.py @@ -0,0 +1,628 @@ +# Copyright (c) 2023 BAAI. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License") +#!/usr/bin/env python3 +# -*- coding: UTF-8 -*- +''' TODO Copyright and Other info ''' + +import os +import sys +import ast +import time +import yaml +import importlib +from munch import DefaultMunch +import getpass +from loguru import logger +from collections import namedtuple + +from utils import cluster_manager, image_manager + +VERSION = "v0.1" +CLUSTER_MGR = cluster_manager.ClusterManager() + +CURR_PATH = os.path.abspath(os.path.dirname(__file__)) + + +def print_welcome_msg(): + '''Print colorful welcome message to console.''' + logger.log( + "Welcome", + "\033[1;34;40m==============================================\033[0m") + logger.log("Welcome", + "\033[1;36;40m Welcome to FlagPerf Inference!\033[0m") + logger.log( + "Welcome", + "\033[1;36;40m See more at https://github.com/FlagOpen/FlagPerf \033[0m" + ) + logger.log( + "Welcome", + "\033[1;34;40m==============================================\033[0m") + + +def init_logger(config): + logger.remove() + """ + define "EVENTS", using logger.log("EVENT",msg) to log + #21 means just important than info(#20), less than warning(#30) + """ + logger.level("Welcome", no=21) + + timestamp_log_dir = "run" + time.strftime("%Y%m%d%H%M%S", time.localtime()) + curr_log_path = config.FLAGPERF_PATH + "/" + config.FLAGPERF_LOG_PATH + "/" + timestamp_log_dir + logfile = curr_log_path + "/host.out.log" + + logger.remove() + + if config.LOG_CALL_INFORMATION: + logger.add(logfile, level=config.FLAGPERF_LOG_LEVEL) + logger.add(sys.stdout, level=config.FLAGPERF_LOG_LEVEL) + else: + logger.add(logfile, + level=config.FLAGPERF_LOG_LEVEL, + format="{time} - {level} - {message}") + logger.add(sys.stdout, + level=config.FLAGPERF_LOG_LEVEL, + format="{time} - {level} - 
{message}") + return curr_log_path + + +def usage(): + ''' Show usage and exit with exit_code. ''' + print("Usage: python3 ", __file__) + print("Edit config file test_conf.py & cluster_conf.py in " + "training/run_benchmarks/config and run.") + sys.exit(0) + + +def check_cluster_health(): + ''' Try to ssh login to all the hosts in cluster_conf.hosts. + Return None if everything goes well. + ''' + logger.debug("Cluster healthcheck ssh. Hosts are: " + + ",".join(CLUSTER_MGR.get_hosts_list())) + bad_hosts = CLUSTER_MGR.healthcheck() + if len(bad_hosts) != 0: + for bad_host in bad_hosts: + logger.error("Check " + bad_host + " failed. ssh command exit " + "with: " + str(bad_hosts[bad_host])) + logger.error("Check hosts in the cluster......[FAILED] [EXIT]") + sys.exit(3) + logger.info("Check hosts in the cluster......[SUCCESS]") + + +def _get_deploy_path(config): + '''Return deploy path according to FLAGPERF_LOG_PATH_HOST in host.yaml.''' + if 'FLAGPERF_PATH' not in config.__dict__.keys() \ + or config.FLAGPERF_PATH is None: + dp_path = CURR_PATH + else: + dp_path = os.path.abspath(config.FLAGPERF_PATH) + return dp_path + + +def check_cluster_deploy_path(dp_path): + '''Make sure that flagperf is deployed on all the hosts + ''' + logger.debug("Check flagperf deployment path: " + dp_path) + bad_hosts = CLUSTER_MGR.run_command_all_hosts("cd " + dp_path) + if len(bad_hosts) != 0: + logger.error("Hosts that can't find deployed path: " + + ",".join(bad_hosts.keys())) + logger.error("Check cluster deploy path " + dp_path + + "......[FAILED] [EXIT]") + sys.exit(3) + logger.info("Check flagperf deployment path: " + dp_path + "...[SUCCESS]") + + +def check_test_host_config(config): + ''' Check test config. + Make sure all CASES are configed. + ''' + logger.debug("Check config in host.yaml") + must_para = [ + 'FLAGPERF_LOG_PATH', 'FLAGPERF_LOG_PATH', 'VENDOR', + 'FLAGPERF_LOG_LEVEL', 'HOSTS', 'SSH_PORT', 'HOSTS_PORTS', + 'MASTER_PORT', 'SHM_SIZE', 'ACCE_CONTAINER_OPT', 'PIP_SOURCE', + 'CLEAR_CACHES', 'ACCE_VISIBLE_DEVICE_ENV_NAME', 'CASES' + ] + + for para in must_para: + if para not in config.__dict__.keys(): + logger.error(f"{para} MUST be set in host.yaml...[EXIT]") + sys.exit(2) + logger.info("Check host.yaml...[SUCCESS]") + + +def check_case_config(case, case_config, vendor): + '''Check config of the testcase. Make sure its path exists, framework is + right and config file exists. + ''' + logger.debug("Check config of test case: " + case) + must_configs = [ + "model", "framework", "data_dir_host", "data_dir_container" + ] + for config_item in case_config.keys(): + if config_item in must_configs: + must_configs.remove(config_item) + if len(must_configs) > 0: + logger.warning("Case " + case + " misses some config items: " + + ",".join(must_configs)) + return False + logger.debug("Check config of test case: " + case + " ...[SUCCESS]") + return True + + +def prepare_docker_image_cluster(dp_path, image_mgr, framework, nnodes, + config): + '''Prepare docker image in registry and in the cluster. + ''' + vendor = config.VENDOR + image_vendor_dir = os.path.join( + CURR_PATH, "docker_images/" + vendor + "/" + framework) + image_name = image_mgr.repository + ":" + image_mgr.tag + logger.debug("Prepare docker image in cluster. 
image_name=" + image_name + + " image_vendor_dir=" + image_vendor_dir) + prepare_image_cmd = "cd " + dp_path + " && " + sys.executable \ + + " utils/image_manager.py -o build -i " \ + + image_mgr.repository + " -t " + image_mgr.tag \ + + " -d " + image_vendor_dir + " -f " + framework + timeout = 1200 + logger.debug("Run cmd in the cluster to prepare docker image: " + + prepare_image_cmd + " timeout=" + str(timeout)) + bad_hosts = CLUSTER_MGR.run_command_some_hosts(prepare_image_cmd, nnodes, + timeout) + if len(bad_hosts) != 0: + logger.error("Hosts that can't pull image: " + + ",".join(bad_hosts.keys())) + return False + return True + + +def prepare_running_env(dp_path, container_name, case_config): + '''Install extensions and setup env before start task in container. + ''' + nnodes = case_config["nnodes"] + model = case_config["model"] + framework = case_config["framework"] + prepare_cmd = "cd " + dp_path + " && " + sys.executable \ + + " utils/container_manager.py -o runcmdin -c " \ + + container_name + " -t 1800 -r \"python3 " \ + + config.FLAGPERF_PATH + "/" \ + + "/utils/prepare_in_container.py --framework " \ + + framework + " --model " + model + " --vendor " \ + + config.VENDOR + " --pipsource " + config.PIP_SOURCE + "\"" + timeout = 1800 + logger.debug("Run cmd in the cluster to prepare running environment: " + + prepare_cmd + " timeout=" + str(timeout)) + bad_hosts = CLUSTER_MGR.run_command_some_hosts(prepare_cmd, nnodes, + timeout) + if len(bad_hosts) != 0: + logger.error("Hosts that can't prepare running environment " + + "properly: " + ",".join(bad_hosts.keys())) + return False + return True + + +def start_container_in_cluster(dp_path, run_args, container_name, image_name, + nnodes): + '''Call CLUSTER_MGR tool to start containers.''' + start_cmd = "cd " + dp_path + " && " + sys.executable \ + + " utils/container_manager.py -o runnew " \ + + " -c " + container_name + " -i " + image_name + " -a \"" \ + + run_args + "\"" + logger.debug("Run cmd in the cluster to start container: " + start_cmd) + bad_hosts = CLUSTER_MGR.run_command_some_hosts(start_cmd, nnodes, 600) + if len(bad_hosts) != 0: + logger.error("Hosts that can't start docker container: " + + ",".join(bad_hosts.keys())) + return False + return True + + +def stop_container_in_cluster(dp_path, container_name, nnodes): + '''Call CLUSTER_MGR tool to stop containers.''' + stop_cont_cmd = "cd " + dp_path + " && " + sys.executable \ + + " utils/container_manager.py -o stop" \ + + " -c " + container_name + logger.debug("Run cmd to stop container(s) in the cluster:" + + stop_cont_cmd) + failed_hosts = CLUSTER_MGR.run_command_some_hosts(stop_cont_cmd, nnodes, + 60) + if len(failed_hosts) != 0: + logger.warning("Hosts that stop container " + container_name + + " failed:" + ",".join(failed_hosts.keys()) + + " Continue.") + return False + logger.info("All containers stoped in the cluster") + return True + + +def clear_caches_cluster(clear, nnodes): + '''Set vm.drop to clean the system caches.''' + if not clear: + logger.info("Caches clear config is NOT set.") + return + + clear_cmd = "sync && sudo /sbin/sysctl vm.drop_caches=3" + timeout = 30 + logger.debug("Run cmd in the cluster to clear the system cache: " + + clear_cmd + " timeout=" + str(timeout)) + failed_hosts = CLUSTER_MGR.run_command_some_hosts(clear_cmd, nnodes, + timeout) + if len(failed_hosts) != 0: + logger.warning("Hosts that clear cache failed: " + + ",".join(failed_hosts.keys()) + ". 
Continue.") + logger.info("Clear system caches if it set......[SUCCESS]") + + +def start_monitors_in_cluster(dp_path, case_log_dir, nnodes): + '''Start sytem and vendor's monitors.''' + start_mon_cmd = "cd " + dp_path + " && " + sys.executable \ + + " utils/sys_monitor.py -o restart -l " + timeout = 60 + logger.debug("Run cmd in the cluster to start system monitors: " + + start_mon_cmd) + bad_hosts = CLUSTER_MGR.start_monitors_some_hosts(start_mon_cmd, + case_log_dir, nnodes, + timeout) + if len(bad_hosts) != 0: + logger.error("Hosts that can't start system monitors: " + + ",".join(bad_hosts.keys())) + + ven_mon_path = os.path.join(dp_path, "docker_images", config.VENDOR, + config.VENDOR + "_monitor.py") + start_mon_cmd = "cd " + dp_path + " && " + sys.executable \ + + " " + ven_mon_path + " -o restart -l " + logger.debug("Run cmd in the cluster to start vendor's monitors: " + + start_mon_cmd) + bad_hosts = CLUSTER_MGR.start_monitors_some_hosts(start_mon_cmd, + case_log_dir, nnodes, + timeout) + if len(bad_hosts) != 0: + logger.error("Hosts that can't start vendor's monitors: " + + ",".join(bad_hosts.keys())) + + +def stop_monitors_in_cluster(dp_path, nnodes): + '''Stop sytem and vendor's monitors.''' + stop_mon_cmd = "cd " + dp_path + " && " + sys.executable \ + + " utils/sys_monitor.py -o stop" + timeout = 60 + logger.debug("Run cmd in the cluster to stop system monitors: " + + stop_mon_cmd) + bad_hosts = CLUSTER_MGR.run_command_some_hosts(stop_mon_cmd, nnodes, + timeout) + if len(bad_hosts) != 0: + logger.error("Hosts that can't stop system monitors: " + + ",".join(bad_hosts.keys())) + + ven_mon_path = os.path.join(dp_path, "docker_images", config.VENDOR, + config.VENDOR + "_monitor.py") + stop_mon_cmd = "cd " + dp_path + " && " + sys.executable \ + + " " + ven_mon_path + " -o stop" + logger.debug("Run cmd in the cluster to stop vendor's monitors: " + + stop_mon_cmd) + bad_hosts = CLUSTER_MGR.run_command_some_hosts(stop_mon_cmd, nnodes, + timeout) + if len(bad_hosts) != 0: + logger.error("Hosts that can't stop vendor's monitors: " + + ",".join(bad_hosts.keys())) + + +def start_tasks_in_cluster(dp_path, container_name, case_config, curr_log_path, + config): + '''Start tasks in cluster, and NOT wait.''' + nnodes = case_config["nnodes"] + env_file = os.path.join( + config.FLAGPERF_PATH, config.VENDOR, + case_config["model"] + "-" + case_config["framework"], + "config/environment_variables.sh") + + must_configs = [ + "FLAGPERF_PATH", "FLAGPERF_LOG_PATH", "MODEL", "VENDOR", + "FLAGPERF_LOG_LEVEL" + ] + new_case_config = {"DATA_DIR": case_config["data_dir_container"]} + new_case_config = {"FLAGPERF_LOG_PATH": curr_log_path} + + for cfg in must_configs: + new_case_config[cfg] = getattr(config, cfg) + + start_cmd = "cd " + dp_path + " && " + sys.executable \ + + " utils/container_manager.py -o runcmdin -c " \ + + container_name + " -r \"" \ + + f"python3 run_inference.py" \ + + f" --perf_dir " + getattr(config, "FLAGPERF_PATH") \ + + f" --loglevel " + getattr(config, "FLAGPERF_LOG_LEVEL") \ + + f" --vendor " + getattr(config, "VENDOR") \ + + f" --case " + getattr(config, "MODEL") \ + + f" --data_dir " + case_config["data_dir_container"] \ + + f" --framework " + case_config["framework"] \ + + f" --log_dir " + curr_log_path + " 2>&1 | tee "+curr_log_path+"/stdout_err.out.log" + "\"" + logger.debug("Run cmd in the cluster to start tasks, cmd: " + start_cmd) + + logger.info("3) Waiting for tasks end in the cluster...") + logger.info("Check task log in real time from container: " + + curr_log_path + 
"/container.out.log") + logger.info("Check task stderr & stdout in real time from container: " + + curr_log_path + "/stdout_err.out.log") + CLUSTER_MGR.run_command_some_hosts_distribution_info(start_cmd, nnodes, 15) + # Wait a moment for starting tasks. + time.sleep(10) + + +def wait_for_finish(dp_path, container_name, pid_file_path, nnodes): + '''wait all the processes of start_xxx_task.py finished. + ''' + # Aussme pid of start_xxx_task.py won't loop in a short time. + check_cmd = "cd " + dp_path + "; " + sys.executable \ + + " utils/container_manager.py -o pidrunning -c " \ + + container_name + " -f " + pid_file_path + + logger.debug("Run cmd to check whether the training tasks is running: " + + check_cmd) + while True: + bad_hosts = CLUSTER_MGR.run_command_some_hosts(check_cmd, + nnodes, + no_log=True) + + if len(bad_hosts) == nnodes: + break + time.sleep(10) + + +def prepare_containers_env_cluster(dp_path, case_log_dir, config, + container_name, image_name, case_config): + '''Prepare containers environments in the cluster. It will start + containers, setup environments, start monitors, and clear caches.''' + nnodes = case_config["nnodes"] + container_start_args = " --rm --init --detach --net=host --uts=host" \ + + " --ipc=host --security-opt=seccomp=unconfined" \ + + " --privileged=true --ulimit=stack=67108864" \ + + " --ulimit=memlock=-1" \ + + " -w " + config.FLAGPERF_PATH \ + + " --shm-size=" + config.SHM_SIZE \ + + " -v " + dp_path + ":" \ + + config.FLAGPERF_PATH \ + + " -v " + case_config["data_dir_host"] + ":" \ + + case_config["data_dir_container"] + if config.ACCE_CONTAINER_OPT is not None: + container_start_args += " " + config.ACCE_CONTAINER_OPT + + logger.info("a) Stop old container(s) first.") + stop_container_in_cluster(dp_path, container_name, nnodes) + logger.info("b) Start container(s) in the cluster.") + if not start_container_in_cluster(dp_path, container_start_args, + container_name, image_name, nnodes): + logger.error("b) Start container in the cluster......" + "[FAILED]. Ignore this round.") + return False + logger.info("b) Start container(s) in the cluster.......[SUCCESS]") + + logger.info("c) Prepare running environment.") + if not prepare_running_env(dp_path, container_name, case_config): + logger.error("c) Prepare running environment......" + "[FAILED]. Ignore this round.") + logger.info("Stop containers in cluster.") + stop_container_in_cluster(dp_path, container_name, nnodes) + return False + logger.info("c) Prepare running environment......[SUCCESS]") + logger.info("d) Start monitors......") + start_monitors_in_cluster(dp_path, case_log_dir, nnodes) + logger.info("e) Clear system caches if it set......") + clear_caches_cluster(config.CLEAR_CACHES, nnodes) + return True + + +def clean_containers_env_cluster(dp_path, container_name, nnodes): + '''Clean containers environments in the cluster. It will stop containers, + and stop monitors.''' + logger.info("a) Stop containers......") + stop_container_in_cluster(dp_path, container_name, nnodes) + logger.info("b) Stop monitors......") + stop_monitors_in_cluster(dp_path, nnodes) + + +def compilation_result(case_log_path, config): + '''Scp logs from hosts in the cluster to temp dir, and then merge all. 
+ ''' + case_perf_path = os.path.join(case_log_path, "container.out.log") + vendor_usage_path = os.path.join(case_log_path, + config.VENDOR + "_monitor.log") + + case_perf = None + case_file = open(case_perf_path) + + for line in case_file.readlines(): + if "Finish Info" in line: + case_perf_str = "{" + line.split("{")[1] + case_perf = ast.literal_eval(case_perf_str) + break + + if case_perf is None: + logger.error("Case Run Failed, Please Check Log!") + return + + vendor_module = importlib.import_module("docker_images." + config.VENDOR + + "." + config.VENDOR + "_analysis") + vendor_usage, vendor_maxmem = vendor_module.analysis_log(vendor_usage_path) + + case_perf["vendor_usage(GiB)"] = vendor_usage + case_perf["vendor_max_mem(GiB)"] = vendor_maxmem + + for key in case_perf.keys(): + padding_str = str(key).ljust(43) + " : " + str( + case_perf[key]).ljust(23) + logger.info(padding_str) + + +def get_config_from_case(case, config): + '''check case is string''' + if not isinstance(case, str): + logger.error("Key in test_config.CASES must be str") + return False, None + + case_info = case.split(":") + '''check if 4+ : in case, we don't care what to put in''' + if len(case_info) < 2: + logger.error("At least 2 terms split by \":\" should in config.CASES") + logger.error("model:framework:hardware_model:nnodes:nproc:repeat") + return False, None + + case_model = case_info[0] + case_framework = case_info[1] + + case_config = {"model": case_model} + case_config["framework"] = case_framework + case_config["data_dir_host"] = config.CASES[case] + case_config["data_dir_container"] = config.CASES[case] + case_config['nnodes'] = 1 + + return True, case_config + + +def get_valid_cases(config): + '''Check case config in test_conf, return valid cases list.''' + if not isinstance(config.CASES, dict): + logger.error( + "No valid cases found in test_conf because test_config.CASES is not a dict...[EXIT]" + ) + sys.exit(4) + logger.debug("Check configs of all test cases: " + ", ".join(config.CASES)) + valid_cases = [] + cases_config_error = [] + for case in config.CASES: + rets, case_config = get_config_from_case(case, config) + if (not rets) or (not check_case_config(case, case_config, + config.VENDOR)): + cases_config_error.append(case) + continue + valid_cases.append(case) + if len(valid_cases) == 0: + logger.error("No valid cases found in test_conf...[EXIT]") + sys.exit(4) + logger.debug("Valid cases: " + ",".join(valid_cases)) + logger.debug("Invalid cases that config is error: " + + ",".join(cases_config_error)) + logger.info("Get valid cases list......[SUCCESS]") + return valid_cases + + +def prepare_case_config_cluster(dp_path, case_config, case): + '''Sync case config files in cluster.''' + logger.info("--------------------------------------------------") + logger.info("Testcase " + case + " config:") + for config_item in case_config.keys(): + logger.info(config_item + ":\t" + str(case_config[config_item])) + logger.info("--------------------------------------------------") + model = case_config["model"] + framework = case_config["framework"].split("_")[0] + config_file = case_config["config"] + ".py" + nnodes = case_config["nnodes"] + case_config_dir = os.path.join(dp_path, config.VENDOR, + model + "-" + framework, "config") + case_config_file = os.path.join(case_config_dir, config_file) + failed_hosts = CLUSTER_MGR.sync_file_to_some_hosts(case_config_file, + case_config_dir, nnodes) + if len(failed_hosts) != 0: + logger.error("Hosts that sync vendor case config file failed: " + + 
",".join(failed_hosts.keys())) + return False + return True + + +def log_test_configs(cases, curr_log_path, dp_path): + '''Put test configs to log ''' + logger.info("--------------------------------------------------") + logger.info("Prepare to run flagperf Inference benchmakrs with configs: ") + logger.info("Deploy path on host:\t" + dp_path) + logger.info("Vendor:\t\t" + config.VENDOR) + logger.info("Testcases:\t\t[" + ','.join(cases) + "]") + logger.info("Log path on host:\t" + curr_log_path) + logger.info("Cluster:\t\t[" + ",".join(config.HOSTS) + "]") + logger.info("--------------------------------------------------") + + +def main(config): + '''Main process to run all the testcases''' + + curr_log_whole = init_logger(config) + + print_welcome_msg() + + logger.info("======== Step 1: Check key configs. ========") + + check_test_host_config(config) + + # Check test environment and configs from host.yaml. + CLUSTER_MGR.init(config.HOSTS, config.SSH_PORT, getpass.getuser()) + check_cluster_health() + dp_path = _get_deploy_path(config) + check_cluster_deploy_path(dp_path) + + cases = get_valid_cases(config) + log_test_configs(cases, curr_log_whole, dp_path) + + logger.info("========= Step 2: Prepare and Run test cases. =========") + + for case in cases: + logger.info("======= Testcase: " + case + " =======") + _, case_config = get_config_from_case(case, config) + + # Prepare docker image. + image_mgr = image_manager.ImageManager( + "flagperf-inference-" + config.VENDOR + "-" + + case_config["framework"], "t_" + VERSION) + image_name = image_mgr.repository + ":" + image_mgr.tag + nnodes = case_config["nnodes"] + logger.info("=== 2.1 Prepare docker image:" + image_name + " ===") + if not prepare_docker_image_cluster( + dp_path, image_mgr, case_config["framework"], nnodes, config): + logger.error("=== 2.1 Prepare docker image...[FAILED] " + + "Ignore this case " + case + " ===") + continue + + # Set command to start docker container in the cluster + container_name = image_mgr.repository + "-" + image_mgr.tag \ + + "-container" + + logger.info("=== 2.2 Setup container and run testcases. ===") + + logger.info("-== Testcase " + case + " starts ==-") + logger.info("1) Prepare container environments in cluster...") + case_log_dir = os.path.join(curr_log_whole, case) + curr_log_path = os.path.join(case_log_dir, + config.HOSTS[0] + "_noderank0") + + if not prepare_containers_env_cluster(dp_path, case_log_dir, config, + container_name, image_name, + case_config): + logger.error("1) Prepare container environments in cluster" + "...[FAILED]. Ignore case " + case) + continue + logger.info("2) Start tasks in the cluster...") + + start_tasks_in_cluster(dp_path, container_name, case_config, + curr_log_path, config) + + logger.info("3) Training tasks end in the cluster...") + logger.info("4) Clean container environments in cluster...") + clean_containers_env_cluster(dp_path, container_name, nnodes) + logger.info("-== Testcase " + case + " finished ==-") + logger.info("=== 2.2 Setup container and run testcases finished." 
+ " ===") + logger.info("=== 2.3 Compilation Case Performance ===") + compilation_result(curr_log_path, config) + + +if __name__ == '__main__': + if len(sys.argv) > 1: + usage() + CURR_PATH = os.path.abspath(os.path.dirname(__file__)) + yaml_path = os.path.join(CURR_PATH, "configs/host.yaml") + data = yaml.safe_load(open(yaml_path)) + + config = DefaultMunch.fromDict(data) + + main(config) diff --git a/inference/run_inference.py b/inference/run_inference.py new file mode 100644 index 000000000..ff6ba2acb --- /dev/null +++ b/inference/run_inference.py @@ -0,0 +1,161 @@ +# Copyright (c) 2023 BAAI. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License") +#!/usr/bin/env python3 +# -*- coding: UTF-8 -*- + +import importlib +from loguru import logger +import time +import os +import sys +from tools import init_logger, merge_config +from argparse import ArgumentParser + + +def main(config): + + init_logger(config) + config = merge_config(config) + # e.g. import funcs from benchmarks/resnet50/pytorch/__init__.py + benchmark_module = importlib.import_module( + "benchmarks." + config.case + "." + config.framework, __package__) + """ + Init + """ + logger.log("Init Begin", "building dataloader and model") + start = time.time() + + dataloader = benchmark_module.build_dataloader(config) + model = benchmark_module.create_model(config) + + duration = time.time() - start + logger.log("Init End", str(duration) + " seconds") + """ + Using framework.eval(like torch.eval) to validate model & dataloader + """ + logger.log("Model Forward Begin", "") + start = time.time() + + evaluator = benchmark_module.evaluator + + p_forward, p_forward_core, val_acc = benchmark_module.model_forward( + model, dataloader, evaluator, config) + + logger.log("Model Forward End", "") + if config.compiler is None: + return config, p_forward, None, p_forward_core, None, val_acc, None + """ + Convert model into onnx + """ + logger.log("Export Begin", + "Export " + config.framework + " model into .onnx") + start = time.time() + + onnx_path = benchmark_module.export_model(model, config) + + duration = time.time() - start + logger.log("Export End", str(duration) + " seconds") + # e.g. import funcs from inference_engine/nvidia/inference.py + vendor_module = importlib.import_module("inference_engine." + + config.vendor + "." + + config.compiler) + """ + Compiling backend(like tensorRT) + """ + logger.log("Vendor Compile Begin", + "Compiling With " + config.vendor + "." 
+ config.compiler) + start = time.time() + + compile_model = vendor_module.InferModel(config, onnx_path, model) + + duration = time.time() - start + logger.log("Vendor Compile End", str(duration) + " seconds") + """ + inference using engine + """ + logger.log("Vendor Inference Begin", "") + start = time.time() + + p_infer, p_infer_core, infer_acc = benchmark_module.engine_forward( + compile_model, dataloader, evaluator, config) + + logger.log("Vendor Inference End", "") + + return config, p_forward, p_infer, p_forward_core, p_infer_core, val_acc, infer_acc + + +def parse_args(): + parser = ArgumentParser(description=" ") + + parser.add_argument("--perf_dir", + type=str, + required=True, + help="abs dir of FlagPerf/inference/") + + parser.add_argument("--data_dir", + type=str, + required=True, + help="abs dir of data used in dataloader") + + parser.add_argument("--log_dir", + type=str, + required=True, + help="abs dir to write log") + + parser.add_argument("--loglevel", + type=str, + required=True, + help="DEBUG/INFO/WARNING/ERROR") + + parser.add_argument("--case", + type=str, + required=True, + help="case name like resnet50") + + parser.add_argument("--vendor", + type=str, + required=True, + help="vendor name like nvidia") + + parser.add_argument("--framework", + type=str, + required=True, + help="validation framework name like pytorch") + + args, unknown_args = parser.parse_known_args() + args.unknown_args = unknown_args + return args + + +if __name__ == "__main__": + config_from_args = parse_args() + config_from_args.framework = config_from_args.framework.split('_')[0] + + e2e_start = time.time() + + config, p_forward, p_infer, p_forward_core, p_infer_core, val_acc, infer_acc = main( + config_from_args) + + e2e_time = time.time() - e2e_start + e2e_time = round(float(e2e_time), 3) + + input_byte = 2 if config.fp16 else 4 + batch_input_byte = config.batch_size * config.input_size * input_byte + batch_input_byte = int(batch_input_byte) + + infer_info = { + "vendor": config.vendor, + "compiler": config.compiler, + "precision": "fp16" if config.fp16 else "fp32", + "batchsize": config.batch_size, + "byte_per_batch": batch_input_byte, + "e2e_time(second)": e2e_time, + "p_validation_whole(items per second)": p_forward, + "*p_validation_core(items per second)": p_forward_core, + "p_inference_whole(items per second)": p_infer, + "*p_inference_core(items per second)": p_infer_core, + "val_average_acc": val_acc, + "infer_average_acc": infer_acc + } + logger.log("Finish Info", infer_info) diff --git a/inference/tools/__init__.py b/inference/tools/__init__.py new file mode 100644 index 000000000..7a1910339 --- /dev/null +++ b/inference/tools/__init__.py @@ -0,0 +1,3 @@ +from .init_logger import init_logger +from .config_manager import merge_config +from .torch_sync import torch_sync diff --git a/inference/tools/config_manager.py b/inference/tools/config_manager.py new file mode 100644 index 000000000..7875e027d --- /dev/null +++ b/inference/tools/config_manager.py @@ -0,0 +1,73 @@ +import yaml +import os +from loguru import logger +from collections import namedtuple + + +def check_dup_cfg_parm(cfg, parm): + + for item in cfg.keys(): + if item in parm.keys(): + return False + return True + + +def merge_vendor(cfg, vendor_cfg, vendor): + for item in vendor_cfg.keys(): + if item not in cfg.keys(): + logger.warning("New Config Set by " + vendor + ": " + item) + cfg[item] = vendor_cfg[item] + else: + if vendor_cfg[item] == cfg[item]: + logger.error("Redundant Config Set at " + vendor + ": " + item) + 
diff --git a/inference/tools/__init__.py b/inference/tools/__init__.py
new file mode 100644
index 000000000..7a1910339
--- /dev/null
+++ b/inference/tools/__init__.py
@@ -0,0 +1,3 @@
+from .init_logger import init_logger
+from .config_manager import merge_config
+from .torch_sync import torch_sync
diff --git a/inference/tools/config_manager.py b/inference/tools/config_manager.py
new file mode 100644
index 000000000..7875e027d
--- /dev/null
+++ b/inference/tools/config_manager.py
@@ -0,0 +1,73 @@
+import yaml
+import os
+from loguru import logger
+from collections import namedtuple
+
+
+def check_dup_cfg_parm(cfg, parm):
+    '''Return False if cfg and parm share any key.'''
+    for item in cfg.keys():
+        if item in parm.keys():
+            return False
+    return True
+
+
+def merge_vendor(cfg, vendor_cfg, vendor):
+    for item in vendor_cfg.keys():
+        if item not in cfg.keys():
+            logger.warning("New Config Set by " + vendor + ": " + item)
+            cfg[item] = vendor_cfg[item]
+        else:
+            if vendor_cfg[item] == cfg[item]:
+                # a vendor override identical to the base value is redundant
+                logger.error("Redundant Config Set at " + vendor + ": " + item)
+                exit(1)
+            logger.debug("Set " + item + " to " + str(vendor_cfg[item]))
+            cfg[item] = vendor_cfg[item]
+
+
+def merge_config(config):
+
+    configuration_path = config.perf_dir + "/configs/" + config.case + "/configurations.yaml"
+    parameter_path = config.perf_dir + "/configs/" + config.case + "/parameters.yaml"
+    vendor_cfg_path = config.perf_dir + "/configs/" + config.case + "/vendor_config/" + config.vendor + "_configurations.yaml"
+
+    configuration = yaml.safe_load(open(configuration_path))
+    if configuration is None:
+        configuration = {}
+    parameter = yaml.safe_load(open(parameter_path))
+    if parameter is None:
+        parameter = {}
+    vendor_cfg = {}
+    if os.path.exists(vendor_cfg_path):
+        vendor_cfg = yaml.safe_load(open(vendor_cfg_path))
+
+    merged_data = {}
+    merged_data["perf_dir"] = config.perf_dir
+    merged_data["data_dir"] = config.data_dir
+    merged_data["log_dir"] = config.log_dir
+    merged_data["vendor"] = config.vendor
+    merged_data["case"] = config.case
+    merged_data["framework"] = config.framework
+
+    if not check_dup_cfg_parm(configuration, parameter):
+        logger.error(
+            "Duplicated terms in configurations.yaml and parameters.yaml")
+        exit(1)
+    merge_vendor(configuration, vendor_cfg, config.vendor)
+
+    for item in configuration.keys():
+        if item in merged_data.keys():
+            logger.error(
+                "Duplicated terms in configurations.yaml and host.yaml")
+            exit(1)
+        merged_data[item] = configuration[item]
+
+    for item in parameter.keys():
+        if item in merged_data.keys():
+            logger.error("Duplicated terms in parameters.yaml and host.yaml")
+            exit(1)
+        merged_data[item] = parameter[item]
+
+    # freeze the merged result so cases can't mutate it at runtime
+    Config = namedtuple("Config", merged_data.keys())
+    immutable_config = Config(**merged_data)
+    return immutable_config
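The resulting precedence is easiest to see on a toy example (the keys and values below are invented for illustration):

```python
# Toy illustration of merge_config's precedence; keys/values are invented.
configuration = {"batch_size": 256, "fp16": True}   # configurations.yaml
vendor_cfg = {"fp16": False}                        # <vendor>_configurations.yaml
parameter = {"num_workers": 8}                      # parameters.yaml

# merge_vendor lets the vendor file override or extend the base file
# (an override with an identical value aborts as redundant) ...
configuration.update(vendor_cfg)

# ... then host-side keys, configuration, and parameter are combined; a key
# appearing twice across the sources is a fatal error.
merged = {"vendor": "nvidia", "case": "resnet50"}   # host-side keys (subset)
for source in (configuration, parameter):
    for key, value in source.items():
        assert key not in merged, "duplicated config term"
        merged[key] = value
print(merged)
# {'vendor': 'nvidia', 'case': 'resnet50', 'batch_size': 256,
#  'fp16': False, 'num_workers': 8}
```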
diff --git a/inference/tools/init_logger.py b/inference/tools/init_logger.py
new file mode 100644
index 000000000..65a9c18d4
--- /dev/null
+++ b/inference/tools/init_logger.py
@@ -0,0 +1,28 @@
+from loguru import logger
+import sys
+
+
+def init_logger(config):
+    logger.remove()
+    """
+    Define "EVENT" levels, logged with logger.log("EVENT", msg).
+    Severity 21 sits just above INFO (20) and below WARNING (30);
+    "Finish Info" (50) is more severe than ERROR (40), so it is
+    never filtered out.
+    """
+    logger.level("Init Begin", no=21)
+    logger.level("Init End", no=21)
+    logger.level("Export Begin", no=21)
+    logger.level("Export End", no=21)
+    logger.level("Model Forward Begin", no=21)
+    logger.level("Model Forward End", no=21)
+    logger.level("Vendor Compile Begin", no=21)
+    logger.level("Vendor Compile End", no=21)
+    logger.level("Vendor Inference Begin", no=21)
+    logger.level("Vendor Inference End", no=21)
+    logger.level("Finish Info", no=50)
+
+    logdir = config.log_dir
+    logfile = logdir + "/container.out.log"
+    logger.add(logfile, level=config.loglevel)
+
+    logger.add(sys.stdout, level=config.loglevel)
diff --git a/inference/tools/torch_sync.py b/inference/tools/torch_sync.py
new file mode 100644
index 000000000..6e5e8b09a
--- /dev/null
+++ b/inference/tools/torch_sync.py
@@ -0,0 +1,6 @@
+import torch
+
+
+def torch_sync(config):
+    if config.vendor == "nvidia":
+        torch.cuda.synchronize()
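Since every phase is bracketed by `... Begin` / `... End` events, per-phase wall-clock time can also be recovered from `container.out.log` after a run. A rough pairing sketch, leaving out raw-line parsing since that depends on the configured loguru sink format (the timestamps below are invented):

```python
# Sketch: recover per-phase durations from (timestamp, event) pairs
# already extracted from container.out.log.
from datetime import datetime

events = [
    (datetime(2023, 8, 3, 9, 0, 0), "Init Begin"),
    (datetime(2023, 8, 3, 9, 0, 42), "Init End"),
]

starts = {}
for ts, event in events:
    phase, _, edge = event.rpartition(" ")
    if edge == "Begin":
        starts[phase] = ts
    elif edge == "End" and phase in starts:
        print(phase, (ts - starts[phase]).total_seconds(), "seconds")
```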
diff --git a/inference/utils/__init__.py b/inference/utils/__init__.py
new file mode 100644
index 000000000..e69de29bb
diff --git a/inference/utils/cluster_manager.py b/inference/utils/cluster_manager.py
new file mode 100644
index 000000000..7740d49c6
--- /dev/null
+++ b/inference/utils/cluster_manager.py
@@ -0,0 +1,212 @@
+# Copyright 2022 BAAI. All rights reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License")
+'''Cluster Manager'''
+
+import os
+import sys
+
+CURR_PATH = os.path.abspath(os.path.dirname(__file__))
+sys.path.append(os.path.join(CURR_PATH))
+import run_cmd
+from loguru import logger
+
+
+class ClusterManager():
+    '''A cluster manager that can run health checks, distribute files, and
+    run commands across the cluster.
+    '''
+
+    def __init__(self):
+        self.hosts = None
+        self.ssh_port = None
+        self.user = None
+        self.ssh_cmd_head = None
+        self.scp_cmd_head = None
+
+    def init(self, hosts, port, user):
+        '''Init with all args that ssh needs.'''
+        self.hosts = hosts
+        self.ssh_port = port
+        self.user = user
+        self.ssh_cmd_head = "ssh -o ConnectTimeout=3" \
+                            + " -o StrictHostKeyChecking=no -l " + self.user \
+                            + " -p " + port
+        self.scp_cmd_head = "scp -o ConnectTimeout=3 " \
+                            + "-o StrictHostKeyChecking=no -P " + port
+
+        logger.debug(f"ssh: {self.ssh_cmd_head}")
+        logger.debug(f"scp: {self.scp_cmd_head}")
+
+    def _run_command_ssh_remote(self, cmd, host, timeout=10):
+        '''Run cmd on host with ssh.
+        Return exit code of cmd and stdout/stderr messages.
+        '''
+        ssh_run_cmd = self.ssh_cmd_head + " " + host + " \'" + cmd + "\'"
+        logger.debug("Run cmd on host with ssh. ssh cmd=" + cmd + " host=" +
+                     host + " timeout=" + str(timeout))
+        ret, outs = run_cmd.run_cmd_wait(ssh_run_cmd, timeout)
+        return ret, outs
+
+    def healthcheck(self):
+        '''Return the hosts that are not alive.'''
+        return self.run_command_all_hosts(":")
+
+    def get_hosts_list(self):
+        '''Return the list of all hosts.'''
+        return self.hosts
+
+    def get_hosts_count(self):
+        '''Return the count of hosts.'''
+        return len(self.hosts)
+
+    def run_command_all_hosts(self, command, timeout=10):
+        '''Run a command on every host with ssh.'''
+        failed_hosts_ret = {}
+        for host in self.hosts:
+            ret, outs = self._run_command_ssh_remote(command, host, timeout)
+            if ret != 0:
+                failed_hosts_ret[host] = ret
+                logger.error("Run cmd on host " + host + " cmd=" + command +
+                             " [FAILED]. Output: " + outs[0])
+        return failed_hosts_ret
+
+    def run_command_some_hosts(self,
+                               command,
+                               host_count=1,
+                               timeout=10,
+                               no_log=False):
+        '''Run a command on the first host_count hosts with ssh.'''
+        failed_hosts_ret = {}
+        for i in range(0, host_count):
+            logger.debug("host number:" + str(i))
+            host = self.hosts[i]
+            ret, outs = self._run_command_ssh_remote(command, host, timeout)
+            if ret != 0:
+                failed_hosts_ret[host] = ret
+                if not no_log:
+                    logger.error("Run cmd on host " + host + " cmd=" +
+                                 command + " [FAILED]. Output: " + outs[0])
+        return failed_hosts_ret
+
+    def start_monitors_some_hosts(self,
+                                  base_command,
+                                  case_log_dir,
+                                  host_count,
+                                  timeout=10):
+        '''Start monitors on the first host_count hosts with ssh.'''
+        failed_hosts_ret = {}
+        for i in range(0, host_count):
+            logger.debug("host number:" + str(i))
+            host = self.hosts[i]
+            # add log_dir option to the command
+            log_dir = os.path.join(case_log_dir, host + "_noderank" + str(i))
+            command = base_command + log_dir
+            ret, outs = self._run_command_ssh_remote(command, host, timeout)
+            if ret != 0:
+                failed_hosts_ret[host] = ret
+                logger.error("Run cmd on host " + host + " cmd=" + command +
+                             " [FAILED]. Output: " + outs[0])
+        return failed_hosts_ret
+
+    def run_command_some_hosts_distribution_info(self,
+                                                 base_cmd,
+                                                 host_count,
+                                                 timeout=10):
+        '''Run a command on the first host_count hosts with ssh; torch ddp
+        options (master_addr etc.) are currently left to the caller.
+        '''
+        failed_hosts_ret = {}
+        # remove the " at the end of base_cmd, then add other options.
+        # base_cmd = base_cmd.rstrip(" \"")
+        # command_master_ip = base_cmd + ' --master_addr ' + self.hosts[0]
+        for i in range(0, host_count):
+            host = self.hosts[i]
+            ret, outs = self._run_command_ssh_remote(base_cmd, host, timeout)
+
+            if ret != 0:
+                failed_hosts_ret[host] = ret
+                logger.debug("Run cmd on host " + host + " cmd=" + base_cmd +
+                             " node_rank=" + str(i) + " [FAILED]. Output: " +
+                             outs[0])
+        return failed_hosts_ret
+
+    def _scp_file_to_remote_host(self,
+                                 host,
+                                 local_file,
+                                 remote_dir,
+                                 timeout=600):
+        '''Run scp command to copy local_file to remote_dir.'''
+
+        scp_cmd = self.scp_cmd_head + " " + local_file + " " + self.user \
+                  + "@" + host + ":" + remote_dir + "/"
+        logger.debug("scp command:" + scp_cmd)
+        ret, outs = run_cmd.run_cmd_wait(scp_cmd, timeout)
+        return ret, outs
+
+    def sync_file_to_some_hosts(self,
+                                local_file,
+                                remote_dir,
+                                host_count,
+                                timeout=600):
+        '''scp local_file to remote_dir on the first host_count hosts.'''
+        failed_hosts_ret = {}
+        if not os.path.exists(local_file):
+            logger.error("Can't find local file before scp:" + local_file)
+            for host in self.hosts:
+                failed_hosts_ret[host] = 1
+            return failed_hosts_ret
+
+        for i in range(0, host_count):
+            host = self.hosts[i]
+            ret, outs = self._scp_file_to_remote_host(host,
+                                                      local_file,
+                                                      remote_dir,
+                                                      timeout=timeout)
+            if ret != 0:
+                failed_hosts_ret[host] = ret
+                logger.debug("Scp local file " + local_file + " to " + host +
+                             ":" + remote_dir + " [FAILED]. Output: " +
+                             outs[0])
+        return failed_hosts_ret
+
+    def _scp_dir_from_remote_host(self,
+                                  host,
+                                  remote_dir,
+                                  local_dir,
+                                  timeout=600):
+        '''Run scp command to copy remote_dir to local_dir.'''
+        scp_cmd = self.scp_cmd_head + " -r " + self.user + "@" + host + ":" \
+                  + remote_dir + "/* " + local_dir + "/"
+        logger.debug("scp command:" + scp_cmd)
+        ret, outs = run_cmd.run_cmd_wait(scp_cmd, timeout)
+        return ret, outs
+
+    def collect_files_some_hosts(self,
+                                 remote_dir,
+                                 local_dir,
+                                 host_count,
+                                 timeout=600):
+        '''scp remote_dir from the first host_count hosts down to local_dir.'''
+        failed_hosts_ret = {}
+        for i in range(0, host_count):
+            host = self.hosts[i]
+            if not os.path.exists(local_dir):
+                logger.debug("Make local dir:" + local_dir)
+                os.makedirs(local_dir)
+            ret, outs = self._scp_dir_from_remote_host(host,
+                                                       remote_dir,
+                                                       local_dir,
+                                                       timeout=timeout)
+            if ret != 0:
+                failed_hosts_ret[host] = ret
+                logger.debug("Scp from " + host + ":" + remote_dir + " to " +
+                             local_dir + " [FAILED]. Output: " + outs[0])
+        return failed_hosts_ret
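A minimal driver sketch for `ClusterManager`, assuming passwordless ssh between the hosts; the addresses, port, and user below are placeholders:

```python
# Hypothetical driver snippet; hosts, port and user are placeholders.
from cluster_manager import ClusterManager

cluster = ClusterManager()
cluster.init(["10.1.2.1", "10.1.2.2"], "22", "root")

dead = cluster.healthcheck()  # empty dict means every host answered ssh
if dead:
    print("unreachable hosts:", dead)

failed = cluster.run_command_all_hosts("mkdir -p /tmp/flagperf")
print("hosts where the command failed:", failed)
```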
diff --git a/inference/utils/container_manager.py b/inference/utils/container_manager.py
new file mode 100644
index 000000000..91c68f6cc
--- /dev/null
+++ b/inference/utils/container_manager.py
@@ -0,0 +1,223 @@
+# Copyright 2022 BAAI. All rights reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License")
+'''Container Manager'''
+
+import os
+import sys
+from argparse import ArgumentParser
+
+CURR_PATH = os.path.abspath(os.path.dirname(__file__))
+sys.path.append(os.path.join(CURR_PATH))
+import run_cmd
+
+
+class ContainerManager():
+    '''A container manager that can start/stop/remove a container, run a
+    command in the container, and check container status with docker
+    commands.
+    '''
+
+    def __init__(self, container_name):
+        self.name = container_name
+
+    def run_new(self, container_run_args, docker_image):
+        '''Start a new docker container with the given run args and image.'''
+        exists = self.exists()
+        if exists is True:
+            return 1, ["Container exists.", None]
+
+        run_new_cmd = "docker run " + container_run_args + \
+                      " --name=" + self.name + " \"" + docker_image + "\" " + \
+                      "sleep infinity"
+        print(run_new_cmd)
+        ret, outs = run_cmd.run_cmd_wait(run_new_cmd, 10)
+        return ret, outs
+
+    def run_cmd_in(self, cmd_in_container, timeout=5, detach=True):
+        '''Run a command in the container.'''
+        exists = self.exists()
+        if exists is False:
+            return 1, ["Container doesn't exist.", None]
+
+        if detach:
+            exec_cmd_head = "docker exec -d "
+        else:
+            exec_cmd_head = "docker exec -i "
+
+        exec_cmd = exec_cmd_head + self.name + " bash -c \"" \
+                   + cmd_in_container + "\""
+        print("run cmd in:", exec_cmd)
+        ret, outs = run_cmd.run_cmd_wait(exec_cmd, timeout)
+        print("ret:", ret, " outs:", outs[0])
+        return ret, outs
+
+    def start(self):
+        '''Start the stopped container. Unused for now.'''
+        exists = self.exists()
+        if exists is False:
+            return 1, ["Container doesn't exist.", None]
+
+        start_cmd = "docker start " + self.name
+        ret, outs = run_cmd.run_cmd_wait(start_cmd, 3)
+        return ret, outs
+
+    def stop(self):
+        '''Stop the container.'''
+        exists = self.exists()
+        if exists is False:
+            return 0, ["Container doesn't exist.", None]
+
+        stop_cmd = "docker stop " + self.name
+        ret, outs = run_cmd.run_cmd_wait(stop_cmd, 10)
+        return ret, outs
+
+    def remove(self):
+        '''Remove the container. Unused for now.'''
+        exists = self.exists()
+        if exists is False:
+            return 0, ["", None]
+
+        rm_cmd = "docker rm -f " + self.name
+        ret, outs = run_cmd.run_cmd_wait(rm_cmd, 3)
+        return ret, outs
+
+    def exists(self):
+        '''Return whether the container exists.
+        Return value:
+            True: It exists.
+            False: It doesn't exist.
+        '''
+        exists = None
+        check_cmd = "docker ps -a | grep " + self.name + "$ | wc -l"
+        ret, outs = run_cmd.run_cmd_wait(check_cmd, 3)
+
+        if ret == 0:
+            if str(outs[0]) == "1\n":
+                exists = True
+            elif str(outs[0]) == "0\n":
+                exists = False
+        return exists
+
+    def is_pid_running(self, pid_file_path):
+        '''Return whether the process whose pid is stored in pid_file_path
+        is running in the container.
+        Return value:
+            True: It is running.
+            False: It isn't running.
+        '''
+        get_pid_cmd = "cat " + pid_file_path
+        ret, outs = self.run_cmd_in(get_pid_cmd, detach=False)
+        if ret == 0:
+            task_pid = int(outs[0])
+        else:
+            print("Can't find pid file ", pid_file_path, "in container.")
+            return False
+        check_cmd = "ls /proc/" + str(task_pid) + "/cmdline"
+        ret, outs = self.run_cmd_in(check_cmd, detach=False)
+        if ret == 0:
+            print("The process is running.")
+            return True
+        print("The process is not running.")
+        return False
+ "pidrunning Check wether the process is running.") + parser.add_argument("-c", type=str, required=True, help="Container name") + + args, _ = parser.parse_known_args() + + if args.o == 'runnew': + parser.add_argument("-i", + type=str, + required=True, + help="Docker image.") + parser.add_argument("-a", + type=str, + required=True, + help="container start args.") + elif args.o == 'runcmdin': + parser.add_argument("-r", + type=str, + required=True, + help="command to run") + parser.add_argument("-d", + action='store_true', + default=False, + help="command to run") + parser.add_argument("-t", + type=int, + default=60, + help="timeout of running") + elif args.o == 'pidrunning': + parser.add_argument("-f", + type=str, + required=True, + help="pid file path in container.") + args = parser.parse_args() + return args + + +def main(): + '''Support command line for container manager. Called by cluster manager. + ''' + args = _parse_args() + container_mgr = ContainerManager(args.c) + operation = args.o + ret = None + outs = None + + if operation == "exists": + if container_mgr.exists(): + print("Container exists.") + sys.exit(0) + print("Container doesn't exist.") + sys.exit(1) + + if operation == "pidrunning": + if container_mgr.is_pid_running(args.f): + sys.exit(0) + sys.exit(1) + + if operation == "start": + ret, outs = container_mgr.start() + elif operation == "stop": + ret, outs = container_mgr.stop() + elif operation == "rm": + ret, outs = container_mgr.remove() + elif operation == "runcmdin": + cmd = args.r + detach = args.d + timeout = args.t + ret, outs = container_mgr.run_cmd_in(cmd, timeout, detach) + elif operation == "runnew": + run_args = args.a + docker_image = args.i + ret, outs = container_mgr.run_new(run_args, docker_image) + + if ret == 0: + print("Output: ", outs[0]) + print(operation, "successful.") + sys.exit(0) + print("Output: ", outs[0]) + print(operation, "failed.") + sys.exit(ret) + + +if __name__ == '__main__': + main() diff --git a/inference/utils/image_manager.py b/inference/utils/image_manager.py new file mode 100644 index 000000000..39ef2a493 --- /dev/null +++ b/inference/utils/image_manager.py @@ -0,0 +1,203 @@ +# Copyright 2022 BAAI. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License") +#!/usr/bin/env python3 +# -*- coding:UTF-8 -*- +''' Local sudo docker image manger. +usage: +image_management.py -o [operation] -i [repository] -t [tag] +''' +import os +import sys +import argparse +from run_cmd import run_cmd_wait as rcw +from container_manager import ContainerManager + + +def _parse_args(): + ''' Check script input parameter. 
'''
+    help_message = '''Operations for docker image:
+exist    Whether the image exists
+remove   Remove a docker image
+build    Build a docker image with two options if the image doesn't exist:
+         -d [directory] Directory contains dockerfile and install script
+         -f [framework] AI framework '''
+
+    parser = argparse.ArgumentParser(
+        description='Docker management script',
+        formatter_class=argparse.RawTextHelpFormatter)
+    parser.add_argument('-o',
+                        type=str,
+                        metavar='[operation]',
+                        required=True,
+                        choices=['exist', 'remove', 'build'],
+                        help=help_message)
+    parser.add_argument('-i',
+                        type=str,
+                        metavar='[repository]',
+                        required=True,
+                        help='image repository')
+    parser.add_argument('-t',
+                        type=str,
+                        metavar='[tag]',
+                        required=True,
+                        help='image tag')
+    args, _ = parser.parse_known_args()
+    if args.o == "build":
+        parser.add_argument("-d",
+                            type=str,
+                            required=True,
+                            help="dir contains dockerfile for building image.")
+        parser.add_argument("-f",
+                            type=str,
+                            required=True,
+                            help="testcase framework of the image.")
+    args = parser.parse_args()
+    return args
+
+
+class ImageManager():
+    '''Local image manager.
+    Supported operations:
+    -- remove, rm image from local
+    -- exist, query whether the image exists locally
+    -- build_image, build docker image
+    '''
+
+    def __init__(self, repository, tag):
+        self.repository = repository
+        self.tag = tag
+
+    def exist(self):
+        '''Check whether the local image exists.
+        Return code:
+            0 - image already exists
+            1 - image doesn't exist
+        '''
+        cmd = "sudo docker images|grep -w \"" + self.repository + "\"|grep -w \"" + \
+              self.tag + "\""
+        print(cmd)
+        ret, _ = rcw(cmd, 10)
+        print(ret)
+        if ret != 0:
+            return 1
+        return 0
+
+    def remove(self):
+        '''Remove the local image.
+        Return code:
+            0 - rm image successfully
+            1 - rm image failed
+        '''
+        cmd = "sudo docker rmi " + self.repository + ":" + self.tag
+        ret, _ = rcw(cmd, 60)
+        if ret != 0:
+            return 1
+        return 0
+
+    def _rm_tmp_image(self, tmp_image_name, cont_mgr):
+        '''Remove the temp container and temp image.'''
+        clean_tmp_cmd = "docker rmi -f " + tmp_image_name
+        cont_mgr.remove()
+        rcw(clean_tmp_cmd, 30)
+
+    def build_image(self, image_dir, framework):
+        '''Build the docker image in the vendor's path.'''
+        # First, build the base docker image.
+        tmp_image_name = "tmp_" + self.repository + ":" + self.tag
+        build_cmd = "cd " + image_dir + " && docker build -t " \
+                    + tmp_image_name + " ./"
+
+        ret, _ = rcw(build_cmd, 600)
+        if ret != 0:
+            print("docker build failed. " + tmp_image_name)
+            return 1
+
+        # Second, start a container with the base image.
+        tmp_container_name = "tmp_" + self.repository + "-" + self.tag \
+                             + "-container"
+        image_dir_in_container = "/workspace/docker_image"
+        start_args = " --rm --init --detach --net=host --uts=host " \
+                     + "--ipc=host --security-opt=seccomp=unconfined " \
+                     + "--privileged=true --ulimit=stack=67108864 " \
+                     + "--ulimit=memlock=-1 -v " + image_dir + ":" \
+                     + image_dir_in_container
+        cont_mgr = ContainerManager(tmp_container_name)
+        cont_mgr.remove()
+        ret, outs = cont_mgr.run_new(start_args, tmp_image_name)
+        if ret != 0:
+            print("Start new container with base image failed.")
+            print("Error: " + outs[0])
+            self._rm_tmp_image(tmp_image_name, cont_mgr)
+            return ret
+
+        # Third, install packages in the container.
+        install_script = framework + "_install.sh"
+        if not os.path.isfile(os.path.join(image_dir, install_script)):
+            print("Can't find " + install_script)
+            install_cmd = ":"
+        else:
+            install_cmd = "bash " + image_dir_in_container + "/" \
+                          + install_script
+        ret, outs = cont_mgr.run_cmd_in(install_cmd, 1800, detach=False)
+        if ret != 0:
+            print("Run install command in temp container failed.")
+            print("Error: " + outs[0])
+            self._rm_tmp_image(tmp_image_name, cont_mgr)
+            return ret
+
+        # Then commit the prepared container as the final image.
+        commit_cmd = "docker commit -a \"baai\" -m \"flagperf inference\" " \
+                     + tmp_container_name + " " + self.repository + ":" \
+                     + self.tag
+
+        ret, outs = rcw(commit_cmd, 300)
+        if ret != 0:
+            print("Commit docker image failed.")
+            print("Error: " + outs[0])
+            self._rm_tmp_image(tmp_image_name, cont_mgr)
+            return ret
+
+        # At last, remove the temp container and temp image.
+        self._rm_tmp_image(tmp_image_name, cont_mgr)
+        return 0
+
+
+def main():
+    '''Main process to manage images.
+    Return code:
+        0 - successful.
+        1 - failed.
+        2 - invalid operation. '''
+    args = _parse_args()
+    operation = args.o
+    image = args.i
+    tag = args.t
+
+    image_manager = ImageManager(image, tag)
+    if operation == "exist":
+        ret = image_manager.exist()
+        if ret == 0:
+            print("Docker image exists.")
+        else:
+            print("Docker image doesn't exist.")
+    elif operation == "remove":
+        ret = image_manager.remove()
+        if ret == 0:
+            print("Remove docker image successfully.")
+        else:
+            print("Remove docker image failed.")
+    elif operation == "build":
+        if image_manager.exist() == 0:
+            ret = 0
+        else:
+            image_dir = args.d
+            framework = args.f
+            ret = image_manager.build_image(image_dir, framework)
+    else:
+        print("Invalid operation.")
+        sys.exit(2)
+    sys.exit(ret)
+
+
+if __name__ == "__main__":
+    main()
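The matching driver-side sketch for `ImageManager`; the repository, tag, and dockerfile directory below are placeholders, and docker-with-sudo is assumed as in `exist()`/`remove()` above:

```python
# Hypothetical usage; repository, tag and the dockerfile dir are placeholders.
from image_manager import ImageManager

img = ImageManager("flagperf-nvidia-pytorch_1.13", "v0.1")
if img.exist() != 0:  # 0 means the image is already present locally
    ret = img.build_image("/path/to/docker_images/nvidia/pytorch_1.13",
                          "pytorch")
    print("build_image returned", ret)
```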
diff --git a/inference/utils/prepare_in_container.py b/inference/utils/prepare_in_container.py
new file mode 100644
index 000000000..b2fe4fe03
--- /dev/null
+++ b/inference/utils/prepare_in_container.py
@@ -0,0 +1,101 @@
+#!/usr/bin/env python3
+# -*- coding: UTF-8 -*-
+'''This script is called in the container to prepare the running environment.
+'''
+import os
+import sys
+import shutil
+from argparse import ArgumentParser
+
+CURR_PATH = os.path.abspath(os.path.dirname(__file__))
+sys.path.append(os.path.abspath(os.path.join(CURR_PATH, "../")))
+from utils import run_cmd
+
+
+def parse_args():
+    '''Parse args with ArgumentParser.'''
+    parser = ArgumentParser("Prepare running environment in container.")
+    parser.add_argument("--model",
+                        type=str,
+                        default=None,
+                        required=True,
+                        help="Model name.")
+    parser.add_argument("--vendor",
+                        type=str,
+                        required=True,
+                        help="Accelerator vendor.")
+    parser.add_argument("--framework",
+                        type=str,
+                        required=True,
+                        help="AI framework.")
+    parser.add_argument("--pipsource",
+                        type=str,
+                        default="https://pypi.tuna.tsinghua.edu.cn/simple",
+                        help="pip source.")
+    args = parser.parse_args()
+    return args
+
+
+def install_requirements(vendor, model, framework, pipsource):
+    '''Install required python packages in the vendor's path.'''
+    # framework: DL framework, may include version info, e.g. pytorch_1.13
+    root_path = os.path.abspath(os.path.join(CURR_PATH, "../"))
+    benchmark_path = os.path.join(root_path, "benchmarks/")
+    case_path = os.path.join(benchmark_path, model)
+    framework_name = framework.split("_")[0]
+    framework_path = os.path.join(case_path, framework_name)
+
+    req_file = os.path.join(framework_path, "requirements.txt")
+    print(req_file)
+    env_file = os.path.join(framework_path, "environment_variables.sh")
+    if not os.path.isfile(req_file):
+        print("requirements file ", req_file, " doesn't exist. Do nothing.")
+        return 0
+
+    pip_install_cmd = "source " + env_file + "; pip3 install -r " + req_file \
+                      + " -i " + pipsource
+    print(pip_install_cmd)
+    ret, outs = run_cmd.run_cmd_wait(pip_install_cmd, 1200)
+    print(ret, outs[0])
+    return ret
+
+
+def install_extensions(vendor, model, framework):
+    '''Install the vendor's extensions with a setup.py script.'''
+    root_path = os.path.abspath(os.path.join(CURR_PATH, "../"))
+    vendor_path = os.path.join(root_path, "inference_engine/")
+    vendor_path = os.path.join(vendor_path, vendor)
+    source_path = os.path.join(vendor_path, "csrc")
+    env_file = os.path.join(vendor_path, "environment_variables.sh")
+
+    if not os.path.isdir(source_path):
+        print("extension code ", source_path, " doesn't exist. Do nothing.")
+        return 0
+
+    sandbox_dir = os.path.join(vendor_path, 'sandbox', "extension")
+    if os.path.exists(sandbox_dir):
+        shutil.rmtree(sandbox_dir)
+
+    cmd = "source " + env_file + "; export EXTENSION_SOURCE_DIR=" \
+          + source_path + " ;" + " mkdir -p " + sandbox_dir + "; cd " \
+          + sandbox_dir + "; " + sys.executable + " " + source_path \
+          + "/setup.py install; " + " rm -rf " + sandbox_dir
+    print(cmd)
+    ret, outs = run_cmd.run_cmd_wait(cmd, 1200)
+    print(ret, outs[0])
+    return ret
+
+
+def main():
+    '''Main process of preparing the environment.'''
+    args = parse_args()
+    ret = install_requirements(args.vendor, args.model, args.framework,
+                               args.pipsource)
+    if ret != 0:
+        sys.exit(ret)
+    ret = install_extensions(args.vendor, args.model, args.framework)
+    sys.exit(ret)
+
+
+if __name__ == '__main__':
+    main()
diff --git a/inference/utils/run_cmd.py b/inference/utils/run_cmd.py
new file mode 100644
index 000000000..0cdf5e2b3
--- /dev/null
+++ b/inference/utils/run_cmd.py
@@ -0,0 +1,27 @@
+# Copyright 2022 BAAI. All rights reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License")
+'''Basic functions to run shell commands'''
+
+import subprocess
+
+
+def run_cmd_wait(cmd, timeout):
+    '''Run a shell command, waiting at most timeout second(s).'''
+    process = subprocess.Popen(cmd,
+                               shell=True,
+                               stdout=subprocess.PIPE,
+                               stderr=subprocess.STDOUT,
+                               encoding='utf-8')
+
+    try:
+        output = process.communicate(timeout=timeout)
+    except subprocess.TimeoutExpired:
+        process.kill()
+        output = process.communicate()
+
+    # logger.debug(f"{cmd} returncode: {process.returncode}")
+    # logger.debug(f"{cmd} stdout: {output[0]}")
+    # logger.debug(f"{cmd} stderr: {output[1]}")
+
+    return process.returncode, output
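A quick check of `run_cmd_wait`'s contract: it returns the shell's exit code plus the `(stdout, stderr)` tuple from `communicate()`, with stderr folded into stdout by the `Popen` call above:

```python
from run_cmd import run_cmd_wait

ret, outs = run_cmd_wait("echo hello && exit 3", timeout=5)
print(ret)      # 3: exit code of the shell command
print(outs[0])  # "hello\n": merged stdout/stderr
print(outs[1])  # None, because stderr is redirected to STDOUT
```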
diff --git a/inference/utils/sys_monitor.py b/inference/utils/sys_monitor.py
new file mode 100644
index 000000000..efdaade47
--- /dev/null
+++ b/inference/utils/sys_monitor.py
@@ -0,0 +1,286 @@
+#!/usr/bin/env python3
+# encoding: utf-8
+'''
+Usage: python3 sys_monitor.py -o operation -l [log_path]
+    -o, --operation   start|stop|restart|status
+    -l, --log         log path, ./logs/ by default
+'''
+
+import os
+import sys
+import time
+import signal
+import atexit
+import argparse
+import schedule
+import datetime
+from multiprocessing import Process
+from run_cmd import run_cmd_wait as rcw
+
+
+class Daemon:
+    '''
+    Daemon subprocess class.
+    Usage: subclass this daemon and override the run() method.
+    sys_monitor.pid lives in /tmp/ and is deleted automatically on exit.
+    verbose: debug mode, disabled by default.
+    '''
+
+    def __init__(self,
+                 pid_file,
+                 log_file,
+                 err_file,
+                 log_path,
+                 rate1=5,
+                 rate2=120,
+                 stdin=os.devnull,
+                 stdout=os.devnull,
+                 stderr=os.devnull,
+                 home_dir='.',
+                 umask=0o22,
+                 verbose=0):
+        self.stdin = stdin
+        self.stdout = stdout
+        self.stderr = stderr
+        self.home_dir = home_dir
+        self.verbose = verbose
+        self.pidfile = pid_file
+        self.logfile = log_file
+        self.errfile = err_file
+        # result files for cpu, mem and power of the system
+        self.log_path = log_path
+        self.cpulog = str(log_path + '/cpu_monitor.log')
+        self.memlog = str(log_path + '/mem_monitor.log')
+        self.pwrlog = str(log_path + '/pwr_monitor.log')
+        self.rate1 = rate1
+        self.rate2 = rate2
+        self.umask = umask
+        self.verbose = verbose
+        self.daemon_alive = True
+
+    def get_pid(self):
+        try:
+            with open(self.pidfile, 'r') as pf:
+                pid = int(pf.read().strip())
+        except IOError:
+            pid = None
+        except SystemExit:
+            pid = None
+        return pid
+
+    def del_pid(self):
+        if os.path.exists(self.pidfile):
+            os.remove(self.pidfile)
+
+    def run(self):
+        '''
+        NOTE: override this method in a subclass
+        '''

+        def cpu_mon(file):
+            TIMESTAMP = datetime.datetime.now().strftime('%Y-%m-%d-%H:%M:%S')
+            cmd = "mpstat -P ALL 1 1|grep -v Average|grep all|awk '{print (100-$NF)/100}'"
+            res, out = rcw(cmd, 10)
+            if res:
+                result = TIMESTAMP + "\terror\n"
+            else:
+                result = TIMESTAMP + "\t" + out[0]
+            with open(file, 'a') as f:
+                f.write(result)
+
+        def mem_mon(file):
+            TIMESTAMP = datetime.datetime.now().strftime('%Y-%m-%d-%H:%M:%S')
+            cmd = "free -g|grep -i mem|awk '{print $3/$2}'"
+            res, out = rcw(cmd, 10)
+            if res:
+                result = TIMESTAMP + "\terror\n"
+            else:
+                result = TIMESTAMP + "\t" + out[0]
+            with open(file, 'a') as f:
+                f.write(result)
+
+        def pwr_mon(file):
+            TIMESTAMP = datetime.datetime.now().strftime('%Y-%m-%d-%H:%M:%S')
+            cmd = "ipmitool sdr list|grep -i Watts|awk 'BEGIN{FS = \"|\"}{for (f=1; f <= NF; f+=1) {if ($f ~ /Watts/)" \
+                  " {print $f}}}'|awk '{print $1}'|sort -n -r|head -n1"
+            res, out = rcw(cmd, 10)
+            if res:
+                result = TIMESTAMP + "\terror\n"
+            else:
+                result = TIMESTAMP + "\t" + out[0]
+            with open(file, 'a') as f:
+                f.write(result)
+
+        def timer_cpu_mon():
+            cpu_process = Process(target=cpu_mon, args=(self.cpulog, ))
+            cpu_process.start()
+
+        def timer_mem_mon():
+            mem_process = Process(target=mem_mon, args=(self.memlog, ))
+            mem_process.start()
+
+        def timer_pwr_mon():
+            pwr_process = Process(target=pwr_mon, args=(self.pwrlog, ))
+            pwr_process.start()
+
+        schedule.every(self.rate1).seconds.do(timer_cpu_mon)
+        schedule.every(self.rate1).seconds.do(timer_mem_mon)
+        schedule.every(self.rate2).seconds.do(timer_pwr_mon)
+        while True:
+            schedule.run_pending()
+            time.sleep(5)
+
+    def daemonize(self):
+        if self.verbose >= 1:
+            print('daemon process starting ...')
+        try:
+            pid = os.fork()
+            if pid > 0:
+                sys.exit(0)
+        except OSError as e:
+            sys.stderr.write('fork #1 failed: %d (%s)\n' %
+                             (e.errno, e.strerror))
+            sys.exit(1)
+        os.chdir(self.home_dir)
+        os.setsid()
+        os.umask(self.umask)
+        try:
+            pid = os.fork()
+            if pid > 0:
+                sys.exit(0)
+        except OSError as e:
+            sys.stderr.write('fork #2 failed: %d (%s)\n' %
+                             (e.errno, e.strerror))
+            sys.exit(1)
+        sys.stdout.flush()
+        sys.stderr.flush()
+        si = open(self.stdin, 'r')
+        so = open(self.stdout, 'a+')
+        if self.stderr:
+            se = open(self.stderr, 'a+')
+        else:
+            se = so
+        os.dup2(si.fileno(), sys.stdin.fileno())
+        os.dup2(so.fileno(), sys.stdout.fileno())
+        os.dup2(se.fileno(), sys.stderr.fileno())
+        atexit.register(self.del_pid)
+        pid = str(os.getpid())
+        with open(self.pidfile, 'w+') as f:
+            f.write('%s\n' % pid)
+
+    def start(self):
+        if not os.path.exists(self.log_path):
+            os.makedirs(self.log_path)
+        else:
+            for i in self.cpulog, self.memlog, self.pwrlog:
+                if os.path.exists(i):
+                    os.remove(i)
+        if self.verbose >= 1:
+            print('ready to start ......')
+        # check the pid file to see if the daemon is already running
+        pid = self.get_pid()
+        if pid:
+            msg = 'pid file %s already exists, is it already running?\n'
+            sys.stderr.write(msg % self.pidfile)
+            sys.exit(1)
+        # start the daemon
+        self.daemonize()
+        self.run()
+
+    def stop(self):
+        if self.verbose >= 1:
+            print('stopping ...')
+        pid = self.get_pid()
+        if not pid:
+            msg = 'pid file [%s] does not exist. Not running?\n' % self.pidfile
+            sys.stderr.write(msg)
+            if os.path.exists(self.pidfile):
+                os.remove(self.pidfile)
+            return
+        # try to kill the daemon process
+        try:
+            i = 0
+            while True:
+                os.kill(pid, signal.SIGTERM)
+                time.sleep(1)
+                i = i + 1
+                if i % 10 == 0:
+                    os.kill(pid, signal.SIGHUP)
+        except OSError as err:
+            err = str(err)
+            if err.find('No such process') > 0:
+                if os.path.exists(self.pidfile):
+                    os.remove(self.pidfile)
+            else:
+                print(str(err))
+                sys.exit(1)
+        if self.verbose >= 1:
+            print('Stopped!')
+
+    def restart(self):
+        self.stop()
+        self.start()
+
+    def status(self):
+        pid = self.get_pid()
+        if pid:
+            if os.path.exists('/proc/%d' % pid):
+                return pid
+            else:
+                return False
+        else:
+            return False
+
+
+def parse_args():
+    '''Check script input parameters.'''
+    parser = argparse.ArgumentParser(description='Sys monitor script')
+    parser.add_argument('-o',
+                        type=str,
+                        metavar='[operation]',
+                        required=True,
+                        help='start|stop|restart|status')
+    parser.add_argument('-l',
+                        type=str,
+                        metavar='[log_path]',
+                        required=False,
+                        default='./logs/',
+                        help='log path')
+    args = parser.parse_args()
+    return args
+
+
+def main():
+    sample_rate1 = 5
+    sample_rate2 = 120
+    args = parse_args()
+    operation = args.o
+    path = args.l
+    pid_fn = '/tmp/sys_monitor.pid'
+    log_fn = str(path + '/sys_monitor.log')
+    err_fn = str(path + '/sys_monitor.err')
+
+    subdaemon = Daemon(pid_fn,
+                       log_fn,
+                       err_fn,
+                       path,
+                       verbose=1,
+                       rate1=sample_rate1,
+                       rate2=sample_rate2)
+    if operation == 'start':
+        subdaemon.start()
+    elif operation == 'stop':
+        subdaemon.stop()
+    elif operation == 'restart':
+        subdaemon.restart()
+    elif operation == 'status':
+        pid = subdaemon.status()
+        if pid:
+            print('process [%s] is running ......' % pid)
+        else:
+            print('daemon process stopped')
+    else:
+        print("invalid argument!")
+        sys.exit(1)
+
+
+if __name__ == '__main__':
+    main()
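For completeness, a hypothetical smoke test of the monitor CLI; the log directory below is a placeholder:

```python
# Hypothetical smoke test for sys_monitor.py; log dir is a placeholder.
import subprocess
import time

subprocess.run(["python3", "sys_monitor.py", "-o", "start",
                "-l", "/tmp/flagperf_logs"], check=True)
time.sleep(12)  # let a couple of 5-second cpu/mem samples accumulate
print(open("/tmp/flagperf_logs/cpu_monitor.log").read())
subprocess.run(["python3", "sys_monitor.py", "-o", "stop"], check=True)
```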