Skip to content

Commit

Permalink
chore: removed all potential issues
Browse files Browse the repository at this point in the history
  • Loading branch information
maschad committed Jan 28, 2025
1 parent 0dcacd2 commit 1f530d8
Show file tree
Hide file tree
Showing 14 changed files with 282 additions and 241 deletions.
12 changes: 6 additions & 6 deletions atoma-bin/atoma_daemon.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,9 @@ struct DaemonArgs {
}

#[tokio::main]
#[allow(clippy::redundant_pub_crate)]
async fn main() -> Result<()> {
setup_logging()?;
setup_logging();
let args = DaemonArgs::parse();
let daemon_config = AtomaDaemonConfig::from_file_path(args.config_path.clone());
let state_manager_config = AtomaStateManagerConfig::from_file_path(args.config_path.clone());
Expand Down Expand Up @@ -86,7 +87,7 @@ async fn main() -> Result<()> {

let ctrl_c = tokio::task::spawn(async move {
tokio::select! {
_ = tokio::signal::ctrl_c() => {
result = tokio::signal::ctrl_c() => {
info!(
target = "atoma_daemon",
event = "atoma-daemon-stop",
Expand All @@ -95,10 +96,10 @@ async fn main() -> Result<()> {
shutdown_sender
.send(true)
.context("Failed to send shutdown signal")?;
Ok::<(), anyhow::Error>(())
result.map_err(anyhow::Error::from)
}
_ = shutdown_receiver.changed() => {
Ok::<(), anyhow::Error>(())
Ok(())
}
}
});
Expand All @@ -108,7 +109,7 @@ async fn main() -> Result<()> {
daemon_result
}

fn setup_logging() -> Result<()> {
fn setup_logging() {
let log_dir = Path::new(LOGS);
let file_appender = RollingFileAppender::new(Rotation::DAILY, log_dir, LOG_FILE);
let (non_blocking_appender, _guard) = non_blocking(file_appender);
Expand All @@ -133,5 +134,4 @@ fn setup_logging() -> Result<()> {
.with(console_layer)
.with(file_layer)
.init();
Ok(())
}
34 changes: 18 additions & 16 deletions atoma-bin/atoma_node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ struct NodeConfig {
}

impl NodeConfig {
async fn load(path: &str) -> Result<Self, ValidationErrors> {
fn load(path: &str) -> Result<Self, ValidationErrors> {
let sui = Config::from_file_path(path);
let service = AtomaServiceConfig::from_file_path(path);
let state = AtomaStateManagerConfig::from_file_path(path);
Expand Down Expand Up @@ -174,13 +174,15 @@ async fn initialize_tokenizers(
}

#[tokio::main]
#[allow(clippy::too_many_lines)]
#[allow(clippy::redundant_pub_crate)]
async fn main() -> Result<()> {
let _log_guards = setup_logging(LOGS).context("Failed to setup logging")?;

dotenv().ok();

let args = Args::parse();
let config = NodeConfig::load(&args.config_path).await?;
let config = NodeConfig::load(&args.config_path)?;

info!("Starting Atoma node service");

Expand All @@ -203,13 +205,13 @@ async fn main() -> Result<()> {
config.sui.max_concurrent_requests(),
)?;
let address = wallet_ctx.active_address()?;
let address_index = args.address_index.unwrap_or(
let address_index = args.address_index.unwrap_or_else(|| {
wallet_ctx
.get_addresses()
.iter()
.position(|a| a == &address)
.unwrap(),
);
.unwrap()
});

info!(
target = "atoma-node-service",
Expand All @@ -232,14 +234,14 @@ async fn main() -> Result<()> {
shutdown_sender.clone(),
);

let (subscriber_confidential_compute_sender, _subscriber_confidential_compute_receiver) =
let (subscriber_confidential_compute_sender, subscriber_confidential_compute_receiver) =
tokio::sync::mpsc::unbounded_channel();
let (app_state_decryption_sender, _app_state_decryption_receiver) =
let (app_state_decryption_sender, app_state_decryption_receiver) =
tokio::sync::mpsc::unbounded_channel();
let (app_state_encryption_sender, _app_state_encryption_receiver) =
let (app_state_encryption_sender, app_state_encryption_receiver) =
tokio::sync::mpsc::unbounded_channel();

for (_, node_small_id) in config.daemon.node_badges.iter() {
for (_, node_small_id) in &config.daemon.node_badges {
if let Err(e) =
register_on_proxy(&config.proxy, *node_small_id, &keystore, address_index).await
{
Expand All @@ -262,16 +264,16 @@ async fn main() -> Result<()> {
Client::new_from_config(args.config_path).await?,
));

let (compute_shared_secret_sender, _compute_shared_secret_receiver) =
let (compute_shared_secret_sender, compute_shared_secret_receiver) =
tokio::sync::mpsc::unbounded_channel();

let confidential_compute_service_handle = spawn_with_shutdown(
AtomaConfidentialCompute::start_confidential_compute_service(
client.clone(),
_subscriber_confidential_compute_receiver,
_app_state_decryption_receiver,
_app_state_encryption_receiver,
_compute_shared_secret_receiver,
subscriber_confidential_compute_receiver,
app_state_decryption_receiver,
app_state_encryption_receiver,
compute_shared_secret_receiver,
shutdown_receiver.clone(),
),
shutdown_sender.clone(),
Expand Down Expand Up @@ -320,8 +322,8 @@ async fn main() -> Result<()> {
shutdown_sender.clone(),
);

let hf_token = std::env::var(HF_TOKEN)
.context(format!("Variable {} not set in the .env file", HF_TOKEN))?;
let hf_token =
std::env::var(HF_TOKEN).context(format!("Variable {HF_TOKEN} not set in the .env file"))?;
let tokenizers =
initialize_tokenizers(&config.service.models, &config.service.revisions, hf_token).await?;

Expand Down
2 changes: 1 addition & 1 deletion atoma-service/src/components/mod.rs
Original file line number Diff line number Diff line change
@@ -1 +1 @@
pub(crate) mod openapi;
pub mod openapi;
22 changes: 12 additions & 10 deletions atoma-service/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ impl AtomaServiceError {
/// - `"MODEL_ERROR"` for ML model errors
/// - `"AUTH_ERROR"` for authentication failures
/// - `"INTERNAL_ERROR"` for unexpected server errors
fn error_code(&self) -> &'static str {
const fn error_code(&self) -> &'static str {
match self {
Self::MissingHeader { .. } => "MISSING_HEADER",
Self::InvalidHeader { .. } => "INVALID_HEADER",
Expand Down Expand Up @@ -154,7 +154,8 @@ impl AtomaServiceError {
/// # Returns
///
/// An [`axum::http::StatusCode`] representing the appropriate HTTP response code for this error
pub fn status_code(&self) -> StatusCode {
#[must_use]
pub const fn status_code(&self) -> StatusCode {
match self {
Self::MissingHeader { .. }
| Self::InvalidHeader { .. }
Expand All @@ -173,14 +174,15 @@ impl AtomaServiceError {
/// # Returns
///
/// A `String` containing the API endpoint path where the error was encountered.
fn endpoint(&self) -> String {
#[must_use]
pub fn get_endpoint(&self, _endpoint: &str) -> String {
match self {
Self::MissingHeader { endpoint, .. } => endpoint.clone(),
Self::InvalidHeader { endpoint, .. } => endpoint.clone(),
Self::InvalidBody { endpoint, .. } => endpoint.clone(),
Self::ModelError { endpoint, .. } => endpoint.clone(),
Self::AuthError { endpoint, .. } => endpoint.clone(),
Self::InternalError { endpoint, .. } => endpoint.clone(),
Self::MissingHeader { endpoint, .. }
| Self::InvalidHeader { endpoint, .. }
| Self::InvalidBody { endpoint, .. }
| Self::ModelError { endpoint, .. }
| Self::AuthError { endpoint, .. }
| Self::InternalError { endpoint, .. } => endpoint.clone(),
}
}

Expand Down Expand Up @@ -216,7 +218,7 @@ impl IntoResponse for AtomaServiceError {
tracing::error!(
target = "atoma-service",
event = "error_occurred",
endpoint = self.endpoint(),
endpoint = self.get_endpoint(""),
error = %self.message(),
);
let error_response = ErrorResponse {
Expand Down
Loading

0 comments on commit 1f530d8

Please sign in to comment.