diff --git a/.eslintignore b/.eslintignore index c73a9699..93d966f5 100644 --- a/.eslintignore +++ b/.eslintignore @@ -1,4 +1,5 @@ **/node_modules build/ test/data/esm_* +test/data/plugins docs/ diff --git a/docs/generated/api.json b/docs/generated/api.json index e37b3605..3180b5a5 100644 --- a/docs/generated/api.json +++ b/docs/generated/api.json @@ -1913,6 +1913,88 @@ "endIndex": 2 } }, + { + "kind": "PropertySignature", + "canonicalReference": "@openfunction/functions-framework!OpenFunctionContext#postPlugins:member", + "docComment": "/**\n * Optional post function exec plugins.\n */\n", + "excerptTokens": [ + { + "kind": "Content", + "text": "postPlugins?: " + }, + { + "kind": "Reference", + "text": "Array", + "canonicalReference": "!Array:interface" + }, + { + "kind": "Content", + "text": "" + }, + { + "kind": "Content", + "text": ";" + } + ], + "isReadonly": false, + "isOptional": true, + "releaseTag": "Public", + "name": "postPlugins", + "propertyTypeTokenRange": { + "startIndex": 1, + "endIndex": 5 + } + }, + { + "kind": "PropertySignature", + "canonicalReference": "@openfunction/functions-framework!OpenFunctionContext#prePlugins:member", + "docComment": "/**\n * Optional pre function exec plugins.\n */\n", + "excerptTokens": [ + { + "kind": "Content", + "text": "prePlugins?: " + }, + { + "kind": "Reference", + "text": "Array", + "canonicalReference": "!Array:interface" + }, + { + "kind": "Content", + "text": "" + }, + { + "kind": "Content", + "text": ";" + } + ], + "isReadonly": false, + "isOptional": true, + "releaseTag": "Public", + "name": "prePlugins", + "propertyTypeTokenRange": { + "startIndex": 1, + "endIndex": 5 + } + }, { "kind": "PropertySignature", "canonicalReference": "@openfunction/functions-framework!OpenFunctionContext#runtime:member", @@ -2489,6 +2571,234 @@ ], "implementsTokenRanges": [] }, + { + "kind": "Class", + "canonicalReference": "@openfunction/functions-framework!Plugin_2:class", + "docComment": "/**\n * The OpenFunction's plugin template.\n *\n * @public\n */\n", + "excerptTokens": [ + { + "kind": "Content", + "text": "export declare class Plugin " + } + ], + "releaseTag": "Public", + "name": "Plugin_2", + "preserveMemberOrder": false, + "members": [ + { + "kind": "Method", + "canonicalReference": "@openfunction/functions-framework!Plugin_2#execPostHook:member(1)", + "docComment": "/**\n * post main function exec.\n *\n * @param ctx - The openfunction runtime .\n */\n", + "excerptTokens": [ + { + "kind": "Content", + "text": "execPostHook(ctx?: " + }, + { + "kind": "Reference", + "text": "OpenFunctionRuntime", + "canonicalReference": "@openfunction/functions-framework!OpenFunctionRuntime:class" + }, + { + "kind": "Content", + "text": "): " + }, + { + "kind": "Reference", + "text": "Promise", + "canonicalReference": "!Promise:interface" + }, + { + "kind": "Content", + "text": "" + }, + { + "kind": "Content", + "text": ";" + } + ], + "isStatic": false, + "returnTypeTokenRange": { + "startIndex": 3, + "endIndex": 5 + }, + "releaseTag": "Public", + "isProtected": false, + "overloadIndex": 1, + "parameters": [ + { + "parameterName": "ctx", + "parameterTypeTokenRange": { + "startIndex": 1, + "endIndex": 2 + }, + "isOptional": true + } + ], + "isOptional": false, + "name": "execPostHook" + }, + { + "kind": "Method", + "canonicalReference": "@openfunction/functions-framework!Plugin_2#execPreHook:member(1)", + "docComment": "/**\n * pre main function exec.\n *\n * @param ctx - The openfunction runtime .\n */\n", + "excerptTokens": [ + { + "kind": "Content", + "text": "execPreHook(ctx?: " + }, + { + "kind": "Reference", + "text": "OpenFunctionRuntime", + "canonicalReference": "@openfunction/functions-framework!OpenFunctionRuntime:class" + }, + { + "kind": "Content", + "text": "): " + }, + { + "kind": "Reference", + "text": "Promise", + "canonicalReference": "!Promise:interface" + }, + { + "kind": "Content", + "text": "" + }, + { + "kind": "Content", + "text": ";" + } + ], + "isStatic": false, + "returnTypeTokenRange": { + "startIndex": 3, + "endIndex": 5 + }, + "releaseTag": "Public", + "isProtected": false, + "overloadIndex": 1, + "parameters": [ + { + "parameterName": "ctx", + "parameterTypeTokenRange": { + "startIndex": 1, + "endIndex": 2 + }, + "isOptional": true + } + ], + "isOptional": false, + "name": "execPreHook" + }, + { + "kind": "Method", + "canonicalReference": "@openfunction/functions-framework!Plugin_2#get:member(1)", + "docComment": "/**\n * get instance filed value.\n *\n * @param filedName - the instace filedName\n *\n * @returns filed value.\n */\n", + "excerptTokens": [ + { + "kind": "Content", + "text": "get(filedName: " + }, + { + "kind": "Content", + "text": "string" + }, + { + "kind": "Content", + "text": "): " + }, + { + "kind": "Content", + "text": "string" + }, + { + "kind": "Content", + "text": ";" + } + ], + "isStatic": false, + "returnTypeTokenRange": { + "startIndex": 3, + "endIndex": 4 + }, + "releaseTag": "Public", + "isProtected": false, + "overloadIndex": 1, + "parameters": [ + { + "parameterName": "filedName", + "parameterTypeTokenRange": { + "startIndex": 1, + "endIndex": 2 + }, + "isOptional": false + } + ], + "isOptional": false, + "name": "get" + }, + { + "kind": "Property", + "canonicalReference": "@openfunction/functions-framework!Plugin_2.OFN_PLUGIN_NAME:member", + "docComment": "", + "excerptTokens": [ + { + "kind": "Content", + "text": "static OFN_PLUGIN_NAME: " + }, + { + "kind": "Content", + "text": "string" + }, + { + "kind": "Content", + "text": ";" + } + ], + "isReadonly": false, + "isOptional": false, + "releaseTag": "Public", + "name": "OFN_PLUGIN_NAME", + "propertyTypeTokenRange": { + "startIndex": 1, + "endIndex": 2 + }, + "isStatic": true, + "isProtected": false + }, + { + "kind": "Property", + "canonicalReference": "@openfunction/functions-framework!Plugin_2.OFN_PLUGIN_VERSION:member", + "docComment": "", + "excerptTokens": [ + { + "kind": "Content", + "text": "static OFN_PLUGIN_VERSION: " + }, + { + "kind": "Content", + "text": "string" + }, + { + "kind": "Content", + "text": ";" + } + ], + "isReadonly": false, + "isOptional": false, + "releaseTag": "Public", + "name": "OFN_PLUGIN_VERSION", + "propertyTypeTokenRange": { + "startIndex": 1, + "endIndex": 2 + }, + "isStatic": true, + "isProtected": false + } + ], + "implementsTokenRanges": [] + }, { "kind": "Interface", "canonicalReference": "@openfunction/functions-framework!Request_2:interface", diff --git a/docs/generated/api.md b/docs/generated/api.md index 4d25f565..e084c92b 100644 --- a/docs/generated/api.md +++ b/docs/generated/api.md @@ -136,6 +136,8 @@ export interface OpenFunctionContext { name: string; outputs?: OpenFunctionBinding; port?: string; + postPlugins?: Array; + prePlugins?: Array; runtime: `${RuntimeType}` | `${Capitalize}` | `${Uppercase}`; version: string; } @@ -158,6 +160,18 @@ export abstract class OpenFunctionRuntime { protected trigger?: OpenFunctionTrigger; } +// @public +class Plugin_2 { + execPostHook(ctx?: OpenFunctionRuntime): Promise; + execPreHook(ctx?: OpenFunctionRuntime): Promise; + get(filedName: string): string; + // (undocumented) + static OFN_PLUGIN_NAME: string; + // (undocumented) + static OFN_PLUGIN_VERSION: string; +} +export { Plugin_2 as Plugin } + // @public (undocumented) interface Request_2 extends Request_3 { rawBody?: Buffer; diff --git a/src/loader.ts b/src/loader.ts index 99550fe2..5e943c5f 100644 --- a/src/loader.ts +++ b/src/loader.ts @@ -21,10 +21,14 @@ import * as path from 'path'; import * as semver from 'semver'; import * as readPkgUp from 'read-pkg-up'; +import * as fs from 'fs'; import {pathToFileURL} from 'url'; -import {HandlerFunction} from './functions'; +import {HandlerFunction, OpenFunctionRuntime} from './functions'; import {SignatureType} from './types'; import {getRegisteredFunction} from './function_registry'; +import {Plugin} from './openfunction/function_context'; +import {FrameworkOptions} from './options'; +import {forEach} from 'lodash'; // Dynamic import function required to load user code packaged as an // ES module is only available on Node.js v13.2.0 and up. @@ -92,6 +96,7 @@ export async function getUserFunction( } | null> { try { const functionModulePath = getFunctionModulePath(codeLocation); + if (functionModulePath === null) { console.error('Provided code is not a loadable module.'); return null; @@ -193,3 +198,171 @@ function getFunctionModulePath(codeLocation: string): string | null { } return path; } + +type PluginClass = Record; +/** + * Returns user's plugin from function file. + * Returns null if plugin can't be retrieved. + * @return User's plugins or null. + */ +export async function getUserPlugins( + options: FrameworkOptions +): Promise { + // Get plugin set + const pluginSet: Set = new Set(); + if (options.context) { + if (options.context.prePlugins) { + forEach(options.context.prePlugins, plugin => { + typeof plugin === 'string' && pluginSet.add(plugin); + }); + } + if (options.context.postPlugins) { + forEach(options.context.postPlugins, plugin => { + typeof plugin === 'string' && pluginSet.add(plugin); + }); + } + + try { + type Instance = Record; + // Load plugin js files + const instances: Instance = {}; + + const pluginFiles = getPluginFiles(options.sourceLocation); + if (pluginFiles === null) { + console.warn('[warn-!!!] user plugins files load failed '); + options.context.prePlugins = []; + options.context.postPlugins = []; + return options; + } + + // Find plugins class + const tempMap: PluginClass = {}; + for (const pluginFile of pluginFiles) { + const jsMoulde = require(pluginFile); + processJsModule(jsMoulde, tempMap); + } + + // Instance plugin dynamic set ofn_plugin_name + const pluginNames = Array.from(pluginSet.values()); + for (const name of pluginNames) { + const module = tempMap[name]; + if (module) { + const instance = new module(); + instance[Plugin.OFN_PLUGIN_NAME] = module.Name; + instance[Plugin.OFN_PLUGIN_VERSION] = module.Version || 'v1'; + + //Set default method of pre post get + if (!instance.execPreHook) { + // eslint-disable-next-line @typescript-eslint/no-unused-vars + instance.execPreHook = (ctx: OpenFunctionRuntime) => { + console.log( + `This plugin ${name} method execPreHook is not implemented.` + ); + }; + } + if (!instance.execPostHook) { + // eslint-disable-next-line @typescript-eslint/no-unused-vars + instance.execPostHook = (ctx: OpenFunctionRuntime) => { + console.log( + `This plugin ${name} method execPostHook is not implemented.` + ); + }; + } + if (!instance.get) { + instance.get = (filedName: string) => { + for (const key in instance) { + if (key === filedName) { + return instance[key]; + } + } + }; + } + instances[name] = instance as Plugin; + } + } + + const prePlugins: Array = []; + const postPlugins: Array = []; + if (options.context.prePlugins) { + forEach(options.context.prePlugins, plugin => { + if (typeof plugin === 'string') { + const instance = instances[plugin]; + typeof instance === 'object' && prePlugins.push(instance); + } + }); + } + if (options.context.postPlugins) { + forEach(options.context.postPlugins, plugin => { + if (typeof plugin === 'string') { + const instance = instances[plugin]; + typeof instance === 'object' && postPlugins.push(instance); + } + }); + } + + options.context.prePlugins = prePlugins; + options.context.postPlugins = postPlugins; + } catch (error) { + console.error('load plugins error reason: \n'); + console.error(error); + } + } + return options; +} + +/** + * Returns resolved path to the dir containing the user plugins. + * Returns null if the path is not exits + * @param codeLocation Directory with user's code. + * @return Resolved path or null. + */ +function getPluginFiles(codeLocation: string): Array | null { + const pluginFiles: Array = []; + try { + const param = path.resolve(codeLocation + '/plugins'); + const files = fs.readdirSync(param); + + for (const file of files) { + pluginFiles.push(require.resolve(path.join(param, file))); + } + } catch (ex) { + const err: Error = ex; + console.error(err.message); + return null; + } + return pluginFiles; +} +/** + * Returns rdetermine whether it is a class. + * Returns boolean is can be class + * @param obj jsmodule. + * @return boolean of it is a class. + */ +// eslint-disable-next-line @typescript-eslint/no-explicit-any +function couldBeClass(obj: any): boolean { + return ( + typeof obj === 'function' && + obj.prototype !== undefined && + obj.prototype.constructor === obj && + obj.toString().slice(0, 5) === 'class' + ); +} + +/** + * Process jsMoulde if it can be a plugin class put it into tempMap. + * @param obj jsmodule. + * @param tempMap PluginClass. + */ +// eslint-disable-next-line @typescript-eslint/no-explicit-any +function processJsModule(obj: any, tempMap: PluginClass) { + if (typeof obj === 'object') { + for (const o in obj) { + if (couldBeClass(obj[o]) && obj[o].Name) { + tempMap[obj[o].Name] = obj[o]; + } + } + } + if (couldBeClass(obj) && obj.Name) { + tempMap[obj.Name] = obj; + } +} diff --git a/src/main.ts b/src/main.ts index bee7a1c1..3cc5f864 100644 --- a/src/main.ts +++ b/src/main.ts @@ -26,7 +26,7 @@ import { ContextUtils, } from './openfunction/function_context'; -import {getUserFunction} from './loader'; +import {getUserFunction, getUserPlugins} from './loader'; import {ErrorHandler} from './invoker'; import {getServer} from './server'; import {parseOptions, helpText, OptionsError} from './options'; @@ -69,6 +69,9 @@ export const main = async () => { // DaprServer uses httpTerminator in server.stop() handleShutdown(async () => await server.stop()); + + // Load Plugins + await getUserPlugins(options); } // Then taking sync runtime as the fallback else { diff --git a/src/openfunction/async_server.ts b/src/openfunction/async_server.ts index 358b9ae9..6107f24e 100644 --- a/src/openfunction/async_server.ts +++ b/src/openfunction/async_server.ts @@ -1,4 +1,4 @@ -import {forEach} from 'lodash'; +import {forEach, invoke} from 'lodash'; import {DaprServer} from '@dapr/dapr'; import {OpenFunction} from '../functions'; @@ -23,7 +23,24 @@ export default function ( const ctx = OpenFunctionRuntime.ProxyContext(context); const wrapper = async (data: object) => { + // Exec pre hooks + console.log(context.prePlugins); + if (context.prePlugins) { + await context.prePlugins.reduce(async (_, current) => { + await invoke(current, 'execPreHook', ctx); + return []; + }, Promise.resolve([])); + } + await userFunction(ctx, data); + + // Exec post hooks + if (context.postPlugins) { + await context.postPlugins.reduce(async (_, current) => { + await invoke(current, 'execPostHook', ctx); + return []; + }, Promise.resolve([])); + } }; // Initialize the server with the user's function. diff --git a/src/openfunction/function_context.ts b/src/openfunction/function_context.ts index 450d7c07..fbf4dd23 100644 --- a/src/openfunction/function_context.ts +++ b/src/openfunction/function_context.ts @@ -1,3 +1,5 @@ +import {OpenFunctionRuntime} from './function_runtime'; + /** * The OpenFunction's serving context. * @public @@ -30,6 +32,14 @@ export interface OpenFunctionContext { * Optional output binding object. */ outputs?: OpenFunctionBinding; + /** + * Optional pre function exec plugins. + */ + prePlugins?: Array; + /** + * Optional post function exec plugins. + */ + postPlugins?: Array; } /** @@ -139,3 +149,44 @@ export class ContextUtils { return component?.componentType.split('.')[0] === ComponentType.PubSub; } } + +/** + * The OpenFunction's plugin template. + * @public + */ +export class Plugin { + static OFN_PLUGIN_NAME = 'ofn_plugin_name'; + static OFN_PLUGIN_VERSION = 'ofn_plugin_version'; + /** + * pre main function exec. + * @param ctx - The openfunction runtime . + */ + // eslint-disable-next-line @typescript-eslint/no-unused-vars + public async execPreHook(ctx?: OpenFunctionRuntime) { + console.log( + `This plugin ${this.get( + Plugin.OFN_PLUGIN_NAME + )} method execPreHook is not implemented.` + ); + } + /** + * post main function exec. + * @param ctx - The openfunction runtime . + */ + // eslint-disable-next-line @typescript-eslint/no-unused-vars + public async execPostHook(ctx?: OpenFunctionRuntime) { + console.log( + `This plugin ${this.get( + Plugin.OFN_PLUGIN_NAME + )} method execPostHook is not implemented.` + ); + } + /** + * get instance filed value. + * @param filedName - the instace filedName + * @returns filed value. + */ + public get(filedName: string) { + return filedName; + } +} diff --git a/test/data/plugins/errorMissAll.js b/test/data/plugins/errorMissAll.js new file mode 100644 index 00000000..413b0248 --- /dev/null +++ b/test/data/plugins/errorMissAll.js @@ -0,0 +1,6 @@ +class ErrorPlugin{ + static Version = "v1"; + static Name = "error-miss-all-plugin"; +} + +module.exports = ErrorPlugin; diff --git a/test/data/plugins/errorMissName.js b/test/data/plugins/errorMissName.js new file mode 100644 index 00000000..c1266036 --- /dev/null +++ b/test/data/plugins/errorMissName.js @@ -0,0 +1,22 @@ +class ErrorPlugin{ + static Version = "v1"; + // static Name = "error-plugin" + constructor(){ + console.log(`init error plugins`); + } + async execPreHook(ctx){ + console.log(`-----------error plugin pre hook-----------`); + } + execPostHook(ctx){ + console.log(`-----------error plugin post hook-----------`); + } + get(filedName){ + for(let key in this){ + if(key === filedName){ + return this[key]; + } + } + } +} + +module.exports = {ErrorPlugin}; diff --git a/test/data/plugins/errorMissVersion.js b/test/data/plugins/errorMissVersion.js new file mode 100644 index 00000000..aaf3087c --- /dev/null +++ b/test/data/plugins/errorMissVersion.js @@ -0,0 +1,22 @@ +class ErrorPlugin{ + // static Version = "v1" + static Name = "error-miss-version-plugin" + constructor(){ + console.log(`init error plugins`); + } + async execPreHook(ctx){ + console.log(`-----------error plugin pre hook-----------`); + } + execPostHook(ctx){ + console.log(`-----------error plugin post hook-----------`); + } + get(filedName){ + for(let key in this){ + if(key === filedName){ + return this[key]; + } + } + } +} + +module.exports = {ErrorPlugin}; diff --git a/test/data/plugins/plugindemo.js b/test/data/plugins/plugindemo.js new file mode 100644 index 00000000..6fd4ea12 --- /dev/null +++ b/test/data/plugins/plugindemo.js @@ -0,0 +1,32 @@ +function sleep(){ + return new Promise(resolve => setTimeout(resolve,3000)); +} +class DemoPlugin{ + static Version = "v1"; + static Name = "demo-plugin"; + id = '666'; + constructor(){ + console.log(`init demo plugins`); + } + async execPreHook(ctx){ + console.log(`-----------demo plugin pre hook-----------`); + ctx['pre'] = 'pre-exec'; + await sleep(); + console.log(`-----------pre sleep 3----------`) + } + async execPostHook(ctx){ + console.log(`-----------demo plugin post hook-----------`); + ctx['post'] = 'post-exec'; + console.log(`-----------send post-----------`); + } + get(filedName){ + for(let key in this){ + if(key === filedName){ + return this[key]; + } + } + } +} + +// module.exports = {DemoPlugin}; +exports.DemoPlugin = DemoPlugin; diff --git a/test/data/test_data/async_plugin.ts b/test/data/test_data/async_plugin.ts new file mode 100644 index 00000000..c281a314 --- /dev/null +++ b/test/data/test_data/async_plugin.ts @@ -0,0 +1,111 @@ +import {OpenFunctionContext} from '../../../src/openfunction/function_context'; +import {FrameworkOptions} from '../../../src/options'; + +export const TEST_CONTEXT: OpenFunctionContext = { + name: 'test-context', + version: '1.0.0', + runtime: 'Async', + port: '8080', + inputs: { + cron: { + uri: 'cron_input', + componentName: 'binding-cron', + componentType: 'bindings.cron', + }, + mqtt_binding: { + uri: 'default', + componentName: 'binding-mqtt', + componentType: 'bindings.mqtt', + }, + mqtt_sub: { + uri: 'webup', + componentName: 'pubsub-mqtt', + componentType: 'pubsub.mqtt', + }, + }, + outputs: { + cron: { + uri: 'cron_output', + operation: 'delete', + componentName: 'binding-cron', + componentType: 'bindings.cron', + }, + localfs: { + uri: 'localstorage', + operation: 'create', + componentName: 'binding-localfs', + componentType: 'bindings.localstorage', + metadata: { + fileName: 'output-file.txt', + }, + }, + mqtt_pub: { + uri: 'webup_pub', + componentName: 'pubsub-mqtt', + componentType: 'pubsub.mqtt', + }, + }, +}; +export const TEST_PLUGIN_OPTIONS: FrameworkOptions = { + port: '', + target: '', + sourceLocation: process.cwd() + '/test/data', + signatureType: 'event', + printHelp: false, + context: { + name: 'test-context-plugin', + version: '1.0.0', + runtime: 'Async', + port: '8080', + inputs: { + cron: { + uri: 'cron_input', + componentName: 'binding-cron', + componentType: 'bindings.cron', + }, + mqtt_binding: { + uri: 'default', + componentName: 'binding-mqtt', + componentType: 'bindings.mqtt', + }, + mqtt_sub: { + uri: 'webup', + componentName: 'pubsub-mqtt', + componentType: 'pubsub.mqtt', + }, + }, + outputs: { + cron: { + uri: 'cron_output', + operation: 'delete', + componentName: 'binding-cron', + componentType: 'bindings.cron', + }, + localfs: { + uri: 'localstorage', + operation: 'create', + componentName: 'binding-localfs', + componentType: 'bindings.localstorage', + metadata: { + fileName: 'output-file.txt', + }, + }, + mqtt_pub: { + uri: 'webup_pub', + componentName: 'pubsub-mqtt', + componentType: 'pubsub.mqtt', + }, + }, + prePlugins: ['demo-plugin'], + postPlugins: ['demo-plugin'], + }, +}; +export const TEST_PAYLOAD = {data: 'hello world'}; +export const TEST_CLOUD_EVENT = { + specversion: '1.0', + id: 'test-1234-1234', + type: 'ce.openfunction', + source: 'https://github.com/OpenFunction/functions-framework-nodejs', + traceparent: '00-65088630f09e0a5359677a7429456db7-97f23477fb2bf5ec-01', + data: TEST_PAYLOAD, +}; diff --git a/test/integration/async_server.ts b/test/integration/async_server.ts index 23d5a4c2..e9cddd53 100644 --- a/test/integration/async_server.ts +++ b/test/integration/async_server.ts @@ -6,72 +6,20 @@ import {get, isEmpty} from 'lodash'; import * as shell from 'shelljs'; import * as MQTT from 'aedes'; -import {OpenFunctionContext} from '../../src/openfunction/function_context'; import getAysncServer from '../../src/openfunction/async_server'; -const TEST_CONTEXT: OpenFunctionContext = { - name: 'test-context', - version: '1.0.0', - runtime: 'Async', - port: '8080', - inputs: { - cron: { - uri: 'cron_input', - componentName: 'binding-cron', - componentType: 'bindings.cron', - }, - mqtt_binding: { - uri: 'default', - componentName: 'binding-mqtt', - componentType: 'bindings.mqtt', - }, - mqtt_sub: { - uri: 'webup', - componentName: 'pubsub-mqtt', - componentType: 'pubsub.mqtt', - }, - }, - outputs: { - cron: { - uri: 'cron_output', - operation: 'delete', - componentName: 'binding-cron', - componentType: 'bindings.cron', - }, - localfs: { - uri: 'localstorage', - operation: 'create', - componentName: 'binding-localfs', - componentType: 'bindings.localstorage', - metadata: { - fileName: 'output-file.txt', - }, - }, - mqtt_pub: { - uri: 'webup_pub', - componentName: 'pubsub-mqtt', - componentType: 'pubsub.mqtt', - }, - }, -}; - -const TEST_PAYLOAD = {data: 'hello world'}; -const TEST_CLOUD_EVENT = { - specversion: '1.0', - id: 'test-1234-1234', - type: 'ce.openfunction', - source: 'https://github.com/OpenFunction/functions-framework-nodejs', - traceparent: '00-65088630f09e0a5359677a7429456db7-97f23477fb2bf5ec-01', - data: TEST_PAYLOAD, -}; +import { + TEST_CLOUD_EVENT, + TEST_CONTEXT, + TEST_PAYLOAD, +} from '../data/test_data/async_plugin'; describe('OpenFunction - Async - Binding', () => { const APPID = 'async.dapr'; const broker = MQTT.Server(); - + const server = createServer(broker.handle); before(done => { // Start simple plain MQTT server via aedes - const server = createServer(broker.handle); server.listen(1883, () => { // Try to run Dapr sidecar and listen for the async server shell.exec( @@ -90,7 +38,7 @@ describe('OpenFunction - Async - Binding', () => { shell.exec(`dapr stop ${APPID}`, { silent: true, }); - + server.close(); broker.close(done); }); diff --git a/test/integration/async_server_plugin.ts b/test/integration/async_server_plugin.ts new file mode 100644 index 00000000..c7e228ea --- /dev/null +++ b/test/integration/async_server_plugin.ts @@ -0,0 +1,99 @@ +/* eslint-disable no-restricted-properties */ +import {deepStrictEqual, ifError} from 'assert'; +import {createServer} from 'net'; + +import {get, isEmpty} from 'lodash'; +import * as shell from 'shelljs'; +import * as MQTT from 'aedes'; + +import getAysncServer from '../../src/openfunction/async_server'; +import {getUserPlugins} from '../../src/loader'; +import assert = require('assert'); +import { + TEST_CLOUD_EVENT, + TEST_CONTEXT, + TEST_PAYLOAD, + TEST_PLUGIN_OPTIONS, +} from '../data/test_data/async_plugin'; + +describe('OpenFunction - Async - Binding with plugin', () => { + const APPID = 'async.dapr'; + const broker = MQTT.Server(); + const server = createServer(broker.handle); + + before(done => { + // Start simple plain MQTT server via aedes + server.listen(1883, () => { + // Try to run Dapr sidecar and listen for the async server + shell.exec( + `dapr run -H 3500 -G 50001 -p ${TEST_CONTEXT.port} -d ./test/data/components/async -a ${APPID} --log-level debug`, + { + silent: true, + async: true, + } + ); + done(); + }); + }); + + after(done => { + // Stop dapr sidecar process + shell.exec(`dapr stop ${APPID}`, { + silent: true, + }); + server.close(); + broker.close(done); + }); + + it('mqtt sub w/ pub output with demo plugin', done => { + const app = getAysncServer((ctx, data) => { + if (isEmpty(data)) return; + + const context: any = ctx as any; + assert(context['pre'] === 'pre-exec'); + context['pre'] = 'main-exec'; + + // Assert that user function receives correct data from input binding + deepStrictEqual(data, TEST_PAYLOAD); + console.log(data); + // Then forward received data to output channel + const output = 'mqtt_pub'; + broker.subscribe( + get(TEST_PLUGIN_OPTIONS.context!, `outputs.${output}.uri`), + // eslint-disable-next-line @typescript-eslint/no-unused-vars + (packet, _) => { + const payload = JSON.parse(Buffer.from(packet.payload).toString()); + deepStrictEqual(payload.data, TEST_PAYLOAD); + app + .stop() + .then(() => { + assert(context['pre'] === 'main-exec'); + assert(context['post'] === 'post-exec'); + }) + .finally(done); + }, + () => { + ctx.send(TEST_PAYLOAD, output); + } + ); + }, TEST_PLUGIN_OPTIONS.context!); + + // First, we start the async server + app.start().then(async () => { + await getUserPlugins(TEST_PLUGIN_OPTIONS); + console.log(TEST_PLUGIN_OPTIONS); + // Then, we send a cloudevent format message to server + broker.publish( + { + cmd: 'publish', + topic: TEST_PLUGIN_OPTIONS.context!.inputs!.mqtt_sub.uri!, + payload: JSON.stringify(TEST_CLOUD_EVENT), + qos: 0, + retain: false, + dup: false, + }, + err => ifError(err) + ); + }); + }); +}); diff --git a/test/loader.ts b/test/loader.ts index 07b97d1e..35bc528c 100644 --- a/test/loader.ts +++ b/test/loader.ts @@ -18,6 +18,8 @@ import * as semver from 'semver'; import * as functions from '../src/functions'; import * as loader from '../src/loader'; import * as FunctionRegistry from '../src/function_registry'; +import {FrameworkOptions} from '../src/options'; +import {Plugin} from '../src'; describe('loading function', () => { interface TestData { @@ -131,3 +133,201 @@ describe('loading function', () => { assert.strictEqual(loadedFunction?.signatureType, 'cloudevent'); }); }); + +describe('loading plugins', () => { + interface ExceptData { + prePlugins: Array; + postPlugins: Array; + } + + interface TestData { + options: FrameworkOptions; + except: ExceptData; + } + const testData: TestData[] = [ + { + options: { + port: '8080', + target: 'helloWorld', + sourceLocation: process.cwd() + '/test/data', + signatureType: 'event', + printHelp: false, + context: { + name: 'demo', + version: '', + runtime: 'ASYNC', + prePlugins: ['demo-plugin'], + postPlugins: ['demo-plugin'], + }, + }, + except: { + prePlugins: ['demo-plugin'], + postPlugins: ['demo-plugin'], + }, + }, + { + options: { + port: '8080', + target: 'helloWorld', + sourceLocation: process.cwd() + '/test/data', + signatureType: 'event', + printHelp: false, + context: { + name: 'demo', + version: '', + runtime: 'ASYNC', + prePlugins: ['demo-plugin'], + postPlugins: [], + }, + }, + except: { + prePlugins: ['demo-plugin'], + postPlugins: [], + }, + }, + { + options: { + port: '8080', + target: 'helloWorld', + sourceLocation: process.cwd() + '/test/data', + signatureType: 'event', + printHelp: false, + context: { + name: 'demo', + version: '', + runtime: 'ASYNC', + prePlugins: [], + postPlugins: [], + }, + }, + except: { + prePlugins: [], + postPlugins: [], + }, + }, + { + options: { + port: '8080', + target: 'helloWorld', + sourceLocation: process.cwd() + '/test/data', + signatureType: 'event', + printHelp: false, + context: { + name: 'error', + version: '', + runtime: 'ASYNC', + prePlugins: ['error-plugin'], + postPlugins: ['error-plugin'], + }, + }, + except: { + prePlugins: [], + postPlugins: [], + }, + }, + { + options: { + port: '8080', + target: 'helloWorld', + sourceLocation: process.cwd() + '/test/data', + signatureType: 'event', + printHelp: false, + context: { + name: 'error', + version: '', + runtime: 'ASYNC', + prePlugins: ['error-miss-version-plugin', 'demo-plugin'], + postPlugins: ['error-miss-version-plugin'], + }, + }, + except: { + prePlugins: ['error-miss-version-plugin', 'demo-plugin'], + postPlugins: ['error-miss-version-plugin'], + }, + }, + ]; + + it('load exits plugins', async () => { + for (const test of testData) { + const options = await loader.getUserPlugins(test.options); + const current: ExceptData = { + prePlugins: [], + postPlugins: [], + }; + + options.context!.prePlugins!.forEach(item => { + assert(typeof item === 'object'); + assert(item.get(Plugin.OFN_PLUGIN_VERSION) === 'v1'); + current.prePlugins.push(item.get(Plugin.OFN_PLUGIN_NAME)); + }); + options.context!.postPlugins!.forEach(item => { + assert(typeof item === 'object'); + assert(item.get(Plugin.OFN_PLUGIN_VERSION) === 'v1'); + current.postPlugins.push(item.get(Plugin.OFN_PLUGIN_NAME)); + }); + + assert.deepStrictEqual(current, test.except); + } + }); + + const test: TestData = { + options: { + port: '8080', + target: 'helloWorld', + sourceLocation: process.cwd() + '/test/data', + signatureType: 'event', + printHelp: false, + context: { + name: 'error', + version: '', + runtime: 'ASYNC', + prePlugins: [''], + postPlugins: [''], + }, + }, + except: { + prePlugins: [''], + postPlugins: [''], + }, + }; + + function copyAndSet(name: string): TestData { + const data: TestData = JSON.parse(JSON.stringify(test)); + data.options.context!.prePlugins![0] = name; + data.options.context!.postPlugins![0] = name; + data.except.postPlugins[0] = name; + data.except.prePlugins[0] = name; + return data; + } + + it('user plugin miss all', async () => { + const data = copyAndSet('error-miss-all-plugin'); + const options = await loader.getUserPlugins(data.options); + assert(typeof options.context!.prePlugins![0] === 'object'); + assert( + options.context!.prePlugins![0].get(Plugin.OFN_PLUGIN_NAME) === + 'error-miss-all-plugin' + ); + assert(options.context!.prePlugins![0].execPreHook); + assert(options.context!.prePlugins![0].execPostHook); + }); + + it('load multi plugins ', async () => { + const data: FrameworkOptions = { + port: '8080', + target: 'helloWorld', + sourceLocation: process.cwd() + '/test/data', + signatureType: 'event', + printHelp: false, + context: { + name: 'demo', + version: '', + runtime: 'ASYNC', + prePlugins: ['demo-plugin', 'error-miss-all-plugin'], + postPlugins: ['demo-plugin', 'error-miss-all-plugin'], + }, + }; + assert.ok(await loader.getUserPlugins(data)); + console.log(data); + }); +}); diff --git a/test/options.ts b/test/options.ts index f7cf727a..ab7a5b84 100644 --- a/test/options.ts +++ b/test/options.ts @@ -125,6 +125,78 @@ describe('parseOptions', () => { printHelp: false, }, }, + { + name: 'respects all env vars', + cliOpts: ['bin/node', '/index.js'], + envVars: { + PORT: '1234', + FUNCTION_TARGET: 'helloWorld', + FUNCTION_SIGNATURE_TYPE: 'cloudevent', + FUNCTION_SOURCE: '/source', + FUNC_CONTEXT: + '{ "name": "foo", "version": "1.0.0", "runtime": "Knative" }', + }, + expectedOptions: { + port: '1234', + target: 'helloWorld', + sourceLocation: resolve('/source'), + signatureType: 'cloudevent', + context: {name: 'foo', version: '1.0.0', runtime: 'Knative'}, + printHelp: false, + }, + }, + { + name: 'respects all env vars with empty plugins', + cliOpts: ['bin/node', '/index.js'], + envVars: { + PORT: '1234', + FUNCTION_TARGET: 'helloWorld', + FUNCTION_SIGNATURE_TYPE: 'cloudevent', + FUNCTION_SOURCE: '/source', + FUNC_CONTEXT: + '{ "name": "foo", "version": "1.0.0", "runtime": "Knative", "prePlugins": [] , "postPlugins": []}', + }, + expectedOptions: { + port: '1234', + target: 'helloWorld', + sourceLocation: resolve('/source'), + signatureType: 'cloudevent', + context: { + name: 'foo', + version: '1.0.0', + runtime: 'Knative', + prePlugins: [], + postPlugins: [], + }, + printHelp: false, + }, + }, + { + name: 'respects all env vars with plugins', + cliOpts: ['bin/node', '/index.js'], + envVars: { + PORT: '1234', + FUNCTION_TARGET: 'helloWorld', + FUNCTION_SIGNATURE_TYPE: 'cloudevent', + FUNCTION_SOURCE: '/source', + FUNC_CONTEXT: + '{ "name": "foo", "version": "1.0.0", "runtime": "Knative", "prePlugins": ["test-plugin"] , "postPlugins": ["test-plugin"]}', + }, + expectedOptions: { + port: '1234', + target: 'helloWorld', + sourceLocation: resolve('/source'), + signatureType: 'cloudevent', + context: { + name: 'foo', + version: '1.0.0', + runtime: 'Knative', + prePlugins: ['test-plugin'], + postPlugins: ['test-plugin'], + }, + printHelp: false, + }, + }, ]; testData.forEach(testCase => {