Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add response decryption to existing endpoints #46

Merged
merged 10 commits on
Dec 9, 2024
137 changes: 68 additions & 69 deletions Cargo.lock

Large diffs are not rendered by default.

53 changes: 47 additions & 6 deletions atoma-proxy/src/server/handlers/chat_completions.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,13 @@
use std::time::{Duration, Instant};

use crate::server::middleware::RequestMetadataExtension;
use crate::server::{http_server::ProxyState, streamer::Streamer};
use crate::server::{
handlers::{extract_node_encryption_metadata, handle_confidential_compute_decryption_response},
http_server::ProxyState,
middleware::{NodeEncryptionMetadata, RequestMetadataExtension},
streamer::Streamer,
};
use atoma_state::types::AtomaAtomaStateManagerEvent;
use atoma_utils::constants;
use axum::body::Body;
use axum::http::StatusCode;
use axum::response::{IntoResponse, Response, Sse};
Expand All @@ -11,6 +16,7 @@ use axum::{extract::State, http::HeaderMap, Json};
use serde_json::Value;
use tracing::{error, instrument};
use utoipa::OpenApi;
use x25519_dalek::PublicKey;

use super::request_model::RequestModel;

Expand Down Expand Up @@ -155,7 +161,6 @@ impl RequestModel for RequestModelChatCompletions {
skip_all,
fields(
path = metadata.endpoint,
payload = ?payload,
)
)]
pub async fn chat_completions_handler(
Expand All @@ -179,6 +184,8 @@ pub async fn chat_completions_handler(
metadata.num_compute_units as i64,
metadata.selected_stack_small_id,
metadata.endpoint,
metadata.salt,
metadata.node_x25519_public_key,
)
.await
} else {
Expand All @@ -191,6 +198,8 @@ pub async fn chat_completions_handler(
metadata.num_compute_units as i64,
metadata.selected_stack_small_id,
metadata.endpoint,
metadata.salt,
metadata.node_x25519_public_key,
)
.await
}
Expand Down Expand Up @@ -257,6 +266,8 @@ async fn handle_non_streaming_response(
estimated_total_tokens: i64,
selected_stack_small_id: i64,
endpoint: String,
salt: Option<[u8; constants::SALT_SIZE]>,
node_x25519_public_key: Option<PublicKey>,
) -> Result<Response<Body>, StatusCode> {
let client = reqwest::Client::new();
let time = Instant::now();
Expand All @@ -268,17 +279,40 @@ async fn handle_non_streaming_response(
.send()
.await
.map_err(|err| {
error!("Failed to send OpenAI API request: {:?}", err);
error!(
level = "error",
node_address = node_address,
endpoint = endpoint,
error = ?err,
"Failed to send OpenAI API request"
);
StatusCode::INTERNAL_SERVER_ERROR
})?
.json::<Value>()
.await
.map_err(|err| {
error!("Failed to parse OpenAI API response: {:?}", err);
error!(
level = "error",
node_address = node_address,
endpoint = endpoint,
error = ?err,
"Failed to parse OpenAI API response"
);
StatusCode::INTERNAL_SERVER_ERROR
})
.map(Json)?;

let response = if let (Some(node_x25519_public_key), Some(salt)) =
(node_x25519_public_key, salt)
{
let shared_secret = state.compute_shared_secret(&node_x25519_public_key);
let NodeEncryptionMetadata { ciphertext, nonce } =
extract_node_encryption_metadata(response.0)?;
handle_confidential_compute_decryption_response(shared_secret, &ciphertext, &salt, &nonce)?
} else {
response.0
};

// Extract the response total number of tokens
let total_tokens = response
.get("usage")
Expand Down Expand Up @@ -330,7 +364,7 @@ async fn handle_non_streaming_response(
StatusCode::INTERNAL_SERVER_ERROR
})?;

Ok(response.into_response())
Ok(Json(response).into_response())
}

