From 194343df1c6c3feccfcbcf8b41b50be3f09605a7 Mon Sep 17 00:00:00 2001 From: Ivo Dilov Date: Mon, 10 Mar 2025 13:47:50 +0200 Subject: [PATCH] ASV framework refactor proposal This commit adds some brief code example of how I think we can simplify the current framework. The main idea is that currently the `EnvConfigurationBase` is responsible for too many things at the same time: - dataframe generation - library creation/population - differentiating between persistant/modifiable libraries. - managing different libraries needed for a benchmark It being responsible for a large bundle of things makes it hard to: - reuse between different benchmarks (we basically end up with one new child of `EnvConfigurationBase` for each benchmark) - understand what some method actually does and we need to rely on lenghty documentation which is harder to keep up to date My proposal is not code complete but show how we might go about simplifying this framework. New code is inside the `environment_setup_refactored.py` and show how we might go about replacing the current `environment_setup.py`. I've also tweaked the `LMDBReadWrite` benchmark to show how we can use the new framework. The proposal mainly consists of separating the four responsibilities of the `EnvConfigurationBase` - dataframe generation (now we have an interface `DataFrameGenerator`) - library population (now we have an interface `LibraryPopulationPolicy`) - persistant/modifiable (now we just have function which can give you what kind of library you need) - managing different libraries (now this is entirely responsibility of the benchmark as each benchmark might need completely different logic around the number/types of libraries it needs. E.g. read_write uses one persistent and one modifiable) By separating these we can get better reusablity: - E.g. we can mix and match policies with dataframe generators for different benchmarks Also now when I see the code in the benchmark I can how the libraries are created/populated instead of an opaque `setup` call which is unclear what it does. --- .../util/environment_setup_refactored.py | 116 +++++++++ python/benchmarks/real_read_write.py | 224 ++++++++---------- 2 files changed, 216 insertions(+), 124 deletions(-) create mode 100644 python/arcticdb/util/environment_setup_refactored.py diff --git a/python/arcticdb/util/environment_setup_refactored.py b/python/arcticdb/util/environment_setup_refactored.py new file mode 100644 index 0000000000..837a4830e8 --- /dev/null +++ b/python/arcticdb/util/environment_setup_refactored.py @@ -0,0 +1,116 @@ +import copy +from abc import ABC, abstractmethod +from enum import Enum +import logging +import os +import tempfile +import time +import re +import pandas as pd +import numpy as np +from typing import List, Union + +from arcticdb.arctic import Arctic +from arcticdb.options import LibraryOptions +from arcticdb.storage_fixtures.s3 import BaseS3StorageFixtureFactory, real_s3_from_environment_variables +from arcticdb.util.utils import DFGenerator, ListGenerators, TimestampNumber +from arcticdb.version_store.library import Library + +def get_logger_for_asv(bencmhark_cls): + logLevel = logging.INFO + logger = logging.getLogger(bencmhark_cls.__name__) + logger.setLevel(logLevel) + console_handler = logging.StreamHandler() + console_handler.setLevel(logLevel) + formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s') + console_handler.setFormatter(formatter) + logger.addHandler(console_handler) + return logger + + +class Storage(Enum): + AMAZON = 1 + LMDB = 2 + + +class LibraryType(Enum): + PERSISTENT = "PERMANENT" + MODIFIABLE = "MODIFIABLE" + + +# It is quite clear what is this responsible for: only dataframe generation +# Using such an abstraction can help us deduplicate the dataframe generation code between the different `EnvironmentSetup`s +# Note: We use a class instead of a generator function to allow caching of dataframes in the state +class DataFrameGenerator(ABC): + @abstractmethod + def get_dataframe(self, **kwargs): + pass + +# The population policy is tightly paired with the `populate_library` +# Using a separate abstraction for the population policy can allow us to be flexible with how we populate libraries. +# E.g. One benchmark can use one population policy for read operations, a different for updates and a third for appends +# Note: Using a kwargs generator to be passed to the df_generator allows customization of dataframe generation params (e.g. num_rows, num_cols, use_string_columns?) +class LibraryPopulationPolicy(ABC): + def __init__(self, num_symbols: int, df_generator: DataFrameGenerator): + self.num_symbols = num_symbols + self.df_generator = df_generator + + @abstractmethod + def get_symbol_name(self, ind: int): + pass + + @abstractmethod + def get_generator_kwargs(self, ind: int) -> Dict[str, Any]: + pass + + +# Currently we're using the same arctic client for both persistant and modifiable libraries. +# We might decide that we want different arctic clients (e.g. different buckets) but probably not needed for now. +def get_arctic_client(storage: Storage) -> Arctic: + pass + +def get_library_name(library_type, benchmark_cls, lib_name_suffix): + if library_type == LibraryType.PERSISTENT: + return f"{library_type}_{benchmark_cls.__name__}_{lib_name_suffix}" + if library_type == LibraryType.MODIFIABLE: + # We want the modifiable libraries to be unique per process/ benchmark class. We embed this deep in the name + return f"{library_type}_{benchmark_cls.__name__}_{os.getpid()}_{lib_name_suffix}" + +def get_library(ac : Arctic, library_type : LibraryType, benchmark_cls, lib_name_suffix : str) -> Library: + lib_name = get_library_name(library_type, benchmark_cls, lib_name_suffix) + if library_type == LibraryType.PERSISTENT: + # TODO: Change to OpenMode=READ for persistent libraries + # This however is not available in the c++ library manager API (it currently hardcodes OpenMode=DELETE). + # We can expose it just for these tests if we decide so. but this is probably for a separate PR. + pass + return ac.get_library(lib_name, create_if_missing=True) + + +def populate_library(lib, population_policy : LibraryPopulationPolicy): + num_symbols = population_policy.num_symbols + df_generator = population_policy.df_generator + for i in range(num_symbols): + sym = population_policy.get_symbol(i) + kwargs = population_policy.get_generator_kwargs(i) + df = df_generator.get_dataframe(**kwargs) + lib.write(sym, df) + +# This is the only API to populate a persistent library. If we deem useful we can also add a check whether library is valid (e.g. has the correct num_symbols) +# As discussed, ideally this will be called in completely separate logic from ASV tests to avoid races, but for simplicity +# for now we can just rely on setup_cache to populate the persistant libraries if they are missing. +def populate_persistent_library_if_missing(ac, benchmark_cls, lib_name_suffix, population_policy : LibraryPopulationPolicy): + lib_name = get_library_name(LibraryType.PERSISTENT, benchmark_cls, lib_name_suffix) + if ac.has_library(lib_name): + lib = ac.create_library(lib_name) + populate_library(lib, population_policy) + +# Not sure how useful is this, one can always just keep track of the libraries created and clear them +def clear_all_modifiable_libs_from_this_process(ac, benchmark_cls): + lib_names = ac.list_libraries() + to_deletes = [lib_name for lib_name in lib_names if lib_name.startswith(f"{LibraryType.MODIFIABLE}_{benchmark_cls.__name__}_{os.getpid()}")] + for to_delete in to_deletes: + ac.delete_library(to_delete) + + + + diff --git a/python/benchmarks/real_read_write.py b/python/benchmarks/real_read_write.py index 9cc3169c6d..b02728e3c9 100644 --- a/python/benchmarks/real_read_write.py +++ b/python/benchmarks/real_read_write.py @@ -13,60 +13,52 @@ import pandas as pd from arcticdb.util.utils import DFGenerator, TimestampNumber -from arcticdb.util.environment_setup import SetupSingleLibrary, Storage +# from arcticdb.util.environment_setup import SetupSingleLibrary, Storage +from arcticdb.util.environment_setup_refactored import ( + DataFrameGenerator, + LibraryPopulationPolicy, + Storage, + LibraryType, + get_arctic_client, + get_library, + populate_library, + populate_persistent_library_if_missing, + get_logger_for_asv, +) #region Setup classes +class AllColumnTypesGenerator(DataFrameGenerator): + def get_dataframe(self, num_rows): + df = (DFGenerator(num_rows) + .add_int_col("int8", np.int8) + .add_int_col("int16", np.int16) + .add_int_col("int32", np.int32) + .add_int_col("int64", min=-26, max=31) + .add_int_col("uint64", np.uint64, min=100, max=199) + .add_float_col("float16",np.float32) + .add_float_col("float2",min=-100.0, max=200.0, round_at=4) + .add_string_col("string10", str_size=10) + .add_string_col("string20", str_size=20, num_unique_values=20000) + .add_bool_col("bool") + .add_timestamp_index("time", ReadWriteBenchmarkSettings.INDEX_FREQ, ReadWriteBenchmarkSettings.START_DATE_INDEX) + ).generate_dataframe() + return df -class ReadWriteBenchmarkSettings(SetupSingleLibrary): - """ - Setup Read Tests Library for different storages. - Its aim is to have at one place the responsibility for setting up any supported storage - with proper symbols. +class PerRowsPopulationPolicy(LibraryPopulationPolicy): + def __init__(self, num_rows: List[int]): + self.num_rows = num_rows + self.num_symbols = len(num_rows) + self.df_generator = AllColumnTypesGenerator() - It is also responsible for providing 2 libraries: - - one that will hold persistent data across runs - - one that will will hold transient data for operations which data will be wiped out - """ + def get_symbol_from_rows(self, num_rows): + return f"sym_{num_rows}" - START_DATE_INDEX = pd.Timestamp("2000-1-1") - INDEX_FREQ = 's' - - def get_last_x_percent_date_range(self, row_num, percents): - """ - Returns a date range selecting last X% of rows of dataframe - pass percents as 0.0-1.0 - """ - start = TimestampNumber.from_timestamp( - ReadWriteBenchmarkSettings.START_DATE_INDEX, ReadWriteBenchmarkSettings.INDEX_FREQ) - percent_5 = int(row_num * percents) - end_range = start + row_num - start_range = end_range - percent_5 - range = pd.date_range(start=start_range.to_timestamp(), end=end_range.to_timestamp(), freq="s") - return range - - def generate_dataframe(self, row_num:int, col_num: int) -> pd.DataFrame: - """ - Dataframe that will be used in read and write tests - """ - st = time.time() - # NOTE: Use only setup environment logger! - self.logger().info("Dataframe generation started.") - df = (DFGenerator(row_num) - .add_int_col("int8", np.int8) - .add_int_col("int16", np.int16) - .add_int_col("int32", np.int32) - .add_int_col("int64", min=-26, max=31) - .add_int_col("uint64", np.uint64, min=100, max=199) - .add_float_col("float16",np.float32) - .add_float_col("float2",min=-100.0, max=200.0, round_at=4) - .add_string_col("string10", str_size=10) - .add_string_col("string20", str_size=20, num_unique_values=20000) - .add_bool_col("bool") - .add_timestamp_index("time", ReadWriteBenchmarkSettings.INDEX_FREQ, ReadWriteBenchmarkSettings.START_DATE_INDEX) - ).generate_dataframe() - self.logger().info(f"Dataframe {row_num} rows generated for {time.time() - st} sec") - return df + def get_symbol_name(self, i): + return self.get_symbol_from_rows(self.num_rows[i]) + + def get_generator_kwargs(self, ind: int) -> pd.DataFrame: + return {"num_rows": self.num_rows[ind]} #endregion @@ -74,11 +66,8 @@ class LMDBReadWrite: """ This class is for general read write tests on LMDB - IMPORTANT: - - When we inherit from another test we inherit test, setup and teardown methods - - setup_cache() method we inherit it AS IS, thus it will be executed only ONCE for - all classes that inherit from the base class. Therefore it is perhaps best to ALWAYS - provide implementation in the child class, no matter that it might look like code repeat + Uses 1 persistent library for read tests + Uses 1 modifiable library for write tests """ rounds = 1 @@ -89,95 +78,82 @@ class LMDBReadWrite: timeout = 1200 - SETUP_CLASS = ReadWriteBenchmarkSettings(Storage.LMDB).set_params([2_500_000, 5_000_000]) - - params = SETUP_CLASS.get_parameter_list() param_names = ["num_rows"] + params = [2_500_000, 5_000_000] - def setup_cache(self): - ''' - Always provide implementation of setup_cache in - the child class - - And always return storage info which should - be first parameter for setup, tests and teardowns - ''' - lmdb_setup = LMDBReadWrite.SETUP_CLASS.setup_environment() - info = lmdb_setup.get_storage_info() - # NOTE: use only logger defined by setup class - lmdb_setup.logger().info(f"storage info object: {info}") - return info + storage = Storage.LMDB - def setup(self, storage_info, num_rows): + def setup_cache(self): ''' - This setup method for read and writes can be executed only once - No need to be executed before each test. That is why we define - `repeat` as 1 + In setup_cache we only populate the persistent libraries if they are missing. ''' - ## Construct back from arctic url the object - self.setup_env: ReadWriteBenchmarkSettings = ReadWriteBenchmarkSettings.from_storage_info(storage_info) - sym = self.setup_env.get_symbol_name(num_rows, None) - self.to_write_df = self.setup_env.get_library().read(symbol=sym).data - self.last_20 = self.setup_env.get_last_x_percent_date_range(num_rows, 20) - ## - ## Writing into library that has suffix same as process - ## will protect ASV processes from writing on one and same symbol - ## this way each one is going to have its unique library - self.write_library = self.setup_env.get_modifiable_library(os.getpid()) - - def time_read(self, storage_info, num_rows): - sym = self.setup_env.get_symbol_name(num_rows, None) - self.setup_env.get_library().read(symbol=sym) - - def peakmem_read(self, storage_info, num_rows): - sym = self.setup_env.get_symbol_name(num_rows, None) - self.setup_env.get_library().read(symbol=sym) - - def time_write(self, storage_info, num_rows): - sym = self.setup_env.get_symbol_name(num_rows, None) - self.write_library.write(symbol=sym, data=self.to_write_df) - - def peakmem_write(self, storage_info, num_rows): - sym = self.setup_env.get_symbol_name(num_rows, None) - self.write_library.write(symbol=sym, data=self.to_write_df) - - def time_read_with_column_float(self, storage_info, num_rows): + ac = get_arctic_client(LMDBReadWrite.storage) + population_policy = PerRowsPopulationPolicy(LMDBReadWrite.params) + populate_persistent_library_if_missing(ac, LibraryType.PERSISTENT, LMDBReadWrite, "READ_LIB", population_policy) + + def setup(self, num_rows): + self.logger = get_logger_for_asv(LMDBReadWrite) + self.ac = get_arctic_client(LMDBReadWrite.storage) + self.population_policy = PerRowsPopulationPolicy(LMDBReadWrite.params) + self.sym = self.population_policy.get_symbol_from_rows(num_rows) + # We use the same generator as the policy + self.to_write_df = self.population_policy.df_generator.get_dataframe(num_rows) + # Functions operating on differetent date ranges to be moved in some shared utils + self.last_20 = utils.get_last_x_percent_date_range(num_rows, 20) + + self.read_lib = get_library(ac, LibraryType.PERSISTENT, LMDBReadWrite, "READ_LIB") + self.write_lib = get_library(ac, LibraryType.MODIFIABLE, LMDBReadWrite, "WRITE_LIB") + # We could also populate the library like so (we don't need ) + # populate_library(self.write_lib, ) + + def teardown(self, num_rows): + # We could clear the modifiable libraries we used + # self.write_lib.clear() + pass + + def time_read(self, num_rows): + self.read_lib.read(self.sym) + + def peakmem_read(self, num_rows): + self.read_lib.read(self.sym) + + def time_write(self, num_rows): + self.write_lib.write(self.sym, self.to_write_df) + + def peakmem_write(self, num_rows): + self.write_lib.write(self.sym, self.to_write_df) + + def time_read_with_column_float(self, num_rows): COLS = ["float2"] - sym = self.setup_env.get_symbol_name(num_rows, None) - self.setup_env.get_library().read(symbol=sym, columns=COLS).data + self.read_lib.read(symbol=self.sym, columns=COLS).data - def peakmem_read_with_column_float(self, storage_info, num_rows): + def peakmem_read_with_column_float(self, num_rows): COLS = ["float2"] - sym = self.setup_env.get_symbol_name(num_rows, None) - self.setup_env.get_library().read(symbol=sym, columns=COLS).data + self.read_lib.read(symbol=self.sym, columns=COLS).data - def time_read_with_columns_all_types(self, storage_info, num_rows): + def time_read_with_columns_all_types(self, num_rows): COLS = ["float2","string10","bool", "int64","uint64"] - sym = self.setup_env.get_symbol_name(num_rows, None) - self.setup_env.get_library().read(symbol=sym, columns=COLS).data + self.read_lib.read(symbol=self.sym, columns=COLS).data - def peakmem_read_with_columns_all_types(self, storage_info, num_rows): + def peakmem_read_with_columns_all_types(self, num_rows): COLS = ["float2","string10","bool", "int64","uint64"] - sym = self.setup_env.get_symbol_name(num_rows, None) - self.setup_env.get_library().read(symbol=sym, columns=COLS).data + self.read_lib.read(symbol=self.sym, columns=COLS).data - def time_write_staged(self, storage_info, num_rows): + def time_write_staged(self, num_rows): lib = self.write_library - lib.write(f"sym", self.to_write_df, staged=True) - lib._nvs.compact_incomplete(f"sym", False, False) + lib.write(self.sym, self.to_write_df, staged=True) + lib._nvs.compact_incomplete(self.sym, False, False) - def peakmem_write_staged(self, storage_info, num_rows): + def peakmem_write_staged(self, num_rows): lib = self.write_library - lib.write(f"sym", self.to_write_df, staged=True) - lib._nvs.compact_incomplete(f"sym", False, False) + lib.write(self.sym, self.to_write_df, staged=True) + lib._nvs.compact_incomplete(self.sym, False, False) - def time_read_with_date_ranges_last20_percent_rows(self, storage_info, num_rows): - sym = self.setup_env.get_symbol_name(num_rows, None) - self.setup_env.get_library().read(symbol=sym, date_range=self.last_20).data + def time_read_with_date_ranges_last20_percent_rows(self, num_rows): + self.read_lib.read(symbol=self.sym, date_range=self.last_20).data - def peakmem_read_with_date_ranges_last20_percent_rows(self, storage_info, num_rows): - sym = self.setup_env.get_symbol_name(num_rows, None) - self.setup_env.get_library().read(symbol=sym, date_range=self.last_20).data + def peakmem_read_with_date_ranges_last20_percent_rows(self, num_rows): + self.read_lib.read(symbol=self.sym, date_range=self.last_20).data class AWSReadWrite(LMDBReadWrite): """