diff --git a/Cargo.toml b/Cargo.toml index adb21d55..a500d131 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -38,6 +38,9 @@ dapr = { path = "./" } tokio-test = "0.4.2" tokio-stream = { version = "0.1" } +[workspace] +members = ["durabletask"] + [[example]] name = "actor-client" path = "examples/actors/client.rs" diff --git a/durabletask/Cargo.toml b/durabletask/Cargo.toml new file mode 100644 index 00000000..f8b2046c --- /dev/null +++ b/durabletask/Cargo.toml @@ -0,0 +1,24 @@ +[package] +name = "durabletask" +version = "0.1.0" +edition = "2021" +build = "build.rs" +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[dependencies] +tonic = "0.11.0" +prost = "0.12.3" +bytes = "1" +prost-types = "0.12.3" +serde = { version = "1.0", features = ["derive"] } +serde_json = "1.0" +anyhow = "1.0.81" +thiserror = "1.0.58" +uuid = { version = "1.8.0", features = ["v4", "fast-rng"] } +chrono = "0.4.35" +gethostname = "0.4.3" +opentelemetry = "0.22.0" + +[build-dependencies] +tonic-build = "0.11.0" +prost-build = "0.12.3" \ No newline at end of file diff --git a/durabletask/build.rs b/durabletask/build.rs new file mode 100644 index 00000000..23868551 --- /dev/null +++ b/durabletask/build.rs @@ -0,0 +1,14 @@ +use prost_build::Config; + +fn main() -> Result<(), std::io::Error> { + let mut config = Config::default(); + config.default_package_filename("durabletask"); + tonic_build::configure() + .build_server(false) + .compile_with_config( + config, + &["submodules/durabletask-protobuf/protos/orchestrator_service.proto"], + &["."], + )?; + Ok(()) +} diff --git a/durabletask/src/helpers.rs b/durabletask/src/helpers.rs new file mode 100644 index 00000000..0e776456 --- /dev/null +++ b/durabletask/src/helpers.rs @@ -0,0 +1,5 @@ +pub mod history; +pub mod task; +pub mod time; +pub mod tracing; +pub mod worker; diff --git a/durabletask/src/helpers/history.rs b/durabletask/src/helpers/history.rs new file mode 100644 index 00000000..49ce4a1c --- /dev/null +++ b/durabletask/src/helpers/history.rs @@ -0,0 +1,588 @@ +use std::any::type_name; +use std::fmt::Display; + +use prost_types::Timestamp; +use uuid::Uuid; + +use crate::durable_task::history_event::EventType; +use crate::durable_task::orchestrator_action::OrchestratorActionType; +use crate::durable_task::{ + CompleteOrchestrationAction, CreateSubOrchestrationAction, CreateTimerAction, EventRaisedEvent, + EventSentEvent, ExecutionCompletedEvent, ExecutionResumedEvent, ExecutionStartedEvent, + ExecutionSuspendedEvent, ExecutionTerminatedEvent, HistoryEvent, OrchestrationInstance, + OrchestrationStatus, OrchestratorAction, OrchestratorStartedEvent, ParentInstanceInfo, + ScheduleTaskAction, SendEventAction, SubOrchestrationInstanceCompletedEvent, + SubOrchestrationInstanceCreatedEvent, SubOrchestrationInstanceFailedEvent, TaskCompletedEvent, + TaskFailedEvent, TaskFailureDetails, TaskScheduledEvent, TerminateOrchestrationAction, + TimerCreatedEvent, TimerFiredEvent, TraceContext, +}; +use crate::helpers::time::now; + +impl HistoryEvent { + pub fn get_task_completed(self) -> Option { + match &self.event_type { + None => None, + Some(e) => match e { + EventType::TaskCompleted(task) => Some(task.clone()), + _ => None, + }, + } + } + + pub fn get_task_failed(self) -> Option { + match &self.event_type { + None => None, + Some(e) => match e { + EventType::TaskFailed(task) => Some(task.clone()), + _ => None, + }, + } + } + + pub fn get_sub_orchestration_instance_completed( + self, + ) -> Option { + match &self.event_type { + None => None, + Some(e) => match e { + EventType::SubOrchestrationInstanceCompleted(instance) => Some(instance.clone()), + _ => None, + }, + } + } + + pub fn get_sub_orchestration_instance_failed( + self, + ) -> Option { + match &self.event_type { + None => None, + Some(e) => match e { + EventType::SubOrchestrationInstanceFailed(instance) => Some(instance.clone()), + _ => None, + }, + } + } + + pub fn get_timer_fired(self) -> Option { + match &self.event_type { + None => None, + Some(e) => match e { + EventType::TimerFired(timer) => Some(timer.clone()), + _ => None, + }, + } + } + + pub fn get_execution_started(self) -> Option { + match &self.event_type { + None => None, + Some(e) => match e { + EventType::ExecutionStarted(execution) => Some(execution.clone()), + _ => None, + }, + } + } + + pub fn new_execution_started_event( + name: String, + instance_id: String, + input: Option, + parent: Option, + parent_trace_context: Option, + scheduled_start_time: Option, + ) -> Self { + let orchestration_instance = Some(OrchestrationInstance { + instance_id, + execution_id: Some(Uuid::new_v4().to_string()), + }); + let execution_started = ExecutionStartedEvent { + name, + parent_instance: parent, + input, + orchestration_instance, + parent_trace_context, + scheduled_start_timestamp: scheduled_start_time, + version: None, + orchestration_span_id: None, + }; + let event_type = Some(EventType::ExecutionStarted(execution_started)); + HistoryEvent { + event_id: -1, + event_type, + timestamp: Some(now()), + } + } + + pub fn new_execution_completed_event( + event_id: i32, + status: i32, + result: Option, + failure_details: Option, + ) -> Self { + let execution_completed = ExecutionCompletedEvent { + orchestration_status: status, + result, + failure_details, + }; + let event_type = Some(EventType::ExecutionCompleted(execution_completed)); + HistoryEvent { + event_id, + timestamp: Some(now()), + event_type, + } + } + + pub fn new_execution_terminated_event(raw_reason: Option, recurse: bool) -> Self { + let execution_terminated = ExecutionTerminatedEvent { + input: raw_reason, + recurse, + }; + let event_type = Some(EventType::ExecutionTerminated(execution_terminated)); + HistoryEvent { + event_id: -1, + timestamp: Some(now()), + event_type, + } + } + + pub fn new_orchestrator_started_event() -> Self { + let orchestrator_started = OrchestratorStartedEvent {}; + let event_type = Some(EventType::OrchestratorStarted(orchestrator_started)); + HistoryEvent { + event_id: -1, + timestamp: Some(now()), + event_type, + } + } + + pub fn new_event_raised_event(name: String, raw_input: Option) -> Self { + let event_raised = EventRaisedEvent { + name, + input: raw_input, + }; + let event_type = Some(EventType::EventRaised(event_raised)); + HistoryEvent { + event_id: -1, + timestamp: Some(now()), + event_type, + } + } + + pub fn new_task_scheduled_event( + task_id: i32, + name: String, + version: Option, + raw_input: Option, + trace_context: Option, + ) -> Self { + let task_scheduled = TaskScheduledEvent { + name, + version, + input: raw_input, + parent_trace_context: trace_context, + }; + let event_type = Some(EventType::TaskScheduled(task_scheduled)); + HistoryEvent { + event_id: task_id, + timestamp: Some(now()), + event_type, + } + } + + pub fn new_task_completed_event(task_id: i32, result: Option) -> Self { + let task_completed = TaskCompletedEvent { + task_scheduled_id: task_id, + result, + }; + let event_type = Some(EventType::TaskCompleted(task_completed)); + HistoryEvent { + event_id: -1, + timestamp: Some(now()), + event_type, + } + } + + pub fn new_task_failed_event( + task_id: i32, + failure_details: Option, + ) -> Self { + let task_failed = TaskFailedEvent { + task_scheduled_id: task_id, + failure_details, + }; + let event_type = Some(EventType::TaskFailed(task_failed)); + HistoryEvent { + event_id: -1, + timestamp: Some(now()), + event_type, + } + } + + pub fn new_timer_created_event(event_id: i32, fire_at: Option) -> Self { + let time_created = TimerCreatedEvent { fire_at }; + let event_type = Some(EventType::TimerCreated(time_created)); + HistoryEvent { + event_id, + timestamp: Some(now()), + event_type, + } + } + + pub fn new_timer_fired_event(timer_id: i32, fire_at: Option) -> Self { + let timer_fired = TimerFiredEvent { timer_id, fire_at }; + let event_type = Some(EventType::TimerFired(timer_fired)); + HistoryEvent { + event_id: -1, + timestamp: Some(now()), + event_type, + } + } + + pub fn new_sub_orchestration_created_event( + event_id: i32, + name: String, + version: Option, + raw_input: Option, + instance_id: String, + parent_trace_context: Option, + ) -> Self { + let sub_orchestration_instance = SubOrchestrationInstanceCreatedEvent { + name, + version, + input: raw_input, + instance_id, + parent_trace_context, + }; + let event_type = Some(EventType::SubOrchestrationInstanceCreated( + sub_orchestration_instance, + )); + HistoryEvent { + event_id, + timestamp: Some(now()), + event_type, + } + } + + pub fn new_send_event_event( + event_id: i32, + instance_id: String, + name: String, + raw_input: Option, + ) -> Self { + let send_event = EventSentEvent { + instance_id, + input: raw_input, + name, + }; + let event_type = Some(EventType::EventSent(send_event)); + HistoryEvent { + event_id, + timestamp: Some(now()), + event_type, + } + } + + pub fn new_suspend_orchestration_event(reason: String) -> Self { + let input = if reason != "" { Some(reason) } else { None }; + let execution_suspended = ExecutionSuspendedEvent { input }; + let event_type = Some(EventType::ExecutionSuspended(execution_suspended)); + HistoryEvent { + event_id: -1, + timestamp: Some(now()), + event_type, + } + } + + pub fn new_resume_orchestration_event(reason: String) -> Self { + let input = if reason != "" { Some(reason) } else { None }; + let execution_resumed = ExecutionResumedEvent { input }; + let event_type = Some(EventType::ExecutionResumed(execution_resumed)); + HistoryEvent { + event_id: -1, + timestamp: Some(now()), + event_type, + } + } +} + +impl ParentInstanceInfo { + pub fn new_parent_info(task_id: i32, name: String, instance_id: String) -> Self { + let orchestration_instance = Some(OrchestrationInstance { + instance_id, + execution_id: None, + }); + ParentInstanceInfo { + task_scheduled_id: task_id, + name: Some(name), + version: None, + orchestration_instance, + } + } +} + +impl OrchestratorAction { + pub fn new_schedule_task_action(task_id: i32, name: String, input: Option) -> Self { + let scheduled_task = ScheduleTaskAction { + name, + version: None, + input, + }; + let orchestrator_action_type = Some(OrchestratorActionType::ScheduleTask(scheduled_task)); + OrchestratorAction { + id: task_id, + orchestrator_action_type, + } + } + + pub fn new_create_timer_action(task_id: i32, fire_at: Timestamp) -> Self { + let create_timer = CreateTimerAction { + fire_at: Some(fire_at), + }; + let orchestrator_action_type = Some(OrchestratorActionType::CreateTimer(create_timer)); + OrchestratorAction { + id: task_id, + orchestrator_action_type, + } + } + + pub fn new_send_event_action(instance_id: String, name: String, data: Option) -> Self { + let instance = Some(OrchestrationInstance { + instance_id, + execution_id: None, + }); + let send_event = SendEventAction { + instance, + name, + data, + }; + let orchestrator_action_type = Some(OrchestratorActionType::SendEvent(send_event)); + OrchestratorAction { + id: -1, + orchestrator_action_type, + } + } + + pub fn new_create_sub_orchestration_action( + task_id: i32, + name: String, + instance_id: String, + input: Option, + ) -> Self { + let create_sub_orchestration = CreateSubOrchestrationAction { + name, + version: None, + input, + instance_id, + }; + let orchestrator_action_type = Some(OrchestratorActionType::CreateSubOrchestration( + create_sub_orchestration, + )); + OrchestratorAction { + id: task_id, + orchestrator_action_type, + } + } + + pub fn new_complete_orchestration_action( + task_id: i32, + status: i32, + result: Option, + carryover_events: Vec, + failure_details: Option, + ) -> Self { + let complete_orchestration = CompleteOrchestrationAction { + orchestration_status: status, + result, + details: None, + new_version: None, + carryover_events, + failure_details, + }; + let orchestrator_action_type = Some(OrchestratorActionType::CompleteOrchestration( + complete_orchestration, + )); + OrchestratorAction { + id: task_id, + orchestrator_action_type, + } + } + + pub fn new_terminate_orchestration_action( + task_id: i32, + instance_id: String, + recurse: bool, + reason: Option, + ) -> Self { + let terminated_orchestration = TerminateOrchestrationAction { + instance_id, + reason, + recurse, + }; + let orchestrator_action_type = Some(OrchestratorActionType::TerminateOrchestration( + terminated_orchestration, + )); + OrchestratorAction { + id: task_id, + orchestrator_action_type, + } + } +} + +impl TaskFailureDetails { + pub fn new(error: T, non_retriable: bool) -> Self + where + T: Display, + { + let error_type = type_name::().to_string(); + let error_message = error.to_string(); + TaskFailureDetails { + error_type, + error_message, + stack_trace: None, + inner_failure: None, + is_non_retriable: non_retriable, + } + } +} + +pub fn get_event_history_name(event: &HistoryEvent) -> String { + match &event.event_type { + None => "".to_string(), + Some(e) => match e { + EventType::ExecutionStarted(_) => "ExecutionStarted".to_string(), + EventType::ExecutionCompleted(_) => "ExecutionCompleted".to_string(), + EventType::ExecutionTerminated(_) => "ExecutionTerminated".to_string(), + EventType::TaskScheduled(_) => "TaskScheduled".to_string(), + EventType::TaskCompleted(_) => "TaskCompleted".to_string(), + EventType::TaskFailed(_) => "TaskFailed".to_string(), + EventType::SubOrchestrationInstanceCreated(_) => { + "SubOrchestrationInstanceCreated".to_string() + } + EventType::SubOrchestrationInstanceCompleted(_) => { + "SubOrchestrationInstanceCompleted".to_string() + } + EventType::SubOrchestrationInstanceFailed(_) => { + "SubOrchestrationInstanceFailed".to_string() + } + EventType::TimerCreated(_) => "TimerCreated".to_string(), + EventType::TimerFired(_) => "TimerFired".to_string(), + EventType::OrchestratorStarted(_) => "OrchestratorStarted".to_string(), + EventType::OrchestratorCompleted(_) => "OrchestratorCompleted".to_string(), + EventType::EventSent(_) => "EventSent".to_string(), + EventType::EventRaised(_) => "EventRaised".to_string(), + EventType::GenericEvent(_) => "GenericEvent".to_string(), + EventType::HistoryState(_) => "HistoryState".to_string(), + EventType::ContinueAsNew(_) => "ContinueAsNew".to_string(), + EventType::ExecutionSuspended(_) => "ExecutionSuspended".to_string(), + EventType::ExecutionResumed(_) => "ExecutionResumed".to_string(), + }, + } +} + +pub fn get_action_type_name(action: &OrchestratorAction) -> String { + match &action.orchestrator_action_type { + None => "".to_string(), + Some(action_type) => match action_type { + OrchestratorActionType::ScheduleTask(_) => "ScheduleTask".to_string(), + OrchestratorActionType::CreateSubOrchestration(_) => { + "CreateSubOrchestration".to_string() + } + OrchestratorActionType::CreateTimer(_) => "CreateTimer".to_string(), + OrchestratorActionType::SendEvent(_) => "SendEvent".to_string(), + OrchestratorActionType::CompleteOrchestration(_) => "CompleteOrchestration".to_string(), + OrchestratorActionType::TerminateOrchestration(_) => { + "TerminateOrchestration".to_string() + } + }, + } +} + +pub fn get_task_id(event: HistoryEvent) -> i32 { + if event.event_id >= 0 { + event.event_id + } else if let Some(task) = event.clone().get_task_completed() { + task.task_scheduled_id + } else if let Some(task) = event.clone().get_task_failed() { + task.task_scheduled_id + } else if let Some(instance) = event.clone().get_sub_orchestration_instance_completed() { + instance.task_scheduled_id + } else if let Some(instance) = event.clone().get_sub_orchestration_instance_failed() { + instance.task_scheduled_id + } else if let Some(timer) = event.clone().get_timer_fired() { + timer.timer_id + } else if let Some(execution) = event.get_execution_started() { + if let Some(parent) = execution.parent_instance { + parent.task_scheduled_id + } else { + -1 + } + } else { + -1 + } +} + +pub fn history_list_summary(list: Vec) -> String { + let list = if list.len() > 10 { + list[0..10].to_vec() + } else { + list + }; + let return_value = list + .iter() + .enumerate() + .fold(vec![], |mut list, (i, event)| { + if i > 0 { + list.push(", ".to_string()); + } + if i != 10 { + let name = get_event_history_name(event); + list.push(name); + let task_id = get_task_id(event.clone()); + if task_id > 0 { + list.push(format!("#{}", task_id)) + } + } else { + list.push("...".to_string()) + } + list + }) + .join(""); + format!("[{return_value}]") +} + +pub fn action_list_summary(actions: Vec) -> String { + let actions = if actions.len() > 10 { + actions[0..10].to_vec() + } else { + actions + }; + let return_value = actions + .iter() + .enumerate() + .fold(vec![], |mut list, (i, action)| { + if i > 0 { + list.push(", ".to_string()); + } + if i != 10 { + let name = get_action_type_name(action); + list.push(name); + list.push(format!("#{}", action.id)) + } else { + list.push("...".to_string()) + } + list + }) + .join(""); + format!("[{return_value}]") +} + +pub fn to_runtime_status_string(status: &OrchestrationStatus) -> String { + status.as_str_name().replace("ORCHESTRATION_STATUS_", "") +} + +pub fn from_runtime_string(status: T) -> Option +where + T: ToString, +{ + OrchestrationStatus::from_str_name(status.to_string().as_str()) +} diff --git a/durabletask/src/helpers/task.rs b/durabletask/src/helpers/task.rs new file mode 100644 index 00000000..94f043f8 --- /dev/null +++ b/durabletask/src/helpers/task.rs @@ -0,0 +1,85 @@ +fn get_function_name(_: F) -> &'static str { + let mut name = std::any::type_name::(); + if name.contains("<") { + name = name.split("<").next().unwrap(); + } + name.split("::").last().unwrap() +} + +#[cfg(test)] +mod test { + use crate::helpers::task::get_function_name; + + #[test] + fn test_no_params() { + fn test() { + println!("hello world") + } + + let name = get_function_name(test); + + assert_eq!(name, "test") + } + + #[test] + fn test_params() { + fn test(name: String) { + println!("hello {name}") + } + + let name = get_function_name(test); + + assert_eq!(name, "test") + } + + #[test] + fn test_async_no_params() { + async fn test() { + println!("hello world") + } + + let name = get_function_name(test); + + assert_eq!(name, "test") + } + + #[test] + fn test_async_params() { + async fn test(name: String) { + println!("hello {name}") + } + + let name = get_function_name(test); + + assert_eq!(name, "test") + } + + #[test] + fn test_generic() { + fn test(name: T) + where + T: ToString, + { + println!("hello {}", name.to_string()) + } + + let name = get_function_name(test::); + + assert_eq!(name, "test") + } + + #[test] + fn test_multi_generic() { + fn test<'a, T, A>(names: T, age: A) + where + T: Into>, + A: Into, + { + println!("hello {:#?} {}", names.into(), age.into()) + } + + let name = get_function_name(test::, i32>); + + assert_eq!(name, "test") + } +} diff --git a/durabletask/src/helpers/time.rs b/durabletask/src/helpers/time.rs new file mode 100644 index 00000000..ad0801cc --- /dev/null +++ b/durabletask/src/helpers/time.rs @@ -0,0 +1,15 @@ +use chrono::{Datelike, Timelike}; +use prost_types::Timestamp; + +pub fn now() -> Timestamp { + let now = chrono::offset::Local::now(); + Timestamp::date_time( + now.year() as i64, + now.month() as u8, + now.day() as u8, + now.hour() as u8, + now.minute() as u8, + now.second() as u8, + ) + .unwrap() +} diff --git a/durabletask/src/helpers/tracing.rs b/durabletask/src/helpers/tracing.rs new file mode 100644 index 00000000..5031c274 --- /dev/null +++ b/durabletask/src/helpers/tracing.rs @@ -0,0 +1,281 @@ +use std::borrow::BorrowMut; +use std::str::FromStr; +use std::time::SystemTime; + +use anyhow::Result; +use chrono::DateTime; +use opentelemetry::{Context, Key, KeyValue, StringValue, Value}; +use opentelemetry::global::{BoxedSpan, ObjectSafeSpan, tracer}; +use opentelemetry::trace::{ + SpanContext, SpanId, SpanKind, TraceContextExt, TraceFlags, TraceId, Tracer, TraceState, +}; +use opentelemetry::trace::noop::NoopSpan; + +use crate::durable_task::{TimerFiredEvent, TraceContext}; + +pub fn start_new_create_orchestration_span( + context: Context, + name: String, + version: String, + instance_id: String, +) -> BoxedSpan { + let attributes = vec![ + KeyValue { + key: Key::from_static_str("durabletask.type"), + value: Value::String(StringValue::from("orchestration")), + }, + KeyValue { + key: Key::from_static_str("durabletask.task.name"), + value: Value::String(StringValue::from(name.clone())), + }, + KeyValue { + key: Key::from_static_str("durabletask.task.instance_id"), + value: Value::String(StringValue::from(instance_id)), + }, + ]; + start_new_span( + context, + "create_orchestration".to_string(), + name, + version, + attributes, + SpanKind::Client, + SystemTime::from(chrono::Utc::now()), + ) +} + +pub fn start_new_run_orchestration_span( + context: Context, + name: String, + version: String, + instance_id: String, + started_time: SystemTime, +) -> BoxedSpan { + let attributes = vec![ + KeyValue { + key: Key::from_static_str("durabletask.type"), + value: Value::String(StringValue::from("orchestration")), + }, + KeyValue { + key: Key::from_static_str("durabletask.task.name"), + value: Value::String(StringValue::from(name.clone())), + }, + KeyValue { + key: Key::from_static_str("durabletask.task.instance_id"), + value: Value::String(StringValue::from(instance_id)), + }, + ]; + start_new_span( + context, + "orchestration".to_string(), + name, + version, + attributes, + SpanKind::Server, + started_time, + ) +} + +pub fn start_new_activity_span( + context: Context, + name: String, + version: String, + instance_id: String, + task_id: i32, +) -> BoxedSpan { + let attributes = vec![ + KeyValue { + key: Key::from_static_str("durabletask.type"), + value: Value::String(StringValue::from("activity")), + }, + KeyValue { + key: Key::from_static_str("durabletask.task.name"), + value: Value::String(StringValue::from(name.clone())), + }, + KeyValue { + key: Key::from_static_str("durabletask.task.instance_id"), + value: Value::String(StringValue::from(instance_id)), + }, + KeyValue { + key: Key::from_static_str("durabletask.task.task_id"), + value: Value::I64(task_id as i64), + }, + ]; + start_new_span( + context, + "create_orchestration".to_string(), + name, + version, + attributes, + SpanKind::Client, + SystemTime::from(chrono::Utc::now()), + ) +} + +pub fn start_and_end_new_timer_span( + context: Context, + tf: TimerFiredEvent, + created_time: SystemTime, + instance_id: String, +) { + let fire_at = tf.fire_at.unwrap(); + let time = DateTime::from_timestamp(fire_at.seconds, fire_at.nanos as u32) + .unwrap() + .to_rfc3339() + .to_string(); + let attributes = vec![ + KeyValue { + key: Key::from_static_str("durabletask.type"), + value: Value::String(StringValue::from("timer")), + }, + KeyValue { + key: Key::from_static_str("durabletask.task.name"), + value: Value::String(StringValue::from(time)), + }, + KeyValue { + key: Key::from_static_str("durabletask.task.instance_id"), + value: Value::String(StringValue::from(instance_id)), + }, + KeyValue { + key: Key::from_static_str("durabletask.task.task_id"), + value: Value::I64(tf.timer_id as i64), + }, + ]; + let mut span = start_new_span( + context, + "timer".to_string(), + "".to_string(), + "".to_string(), + attributes, + SpanKind::Internal, + created_time, + ); + ObjectSafeSpan::end(&mut span); +} + +pub fn start_new_span( + context: Context, + task_type: String, + task_name: String, + task_version: String, + attributes: Vec, + kind: SpanKind, + timestamp: SystemTime, +) -> BoxedSpan { + let span_name; + if task_version != "" { + span_name = format!("{task_type}||{task_name}||{task_version}") + } else if task_name != "" { + span_name = format!("{task_type}||{task_name}") + } else { + span_name = task_name + } + let tracer = tracer(span_name.clone()); + tracer + .span_builder(span_name) + .with_kind(kind) + .with_start_time(timestamp) + .with_attributes(attributes) + .start_with_context(&tracer, &context) +} + +pub fn set_span_context(span: Box, span_context: SpanContext) -> bool { + if span.is_recording() { + return false; + } + *span.span_context().borrow_mut() = &span_context; + true +} + +pub fn context_from_trace_context( + context: Context, + trace_context: TraceContext, +) -> Result { + let span_context = span_context_from_trace_context(trace_context)?; + Ok(Context::with_remote_span_context(&context, span_context)) +} + +pub fn span_context_from_trace_context(trace_context: TraceContext) -> Result { + let decoded_trace_id: TraceId; + let trace_id: String; + let span_id: String; + let trace_flags: String; + + let parts = trace_context.trace_parent.split("-").collect::>(); + if parts.len() == 4 { + trace_id = parts.get(1).unwrap().to_string(); + span_id = parts.get(2).unwrap().to_string(); + trace_flags = parts.get(3).unwrap().to_string(); + } else { + trace_id = trace_context.trace_parent; + span_id = trace_context.span_id; + trace_flags = "01".to_string() + } + let trace_id: u128 = trace_id.parse()?; + decoded_trace_id = TraceId::from(trace_id); + let trace_state = if let Some(state) = trace_context.trace_state { + TraceState::from_str(state.as_str())? + } else { + TraceState::default() + }; + + let span_id: u64 = span_id.parse()?; + + Ok(SpanContext::new( + decoded_trace_id, + SpanId::from(span_id), + TraceFlags::new(trace_flags.chars().next().unwrap().to_string().parse()?), + false, + trace_state, + )) +} + +pub fn trace_from_context_span(span: Option>) -> Option { + if span.is_none() { + return None; + } + let span = span.unwrap(); + if !span.span_context().is_sampled() { + return None; + } + let context = span.span_context(); + if context.is_valid() { + let trace_parent = format!( + "00-{}-{}-{}", + context.trace_id().to_string(), + context.span_id().to_string(), + context.trace_flags().to_u8().to_string() + ); + let trace_state = context.trace_state().clone().header(); + let trace_state = if trace_state == "".to_string() { + None + } else { + Some(trace_state) + }; + Some(TraceContext { + trace_parent, + span_id: context.span_id().to_string(), + trace_state, + }) + } else { + None + } +} + +pub fn change_span_id(span: Box, new_span_id: SpanId) { + let context = span.span_context().borrow_mut().clone(); + *context.span_id().borrow_mut() = new_span_id; + set_span_context(span, context); +} + +pub fn cancel_span(span: Box) { + if span.span_context().is_sampled() { + let context = span.span_context().borrow_mut().clone(); + *context.trace_flags().borrow_mut() = TraceFlags::new(0); + set_span_context(span, context); + } +} + +pub fn noop_span() -> NoopSpan { + NoopSpan::DEFAULT +} diff --git a/durabletask/src/helpers/worker.rs b/durabletask/src/helpers/worker.rs new file mode 100644 index 00000000..04c0b9bf --- /dev/null +++ b/durabletask/src/helpers/worker.rs @@ -0,0 +1,10 @@ +use gethostname::gethostname; + +pub fn get_default_worker_name() -> String { + let hostname = gethostname(); + // Had to make two line since I got a freed value error + let hostname = hostname.to_str().unwrap_or("unknown"); + let pid = std::process::id(); + let uuid = uuid::Uuid::new_v4().to_string(); + format!("{hostname},{pid},{uuid}") +} diff --git a/durabletask/src/lib.rs b/durabletask/src/lib.rs new file mode 100644 index 00000000..78575fc7 --- /dev/null +++ b/durabletask/src/lib.rs @@ -0,0 +1,19 @@ +pub mod helpers; + +pub mod durable_task { + tonic::include_proto!("durabletask"); // The string specified here must match the proto package name +} +pub fn add(left: usize, right: usize) -> usize { + left + right +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn it_works() { + let result = add(2, 2); + assert_eq!(result, 4); + } +}