Issue436 - Implement GQL data factory #438

Merged: 38 commits, Dec 15, 2023. The diff below shows changes from 8 of the 38 commits.
Commits (38)
e98c3d1
First pass on gql data factory
idiom-bytes Dec 7, 2023
0cdcd74
Passing tests, formatting, etc... Still lots to cleanup
idiom-bytes Dec 8, 2023
1f0c753
Removing feeds so cli is stable, will return it right away
idiom-bytes Dec 8, 2023
88a2a53
adjusting plot names
idiom-bytes Dec 8, 2023
8b65b12
Returning logic to get contracts from owner_address
idiom-bytes Dec 8, 2023
85aa19b
Continuing to clean up code around gql_data_factory to make it easier…
idiom-bytes Dec 12, 2023
22845bf
merging latest from yaml-cli
idiom-bytes Dec 12, 2023
0dcc013
fixing build
idiom-bytes Dec 13, 2023
09aaf74
Merge branch 'yaml-cli2' into issue436-gql-data-factory
idiom-bytes Dec 13, 2023
58dc49a
Addressing subdirectories
idiom-bytes Dec 13, 2023
05c3dd3
Simplifying code to use common mock_pss, and improve DRY
idiom-bytes Dec 13, 2023
ef3bd4d
Simplifying code by using common mock_web3_pp and mock_ppss
idiom-bytes Dec 13, 2023
f85d347
cleaning up usage of common mock_pss function
idiom-bytes Dec 13, 2023
f98f643
adding enforce_types, and reducing TODOs as I move them into tickets.
idiom-bytes Dec 13, 2023
34e8180
rename gql_fn => do_fetch
idiom-bytes Dec 14, 2023
c14ca14
reverting some gql_fns to parquet_fns, such that we can keep it gener…
idiom-bytes Dec 14, 2023
e6be7ca
simplfy test
idiom-bytes Dec 14, 2023
53e7238
reverting changes to pq_data_factory
idiom-bytes Dec 14, 2023
174814d
Merge branch 'yaml-cli2' into issue436-gql-data-factory
idiom-bytes Dec 14, 2023
76aae07
restructuring GQLDataFactory such that it takes in PPSS, rather than …
idiom-bytes Dec 14, 2023
82c79d6
fixing conflicts
idiom-bytes Dec 14, 2023
a5d7e84
Cleaned up logic around network-postfix such that we can use PPSS onc…
idiom-bytes Dec 14, 2023
0d22b22
Updating code to use common mock_ppss and mock_ppss_web3
idiom-bytes Dec 14, 2023
bc82956
Fixing tests such that we're using path str, rather than _pytest._py.…
idiom-bytes Dec 14, 2023
d1fbfa7
Merge branch 'yaml-cli2' into issue436-gql-data-factory
idiom-bytes Dec 14, 2023
5919d99
Merge branch 'yaml-cli2' into issue436-gql-data-factory
idiom-bytes Dec 14, 2023
17d93f7
Cleaning up redundancy around common test_data, and helpers to create…
idiom-bytes Dec 15, 2023
0e08801
fixing prediction test
idiom-bytes Dec 15, 2023
476324c
fixing pylint
idiom-bytes Dec 15, 2023
78671c5
Added logic to cull anything weird from subgraph, and to raise an exc…
idiom-bytes Dec 15, 2023
b0d35f2
Added logic to cull/enforce no-duplicates
idiom-bytes Dec 15, 2023
05d5197
Merge branch 'yaml-cli2' into issue436-gql-data-factory
trentmc Dec 15, 2023
2c15de1
(a) move get_sapphire_postfix() to networkutil.py (b) put imports in …
trentmc Dec 15, 2023
71e33c7
Fix breaking tests introduced by recent commit
trentmc Dec 15, 2023
468e50a
prediction.id -> prediction.ID
trentmc Dec 15, 2023
4ce232d
(a) ContractIdAndSPE.id -> ID (b) PredictSlot.id -> ID (c) fix tests …
trentmc Dec 15, 2023
ea8669c
Fix pylint complaints in CI (not local)
trentmc Dec 15, 2023
7ed9d27
make pylint actually happy. Last commit didn't work
trentmc Dec 15, 2023
89 changes: 89 additions & 0 deletions pdr_backend/data_eng/gql/predictoor/predictions.py
@@ -0,0 +1,89 @@
from typing import List, Dict
from enforce_typing import enforce_types

import polars as pl
from polars import Utf8, Int64, Float64, Boolean

from pdr_backend.util.subgraph_predictions import (
    fetch_filtered_predictions,
    FilterMode,
)

from pdr_backend.util.timeutil import ms_to_seconds

# RAW_PREDICTIONS_SCHEMA
predictions_schema = {
    "id": Utf8,
    "pair": Utf8,
    "timeframe": Utf8,
    "prediction": Boolean,
    "stake": Float64,
    "trueval": Boolean,
    "timestamp": Int64,
    "source": Utf8,
    "payout": Float64,
    "slot": Int64,
    "user": Utf8,
}


def _object_list_to_df(objects: List[object], schema: Dict) -> pl.DataFrame:
    """
    @description
      Convert a list of objects to a dataframe using their __dict__ structure.
    """
    # Get all predictions into a dataframe
    obj_dicts = [obj.__dict__ for obj in objects]
    obj_df = pl.DataFrame(obj_dicts, schema=schema)
    assert obj_df.schema == schema

    return obj_df


def _transform_timestamp_to_ms(df: pl.DataFrame) -> pl.DataFrame:
    df = df.with_columns(
        [
            pl.col("timestamp").mul(1000).alias("timestamp"),
        ]
    )
    return df


@enforce_types
def get_predictoor_predictions_df(
    network: str, st_ut: int, fin_ut: int, config: Dict
) -> pl.DataFrame:
    """
    @description
      Fetch raw predictions from the predictoor subgraph.
      Update function for the GraphQL query: returns raw data,
      and transforms timestamps into ms as required by the data factory.
    """
    # TO DO: This code has DRY problems. Reduce.
    if "main" in network:
        network = "mainnet"
    elif "test" in network:
        network = "testnet"
    else:
        raise ValueError(network)

    # fetch predictions
    predictions = fetch_filtered_predictions(
        ms_to_seconds(st_ut),
        ms_to_seconds(fin_ut),
        config["contract_list"],
        network,
        FilterMode.CONTRACT_TS,
        payout_only=False,
        trueval_only=False,
    )

    if len(predictions) == 0:
        print(" No predictions to fetch. Exit.")
        return pl.DataFrame()

    # convert predictions to df and transform timestamp into ms
    predictions_df = _object_list_to_df(predictions, predictions_schema)
    predictions_df = _transform_timestamp_to_ms(predictions_df)

    return predictions_df
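For orientation, a minimal usage sketch of the new fetch function. This is not part of the diff: the contract address, time range, and network string below are made up for illustration; the network string only needs to contain "main" or "test" to pass the check above.

# Illustrative only: hypothetical contract address and time range.
from pdr_backend.data_eng.gql.predictoor.predictions import (
    get_predictoor_predictions_df,
    predictions_schema,
)

st_ut = 1701561600000   # start time, unix ms (2023-12-03 00:00 UTC)
fin_ut = 1702166400000  # finish time, unix ms (2023-12-10 00:00 UTC)
config = {"contract_list": ["0x30f1c55e72fe105e4a1fbecdff3145fc14177695"]}  # hypothetical feed

df = get_predictoor_predictions_df("sapphire-mainnet", st_ut, fin_ut, config)

# Empty DataFrame when nothing was fetched; otherwise the schema matches predictions_schema
assert df.is_empty() or df.schema == predictions_schema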
267 changes: 267 additions & 0 deletions pdr_backend/data_eng/gql_data_factory.py
@@ -0,0 +1,267 @@
import os
from typing import Dict, Callable

from enforce_typing import enforce_types
import polars as pl

from pdr_backend.ppss.data_pp import DataPP
from pdr_backend.ppss.data_ss import DataSS
from pdr_backend.ppss.web3_pp import Web3PP
from pdr_backend.util.timeutil import pretty_timestr, current_ut

from pdr_backend.data_eng.plutil import (
    has_data,
    newest_ut,
)

from pdr_backend.util.subgraph_predictions import (
    get_all_contract_ids_by_owner,
)

from pdr_backend.data_eng.gql.predictoor.predictions import (
    predictions_schema,
    get_predictoor_predictions_df,
)


@enforce_types
class GQLDataFactory:
    """
    Roles:
    - From each GQL API, fill >=1 parquet_dfs -> parquet files data lake
    - From parquet_dfs, calculate stats and other dfs
    - All timestamps, after fetching, are transformed into milliseconds wherever appropriate

    Finally:
    - "timestamp" values are ut: int is unix time, UTC, in ms (not s)
    - "datetime" values are python datetime.datetime, UTC
    """

    def __init__(self, pp: DataPP, ss: DataSS, web3: Web3PP):
        self.pp = pp
        self.ss = ss
        self.web3 = web3

        # TO DO: Solve duplicates from subgraph.
        # Method 1: Cull anything returned outside st_ut, fin_ut
        self.debug_duplicate = False

        # TO DO: This code has DRY problems. Reduce.
        # get network
        if "main" in self.web3.network:
            network = "mainnet"
        elif "test" in self.web3.network:
            network = "testnet"
        else:
            raise ValueError(self.web3.network)

        # filter by feed contract address
        contract_list = get_all_contract_ids_by_owner(
            owner_address=self.web3.owner_addrs,
            network=network,
        )
        contract_list = [f.lower() for f in contract_list]

        # TO-DO: Roll into yaml config
        self.record_config = {
            "pdr_predictions": {
                "fetch_fn": get_predictoor_predictions_df,
                "schema": predictions_schema,
                "config": {
                    "contract_list": contract_list,
                },
            },
        }

    def get_gql_dfs(self) -> Dict[str, pl.DataFrame]:
        """
        @description
          Get historical dataframes across many feeds and timeframes.

        @return
          predictions_df -- *polars* DataFrame. See class docstring
        """
        print("Get predictions data across many feeds and timeframes.")

        # ss.fin_timestamp is calculated dynamically if ss.fin_timestr = "now".
        # But, we don't want fin_timestamp changing as we gather data here.
        # To solve, for a given call to this method, we make a constant fin_ut
        fin_ut = self.ss.fin_timestamp

        print(f" Data start: {pretty_timestr(self.ss.st_timestamp)}")
        print(f" Data fin: {pretty_timestr(fin_ut)}")

        self._update(fin_ut)
        gql_dfs = self._load_parquet(fin_ut)

        print("Get historical data across many subgraphs. Done.")

        # postconditions
        assert len(gql_dfs.values()) > 0
        for df in gql_dfs.values():
            assert isinstance(df, pl.DataFrame)

        return gql_dfs

    def _update(self, fin_ut: int):
        """
        @description
          Iterate across all gql queries and update their parquet files:
          - Predictoors
          - Slots
          - Claims

          Improve this by:
          1. Break out raw data from any transformed/cleaned data
          2. Integrate other queries and summaries
          3. Integrate config/pp if needed

        @arguments
          fin_ut -- a timestamp, in ms, in UTC
        """

        for k, record in self.record_config.items():
            filename = self._parquet_filename(k)
            print(f" filename={filename}")

            st_ut = self._calc_start_ut(filename)
            print(f" Aim to fetch data from start time: {pretty_timestr(st_ut)}")
            if st_ut > min(current_ut(), fin_ut):
                print(" Given start time, no data to gather. Exit.")
                continue

            # to satisfy mypy, get an explicit function pointer
            gql_fn: Callable[[str, int, int, Dict], pl.DataFrame] = record["fetch_fn"]

            # call the function
            print(f" Fetching {k}")
            gql_df = gql_fn(self.web3.network, st_ut, fin_ut, record["config"])

            # postcondition
            assert gql_df.schema == record["schema"]

            # save to parquet
            self._save_parquet(filename, gql_df)

    def _calc_start_ut(self, filename: str) -> int:
        """
        @description
          Calculate start timestamp, reconciling whether file exists and where
          its data starts. If file exists, you can only append to end.

        @arguments
          filename -- parquet file with data. May or may not exist.

        @return
          start_ut -- timestamp (ut) to start grabbing data for (in ms)
        """
        if not os.path.exists(filename):
            print(" No file exists yet, so will fetch all data")
            return self.ss.st_timestamp

        print(" File already exists")
        if not has_data(filename):
            print(" File has no data, so delete it")
            os.remove(filename)
            return self.ss.st_timestamp

        file_utN = newest_ut(filename)
        return file_utN + 1000

    def _load_parquet(self, fin_ut: int) -> Dict[str, pl.DataFrame]:
        """
        @arguments
          fin_ut -- finish timestamp

        @return
          parquet_dfs -- dict of [parquet_filename] : df
            Where df has columns=GQL_COLS+"datetime", and index=timestamp
        """
        print(" Load parquet.")
        st_ut = self.ss.st_timestamp

        gql_dfs: Dict[str, pl.DataFrame] = {}  # [parquet_filename] : df

        for k, record in self.record_config.items():
            filename = self._parquet_filename(k)
            print(f" filename={filename}")

            # load all data from file
            parquet_df = pl.read_parquet(filename)
            parquet_df = parquet_df.filter(
                (pl.col("timestamp") >= st_ut) & (pl.col("timestamp") <= fin_ut)
            )

            # postcondition
            assert parquet_df.schema == record["schema"]
            gql_dfs[k] = parquet_df

        return gql_dfs

    def _parquet_filename(self, filename_str: str) -> str:
        """
        @description
          Computes the lake-path for the parquet file.

        @arguments
          filename_str -- eg "subgraph_predictions"

        @return
          parquet_filename -- name for parquet file.
        """
        gql_dir = os.path.join(self.ss.parquet_dir, "gql")
        if not os.path.exists(gql_dir):
            os.makedirs(gql_dir)

        basename = f"{filename_str}.parquet"
        filename = os.path.join(gql_dir, basename)
        return filename

    @enforce_types
    def _save_parquet(self, filename: str, df: pl.DataFrame):
        """write to parquet file
        parquet only supports appending via the pyarrow engine
        """

        # precondition
        assert "timestamp" in df.columns and df["timestamp"].dtype == pl.Int64
        assert len(df) > 0
        if len(df) > 1:
            assert (
                df.head(1)["timestamp"].to_list()[0]
                < df.tail(1)["timestamp"].to_list()[0]
            )

        if os.path.exists(filename):  # "append" existing file
            cur_df = pl.read_parquet(filename)

            if self.debug_duplicate is True:
                print(">>> Existing rows")
                print(f"HEAD: {cur_df.head(2)}")
                print(f"TAIL: {cur_df.tail(2)}")

                print(">>> Appending rows")
                print(f"HEAD: {df.head(2)}")
                print(f"TAIL: {df.tail(2)}")

            df = pl.concat([cur_df, df])
            df.write_parquet(filename)

            duplicate_rows = df.filter(pl.struct("id").is_duplicated())
            if len(duplicate_rows) > 0 and self.debug_duplicate is True:
                print(f">>>> Duplicate rows found. {len(duplicate_rows)} rows:")
                print(f"HEAD: {duplicate_rows.head(2)}")
                print(f"TAIL: {duplicate_rows.tail(2)}")

                # log duplicate rows to log file
                log_filename = "debug.log"
                with open(log_filename, "a") as f:
                    f.write(
                        f">>>>>>>> Duplicate rows found. {len(duplicate_rows)} rows:"
                    )
                    f.write(str(duplicate_rows))

            n_new = df.shape[0] - cur_df.shape[0]
            print(f" Just appended {n_new} df rows to file {filename}")
        else:  # write new file
            df.write_parquet(filename)
            print(f" Just saved df with {df.shape[0]} rows to new file {filename}")
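A hedged sketch of how the factory might be driven, assuming pp, ss, and web3 objects (DataPP, DataSS, Web3PP) have already been constructed elsewhere, for example from a PPSS config. The helper name fetch_gql_lake is invented for illustration and is not part of this PR.

from typing import Dict

import polars as pl

from pdr_backend.data_eng.gql_data_factory import GQLDataFactory
from pdr_backend.ppss.data_pp import DataPP
from pdr_backend.ppss.data_ss import DataSS
from pdr_backend.ppss.web3_pp import Web3PP


def fetch_gql_lake(pp: DataPP, ss: DataSS, web3: Web3PP) -> Dict[str, pl.DataFrame]:
    # __init__ resolves the network name and queries the subgraph for the
    # feed contracts owned by web3.owner_addrs
    factory = GQLDataFactory(pp, ss, web3)

    # Update the parquet lake up to ss.fin_timestamp, then load it back,
    # filtered to [ss.st_timestamp, ss.fin_timestamp]
    return factory.get_gql_dfs()


# usage, with pp/ss/web3 coming from the caller's PPSS setup (not shown):
#   dfs = fetch_gql_lake(pp, ss, web3)
#   predictions_df = dfs["pdr_predictions"]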
4 changes: 1 addition & 3 deletions pdr_backend/data_eng/plutil.py
@@ -75,8 +75,7 @@ def save_rawohlcv_file(filename: str, df: pl.DataFrame):
 
     df = df.select(columns)
 
-    if os.path.exists(filename):  # append existing file
-        # TO DO: Implement parquet-append with pyarrow
+    if os.path.exists(filename):  # "append" existing file
         cur_df = pl.read_parquet(filename)
         df = pl.concat([cur_df, df])
         df.write_parquet(filename)
@@ -138,7 +137,6 @@ def load_rawohlcv_file(filename: str, cols=None, st=None, fin=None) -> pl.DataFrame:
     df = transform_df(df)
 
     # postconditions, return
-    # TO DO: Helper to go from np<->pl schema/dtypes
     assert "timestamp" in df.columns and df["timestamp"].dtype == pl.Int64
     assert "datetime" in df.columns and df["datetime"].dtype == pl.Datetime
 
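As a side note on the comment change above: neither plutil nor the new factory appends to a parquet file in place; both read the existing file, concatenate the new rows, and rewrite it. A minimal standalone sketch of that pattern, with an invented helper name:

import os

import polars as pl


def append_parquet(filename: str, new_df: pl.DataFrame) -> None:
    """'Append' new rows by read + concat + rewrite; the file is rewritten whole."""
    if os.path.exists(filename):
        cur_df = pl.read_parquet(filename)
        new_df = pl.concat([cur_df, new_df])
    new_df.write_parquet(filename)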