Skip to content

Commit

Permalink
add initial steps on p2p comms
Browse files Browse the repository at this point in the history
  • Loading branch information
jorgeantonio21 committed Feb 6, 2025
1 parent 78ece46 commit 0cfa379
Show file tree
Hide file tree
Showing 22 changed files with 3,927 additions and 309 deletions.
1,474 changes: 1,405 additions & 69 deletions Cargo.lock

Large diffs are not rendered by default.

7 changes: 6 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,8 @@ resolver = "2"

members = [
"atoma-confidential",
"atoma-daemon",
"atoma-daemon",
"atoma-p2p",
"atoma-service",
"atoma-state",
"atoma-sui",
Expand All @@ -27,6 +28,8 @@ atoma-utils = { path = "./atoma-utils" }
axum = "0.7.5"
base64 = "0.22.1"
blake2 = "0.10.6"
blake3 = "1.5.5"
ciborium = "0.2.2"
clap = "4.5.4"
config = "0.14.0"
dcap-rs = { git = "https://github.com/automata-network/dcap-rs.git" }
Expand All @@ -42,6 +45,7 @@ http = "1.2"
hyper = "1.6.0"
isocountry = "0.3.2"
lazy_static = "1.5.0"
libp2p = "0.55.0"
metrics = "0.23"
metrics-exporter-prometheus = "0.14.0"
once_cell = "1.20.2"
Expand All @@ -68,6 +72,7 @@ tower-http = "0.6.2"
tracing = "0.1.40"
tracing-appender = "0.2.3"
tracing-subscriber = "0.3.18"
url = "2.5.4"
utoipa = "5.1.1"
utoipa-swagger-ui = "8.0.1"
validator = "0.20.0"
Expand Down
72 changes: 41 additions & 31 deletions atoma-bin/atoma_node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,8 @@ use std::{
use anyhow::{Context, Result};
use atoma_confidential::AtomaConfidentialCompute;
use atoma_daemon::{AtomaDaemonConfig, DaemonState};
use atoma_service::{
config::AtomaServiceConfig,
proxy::{config::ProxyConfig, register_on_proxy},
server::AppState,
};
use atoma_p2p::{AtomaP2pNode, AtomaP2pNodeConfig};
use atoma_service::{config::AtomaServiceConfig, server::AppState};
use atoma_state::{config::AtomaStateManagerConfig, AtomaState, AtomaStateManager};
use atoma_sui::{client::Client, config::Config, subscriber::Subscriber};
use atoma_utils::spawn_with_shutdown;
Expand All @@ -37,7 +34,6 @@ use tracing_subscriber::{
prelude::*,
EnvFilter, Registry,
};
use validator::{Validate, ValidationErrors};

/// The name of the environment variable for the Hugging Face token
const HF_TOKEN: &str = "HF_TOKEN";
Expand Down Expand Up @@ -69,6 +65,9 @@ struct NodeConfig {
/// Configuration for the Sui component.
sui: Config,

/// Configuration for the p2p component.
p2p: AtomaP2pNodeConfig,

/// Configuration for the service component.
service: AtomaServiceConfig,

Expand All @@ -77,26 +76,22 @@ struct NodeConfig {

/// Configuration for the daemon component.
daemon: AtomaDaemonConfig,

/// Configuration for the proxy component.
proxy: ProxyConfig,
}

impl NodeConfig {
fn load(path: &str) -> Result<Self, ValidationErrors> {
fn load(path: &str) -> Self {
let sui = Config::from_file_path(path);
let p2p = AtomaP2pNodeConfig::from_file_path(path);
let service = AtomaServiceConfig::from_file_path(path);
let state = AtomaStateManagerConfig::from_file_path(path);
let daemon = AtomaDaemonConfig::from_file_path(path);
let proxy = ProxyConfig::from_file_path(path);
proxy.validate()?;
Ok(Self {
Self {
sui,
p2p,
service,
state,
daemon,
proxy,
})
}
}
}

Expand Down Expand Up @@ -181,14 +176,14 @@ async fn main() -> Result<()> {
dotenvy::dotenv().ok();

let args = Args::parse();
let config = NodeConfig::load(&args.config_path)?;
let config = NodeConfig::load(&args.config_path);

info!("Starting Atoma node service");

let (shutdown_sender, mut shutdown_receiver) = watch::channel(false);
let (event_subscriber_sender, event_subscriber_receiver) = flume::unbounded();
let (state_manager_sender, state_manager_receiver) = flume::unbounded();

let (p2p_event_sender, p2p_event_receiver) = flume::unbounded();
info!(
target = "atoma-node-service",
event = "keystore_path",
Expand All @@ -212,6 +207,22 @@ async fn main() -> Result<()> {
.unwrap()
});

info!(
target = "atoma-node-service",
event = "p2p_node_spawn",
"Spawning Atoma's p2p node service"
);
let p2p_node_service_shutdown_receiver = shutdown_receiver.clone();
let p2p_node_service_handle = spawn_with_shutdown(
async move {
let p2p_node =
AtomaP2pNode::start(config.p2p, Arc::new(keystore), p2p_event_sender, false)?;
let pinned_future = Box::pin(p2p_node.run(p2p_node_service_shutdown_receiver));
pinned_future.await
},
shutdown_sender.clone(),
);

info!(
target = "atoma-node-service",
event = "state_manager_service_spawn",
Expand All @@ -226,6 +237,7 @@ async fn main() -> Result<()> {
&database_url,
event_subscriber_receiver,
state_manager_receiver,
p2p_event_receiver,
)
.await?;
state_manager.run(state_manager_shutdown_receiver).await
Expand All @@ -240,19 +252,6 @@ async fn main() -> Result<()> {
let (app_state_encryption_sender, app_state_encryption_receiver) =
tokio::sync::mpsc::unbounded_channel();

for (_, node_small_id) in &config.daemon.node_badges {
if let Err(e) =
register_on_proxy(&config.proxy, *node_small_id, &keystore, address_index).await
{
error!(
target = "atoma-node-service",
event = "register_on_proxy_error",
error = ?e,
"Failed to register on proxy server"
);
}
}

info!(
target = "atoma-node-service",
event = "confidential_compute_service_spawn",
Expand Down Expand Up @@ -288,7 +287,7 @@ async fn main() -> Result<()> {
);

let subscriber = Subscriber::new(
config.sui,
config.sui.clone(),
event_subscriber_sender,
stack_retrieve_receiver,
subscriber_confidential_compute_sender,
Expand Down Expand Up @@ -326,6 +325,9 @@ async fn main() -> Result<()> {
let tokenizers =
initialize_tokenizers(&config.service.models, &config.service.revisions, hf_token).await?;

let keystore = FileBasedKeystore::new(&config.sui.sui_keystore_path().into())
.context("Failed to initialize keystore")?;

let app_state = AppState {
state_manager_sender,
stack_retrieve_sender,
Expand Down Expand Up @@ -417,13 +419,15 @@ async fn main() -> Result<()> {
state_manager_result,
server_result,
daemon_result,
p2p_node_service_result,
confidential_compute_service_result,
_,
) = try_join!(
subscriber_handle,
state_manager_handle,
service_handle,
daemon_handle,
p2p_node_service_handle,
confidential_compute_service_handle,
ctrl_c
)?;
Expand All @@ -432,6 +436,7 @@ async fn main() -> Result<()> {
state_manager_result,
server_result,
daemon_result,
p2p_node_service_result,
confidential_compute_service_result,
)?;

Expand Down Expand Up @@ -534,6 +539,7 @@ fn handle_tasks_results(
state_manager_result: Result<()>,
server_result: Result<()>,
daemon_result: Result<()>,
p2p_node_service_result: Result<()>,
confidential_compute_service_result: Result<()>,
) -> Result<()> {
let result_handler = |result: Result<()>, message: &str| {
Expand All @@ -552,6 +558,10 @@ fn handle_tasks_results(
result_handler(state_manager_result, "State manager terminated abruptly")?;
result_handler(server_result, "Server terminated abruptly")?;
result_handler(daemon_result, "Daemon terminated abruptly")?;
result_handler(
p2p_node_service_result,
"P2P node service terminated abruptly",
)?;
result_handler(
confidential_compute_service_result,
"Confidential compute service terminated abruptly",
Expand Down
25 changes: 25 additions & 0 deletions atoma-p2p/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
[package]
name = "atoma-p2p"
version.workspace = true
edition.workspace = true
license.workspace = true

[dependencies]
blake3 = { workspace = true }
ciborium = { workspace = true }
config = { workspace = true }
libp2p = { workspace = true, features = [ "tokio", "identify", "gossipsub", "mdns", "kad", "noise", "macros", "tcp", "yamux", "quic"] }
fastcrypto = { workspace = true }
flume = { workspace = true }
futures = { workspace = true }
serde = { workspace = true, features = ["derive"] }
sui-keys = { workspace = true }
sui-sdk = { workspace = true }
thiserror = { workspace = true }
tokio = { workspace = true, features = ["full"] }
tracing = { workspace = true }
url = { workspace = true }

[dev-dependencies]
rand = { workspace = true }
tempfile = { workspace = true }
94 changes: 94 additions & 0 deletions atoma-p2p/src/config.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
use config::{Config, File};
use serde::{Deserialize, Serialize};
use std::path::Path;
use std::time::Duration;

/// Configuration settings for a P2P Atoma Node.
///
/// This struct holds timing-related configuration parameters that control
/// the behavior of peer-to-peer connections in an Atoma node.
#[derive(Clone, Debug, Deserialize, Serialize)]
pub struct AtomaP2pNodeConfig {
/// Country of origin of the node
pub country: String,

/// The interval at which heartbeat messages are sent to peers.
///
/// Heartbeats are used to verify that connections are still alive and
/// to maintain the connection state with peers.
pub heartbeat_interval: Duration,

/// The maximum duration a connection can remain idle before it is closed.
///
/// If no messages are exchanged within this duration, the connection
/// will be terminated to free up resources.
pub idle_connection_timeout: Duration,

/// The address to listen on for incoming tcp connections.
///
/// This is the address that the node will use to listen for incoming connections.
/// It is a string in the format of "/ip4/x.x.x.x/tcp/x".
pub listen_addr: String,

/// The public URL of the node.
///
/// This is the URL that the node will use to send requests to the network.
/// It is a string in the format of "https://x.x.x.x:x".
pub public_url: String,

/// The node's small id (assigned by the Atoma smart contract)
pub node_small_id: u64,

/// The list of bootstrap nodes to connect to.
///
/// Bootstrap nodes are nodes that the node will use to bootstrap its network connection.
/// They are a list of strings in the format of "/ip4/x.x.x.x/tcp/x".
pub bootstrap_nodes: Vec<String>,
}

impl AtomaP2pNodeConfig {
/// Creates a new `AtomaP2pNodeConfig` instance from a configuration file.
///
/// This method loads configuration settings from both a file and environment variables:
/// - File: Reads the specified configuration file
/// - Environment: Reads variables prefixed with `ATOMA_P2P__`
///
/// # Arguments
///
/// * `config_file_path` - Path to the configuration file
///
/// # Returns
///
/// Returns a new `AtomaP2pNodeConfig` instance with the loaded configuration.
///
/// # Panics
///
/// This method will panic if:
/// - The configuration file cannot be read or parsed
/// - Required configuration values are missing
/// - The configuration format is invalid
///
/// # Example
///
/// ```rust,ignore
/// use atoma_p2p::config::AtomaP2pNodeConfig;
///
/// let config = AtomaP2pNodeConfig::from_file_path("config/atoma.toml");
/// ```
pub fn from_file_path<P: AsRef<Path>>(config_file_path: P) -> Self {
let builder = Config::builder()
.add_source(File::with_name(config_file_path.as_ref().to_str().unwrap()))
.add_source(
config::Environment::with_prefix("ATOMA_P2P")
.keep_prefix(true)
.separator("__"),
);
let config = builder
.build()
.expect("Failed to generate atoma-p2p configuration file");
config
.get::<Self>("atoma_p2p")
.expect("Failed to generate configuration instance")
}
}
8 changes: 8 additions & 0 deletions atoma-p2p/src/lib.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
pub mod config;
pub mod service;
pub mod timer;
pub mod types;

pub use config::AtomaP2pNodeConfig;
pub use service::AtomaP2pNode;
pub use types::AtomaP2pEvent;
Loading

0 comments on commit 0cfa379

Please sign in to comment.