Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

CERT 7649 Allow dynamic price feed providers #20

Merged
merged 12 commits into from
Dec 1, 2024
3 changes: 2 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -4,4 +4,5 @@
**/local*
build/
*.egg-info
CustomerClones/
CustomerClones/
**cache**
17 changes: 8 additions & 9 deletions Quorum/apis/git_api/git_manager.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,9 @@
import json
from pathlib import Path
from git import Repo

import Quorum.config as config
import Quorum.utils.pretty_printer as pp


class GitManager:
"""
A class to manage Git repositories for a specific customer.
Expand All @@ -15,12 +13,13 @@ class GitManager:
repos (dict): A dictionary mapping repository names to their URLs.
"""

def __init__(self, customer: str) -> None:
def __init__(self, customer: str, gt_config: dict[str, any]) -> None:
"""
Initialize the GitManager with the given customer name and load the repository URLs.

Args:
customer (str): The name or identifier of the customer.
gt_config (dict[str, any]): The ground truth configuration data.
"""
self.customer = customer

Expand All @@ -30,22 +29,22 @@ def __init__(self, customer: str) -> None:
self.review_module_path = config.MAIN_PATH / self.customer / "review_module"
self.review_module_path.mkdir(parents=True, exist_ok=True)

self.repos, self.review_repo = self._load_repos_from_file()
self.repos, self.review_repo = self._load_repos_from_file(gt_config)

def _load_repos_from_file(self) -> tuple[dict[str, str], dict[str, str]]:
def _load_repos_from_file(self, gt_config: dict[str, any]) -> tuple[dict[str, str], dict[str, str]]:
"""
Load repository URLs from the JSON file for the given customer.

Args:
gt_config (dict[str, any]): The ground truth configuration data.

Returns:
tuple[dict[str, str], dict[str, str]]: 2 dictionaries mapping repository names to their URLs.
The first dictionary contains the repos to diff against. The second dictionary is the verification repo.
"""
with open(config.REPOS_PATH) as f:
repos_data = json.load(f)

# Normalize the customer name to handle case differences
normalized_customer = self.customer.lower()
customer_repos = next((repos for key, repos in repos_data.items() if key.lower() == normalized_customer), None)
customer_repos = next((repos for key, repos in gt_config.items() if key.lower() == normalized_customer), None)
if customer_repos is None:
return {}, {}

Expand Down
3 changes: 2 additions & 1 deletion Quorum/apis/price_feeds/__init__.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
from .chainlink_api import ChainLinkAPI
from .chronicle_api import ChronicleAPI
from .price_feed_utils import PriceFeedData, PriceFeedProvider, PriceFeedProviderBase

all = [ChainLinkAPI, ChronicleAPI]
all = [ChainLinkAPI, ChronicleAPI, PriceFeedData, PriceFeedProvider, PriceFeedProviderBase]
94 changes: 19 additions & 75 deletions Quorum/apis/price_feeds/chainlink_api.py
Original file line number Diff line number Diff line change
@@ -1,54 +1,9 @@
import requests
from pydantic import BaseModel
from typing import Optional

from Quorum.utils.chain_enum import Chain
from Quorum.utils.singleton import Singleton

class Docs(BaseModel):
assetClass: Optional[str] = None
assetName: Optional[str] = None
baseAsset: Optional[str] = None
baseAssetClic: Optional[str] = None
blockchainName: Optional[str] = None
clicProductName: Optional[str] = None
deliveryChannelCode: Optional[str] = None
feedType: Optional[str] = None
hidden: Optional[bool] = None
marketHours: Optional[str] = None
productSubType: Optional[str] = None
productType: Optional[str] = None
productTypeCode: Optional[str] = None
quoteAsset: Optional[str] = None
quoteAssetClic: Optional[str] = None


class PriceFeedData(BaseModel):
compareOffchain: Optional[str] = None
contractAddress: str
contractType: Optional[str] = None
contractVersion: Optional[int] = None
decimalPlaces: Optional[int] = None
ens: Optional[str] = None
formatDecimalPlaces: Optional[int] = None
healthPrice: Optional[str] = None
heartbeat: Optional[int] = None
history: Optional[str | bool] = None
multiply: Optional[str] = None
name: Optional[str] = None
pair: Optional[list[Optional[str]]] = None
path: Optional[str] = None
proxyAddress: Optional[str] = None
threshold: Optional[float] = None
valuePrefix: Optional[str] = None
assetName: Optional[str] = None
feedCategory: Optional[str] = None
feedType: Optional[str] = None
docs: Optional[Docs] = None
decimals: Optional[int] = None

