diff --git a/pdr_backend/lake/gql_data_factory.py b/pdr_backend/lake/gql_data_factory.py index a3619b55a..8ed309874 100644 --- a/pdr_backend/lake/gql_data_factory.py +++ b/pdr_backend/lake/gql_data_factory.py @@ -13,6 +13,10 @@ get_pdr_subscriptions_df, subscriptions_schema, ) +from pdr_backend.lake.table_pdr_payouts import ( + get_pdr_payouts_df, + payouts_schema, +) from pdr_backend.ppss.ppss import PPSS from pdr_backend.subgraph.subgraph_predictions import get_all_contract_ids_by_owner from pdr_backend.util.networkutil import get_sapphire_postfix @@ -42,6 +46,10 @@ def __init__(self, ppss: PPSS): network=network, ) contract_list = [f.lower() for f in contract_list] + + # For debugging + # t_contract_list = [f.lower() for f in contract_list] + # contract_list = [t_contract_list[0], t_contract_list[1]] # configure all tables that will be recorded onto lake self.record_config = { @@ -59,6 +67,13 @@ def __init__(self, ppss: PPSS): "contract_list": contract_list, }, }, + "pdr_payouts": { + "fetch_fn": get_pdr_payouts_df, + "schema": payouts_schema, + "config": { + "contract_list": contract_list, + }, + }, } def get_gql_dfs(self) -> Dict[str, pl.DataFrame]: diff --git a/pdr_backend/lake/table_pdr_payouts.py b/pdr_backend/lake/table_pdr_payouts.py new file mode 100644 index 000000000..c8415b70f --- /dev/null +++ b/pdr_backend/lake/table_pdr_payouts.py @@ -0,0 +1,46 @@ +from typing import Dict +import polars as pl +from enforce_typing import enforce_types +from polars import Int64, Float64, Utf8 + +from pdr_backend.subgraph.subgraph_payout import fetch_payouts +from pdr_backend.lake.plutil import _object_list_to_df +from pdr_backend.util.networkutil import get_sapphire_postfix +from pdr_backend.util.timeutil import ms_to_seconds + +# RAW PAYOUT SCHEMA +payouts_schema = { + "ID": Utf8, + "token": Utf8, + "user": Utf8, + "slot": Int64, + "timestamp": Int64, + "payout": Float64, +} + + +@enforce_types +def get_pdr_payouts_df( + network: str, st_ut: int, fin_ut: int, config: Dict +) -> pl.DataFrame: + """ + @description + Fetch raw payouts from predictoor subgraph + Update function for graphql query, returns raw data + + Transforms ts into ms as required for data factory + """ + network = get_sapphire_postfix(network) + + # fetch payouts + payouts = fetch_payouts( + config["contract_list"], ms_to_seconds(st_ut), ms_to_seconds(fin_ut), 0, network + ) + + if len(payouts) == 0: + print("No payouts to fetch. Exit.") + return pl.DataFrame() + + # convert payouts to df and transform timestamp into ms + payout_df = _object_list_to_df(payouts, payouts_schema) + + return payout_df diff --git a/pdr_backend/lake/test/conftest.py b/pdr_backend/lake/test/conftest.py index 1b9ff717d..a76f4d9b8 100644 --- a/pdr_backend/lake/test/conftest.py +++ b/pdr_backend/lake/test/conftest.py @@ -2,6 +2,7 @@ from pdr_backend.subgraph.prediction import mock_daily_predictions from pdr_backend.subgraph.subscription import mock_subscriptions +from pdr_backend.subgraph.payout import mock_payouts @pytest.fixture() @@ -12,3 +13,8 @@ def sample_daily_predictions(): @pytest.fixture() def sample_subscriptions(): return mock_subscriptions() + + +@pytest.fixture() +def sample_payouts(): + return mock_payouts() diff --git a/pdr_backend/lake/test/test_table_pdr_payouts.py b/pdr_backend/lake/test/test_table_pdr_payouts.py new file mode 100644 index 000000000..4ac67af40 --- /dev/null +++ b/pdr_backend/lake/test/test_table_pdr_payouts.py @@ -0,0 +1,149 @@ +from typing import List +from unittest.mock import patch + +import polars as pl +from enforce_typing import enforce_types + +from pdr_backend.lake.table_pdr_payouts import payouts_schema +from pdr_backend.lake.test.resources import _gql_data_factory, _filter_gql_config +from pdr_backend.ppss.web3_pp import del_network_override +from pdr_backend.util.timeutil import timestr_to_ut + +# ==================================================================== +pdr_payouts_record = "pdr_payouts" + + +@patch("pdr_backend.lake.table_pdr_payouts.fetch_payouts") +@patch("pdr_backend.lake.gql_data_factory.get_all_contract_ids_by_owner") +def test_update_payout_gql_proxy( + mock_get_all_contract_ids_by_owner, + mock_fetch_payouts, + tmpdir, + sample_payouts, + monkeypatch, +): + del_network_override(monkeypatch) + mock_get_all_contract_ids_by_owner.return_value = ["0x123"] + _test_update_payout_gql( + mock_fetch_payouts, + tmpdir, + sample_payouts, + "2023-01-01_0:00", # earlier date + "2024-01-04_17:00", # later date + n_items=6, + ) + + +@enforce_types +def _test_update_payout_gql( + mock_fetch_payouts, + tmpdir, + sample_payouts, + st_timestr: str, + fin_timestr: str, + n_items: int, +): + """ + @arguments + n_items -- expected # payouts. Typically int. If '>1K', expect >1000 + """ + + _, gql_data_factory = _gql_data_factory( + tmpdir, + "binanceus ETH/USDT h 5m", + st_timestr, + fin_timestr, + ) + + # Update subscriptions record only + default_config = gql_data_factory.record_config + gql_data_factory.record_config = _filter_gql_config( + gql_data_factory.record_config, pdr_payouts_record + ) + + # setup: filename + # everything will be inside the gql folder + filename = gql_data_factory._parquet_filename(pdr_payouts_record) + assert ".parquet" in filename + + fin_ut = timestr_to_ut(fin_timestr) + st_ut = gql_data_factory._calc_start_ut(filename) + + # calculate ms locally so we can filter raw subscriptions + st_ut_sec = st_ut // 1000 + fin_ut_sec = fin_ut // 1000 + + mock_fetch_payouts.return_value = sample_payouts + + # work 1: update parquet + gql_data_factory._update(fin_ut) + + # assert params + mock_fetch_payouts.assert_called_with( + ["0x123"], + st_ut_sec, + fin_ut_sec, + 0, + "mainnet", + ) + + # read parquet and columns + def _payouts_in_parquet(filename: str) -> List[int]: + df = pl.read_parquet(filename) + assert df.schema == payouts_schema + return df["timestamp"].to_list() + + # assert expected length of payouts in parquet + payouts: List[int] = _payouts_in_parquet(filename) + if isinstance(n_items, int): + assert len(payouts) == n_items + elif n_items == ">1K": + assert len(payouts) > 1000 + + # reset record config + gql_data_factory.record_config = default_config + + +@patch("pdr_backend.lake.table_pdr_payouts.fetch_payouts") +@patch("pdr_backend.lake.gql_data_factory.get_all_contract_ids_by_owner") +def test_load_and_verify_payout_schema( + mock_get_all_contract_ids_by_owner, + mock_fetch_payouts, + tmpdir, + sample_payouts, + monkeypatch, +): + del_network_override(monkeypatch) + mock_get_all_contract_ids_by_owner.return_value = ["0x123"] + st_timestr = "2023-01-01_0:00" + fin_timestr = "2024-01-04_17:00" + + _test_update_payout_gql( + mock_fetch_payouts, + tmpdir, + sample_payouts, + "2023-01-01_0:00", # earlier date + "2024-01-04_17:00", # later date + n_items=6, + ) + + _, gql_data_factory = _gql_data_factory( + tmpdir, + "binanceus ETH/USDT h 5m", + st_timestr, + fin_timestr, + ) + + # Update subscriptions record only + gql_data_factory.record_config = _filter_gql_config( + gql_data_factory.record_config, pdr_payouts_record + ) + + fin_ut = timestr_to_ut(fin_timestr) + + gql_dfs = gql_data_factory._load_parquet(fin_ut) + + assert len(gql_dfs) == 1 + assert len(gql_dfs[pdr_payouts_record]) == 6 + assert round(gql_dfs[pdr_payouts_record]["payout"].sum(), 0) == 15.0 + assert gql_dfs[pdr_payouts_record].schema == payouts_schema diff --git a/pdr_backend/subgraph/payout.py b/pdr_backend/subgraph/payout.py index bd3bce799..d29733788 100644 --- a/pdr_backend/subgraph/payout.py +++ b/pdr_backend/subgraph/payout.py @@ -1,15 +1,12 @@ +from typing import List + from enforce_typing import enforce_types + @enforce_types class Payout: def __init__( - self, - ID: str, - token: str, - user: str, - slot: int, - timestamp: int, - payout: float + self, ID: str, token: str, user: str, slot: int, timestamp: int, payout: float ) -> None: self.ID = ID self.user = user @@ -17,3 +14,75 @@ def __init__( self.token = token self.slot = slot self.payout = payout + + +@enforce_types +def mock_payout(payout_tuple: tuple) -> Payout: + (ID, user, timestamp, token, slot, payout) = payout_tuple + + return Payout( + ID=ID, user=user, timestamp=timestamp, token=token, slot=slot, payout=payout + ) + + +@enforce_types +def mock_payouts() -> List[Payout]: + return [mock_payout(payout_tuple) for payout_tuple in _PAYOUT_TUPS] + + +_PAYOUT_TUPS = [ + ( + # pylint: disable=line-too-long + "0x18f54cc21b7a2fdd011bea06bba7801b280e3151-1704152700-0xeb18bad7365a40e36a41fb8734eb0b855d13b74f", + "0xeb18bad7365a40e36a41fb8734eb0b855d13b74f", + 1704153558000, + "ADA/USDT", + 1704152700, + 0.0, + ), + ( + # pylint: disable=line-too-long + "0x18f54cc21b7a2fdd011bea06bba7801b280e3151-1704152700-0xfb223c3583aa934273173a55c226d598a149441c", + "0xfb223c3583aa934273173a55c226d598a149441c", + 1704153051000, + "ADA/USDT", + 1704152700, + 3.786517720904995824, + ), + ( + # pylint: disable=line-too-long + "0x18f54cc21b7a2fdd011bea06bba7801b280e3151-1704153000-0x02e9d2eede4c5347e55346860c8a8988117bde9e", + "0x02e9d2eede4c5347e55346860c8a8988117bde9e", + 1704153351000, + "ADA/USDT", + 1704153000, + 3.687473663992716148, + ), + ( + # pylint: disable=line-too-long + "0x18f54cc21b7a2fdd011bea06bba7801b280e3151-1704153000-0x04a5e9f565dfa83c77581d1022b9ef041f55210b", + "0x04a5e9f565dfa83c77581d1022b9ef041f55210b", + 1704153504000, + "ADA/USDT", + 1704153000, + 6.334665366356455078, + ), + ( + # pylint: disable=line-too-long + "0x18f54cc21b7a2fdd011bea06bba7801b280e3151-1704153000-0x7149ceca72c61991018ed80788bea3f3f4540c3c", + "0x7149ceca72c61991018ed80788bea3f3f4540c3c", + 1704153534000, + "ADA/USDT", + 1704153000, + 1.463270654801637113, + ), + ( + # pylint: disable=line-too-long + "0x18f54cc21b7a2fdd011bea06bba7801b280e3151-1704153000-0xeb18bad7365a40e36a41fb8734eb0b855d13b74f", + "0xeb18bad7365a40e36a41fb8734eb0b855d13b74f", + 1704153558000, + "ADA/USDT", + 1704153000, + 0.0, + ), +] diff --git a/pdr_backend/subgraph/subgraph_payout.py b/pdr_backend/subgraph/subgraph_payout.py index 5b519a1b2..4cfb0bc02 100644 --- a/pdr_backend/subgraph/subgraph_payout.py +++ b/pdr_backend/subgraph/subgraph_payout.py @@ -5,6 +5,7 @@ from pdr_backend.util.networkutil import get_subgraph_url from pdr_backend.subgraph.payout import Payout + @enforce_types def get_payout_query( asset_ids: List[str], start_ts: int, end_ts: int, first: int, skip: int @@ -14,41 +15,129 @@ def get_payout_query( specified assets within a slot range. Args: - asset_ids: A list of asset identifiers to include in the query. - initial_slot: The starting slot number for the query range. - last_slot: The ending slot number for the query range. + prediction_ids: A list of prediction identifiers to include in the query. first: The number of records to fetch per query (pagination limit). skip: The number of records to skip (pagination offset). Returns: A string representing the GraphQL query. """ - asset_ids_str = str(asset_ids).replace("[", "[").replace("]", "]").replace("'", '"') + + # asset_ids_str = str(asset_ids).replace("[", "[").replace("]", "]").replace("'", '"') + where_query_arr = [] + + for asset_id in asset_ids: + where_query_arr.append( + """ + { + timestamp_gte: %s, + timestamp_lte: %s, + prediction_contains: "%s" + } + """ + % (start_ts, end_ts, asset_id) + ) return """ query { predictPayouts ( first: %s skip: %s - where: { slot_: {slot_gte: %s, slot_lte: %s, predictContract_in: %s}} - ) { - id - timestamp - payout - prediction { - user { - id - } + where: { + or: [%s] } - slot { + ) { id - } + timestamp + payout + prediction { + user { + id + } + slot { + id + predictContract{ + id + token{ + name + } + } + } + } } } """ % ( first, skip, + ", ".join(where_query_arr), + ) + + +@enforce_types +def filter_by_addresses(result, addresses): + """ + Filter the result["data"]["predictPayouts"] by a list of addresses. + + Parameters: + result (dict): The result dictionary. + addresses (list): The list of addresses to filter by. + + Returns: + list: The filtered list of payouts. + """ + payouts = result["data"]["predictPayouts"] + filtered_payouts = [ + payout + for payout in payouts + if payout["prediction"]["slot"]["predictContract"]["id"] in addresses + ] + + return filtered_payouts + + +@enforce_types +def fetch_payouts( + addresses: List[str], + start_ts: int, + end_ts: int, + skip: int, + network: str = "mainnet", +) -> List[Payout]: + records_per_page = 10 + query = get_payout_query( + addresses, start_ts, end_ts, - asset_ids_str, + records_per_page, + skip, ) + + print("payout query", query) + + result = query_subgraph( + get_subgraph_url(network), + query, + timeout=20.0, + ) + + new_payouts = filter_by_addresses(result, addresses) + + if new_payouts is None: + return [] + + new_payouts = [ + Payout( + **{ + "payout": float(payout["payout"]), + "user": payout["prediction"]["user"]["id"], + "timestamp": payout["timestamp"], + "ID": payout["id"], + "token": payout["prediction"]["slot"]["predictContract"]["token"][ + "name" + ], + "slot": int(payout["id"].split("-")[1]), + } + ) + for payout in new_payouts + ] + return new_payouts diff --git a/pdr_backend/subgraph/test/test_subgraph_payout.py b/pdr_backend/subgraph/test/test_subgraph_payout.py new file mode 100644 index 000000000..a10046b4e --- /dev/null +++ b/pdr_backend/subgraph/test/test_subgraph_payout.py @@ -0,0 +1,67 @@ +from unittest.mock import patch + +from enforce_typing import enforce_types + +from pdr_backend.subgraph.subgraph_payout import ( + Payout, + get_payout_query, + fetch_payouts, +) + +# pylint: disable=line-too-long +MOCK_PAYOUT_QUERY_RESPONSE = { + "data": { + "predictPayouts": [ + { + "id": "0x18f54cc21b7a2fdd011bea06bba7801b280e3151-1696880700-0xd2a24cb4ff2584bad80ff5f109034a891c3d88dd", + "timestamp": 1698527000, + "payout": "0", + "prediction": { + "user": {"id": "0xd2a24cb4ff2584bad80ff5f109034a891c3d88dd"}, + "slot": { + "id": "0x18f54cc21b7a2fdd011bea06bba7801b280e3151-1696880700", + "predictContract": { + "id": "0x18f54cc21b7a2fdd011bea06bba7801b280e3151", + "token": {"name": "ADA/USDT"}, + }, + }, + }, + } + ] + } +} + + +def test_get_payout_query(): + payout_query = get_payout_query( + ["0x18f54cc21b7a2fdd011bea06bba7801b280e3151"], 1622547000, 1622548800, 1, 1 + ) + + assert "1622547000" in payout_query + assert "1622548800" in payout_query + assert "0x18f54cc21b7a2fdd011bea06bba7801b280e3151" in payout_query + + +@enforce_types +@patch("pdr_backend.subgraph.subgraph_payout.query_subgraph") +def test_fetch_payouts(mock_query_subgraph): + mock_query_subgraph.return_value = MOCK_PAYOUT_QUERY_RESPONSE + + payouts = fetch_payouts( + addresses=["0x18f54cc21b7a2fdd011bea06bba7801b280e3151"], + start_ts=1622547000, + end_ts=1622548800, + skip=1, + network="mainnet", + ) + assert len(payouts) == 1 + assert isinstance(payouts[0], Payout) + assert ( + payouts[0].ID + == "0x18f54cc21b7a2fdd011bea06bba7801b280e3151-1696880700-0xd2a24cb4ff2584bad80ff5f109034a891c3d88dd" + ) + assert payouts[0].token == "ADA/USDT" + assert payouts[0].timestamp == 1698527000 + assert payouts[0].slot == 1696880700 + assert payouts[0].payout == float(0) + assert mock_query_subgraph.call_count == 1