Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Reduce usage of "any" #448

Merged
merged 6 commits into from
May 22, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions src/communicator.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@ import { WorkflowContextImpl } from "./workflow";
import { DBOSContext, DBOSContextImpl } from "./context";
import { WorkflowContextDebug } from "./debugger/debug_workflow";

/* eslint-disable @typescript-eslint/no-explicit-any */
export type Communicator<T extends any[], R> = (ctxt: CommunicatorContext, ...args: T) => Promise<R>;
/* eslint-disable-next-line @typescript-eslint/no-explicit-any */
export type Communicator<R> = (ctxt: CommunicatorContext, ...args: any[]) => Promise<R>;

export interface CommunicatorConfig {
retriesAllowed?: boolean; // Should failures be retried? (default true)
Expand Down
67 changes: 30 additions & 37 deletions src/dbos-executor.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
/* eslint-disable @typescript-eslint/no-explicit-any */
import {
import {
DBOSError,
DBOSInitializationError,
DBOSWorkflowConflictUUIDError,
Expand Down Expand Up @@ -68,17 +67,17 @@ export interface DBOSConfig {
}

interface WorkflowInfo {
workflow: Workflow<any, any>;
workflow: Workflow<unknown>;
config: WorkflowConfig;
}

interface TransactionInfo {
transaction: Transaction<any, any>;
transaction: Transaction<unknown>;
config: TransactionConfig;
}

interface CommunicatorInfo {
communicator: Communicator<any, any>;
communicator: Communicator<unknown>;
config: CommunicatorConfig;
}

Expand Down Expand Up @@ -258,15 +257,15 @@ export class DBOSExecutor {
this.registeredOperations.push(...registeredClassOperations);
for (const ro of registeredClassOperations) {
if (ro.workflowConfig) {
const wf = ro.registeredFunction as Workflow<any, any>;
const wf = ro.registeredFunction as Workflow<unknown>;
this.#registerWorkflow(wf, {...ro.workflowConfig});
this.logger.debug(`Registered workflow ${ro.name}`);
} else if (ro.txnConfig) {
const tx = ro.registeredFunction as Transaction<any, any>;
const tx = ro.registeredFunction as Transaction<unknown>;
this.#registerTransaction(tx, ro.txnConfig);
this.logger.debug(`Registered transaction ${ro.name}`);
} else if (ro.commConfig) {
const comm = ro.registeredFunction as Communicator<any, any>;
const comm = ro.registeredFunction as Communicator<unknown>;
this.#registerCommunicator(comm, ro.commConfig);
this.logger.debug(`Registered communicator ${ro.name}`);
}
Expand Down Expand Up @@ -347,7 +346,7 @@ export class DBOSExecutor {

/* WORKFLOW OPERATIONS */

#registerWorkflow<T extends any[], R>(wf: Workflow<T, R>, config: WorkflowConfig = {}) {
#registerWorkflow<R>(wf: Workflow<R>, config: WorkflowConfig = {}) {
if (wf.name === DBOSExecutor.tempWorkflowName || this.workflowInfoMap.has(wf.name)) {
throw new DBOSError(`Repeated workflow name: ${wf.name}`);
}
Expand All @@ -358,7 +357,7 @@ export class DBOSExecutor {
this.workflowInfoMap.set(wf.name, workflowInfo);
}

#registerTransaction<T extends any[], R>(txn: Transaction<T, R>, params: TransactionConfig = {}) {
#registerTransaction<R>(txn: Transaction<R>, params: TransactionConfig = {}) {
if (this.transactionInfoMap.has(txn.name)) {
throw new DBOSError(`Repeated Transaction name: ${txn.name}`);
}
Expand All @@ -369,7 +368,7 @@ export class DBOSExecutor {
this.transactionInfoMap.set(txn.name, txnInfo);
}

#registerCommunicator<T extends any[], R>(comm: Communicator<T, R>, params: CommunicatorConfig = {}) {
#registerCommunicator<R>(comm: Communicator<R>, params: CommunicatorConfig = {}) {
if (this.communicatorInfoMap.has(comm.name)) {
throw new DBOSError(`Repeated Commmunicator name: ${comm.name}`);
}
Expand All @@ -380,15 +379,15 @@ export class DBOSExecutor {
this.communicatorInfoMap.set(comm.name, commInfo);
}

async workflow<T extends any[], R>(wf: Workflow<T, R>, params: InternalWorkflowParams, ...args: T): Promise<WorkflowHandle<R>> {
async workflow<R>(wf: Workflow<R>, params: InternalWorkflowParams, ...args: unknown[]): Promise<WorkflowHandle<R>> {
if (this.debugMode) {
return this.debugWorkflow(wf, params, undefined, undefined, ...args);
}
return this.internalWorkflow(wf, params, undefined, undefined, ...args);
}

// If callerUUID and functionID are set, it means the workflow is invoked from within a workflow.
async internalWorkflow<T extends any[], R>(wf: Workflow<T, R>, params: InternalWorkflowParams, callerUUID?: string, callerFunctionID?: number, ...args: T): Promise<WorkflowHandle<R>> {
async internalWorkflow<R>(wf: Workflow<R>, params: InternalWorkflowParams, callerUUID?: string, callerFunctionID?: number, ...args: unknown[]): Promise<WorkflowHandle<R>> {
const workflowUUID: string = params.workflowUUID ? params.workflowUUID : this.#generateUUID();
const presetUUID: boolean = params.workflowUUID ? true : false;

Expand Down Expand Up @@ -494,7 +493,7 @@ export class DBOSExecutor {
/**
* DEBUG MODE workflow execution, skipping all the recording
*/
async debugWorkflow<T extends any[], R>(wf: Workflow<T, R>, params: WorkflowParams, callerUUID?: string, callerFunctionID?: number, ...args: T): Promise<WorkflowHandle<R>> {
async debugWorkflow<R>(wf: Workflow<R>, params: WorkflowParams, callerUUID?: string, callerFunctionID?: number, ...args: unknown[]): Promise<WorkflowHandle<R>> {
// In debug mode, we must have a specific workflow UUID.
if (!params.workflowUUID) {
throw new DBOSDebuggerError("Workflow UUID not found!");
Expand Down Expand Up @@ -535,28 +534,28 @@ export class DBOSExecutor {
return new InvokedHandle(this.systemDatabase, workflowPromise, workflowUUID, wf.name, callerUUID, callerFunctionID);
}

async transaction<T extends any[], R>(txn: Transaction<T, R>, params: WorkflowParams, ...args: T): Promise<R> {
async transaction<R>(txn: Transaction<R>, params: WorkflowParams, ...args: unknown[] ): Promise<R> {
// Create a workflow and call transaction.
const temp_workflow = async (ctxt: WorkflowContext, ...args: T) => {
const temp_workflow = async (ctxt: WorkflowContext, ...args: unknown[]) => {
const ctxtImpl = ctxt as WorkflowContextImpl;
return await ctxtImpl.transaction(txn, ...args);
};
return (await this.workflow(temp_workflow, { ...params, tempWfType: TempWorkflowType.transaction, tempWfName: txn.name }, ...args)).getResult();
return (await this.workflow<R>(temp_workflow, { ...params, tempWfType: TempWorkflowType.transaction, tempWfName: txn.name }, ...args)).getResult();
}

async external<T extends any[], R>(commFn: Communicator<T, R>, params: WorkflowParams, ...args: T): Promise<R> {
async external<R>(commFn: Communicator<R>, params: WorkflowParams, ...args: unknown[]): Promise<R> {
// Create a workflow and call external.
const temp_workflow = async (ctxt: WorkflowContext, ...args: T) => {
const temp_workflow = async (ctxt: WorkflowContext, ...args: unknown[]) => {
const ctxtImpl = ctxt as WorkflowContextImpl;
return await ctxtImpl.external(commFn, ...args);
};
return (await this.workflow(temp_workflow, { ...params, tempWfType: TempWorkflowType.external, tempWfName: commFn.name }, ...args)).getResult();
}

async send<T extends NonNullable<any>>(destinationUUID: string, message: T, topic?: string, idempotencyKey?: string): Promise<void> {
async send(destinationUUID: string, message: NonNullable<unknown>, topic?: string, idempotencyKey?: string): Promise<void> {
// Create a workflow and call send.
const temp_workflow = async (ctxt: WorkflowContext, destinationUUID: string, message: T, topic?: string) => {
return await ctxt.send<T>(destinationUUID, message, topic);
const temp_workflow = async (ctxt: WorkflowContext, destinationUUID: string, message: NonNullable<unknown>, topic?: string) => {
return await ctxt.send(destinationUUID, message, topic);
};
const workflowUUID = idempotencyKey ? destinationUUID + idempotencyKey : undefined;
return (await this.workflow(temp_workflow, { workflowUUID: workflowUUID, tempWfType: TempWorkflowType.send }, destinationUUID, message, topic)).getResult();
Expand All @@ -565,7 +564,7 @@ export class DBOSExecutor {
/**
* Wait for a workflow to emit an event, then return its value.
*/
async getEvent<T extends NonNullable<any>>(workflowUUID: string, key: string, timeoutSeconds: number = DBOSExecutor.defaultNotificationTimeoutSec): Promise<T | null> {
async getEvent<T extends NonNullable<unknown>>(workflowUUID: string, key: string, timeoutSeconds: number = DBOSExecutor.defaultNotificationTimeoutSec): Promise<T | null> {
return this.systemDatabase.getEvent(workflowUUID, key, timeoutSeconds);
}

Expand All @@ -585,7 +584,7 @@ export class DBOSExecutor {
* A recovery process that by default runs during executor init time.
* It runs to completion all pending workflows that were executing when the previous executor failed.
*/
async recoverPendingWorkflows(executorIDs: string[] = ["local"]): Promise<WorkflowHandle<any>[]> {
async recoverPendingWorkflows(executorIDs: string[] = ["local"]): Promise<WorkflowHandle<unknown>[]> {
const pendingWorkflows: string[] = [];
for (const execID of executorIDs) {
if (execID == "local" && process.env.DBOS__VMID) {
Expand All @@ -597,7 +596,7 @@ export class DBOSExecutor {
pendingWorkflows.push(...wIDs);
}

const handlerArray: WorkflowHandle<any>[] = [];
const handlerArray: WorkflowHandle<unknown>[] = [];
for (const workflowUUID of pendingWorkflows) {
try {
handlerArray.push(await this.executeWorkflowUUID(workflowUUID));
Expand All @@ -620,7 +619,6 @@ export class DBOSExecutor {
const wfInfo: WorkflowInfo | undefined = this.workflowInfoMap.get(wfStatus.workflowName);

if (wfInfo) {
// eslint-disable-next-line @typescript-eslint/no-unsafe-argument
return this.workflow(wfInfo.workflow, { workflowUUID: workflowUUID, parentCtx: parentCtx ?? undefined }, ...inputs);
}

Expand All @@ -631,16 +629,15 @@ export class DBOSExecutor {
throw new DBOSError(`This should never happen! Cannot find workflow info for a non-temporary workflow! UUID ${workflowUUID}, name ${wfName}`);
}

let temp_workflow: Workflow<any, any>;
let temp_workflow: Workflow<unknown>;
if (nameArr[1] === TempWorkflowType.transaction) {
const txnInfo: TransactionInfo | undefined = this.transactionInfoMap.get(nameArr[2]);
if (!txnInfo) {
this.logger.error(`Cannot find transaction info for UUID ${workflowUUID}, name ${nameArr[2]}`);
throw new DBOSNotRegisteredError(nameArr[2]);
}
temp_workflow = async (ctxt: WorkflowContext, ...args: any[]) => {
temp_workflow = async (ctxt: WorkflowContext, ...args: unknown[]) => {
const ctxtImpl = ctxt as WorkflowContextImpl;
// eslint-disable-next-line @typescript-eslint/no-unsafe-return, @typescript-eslint/no-unsafe-argument
return await ctxtImpl.transaction(txnInfo.transaction, ...args);
};
} else if (nameArr[1] === TempWorkflowType.external) {
Expand All @@ -649,21 +646,18 @@ export class DBOSExecutor {
this.logger.error(`Cannot find communicator info for UUID ${workflowUUID}, name ${nameArr[2]}`);
throw new DBOSNotRegisteredError(nameArr[2]);
}
temp_workflow = async (ctxt: WorkflowContext, ...args: any[]) => {
temp_workflow = async (ctxt: WorkflowContext, ...args: unknown[]) => {
const ctxtImpl = ctxt as WorkflowContextImpl;
// eslint-disable-next-line @typescript-eslint/no-unsafe-return, @typescript-eslint/no-unsafe-argument
return await ctxtImpl.external(commInfo.communicator, ...args);
};
} else if (nameArr[1] === TempWorkflowType.send) {
temp_workflow = async (ctxt: WorkflowContext, ...args: any[]) => {
// eslint-disable-next-line @typescript-eslint/no-unsafe-argument
return await ctxt.send<any>(args[0], args[1], args[2]);
temp_workflow = async (ctxt: WorkflowContext, ...args: unknown[]) => {
return await ctxt.send(args[0] as string, args[1] as string, args[2] as string);
};
} else {
this.logger.error(`Unrecognized temporary workflow! UUID ${workflowUUID}, name ${wfName}`)
throw new DBOSNotRegisteredError(wfName);
}
// eslint-disable-next-line @typescript-eslint/no-unsafe-argument
return this.workflow(temp_workflow, { workflowUUID: workflowUUID, parentCtx: parentCtx ?? undefined }, ...inputs);
}

Expand Down Expand Up @@ -711,7 +705,7 @@ export class DBOSExecutor {
while (finishedCnt < totalSize) {
let sqlStmt = "INSERT INTO dbos.transaction_outputs (workflow_uuid, function_id, output, error, txn_id, txn_snapshot, created_at) VALUES ";
let paramCnt = 1;
const values: any[] = [];
const values: unknown[] = [];
const batchUUIDs: string[] = [];
for (const [workflowUUID, wfBuffer] of localBuffer) {
for (const [funcID, recorded] of wfBuffer) {
Expand All @@ -732,7 +726,6 @@ export class DBOSExecutor {
}
}
this.logger.debug(sqlStmt);
// eslint-disable-next-line @typescript-eslint/no-unsafe-argument
await this.userDatabase.query(sqlStmt, ...values);

// Clean up after each batch succeeds
Expand Down
5 changes: 2 additions & 3 deletions src/dbos-runtime/runtime.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,7 @@ import { DBOSKafka } from '../kafka/kafka';
import { DBOSScheduler } from '../scheduler/scheduler';

interface ModuleExports {
// eslint-disable-next-line @typescript-eslint/no-explicit-any
[key: string]: any;
[key: string]: unknown;
}

export interface DBOSRuntimeConfig {
Expand Down Expand Up @@ -74,7 +73,7 @@ export class DBOSRuntime {
let exports: ModuleExports;
if (fs.existsSync(operations)) {
const operationsURL = pathToFileURL(operations).href;
exports = (await import(operationsURL)) as Promise<ModuleExports>;
exports = (await import(operationsURL)) as ModuleExports;
} else {
throw new DBOSFailLoadOperationsError(`Failed to load operations from the entrypoint ${entrypoint}`);
}
Expand Down
32 changes: 15 additions & 17 deletions src/debugger/debug_workflow.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
/* eslint-disable @typescript-eslint/no-explicit-any */
import { DBOSExecutor, DBOSNull, OperationType, dbosNull } from "../dbos-executor";
import { DBOSExecutor, DBOSNull, OperationType, dbosNull } from "../dbos-executor";
import { transaction_outputs } from "../../schemas/user_db_schema";
import { Transaction, TransactionContextImpl } from "../transaction";
import { Communicator } from "../communicator";
Expand Down Expand Up @@ -51,16 +50,15 @@ export class WorkflowContextDebug extends DBOSContextImpl implements WorkflowCon

invoke<T extends object>(object: T): WFInvokeFuncs<T> {
const ops = getRegisteredOperations(object);
const proxy: Record<string, unknown> = {};

const proxy: any = {};
for (const op of ops) {
// eslint-disable-next-line @typescript-eslint/no-unsafe-member-access
proxy[op.name] = op.txnConfig
? // eslint-disable-next-line @typescript-eslint/no-unsafe-argument
(...args: any[]) => this.transaction(op.registeredFunction as Transaction<any[], any>, ...args)
?
(...args: unknown[]) => this.transaction(op.registeredFunction as Transaction<unknown>, ...args)
: op.commConfig
? // eslint-disable-next-line @typescript-eslint/no-unsafe-argument
(...args: any[]) => this.external(op.registeredFunction as Communicator<any[], any>, ...args)
?
(...args: unknown[]) => this.external(op.registeredFunction as Communicator<unknown>, ...args)
: undefined;
}
return proxy as WFInvokeFuncs<T>;
Expand Down Expand Up @@ -99,7 +97,7 @@ export class WorkflowContextDebug extends DBOSContextImpl implements WorkflowCon
* Execute a transactional function in debug mode.
* If a debug proxy is provided, it connects to a debug proxy and everything should be read-only.
*/
async transaction<T extends any[], R>(txn: Transaction<T, R>, ...args: T): Promise<R> {
async transaction<R>(txn: Transaction<R>, ...args: unknown[]): Promise<R> {
const txnInfo = this.#dbosExec.transactionInfoMap.get(txn.name);
if (txnInfo === undefined) {
throw new DBOSDebuggerError(`Transaction ${txn.name} not registered!`);
Expand Down Expand Up @@ -168,7 +166,7 @@ export class WorkflowContextDebug extends DBOSContextImpl implements WorkflowCon
return check.output; // Always return the recorded result.
}

async external<T extends any[], R>(commFn: Communicator<T, R>, ..._args: T): Promise<R> {
async external<R>(commFn: Communicator<R>, ..._args: unknown[]): Promise<R> {
const commConfig = this.#dbosExec.communicatorInfoMap.get(commFn.name);
if (commConfig === undefined) {
throw new DBOSDebuggerError(`Communicator ${commFn.name} not registered!`);
Expand All @@ -187,22 +185,22 @@ export class WorkflowContextDebug extends DBOSContextImpl implements WorkflowCon
}

// Invoke the debugWorkflow() function instead.
async startChildWorkflow<T extends any[], R>(wf: Workflow<T, R>, ...args: T): Promise<WorkflowHandle<R>> {
async startChildWorkflow<R>(wf: Workflow<R>, ...args: unknown[]): Promise<WorkflowHandle<R>> {
const funcId = this.functionIDGetIncrement();
const childUUID: string = this.workflowUUID + "-" + funcId;
return this.#dbosExec.debugWorkflow(wf, { parentCtx: this, workflowUUID: childUUID }, this.workflowUUID, funcId, ...args);
}

async invokeChildWorkflow<T extends any[], R>(wf: Workflow<T, R>, ...args: T): Promise<R> {
async invokeChildWorkflow<R>(wf: Workflow<R>, ...args: unknown[]): Promise<R> {
return this.startChildWorkflow(wf, ...args).then((handle) => handle.getResult());
}

// Deprecated
async childWorkflow<T extends any[], R>(wf: Workflow<T, R>, ...args: T): Promise<WorkflowHandle<R>> {
async childWorkflow<R>(wf: Workflow<R>, ...args: unknown[]): Promise<WorkflowHandle<R>> {
return this.startChildWorkflow(wf, ...args);
}

async send<T extends NonNullable<any>>(_destinationUUID: string, _message: T, _topic?: string | undefined): Promise<void> {
async send(_destinationUUID: string, _message: NonNullable<unknown>, _topic?: string | undefined): Promise<void> {
const functionID: number = this.functionIDGetIncrement();

// Original result must exist during replay.
Expand All @@ -214,7 +212,7 @@ export class WorkflowContextDebug extends DBOSContextImpl implements WorkflowCon
return;
}

async recv<T extends NonNullable<any>>(_topic?: string | undefined, _timeoutSeconds?: number | undefined): Promise<T | null> {
async recv<T extends NonNullable<unknown>>(_topic?: string | undefined, _timeoutSeconds?: number | undefined): Promise<T | null> {
const functionID: number = this.functionIDGetIncrement();

// Original result must exist during replay.
Expand All @@ -226,7 +224,7 @@ export class WorkflowContextDebug extends DBOSContextImpl implements WorkflowCon
return check as T | null;
}

async setEvent<T extends NonNullable<any>>(_key: string, _value: T): Promise<void> {
async setEvent(_key: string, _value: NonNullable<unknown>): Promise<void> {
const functionID: number = this.functionIDGetIncrement();
// Original result must exist during replay.
const check: undefined | DBOSNull = await this.#dbosExec.systemDatabase.checkOperationOutput<undefined>(this.workflowUUID, functionID);
Expand All @@ -236,7 +234,7 @@ export class WorkflowContextDebug extends DBOSContextImpl implements WorkflowCon
this.logger.debug("Use recorded setEvent output.");
}

async getEvent<T extends NonNullable<any>>(_workflowUUID: string, _key: string, _timeoutSeconds?: number | undefined): Promise<T | null> {
async getEvent<T extends NonNullable<unknown>>(_workflowUUID: string, _key: string, _timeoutSeconds?: number | undefined): Promise<T | null> {
const functionID: number = this.functionIDGetIncrement();

// Original result must exist during replay.
Expand Down
Loading