Skip to content

Commit

Permalink
Progress.
Browse files Browse the repository at this point in the history
  • Loading branch information
jakubadamw committed Oct 20, 2024
1 parent c547568 commit 7a1b6c9
Show file tree
Hide file tree
Showing 11 changed files with 728 additions and 250 deletions.
602 changes: 457 additions & 145 deletions Cargo.lock

Large diffs are not rendered by default.

9 changes: 7 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ edition = "2021"

[dependencies]
anyhow = "1"
ctrlc-async = "3"
ctrlc2 = { version = "3", features = ["termination", "tokio"] }
derive_builder = "0.20"
envy = "0.4"
futures-util = "0.3"
Expand All @@ -17,9 +17,14 @@ serde = { version = "1", features = ["derive"] }
serde_json = "1"
thiserror = "1"
tokio = { version = "1", features = ["fs", "macros", "rt-multi-thread", "sync"] }
tonic = { version = "0.12", features = ["tls"] }
tonic = { version = "0.12", features = ["tls", "tls-native-roots"] }
tracing = "0.1"

[build-dependencies]
tonic-build = "0.12"
tempfile = "3"

[dev-dependencies]
dotenv = "0.15"
rstest = "0.23"
tracing-subscriber = { version = "0.3", features = ["env-filter"] }
7 changes: 6 additions & 1 deletion build.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,12 @@ fn prepend_package_name_and_build(
writeln!(temp_file, "package {package_name};")?;

let target_path = temp_file.into_temp_path();
tonic_build::compile_protos(target_path)?;
tonic_build::configure()
.disable_package_emission()
.compile_protos(
&[&target_path],
&[target_path.parent().expect("must succeed")],
)?;
Ok(())
}

Expand Down
68 changes: 68 additions & 0 deletions examples/fibonacci.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
use hatchet_sdk::{Client, StepBuilder, WorkflowBuilder};

fn fibonacci(n: u32) -> u32 {
(1..=n)
.fold((0, 1), |(last, current), _| (current, last + current))
.0
}

#[cfg(test)]
mod tests {
use rstest::rstest;

#[rstest]
#[case(0, 0)]
#[case(1, 1)]
#[case(2, 1)]
#[case(3, 2)]
#[case(4, 3)]
#[case(5, 5)]
#[case(6, 8)]
fn fibonacci_test(#[case] input: u32, #[case] expected: u32) {
assert_eq!(expected, super::fibonacci(input))
}
}

#[derive(serde::Deserialize)]
struct Input {
n: u32,
}

#[derive(serde::Serialize)]
struct Output {
result: u32,
}

async fn execute(Input { n }: Input) -> anyhow::Result<Output> {
Ok(Output {
result: fibonacci(n),
})
}