from Quorum.utils.singleton import singleton
from .price_feed_utils import PriceFeedData, PriceFeedProviderBase, PriceFeedProvider

class ChainLinkAPI(metaclass=Singleton):
@singleton
class ChainLinkAPI(PriceFeedProviderBase):
"""
ChainLinkAPI is a class designed to interact with the Chainlink data feed API.
It fetches and stores price feed data for various blockchain networks supported by Chainlink.
Expand All @@ -72,42 +27,31 @@ class ChainLinkAPI(metaclass=Singleton):
Chain.SCR: "https://reference-data-directory.vercel.app/feeds-ethereum-mainnet-scroll-1.json",
Chain.ZK: "https://reference-data-directory.vercel.app/feeds-ethereum-mainnet-zksync-1.json"
}

def __init__(self) -> None:
"""
Initialize the ChainLinkAPI instance.

Creates an HTTP session for managing requests and initializes an in-memory cache to store
price feed data for different blockchain networks.
"""
self.session = requests.Session()
self.memory: dict[Chain, list[PriceFeedData]] = {}

def get_price_feeds_info(self, chain: Chain) -> list[PriceFeedData]:
def _get_price_feeds_info(self, chain: Chain) -> dict[str, PriceFeedData]:
"""
Fetches the price feeds information from the Chainlink API for a specified blockchain network.

This method first checks if the data for the given chain is already cached in memory. If not, it makes an HTTP
request to the Chainlink API to fetch the data, parses the JSON response into a list of PriceFeedData objects,
and stores it in the cache.
Get price feed data for a given blockchain network.

Args:
chain (Chain): The blockchain network to fetch price feeds for.

Returns:
list[PriceFeedData]: A list of PriceFeedData objects containing the price feeds information.
dict[str, PriceFeedData]: A dictionary mapping the contract address of the price feed to the PriceFeedData object.

Raises:
requests.HTTPError: If the HTTP request to the Chainlink API fails.
KeyError: If the specified chain is not supported.
"""
if chain not in self.memory:
url = self.chain_mapping.get(chain)
if not url:
raise KeyError(f"Chain {chain.name} is not supported.")

response = self.session.get(url)
response.raise_for_status()
self.memory[chain] = [PriceFeedData(**feed) for feed in response.json()]
url = self.chain_mapping.get(chain)
if not url:
raise KeyError(f"Chain {chain.name} is not supported.")

return self.memory[chain]
response = self.session.get(url)
response.raise_for_status()
chain_link_price_feeds = [PriceFeedData(**feed) for feed in response.json()]
chain_link_price_feeds = {feed.address: feed for feed in chain_link_price_feeds}
chain_link_price_feeds.update({feed.proxy_address: feed for feed in chain_link_price_feeds.values() if feed.proxy_address})
return chain_link_price_feeds

def get_name(self) -> PriceFeedProvider:
return PriceFeedProvider.CHAINLINK
49 changes: 27 additions & 22 deletions Quorum/apis/price_feeds/chronicle_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,12 @@
from collections import defaultdict

from Quorum.utils.chain_enum import Chain
from Quorum.utils.singleton import Singleton
from Quorum.utils.singleton import singleton
from .price_feed_utils import PriceFeedData, PriceFeedProviderBase, PriceFeedProvider

class ChronicleAPI(metaclass=Singleton):

@singleton
class ChronicleAPI(PriceFeedProviderBase):
"""
ChronicleAPI is a class designed to interact with the Chronicle data feed API.
It fetches and stores price feed data for various blockchain networks supported by Chronicle.
Expand All @@ -15,8 +18,7 @@ class ChronicleAPI(metaclass=Singleton):
"""

def __init__(self):
self.session = requests.Session()
self.memory = defaultdict(list)
super().__init__()
self.__pairs = self.__process_pairs()

def __process_pairs(self) -> dict[str, list[str]]:
Expand All @@ -33,31 +35,34 @@ def __process_pairs(self) -> dict[str, list[str]]:
result[p["blockchain"]].append(p["pair"])
return result

def get_price_feeds_info(self, chain: Chain) -> list[dict]:
def _get_price_feeds_info(self, chain: Chain) -> dict[str, PriceFeedData]:
"""
Get price feed data for a given blockchain network.

