AmpTask1786_Integrate_20240513 #981

Merged: 2 commits, May 13, 2024
8 changes: 8 additions & 0 deletions core/config/config_.py
@@ -37,6 +37,11 @@
# _LOG.debug = lambda *_: 0


# Set config version.
# See `docs/kaizenflow/ck.system_config.explanation.md` for detailed info.
CONFIG_VERSION = "v3"


# Placeholder value used in configs when configs are built in multiple phases.
DUMMY = "__DUMMY__"

@@ -996,6 +1001,9 @@ def save_to_file(self, log_dir: str, tag: str) -> None:

        :param tag: basename of the files to save (e.g., "system_config.input")
        """
        # 0) Save txt file with config version.
        file_name = os.path.join(log_dir, "config_version.txt")
        hio.to_file(file_name, CONFIG_VERSION)
        # 1) As a string.
        file_name = os.path.join(log_dir, f"{tag}.txt")
        hio.to_file(file_name, repr(self))
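Reviewer note: the marker file written above lets downstream code recover a run's config version from its log directory. A minimal sketch, assuming the helper name `read_config_version`, the "v2" fallback for pre-marker logs, and that `hio.from_file` mirrors the `hio.to_file` call used above (none of these are part of this PR):

```python
import os

import helpers.hio as hio


def read_config_version(log_dir: str) -> str:
    # Hypothetical helper: recover the version written by `save_to_file()`.
    # Logs produced before this change have no marker file, so fall back to
    # the pre-existing config format.
    file_name = os.path.join(log_dir, "config_version.txt")
    if not os.path.exists(file_name):
        return "v2"
    return hio.from_file(file_name).strip()
```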
9 changes: 5 additions & 4 deletions core/config/config_utils.py
@@ -139,11 +139,12 @@ def load_config_from_pickle(config_path: str) -> cconconf.Config:
    hdbg.dassert_path_exists(config_path)
    _LOG.debug("Reading config from %s", config_path)
    config = hpickle.from_pickle(config_path)
    # TODO(Dan): `config` should be a `cconconf.Config` but previously it
    # used to be dict, so keeping both for back-compatibility, see CmTask6627.
    if isinstance(config, dict):
        _LOG.warning("Found Config v1.0 flow: converting")
        config = cconconf.Config.from_dict(config)
        # _LOG.warning("Found Config v1.0 flow: converting")
        # config = cconconf.Config.from_dict(config)
        raise TypeError(
            f"Found Config v1.0 flow at '{config_path}'. Deprecated in CmTask7794."
        )
    return config


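Reviewer note: callers that still hold dict-based (Config v1.0) pickles now get a `TypeError` instead of an implicit conversion. A migration shim they could use is sketched below; the helper itself is hypothetical, and the import paths assume the `cconconf`/`hpickle` aliases visible in the diff above:

```python
import core.config.config_ as cconconf
import helpers.hpickle as hpickle


def load_legacy_config(config_path: str) -> "cconconf.Config":
    # Hypothetical migration shim: perform the dict -> Config conversion
    # that `load_config_from_pickle()` no longer does.
    config = hpickle.from_pickle(config_path)
    if isinstance(config, dict):
        config = cconconf.Config.from_dict(config)
    return config
```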
6 changes: 3 additions & 3 deletions core/config/test/test_config.py
@@ -1878,9 +1878,9 @@ def helper(self, value: Optional[str]) -> None:
            log_dir, f"{tag}.all_values_picklable.pkl"
        )
        # Check that file paths exist.
        hdbg.dassert_path_exists(expected_txt_path)
        hdbg.dassert_path_exists(expected_pkl_str_path)
        hdbg.dassert_path_exists(expected_pkl_path)
        self.assertTrue(os.path.exists(expected_txt_path))
        self.assertTrue(os.path.exists(expected_pkl_str_path))
        self.assertTrue(os.path.exists(expected_pkl_path))

    def test1(self) -> None:
        """
1 change: 1 addition & 0 deletions core/finance/prediction_processing.py
@@ -37,6 +37,7 @@ def compute_bar_start_timestamps(
    return srs


# TODO(Paul): Add unit tests.
def compute_epoch(
    data: Union[pd.Series, pd.DataFrame], *, unit: Optional[str] = None
) -> Union[pd.Series, pd.DataFrame]:
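Reviewer note: a sketch of the unit test the TODO asks for. It assumes `compute_epoch` maps each timestamp of the input's index to its epoch count in the requested unit; the unit name `"minute"` and the expected values follow from that assumption and are not confirmed by this diff:

```python
import pandas as pd

import core.finance.prediction_processing as cfprpro  # alias assumed


def test_compute_epoch() -> None:
    # Three bars starting exactly at the Unix epoch, one minute apart.
    idx = pd.date_range("1970-01-01", periods=3, freq="min")
    srs = pd.Series([1.0, 2.0, 3.0], index=idx)
    actual = cfprpro.compute_epoch(srs, unit="minute")
    # Minutes elapsed since the Unix epoch for each index timestamp.
    expected = pd.Series([0, 1, 2], index=idx)
    pd.testing.assert_series_equal(
        actual, expected, check_dtype=False, check_names=False
    )
```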
1 change: 1 addition & 0 deletions core/signal_processing/misc_transformations.py
@@ -193,6 +193,7 @@ def skip_apply_func(
    return df


# TODO(Paul): Add test coverage.
def sign_normalize(
    signal: Union[pd.DataFrame, pd.Series],
    atol: float = 0,
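Reviewer note: the body of `sign_normalize` is not shown in this hunk. The sketch below illustrates the usual semantics of sign normalization with an absolute tolerance, which is an assumption about this function rather than its actual code: values within `atol` of zero map to 0, everything else to its sign (±1).

```python
import numpy as np
import pandas as pd

signal = pd.Series([-2.0, -0.001, 0.0, 0.001, 3.0])
atol = 0.01
# Zero out near-zero values, keep only the sign elsewhere.
normalized = np.sign(signal).where(signal.abs() > atol, 0.0)
print(normalized.tolist())  # [-1.0, 0.0, 0.0, 0.0, 1.0]
```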
6 changes: 3 additions & 3 deletions data_schema/changelog.txt
@@ -4,11 +4,11 @@ v3

v3
- 2022-11-30
- Initial version which supports Sorrentum protocol
- based on Sorrentum protocol v0.1
- Initial version which supports KaizenFlow protocol
- based on KaizenFlow protocol v0.1
- prod data stored in s3://cryptokaizen-data/v3
- values for universe version and dataset versions are hardcoded, will be addressed later

v2
- Before Sorrentum protocol was put in place
- Before KaizenFlow protocol was put in place
- prod data stored in s3://cryptokaizen-data/reorg
101 changes: 63 additions & 38 deletions data_schema/dataset_schema_utils.py
@@ -4,6 +4,10 @@
import data_schema.dataset_schema_utils as dsdascut
"""

# TODO(Juraj): At high level this module essentially performs the same thing as
# im_v2/common/universe/universe.py -> try to extract the common logic
# according to DRY principle.

import copy
import logging
import os
@@ -15,22 +19,26 @@
import helpers.hstring as hstring
import im_v2.common.universe.universe as imvcounun

# TODO(Juraj): At high level this module essentially performs the same thing as
# im_v2/common/universe/universe.py -> try to extract the common logic
# according to DRY principle.
_LOG = logging.getLogger(__name__)


# #############################################################################
# Retrieve schema.
# #############################################################################


def _get_dataset_schema_file_path(*, version: Optional[str] = None) -> str:
    """
    Get dataset schema file path based on version.
    Get file path of the dataset schema based on the version.

    :param version: dataset schema version (e.g. "v01"). If None it uses
        the latest version available
    :return: file path to the dataset schema file corresponding to the
        specified version
    """
    # TODO(Juraj): Implement dynamic version resolving and remove hardcoded logic.
    if version is not None:
        raise ValueError("Dynamic custom version not supported.")
    ds_file_path = os.path.join(
        hgit.get_amp_abs_path(),
        "data_schema/dataset_schema_versions/dataset_schema_v3.json",
@@ -42,30 +50,20 @@ def _get_dataset_schema_file_path(*, version: Optional[str] = None) -> str:

def get_dataset_schema(*, version: Optional[str] = None) -> Dict[str, Any]:
    """
    Get dataset schema for a specified version, if version is None fetch the
    Get dataset schema for a specified version, if version is `None` fetch the
    latest version of the schema.

    :param version: dataset schema version (e.g. "v01") to load. If
        None, the latest version is loaded.
    :return: dataset schema as a nested dictionary, e.g.
        ```
        {
            "dataset_signature":
                "download_mode.downloading_entity.action_tag",
            "token_separator_character": ".",
            "allowed_values": {
                "download_mode": ["bulk", "periodic_daily"],
                "downloading_entity": ["airflow", "manual"],
                "action_tag": ["downloaded_1sec", "resampled_1min"]
            }
            "version": "v3"
        }
        ```
    :return: dataset schema as a nested dictionary, e.g. ``` {
        "dataset_signature":
        "download_mode.downloading_entity.action_tag",
        "token_separator_character": ".", "allowed_values": {
        "download_mode": ["bulk", "periodic_daily"],
        "downloading_entity": ["airflow", "manual"], "action_tag":
        ["downloaded_1sec", "resampled_1min"] } "version": "v3" } ```
    """
    # TODO(Juraj): Implement loading custom version of schema.
    if version is not None:
        raise ValueError("Dynamic custom version not supported.")
    # Load dataset schema as json.
    # Load dataset schema as JSON.
    ds_file_path = _get_dataset_schema_file_path()
    dataset_schema = hio.from_json(ds_file_path)
    # Resolve version name.
@@ -79,13 +77,18 @@
    return dataset_schema
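Reviewer note: a usage sketch grounded in the docstring above, using the module's documented `dsdascut` alias:

```python
import data_schema.dataset_schema_utils as dsdascut

# Load the latest schema and inspect the fields shown in the docstring.
schema = dsdascut.get_dataset_schema()
print(schema["version"])  # e.g. "v3"
print(schema["token_separator_character"])  # "."
print(schema["allowed_values"]["download_mode"])  # e.g. ["bulk", "periodic_daily"]
```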


# #############################################################################
# Validate schema.
# #############################################################################


def _validate_dataset_signature_syntax(
    signature: str, dataset_schema: Dict[str, Any]
) -> bool:
    """
    Validate syntax of a dataset signature based on provided schema.

    For example refer to the docstring of
    data_schema/validate_dataset_signature.py

    :param signature: dataset signature to validate
:param signature: dataset signature to validate
@@ -113,7 +116,7 @@ def _validate_dataset_signature_semantics(
"""
Validate semantics of a dataset signature based on provided schema.

For example refer to docstirng of
For example refer to docstring of
data_schema/validate_dataset_signature.py

:param signature: dataset signature to validate
@@ -168,20 +171,20 @@ def validate_dataset_signature(
    schema.

    For example refer to the docstring of
    data_schema/validate_dataset_signature.py
    `data_schema/validate_dataset_signature.py`

    :param signature: dataset signature to validate
    :param dataset_schema: dataset schema to validate against
    :return: True if the signature is syntactically AND semantically
        correct, False otherwise
    """
    # TODO(Juraj): Ideally this function should
    # encapsulate a final state machine-like validator
    # but for now this more primitive check is good enough.
    # TODO(Juraj): Ideally this function should encapsulate a final state
    # machine-like validator but for now this more primitive check is good
    # enough.
    # Check syntax of the signature.
    # Currently the semantic check implicitly decides the syntactic check
    # as well, but later down the line the syntax/semantics
    # distinction might make sense.
    # Currently the semantic check implicitly decides the syntactic check as
    # well, but later down the line the syntax/semantics distinction might make
    # sense.
    is_correct_signature = _validate_dataset_signature_syntax(
        signature, dataset_schema
    )
@@ -196,6 +199,11 @@
    return is_correct_signature
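Reviewer note: a validation sketch using the example signature that appears in the `parse_dataset_signature_to_args` docstring below; whether it passes depends on the deployed schema file, so treat the assert as illustrative:

```python
import data_schema.dataset_schema_utils as dsdascut

schema = dsdascut.get_dataset_schema()
signature = (
    "bulk.airflow.resampled_1min.parquet.bid_ask.spot.v3"
    ".crypto_chassis.binance.v1_0_0"
)
# True only if the signature is syntactically AND semantically correct.
assert dsdascut.validate_dataset_signature(signature, schema)
```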


# #############################################################################
#
# #############################################################################


def _build_dataset_signature_from_args(
    args: Dict[str, Any], dataset_schema: Dict[str, Any]
) -> str:
@@ -210,8 +218,8 @@ def _build_dataset_signature_from_args(
    schema_signature_list = dataset_schema["dataset_signature"].split(
        token_separator_char
    )
    # Replace schema signature identifiers with the actual values
    # if an argument is missing and exception is raised.
    # Replace schema signature identifiers with the actual values; if an
    # argument is missing, an exception is raised.
    try:
        dataset_signature = list(map(lambda x: args[x], schema_signature_list))
    except KeyError as e:
@@ -232,6 +240,7 @@ def parse_dataset_signature_to_args(
        e.g. `bulk.airflow.resampled_1min.parquet.bid_ask.spot.v3.crypto_chassis.binance.v1_0_0`
    :param dataset_schema: dataset schema to parse against
    :return: signature arguments mapping, e.g.
        ```
        {
            "download_mode": "bulk",
            "downloading_entity": "airflow",
@@ -244,6 +253,7 @@ def parse_dataset_signature_to_args(
"exchange_id": "binance",
"version": "v1_0_0"
}
```
"""
hdbg.dassert_eq(validate_dataset_signature(signature, dataset_schema), True)
token_separator = dataset_schema["token_separator_character"]
@@ -253,6 +263,11 @@
    return args
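Reviewer note: a parsing sketch that mirrors the docstring example above (the printed values come from that example mapping):

```python
import data_schema.dataset_schema_utils as dsdascut

schema = dsdascut.get_dataset_schema()
signature = (
    "bulk.airflow.resampled_1min.parquet.bid_ask.spot.v3"
    ".crypto_chassis.binance.v1_0_0"
)
# Split the signature into its named tokens per the schema.
args = dsdascut.parse_dataset_signature_to_args(signature, schema)
print(args["vendor"], args["exchange_id"])  # crypto_chassis binance
```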


# #############################################################################
# S3 file interface.
# #############################################################################


def get_vendor_from_s3_path(s3_path: str) -> str:
    """
    Extract vendor from S3 path.
@@ -279,8 +294,13 @@ def build_s3_dataset_path_from_args(
    raised.

    :param s3_base_path: Base S3 path to use, e.g.
        's3://cryptokaizen-data'
    :param args: arguments to build the dataset signature from
    :param version: version of the dataset schema to use, if None,
        latest version
@@ -289,8 +309,8 @@
    s3_path = s3_base_path
    schema = get_dataset_schema(version=version)
    s3_path = os.path.join(s3_path, schema["version"])
    # TODO(Juraj): If preprocessing operations pile up,
    # divide them into separate functions.
    # TODO(Juraj): If preprocessing operations pile up, divide them into
    # separate functions.
    if _args.get("universe"):
        _args["universe"] = _args["universe"].replace(".", "_")
    dataset_signature = _build_dataset_signature_from_args(_args, schema)
@@ -305,6 +325,11 @@
return s3_path


# #############################################################################
# IM interface.
# #############################################################################


def get_im_db_table_name_from_signature(
    signature: str, dataset_schema: Dict[str, Any]
) -> str:
17 changes: 9 additions & 8 deletions data_schema/validate_dataset_signature.py
@@ -1,16 +1,17 @@
#!/usr/bin/env python
"""
Perform syntactic and semantic validation of a specified dataset signature.
Signature is validated by the latest dataset schema version. Syntax validation
checks if the signature is not malformed.

- If the schema specifies dataset signature as {data_type}.{asset_type},
then ohlcv.futures is a valid signature, but ohlcv-futures is not.
Semantic validation checks if the signature tokens are correct.
Signature is validated by the latest dataset schema version.

Syntax validation checks if the signature is not malformed.
- If the schema specifies dataset signature as `{data_type}.{asset_type}`, then
`ohlcv.futures` is a valid signature, but `ohlcv-futures` is not.

- If the schema specifies allowed values for data_type = ["ohlcv", "bid_ask"],
then for dataset signature {data_type}.{asset_type} ohlcv.futures is a valid
signature, but bidask.futures is not.
Semantic validation checks if the signature tokens are correct.
- If the schema specifies allowed values for `data_type = ["ohlcv", "bid_ask"]`,
then for dataset signature `{data_type}.{asset_type}` `ohlcv.futures` is a valid
signature, but `bidask.futures` is not.

Use as:
> data_schema/validate_dataset_signature.py \