diff --git a/.gitignore b/.gitignore index ac35a78..f2d507b 100644 --- a/.gitignore +++ b/.gitignore @@ -43,3 +43,6 @@ tmp/ # VS code local history .history/ +*.DS_Store + +tsbuild/ diff --git a/README.md b/README.md index f7f92b4..947cbab 100644 --- a/README.md +++ b/README.md @@ -22,7 +22,7 @@ * [Examples](#examples) * [Basic Examples](#basicExamples) * [Basic Examples](#basicExamples) -* [Advanced] (#advanced) +* [Advanced](#advanced) * [Basic Examples](#basicExamples) * [Basic Examples](#basicExamples) * [Contributing](#contributing) diff --git a/examples/routingExample.js b/examples/routingExample.js new file mode 100644 index 0000000..3b2201f --- /dev/null +++ b/examples/routingExample.js @@ -0,0 +1,29 @@ +import { Node } from '../src' + +// znode3 +// /\ +// / \ +// / \ +// znode1 znode2 + +(async function () { + let znode3 = new Node({bind: 'tcp://127.0.0.1:3000'}) + let znode1 = new Node() + let znode2 = new Node() + + await znode3.bind() + await znode1.connect({ address: znode3.getAddress() }) + await znode2.connect({ address: znode3.getAddress() }) + + znode2.onRequest('foo', ({ body, reply }) => { + console.log(body) + reply('reply from znode2.') + }) + + let rep = await znode1.requestAny({ + event: 'foo', + data: 'request from znode1.' + }) + + console.log(rep) +}()) \ No newline at end of file diff --git a/package-lock.json b/package-lock.json index f6a2739..60b80aa 100644 --- a/package-lock.json +++ b/package-lock.json @@ -935,6 +935,11 @@ "@hapi/hoek": "6.x.x" } }, + "@sfast/pattern-emitter-ts": { + "version": "0.0.9", + "resolved": "https://registry.npmjs.org/@sfast/pattern-emitter-ts/-/pattern-emitter-ts-0.0.9.tgz", + "integrity": "sha512-ZIg/k6jDGHzr9SVD4EIUM/+ayMyJsamVZiOYlEZmlFW9J4GEPh7vmt76x7s85cEJ0JALylAUZWhl/M+Y2+Zi7A==" + }, "@snyk/composer-lockfile-parser": { "version": "1.0.2", "resolved": "https://registry.npmjs.org/@snyk/composer-lockfile-parser/-/composer-lockfile-parser-1.0.2.tgz", @@ -3345,25 +3350,29 @@ "dependencies": { "abbrev": { "version": "1.1.1", - "bundled": true, + "resolved": "https://registry.npmjs.org/abbrev/-/abbrev-1.1.1.tgz", + "integrity": "sha512-nne9/IiQ/hzIhY6pdDnbBtz7DjPTKrY00P/zvPSm5pOFkl6xuGrGnXn/VtTNNfNtAfZ9/1RtehkszU9qcTii0Q==", "dev": true, "optional": true }, "ansi-regex": { "version": "2.1.1", - "bundled": true, + "resolved": "https://registry.npmjs.org/ansi-regex/-/ansi-regex-2.1.1.tgz", + "integrity": "sha1-w7M6te42DYbg5ijwRorn7yfWVN8=", "dev": true, "optional": true }, "aproba": { "version": "1.2.0", - "bundled": true, + "resolved": "https://registry.npmjs.org/aproba/-/aproba-1.2.0.tgz", + "integrity": "sha512-Y9J6ZjXtoYh8RnXVCMOU/ttDmk1aBjunq9vO0ta5x85WDQiQfUF9sIPBITdbiiIVcBo03Hi3jMxigBtsddlXRw==", "dev": true, "optional": true }, "are-we-there-yet": { "version": "1.1.5", - "bundled": true, + "resolved": "https://registry.npmjs.org/are-we-there-yet/-/are-we-there-yet-1.1.5.tgz", + "integrity": "sha512-5hYdAkZlcG8tOLujVDTgCT+uPX0VnpAH28gWsLfzpXYm7wP6mp5Q/gYyR7YQ0cKVJcXJnl3j2kpBan13PtQf6w==", "dev": true, "optional": true, "requires": { @@ -3373,13 +3382,15 @@ }, "balanced-match": { "version": "1.0.0", - "bundled": true, + "resolved": "https://registry.npmjs.org/balanced-match/-/balanced-match-1.0.0.tgz", + "integrity": "sha1-ibTRmasr7kneFk6gK4nORi1xt2c=", "dev": true, "optional": true }, "brace-expansion": { "version": "1.1.11", - "bundled": true, + "resolved": "https://registry.npmjs.org/brace-expansion/-/brace-expansion-1.1.11.tgz", + "integrity": "sha512-iCuPHDFgrHX7H2vEI/5xpz07zSHB00TpugqhmYtVmMO6518mCuRMoOYFldEBl0g187ufozdaHgWKcYFb61qGiA==", "dev": true, "optional": true, "requires": { @@ -3389,37 +3400,43 @@ }, "chownr": { "version": "1.1.1", - "bundled": true, + "resolved": "https://registry.npmjs.org/chownr/-/chownr-1.1.1.tgz", + "integrity": "sha512-j38EvO5+LHX84jlo6h4UzmOwi0UgW61WRyPtJz4qaadK5eY3BTS5TY/S1Stc3Uk2lIM6TPevAlULiEJwie860g==", "dev": true, "optional": true }, "code-point-at": { "version": "1.1.0", - "bundled": true, + "resolved": "https://registry.npmjs.org/code-point-at/-/code-point-at-1.1.0.tgz", + "integrity": "sha1-DQcLTQQ6W+ozovGkDi7bPZpMz3c=", "dev": true, "optional": true }, "concat-map": { "version": "0.0.1", - "bundled": true, + "resolved": "https://registry.npmjs.org/concat-map/-/concat-map-0.0.1.tgz", + "integrity": "sha1-2Klr13/Wjfd5OnMDajug1UBdR3s=", "dev": true, "optional": true }, "console-control-strings": { "version": "1.1.0", - "bundled": true, + "resolved": "https://registry.npmjs.org/console-control-strings/-/console-control-strings-1.1.0.tgz", + "integrity": "sha1-PXz0Rk22RG6mRL9LOVB/mFEAjo4=", "dev": true, "optional": true }, "core-util-is": { "version": "1.0.2", - "bundled": true, + "resolved": "https://registry.npmjs.org/core-util-is/-/core-util-is-1.0.2.tgz", + "integrity": "sha1-tf1UIgqivFq1eqtxQMlAdUUDwac=", "dev": true, "optional": true }, "debug": { "version": "4.1.1", - "bundled": true, + "resolved": "https://registry.npmjs.org/debug/-/debug-4.1.1.tgz", + "integrity": "sha512-pYAIzeRo8J6KPEaJ0VWOh5Pzkbw/RetuzehGM7QRRX5he4fPHx2rdKMB256ehJCkX+XRQm16eZLqLNS8RSZXZw==", "dev": true, "optional": true, "requires": { @@ -3428,25 +3445,29 @@ }, "deep-extend": { "version": "0.6.0", - "bundled": true, + "resolved": "https://registry.npmjs.org/deep-extend/-/deep-extend-0.6.0.tgz", + "integrity": "sha512-LOHxIOaPYdHlJRtCQfDIVZtfw/ufM8+rVj649RIHzcm/vGwQRXFt6OPqIFWsm2XEMrNIEtWR64sY1LEKD2vAOA==", "dev": true, "optional": true }, "delegates": { "version": "1.0.0", - "bundled": true, + "resolved": "https://registry.npmjs.org/delegates/-/delegates-1.0.0.tgz", + "integrity": "sha1-hMbhWbgZBP3KWaDvRM2HDTElD5o=", "dev": true, "optional": true }, "detect-libc": { "version": "1.0.3", - "bundled": true, + "resolved": "https://registry.npmjs.org/detect-libc/-/detect-libc-1.0.3.tgz", + "integrity": "sha1-+hN8S9aY7fVc1c0CrFWfkaTEups=", "dev": true, "optional": true }, "fs-minipass": { "version": "1.2.5", - "bundled": true, + "resolved": "https://registry.npmjs.org/fs-minipass/-/fs-minipass-1.2.5.tgz", + "integrity": "sha512-JhBl0skXjUPCFH7x6x61gQxrKyXsxB5gcgePLZCwfyCGGsTISMoIeObbrvVeP6Xmyaudw4TT43qV2Gz+iyd2oQ==", "dev": true, "optional": true, "requires": { @@ -3455,13 +3476,15 @@ }, "fs.realpath": { "version": "1.0.0", - "bundled": true, + "resolved": "https://registry.npmjs.org/fs.realpath/-/fs.realpath-1.0.0.tgz", + "integrity": "sha1-FQStJSMVjKpA20onh8sBQRmU6k8=", "dev": true, "optional": true }, "gauge": { "version": "2.7.4", - "bundled": true, + "resolved": "https://registry.npmjs.org/gauge/-/gauge-2.7.4.tgz", + "integrity": "sha1-LANAXHU4w51+s3sxcCLjJfsBi/c=", "dev": true, "optional": true, "requires": { @@ -3477,7 +3500,8 @@ }, "glob": { "version": "7.1.3", - "bundled": true, + "resolved": "https://registry.npmjs.org/glob/-/glob-7.1.3.tgz", + "integrity": "sha512-vcfuiIxogLV4DlGBHIUOwI0IbrJ8HWPc4MU7HzviGeNho/UJDfi6B5p3sHeWIQ0KGIU0Jpxi5ZHxemQfLkkAwQ==", "dev": true, "optional": true, "requires": { @@ -3491,13 +3515,15 @@ }, "has-unicode": { "version": "2.0.1", - "bundled": true, + "resolved": "https://registry.npmjs.org/has-unicode/-/has-unicode-2.0.1.tgz", + "integrity": "sha1-4Ob+aijPUROIVeCG0Wkedx3iqLk=", "dev": true, "optional": true }, "iconv-lite": { "version": "0.4.24", - "bundled": true, + "resolved": "https://registry.npmjs.org/iconv-lite/-/iconv-lite-0.4.24.tgz", + "integrity": "sha512-v3MXnZAcvnywkTUEZomIActle7RXXeedOR31wwl7VlyoXO4Qi9arvSenNQWne1TcRwhCL1HwLI21bEqdpj8/rA==", "dev": true, "optional": true, "requires": { @@ -3506,7 +3532,8 @@ }, "ignore-walk": { "version": "3.0.1", - "bundled": true, + "resolved": "https://registry.npmjs.org/ignore-walk/-/ignore-walk-3.0.1.tgz", + "integrity": "sha512-DTVlMx3IYPe0/JJcYP7Gxg7ttZZu3IInhuEhbchuqneY9wWe5Ojy2mXLBaQFUQmo0AW2r3qG7m1mg86js+gnlQ==", "dev": true, "optional": true, "requires": { @@ -3515,7 +3542,8 @@ }, "inflight": { "version": "1.0.6", - "bundled": true, + "resolved": "https://registry.npmjs.org/inflight/-/inflight-1.0.6.tgz", + "integrity": "sha1-Sb1jMdfQLQwJvJEKEHW6gWW1bfk=", "dev": true, "optional": true, "requires": { @@ -3525,19 +3553,22 @@ }, "inherits": { "version": "2.0.3", - "bundled": true, + "resolved": "https://registry.npmjs.org/inherits/-/inherits-2.0.3.tgz", + "integrity": "sha1-Yzwsg+PaQqUC9SRmAiSA9CCCYd4=", "dev": true, "optional": true }, "ini": { "version": "1.3.5", - "bundled": true, + "resolved": "https://registry.npmjs.org/ini/-/ini-1.3.5.tgz", + "integrity": "sha512-RZY5huIKCMRWDUqZlEi72f/lmXKMvuszcMBduliQ3nnWbx9X/ZBQO7DijMEYS9EhHBb2qacRUMtC7svLwe0lcw==", "dev": true, "optional": true }, "is-fullwidth-code-point": { "version": "1.0.0", - "bundled": true, + "resolved": "https://registry.npmjs.org/is-fullwidth-code-point/-/is-fullwidth-code-point-1.0.0.tgz", + "integrity": "sha1-754xOG8DGn8NZDr4L95QxFfvAMs=", "dev": true, "optional": true, "requires": { @@ -3546,13 +3577,15 @@ }, "isarray": { "version": "1.0.0", - "bundled": true, + "resolved": "https://registry.npmjs.org/isarray/-/isarray-1.0.0.tgz", + "integrity": "sha1-u5NdSFgsuhaMBoNJV6VKPgcSTxE=", "dev": true, "optional": true }, "minimatch": { "version": "3.0.4", - "bundled": true, + "resolved": "https://registry.npmjs.org/minimatch/-/minimatch-3.0.4.tgz", + "integrity": "sha512-yJHVQEhyqPLUTgt9B83PXu6W3rx4MvvHvSUvToogpwoGDOUQ+yDrR0HRot+yOCdCO7u4hX3pWft6kWBBcqh0UA==", "dev": true, "optional": true, "requires": { @@ -3561,13 +3594,15 @@ }, "minimist": { "version": "0.0.8", - "bundled": true, + "resolved": "https://registry.npmjs.org/minimist/-/minimist-0.0.8.tgz", + "integrity": "sha1-hX/Kv8M5fSYluCKCYuhqp6ARsF0=", "dev": true, "optional": true }, "minipass": { "version": "2.3.5", - "bundled": true, + "resolved": "https://registry.npmjs.org/minipass/-/minipass-2.3.5.tgz", + "integrity": "sha512-Gi1W4k059gyRbyVUZQ4mEqLm0YIUiGYfvxhF6SIlk3ui1WVxMTGfGdQ2SInh3PDrRTVvPKgULkpJtT4RH10+VA==", "dev": true, "optional": true, "requires": { @@ -3577,7 +3612,8 @@ }, "minizlib": { "version": "1.2.1", - "bundled": true, + "resolved": "https://registry.npmjs.org/minizlib/-/minizlib-1.2.1.tgz", + "integrity": "sha512-7+4oTUOWKg7AuL3vloEWekXY2/D20cevzsrNT2kGWm+39J9hGTCBv8VI5Pm5lXZ/o3/mdR4f8rflAPhnQb8mPA==", "dev": true, "optional": true, "requires": { @@ -3586,7 +3622,8 @@ }, "mkdirp": { "version": "0.5.1", - "bundled": true, + "resolved": "https://registry.npmjs.org/mkdirp/-/mkdirp-0.5.1.tgz", + "integrity": "sha1-MAV0OOrGz3+MR2fzhkjWaX11yQM=", "dev": true, "optional": true, "requires": { @@ -3595,13 +3632,15 @@ }, "ms": { "version": "2.1.1", - "bundled": true, + "resolved": "https://registry.npmjs.org/ms/-/ms-2.1.1.tgz", + "integrity": "sha512-tgp+dl5cGk28utYktBsrFqA7HKgrhgPsg6Z/EfhWI4gl1Hwq8B/GmY/0oXZ6nF8hDVesS/FpnYaD/kOWhYQvyg==", "dev": true, "optional": true }, "needle": { "version": "2.3.0", - "bundled": true, + "resolved": "https://registry.npmjs.org/needle/-/needle-2.3.0.tgz", + "integrity": "sha512-QBZu7aAFR0522EyaXZM0FZ9GLpq6lvQ3uq8gteiDUp7wKdy0lSd2hPlgFwVuW1CBkfEs9PfDQsQzZghLs/psdg==", "dev": true, "optional": true, "requires": { @@ -3612,7 +3651,8 @@ }, "node-pre-gyp": { "version": "0.12.0", - "bundled": true, + "resolved": "https://registry.npmjs.org/node-pre-gyp/-/node-pre-gyp-0.12.0.tgz", + "integrity": "sha512-4KghwV8vH5k+g2ylT+sLTjy5wmUOb9vPhnM8NHvRf9dHmnW/CndrFXy2aRPaPST6dugXSdHXfeaHQm77PIz/1A==", "dev": true, "optional": true, "requires": { @@ -3630,7 +3670,8 @@ }, "nopt": { "version": "4.0.1", - "bundled": true, + "resolved": "https://registry.npmjs.org/nopt/-/nopt-4.0.1.tgz", + "integrity": "sha1-0NRoWv1UFRk8jHUFYC0NF81kR00=", "dev": true, "optional": true, "requires": { @@ -3640,13 +3681,15 @@ }, "npm-bundled": { "version": "1.0.6", - "bundled": true, + "resolved": "https://registry.npmjs.org/npm-bundled/-/npm-bundled-1.0.6.tgz", + "integrity": "sha512-8/JCaftHwbd//k6y2rEWp6k1wxVfpFzB6t1p825+cUb7Ym2XQfhwIC5KwhrvzZRJu+LtDE585zVaS32+CGtf0g==", "dev": true, "optional": true }, "npm-packlist": { "version": "1.4.1", - "bundled": true, + "resolved": "https://registry.npmjs.org/npm-packlist/-/npm-packlist-1.4.1.tgz", + "integrity": "sha512-+TcdO7HJJ8peiiYhvPxsEDhF3PJFGUGRcFsGve3vxvxdcpO2Z4Z7rkosRM0kWj6LfbK/P0gu3dzk5RU1ffvFcw==", "dev": true, "optional": true, "requires": { @@ -3656,7 +3699,8 @@ }, "npmlog": { "version": "4.1.2", - "bundled": true, + "resolved": "https://registry.npmjs.org/npmlog/-/npmlog-4.1.2.tgz", + "integrity": "sha512-2uUqazuKlTaSI/dC8AzicUck7+IrEaOnN/e0jd3Xtt1KcGpwx30v50mL7oPyr/h9bL3E4aZccVwpwP+5W9Vjkg==", "dev": true, "optional": true, "requires": { @@ -3668,19 +3712,22 @@ }, "number-is-nan": { "version": "1.0.1", - "bundled": true, + "resolved": "https://registry.npmjs.org/number-is-nan/-/number-is-nan-1.0.1.tgz", + "integrity": "sha1-CXtgK1NCKlIsGvuHkDGDNpQaAR0=", "dev": true, "optional": true }, "object-assign": { "version": "4.1.1", - "bundled": true, + "resolved": "https://registry.npmjs.org/object-assign/-/object-assign-4.1.1.tgz", + "integrity": "sha1-IQmtx5ZYh8/AXLvUQsrIv7s2CGM=", "dev": true, "optional": true }, "once": { "version": "1.4.0", - "bundled": true, + "resolved": "https://registry.npmjs.org/once/-/once-1.4.0.tgz", + "integrity": "sha1-WDsap3WWHUsROsF9nFC6753Xa9E=", "dev": true, "optional": true, "requires": { @@ -3689,19 +3736,22 @@ }, "os-homedir": { "version": "1.0.2", - "bundled": true, + "resolved": "https://registry.npmjs.org/os-homedir/-/os-homedir-1.0.2.tgz", + "integrity": "sha1-/7xJiDNuDoM94MFox+8VISGqf7M=", "dev": true, "optional": true }, "os-tmpdir": { "version": "1.0.2", - "bundled": true, + "resolved": "https://registry.npmjs.org/os-tmpdir/-/os-tmpdir-1.0.2.tgz", + "integrity": "sha1-u+Z0BseaqFxc/sdm/lc0VV36EnQ=", "dev": true, "optional": true }, "osenv": { "version": "0.1.5", - "bundled": true, + "resolved": "https://registry.npmjs.org/osenv/-/osenv-0.1.5.tgz", + "integrity": "sha512-0CWcCECdMVc2Rw3U5w9ZjqX6ga6ubk1xDVKxtBQPK7wis/0F2r9T6k4ydGYhecl7YUBxBVxhL5oisPsNxAPe2g==", "dev": true, "optional": true, "requires": { @@ -3711,19 +3761,22 @@ }, "path-is-absolute": { "version": "1.0.1", - "bundled": true, + "resolved": "https://registry.npmjs.org/path-is-absolute/-/path-is-absolute-1.0.1.tgz", + "integrity": "sha1-F0uSaHNVNP+8es5r9TpanhtcX18=", "dev": true, "optional": true }, "process-nextick-args": { "version": "2.0.0", - "bundled": true, + "resolved": "https://registry.npmjs.org/process-nextick-args/-/process-nextick-args-2.0.0.tgz", + "integrity": "sha512-MtEC1TqN0EU5nephaJ4rAtThHtC86dNN9qCuEhtshvpVBkAW5ZO7BASN9REnF9eoXGcRub+pFuKEpOHE+HbEMw==", "dev": true, "optional": true }, "rc": { "version": "1.2.8", - "bundled": true, + "resolved": "https://registry.npmjs.org/rc/-/rc-1.2.8.tgz", + "integrity": "sha512-y3bGgqKj3QBdxLbLkomlohkvsA8gdAiUQlSBJnBhfn+BPxg4bc62d8TcBW15wavDfgexCgccckhcZvywyQYPOw==", "dev": true, "optional": true, "requires": { @@ -3735,7 +3788,8 @@ "dependencies": { "minimist": { "version": "1.2.0", - "bundled": true, + "resolved": "https://registry.npmjs.org/minimist/-/minimist-1.2.0.tgz", + "integrity": "sha1-o1AIsg9BOD7sH7kU9M1d95omQoQ=", "dev": true, "optional": true } @@ -3743,7 +3797,8 @@ }, "readable-stream": { "version": "2.3.6", - "bundled": true, + "resolved": "https://registry.npmjs.org/readable-stream/-/readable-stream-2.3.6.tgz", + "integrity": "sha512-tQtKA9WIAhBF3+VLAseyMqZeBjW0AHJoxOtYqSUZNJxauErmLbVm2FW1y+J/YA9dUrAC39ITejlZWhVIwawkKw==", "dev": true, "optional": true, "requires": { @@ -3758,7 +3813,8 @@ }, "rimraf": { "version": "2.6.3", - "bundled": true, + "resolved": "https://registry.npmjs.org/rimraf/-/rimraf-2.6.3.tgz", + "integrity": "sha512-mwqeW5XsA2qAejG46gYdENaxXjx9onRNCfn7L0duuP4hCuTIi/QO7PDK07KJfp1d+izWPrzEJDcSqBa0OZQriA==", "dev": true, "optional": true, "requires": { @@ -3767,43 +3823,50 @@ }, "safe-buffer": { "version": "5.1.2", - "bundled": true, + "resolved": "https://registry.npmjs.org/safe-buffer/-/safe-buffer-5.1.2.tgz", + "integrity": "sha512-Gd2UZBJDkXlY7GbJxfsE8/nvKkUEU1G38c1siN6QP6a9PT9MmHB8GnpscSmMJSoF8LOIrt8ud/wPtojys4G6+g==", "dev": true, "optional": true }, "safer-buffer": { "version": "2.1.2", - "bundled": true, + "resolved": "https://registry.npmjs.org/safer-buffer/-/safer-buffer-2.1.2.tgz", + "integrity": "sha512-YZo3K82SD7Riyi0E1EQPojLz7kpepnSQI9IyPbHHg1XXXevb5dJI7tpyN2ADxGcQbHG7vcyRHk0cbwqcQriUtg==", "dev": true, "optional": true }, "sax": { "version": "1.2.4", - "bundled": true, + "resolved": "https://registry.npmjs.org/sax/-/sax-1.2.4.tgz", + "integrity": "sha512-NqVDv9TpANUjFm0N8uM5GxL36UgKi9/atZw+x7YFnQ8ckwFGKrl4xX4yWtrey3UJm5nP1kUbnYgLopqWNSRhWw==", "dev": true, "optional": true }, "semver": { "version": "5.7.0", - "bundled": true, + "resolved": "https://registry.npmjs.org/semver/-/semver-5.7.0.tgz", + "integrity": "sha512-Ya52jSX2u7QKghxeoFGpLwCtGlt7j0oY9DYb5apt9nPlJ42ID+ulTXESnt/qAQcoSERyZ5sl3LDIOw0nAn/5DA==", "dev": true, "optional": true }, "set-blocking": { "version": "2.0.0", - "bundled": true, + "resolved": "https://registry.npmjs.org/set-blocking/-/set-blocking-2.0.0.tgz", + "integrity": "sha1-BF+XgtARrppoA93TgrJDkrPYkPc=", "dev": true, "optional": true }, "signal-exit": { "version": "3.0.2", - "bundled": true, + "resolved": "https://registry.npmjs.org/signal-exit/-/signal-exit-3.0.2.tgz", + "integrity": "sha1-tf3AjxKH6hF4Yo5BXiUTK3NkbG0=", "dev": true, "optional": true }, "string-width": { "version": "1.0.2", - "bundled": true, + "resolved": "https://registry.npmjs.org/string-width/-/string-width-1.0.2.tgz", + "integrity": "sha1-EYvfW4zcUaKn5w0hHgfisLmxB9M=", "dev": true, "optional": true, "requires": { @@ -3814,7 +3877,8 @@ }, "string_decoder": { "version": "1.1.1", - "bundled": true, + "resolved": "https://registry.npmjs.org/string_decoder/-/string_decoder-1.1.1.tgz", + "integrity": "sha512-n/ShnvDi6FHbbVfviro+WojiFzv+s8MPMHBczVePfUpDJLwoLT0ht1l4YwBCbi8pJAveEEdnkHyPyTP/mzRfwg==", "dev": true, "optional": true, "requires": { @@ -3823,7 +3887,8 @@ }, "strip-ansi": { "version": "3.0.1", - "bundled": true, + "resolved": "https://registry.npmjs.org/strip-ansi/-/strip-ansi-3.0.1.tgz", + "integrity": "sha1-ajhfuIU9lS1f8F0Oiq+UJ43GPc8=", "dev": true, "optional": true, "requires": { @@ -3832,13 +3897,15 @@ }, "strip-json-comments": { "version": "2.0.1", - "bundled": true, + "resolved": "https://registry.npmjs.org/strip-json-comments/-/strip-json-comments-2.0.1.tgz", + "integrity": "sha1-PFMZQukIwml8DsNEhYwobHygpgo=", "dev": true, "optional": true }, "tar": { "version": "4.4.8", - "bundled": true, + "resolved": "https://registry.npmjs.org/tar/-/tar-4.4.8.tgz", + "integrity": "sha512-LzHF64s5chPQQS0IYBn9IN5h3i98c12bo4NCO7e0sGM2llXQ3p2FGC5sdENN4cTW48O915Sh+x+EXx7XW96xYQ==", "dev": true, "optional": true, "requires": { @@ -3853,13 +3920,15 @@ }, "util-deprecate": { "version": "1.0.2", - "bundled": true, + "resolved": "https://registry.npmjs.org/util-deprecate/-/util-deprecate-1.0.2.tgz", + "integrity": "sha1-RQ1Nyfpw3nMnYvvS1KKJgUGaDM8=", "dev": true, "optional": true }, "wide-align": { "version": "1.1.3", - "bundled": true, + "resolved": "https://registry.npmjs.org/wide-align/-/wide-align-1.1.3.tgz", + "integrity": "sha512-QGkOQc8XL6Bt5PwnsExKBPuMKBxnGxWWW3fU55Xt4feHozMUhdUMaBCk290qpm/wG5u/RSKzwdAC4i51YigihA==", "dev": true, "optional": true, "requires": { @@ -3868,13 +3937,15 @@ }, "wrappy": { "version": "1.0.2", - "bundled": true, + "resolved": "https://registry.npmjs.org/wrappy/-/wrappy-1.0.2.tgz", + "integrity": "sha1-tSQ9jz7BqjXxNkYFvA0QNuMKtp8=", "dev": true, "optional": true }, "yallist": { "version": "3.0.3", - "bundled": true, + "resolved": "https://registry.npmjs.org/yallist/-/yallist-3.0.3.tgz", + "integrity": "sha512-S+Zk8DEWE6oKpV+vI3qWkaK+jSbIK86pCwe2IF/xwIpQ8jEuxpw9NyaGjmp9+BoJv5FV2piqCDcoCtStppiq2A==", "dev": true, "optional": true } diff --git a/package.json b/package.json index c6c0fe0..1de4794 100644 --- a/package.json +++ b/package.json @@ -49,6 +49,7 @@ "homepage": "https://github.com/sfast/zeronode#readme", "dependencies": { "@babel/runtime": "^7.4.5", + "@sfast/pattern-emitter-ts": "^0.0.9", "animal-id": "0.0.1", "bluebird": "^3.5.5", "buffer-alloc": "^1.2.0", diff --git a/src/client.js b/src/client.js index f7405c3..25ffba6 100644 --- a/src/client.js +++ b/src/client.js @@ -102,7 +102,7 @@ export default class Client extends DealerSocket { request ({ event, data, timeout, mainEvent } = {}) { let server = this.getServerActor() - + // this is first request, and there is no need to check if server online or not if (mainEvent && event === events.CLIENT_CONNECTED) { return super.request({ event, data, timeout, mainEvent }) @@ -113,6 +113,7 @@ export default class Client extends DealerSocket { return Promise.reject(new ZeronodeError({ socketId: this.getId(), error: serverOfflineError, code: ErrorCodes.SERVER_IS_OFFLINE })) } + // console.log("AVAR::REQUEST server", server); return super.request({ event, data, timeout, to: server.getId(), mainEvent }) } @@ -124,6 +125,7 @@ export default class Client extends DealerSocket { return Promise.reject(new ZeronodeError({ socketId: this.getId(), error: serverOfflineError, code: ErrorCodes.SERVER_IS_OFFLINE })) } + // console.log("AVAR::TICK server", server); super.tick({ event, data, to: server.getId(), mainEvent }) } } diff --git a/src/metric.js b/src/metric.js index 44c1ed6..1f07d47 100644 --- a/src/metric.js +++ b/src/metric.js @@ -326,7 +326,7 @@ export default class Metric { if (!request) return request.success = true - request.duration = envelop.data.duration + request.duration = envelop.context.duration request.size.push(envelop.size) sendRequestCollection.update(request) @@ -344,7 +344,7 @@ export default class Metric { if (!request) return request.error = true - request.duration = envelop.data.duration + request.duration = envelop.context.duration request.size.push(envelop.size) sendRequestCollection.update(request) } diff --git a/src/node.js b/src/node.js index 8b965ab..285442d 100644 --- a/src/node.js +++ b/src/node.js @@ -6,7 +6,8 @@ import _ from 'underscore' import Promise from 'bluebird' import md5 from 'md5' import animal from 'animal-id' -import { EventEmitter } from 'events' +// import { EventEmitter } from 'events' +import { PatternEmitter } from "@sfast/pattern-emitter-ts"; import { ZeronodeError, ErrorCodes } from './errors' import NodeUtils from './utils' @@ -14,7 +15,7 @@ import Server from './server' import Client from './client' import Metric from './metric' import { events } from './enum' -import { Enum, Watchers } from './sockets' +import { Enum } from './sockets' let MetricType = Enum.MetricType @@ -26,7 +27,7 @@ let defaultLogger = winston.createLogger({ ] }) -export default class Node extends EventEmitter { +export default class Node extends PatternEmitter { constructor ({ id, bind, options, config } = {}) { super() @@ -55,8 +56,12 @@ export default class Node extends EventEmitter { nodeServer: null, nodeClients: new Map(), nodeClientsAddressIndex: new Map(), - tickWatcherMap: new Map(), - requestWatcherMap: new Map() + + // tickWatcherMap: new Map(), + // requestWatcherMap: new Map(), + + tickPEmitter: new PatternEmitter(), + requestPEmitter: new PatternEmitter(), } _private.set(this, _scope) @@ -273,15 +278,18 @@ export default class Node extends EventEmitter { onRequest (requestEvent, fn) { let _scope = _private.get(this) - let { requestWatcherMap, nodeClients, nodeServer } = _scope + // let { requestWatcherMap, nodeClients, nodeServer } = _scope + let { requestPEmitter, nodeClients, nodeServer } = _scope - let requestWatcher = requestWatcherMap.get(requestEvent) - if (!requestWatcher) { - requestWatcher = new Watchers(requestEvent) - requestWatcherMap.set(requestEvent, requestWatcher) - } + // let requestWatcher = requestWatcherMap.get(requestEvent) + // if (!requestWatcher) { + // requestWatcher = new Watchers(requestEvent) + // requestWatcherMap.set(requestEvent, requestWatcher) + // } - requestWatcher.addFn(fn) + // requestWatcher.addFn(fn) + + requestPEmitter.on(requestEvent, fn); nodeServer.onRequest(requestEvent, fn) @@ -298,23 +306,33 @@ export default class Node extends EventEmitter { client.offRequest(requestEvent, fn) }) - let requestWatcher = _scope.requestWatcherMap.get(requestEvent) - if (requestWatcher) { - requestWatcher.removeFn(fn) + if(!fn) { + _scope.requestPEmitter.removeAllListeners(requestEvent) + } + else { + _scope.requestPEmitter.off(requestEvent, fn) } + + // let requestWatcher = _scope.requestWatcherMap.get(requestEvent) + // if (requestWatcher) { + // requestWatcher.removeFn(fn) + // } } onTick (event, fn) { let _scope = _private.get(this) - let { tickWatcherMap, nodeClients, nodeServer } = _scope + // let { tickWatcherMap, nodeClients, nodeServer } = _scope + let { tickPEmitter, nodeClients, nodeServer } = _scope - let tickWatcher = tickWatcherMap.get(event) - if (!tickWatcher) { - tickWatcher = new Watchers(event) - tickWatcherMap.set(event, tickWatcher) - } + // ** AVAR::commented + // let tickWatcher = tickWatcherMap.get(event) + // if (!tickWatcher) { + // tickWatcher = new Watchers(event) + // tickWatcherMap.set(event, tickWatcher) + // } - tickWatcher.addFn(fn) + // tickWatcher.addFn(fn) + tickPEmitter.on(event, fn); // ** _scope.nodeServer is constructed in Node constructor nodeServer.onTick(event, fn) @@ -326,15 +344,22 @@ export default class Node extends EventEmitter { offTick (event, fn) { let _scope = _private.get(this) - _scope.nodeServer.offTick(event) + _scope.nodeServer.offTick(event, fn) _scope.nodeClients.forEach((client) => { client.offTick(event, fn) }, this) - let tickWatcher = _scope.tickWatcherMap.get(event) - if (tickWatcher) { - tickWatcher.removeFn(fn) + if(!fn) { + _scope.tickPEmitter.removeAllListeners(event) + } + else { + _scope.tickPEmitter.off(event, fn) } + + // let tickWatcher = _scope.tickWatcherMap.get(event) + // if (tickWatcher) { + // tickWatcher.removeFn(fn) + // } } async request ({ to, event, data, timeout } = {}) { @@ -549,34 +574,61 @@ function _addExistingListenersToClient (client) { let _scope = _private.get(this) // ** adding previously added onTick-s for this client to - _scope.tickWatcherMap.forEach((tickWatcher, event) => { - // ** TODO what about order of functions ? - tickWatcher.getFnMap().forEach((index, fn) => { - client.onTick(event, this::fn) - }, this) - }, this) + _scope.tickPEmitter.listeners.forEach((eventArg, fnMap) => { + fnMap.forEach((fn) => { + client.onTick(event, this::fn) + }); + }, this); + + // ** adding previously added onTick-s for this client to + // _scope.tickWatcherMap.forEach((tickWatcher, event) => { + // // ** TODO what about order of functions ? + // tickWatcher.getFnMap().forEach((index, fn) => { + // client.onTick(event, this::fn) + // }, this) + // }, this) + + _scope.requestPEmitter.listeners.forEach((eventArg, fnMap) => { + fnMap.forEach((fn) => { + client.onRequest(event, this::fn) + }); +}, this); + // ** adding previously added onRequests-s for this client to - _scope.requestWatcherMap.forEach((requestWatcher, requestEvent) => { - // ** TODO what about order of functions ? - requestWatcher.getFnMap().forEach((index, fn) => { - client.onRequest(requestEvent, this::fn) - }, this) - }, this) + + // ** adding previously added onRequests-s for this client to + // _scope.requestWatcherMap.forEach((requestWatcher, requestEvent) => { + // // ** TODO what about order of functions ? + // requestWatcher.getFnMap().forEach((index, fn) => { + // client.onRequest(requestEvent, this::fn) + // }, this) + // }, this) } function _removeClientAllListeners (client) { let _scope = _private.get(this) // ** removing all handlers - _scope.tickWatcherMap.forEach((tickWatcher, event) => { - client.offTick(event) - }, this) + // _scope.tickWatcherMap.forEach((tickWatcher, event) => { + // client.offTick(event) + // }, this) + + // ** removing all handlers + _scope.tickPEmitter.listeners.forEach((eventArg, fnMap) => { + client.offTick(eventArg) + }); // ** removing all handlers - _scope.requestWatcherMap.forEach((requestWatcher, requestEvent) => { - client.offRequest(requestEvent) - }, this) + // _scope.requestWatcherMap.forEach((requestWatcher, requestEvent) => { + // client.offRequest(requestEvent) + // }, this) + + // ** removing all handlers + _scope.tickPEmitter.listeners.forEach((eventArg, fnMap) => { + client.offTick(eventArg) + }); + } function _attachMetricsHandlers (socket, metric) { diff --git a/src/sockets/dealer.js b/src/sockets/dealer.js index c1d2c56..ea39218 100644 --- a/src/sockets/dealer.js +++ b/src/sockets/dealer.js @@ -187,7 +187,7 @@ export default class DealerSocket extends Socket { socket.close() } - getSocketMsg (envelop) { - return envelop.getBuffer() + getSocketMsg (buffer) { + return buffer } } diff --git a/src/sockets/enum.js b/src/sockets/enum.js index df40813..fe7eec6 100644 --- a/src/sockets/enum.js +++ b/src/sockets/enum.js @@ -6,7 +6,8 @@ let EnvelopType = { TICK: 1, REQUEST: 2, RESPONSE: 3, - ERROR: 4 + ERROR: 4, + ROUTE: 5 } let MetricType = { diff --git a/src/sockets/envelope.js b/src/sockets/envelope.js index c4105b6..0ca1848 100644 --- a/src/sockets/envelope.js +++ b/src/sockets/envelope.js @@ -24,10 +24,10 @@ class Parse { } } -const lengthSize = 1 +const lengthSize = 2 export default class Envelop { - constructor ({ type, id = '', tag = '', data, owner = '', recipient = '', mainEvent }) { + constructor ({ type, id = '', tag = '', data, owner = '', recipient = '', mainEvent, context }) { if (type) { this.setType(type) } @@ -35,6 +35,7 @@ export default class Envelop { this.id = id || crypto.randomBytes(20).toString('hex') this.tag = tag this.mainEvent = mainEvent + this.context = context || {} if (data) { this.data = data @@ -49,6 +50,7 @@ export default class Envelop { type: this.type, id: this.id, tag: this.tag, + context: this.context, data: this.data, owner: this.owner, recipient: this.recipient, @@ -56,6 +58,18 @@ export default class Envelop { } } + toMetaJSON () { + return { + type: this.type, + id: this.id, + tag: this.tag, + context: this.context, + owner: this.owner, + recipient: this.recipient, + mainEvent: this.mainEvent + } + } + /** * * @param buffer @@ -69,7 +83,9 @@ export default class Envelop { * recipientLength: 4, * recipient: recipientLength, * tagLength: 4, - * tag: tagLength + * tag: tagLength, + * contextLength: 4, + * context: contextLength * @return {{mainEvent: boolean, type, id: string, owner: string, recipient: string, tag: string}} */ static readMetaFromBuffer (buffer) { @@ -78,22 +94,37 @@ export default class Envelop { let type = buffer.readInt8(1) let idStart = 2 + lengthSize - let idLength = buffer.readInt8(idStart - lengthSize) + let idLength = buffer.readUInt16BE(idStart - lengthSize) let id = buffer.slice(idStart, idStart + idLength).toString('hex') let ownerStart = lengthSize + idStart + idLength - let ownerLength = buffer.readInt8(ownerStart - lengthSize) + let ownerLength = buffer.readUInt16BE(ownerStart - lengthSize) let owner = buffer.slice(ownerStart, ownerStart + ownerLength).toString('utf8').replace(/\0/g, '') let recipientStart = lengthSize + ownerStart + ownerLength - let recipientLength = buffer.readInt8(recipientStart - lengthSize) + let recipientLength = buffer.readUInt16BE(recipientStart - lengthSize) let recipient = buffer.slice(recipientStart, recipientStart + recipientLength).toString('utf8').replace(/\0/g, '') let tagStart = lengthSize + recipientStart + recipientLength - let tagLength = buffer.readInt8(tagStart - lengthSize) + let tagLength = buffer.readUInt16BE(tagStart - lengthSize) let tag = buffer.slice(tagStart, tagStart + tagLength).toString('utf8').replace(/\0/g, '') - return { mainEvent, type, id, owner, recipient, tag } + // ** parsing context + let contextStart = lengthSize + tagStart + tagLength + let contextLength = buffer.readUInt16BE(contextStart - lengthSize) + + let context = {} + try { + context = JSON.parse(buffer.slice(contextStart, contextStart + contextLength).toString('utf8').replace(/\0/g, '')) + } catch (err) { + console.log("2222222AVAR::readMetaFromBuffer ERRORORORORO") + // if its not parsable than assign an empty object + context = {} + } + + let dataSize = Envelop.getDataBufferSize(buffer) + + return { mainEvent, type, id, owner, recipient, tag, context, size: dataSize } } static readDataFromBuffer (buffer) { @@ -111,22 +142,28 @@ export default class Envelop { return null } + static getDataBufferSize(buffer) { + let metaLength = Envelop.getMetaLength(buffer) + return buffer.length - metaLength; + } + static fromBuffer (buffer) { - let { id, type, owner, recipient, tag, mainEvent } = Envelop.readMetaFromBuffer(buffer) - let envelop = new Envelop({ type, id, tag, owner, recipient, mainEvent }) + let { id, type, owner, recipient, tag, mainEvent, context } = Envelop.readMetaFromBuffer(buffer) + let envelop = new Envelop({ type, id, tag, owner, recipient, mainEvent, context }) let envelopData = Envelop.readDataFromBuffer(buffer) if (envelopData) { envelop.setData(envelopData) } + envelop.size = buffer.length return envelop } static stringToBuffer (str, encryption) { let strLength = Buffer.byteLength(str, encryption) let lengthBuffer = BufferAlloc(lengthSize) - lengthBuffer.writeInt8(strLength) + lengthBuffer.writeUInt16BE(strLength) let strBuffer = BufferAlloc(strLength) strBuffer.write(str, 0, strLength, encryption) return Buffer.concat([lengthBuffer, strBuffer]) @@ -135,8 +172,8 @@ export default class Envelop { static getMetaLength (buffer) { let length = 2 - _.each(_.range(4), () => { - length += lengthSize + buffer.readInt8(length) + _.each(_.range(5), () => { + length += lengthSize + buffer.readUInt16BE(length) }) return length @@ -165,6 +202,9 @@ export default class Envelop { let tagBuffer = Envelop.stringToBuffer(this.tag.toString(), 'utf-8') bufferArray.push(tagBuffer) + let contextBuffer = Envelop.stringToBuffer(JSON.stringify(this.context || {})) + bufferArray.push(contextBuffer) + if (this.data) { bufferArray.push(Parse.dataToBuffer(this.data)) } @@ -206,6 +246,14 @@ export default class Envelop { this.type = type } + getContext() { + return this.context + } + + setContext(context = {}) { + this.context = { ...this.context, ...context } + } + // ** data of envelop getData (data) { diff --git a/src/sockets/index.js b/src/sockets/index.js index ef6bf17..9fe3558 100644 --- a/src/sockets/index.js +++ b/src/sockets/index.js @@ -2,4 +2,3 @@ export { default as Router } from './router' export { default as Dealer } from './dealer' export { default as SocketEvent } from './events' export { default as Enum } from './enum' -export { default as Watchers } from './watchers' diff --git a/src/sockets/router.js b/src/sockets/router.js index f618e3a..07aacea 100644 --- a/src/sockets/router.js +++ b/src/sockets/router.js @@ -120,7 +120,7 @@ export default class RouterSocket extends Socket { return super.tick(envelop) } - getSocketMsg (envelop) { - return [envelop.getRecipient(), '', envelop.getBuffer()] + getSocketMsg (buffer, recipient) { + return [recipient, '', buffer] } } diff --git a/src/sockets/socket.js b/src/sockets/socket.js index ba6dede..ac8b862 100644 --- a/src/sockets/socket.js +++ b/src/sockets/socket.js @@ -1,13 +1,12 @@ import _ from 'underscore' import animal from 'animal-id' -import EventEmitter from 'pattern-emitter' +import { PatternEmitter } from '@sfast/pattern-emitter-ts' import { ZeronodeError, ErrorCodes } from '../errors' import SocketEvent from './events' import Envelop from './envelope' import { EnvelopType, MetricType, Timeouts } from './enum' -import Watchers from './watchers' let _private = new WeakMap() @@ -27,6 +26,7 @@ const nop = () => {} * * @param envelop: Object * @param type: Enum(-1 = timeout, 0 = send, 1 = got) + * @todo envelop arg should be replaced with envelopeMeta which can contain also metrics part */ function emitMetric (envelop, type = 0) { let event = '' @@ -51,6 +51,8 @@ function emitMetric (envelop, type = 0) { event = !type ? MetricType.SEND_REPLY_ERROR : MetricType.GOT_REPLY_ERROR } + // ** emitting metric event and envelop meta + // TODO should be enhanced to contain envelop.data.duration and envelop.size of data this.emit(event, envelop) } @@ -65,7 +67,7 @@ function buildSocketEventHandler (eventName) { return this::handler } -class Socket extends EventEmitter { +class Socket extends PatternEmitter { static generateSocketId () { return animal.getId() } @@ -91,13 +93,17 @@ class Socket extends EventEmitter { isDebugMode: false, monitorRestartInterval: null, requests: new Map(), - requestWatcherMap: { - main: new Map(), - custom: new Map() + // requestWatcherMap: { + // main: new PatternEmitter(), + // custom: new PatternEmitter() + // }, + requestEmitter: { + main: new PatternEmitter(), + custom: new PatternEmitter() }, tickEmitter: { - main: new EventEmitter(), - custom: new EventEmitter() + main: new PatternEmitter(), + custom: new PatternEmitter() } } @@ -153,7 +159,7 @@ class Socket extends EventEmitter { this.logger = logger || console } - debugMode (val) { + debugMode(val) { let _scope = _private.get(this) if (val) { _scope.isDebugMode = !!val @@ -162,7 +168,7 @@ class Socket extends EventEmitter { } } - request (envelop, reqTimeout) { + request(envelop, reqTimeout) { let { id, requests, metric, config } = _private.get(this) reqTimeout = reqTimeout || config.REQUEST_TIMEOUT || Timeouts.REQUEST_TIMEOUT @@ -187,7 +193,8 @@ class Socket extends EventEmitter { }, reqTimeout) requests.set(envelopId, { resolve: resolve, reject: reject, timeout: timeout, sendTime: process.hrtime() }) - this.sendEnvelop(envelop) + let envelopBuffer = envelop.getBuffer() + this.sendEnvelop(envelopBuffer) }) } @@ -198,13 +205,16 @@ class Socket extends EventEmitter { throw new ZeronodeError({ socketId, error: socketNotOnlineError, code: ErrorCodes.SOCKET_ISNOT_ONLINE }) } - this.sendEnvelop(envelop) + // console.log("Tick:: socketId=", socketId, envelop.toMetaJSON()); + let envelopBuffer = envelop.getBuffer() + this.sendEnvelop(envelopBuffer) } - sendEnvelop (envelop) { + sendEnvelop (envelopBuffer) { let { socket, metric } = _private.get(this) - let msg = this.getSocketMsg(envelop) - let envelopJSON = envelop.toJSON() + let envelopJSON = Envelop.readMetaFromBuffer(envelopBuffer) + // console.log("AVAR::LOG:: sendEnvelop envelopJSON = ", envelopJSON); + let msg = this.getSocketMsg(envelopBuffer, envelopJSON.recipient) if (msg instanceof Buffer) { envelopJSON.size = msg.length @@ -270,35 +280,17 @@ class Socket extends EventEmitter { } onRequest (endpoint, fn, main = false) { + if(!_.isFunction(fn)) return // ** function will called with argument request = {body, reply} - if (!(endpoint instanceof RegExp)) { - endpoint = endpoint.toString() - } - let { requestWatcherMap } = _private.get(this) - let watcherMap = main ? requestWatcherMap.main : requestWatcherMap.custom - - let requestWatcher = watcherMap.get(endpoint) - - if (!requestWatcher) { - requestWatcher = new Watchers(endpoint) - watcherMap.set(endpoint, requestWatcher) - } - - requestWatcher.addFn(fn) + let { requestEmitter } = _private.get(this) + main ? requestEmitter.main.on(endpoint, fn) : requestEmitter.custom.on(endpoint, fn) } offRequest (endpoint, fn, main = false) { - let { requestWatcherMap } = _private.get(this) - let watcherMap = main ? requestWatcherMap.main : requestWatcherMap.custom - - if (_.isFunction(fn)) { - let endpointWatcher = watcherMap.get(endpoint) - if (!endpointWatcher) return - endpointWatcher.removeFn(fn) - return - } + let { requestEmitter } = _private.get(this) + let eventRequestEmitter = main ? requestEmitter.main : requestEmitter.custom - watcherMap.delete(endpoint) + return _.isFunction(fn) ? eventRequestEmitter.removeListener(endpoint, fn) : eventRequestEmitter.removeAllListeners(endpoint) } onTick (event, fn, main = false) { @@ -310,12 +302,7 @@ class Socket extends EventEmitter { let { tickEmitter } = _private.get(this) let eventTickEmitter = main ? tickEmitter.main : tickEmitter.custom - if (_.isFunction(fn)) { - eventTickEmitter.removeListener(event, fn) - return - } - - eventTickEmitter.removeAllListeners(event) + return _.isFunction(fn) ? eventTickEmitter.removeListener(event, fn) : eventTickEmitter.removeAllListeners(event) } } @@ -326,18 +313,34 @@ class Socket extends EventEmitter { function onSocketMessage (empty, envelopBuffer) { let { metric, tickEmitter } = _private.get(this) - let { type, id, owner, recipient, tag, mainEvent } = Envelop.readMetaFromBuffer(envelopBuffer) - let envelop = new Envelop({ type, id, owner, recipient, tag, mainEvent }) - let envelopData = Envelop.readDataFromBuffer(envelopBuffer) - envelop.setData(envelopData) + // ** reading metadata of the envelope + let envelopMetaJSON = Envelop.readMetaFromBuffer(envelopBuffer) + // console.log("AVAR::LOG envelopMetaJSON" ,envelopMetaJSON); + let { type, id, owner, recipient, tag, mainEvent, context, size } = envelopMetaJSON + + // let dataBuffer = Envelop.getDataBuffer(buffer) + // return dataBuffer ? Parse.bufferToData(dataBuffer) : null + + // let envelop = Envelop.fromBuffer(envelopBuffer); + // let envelop = new Envelop({ type, id, owner, recipient, tag, mainEvent, context }) + // envelop.size = envelopBuffer.length + // let envelopData = Envelop.readDataFromBuffer(envelopBuffer) + // envelop.setData(envelopData) - let envelopJSON = envelop.toJSON() - envelopJSON.size = envelopBuffer.length + // console.log("AVAR::LOG this.getId()" , this.getId(), " recipient=", recipient); + + let envelop; switch (type) { case EnvelopType.TICK: - metric(envelopJSON, 1) + metric(envelopMetaJSON, 1) + + if (recipient && this.getId() !== recipient) { + // ** in this case we need to redirect it to the right place + return this::routeEnvelopeHandler(envelopBuffer); + } + let envelopData = Envelop.readDataFromBuffer(envelopBuffer) if (mainEvent) { tickEmitter.main.emit(tag, envelopData) } else { @@ -348,18 +351,30 @@ function onSocketMessage (empty, envelopBuffer) { } break case EnvelopType.REQUEST: - metric(envelopJSON, 1) + metric(envelopMetaJSON, 1) + if (recipient && this.getId() !== recipient) { + // ** in this case we need to redirect it to the right place + return this::routeEnvelopeHandler(envelopBuffer); + } + // ** if metric is enabled then emit it + envelop = Envelop.fromBuffer(envelopBuffer); this::syncEnvelopHandler(envelop) break case EnvelopType.RESPONSE: case EnvelopType.ERROR: - envelop.size = envelopBuffer.length + if (recipient && this.getId() !== recipient) { + // ** in this case we need to redirect it to the right place + return this::routeEnvelopeHandler(envelopBuffer); + } + + envelop = Envelop.fromBuffer(envelopBuffer); this::responseEnvelopHandler(envelop) break } } +// ** TODO here we are passing envelope without data buffer function syncEnvelopHandler (envelop) { let self = this let getTime = process.hrtime() @@ -379,16 +394,20 @@ function syncEnvelopHandler (envelop) { envelop.setRecipient(prevOwner) envelop.setOwner(self.getId()) envelop.setType(EnvelopType.RESPONSE) - envelop.setData({ getTime, replyTime: process.hrtime(), data: response }) - self.sendEnvelop(envelop) + envelop.setContext({ getTime, replyTime: process.hrtime() }) + envelop.setData({ data: response }) + + + self.sendEnvelop(envelop.getBuffer()) }, error: (err) => { envelop.setRecipient(prevOwner) envelop.setOwner(self.getId()) envelop.setType(EnvelopType.ERROR) - envelop.setData({ getTime, replyTime: process.hrtime(), data: err }) + envelop.setContext({ getTime, replyTime: process.hrtime() }); + envelop.setData({ data: err }) - self.sendEnvelop(envelop) + self.sendEnvelop(envelop.getBuffer()) }, next: (err) => { if (err) { @@ -410,29 +429,14 @@ function syncEnvelopHandler (envelop) { function determineHandlersByTag (tag, main = false) { let handlers = [] - let { requestWatcherMap } = _private.get(this) - let watcherMap = main ? requestWatcherMap.main : requestWatcherMap.custom + let { requestEmitter } = _private.get(this) + let requestPEmitter = main ? requestEmitter.main : requestEmitter.custom - for (let endpoint of watcherMap.keys()) { - if (endpoint instanceof RegExp) { - if (endpoint.test(tag)) { - watcherMap.get(endpoint).getFnMap().forEach((index, fnKey) => { - handlers.push({ index, fnKey }) - }) - } - } else if (endpoint === tag) { - watcherMap.get(endpoint).getFnMap().forEach((index, fnKey) => { - handlers.push({ index, fnKey }) - }) - } - } - - return handlers.sort((a, b) => { - return a.index - b.index - }).map((ob) => ob.fnKey) + return requestPEmitter.listenersByEventType(tag); } function responseEnvelopHandler (envelop) { + // console.log("AVAR::responseEnvelopHandler", envelop) let { requests, metric } = _private.get(this) let id = envelop.getId() @@ -446,18 +450,18 @@ function responseEnvelopHandler (envelop) { // ** getTime is the time when message arrives to server // ** replyTime is the time when message is send from server - let gotReplyMetric = envelop.toJSON() - let { getTime, replyTime } = gotReplyMetric.data + let envelopMeta = envelop.toMetaJSON() + // console.log("------AVAR::responseEnvelopHandler envelopMeta", envelopMeta) + let { getTime, replyTime } = envelopMeta.context let duration = _calculateLatency({ sendTime, getTime, replyTime, replyGetTime: process.hrtime() }) - gotReplyMetric.data = { - data: gotReplyMetric.data, + // ** TODO maybe router should aggregate in a different way + envelopMeta.context = { + ...envelopMeta.context, duration } - gotReplyMetric.size = envelop.size - - metric(gotReplyMetric, 1) + metric(envelopMeta, 1) clearTimeout(timeout) requests.delete(id) @@ -467,6 +471,41 @@ function responseEnvelopHandler (envelop) { envelop.getType() === EnvelopType.ERROR ? reject(data) : resolve(data) } +function routeEnvelopeHandler(envelopBuffer) { + // ** reading metadata of the envelope + let envelopMetaJSON = Envelop.readMetaFromBuffer(envelopBuffer) + + let { type, id, owner, recipient, tag, mainEvent, context, size } = envelopMetaJSON + + // let dataBuffer = Envelop.getDataBuffer(buffer) + // return dataBuffer ? Parse.bufferToData(dataBuffer) : null + + // let envelop = Envelop.fromBuffer(envelopBuffer); + // let envelop = new Envelop({ type, id, owner, recipient, tag, mainEvent, context }) + // envelop.size = envelopBuffer.length + // let envelopData = Envelop.readDataFromBuffer(envelopBuffer) + // envelop.setData(envelopData) + + // console.log("AVAR::LOG this.getId()" , this.getId(), " recipient=", recipient); + + // ----- + switch (type) { + case EnvelopType.TICK: + // TODO maybe we can update metric on router about the best behaving services + // metric(envelopMetaJSON, 1) + this.sendEnvelop(envelopBuffer); + break + case EnvelopType.REQUEST: + // TODO::AVAR ROUTING timeout of zeronode + this.sendEnvelop(envelopBuffer); + break + case EnvelopType.RESPONSE: + case EnvelopType.ERROR: + this.sendEnvelop(envelopBuffer); + break + } +} + // ** exports export { SocketEvent } export { Socket } diff --git a/src/sockets/watchers.js b/src/sockets/watchers.js deleted file mode 100644 index 579a645..0000000 --- a/src/sockets/watchers.js +++ /dev/null @@ -1,33 +0,0 @@ -/** - * Created by avar on 7/11/17. - */ -import { isFunction } from 'underscore' - -let index = 1 - -export default class Watchers { - constructor (tag) { - this._tag = tag - this._fnMap = new Map() - } - - getFnMap () { - return this._fnMap - } - - addFn (fn) { - if (isFunction(fn)) { - this._fnMap.set(fn, index) - index++ - } - } - - removeFn (fn) { - if (isFunction(fn)) { - this._fnMap.delete(fn) - return - } - - this._fnMap.clear() - } -} diff --git a/test/client-server.js b/test/client-server.js index 56d40cb..a6bfa79 100644 --- a/test/client-server.js +++ b/test/client-server.js @@ -7,10 +7,9 @@ const address = 'tcp://127.0.0.1:5001' describe('Client/Server', () => { let client, server - beforeEach((done) => { - client = new Client({}) - server = new Server({}) - done() + beforeEach(async () => { + client = new Client() + server = new Server() }) afterEach(async () => { diff --git a/test/oneToOne.js b/test/oneToOne.js index 51dd6df..528fec0 100644 --- a/test/oneToOne.js +++ b/test/oneToOne.js @@ -74,7 +74,7 @@ describe('oneToOne, failures', () => { serverNode.offRequest('foo') await clientNode.request({ to: serverNode.getId(), event: 'foo', data: 'bar', timeout: 200 }) return Promise.reject('fail') - } catch (err) { + } catch (err) { assert.include(err.message, 'timeouted') } }) @@ -83,14 +83,11 @@ describe('oneToOne, failures', () => { try { await serverNode.bind() await clientNode.connect({ address: serverNode.getAddress() }) - let fooListener = ({ body, reply }) => { reply(body) } - serverNode.onRequest('foo', fooListener) serverNode.offRequest('foo', fooListener) - await clientNode.request({ to: serverNode.getId(), event: 'foo', data: 'bar', timeout: 200 }) return Promise.reject('fail') } catch (err) { @@ -156,7 +153,7 @@ describe('oneToOne, failures', () => { await clientNode.connect({ address: serverNode.getAddress() }) await clientNode.disconnect(serverNode.getAddress()) clientNode.tick({ to: serverNode.getId(), event: 'foo', data: 'bar' }) - } catch (err) { + } catch (err) { assert.equal(err.code, ErrorCodes.NODE_NOT_FOUND) } })