This method retrieves price feed data from the cache if it has been fetched previously.

Args:
chain (Chain): The blockchain network to fetch price feed data for.
chain (Chain): The blockchain network to fetch price feeds for.

Returns:
dict: The price feed data for the specified chain.
dict[str, PriceFeedData]: A dictionary mapping the contract address of the price feed to the PriceFeedData object.
"""
if chain not in self.memory:
pairs = self.__pairs.get(chain)
if not pairs:
return []

pairs = self.__pairs.get(chain)
if not pairs:
return {}

chronicle_price_feeds: list[PriceFeedData] = []
for pair in pairs:
response = self.session.get(
f"https://chroniclelabs.org/api/median/info/{pair}/{chain.value}/?testnet=false"
)
response.raise_for_status()
data = response.json()

for pair in pairs:
response = self.session.get(
f"https://chroniclelabs.org/api/median/info/{pair}/{chain.value}/?testnet=false"
)
response.raise_for_status()
data = response.json()
for pair_info in data:
chronicle_price_feeds.append(PriceFeedData(**pair_info))

for pair_info in data:
self.memory[chain].append(pair_info)
return {feed.address: feed for feed in chronicle_price_feeds}


return self.memory[chain]
def get_name(self) -> PriceFeedProvider:
return PriceFeedProvider.CHRONICLE
73 changes: 73 additions & 0 deletions Quorum/apis/price_feeds/price_feed_utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
from abc import ABC, abstractmethod
from enum import StrEnum
from typing import Optional
from pydantic import BaseModel, Field
from pathlib import Path
import json
import requests

from Quorum.utils.chain_enum import Chain

class PriceFeedProvider(StrEnum):
"""
Enumeration for supported price feed providers.
"""
CHAINLINK = 'Chainlink'
CHRONICLE = 'Chronicle'

class PriceFeedData(BaseModel):
name: Optional[str]
pair: Optional[str | list]
address: str = Field(..., alias='contractAddress')
proxy_address: Optional[str] = Field(None, alias='proxyAddress')

class Config:
allow_population_by_field_name = True # Allows population using field names
extra = 'ignore' # Ignores extra fields not defined in the model


class PriceFeedProviderBase(ABC):
"""
PriceFeedProviderBase is an abstract base class for price feed providers.
It defines the interface for fetching price feed data for different blockchain networks.

Attributes:
cache_dir (Path): The directory path to store the fetched price feed data.
"""

def __init__(self):
super().__init__()
self.cache_dir = Path(__file__).parent / "cache" / self.get_name().value
self.cache_dir.mkdir(parents=True, exist_ok=True)
self.session = requests.Session()
self.memory: dict[Chain, dict[str, PriceFeedData]] = {}

def get_feeds(self, chain: Chain) -> dict[str, PriceFeedData]:
"""
Get price feed data for a given blockchain network from the cache.

Args:
chain (Chain): The blockchain network to fetch price feed data for.

Returns:
dict[str, PriceFeedData]: A dictionary mapping the contract address of the price feed to the price feed data.
"""
cache_file = self.cache_dir / f"{chain.value}.json"
if cache_file.exists():
with open(cache_file, 'r') as file:
data: dict = json.load(file)
self.memory[chain] = {addr: PriceFeedData(**feed) for addr, feed in data.items()}
Comment on lines +56 to +59
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I may be missing something here but price feed data isn't updating? I mean I see you never update the cache only create it or read from it.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Create is the update, the same as it worked before

else:
if chain not in self.memory:
self.memory[chain] = self._get_price_feeds_info(chain)
with open(cache_file, 'w') as file:
json.dump({addr: feed.dict() for addr, feed in self.memory[chain].items()}, file)
return self.memory[chain]

@abstractmethod
def _get_price_feeds_info(self, chain: Chain) -> dict[str, PriceFeedData]:
pass

@abstractmethod
def get_name(self) -> PriceFeedProvider:
pass
Loading