Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Sorr task918 add unit test for split universe #996

Merged
merged 20 commits into from
Jun 6, 2024
Merged
Changes from 13 commits
Commits
Show all changes
20 commits
Select commit Hold shift + click to select a range
2e89757
Added test for _split_universe()
smitpatel49 May 21, 2024
6476255
update1
smitpatel49 May 21, 2024
2abb4e9
requirements
smitpatel49 May 21, 2024
d8fabff
Merge branch 'master' into SorrTask918_Add_unit_test_for_split_universe
smitpatel49 May 21, 2024
49aa798
Added the requested test
smitpatel49 May 24, 2024
e4b6acc
Updated according to the requested changes
smitpatel49 May 29, 2024
04f351a
Merge branch 'master' into SorrTask918_Add_unit_test_for_split_universe
smitpatel49 May 30, 2024
886f3f6
Corrected a minor mistake
smitpatel49 May 31, 2024
1a3d24e
Merge branch 'master' into SorrTask918_Add_unit_test_for_split_universe
smitpatel49 May 31, 2024
4a0c9ab
Merge branch 'SorrTask918_Add_unit_test_for_split_universe' of https:…
smitpatel49 May 31, 2024
796234d
Updated assertRaises explanation
smitpatel49 May 31, 2024
efb5bf9
Merge branch 'master' into SorrTask918_Add_unit_test_for_split_universe
smitpatel49 May 31, 2024
501c886
Updated according to the review
smitpatel49 May 31, 2024
fd319b2
Merge branch 'master' into SorrTask918_Add_unit_test_for_split_universe
smitpatel49 Jun 3, 2024
e61cd37
Merge branch 'master' into SorrTask918_Add_unit_test_for_split_universe
smitpatel49 Jun 3, 2024
350496f
Added a test and made changes according to the review
smitpatel49 Jun 3, 2024
523517f
Merge branch 'SorrTask918_Add_unit_test_for_split_universe' of https:…
smitpatel49 Jun 3, 2024
d7c0961
Added the missing code
smitpatel49 Jun 5, 2024
b681c5f
Fixed the typo and removed one test
smitpatel49 Jun 6, 2024
401c582
Merge branch 'master' into SorrTask918_Add_unit_test_for_split_universe
samarth9008 Jun 6, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
305 changes: 76 additions & 229 deletions im_v2/common/data/extract/test/test_extract_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,11 @@
import os
import unittest.mock as umock
from datetime import datetime, timedelta
from typing import Any, Dict, Optional
from typing import Any, Dict, List, Optional

import pandas as pd
import pytest

import helpers.hdatetime as hdateti
import helpers.henv as henv
import helpers.hmoto as hmoto
import helpers.hpandas as hpandas
Expand Down Expand Up @@ -820,6 +819,81 @@ def test_download_and_resample_bid_ask_data(self) -> None:
self.check_resampler()


class TestSplitUniverse(hunitest.TestCase):
smitpatel49 marked this conversation as resolved.
Show resolved Hide resolved
"""
Test that `_split_universe()` works correctly.
"""

def helper(self, group_size: int, universe_part: int) -> List:
"""
Helper function to get the actual output.
smitpatel49 marked this conversation as resolved.
Show resolved Hide resolved
"""
universe = [
"ALICE_USDT",
"GALA_USDT",
"FLOW_USDT",
"HBAR_USDT",
"INJ_USDT",
"NEAR_USDT",
]
actual_output = imvcdeexut._split_universe(
universe, group_size, universe_part
)
return actual_output

def test1(self) -> None:
"""
Test if both `group_size` and `universe_part` are 0.
"""
smitpatel49 marked this conversation as resolved.
Show resolved Hide resolved
actual_str = str(self.helper(0, 0))
expected_str = r"""
[]
"""
smitpatel49 marked this conversation as resolved.
Show resolved Hide resolved
self.assert_equal(actual_str, expected_str, fuzzy_match=True)
smitpatel49 marked this conversation as resolved.
Show resolved Hide resolved

