Skip to content

Commit

Permalink
issue-481: Add Payouts to the Data Factory
Browse files Browse the repository at this point in the history
  • Loading branch information
kdetry committed Jan 15, 2024
1 parent 5f39450 commit 2a36373
Show file tree
Hide file tree
Showing 7 changed files with 464 additions and 23 deletions.
15 changes: 15 additions & 0 deletions pdr_backend/lake/gql_data_factory.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,10 @@
get_pdr_subscriptions_df,
subscriptions_schema,
)
from pdr_backend.lake.table_pdr_payouts import (
get_pdr_payouts_df,
payouts_schema,
)
from pdr_backend.ppss.ppss import PPSS
from pdr_backend.subgraph.subgraph_predictions import get_all_contract_ids_by_owner
from pdr_backend.util.networkutil import get_sapphire_postfix
Expand Down Expand Up @@ -42,6 +46,10 @@ def __init__(self, ppss: PPSS):
network=network,
)
contract_list = [f.lower() for f in contract_list]

# For debugging
# t_contract_list = [f.lower() for f in contract_list]
# contract_list = [t_contract_list[0], t_contract_list[1]]

# configure all tables that will be recorded onto lake
self.record_config = {
Expand All @@ -59,6 +67,13 @@ def __init__(self, ppss: PPSS):
"contract_list": contract_list,
},
},
"pdr_payouts": {
"fetch_fn": get_pdr_payouts_df,
"schema": payouts_schema,
"config": {
"contract_list": contract_list,
},
},
}

def get_gql_dfs(self) -> Dict[str, pl.DataFrame]:
Expand Down
46 changes: 46 additions & 0 deletions pdr_backend/lake/table_pdr_payouts.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
from typing import Dict
import polars as pl
from enforce_typing import enforce_types
from polars import Int64, Float64, Utf8

from pdr_backend.subgraph.subgraph_payout import fetch_payouts
from pdr_backend.lake.plutil import _object_list_to_df
from pdr_backend.util.networkutil import get_sapphire_postfix
from pdr_backend.util.timeutil import ms_to_seconds

# RAW PAYOUT SCHEMA
payouts_schema = {
"ID": Utf8,
"token": Utf8,
"user": Utf8,
"slot": Int64,
"timestamp": Int64,
"payout": Float64,
}


@enforce_types
def get_pdr_payouts_df(
network: str, st_ut: int, fin_ut: int, config: Dict
) -> pl.DataFrame:
"""
@description
Fetch raw payouts from predictoor subgraph
Update function for graphql query, returns raw data
+ Transforms ts into ms as required for data factory
"""
network = get_sapphire_postfix(network)

# fetch payouts
payouts = fetch_payouts(
config["contract_list"], ms_to_seconds(st_ut), ms_to_seconds(fin_ut), 0, network
)

if len(payouts) == 0:
print("No payouts to fetch. Exit.")
return pl.DataFrame()

# convert payouts to df and transform timestamp into ms
payout_df = _object_list_to_df(payouts, payouts_schema)

return payout_df
6 changes: 6 additions & 0 deletions pdr_backend/lake/test/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

from pdr_backend.subgraph.prediction import mock_daily_predictions
from pdr_backend.subgraph.subscription import mock_subscriptions
from pdr_backend.subgraph.payout import mock_payouts


@pytest.fixture()
Expand All @@ -12,3 +13,8 @@ def sample_daily_predictions():
@pytest.fixture()
def sample_subscriptions():
return mock_subscriptions()


@pytest.fixture()
def sample_payouts():
return mock_payouts()
149 changes: 149 additions & 0 deletions pdr_backend/lake/test/test_table_pdr_payouts.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,149 @@
from typing import List
from unittest.mock import patch

import polars as pl
from enforce_typing import enforce_types

from pdr_backend.lake.table_pdr_payouts import payouts_schema
from pdr_backend.lake.test.resources import _gql_data_factory, _filter_gql_config
from pdr_backend.ppss.web3_pp import del_network_override
from pdr_backend.util.timeutil import timestr_to_ut

# ====================================================================
pdr_payouts_record = "pdr_payouts"


@patch("pdr_backend.lake.table_pdr_payouts.fetch_payouts")
@patch("pdr_backend.lake.gql_data_factory.get_all_contract_ids_by_owner")
def test_update_payout_gql_proxy(
mock_get_all_contract_ids_by_owner,
mock_fetch_payouts,
tmpdir,
sample_payouts,
monkeypatch,
):
del_network_override(monkeypatch)
mock_get_all_contract_ids_by_owner.return_value = ["0x123"]
_test_update_payout_gql(
mock_fetch_payouts,
tmpdir,
sample_payouts,
"2023-01-01_0:00", # earlier date
"2024-01-04_17:00", # later date
n_items=6,
)


@enforce_types
def _test_update_payout_gql(
mock_fetch_payouts,
tmpdir,
sample_payouts,
st_timestr: str,
fin_timestr: str,
n_items: int,
):
"""
@arguments
n_items -- expected # payouts. Typically int. If '>1K', expect >1000
"""

_, gql_data_factory = _gql_data_factory(
tmpdir,
"binanceus ETH/USDT h 5m",
st_timestr,
fin_timestr,
)

# Update subscriptions record only
default_config = gql_data_factory.record_config
gql_data_factory.record_config = _filter_gql_config(
gql_data_factory.record_config, pdr_payouts_record
)

# setup: filename
# everything will be inside the gql folder
filename = gql_data_factory._parquet_filename(pdr_payouts_record)
assert ".parquet" in filename

fin_ut = timestr_to_ut(fin_timestr)
st_ut = gql_data_factory._calc_start_ut(filename)

