Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

AmpTask1786_Integrate_20240804 #1110

Merged
merged 3 commits into from
Aug 4, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 12 additions & 2 deletions core/config/config_builder.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,12 +66,18 @@ def get_config_list_from_builder(config_builder: str) -> ccocolis.ConfigList:


def get_notebook_config(
*,
config_file_path: Optional[str] = None,
# TODO(Grisha): it's more general to do `dst_region: Optional[str] = None`,
# where `None` means keep the original region and any other value means
# replace the original region with the `dst_region`.
replace_ecs_tokyo: Optional[bool] = False,
) -> Optional[cconconf.Config]:
"""
Get the config from the environment variables or from a file.
:param config_file_path: path to a config file
:param replace_ecs_tokyo: if True replace `ecs_tokyo` to `ecs` in the path
:return: the config or `None` if env vars are not set and `config_file_path`
is None.
"""
Expand All @@ -83,11 +89,15 @@ def get_notebook_config(
"Config env vars not set. Using config from the pickle file: %s",
config_file_path,
)
config_file_path = hdocker.replace_shared_root_path(config_file_path)
config_file_path = hdocker.replace_shared_root_path(
config_file_path, replace_ecs_tokyo=replace_ecs_tokyo
)
config = hpickle.from_pickle(config_file_path)
# To run locally we need to replace path to the shared folder, e.g.,
# `/data/shared` -> `/shared_data`.
config = ccocouti.replace_shared_dir_paths(config)
config = ccocouti.replace_shared_dir_paths(
config, replace_ecs_tokyo=replace_ecs_tokyo
)
else:
config = None
_LOG.warning("No config found, returning None")
Expand Down
58 changes: 55 additions & 3 deletions core/config/config_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import collections
import copy
import logging
import os
import re
from typing import Any, Iterable, List, Optional

Expand All @@ -17,6 +18,7 @@
import helpers.hdbg as hdbg
import helpers.hdict as hdict
import helpers.hdocker as hdocker
import helpers.hio as hio
import helpers.hpickle as hpickle
import helpers.hprint as hprint

Expand Down Expand Up @@ -50,10 +52,14 @@ def configs_to_str(configs: List[cconconf.Config]) -> str:
return res


def replace_shared_dir_paths(config: cconconf.Config) -> cconconf.Config:
def replace_shared_dir_paths(
config: cconconf.Config, *, replace_ecs_tokyo: Optional[bool] = False
) -> cconconf.Config:
"""
Replace all the root paths of the shared data directory in the config.
:param config: config to update
:param replace_ecs_tokyo: if True replace `ecs_tokyo` to `ecs` in the path
:return: updated version of a config
"""
new_config = config.copy()
Expand All @@ -64,10 +70,14 @@ def replace_shared_dir_paths(config: cconconf.Config) -> cconconf.Config:
for key in config.keys():
value = config[key]
if isinstance(value, cconconf.Config):
value = replace_shared_dir_paths(value)
value = replace_shared_dir_paths(
value, replace_ecs_tokyo=replace_ecs_tokyo
)
elif isinstance(value, str):
# Search for file paths among string values only.
value = hdocker.replace_shared_root_path(value)
value = hdocker.replace_shared_root_path(
value, replace_ecs_tokyo=replace_ecs_tokyo
)
else:
# No need to change values other than strings.
pass
Expand Down Expand Up @@ -132,6 +142,48 @@ def sort_config_string(txt: str) -> str:
return chunks


def load_config_from_pickle1(log_dir: str, tag: str) -> cconconf.Config:
    """
    Load config from a pickle file.

    The pickle file name depends on the config version: newer flows (v3+)
    write a `config_version.txt` marker file next to the pickle, while the
    legacy v2 flow does not.

    :param log_dir: path to execution logs
    :param tag: basename of the pickle file (e.g.,
        "system_config.output")
    :return: config object
    :raises TypeError: if the pickle contains a deprecated v1.0
        `dict`-based config
    """
    hdbg.dassert_dir_exists(log_dir)
    # TODO(Grisha): centralize version file name somehow, e.g., move to the `Config` class.
    # Build path to config version file.
    config_version_filename = "config_version.txt"
    config_version_path = os.path.join(log_dir, config_version_filename)
    if os.path.exists(config_version_path):
        # Extract config version from the corresponding file.
        config_version = hio.from_file(config_version_path)
        # TODO(Grisha): centralize file name, e.g., move to the `Config` class.
        # Set file name that corresponds to the extracted config version.
        file_name = f"{tag}.all_values_picklable.pkl"
    else:
        # Only v2 config version has no version file.
        config_version = "v2"
        # Set file name corresponding to v2 config version.
        file_name = f"{tag}.values_as_strings.pkl"
    # Use lazy `%s` formatting, consistent with the other log calls in this
    # module, so the message is built only if the level is enabled.
    _LOG.info("Found Config %s flow", config_version)
    # Get config from file.
    config_path = os.path.join(log_dir, file_name)
    hdbg.dassert_path_exists(config_path)
    _LOG.debug("Reading config from %s", config_path)
    config = hpickle.from_pickle(config_path)
    if isinstance(config, dict):
        # Config v1.0 pickled a plain `dict` instead of a `Config` object;
        # that flow is no longer supported.
        raise TypeError(
            f"Found Config v1.0 flow at '{config_path}'. Deprecated in CmTask7794."
        )
    return config


