Skip to content

Commit

Permalink
Support calling ephemeral workers via RPC (#976)
Browse files Browse the repository at this point in the history
  • Loading branch information
vigoo authored Sep 26, 2024
1 parent 36c92d2 commit c4ec00d
Show file tree
Hide file tree
Showing 30 changed files with 5,107 additions and 102 deletions.
31 changes: 28 additions & 3 deletions golem-worker-executor-base/src/durable_host/wasm_rpc/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,9 +50,9 @@ impl<Ctx: WorkerCtx> HostWasmRpc for DurableWorkerCtx<Ctx> {

match location.parse_as_golem_urn() {
Some((remote_worker_id, None)) => {
let remote_worker_id = remote_worker_id
.try_into_worker_id()
.ok_or(anyhow!("Must specify a worker name"))?; // TODO: this should not be a requirement here
let remote_worker_id =
generate_unique_local_worker_id(self, remote_worker_id).await?;

let remote_worker_id =
OwnedWorkerId::new(&self.owned_worker_id.account_id, &remote_worker_id);
let demand = self.rpc().create_demand(&remote_worker_id).await;
Expand Down Expand Up @@ -563,6 +563,31 @@ impl<Ctx: WorkerCtx> HostFutureInvokeResult for DurableWorkerCtx<Ctx> {
#[async_trait]
impl<Ctx: WorkerCtx> golem_wasm_rpc::Host for DurableWorkerCtx<Ctx> {}

async fn generate_unique_local_worker_id<Ctx: WorkerCtx>(
ctx: &mut DurableWorkerCtx<Ctx>,
remote_worker_id: TargetWorkerId,
) -> Result<WorkerId, GolemError> {
match remote_worker_id.clone().try_into_worker_id() {
Some(worker_id) => Ok(worker_id),
None => {
let worker_id = Durability::<Ctx, WorkerId, SerializableError>::wrap(
ctx,
WrappedFunctionType::ReadLocal,
"golem::rpc::wasm-rpc::generate_unique_local_worker_id",
|ctx| {
Box::pin(async move {
ctx.rpc()
.generate_unique_local_worker_id(remote_worker_id)
.await
})
},
)
.await?;
Ok(worker_id)
}
}
}

pub struct WasmRpcEntryPayload {
#[allow(dead_code)]
demand: Box<dyn RpcDemand>,
Expand Down
39 changes: 36 additions & 3 deletions golem-worker-executor-base/src/services/rpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,11 @@ use golem_wasm_rpc::WitValue;
use tokio::runtime::Handle;
use tracing::debug;

use golem_common::model::{IdempotencyKey, OwnedWorkerId, WorkerId};
use golem_common::model::{IdempotencyKey, OwnedWorkerId, TargetWorkerId, WorkerId};

use crate::error::GolemError;
use crate::services::events::Events;
use crate::services::shard::ShardService;
use crate::services::worker_proxy::{WorkerProxy, WorkerProxyError};
use crate::services::{
active_workers, blob_store, component, golem_config, key_value, oplog, promise, scheduler,
Expand Down Expand Up @@ -64,6 +65,11 @@ pub trait Rpc {
self_args: &[String],
self_env: &[(String, String)],
) -> Result<(), RpcError>;

async fn generate_unique_local_worker_id(
&self,
target_worker_id: TargetWorkerId,
) -> Result<WorkerId, GolemError>;
}

#[derive(Debug, Clone, PartialEq, Eq, Encode, Decode)]
Expand Down Expand Up @@ -143,11 +149,18 @@ pub trait RpcDemand: Send + Sync {}

pub struct RemoteInvocationRpc {
worker_proxy: Arc<dyn WorkerProxy + Send + Sync>,
shard_service: Arc<dyn ShardService + Send + Sync>,
}

impl RemoteInvocationRpc {
pub fn new(worker_proxy: Arc<dyn WorkerProxy + Send + Sync>) -> Self {
Self { worker_proxy }
pub fn new(
worker_proxy: Arc<dyn WorkerProxy + Send + Sync>,
shard_service: Arc<dyn ShardService + Send + Sync>,
) -> Self {
Self {
worker_proxy,
shard_service,
}
}
}

Expand Down Expand Up @@ -225,6 +238,17 @@ impl Rpc for RemoteInvocationRpc {
)
.await?)
}

async fn generate_unique_local_worker_id(
&self,
target_worker_id: TargetWorkerId,
) -> Result<WorkerId, GolemError> {
let current_assignment = self.shard_service.current_assignment()?;
Ok(target_worker_id.into_worker_id(
&current_assignment.shard_ids,
current_assignment.number_of_shards,
))
}
}

pub struct DirectWorkerInvocationRpc<Ctx: WorkerCtx> {
Expand Down Expand Up @@ -571,6 +595,15 @@ impl<Ctx: WorkerCtx> Rpc for DirectWorkerInvocationRpc<Ctx> {
.await
}
}

async fn generate_unique_local_worker_id(
&self,
target_worker_id: TargetWorkerId,
) -> Result<WorkerId, GolemError> {
self.remote_rpc
.generate_unique_local_worker_id(target_worker_id)
.await
}
}

impl RpcDemand for () {}
5 changes: 4 additions & 1 deletion golem-worker-executor-base/tests/common/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -764,7 +764,10 @@ impl Bootstrap<TestWorkerCtx> for ServerBootstrap {
events: Arc<Events>,
) -> anyhow::Result<All<TestWorkerCtx>> {
let rpc = Arc::new(DirectWorkerInvocationRpc::new(
Arc::new(RemoteInvocationRpc::new(worker_proxy.clone())),
Arc::new(RemoteInvocationRpc::new(
worker_proxy.clone(),
shard_service.clone(),
)),
active_workers.clone(),
engine.clone(),
linker.clone(),
Expand Down
67 changes: 66 additions & 1 deletion golem-worker-executor-base/tests/rust_rpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,13 @@
// limitations under the License.

use crate::common;
use crate::common::{start, TestContext};
use assert2::check;
use golem_test_framework::dsl::{worker_error_message, TestDslUnsafe};
use golem_wasm_rpc::Value;
use std::collections::HashMap;
use std::time::SystemTime;
use tracing::debug;
use tracing::{debug, info};

#[tokio::test]
#[tracing::instrument]
Expand Down Expand Up @@ -625,3 +626,67 @@ async fn error_message_non_existing_target_component() {
check!(worker_error_message(&create_auction_result.err().unwrap())
.contains("Could not find any component with the given id"));
}

#[tokio::test]
#[tracing::instrument]
async fn ephemeral_worker_invocation_via_rpc1() {
let context = TestContext::new();
let executor = start(&context).await.unwrap();

let ephemeral_component_id = executor.store_ephemeral_component("ephemeral").await;
let caller_component_id = executor.store_component("caller_composed").await;

let mut env = HashMap::new();
env.insert(
"EPHEMERAL_COMPONENT_ID".to_string(),
ephemeral_component_id.to_string(),
);
let caller_worker_id = executor
.start_worker_with(&caller_component_id, "rpc-ephemeral-1", vec![], env)
.await;

let result = executor
.invoke_and_await(&caller_worker_id, "ephemeral-test1", vec![])
.await
.unwrap();

drop(executor);

info!("result is: {result:?}");

match result.into_iter().next() {
Some(Value::List(items)) => {
let pairs = items
.into_iter()
.filter_map(|item| match item {
Value::Tuple(values) if values.len() == 2 => {
let mut iter = values.into_iter();
let key = iter.next();
let value = iter.next();
match (key, value) {
(Some(Value::String(key)), Some(Value::String(value))) => {
Some((key, value))
}
_ => None,
}
}
_ => None,
})
.collect::<Vec<(String, String)>>();

check!(pairs.len() == 3);
let name1 = &pairs[0].0;
let value1 = &pairs[0].1;
let name2 = &pairs[1].0;
let value2 = &pairs[1].1;
let name3 = &pairs[2].0;
let value3 = &pairs[2].1;

check!(name1 == name2);
check!(name2 != name3);
check!(value1 != value2);
check!(value2 != value3);
}
_ => panic!("Unexpected result value"),
}
}
5 changes: 4 additions & 1 deletion golem-worker-executor/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,10 @@ impl Bootstrap<Context> for ServerBootstrap {
let additional_deps = AdditionalDeps {};

let rpc = Arc::new(DirectWorkerInvocationRpc::new(
Arc::new(RemoteInvocationRpc::new(worker_proxy.clone())),
Arc::new(RemoteInvocationRpc::new(
worker_proxy.clone(),
shard_service.clone(),
)),
active_workers.clone(),
engine.clone(),
linker.clone(),
Expand Down
Binary file modified test-components/caller_composed.wasm
Binary file not shown.
Binary file modified test-components/counters.wasm
Binary file not shown.
Binary file added test-components/ephemeral.wasm
Binary file not shown.
Loading

0 comments on commit c4ec00d

Please sign in to comment.