Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added helpers for durabletask #156

Open
wants to merge 3 commits into
base: main
Choose a base branch
from
Open
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Next Next commit
Added helpers for durable task
Signed-off-by: Zachary Edgell <zacharyedgell@gmail.com>
zedgell committed Mar 23, 2024
commit ee7d747cdfd804f4e01f525a52f9bd66993da812
3 changes: 3 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
@@ -37,6 +37,9 @@ dapr = { path = "./" }
tokio-test = "0.4.2"
tokio-stream = { version = "0.1" }

[workspace]
members = ["durabletask"]

[[example]]
name = "actor-client"
path = "examples/actors/client.rs"
24 changes: 24 additions & 0 deletions durabletask/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
[package]
name = "durabletask"
version = "0.1.0"
edition = "2021"
build = "build.rs"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
tonic = "0.11.0"
prost = "0.12.3"
bytes = "1"
prost-types = "0.12.3"
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
anyhow = "1.0.81"
thiserror = "1.0.58"
uuid = { version = "1.8.0", features = ["v4", "fast-rng"] }
chrono = "0.4.35"
gethostname = "0.4.3"
opentelemetry = "0.22.0"

[build-dependencies]
tonic-build = "0.11.0"
prost-build = "0.12.3"
14 changes: 14 additions & 0 deletions durabletask/build.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
use prost_build::Config;

fn main() -> Result<(), std::io::Error> {
let mut config = Config::default();
config.default_package_filename("durabletask");
tonic_build::configure()
.build_server(false)
.compile_with_config(
config,
&["submodules/durabletask-protobuf/protos/orchestrator_service.proto"],
&["."],
)?;
Ok(())
}
5 changes: 5 additions & 0 deletions durabletask/src/helpers.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
pub mod history;
pub mod task;
pub mod time;
pub mod tracing;
pub mod worker;
588 changes: 588 additions & 0 deletions durabletask/src/helpers/history.rs

Large diffs are not rendered by default.

85 changes: 85 additions & 0 deletions durabletask/src/helpers/task.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
fn get_function_name<F>(_: F) -> &'static str {
let mut name = std::any::type_name::<F>();
if name.contains("<") {
name = name.split("<").next().unwrap();
}
name.split("::").last().unwrap()
}

#[cfg(test)]
mod test {
use crate::helpers::task::get_function_name;

#[test]
fn test_no_params() {
fn test() {
println!("hello world")
}

let name = get_function_name(test);

assert_eq!(name, "test")
}

#[test]
fn test_params() {
fn test(name: String) {
println!("hello {name}")
}

let name = get_function_name(test);

assert_eq!(name, "test")
}

#[test]
fn test_async_no_params() {
async fn test() {
println!("hello world")
}

let name = get_function_name(test);

assert_eq!(name, "test")
}

#[test]
fn test_async_params() {
async fn test(name: String) {
println!("hello {name}")
}

let name = get_function_name(test);

assert_eq!(name, "test")
}

#[test]
fn test_generic() {
fn test<T>(name: T)
where
T: ToString,
{
println!("hello {}", name.to_string())
}

let name = get_function_name(test::<String>);

assert_eq!(name, "test")
}

#[test]
fn test_multi_generic() {
fn test<'a, T, A>(names: T, age: A)
where
T: Into<Vec<&'a str>>,
A: Into<i32>,
{
println!("hello {:#?} {}", names.into(), age.into())
}

let name = get_function_name(test::<Vec<&str>, i32>);

assert_eq!(name, "test")
}
}
15 changes: 15 additions & 0 deletions durabletask/src/helpers/time.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
use chrono::{Datelike, Timelike};
use prost_types::Timestamp;

pub fn now() -> Timestamp {
let now = chrono::offset::Local::now();
Timestamp::date_time(
now.year() as i64,
now.month() as u8,
now.day() as u8,
now.hour() as u8,
now.minute() as u8,
now.second() as u8,
)
.unwrap()
}
281 changes: 281 additions & 0 deletions durabletask/src/helpers/tracing.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,281 @@
use std::borrow::BorrowMut;
use std::str::FromStr;
use std::time::SystemTime;

