Skip to content

Commit

Permalink
jobs store in DHT
Browse files Browse the repository at this point in the history
  • Loading branch information
Okm165 committed Aug 11, 2024
1 parent c317801 commit 28e5fa6
Show file tree
Hide file tree
Showing 16 changed files with 255 additions and 97 deletions.
13 changes: 3 additions & 10 deletions crates/common/src/job.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use crate::hash;
use cairo_vm::vm::runners::cairo_pie::CairoPie;
use libp2p::PeerId;
use libp2p::{kad, PeerId};
use serde::{Deserialize, Serialize};
use starknet::signers::{SigningKey, VerifyingKey};
use starknet_crypto::{poseidon_hash_many, FieldElement, Signature};
Expand Down Expand Up @@ -101,17 +101,10 @@ impl Display for Job {
}
}

#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)]
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct JobBid {
pub identity: PeerId,
pub job_hash: u64,
pub price: u64,
}

#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)]
pub struct JobDelegation {
pub identity: PeerId,
pub job: Job,
pub job_key: kad::RecordKey,
pub price: u64,
}

Expand Down
35 changes: 32 additions & 3 deletions crates/common/src/job_trace.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
use crate::hash;
use libp2p::kad;
use std::{
fmt::Display,
hash::{DefaultHasher, Hash, Hasher},
mem::ManuallyDrop,
};
use tempfile::NamedTempFile;

Expand All @@ -13,11 +15,38 @@ use tempfile::NamedTempFile;

#[derive(Debug)]
pub struct JobTrace {
pub job_hash: u64,
pub job_key: kad::RecordKey,
pub air_public_input: NamedTempFile, // Temporary file containing the public input
pub air_private_input: NamedTempFile, // Temporary file containing the private input; memory and trace files must exist for this to be valid
pub memory: NamedTempFile, // Temporary file containing memory data (required for air_private_input validity)
pub trace: NamedTempFile, // Temporary file containing trace data (required for air_private_input validity)
pub memory: ManuallyDrop<NamedTempFile>, // Temporary file containing memory data (required for air_private_input validity)
pub trace: ManuallyDrop<NamedTempFile>, // Temporary file containing trace data (required for air_private_input validity)
}

impl JobTrace {
    /// Assembles a `JobTrace` from the four temporary files produced by a
    /// trace run, keyed by the job's Kademlia record key.
    ///
    /// `memory` and `trace` are wrapped in [`ManuallyDrop`] so that their
    /// deletion is sequenced explicitly by this type's `Drop` implementation
    /// instead of by default field drop order.
    pub fn new(
        job_key: kad::RecordKey,
        air_public_input: NamedTempFile,
        air_private_input: NamedTempFile,
        memory: NamedTempFile,
        trace: NamedTempFile,
    ) -> Self {
        let memory = ManuallyDrop::new(memory);
        let trace = ManuallyDrop::new(trace);
        Self { job_key, air_public_input, air_private_input, memory, trace }
    }
}

impl Drop for JobTrace {
    /// Explicitly releases the `ManuallyDrop`-wrapped temp files.
    ///
    /// This body runs *before* the remaining fields are dropped, so `memory`
    /// and `trace` are deleted first, then `air_public_input` /
    /// `air_private_input` follow in declaration order.
    // NOTE(review): default declaration-order field drop would already release
    // `memory`/`trace` after `air_private_input`; confirm this explicit
    // early-release ordering is intentional.
    fn drop(&mut self) {
        unsafe {
            // SAFETY: both fields are wrapped at construction (`new`) and are
            // dropped only here; `drop` is invoked at most once per value, so
            // no double-drop can occur.
            ManuallyDrop::drop(&mut self.memory);
            ManuallyDrop::drop(&mut self.trace);
        }
    }
}