/// Handles streaming chat completion requests by establishing a Server-Sent Events (SSE) connection.
Expand Down Expand Up @@ -388,6 +422,8 @@ async fn handle_streaming_response(
estimated_total_tokens: i64,
selected_stack_small_id: i64,
endpoint: String,
salt: Option<[u8; constants::SALT_SIZE]>,
node_x25519_public_key: Option<PublicKey>,
) -> Result<Response<Body>, StatusCode> {
// NOTE: If streaming is requested, add the include_usage option to the payload
// so that the atoma node state manager can be updated with the total number of tokens
Expand All @@ -413,6 +449,9 @@ async fn handle_streaming_response(

let stream = response.bytes_stream();

let shared_secret = node_x25519_public_key
.map(|node_x25519_public_key| state.compute_shared_secret(&node_x25519_public_key));

// Create the SSE stream
let stream = Sse::new(Streamer::new(
stream,
Expand All @@ -421,6 +460,8 @@ async fn handle_streaming_response(
estimated_total_tokens,
start,
node_id,
shared_secret,
salt,
))
.keep_alive(
axum::response::sse::KeepAlive::new()
Expand Down
25 changes: 23 additions & 2 deletions atoma-proxy/src/server/handlers/embeddings.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use std::time::Instant;

use atoma_state::types::AtomaAtomaStateManagerEvent;
use atoma_utils::constants;
use axum::{
body::Body,
extract::State,
Expand All @@ -11,8 +12,13 @@ use axum::{
use serde_json::Value;
use tracing::{error, instrument};
use utoipa::OpenApi;
use x25519_dalek::PublicKey;

use crate::server::{http_server::ProxyState, middleware::RequestMetadataExtension};
use crate::server::{
handlers::{extract_node_encryption_metadata, handle_confidential_compute_decryption_response},
http_server::ProxyState,
middleware::{NodeEncryptionMetadata, RequestMetadataExtension},
};

use super::request_model::RequestModel;

Expand Down Expand Up @@ -148,6 +154,8 @@ pub async fn embeddings_handler(
payload,
num_input_compute_units as i64,
metadata.endpoint,
metadata.salt,
metadata.node_x25519_public_key,
)
.await
}
Expand Down Expand Up @@ -196,6 +204,8 @@ async fn handle_embeddings_response(
payload: Value,
num_input_compute_units: i64,
endpoint: String,
salt: Option<[u8; constants::SALT_SIZE]>,
node_x25519_public_key: Option<PublicKey>,
) -> Result<Response<Body>, StatusCode> {
let client = reqwest::Client::new();
let time = Instant::now();
Expand All @@ -218,6 +228,17 @@ async fn handle_embeddings_response(
})
.map(Json)?;

let response = if let (Some(node_x25519_public_key), Some(salt)) =
(node_x25519_public_key, salt)
{
let shared_secret = state.compute_shared_secret(&node_x25519_public_key);
let NodeEncryptionMetadata { ciphertext, nonce } =
extract_node_encryption_metadata(response.0)?;
handle_confidential_compute_decryption_response(shared_secret, &ciphertext, &salt, &nonce)?
} else {
response.0
};

// Update the node throughput performance
state
.state_manager_sender
Expand All @@ -234,5 +255,5 @@ async fn handle_embeddings_response(
StatusCode::INTERNAL_SERVER_ERROR
})?;

Ok(response.into_response())
Ok(Json(response).into_response())
}
26 changes: 23 additions & 3 deletions atoma-proxy/src/server/handlers/image_generations.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use std::time::Instant;

use atoma_state::types::AtomaAtomaStateManagerEvent;
use atoma_utils::constants;
use axum::body::Body;
use axum::http::StatusCode;
use axum::response::{IntoResponse, Response};
Expand All @@ -9,9 +10,13 @@ use axum::{extract::State, http::HeaderMap, Json};
use serde_json::Value;
use tracing::{error, instrument};
use utoipa::OpenApi;
use x25519_dalek::PublicKey;

use crate::server::http_server::ProxyState;
use crate::server::middleware::RequestMetadataExtension;
use crate::server::{
handlers::{extract_node_encryption_metadata, handle_confidential_compute_decryption_response},
http_server::ProxyState,
middleware::{NodeEncryptionMetadata, RequestMetadataExtension},
};

use super::request_model::RequestModel;

Expand Down Expand Up @@ -151,6 +156,8 @@ pub async fn image_generations_handler(
payload,
metadata.num_compute_units as i64,
metadata.endpoint,
metadata.salt,
metadata.node_x25519_public_key,
)
.await
}
Expand Down Expand Up @@ -202,6 +209,8 @@ async fn handle_image_generation_response(
payload: Value,
total_tokens: i64,
endpoint: String,
salt: Option<[u8; constants::SALT_SIZE]>,
node_x25519_public_key: Option<PublicKey>,
) -> Result<Response<Body>, StatusCode> {
let client = reqwest::Client::new();
let time = Instant::now();
Expand All @@ -224,6 +233,17 @@ async fn handle_image_generation_response(
})
.map(Json)?;

let response = if let (Some(node_x25519_public_key), Some(salt)) =
(node_x25519_public_key, salt)
{
let shared_secret = state.compute_shared_secret(&node_x25519_public_key);
let NodeEncryptionMetadata { ciphertext, nonce } =
extract_node_encryption_metadata(response.0)?;
handle_confidential_compute_decryption_response(shared_secret, &ciphertext, &salt, &nonce)?
} else {
response.0
};

// Update the node throughput performance
state
.state_manager_sender
Expand All @@ -240,5 +260,5 @@ async fn handle_image_generation_response(
StatusCode::INTERNAL_SERVER_ERROR
})?;

Ok(response.into_response())
Ok(Json(response).into_response())
}
Loading