From acb796927a0c64b5eaafb41f9a3ead91a070d816 Mon Sep 17 00:00:00 2001 From: Woodson Delhia Date: Sat, 4 Apr 2020 00:44:05 +0900 Subject: [PATCH 1/3] Added scanStream and hscanStream --- src/Database/Redis.js | 48 +++++++++++++++++++++++++++++++++++++++++ src/Database/Redis.purs | 27 +++++++++++++++++++++++ test/Main.purs | 31 +++++++++++++++++++++++--- 3 files changed, 103 insertions(+), 3 deletions(-) diff --git a/src/Database/Redis.js b/src/Database/Redis.js index 03da61f..150d247 100644 --- a/src/Database/Redis.js +++ b/src/Database/Redis.js @@ -624,3 +624,51 @@ exports.zscoreImpl = function(conn) { }; }; }; + +exports.scanStreamImpl = function(conn){ + return function(options){ + return function (onError, onSuccess){ + var stream = conn.scanStream(options) + + // `resultKeys` is an array of strings representing key names. + // Note that resultKeys may contain 0 keys, and that it will sometimes + // contain duplicates due to SCAN's implementation in Redis. + // woody: Should I check for duplicates here or leave it to end user?? + stream.on("data", onSuccess) + + stream.on("error", onError) + + return function (cancelError, cancelerError, cancelerSuccess) { + cancelerSuccess() + } + } + } +} + +exports.hscanStreamImpl = function(conn){ + return function(options){ + return function(hash){ + return function (onError, onSuccess){ + var stream = conn.hscanStream(hash,options) + + // `resultKeys` is an array of strings representing key names. + // Note that resultKeys may contain 0 keys, and that it will sometimes + // contain duplicates due to SCAN's implementation in Redis. + // woody: Should I check for duplicates here or leave it to end user?? + stream.on("data", function(result){ + var arr = []; + for (var i = 0; i < result.length; i += 2) { + arr.push({ key: result[i], value: result[i + 1] }); + } + onSuccess(arr); + }) + + stream.on("error", onError) + + return function (cancelError, cancelerError, cancelerSuccess) { + cancelerSuccess() + } + } + } + } +} \ No newline at end of file diff --git a/src/Database/Redis.purs b/src/Database/Redis.purs index 6eb300d..13ca606 100644 --- a/src/Database/Redis.purs +++ b/src/Database/Redis.purs @@ -6,6 +6,7 @@ module Database.Redis , IPFamily , negInf , posInf + , ScanStreamOptions , Write(..) , Zadd(..) , ZaddReturn(..) @@ -22,6 +23,7 @@ module Database.Redis , hget , hgetall , hset + , hscanStream , get , incr , keys @@ -33,6 +35,7 @@ module Database.Redis , rpop , rpush , set + , scanStream , withConnection , zadd , zcard @@ -59,6 +62,7 @@ import Data.Maybe (Maybe(..)) import Data.NonEmpty (NonEmpty) import Data.Nullable (Nullable, toMaybe, toNullable) import Data.Tuple (Tuple(..)) +import Prim.Row (class Union) import Unsafe.Coerce (unsafeCoerce) -------------------------------------------------------------------------------- @@ -549,3 +553,26 @@ zscore -> ByteString -> Aff (Maybe Int53) zscore conn key = (toMaybe <$> _) <<< fromEffectFnAff <<< zscoreImpl conn key + +type ScanStreamOptions = + ( count :: Int + , match :: String + ) + +foreign import scanStreamImpl :: forall opts. Connection -> Record opts -> EffectFnAff (Array String) +foreign import hscanStreamImpl :: forall opts. Connection -> Record opts -> String -> EffectFnAff (Array {key :: String, value :: String}) + +scanStream :: forall options t. + Union options t ScanStreamOptions + => Connection + -> Record options + -> Aff (Array String) +scanStream redis options = fromEffectFnAff $ scanStreamImpl redis options + +hscanStream :: forall options t. + Union options t ScanStreamOptions + => Connection + -> Record options + -> String + -> Aff (Array {key :: String, value :: String}) +hscanStream redis options hash = fromEffectFnAff $ hscanStreamImpl redis options hash \ No newline at end of file diff --git a/test/Main.purs b/test/Main.purs index c7b801d..a36d047 100644 --- a/test/Main.purs +++ b/test/Main.purs @@ -4,11 +4,9 @@ module Test.Main import Prelude -import Effect.Aff (Aff, Milliseconds(Milliseconds), delay, forkAff) -import Effect (Effect) import Control.Monad.Except (catchError, throwError) import Data.Array (drop, filter, fromFoldable, sort, sortWith, take) -import Data.ByteString (ByteString) +import Data.ByteString (ByteString, Encoding(..)) import Data.ByteString as ByteString import Data.Foldable (length) import Data.Int53 (fromInt) @@ -16,6 +14,8 @@ import Data.Maybe (Maybe(..)) import Data.NonEmpty (singleton, (:|)) import Database.Redis (Connection, Expire(..), Write(..), ZscoreInterval(..), Config, defaultConfig, flushdb, keys, negInf, posInf, withConnection) import Database.Redis as Redis +import Effect (Effect) +import Effect.Aff (Aff, Milliseconds(Milliseconds), delay, forkAff) import Test.Unit (TestSuite, suite) import Test.Unit as Test.Unit import Test.Unit.Assert as Assert @@ -24,6 +24,9 @@ import Test.Unit.Main (runTest) b :: String -> ByteString b = ByteString.toUTF8 +text :: ByteString -> String +text = flip ByteString.toString UTF8 + test :: forall a . Config @@ -470,3 +473,25 @@ main = runTest $ do void $ Redis.rpush conn2 testList value1 v <- Redis.brpopIndef conn (singleton testList) Assert.equal v.value value1 + + suite "scan stream" do + test addr "scan stream all keys" $ \conn -> do + void $ Redis.incr conn key1 + void $ Redis.incr conn key2 + got <- Redis.scanStream conn {} + Assert.equal (sort [text key1, text key2]) (sort got) + + suite "hscan stream" do + let + testHash = b "testHash" + value1 = { key: key1, value: b "val1" } + value2 = { key: key2, value: b "val2" } + + test addr "hscan stream all keys" $ \conn -> do + void $ Redis.hset conn testHash value1.key value1.value + void $ Redis.hset conn testHash value2.key value2.value + values <- Redis.hscanStream conn {} (text testHash) + + Assert.equal + [text value1.value, text value2.value] + (map _.value <<< sortWith _.key $ values) \ No newline at end of file From 16f04ba4428eda602464b11cc0647545c79e7492 Mon Sep 17 00:00:00 2001 From: Woodson Delhia Date: Sat, 4 Apr 2020 20:13:19 +0900 Subject: [PATCH 2/3] added node-stream package and added stream as a return value for all scan stream functions --- bower.json | 63 +++++++++++++++++++++-------------------- src/Database/Redis.js | 32 ++++++++++++--------- src/Database/Redis.purs | 24 ++++++++++++---- test/Main.purs | 5 ++-- 4 files changed, 71 insertions(+), 53 deletions(-) diff --git a/bower.json b/bower.json index d3911ef..c1ffdbd 100644 --- a/bower.json +++ b/bower.json @@ -1,33 +1,34 @@ { - "name": "purescript-redis-client", - "license": [ - "BSD-3-Clause" - ], - "repository": { - "type": "git", - "url": "https://github.com/paluh/purescript-redis-client.git" - }, - "ignore": [ - "**/.*", - "node_modules", - "bower_components", - "output" - ], - "dependencies": { - "purescript-aff": "^v5.1.2", - "purescript-arrays": "^v5.3.1", - "purescript-bytestrings": "^v8.0.0", - "purescript-console": "^v4.4.0", - "purescript-effect": "^v2.0.1", - "purescript-either": "^v4.1.1", - "purescript-int-53": "^v4.0.0", - "purescript-maybe": "^v4.0.1", - "purescript-nullable": "^v4.1.1", - "purescript-prelude": "^v4.1.1", - "purescript-transformers": "^v4.2.0" - }, - "devDependencies": { - "purescript-psci-support": "^v4.0.0", - "purescript-test-unit": "^v15.0.0" - } + "name": "purescript-redis-client", + "license": [ + "BSD-3-Clause" + ], + "repository": { + "type": "git", + "url": "https://github.com/paluh/purescript-redis-client.git" + }, + "ignore": [ + "**/.*", + "node_modules", + "bower_components", + "output" + ], + "dependencies": { + "purescript-aff": "^v5.1.2", + "purescript-arrays": "^v5.3.1", + "purescript-bytestrings": "^v8.0.0", + "purescript-console": "^v4.4.0", + "purescript-effect": "^v2.0.1", + "purescript-either": "^v4.1.1", + "purescript-int-53": "^v4.0.0", + "purescript-maybe": "^v4.0.1", + "purescript-nullable": "^v4.1.1", + "purescript-prelude": "^v4.1.1", + "purescript-transformers": "^v4.2.0", + "purescript-node-streams": "^4.0.1" + }, + "devDependencies": { + "purescript-psci-support": "^v4.0.0", + "purescript-test-unit": "^v15.0.0" + } } diff --git a/src/Database/Redis.js b/src/Database/Redis.js index 150d247..b8ddf59 100644 --- a/src/Database/Redis.js +++ b/src/Database/Redis.js @@ -627,19 +627,23 @@ exports.zscoreImpl = function(conn) { exports.scanStreamImpl = function(conn){ return function(options){ - return function (onError, onSuccess){ - var stream = conn.scanStream(options) - - // `resultKeys` is an array of strings representing key names. - // Note that resultKeys may contain 0 keys, and that it will sometimes - // contain duplicates due to SCAN's implementation in Redis. - // woody: Should I check for duplicates here or leave it to end user?? - stream.on("data", onSuccess) - - stream.on("error", onError) - - return function (cancelError, cancelerError, cancelerSuccess) { - cancelerSuccess() + return function(tuple){ + return function (onError, onSuccess){ + var stream = conn.scanStream(options) + + // `resultKeys` is an array of strings representing key names. + // Note that resultKeys may contain 0 keys, and that it will sometimes + // contain duplicates due to SCAN's implementation in Redis. + // woody: Should I check for duplicates here or leave it to end user?? + stream.on("data", function(keys){ + onSuccess(tuple(keys)(stream)) + }) + + stream.on("error", onError) + + return function (cancelError, cancelerError, cancelerSuccess) { + cancelerSuccess() + } } } } @@ -660,7 +664,7 @@ exports.hscanStreamImpl = function(conn){ for (var i = 0; i < result.length; i += 2) { arr.push({ key: result[i], value: result[i + 1] }); } - onSuccess(arr); + onSuccess(tuple(arr)(stream)) }) stream.on("error", onError) diff --git a/src/Database/Redis.purs b/src/Database/Redis.purs index 13ca606..22a2ec8 100644 --- a/src/Database/Redis.purs +++ b/src/Database/Redis.purs @@ -63,6 +63,7 @@ import Data.NonEmpty (NonEmpty) import Data.Nullable (Nullable, toMaybe, toNullable) import Data.Tuple (Tuple(..)) import Prim.Row (class Union) +import Node.Stream import Unsafe.Coerce (unsafeCoerce) -------------------------------------------------------------------------------- @@ -559,20 +560,31 @@ type ScanStreamOptions = , match :: String ) -foreign import scanStreamImpl :: forall opts. Connection -> Record opts -> EffectFnAff (Array String) -foreign import hscanStreamImpl :: forall opts. Connection -> Record opts -> String -> EffectFnAff (Array {key :: String, value :: String}) +foreign import scanStreamImpl :: forall opts. + Connection + -> Record opts + -> (Array String -> Readable () -> Tuple (Array String) (Readable ())) + -> EffectFnAff (Tuple (Array String) (Readable ())) + +foreign import hscanStreamImpl :: forall opts. + Connection + -> Record opts + -> String + -> (Array {key :: String, value :: String} -> Readable () -> Tuple (Array {key :: String, value :: String}) (Readable ())) + -> EffectFnAff (Tuple (Array {key :: String, value :: String}) (Readable ())) + scanStream :: forall options t. Union options t ScanStreamOptions => Connection -> Record options - -> Aff (Array String) -scanStream redis options = fromEffectFnAff $ scanStreamImpl redis options + -> Aff (Tuple (Array String) (Readable ())) +scanStream redis options = fromEffectFnAff $ scanStreamImpl redis options Tuple hscanStream :: forall options t. Union options t ScanStreamOptions => Connection -> Record options -> String - -> Aff (Array {key :: String, value :: String}) -hscanStream redis options hash = fromEffectFnAff $ hscanStreamImpl redis options hash \ No newline at end of file + -> Aff (Tuple (Array {key :: String, value :: String}) (Readable ())) +hscanStream redis options hash = fromEffectFnAff $ hscanStreamImpl redis options hash Tuple \ No newline at end of file diff --git a/test/Main.purs b/test/Main.purs index a36d047..1212189 100644 --- a/test/Main.purs +++ b/test/Main.purs @@ -12,6 +12,7 @@ import Data.Foldable (length) import Data.Int53 (fromInt) import Data.Maybe (Maybe(..)) import Data.NonEmpty (singleton, (:|)) +import Data.Tuple (fst) import Database.Redis (Connection, Expire(..), Write(..), ZscoreInterval(..), Config, defaultConfig, flushdb, keys, negInf, posInf, withConnection) import Database.Redis as Redis import Effect (Effect) @@ -478,7 +479,7 @@ main = runTest $ do test addr "scan stream all keys" $ \conn -> do void $ Redis.incr conn key1 void $ Redis.incr conn key2 - got <- Redis.scanStream conn {} + got <- fst <$> Redis.scanStream conn {} Assert.equal (sort [text key1, text key2]) (sort got) suite "hscan stream" do @@ -490,7 +491,7 @@ main = runTest $ do test addr "hscan stream all keys" $ \conn -> do void $ Redis.hset conn testHash value1.key value1.value void $ Redis.hset conn testHash value2.key value2.value - values <- Redis.hscanStream conn {} (text testHash) + values <- fst <$> Redis.hscanStream conn {} (text testHash) Assert.equal [text value1.value, text value2.value] From 42bba6caf3b4e95c2005751dc4f3424cb6842531 Mon Sep 17 00:00:00 2001 From: Woodson Delhia Date: Sat, 4 Apr 2020 22:27:48 +0900 Subject: [PATCH 3/3] Added zscanStream --- src/Database/Redis.js | 66 ++++++++++++++++++++++++++++++----------- src/Database/Redis.purs | 17 ++++++++++- test/Main.purs | 33 +++++++++++++++------ 3 files changed, 89 insertions(+), 27 deletions(-) diff --git a/src/Database/Redis.js b/src/Database/Redis.js index b8ddf59..6eeeef2 100644 --- a/src/Database/Redis.js +++ b/src/Database/Redis.js @@ -652,27 +652,59 @@ exports.scanStreamImpl = function(conn){ exports.hscanStreamImpl = function(conn){ return function(options){ return function(hash){ - return function (onError, onSuccess){ - var stream = conn.hscanStream(hash,options) - - // `resultKeys` is an array of strings representing key names. - // Note that resultKeys may contain 0 keys, and that it will sometimes - // contain duplicates due to SCAN's implementation in Redis. - // woody: Should I check for duplicates here or leave it to end user?? - stream.on("data", function(result){ - var arr = []; - for (var i = 0; i < result.length; i += 2) { - arr.push({ key: result[i], value: result[i + 1] }); + return function(tuple){ + return function (onError, onSuccess){ + var stream = conn.hscanStream(hash,options) + + // `resultKeys` is an array of strings representing key names. + // Note that resultKeys may contain 0 keys, and that it will sometimes + // contain duplicates due to SCAN's implementation in Redis. + // woody: Should I check for duplicates here or leave it to end user?? + stream.on("data", function(result){ + var arr = []; + for (var i = 0; i < result.length; i += 2) { + arr.push({ key: result[i], value: result[i + 1] }); + } + onSuccess(tuple(arr)(stream)) + }) + + stream.on("error", onError) + + return function (cancelError, cancelerError, cancelerSuccess) { + cancelerSuccess() } - onSuccess(tuple(arr)(stream)) - }) + } + } + } + } +} - stream.on("error", onError) +exports.zscanStreamImpl = function(conn){ + return function(options){ + return function(key){ + return function(tuple){ + return function (onError, onSuccess){ + var stream = conn.zscanStream(key,options) + + // `resultKeys` is an array of strings representing key names. + // Note that resultKeys may contain 0 keys, and that it will sometimes + // contain duplicates due to SCAN's implementation in Redis. + // woody: Should I check for duplicates here or leave it to end user?? + stream.on("data", function(result){ + var arr = []; + for (var i = 0; i < result.length; i += 2) { + arr.push({ member: result[i], value: parseFloat(result[i + 1]) }); + } + onSuccess(tuple(arr)(stream)) + }) - return function (cancelError, cancelerError, cancelerSuccess) { - cancelerSuccess() + stream.on("error", onError) + + return function (cancelError, cancelerError, cancelerSuccess) { + cancelerSuccess() + } } } } } -} \ No newline at end of file +} diff --git a/src/Database/Redis.purs b/src/Database/Redis.purs index 22a2ec8..6b3b822 100644 --- a/src/Database/Redis.purs +++ b/src/Database/Redis.purs @@ -48,6 +48,7 @@ module Database.Redis , zremrangebyrank , zremrangebyscore , zrevrangebyscore + , zscanStream , zscore ) where @@ -573,6 +574,12 @@ foreign import hscanStreamImpl :: forall opts. -> (Array {key :: String, value :: String} -> Readable () -> Tuple (Array {key :: String, value :: String}) (Readable ())) -> EffectFnAff (Tuple (Array {key :: String, value :: String}) (Readable ())) +foreign import zscanStreamImpl :: forall opts. + Connection + -> Record opts + -> String + -> (Array {member :: String, score :: Int} -> Readable () -> Tuple (Array {member :: String, score :: Int}) (Readable ())) + -> EffectFnAff (Tuple (Array {member :: String, score :: Int}) (Readable ())) scanStream :: forall options t. Union options t ScanStreamOptions @@ -587,4 +594,12 @@ hscanStream :: forall options t. -> Record options -> String -> Aff (Tuple (Array {key :: String, value :: String}) (Readable ())) -hscanStream redis options hash = fromEffectFnAff $ hscanStreamImpl redis options hash Tuple \ No newline at end of file +hscanStream redis options hash = fromEffectFnAff $ hscanStreamImpl redis options hash Tuple + +zscanStream :: forall options t. + Union options t ScanStreamOptions + => Connection + -> Record options + -> String + -> Aff (Tuple (Array {member :: String, score :: Int}) (Readable ())) +zscanStream redis options key = fromEffectFnAff $ zscanStreamImpl redis options key Tuple \ No newline at end of file diff --git a/test/Main.purs b/test/Main.purs index 1212189..3dd6083 100644 --- a/test/Main.purs +++ b/test/Main.purs @@ -475,24 +475,39 @@ main = runTest $ do v <- Redis.brpopIndef conn (singleton testList) Assert.equal v.value value1 - suite "scan stream" do - test addr "scan stream all keys" $ \conn -> do + suite "scan streams" do + test addr "keys" $ \conn -> do void $ Redis.incr conn key1 void $ Redis.incr conn key2 got <- fst <$> Redis.scanStream conn {} Assert.equal (sort [text key1, text key2]) (sort got) - suite "hscan stream" do - let - testHash = b "testHash" - value1 = { key: key1, value: b "val1" } - value2 = { key: key2, value: b "val2" } + test addr "hash" $ \conn -> do + let + testHash = b "testHash" + value1 = { key: key1, value: b "val1" } + value2 = { key: key2, value: b "val2" } - test addr "hscan stream all keys" $ \conn -> do void $ Redis.hset conn testHash value1.key value1.value void $ Redis.hset conn testHash value2.key value2.value values <- fst <$> Redis.hscanStream conn {} (text testHash) Assert.equal [text value1.value, text value2.value] - (map _.value <<< sortWith _.key $ values) \ No newline at end of file + (map _.value <<< sortWith _.key $ values) + + test addr "sorted set" $ \conn -> do + let + testSet = b "testSet" + members = + {member: b "m1", score: 1 } :| [{ member: b "m2", score: 2 } , { member: b "m3", score: 3 }] + + void $ Redis.zadd + conn + testSet + (Redis.ZaddAll Redis.Added) + members + + values <- fst <$> Redis.zscanStream conn {} (text testSet) + + Assert.equal (map _.score $ fromFoldable members) (map _.score values) \ No newline at end of file