impl Hash for JobTrace {
Expand Down
3 changes: 2 additions & 1 deletion crates/common/src/job_witness.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use crate::hash;
use libp2p::kad;
use serde::{Deserialize, Serialize};
use std::{
fmt::Display,
Expand All @@ -14,7 +15,7 @@ use std::{

#[derive(Debug, PartialEq, Eq, Clone, Serialize, Deserialize)]
pub struct JobWitness {
pub job_hash: u64,
pub job_key: kad::RecordKey,
pub proof: Vec<u8>,
}

Expand Down
15 changes: 7 additions & 8 deletions crates/delegator/src/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use axum::{
};
use futures::StreamExt;
use hyper::StatusCode;
use libp2p::PeerId;
use libp2p::{kad, PeerId};
use serde::{Deserialize, Serialize};
use std::hash::{DefaultHasher, Hash, Hasher};
use std::{io, time::Duration};
Expand All @@ -19,7 +19,7 @@ use crate::delegator::DelegatorEvent;
#[derive(Debug)]
pub struct ServerState {
pub delegate_tx: mpsc::Sender<JobData>,
pub events_rx: broadcast::Receiver<(u64, DelegatorEvent)>,
pub events_rx: broadcast::Receiver<(kad::RecordKey, DelegatorEvent)>,
}

impl Clone for ServerState {
Expand All @@ -39,22 +39,22 @@ pub struct DelegateRequest {

#[derive(Debug, Serialize)]
pub struct DelegateResponse {
job_hash: String,
job_hash: kad::RecordKey,
}

pub async fn deletage_handler(
State(state): State<ServerState>,
Json(input): Json<DelegateRequest>,
) -> Result<Json<DelegateResponse>, StatusCode> {
let job_data = JobData::new(input.pie);
let job_data_hash = hash!(&job_data);
let job_data_hash = kad::RecordKey::new(&hash!(job_data).to_be_bytes());
state.delegate_tx.send(job_data).await.map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
Ok(Json(DelegateResponse { job_hash: job_data_hash.to_string() }))
Ok(Json(DelegateResponse { job_hash: job_data_hash }))
}

#[derive(Debug, Deserialize)]
pub struct JobEventsRequest {
job_hash: String,
job_hash: kad::RecordKey,
}

#[derive(Debug, Serialize)]
Expand All @@ -70,8 +70,7 @@ pub async fn job_events_handler(
Query(input): Query<JobEventsRequest>,
) -> Sse<impl Stream<Item = Result<Event, io::Error>>> {
let stream = stream! {
let job_hash = input.job_hash.parse::<u64>()
.map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e.to_string()))?;
let job_hash = input.job_hash;
loop {
tokio::select! {
Ok((hash, event)) = state.events_rx.recv() => {
Expand Down
18 changes: 11 additions & 7 deletions crates/delegator/src/bid_queue.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use libp2p::PeerId;
use libp2p::{kad, PeerId};
use std::{collections::BTreeMap, future::Future, pin::Pin, time::Duration};
use tokio::{sync::mpsc, time::sleep};
use zetina_common::{job::Job, process::Process};
use zetina_common::process::Process;

pub struct BidQueue {}

Expand All @@ -19,17 +19,21 @@ impl Default for BidQueue {

impl BidQueue {
pub fn run<'future>(
job: Job,
job_hash: kad::RecordKey,
) -> (
Process<'future, Result<(Job, BTreeMap<u64, Vec<PeerId>>), BidControllerError>>,
Process<'future, Result<(kad::RecordKey, BTreeMap<u64, Vec<PeerId>>), BidControllerError>>,
mpsc::Sender<(u64, PeerId)>,
) {
let (terminate_tx, mut terminate_rx) = mpsc::channel::<()>(10);
let (bid_tx, mut bid_rx) = mpsc::channel::<(u64, PeerId)>(10);
let future: Pin<
Box<
dyn Future<Output = Result<(Job, BTreeMap<u64, Vec<PeerId>>), BidControllerError>>
+ Send
dyn Future<
Output = Result<
(kad::RecordKey, BTreeMap<u64, Vec<PeerId>>),
BidControllerError,
>,
> + Send
+ '_,
>,
> = Box::pin(async move {
Expand All @@ -53,7 +57,7 @@ impl BidQueue {
}
}
_ = sleep(duration) => {
break Ok((job, bids.take().ok_or(BidControllerError::BidsTerminated)?))
break Ok((job_hash, bids.take().ok_or(BidControllerError::BidsTerminated)?))
}
_ = terminate_rx.recv() => {
break Err(BidControllerError::TaskTerminated);
Expand Down
70 changes: 45 additions & 25 deletions crates/delegator/src/delegator.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use crate::bid_queue::{BidControllerError, BidQueue};
use futures::stream::FuturesUnordered;
use futures::Stream;
use libp2p::{gossipsub, PeerId};
use libp2p::{gossipsub, kad, PeerId};
use starknet::signers::SigningKey;
use std::collections::{BTreeMap, HashMap};
use std::hash::{DefaultHasher, Hash, Hasher};
Expand All @@ -13,10 +13,10 @@ use tokio_stream::StreamExt;
use tracing::{error, info};
use zetina_common::graceful_shutdown::shutdown_signal;
use zetina_common::hash;
use zetina_common::job::{Job, JobData, JobDelegation};
use zetina_common::job::{Job, JobBid, JobData};
use zetina_common::process::Process;
use zetina_peer::swarm::{
DelegationMessage, GossipsubMessage, MarketMessage, PeerBehaviourEvent, Topic,
DelegationMessage, GossipsubMessage, KademliaMessage, MarketMessage, PeerBehaviourEvent, Topic,
};

pub struct Delegator {
Expand All @@ -27,39 +27,39 @@ impl Delegator {
pub fn new(
mut swarm_events: Pin<Box<dyn Stream<Item = PeerBehaviourEvent> + Send>>,
gossipsub_tx: Sender<GossipsubMessage>,
kademlia_tx: Sender<KademliaMessage>,
mut delegate_rx: mpsc::Receiver<JobData>,
events_tx: broadcast::Sender<(u64, DelegatorEvent)>,
events_tx: broadcast::Sender<(kad::RecordKey, DelegatorEvent)>,
signing_key: SigningKey,
) -> Self {
Self {
handle: Some(tokio::spawn(async move {
let mut job_bid_scheduler = FuturesUnordered::<
Process<Result<(Job, BTreeMap<u64, Vec<PeerId>>), BidControllerError>>,
Process<
Result<(kad::RecordKey, BTreeMap<u64, Vec<PeerId>>), BidControllerError>,
>,
>::new();
let mut job_hash_store = HashMap::<u64, mpsc::Sender<(u64, PeerId)>>::new();
let mut job_hash_store =
HashMap::<kad::RecordKey, mpsc::Sender<(u64, PeerId)>>::new();
loop {
tokio::select! {
Some(job_data) = delegate_rx.recv() => {
let job = Job::try_from_job_data(job_data, &signing_key);
gossipsub_tx.send(GossipsubMessage {
topic: Topic::Market.into(),
data: serde_json::to_vec(&MarketMessage::Job(job.to_owned()))?
}).await?;
info!("Propagated job: {} for bidding", hash!(job));
let (process, bid_tx) = BidQueue::run(job.to_owned());
job_bid_scheduler.push(process);
job_hash_store.insert(hash!(job), bid_tx);
let job_key = kad::RecordKey::new(&hash!(job).to_be_bytes());
kademlia_tx.send(KademliaMessage::PUT(
(job_key, serde_json::to_vec(&job)?)
)).await?;
},
Some(event) = swarm_events.next() => {
match event {
PeerBehaviourEvent::Gossipsub(gossipsub::Event::Message { message, .. }) => {
if message.topic == Topic::Market.into() {
match serde_json::from_slice::<MarketMessage>(&message.data)? {
MarketMessage::JobBid(job_bid) => {
if let Some(bid_tx) = job_hash_store.get_mut(&job_bid.job_hash) {
info!("Received job bid: {} price: {} from: {}", job_bid.job_hash, job_bid.price, job_bid.identity);
if let Some(bid_tx) = job_hash_store.get_mut(&job_bid.job_key) {
info!("Received job bid: {} price: {} from: {}", hex::encode(&job_bid.job_key), job_bid.price, job_bid.identity);
bid_tx.send((job_bid.price, job_bid.identity)).await?;
events_tx.send((job_bid.job_hash, DelegatorEvent::BidReceived(job_bid.identity)))?;
events_tx.send((job_bid.job_key, DelegatorEvent::BidReceived(job_bid.identity)))?;
}
}
_ => {}
Expand All @@ -68,27 +68,42 @@ impl Delegator {
if message.topic == Topic::Delegation.into() {
match serde_json::from_slice::<DelegationMessage>(&message.data)? {
DelegationMessage::Finished(job_witness) => {
info!("Received finished job: {}", job_witness.job_hash);
events_tx.send((job_witness.job_hash, DelegatorEvent::Finished(job_witness.proof)))?;
info!("Received finished job: {}", hex::encode(&job_witness.job_key));
events_tx.send((job_witness.job_key, DelegatorEvent::Finished(job_witness.proof)))?;
}
_ => {}
}
}
},
PeerBehaviourEvent::Kademlia(kad::Event::OutboundQueryProgressed { result, ..}) => {
match result {
kad::QueryResult::PutRecord(Ok(kad::PutRecordOk { key })) => {
gossipsub_tx.send(GossipsubMessage {
topic: Topic::Market.into(),
data: serde_json::to_vec(&MarketMessage::JobBidPropagation(key.to_owned()))?
}).await?;
info!("Propagated job: {} for bidding", hex::encode(&key));
let (process, bid_tx) = BidQueue::run(key.to_owned());
job_bid_scheduler.push(process);
job_hash_store.insert(key, bid_tx);
}
_ => {}
}
}
_ => {}
}
}
Some(Ok((job, bids))) = job_bid_scheduler.next() => {
job_hash_store.remove(&hash!(job));
Some(Ok((job_key, bids))) = job_bid_scheduler.next() => {
job_hash_store.remove(&job_key);
let bid = bids.first_key_value().unwrap();
let price = *bid.0;
let identity = *bid.1.first().unwrap();
info!("Job {} delegated to best bidder: {}", hash!(job), identity);
info!("Job {} delegated to best bidder: {}", hex::encode(&job_key), identity);
gossipsub_tx.send(GossipsubMessage {
topic: Topic::Delegation.into(),
data: serde_json::to_vec(&DelegationMessage::Delegate(JobDelegation{identity, job: job.to_owned(), price}))?
data: serde_json::to_vec(&DelegationMessage::Delegate(JobBid{identity, job_key: job_key.to_owned(), price}))?
}).await?;
events_tx.send((hash!(job), DelegatorEvent::Delegated(identity)))?;
events_tx.send((job_key, DelegatorEvent::Delegated(identity)))?;
}
_ = shutdown_signal() => {
break
Expand Down Expand Up @@ -125,8 +140,13 @@ pub enum Error {
#[error("mpsc_send_error GossipsubMessage")]
MpscSendErrorGossipsubMessage(#[from] mpsc::error::SendError<GossipsubMessage>),

#[error("mpsc_send_error KademliaMessage")]
MpscSendErrorKademliaMessage(#[from] mpsc::error::SendError<KademliaMessage>),

#[error("mpsc_send_error DelegatorEvent")]
BreadcastSendErrorDelegatorEvent(#[from] broadcast::error::SendError<(u64, DelegatorEvent)>),
BreadcastSendErrorDelegatorEvent(
#[from] broadcast::error::SendError<(kad::RecordKey, DelegatorEvent)>,
),

#[error("mpsc_send_error JobBid")]
MpscSendErrorJobBid(#[from] mpsc::error::SendError<(u64, PeerId)>),
Expand Down
13 changes: 7 additions & 6 deletions crates/delegator/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use axum::{
};
use clap::Parser;
use delegator::{Delegator, DelegatorEvent};
use libp2p::Multiaddr;
use libp2p::{kad, Multiaddr};
use starknet::{core::types::FieldElement, signers::SigningKey};
use std::{str::FromStr, time::Duration};
use tokio::{
Expand All @@ -24,7 +24,7 @@ use tower_http::{
};
use tracing_subscriber::EnvFilter;
use zetina_common::{graceful_shutdown::shutdown_signal, job::JobData};
use zetina_peer::swarm::{GossipsubMessage, SwarmRunner};
use zetina_peer::swarm::{GossipsubMessage, KademliaMessage, SwarmRunner};

#[derive(Parser)]
struct Cli {
Expand Down Expand Up @@ -71,11 +71,12 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
.unwrap();

let (gossipsub_tx, gossipsub_rx) = mpsc::channel::<GossipsubMessage>(100);
let (delegate_tx, delegate_rx) = mpsc::channel::<JobData>(100);
let (events_tx, events_rx) = broadcast::channel::<(u64, DelegatorEvent)>(100);
let swarm_events = swarm_runner.run(gossipsub_rx);
let (kademlia_tx, kademlia_rx) = mpsc::channel::<KademliaMessage>(100);
let swarm_events = swarm_runner.run(gossipsub_rx, kademlia_rx);

Delegator::new(swarm_events, gossipsub_tx, delegate_rx, events_tx, signing_key);
let (delegate_tx, delegate_rx) = mpsc::channel::<JobData>(100);
let (events_tx, events_rx) = broadcast::channel::<(kad::RecordKey, DelegatorEvent)>(100);
Delegator::new(swarm_events, gossipsub_tx, kademlia_tx, delegate_rx, events_tx, signing_key);

// Create a `TcpListener` using tokio.
let listener = TcpListener::bind("0.0.0.0:3000").await.unwrap();
Expand Down
Loading

0 comments on commit 28e5fa6

Please sign in to comment.