use anyhow::Result;
use chrono::DateTime;
use opentelemetry::{Context, Key, KeyValue, StringValue, Value};
use opentelemetry::global::{BoxedSpan, ObjectSafeSpan, tracer};
use opentelemetry::trace::{
SpanContext, SpanId, SpanKind, TraceContextExt, TraceFlags, TraceId, Tracer, TraceState,
};
use opentelemetry::trace::noop::NoopSpan;

use crate::durable_task::{TimerFiredEvent, TraceContext};

pub fn start_new_create_orchestration_span(
context: Context,
name: String,
version: String,
instance_id: String,
) -> BoxedSpan {
let attributes = vec![
KeyValue {
key: Key::from_static_str("durabletask.type"),
value: Value::String(StringValue::from("orchestration")),
},
KeyValue {
key: Key::from_static_str("durabletask.task.name"),
value: Value::String(StringValue::from(name.clone())),
},
KeyValue {
key: Key::from_static_str("durabletask.task.instance_id"),
value: Value::String(StringValue::from(instance_id)),
},
];
start_new_span(
context,
"create_orchestration".to_string(),
name,
version,
attributes,
SpanKind::Client,
SystemTime::from(chrono::Utc::now()),
)
}

pub fn start_new_run_orchestration_span(
context: Context,
name: String,
version: String,
instance_id: String,
started_time: SystemTime,
) -> BoxedSpan {
let attributes = vec![
KeyValue {
key: Key::from_static_str("durabletask.type"),
value: Value::String(StringValue::from("orchestration")),
},
KeyValue {
key: Key::from_static_str("durabletask.task.name"),
value: Value::String(StringValue::from(name.clone())),
},
KeyValue {
key: Key::from_static_str("durabletask.task.instance_id"),
value: Value::String(StringValue::from(instance_id)),
},
];
start_new_span(
context,
"orchestration".to_string(),
name,
version,
attributes,
SpanKind::Server,
started_time,
)
}

pub fn start_new_activity_span(
context: Context,
name: String,
version: String,
instance_id: String,
task_id: i32,
) -> BoxedSpan {
let attributes = vec![
KeyValue {
key: Key::from_static_str("durabletask.type"),
value: Value::String(StringValue::from("activity")),
},
KeyValue {
key: Key::from_static_str("durabletask.task.name"),
value: Value::String(StringValue::from(name.clone())),
},
KeyValue {
key: Key::from_static_str("durabletask.task.instance_id"),
value: Value::String(StringValue::from(instance_id)),
},
KeyValue {
key: Key::from_static_str("durabletask.task.task_id"),
value: Value::I64(task_id as i64),
},
];
start_new_span(
context,
"create_orchestration".to_string(),
name,
version,
attributes,
SpanKind::Client,
SystemTime::from(chrono::Utc::now()),
)
}

pub fn start_and_end_new_timer_span(
context: Context,
tf: TimerFiredEvent,
created_time: SystemTime,
instance_id: String,
) {
let fire_at = tf.fire_at.unwrap();
let time = DateTime::from_timestamp(fire_at.seconds, fire_at.nanos as u32)
.unwrap()
.to_rfc3339()
.to_string();
let attributes = vec![
KeyValue {
key: Key::from_static_str("durabletask.type"),
value: Value::String(StringValue::from("timer")),
},
KeyValue {
key: Key::from_static_str("durabletask.task.name"),
value: Value::String(StringValue::from(time)),
},
KeyValue {
key: Key::from_static_str("durabletask.task.instance_id"),
value: Value::String(StringValue::from(instance_id)),
},
KeyValue {
key: Key::from_static_str("durabletask.task.task_id"),
value: Value::I64(tf.timer_id as i64),
},
];
let mut span = start_new_span(
context,
"timer".to_string(),
"".to_string(),
"".to_string(),
attributes,
SpanKind::Internal,
created_time,
);
ObjectSafeSpan::end(&mut span);
}

