Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Revert "Reduce usage of "any" (#448)" #469

Merged
merged 1 commit into from
May 23, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions src/communicator.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@ import { WorkflowContextImpl } from "./workflow";
import { DBOSContext, DBOSContextImpl } from "./context";
import { WorkflowContextDebug } from "./debugger/debug_workflow";

/* eslint-disable-next-line @typescript-eslint/no-explicit-any */
export type Communicator<R> = (ctxt: CommunicatorContext, ...args: any[]) => Promise<R>;
/* eslint-disable @typescript-eslint/no-explicit-any */
export type Communicator<T extends any[], R> = (ctxt: CommunicatorContext, ...args: T) => Promise<R>;

export interface CommunicatorConfig {
retriesAllowed?: boolean; // Should failures be retried? (default true)
Expand Down
67 changes: 37 additions & 30 deletions src/dbos-executor.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import {
/* eslint-disable @typescript-eslint/no-explicit-any */
import {
DBOSError,
DBOSInitializationError,
DBOSWorkflowConflictUUIDError,
Expand Down Expand Up @@ -67,17 +68,17 @@ export interface DBOSConfig {
}

interface WorkflowInfo {
workflow: Workflow<unknown>;
workflow: Workflow<any, any>;
config: WorkflowConfig;
}

interface TransactionInfo {
transaction: Transaction<unknown>;
transaction: Transaction<any, any>;
config: TransactionConfig;
}

interface CommunicatorInfo {
communicator: Communicator<unknown>;
communicator: Communicator<any, any>;
config: CommunicatorConfig;
}

Expand Down Expand Up @@ -257,15 +258,15 @@ export class DBOSExecutor {
this.registeredOperations.push(...registeredClassOperations);
for (const ro of registeredClassOperations) {
if (ro.workflowConfig) {
const wf = ro.registeredFunction as Workflow<unknown>;
const wf = ro.registeredFunction as Workflow<any, any>;
this.#registerWorkflow(wf, {...ro.workflowConfig});
this.logger.debug(`Registered workflow ${ro.name}`);
} else if (ro.txnConfig) {
const tx = ro.registeredFunction as Transaction<unknown>;
const tx = ro.registeredFunction as Transaction<any, any>;
this.#registerTransaction(tx, ro.txnConfig);
this.logger.debug(`Registered transaction ${ro.name}`);
} else if (ro.commConfig) {
const comm = ro.registeredFunction as Communicator<unknown>;
const comm = ro.registeredFunction as Communicator<any, any>;
this.#registerCommunicator(comm, ro.commConfig);
this.logger.debug(`Registered communicator ${ro.name}`);
}
Expand Down Expand Up @@ -346,7 +347,7 @@ export class DBOSExecutor {

/* WORKFLOW OPERATIONS */

#registerWorkflow<R>(wf: Workflow<R>, config: WorkflowConfig = {}) {
#registerWorkflow<T extends any[], R>(wf: Workflow<T, R>, config: WorkflowConfig = {}) {
if (wf.name === DBOSExecutor.tempWorkflowName || this.workflowInfoMap.has(wf.name)) {
throw new DBOSError(`Repeated workflow name: ${wf.name}`);
}
Expand All @@ -357,7 +358,7 @@ export class DBOSExecutor {
this.workflowInfoMap.set(wf.name, workflowInfo);
}

#registerTransaction<R>(txn: Transaction<R>, params: TransactionConfig = {}) {
#registerTransaction<T extends any[], R>(txn: Transaction<T, R>, params: TransactionConfig = {}) {
if (this.transactionInfoMap.has(txn.name)) {
throw new DBOSError(`Repeated Transaction name: ${txn.name}`);
}
Expand All @@ -368,7 +369,7 @@ export class DBOSExecutor {
this.transactionInfoMap.set(txn.name, txnInfo);
}

#registerCommunicator<R>(comm: Communicator<R>, params: CommunicatorConfig = {}) {
#registerCommunicator<T extends any[], R>(comm: Communicator<T, R>, params: CommunicatorConfig = {}) {
if (this.communicatorInfoMap.has(comm.name)) {
throw new DBOSError(`Repeated Commmunicator name: ${comm.name}`);
}
Expand All @@ -379,15 +380,15 @@ export class DBOSExecutor {
this.communicatorInfoMap.set(comm.name, commInfo);
}

async workflow<R>(wf: Workflow<R>, params: InternalWorkflowParams, ...args: unknown[]): Promise<WorkflowHandle<R>> {
async workflow<T extends any[], R>(wf: Workflow<T, R>, params: InternalWorkflowParams, ...args: T): Promise<WorkflowHandle<R>> {
if (this.debugMode) {
return this.debugWorkflow(wf, params, undefined, undefined, ...args);
}
return this.internalWorkflow(wf, params, undefined, undefined, ...args);
}

// If callerUUID and functionID are set, it means the workflow is invoked from within a workflow.
async internalWorkflow<R>(wf: Workflow<R>, params: InternalWorkflowParams, callerUUID?: string, callerFunctionID?: number, ...args: unknown[]): Promise<WorkflowHandle<R>> {
async internalWorkflow<T extends any[], R>(wf: Workflow<T, R>, params: InternalWorkflowParams, callerUUID?: string, callerFunctionID?: number, ...args: T): Promise<WorkflowHandle<R>> {
const workflowUUID: string = params.workflowUUID ? params.workflowUUID : this.#generateUUID();
const presetUUID: boolean = params.workflowUUID ? true : false;

Expand Down Expand Up @@ -493,7 +494,7 @@ export class DBOSExecutor {
/**
* DEBUG MODE workflow execution, skipping all the recording
*/
async debugWorkflow<R>(wf: Workflow<R>, params: WorkflowParams, callerUUID?: string, callerFunctionID?: number, ...args: unknown[]): Promise<WorkflowHandle<R>> {
async debugWorkflow<T extends any[], R>(wf: Workflow<T, R>, params: WorkflowParams, callerUUID?: string, callerFunctionID?: number, ...args: T): Promise<WorkflowHandle<R>> {
// In debug mode, we must have a specific workflow UUID.
if (!params.workflowUUID) {
throw new DBOSDebuggerError("Workflow UUID not found!");
Expand Down Expand Up @@ -534,28 +535,28 @@ export class DBOSExecutor {
return new InvokedHandle(this.systemDatabase, workflowPromise, workflowUUID, wf.name, callerUUID, callerFunctionID);
}

async transaction<R>(txn: Transaction<R>, params: WorkflowParams, ...args: unknown[] ): Promise<R> {
async transaction<T extends any[], R>(txn: Transaction<T, R>, params: WorkflowParams, ...args: T): Promise<R> {
// Create a workflow and call transaction.
const temp_workflow = async (ctxt: WorkflowContext, ...args: unknown[]) => {
const temp_workflow = async (ctxt: WorkflowContext, ...args: T) => {
const ctxtImpl = ctxt as WorkflowContextImpl;
return await ctxtImpl.transaction(txn, ...args);
};
return (await this.workflow<R>(temp_workflow, { ...params, tempWfType: TempWorkflowType.transaction, tempWfName: txn.name }, ...args)).getResult();
return (await this.workflow(temp_workflow, { ...params, tempWfType: TempWorkflowType.transaction, tempWfName: txn.name }, ...args)).getResult();
}

async external<R>(commFn: Communicator<R>, params: WorkflowParams, ...args: unknown[]): Promise<R> {
async external<T extends any[], R>(commFn: Communicator<T, R>, params: WorkflowParams, ...args: T): Promise<R> {
// Create a workflow and call external.
const temp_workflow = async (ctxt: WorkflowContext, ...args: unknown[]) => {
const temp_workflow = async (ctxt: WorkflowContext, ...args: T) => {
const ctxtImpl = ctxt as WorkflowContextImpl;
return await ctxtImpl.external(commFn, ...args);
};
return (await this.workflow(temp_workflow, { ...params, tempWfType: TempWorkflowType.external, tempWfName: commFn.name }, ...args)).getResult();
}

async send(destinationUUID: string, message: NonNullable<unknown>, topic?: string, idempotencyKey?: string): Promise<void> {
async send<T extends NonNullable<any>>(destinationUUID: string, message: T, topic?: string, idempotencyKey?: string): Promise<void> {
// Create a workflow and call send.
const temp_workflow = async (ctxt: WorkflowContext, destinationUUID: string, message: NonNullable<unknown>, topic?: string) => {
return await ctxt.send(destinationUUID, message, topic);
const temp_workflow = async (ctxt: WorkflowContext, destinationUUID: string, message: T, topic?: string) => {
return await ctxt.send<T>(destinationUUID, message, topic);
};
const workflowUUID = idempotencyKey ? destinationUUID + idempotencyKey : undefined;
return (await this.workflow(temp_workflow, { workflowUUID: workflowUUID, tempWfType: TempWorkflowType.send }, destinationUUID, message, topic)).getResult();
Expand All @@ -564,7 +565,7 @@ export class DBOSExecutor {
/**
* Wait for a workflow to emit an event, then return its value.
*/
async getEvent<T extends NonNullable<unknown>>(workflowUUID: string, key: string, timeoutSeconds: number = DBOSExecutor.defaultNotificationTimeoutSec): Promise<T | null> {
async getEvent<T extends NonNullable<any>>(workflowUUID: string, key: string, timeoutSeconds: number = DBOSExecutor.defaultNotificationTimeoutSec): Promise<T | null> {
return this.systemDatabase.getEvent(workflowUUID, key, timeoutSeconds);
}

Expand All @@ -584,7 +585,7 @@ export class DBOSExecutor {
* A recovery process that by default runs during executor init time.
* It runs to completion all pending workflows that were executing when the previous executor failed.
*/
async recoverPendingWorkflows(executorIDs: string[] = ["local"]): Promise<WorkflowHandle<unknown>[]> {
async recoverPendingWorkflows(executorIDs: string[] = ["local"]): Promise<WorkflowHandle<any>[]> {
const pendingWorkflows: string[] = [];
for (const execID of executorIDs) {
if (execID == "local" && process.env.DBOS__VMID) {
Expand All @@ -596,7 +597,7 @@ export class DBOSExecutor {
pendingWorkflows.push(...wIDs);
}

const handlerArray: WorkflowHandle<unknown>[] = [];
const handlerArray: WorkflowHandle<any>[] = [];
for (const workflowUUID of pendingWorkflows) {
try {
handlerArray.push(await this.executeWorkflowUUID(workflowUUID));
Expand All @@ -619,6 +620,7 @@ export class DBOSExecutor {
const wfInfo: WorkflowInfo | undefined = this.workflowInfoMap.get(wfStatus.workflowName);

if (wfInfo) {
// eslint-disable-next-line @typescript-eslint/no-unsafe-argument
return this.workflow(wfInfo.workflow, { workflowUUID: workflowUUID, parentCtx: parentCtx ?? undefined }, ...inputs);
}

Expand All @@ -629,15 +631,16 @@ export class DBOSExecutor {
throw new DBOSError(`This should never happen! Cannot find workflow info for a non-temporary workflow! UUID ${workflowUUID}, name ${wfName}`);
}

let temp_workflow: Workflow<unknown>;
let temp_workflow: Workflow<any, any>;
if (nameArr[1] === TempWorkflowType.transaction) {
const txnInfo: TransactionInfo | undefined = this.transactionInfoMap.get(nameArr[2]);
if (!txnInfo) {
this.logger.error(`Cannot find transaction info for UUID ${workflowUUID}, name ${nameArr[2]}`);
throw new DBOSNotRegisteredError(nameArr[2]);
}
temp_workflow = async (ctxt: WorkflowContext, ...args: unknown[]) => {
temp_workflow = async (ctxt: WorkflowContext, ...args: any[]) => {
const ctxtImpl = ctxt as WorkflowContextImpl;
// eslint-disable-next-line @typescript-eslint/no-unsafe-return, @typescript-eslint/no-unsafe-argument
return await ctxtImpl.transaction(txnInfo.transaction, ...args);
};
} else if (nameArr[1] === TempWorkflowType.external) {
Expand All @@ -646,18 +649,21 @@ export class DBOSExecutor {
this.logger.error(`Cannot find communicator info for UUID ${workflowUUID}, name ${nameArr[2]}`);
throw new DBOSNotRegisteredError(nameArr[2]);
}
temp_workflow = async (ctxt: WorkflowContext, ...args: unknown[]) => {
temp_workflow = async (ctxt: WorkflowContext, ...args: any[]) => {
const ctxtImpl = ctxt as WorkflowContextImpl;
// eslint-disable-next-line @typescript-eslint/no-unsafe-return, @typescript-eslint/no-unsafe-argument
return await ctxtImpl.external(commInfo.communicator, ...args);
};
} else if (nameArr[1] === TempWorkflowType.send) {
temp_workflow = async (ctxt: WorkflowContext, ...args: unknown[]) => {
return await ctxt.send(args[0] as string, args[1] as string, args[2] as string);
temp_workflow = async (ctxt: WorkflowContext, ...args: any[]) => {
// eslint-disable-next-line @typescript-eslint/no-unsafe-argument
return await ctxt.send<any>(args[0], args[1], args[2]);
};
} else {
this.logger.error(`Unrecognized temporary workflow! UUID ${workflowUUID}, name ${wfName}`)
throw new DBOSNotRegisteredError(wfName);
}
// eslint-disable-next-line @typescript-eslint/no-unsafe-argument
return this.workflow(temp_workflow, { workflowUUID: workflowUUID, parentCtx: parentCtx ?? undefined }, ...inputs);
}

Expand Down Expand Up @@ -705,7 +711,7 @@ export class DBOSExecutor {
while (finishedCnt < totalSize) {
let sqlStmt = "INSERT INTO dbos.transaction_outputs (workflow_uuid, function_id, output, error, txn_id, txn_snapshot, created_at) VALUES ";
let paramCnt = 1;
const values: unknown[] = [];
const values: any[] = [];
const batchUUIDs: string[] = [];
for (const [workflowUUID, wfBuffer] of localBuffer) {
for (const [funcID, recorded] of wfBuffer) {
Expand All @@ -726,6 +732,7 @@ export class DBOSExecutor {
}
}
this.logger.debug(sqlStmt);
// eslint-disable-next-line @typescript-eslint/no-unsafe-argument
await this.userDatabase.query(sqlStmt, ...values);

// Clean up after each batch succeeds
Expand Down
5 changes: 3 additions & 2 deletions src/dbos-runtime/runtime.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,8 @@ import { DBOSKafka } from '../kafka/kafka';
import { DBOSScheduler } from '../scheduler/scheduler';

interface ModuleExports {
[key: string]: unknown;
// eslint-disable-next-line @typescript-eslint/no-explicit-any
[key: string]: any;
}

export interface DBOSRuntimeConfig {
Expand Down Expand Up @@ -73,7 +74,7 @@ export class DBOSRuntime {
let exports: ModuleExports;
if (fs.existsSync(operations)) {
const operationsURL = pathToFileURL(operations).href;
exports = (await import(operationsURL)) as ModuleExports;
exports = (await import(operationsURL)) as Promise<ModuleExports>;
} else {
throw new DBOSFailLoadOperationsError(`Failed to load operations from the entrypoint ${entrypoint}`);
}
Expand Down
32 changes: 17 additions & 15 deletions src/debugger/debug_workflow.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import { DBOSExecutor, DBOSNull, OperationType, dbosNull } from "../dbos-executor";
/* eslint-disable @typescript-eslint/no-explicit-any */
import { DBOSExecutor, DBOSNull, OperationType, dbosNull } from "../dbos-executor";
import { transaction_outputs } from "../../schemas/user_db_schema";
import { Transaction, TransactionContextImpl } from "../transaction";
import { Communicator } from "../communicator";
Expand Down Expand Up @@ -50,15 +51,16 @@ export class WorkflowContextDebug extends DBOSContextImpl implements WorkflowCon

invoke<T extends object>(object: T): WFInvokeFuncs<T> {
const ops = getRegisteredOperations(object);
const proxy: Record<string, unknown> = {};

const proxy: any = {};
for (const op of ops) {
// eslint-disable-next-line @typescript-eslint/no-unsafe-member-access
proxy[op.name] = op.txnConfig
?
(...args: unknown[]) => this.transaction(op.registeredFunction as Transaction<unknown>, ...args)
? // eslint-disable-next-line @typescript-eslint/no-unsafe-argument
(...args: any[]) => this.transaction(op.registeredFunction as Transaction<any[], any>, ...args)
: op.commConfig
?
(...args: unknown[]) => this.external(op.registeredFunction as Communicator<unknown>, ...args)
? // eslint-disable-next-line @typescript-eslint/no-unsafe-argument
(...args: any[]) => this.external(op.registeredFunction as Communicator<any[], any>, ...args)
: undefined;
}
return proxy as WFInvokeFuncs<T>;
Expand Down Expand Up @@ -97,7 +99,7 @@ export class WorkflowContextDebug extends DBOSContextImpl implements WorkflowCon
* Execute a transactional function in debug mode.
* If a debug proxy is provided, it connects to a debug proxy and everything should be read-only.
*/
async transaction<R>(txn: Transaction<R>, ...args: unknown[]): Promise<R> {
async transaction<T extends any[], R>(txn: Transaction<T, R>, ...args: T): Promise<R> {
const txnInfo = this.#dbosExec.transactionInfoMap.get(txn.name);
if (txnInfo === undefined) {
throw new DBOSDebuggerError(`Transaction ${txn.name} not registered!`);
Expand Down Expand Up @@ -166,7 +168,7 @@ export class WorkflowContextDebug extends DBOSContextImpl implements WorkflowCon
return check.output; // Always return the recorded result.
}

async external<R>(commFn: Communicator<R>, ..._args: unknown[]): Promise<R> {
async external<T extends any[], R>(commFn: Communicator<T, R>, ..._args: T): Promise<R> {
const commConfig = this.#dbosExec.communicatorInfoMap.get(commFn.name);
if (commConfig === undefined) {
throw new DBOSDebuggerError(`Communicator ${commFn.name} not registered!`);
Expand All @@ -185,22 +187,22 @@ export class WorkflowContextDebug extends DBOSContextImpl implements WorkflowCon
}

// Invoke the debugWorkflow() function instead.
async startChildWorkflow<R>(wf: Workflow<R>, ...args: unknown[]): Promise<WorkflowHandle<R>> {
async startChildWorkflow<T extends any[], R>(wf: Workflow<T, R>, ...args: T): Promise<WorkflowHandle<R>> {
const funcId = this.functionIDGetIncrement();
const childUUID: string = this.workflowUUID + "-" + funcId;
return this.#dbosExec.debugWorkflow(wf, { parentCtx: this, workflowUUID: childUUID }, this.workflowUUID, funcId, ...args);
}

async invokeChildWorkflow<R>(wf: Workflow<R>, ...args: unknown[]): Promise<R> {
async invokeChildWorkflow<T extends any[], R>(wf: Workflow<T, R>, ...args: T): Promise<R> {
return this.startChildWorkflow(wf, ...args).then((handle) => handle.getResult());
}

// Deprecated
async childWorkflow<R>(wf: Workflow<R>, ...args: unknown[]): Promise<WorkflowHandle<R>> {
async childWorkflow<T extends any[], R>(wf: Workflow<T, R>, ...args: T): Promise<WorkflowHandle<R>> {
return this.startChildWorkflow(wf, ...args);
}

async send(_destinationUUID: string, _message: NonNullable<unknown>, _topic?: string | undefined): Promise<void> {
async send<T extends NonNullable<any>>(_destinationUUID: string, _message: T, _topic?: string | undefined): Promise<void> {
const functionID: number = this.functionIDGetIncrement();

// Original result must exist during replay.
Expand All @@ -212,7 +214,7 @@ export class WorkflowContextDebug extends DBOSContextImpl implements WorkflowCon
return;
}

async recv<T extends NonNullable<unknown>>(_topic?: string | undefined, _timeoutSeconds?: number | undefined): Promise<T | null> {
async recv<T extends NonNullable<any>>(_topic?: string | undefined, _timeoutSeconds?: number | undefined): Promise<T | null> {
const functionID: number = this.functionIDGetIncrement();

// Original result must exist during replay.
Expand All @@ -224,7 +226,7 @@ export class WorkflowContextDebug extends DBOSContextImpl implements WorkflowCon
return check as T | null;
}

async setEvent(_key: string, _value: NonNullable<unknown>): Promise<void> {
async setEvent<T extends NonNullable<any>>(_key: string, _value: T): Promise<void> {
const functionID: number = this.functionIDGetIncrement();
// Original result must exist during replay.
const check: undefined | DBOSNull = await this.#dbosExec.systemDatabase.checkOperationOutput<undefined>(this.workflowUUID, functionID);
Expand All @@ -234,7 +236,7 @@ export class WorkflowContextDebug extends DBOSContextImpl implements WorkflowCon
this.logger.debug("Use recorded setEvent output.");
}

async getEvent<T extends NonNullable<unknown>>(_workflowUUID: string, _key: string, _timeoutSeconds?: number | undefined): Promise<T | null> {
async getEvent<T extends NonNullable<any>>(_workflowUUID: string, _key: string, _timeoutSeconds?: number | undefined): Promise<T | null> {
const functionID: number = this.functionIDGetIncrement();

// Original result must exist during replay.
Expand Down
Loading