def test2(self) -> None:
"""
Test if `group_size` is 6 and `universe_part` is 1.
"""
actual_str = str(self.helper(6, 1))
expected_str = r"""
['ALICE_USDT', 'GALA_USDT', 'FLOW_USDT', 'HBAR_USDT', 'INJ_USDT', 'NEAR_USDT']
"""
self.assert_equal(actual_str, expected_str, fuzzy_match=True)
smitpatel49 marked this conversation as resolved.
Show resolved Hide resolved

def test3(self) -> None:
"""
Test if `group_size` is 6 and `universe_part` is 2.
"""
actual_str = str(self.helper(6, 2))
expected_str = r"""
[]
"""
self.assert_equal(actual_str, expected_str, fuzzy_match=True)

def test4(self) -> None:
"""
Test to check for `group_size` 4 and `universe_part` 2.
"""
actual_str = str(self.helper(4, 2))
expected_str = r"""
['INJ_USDT', 'NEAR_USDT']
"""
self.assert_equal(actual_str, expected_str, fuzzy_match=True)

def test5(self) -> None:
"""
Test if the error is raised if no such part exists.
"""
smitpatel49 marked this conversation as resolved.
Show resolved Hide resolved
with self.assertRaises(RuntimeError) as cm:
self.helper(6, 6)
actual_str = str(cm.exception)
expected_str = r"""
Universe does not have 6 parts of 6 pairs. It has 6 symbols.
"""
self.assert_equal(actual_str, expected_str, fuzzy_match=True)