pub fn start_new_span(
context: Context,
task_type: String,
task_name: String,
task_version: String,
attributes: Vec<KeyValue>,
kind: SpanKind,
timestamp: SystemTime,
) -> BoxedSpan {
let span_name;
if task_version != "" {
span_name = format!("{task_type}||{task_name}||{task_version}")
} else if task_name != "" {
span_name = format!("{task_type}||{task_name}")
} else {
span_name = task_name
}
let tracer = tracer(span_name.clone());
tracer
.span_builder(span_name)
.with_kind(kind)
.with_start_time(timestamp)
.with_attributes(attributes)
.start_with_context(&tracer, &context)
}

pub fn set_span_context(span: Box<dyn ObjectSafeSpan>, span_context: SpanContext) -> bool {
if span.is_recording() {
return false;
}
*span.span_context().borrow_mut() = &span_context;
true
}

pub fn context_from_trace_context(
context: Context,
trace_context: TraceContext,
) -> Result<Context> {
let span_context = span_context_from_trace_context(trace_context)?;
Ok(Context::with_remote_span_context(&context, span_context))
}

pub fn span_context_from_trace_context(trace_context: TraceContext) -> Result<SpanContext> {
let decoded_trace_id: TraceId;
let trace_id: String;
let span_id: String;
let trace_flags: String;

let parts = trace_context.trace_parent.split("-").collect::<Vec<&str>>();
if parts.len() == 4 {
trace_id = parts.get(1).unwrap().to_string();
span_id = parts.get(2).unwrap().to_string();
trace_flags = parts.get(3).unwrap().to_string();
} else {
trace_id = trace_context.trace_parent;
span_id = trace_context.span_id;
trace_flags = "01".to_string()
}
let trace_id: u128 = trace_id.parse()?;
decoded_trace_id = TraceId::from(trace_id);
let trace_state = if let Some(state) = trace_context.trace_state {
TraceState::from_str(state.as_str())?
} else {
TraceState::default()
};

let span_id: u64 = span_id.parse()?;

Ok(SpanContext::new(
decoded_trace_id,
SpanId::from(span_id),
TraceFlags::new(trace_flags.chars().next().unwrap().to_string().parse()?),
false,
trace_state,
))
}

pub fn trace_from_context_span(span: Option<Box<dyn ObjectSafeSpan>>) -> Option<TraceContext> {
if span.is_none() {
return None;
}
let span = span.unwrap();
if !span.span_context().is_sampled() {
return None;
}
let context = span.span_context();
if context.is_valid() {
let trace_parent = format!(
"00-{}-{}-{}",
context.trace_id().to_string(),
context.span_id().to_string(),
context.trace_flags().to_u8().to_string()
);
let trace_state = context.trace_state().clone().header();
let trace_state = if trace_state == "".to_string() {
None
} else {
Some(trace_state)
};
Some(TraceContext {
trace_parent,
span_id: context.span_id().to_string(),
trace_state,
})
} else {
None
}
}

pub fn change_span_id(span: Box<dyn ObjectSafeSpan>, new_span_id: SpanId) {
let context = span.span_context().borrow_mut().clone();
*context.span_id().borrow_mut() = new_span_id;
set_span_context(span, context);
}

pub fn cancel_span(span: Box<dyn ObjectSafeSpan>) {
if span.span_context().is_sampled() {
let context = span.span_context().borrow_mut().clone();
*context.trace_flags().borrow_mut() = TraceFlags::new(0);
set_span_context(span, context);
}
}

pub fn noop_span() -> NoopSpan {
NoopSpan::DEFAULT
}
10 changes: 10 additions & 0 deletions durabletask/src/helpers/worker.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
use gethostname::gethostname;

pub fn get_default_worker_name() -> String {
let hostname = gethostname();
// Had to make two line since I got a freed value error
let hostname = hostname.to_str().unwrap_or("unknown");
let pid = std::process::id();
let uuid = uuid::Uuid::new_v4().to_string();
format!("{hostname},{pid},{uuid}")
}
19 changes: 19 additions & 0 deletions durabletask/src/lib.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
pub mod helpers;

pub mod durable_task {
tonic::include_proto!("durabletask"); // The string specified here must match the proto package name
}
pub fn add(left: usize, right: usize) -> usize {
left + right
}

#[cfg(test)]
mod tests {
use super::*;

#[test]
fn it_works() {
let result = add(2, 2);
assert_eq!(result, 4);
}
}