From cb28050e96956fb245979243f0cf520cd451d300 Mon Sep 17 00:00:00 2001 From: thekingofcity <3353040+thekingofcity@users.noreply.github.com> Date: Fri, 4 Mar 2022 14:18:37 +0800 Subject: [PATCH 01/16] Issue 354: Nodejs StreamManager Signed-off-by: thekingofcity <3353040+thekingofcity@users.noreply.github.com> --- .gitignore | 9 +- Cargo.toml | 3 +- nodejs/.prettierrc | 8 + nodejs/Cargo.toml | 33 ++ nodejs/README.md | 11 + nodejs/package-lock.json | 460 +++++++++++++++++++++++++++ nodejs/package.json | 34 ++ nodejs/src/lib.rs | 45 +++ nodejs/src/stream_manager.rs | 561 +++++++++++++++++++++++++++++++++ nodejs/src/util.rs | 26 ++ nodejs/stream_manager.ts | 210 ++++++++++++ nodejs/tests/stream_manager.ts | 60 ++++ nodejs/tsconfig.json | 10 + 13 files changed, 1468 insertions(+), 2 deletions(-) create mode 100644 nodejs/.prettierrc create mode 100644 nodejs/Cargo.toml create mode 100644 nodejs/README.md create mode 100644 nodejs/package-lock.json create mode 100644 nodejs/package.json create mode 100644 nodejs/src/lib.rs create mode 100644 nodejs/src/stream_manager.rs create mode 100644 nodejs/src/util.rs create mode 100644 nodejs/stream_manager.ts create mode 100644 nodejs/tests/stream_manager.ts create mode 100644 nodejs/tsconfig.json diff --git a/.gitignore b/.gitignore index 98991e2ac..d8cccf123 100644 --- a/.gitignore +++ b/.gitignore @@ -25,4 +25,11 @@ pravega-*.tgz __pycache__/ book/book html/ -python_binding/.tox/ \ No newline at end of file +python_binding/.tox/ + +# These are temp files of Nodejs bindings +nodejs/target +nodejs/index.node +**/node_modules +**/.DS_Store +npm-debug.log* diff --git a/Cargo.toml b/Cargo.toml index ecbf219ad..951196c03 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -36,7 +36,8 @@ members = [ "config", "examples", "macros", - "pravegactl" + "pravegactl", + "nodejs" ] [dependencies] diff --git a/nodejs/.prettierrc b/nodejs/.prettierrc new file mode 100644 index 000000000..bc39e1a0e --- /dev/null +++ b/nodejs/.prettierrc @@ -0,0 +1,8 @@ +{ + "arrowParens": "avoid", + "tabWidth": 4, + "singleQuote": true, + "trailingComma": "es5", + "printWidth": 120, + "endOfLine": "auto" +} \ No newline at end of file diff --git a/nodejs/Cargo.toml b/nodejs/Cargo.toml new file mode 100644 index 000000000..1e1bb4930 --- /dev/null +++ b/nodejs/Cargo.toml @@ -0,0 +1,33 @@ +[package] +authors = ["Pravega Community"] +categories = ["network-programming"] +description = "Pravega client" +edition = "2018" +exclude = ["index.node"] +keywords = ["streaming", "client", "pravega"] +license = "Apache-2.0" +name = "pravega_client" +readme = "README.md" +repository = "https://github.com/pravega/pravega-client-rust" +version = "0.1.0" + +[lib] +crate-type = ["cdylib"] + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[dependencies] +futures = "0.3" +pravega-client = {path = "../", version = "0.3"} +pravega-client-config = {path = "../config", version = "0.3"} +pravega-client-retry = {path = "../retry", version = "0.3"} +pravega-client-shared = {path = "../shared", version = "0.3"} +pravega-controller-client = {path = "../controller-client", version = "0.3"} +tracing = "0.1.17" +tracing-futures = "0.2.4" +tracing-subscriber = "0.2.2" + +[dependencies.neon] +default-features = false +features = ["napi-6"] +version = "0.9" diff --git a/nodejs/README.md b/nodejs/README.md new file mode 100644 index 000000000..93e726535 --- /dev/null +++ b/nodejs/README.md @@ -0,0 +1,11 @@ +# Pravega Nodejs client. + +This project provides a way to interact with [Pravega](http://pravega.io) with a Nodejs client. + +Pravega is an open source distributed storage service implementing Streams. It offers Stream as the main primitive for the foundation of reliable storage systems: a high-performance, durable, elastic, and unlimited append-only byte stream with strict ordering and consistency. + +This project supports interaction with Pravega for Nodejs versions 16. + +Only `StreamManager` is ready for use now. Example usage can be found in `./tests` and you may run it with `node --loader ts-node/esm tests/stream_manager.ts`. + +Everything else is WORKING IN PROGRESS! diff --git a/nodejs/package-lock.json b/nodejs/package-lock.json new file mode 100644 index 000000000..f44c16dba --- /dev/null +++ b/nodejs/package-lock.json @@ -0,0 +1,460 @@ +{ + "name": "pravega_client", + "version": "0.1.0", + "lockfileVersion": 2, + "requires": true, + "packages": { + "": { + "name": "pravega_client", + "version": "0.1.0", + "hasInstallScript": true, + "license": "Apache-2.0", + "dependencies": { + "chai": "^4.3.6" + }, + "devDependencies": { + "@types/node": "^17.0.21", + "cargo-cp-artifact": "^0.1", + "ts-node": "^10.6.0", + "typescript": "^4.6.2" + } + }, + "node_modules/@cspotcode/source-map-consumer": { + "version": "0.8.0", + "resolved": "https://registry.npmjs.org/@cspotcode/source-map-consumer/-/source-map-consumer-0.8.0.tgz", + "integrity": "sha512-41qniHzTU8yAGbCp04ohlmSrZf8bkf/iJsl3V0dRGsQN/5GFfx+LbCSsCpp2gqrqjTVg/K6O8ycoV35JIwAzAg==", + "dev": true, + "engines": { + "node": ">= 12" + } + }, + "node_modules/@cspotcode/source-map-support": { + "version": "0.7.0", + "resolved": "https://registry.npmjs.org/@cspotcode/source-map-support/-/source-map-support-0.7.0.tgz", + "integrity": "sha512-X4xqRHqN8ACt2aHVe51OxeA2HjbcL4MqFqXkrmQszJ1NOUuUu5u6Vqx/0lZSVNku7velL5FC/s5uEAj1lsBMhA==", + "dev": true, + "dependencies": { + "@cspotcode/source-map-consumer": "0.8.0" + }, + "engines": { + "node": ">=12" + } + }, + "node_modules/@tsconfig/node10": { + "version": "1.0.8", + "resolved": "https://registry.npmjs.org/@tsconfig/node10/-/node10-1.0.8.tgz", + "integrity": "sha512-6XFfSQmMgq0CFLY1MslA/CPUfhIL919M1rMsa5lP2P097N2Wd1sSX0tx1u4olM16fLNhtHZpRhedZJphNJqmZg==", + "dev": true + }, + "node_modules/@tsconfig/node12": { + "version": "1.0.9", + "resolved": "https://registry.npmjs.org/@tsconfig/node12/-/node12-1.0.9.tgz", + "integrity": "sha512-/yBMcem+fbvhSREH+s14YJi18sp7J9jpuhYByADT2rypfajMZZN4WQ6zBGgBKp53NKmqI36wFYDb3yaMPurITw==", + "dev": true + }, + "node_modules/@tsconfig/node14": { + "version": "1.0.1", + "resolved": "https://registry.npmjs.org/@tsconfig/node14/-/node14-1.0.1.tgz", + "integrity": "sha512-509r2+yARFfHHE7T6Puu2jjkoycftovhXRqW328PDXTVGKihlb1P8Z9mMZH04ebyajfRY7dedfGynlrFHJUQCg==", + "dev": true + }, + "node_modules/@tsconfig/node16": { + "version": "1.0.2", + "resolved": "https://registry.npmjs.org/@tsconfig/node16/-/node16-1.0.2.tgz", + "integrity": "sha512-eZxlbI8GZscaGS7kkc/trHTT5xgrjH3/1n2JDwusC9iahPKWMRvRjJSAN5mCXviuTGQ/lHnhvv8Q1YTpnfz9gA==", + "dev": true + }, + "node_modules/@types/node": { + "version": "17.0.21", + "resolved": "https://registry.npmjs.org/@types/node/-/node-17.0.21.tgz", + "integrity": "sha512-DBZCJbhII3r90XbQxI8Y9IjjiiOGlZ0Hr32omXIZvwwZ7p4DMMXGrKXVyPfuoBOri9XNtL0UK69jYIBIsRX3QQ==", + "dev": true + }, + "node_modules/acorn": { + "version": "8.7.0", + "resolved": "https://registry.npmjs.org/acorn/-/acorn-8.7.0.tgz", + "integrity": "sha512-V/LGr1APy+PXIwKebEWrkZPwoeoF+w1jiOBUmuxuiUIaOHtob8Qc9BTrYo7VuI5fR8tqsy+buA2WFooR5olqvQ==", + "dev": true, + "bin": { + "acorn": "bin/acorn" + }, + "engines": { + "node": ">=0.4.0" + } + }, + "node_modules/acorn-walk": { + "version": "8.2.0", + "resolved": "https://registry.npmjs.org/acorn-walk/-/acorn-walk-8.2.0.tgz", + "integrity": "sha512-k+iyHEuPgSw6SbuDpGQM+06HQUa04DZ3o+F6CSzXMvvI5KMvnaEqXe+YVe555R9nn6GPt404fos4wcgpw12SDA==", + "dev": true, + "engines": { + "node": ">=0.4.0" + } + }, + "node_modules/arg": { + "version": "4.1.3", + "resolved": "https://registry.npmjs.org/arg/-/arg-4.1.3.tgz", + "integrity": "sha512-58S9QDqG0Xx27YwPSt9fJxivjYl432YCwfDMfZ+71RAqUrZef7LrKQZ3LHLOwCS4FLNBplP533Zx895SeOCHvA==", + "dev": true + }, + "node_modules/assertion-error": { + "version": "1.1.0", + "resolved": "https://registry.npmjs.org/assertion-error/-/assertion-error-1.1.0.tgz", + "integrity": "sha512-jgsaNduz+ndvGyFt3uSuWqvy4lCnIJiovtouQN5JZHOKCS2QuhEdbcQHFhVksz2N2U9hXJo8odG7ETyWlEeuDw==", + "engines": { + "node": "*" + } + }, + "node_modules/cargo-cp-artifact": { + "version": "0.1.6", + "resolved": "https://registry.npmjs.org/cargo-cp-artifact/-/cargo-cp-artifact-0.1.6.tgz", + "integrity": "sha512-CQw0doK/aaF7j041666XzuilHxqMxaKkn+I5vmBsd8SAwS0cO5CqVEVp0xJwOKstyqWZ6WK4Ww3O6p26x/Goyg==", + "dev": true, + "bin": { + "cargo-cp-artifact": "bin/cargo-cp-artifact.js" + } + }, + "node_modules/chai": { + "version": "4.3.6", + "resolved": "https://registry.npmjs.org/chai/-/chai-4.3.6.tgz", + "integrity": "sha512-bbcp3YfHCUzMOvKqsztczerVgBKSsEijCySNlHHbX3VG1nskvqjz5Rfso1gGwD6w6oOV3eI60pKuMOV5MV7p3Q==", + "dependencies": { + "assertion-error": "^1.1.0", + "check-error": "^1.0.2", + "deep-eql": "^3.0.1", + "get-func-name": "^2.0.0", + "loupe": "^2.3.1", + "pathval": "^1.1.1", + "type-detect": "^4.0.5" + }, + "engines": { + "node": ">=4" + } + }, + "node_modules/check-error": { + "version": "1.0.2", + "resolved": "https://registry.npmjs.org/check-error/-/check-error-1.0.2.tgz", + "integrity": "sha1-V00xLt2Iu13YkS6Sht1sCu1KrII=", + "engines": { + "node": "*" + } + }, + "node_modules/create-require": { + "version": "1.1.1", + "resolved": "https://registry.npmjs.org/create-require/-/create-require-1.1.1.tgz", + "integrity": "sha512-dcKFX3jn0MpIaXjisoRvexIJVEKzaq7z2rZKxf+MSr9TkdmHmsU4m2lcLojrj/FHl8mk5VxMmYA+ftRkP/3oKQ==", + "dev": true + }, + "node_modules/deep-eql": { + "version": "3.0.1", + "resolved": "https://registry.npmjs.org/deep-eql/-/deep-eql-3.0.1.tgz", + "integrity": "sha512-+QeIQyN5ZuO+3Uk5DYh6/1eKO0m0YmJFGNmFHGACpf1ClL1nmlV/p4gNgbl2pJGxgXb4faqo6UE+M5ACEMyVcw==", + "dependencies": { + "type-detect": "^4.0.0" + }, + "engines": { + "node": ">=0.12" + } + }, + "node_modules/diff": { + "version": "4.0.2", + "resolved": "https://registry.npmjs.org/diff/-/diff-4.0.2.tgz", + "integrity": "sha512-58lmxKSA4BNyLz+HHMUzlOEpg09FV+ev6ZMe3vJihgdxzgcwZ8VoEEPmALCZG9LmqfVoNMMKpttIYTVG6uDY7A==", + "dev": true, + "engines": { + "node": ">=0.3.1" + } + }, + "node_modules/get-func-name": { + "version": "2.0.0", + "resolved": "https://registry.npmjs.org/get-func-name/-/get-func-name-2.0.0.tgz", + "integrity": "sha1-6td0q+5y4gQJQzoGY2YCPdaIekE=", + "engines": { + "node": "*" + } + }, + "node_modules/loupe": { + "version": "2.3.4", + "resolved": "https://registry.npmjs.org/loupe/-/loupe-2.3.4.tgz", + "integrity": "sha512-OvKfgCC2Ndby6aSTREl5aCCPTNIzlDfQZvZxNUrBrihDhL3xcrYegTblhmEiCrg2kKQz4XsFIaemE5BF4ybSaQ==", + "dependencies": { + "get-func-name": "^2.0.0" + } + }, + "node_modules/make-error": { + "version": "1.3.6", + "resolved": "https://registry.npmjs.org/make-error/-/make-error-1.3.6.tgz", + "integrity": "sha512-s8UhlNe7vPKomQhC1qFelMokr/Sc3AgNbso3n74mVPA5LTZwkB9NlXf4XPamLxJE8h0gh73rM94xvwRT2CVInw==", + "dev": true + }, + "node_modules/pathval": { + "version": "1.1.1", + "resolved": "https://registry.npmjs.org/pathval/-/pathval-1.1.1.tgz", + "integrity": "sha512-Dp6zGqpTdETdR63lehJYPeIOqpiNBNtc7BpWSLrOje7UaIsE5aY92r/AunQA7rsXvet3lrJ3JnZX29UPTKXyKQ==", + "engines": { + "node": "*" + } + }, + "node_modules/ts-node": { + "version": "10.6.0", + "resolved": "https://registry.npmjs.org/ts-node/-/ts-node-10.6.0.tgz", + "integrity": "sha512-CJen6+dfOXolxudBQXnVjRVvYTmTWbyz7cn+xq2XTsvnaXbHqr4gXSCNbS2Jj8yTZMuGwUoBESLaOkLascVVvg==", + "dev": true, + "dependencies": { + "@cspotcode/source-map-support": "0.7.0", + "@tsconfig/node10": "^1.0.7", + "@tsconfig/node12": "^1.0.7", + "@tsconfig/node14": "^1.0.0", + "@tsconfig/node16": "^1.0.2", + "acorn": "^8.4.1", + "acorn-walk": "^8.1.1", + "arg": "^4.1.0", + "create-require": "^1.1.0", + "diff": "^4.0.1", + "make-error": "^1.1.1", + "v8-compile-cache-lib": "^3.0.0", + "yn": "3.1.1" + }, + "bin": { + "ts-node": "dist/bin.js", + "ts-node-cwd": "dist/bin-cwd.js", + "ts-node-script": "dist/bin-script.js", + "ts-node-transpile-only": "dist/bin-transpile.js", + "ts-script": "dist/bin-script-deprecated.js" + }, + "peerDependencies": { + "@swc/core": ">=1.2.50", + "@swc/wasm": ">=1.2.50", + "@types/node": "*", + "typescript": ">=2.7" + }, + "peerDependenciesMeta": { + "@swc/core": { + "optional": true + }, + "@swc/wasm": { + "optional": true + } + } + }, + "node_modules/type-detect": { + "version": "4.0.8", + "resolved": "https://registry.npmjs.org/type-detect/-/type-detect-4.0.8.tgz", + "integrity": "sha512-0fr/mIH1dlO+x7TlcMy+bIDqKPsw/70tVyeHW787goQjhmqaZe10uwLujubK9q9Lg6Fiho1KUKDYz0Z7k7g5/g==", + "engines": { + "node": ">=4" + } + }, + "node_modules/typescript": { + "version": "4.6.2", + "resolved": "https://registry.npmjs.org/typescript/-/typescript-4.6.2.tgz", + "integrity": "sha512-HM/hFigTBHZhLXshn9sN37H085+hQGeJHJ/X7LpBWLID/fbc2acUMfU+lGD98X81sKP+pFa9f0DZmCwB9GnbAg==", + "dev": true, + "bin": { + "tsc": "bin/tsc", + "tsserver": "bin/tsserver" + }, + "engines": { + "node": ">=4.2.0" + } + }, + "node_modules/v8-compile-cache-lib": { + "version": "3.0.0", + "resolved": "https://registry.npmjs.org/v8-compile-cache-lib/-/v8-compile-cache-lib-3.0.0.tgz", + "integrity": "sha512-mpSYqfsFvASnSn5qMiwrr4VKfumbPyONLCOPmsR3A6pTY/r0+tSaVbgPWSAIuzbk3lCTa+FForeTiO+wBQGkjA==", + "dev": true + }, + "node_modules/yn": { + "version": "3.1.1", + "resolved": "https://registry.npmjs.org/yn/-/yn-3.1.1.tgz", + "integrity": "sha512-Ux4ygGWsu2c7isFWe8Yu1YluJmqVhxqK2cLXNQA5AcC3QfbGNpM7fu0Y8b/z16pXLnFxZYvWhd3fhBY9DLmC6Q==", + "dev": true, + "engines": { + "node": ">=6" + } + } + }, + "dependencies": { + "@cspotcode/source-map-consumer": { + "version": "0.8.0", + "resolved": "https://registry.npmjs.org/@cspotcode/source-map-consumer/-/source-map-consumer-0.8.0.tgz", + "integrity": "sha512-41qniHzTU8yAGbCp04ohlmSrZf8bkf/iJsl3V0dRGsQN/5GFfx+LbCSsCpp2gqrqjTVg/K6O8ycoV35JIwAzAg==", + "dev": true + }, + "@cspotcode/source-map-support": { + "version": "0.7.0", + "resolved": "https://registry.npmjs.org/@cspotcode/source-map-support/-/source-map-support-0.7.0.tgz", + "integrity": "sha512-X4xqRHqN8ACt2aHVe51OxeA2HjbcL4MqFqXkrmQszJ1NOUuUu5u6Vqx/0lZSVNku7velL5FC/s5uEAj1lsBMhA==", + "dev": true, + "requires": { + "@cspotcode/source-map-consumer": "0.8.0" + } + }, + "@tsconfig/node10": { + "version": "1.0.8", + "resolved": "https://registry.npmjs.org/@tsconfig/node10/-/node10-1.0.8.tgz", + "integrity": "sha512-6XFfSQmMgq0CFLY1MslA/CPUfhIL919M1rMsa5lP2P097N2Wd1sSX0tx1u4olM16fLNhtHZpRhedZJphNJqmZg==", + "dev": true + }, + "@tsconfig/node12": { + "version": "1.0.9", + "resolved": "https://registry.npmjs.org/@tsconfig/node12/-/node12-1.0.9.tgz", + "integrity": "sha512-/yBMcem+fbvhSREH+s14YJi18sp7J9jpuhYByADT2rypfajMZZN4WQ6zBGgBKp53NKmqI36wFYDb3yaMPurITw==", + "dev": true + }, + "@tsconfig/node14": { + "version": "1.0.1", + "resolved": "https://registry.npmjs.org/@tsconfig/node14/-/node14-1.0.1.tgz", + "integrity": "sha512-509r2+yARFfHHE7T6Puu2jjkoycftovhXRqW328PDXTVGKihlb1P8Z9mMZH04ebyajfRY7dedfGynlrFHJUQCg==", + "dev": true + }, + "@tsconfig/node16": { + "version": "1.0.2", + "resolved": "https://registry.npmjs.org/@tsconfig/node16/-/node16-1.0.2.tgz", + "integrity": "sha512-eZxlbI8GZscaGS7kkc/trHTT5xgrjH3/1n2JDwusC9iahPKWMRvRjJSAN5mCXviuTGQ/lHnhvv8Q1YTpnfz9gA==", + "dev": true + }, + "@types/node": { + "version": "17.0.21", + "resolved": "https://registry.npmjs.org/@types/node/-/node-17.0.21.tgz", + "integrity": "sha512-DBZCJbhII3r90XbQxI8Y9IjjiiOGlZ0Hr32omXIZvwwZ7p4DMMXGrKXVyPfuoBOri9XNtL0UK69jYIBIsRX3QQ==", + "dev": true + }, + "acorn": { + "version": "8.7.0", + "resolved": "https://registry.npmjs.org/acorn/-/acorn-8.7.0.tgz", + "integrity": "sha512-V/LGr1APy+PXIwKebEWrkZPwoeoF+w1jiOBUmuxuiUIaOHtob8Qc9BTrYo7VuI5fR8tqsy+buA2WFooR5olqvQ==", + "dev": true + }, + "acorn-walk": { + "version": "8.2.0", + "resolved": "https://registry.npmjs.org/acorn-walk/-/acorn-walk-8.2.0.tgz", + "integrity": "sha512-k+iyHEuPgSw6SbuDpGQM+06HQUa04DZ3o+F6CSzXMvvI5KMvnaEqXe+YVe555R9nn6GPt404fos4wcgpw12SDA==", + "dev": true + }, + "arg": { + "version": "4.1.3", + "resolved": "https://registry.npmjs.org/arg/-/arg-4.1.3.tgz", + "integrity": "sha512-58S9QDqG0Xx27YwPSt9fJxivjYl432YCwfDMfZ+71RAqUrZef7LrKQZ3LHLOwCS4FLNBplP533Zx895SeOCHvA==", + "dev": true + }, + "assertion-error": { + "version": "1.1.0", + "resolved": "https://registry.npmjs.org/assertion-error/-/assertion-error-1.1.0.tgz", + "integrity": "sha512-jgsaNduz+ndvGyFt3uSuWqvy4lCnIJiovtouQN5JZHOKCS2QuhEdbcQHFhVksz2N2U9hXJo8odG7ETyWlEeuDw==" + }, + "cargo-cp-artifact": { + "version": "0.1.6", + "resolved": "https://registry.npmjs.org/cargo-cp-artifact/-/cargo-cp-artifact-0.1.6.tgz", + "integrity": "sha512-CQw0doK/aaF7j041666XzuilHxqMxaKkn+I5vmBsd8SAwS0cO5CqVEVp0xJwOKstyqWZ6WK4Ww3O6p26x/Goyg==", + "dev": true + }, + "chai": { + "version": "4.3.6", + "resolved": "https://registry.npmjs.org/chai/-/chai-4.3.6.tgz", + "integrity": "sha512-bbcp3YfHCUzMOvKqsztczerVgBKSsEijCySNlHHbX3VG1nskvqjz5Rfso1gGwD6w6oOV3eI60pKuMOV5MV7p3Q==", + "requires": { + "assertion-error": "^1.1.0", + "check-error": "^1.0.2", + "deep-eql": "^3.0.1", + "get-func-name": "^2.0.0", + "loupe": "^2.3.1", + "pathval": "^1.1.1", + "type-detect": "^4.0.5" + } + }, + "check-error": { + "version": "1.0.2", + "resolved": "https://registry.npmjs.org/check-error/-/check-error-1.0.2.tgz", + "integrity": "sha1-V00xLt2Iu13YkS6Sht1sCu1KrII=" + }, + "create-require": { + "version": "1.1.1", + "resolved": "https://registry.npmjs.org/create-require/-/create-require-1.1.1.tgz", + "integrity": "sha512-dcKFX3jn0MpIaXjisoRvexIJVEKzaq7z2rZKxf+MSr9TkdmHmsU4m2lcLojrj/FHl8mk5VxMmYA+ftRkP/3oKQ==", + "dev": true + }, + "deep-eql": { + "version": "3.0.1", + "resolved": "https://registry.npmjs.org/deep-eql/-/deep-eql-3.0.1.tgz", + "integrity": "sha512-+QeIQyN5ZuO+3Uk5DYh6/1eKO0m0YmJFGNmFHGACpf1ClL1nmlV/p4gNgbl2pJGxgXb4faqo6UE+M5ACEMyVcw==", + "requires": { + "type-detect": "^4.0.0" + } + }, + "diff": { + "version": "4.0.2", + "resolved": "https://registry.npmjs.org/diff/-/diff-4.0.2.tgz", + "integrity": "sha512-58lmxKSA4BNyLz+HHMUzlOEpg09FV+ev6ZMe3vJihgdxzgcwZ8VoEEPmALCZG9LmqfVoNMMKpttIYTVG6uDY7A==", + "dev": true + }, + "get-func-name": { + "version": "2.0.0", + "resolved": "https://registry.npmjs.org/get-func-name/-/get-func-name-2.0.0.tgz", + "integrity": "sha1-6td0q+5y4gQJQzoGY2YCPdaIekE=" + }, + "loupe": { + "version": "2.3.4", + "resolved": "https://registry.npmjs.org/loupe/-/loupe-2.3.4.tgz", + "integrity": "sha512-OvKfgCC2Ndby6aSTREl5aCCPTNIzlDfQZvZxNUrBrihDhL3xcrYegTblhmEiCrg2kKQz4XsFIaemE5BF4ybSaQ==", + "requires": { + "get-func-name": "^2.0.0" + } + }, + "make-error": { + "version": "1.3.6", + "resolved": "https://registry.npmjs.org/make-error/-/make-error-1.3.6.tgz", + "integrity": "sha512-s8UhlNe7vPKomQhC1qFelMokr/Sc3AgNbso3n74mVPA5LTZwkB9NlXf4XPamLxJE8h0gh73rM94xvwRT2CVInw==", + "dev": true + }, + "pathval": { + "version": "1.1.1", + "resolved": "https://registry.npmjs.org/pathval/-/pathval-1.1.1.tgz", + "integrity": "sha512-Dp6zGqpTdETdR63lehJYPeIOqpiNBNtc7BpWSLrOje7UaIsE5aY92r/AunQA7rsXvet3lrJ3JnZX29UPTKXyKQ==" + }, + "ts-node": { + "version": "10.6.0", + "resolved": "https://registry.npmjs.org/ts-node/-/ts-node-10.6.0.tgz", + "integrity": "sha512-CJen6+dfOXolxudBQXnVjRVvYTmTWbyz7cn+xq2XTsvnaXbHqr4gXSCNbS2Jj8yTZMuGwUoBESLaOkLascVVvg==", + "dev": true, + "requires": { + "@cspotcode/source-map-support": "0.7.0", + "@tsconfig/node10": "^1.0.7", + "@tsconfig/node12": "^1.0.7", + "@tsconfig/node14": "^1.0.0", + "@tsconfig/node16": "^1.0.2", + "acorn": "^8.4.1", + "acorn-walk": "^8.1.1", + "arg": "^4.1.0", + "create-require": "^1.1.0", + "diff": "^4.0.1", + "make-error": "^1.1.1", + "v8-compile-cache-lib": "^3.0.0", + "yn": "3.1.1" + } + }, + "type-detect": { + "version": "4.0.8", + "resolved": "https://registry.npmjs.org/type-detect/-/type-detect-4.0.8.tgz", + "integrity": "sha512-0fr/mIH1dlO+x7TlcMy+bIDqKPsw/70tVyeHW787goQjhmqaZe10uwLujubK9q9Lg6Fiho1KUKDYz0Z7k7g5/g==" + }, + "typescript": { + "version": "4.6.2", + "resolved": "https://registry.npmjs.org/typescript/-/typescript-4.6.2.tgz", + "integrity": "sha512-HM/hFigTBHZhLXshn9sN37H085+hQGeJHJ/X7LpBWLID/fbc2acUMfU+lGD98X81sKP+pFa9f0DZmCwB9GnbAg==", + "dev": true + }, + "v8-compile-cache-lib": { + "version": "3.0.0", + "resolved": "https://registry.npmjs.org/v8-compile-cache-lib/-/v8-compile-cache-lib-3.0.0.tgz", + "integrity": "sha512-mpSYqfsFvASnSn5qMiwrr4VKfumbPyONLCOPmsR3A6pTY/r0+tSaVbgPWSAIuzbk3lCTa+FForeTiO+wBQGkjA==", + "dev": true + }, + "yn": { + "version": "3.1.1", + "resolved": "https://registry.npmjs.org/yn/-/yn-3.1.1.tgz", + "integrity": "sha512-Ux4ygGWsu2c7isFWe8Yu1YluJmqVhxqK2cLXNQA5AcC3QfbGNpM7fu0Y8b/z16pXLnFxZYvWhd3fhBY9DLmC6Q==", + "dev": true + } + } +} diff --git a/nodejs/package.json b/nodejs/package.json new file mode 100644 index 000000000..d772a4d16 --- /dev/null +++ b/nodejs/package.json @@ -0,0 +1,34 @@ +{ + "name": "pravega_client", + "version": "0.1.0", + "description": "Pravega client", + "main": "index.node", + "scripts": { + "build": "cargo-cp-artifact -nc index.node -- cargo build --message-format=json-render-diagnostics", + "build-debug": "npm run build --", + "build-release": "npm run build -- --release", + "install": "npm run build-release", + "test": "cargo test" + }, + "author": "Pravega Community", + "license": "Apache-2.0", + "devDependencies": { + "@types/node": "^17.0.21", + "cargo-cp-artifact": "^0.1", + "ts-node": "^10.6.0", + "typescript": "^4.6.2", + "chai": "*" + }, + "repository": { + "type": "git", + "url": "git+https://github.com/pravega/pravega-client-rust.git" + }, + "keywords": [ + "streaming" + ], + "bugs": { + "url": "https://github.com/pravega/pravega-client-rust/issues" + }, + "homepage": "https://github.com/pravega/pravega-client-rust#readme", + "type": "module" +} diff --git a/nodejs/src/lib.rs b/nodejs/src/lib.rs new file mode 100644 index 000000000..4aeaeb5cb --- /dev/null +++ b/nodejs/src/lib.rs @@ -0,0 +1,45 @@ +// Copyright Pravega Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +pub mod stream_manager; +pub mod util; + +use neon::prelude::*; +use stream_manager::{StreamManager, StreamRetentionPolicy, StreamScalingPolicy}; + +fn hello(mut cx: FunctionContext) -> JsResult { + let result = 42; + Ok(cx.number(result)) +} + +#[neon::main] +fn main(mut cx: ModuleContext) -> NeonResult<()> { + cx.export_function("hello", hello)?; + cx.export_function("StreamManagerNew", StreamManager::js_new)?; + cx.export_function("StreamManagerCreateScope", StreamManager::js_create_scope)?; + cx.export_function("StreamManagerDeleteScope", StreamManager::js_delete_scope)?; + cx.export_function("StreamManagerListScopes", StreamManager::js_list_scopes)?; + cx.export_function("StreamRetentionPolicyNone", StreamRetentionPolicy::js_none)?; + cx.export_function("StreamRetentionPolicyBySize", StreamRetentionPolicy::js_by_size)?; + cx.export_function("StreamRetentionPolicyByTime", StreamRetentionPolicy::js_by_time)?; + cx.export_function("StreamScalingPolicyFixed", StreamScalingPolicy::js_fixed_scaling_policy)?; + cx.export_function("StreamScalingPolicyByDataRate", StreamScalingPolicy::js_auto_scaling_policy_by_data_rate)?; + cx.export_function("StreamScalingPolicyByEventRate", StreamScalingPolicy::js_auto_scaling_policy_by_event_rate)?; + cx.export_function("StreamManagerCreateStreamWithPolicy", StreamManager::js_create_stream_with_policy)?; + cx.export_function("StreamManagerUpdateStreamWithPolicy", StreamManager::js_update_stream_with_policy)?; + cx.export_function("StreamManagerGetStreamTags", StreamManager::js_get_stream_tags)?; + cx.export_function("StreamManagerSealStream", StreamManager::js_seal_stream)?; + cx.export_function("StreamManagerDeleteStream", StreamManager::js_delete_stream)?; + cx.export_function("StreamManagerListStreams", StreamManager::js_list_streams)?; + Ok(()) +} diff --git a/nodejs/src/stream_manager.rs b/nodejs/src/stream_manager.rs new file mode 100644 index 000000000..30a6aa1e4 --- /dev/null +++ b/nodejs/src/stream_manager.rs @@ -0,0 +1,561 @@ +// Copyright Pravega Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use crate::util::js_log; +use neon::prelude::*; +use pravega_client::client_factory::ClientFactory; +use pravega_client_config::{ClientConfig, ClientConfigBuilder}; +use pravega_client_retry::retry_result::RetryError; +use pravega_client_shared::*; +use pravega_controller_client::ControllerError; +use tracing::info; + +// An internal rust struct that holds the necessary info to perform actions on StreamManager. +// The `js_new` method will return a boxed(wrapped) StreamManager as an external object. +pub(crate) struct StreamManager { + controller_ip: String, + cf: ClientFactory, + config: ClientConfig, +} + +pub(crate) struct StreamRetentionPolicy { + retention: Retention, +} + +impl Finalize for StreamRetentionPolicy {} + +impl StreamRetentionPolicy { + pub fn js_none(mut cx: FunctionContext) -> JsResult> { + let stream_retention_policy = StreamRetentionPolicy { + retention: Default::default(), + }; + + Ok(cx.boxed(stream_retention_policy)) + } + + pub fn js_by_size(mut cx: FunctionContext) -> JsResult> { + let size_in_bytes = cx.argument::(0)?.value(&mut cx) as i64; + + let stream_retention_policy = StreamRetentionPolicy { + retention: Retention { + retention_type: RetentionType::Size, + retention_param: size_in_bytes, + }, + }; + + Ok(cx.boxed(stream_retention_policy)) + } + + pub fn js_by_time(mut cx: FunctionContext) -> JsResult> { + let time_in_millis = cx.argument::(0)?.value(&mut cx) as i64; + + let stream_retention_policy = StreamRetentionPolicy { + retention: Retention { + retention_type: RetentionType::Time, + retention_param: time_in_millis, + }, + }; + + Ok(cx.boxed(stream_retention_policy)) + } +} + +pub(crate) struct StreamScalingPolicy { + scaling: Scaling, +} + +impl Finalize for StreamScalingPolicy {} + +impl StreamScalingPolicy { + pub fn js_fixed_scaling_policy(mut cx: FunctionContext) -> JsResult> { + let initial_segments = cx.argument::(0)?.value(&mut cx) as i32; + + let stream_scaling_policy = StreamScalingPolicy { + scaling: Scaling { + scale_type: ScaleType::FixedNumSegments, + target_rate: 0, + scale_factor: 0, + min_num_segments: initial_segments, + }, + }; + + Ok(cx.boxed(stream_scaling_policy)) + } + + pub fn js_auto_scaling_policy_by_data_rate( + mut cx: FunctionContext, + ) -> JsResult> { + let target_rate_kbytes_per_sec = cx.argument::(0)?.value(&mut cx) as i32; + let scale_factor = cx.argument::(1)?.value(&mut cx) as i32; + let initial_segments = cx.argument::(2)?.value(&mut cx) as i32; + + let stream_scaling_policy = StreamScalingPolicy { + scaling: Scaling { + scale_type: ScaleType::ByRateInKbytesPerSec, + target_rate: target_rate_kbytes_per_sec, + scale_factor, + min_num_segments: initial_segments, + }, + }; + + Ok(cx.boxed(stream_scaling_policy)) + } + + pub fn js_auto_scaling_policy_by_event_rate( + mut cx: FunctionContext, + ) -> JsResult> { + let target_events_per_sec = cx.argument::(0)?.value(&mut cx) as i32; + let scale_factor = cx.argument::(1)?.value(&mut cx) as i32; + let initial_segments = cx.argument::(2)?.value(&mut cx) as i32; + + let stream_scaling_policy = StreamScalingPolicy { + scaling: Scaling { + scale_type: ScaleType::ByRateInEventsPerSec, + target_rate: target_events_per_sec, + scale_factor, + min_num_segments: initial_segments, + }, + }; + + Ok(cx.boxed(stream_scaling_policy)) + } +} + +impl Finalize for StreamManager {} + +// The implementation of the pure Rust client call. +impl StreamManager { + fn new( + controller_uri: &str, + auth_enabled: bool, + tls_enabled: bool, + disable_cert_verification: bool, + ) -> Self { + let mut builder = ClientConfigBuilder::default(); + + builder + .controller_uri(controller_uri) + .is_auth_enabled(auth_enabled); + if tls_enabled { + builder.is_tls_enabled(tls_enabled); + builder.disable_cert_verification(disable_cert_verification); + } + let config = builder.build().expect("creating config"); + let client_factory = ClientFactory::new(config.clone()); + + StreamManager { + controller_ip: controller_uri.to_string(), + cf: client_factory, + config, + } + } + + /// + /// Create a Scope in Pravega. + /// + fn create_scope(&self, scope_name: &str) -> Result> { + let handle = self.cf.runtime_handle(); + info!("creating scope {:?}", scope_name); + + let controller = self.cf.controller_client(); + let scope_name = Scope::from(scope_name.to_string()); + + // TODO: async? + handle.block_on(controller.create_scope(&scope_name)) + } + + /// + /// Delete a Scope in Pravega. + /// + fn delete_scope(&self, scope_name: &str) -> Result> { + let handle = self.cf.runtime_handle(); + info!("Delete scope {:?}", scope_name); + + let controller = self.cf.controller_client(); + let scope_name = Scope::from(scope_name.to_string()); + + handle.block_on(controller.delete_scope(&scope_name)) + } + + /// + /// List Scopes in Pravega. + /// + fn list_scopes(&self) -> Vec { + use futures::stream::StreamExt; + use pravega_controller_client::paginator::list_scopes; + + let handle = self.cf.runtime_handle(); + info!("List scopes"); + + let controller = self.cf.controller_client(); + + handle.block_on(async { + let scope = list_scopes(controller); + scope.map(|str| str.unwrap()).collect::>().await + }) + } + + /// + /// Create a Stream in Pravega. + /// + pub fn create_stream_with_policy( + &self, + scope_name: &str, + stream_name: &str, + scaling_policy: Scaling, + retention_policy: Retention, + tags: Option>, + ) -> Result> { + let handle = self.cf.runtime_handle(); + info!( + "creating stream {:?} under scope {:?} with scaling policy {:?}, retention policy {:?} and tags {:?}", + stream_name, scope_name, scaling_policy, retention_policy, tags + ); + let stream_cfg = StreamConfiguration { + scoped_stream: ScopedStream { + scope: Scope::from(scope_name.to_string()), + stream: Stream::from(stream_name.to_string()), + }, + scaling: scaling_policy, + retention: retention_policy, + tags, + }; + let controller = self.cf.controller_client(); + + handle.block_on(controller.create_stream(&stream_cfg)) + } + + /// + /// Update Stream Configuration in Pravega + /// + pub fn update_stream_with_policy( + &self, + scope_name: &str, + stream_name: &str, + scaling_policy: Scaling, + retention_policy: Retention, + tags: Option>, + ) -> Result> { + let handle = self.cf.runtime_handle(); + info!( + "updating stream {:?} under scope {:?} with scaling policy {:?}, retention policy {:?} and tags {:?}", + stream_name, scope_name, scaling_policy, retention_policy, tags + ); + let stream_cfg = StreamConfiguration { + scoped_stream: ScopedStream { + scope: Scope::from(scope_name.to_string()), + stream: Stream::from(stream_name.to_string()), + }, + scaling: scaling_policy, + retention: retention_policy, + tags, + }; + let controller = self.cf.controller_client(); + + handle.block_on(controller.update_stream(&stream_cfg)) + } + + /// + /// Get Stream tags from Pravega. + /// + pub fn get_stream_tags( + &self, + scope_name: &str, + stream_name: &str, + ) -> Result>, RetryError> { + let handle = self.cf.runtime_handle(); + info!( + "fetch tags for stream {:?} under scope {:?}", + stream_name, scope_name, + ); + let stream = ScopedStream { + scope: Scope::from(scope_name.to_string()), + stream: Stream::from(stream_name.to_string()), + }; + let controller = self.cf.controller_client(); + + let stream_configuration = handle.block_on(controller.get_stream_configuration(&stream)); + match stream_configuration { + Ok(val) => Ok(val.tags), + Err(err) => Err(err), + } + } + + /// + /// Seal a Stream in Pravega. + /// + pub fn seal_stream( + &self, + scope_name: &str, + stream_name: &str, + ) -> Result> { + let handle = self.cf.runtime_handle(); + info!("Sealing stream {:?} under scope {:?} ", stream_name, scope_name); + let scoped_stream = ScopedStream { + scope: Scope::from(scope_name.to_string()), + stream: Stream::from(stream_name.to_string()), + }; + + let controller = self.cf.controller_client(); + + handle.block_on(controller.seal_stream(&scoped_stream)) + } + + /// + /// Delete a Stream in Pravega. + /// + pub fn delete_stream( + &self, + scope_name: &str, + stream_name: &str, + ) -> Result> { + let handle = self.cf.runtime_handle(); + info!("Deleting stream {:?} under scope {:?} ", stream_name, scope_name); + let scoped_stream = ScopedStream { + scope: Scope::from(scope_name.to_string()), + stream: Stream::from(stream_name.to_string()), + }; + + let controller = self.cf.controller_client(); + handle.block_on(controller.delete_stream(&scoped_stream)) + } + + /// + /// List Streams in Pravega. + /// + fn list_streams(&self, scope_name: &str) -> Vec { + use futures::stream::StreamExt; + use pravega_controller_client::paginator::list_streams; + + let handle = self.cf.runtime_handle(); + info!("List streams of {:?}", scope_name); + + let controller = self.cf.controller_client(); + + handle.block_on(async { + let stream = list_streams( + Scope { + name: scope_name.to_string(), + }, + controller, + ); + stream + .map(|str| str.unwrap()) + .collect::>() + .await + }) + } +} + +// The implementation of the JavaScript call and parameters cast. +impl StreamManager { + pub fn js_new(mut cx: FunctionContext) -> JsResult> { + let controller_uri = cx.argument::(0)?.value(&mut cx); + let auth_enabled = cx.argument::(1)?.value(&mut cx); + let tls_enabled = cx.argument::(2)?.value(&mut cx); + let disable_cert_verification = cx.argument::(3)?.value(&mut cx); + + let stream_manager = StreamManager::new( + &controller_uri.to_string(), + auth_enabled, + tls_enabled, + disable_cert_verification, + ); + + Ok(cx.boxed(stream_manager)) + } + + pub fn js_create_scope(mut cx: FunctionContext) -> JsResult { + let stream_manager = cx.this().downcast_or_throw::, _>(&mut cx)?; + let scope_name = cx.argument::(0)?.value(&mut cx); + + let scope_result = stream_manager.create_scope(&scope_name.to_string()); + match scope_result { + Ok(t) => Ok(cx.boolean(t)), + Err(e) => cx.throw_error(e.to_string()), + } + } + + pub fn js_delete_scope(mut cx: FunctionContext) -> JsResult { + let stream_manager = cx.this().downcast_or_throw::, _>(&mut cx)?; + let scope_name = cx.argument::(0)?.value(&mut cx); + + let scope_result = stream_manager.delete_scope(&scope_name.to_string()); + match scope_result { + Ok(t) => Ok(cx.boolean(t)), + Err(e) => cx.throw_error(e.to_string()), + } + } + + pub fn js_list_scopes(mut cx: FunctionContext) -> JsResult { + use std::convert::TryInto; + + let stream_manager = cx.this().downcast_or_throw::, _>(&mut cx)?; + + let scopes = stream_manager.list_scopes(); + let scopes_length: u32 = match scopes.len().try_into() { + Ok(val) => val, + Err(_err) => return cx.throw_error("Too many scopes"), + }; + + let array: Handle = JsArray::new(&mut cx, scopes_length); + for (pos, e) in scopes.iter().enumerate() { + let scope_name = cx.string(e.name.clone()); + array.set(&mut cx, pos as u32, scope_name)?; + } + Ok(array) + } + + pub fn js_create_stream_with_policy(mut cx: FunctionContext) -> JsResult { + let stream_manager = cx.this().downcast_or_throw::, _>(&mut cx)?; + let scope_name = cx.argument::(0)?.value(&mut cx); + let stream_name = cx.argument::(1)?.value(&mut cx); + let retention_policy = cx.argument::>(2)?; + let scaling_policy = cx.argument::>(3)?; + // Wonderful example for how to read and cast string[] to Vec. + // https://github.com/neon-bindings/neon/issues/613 + let tags = cx + .argument::(4)? + .to_vec(&mut cx)? + .into_iter() + .map(|v| { + v.downcast_or_throw::(&mut cx) + .map(|v| v.value(&mut cx)) + }) + .collect::, _>>()?; + + let stream_result = stream_manager.create_stream_with_policy( + &scope_name.to_string(), + &stream_name.to_string(), + // TODO: Handle> auto dereferencing won't work for this (no idea why), + // so manually getting the internal scaling and retention is required. + // https://docs.rs/neon/latest/neon/types/struct.JsBox.html#deref-behavior + scaling_policy.scaling.clone(), + retention_policy.retention.clone(), + match tags.len() { + l if l <= 0 => None, + _ => Some(tags), + }, + ); + match stream_result { + Ok(t) => Ok(cx.boolean(t)), + Err(e) => cx.throw_error(e.to_string()), + } + } + + pub fn js_update_stream_with_policy(mut cx: FunctionContext) -> JsResult { + let stream_manager = cx.this().downcast_or_throw::, _>(&mut cx)?; + let scope_name = cx.argument::(0)?.value(&mut cx); + let stream_name = cx.argument::(1)?.value(&mut cx); + let retention_policy = cx.argument::>(2)?; + let scaling_policy = cx.argument::>(3)?; + let tags = cx + .argument::(4)? + .to_vec(&mut cx)? + .into_iter() + .map(|v| { + v.downcast_or_throw::(&mut cx) + .map(|v| v.value(&mut cx)) + }) + .collect::, _>>()?; + + let stream_result = stream_manager.update_stream_with_policy( + &scope_name.to_string(), + &stream_name.to_string(), + scaling_policy.scaling.clone(), + retention_policy.retention.clone(), + match tags.len() { + l if l <= 0 => None, + _ => Some(tags), + }, + ); + match stream_result { + Ok(t) => Ok(cx.boolean(t)), + Err(e) => cx.throw_error(e.to_string()), + } + } + + pub fn js_get_stream_tags(mut cx: FunctionContext) -> JsResult { + use std::convert::TryInto; + + let stream_manager = cx.this().downcast_or_throw::, _>(&mut cx)?; + let scope_name = cx.argument::(0)?.value(&mut cx); + let stream_name = cx.argument::(1)?.value(&mut cx); + + let tags = match stream_manager.get_stream_tags(&scope_name.to_string(), &stream_name.to_string()) { + Ok(val) => match val { + Some(val) => val, + None => return Ok(cx.empty_array()), + }, + Err(_err) => { + // TODO: display the error details. + return cx.throw_error("Internal controller error when getting the StreamConfiguration"); + } + }; + let tags_length: u32 = match tags.len().try_into() { + Ok(val) => val, + Err(_err) => return cx.throw_error("Too many tags"), + }; + + let array: Handle = JsArray::new(&mut cx, tags_length); + for (pos, e) in tags.iter().enumerate() { + let tag_name = cx.string(e.clone()); + array.set(&mut cx, pos as u32, tag_name)?; + } + Ok(array) + } + + pub fn js_seal_stream(mut cx: FunctionContext) -> JsResult { + let stream_manager = cx.this().downcast_or_throw::, _>(&mut cx)?; + let scope_name = cx.argument::(0)?.value(&mut cx); + let stream_name = cx.argument::(1)?.value(&mut cx); + + let scope_result = stream_manager.seal_stream(&scope_name.to_string(), &stream_name.to_string()); + match scope_result { + Ok(t) => Ok(cx.boolean(t)), + Err(e) => cx.throw_error(e.to_string()), + } + } + + pub fn js_delete_stream(mut cx: FunctionContext) -> JsResult { + let stream_manager = cx.this().downcast_or_throw::, _>(&mut cx)?; + let scope_name = cx.argument::(0)?.value(&mut cx); + let stream_name = cx.argument::(1)?.value(&mut cx); + + let scope_result = stream_manager.delete_stream(&scope_name.to_string(), &stream_name.to_string()); + match scope_result { + Ok(t) => Ok(cx.boolean(t)), + Err(e) => cx.throw_error(e.to_string()), + } + } + + pub fn js_list_streams(mut cx: FunctionContext) -> JsResult { + use std::convert::TryInto; + + let stream_manager = cx.this().downcast_or_throw::, _>(&mut cx)?; + let scope_name = cx.argument::(0)?.value(&mut cx); + + let streams = stream_manager.list_streams(&scope_name.to_string()); + let streams_length: u32 = match streams.len().try_into() { + Ok(val) => val, + Err(_err) => return cx.throw_error("Too many streams"), + }; + + let array: Handle = JsArray::new(&mut cx, streams_length); + for (pos, e) in streams.iter().enumerate() { + let stream_name = cx.string(e.stream.name.clone()); + array.set(&mut cx, pos as u32, stream_name)?; + } + Ok(array) + } +} diff --git a/nodejs/src/util.rs b/nodejs/src/util.rs new file mode 100644 index 000000000..e4720bf49 --- /dev/null +++ b/nodejs/src/util.rs @@ -0,0 +1,26 @@ +// Copyright Pravega Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use neon::prelude::*; + +pub fn js_log<'a, C: Context<'a>>(cx: &mut C, s: String) -> NeonResult<()> { + let global = cx.global(); + let console = global.get(cx, "console")?.downcast_or_throw::(cx)?; + let log = console.get(cx, "log")?.downcast_or_throw::(cx)?; + let null = cx.null(); + + let args: Vec> = vec![cx.string(s)]; + log.call(cx, null, args)?; + + Ok(()) +} diff --git a/nodejs/stream_manager.ts b/nodejs/stream_manager.ts new file mode 100644 index 000000000..649ff7c3d --- /dev/null +++ b/nodejs/stream_manager.ts @@ -0,0 +1,210 @@ +// Copyright Pravega Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// Native modules are not currently supported with ES module imports. +// https://nodejs.org/api/esm.html#esm_no_native_module_loading +import { createRequire } from 'module'; +const require = createRequire(import.meta.url); + +const { + StreamManagerNew, + StreamManagerCreateScope, + StreamManagerDeleteScope, + StreamManagerListScopes, + StreamRetentionPolicyNone, + StreamRetentionPolicyBySize, + StreamRetentionPolicyByTime, + StreamScalingPolicyFixed, + StreamScalingPolicyByDataRate, + StreamScalingPolicyByEventRate, + StreamManagerCreateStreamWithPolicy, + StreamManagerUpdateStreamWithPolicy, + StreamManagerGetStreamTags, + StreamManagerSealStream, + StreamManagerDeleteStream, + StreamManagerListStreams, +} = require('./index.node'); + +export class StreamRetentionPolicy { + static none = (): StreamRetentionPolicy => StreamRetentionPolicyNone(); + static by_size = (size_in_bytes: number): StreamRetentionPolicy => StreamRetentionPolicyBySize(size_in_bytes); + static by_time = (time_in_millis: number): StreamRetentionPolicy => StreamRetentionPolicyByTime(time_in_millis); +} + +export class StreamScalingPolicy { + static fixed_scaling_policy = (initial_segments: number): StreamScalingPolicy => + StreamScalingPolicyFixed(initial_segments); + static auto_scaling_policy_by_data_rate = ( + target_rate_kbytes_per_sec: number, + scale_factor: number, + initial_segments: number + ): StreamScalingPolicy => StreamScalingPolicyByDataRate(target_rate_kbytes_per_sec, scale_factor, initial_segments); + static auto_scaling_policy_by_event_rate = ( + target_events_per_sec: number, + scale_factor: number, + initial_segments: number + ): StreamScalingPolicy => StreamScalingPolicyByEventRate(target_events_per_sec, scale_factor, initial_segments); +} + +/** + * Create a StreamManager by providing a controller uri. + * + * ```typescript + * const stream_manager = new StreamManger('tcp://127.0.0.1:9090', false, false, true); + * ``` + * + * Optionally enable tls support using tls:// scheme. + * + * ```typescript + * const stream_manager = new StreamManger('tls://127.0.0.1:9090', false, false, true); + * ``` + */ +export class StreamManger { + // TODO: represent this object other than `StreamManger { StreamManger: [External: 676ee80] }` + StreamManger: StreamManger; + + constructor( + controller_uri: string, + auth_enabled: boolean = false, + tls_enabled: boolean = false, + disable_cert_verification: boolean = true + ) { + this.StreamManger = StreamManagerNew(controller_uri, auth_enabled, tls_enabled, disable_cert_verification); + } + + /** + * Create a Pravega scope. + * + * @param scope_name The scope name. + * @returns The scope creation result. `false` indicates that the scope exists before creation. + */ + create_scope(scope_name: string): boolean { + return StreamManagerCreateScope.call(this.StreamManger, scope_name); + } + + /** + * Delete a Pravega scope. + * + * @param scope_name The scope name. + * @returns The scope deletion result. + */ + delete_scope(scope_name: string): boolean { + return StreamManagerDeleteScope.call(this.StreamManger, scope_name); + } + + /** + * List all scopes in Pravega. + * + * @returns All scope names. + */ + list_scopes(): string[] { + return StreamManagerListScopes.call(this.StreamManger); + } + + /** + * Create a stream with or without specific policy in Pravega. + * + * @param scope_name The scope name. + * @param stream_name The stream name. + * @param scaling_policy The scaling policy. + * @param retention_policy The retention policy. + * @param tags The stream tags. + * @returns The stream creation result. `false` indicates that the stream exists before creation. + */ + create_stream( + scope_name: string, + stream_name: string, + scaling_policy: StreamScalingPolicy = StreamRetentionPolicy.none(), + retention_policy: StreamRetentionPolicy = StreamScalingPolicy.fixed_scaling_policy(1), + tags: string[] = [] + ): boolean { + return StreamManagerCreateStreamWithPolicy.call( + this.StreamManger, + scope_name, + stream_name, + scaling_policy, + retention_policy, + tags + ); + } + + /** + * Update a Pravega stream with new policies and tags. + * + * @param scope_name The scope name. + * @param stream_name The stream name. + * @param scaling_policy The scaling policy. + * @param retention_policy The retention policy. + * @param tags The stream tags. + * @returns The stream update result. + */ + update_stream( + scope_name: string, + stream_name: string, + scaling_policy: StreamScalingPolicy = StreamRetentionPolicy.none(), + retention_policy: StreamRetentionPolicy = StreamScalingPolicy.fixed_scaling_policy(1), + tags: string[] = [] + ): boolean { + return StreamManagerUpdateStreamWithPolicy.call( + this.StreamManger, + scope_name, + stream_name, + scaling_policy, + retention_policy, + tags + ); + } + + /** + * Get tags of a Pravega stream. + * + * @param scope_name The scope name. + * @param stream_name The stream name. + * @returns The stream tags. + */ + get_stream_tags(scope_name: string, stream_name: string): string[] { + return StreamManagerGetStreamTags.call(this.StreamManger, scope_name, stream_name); + } + + /** + * Seal a Pravega stream. SEAL BEFORE DELETE! + * + * @param scope_name The scope name. + * @param stream_name The stream name. + * @returns The seal result. + */ + seal_stream(scope_name: string, stream_name: string): boolean { + return StreamManagerSealStream.call(this.StreamManger, scope_name, stream_name); + } + + /** + * Deleta a Pravega stream. SEAL BEFORE DELETE! + * + * @param scope_name The scope name. + * @param stream_name The stream name. + * @returns The deletion result. + */ + delete_stream(scope_name: string, stream_name: string): boolean { + return StreamManagerDeleteStream.call(this.StreamManger, scope_name, stream_name); + } + + /** + * List all scopes in Pravega. + * + * @param scope_name The scope name. + * @returns All stream names in this scope. + */ + list_streams(scope_name: string): string[] { + return StreamManagerListStreams.call(this.StreamManger, scope_name); + } +} diff --git a/nodejs/tests/stream_manager.ts b/nodejs/tests/stream_manager.ts new file mode 100644 index 000000000..e6b5bba80 --- /dev/null +++ b/nodejs/tests/stream_manager.ts @@ -0,0 +1,60 @@ +// Copyright Pravega Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +import { assert } from 'chai'; +import { StreamManger, StreamRetentionPolicy, StreamScalingPolicy } from '../stream_manager.js'; + +const stream_manager = new StreamManger('tcp://127.0.0.1:9090', false, false, true); +console.log(stream_manager); + +// create scope and stream +assert.equal(stream_manager.create_scope('scope1'), true); +assert.equal( + stream_manager.create_stream( + 'scope1', + 'stream1', + StreamRetentionPolicy.none(), + StreamScalingPolicy.fixed_scaling_policy(1) + ), + true +); +assert.equal(stream_manager.create_stream('scope1', 'stream2withoutpolicy'), true); + +// assert list scope and stream +assert.deepEqual(stream_manager.list_scopes(), ['scope1', '_system']); +assert.deepEqual(stream_manager.list_streams('scope1'), [ + 'stream2withoutpolicy', + '_MARKstream2withoutpolicy', + '_MARKstream1', + 'stream1', +]); + +// update stream with tags and get tags +assert.equal( + stream_manager.update_stream( + 'scope1', + 'stream2withoutpolicy', + StreamRetentionPolicy.none(), + StreamScalingPolicy.fixed_scaling_policy(1), + ['a', 'bb', 'ccc'] + ), + true +); +assert.deepEqual(stream_manager.get_stream_tags('scope1', 'stream2withoutpolicy'), ['bb', 'a', 'ccc']); + +// delete stream and scope +assert.equal(stream_manager.seal_stream('scope1', 'stream1'), true); +assert.equal(stream_manager.delete_stream('scope1', 'stream1'), true); +assert.equal(stream_manager.seal_stream('scope1', 'stream2withoutpolicy'), true); +assert.equal(stream_manager.delete_stream('scope1', 'stream2withoutpolicy'), true); +assert.equal(stream_manager.delete_scope('scope1'), true); diff --git a/nodejs/tsconfig.json b/nodejs/tsconfig.json new file mode 100644 index 000000000..615d584d1 --- /dev/null +++ b/nodejs/tsconfig.json @@ -0,0 +1,10 @@ +{ + "compilerOptions": { + "sourceMap": true, + "target": "es6", + "module": "esnext", + "esModuleInterop": true, + "moduleResolution": "nodenext", + "rootDir": "./" + } +} \ No newline at end of file From 5a2f8a811b38b7b6fbed0780af7f6add6d49ef7e Mon Sep 17 00:00:00 2001 From: thekingofcity <3353040+thekingofcity@users.noreply.github.com> Date: Mon, 7 Mar 2022 10:51:49 +0800 Subject: [PATCH 02/16] update doc to /// Signed-off-by: thekingofcity <3353040+thekingofcity@users.noreply.github.com> --- nodejs/src/stream_manager.rs | 14 ++++++++++---- 1 file changed, 10 insertions(+), 4 deletions(-) diff --git a/nodejs/src/stream_manager.rs b/nodejs/src/stream_manager.rs index 30a6aa1e4..8402cb140 100644 --- a/nodejs/src/stream_manager.rs +++ b/nodejs/src/stream_manager.rs @@ -20,8 +20,10 @@ use pravega_client_shared::*; use pravega_controller_client::ControllerError; use tracing::info; -// An internal rust struct that holds the necessary info to perform actions on StreamManager. -// The `js_new` method will return a boxed(wrapped) StreamManager as an external object. +/// +/// An internal rust struct that holds the necessary info to perform actions on StreamManager. +/// The `js_new` method will return a boxed(wrapped) StreamManager as an external object. +/// pub(crate) struct StreamManager { controller_ip: String, cf: ClientFactory, @@ -133,7 +135,9 @@ impl StreamScalingPolicy { impl Finalize for StreamManager {} -// The implementation of the pure Rust client call. +/// +/// The implementation of the pure Rust client call. +/// impl StreamManager { fn new( controller_uri: &str, @@ -357,7 +361,9 @@ impl StreamManager { } } -// The implementation of the JavaScript call and parameters cast. +/// +/// The implementation of the JavaScript proxy call and the parameters cast. +/// impl StreamManager { pub fn js_new(mut cx: FunctionContext) -> JsResult> { let controller_uri = cx.argument::(0)?.value(&mut cx); From 678caf312d059221897e36ac018eb58e0f208d28 Mon Sep 17 00:00:00 2001 From: thekingofcity <3353040+thekingofcity@users.noreply.github.com> Date: Mon, 7 Mar 2022 16:39:25 +0800 Subject: [PATCH 03/16] favor composition over inheritance Signed-off-by: thekingofcity <3353040+thekingofcity@users.noreply.github.com> --- nodejs/Cargo.toml | 4 +- nodejs/src/lib.rs | 1 + nodejs/src/stream_manager.rs | 10 ++- nodejs/src/util.rs | 4 +- nodejs/stream_manager.ts | 140 +++++++++++++++++---------------- nodejs/tests/stream_manager.ts | 6 +- 6 files changed, 89 insertions(+), 76 deletions(-) diff --git a/nodejs/Cargo.toml b/nodejs/Cargo.toml index 1e1bb4930..f4a4a6a31 100644 --- a/nodejs/Cargo.toml +++ b/nodejs/Cargo.toml @@ -29,5 +29,5 @@ tracing-subscriber = "0.2.2" [dependencies.neon] default-features = false -features = ["napi-6"] -version = "0.9" +features = ["napi-6", "promise-api", "channel-api"] +version = "0.10.0-alpha.3" diff --git a/nodejs/src/lib.rs b/nodejs/src/lib.rs index 4aeaeb5cb..49820d2ff 100644 --- a/nodejs/src/lib.rs +++ b/nodejs/src/lib.rs @@ -41,5 +41,6 @@ fn main(mut cx: ModuleContext) -> NeonResult<()> { cx.export_function("StreamManagerSealStream", StreamManager::js_seal_stream)?; cx.export_function("StreamManagerDeleteStream", StreamManager::js_delete_stream)?; cx.export_function("StreamManagerListStreams", StreamManager::js_list_streams)?; + cx.export_function("StreamManagerToString", StreamManager::js_to_str)?; Ok(()) } diff --git a/nodejs/src/stream_manager.rs b/nodejs/src/stream_manager.rs index 8402cb140..335c0e5bc 100644 --- a/nodejs/src/stream_manager.rs +++ b/nodejs/src/stream_manager.rs @@ -11,7 +11,6 @@ // See the License for the specific language governing permissions and // limitations under the License. -use crate::util::js_log; use neon::prelude::*; use pravega_client::client_factory::ClientFactory; use pravega_client_config::{ClientConfig, ClientConfigBuilder}; @@ -564,4 +563,13 @@ impl StreamManager { } Ok(array) } + + pub fn js_to_str(mut cx: FunctionContext) -> JsResult { + let stream_manager = cx.this().downcast_or_throw::, _>(&mut cx)?; + + Ok(cx.string(format!( + "Controller ip: {:?} ClientConfig: {:?}", + stream_manager.controller_ip, stream_manager.config + ))) + } } diff --git a/nodejs/src/util.rs b/nodejs/src/util.rs index e4720bf49..6c894fb88 100644 --- a/nodejs/src/util.rs +++ b/nodejs/src/util.rs @@ -17,10 +17,8 @@ pub fn js_log<'a, C: Context<'a>>(cx: &mut C, s: String) -> NeonResult<()> { let global = cx.global(); let console = global.get(cx, "console")?.downcast_or_throw::(cx)?; let log = console.get(cx, "log")?.downcast_or_throw::(cx)?; - let null = cx.null(); - let args: Vec> = vec![cx.string(s)]; - log.call(cx, null, args)?; + log.call_with(cx).arg(cx.string(s)).apply::(cx)?; Ok(()) } diff --git a/nodejs/stream_manager.ts b/nodejs/stream_manager.ts index 649ff7c3d..2ea4216e4 100644 --- a/nodejs/stream_manager.ts +++ b/nodejs/stream_manager.ts @@ -33,54 +33,53 @@ const { StreamManagerSealStream, StreamManagerDeleteStream, StreamManagerListStreams, + StreamManagerToString, } = require('./index.node'); -export class StreamRetentionPolicy { - static none = (): StreamRetentionPolicy => StreamRetentionPolicyNone(); - static by_size = (size_in_bytes: number): StreamRetentionPolicy => StreamRetentionPolicyBySize(size_in_bytes); - static by_time = (time_in_millis: number): StreamRetentionPolicy => StreamRetentionPolicyByTime(time_in_millis); -} +export interface StreamRetentionPolicy {} -export class StreamScalingPolicy { - static fixed_scaling_policy = (initial_segments: number): StreamScalingPolicy => - StreamScalingPolicyFixed(initial_segments); - static auto_scaling_policy_by_data_rate = ( +export const StreamRetentionPolicy = { + none: (): StreamRetentionPolicy => StreamRetentionPolicyNone(), + by_size: (size_in_bytes: number): StreamRetentionPolicy => StreamRetentionPolicyBySize(size_in_bytes), + by_time: (time_in_millis: number): StreamRetentionPolicy => StreamRetentionPolicyByTime(time_in_millis), +}; + +export interface StreamScalingPolicy {} + +export const StreamScalingPolicy = { + fixed_scaling_policy: (initial_segments: number): StreamScalingPolicy => StreamScalingPolicyFixed(initial_segments), + auto_scaling_policy_by_data_rate: ( target_rate_kbytes_per_sec: number, scale_factor: number, initial_segments: number - ): StreamScalingPolicy => StreamScalingPolicyByDataRate(target_rate_kbytes_per_sec, scale_factor, initial_segments); - static auto_scaling_policy_by_event_rate = ( + ): StreamScalingPolicy => StreamScalingPolicyByDataRate(target_rate_kbytes_per_sec, scale_factor, initial_segments), + auto_scaling_policy_by_event_rate: ( target_events_per_sec: number, scale_factor: number, initial_segments: number - ): StreamScalingPolicy => StreamScalingPolicyByEventRate(target_events_per_sec, scale_factor, initial_segments); -} + ): StreamScalingPolicy => StreamScalingPolicyByEventRate(target_events_per_sec, scale_factor, initial_segments), +}; /** * Create a StreamManager by providing a controller uri. * * ```typescript - * const stream_manager = new StreamManger('tcp://127.0.0.1:9090', false, false, true); + * const stream_manager = StreamManger('tcp://127.0.0.1:9090', false, false, true); * ``` * * Optionally enable tls support using tls:// scheme. * * ```typescript - * const stream_manager = new StreamManger('tls://127.0.0.1:9090', false, false, true); + * const stream_manager = StreamManger('tls://127.0.0.1:9090', false, false, true); * ``` */ -export class StreamManger { - // TODO: represent this object other than `StreamManger { StreamManger: [External: 676ee80] }` - StreamManger: StreamManger; - - constructor( - controller_uri: string, - auth_enabled: boolean = false, - tls_enabled: boolean = false, - disable_cert_verification: boolean = true - ) { - this.StreamManger = StreamManagerNew(controller_uri, auth_enabled, tls_enabled, disable_cert_verification); - } +export const StreamManager = ( + controller_uri: string, + auth_enabled: boolean = false, + tls_enabled: boolean = false, + disable_cert_verification: boolean = true +) => { + const StreamManger = StreamManagerNew(controller_uri, auth_enabled, tls_enabled, disable_cert_verification); /** * Create a Pravega scope. @@ -88,28 +87,22 @@ export class StreamManger { * @param scope_name The scope name. * @returns The scope creation result. `false` indicates that the scope exists before creation. */ - create_scope(scope_name: string): boolean { - return StreamManagerCreateScope.call(this.StreamManger, scope_name); - } + const create_scope = (scope_name: string): boolean => StreamManagerCreateScope.call(StreamManger, scope_name); /** * Delete a Pravega scope. * * @param scope_name The scope name. - * @returns The scope deletion result. + * @returns The scope deletion result. `false` indicates that the scope does not exist before deletion. */ - delete_scope(scope_name: string): boolean { - return StreamManagerDeleteScope.call(this.StreamManger, scope_name); - } + const delete_scope = (scope_name: string): boolean => StreamManagerDeleteScope.call(StreamManger, scope_name); /** * List all scopes in Pravega. * * @returns All scope names. */ - list_scopes(): string[] { - return StreamManagerListScopes.call(this.StreamManger); - } + const list_scopes = (): string[] => StreamManagerListScopes.call(StreamManger); /** * Create a stream with or without specific policy in Pravega. @@ -118,29 +111,28 @@ export class StreamManger { * @param stream_name The stream name. * @param scaling_policy The scaling policy. * @param retention_policy The retention policy. - * @param tags The stream tags. + * @param tags The stream tags. * @returns The stream creation result. `false` indicates that the stream exists before creation. */ - create_stream( + const create_stream = ( scope_name: string, stream_name: string, scaling_policy: StreamScalingPolicy = StreamRetentionPolicy.none(), retention_policy: StreamRetentionPolicy = StreamScalingPolicy.fixed_scaling_policy(1), tags: string[] = [] - ): boolean { - return StreamManagerCreateStreamWithPolicy.call( - this.StreamManger, + ): boolean => + StreamManagerCreateStreamWithPolicy.call( + StreamManger, scope_name, stream_name, scaling_policy, retention_policy, tags ); - } /** * Update a Pravega stream with new policies and tags. - * + * * @param scope_name The scope name. * @param stream_name The stream name. * @param scaling_policy The scaling policy. @@ -148,63 +140,77 @@ export class StreamManger { * @param tags The stream tags. * @returns The stream update result. */ - update_stream( + const update_stream = ( scope_name: string, stream_name: string, scaling_policy: StreamScalingPolicy = StreamRetentionPolicy.none(), retention_policy: StreamRetentionPolicy = StreamScalingPolicy.fixed_scaling_policy(1), tags: string[] = [] - ): boolean { - return StreamManagerUpdateStreamWithPolicy.call( - this.StreamManger, + ): boolean => + StreamManagerUpdateStreamWithPolicy.call( + StreamManger, scope_name, stream_name, scaling_policy, retention_policy, tags ); - } /** * Get tags of a Pravega stream. - * + * * @param scope_name The scope name. * @param stream_name The stream name. - * @returns The stream tags. + * @returns The stream tags. */ - get_stream_tags(scope_name: string, stream_name: string): string[] { - return StreamManagerGetStreamTags.call(this.StreamManger, scope_name, stream_name); - } + const get_stream_tags = (scope_name: string, stream_name: string): string[] => + StreamManagerGetStreamTags.call(StreamManger, scope_name, stream_name); /** * Seal a Pravega stream. SEAL BEFORE DELETE! - * + * * @param scope_name The scope name. * @param stream_name The stream name. * @returns The seal result. */ - seal_stream(scope_name: string, stream_name: string): boolean { - return StreamManagerSealStream.call(this.StreamManger, scope_name, stream_name); - } + const seal_stream = (scope_name: string, stream_name: string): boolean => + StreamManagerSealStream.call(StreamManger, scope_name, stream_name); /** * Deleta a Pravega stream. SEAL BEFORE DELETE! - * + * * @param scope_name The scope name. * @param stream_name The stream name. * @returns The deletion result. */ - delete_stream(scope_name: string, stream_name: string): boolean { - return StreamManagerDeleteStream.call(this.StreamManger, scope_name, stream_name); - } + const delete_stream = (scope_name: string, stream_name: string): boolean => + StreamManagerDeleteStream.call(StreamManger, scope_name, stream_name); /** * List all scopes in Pravega. - * + * * @param scope_name The scope name. * @returns All stream names in this scope. */ - list_streams(scope_name: string): string[] { - return StreamManagerListStreams.call(this.StreamManger, scope_name); - } -} + const list_streams = (scope_name: string): string[] => StreamManagerListStreams.call(StreamManger, scope_name); + + /** + * A detailed view of a StreamManager. + * + * @returns String representation of the StreamManager. + */ + const toString = (): string => StreamManagerToString.call(StreamManger); + + return { + create_scope, + delete_scope, + list_scopes, + create_stream, + update_stream, + get_stream_tags, + seal_stream, + delete_stream, + list_streams, + toString, + }; +}; diff --git a/nodejs/tests/stream_manager.ts b/nodejs/tests/stream_manager.ts index e6b5bba80..c8ea376f8 100644 --- a/nodejs/tests/stream_manager.ts +++ b/nodejs/tests/stream_manager.ts @@ -12,10 +12,10 @@ // limitations under the License. import { assert } from 'chai'; -import { StreamManger, StreamRetentionPolicy, StreamScalingPolicy } from '../stream_manager.js'; +import { StreamManager, StreamRetentionPolicy, StreamScalingPolicy } from '../stream_manager.js'; -const stream_manager = new StreamManger('tcp://127.0.0.1:9090', false, false, true); -console.log(stream_manager); +const stream_manager = StreamManager('tcp://127.0.0.1:9090', false, false, true); +console.log(`${stream_manager}`); // create scope and stream assert.equal(stream_manager.create_scope('scope1'), true); From 2f025767e694e24a6aab51aa302999934c9aa05b Mon Sep 17 00:00:00 2001 From: thekingofcity <3353040+thekingofcity@users.noreply.github.com> Date: Thu, 10 Mar 2022 17:12:21 +0800 Subject: [PATCH 04/16] remove redundant hello Signed-off-by: thekingofcity <3353040+thekingofcity@users.noreply.github.com> --- nodejs/.prettierrc | 2 +- nodejs/src/lib.rs | 6 ------ nodejs/tsconfig.json | 2 +- 3 files changed, 2 insertions(+), 8 deletions(-) diff --git a/nodejs/.prettierrc b/nodejs/.prettierrc index bc39e1a0e..2ba4894d5 100644 --- a/nodejs/.prettierrc +++ b/nodejs/.prettierrc @@ -5,4 +5,4 @@ "trailingComma": "es5", "printWidth": 120, "endOfLine": "auto" -} \ No newline at end of file +} diff --git a/nodejs/src/lib.rs b/nodejs/src/lib.rs index 49820d2ff..91d108f18 100644 --- a/nodejs/src/lib.rs +++ b/nodejs/src/lib.rs @@ -17,14 +17,8 @@ pub mod util; use neon::prelude::*; use stream_manager::{StreamManager, StreamRetentionPolicy, StreamScalingPolicy}; -fn hello(mut cx: FunctionContext) -> JsResult { - let result = 42; - Ok(cx.number(result)) -} - #[neon::main] fn main(mut cx: ModuleContext) -> NeonResult<()> { - cx.export_function("hello", hello)?; cx.export_function("StreamManagerNew", StreamManager::js_new)?; cx.export_function("StreamManagerCreateScope", StreamManager::js_create_scope)?; cx.export_function("StreamManagerDeleteScope", StreamManager::js_delete_scope)?; diff --git a/nodejs/tsconfig.json b/nodejs/tsconfig.json index 615d584d1..aebcadda6 100644 --- a/nodejs/tsconfig.json +++ b/nodejs/tsconfig.json @@ -7,4 +7,4 @@ "moduleResolution": "nodenext", "rootDir": "./" } -} \ No newline at end of file +} From 7407404db59eadca4f7d3d8747d3e6e30f8491e9 Mon Sep 17 00:00:00 2001 From: thekingofcity <3353040+thekingofcity@users.noreply.github.com> Date: Thu, 10 Mar 2022 17:17:01 +0800 Subject: [PATCH 05/16] lint Signed-off-by: thekingofcity <3353040+thekingofcity@users.noreply.github.com> --- nodejs/src/lib.rs | 25 ++++++++++++++++++++----- 1 file changed, 20 insertions(+), 5 deletions(-) diff --git a/nodejs/src/lib.rs b/nodejs/src/lib.rs index 91d108f18..0ee17a49f 100644 --- a/nodejs/src/lib.rs +++ b/nodejs/src/lib.rs @@ -26,11 +26,26 @@ fn main(mut cx: ModuleContext) -> NeonResult<()> { cx.export_function("StreamRetentionPolicyNone", StreamRetentionPolicy::js_none)?; cx.export_function("StreamRetentionPolicyBySize", StreamRetentionPolicy::js_by_size)?; cx.export_function("StreamRetentionPolicyByTime", StreamRetentionPolicy::js_by_time)?; - cx.export_function("StreamScalingPolicyFixed", StreamScalingPolicy::js_fixed_scaling_policy)?; - cx.export_function("StreamScalingPolicyByDataRate", StreamScalingPolicy::js_auto_scaling_policy_by_data_rate)?; - cx.export_function("StreamScalingPolicyByEventRate", StreamScalingPolicy::js_auto_scaling_policy_by_event_rate)?; - cx.export_function("StreamManagerCreateStreamWithPolicy", StreamManager::js_create_stream_with_policy)?; - cx.export_function("StreamManagerUpdateStreamWithPolicy", StreamManager::js_update_stream_with_policy)?; + cx.export_function( + "StreamScalingPolicyFixed", + StreamScalingPolicy::js_fixed_scaling_policy, + )?; + cx.export_function( + "StreamScalingPolicyByDataRate", + StreamScalingPolicy::js_auto_scaling_policy_by_data_rate, + )?; + cx.export_function( + "StreamScalingPolicyByEventRate", + StreamScalingPolicy::js_auto_scaling_policy_by_event_rate, + )?; + cx.export_function( + "StreamManagerCreateStreamWithPolicy", + StreamManager::js_create_stream_with_policy, + )?; + cx.export_function( + "StreamManagerUpdateStreamWithPolicy", + StreamManager::js_update_stream_with_policy, + )?; cx.export_function("StreamManagerGetStreamTags", StreamManager::js_get_stream_tags)?; cx.export_function("StreamManagerSealStream", StreamManager::js_seal_stream)?; cx.export_function("StreamManagerDeleteStream", StreamManager::js_delete_stream)?; From eaf10c8ce872b57e51eaca4dee607a6fc41e08ed Mon Sep 17 00:00:00 2001 From: thekingofcity <3353040+thekingofcity@users.noreply.github.com> Date: Fri, 11 Mar 2022 21:32:09 +0800 Subject: [PATCH 06/16] Issue357: Nodejs ReaderGroup and EventReader Signed-off-by: thekingofcity <3353040+thekingofcity@users.noreply.github.com> --- nodejs/Cargo.toml | 6 +- nodejs/src/lib.rs | 33 +++++ nodejs/src/stream_manager.rs | 58 +++++++++ nodejs/src/stream_reader.rs | 209 ++++++++++++++++++++++++++++++ nodejs/src/stream_reader_group.rs | 144 ++++++++++++++++++++ nodejs/src/util.rs | 6 +- nodejs/stream_manager.ts | 43 ++++-- nodejs/stream_reader.ts | 127 ++++++++++++++++++ nodejs/stream_reader_group.ts | 77 +++++++++++ nodejs/tests/stream_reader.ts | 20 +++ nodejs/tests/write.py | 31 +++++ nodejs/tsconfig.json | 4 +- 12 files changed, 740 insertions(+), 18 deletions(-) create mode 100644 nodejs/src/stream_reader.rs create mode 100644 nodejs/src/stream_reader_group.rs create mode 100644 nodejs/stream_reader.ts create mode 100644 nodejs/stream_reader_group.ts create mode 100644 nodejs/tests/stream_reader.ts create mode 100644 nodejs/tests/write.py diff --git a/nodejs/Cargo.toml b/nodejs/Cargo.toml index f4a4a6a31..a6909c09f 100644 --- a/nodejs/Cargo.toml +++ b/nodejs/Cargo.toml @@ -9,7 +9,7 @@ license = "Apache-2.0" name = "pravega_client" readme = "README.md" repository = "https://github.com/pravega/pravega-client-rust" -version = "0.1.0" +version = "0.2.0" [lib] crate-type = ["cdylib"] @@ -17,12 +17,14 @@ crate-type = ["cdylib"] # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] +derive-new = "0.5" futures = "0.3" pravega-client = {path = "../", version = "0.3"} pravega-client-config = {path = "../config", version = "0.3"} pravega-client-retry = {path = "../retry", version = "0.3"} pravega-client-shared = {path = "../shared", version = "0.3"} pravega-controller-client = {path = "../controller-client", version = "0.3"} +tokio = "1.1" tracing = "0.1.17" tracing-futures = "0.2.4" tracing-subscriber = "0.2.2" @@ -30,4 +32,4 @@ tracing-subscriber = "0.2.2" [dependencies.neon] default-features = false features = ["napi-6", "promise-api", "channel-api"] -version = "0.10.0-alpha.3" +version = "0.10.0" diff --git a/nodejs/src/lib.rs b/nodejs/src/lib.rs index 0ee17a49f..989827f04 100644 --- a/nodejs/src/lib.rs +++ b/nodejs/src/lib.rs @@ -11,11 +11,18 @@ // See the License for the specific language governing permissions and // limitations under the License. +#[macro_use] +extern crate derive_new; + pub mod stream_manager; +pub mod stream_reader; +pub mod stream_reader_group; pub mod util; use neon::prelude::*; use stream_manager::{StreamManager, StreamRetentionPolicy, StreamScalingPolicy}; +use stream_reader::{EventData, Slice, StreamReader}; +use stream_reader_group::{StreamCut, StreamReaderGroup}; #[neon::main] fn main(mut cx: ModuleContext) -> NeonResult<()> { @@ -51,5 +58,31 @@ fn main(mut cx: ModuleContext) -> NeonResult<()> { cx.export_function("StreamManagerDeleteStream", StreamManager::js_delete_stream)?; cx.export_function("StreamManagerListStreams", StreamManager::js_list_streams)?; cx.export_function("StreamManagerToString", StreamManager::js_to_str)?; + + cx.export_function("StreamRetentionStreamCutHead", StreamCut::js_head)?; + cx.export_function("StreamRetentionStreamCutTail", StreamCut::js_tail)?; + cx.export_function( + "StreamManagerCreateReaderGroup", + StreamManager::js_create_reader_group, + )?; + cx.export_function( + "StreamReaderGroupCreateReader", + StreamReaderGroup::js_create_reader, + )?; + cx.export_function( + "StreamReaderGroupReaderOffline", + StreamReaderGroup::js_reader_offline, + )?; + cx.export_function("StreamReaderGroupToString", StreamReaderGroup::js_to_str)?; + + cx.export_function("EventDataData", EventData::js_data)?; + cx.export_function("EventDataOffset", EventData::js_offset)?; + cx.export_function("EventDataToString", EventData::js_to_str)?; + cx.export_function("SliceNext", Slice::js_next)?; + cx.export_function("StreamReaderGetSegementSlice", StreamReader::js_get_segment_slice)?; + cx.export_function("StreamReaderReaderOffline", StreamReader::js_reader_offline)?; + cx.export_function("StreamReaderReleaseSegment", StreamReader::js_release_segment)?; + cx.export_function("StreamReaderToString", StreamReader::js_to_str)?; + Ok(()) } diff --git a/nodejs/src/stream_manager.rs b/nodejs/src/stream_manager.rs index 335c0e5bc..1f4ce9684 100644 --- a/nodejs/src/stream_manager.rs +++ b/nodejs/src/stream_manager.rs @@ -11,8 +11,10 @@ // See the License for the specific language governing permissions and // limitations under the License. +use crate::stream_reader_group::{StreamCut, StreamReaderGroup}; use neon::prelude::*; use pravega_client::client_factory::ClientFactory; +use pravega_client::event::reader_group::{ReaderGroupConfigBuilder, StreamCutVersioned}; use pravega_client_config::{ClientConfig, ClientConfigBuilder}; use pravega_client_retry::retry_result::RetryError; use pravega_client_shared::*; @@ -358,6 +360,37 @@ impl StreamManager { .await }) } + + /// + /// Create a ReaderGroup for a given Stream. + /// + fn create_reader_group( + &self, + reader_group_name: &str, + scope_name: &str, + streams: Vec, + stream_cut: &StreamCutVersioned, + ) -> StreamReaderGroup { + let mut rg_config_builder = ReaderGroupConfigBuilder::default(); + + let scope = Scope::from(scope_name.to_string()); + for stream in streams { + let scoped_stream = ScopedStream { + scope: scope.clone(), + stream: Stream::from(stream.to_string()), + }; + rg_config_builder.read_from_stream(scoped_stream, stream_cut.clone()); + } + let rg_config = rg_config_builder.build(); + + let handle = self.cf.runtime_handle(); + let rg = handle.block_on(self.cf.create_reader_group_with_config( + reader_group_name.to_string(), + rg_config, + scope, + )); + StreamReaderGroup::new(rg, self.cf.runtime_handle()) + } } /// @@ -564,6 +597,31 @@ impl StreamManager { Ok(array) } + pub fn js_create_reader_group(mut cx: FunctionContext) -> JsResult> { + let stream_manager = cx.this().downcast_or_throw::, _>(&mut cx)?; + let reader_group_name = cx.argument::(0)?.value(&mut cx); + let scope_name = cx.argument::(1)?.value(&mut cx); + let streams = cx + .argument::(2)? + .to_vec(&mut cx)? + .into_iter() + .map(|v| { + v.downcast_or_throw::(&mut cx) + .map(|v| v.value(&mut cx)) + }) + .collect::, _>>()?; + let stream_cut = cx.argument::>(3)?; + + let stream_reader_group = stream_manager.create_reader_group( + &reader_group_name.to_string(), + &scope_name.to_string(), + streams, + &stream_cut.stream_cut, + ); + + Ok(cx.boxed(stream_reader_group)) + } + pub fn js_to_str(mut cx: FunctionContext) -> JsResult { let stream_manager = cx.this().downcast_or_throw::, _>(&mut cx)?; diff --git a/nodejs/src/stream_reader.rs b/nodejs/src/stream_reader.rs new file mode 100644 index 000000000..7ae8e349f --- /dev/null +++ b/nodejs/src/stream_reader.rs @@ -0,0 +1,209 @@ +// Copyright Pravega Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use neon::prelude::*; +use pravega_client::event::reader::{Event, SegmentSlice}; +use pravega_client::event::reader::{EventReader, EventReaderError}; +use pravega_client_shared::ScopedStream; +use std::cell::RefCell; +use std::sync::Arc; +use tokio::runtime::Handle; +use tokio::sync::Mutex; +use tracing::info; + +impl Finalize for EventData {} + +/// +/// This represents an event that was read from a Pravega Segment and the offset at which the event +/// was read from. +/// +#[derive(new)] +pub(crate) struct EventData { + offset_in_segment: i64, + value: Vec, +} + +impl EventData { + /// + /// Return the event data as `ArrayBuffer`. + /// + pub fn js_data(mut cx: FunctionContext) -> JsResult { + let event_data = cx.this().downcast_or_throw::, _>(&mut cx)?; + let data = event_data.value.as_slice().clone(); + + Ok(JsArrayBuffer::external(&mut cx, data.to_owned())) + } + + /// + /// Return the event offset in the segment. + /// + pub fn js_offset(mut cx: FunctionContext) -> JsResult { + let event_data = cx.this().downcast_or_throw::, _>(&mut cx)?; + // this may result to a f64::MAX, but I'm not able to find a f64: From impl + let offset = event_data.offset_in_segment as f64; + + Ok(cx.number(offset)) + } + + /// + /// Return the string representation. + /// + pub fn js_to_str(mut cx: FunctionContext) -> JsResult { + let event_data = cx.this().downcast_or_throw::, _>(&mut cx)?; + Ok(cx.string(format!( + "offset {:?} data :{:?}", + event_data.offset_in_segment, event_data.value + ))) + } +} + +impl Finalize for Slice {} + +/// +/// This represents a segment slice which can be used to read events from a Pravega segment. +/// +#[derive(new)] +pub(crate) struct Slice { + seg_slice: Option, +} + +impl Slice { + fn next(&mut self) -> Option { + if let Some(mut slice) = self.seg_slice.take() { + let next_event: Option = slice.next(); + self.seg_slice = Some(slice); + next_event.map(|e| EventData { + offset_in_segment: e.offset_in_segment, + value: e.value, + }) + } else { + info!("Empty Slice"); + None + } + } + + pub fn js_next(mut cx: FunctionContext) -> JsResult> { + // Use RefCell as self.seg_slice needs to be changed. + // Example at https://github.com/neon-bindings/neon/blob/0.10.0/test/napi/src/js/boxed.rs + let slice = cx.argument::>>(0)?; + let event_data = slice.borrow_mut().next(); + + match event_data { + Some(event_data) => Ok(cx.boxed(event_data)), + // TODO: better return something like undefined instead of throwing an error? + None => cx.throw_error("No more data in the stream!"), + } + } +} + +impl Finalize for StreamReader {} + +/// +/// This represents a Stream reader for a given Stream. +/// Note: A StreamReader cannot be created directly without using the StreamReaderGroup. +/// +#[derive(new)] +pub(crate) struct StreamReader { + reader: Arc>, + runtime_handle: Handle, + streams: Vec, +} + +impl StreamReader { + // Helper method for to set the reader offline. + async fn reader_offline_async(&self) -> Result<(), EventReaderError> { + self.reader.lock().await.reader_offline().await + } + + // Helper method for to release the segment. + async fn release_segment_async(&self, slice: SegmentSlice) -> Result<(), EventReaderError> { + self.reader.lock().await.release_segment(slice).await + } +} + +impl StreamReader { + /// + /// Return a Slice in an asynchronous call. + /// The actural returned type from await will be `Promise` aka `JsResult>>`. + /// + /// See the tokio-fetch example for more details on how to return a Promise and await. + /// https://github.com/neon-bindings/examples/tree/2dbbef55f483635d0118c20c9902bf4c6faa1ecc/examples/tokio-fetch + /// + pub fn js_get_segment_slice(mut cx: FunctionContext) -> JsResult { + let stream_reader = cx.this().downcast_or_throw::, _>(&mut cx)?; + let channel = cx.channel(); + + let (deferred, promise) = cx.promise(); + + // Spawn an `async` task on the tokio runtime. + let reader = stream_reader.reader.clone(); + stream_reader.runtime_handle.spawn(async move { + // expensive async procrdure executed in the tokio thread + let slice_result = reader.lock().await.acquire_segment().await; + + // notify and execute in the javascript main thread + deferred.settle_with(&channel, move |mut cx| match slice_result { + Ok(slice_) => { + let slice = RefCell::new(Slice { seg_slice: slice_ }); + Ok(cx.boxed(slice)) + } + Err(e) => cx.throw_error(e.to_string()), + }) + }); + + Ok(promise) + } + + /// + /// Set the reader offline. + /// + pub fn js_reader_offline(mut cx: FunctionContext) -> JsResult { + let stream_reader = cx.this().downcast_or_throw::, _>(&mut cx)?; + + match stream_reader + .runtime_handle + .block_on(stream_reader.reader_offline_async()) + { + Ok(_) => Ok(cx.undefined()), + Err(e) => cx.throw_error(format!("Error while attempting to acquire segment {:?}", e)), + } + } + + /// + /// Release the segment back. + /// + pub fn js_release_segment(mut cx: FunctionContext) -> JsResult { + let stream_reader = cx.this().downcast_or_throw::, _>(&mut cx)?; + let slice = cx.argument::>>(0)?; + + if let Some(s) = slice.borrow_mut().seg_slice.take() { + return match stream_reader + .runtime_handle + .block_on(stream_reader.release_segment_async(s)) + { + Ok(_) => Ok(cx.undefined()), + Err(e) => cx.throw_error(format!("Error while attempting to acquire segment {:?}", e)), + }; + } + Ok(cx.undefined()) + } + + /// + /// Return the string representation. + /// + pub fn js_to_str(mut cx: FunctionContext) -> JsResult { + let stream_reader = cx.this().downcast_or_throw::, _>(&mut cx)?; + + Ok(cx.string(format!("Streams: {:?} ", stream_reader.streams))) + } +} diff --git a/nodejs/src/stream_reader_group.rs b/nodejs/src/stream_reader_group.rs new file mode 100644 index 000000000..7fa9e2008 --- /dev/null +++ b/nodejs/src/stream_reader_group.rs @@ -0,0 +1,144 @@ +// Copyright Pravega Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use crate::stream_reader::StreamReader; +use neon::prelude::*; +use pravega_client::event::reader_group::ReaderGroup; +use pravega_client::event::reader_group::StreamCutVersioned; +use pravega_client::event::reader_group_state::ReaderGroupStateError; +use std::sync::Arc; +use tokio::runtime::Handle; +use tokio::sync::Mutex; +use tracing::error; +use tracing::info; + +impl Finalize for StreamCut {} + +pub(crate) struct StreamCut { + pub stream_cut: StreamCutVersioned, +} + +/// +/// Represent a consistent position in the stream. +/// +impl StreamCut { + pub fn js_head(mut cx: FunctionContext) -> JsResult> { + Ok(cx.boxed(StreamCut { + stream_cut: StreamCutVersioned::Unbounded, + })) + } + + pub fn js_tail(mut cx: FunctionContext) -> JsResult> { + Ok(cx.boxed(StreamCut { + stream_cut: StreamCutVersioned::Tail, + })) + } +} + +impl Finalize for StreamReaderGroup {} + +/// +/// A reader group is a collection of readers that collectively read all the events in the +/// stream. The events are distributed among the readers in the group such that each event goes +/// to only one reader. +/// +/// Note: A StreamReaderGroup cannot be created directly without using the StreamManager. +/// +#[derive(new)] +pub(crate) struct StreamReaderGroup { + reader_group: ReaderGroup, + runtime_handle: Handle, +} + +impl StreamReaderGroup { + /// + /// This method is used to create a reader under a ReaderGroup. + /// + fn create_reader(&self, reader_name: &str) -> StreamReader { + info!( + "Creating reader {:?} under reader group {:?}", + reader_name, self.reader_group.name + ); + + let reader = self + .runtime_handle + .block_on(self.reader_group.create_reader(reader_name.to_string())); + StreamReader::new( + Arc::new(Mutex::new(reader)), + self.runtime_handle.clone(), + self.reader_group.get_managed_streams(), + ) + } + + /// + /// This method is used to manually mark a reader as offline under a ReaderGroup. + /// + fn reader_offline(&self, reader_name: &str) -> Result<(), ReaderGroupStateError> { + info!( + "Marking reader {:?} under reader group {:?} as offline", + reader_name, self.reader_group.name + ); + + let res = self + .runtime_handle + .block_on(self.reader_group.reader_offline(reader_name.to_string(), None)); + match res { + Ok(_) => Ok(()), + Err(e) => match e { + ReaderGroupStateError::SyncError { .. } => { + error!("Failed to mark the reader {:?} offline {:?} ", reader_name, e); + Err(e) + } + ReaderGroupStateError::ReaderAlreadyOfflineError { .. } => { + info!("Reader {:?} is already offline", reader_name); + Ok(()) + } + }, + } + } +} + +impl StreamReaderGroup { + pub fn js_create_reader(mut cx: FunctionContext) -> JsResult> { + let stream_reader_group = cx + .this() + .downcast_or_throw::, _>(&mut cx)?; + let reader_name = cx.argument::(0)?.value(&mut cx); + + Ok(cx.boxed(stream_reader_group.create_reader(&reader_name.to_string()))) + } + + pub fn js_reader_offline(mut cx: FunctionContext) -> JsResult { + let stream_reader_group = cx + .this() + .downcast_or_throw::, _>(&mut cx)?; + let reader_name = cx.argument::(0)?.value(&mut cx); + + let res = stream_reader_group.reader_offline(&reader_name.to_string()); + match res { + Ok(()) => Ok(cx.undefined()), + Err(e) => cx.throw_error(e.to_string()), + } + } + + pub fn js_to_str(mut cx: FunctionContext) -> JsResult { + let stream_manager = cx + .this() + .downcast_or_throw::, _>(&mut cx)?; + + Ok(cx.string(format!( + "ReaderGroup: {:?}, ReaderGroup config : {:?}", + stream_manager.reader_group.name, stream_manager.reader_group.config + ))) + } +} diff --git a/nodejs/src/util.rs b/nodejs/src/util.rs index 6c894fb88..8cda5bf41 100644 --- a/nodejs/src/util.rs +++ b/nodejs/src/util.rs @@ -15,10 +15,10 @@ use neon::prelude::*; pub fn js_log<'a, C: Context<'a>>(cx: &mut C, s: String) -> NeonResult<()> { let global = cx.global(); - let console = global.get(cx, "console")?.downcast_or_throw::(cx)?; - let log = console.get(cx, "log")?.downcast_or_throw::(cx)?; + let console = global.get::(cx, "console")?; + let log = console.get::(cx, "log")?; - log.call_with(cx).arg(cx.string(s)).apply::(cx)?; + log.call_with(cx).arg(cx.string(s)).exec(cx)?; Ok(()) } diff --git a/nodejs/stream_manager.ts b/nodejs/stream_manager.ts index 2ea4216e4..be6c02949 100644 --- a/nodejs/stream_manager.ts +++ b/nodejs/stream_manager.ts @@ -11,6 +11,8 @@ // See the License for the specific language governing permissions and // limitations under the License. +import { StreamCut, StreamReaderGroup } from './stream_reader_group.js'; + // Native modules are not currently supported with ES module imports. // https://nodejs.org/api/esm.html#esm_no_native_module_loading import { createRequire } from 'module'; @@ -33,6 +35,7 @@ const { StreamManagerSealStream, StreamManagerDeleteStream, StreamManagerListStreams, + StreamManagerCreateReaderGroup, StreamManagerToString, } = require('./index.node'); @@ -79,7 +82,7 @@ export const StreamManager = ( tls_enabled: boolean = false, disable_cert_verification: boolean = true ) => { - const StreamManger = StreamManagerNew(controller_uri, auth_enabled, tls_enabled, disable_cert_verification); + const stream_manager = StreamManagerNew(controller_uri, auth_enabled, tls_enabled, disable_cert_verification); /** * Create a Pravega scope. @@ -87,7 +90,7 @@ export const StreamManager = ( * @param scope_name The scope name. * @returns The scope creation result. `false` indicates that the scope exists before creation. */ - const create_scope = (scope_name: string): boolean => StreamManagerCreateScope.call(StreamManger, scope_name); + const create_scope = (scope_name: string): boolean => StreamManagerCreateScope.call(stream_manager, scope_name); /** * Delete a Pravega scope. @@ -95,14 +98,14 @@ export const StreamManager = ( * @param scope_name The scope name. * @returns The scope deletion result. `false` indicates that the scope does not exist before deletion. */ - const delete_scope = (scope_name: string): boolean => StreamManagerDeleteScope.call(StreamManger, scope_name); + const delete_scope = (scope_name: string): boolean => StreamManagerDeleteScope.call(stream_manager, scope_name); /** * List all scopes in Pravega. * * @returns All scope names. */ - const list_scopes = (): string[] => StreamManagerListScopes.call(StreamManger); + const list_scopes = (): string[] => StreamManagerListScopes.call(stream_manager); /** * Create a stream with or without specific policy in Pravega. @@ -122,7 +125,7 @@ export const StreamManager = ( tags: string[] = [] ): boolean => StreamManagerCreateStreamWithPolicy.call( - StreamManger, + stream_manager, scope_name, stream_name, scaling_policy, @@ -148,7 +151,7 @@ export const StreamManager = ( tags: string[] = [] ): boolean => StreamManagerUpdateStreamWithPolicy.call( - StreamManger, + stream_manager, scope_name, stream_name, scaling_policy, @@ -164,7 +167,7 @@ export const StreamManager = ( * @returns The stream tags. */ const get_stream_tags = (scope_name: string, stream_name: string): string[] => - StreamManagerGetStreamTags.call(StreamManger, scope_name, stream_name); + StreamManagerGetStreamTags.call(stream_manager, scope_name, stream_name); /** * Seal a Pravega stream. SEAL BEFORE DELETE! @@ -174,7 +177,7 @@ export const StreamManager = ( * @returns The seal result. */ const seal_stream = (scope_name: string, stream_name: string): boolean => - StreamManagerSealStream.call(StreamManger, scope_name, stream_name); + StreamManagerSealStream.call(stream_manager, scope_name, stream_name); /** * Deleta a Pravega stream. SEAL BEFORE DELETE! @@ -184,7 +187,7 @@ export const StreamManager = ( * @returns The deletion result. */ const delete_stream = (scope_name: string, stream_name: string): boolean => - StreamManagerDeleteStream.call(StreamManger, scope_name, stream_name); + StreamManagerDeleteStream.call(stream_manager, scope_name, stream_name); /** * List all scopes in Pravega. @@ -192,14 +195,31 @@ export const StreamManager = ( * @param scope_name The scope name. * @returns All stream names in this scope. */ - const list_streams = (scope_name: string): string[] => StreamManagerListStreams.call(StreamManger, scope_name); + const list_streams = (scope_name: string): string[] => StreamManagerListStreams.call(stream_manager, scope_name); + + /** + * Create a ReaderGroup for a given Stream. + * + * @param scope_name The scope name. + * @returns All stream names in this scope. + * @todo An optional element cannot follow a rest element. `...args: [...stream: string[], stream_cut?: StreamCut]` + */ + const create_reader_group = ( + stream_cut: StreamCut, + reader_group_name: string, + scope_name: string, + ...streams: string[] + ): StreamReaderGroup => + StreamReaderGroup( + StreamManagerCreateReaderGroup.call(stream_manager, reader_group_name, scope_name, streams, stream_cut) + ); /** * A detailed view of a StreamManager. * * @returns String representation of the StreamManager. */ - const toString = (): string => StreamManagerToString.call(StreamManger); + const toString = (): string => StreamManagerToString.call(stream_manager); return { create_scope, @@ -211,6 +231,7 @@ export const StreamManager = ( seal_stream, delete_stream, list_streams, + create_reader_group, toString, }; }; diff --git a/nodejs/stream_reader.ts b/nodejs/stream_reader.ts new file mode 100644 index 000000000..6f9fcc759 --- /dev/null +++ b/nodejs/stream_reader.ts @@ -0,0 +1,127 @@ +// Copyright Pravega Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// Native modules are not currently supported with ES module imports. +// https://nodejs.org/api/esm.html#esm_no_native_module_loading +import { createRequire } from 'module'; +const require = createRequire(import.meta.url); + +const { + EventDataData, + EventDataOffset, + EventDataToString, + SliceNext, + StreamReaderGetSegementSlice, + StreamReaderReaderOffline, + StreamReaderReleaseSegment, + StreamReaderToString, +} = require('./index.node'); + +/** + * This represents an event that was read from a Pravega Segment and the offset at which the event + * was read from. + */ +interface Event { + /** + * Return the event data as `ArrayBuffer`. + * + * @returns ArrayBuffer that contains raw data. + */ + data: () => ArrayBuffer; + /** + * Return the event offset in the segment. + * + * @returns offset + */ + offset: () => number; + toString: () => string; +} + +const Event = (event): Event => { + const data = (): ArrayBuffer => EventDataData.call(event); + const offset = (): number => EventDataOffset.call(event); + const toString = (): string => EventDataToString.call(event); + return { data, offset, toString }; +}; + +/** + * This represents a segment slice which can be used to read events from a Pravega segment as an iterator. + */ +interface Slice extends IterableIterator {} + +const Slice = (slice): Slice => { + return { + next: (): IteratorResult => { + let event: Event; + try { + event = Event(SliceNext(slice)); + } catch (e) { + return { + done: true, + value: null, + }; + } + return { + done: false, + value: event, + }; + }, + [Symbol.iterator]: function () { + return this; + }, + }; +}; + +/** + * A reader for a stream. + * Note: A StreamReader cannot be created directly without using the StreamReaderGroup. + */ +export interface StreamReader { + /** + * This function returns a SegmentSlice from the SegmentStore(s). + * Individual events can be read from the data received using the following snippets. + * ```javascript + * const seg_slice: Slice = await stream_reader.get_segment_slice(); + * for (const event of seg_slice) { + * const raw_value: ArrayBuffer = event.data(); + * } + * ``` + * + * Invoking this function multiple times ensure multiple SegmentSlices corresponding + * to different Segments of the stream are received. In-case we receive data for an already + * acquired SegmentSlice this method waits until SegmentSlice is completely consumed before + * returning the data. + * + * @returns Slice in Promise. + */ + get_segment_slice: () => Promise; + /** + * Mark the reader as offline. + * This will ensure the segments owned by this reader is distributed to other readers in the ReaderGroup. + */ + reader_offline: () => void; + /** + * Release a partially read segment slice back to event reader. + */ + release_segment: (slice: Slice) => void; + toString: () => string; +} + +export const StreamReader = (stream_reader): StreamReader => { + const get_segment_slice = async (): Promise => Slice(await StreamReaderGetSegementSlice.call(stream_reader)); + const reader_offline = (): void => StreamReaderReaderOffline.call(stream_reader); + const release_segment = (slice: Slice): void => StreamReaderReleaseSegment.call(stream_reader, slice); + const toString = (): string => StreamReaderToString.call(stream_reader); + + return { get_segment_slice, reader_offline, release_segment, toString }; +}; diff --git a/nodejs/stream_reader_group.ts b/nodejs/stream_reader_group.ts new file mode 100644 index 000000000..1a6067f81 --- /dev/null +++ b/nodejs/stream_reader_group.ts @@ -0,0 +1,77 @@ +// Copyright Pravega Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +import { StreamReader } from './stream_reader.js'; + +// Native modules are not currently supported with ES module imports. +// https://nodejs.org/api/esm.html#esm_no_native_module_loading +import { createRequire } from 'module'; +const require = createRequire(import.meta.url); + +const { + StreamRetentionStreamCutHead, + StreamRetentionStreamCutTail, + StreamReaderGroupCreateReader, + StreamReaderGroupReaderOffline, + StreamReaderGroupToString, +} = require('./index.node'); + +/** + * Represent a consistent position in the stream. + * Only `head` and `tail` are supported now. + */ +export interface StreamCut {} + +export const StreamCut = { + head: (): StreamCut => StreamRetentionStreamCutHead(), + tail: (): StreamCut => StreamRetentionStreamCutTail(), +}; + +/** + * A reader group is a collection of readers that collectively read all the events in the + * stream. The events are distributed among the readers in the group such that each event goes + * to only one reader. + * + * Note: A StreamReaderGroup cannot be created directly without using the StreamManager. + */ +export interface StreamReaderGroup { + /** + * Creates (or recreates) a new reader that is part of a StreamReaderGroup. The reader + * will join the group and the members of the group will automatically rebalance among + * themselves. + * + * @param reader_name A unique name (within the group) for this reader. + * @returns The StreamReader + */ + create_reader: (reader_name: string) => StreamReader; + /** + * Invoked when a reader that was added to the group is no longer consuming events. This will + * cause the events that were going to that reader to be redistributed among the other + * readers. Events after the lastPosition provided will be (re)read by other readers in the + * StreamReaderGroup. + * + * @param reader_name The name of the reader that is offline. + */ + reader_offline: (reader_name: string) => void; + toString: () => string; +} + +export const StreamReaderGroup = (stream_reader_group): StreamReaderGroup => { + const create_reader = (reader_name: string): StreamReader => + StreamReader(StreamReaderGroupCreateReader.call(stream_reader_group, reader_name)); + const reader_offline = (reader_name: string): void => + StreamReaderGroupReaderOffline.call(stream_reader_group, reader_name); + const toString = (): string => StreamReaderGroupToString.call(stream_reader_group); + + return { create_reader, reader_offline, toString }; +}; diff --git a/nodejs/tests/stream_reader.ts b/nodejs/tests/stream_reader.ts new file mode 100644 index 000000000..e08926e1f --- /dev/null +++ b/nodejs/tests/stream_reader.ts @@ -0,0 +1,20 @@ +import { StreamManager } from '../stream_manager.js'; +import { StreamCut } from '../stream_reader_group.js'; + +const stream_manager = StreamManager('tcp://127.0.0.1:9090', false, false, true); + +const stream_reader_group = stream_manager.create_reader_group(StreamCut.head(), 'rg1', 'scope1', 'stream1'); +const stream_reader = stream_reader_group.create_reader('r1'); + +try { + const seg_slice = await stream_reader.get_segment_slice(); + const enc = new TextDecoder('utf-8'); + for (const event of seg_slice) { + const raw_value = event.data(); + console.log(`Event at ${event.offset()} reads ${enc.decode(raw_value)}`); + } +} catch (e) { + console.log(e); +} finally { + stream_reader_group.reader_offline('r1'); +} diff --git a/nodejs/tests/write.py b/nodejs/tests/write.py new file mode 100644 index 000000000..6f8814a2c --- /dev/null +++ b/nodejs/tests/write.py @@ -0,0 +1,31 @@ +import asyncio + +import pravega_client + +SCOPE = 'scope1' +STREAM = 'stream1' + +manager = pravega_client.StreamManager("127.0.0.1:9090") + +manager.create_scope(SCOPE) +manager.create_stream(SCOPE, STREAM, 1) + +writer = manager.create_writer(SCOPE, STREAM) +writer.write_event("e1") +writer.write_event("e2") +writer.write_event("e3") +writer.write_event("e4") +writer.flush() + +reader_group = manager.create_reader_group('rg2', SCOPE, STREAM) +reader = reader_group.create_reader("r2") + +try: + async def read(): + slice = await reader.get_segment_slice_async() + for event in slice: + print(event.data()) + + asyncio.run(read()) +finally: + reader.reader_offline() diff --git a/nodejs/tsconfig.json b/nodejs/tsconfig.json index aebcadda6..fbad7f669 100644 --- a/nodejs/tsconfig.json +++ b/nodejs/tsconfig.json @@ -1,10 +1,10 @@ { "compilerOptions": { "sourceMap": true, - "target": "es6", + "target": "esnext", "module": "esnext", "esModuleInterop": true, - "moduleResolution": "nodenext", + "moduleResolution": "node", "rootDir": "./" } } From 66ede75354025adcb1f43bf87eedff348ccb7ed8 Mon Sep 17 00:00:00 2001 From: thekingofcity <3353040+thekingofcity@users.noreply.github.com> Date: Mon, 14 Mar 2022 16:07:01 +0800 Subject: [PATCH 07/16] add header and arc Signed-off-by: thekingofcity <3353040+thekingofcity@users.noreply.github.com> --- nodejs/src/stream_reader.rs | 2 +- nodejs/tests/stream_reader.ts | 17 +++++++++++++++-- 2 files changed, 16 insertions(+), 3 deletions(-) diff --git a/nodejs/src/stream_reader.rs b/nodejs/src/stream_reader.rs index 7ae8e349f..3e6a1883b 100644 --- a/nodejs/src/stream_reader.rs +++ b/nodejs/src/stream_reader.rs @@ -146,7 +146,7 @@ impl StreamReader { let (deferred, promise) = cx.promise(); // Spawn an `async` task on the tokio runtime. - let reader = stream_reader.reader.clone(); + let reader = Arc::clone(&stream_reader.reader); stream_reader.runtime_handle.spawn(async move { // expensive async procrdure executed in the tokio thread let slice_result = reader.lock().await.acquire_segment().await; diff --git a/nodejs/tests/stream_reader.ts b/nodejs/tests/stream_reader.ts index e08926e1f..c776d06f8 100644 --- a/nodejs/tests/stream_reader.ts +++ b/nodejs/tests/stream_reader.ts @@ -1,3 +1,16 @@ +// Copyright Pravega Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + import { StreamManager } from '../stream_manager.js'; import { StreamCut } from '../stream_reader_group.js'; @@ -8,10 +21,10 @@ const stream_reader = stream_reader_group.create_reader('r1'); try { const seg_slice = await stream_reader.get_segment_slice(); - const enc = new TextDecoder('utf-8'); + const dec = new TextDecoder('utf-8'); for (const event of seg_slice) { const raw_value = event.data(); - console.log(`Event at ${event.offset()} reads ${enc.decode(raw_value)}`); + console.log(`Event at ${event.offset()} reads ${dec.decode(raw_value)}`); } } catch (e) { console.log(e); From c92c04cac8a53eaea9fa36f2819c4187f90f18dc Mon Sep 17 00:00:00 2001 From: thekingofcity <3353040+thekingofcity@users.noreply.github.com> Date: Tue, 15 Mar 2022 10:52:00 +0800 Subject: [PATCH 08/16] fix doc Signed-off-by: thekingofcity <3353040+thekingofcity@users.noreply.github.com> --- nodejs/src/stream_reader.rs | 2 +- nodejs/stream_manager.ts | 5 ++++- nodejs/stream_reader.ts | 5 +++++ nodejs/stream_reader_group.ts | 2 ++ 4 files changed, 12 insertions(+), 2 deletions(-) diff --git a/nodejs/src/stream_reader.rs b/nodejs/src/stream_reader.rs index 3e6a1883b..cd4d0c44e 100644 --- a/nodejs/src/stream_reader.rs +++ b/nodejs/src/stream_reader.rs @@ -148,7 +148,7 @@ impl StreamReader { // Spawn an `async` task on the tokio runtime. let reader = Arc::clone(&stream_reader.reader); stream_reader.runtime_handle.spawn(async move { - // expensive async procrdure executed in the tokio thread + // expensive async procedure executed in the tokio thread let slice_result = reader.lock().await.acquire_segment().await; // notify and execute in the javascript main thread diff --git a/nodejs/stream_manager.ts b/nodejs/stream_manager.ts index be6c02949..3e8cecf72 100644 --- a/nodejs/stream_manager.ts +++ b/nodejs/stream_manager.ts @@ -200,8 +200,11 @@ export const StreamManager = ( /** * Create a ReaderGroup for a given Stream. * + * @param stream_cut The offset you would like to read from. + * @param reader_group_name The reader group name. * @param scope_name The scope name. - * @returns All stream names in this scope. + * @param streams All stream names in this scope. + * @returns A StreamReaderGroup. * @todo An optional element cannot follow a rest element. `...args: [...stream: string[], stream_cut?: StreamCut]` */ const create_reader_group = ( diff --git a/nodejs/stream_reader.ts b/nodejs/stream_reader.ts index 6f9fcc759..74a58fbe8 100644 --- a/nodejs/stream_reader.ts +++ b/nodejs/stream_reader.ts @@ -38,12 +38,14 @@ interface Event { * @returns ArrayBuffer that contains raw data. */ data: () => ArrayBuffer; + /** * Return the event offset in the segment. * * @returns offset */ offset: () => number; + toString: () => string; } @@ -105,15 +107,18 @@ export interface StreamReader { * @returns Slice in Promise. */ get_segment_slice: () => Promise; + /** * Mark the reader as offline. * This will ensure the segments owned by this reader is distributed to other readers in the ReaderGroup. */ reader_offline: () => void; + /** * Release a partially read segment slice back to event reader. */ release_segment: (slice: Slice) => void; + toString: () => string; } diff --git a/nodejs/stream_reader_group.ts b/nodejs/stream_reader_group.ts index 1a6067f81..57704de8b 100644 --- a/nodejs/stream_reader_group.ts +++ b/nodejs/stream_reader_group.ts @@ -54,6 +54,7 @@ export interface StreamReaderGroup { * @returns The StreamReader */ create_reader: (reader_name: string) => StreamReader; + /** * Invoked when a reader that was added to the group is no longer consuming events. This will * cause the events that were going to that reader to be redistributed among the other @@ -63,6 +64,7 @@ export interface StreamReaderGroup { * @param reader_name The name of the reader that is offline. */ reader_offline: (reader_name: string) => void; + toString: () => string; } From 53275f1a6e11d164b1da905f7cd5d7cda10cc2ce Mon Sep 17 00:00:00 2001 From: thekingofcity <3353040+thekingofcity@users.noreply.github.com> Date: Mon, 11 Apr 2022 10:59:35 +0800 Subject: [PATCH 09/16] fix redundant clone Signed-off-by: thekingofcity <3353040+thekingofcity@users.noreply.github.com> --- nodejs/src/stream_manager.rs | 26 +++++++++++++------------- nodejs/src/stream_reader_group.rs | 4 ++-- 2 files changed, 15 insertions(+), 15 deletions(-) diff --git a/nodejs/src/stream_manager.rs b/nodejs/src/stream_manager.rs index 1f4ce9684..867ad8e89 100644 --- a/nodejs/src/stream_manager.rs +++ b/nodejs/src/stream_manager.rs @@ -404,7 +404,7 @@ impl StreamManager { let disable_cert_verification = cx.argument::(3)?.value(&mut cx); let stream_manager = StreamManager::new( - &controller_uri.to_string(), + &controller_uri, auth_enabled, tls_enabled, disable_cert_verification, @@ -417,7 +417,7 @@ impl StreamManager { let stream_manager = cx.this().downcast_or_throw::, _>(&mut cx)?; let scope_name = cx.argument::(0)?.value(&mut cx); - let scope_result = stream_manager.create_scope(&scope_name.to_string()); + let scope_result = stream_manager.create_scope(&scope_name); match scope_result { Ok(t) => Ok(cx.boolean(t)), Err(e) => cx.throw_error(e.to_string()), @@ -428,7 +428,7 @@ impl StreamManager { let stream_manager = cx.this().downcast_or_throw::, _>(&mut cx)?; let scope_name = cx.argument::(0)?.value(&mut cx); - let scope_result = stream_manager.delete_scope(&scope_name.to_string()); + let scope_result = stream_manager.delete_scope(&scope_name); match scope_result { Ok(t) => Ok(cx.boolean(t)), Err(e) => cx.throw_error(e.to_string()), @@ -473,8 +473,8 @@ impl StreamManager { .collect::, _>>()?; let stream_result = stream_manager.create_stream_with_policy( - &scope_name.to_string(), - &stream_name.to_string(), + &scope_name, + &stream_name, // TODO: Handle> auto dereferencing won't work for this (no idea why), // so manually getting the internal scaling and retention is required. // https://docs.rs/neon/latest/neon/types/struct.JsBox.html#deref-behavior @@ -508,8 +508,8 @@ impl StreamManager { .collect::, _>>()?; let stream_result = stream_manager.update_stream_with_policy( - &scope_name.to_string(), - &stream_name.to_string(), + &scope_name, + &stream_name, scaling_policy.scaling.clone(), retention_policy.retention.clone(), match tags.len() { @@ -530,7 +530,7 @@ impl StreamManager { let scope_name = cx.argument::(0)?.value(&mut cx); let stream_name = cx.argument::(1)?.value(&mut cx); - let tags = match stream_manager.get_stream_tags(&scope_name.to_string(), &stream_name.to_string()) { + let tags = match stream_manager.get_stream_tags(&scope_name, &stream_name) { Ok(val) => match val { Some(val) => val, None => return Ok(cx.empty_array()), @@ -558,7 +558,7 @@ impl StreamManager { let scope_name = cx.argument::(0)?.value(&mut cx); let stream_name = cx.argument::(1)?.value(&mut cx); - let scope_result = stream_manager.seal_stream(&scope_name.to_string(), &stream_name.to_string()); + let scope_result = stream_manager.seal_stream(&scope_name, &stream_name); match scope_result { Ok(t) => Ok(cx.boolean(t)), Err(e) => cx.throw_error(e.to_string()), @@ -570,7 +570,7 @@ impl StreamManager { let scope_name = cx.argument::(0)?.value(&mut cx); let stream_name = cx.argument::(1)?.value(&mut cx); - let scope_result = stream_manager.delete_stream(&scope_name.to_string(), &stream_name.to_string()); + let scope_result = stream_manager.delete_stream(&scope_name, &stream_name); match scope_result { Ok(t) => Ok(cx.boolean(t)), Err(e) => cx.throw_error(e.to_string()), @@ -583,7 +583,7 @@ impl StreamManager { let stream_manager = cx.this().downcast_or_throw::, _>(&mut cx)?; let scope_name = cx.argument::(0)?.value(&mut cx); - let streams = stream_manager.list_streams(&scope_name.to_string()); + let streams = stream_manager.list_streams(&scope_name); let streams_length: u32 = match streams.len().try_into() { Ok(val) => val, Err(_err) => return cx.throw_error("Too many streams"), @@ -613,8 +613,8 @@ impl StreamManager { let stream_cut = cx.argument::>(3)?; let stream_reader_group = stream_manager.create_reader_group( - &reader_group_name.to_string(), - &scope_name.to_string(), + &reader_group_name, + &scope_name, streams, &stream_cut.stream_cut, ); diff --git a/nodejs/src/stream_reader_group.rs b/nodejs/src/stream_reader_group.rs index 7fa9e2008..4d4282985 100644 --- a/nodejs/src/stream_reader_group.rs +++ b/nodejs/src/stream_reader_group.rs @@ -115,7 +115,7 @@ impl StreamReaderGroup { .downcast_or_throw::, _>(&mut cx)?; let reader_name = cx.argument::(0)?.value(&mut cx); - Ok(cx.boxed(stream_reader_group.create_reader(&reader_name.to_string()))) + Ok(cx.boxed(stream_reader_group.create_reader(&reader_name))) } pub fn js_reader_offline(mut cx: FunctionContext) -> JsResult { @@ -124,7 +124,7 @@ impl StreamReaderGroup { .downcast_or_throw::, _>(&mut cx)?; let reader_name = cx.argument::(0)?.value(&mut cx); - let res = stream_reader_group.reader_offline(&reader_name.to_string()); + let res = stream_reader_group.reader_offline(&reader_name); match res { Ok(()) => Ok(cx.undefined()), Err(e) => cx.throw_error(e.to_string()), From d47edaec116fdead5636d3660d1e00ee7a2a4c3e Mon Sep 17 00:00:00 2001 From: thekingofcity <3353040+thekingofcity@users.noreply.github.com> Date: Mon, 11 Apr 2022 11:11:30 +0800 Subject: [PATCH 10/16] fix clone_double_ref and absurd_extreme_comparison Signed-off-by: thekingofcity <3353040+thekingofcity@users.noreply.github.com> --- nodejs/src/stream_manager.rs | 4 ++-- nodejs/src/stream_reader.rs | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/nodejs/src/stream_manager.rs b/nodejs/src/stream_manager.rs index 867ad8e89..d3e404c24 100644 --- a/nodejs/src/stream_manager.rs +++ b/nodejs/src/stream_manager.rs @@ -481,7 +481,7 @@ impl StreamManager { scaling_policy.scaling.clone(), retention_policy.retention.clone(), match tags.len() { - l if l <= 0 => None, + l if l == 0 => None, _ => Some(tags), }, ); @@ -513,7 +513,7 @@ impl StreamManager { scaling_policy.scaling.clone(), retention_policy.retention.clone(), match tags.len() { - l if l <= 0 => None, + l if l == 0 => None, _ => Some(tags), }, ); diff --git a/nodejs/src/stream_reader.rs b/nodejs/src/stream_reader.rs index cd4d0c44e..8eb8a7f10 100644 --- a/nodejs/src/stream_reader.rs +++ b/nodejs/src/stream_reader.rs @@ -39,7 +39,7 @@ impl EventData { /// pub fn js_data(mut cx: FunctionContext) -> JsResult { let event_data = cx.this().downcast_or_throw::, _>(&mut cx)?; - let data = event_data.value.as_slice().clone(); + let data = event_data.value.as_slice(); Ok(JsArrayBuffer::external(&mut cx, data.to_owned())) } From b2114c851739c96fec948dc3ec370d9cff69a7ec Mon Sep 17 00:00:00 2001 From: thekingofcity <3353040+thekingofcity@users.noreply.github.com> Date: Mon, 11 Apr 2022 11:22:56 +0800 Subject: [PATCH 11/16] add ci test Signed-off-by: thekingofcity <3353040+thekingofcity@users.noreply.github.com> --- .github/workflows/nodejs_test.yml | 58 +++++++++++++++++++++++++++++++ 1 file changed, 58 insertions(+) create mode 100644 .github/workflows/nodejs_test.yml diff --git a/.github/workflows/nodejs_test.yml b/.github/workflows/nodejs_test.yml new file mode 100644 index 000000000..483494aea --- /dev/null +++ b/.github/workflows/nodejs_test.yml @@ -0,0 +1,58 @@ +on: + # Trigger the workflow on push or pull request, + # but only for the master branch + push: + branches: + - master + pull_request: + branches: + - master + +name: nodejstest + +jobs: + test_jest: + name: run-jest + runs-on: ubuntu-20.04 + timeout-minutes: 25 + steps: + - uses: actions/checkout@v2 + - uses: actions/setup-java@v1 + with: + java-version: "11" # The JDK version to make available on the path. + - name: Download and Run Pravega standalone + run: | + wget https://github.com/pravega/pravega/releases/download/v0.10.1/pravega-0.10.1.tgz + tar -xzvf pravega-0.10.1.tgz + pravega-0.10.1/bin/pravega-standalone > pravega.log 2>&1 & + sleep 20 && echo "Started standalone" + - name: Set up Nodejs + uses: actions/setup-node@v2 + with: + node-version: "16" + - name: Install modules + working-directory: ./nodejs + run: npm i + - name: Set up Python + uses: actions/setup-python@v2 + with: + python-version: 3.6 + - name: Install stable toolchain + uses: actions-rs/toolchain@v1 + with: + toolchain: stable + override: true + components: rustfmt, clippy + - name: Test + working-directory: ./nodejs + run: | + node --loader ts-node/esm tests/stream_manager.ts + python3 tests/write.py + node --loader ts-node/esm tests/stream_reader.ts + - name: Upload Pravega standalone logs + uses: actions/upload-artifact@v2 + if: always() + with: + name: pravega-standalone-log + path: pravega.log + retention-days: 5 From 05c354ad3fd225678b0abdd0f24fe654061698d8 Mon Sep 17 00:00:00 2001 From: thekingofcity <3353040+thekingofcity@users.noreply.github.com> Date: Mon, 11 Apr 2022 11:43:55 +0800 Subject: [PATCH 12/16] fix CI test Signed-off-by: thekingofcity <3353040+thekingofcity@users.noreply.github.com> --- .github/workflows/nodejs_test.yml | 2 ++ nodejs/stream_reader.ts | 4 ++-- 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/.github/workflows/nodejs_test.yml b/.github/workflows/nodejs_test.yml index 483494aea..415b16630 100644 --- a/.github/workflows/nodejs_test.yml +++ b/.github/workflows/nodejs_test.yml @@ -37,6 +37,8 @@ jobs: uses: actions/setup-python@v2 with: python-version: 3.6 + - name: Install pravega + run: pip install pravega - name: Install stable toolchain uses: actions-rs/toolchain@v1 with: diff --git a/nodejs/stream_reader.ts b/nodejs/stream_reader.ts index 74a58fbe8..9a34e172d 100644 --- a/nodejs/stream_reader.ts +++ b/nodejs/stream_reader.ts @@ -34,14 +34,14 @@ const { interface Event { /** * Return the event data as `ArrayBuffer`. - * + * * @returns ArrayBuffer that contains raw data. */ data: () => ArrayBuffer; /** * Return the event offset in the segment. - * + * * @returns offset */ offset: () => number; From f77e5ce3d09592a6c8ea1221eb48ea7f47095f4d Mon Sep 17 00:00:00 2001 From: thekingofcity <3353040+thekingofcity@users.noreply.github.com> Date: Mon, 11 Apr 2022 11:51:54 +0800 Subject: [PATCH 13/16] remove unnecessary todo Signed-off-by: thekingofcity <3353040+thekingofcity@users.noreply.github.com> --- nodejs/src/stream_manager.rs | 1 - nodejs/src/stream_reader.rs | 1 - 2 files changed, 2 deletions(-) diff --git a/nodejs/src/stream_manager.rs b/nodejs/src/stream_manager.rs index d3e404c24..a8425c5a9 100644 --- a/nodejs/src/stream_manager.rs +++ b/nodejs/src/stream_manager.rs @@ -175,7 +175,6 @@ impl StreamManager { let controller = self.cf.controller_client(); let scope_name = Scope::from(scope_name.to_string()); - // TODO: async? handle.block_on(controller.create_scope(&scope_name)) } diff --git a/nodejs/src/stream_reader.rs b/nodejs/src/stream_reader.rs index 8eb8a7f10..d5c834daf 100644 --- a/nodejs/src/stream_reader.rs +++ b/nodejs/src/stream_reader.rs @@ -100,7 +100,6 @@ impl Slice { match event_data { Some(event_data) => Ok(cx.boxed(event_data)), - // TODO: better return something like undefined instead of throwing an error? None => cx.throw_error("No more data in the stream!"), } } From a298bb412463bb515fd0d4dd5aecc9aec37c4e4d Mon Sep 17 00:00:00 2001 From: thekingofcity <3353040+thekingofcity@users.noreply.github.com> Date: Mon, 11 Apr 2022 12:01:03 +0800 Subject: [PATCH 14/16] update python in CI Signed-off-by: thekingofcity <3353040+thekingofcity@users.noreply.github.com> --- .github/workflows/nodejs_test.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/nodejs_test.yml b/.github/workflows/nodejs_test.yml index 415b16630..440ca35e0 100644 --- a/.github/workflows/nodejs_test.yml +++ b/.github/workflows/nodejs_test.yml @@ -36,7 +36,7 @@ jobs: - name: Set up Python uses: actions/setup-python@v2 with: - python-version: 3.6 + python-version: 3.8 - name: Install pravega run: pip install pravega - name: Install stable toolchain From 5c9dc290aad2cf452748c1ae7c7f52c810ebf11f Mon Sep 17 00:00:00 2001 From: thekingofcity <3353040+thekingofcity@users.noreply.github.com> Date: Mon, 25 Apr 2022 10:27:47 +0800 Subject: [PATCH 15/16] display the internal error Signed-off-by: thekingofcity <3353040+thekingofcity@users.noreply.github.com> --- nodejs/src/stream_manager.rs | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/nodejs/src/stream_manager.rs b/nodejs/src/stream_manager.rs index a8425c5a9..bef19abc7 100644 --- a/nodejs/src/stream_manager.rs +++ b/nodejs/src/stream_manager.rs @@ -534,9 +534,11 @@ impl StreamManager { Some(val) => val, None => return Ok(cx.empty_array()), }, - Err(_err) => { - // TODO: display the error details. - return cx.throw_error("Internal controller error when getting the StreamConfiguration"); + Err(err) => { + return cx.throw_error(format!( + "Internal error in getting StreamConfiguration: {:?}", + err + )); } }; let tags_length: u32 = match tags.len().try_into() { From 64a58484e023e9534f46ec1c8d51a47641881e65 Mon Sep 17 00:00:00 2001 From: thekingofcity <3353040+thekingofcity@users.noreply.github.com> Date: Mon, 25 Apr 2022 10:29:04 +0800 Subject: [PATCH 16/16] fix typo Signed-off-by: thekingofcity <3353040+thekingofcity@users.noreply.github.com> --- nodejs/src/stream_reader.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/nodejs/src/stream_reader.rs b/nodejs/src/stream_reader.rs index d5c834daf..42d5452bf 100644 --- a/nodejs/src/stream_reader.rs +++ b/nodejs/src/stream_reader.rs @@ -133,7 +133,7 @@ impl StreamReader { impl StreamReader { /// /// Return a Slice in an asynchronous call. - /// The actural returned type from await will be `Promise` aka `JsResult>>`. + /// The actual returned type from await will be `Promise` aka `JsResult>>`. /// /// See the tokio-fetch example for more details on how to return a Promise and await. /// https://github.com/neon-bindings/examples/tree/2dbbef55f483635d0118c20c9902bf4c6faa1ecc/examples/tokio-fetch