# calculate ms locally so we can filter raw subscriptions
st_ut_sec = st_ut // 1000
fin_ut_sec = fin_ut // 1000

mock_fetch_payouts.return_value = sample_payouts

# work 1: update parquet
gql_data_factory._update(fin_ut)

# assert params
mock_fetch_payouts.assert_called_with(
["0x123"],
st_ut_sec,
fin_ut_sec,
0,
"mainnet",
)

# read parquet and columns
def _payouts_in_parquet(filename: str) -> List[int]:
df = pl.read_parquet(filename)
assert df.schema == payouts_schema
return df["timestamp"].to_list()

# assert expected length of payouts in parquet
payouts: List[int] = _payouts_in_parquet(filename)
if isinstance(n_items, int):
assert len(payouts) == n_items
elif n_items == ">1K":
assert len(payouts) > 1000

# reset record config
gql_data_factory.record_config = default_config


@patch("pdr_backend.lake.table_pdr_payouts.fetch_payouts")
@patch("pdr_backend.lake.gql_data_factory.get_all_contract_ids_by_owner")
def test_load_and_verify_payout_schema(
mock_get_all_contract_ids_by_owner,
mock_fetch_payouts,
tmpdir,
sample_payouts,
monkeypatch,
):
del_network_override(monkeypatch)
mock_get_all_contract_ids_by_owner.return_value = ["0x123"]
st_timestr = "2023-01-01_0:00"
fin_timestr = "2024-01-04_17:00"

_test_update_payout_gql(
mock_fetch_payouts,
tmpdir,
sample_payouts,
"2023-01-01_0:00", # earlier date
"2024-01-04_17:00", # later date
n_items=6,
)

_, gql_data_factory = _gql_data_factory(
tmpdir,
"binanceus ETH/USDT h 5m",
st_timestr,
fin_timestr,
)

# Update subscriptions record only
gql_data_factory.record_config = _filter_gql_config(
gql_data_factory.record_config, pdr_payouts_record
)

fin_ut = timestr_to_ut(fin_timestr)

gql_dfs = gql_data_factory._load_parquet(fin_ut)

assert len(gql_dfs) == 1
assert len(gql_dfs[pdr_payouts_record]) == 6
assert round(gql_dfs[pdr_payouts_record]["payout"].sum(), 0) == 15.0
assert gql_dfs[pdr_payouts_record].schema == payouts_schema
83 changes: 76 additions & 7 deletions pdr_backend/subgraph/payout.py
Original file line number Diff line number Diff line change
@@ -1,19 +1,88 @@
from typing import List

from enforce_typing import enforce_types


@enforce_types
class Payout:
def __init__(
self,
ID: str,
token: str,
user: str,
slot: int,
timestamp: int,
payout: float
self, ID: str, token: str, user: str, slot: int, timestamp: int, payout: float
) -> None:
self.ID = ID
self.user = user
self.timestamp = timestamp
self.token = token
self.slot = slot
self.payout = payout


@enforce_types
def mock_payout(payout_tuple: tuple) -> Payout:
(ID, user, timestamp, token, slot, payout) = payout_tuple

return Payout(
ID=ID, user=user, timestamp=timestamp, token=token, slot=slot, payout=payout
)


@enforce_types
def mock_payouts() -> List[Payout]:
return [mock_payout(payout_tuple) for payout_tuple in _PAYOUT_TUPS]


_PAYOUT_TUPS = [
(
# pylint: disable=line-too-long
"0x18f54cc21b7a2fdd011bea06bba7801b280e3151-1704152700-0xeb18bad7365a40e36a41fb8734eb0b855d13b74f",
"0xeb18bad7365a40e36a41fb8734eb0b855d13b74f",
1704153558000,
"ADA/USDT",
1704152700,
0.0,
),
(
# pylint: disable=line-too-long
"0x18f54cc21b7a2fdd011bea06bba7801b280e3151-1704152700-0xfb223c3583aa934273173a55c226d598a149441c",
"0xfb223c3583aa934273173a55c226d598a149441c",
1704153051000,
"ADA/USDT",
1704152700,
3.786517720904995824,
),
(
# pylint: disable=line-too-long
"0x18f54cc21b7a2fdd011bea06bba7801b280e3151-1704153000-0x02e9d2eede4c5347e55346860c8a8988117bde9e",
"0x02e9d2eede4c5347e55346860c8a8988117bde9e",
1704153351000,
"ADA/USDT",
1704153000,
3.687473663992716148,
),
(
# pylint: disable=line-too-long
"0x18f54cc21b7a2fdd011bea06bba7801b280e3151-1704153000-0x04a5e9f565dfa83c77581d1022b9ef041f55210b",
"0x04a5e9f565dfa83c77581d1022b9ef041f55210b",
1704153504000,
"ADA/USDT",
1704153000,
6.334665366356455078,
),
(
# pylint: disable=line-too-long
"0x18f54cc21b7a2fdd011bea06bba7801b280e3151-1704153000-0x7149ceca72c61991018ed80788bea3f3f4540c3c",
"0x7149ceca72c61991018ed80788bea3f3f4540c3c",
1704153534000,
"ADA/USDT",
1704153000,
1.463270654801637113,
),
(
# pylint: disable=line-too-long
"0x18f54cc21b7a2fdd011bea06bba7801b280e3151-1704153000-0xeb18bad7365a40e36a41fb8734eb0b855d13b74f",
"0xeb18bad7365a40e36a41fb8734eb0b855d13b74f",
1704153558000,
"ADA/USDT",
1704153000,
0.0,
),
]
Loading

0 comments on commit 2a36373

Please sign in to comment.