@pytest.mark.requires_ck_infra
@pytest.mark.requires_aws
@pytest.mark.skipif(
Expand Down Expand Up @@ -1746,230 +1820,3 @@ def test_resample_rt_bid_ask_data_periodically(
1 1709281440000 3384.71 1.617 3384.72 109.136 3384.715 0.005 -4.212022 3384.4 34.431 3384.41 48.920 3384.405 0.005 -0.351229 3384.71 34.431 3384.72 129.577 3384.715 0.005 -0.351229 3384.4 0.610 3384.41 48.920 3384.405 0.005 -5.358572 3384.667500 3.106187 3384.677500 104.250812 3384.672500 0.005 -4.484592 0.0579 0.0 343.623874 323.847900 binance ETH_USDT 1 2024-02-20 18:01:01+00:00
"""
self.check_df_output(resampled_df2, None, None, None, exp_df2)
smitpatel49 marked this conversation as resolved.
Show resolved Hide resolved


class TestDownloadRealtimeForOneExchangePeriodically1(
imvcddbut.TestImDbHelper, hunitest.TestCase
):
"""
Testing invariants in OHLCV, See
`docs/datapull/ck.data_pipeline.explanation.md` .
"""

@classmethod
def get_id(cls) -> int:
return hash(cls.__name__) % 10000

@pytest.mark.slow("~10 seconds.")
def test_download_websocket_ohlcv_futures1(
self,
) -> None:
"""
Test the correct download of OHLCV data with consideration for
incomplete bars. Invariant #1.
"""
# Data received from exchange in second iteration.
next_data = {}
next_data["ohlcv"] = [
[1695120600000, 1.159, 1.159, 1.157, 1.158, 127432.0],
[1695120660000, 1.159, 1.159, 1.167, 1.158, 137432.0],
]
next_data["currency_pair"] = "ETH_USDT"
next_data["end_download_timestamp"] = str(
hdateti.convert_unix_epoch_to_timestamp(1695120660000 + 59000)
)
expected = r"""
id timestamp open high low close volume currency_pair exchange_id end_download_timestamp knowledge_timestamp
0 1 1695120600000 1.159 1.159 1.157 1.158 127432.0 ETH_USDT binance 2023-09-19 10:51:00+00:00 2023-09-26 16:04:10+00:00
"""
self._test_websocket_data_download(next_data, expected)

@pytest.mark.slow("~10 seconds.")
def test_download_websocket_ohlcv_futures2(
self,
) -> None:
"""
Test the correct download of OHLCV data with consideration for complete
bars.
"""
# Data received from exchange in second iteration.
next_data = {}
next_data["ohlcv"] = [
[1695120600000, 1.159, 1.159, 1.157, 1.158, 127432.0],
[1695120660000, 1.159, 1.159, 1.167, 1.158, 137432.0],
]
next_data["currency_pair"] = "ETH_USDT"
next_data["end_download_timestamp"] = str(
hdateti.convert_unix_epoch_to_timestamp(1695120660000 + 60000)
)
expected = r"""
id timestamp open high low close volume currency_pair exchange_id end_download_timestamp knowledge_timestamp
0 1 1695120600000 1.159 1.159 1.157 1.158 127432.0 ETH_USDT binance 2023-09-19 10:51:00+00:00 2023-09-26 16:04:10+00:00
1 2 1695120660000 1.159 1.159 1.167 1.158 137432.0 ETH_USDT binance 2023-09-19 10:52:00+00:00 2023-09-26 16:04:10+00:00
"""
self._test_websocket_data_download(next_data, expected)

@pytest.mark.slow("~10 seconds.")
def test_download_websocket_ohlcv_futures3(
self,
) -> None:
"""
Test the correct download of OHLCV data with backfilling.
Invariant #2.
"""
# Data received from exchange in second iteration.
next_data = {}
next_data["ohlcv"] = [
[1695120540000, 1.159, 1.159, 1.127, 1.158, 117432.0],
[1695120600000, 1.159, 1.159, 1.157, 1.158, 127432.0],
[1695120660000, 1.159, 1.159, 1.167, 1.158, 137432.0],
]
next_data["currency_pair"] = "ETH_USDT"
next_data["end_download_timestamp"] = str(
hdateti.convert_unix_epoch_to_timestamp(1695120660000 + 60000)
)
expected = r"""
id timestamp open high low close volume currency_pair exchange_id end_download_timestamp knowledge_timestamp
0 1 1695120600000 1.159 1.159 1.157 1.158 127432.0 ETH_USDT binance 2023-09-19 10:51:00+00:00 2023-09-26 16:04:10+00:00
1 2 1695120540000 1.159 1.159 1.127 1.158 117432.0 ETH_USDT binance 2023-09-19 10:52:00+00:00 2023-09-26 16:04:10+00:00
2 4 1695120660000 1.159 1.159 1.167 1.158 137432.0 ETH_USDT binance 2023-09-19 10:52:00+00:00 2023-09-26 16:04:10+00:00
"""
self._test_websocket_data_download(next_data, expected)

@pytest.mark.slow("~10 seconds.")
def test_download_websocket_ohlcv_futures4(
self,
) -> None:
"""
Test the correct download of OHLCV data with differing duplicates.

Exchange platforms construct OHLCV bars based on ongoing trade
activity. Behind the scenes, exchanges maintain an internal
queue to store all trade data. When OHLCV data is requested,
bars are sampled from the available trades in this queue.
However, over time, the queue removes older trade data, and
consequently, OHLCV bars for the same timestamps may have
different values due to changes in the underlying trade data.
"""
# Data received from exchange in second iteration.
next_data = {}
next_data["ohlcv"] = [
[1695120600000, 1.150, 1.159, 1.157, 1.158, 107432.0],
[1695120660000, 1.159, 1.159, 1.167, 1.158, 137432.0],
]
next_data["currency_pair"] = "ETH_USDT"
next_data["end_download_timestamp"] = str(
hdateti.convert_unix_epoch_to_timestamp(1695120660000 + 60000)
)
expected = r"""
id timestamp open high low close volume currency_pair exchange_id end_download_timestamp knowledge_timestamp
0 1 1695120600000 1.159 1.159 1.157 1.158 127432.0 ETH_USDT binance 2023-09-19 10:51:00+00:00 2023-09-26 16:04:10+00:00
1 2 1695120660000 1.159 1.159 1.167 1.158 137432.0 ETH_USDT binance 2023-09-19 10:52:00+00:00 2023-09-26 16:04:10+00:00
"""
self._test_websocket_data_download(next_data, expected)

@staticmethod
def _get_argument() -> Dict[str, str]:
"""
Prepare argumnets.
"""
# Amount of downloads depends on the start time and stop time.
current_time = datetime.now()
start_time = current_time + timedelta(minutes=0, seconds=1)
stop_time = current_time + timedelta(minutes=0, seconds=3)
kwargs = {
"data_type": "ohlcv",
"exchange_id": "binance",
"universe": "v7.3",
"db_stage": "test",
"contract_type": "futures",
"vendor": "ccxt",
"db_table": "ccxt_ohlcv_futures",
"aws_profile": "ck",
"start_time": f"{start_time}",
"stop_time": f"{stop_time}",
"method": "websocket",
"download_mode": "realtime",
"downloading_entity": "manual",
"action_tag": "downloaded_200ms",
"data_format": "postgres",
"log_level": "INFO",
"websocket_data_buffer_size": None,
"db_saving_mode": "on_buffer_full",
"bid_ask_depth": 10,
"ohlcv_download_method": "from_exchange",
"universe_part": [1, 1],
}
return kwargs

@staticmethod
def _get_ohlcvs_periodical_data() -> Dict[str, Dict[str, Any]]:
"""
Get mock data for the OHLCV method.

:return: mock data
"""
data = {}
data["ohlcv"] = [
[1695120600000, 1.159, 1.159, 1.157, 1.158, 127432.0],
]
data["currency_pair"] = "ETH_USDT"
# Mocking download timestamp by adding 1 minute to show OHLCV bar is complete.
data["end_download_timestamp"] = str(
hdateti.convert_unix_epoch_to_timestamp(1695120600000 + 60000)
)
return data

def _test_websocket_data_download(
self, data: pd.DataFrame, expected: str
) -> pd.DataFrame:
"""
Test data download.

:param data: mocked data from exchange in second iteration
:param expected: Expected output from the method
:return: downloaded data
"""
# Data we get from exchange in the first iteration
mocked_data = self._get_ohlcvs_periodical_data()
# Create the database.
cursor = self.connection.cursor()
# Remove table if already exists.
cursor.execute("DROP TABLE IF EXISTS ccxt_ohlcv_futures;")
# Get query to create the `ccxt_ohlcv_futures` database.
create_db_query = imvccdbut.get_ccxt_ohlcv_futures_create_table_query()
cursor.execute(create_db_query)
# Prepare inputs.
args = self._get_argument()
with umock.patch.object(imvcdexex, "CcxtExtractor") as mock_extractor:
# Tests use special connection params, so we mock the module function.
mock_instance = umock.AsyncMock()
mock_instance.vendor = "CCXT"
mock_instance.sleep = lambda *args, **kwargs: asyncio.sleep(1)
mock_instance.download_websocket_data = umock.MagicMock(
side_effect=[mocked_data, data], return_value=None
)
mock_extractor.return_value = mock_instance
with umock.patch.object(
imvcdeexut, "_subscribe_to_websocket_data"
), umock.patch.object(
imvcdeexut.hdateti,
"get_current_time",
return_value=pd.Timestamp("2023-09-26 16:04:10+00:00"),
), umock.patch.object(
imvcdeexut.imvcddbut.DbConnectionManager,
"get_connection",
) as mock_get_connection:
mock_get_connection.return_value = self.connection
# Run the sctipt.
extractor = imvcdexex.CcxtExtractor()
coroutine = imvcdeexut._download_websocket_realtime_for_one_exchange_periodically(
args, extractor
)
asyncio.run(coroutine)
# Get downloaded data.
get_data_query = f"SELECT * FROM ccxt_ohlcv_futures;"
data = hsql.execute_query_to_df(self.connection, get_data_query)
# Check downloaded data.
actual = hpandas.df_to_str(data, num_rows=None)
self.assert_equal(actual, expected, fuzzy_match=True)
Loading