Skip to content

feat(argus): ControllerService update loop, ChainPriceService poll loop #2693

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 26 additions & 8 deletions apps/argus/src/adapters/contract.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,21 +10,39 @@ use std::collections::HashMap;

#[async_trait]
pub trait GetChainPrices {
async fn get_price_unsafe(
async fn get_all_prices_for_subscription(
&self,
subscription_id: SubscriptionId,
feed_id: &PriceId,
) -> Result<Option<Price>>;
) -> Result<Vec<Price>>;

async fn get_prices_for_subscription(
&self,
subscription_id: SubscriptionId,
price_ids: &Vec<PriceId>,
) -> Result<Vec<Price>>;
}

#[async_trait]
impl<M: Middleware + 'static> GetChainPrices for PythPulse<M> {
async fn get_price_unsafe(
async fn get_all_prices_for_subscription(
&self,
_subscription_id: SubscriptionId,
_feed_id: &PriceId,
) -> Result<Option<Price>> {
todo!()
subscription_id: SubscriptionId,
) -> Result<Vec<Price>> {
let price_ids = self.get_prices_unsafe(subscription_id, vec![]).await?;
Ok(price_ids.into_iter().map(From::from).collect())
}
async fn get_prices_for_subscription(
&self,
subscription_id: SubscriptionId,
price_ids: &Vec<PriceId>,
) -> Result<Vec<Price>> {
let price_ids = self
.get_prices_unsafe(
subscription_id,
price_ids.into_iter().map(|id| id.to_bytes()).collect(),
)
.await?;
Ok(price_ids.into_iter().map(From::from).collect())
}
}
#[async_trait]
Expand Down
17 changes: 17 additions & 0 deletions apps/argus/src/adapters/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,3 +3,20 @@ use pyth_sdk::PriceIdentifier;

pub type PriceId = PriceIdentifier;
pub type SubscriptionId = U256;

use crate::adapters::ethereum::pyth_pulse::Price as ContractPrice; // ABI-generated Price
use pyth_sdk::Price as SdkPrice; // pyth_sdk::Price

impl From<ContractPrice> for SdkPrice {
fn from(contract_price: ContractPrice) -> Self {
SdkPrice {
price: contract_price.price,
conf: contract_price.conf,
expo: contract_price.expo,
publish_time: contract_price
.publish_time
.try_into()
.expect("Failed to convert publish_time from U256 to i64 (UnixTimestamp)"),
}
}
}
2 changes: 2 additions & 0 deletions apps/argus/src/command/run.rs
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,7 @@ pub async fn run_keeper_for_chain(
contract.clone(),
config.keeper.chain_price_poll_interval,
state.chain_price_state.clone(),
state.subscription_state.clone(),
);

let price_pusher_service = PricePusherService::new(
Expand All @@ -157,6 +158,7 @@ pub async fn run_keeper_for_chain(
state.subscription_state.clone(),
state.pyth_price_state.clone(),
state.chain_price_state.clone(),
price_pusher_service.request_sender(),
);

let services: Vec<Arc<dyn Service>> = vec![
Expand Down
73 changes: 65 additions & 8 deletions apps/argus/src/services/chain_price_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,23 +7,28 @@

use anyhow::Result;
use async_trait::async_trait;
use pyth_sdk::Price;
use std::collections::HashMap;
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::watch;
use tokio::time;
use tracing;

use crate::adapters::contract::GetChainPrices;
use crate::adapters::types::PriceId;
use crate::services::Service;
use crate::state::ChainName;
use crate::state::ChainPriceState;
use crate::state::SubscriptionState;

pub struct ChainPriceService {
chain_name: ChainName,
name: String,
contract: Arc<dyn GetChainPrices + Send + Sync>,
poll_interval: Duration,
chain_price_state: Arc<ChainPriceState>,
subscription_state: Arc<SubscriptionState>,
}

impl ChainPriceService {
Expand All @@ -32,24 +37,70 @@ impl ChainPriceService {
contract: Arc<dyn GetChainPrices + Send + Sync>,
poll_interval: Duration,
chain_price_state: Arc<ChainPriceState>,
subscription_state: Arc<SubscriptionState>,
) -> Self {
Self {
chain_name: chain_name.clone(),
name: format!("ChainPriceService-{}", chain_name),
contract,
poll_interval,
chain_price_state,
subscription_state,
}
}

async fn poll_prices(&self, state: Arc<ChainPriceState>) {
let feed_ids = state.get_feed_ids();
#[tracing::instrument(skip_all, fields(task = self.name, chain_name = self.chain_name))]
async fn poll_prices(&self) -> Result<()> {
// Get all active subscriptions
let subscriptions = self.subscription_state.get_subscriptions();

tracing::debug!(
service = self.name,
feed_count = feed_ids.len(),
"Polled for on-chain price updates"
);
// For each subscription, query the chain for the price of each feed
for item in subscriptions.iter() {
let subscription_id = item.key().clone();
let subscription_params = item.value().clone();

// TODO: do this in parallel using tokio tasks?
let price_ids = subscription_params
.price_ids
.into_iter()
.map(|id| PriceId::new(id))
.collect::<Vec<PriceId>>();

match self
.contract
.get_prices_for_subscription(subscription_id, &price_ids)
.await
{
Ok(prices) => {
let prices_map: HashMap<PriceId, Price> = price_ids
.clone()
.into_iter()
.zip(prices.into_iter())
.collect();

tracing::debug!(
price_ids = ?price_ids,
subscription_id = %subscription_id,
"Got prices for subscription"
);

// Store the latest price feeds for the subscription
self.chain_price_state
.update_prices(subscription_id, prices_map);
}
Err(e) => {
// If we failed to get prices for a subscription, we'll retry on the next poll interval.
// Continue to the next subscription.
tracing::error!(
subscription_id = %subscription_id,
error = %e,
"Failed to get prices for subscription"
);
continue;
}
}
}
Ok(())
}
}

Expand All @@ -64,7 +115,13 @@ impl Service for ChainPriceService {
loop {
tokio::select! {
_ = interval.tick() => {
self.poll_prices(self.chain_price_state.clone()).await;
if let Err(e) = self.poll_prices().await {
tracing::error!(
service = self.name,
error = %e,
"Failed to poll chain prices"
);
}
}
_ = stop_rx.changed() => {
if *stop_rx.borrow() {
Expand Down
Loading