Skip to content

Commit

Permalink
Merge pull request #285 from ergoplatform/i53-prom-monitoring
Browse files Browse the repository at this point in the history
Export Prometheus metrics
  • Loading branch information
greenhat authored Jun 12, 2023
2 parents ab90edc + 098d391 commit 66ccb9a
Show file tree
Hide file tree
Showing 13 changed files with 1,608 additions and 246 deletions.
26 changes: 16 additions & 10 deletions core/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,7 +1,12 @@
[package]
name = "oracle-core"
version = "2.0.0-beta9"
authors = ["Robert Kornacki <11645932+robkorn@users.noreply.github.com>", "@greenhat", "@kettlebell", "@SethDusek"]
authors = [
"Robert Kornacki <11645932+robkorn@users.noreply.github.com>",
"@greenhat",
"@kettlebell",
"@SethDusek",
]
edition = "2021"

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
Expand Down Expand Up @@ -29,19 +34,20 @@ ergo-lib = { workspace = true }
ergo-node-interface = { git = "https://github.com/ergoplatform/ergo-node-interface-rust", rev = "143c2a3dc8fb772d1af37f1f1e1924067c6aad14" }
# ergo-node-interface = { version = "0.4" }
derive_more = "0.99"
clap = {version = "4.2.4", features = ["derive"]}
clap = { version = "4.2.4", features = ["derive"] }
exitcode = "1.1.2"
lazy_static = "1.4.0"
once_cell = "1.15.0"
futures = "0.3"
prometheus = "0.13"

[dev-dependencies]
ergo-lib = { workspace = true, features = ["arbitrary"]}
proptest = {version = "1.0.0"}
proptest-derive = {version = "0.3.0"}
sigma-test-util = {version = "0.3.0"}
ergo-chain-sim = {version = "0.1.0", path="../ergo-chain-sim"}
env_logger = {version = "0.10.0"}
tokio-test = {version = "0.4"}
pretty_assertions = {workspace = true}
ergo-lib = { workspace = true, features = ["arbitrary"] }
proptest = { version = "1.0.0" }
proptest-derive = { version = "0.3.0" }
sigma-test-util = { version = "0.3.0" }
ergo-chain-sim = { version = "0.1.0", path = "../ergo-chain-sim" }
env_logger = { version = "0.10.0" }
tokio-test = { version = "0.4" }
pretty_assertions = { workspace = true }
expect-test = "1.0.1"
142 changes: 14 additions & 128 deletions core/src/address_util.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
use ergo_lib::ergotree_ir::{
chain::address::{Address, AddressEncoder, AddressEncoderError},
mir::constant::{Constant, Literal},
serialization::{SigmaParsingError, SigmaSerializable, SigmaSerializationError},
sigma_protocol::sigma_boolean::ProveDlog,
};
use ergo_lib::ergo_chain_types::EcPoint;
use ergo_lib::ergotree_ir::chain::address::Address;
use ergo_lib::ergotree_ir::chain::address::AddressEncoderError;
use ergo_lib::ergotree_ir::chain::address::NetworkAddress;
use ergo_lib::ergotree_ir::chain::address::NetworkPrefix;
use ergo_lib::ergotree_ir::serialization::SigmaParsingError;
use ergo_lib::ergotree_ir::serialization::SigmaSerializationError;
use thiserror::Error;

