diff --git a/vectordb_bench/backend/runner/mp_runner.py b/vectordb_bench/backend/runner/mp_runner.py
index 25e383bf7..3b99cfe6e 100644
--- a/vectordb_bench/backend/runner/mp_runner.py
+++ b/vectordb_bench/backend/runner/mp_runner.py
@@ -4,6 +4,7 @@
 import multiprocessing as mp
 import logging
 from typing import Iterable
+import numpy as np
 from ..clients import api
 from ... import config

@@ -49,6 +50,7 @@ def search(self, test_data: list[list[float]], q: mp.Queue, cond: mp.Condition)
         start_time = time.perf_counter()
         count = 0
+        latencies = []
         while time.perf_counter() < start_time + self.duration:
             s = time.perf_counter()
             try:
@@ -61,7 +63,8 @@ def search(self, test_data: list[list[float]], q: mp.Queue, cond: mp.Condition)
                 log.warning(f"VectorDB search_embedding error: {e}")
                 traceback.print_exc(chain=True)
                 raise e from None
-
+
+            latencies.append(time.perf_counter() - s)
             count += 1
             # loop through the test data
             idx = idx + 1 if idx < num - 1 else 0
@@ -75,7 +78,7 @@ def search(self, test_data: list[list[float]], q: mp.Queue, cond: mp.Condition)
             f"actual_dur={total_dur}s, count={count}, qps in this process: {round(count / total_dur, 4):3}"
         )

-        return (count, total_dur)
+        return (count, total_dur, latencies)

     @staticmethod
     def get_mp_context():
@@ -85,6 +88,9 @@ def get_mp_context():

     def _run_all_concurrencies_mem_efficient(self) -> float:
         max_qps = 0
+        conc_num_list = []
+        conc_qps_list = []
+        conc_latency_p99_list = []
         try:
             for conc in self.concurrencies:
                 with mp.Manager() as m:
@@ -103,9 +109,14 @@ def _run_all_concurrencies_mem_efficient(self) -> float:
                     start = time.perf_counter()
                     all_count = sum([r.result()[0] for r in future_iter])
+                    latencies = sum([r.result()[2] for r in future_iter], start=[])
+                    latency_p99 = np.percentile(latencies, 99)
                     cost = time.perf_counter() - start

                     qps = round(all_count / cost, 4)
+                    conc_num_list.append(conc)
+                    conc_qps_list.append(qps)
+                    conc_latency_p99_list.append(latency_p99)
                     log.info(f"End search in concurrency {conc}: dur={cost}s, total_count={all_count}, qps={qps}")

                     if qps > max_qps:
@@ -122,7 +133,7 @@ def _run_all_concurrencies_mem_efficient(self) -> float:
         finally:
             self.stop()

-        return max_qps
+        return max_qps, conc_num_list, conc_qps_list, conc_latency_p99_list

     def run(self) -> float:
         """
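The concurrency runner now returns each worker's per-request latencies alongside its count and duration, and the parent process flattens them before taking the 99th percentile. Note that np.percentile takes the percentile on a 0-100 scale, hence 99 rather than 0.99 (which would ask for the 0.99th percentile). The `-> float` annotations on _run_all_concurrencies_mem_efficient and run are now stale, since a 4-tuple is returned. A minimal sketch of the aggregation, with hypothetical worker results:

    import numpy as np

    # one (count, duration_s, latencies_s) tuple per worker, as returned by search()
    results = [(1200, 30.0, [0.012, 0.018, 0.016]),
               (1150, 30.0, [0.011, 0.025, 0.014])]
    latencies = sum((r[2] for r in results), start=[])  # flatten across workers
    latency_p99 = np.percentile(latencies, 99)          # 99th percentile, 0-100 scale
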
diff --git a/vectordb_bench/backend/runner/serial_runner.py b/vectordb_bench/backend/runner/serial_runner.py
index e4861abd1..9e6818443 100644
--- a/vectordb_bench/backend/runner/serial_runner.py
+++ b/vectordb_bench/backend/runner/serial_runner.py
@@ -10,7 +10,7 @@
 import pandas as pd

 from ..clients import api
-from ...metric import calc_recall
+from ...metric import calc_ndcg, calc_recall, get_ideal_dcg
 from ...models import LoadTimeoutError, PerformanceTimeoutError
 from .. import utils
 from ... import config
@@ -171,11 +171,12 @@ def search(self, args: tuple[list, pd.DataFrame]):
         log.info(f"{mp.current_process().name:14} start search the entire test_data to get recall and latency")
         with self.db.init():
             test_data, ground_truth = args
+            ideal_dcg = get_ideal_dcg(self.k)

             log.debug(f"test dataset size: {len(test_data)}")
             log.debug(f"ground truth size: {ground_truth.columns}, shape: {ground_truth.shape}")

-            latencies, recalls = [], []
+            latencies, recalls, ndcgs = [], [], []
             for idx, emb in enumerate(test_data):
                 s = time.perf_counter()
                 try:
@@ -194,6 +195,7 @@ def search(self, args: tuple[list, pd.DataFrame]):

                 gt = ground_truth['neighbors_id'][idx]
                 recalls.append(calc_recall(self.k, gt[:self.k], results))
+                ndcgs.append(calc_ndcg(gt[:self.k], results, ideal_dcg))

                 if len(latencies) % 100 == 0:
@@ -201,6 +203,7 @@ def search(self, args: tuple[list, pd.DataFrame]):

         avg_latency = round(np.mean(latencies), 4)
         avg_recall = round(np.mean(recalls), 4)
+        avg_ndcg = round(np.mean(ndcgs), 4)
         cost = round(np.sum(latencies), 4)
         p99 = round(np.percentile(latencies, 99), 4)
         log.info(
@@ -208,10 +211,11 @@ def search(self, args: tuple[list, pd.DataFrame]):
             f"cost={cost}s, "
             f"queries={len(latencies)}, "
             f"avg_recall={avg_recall}, "
+            f"avg_ndcg={avg_ndcg}, "
             f"avg_latency={avg_latency}, "
             f"p99={p99}"
         )

-        return (avg_recall, p99)
+        return (avg_recall, avg_ndcg, p99)

     def _run_in_subprocess(self) -> tuple[float, float]:
diff --git a/vectordb_bench/backend/task_runner.py b/vectordb_bench/backend/task_runner.py
index 0680847aa..a6d94f186 100644
--- a/vectordb_bench/backend/task_runner.py
+++ b/vectordb_bench/backend/task_runner.py
@@ -150,7 +150,8 @@ def _run_perf_case(self, drop_old: bool = True) -> Metric:
             )
             self._init_search_runner()
-            m.qps = self._conc_search()
+
+            m.qps, m.conc_num_list, m.conc_qps_list, m.conc_latency_p99_list = self._conc_search()
             m.recall, m.serial_latency_p99 = self._serial_search()
             '''
@@ -181,10 +182,11 @@ def _run_perf_case(self, drop_old: bool = True) -> Metric:
                 m.recall = search_results.recall
                 m.serial_latencies = search_results.serial_latencies
             '''
-            m.recall, m.serial_latency_p99 = search_results
+            m.recall, m.ndcg, m.serial_latency_p99 = search_results
             if TaskStage.SEARCH_CONCURRENT in self.config.stages:
                 search_results = self._conc_search()
-                m.qps = search_results
+                m.qps, m.conc_num_list, m.conc_qps_list, m.conc_latency_p99_list = search_results
+
         except Exception as e:
             log.warning(f"Failed to run performance case, reason = {e}")
             traceback.print_exc()
diff --git a/vectordb_bench/frontend/components/check_results/data.py b/vectordb_bench/frontend/components/check_results/data.py
index 10fa3f459..c092da3a0 100644
--- a/vectordb_bench/frontend/components/check_results/data.py
+++ b/vectordb_bench/frontend/components/check_results/data.py
@@ -87,15 +87,18 @@ def mergeMetrics(metrics_1: dict, metrics_2: dict) -> dict:


 def getBetterMetric(metric, value_1, value_2):
-    if value_1 < 1e-7:
-        return value_2
-    if value_2 < 1e-7:
+    try:
+        if value_1 < 1e-7:
+            return value_2
+        if value_2 < 1e-7:
+            return value_1
+        return (
+            min(value_1, value_2)
+            if isLowerIsBetterMetric(metric)
+            else max(value_1, value_2)
+        )
+    except Exception:
         return value_1
-    return (
-        min(value_1, value_2)
-        if isLowerIsBetterMetric(metric)
-        else max(value_1, value_2)
-    )


 def getBetterLabel(label_1: ResultLabel, label_2: ResultLabel):
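The try/except in getBetterMetric is presumably there so comparisons survive result files saved before the new ndcg and conc_* fields existed: a missing value arrives as None, the `<` comparison raises TypeError, and the function falls back to value_1. A quick illustration (values hypothetical):

    getBetterMetric("recall", 0.98, None)  # None < 1e-7 raises TypeError -> falls back to 0.98
    getBetterMetric("qps", 120.0, 80.0)    # qps is treated as higher-is-better -> 120.0
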
diff --git a/vectordb_bench/frontend/components/concurrent/charts.py b/vectordb_bench/frontend/components/concurrent/charts.py
new file mode 100644
index 000000000..bf886bd8e
--- /dev/null
+++ b/vectordb_bench/frontend/components/concurrent/charts.py
@@ -0,0 +1,82 @@
+
+
+from vectordb_bench.backend.cases import Case
+from vectordb_bench.frontend.components.check_results.expanderStyle import initMainExpanderStyle
+import plotly.express as px
+
+from vectordb_bench.frontend.const.styles import COLOR_MAP
+
+
+def drawChartsByCase(allData, cases: list[Case], st):
+    initMainExpanderStyle(st)
+    for case in cases:
+        chartContainer = st.expander(case.name, True)
+        caseDataList = [
+            data for data in allData if data["case_name"] == case.name]
+        data = [{
+            "conc_num": caseData["conc_num_list"][i],
+            "qps": caseData["conc_qps_list"][i],
+            "latency_p99": caseData["conc_latency_p99_list"][i] * 1000,
+            "db_name": caseData["db_name"],
+            "db": caseData["db"]
+
+        } for caseData in caseDataList for i in range(len(caseData["conc_num_list"]))]
+        drawChart(data, chartContainer)
+
+
+def getRange(metric, data, padding_multipliers):
+    minV = min([d.get(metric, 0) for d in data])
+    maxV = max([d.get(metric, 0) for d in data])
+    padding = maxV - minV
+    rangeV = [
+        minV - padding * padding_multipliers[0],
+        maxV + padding * padding_multipliers[1],
+    ]
+    return rangeV
+
+
+def drawChart(data, st):
+    if len(data) == 0:
+        return
+
+    x = "latency_p99"
+    xrange = getRange(x, data, [0.05, 0.1])
+
+    y = "qps"
+    yrange = getRange(y, data, [0.2, 0.1])
+
+    color = "db"
+    color_discrete_map = COLOR_MAP
+    color = "db_name"
+    color_discrete_map = None
+    line_group = "db_name"
+    text = "conc_num"
+
+    data.sort(key=lambda a: a["conc_num"])
+
+    fig = px.line(
+        data,
+        x=x,
+        y=y,
+        color=color,
+        color_discrete_map=color_discrete_map,
+        line_group=line_group,
+        text=text,
+        markers=True,
+        # color_discrete_map=color_discrete_map,
+        hover_data={
+            "conc_num": True,
+        },
+        height=720,
+    )
+    fig.update_xaxes(range=xrange, title_text="Latency P99 (ms)")
+    fig.update_yaxes(range=yrange, title_text="QPS")
+    fig.update_traces(textposition="bottom right",
+                      texttemplate="conc-%{text:,.4~r}")
+    # fig.update_layout(
+    #     margin=dict(l=0, r=0, t=40, b=0, pad=8),
+    #     legend=dict(
+    #         orientation="h", yanchor="bottom", y=1, xanchor="right", x=1, title=""
+    #     ),
+    # )
+    st.plotly_chart(fig, use_container_width=True,)
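drawChartsByCase flattens the three parallel conc_* lists into one row per (db, concurrency) point and converts latency from seconds to milliseconds. A sketch of the reshaping, with hypothetical values:

    caseData = {
        "db": "Milvus", "db_name": "Milvus-test", "case_name": "Performance Case",
        "conc_num_list": [1, 5, 10],
        "conc_qps_list": [450.2, 1800.9, 2600.5],
        "conc_latency_p99_list": [0.004, 0.011, 0.019],  # seconds
    }
    # produces rows such as:
    #   {"conc_num": 5, "qps": 1800.9, "latency_p99": 11.0,
    #    "db_name": "Milvus-test", "db": "Milvus"}
    # which px.line renders as one QPS-vs-latency trace per db_name.
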
diff --git a/vectordb_bench/frontend/components/tables/data.py b/vectordb_bench/frontend/components/tables/data.py
new file mode 100644
index 000000000..96134c7ff
--- /dev/null
+++ b/vectordb_bench/frontend/components/tables/data.py
@@ -0,0 +1,44 @@
+from dataclasses import asdict
+from vectordb_bench.backend.cases import CaseType
+from vectordb_bench.interface import benchMarkRunner
+from vectordb_bench.models import CaseResult, ResultLabel
+import pandas as pd
+
+
+def getNewResults():
+    allResults = benchMarkRunner.get_results()
+    newResults: list[CaseResult] = []
+
+    for res in allResults:
+        results = res.results
+        for result in results:
+            if result.label == ResultLabel.NORMAL:
+                newResults.append(result)
+
+
+    df = pd.DataFrame(formatData(newResults))
+    return df
+
+
+def formatData(caseResults: list[CaseResult]):
+    data = []
+    for caseResult in caseResults:
+        db = caseResult.task_config.db.value
+        db_label = caseResult.task_config.db_config.db_label
+        case_config = caseResult.task_config.case_config
+        db_case_config = caseResult.task_config.db_case_config
+        case = case_config.case_id.case_cls()
+        filter_rate = case.filter_rate
+        dataset = case.dataset.data.name
+        metrics = asdict(caseResult.metrics)
+        data.append(
+            {
+                "db": db,
+                "db_label": db_label,
+                "case_name": case.name,
+                "dataset": dataset,
+                "filter_rate": filter_rate,
+                **metrics,
+            }
+        )
+    return data
\ No newline at end of file
diff --git a/vectordb_bench/frontend/pages/concurrent.py b/vectordb_bench/frontend/pages/concurrent.py
new file mode 100644
index 000000000..0c1415efc
--- /dev/null
+++ b/vectordb_bench/frontend/pages/concurrent.py
@@ -0,0 +1,72 @@
+
+
+
+import streamlit as st
+from vectordb_bench.backend.cases import CaseType
+from vectordb_bench.frontend.components.check_results.footer import footer
+from vectordb_bench.frontend.components.check_results.expanderStyle import initMainExpanderStyle
+from vectordb_bench.frontend.components.check_results.priceTable import priceTable
+from vectordb_bench.frontend.components.check_results.headerIcon import drawHeaderIcon
+from vectordb_bench.frontend.components.check_results.nav import NavToResults, NavToRunTest
+from vectordb_bench.frontend.components.check_results.charts import drawMetricChart
+from vectordb_bench.frontend.components.check_results.filters import getshownData
+from vectordb_bench.frontend.components.concurrent.charts import drawChartsByCase
+from vectordb_bench.frontend.components.get_results.saveAsImage import getResults
+from vectordb_bench.frontend.const.styles import *
+from vectordb_bench.interface import benchMarkRunner
+from vectordb_bench.models import TestResult
+
+
+def main():
+    # set page config
+    st.set_page_config(
+        page_title="VDBBench Conc Perf",
+        page_icon=FAVICON,
+        layout="wide",
+        # initial_sidebar_state="collapsed",
+    )
+
+    # header
+    drawHeaderIcon(st)
+
+    allResults = benchMarkRunner.get_results()
+
+    def check_conc_data(res: TestResult):
+        case_results = res.results
+        count = 0
+        for case_result in case_results:
+            if len(case_result.metrics.conc_num_list) > 0:
+                count += 1
+
+        return count > 0
+
+    checkedResults = [res for res in allResults if check_conc_data(res)]
+
+
+    st.title("VectorDB Benchmark (Concurrent Performance)")
+
+    # results selector
+    resultSelectorContainer = st.sidebar.container()
+    shownData, _, showCases = getshownData(
+        checkedResults, resultSelectorContainer)
+
+
+    resultSelectorContainer.divider()
+
+    # nav
+    navContainer = st.sidebar.container()
+    NavToRunTest(navContainer)
+    NavToResults(navContainer)
+
+    # save or share
+    resultesContainer = st.sidebar.container()
+    getResults(resultesContainer, "vectordb_bench_concurrent")
+
+    drawChartsByCase(shownData, showCases, st.container())
+
+    # footer
+    footer(st.container())
+
+
+if __name__ == "__main__":
+    main()
diff --git a/vectordb_bench/frontend/pages/tables.py b/vectordb_bench/frontend/pages/tables.py
new file mode 100644
index 000000000..a4dab68a6
--- /dev/null
+++ b/vectordb_bench/frontend/pages/tables.py
@@ -0,0 +1,24 @@
+import streamlit as st
+from vectordb_bench.frontend.components.check_results.headerIcon import drawHeaderIcon
+from vectordb_bench.frontend.components.tables.data import getNewResults
+from vectordb_bench.frontend.const.styles import FAVICON
+
+
+def main():
+    # set page config
+    st.set_page_config(
+        page_title="Table",
+        page_icon=FAVICON,
+        layout="wide",
+        # initial_sidebar_state="collapsed",
+    )
+
+    # header
+    drawHeaderIcon(st)
+
+    df = getNewResults()
+    st.dataframe(df, height=800)
+
+
+if __name__ == "__main__":
+    main()
\ No newline at end of file
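Because formatData spreads asdict(caseResult.metrics) into each row, every Metric field (qps, recall, the new ndcg, and the conc_* lists) automatically becomes a column of the table page's DataFrame. One resulting row might look like this (values hypothetical):

    {
        "db": "Milvus", "db_label": "", "case_name": "Performance Case",
        "dataset": "Cohere", "filter_rate": 0.0,
        "qps": 1800.9, "recall": 0.98, "ndcg": 0.97,
        "conc_num_list": [1, 5, 10],
    }
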
diff --git a/vectordb_bench/metric.py b/vectordb_bench/metric.py
index a2b6d6ff0..5c23072e3 100644
--- a/vectordb_bench/metric.py
+++ b/vectordb_bench/metric.py
@@ -1,7 +1,7 @@
 import logging
 import numpy as np

-from dataclasses import dataclass
+from dataclasses import dataclass, field


 log = logging.getLogger(__name__)
@@ -19,6 +19,10 @@ class Metric:
     qps: float = 0.0
     serial_latency_p99: float = 0.0
     recall: float = 0.0
+    ndcg: float = 0.0
+    conc_num_list: list[int] = field(default_factory=list)
+    conc_qps_list: list[float] = field(default_factory=list)
+    conc_latency_p99_list: list[float] = field(default_factory=list)


 QURIES_PER_DOLLAR_METRIC = "QP$ (Quries per Dollar)"
@@ -60,3 +64,21 @@ def calc_recall(count: int, ground_truth: list[int], got: list[int]) -> float:
             recalls[i] = 1

     return np.mean(recalls)
+
+
+def get_ideal_dcg(k: int):
+    ideal_dcg = 0
+    for i in range(k):
+        ideal_dcg += 1 / np.log2(i+2)
+
+    return ideal_dcg
+
+
+def calc_ndcg(ground_truth: list[int], got: list[int], ideal_dcg: float) -> float:
+    dcg = 0
+    ground_truth = list(ground_truth)
+    for id in set(got):
+        if id in ground_truth:
+            idx = ground_truth.index(id)
+            dcg += 1 / np.log2(idx+2)
+    return dcg / ideal_dcg
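get_ideal_dcg precomputes the DCG of a perfect top-k result, and calc_ndcg discounts each hit by its position in the ground-truth list (rather than its position in the returned list), so the quotient never exceeds 1. A worked example for k = 3, using the two helpers above:

    ideal = get_ideal_dcg(3)              # 1/log2(2) + 1/log2(3) + 1/log2(4) ≈ 2.1309
    ndcg = calc_ndcg([7, 3, 9], [3, 1, 9], ideal)
    # 3 sits at ground-truth index 1 -> contributes 1/log2(3) ≈ 0.6309
    # 9 sits at ground-truth index 2 -> contributes 1/log2(4) = 0.5
    # 1 is not in the ground truth   -> contributes nothing
    # ndcg = (0.6309 + 0.5) / 2.1309 ≈ 0.53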