ASV framework refactor proposal #2227
base: asv_s3_more
@@ -0,0 +1,116 @@
import copy
from abc import ABC, abstractmethod
from enum import Enum
import logging
import os
import tempfile
import time
import re
import pandas as pd
import numpy as np
from typing import Any, Dict, List, Union

from arcticdb.arctic import Arctic
from arcticdb.options import LibraryOptions
from arcticdb.storage_fixtures.s3 import BaseS3StorageFixtureFactory, real_s3_from_environment_variables
from arcticdb.util.utils import DFGenerator, ListGenerators, TimestampNumber
from arcticdb.version_store.library import Library

def get_logger_for_asv(benchmark_cls):
    log_level = logging.INFO
    logger = logging.getLogger(benchmark_cls.__name__)
    logger.setLevel(log_level)
    console_handler = logging.StreamHandler()
    console_handler.setLevel(log_level)
    formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
    console_handler.setFormatter(formatter)
    logger.addHandler(console_handler)
    return logger


class Storage(Enum):
    AMAZON = 1
    LMDB = 2


class LibraryType(Enum):
    PERSISTENT = "PERMANENT"
    MODIFIABLE = "MODIFIABLE"
Reviewer comment: I would advise adding the concept of a TEST library type, which is a persistent space for tests. Setting it through a property of the generator class, e.g. set_test(), could automatically make the code execute in a separate space and not pollute the PERSISTENT space.
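A minimal sketch of that suggestion, assuming the enum and the naming helper below are extended (the TEST value and its handling are hypothetical, not part of this PR):

class LibraryType(Enum):
    PERSISTENT = "PERMANENT"
    MODIFIABLE = "MODIFIABLE"
    TEST = "TEST"  # hypothetical: persistent space reserved for test runs

def get_library_name(library_type, benchmark_cls, lib_name_suffix):
    if library_type in (LibraryType.PERSISTENT, LibraryType.TEST):
        # TEST libraries reuse the persistent naming scheme but live under
        # their own prefix, so test runs never pollute the PERSISTENT space.
        return f"{library_type.value}_{benchmark_cls.__name__}_{lib_name_suffix}"
    if library_type == LibraryType.MODIFIABLE:
        return f"{library_type.value}_{benchmark_cls.__name__}_{os.getpid()}_{lib_name_suffix}"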


# It is quite clear what this is responsible for: only dataframe generation.
# Such an abstraction can help us deduplicate the dataframe generation code between the different `EnvironmentSetup`s.
# Note: We use a class instead of a generator function to allow caching of dataframes in the state.
class DataFrameGenerator(ABC):
Reviewer comment: Not necessarily opposed to using a class, but if caching is the only thing we need it for, we can use something like this instead.
    @abstractmethod
    def get_dataframe(self, **kwargs):
        pass

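The alternative the reviewer links is not shown in this view. One shape it could take, if caching is the only motivation for the class, is a memoized module-level function; this is a sketch under that assumption, not part of the PR:

import functools

@functools.lru_cache(maxsize=None)
def cached_dataframe(num_rows: int) -> pd.DataFrame:
    # The argument tuple is the cache key, so repeated benchmark setups
    # asking for the same num_rows reuse the already generated dataframe.
    # Callers must treat the cached dataframe as read-only.
    return DFGenerator(num_rows).add_bool_col("bool").generate_dataframe()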
# The population policy is tightly coupled with `populate_library`.
# Using a separate abstraction for the population policy allows us to be flexible in how we populate libraries.
# E.g. one benchmark can use one population policy for read operations, a different one for updates, and a third for appends.
# Note: Using a kwargs generator that is passed to the df_generator allows customization of dataframe generation params (e.g. num_rows, num_cols, use_string_columns?).
class LibraryPopulationPolicy(ABC):
Reviewer comment: We discussed with Ivo that in the previous version the setup environment class was responsible for both library and symbol generation. In this version it is responsible only for symbol population; the ASV test will then invoke it for each library it needs to populate. Perhaps this would be amended with other methods and attributes. I would suggest that because I do not believe we will have only a single way to …

Reviewer reply: I agree; as we discussed offline, we need more complex … I'd be happy to move the … Probably just a single policy which allows many symbols / many versions / snapshot frequency can serve for most of our tests.

    def __init__(self, num_symbols: int, df_generator: DataFrameGenerator):
        self.num_symbols = num_symbols
        self.df_generator = df_generator

    @abstractmethod
    def get_symbol_name(self, ind: int):
        pass

    @abstractmethod
    def get_generator_kwargs(self, ind: int) -> Dict[str, Any]:
        pass

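As a sketch of the "single policy" direction the reviewers describe above, one class could cover many symbols, many versions per symbol, and a snapshot frequency. All names and parameters here are hypothetical:

class GeneralPopulationPolicy(LibraryPopulationPolicy):
    def __init__(self, num_symbols: int, versions_per_symbol: int,
                 snapshot_every_n_versions: int, df_generator: DataFrameGenerator):
        super().__init__(num_symbols, df_generator)
        self.versions_per_symbol = versions_per_symbol
        self.snapshot_every_n_versions = snapshot_every_n_versions

    def get_symbol_name(self, ind: int):
        return f"sym_{ind}"

    def get_generator_kwargs(self, ind: int) -> Dict[str, Any]:
        # Placeholder; a concrete configuration would return e.g. {"num_rows": ...}
        return {}

populate_library below would then need to loop versions_per_symbol times per symbol and take a snapshot every snapshot_every_n_versions writes.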
# Currently we're using the same arctic client for both persistent and modifiable libraries.

Reviewer comment: I would suggest having a separate class for those 3 functions: get_arctic_client, … Perhaps SharedStorageAccess, or a better name?

# We might decide that we want different arctic clients (e.g. different buckets), but that is probably not needed for now.
def get_arctic_client(storage: Storage) -> Arctic:
    pass

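A sketch of the separate class suggested above; the name SharedStorageAccess comes from the comment, while the exact set of grouped helpers is an assumption. It simply delegates to the module-level functions defined in this file:

class SharedStorageAccess:
    def __init__(self, storage: Storage, benchmark_cls):
        self.benchmark_cls = benchmark_cls
        self._ac = get_arctic_client(storage)

    def get_arctic_client(self) -> Arctic:
        return self._ac

    def get_library(self, library_type: LibraryType, lib_name_suffix: str) -> Library:
        # Delegates to the module-level get_library defined below.
        return get_library(self._ac, library_type, self.benchmark_cls, lib_name_suffix)

    def populate_library(self, library_type: LibraryType, lib_name_suffix: str,
                         population_policy: LibraryPopulationPolicy):
        populate_library(self.get_library(library_type, lib_name_suffix), population_policy)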
def get_library_name(library_type, benchmark_cls, lib_name_suffix):
    if library_type == LibraryType.PERSISTENT:
        return f"{library_type.value}_{benchmark_cls.__name__}_{lib_name_suffix}"
    if library_type == LibraryType.MODIFIABLE:
        # We want the modifiable libraries to be unique per process/benchmark class. We embed this deep in the name.
        return f"{library_type.value}_{benchmark_cls.__name__}_{os.getpid()}_{lib_name_suffix}"

def get_library(ac: Arctic, library_type: LibraryType, benchmark_cls, lib_name_suffix: str) -> Library:
    lib_name = get_library_name(library_type, benchmark_cls, lib_name_suffix)
    if library_type == LibraryType.PERSISTENT:
        # TODO: Change to OpenMode=READ for persistent libraries.
        # This however is not available in the C++ library manager API (it currently hardcodes OpenMode=DELETE).
        # We can expose it just for these tests if we decide so, but that is probably for a separate PR.
        pass
    return ac.get_library(lib_name, create_if_missing=True)


def populate_library(lib, population_policy: LibraryPopulationPolicy):
    num_symbols = population_policy.num_symbols
    df_generator = population_policy.df_generator
    for i in range(num_symbols):
        sym = population_policy.get_symbol_name(i)
        kwargs = population_policy.get_generator_kwargs(i)
        df = df_generator.get_dataframe(**kwargs)
        lib.write(sym, df)

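For illustration, wiring a policy into population might look like this; MyBenchmark is a hypothetical ASV class, and PerRowsPopulationPolicy is defined in the benchmark module shown further down:

ac = get_arctic_client(Storage.LMDB)
lib = get_library(ac, LibraryType.MODIFIABLE, MyBenchmark, "EXAMPLE")
populate_library(lib, PerRowsPopulationPolicy([1_000, 10_000]))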
# This is the only API to populate a persistent library. If we deem it useful, we can also add a check of whether a library is valid (e.g. has the correct num_symbols).
# As discussed, ideally this will be called in logic completely separate from the ASV tests to avoid races, but for simplicity,
# for now we can just rely on setup_cache to populate the persistent libraries if they are missing.
def populate_persistent_library_if_missing(ac, benchmark_cls, lib_name_suffix, population_policy: LibraryPopulationPolicy):
    lib_name = get_library_name(LibraryType.PERSISTENT, benchmark_cls, lib_name_suffix)
    if not ac.has_library(lib_name):
        lib = ac.create_library(lib_name)
        populate_library(lib, population_policy)

# Not sure how useful this is; one can always just keep track of the libraries created and clear them.
def clear_all_modifiable_libs_from_this_process(ac, benchmark_cls):
    lib_names = ac.list_libraries()
    to_delete = [lib_name for lib_name in lib_names
                 if lib_name.startswith(f"{LibraryType.MODIFIABLE.value}_{benchmark_cls.__name__}_{os.getpid()}")]
    for lib_name in to_delete:
        ac.delete_library(lib_name)

@@ -13,72 +13,61 @@
import pandas as pd

from arcticdb.util.utils import DFGenerator, TimestampNumber
# from arcticdb.util.environment_setup import SetupSingleLibrary, Storage
from arcticdb.util.environment_setup_refactored import (
    DataFrameGenerator,
    LibraryPopulationPolicy,
    Storage,
    LibraryType,
    get_arctic_client,
    get_library,
    populate_library,
    populate_persistent_library_if_missing,
    get_logger_for_asv,
)


#region Setup classes
class AllColumnTypesGenerator(DataFrameGenerator):
    # These constants previously lived on ReadWriteBenchmarkSettings, which this
    # refactor removes, so they are defined here instead.
    START_DATE_INDEX = pd.Timestamp("2000-1-1")
    INDEX_FREQ = 's'

    def get_dataframe(self, num_rows):
        df = (DFGenerator(num_rows)
              .add_int_col("int8", np.int8)
              .add_int_col("int16", np.int16)
              .add_int_col("int32", np.int32)
              .add_int_col("int64", min=-26, max=31)
              .add_int_col("uint64", np.uint64, min=100, max=199)
              .add_float_col("float16", np.float32)
              .add_float_col("float2", min=-100.0, max=200.0, round_at=4)
              .add_string_col("string10", str_size=10)
              .add_string_col("string20", str_size=20, num_unique_values=20000)
              .add_bool_col("bool")
              .add_timestamp_index("time", AllColumnTypesGenerator.INDEX_FREQ, AllColumnTypesGenerator.START_DATE_INDEX)
              ).generate_dataframe()
        return df

class PerRowsPopulationPolicy(LibraryPopulationPolicy):
    def __init__(self, num_rows: List[int]):
        self.num_rows = num_rows
        self.num_symbols = len(num_rows)
        self.df_generator = AllColumnTypesGenerator()

    def get_symbol_from_rows(self, num_rows):
        return f"sym_{num_rows}"

    def get_symbol_name(self, i):
        return self.get_symbol_from_rows(self.num_rows[i])

    def get_generator_kwargs(self, ind: int) -> Dict[str, Any]:
        return {"num_rows": self.num_rows[ind]}


def get_last_x_percent_date_range(num_rows, percents):
    """
    Returns a date range selecting the last X% of rows of a dataframe.
    Pass percents as 0.0-1.0.
    Moved from the removed ReadWriteBenchmarkSettings; to be relocated to shared utils.
    """
    start = TimestampNumber.from_timestamp(
        AllColumnTypesGenerator.START_DATE_INDEX, AllColumnTypesGenerator.INDEX_FREQ)
    num_selected_rows = int(num_rows * percents)
    end_range = start + num_rows
    start_range = end_range - num_selected_rows
    return pd.date_range(start=start_range.to_timestamp(), end=end_range.to_timestamp(), freq="s")

#endregion

class LMDBReadWrite:
    """
    This class is for general read write tests on LMDB.

    Uses 1 persistent library for read tests.
    Uses 1 modifiable library for write tests.
    """

    rounds = 1
@@ -89,95 +78,82 @@ class LMDBReadWrite:

    timeout = 1200

    param_names = ["num_rows"]
    params = [2_500_000, 5_000_000]

    storage = Storage.LMDB

    def setup_cache(self):
        '''
        In setup_cache we only populate the persistent libraries if they are missing.
        '''
        ac = get_arctic_client(LMDBReadWrite.storage)
        population_policy = PerRowsPopulationPolicy(LMDBReadWrite.params)
        populate_persistent_library_if_missing(ac, LMDBReadWrite, "READ_LIB", population_policy)

Reviewer comment: How and where do we write tests in this mode for the populators for different needs? (In the previous version all such tests were in environment_setup.py.)

    def setup(self, num_rows):
        self.logger = get_logger_for_asv(LMDBReadWrite)
        self.ac = get_arctic_client(LMDBReadWrite.storage)
        self.population_policy = PerRowsPopulationPolicy(LMDBReadWrite.params)
        self.sym = self.population_policy.get_symbol_from_rows(num_rows)
        # We use the same generator as the policy
        self.to_write_df = self.population_policy.df_generator.get_dataframe(num_rows)
        # Functions operating on different date ranges are to be moved to some shared utils
        self.last_20 = get_last_x_percent_date_range(num_rows, 0.2)

        self.read_lib = get_library(self.ac, LibraryType.PERSISTENT, LMDBReadWrite, "READ_LIB")
        self.write_lib = get_library(self.ac, LibraryType.MODIFIABLE, LMDBReadWrite, "WRITE_LIB")
        # We could also populate the modifiable library here via populate_library(self.write_lib, ...), but we don't need to.

    def teardown(self, num_rows):
        # We could clear the modifiable libraries we used
        # self.write_lib.clear()
        pass

    def time_read(self, num_rows):
        self.read_lib.read(self.sym)

    def peakmem_read(self, num_rows):
        self.read_lib.read(self.sym)

    def time_write(self, num_rows):
        self.write_lib.write(self.sym, self.to_write_df)

    def peakmem_write(self, num_rows):
        self.write_lib.write(self.sym, self.to_write_df)

    def time_read_with_column_float(self, num_rows):
        COLS = ["float2"]
        self.read_lib.read(symbol=self.sym, columns=COLS).data

    def peakmem_read_with_column_float(self, num_rows):
        COLS = ["float2"]
        self.read_lib.read(symbol=self.sym, columns=COLS).data

    def time_read_with_columns_all_types(self, num_rows):
        COLS = ["float2", "string10", "bool", "int64", "uint64"]
        self.read_lib.read(symbol=self.sym, columns=COLS).data

    def peakmem_read_with_columns_all_types(self, num_rows):
        COLS = ["float2", "string10", "bool", "int64", "uint64"]
        self.read_lib.read(symbol=self.sym, columns=COLS).data

    def time_write_staged(self, num_rows):
        lib = self.write_lib
        lib.write(self.sym, self.to_write_df, staged=True)
        lib._nvs.compact_incomplete(self.sym, False, False)

    def peakmem_write_staged(self, num_rows):
        lib = self.write_lib
        lib.write(self.sym, self.to_write_df, staged=True)
        lib._nvs.compact_incomplete(self.sym, False, False)

    def time_read_with_date_ranges_last20_percent_rows(self, num_rows):
        self.read_lib.read(symbol=self.sym, date_range=self.last_20).data

    def peakmem_read_with_date_ranges_last20_percent_rows(self, num_rows):
        self.read_lib.read(symbol=self.sym, date_range=self.last_20).data

class AWSReadWrite(LMDBReadWrite):
    """
    This class runs the same general read write tests against AWS S3 storage.
    """
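The subclass body is truncated in this view. Under the proposed framework it would presumably only need to switch the storage and provide its own setup_cache, since ASV runs an inherited setup_cache once for all classes that share it; this is a sketch under that assumption:

class AWSReadWrite(LMDBReadWrite):
    storage = Storage.AMAZON

    def setup_cache(self):
        # Redefined per class so the persistent library is populated
        # against this class's storage and library names.
        ac = get_arctic_client(AWSReadWrite.storage)
        population_policy = PerRowsPopulationPolicy(AWSReadWrite.params)
        populate_persistent_library_if_missing(ac, AWSReadWrite, "READ_LIB", population_policy)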
Reviewer comment: I think that both versions are OK. The original version's main goal was to separate the creation structure from the ASV class. That needed to be done because of the persistence library: only this way can the code for generating the environment be safely tested without making changes to the tests. You fix the logic for generating the structure of objects outside of the ASV tests, then you go to the ASV tests and use it wherever you want. So the class, although monolithic, has this main function. That makes the setup code a little harder to understand, but easy to reuse outside of ASV, since it does not follow ASV guidelines. But the benchmarks exist mainly for the tests, not for the setup structure, and once developed, the structure would be unwise to change because of the loss of benchmark history. The proposed version indeed makes things clearer at first glance during the review process, as the code will primarily live inside the ASV test. Having more decomposed objects would perhaps also increase reusability. Still, there are things missing from the model, and the final solution will be thicker and perhaps again not quite clean here and there once the missing pieces are added. So for me it is OK to move to the new way of constructing tests; I just have to say that as this approach is adapted to different benchmarks, its cleanness might suffer.