Skip to content

Commit

Permalink
issue483: move the logic from subgraph_slot.py (#489)
Browse files Browse the repository at this point in the history
  • Loading branch information
kdetry authored Jan 9, 2024
1 parent 243ae7e commit 3a5c639
Show file tree
Hide file tree
Showing 5 changed files with 301 additions and 290 deletions.
3 changes: 3 additions & 0 deletions mypy.ini
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,9 @@ ignore_missing_imports = True
[mypy-pdr_backend.predictoor.examples.*]
ignore_missing_imports = True

[mypy-polars.*]
ignore_missing_imports = True

[mypy-pylab.*]
ignore_missing_imports = True

Expand Down
195 changes: 193 additions & 2 deletions pdr_backend/accuracy/app.py
Original file line number Diff line number Diff line change
@@ -1,19 +1,210 @@
import json
import threading
from datetime import datetime, timedelta
from typing import Tuple
from typing import Any, Dict, List, Optional, Tuple

from enforce_typing import enforce_types
from flask import Flask, jsonify

from pdr_backend.subgraph.subgraph_predictions import (
fetch_contract_id_and_spe,
get_all_contract_ids_by_owner,
ContractIdAndSPE,
)
from pdr_backend.subgraph.subgraph_slot import calculate_statistics_for_all_assets
from pdr_backend.subgraph.subgraph_slot import fetch_slots_for_all_assets, PredictSlot

app = Flask(__name__)
JSON_FILE_PATH = "pdr_backend/accuracy/output/accuracy_data.json"
SECONDS_IN_A_DAY = 86400


@enforce_types
def calculate_prediction_result(
round_sum_stakes_up: float, round_sum_stakes: float
) -> Optional[bool]:
"""
Calculates the prediction result based on the sum of stakes.
Args:
round_sum_stakes_up: The summed stakes for the 'up' prediction.
round_sum_stakes: The summed stakes for all prediction.
Returns:
A boolean indicating the predicted direction.
"""

# checks for to be sure that the division is not by zero
round_sum_stakes_up_float = float(round_sum_stakes_up)
round_sum_stakes_float = float(round_sum_stakes)

if round_sum_stakes_float == 0.0:
return None

if round_sum_stakes_up_float == 0.0:
return False

return (round_sum_stakes_up_float / round_sum_stakes_float) > 0.5


@enforce_types
def process_single_slot(
slot: PredictSlot, end_of_previous_day_timestamp: int
) -> Optional[Tuple[float, float, int, int]]:
"""
Processes a single slot and calculates the staked amounts for yesterday and today,
as well as the count of correct predictions.
Args:
slot: A PredictSlot TypedDict containing information about a single prediction slot.
end_of_previous_day_timestamp: The Unix timestamp marking the end of the previous day.
Returns:
A tuple containing staked amounts for yesterday, today, and the counts of correct
predictions and slots evaluated, or None if no stakes were made today.
"""

staked_yesterday = staked_today = 0.0
correct_predictions_count = slots_evaluated = 0

if float(slot.roundSumStakes) == 0.0:
return None

# split the id to get the slot timestamp
timestamp = int(slot.ID.split("-")[1]) # Using dot notation for attribute access

if (
end_of_previous_day_timestamp - SECONDS_IN_A_DAY
< timestamp
< end_of_previous_day_timestamp
):
staked_yesterday += float(slot.roundSumStakes)
elif timestamp > end_of_previous_day_timestamp:
staked_today += float(slot.roundSumStakes)

prediction_result = calculate_prediction_result(
slot.roundSumStakesUp, slot.roundSumStakes
)

if prediction_result is None:
return (
staked_yesterday,
staked_today,
correct_predictions_count,
slots_evaluated,
)

true_values: List[Dict[str, Any]] = slot.trueValues or []
true_value: Optional[bool] = true_values[0]["trueValue"] if true_values else None

if len(true_values) > 0 and prediction_result == true_value:
correct_predictions_count += 1

if len(true_values) > 0 and true_value is not None:
slots_evaluated += 1

return staked_yesterday, staked_today, correct_predictions_count, slots_evaluated


@enforce_types
def aggregate_statistics(
slots: List[PredictSlot], end_of_previous_day_timestamp: int
) -> Tuple[float, float, int, int]:
"""
Aggregates statistics across all provided slots for an asset.
Args:
slots: A list of PredictSlot TypedDicts containing information
about multiple prediction slots.
end_of_previous_day_timestamp: The Unix timestamp marking the end of the previous day.
Returns:
A tuple containing the total staked amounts for yesterday, today,
and the total counts of correct predictions and slots evaluated.
"""

total_staked_yesterday = (
total_staked_today
) = total_correct_predictions = total_slots_evaluated = 0
for slot in slots:
slot_results = process_single_slot(slot, end_of_previous_day_timestamp)
if slot_results:
(
staked_yesterday,
staked_today,
correct_predictions_count,
slots_evaluated,
) = slot_results
total_staked_yesterday += staked_yesterday
total_staked_today += staked_today
total_correct_predictions += correct_predictions_count
total_slots_evaluated += slots_evaluated
return (
total_staked_yesterday,
total_staked_today,
total_correct_predictions,
total_slots_evaluated,
)


@enforce_types
def calculate_statistics_for_all_assets(
asset_ids: List[str],
contracts_list: List[ContractIdAndSPE],
start_ts_param: int,
end_ts_param: int,
network: str = "mainnet",
) -> Dict[str, Dict[str, Any]]:
"""
Calculates statistics for all provided assets based on
slot data within a specified time range.
Args:
asset_ids: A list of asset identifiers for which statistics will be calculated.
start_ts_param: The Unix timestamp for the start of the time range.
end_ts_param: The Unix timestamp for the end of the time range.
network: The blockchain network to query ('mainnet' or 'testnet').
Returns:
A dictionary mapping asset IDs to another dictionary with
calculated statistics such as average accuracy and total staked amounts.
"""

slots_by_asset = fetch_slots_for_all_assets(
asset_ids, start_ts_param, end_ts_param, network
)

overall_stats = {}
for asset_id, slots in slots_by_asset.items():
(
staked_yesterday,
staked_today,
correct_predictions_count,
slots_evaluated,
) = aggregate_statistics(slots, end_ts_param - SECONDS_IN_A_DAY)
average_accuracy = (
0
if correct_predictions_count == 0
else (correct_predictions_count / slots_evaluated) * 100
)

# filter contracts to get the contract with the current asset id
contract_item = next(
(
contract_item
for contract_item in contracts_list
if contract_item["ID"] == asset_id
),
None,
)

overall_stats[asset_id] = {
"token_name": contract_item["name"] if contract_item else None,
"average_accuracy": average_accuracy,
"total_staked_yesterday": staked_yesterday,
"total_staked_today": staked_today,
}

return overall_stats


@enforce_types
Expand Down
91 changes: 91 additions & 0 deletions pdr_backend/accuracy/test/test_app.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
from typing import List
from unittest.mock import patch

from enforce_typing import enforce_types

from pdr_backend.subgraph.subgraph_predictions import ContractIdAndSPE
from pdr_backend.accuracy.app import (
calculate_prediction_result,
process_single_slot,
aggregate_statistics,
calculate_statistics_for_all_assets,
)
from pdr_backend.subgraph.subgraph_slot import PredictSlot

# Sample data for tests
SAMPLE_PREDICT_SLOT = PredictSlot(
ID="0xAsset-12345",
slot="12345",
trueValues=[{"ID": "1", "trueValue": True}],
roundSumStakesUp=150.0,
roundSumStakes=100.0,
)


@enforce_types
def test_calculate_prediction_result():
# Test the calculate_prediction_prediction_result function with expected inputs
result = calculate_prediction_result(150.0, 200.0)
assert result

result = calculate_prediction_result(100.0, 250.0)
assert not result


@enforce_types
def test_process_single_slot():
# Test the process_single_slot function
(
staked_yesterday,
staked_today,
correct_predictions,
slots_evaluated,
) = process_single_slot(
slot=SAMPLE_PREDICT_SLOT, end_of_previous_day_timestamp=12340
)

assert staked_yesterday == 0.0
assert staked_today == 100.0
assert correct_predictions == 1
assert slots_evaluated == 1


@enforce_types
def test_aggregate_statistics():
# Test the aggregate_statistics function
(
total_staked_yesterday,
total_staked_today,
total_correct_predictions,
total_slots_evaluated,
) = aggregate_statistics(
slots=[SAMPLE_PREDICT_SLOT], end_of_previous_day_timestamp=12340
)
assert total_staked_yesterday == 0.0
assert total_staked_today == 100.0
assert total_correct_predictions == 1
assert total_slots_evaluated == 1


@enforce_types
@patch("pdr_backend.accuracy.app.fetch_slots_for_all_assets")
def test_calculate_statistics_for_all_assets(mock_fetch_slots):
# Mocks
mock_fetch_slots.return_value = {"0xAsset": [SAMPLE_PREDICT_SLOT] * 1000}
contracts_list: List[ContractIdAndSPE] = [
{"ID": "0xAsset", "seconds_per_epoch": 300, "name": "TEST/USDT"}
]

# Main work
statistics = calculate_statistics_for_all_assets(
asset_ids=["0xAsset"],
contracts_list=contracts_list,
start_ts_param=1000,
end_ts_param=2000,
network="mainnet",
)

print("test_calculate_statistics_for_all_assets", statistics)
# Verify
assert statistics["0xAsset"]["average_accuracy"] == 100.0
mock_fetch_slots.assert_called_once_with(["0xAsset"], 1000, 2000, "mainnet")
Loading

0 comments on commit 3a5c639

Please sign in to comment.