Skip to content

Commit

Permalink
Reduce usage of "any" (#448)
Browse files Browse the repository at this point in the history
This PR aims to reduce the usage of `any` in the codebase

---------

Co-authored-by: Demetris Manikas <dmanikas@admin.grnet.gr>
  • Loading branch information
demetris-manikas and Demetris Manikas authored May 22, 2024
1 parent 8e9d254 commit 8f1b5d4
Show file tree
Hide file tree
Showing 14 changed files with 128 additions and 160 deletions.
4 changes: 2 additions & 2 deletions src/communicator.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@ import { WorkflowContextImpl } from "./workflow";
import { DBOSContext, DBOSContextImpl } from "./context";
import { WorkflowContextDebug } from "./debugger/debug_workflow";

/* eslint-disable @typescript-eslint/no-explicit-any */
export type Communicator<T extends any[], R> = (ctxt: CommunicatorContext, ...args: T) => Promise<R>;
/* eslint-disable-next-line @typescript-eslint/no-explicit-any */
export type Communicator<R> = (ctxt: CommunicatorContext, ...args: any[]) => Promise<R>;

export interface CommunicatorConfig {
retriesAllowed?: boolean; // Should failures be retried? (default true)
Expand Down
67 changes: 30 additions & 37 deletions src/dbos-executor.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
/* eslint-disable @typescript-eslint/no-explicit-any */
import {
import {
DBOSError,
DBOSInitializationError,
DBOSWorkflowConflictUUIDError,
Expand Down Expand Up @@ -68,17 +67,17 @@ export interface DBOSConfig {
}

interface WorkflowInfo {
workflow: Workflow<any, any>;
workflow: Workflow<unknown>;
config: WorkflowConfig;
}

interface TransactionInfo {
transaction: Transaction<any, any>;
transaction: Transaction<unknown>;
config: TransactionConfig;
}

interface CommunicatorInfo {
communicator: Communicator<any, any>;
communicator: Communicator<unknown>;
config: CommunicatorConfig;
}

Expand Down Expand Up @@ -258,15 +257,15 @@ export class DBOSExecutor {
this.registeredOperations.push(...registeredClassOperations);
for (const ro of registeredClassOperations) {
if (ro.workflowConfig) {
const wf = ro.registeredFunction as Workflow<any, any>;
const wf = ro.registeredFunction as Workflow<unknown>;
this.#registerWorkflow(wf, {...ro.workflowConfig});
this.logger.debug(`Registered workflow ${ro.name}`);
} else if (ro.txnConfig) {
const tx = ro.registeredFunction as Transaction<any, any>;
const tx = ro.registeredFunction as Transaction<unknown>;
this.#registerTransaction(tx, ro.txnConfig);
this.logger.debug(`Registered transaction ${ro.name}`);
} else if (ro.commConfig) {
const comm = ro.registeredFunction as Communicator<any, any>;
const comm = ro.registeredFunction as Communicator<unknown>;
this.#registerCommunicator(comm, ro.commConfig);
this.logger.debug(`Registered communicator ${ro.name}`);
}
Expand Down Expand Up @@ -347,7 +346,7 @@ export class DBOSExecutor {

/* WORKFLOW OPERATIONS */

#registerWorkflow<T extends any[], R>(wf: Workflow<T, R>, config: WorkflowConfig = {}) {
#registerWorkflow<R>(wf: Workflow<R>, config: WorkflowConfig = {}) {
if (wf.name === DBOSExecutor.tempWorkflowName || this.workflowInfoMap.has(wf.name)) {
throw new DBOSError(`Repeated workflow name: ${wf.name}`);
}
Expand All @@ -358,7 +357,7 @@ export class DBOSExecutor {
this.workflowInfoMap.set(wf.name, workflowInfo);
}

#registerTransaction<T extends any[], R>(txn: Transaction<T, R>, params: TransactionConfig = {}) {
#registerTransaction<R>(txn: Transaction<R>, params: TransactionConfig = {}) {
if (this.transactionInfoMap.has(txn.name)) {
throw new DBOSError(`Repeated Transaction name: ${txn.name}`);
}
Expand All @@ -369,7 +368,7 @@ export class DBOSExecutor {
this.transactionInfoMap.set(txn.name, txnInfo);
}

#registerCommunicator<T extends any[], R>(comm: Communicator<T, R>, params: CommunicatorConfig = {}) {
#registerCommunicator<R>(comm: Communicator<R>, params: CommunicatorConfig = {}) {
if (this.communicatorInfoMap.has(comm.name)) {
throw new DBOSError(`Repeated Commmunicator name: ${comm.name}`);
}
Expand All @@ -380,15 +379,15 @@ export class DBOSExecutor {
this.communicatorInfoMap.set(comm.name, commInfo);
}

async workflow<T extends any[], R>(wf: Workflow<T, R>, params: InternalWorkflowParams, ...args: T): Promise<WorkflowHandle<R>> {
async workflow<R>(wf: Workflow<R>, params: InternalWorkflowParams, ...args: unknown[]): Promise<WorkflowHandle<R>> {
if (this.debugMode) {
return this.debugWorkflow(wf, params, undefined, undefined, ...args);
}
return this.internalWorkflow(wf, params, undefined, undefined, ...args);
}

// If callerUUID and functionID are set, it means the workflow is invoked from within a workflow.
async internalWorkflow<T extends any[], R>(wf: Workflow<T, R>, params: InternalWorkflowParams, callerUUID?: string, callerFunctionID?: number, ...args: T): Promise<WorkflowHandle<R>> {
async internalWorkflow<R>(wf: Workflow<R>, params: InternalWorkflowParams, callerUUID?: string, callerFunctionID?: number, ...args: unknown[]): Promise<WorkflowHandle<R>> {
const workflowUUID: string = params.workflowUUID ? params.workflowUUID : this.#generateUUID();
const presetUUID: boolean = params.workflowUUID ? true : false;

Expand Down Expand Up @@ -494,7 +493,7 @@ export class DBOSExecutor {
/**
* DEBUG MODE workflow execution, skipping all the recording
*/
async debugWorkflow<T extends any[], R>(wf: Workflow<T, R>, params: WorkflowParams, callerUUID?: string, callerFunctionID?: number, ...args: T): Promise<WorkflowHandle<R>> {
async debugWorkflow<R>(wf: Workflow<R>, params: WorkflowParams, callerUUID?: string, callerFunctionID?: number, ...args: unknown[]): Promise<WorkflowHandle<R>> {
// In debug mode, we must have a specific workflow UUID.
if (!params.workflowUUID) {
throw new DBOSDebuggerError("Workflow UUID not found!");
Expand Down Expand Up @@ -535,28 +534,28 @@ export class DBOSExecutor {
return new InvokedHandle(this.systemDatabase, workflowPromise, workflowUUID, wf.name, callerUUID, callerFunctionID);
}

async transaction<T extends any[], R>(txn: Transaction<T, R>, params: WorkflowParams, ...args: T): Promise<R> {
async transaction<R>(txn: Transaction<R>, params: WorkflowParams, ...args: unknown[] ): Promise<R> {
// Create a workflow and call transaction.
const temp_workflow = async (ctxt: WorkflowContext, ...args: T) => {
const temp_workflow = async (ctxt: WorkflowContext, ...args: unknown[]) => {
const ctxtImpl = ctxt as WorkflowContextImpl;
return await ctxtImpl.transaction(txn, ...args);
};
return (await this.workflow(temp_workflow, { ...params, tempWfType: TempWorkflowType.transaction, tempWfName: txn.name }, ...args)).getResult();
return (await this.workflow<R>(temp_workflow, { ...params, tempWfType: TempWorkflowType.transaction, tempWfName: txn.name }, ...args)).getResult();
}

async external<T extends any[], R>(commFn: Communicator<T, R>, params: WorkflowParams, ...args: T): Promise<R> {
async external<R>(commFn: Communicator<R>, params: WorkflowParams, ...args: unknown[]): Promise<R> {
// Create a workflow and call external.
const temp_workflow = async (ctxt: WorkflowContext, ...args: T) => {
const temp_workflow = async (ctxt: WorkflowContext, ...args: unknown[]) => {
const ctxtImpl = ctxt as WorkflowContextImpl;
return await ctxtImpl.external(commFn, ...args);
};
return (await this.workflow(temp_workflow, { ...params, tempWfType: TempWorkflowType.external, tempWfName: commFn.name }, ...args)).getResult();
}

async send<T extends NonNullable<any>>(destinationUUID: string, message: T, topic?: string, idempotencyKey?: string): Promise<void> {
async send(destinationUUID: string, message: NonNullable<unknown>, topic?: string, idempotencyKey?: string): Promise<void> {
// Create a workflow and call send.
const temp_workflow = async (ctxt: WorkflowContext, destinationUUID: string, message: T, topic?: string) => {
return await ctxt.send<T>(destinationUUID, message, topic);
const temp_workflow = async (ctxt: WorkflowContext, destinationUUID: string, message: NonNullable<unknown>, topic?: string) => {
return await ctxt.send(destinationUUID, message, topic);
};
const workflowUUID = idempotencyKey ? destinationUUID + idempotencyKey : undefined;
return (await this.workflow(temp_workflow, { workflowUUID: workflowUUID, tempWfType: TempWorkflowType.send }, destinationUUID, message, topic)).getResult();
Expand All @@ -565,7 +564,7 @@ export class DBOSExecutor {
/**
* Wait for a workflow to emit an event, then return its value.
*/
async getEvent<T extends NonNullable<any>>(workflowUUID: string, key: string, timeoutSeconds: number = DBOSExecutor.defaultNotificationTimeoutSec): Promise<T | null> {
async getEvent<T extends NonNullable<unknown>>(workflowUUID: string, key: string, timeoutSeconds: number = DBOSExecutor.defaultNotificationTimeoutSec): Promise<T | null> {
return this.systemDatabase.getEvent(workflowUUID, key, timeoutSeconds);
}

Expand All @@ -585,7 +584,7 @@ export class DBOSExecutor {
* A recovery process that by default runs during executor init time.
* It runs to completion all pending workflows that were executing when the previous executor failed.
*/
async recoverPendingWorkflows(executorIDs: string[] = ["local"]): Promise<WorkflowHandle<any>[]> {
async recoverPendingWorkflows(executorIDs: string[] = ["local"]): Promise<WorkflowHandle<unknown>[]> {
const pendingWorkflows: string[] = [];
for (const execID of executorIDs) {
if (execID == "local" && process.env.DBOS__VMID) {
Expand All @@ -597,7 +596,7 @@ export class DBOSExecutor {
pendingWorkflows.push(...wIDs);
}

const handlerArray: WorkflowHandle<any>[] = [];
const handlerArray: WorkflowHandle<unknown>[] = [];
for (const workflowUUID of pendingWorkflows) {
try {
handlerArray.push(await this.executeWorkflowUUID(workflowUUID));
Expand All @@ -620,7 +619,6 @@ export class DBOSExecutor {
const wfInfo: WorkflowInfo | undefined = this.workflowInfoMap.get(wfStatus.workflowName);

if (wfInfo) {
// eslint-disable-next-line @typescript-eslint/no-unsafe-argument
return this.workflow(wfInfo.workflow, { workflowUUID: workflowUUID, parentCtx: parentCtx ?? undefined }, ...inputs);
}

Expand All @@ -631,16 +629,15 @@ export class DBOSExecutor {
throw new DBOSError(`This should never happen! Cannot find workflow info for a non-temporary workflow! UUID ${workflowUUID}, name ${wfName}`);
}

let temp_workflow: Workflow<any, any>;
let temp_workflow: Workflow<unknown>;
if (nameArr[1] === TempWorkflowType.transaction) {
const txnInfo: TransactionInfo | undefined = this.transactionInfoMap.get(nameArr[2]);
if (!txnInfo) {
this.logger.error(`Cannot find transaction info for UUID ${workflowUUID}, name ${nameArr[2]}`);
throw new DBOSNotRegisteredError(nameArr[2]);
}
temp_workflow = async (ctxt: WorkflowContext, ...args: any[]) => {
temp_workflow = async (ctxt: WorkflowContext, ...args: unknown[]) => {
const ctxtImpl = ctxt as WorkflowContextImpl;
// eslint-disable-next-line @typescript-eslint/no-unsafe-return, @typescript-eslint/no-unsafe-argument
return await ctxtImpl.transaction(txnInfo.transaction, ...args);
};
} else if (nameArr[1] === TempWorkflowType.external) {
Expand All @@ -649,21 +646,18 @@ export class DBOSExecutor {
this.logger.error(`Cannot find communicator info for UUID ${workflowUUID}, name ${nameArr[2]}`);
throw new DBOSNotRegisteredError(nameArr[2]);
}
temp_workflow = async (ctxt: WorkflowContext, ...args: any[]) => {
temp_workflow = async (ctxt: WorkflowContext, ...args: unknown[]) => {
const ctxtImpl = ctxt as WorkflowContextImpl;
// eslint-disable-next-line @typescript-eslint/no-unsafe-return, @typescript-eslint/no-unsafe-argument
return await ctxtImpl.external(commInfo.communicator, ...args);
};
} else if (nameArr[1] === TempWorkflowType.send) {
temp_workflow = async (ctxt: WorkflowContext, ...args: any[]) => {
// eslint-disable-next-line @typescript-eslint/no-unsafe-argument
return await ctxt.send<any>(args[0], args[1], args[2]);
temp_workflow = async (ctxt: WorkflowContext, ...args: unknown[]) => {
return await ctxt.send(args[0] as string, args[1] as string, args[2] as string);
};
} else {
this.logger.error(`Unrecognized temporary workflow! UUID ${workflowUUID}, name ${wfName}`)
throw new DBOSNotRegisteredError(wfName);
}
// eslint-disable-next-line @typescript-eslint/no-unsafe-argument
return this.workflow(temp_workflow, { workflowUUID: workflowUUID, parentCtx: parentCtx ?? undefined }, ...inputs);
}

Expand Down Expand Up @@ -711,7 +705,7 @@ export class DBOSExecutor {
while (finishedCnt < totalSize) {
let sqlStmt = "INSERT INTO dbos.transaction_outputs (workflow_uuid, function_id, output, error, txn_id, txn_snapshot, created_at) VALUES ";
let paramCnt = 1;
const values: any[] = [];
const values: unknown[] = [];
const batchUUIDs: string[] = [];
for (const [workflowUUID, wfBuffer] of localBuffer) {
for (const [funcID, recorded] of wfBuffer) {
Expand All @@ -732,7 +726,6 @@ export class DBOSExecutor {
}
}
this.logger.debug(sqlStmt);
// eslint-disable-next-line @typescript-eslint/no-unsafe-argument
await this.userDatabase.query(sqlStmt, ...values);

// Clean up after each batch succeeds
Expand Down
5 changes: 2 additions & 3 deletions src/dbos-runtime/runtime.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,7 @@ import { DBOSKafka } from '../kafka/kafka';
import { DBOSScheduler } from '../scheduler/scheduler';

interface ModuleExports {
// eslint-disable-next-line @typescript-eslint/no-explicit-any
[key: string]: any;
[key: string]: unknown;
}

export interface DBOSRuntimeConfig {
Expand Down Expand Up @@ -74,7 +73,7 @@ export class DBOSRuntime {
let exports: ModuleExports;
if (fs.existsSync(operations)) {
const operationsURL = pathToFileURL(operations).href;
exports = (await import(operationsURL)) as Promise<ModuleExports>;
exports = (await import(operationsURL)) as ModuleExports;
} else {
throw new DBOSFailLoadOperationsError(`Failed to load operations from the entrypoint ${entrypoint}`);
}
Expand Down
32 changes: 15 additions & 17 deletions src/debugger/debug_workflow.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
/* eslint-disable @typescript-eslint/no-explicit-any */
import { DBOSExecutor, DBOSNull, OperationType, dbosNull } from "../dbos-executor";
import { DBOSExecutor, DBOSNull, OperationType, dbosNull } from "../dbos-executor";
import { transaction_outputs } from "../../schemas/user_db_schema";
import { Transaction, TransactionContextImpl } from "../transaction";
import { Communicator } from "../communicator";
Expand Down Expand Up @@ -51,16 +50,15 @@ export class WorkflowContextDebug extends DBOSContextImpl implements WorkflowCon

invoke<T extends object>(object: T): WFInvokeFuncs<T> {
const ops = getRegisteredOperations(object);
const proxy: Record<string, unknown> = {};

const proxy: any = {};
for (const op of ops) {
// eslint-disable-next-line @typescript-eslint/no-unsafe-member-access
proxy[op.name] = op.txnConfig
? // eslint-disable-next-line @typescript-eslint/no-unsafe-argument
(...args: any[]) => this.transaction(op.registeredFunction as Transaction<any[], any>, ...args)
?
(...args: unknown[]) => this.transaction(op.registeredFunction as Transaction<unknown>, ...args)
: op.commConfig
? // eslint-disable-next-line @typescript-eslint/no-unsafe-argument
(...args: any[]) => this.external(op.registeredFunction as Communicator<any[], any>, ...args)
?
(...args: unknown[]) => this.external(op.registeredFunction as Communicator<unknown>, ...args)
: undefined;
}
return proxy as WFInvokeFuncs<T>;
Expand Down Expand Up @@ -99,7 +97,7 @@ export class WorkflowContextDebug extends DBOSContextImpl implements WorkflowCon
* Execute a transactional function in debug mode.
* If a debug proxy is provided, it connects to a debug proxy and everything should be read-only.
*/
async transaction<T extends any[], R>(txn: Transaction<T, R>, ...args: T): Promise<R> {
async transaction<R>(txn: Transaction<R>, ...args: unknown[]): Promise<R> {
const txnInfo = this.#dbosExec.transactionInfoMap.get(txn.name);
if (txnInfo === undefined) {
throw new DBOSDebuggerError(`Transaction ${txn.name} not registered!`);
Expand Down Expand Up @@ -168,7 +166,7 @@ export class WorkflowContextDebug extends DBOSContextImpl implements WorkflowCon
return check.output; // Always return the recorded result.
}

async external<T extends any[], R>(commFn: Communicator<T, R>, ..._args: T): Promise<R> {
async external<R>(commFn: Communicator<R>, ..._args: unknown[]): Promise<R> {
const commConfig = this.#dbosExec.communicatorInfoMap.get(commFn.name);
if (commConfig === undefined) {
throw new DBOSDebuggerError(`Communicator ${commFn.name} not registered!`);
Expand All @@ -187,22 +185,22 @@ export class WorkflowContextDebug extends DBOSContextImpl implements WorkflowCon
}

// Invoke the debugWorkflow() function instead.
async startChildWorkflow<T extends any[], R>(wf: Workflow<T, R>, ...args: T): Promise<WorkflowHandle<R>> {
async startChildWorkflow<R>(wf: Workflow<R>, ...args: unknown[]): Promise<WorkflowHandle<R>> {
const funcId = this.functionIDGetIncrement();
const childUUID: string = this.workflowUUID + "-" + funcId;
return this.#dbosExec.debugWorkflow(wf, { parentCtx: this, workflowUUID: childUUID }, this.workflowUUID, funcId, ...args);
}

async invokeChildWorkflow<T extends any[], R>(wf: Workflow<T, R>, ...args: T): Promise<R> {
async invokeChildWorkflow<R>(wf: Workflow<R>, ...args: unknown[]): Promise<R> {
return this.startChildWorkflow(wf, ...args).then((handle) => handle.getResult());
}

// Deprecated
async childWorkflow<T extends any[], R>(wf: Workflow<T, R>, ...args: T): Promise<WorkflowHandle<R>> {
async childWorkflow<R>(wf: Workflow<R>, ...args: unknown[]): Promise<WorkflowHandle<R>> {
return this.startChildWorkflow(wf, ...args);
}

async send<T extends NonNullable<any>>(_destinationUUID: string, _message: T, _topic?: string | undefined): Promise<void> {
async send(_destinationUUID: string, _message: NonNullable<unknown>, _topic?: string | undefined): Promise<void> {
const functionID: number = this.functionIDGetIncrement();

// Original result must exist during replay.
Expand All @@ -214,7 +212,7 @@ export class WorkflowContextDebug extends DBOSContextImpl implements WorkflowCon
return;
}

async recv<T extends NonNullable<any>>(_topic?: string | undefined, _timeoutSeconds?: number | undefined): Promise<T | null> {
async recv<T extends NonNullable<unknown>>(_topic?: string | undefined, _timeoutSeconds?: number | undefined): Promise<T | null> {
const functionID: number = this.functionIDGetIncrement();

// Original result must exist during replay.
Expand All @@ -226,7 +224,7 @@ export class WorkflowContextDebug extends DBOSContextImpl implements WorkflowCon
return check as T | null;
}

async setEvent<T extends NonNullable<any>>(_key: string, _value: T): Promise<void> {
async setEvent(_key: string, _value: NonNullable<unknown>): Promise<void> {
const functionID: number = this.functionIDGetIncrement();
// Original result must exist during replay.
const check: undefined | DBOSNull = await this.#dbosExec.systemDatabase.checkOperationOutput<undefined>(this.workflowUUID, functionID);
Expand All @@ -236,7 +234,7 @@ export class WorkflowContextDebug extends DBOSContextImpl implements WorkflowCon
this.logger.debug("Use recorded setEvent output.");
}

async getEvent<T extends NonNullable<any>>(_workflowUUID: string, _key: string, _timeoutSeconds?: number | undefined): Promise<T | null> {
async getEvent<T extends NonNullable<unknown>>(_workflowUUID: string, _key: string, _timeoutSeconds?: number | undefined): Promise<T | null> {
const functionID: number = this.functionIDGetIncrement();

// Original result must exist during replay.
Expand Down
Loading

0 comments on commit 8f1b5d4

Please sign in to comment.