Skip to content

Commit 5b07f5e

Browse files
committed
Expose invocation id of calls
1 parent 56a9899 commit 5b07f5e

File tree

5 files changed

+93
-22
lines changed

5 files changed

+93
-22
lines changed

packages/restate-sdk/src/common_api.ts

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,9 @@ export type {
1818
Rand,
1919
GenericCall,
2020
GenericSend,
21+
InvocationId,
22+
InvocationHandle,
23+
InvocationPromise,
2124
} from "./context.js";
2225
export { CombineablePromise } from "./context.js";
2326

packages/restate-sdk/src/context.ts

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -537,7 +537,7 @@ export interface Context extends RestateContext {
537537

538538
genericCall<REQ = Uint8Array, RES = Uint8Array>(
539539
call: GenericCall<REQ, RES>
540-
): CombineablePromise<RES>;
540+
): InvocationPromise<RES>;
541541

542542
genericSend<REQ = Uint8Array>(call: GenericSend<REQ>): void;
543543

@@ -633,6 +633,14 @@ export type CombineablePromise<T> = Promise<T> & {
633633
orTimeout(millis: number): CombineablePromise<T>;
634634
};
635635

636+
export type InvocationId = string & { __brand: "InvocationId" };
637+
638+
export type InvocationHandle = {
639+
invocationId(): Promise<InvocationId>;
640+
};
641+
642+
export type InvocationPromise<T> = CombineablePromise<T> & InvocationHandle;
643+
636644
export const CombineablePromise = {
637645
/**
638646
* Creates a Promise that is resolved with an array of results when all of the provided Promises

packages/restate-sdk/src/context_impl.ts

Lines changed: 43 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,8 @@ import type {
1717
DurablePromise,
1818
GenericCall,
1919
GenericSend,
20+
InvocationPromise,
21+
InvocationId,
2022
ObjectContext,
2123
Rand,
2224
Request,
@@ -63,8 +65,10 @@ import {
6365
extractContext,
6466
pendingPromise,
6567
PromisesExecutor,
68+
RestateInvocationPromise,
6669
RestateCombinatorPromise,
6770
RestatePendingPromise,
71+
InvocationPendingPromise,
6872
RestateSinglePromise,
6973
} from "./promises.js";
7074
import { InputPump, OutputPump } from "./io.js";
@@ -168,13 +172,14 @@ export class ContextImpl implements ObjectContext, WorkflowContext {
168172
//
169173
public genericCall<REQ = Uint8Array, RES = Uint8Array>(
170174
call: GenericCall<REQ, RES>
171-
): CombineablePromise<RES> {
175+
): InvocationPromise<RES> {
172176
const requestSerde: Serde<REQ> =
173177
call.inputSerde ?? (serde.binary as Serde<REQ>);
174178
const responseSerde: Serde<RES> =
175179
call.outputSerde ?? (serde.binary as Serde<RES>);
176180

177-
return this.processCompletableEntry((vm) => {
181+
try {
182+
const vm = this.coreVm;
178183
const parameter = requestSerde.serialize(call.parameter);
179184
const call_handles = vm.sys_call(
180185
call.service,
@@ -188,13 +193,25 @@ export class ContextImpl implements ObjectContext, WorkflowContext {
188193
: []
189194
);
190195

191-
// TODO eventually we return this promise back to the user
192-
// const invocationIdPromise = this.createInvocationIdPromise(
193-
// call_handles.invocation_id_completion_id
194-
// );
196+
const invocationIdHandle = call_handles.invocation_id_completion_id;
197+
const invocationHandle = () => {
198+
// create a promise, on demand that resolves to an invocation id if needed.
199+
return this.createInvocationIdPromise(invocationIdHandle);
200+
};
201+
202+
const callHandle = call_handles.call_completion_id;
195203

196-
return call_handles.call_completion_id;
197-
}, completeUsing(SuccessWithSerde(responseSerde), Failure));
204+
return new RestateInvocationPromise(
205+
this,
206+
callHandle,
207+
completeUsing(SuccessWithSerde(responseSerde), Failure),
208+
invocationHandle
209+
);
210+
} catch (e) {
211+
this.handleInvocationEndError(e);
212+
// We return a pending promise to avoid the caller to see the error.
213+
return new InvocationPendingPromise(this);
214+
}
198215
}
199216

200217
public genericSend<REQ = Uint8Array>(send: GenericSend<REQ>) {
@@ -228,7 +245,7 @@ export class ContextImpl implements ObjectContext, WorkflowContext {
228245

229246
private createInvocationIdPromise(
230247
handle: number
231-
): RestateSinglePromise<string> {
248+
): RestateSinglePromise<InvocationId> {
232249
return new RestateSinglePromise(this, handle, completeUsing(InvocationId));
233250
}
234251

@@ -696,17 +713,25 @@ const VoidAsUndefined: Completer = (value, prom) => {
696713
return false;
697714
};
698715

699-
function SuccessWithSerde<T>(serde?: Serde<T>): Completer {
716+
function SuccessWithSerde<T>(
717+
serde?: Serde<T>,
718+
transform?: <U>(success: T) => U
719+
): Completer {
700720
return (value, prom) => {
701-
if (typeof value === "object" && "Success" in value) {
702-
if (!serde) {
703-
prom.resolve(defaultSerde<T>().deserialize(value.Success));
704-
} else {
705-
prom.resolve(serde.deserialize(value.Success));
706-
}
707-
return true;
721+
if (typeof value !== "object" || !("Success" in value)) {
722+
return false;
708723
}
709-
return false;
724+
let val: T;
725+
if (serde) {
726+
val = serde.deserialize(value.Success);
727+
} else {
728+
val = defaultSerde<T>().deserialize(value.Success);
729+
}
730+
if (transform) {
731+
val = transform(val);
732+
}
733+
prom.resolve(val);
734+
return true;
710735
};
711736
}
712737

packages/restate-sdk/src/promises.ts

Lines changed: 36 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,11 @@
1111

1212
/* eslint-disable @typescript-eslint/no-explicit-any */
1313

14-
import type { CombineablePromise } from "./context.js";
14+
import type {
15+
CombineablePromise,
16+
InvocationId,
17+
InvocationPromise,
18+
} from "./context.js";
1519
import type * as vm from "./endpoint/handlers/vm/sdk_shared_core_wasm_bindings.js";
1620
import { CancelledError, TimeoutError } from "./types/errors.js";
1721
import { CompletablePromise } from "./utils/completable_promise.js";
@@ -191,6 +195,24 @@ export class RestateSinglePromise<T> extends AbstractRestatePromise<T> {
191195
readonly [Symbol.toStringTag] = "RestateSinglePromise";
192196
}
193197

198+
export class RestateInvocationPromise<T>
199+
extends RestateSinglePromise<T>
200+
implements InvocationPromise<T>
201+
{
202+
constructor(
203+
ctx: ContextImpl,
204+
handle: number,
205+
completer: (value: AsyncResultValue, prom: CompletablePromise<T>) => void,
206+
private readonly invocationIdSupplier: () => Promise<InvocationId>
207+
) {
208+
super(ctx, handle, completer);
209+
}
210+
211+
invocationId(): Promise<InvocationId> {
212+
return this.invocationIdSupplier();
213+
}
214+
}
215+
194216
export class RestateCombinatorPromise extends AbstractRestatePromise<any> {
195217
private state: PromiseState = PromiseState.NOT_COMPLETED;
196218
private readonly combinatorPromise: Promise<any>;
@@ -278,6 +300,19 @@ export class RestatePendingPromise<T> implements RestatePromise<T> {
278300
readonly [Symbol.toStringTag] = "RestatePendingPromise";
279301
}
280302

303+
export class InvocationPendingPromise<T>
304+
extends RestatePendingPromise<T>
305+
implements InvocationPromise<T>
306+
{
307+
constructor(ctx: ContextImpl) {
308+
super(ctx);
309+
}
310+
311+
invocationId(): Promise<InvocationId> {
312+
return pendingPromise();
313+
}
314+
}
315+
281316
/**
282317
* Promises executor, gluing VM with I/O and Promises given to user space.
283318
*/

packages/restate-sdk/src/types/rpc.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -13,10 +13,10 @@
1313
/* eslint-disable @typescript-eslint/no-namespace */
1414
/* eslint-disable @typescript-eslint/ban-types */
1515
import type {
16-
CombineablePromise,
1716
Context,
1817
GenericCall,
1918
GenericSend,
19+
InvocationPromise,
2020
ObjectContext,
2121
ObjectSharedContext,
2222
WorkflowContext,
@@ -218,7 +218,7 @@ export type Client<M> = {
218218
) => PromiseLike<infer O>
219219
? (
220220
...args: [...P, ...[opts?: Opts<InferArg<P>, O>]]
221-
) => CombineablePromise<O>
221+
) => InvocationPromise<O>
222222
: never;
223223
};
224224

0 commit comments

Comments
 (0)