#[derive(Error, Debug)]
Expand All @@ -22,126 +23,11 @@ pub enum AddressUtilError {
Base16DecodeError(#[from] base16::DecodeError),
}

/// Given a P2S Ergo address, extract the hex-encoded serialized ErgoTree (script)
pub fn address_to_tree(address: &str) -> Result<String, AddressUtilError> {
let address_parsed = AddressEncoder::unchecked_parse_network_address_from_str(address)?;
let script = address_parsed.address().script()?;
Ok(base16::encode_lower(&script.sigma_serialize_bytes()?))
}

/// Given a P2S Ergo address, convert it to a hex-encoded Sigma byte array constant
pub fn address_to_bytes(address: &str) -> Result<String, AddressUtilError> {
let address_parsed = AddressEncoder::unchecked_parse_network_address_from_str(address)?;
let script = address_parsed.address().script()?;
Ok(base16::encode_lower(
&Constant::from(script.sigma_serialize_bytes()?).sigma_serialize_bytes()?,
))
}

/// Given an Ergo P2PK Address, convert it to a raw hex-encoded EC point
/// and prepend the type bytes so it is encoded and ready
/// to be used in a register.
pub fn address_to_raw_for_register(address: &str) -> Result<String, AddressUtilError> {
let address_parsed = AddressEncoder::unchecked_parse_network_address_from_str(address)?;
match address_parsed.address() {
Address::P2Pk(ProveDlog { h }) => Ok(base16::encode_lower(
&Constant::from(*h).sigma_serialize_bytes()?,
)),
Address::P2SH(_) | Address::P2S(_) => Err(AddressUtilError::ExpectedP2PK),
}
}

/// Given an Ergo P2PK Address, convert it to a raw hex-encoded EC point
pub fn address_to_raw(address: &str) -> Result<String, AddressUtilError> {
let address_parsed = AddressEncoder::unchecked_parse_network_address_from_str(address)?;
match address_parsed.address() {
Address::P2Pk(_) => Ok(base16::encode_lower(
&address_parsed.address().content_bytes(),
)),
Address::P2SH(_) | Address::P2S(_) => Err(AddressUtilError::ExpectedP2PK),
}
}

/// Given a raw hex-encoded EC point, convert it to a P2PK address
pub fn raw_to_address(raw: &str) -> Result<Address, AddressUtilError> {
let bytes = base16::decode(raw)?;
Address::p2pk_from_pk_bytes(&bytes).map_err(Into::into)
}

/// Given a raw hex-encoded EC point from a register (thus with type encoded characters in front),
/// convert it to a P2PK address
pub fn raw_from_register_to_address(raw: &str) -> Result<Address, AddressUtilError> {
let bytes = base16::decode(raw)?;
let constant = Constant::sigma_parse_bytes(&bytes)?;
if let Literal::GroupElement(h) = constant.v {
Ok(Address::P2Pk(ProveDlog { h }))
} else {
Err(AddressUtilError::ExpectedP2PK)
}
}

#[cfg(test)]
mod test {
use ergo_lib::ergotree_ir::chain::address::{AddressEncoder, NetworkPrefix};

use crate::address_util::{
address_to_bytes, address_to_raw, address_to_raw_for_register, address_to_tree,
raw_from_register_to_address, raw_to_address,
};

// Test serialization for default address argument of /utils/addressToRaw
#[test]
fn test_address_to_raw_for_register() {
assert_eq!(
"07028333f9f7454f8d5ff73dbac9833767ed6fc3a86cf0a73df946b32ea9927d9197",
address_to_raw_for_register("3WwbzW6u8hKWBcL1W7kNVMr25s2UHfSBnYtwSHvrRQt7DdPuoXrt")
.unwrap()
);
assert_eq!(
"028333f9f7454f8d5ff73dbac9833767ed6fc3a86cf0a73df946b32ea9927d9197",
address_to_raw("3WwbzW6u8hKWBcL1W7kNVMr25s2UHfSBnYtwSHvrRQt7DdPuoXrt").unwrap()
);
}
#[test]
fn test_address_raw_roundtrip() {
let address = AddressEncoder::new(NetworkPrefix::Testnet)
.parse_address_from_str("3WwbzW6u8hKWBcL1W7kNVMr25s2UHfSBnYtwSHvrRQt7DdPuoXrt")
.unwrap();
assert_eq!(
address,
raw_to_address(
&address_to_raw("3WwbzW6u8hKWBcL1W7kNVMr25s2UHfSBnYtwSHvrRQt7DdPuoXrt").unwrap()
)
.unwrap()
);
}
#[test]
fn test_address_raw_register_roundtrip() {
let address = AddressEncoder::new(NetworkPrefix::Testnet)
.parse_address_from_str("3WwbzW6u8hKWBcL1W7kNVMr25s2UHfSBnYtwSHvrRQt7DdPuoXrt")
.unwrap();
assert_eq!(
address,
raw_from_register_to_address(
&address_to_raw_for_register(
"3WwbzW6u8hKWBcL1W7kNVMr25s2UHfSBnYtwSHvrRQt7DdPuoXrt"
)
.unwrap()
)
.unwrap()
);
}

// test serialization of "sigmaProp(true)" script
#[test]
fn test_address_to_tree() {
assert_eq!(
"10010101d17300",
address_to_tree("Ms7smJwLGbUAjuWQ").unwrap()
);
assert_eq!(
"0e0710010101d17300",
address_to_bytes("Ms7smJwLGbUAjuWQ").unwrap()
);
}
pub fn pks_to_network_addresses(
pks: Vec<EcPoint>,
network_prefix: NetworkPrefix,
) -> Vec<NetworkAddress> {
pks.into_iter()
.map(|pk| NetworkAddress::new(network_prefix, &Address::P2Pk(pk.into())))
.collect()
}
105 changes: 33 additions & 72 deletions core/src/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,10 @@ use std::convert::From;
use std::net::SocketAddr;
use std::sync::Arc;

use crate::box_kind::{OracleBoxWrapper, PoolBox};
use crate::node_interface::node_api::NodeApi;
use crate::oracle_config::{get_core_api_port, ORACLE_CONFIG};
use crate::box_kind::PoolBox;
use crate::monitor::{check_oracle_health, check_pool_health, PoolHealth};
use crate::node_interface::node_api::{NodeApi, NodeApiError};
use crate::oracle_config::ORACLE_CONFIG;
use crate::oracle_state::{DataSourceError, LocalDatapointState, OraclePool};
use crate::pool_config::POOL_CONFIG;
use axum::http::StatusCode;
Expand Down Expand Up @@ -135,26 +136,8 @@ fn pool_status_sync(oracle_pool: Arc<OraclePool>) -> Result<Json<serde_json::Val
.epoch_length();
let pool_box_height = pool_box.get_box().creation_height;
let epoch_end_height = pool_box_height + epoch_length.0 as u32;

let posted_boxes = oracle_pool
.get_posted_datapoint_boxes_source()
.get_posted_datapoint_boxes()?;
let posted_count_current_epoch = posted_boxes
.into_iter()
.filter(|b| b.get_box().creation_height >= pool_box_height)
.count();

let collected_boxes = oracle_pool
.get_collected_datapoint_boxes_source()
.get_collected_datapoint_boxes()?;
let collected_count_previous_epoch = collected_boxes
.into_iter()
.filter(|b| b.get_box().creation_height == pool_box_height)
.count();

let active_oracle_count = collected_count_previous_epoch + posted_count_current_epoch;
let pool_health = pool_health_sync(oracle_pool)?;

let active_oracle_count = pool_health.details.active_oracles.len();
let json = Json(json!({
"latest_pool_datapoint": pool_box.rate(),
"latest_pool_box_height": pool_box_height,
Expand Down Expand Up @@ -202,72 +185,43 @@ fn oracle_health_sync(oracle_pool: Arc<OraclePool>) -> Result<serde_json::Value,
.get_pool_box_source()
.get_pool_box()?
.get_box()
.creation_height;
let mut check_details = json!({
"pool_box_height": pool_box_height,
});
let is_healthy = match oracle_pool
.get_local_datapoint_box_source()
.get_local_oracle_datapoint_box()?
{
Some(b) => match b {
OracleBoxWrapper::Posted(posted_box) => {
let creation_height = posted_box.get_box().creation_height;
check_details["posted_box_height"] = json!(creation_height);
creation_height > pool_box_height
}
OracleBoxWrapper::Collected(collected_box) => {
let creation_height = collected_box.get_box().creation_height;
check_details["collected_box_height"] = json!(creation_height);
creation_height == pool_box_height
}
},
None => false,
};
let json = json!({
"status": if is_healthy { "OK" } else { "DOWN" },
"details": check_details,
});
Ok(json)
.creation_height
.into();
let oracle_health = check_oracle_health(oracle_pool, pool_box_height)?;
Ok(serde_json::to_value(oracle_health).unwrap())
}

async fn pool_health(oracle_pool: Arc<OraclePool>) -> Result<Json<serde_json::Value>, ApiError> {
let json = task::spawn_blocking(|| pool_health_sync(oracle_pool))
let json = task::spawn_blocking(|| pool_health_sync_json(oracle_pool))
.await
.unwrap()?;
Ok(Json(json))
}
fn pool_health_sync(oracle_pool: Arc<OraclePool>) -> Result<serde_json::Value, ApiError> {
let pool_conf = &POOL_CONFIG;

fn pool_health_sync(oracle_pool: Arc<OraclePool>) -> Result<PoolHealth, ApiError> {
let node_api = NodeApi::new(ORACLE_CONFIG.node_api_key.clone(), &ORACLE_CONFIG.node_url);
let current_height = node_api.node.current_block_height()? as u32;
let current_height = (node_api.node.current_block_height()? as u32).into();
let pool_box_height = oracle_pool
.get_pool_box_source()
.get_pool_box()?
.get_box()
.creation_height;
let epoch_length = pool_conf
.refresh_box_wrapper_inputs
.contract_inputs
.contract_parameters()
.epoch_length()
.0 as u32;
let check_details = json!({
"pool_box_height": pool_box_height,
"current_block_height": current_height,
"epoch_length": epoch_length,
});
let is_healthy = pool_box_height >= current_height - epoch_length;
let json = json!({
"status": if is_healthy { "OK" } else { "DOWN" },
"details": check_details,
});
Ok(json)
.creation_height
.into();
let network_prefix = node_api.get_change_address()?.network();
let pool_health =
check_pool_health(current_height, pool_box_height, oracle_pool, network_prefix)?;
Ok(pool_health)
}

fn pool_health_sync_json(oracle_pool: Arc<OraclePool>) -> Result<serde_json::Value, ApiError> {
let pool_health = pool_health_sync(oracle_pool)?;
Ok(serde_json::to_value(pool_health).unwrap())
}

pub async fn start_rest_server(
repost_receiver: Receiver<bool>,
oracle_pool: Arc<OraclePool>,
api_port: u16,
) -> Result<(), anyhow::Error> {
let op_clone = oracle_pool.clone();
let op_clone2 = oracle_pool.clone();
Expand All @@ -290,7 +244,8 @@ pub async fn start_rest_server(
.allow_origin(tower_http::cors::Any)
.allow_methods([axum::http::Method::GET]),
);
let addr = SocketAddr::from(([0, 0, 0, 0], get_core_api_port().parse().unwrap()));
let addr = SocketAddr::from(([0, 0, 0, 0], api_port));
log::info!("Starting REST server on {}", addr);
axum::Server::try_bind(&addr)?
.serve(app.into_make_service())
.await?;
Expand Down Expand Up @@ -322,3 +277,9 @@ impl From<anyhow::Error> for ApiError {
ApiError(format!("Error: {:?}", err))
}
}

impl From<NodeApiError> for ApiError {
fn from(err: NodeApiError) -> Self {
ApiError(format!("NodeApiError: {:?}", err))
}
}
17 changes: 12 additions & 5 deletions core/src/box_kind/oracle_box.rs
Original file line number Diff line number Diff line change
Expand Up @@ -175,11 +175,10 @@ impl OracleBox for OracleBoxWrapper {
}

fn public_key(&self) -> EcPoint {
self.get_box()
.get_register(NonMandatoryRegisterId::R4.into())
.unwrap()
.try_extract_into::<EcPoint>()
.unwrap()
match self {
OracleBoxWrapper::Posted(p) => p.public_key().clone(),
OracleBoxWrapper::Collected(c) => c.public_key().clone(),
}
}

fn get_box(&self) -> &ErgoBox {
Expand Down Expand Up @@ -276,6 +275,14 @@ impl CollectedOracleBox {
pub fn get_box(&self) -> &ErgoBox {
&self.ergo_box
}

pub fn public_key(&self) -> EcPoint {
self.ergo_box
.get_register(NonMandatoryRegisterId::R4.into())
.unwrap()
.try_extract_into::<EcPoint>()
.unwrap()
}
}

#[derive(Clone, Debug)]
Expand Down
Loading

0 comments on commit 66ccb9a

Please sign in to comment.