#[tokio::main]
async fn main() -> anyhow::Result<()> {
dotenv::dotenv().ok();
tracing_subscriber::fmt()
.with_target(false)
.with_env_filter(
tracing_subscriber::EnvFilter::from_default_env()
.add_directive("hatchet_sdk=debug".parse()?),
)
.init();

let client = Client::new()?;
let mut worker = client.worker("example").build();
worker.register_workflow(
WorkflowBuilder::default()
.name("fibonacci")
.step(
StepBuilder::default()
.name("compute")
.function(&execute)
.build()?,
)
.build()?,
);
worker.start().await?;
Ok(())
}
2 changes: 1 addition & 1 deletion hatchet
Submodule hatchet updated 421 files
3 changes: 1 addition & 2 deletions src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,7 @@ use crate::worker::WorkerBuilder;
pub(crate) struct Environment {
pub(crate) token: SecretString,
pub(crate) host_port: Option<String>,
#[serde(default)]
pub(crate) listener_v2_timeout: u64,
pub(crate) listener_v2_timeout: Option<u64>,
#[serde(default)]
pub(crate) tls_strategy: crate::ClientTlStrategy,
pub(crate) tls_cert_file: Option<String>,
Expand Down
20 changes: 15 additions & 5 deletions src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,17 +2,27 @@
pub enum Error {
#[error("failed to load configuration from the environment: {0}")]
Environment(#[from] envy::Error),
#[error("transport error: {0}")]
TonicTransport(#[from] tonic::transport::Error),
#[error("status: {0}")]
TonicStatus(#[from] tonic::Status),
#[error("worker registration request: {0}")]
CouldNotRegisterWorker(tonic::Status),
#[error("workflow registration request:: {0}")]
CouldNotPutWorkflow(tonic::Status),
#[error("dispatcher listen error: {0}")]
CouldNotListenToDispatcher(tonic::Status),
#[error("step status send error: {0}")]
CouldNotSendStepStatus(tonic::Status),
#[error("heartbeat error: {0}")]
CouldNotSendHeartbeat(tonic::Status),
#[error("dispatcher connection error: {0}")]
CouldNotConnectToDispatcher(tonic::transport::Error),
#[error("workflow service connection error: {0:?}")]
CouldNotConnectToWorkflowService(tonic::transport::Error),
#[error("could not read file under `{1}`: {0}")]
CouldNotReadFile(std::io::Error, String),
#[error("environment variables {0} and {1} cannot be set simultaneously")]
CantSetBothEnvironmentVariables(&'static str, &'static str),
#[error("could not subscribe to actions after {0} retries")]
CouldNotSubscribeToActions(usize),
#[error("could not decode the provided token to retrieve the host/port pair")]
#[error("could not decode the provided token to retrieve the host/port pair: {0}")]
CouldNotDecodeToken(jsonwebtoken::errors::Error),
#[error("could not decode action payload: {0}")]
CouldNotDecodeActionPayload(serde_json::Error),
Expand Down
3 changes: 2 additions & 1 deletion src/worker/heartbeat.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,8 @@ where
heartbeat_at: Some(std::time::SystemTime::now().into()),
worker_id: worker_id.clone(),
})
.await?;
.await
.map_err(crate::Error::CouldNotSendHeartbeat)?;

tokio::select! {
_ = interval.tick() => {
Expand Down
105 changes: 73 additions & 32 deletions src/worker/listener.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
use futures_util::FutureExt;
use tokio::{task::LocalSet, task_local};
use tonic::IntoRequest;
use tracing::{error, warn};
use tracing::{debug, error, info, warn};

use crate::{
worker::{grpc::ActionType, DEFAULT_ACTION_TIMEOUT},
Expand Down Expand Up @@ -37,10 +39,16 @@ fn step_action_event(
}
}

#[derive(serde::Deserialize)]
struct ActionInput<T> {
input: T,
}

async fn handle_start_step_run<F>(
dispatcher: &mut DispatcherClient<
tonic::service::interceptor::InterceptedService<tonic::transport::Channel, F>,
>,
local_set: &tokio::task::LocalSet,
namespace: &str,
worker_id: &str,
workflows: &[Workflow],
Expand All @@ -59,45 +67,53 @@ where
return Ok(());
};

debug!("Received a new action: {action:?}.");

dispatcher
.send_step_action_event(step_action_event(
worker_id,
&action,
StepActionEventType::StepEventTypeStarted,
Default::default(),
))
.await?
.await
.map_err(crate::Error::CouldNotSendStepStatus)?
.into_inner();

let input = serde_json::from_str(&action.action_payload)
let input: ActionInput<serde_json::Value> = serde_json::from_str(&action.action_payload)
.map_err(crate::Error::CouldNotDecodeActionPayload)?;

// FIXME: Obviously, run this asynchronously rather than blocking the main listening loop.
let action_event =
match tokio::task::spawn_local(async move { action_callable(input).await }).await {
Ok(Ok(output_value)) => step_action_event(
worker_id,
&action,
StepActionEventType::StepEventTypeCompleted,
serde_json::to_string(&output_value).expect("must succeed"),
),
Ok(Err(error)) => step_action_event(
worker_id,
&action,
StepActionEventType::StepEventTypeFailed,
error.to_string(),
),
Err(join_error) => step_action_event(
worker_id,
&action,
StepActionEventType::StepEventTypeFailed,
join_error.to_string(),
),
};
let action_event = match local_set
.run_until(async move {
tokio::task::spawn_local(async move { action_callable(input.input).await }).await
})
.await
{
Ok(Ok(output_value)) => step_action_event(
worker_id,
&action,
StepActionEventType::StepEventTypeCompleted,
serde_json::to_string(&output_value).expect("must succeed"),
),
Ok(Err(error)) => step_action_event(
worker_id,
&action,
StepActionEventType::StepEventTypeFailed,
error.to_string(),
),
Err(join_error) => step_action_event(
worker_id,
&action,
StepActionEventType::StepEventTypeFailed,
join_error.to_string(),
),
};

dispatcher
.send_step_action_event(action_event)
.await?
.await
.map_err(crate::Error::CouldNotSendStepStatus)?
.into_inner();

Ok(())
Expand All @@ -110,7 +126,7 @@ pub(crate) async fn run<F>(
namespace: &str,
worker_id: &str,
workflows: Vec<Workflow>,
listener_v2_timeout: u64,
listener_v2_timeout: Option<u64>,
mut interrupt_receiver: tokio::sync::mpsc::Receiver<()>,
_heartbeat_interrupt_sender: tokio::sync::mpsc::Sender<()>,
) -> crate::Result<()>
Expand All @@ -125,6 +141,8 @@ where
let connection_attempt = tokio::time::Instant::now();

'main_loop: loop {
info!("Listening…");

if connection_attempt.elapsed() > DEFAULT_ACTION_LISTENER_RETRY_INTERVAL {
retries = 0;
}
Expand All @@ -134,25 +152,46 @@ where
));
}

let mut stream = match listen_strategy {
let response = match listen_strategy {
ListenStrategy::V1 => {
info!("Using strategy v1");

let mut request = WorkerListenRequest {
worker_id: worker_id.to_owned(),
}
.into_request();
request.set_timeout(DEFAULT_ACTION_TIMEOUT);
dispatcher.listen(request).await?.into_inner()
Box::new(dispatcher.listen(request)).boxed()
}
ListenStrategy::V2 => {
info!("Using strategy v2");

let mut request = WorkerListenRequest {
worker_id: worker_id.to_owned(),
}
.into_request();
request.set_timeout(std::time::Duration::from_millis(listener_v2_timeout));
dispatcher.listen_v2(request).await?.into_inner()
if let Some(listener_v2_timeout) = listener_v2_timeout {
request.set_timeout(std::time::Duration::from_millis(listener_v2_timeout));
}
dispatcher.listen_v2(request).boxed()
}
};

let mut stream = tokio::select! {
response = response => {
response
.map_err(crate::Error::CouldNotListenToDispatcher)?
.into_inner()
}
result = interrupt_receiver.recv() => {
assert!(result.is_some());
warn!("Interrupt received.");
break 'main_loop;
}
};

let local_set = LocalSet::new();

loop {
tokio::select! {
element = stream.next() => {
Expand Down Expand Up @@ -190,7 +229,7 @@ where

match action_type {
ActionType::StartStepRun => {
handle_start_step_run(&mut dispatcher, namespace, worker_id, &workflows, action).await?;
handle_start_step_run(&mut dispatcher, &local_set, namespace, worker_id, &workflows, action).await?;
}
ActionType::CancelStepRun => {
todo!()
Expand All @@ -200,7 +239,9 @@ where
}
}
}
_ = interrupt_receiver.recv() => {
result = interrupt_receiver.recv() => {
assert!(result.is_some());
warn!("Interrupt received.");
break 'main_loop;
}
}
Expand Down
Loading

0 comments on commit 7a1b6c9

Please sign in to comment.