diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 4a2fd58..1b560ba 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -15,12 +15,12 @@ on: env: CARGO_TERM_COLOR: always CARGO_TOKEN: ${{ secrets.CRATES_IO_TOKEN }} - PROTOC_VERSION: '3.x' - RUST_TOOLCHAIN: '1.70.0' + PROTOC_VERSION: 3.x + RUST_TOOLCHAIN: 1.76.0 jobs: lint: - name: Lint + name: Lint runs-on: ubuntu-latest steps: @@ -42,7 +42,7 @@ jobs: build: - name: Build + name: Build runs-on: ubuntu-latest steps: @@ -63,7 +63,7 @@ jobs: run: cargo build --examples - name: Run Tests run: cargo test --all-targets - + publish: name: Publish runs-on: ubuntu-latest @@ -86,5 +86,3 @@ jobs: run: cargo publish --manifest-path macros/Cargo.toml --token ${{ env.CARGO_TOKEN }} - name: cargo publish run: cargo publish --token ${{ env.CARGO_TOKEN }} - - diff --git a/Cargo.toml b/Cargo.toml index 855a72f..fd17c5e 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -9,28 +9,27 @@ description = "Rust SDK for dapr" readme = "README.md" keywords = ["microservices", "dapr"] - [dependencies] dapr-macros = {version="0.14.0", path = "macros" } futures = "0.3" -tonic = "0.8" -prost = "0.11" +tonic = "0.11.0" +prost = "0.12.3" bytes = "1" -prost-types = "0.11" +prost-types = "0.12.3" async-trait = "0.1" -env_logger = "0.10" +env_logger = "0.11.2" log = "0.4" serde = { version = "1.0", features = ["derive"] } serde_json = "1.0" -axum = {version = "0.6.19", features = ["default", "headers"] } +axum = "0.7.4" tokio = { version = "1.29", features = ["sync"] } chrono = "0.4.24" [build-dependencies] -tonic-build = "0.8" +tonic-build = "0.11.0" [dev-dependencies] -axum-test = "12.1.0" +axum-test = "14.3.0" once_cell = "1.18.0" tokio = { version = "1", features = ["full"] } uuid = { version = "1.4.0", features = ["v4"] } diff --git a/examples/actors/README.md b/examples/actors/README.md index c690f4d..8b5c5a6 100644 --- a/examples/actors/README.md +++ b/examples/actors/README.md @@ -1,6 +1,6 @@ # Actor Example -This example demonstrates the Dapr actor framework. To author an actor, +This example demonstrates the Dapr actor framework. To author an actor, 1. Create a struc decorated with the `#[dapr::actor]` macro to house your custom actor methods that map to [Axum handlers](https://docs.rs/axum/latest/axum/handler/index.html), use [Axum extractors](https://docs.rs/axum/latest/axum/extract/index.html) to access the incoming request and return an [`impl IntoResponse`](https://docs.rs/axum/latest/axum/response/trait.IntoResponse.html). Use the `DaprJson` extractor to deserialize the request from Json coming from a Dapr sidecar. @@ -10,24 +10,24 @@ Use the `DaprJson` extractor to deserialize the request from Json coming from a id: String, client: ActorContextClient } - + #[derive(Serialize, Deserialize)] pub struct MyRequest { pub name: String, } - + #[derive(Serialize, Deserialize)] pub struct MyResponse { pub available: bool, - } + } impl MyActor { - fn do_stuff(&self, DaprJson(data): DaprJson) -> Json { - println!("doing stuff with {}", data.name); - Json(MyResponse { - available: true + fn do_stuff(&self, DaprJson(data): DaprJson) -> Json { + println!("doing stuff with {}", data.name); + Json(MyResponse { + available: true }) - } + } } ``` @@ -40,14 +40,14 @@ Use the `DaprJson` extractor to deserialize the request from Json coming from a 1. Implement the `Actor` trait. This trait exposes the following methods: - `on_activate` - Called when an actor is activated on a host - `on_deactivate` - Called when an actor is deactivated on a host - - `on_reminder` - Called when a reminder is recieved from the Dapr sidecar - - `on_timer` - Called when a timer is recieved from the Dapr sidecar + - `on_reminder` - Called when a reminder is received from the Dapr sidecar + - `on_timer` - Called when a timer is received from the Dapr sidecar ```rust #[async_trait] impl Actor for MyActor { - + async fn on_activate(&self) -> Result<(), ActorError> { println!("on_activate {}", self.id); Ok(()) @@ -60,7 +60,7 @@ Use the `DaprJson` extractor to deserialize the request from Json coming from a } ``` -1. An actor host requires an Http server to recieve callbacks from the Dapr sidecar. The `DaprHttpServer` object implements this functionality and also encapsulates the actor runtime to service any hosted actors. Use the `register_actor` method to register an actor type to be serviced, this method takes an `ActorTypeRegistration` which specifies +1. An actor host requires an Http server to receive callbacks from the Dapr sidecar. The `DaprHttpServer` object implements this functionality and also encapsulates the actor runtime to service any hosted actors. Use the `register_actor` method to register an actor type to be serviced, this method takes an `ActorTypeRegistration` which specifies - The actor type name (used by Actor clients), and concrete struct - A factory to construct a new instance of that actor type when one is required to be activated by the runtime. The parameters passed to the factory will be the actor type, actor ID, and a Dapr client for managing state, timers and reminders for the actor. - The methods that you would like to expose to external clients. @@ -68,10 +68,10 @@ Use the `DaprJson` extractor to deserialize the request from Json coming from a ```rust let mut dapr_server = dapr::server::DaprHttpServer::new(); - dapr_server.register_actor(ActorTypeRegistration::new::("MyActor", + dapr_server.register_actor(ActorTypeRegistration::new::("MyActor", Box::new(|actor_type, id, client| Arc::new(MyActor{ - actor_type, - id, + actor_type, + id, client }))) .register_method("do_stuff", MyActor::do_stuff) diff --git a/examples/actors/server.rs b/examples/actors/server.rs index 0af7adc..d55c65f 100644 --- a/examples/actors/server.rs +++ b/examples/actors/server.rs @@ -1,8 +1,8 @@ use async_trait::async_trait; -use axum::Json; use dapr::server::{ actor::{ - context_client::ActorContextClient, runtime::ActorTypeRegistration, Actor, ActorError, + axum::Json, context_client::ActorContextClient, runtime::ActorTypeRegistration, Actor, + ActorError, }, utils::DaprJson, }; diff --git a/macros/Cargo.toml b/macros/Cargo.toml index 0757a48..4a038ed 100644 --- a/macros/Cargo.toml +++ b/macros/Cargo.toml @@ -9,6 +9,6 @@ proc-macro = true [dependencies] async-trait = "0.1" log = "0.4" -axum = "0.6.19" +axum = "0.7.4" syn = {version="2.0.29",features=["full"]} quote = "1.0.8" diff --git a/macros/src/lib.rs b/macros/src/lib.rs index d3bb009..57c8559 100644 --- a/macros/src/lib.rs +++ b/macros/src/lib.rs @@ -15,14 +15,14 @@ pub fn actor(_attr: TokenStream, item: TokenStream) -> TokenStream { let mut result = TokenStream::from(quote!( #[async_trait::async_trait] - impl axum::extract::FromRequestParts for &#actor_struct_name { + impl dapr::server::actor::axum::extract::FromRequestParts for &#actor_struct_name { type Rejection = dapr::server::actor::ActorRejection; async fn from_request_parts( - parts: &mut axum::http::request::Parts, + parts: &mut dapr::server::actor::axum::http::request::Parts, state: &dapr::server::actor::runtime::ActorState, ) -> Result { - let path = match axum::extract::Path::::from_request_parts(parts, state).await { + let path = match dapr::server::actor::axum::extract::Path::::from_request_parts(parts, state).await { Ok(path) => path, Err(e) => { log::error!("Error getting path: {}", e); diff --git a/src/server/actor/mod.rs b/src/server/actor/mod.rs index 8bf9fcb..42f4a32 100644 --- a/src/server/actor/mod.rs +++ b/src/server/actor/mod.rs @@ -5,6 +5,8 @@ use std::{error::Error, fmt::Display, sync::Arc}; use self::context_client::ActorContextClient; +pub use axum; + pub mod context_client; pub mod runtime; diff --git a/src/server/actor/tests.rs b/src/server/actor/tests.rs index 7080efb..89cbab1 100644 --- a/src/server/actor/tests.rs +++ b/src/server/actor/tests.rs @@ -1,8 +1,4 @@ -use std::{ - collections::HashMap, - net::{SocketAddr, TcpListener}, - sync::Arc, -}; +use std::{collections::HashMap, sync::Arc}; use async_trait::async_trait; use axum::{Json, Router}; @@ -15,7 +11,7 @@ use dapr_macros::actor; use once_cell::sync::Lazy; use serde::{Deserialize, Serialize}; use serde_json::json; -use tokio::sync::Mutex; +use tokio::{net::TcpListener, sync::Mutex}; use uuid::Uuid; #[derive(Serialize, Deserialize, Debug, PartialEq)] @@ -68,13 +64,13 @@ impl MyActor { #[tokio::test] async fn test_actor_invoke() { - let dapr_port = get_available_port().unwrap(); + let dapr_port = get_available_port().await.unwrap(); let fake_sidecar = tokio::spawn(async move { let sidecar = Router::new(); - _ = axum::Server::bind(&SocketAddr::from(([127, 0, 0, 1], dapr_port))) - .serve(sidecar.into_make_service()) - .await; + let address = format!("127.0.0.1:{dapr_port}"); + let listener = TcpListener::bind(address).await.unwrap(); + _ = axum::serve(listener, sidecar.into_make_service()).await; }); tokio::task::yield_now().await; @@ -140,13 +136,13 @@ async fn test_actor_invoke() { #[tokio::test] async fn test_actor_deactivate() { - let dapr_port = get_available_port().unwrap(); + let dapr_port = get_available_port().await.unwrap(); let fake_sidecar = tokio::spawn(async move { let sidecar = Router::new(); - _ = axum::Server::bind(&SocketAddr::from(([127, 0, 0, 1], dapr_port))) - .serve(sidecar.into_make_service()) - .await; + let address = format!("127.0.0.1:{dapr_port}"); + let listener = TcpListener::bind(address).await.unwrap(); + _ = axum::serve(listener, sidecar.into_make_service()).await; }); tokio::task::yield_now().await; @@ -246,13 +242,11 @@ impl TestState { static TEST_STATE: Lazy = Lazy::new(TestState::new); -fn get_available_port() -> Option { - (8000..9000).find(|port| port_is_available(*port)) -} - -fn port_is_available(port: u16) -> bool { - match TcpListener::bind(("127.0.0.1", port)) { - Ok(_) => true, - Err(_) => false, +async fn get_available_port() -> Option { + for port in 8000..9000 { + if TcpListener::bind(format!("127.0.0.1:{port}")).await.is_ok() { + return Some(port); + } } + None } diff --git a/src/server/http.rs b/src/server/http.rs index e0746c2..f0e0063 100644 --- a/src/server/http.rs +++ b/src/server/http.rs @@ -6,7 +6,8 @@ use axum::{ Json, Router, }; use futures::{Future, FutureExt}; -use std::{net::SocketAddr, pin::Pin, sync::Arc}; +use std::{pin::Pin, sync::Arc}; +use tokio::net::TcpListener; use super::super::client::TonicClient; use super::actor::runtime::{ActorRuntime, ActorTypeRegistration}; @@ -136,9 +137,10 @@ impl DaprHttpServer { .parse() .unwrap_or(8080); - let addr = SocketAddr::from(([127, 0, 0, 1], port.unwrap_or(default_port))); + let address = format!("127.0.0.1:{}", port.unwrap_or(default_port)); + let listener = TcpListener::bind(address).await.unwrap(); - let server = axum::Server::bind(&addr).serve(app.into_make_service()); + let server = axum::serve(listener, app.into_make_service()); let final_result = match self.shutdown_signal.take() { Some(signal) => { diff --git a/src/server/utils.rs b/src/server/utils.rs index f88057b..44b5717 100644 --- a/src/server/utils.rs +++ b/src/server/utils.rs @@ -1,10 +1,9 @@ use async_trait::async_trait; use axum::{ - body::HttpBody, + body::Body, extract::FromRequest, http::{Request, StatusCode}, response::IntoResponse, - BoxError, }; use serde::de::DeserializeOwned; @@ -18,17 +17,14 @@ pub enum JsonRejection { } #[async_trait] -impl FromRequest for DaprJson +impl FromRequest for DaprJson where T: DeserializeOwned, - B: HttpBody + Send + 'static, - B::Data: Send, - B::Error: Into, S: Send + Sync, { type Rejection = JsonRejection; - async fn from_request(req: Request, state: &S) -> Result { + async fn from_request(req: Request, state: &S) -> Result { let bytes = match axum::body::Bytes::from_request(req, state).await { Ok(bytes) => bytes, Err(e) => {