Skip to content

Commit 759b32b

Browse files
authored
Add support for headers for service-to-service calls (#467)
1 parent c266c69 commit 759b32b

File tree

10 files changed

+461
-34
lines changed

10 files changed

+461
-34
lines changed

package-lock.json

Lines changed: 357 additions & 4 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

package.json

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,9 @@
4646
"prettier": "^2.8.4",
4747
"release-it": "^17.3.0",
4848
"typescript": "^5.4.5",
49-
"vitest": "^1.6.0"
49+
"vitest": "^1.6.0",
50+
"wasm-pack": "^0.13.1",
51+
"wasm-pack-inline": "^0.1.2"
5052
},
5153
"engines": {
5254
"node": ">= 18.13"

packages/restate-sdk/src/context.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -228,6 +228,7 @@ export type GenericCall<REQ, RES> = {
228228
method: string;
229229
parameter: REQ;
230230
key?: string;
231+
headers?: Record<string, string>;
231232
inputSerde?: Serde<REQ>;
232233
outputSerde?: Serde<RES>;
233234
};
@@ -242,6 +243,7 @@ export type GenericSend<REQ> = {
242243
method: string;
243244
parameter: REQ;
244245
key?: string;
246+
headers?: Record<string, string>;
245247
inputSerde?: Serde<REQ>;
246248
delay?: number;
247249
};

packages/restate-sdk/src/context_impl.ts

Lines changed: 26 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,7 @@ import type {
6060
} from "node:stream/web";
6161
import type { ReadableStreamReadResult } from "stream/web";
6262
import type { CompletablePromise } from "./utils/completable_promise.js";
63+
import { WasmHeader } from "./endpoint/handlers/vm/sdk_shared_core_wasm_bindings.js";
6364

6465
export type InternalCombineablePromise<T> = CombineablePromise<T> & {
6566
asyncResultHandle: number;
@@ -213,7 +214,17 @@ export class ContextImpl implements ObjectContext, WorkflowContext {
213214
return this.processCompletableEntry(
214215
(vm) => {
215216
const parameter = requestSerde.serialize(call.parameter);
216-
return vm.sys_call(call.service, call.method, parameter, call.key);
217+
return vm.sys_call(
218+
call.service,
219+
call.method,
220+
parameter,
221+
call.key,
222+
call.headers
223+
? Object.entries(call.headers).map(
224+
([key, value]) => new WasmHeader(key, value)
225+
)
226+
: []
227+
);
217228
},
218229
(asyncResultValue) => {
219230
if (
@@ -248,7 +259,18 @@ export class ContextImpl implements ObjectContext, WorkflowContext {
248259
delay = BigInt(send.delay);
249260
}
250261

251-
vm.sys_send(send.service, send.method, parameter, send.key, delay);
262+
void vm.sys_send(
263+
send.service,
264+
send.method,
265+
parameter,
266+
send.key,
267+
send.headers
268+
? Object.entries(send.headers).map(
269+
([key, value]) => new WasmHeader(key, value)
270+
)
271+
: [],
272+
delay
273+
);
252274
});
253275
}
254276

@@ -736,6 +758,7 @@ export class ContextImpl implements ObjectContext, WorkflowContext {
736758
| { Success: Uint8Array }
737759
| { Failure: vm.WasmFailure }
738760
| { StateKeys: string[] }
761+
| { InvocationId: string }
739762
| { CombinatorResult: number[] }
740763
) => T
741764
): LazyContextPromise<T> {
@@ -759,6 +782,7 @@ export class ContextImpl implements ObjectContext, WorkflowContext {
759782
| { Success: Uint8Array }
760783
| { Failure: vm.WasmFailure }
761784
| { StateKeys: string[] }
785+
| { InvocationId: string }
762786
| { CombinatorResult: number[] }
763787
) => T
764788
): Promise<T> {

packages/restate-sdk/src/endpoint/handlers/vm/sdk_shared_core_wasm_bindings.d.ts

Lines changed: 13 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,8 @@ export interface WasmFailure {
2222
message: string;
2323
}
2424

25+
export type WasmSendHandle = number;
26+
2527
export interface WasmExponentialRetryConfig {
2628
initial_interval: number | undefined;
2729
factor: number;
@@ -41,6 +43,7 @@ export type WasmAsyncResultValue =
4143
| { Success: Uint8Array }
4244
| { Failure: WasmFailure }
4345
| { StateKeys: string[] }
46+
| { InvocationId: string }
4447
| { CombinatorResult: WasmAsyncResultHandle[] };
4548

4649
export type WasmRunEnterResult =
@@ -181,29 +184,34 @@ export class WasmVM {
181184
* @param {string} service
182185
* @param {string} handler
183186
* @param {Uint8Array} buffer
184-
* @param {string | undefined} [key]
187+
* @param {string | undefined} key
188+
* @param {(WasmHeader)[]} headers
185189
* @returns {number}
186190
*/
187191
sys_call(
188192
service: string,
189193
handler: string,
190194
buffer: Uint8Array,
191-
key?: string
195+
key: string | undefined,
196+
headers: WasmHeader[]
192197
): number;
193198
/**
194199
* @param {string} service
195200
* @param {string} handler
196201
* @param {Uint8Array} buffer
197-
* @param {string | undefined} [key]
202+
* @param {string | undefined} key
203+
* @param {(WasmHeader)[]} headers
198204
* @param {bigint | undefined} [delay]
205+
* @returns {WasmSendHandle}
199206
*/
200207
sys_send(
201208
service: string,
202209
handler: string,
203210
buffer: Uint8Array,
204-
key?: string,
211+
key: string | undefined,
212+
headers: WasmHeader[],
205213
delay?: bigint
206-
): void;
214+
): WasmSendHandle;
207215
/**
208216
* @returns {WasmAwakeable}
209217
*/

packages/restate-sdk/src/endpoint/handlers/vm/sdk_shared_core_wasm_bindings.js

Lines changed: 21 additions & 10 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

sdk-shared-core-wasm-bindings/Cargo.lock

Lines changed: 1 addition & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

sdk-shared-core-wasm-bindings/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ default = ["console_error_panic_hook"]
1212

1313
[dependencies]
1414
wasm-bindgen = "0.2.84"
15-
restate-sdk-shared-core = { git = "https://github.com/restatedev/sdk-shared-core.git", rev = "771df58ed7fa8976f18a865496e1f3ee0dae73f4", features = ["request_identity"] }
15+
restate-sdk-shared-core = { git = "https://github.com/restatedev/sdk-shared-core.git", rev = "3c342c43e91514aaa20c7ee42295162842b223ab", features = ["request_identity"] }
1616
console_error_panic_hook = { version = "0.1.7", optional = true }
1717
serde = { version = "1.0.210", features = ["derive"] }
1818
serde-wasm-bindgen = "0.6.5"
Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
11
To build:
22

3-
* wasm-pack build --target web
3+
* npx wasm-pack build --target web
44
* npx wasm-pack-inline ./pkg --dir ../packages/restate-sdk/src/endpoint/handlers/vm --name sdk_shared_core_wasm_bindings

sdk-shared-core-wasm-bindings/src/lib.rs

Lines changed: 36 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -2,8 +2,8 @@ use js_sys::Uint8Array;
22
use restate_sdk_shared_core::{
33
AsyncResultAccessTracker, AsyncResultCombinator, AsyncResultHandle, AsyncResultState, CoreVM,
44
Error, Header, HeaderMap, IdentityVerifier, Input, NonEmptyValue, ResponseHead, RetryPolicy,
5-
RunEnterResult, RunExitResult, SuspendedOrVMError, TakeOutputResult, Target, TerminalFailure,
6-
VMOptions, Value, VM,
5+
RunEnterResult, RunExitResult, SendHandle, SuspendedOrVMError, TakeOutputResult, Target,
6+
TerminalFailure, VMOptions, Value, VM,
77
};
88
use serde::{Deserialize, Serialize};
99
use std::convert::{Infallible, Into};
@@ -41,14 +41,9 @@ extern "C" {
4141
fn vm_log(level: LogLevel, s: &str);
4242
}
4343

44+
#[derive(Default)]
4445
pub struct MakeWebConsoleWriter {}
4546

46-
impl Default for MakeWebConsoleWriter {
47-
fn default() -> Self {
48-
MakeWebConsoleWriter {}
49-
}
50-
}
51-
5247
impl<'a> MakeWriter<'a> for MakeWebConsoleWriter {
5348
type Writer = ConsoleWriter;
5449

@@ -155,6 +150,15 @@ impl From<Header> for WasmHeader {
155150
}
156151
}
157152

153+
impl From<WasmHeader> for Header {
154+
fn from(h: WasmHeader) -> Self {
155+
Header {
156+
key: h.key.into(),
157+
value: h.value.into(),
158+
}
159+
}
160+
}
161+
158162
#[wasm_bindgen(getter_with_clone)]
159163
pub struct WasmResponseHead {
160164
#[wasm_bindgen(readonly)]
@@ -203,6 +207,16 @@ impl From<WasmFailure> for JsValue {
203207
}
204208
}
205209

210+
#[derive(Tsify, Serialize, Deserialize)]
211+
#[tsify(into_wasm_abi, from_wasm_abi)]
212+
pub struct WasmSendHandle(u32);
213+
214+
impl From<SendHandle> for WasmSendHandle {
215+
fn from(value: SendHandle) -> Self {
216+
WasmSendHandle(value.into())
217+
}
218+
}
219+
206220
#[derive(Tsify, Serialize, Deserialize)]
207221
#[tsify(into_wasm_abi, from_wasm_abi)]
208222
pub struct WasmExponentialRetryConfig {
@@ -285,6 +299,7 @@ pub enum WasmAsyncResultValue {
285299
),
286300
Failure(WasmFailure),
287301
StateKeys(Vec<String>),
302+
InvocationId(String),
288303
CombinatorResult(Vec<WasmAsyncResultHandle>),
289304
}
290305

@@ -377,6 +392,10 @@ impl WasmVM {
377392
Some(Value::Success(b)) => WasmAsyncResultValue::Success(b.to_vec().into()),
378393
Some(Value::Failure(f)) => WasmAsyncResultValue::Failure(f.into()),
379394
Some(Value::StateKeys(keys)) => WasmAsyncResultValue::StateKeys(keys),
395+
Some(Value::InvocationId(invocation_id)) => {
396+
WasmAsyncResultValue::InvocationId(invocation_id)
397+
}
398+
380399
Some(Value::CombinatorResult(handles)) => WasmAsyncResultValue::CombinatorResult(
381400
handles.into_iter().map(Into::into).collect(),
382401
),
@@ -435,38 +454,46 @@ impl WasmVM {
435454
handler: String,
436455
buffer: js_sys::Uint8Array,
437456
key: Option<String>,
457+
headers: Vec<WasmHeader>,
438458
) -> Result<WasmAsyncResultHandle, WasmFailure> {
439459
self.vm
440460
.sys_call(
441461
Target {
442462
service,
443463
handler,
444464
key,
465+
idempotency_key: None,
466+
headers: headers.into_iter().map(Header::from).collect(),
445467
},
446468
buffer.to_vec().into(),
447469
)
448470
.map(Into::into)
449471
.map_err(Into::into)
450472
}
451473

474+
#[allow(clippy::too_many_arguments)]
452475
pub fn sys_send(
453476
&mut self,
454477
service: String,
455478
handler: String,
456479
buffer: Uint8Array,
457480
key: Option<String>,
481+
headers: Vec<WasmHeader>,
458482
delay: Option<u64>,
459-
) -> Result<(), WasmFailure> {
483+
) -> Result<WasmSendHandle, WasmFailure> {
460484
self.vm
461485
.sys_send(
462486
Target {
463487
service,
464488
handler,
465489
key,
490+
idempotency_key: None,
491+
headers: headers.into_iter().map(Header::from).collect(),
466492
},
467493
buffer.to_vec().into(),
468494
delay.map(|delay| duration_since_unix_epoch() + Duration::from_millis(delay)),
469495
)
496+
.map(Into::into)
470497
.map_err(Into::into)
471498
}
472499

0 commit comments

Comments
 (0)