# TODO(Dan): Replace with `load_config_from_pickle1()` in CmTask7795.
def load_config_from_pickle(config_path: str) -> cconconf.Config:
"""
Load config from pickle file.
Expand Down
92 changes: 80 additions & 12 deletions core/config/test/test_config_utils.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
import argparse
import collections
import os
import unittest.mock as umock
from typing import Any

import pandas as pd

import core.config as cconfig
import helpers.hio as hio
import helpers.hpandas as hpandas
import helpers.hprint as hprint
import helpers.hunit_test as hunitest
Expand Down Expand Up @@ -564,8 +566,9 @@ def test_replace_shared_dir_paths(self) -> None:
"""
# Mock `henv.execute_repo_config_code()` to return a dummy mapping.
mock_mapping = {
"/shared_folder1": "/data/shared1",
"/shared_folder2": "/data/shared2",
"/ecs_tokyo": "/ecs",
"/data/shared1": "/shared_folder1",
"/data/shared2": "/shared_folder2",
}
with umock.patch.object(
cconfig.hdocker.henv,
Expand All @@ -575,26 +578,91 @@ def test_replace_shared_dir_paths(self) -> None:
# Initial Config.
initial_config = cconfig.Config.from_dict(
{
"key1": "/shared_folder1/asset1",
"key2": "/shared_folder2/asset1/item",
"key1": "/data/shared1/asset1",
"key2": "/data/shared2/asset1/item",
"key3": 1,
"key4": 'object("/shared_folder2/asset1/item")',
"key4": 'object("/data/shared2/asset1/item")',
"key5": {
"key5.1": "/shared_folder1/asset1",
"key5.2": "/shared_folder2/asset2",
"key5.1": "/data/shared1/asset1",
"key5.2": "/data/shared2/asset2",
},
"key6": "/data/shared1/ecs_tokyo/some_path",
}
)
actual_config = cconfig.replace_shared_dir_paths(initial_config)
# Check that shared root paths have been replaced.
act = str(actual_config)
exp = """
key1: /data/shared1/asset1
key2: /data/shared2/asset1/item
key1: /shared_folder1/asset1
key2: /shared_folder2/asset1/item
key3: 1
key4: object("/data/shared2/asset1/item")
key4: object("/shared_folder2/asset1/item")
key5:
key5.1: /data/shared1/asset1
key5.2: /data/shared2/asset2
key5.1: /shared_folder1/asset1
key5.2: /shared_folder2/asset2
key6: /shared_folder1/ecs/some_path
"""
self.assert_equal(act, exp, fuzzy_match=True)


# #############################################################################
# Test_load_config_from_pickle1
# #############################################################################


class Test_load_config_from_pickle1(hunitest.TestCase):
    """
    Verify that `load_config_from_pickle1()` restores configs saved under
    different config versions.
    """

    def helper(
        self, expected_config_version: str, expected_signature: str
    ) -> None:
        """
        Save a config to scratch space and check the load round-trip.

        :param expected_config_version: config version under test, e.g. "v2"
        :param expected_signature: expected string representation of the
            loaded config
        """
        # Set config.
        log_dir = self.get_scratch_space()
        tag = "system_config.output"
        # Use the builtin generic `dict` here: `typing.Dict` is not imported
        # in this module, so the previous `Dict[str, Any]` annotation raised
        # `NameError` at runtime.
        nested: dict[str, Any] = {
            "key1": "val",
            "key2": {"key3": {"key4": [1, 2, 3]}},
        }
        config = cconfig.Config.from_dict(nested)
        # Save config and related files.
        config.save_to_file(log_dir, tag)
        # Check config version file for different config versions.
        config_version_path = os.path.join(log_dir, "config_version.txt")
        if expected_config_version == "v2":
            # v2 config version has no file with info about it.
            hio.delete_file(config_version_path)
        else:
            # Check config version.
            actual_config_version = hio.from_file(config_version_path)
            self.assert_equal(actual_config_version, expected_config_version)
        # Load config from the file.
        actual = cconfig.load_config_from_pickle1(log_dir, tag)
        # Check signature.
        actual_signature = str(actual)
        self.assert_equal(actual_signature, expected_signature, fuzzy_match=True)

    def test_v2_config1(self) -> None:
        """
        Check that v2 config is extracted correctly.
        """
        expected_config_version = "v2"
        # Integer values are extracted as strings as expected for v2.
        expected_signature = r"""
        key1: val
        key2:
          key3:
            key4: ['1', '2', '3']
        """
        self.helper(expected_config_version, expected_signature)

    def test_v3_config1(self) -> None:
        """
        Check that v3 config is extracted correctly.
        """
        expected_config_version = "v3"
        # Integer values are extracted in original format as expected for v3.
        expected_signature = r"""
        key1: val
        key2:
          key3:
            key4: [1, 2, 3]
        """
        self.helper(expected_config_version, expected_signature)
26 changes: 12 additions & 14 deletions core/finance/execution.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,8 @@
_LOG = logging.getLogger(__name__)


def _generate_limit_order_price(
# TODO(gp): -> private
def generate_limit_order_price(
df: pd.DataFrame,
bid_col: str,
ask_col: str,
Expand Down Expand Up @@ -242,20 +243,17 @@ def generate_limit_orders_and_estimate_execution(
:param df: datetime-indexed dataframe with price data
:param bid_col: bid col for estimating buy limit order execution
:param ask_col: ask col for estimating sell limit order execution
:param buy_reference_price_col: reference price col for buy limit
orders
:param buy_reference_price_col: reference price col for buy limit orders
:param buy_spread_frac_offset: amount to add to buy reference price
:param sell_reference_price_col: reference price col for sell limit
orders
:param sell_spread_frac_offset: amount to add to sell reference
price
:param subsample_freq: as in `_generate_limit_order_price()`
:param freq_offset: as in `_generate_limit_order_price()`
:param ffill_limit: as in `_generate_limit_order_price()`
:param tick_decimals: as in `_generate_limit_order_price()`
:param sell_reference_price_col: reference price col for sell limit orders
:param sell_spread_frac_offset: amount to add to sell reference price
:param subsample_freq: as in `generate_limit_order_price()`
:param freq_offset: as in `generate_limit_order_price()`
:param ffill_limit: as in `generate_limit_order_price()`
:param tick_decimals: as in `generate_limit_order_price()`
:return: dataframe with limit order price and execution cols
"""
limit_order_prices = _generate_limit_order_price(
limit_order_prices = generate_limit_order_price(
df,
bid_col,
ask_col,
Expand Down Expand Up @@ -341,8 +339,8 @@ def compute_bid_ask_execution_quality(
:param ask_col: top-of-book ask price col
:param buy_trade_price_col: price at which a "buy" was executed
:param sell_trade_price_col: price at which a "sell" was executed
:return: dataframe with several notions of execution quality, in
notional amounts and relative amounts (bps or pct)
:return: dataframe with several notions of execution quality, in notional
amounts and relative amounts (bps or pct)
"""
data = []
# Compute midpoint and spread.
Expand Down
7 changes: 7 additions & 0 deletions core/finance/portfolio_df_processing/slippage.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@
_LOG = logging.getLogger(__name__)


# TODO(Grisha): add a flag for benchmark price. Currently the function is overfit
# to benchmark “arrival price”.
def compute_share_prices_and_slippage(
portfolio_df: pd.DataFrame,
join_output_with_input: bool = False,
Expand Down Expand Up @@ -59,6 +61,11 @@ def compute_share_prices_and_slippage(
else:
# TODO(Paul): Perform checks on indices.
holdings_price_per_share = price_df
# Shift is needed for “arrival price” benchmark but not for TWAP/VWAP benchmarks.
# We label trades executed between T and T+1 as T+1. In this case "arrival price"
# is price at T (shift by 1 lag) and TWAP/VWAP price between T and T+1 is available
# only at T+1 (no shift needed).
holdings_price_per_share = holdings_price_per_share.shift(1)
# We do not expect negative prices.
hdbg.dassert_lte(0, holdings_price_per_share.min().min())
# Compute price per share of trades (using execution reference prices).
Expand Down
2 changes: 1 addition & 1 deletion core/finance/share_quantization.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ def quantize_shares(
quantized_shares = quantized_shares.round(asset_id_to_decimals)
# If series-to-dataframe conversion was performed, undo it here.
if isinstance(shares, pd.Series):
quantized_shares = quantized_shares.squeeze()
quantized_shares = quantized_shares.squeeze(axis=0)
quantized_shares.name = shares.name
_LOG.debug("`quantized_shares`=\n%s", hpandas.df_to_str(quantized_shares))
return quantized_shares
2 changes: 1 addition & 1 deletion core/finance/test/test_execution.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ def get_limit_order_prices(
ffill_limit = 3
tick_decimals = 2
_LOG.debug("data=\n%s", hpandas.df_to_str(data, num_rows=None))
limit_order_prices = cfinexec._generate_limit_order_price(
limit_order_prices = cfinexec.generate_limit_order_price(
data,
bid_col,
ask_col,
Expand Down
Loading
Loading