refactor: forester: switch logging from log to tracing #1143

Merged · 3 commits · Sep 2, 2024
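
The diff below replaces the log crate's macros with their tracing counterparts, demotes chatty info! and println! calls to debug!, and attaches #[instrument] spans to the epoch-manager entry points. A minimal sketch of that pattern end to end, assuming the tracing-subscriber crate with its env-filter feature; the subscriber wiring itself is not part of this diff:

// Sketch only: tracing-subscriber and the RUST_LOG filter are assumptions,
// not something this PR adds.
use tracing::{debug, info, instrument};

#[instrument(level = "debug", skip(payload), fields(len = payload.len()))]
fn handle(payload: Vec<u8>) {
    // Events emitted here are attached to the `handle` span and its fields.
    debug!("processing payload");
    info!(bytes = payload.len(), "done");
}

fn main() {
    // Honors RUST_LOG-style filters, e.g. `RUST_LOG=forester=debug`.
    tracing_subscriber::fmt()
        .with_env_filter(tracing_subscriber::EnvFilter::from_default_env())
        .init();

    handle(vec![1, 2, 3]);
}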
1 change: 0 additions & 1 deletion Cargo.lock


6 changes: 3 additions & 3 deletions forester-utils/src/rpc/solana_rpc.rs
@@ -5,7 +5,7 @@ use anchor_lang::prelude::Pubkey;
use anchor_lang::solana_program::clock::Slot;
use anchor_lang::solana_program::hash::Hash;
use anchor_lang::AnchorDeserialize;
use log::{debug, info, warn};
use log::{debug, warn};
use solana_client::rpc_client::RpcClient;
use solana_client::rpc_config::{RpcSendTransactionConfig, RpcTransactionConfig};
use solana_program_test::BanksClientError;
@@ -151,7 +151,7 @@ impl RpcConnection for SolanaRpcConnection {
&self,
program_id: &Pubkey,
) -> Result<Vec<(Pubkey, Account)>, RpcError> {
info!(
debug!(
"Fetching accounts for program: {}, client url: {}",
program_id,
self.client.url()
@@ -361,7 +361,7 @@ impl RpcConnection for SolanaRpcConnection {
}

async fn get_slot(&mut self) -> Result<u64, RpcError> {
println!("Calling get_slot");
debug!("Calling get_slot");
self.client.get_slot().map_err(RpcError::from)
}

1 change: 0 additions & 1 deletion forester/Cargo.toml
@@ -37,7 +37,6 @@ photon-api = { path = "../photon-api" }
bincode = "1.3"
sysinfo = "0.31"
forester-utils = { path = "../forester-utils" }
log = "0.4"
env_logger = "0.11"
rand = "0.8.5"
dotenvy = "0.15.7"
8 changes: 4 additions & 4 deletions forester/src/config.rs
@@ -1,8 +1,8 @@
use forester_utils::forester_epoch::{Epoch, TreeAccounts, TreeForesterSchedule};
use light_registry::{EpochPda, ForesterEpochPda};
use log::info;
use solana_sdk::pubkey::Pubkey;
use solana_sdk::signature::Keypair;
use tracing::debug;

#[derive(Debug, Clone)]
pub struct ForesterEpochInfo {
@@ -21,9 +21,9 @@ impl ForesterEpochInfo {
// let state = self.phases.get_current_epoch_state(current_solana_slot);
// TODO: add epoch state to sync schedule
for tree in trees {
info!("Adding tree schedule for {:?}", tree);
info!("Current slot: {}", current_solana_slot);
info!("Epoch: {:?}", self.epoch_pda);
debug!("Adding tree schedule for {:?}", tree);
debug!("Current slot: {}", current_solana_slot);
debug!("Epoch: {:?}", self.epoch_pda);
let tree_schedule = TreeForesterSchedule::new_with_schedule(
tree,
current_solana_slot,
24 changes: 15 additions & 9 deletions forester/src/epoch_manager.rs
@@ -134,6 +134,7 @@ impl<R: RpcConnection, I: Indexer<R>> EpochManager<R, I> {
Ok(())
}

#[instrument(level = "debug", skip(self, tx))]
async fn monitor_epochs(&self, tx: mpsc::Sender<u64>) -> Result<()> {
let mut last_epoch: Option<u64> = None;
debug!("Starting epoch monitor");
@@ -159,7 +160,7 @@ impl<R: RpcConnection, I: Indexer<R>> EpochManager<R, I> {
let next_phases = get_epoch_phases(&self.protocol_config, next_epoch);
let mut rpc = self.rpc_pool.get_connection().await?;
let slots_to_wait = next_phases.registration.start.saturating_sub(slot);
info!(
debug!(
"Waiting for epoch {} registration phase to start. Current slot: {}, Registration phase start slot: {}, Slots to wait: {}",
next_epoch, slot, next_phases.registration.start, slots_to_wait
);
@@ -192,7 +193,7 @@ impl<R: RpcConnection, I: Indexer<R>> EpochManager<R, I> {
.fetch_add(increment_by, Ordering::Relaxed);
}

#[instrument(level = "debug", skip(self), fields(forester = %self.config.payer_keypair.pubkey(), epoch = epoch
#[instrument(level = "info", skip(self), fields(forester = %self.config.payer_keypair.pubkey(), epoch = epoch
))]
async fn process_epoch(&self, epoch: u64) -> Result<()> {
info!("Entering process_epoch");
@@ -258,7 +259,7 @@ impl<R: RpcConnection, I: Indexer<R>> EpochManager<R, I> {
.await
{
Ok(Some(epoch)) => {
info!("Registered epoch: {:?}", epoch);
debug!("Registered epoch: {:?}", epoch);
epoch
}
Ok(None) => {
@@ -279,7 +280,7 @@ impl<R: RpcConnection, I: Indexer<R>> EpochManager<R, I> {
.await
{
Ok(Some(pda)) => {
info!("ForesterEpochPda: {:?}", pda);
debug!("ForesterEpochPda: {:?}", pda);
pda
}
Ok(None) => {
@@ -373,7 +374,7 @@ impl<R: RpcConnection, I: Indexer<R>> EpochManager<R, I> {
Ok(forester_epoch_info)
}

#[instrument(level = "debug", skip(self, epoch_info), fields(forester = %self.config.payer_keypair.pubkey(), epoch = epoch_info.epoch.epoch
#[instrument(level = "info", skip(self, epoch_info), fields(forester = %self.config.payer_keypair.pubkey(), epoch = epoch_info.epoch.epoch
))]
async fn wait_for_active_phase(
&self,
@@ -503,21 +504,21 @@ impl<R: RpcConnection, I: Indexer<R>> EpochManager<R, I> {
epoch_pda: ForesterEpochPda,
mut tree: TreeForesterSchedule,
) -> Result<()> {
info!("enter process_queue");
info!("Tree schedule slots: {:?}", tree.slots);
debug!("enter process_queue");
debug!("Tree schedule slots: {:?}", tree.slots);
// TODO: sync at some point
let mut estimated_slot = self.slot_tracker.estimated_current_slot();

while estimated_slot < epoch_info.phases.active.end {
info!("Processing queue");
debug!("Processing queue");
// search for next eligible slot
let index_and_forester_slot = tree
.slots
.iter()
.enumerate()
.find(|(_, slot)| slot.is_some());

info!("Result: {:?}", index_and_forester_slot);
debug!("Result: {:?}", index_and_forester_slot);
if let Some((index, forester_slot)) = index_and_forester_slot {
let forester_slot = forester_slot.as_ref().unwrap().clone();
tree.slots.remove(index);
@@ -697,6 +698,11 @@ impl<R: RpcConnection, I: Indexer<R>> EpochManager<R, I> {
}
}

#[instrument(
level = "info",
skip(config, protocol_config, rpc_pool, indexer, shutdown, work_report_sender, slot_tracker),
fields(forester = %config.payer_keypair.pubkey())
)]
pub async fn run_service<R: RpcConnection, I: Indexer<R>>(
config: Arc<ForesterConfig>,
protocol_config: Arc<ProtocolConfig>,
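The #[instrument] attributes added above use skip to keep self and channel handles out of the recorded span, and fields to attach the forester pubkey and epoch. A stand-alone sketch of that pattern; EpochManager here is a dummy type, not the forester's real struct, and the subscriber setup is assumed:

use tracing::{info, instrument};

struct EpochManager {
    forester: String,
}

impl EpochManager {
    // `skip(self)` keeps the whole struct out of the span; `%self.forester`
    // records the field via its Display impl, mirroring `%...pubkey()` above.
    // The plain `epoch` argument is recorded automatically because it is not skipped.
    #[instrument(level = "info", skip(self), fields(forester = %self.forester))]
    fn process_epoch(&self, epoch: u64) {
        // Inherits the `process_epoch` span and its fields.
        info!("Entering process_epoch");
    }
}

fn main() {
    tracing_subscriber::fmt().init();
    let manager = EpochManager { forester: "demo".into() };
    manager.process_epoch(7);
}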
6 changes: 3 additions & 3 deletions forester/src/lib.rs
@@ -28,12 +28,12 @@ pub use config::{ForesterConfig, ForesterEpochInfo};
use forester_utils::forester_epoch::{TreeAccounts, TreeType};
use forester_utils::indexer::Indexer;
use forester_utils::rpc::{RpcConnection, SolanaRpcConnection};
use log::info;
pub use settings::init_config;
use solana_sdk::commitment_config::CommitmentConfig;
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::{mpsc, oneshot, Mutex};
use tracing::debug;

pub async fn run_queue_info(
config: Arc<ForesterConfig>,
@@ -55,7 +55,7 @@ pub async fn run_queue_info(
QUEUE_LENGTH
.with_label_values(&[&*queue_type.to_string(), &tree_data.merkle_tree.to_string()])
.set(queue_length as i64);
info!(
println!(
Contributor: println! instead of debug on purpose?

Contributor Author: Yes. It's not a debug printout; it's output for the user.
"{:?} queue {} length: {}",
queue_type, tree_data.queue, queue_length
);
@@ -102,7 +102,7 @@ pub async fn run_pipeline<R: RpcConnection, I: Indexer<R>>(
SlotTracker::run(arc_slot_tracker_clone, &mut *rpc).await;
});

info!("Starting Forester pipeline");
debug!("Starting Forester pipeline");
run_service(
config,
Arc::new(protocol_config),
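As the review exchange above notes, run_queue_info deliberately keeps println! for the queue-length report: that line is output for the user, while diagnostics go through tracing. A small illustrative sketch of that split; the function and subscriber setup below are made up, not forester code:

use tracing::debug;

// Sketch of the convention discussed above: user-facing results go to stdout,
// diagnostics go through tracing and are gated by the subscriber's level filter.
fn report_queue_length(queue: &str, length: usize) {
    debug!("fetched length for queue {}", queue); // visible only when debug logging is on
    println!("{} queue length: {}", queue, length); // always printed for the user
}

fn main() {
    tracing_subscriber::fmt().init();
    report_queue_length("address", 42);
}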
6 changes: 3 additions & 3 deletions forester/src/main.rs
@@ -8,10 +8,10 @@ use forester::tree_data_sync::fetch_trees
use forester::{init_config, run_pipeline, run_queue_info};
use forester_utils::forester_epoch::TreeType;
use forester_utils::rpc::{RpcConnection, SolanaRpcConnection};
use log::{debug, info, warn};
use std::sync::Arc;
use tokio::signal::ctrl_c;
use tokio::sync::{mpsc, oneshot};
use tracing::{debug, warn};

#[tokio::main]
async fn main() -> Result<(), ForesterError> {
@@ -55,8 +55,8 @@ async fn main() -> Result<(), ForesterError> {
run_pipeline(config, indexer, shutdown_receiver, work_report_sender).await?
}
Some(Commands::Status) => {
info!("Fetching trees...");
info!("RPC URL: {}", config.external_services.rpc_url);
debug!("Fetching trees...");
debug!("RPC URL: {}", config.external_services.rpc_url);
let rpc = SolanaRpcConnection::new(config.external_services.rpc_url.to_string(), None);
let trees = fetch_trees(&rpc).await;
if trees.is_empty() {
8 changes: 4 additions & 4 deletions forester/src/metrics.rs
@@ -5,7 +5,7 @@ use reqwest::Client;
use std::sync::Once;
use std::time::{SystemTime, UNIX_EPOCH};
use tokio::sync::Mutex;
use tracing::{error, info};
use tracing::{debug, error};

lazy_static! {
pub static ref REGISTRY: Registry = Registry::new();
@@ -94,7 +94,7 @@ pub fn update_transactions_processed(epoch: u64, count: usize, duration: std::ti
.with_label_values(&[&epoch.to_string()])
.set(rate);

info!(
debug!(
"Updated metrics for epoch {}: processed = {}, rate = {} tx/s",
epoch, count, rate
);
@@ -120,13 +120,13 @@ pub async fn push_metrics(url: &str) -> Result<()> {
let mut buffer = Vec::new();
encoder.encode(&metric_families, &mut buffer)?;

info!("Pushing metrics to Pushgateway");
debug!("Pushing metrics to Pushgateway");

let client = Client::new();
let res = client.post(url).body(buffer).send().await?;

if res.status().is_success() {
println!("Successfully pushed metrics to Pushgateway");
debug!("Successfully pushed metrics to Pushgateway");
Ok(())
} else {
let error_message = format!(
4 changes: 2 additions & 2 deletions forester/src/photon_indexer.rs
@@ -2,11 +2,11 @@ use crate::utils::decode_hash;
use account_compression::initialize_address_merkle_tree::Pubkey;
use forester_utils::indexer::{Indexer, IndexerError, MerkleProof, NewAddressProofWithContext};
use forester_utils::rpc::RpcConnection;
use log::{debug, info};
use photon_api::apis::configuration::{ApiKey, Configuration};
use photon_api::models::GetCompressedAccountsByOwnerPostRequestParams;
use solana_sdk::bs58;
use std::fmt::Debug;
use tracing::debug;

pub struct PhotonIndexer<R: RpcConnection> {
configuration: Configuration,
@@ -132,7 +132,7 @@ impl<R: RpcConnection> Indexer<R> for PhotonIndexer<R> {
..Default::default()
};

info!("Request: {:?}", request);
debug!("Request: {:?}", request);

let result = photon_api::apis::default_api::get_multiple_new_address_proofs_post(
&self.configuration,
12 changes: 6 additions & 6 deletions forester/src/pubsub_client.rs
@@ -4,7 +4,6 @@ use crate::ForesterConfig;
use crate::Result;
use account_compression::initialize_address_merkle_tree::Pubkey;
use futures::StreamExt;
use log::{debug, error, info};
use solana_account_decoder::UiAccountEncoding;
use solana_client::nonblocking::pubsub_client::PubsubClient;
use solana_client::rpc_config::{RpcAccountInfoConfig, RpcProgramAccountsConfig};
@@ -13,12 +12,13 @@ use std::str::FromStr;
use std::thread;
use tokio::runtime::Builder;
use tokio::sync::mpsc;
use tracing::{debug, error};

pub async fn setup_pubsub_client(
config: &ForesterConfig,
queue_pubkeys: std::collections::HashSet<Pubkey>,
) -> Result<(mpsc::Receiver<QueueUpdate>, mpsc::Sender<()>)> {
info!(
debug!(
"Setting up pubsub client for {} queues",
queue_pubkeys.len()
);
@@ -38,7 +38,7 @@ pub async fn setup_pubsub_client(
if let Err(e) = result {
error!("PubSub client error: {:?}", e);
} else {
info!("PubSub client thread completed successfully");
debug!("PubSub client thread completed successfully");
}
}
Err(e) => error!("Failed to join PubSub client thread: {:?}", e),
@@ -61,12 +61,12 @@ fn spawn_pubsub_client(
.map_err(|e| ForesterError::Custom(format!("Failed to build runtime: {}", e)))?;

rt.block_on(async {
info!("Connecting to PubSub at {}", ws_url);
debug!("Connecting to PubSub at {}", ws_url);
let pubsub_client = PubsubClient::new(&ws_url).await.map_err(|e| {
ForesterError::Custom(format!("Failed to create PubsubClient: {}", e))
})?;

info!("PubSub connection established");
debug!("PubSub connection established");

let (mut subscription, _) = pubsub_client
.program_subscribe(
@@ -109,7 +109,7 @@ fn spawn_pubsub_client(
}
}
}
info!("PubSub client loop ended");
debug!("PubSub client loop ended");
Ok(())
})
})
4 changes: 2 additions & 2 deletions forester/src/queue_helpers.rs
@@ -3,8 +3,8 @@ use account_compression::initialize_address_merkle_tree::Pubkey;
use account_compression::QueueAccount;
use forester_utils::rpc::RpcConnection;
use light_hash_set::HashSet;
use log::debug;
use std::mem;
use tracing::debug;

#[derive(Debug, Clone)]
pub struct QueueItemData {
@@ -37,7 +37,7 @@ pub async fn fetch_queue_item_data<R: RpcConnection>(
}
})
.collect();
log::info!("Queue data fetched: {:?}", filtered_queue);
debug!("Queue data fetched: {:?}", filtered_queue);
Ok(filtered_queue)
}

4 changes: 2 additions & 2 deletions forester/src/rollover/operations.rs
@@ -5,13 +5,13 @@ use light_registry::account_compression_cpi::sdk::{
CreateRolloverMerkleTreeInstructionInputs,
};
use light_registry::protocol_config::state::ProtocolConfig;
use log::info;
use solana_sdk::instruction::Instruction;
use solana_sdk::pubkey::Pubkey;
use solana_sdk::signature::Keypair;
use solana_sdk::signer::Signer;
use solana_sdk::transaction::Transaction;
use tokio::sync::Mutex;
use tracing::{debug, info};

use crate::errors::ForesterError;
use crate::ForesterConfig;
@@ -42,7 +42,7 @@ pub async fn is_tree_ready_for_rollover<R: RpcConnection>(
tree_pubkey: Pubkey,
tree_type: TreeType,
) -> Result<bool, ForesterError> {
info!(
debug!(
"Checking if tree is ready for rollover: {:?}",
tree_pubkey.to_string()
);