From 546aa4715b269f2451a232369544fce0407fc367 Mon Sep 17 00:00:00 2001
From: alonre24
Date: Mon, 26 Apr 2021 17:01:47 +0300
Subject: [PATCH 01/14] Add support for the new commands: AI.MODELEXECUTE
 (AI.MODELRUN is now deprecated) and AI.MODELSTORE (AI.MODELSET is now
 deprecated).

---
 redisai/client.py          | 100 ++++++++++++++++++++++++++------
 redisai/command_builder.py |  58 ++++++++++++++++++-
 redisai/postprocessor.py   |   2 +-
 test/test.py               | 113 ++++++++++++++++++++++++-------------
 4 files changed, 214 insertions(+), 59 deletions(-)

diff --git a/redisai/client.py b/redisai/client.py
index d3a3e0e..8f3c2b4 100644
--- a/redisai/client.py
+++ b/redisai/client.py
@@ -5,6 +5,7 @@
 from redis import StrictRedis
 from redis.client import Pipeline as RedisPipeline
 import numpy as np
+from deprecated import deprecated
 
 from . import command_builder as builder
 from .postprocessor import Processor
@@ -88,7 +89,7 @@ def dag(self, load: Sequence = None, persist: Sequence = None,
         -------
         >>> con.tensorset('tensor', ...)
         'OK'
-        >>> con.modelset('model', ...)
+        >>> con.modelstore('model', ...)
         'OK'
         >>> dag = con.dag(load=['tensor'], persist=['output'])
         >>> dag.tensorset('another', ...)
@@ -126,16 +127,17 @@ def loadbackend(self, identifier: AnyStr, path: AnyStr) -> str:
         res = self.execute_command(*args)
         return res if not self.enable_postprocess else processor.loadbackend(res)
 
-    def modelset(self,
-                 key: AnyStr,
-                 backend: str,
-                 device: str,
-                 data: ByteString,
-                 batch: int = None,
-                 minbatch: int = None,
-                 tag: AnyStr = None,
-                 inputs: Union[AnyStr, List[AnyStr]] = None,
-                 outputs: Union[AnyStr, List[AnyStr]] = None) -> str:
+    def modelstore(self,
+                   key: AnyStr,
+                   backend: str,
+                   device: str,
+                   data: ByteString,
+                   batch: int = None,
+                   minbatch: int = None,
+                   minbatchtimeout: int = None,
+                   tag: AnyStr = None,
+                   inputs: Union[AnyStr, List[AnyStr]] = None,
+                   outputs: Union[AnyStr, List[AnyStr]] = None) -> str:
         """
         Set the model on provided key.
 
@@ -154,6 +156,9 @@
             Number of batches for doing auto-batching
         minbatch : int
             Minimum number of samples required in a batch for model execution
+        minbatchtimeout : int
+            The maximum number of milliseconds for which the engine will not trigger an execution if the number
+            of samples is lower than minbatch (once minbatchtimeout has passed, execution starts even if minbatch has not been reached)
         tag : AnyStr
             Any string that will be saved in RedisAI as tag for the model
         inputs : Union[AnyStr, List[AnyStr]]
@@ -166,6 +171,39 @@
         str
             'OK' if success, raise an exception otherwise
 
+        Example
+        -------
+        >>> # Torch model
+        >>> model_path = os.path.join('path/to/TorchScriptModel.pt')
+        >>> model = open(model_path, 'rb').read()
+        >>> con.modelstore("model", 'torch', 'cpu', model, tag='v1.0')
+        'OK'
+        >>> # Tensorflow model
+        >>> model_path = os.path.join('/path/to/tf_frozen_graph.pb')
+        >>> model = open(model_path, 'rb').read()
+        >>> con.modelstore('m', 'tf', 'cpu', model,
+        ...                inputs=['a', 'b'], outputs=['mul'], tag='v1.0')
+        'OK'
+        """
+        args = builder.modelstore(key, backend, device, data,
+                                  batch, minbatch, minbatchtimeout, tag, inputs, outputs)
+        res = self.execute_command(*args)
+        return res if not self.enable_postprocess else processor.modelstore(res)
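+
+    # A minimal usage sketch of the new auto-batching knobs (illustrative values,
+    # not taken from the original tests):
+    #   con.modelstore('m', 'tf', 'cpu', model_pb, inputs=['a', 'b'],
+    #                  outputs=['mul'], batch=8, minbatch=2, minbatchtimeout=1000)
+    # MINBATCHSIZE is only valid together with BATCHSIZE, and MINBATCHTIMEOUT only
+    # together with MINBATCHSIZE, matching the validation in command_builder.modelstore.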
+
+    @deprecated(version='1.2.2', reason="Use modelstore instead")
+    def modelset(self,
+                 key: AnyStr,
+                 backend: str,
+                 device: str,
+                 data: ByteString,
+                 batch: int = None,
+                 minbatch: int = None,
+                 tag: AnyStr = None,
+                 inputs: Union[AnyStr, List[AnyStr]] = None,
+                 outputs: Union[AnyStr, List[AnyStr]] = None) -> str:
+        """
+        Similar to modelstore (this is the deprecated version that will not be supported in future versions).
+
         Example
         -------
         >>> # Torch model
@@ -234,13 +272,14 @@ def modeldel(self, key: AnyStr) -> str:
         res = self.execute_command(*args)
         return res if not self.enable_postprocess else processor.modeldel(res)
 
-    def modelrun(self,
+    def modelexecute(self,
                  key: AnyStr,
                  inputs: Union[AnyStr, List[AnyStr]],
-                 outputs: Union[AnyStr, List[AnyStr]]) -> str:
+                 outputs: Union[AnyStr, List[AnyStr]],
+                 timeout: int = None) -> str:
         """
         Run the model using input(s) which are already in the scope and are associated
-        to some keys. Modelrun also needs the output key name(s) to store the output
+        to some keys. Modelexecute also needs the output key name(s) to store the output
         from the model. The number of outputs from the model and the number of keys
         provided here must be same. Otherwise, RedisAI throws an error
 
@@ -250,10 +289,13 @@
             Model key to run
         inputs : Union[AnyStr, List[AnyStr]]
             Tensor(s) which is already saved in the RedisAI using a tensorset call. These
-            tensors will be used as the input for the modelrun
+            tensors will be used as the inputs for modelexecute
         outputs : Union[AnyStr, List[AnyStr]]
-            keys on which the outputs to be saved. If those keys exist already, modelrun
+            keys in which the outputs will be saved. If those keys already exist, modelexecute
             will overwrite them with new values
+        timeout : int
+            The maximum number of milliseconds that may pass before the request is processed
+            (if this timeout expires, the result is not computed and TIMEDOUT is returned instead)
 
         Returns
         -------
@@ -262,7 +304,31 @@
 
         Example
         -------
-        >>> con.modelset('m', 'tf', 'cpu', model_pb,
+        >>> con.modelstore('m', 'tf', 'cpu', model_pb,
         ...                inputs=['a', 'b'], outputs=['mul'], tag='v1.0')
         'OK'
         >>> con.tensorset('a', (2, 3), dtype='float')
         'OK'
         >>> con.tensorset('b', (2, 3), dtype='float')
         'OK'
-        >>> con.modelrun('m', ['a', 'b'], ['c'])
+        >>> con.modelexecute('m', ['a', 'b'], ['c'])
         'OK'
         """
-        args = builder.modelrun(key, inputs, outputs)
+        args = builder.modelexecute(key, inputs, outputs, timeout)
         res = self.execute_command(*args)
-        return res if not self.enable_postprocess else processor.modelrun(res)
+        return res if not self.enable_postprocess else processor.modelexecute(res)
+
+    @deprecated(version='1.2.2', reason="Use modelexecute instead")
+    def modelrun(self,
+                 key: AnyStr,
+                 inputs: Union[AnyStr, List[AnyStr]],
+                 outputs: Union[AnyStr, List[AnyStr]]) -> str:
+        """
+        Similar to modelexecute (this is the deprecated version that will not be supported in future versions).
+
+        Example
+        -------
+        >>> con.modelstore('m', 'tf', 'cpu', model_pb,
+        ...                inputs=['a', 'b'], outputs=['mul'], tag='v1.0')
         'OK'
         >>> con.tensorset('a', (2, 3), dtype='float')
diff --git a/redisai/command_builder.py b/redisai/command_builder.py
index eae464e..60dc0a9 100644
--- a/redisai/command_builder.py
+++ b/redisai/command_builder.py
@@ -9,6 +9,47 @@ def loadbackend(identifier: AnyStr, path: AnyStr) -> Sequence:
     return 'AI.CONFIG LOADBACKEND', identifier, path
 
 
+def modelstore(name: AnyStr, backend: str, device: str, data: ByteString,
+               batch: int, minbatch: int, minbatchtimeout: int, tag: AnyStr,
+               inputs: Union[AnyStr, List[AnyStr]],
+               outputs: Union[AnyStr, List[AnyStr]]) -> Sequence:
+    if device.upper() not in utils.allowed_devices:
+        raise ValueError(f"Device not allowed. Use any from {utils.allowed_devices}")
+    if backend.upper() not in utils.allowed_backends:
+        raise ValueError(f"Backend not allowed. Use any from {utils.allowed_backends}")
+    args = ['AI.MODELSTORE', name, backend, device]
+
+    if tag is not None:
+        args += ['TAG', tag]
+    if batch is not None:
+        args += ['BATCHSIZE', batch]
+    if minbatch is not None:
+        if batch is None:
+            raise ValueError(
+                'Minbatch is not allowed without batch')
+        args += ['MINBATCHSIZE', minbatch]
+    if minbatchtimeout is not None:
+        if minbatch is None:
+            raise ValueError(
+                'Minbatchtimeout is not allowed without minbatch')
+        args += ['MINBATCHTIMEOUT', minbatchtimeout]
+
+    if backend.upper() == 'TF':
+        if not(all((inputs, outputs))):
+            raise ValueError(
+                'Require keyword arguments inputs and outputs for TF models')
+        args += ['INPUTS', len(inputs) if isinstance(inputs, List) else 1, *utils.listify(inputs)]
+        args += ['OUTPUTS', len(outputs) if isinstance(outputs, List) else 1, *utils.listify(outputs)]
+    elif inputs is not None or outputs is not None:
+        raise ValueError(
+            'Inputs and outputs keywords should not be specified for this backend')
+    chunk_size = 500 * 1024 * 1024  # TODO: this should be configurable.
+    data_chunks = [data[i:i + chunk_size] for i in range(0, len(data), chunk_size)]
+    # TODO: need a test case for this
+    args += ['BLOB', *data_chunks]
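+    # Chunking sketch: with the fixed 500 MiB chunk_size, a blob of, say, 1.2 GiB
+    # is split into three chunks that are all appended after a single 'BLOB' token,
+    # i.e. args ends with ['BLOB', chunk_0, chunk_1, chunk_2].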
+    return args
+
+
 def modelset(name: AnyStr, backend: str, device: str, data: ByteString,
              batch: int, minbatch: int, tag: AnyStr,
              inputs: Union[AnyStr, List[AnyStr]],
              outputs: Union[AnyStr, List[AnyStr]]) -> Sequence:
@@ -19,12 +60,15 @@ def modelset(name: AnyStr, backend: str, device: str, data: ByteString,
         raise ValueError(f"Backend not allowed. 
Use any from {utils.allowed_backends}") args = ['AI.MODELSET', name, backend, device] + if tag is not None: + args += ['TAG', tag] if batch is not None: args += ['BATCHSIZE', batch] if minbatch is not None: + if batch is None: + raise ValueError( + 'Minbatch is not allowed without batch') args += ['MINBATCHSIZE', minbatch] - if tag is not None: - args += ['TAG', tag] if backend.upper() == 'TF': if not(all((inputs, outputs))): @@ -50,7 +94,15 @@ def modeldel(name: AnyStr) -> Sequence: return 'AI.MODELDEL', name -def modelrun(name: AnyStr, inputs: List[AnyStr], outputs: List[AnyStr]) -> Sequence: +def modelexecute(name: AnyStr, inputs: Union[AnyStr, List[AnyStr]], outputs: Union[AnyStr, List[AnyStr]], timeout: int) -> Sequence: + args = ['AI.MODELEXECUTE', name, 'INPUTS', len(utils.listify(inputs)), *utils.listify(inputs), 'OUTPUTS', + len(utils.listify(outputs)), *utils.listify(outputs)] + if timeout is not None: + args += ['TIMEOUT', timeout] + return args + + +def modelrun(name: AnyStr, inputs: Union[AnyStr, List[AnyStr]], outputs: Union[AnyStr, List[AnyStr]]) -> Sequence: args = ('AI.MODELRUN', name, 'INPUTS', *utils.listify(inputs), 'OUTPUTS', *utils.listify(outputs)) return args diff --git a/redisai/postprocessor.py b/redisai/postprocessor.py index e95f9b8..b0b9dc0 100644 --- a/redisai/postprocessor.py +++ b/redisai/postprocessor.py @@ -52,7 +52,7 @@ def infoget(res): # These functions are only doing decoding on the output from redis decoder = staticmethod(decoder) -decoding_functions = ('loadbackend', 'modelset', 'modeldel', 'modelrun', 'tensorset', +decoding_functions = ('loadbackend', 'modelstore', 'modelset', 'modeldel', 'modelexecute', 'modelrun', 'tensorset', 'scriptset', 'scriptdel', 'scriptrun', 'inforeset') for fn in decoding_functions: setattr(Processor, fn, decoder) diff --git a/test/test.py b/test/test.py index d76601a..69a26e9 100644 --- a/test/test.py +++ b/test/test.py @@ -113,7 +113,8 @@ def test_numpy_tensor(self): with self.assertRaises(TypeError): con.tensorset('trying', stringarr) - def test_modelset_errors(self): + # AI.MODELSET is deprecated by AI.MODELSTORE. 
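+    # The deprecated method accepts the same arguments as modelstore, minus
+    # minbatchtimeout; the test below checks that it still stores the model and
+    # returns the expected metadata.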
+    def test_deprecated_modelset(self):
         model_path = os.path.join(MODEL_DIR, 'graph.pb')
         model_pb = load_model(model_path)
         con = self.get_client()
         with self.assertRaises(ValueError):
             con.modelset('m', 'tf', 'wrongdevice', model_pb,
                          inputs=['a', 'b'], outputs=['mul'], tag='v1.0')
         with self.assertRaises(ValueError):
             con.modelset('m', 'wrongbackend', 'cpu', model_pb,
                          inputs=['a', 'b'], outputs=['mul'], tag='v1.0')
+        con.modelset('m', 'tf', 'cpu', model_pb,
+                     inputs=['a', 'b'], outputs=['mul'], tag='v1.0')
+        model = con.modelget('m', meta_only=True)
+        self.assertEqual(model, {'backend': 'TF', 'batchsize': 0, 'device': 'cpu', 'inputs': ['a', 'b'], 'minbatchsize': 0, 'outputs': ['mul'], 'tag': 'v1.0'})
+
+    def test_modelstore_errors(self):
+        model_path = os.path.join(MODEL_DIR, 'graph.pb')
+        model_pb = load_model(model_path)
+        con = self.get_client()
+        with self.assertRaises(ValueError) as e:
+            con.modelstore('m', 'tf', 'wrongdevice', model_pb,
+                           inputs=['a', 'b'], outputs=['mul'], tag='v1.0')
+        self.assertTrue(str(e.exception).startswith("Device not allowed"))
+        with self.assertRaises(ValueError) as e:
+            con.modelstore('m', 'wrongbackend', 'cpu', model_pb,
+                           inputs=['a', 'b'], outputs=['mul'], tag='v1.0')
+        self.assertTrue(str(e.exception).startswith("Backend not allowed"))
+        with self.assertRaises(ValueError) as e:
+            con.modelstore('m', 'tf', 'cpu', model_pb,
+                           inputs=['a', 'b'], outputs=['mul'], tag='v1.0', minbatch=2)
+        self.assertEqual(str(e.exception), "Minbatch is not allowed without batch")
+        with self.assertRaises(ValueError) as e:
+            con.modelstore('m', 'tf', 'cpu', model_pb,
+                           inputs=['a', 'b'], outputs=['mul'], tag='v1.0', batch=4, minbatchtimeout=1000)
+        self.assertEqual(str(e.exception), "Minbatchtimeout is not allowed without minbatch")
+        with self.assertRaises(ValueError) as e:
+            con.modelstore('m', 'tf', 'cpu', model_pb, tag='v1.0')
+        self.assertEqual(str(e.exception), "Require keyword arguments inputs and outputs for TF models")
+        with self.assertRaises(ValueError) as e:
+            con.modelstore('m', 'torch', 'cpu', model_pb, inputs=['a', 'b'], outputs=['mul'], tag='v1.0')
+        self.assertEqual(str(e.exception), "Inputs and outputs keywords should not be specified for this backend")
 
     def test_modelget_meta(self):
         model_path = os.path.join(MODEL_DIR, 'graph.pb')
         model_pb = load_model(model_path)
         con = self.get_client()
-        con.modelset('m', 'tf', 'cpu', model_pb,
+        con.modelstore('m', 'tf', 'cpu', model_pb,
                      inputs=['a', 'b'], outputs=['mul'], tag='v1.0')
         model = con.modelget('m', meta_only=True)
         self.assertEqual(model, {'backend': 'TF', 'batchsize': 0, 'device': 'cpu', 'inputs': ['a', 'b'], 'minbatchsize': 0, 'outputs': ['mul'], 'tag': 'v1.0'})
 
-    def test_modelrun_non_list_input_output(self):
+    def test_modelexecute_non_list_input_output(self):
         model_path = os.path.join(MODEL_DIR, 'graph.pb')
         model_pb = load_model(model_path)
         con = self.get_client()
-        con.modelset('m', 'tf', 'cpu', model_pb,
+        con.modelstore('m', 'tf', 'cpu', model_pb,
                      inputs=['a', 'b'], outputs=['mul'], tag='v1.7')
         con.tensorset('a', (2, 3), dtype='float')
         con.tensorset('b', (2, 3), dtype='float')
-        ret = con.modelrun('m', ['a', 'b'], 'out')
+        ret = con.modelexecute('m', ['a', 'b'], 'out')
         self.assertEqual(ret, 'OK')
 
     def test_nonasciichar(self):
         nonascii = 'ĉ'
         model_path = os.path.join(MODEL_DIR, 'graph.pb')
         model_pb = load_model(model_path)
         con = self.get_client()
-        con.modelset('m' + nonascii, 'tf', 'cpu', model_pb,
+        con.modelstore('m' + nonascii, 'tf', 'cpu', model_pb,
                      inputs=['a', 'b'], outputs=['mul'], tag='v1.0')
         con.tensorset('a' + nonascii, (2, 3), dtype='float')
con.tensorset('b', (2, 3), dtype='float') - con.modelrun('m' + nonascii, ['a' + nonascii, 'b'], ['c' + nonascii]) + con.modelexecute('m' + nonascii, ['a' + nonascii, 'b'], ['c' + nonascii]) tensor = con.tensorget('c' + nonascii) self.assertTrue((np.allclose(tensor, [4., 9.]))) @@ -165,32 +197,22 @@ def test_run_tf_model(self): wrong_model_pb = load_model(bad_model_path) con = self.get_client() - con.modelset('m', 'tf', 'cpu', model_pb, + con.modelstore('m', 'tf', 'cpu', model_pb, inputs=['a', 'b'], outputs=['mul'], tag='v1.0') con.modeldel('m') self.assertRaises(ResponseError, con.modelget, 'm') - con.modelset('m', 'tf', 'cpu', model_pb, + con.modelstore('m', 'tf', 'cpu', model_pb, inputs=['a', 'b'], outputs='mul', tag='v1.0') # wrong model - self.assertRaises(ResponseError, - con.modelset, 'm', 'tf', 'cpu', - wrong_model_pb, - inputs=['a', 'b'], outputs=['mul']) - # missing inputs/outputs - self.assertRaises(ValueError, - con.modelset, 'm', 'tf', 'cpu', - wrong_model_pb) - - # wrong backend - self.assertRaises(ResponseError, - con.modelset, 'm', 'torch', 'cpu', - model_pb, - inputs=['a', 'b'], outputs=['mul']) + with self.assertRaises(ResponseError) as e: + con.modelstore('m', 'tf', 'cpu', + wrong_model_pb, inputs=['a', 'b'], outputs=['mul']) + self.assertEqual(str(e.exception), "Invalid GraphDef") con.tensorset('a', (2, 3), dtype='float') con.tensorset('b', (2, 3), dtype='float') - con.modelrun('m', ['a', 'b'], ['c']) + con.modelexecute('m', ['a', 'b'], ['c']) tensor = con.tensorget('c') self.assertTrue(np.allclose([4, 9], tensor)) model_det = con.modelget('m') @@ -226,10 +248,10 @@ def test_run_onnxml_model(self): mlmodel_path = os.path.join(MODEL_DIR, 'boston.onnx') onnxml_model = load_model(mlmodel_path) con = self.get_client() - con.modelset("onnx_model", 'onnx', 'cpu', onnxml_model) + con.modelstore("onnx_model", 'onnx', 'cpu', onnxml_model) tensor = np.ones((1, 13)).astype(np.float32) con.tensorset("input", tensor) - con.modelrun("onnx_model", ["input"], ["output"]) + con.modelexecute("onnx_model", ["input"], ["output"]) # tests `convert_to_num` outtensor = con.tensorget("output", as_numpy=False) self.assertEqual(int(float(outtensor['values'][0])), 24) @@ -239,10 +261,10 @@ def test_run_onnxdl_model(self): dlmodel_path = os.path.join(MODEL_DIR, 'findsquare.onnx') onnxdl_model = load_model(dlmodel_path) con = self.get_client() - con.modelset("onnx_model", 'onnx', 'cpu', onnxdl_model) + con.modelstore("onnx_model", 'onnx', 'cpu', onnxdl_model) tensor = np.array((2,)).astype(np.float32) con.tensorset("input", tensor) - con.modelrun("onnx_model", ["input"], ["output"]) + con.modelexecute("onnx_model", ["input"], ["output"]) outtensor = con.tensorget("output") self.assertTrue(np.allclose(outtensor, [4.0])) @@ -250,10 +272,10 @@ def test_run_pytorch_model(self): model_path = os.path.join(MODEL_DIR, 'pt-minimal.pt') ptmodel = load_model(model_path) con = self.get_client() - con.modelset("pt_model", 'torch', 'cpu', ptmodel, tag='v1.0') + con.modelstore("pt_model", 'torch', 'cpu', ptmodel, tag='v1.0') con.tensorset('a', [2, 3, 2, 3], shape=(2, 2), dtype='float') con.tensorset('b', [2, 3, 2, 3], shape=(2, 2), dtype='float') - con.modelrun("pt_model", ["a", "b"], ["output"]) + con.modelexecute("pt_model", ["a", "b"], ["output"]) output = con.tensorget('output', as_numpy=False) self.assertTrue(np.allclose(output['values'], [4, 6, 4, 6])) @@ -261,18 +283,33 @@ def test_run_tflite_model(self): model_path = os.path.join(MODEL_DIR, 'mnist_model_quant.tflite') tflmodel = load_model(model_path) con 
= self.get_client() - con.modelset("tfl_model", 'tflite', 'cpu', tflmodel) + con.modelstore("tfl_model", 'tflite', 'cpu', tflmodel) img = np.random.random((1, 1, 28, 28)).astype(np.float) con.tensorset('img', img) - con.modelrun("tfl_model", ["img"], ["output1", "output2"]) + con.modelexecute("tfl_model", ["img"], ["output1", "output2"]) output = con.tensorget('output1') self.assertTrue(np.allclose(output, [8])) + # AI.MODELRUN is deprecated by AI.MODELEXECUTE + def test_deprecated_modelrun(self): + model_path = os.path.join(MODEL_DIR, 'graph.pb') + model_pb = load_model(model_path) + + con = self.get_client() + con.modelstore('m', 'tf', 'cpu', model_pb, + inputs=['a', 'b'], outputs=['mul'], tag='v1.0') + + con.tensorset('a', (2, 3), dtype='float') + con.tensorset('b', (2, 3), dtype='float') + con.modelrun('m', ['a', 'b'], ['c']) + tensor = con.tensorget('c') + self.assertTrue(np.allclose([4, 9], tensor)) + def test_info(self): model_path = os.path.join(MODEL_DIR, 'graph.pb') model_pb = load_model(model_path) con = self.get_client() - con.modelset('m', 'tf', 'cpu', model_pb, + con.modelstore('m', 'tf', 'cpu', model_pb, inputs=['a', 'b'], outputs=['mul']) first_info = con.infoget('m') expected = {'key': 'm', 'type': 'MODEL', 'backend': 'TF', 'device': 'cpu', @@ -280,8 +317,8 @@ def test_info(self): self.assertEqual(first_info, expected) con.tensorset('a', (2, 3), dtype='float') con.tensorset('b', (2, 3), dtype='float') - con.modelrun('m', ['a', 'b'], ['c']) - con.modelrun('m', ['a', 'b'], ['c']) + con.modelexecute('m', ['a', 'b'], ['c']) + con.modelexecute('m', ['a', 'b'], ['c']) second_info = con.infoget('m') self.assertEqual(second_info['calls'], 2) # 2 model runs con.inforeset('m') @@ -292,13 +329,13 @@ def test_model_scan(self): model_path = os.path.join(MODEL_DIR, 'graph.pb') model_pb = load_model(model_path) con = self.get_client() - con.modelset('m', 'tf', 'cpu', model_pb, + con.modelstore('m', 'tf', 'cpu', model_pb, inputs=['a', 'b'], outputs=['mul'], tag='v1.2') model_path = os.path.join(MODEL_DIR, 'pt-minimal.pt') ptmodel = load_model(model_path) con = self.get_client() # TODO: RedisAI modelscan issue - con.modelset("pt_model", 'torch', 'cpu', ptmodel) + con.modelstore("pt_model", 'torch', 'cpu', ptmodel) mlist = con.modelscan() self.assertEqual(mlist, [['pt_model', ''], ['m', 'v1.2']]) @@ -322,7 +359,7 @@ def setUp(self): con = self.get_client() model_path = os.path.join(MODEL_DIR, 'pt-minimal.pt') ptmodel = load_model(model_path) - con.modelset("pt_model", 'torch', 'cpu', ptmodel, tag='v7.0') + con.modelstore("pt_model", 'torch', 'cpu', ptmodel, tag='v7.0') def test_dagrun_with_load(self): con = self.get_client() From d6f5b953b38ec4bf4e28b4cb630f40efd7b44f90 Mon Sep 17 00:00:00 2001 From: alonre24 Date: Mon, 26 Apr 2021 17:12:28 +0300 Subject: [PATCH 02/14] Merge with master --- redisai/client.py | 108 ++++++++++++---- redisai/command_builder.py | 85 +++++++++---- redisai/postprocessor.py | 13 +- test/test.py | 245 ++++++++++++++++++------------------- 4 files changed, 266 insertions(+), 185 deletions(-) diff --git a/redisai/client.py b/redisai/client.py index 21b09fb..a9fdd99 100644 --- a/redisai/client.py +++ b/redisai/client.py @@ -4,6 +4,7 @@ from redis import StrictRedis import numpy as np +from deprecated import deprecated from redisai import command_builder as builder from redisai.dag import Dag @@ -96,7 +97,7 @@ def dag( ------- >>> con.tensorset('tensor', ...) 'OK' - >>> con.modelset('model', ...) + >>> con.modelstore('model', ...) 
         'OK'
         >>> dag = con.dag(load=['tensor'], persist=['output'])
         >>> dag.tensorset('another', ...)
@@ -136,18 +137,17 @@ def loadbackend(self, identifier: AnyStr, path: AnyStr) -> str:
         res = self.execute_command(*args)
         return res if not self.enable_postprocess else processor.loadbackend(res)
 
-    def modelset(
-        self,
-        key: AnyStr,
-        backend: str,
-        device: str,
-        data: ByteString,
-        batch: int = None,
-        minbatch: int = None,
-        tag: AnyStr = None,
-        inputs: Union[AnyStr, List[AnyStr]] = None,
-        outputs: Union[AnyStr, List[AnyStr]] = None,
-    ) -> str:
+    def modelstore(self,
+                   key: AnyStr,
+                   backend: str,
+                   device: str,
+                   data: ByteString,
+                   batch: int = None,
+                   minbatch: int = None,
+                   minbatchtimeout: int = None,
+                   tag: AnyStr = None,
+                   inputs: Union[AnyStr, List[AnyStr]] = None,
+                   outputs: Union[AnyStr, List[AnyStr]] = None) -> str:
         """
         Set the model on provided key.
 
@@ -166,6 +166,9 @@
             Number of batches for doing auto-batching
         minbatch : int
             Minimum number of samples required in a batch for model execution
+        minbatchtimeout : int
+            The maximum number of milliseconds for which the engine will not trigger an execution if the number
+            of samples is lower than minbatch (once minbatchtimeout has passed, execution starts even if minbatch has not been reached)
         tag : AnyStr
             Any string that will be saved in RedisAI as tag for the model
         inputs : Union[AnyStr, List[AnyStr]]
@@ -178,6 +181,39 @@
         str
             'OK' if success, raise an exception otherwise
 
+        Example
+        -------
+        >>> # Torch model
+        >>> model_path = os.path.join('path/to/TorchScriptModel.pt')
+        >>> model = open(model_path, 'rb').read()
+        >>> con.modelstore("model", 'torch', 'cpu', model, tag='v1.0')
+        'OK'
+        >>> # Tensorflow model
+        >>> model_path = os.path.join('/path/to/tf_frozen_graph.pb')
+        >>> model = open(model_path, 'rb').read()
+        >>> con.modelstore('m', 'tf', 'cpu', model,
+        ...                inputs=['a', 'b'], outputs=['mul'], tag='v1.0')
+        'OK'
+        """
+        args = builder.modelstore(key, backend, device, data,
+                                  batch, minbatch, minbatchtimeout, tag, inputs, outputs)
+        res = self.execute_command(*args)
+        return res if not self.enable_postprocess else processor.modelstore(res)
+
+    @deprecated(version='1.2.2', reason="Use modelstore instead")
+    def modelset(self,
+                 key: AnyStr,
+                 backend: str,
+                 device: str,
+                 data: ByteString,
+                 batch: int = None,
+                 minbatch: int = None,
+                 tag: AnyStr = None,
+                 inputs: Union[AnyStr, List[AnyStr]] = None,
+                 outputs: Union[AnyStr, List[AnyStr]] = None) -> str:
+        """
+        Similar to modelstore (this is the deprecated version that will not be supported in future versions).
+
         Example
         -------
         >>> # Torch model
@@ -247,15 +283,14 @@ def modeldel(self, key: AnyStr) -> str:
         res = self.execute_command(*args)
         return res if not self.enable_postprocess else processor.modeldel(res)
 
-    def modelrun(
-        self,
-        key: AnyStr,
-        inputs: Union[AnyStr, List[AnyStr]],
-        outputs: Union[AnyStr, List[AnyStr]],
-    ) -> str:
+    def modelexecute(self,
+                     key: AnyStr,
+                     inputs: Union[AnyStr, List[AnyStr]],
+                     outputs: Union[AnyStr, List[AnyStr]],
+                     timeout: int = None) -> str:
         """
         Run the model using input(s) which are already in the scope and are associated
-        to some keys. Modelrun also needs the output key name(s) to store the output
+        to some keys. Modelexecute also needs the output key name(s) to store the output
         from the model. The number of outputs from the model and the number of keys
         provided here must be same. Otherwise, RedisAI throws an error
 
@@ -265,10 +300,13 @@
             Model key to run
         inputs : Union[AnyStr, List[AnyStr]]
             Tensor(s) which is already saved in the RedisAI using a tensorset call. These
-            tensors will be used as the input for the modelrun
+            tensors will be used as the inputs for modelexecute
         outputs : Union[AnyStr, List[AnyStr]]
-            keys on which the outputs to be saved. If those keys exist already, modelrun
+            keys in which the outputs will be saved. If those keys already exist, modelexecute
             will overwrite them with new values
+        timeout : int
+            The maximum number of milliseconds that may pass before the request is processed
+            (if this timeout expires, the result is not computed and TIMEDOUT is returned instead)
 
         Returns
         -------
@@ -277,7 +315,31 @@
 
         Example
         -------
-        >>> con.modelset('m', 'tf', 'cpu', model_pb,
+        >>> con.modelstore('m', 'tf', 'cpu', model_pb,
         ...                inputs=['a', 'b'], outputs=['mul'], tag='v1.0')
         'OK'
         >>> con.tensorset('a', (2, 3), dtype='float')
         'OK'
         >>> con.tensorset('b', (2, 3), dtype='float')
         'OK'
-        >>> con.modelrun('m', ['a', 'b'], ['c'])
+        >>> con.modelexecute('m', ['a', 'b'], ['c'])
         'OK'
         """
-        args = builder.modelrun(key, inputs, outputs)
+        args = builder.modelexecute(key, inputs, outputs, timeout)
         res = self.execute_command(*args)
-        return res if not self.enable_postprocess else processor.modelrun(res)
+        return res if not self.enable_postprocess else processor.modelexecute(res)
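+
+    # Timeout sketch (illustrative value): if the server does not get to the request
+    # within the given number of milliseconds, it replies TIMEDOUT instead of
+    # computing the result, e.g. con.modelexecute('m', ['a', 'b'], ['c'], timeout=500)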
+
+    @deprecated(version='1.2.2', reason="Use modelexecute instead")
+    def modelrun(self,
+                 key: AnyStr,
+                 inputs: Union[AnyStr, List[AnyStr]],
+                 outputs: Union[AnyStr, List[AnyStr]]) -> str:
+        """
+        Similar to modelexecute (this is the deprecated version that will not be supported in future versions).
+
+        Example
+        -------
+        >>> con.modelstore('m', 'tf', 'cpu', model_pb,
+        ...                inputs=['a', 'b'], outputs=['mul'], tag='v1.0')
         'OK'
         >>> con.tensorset('a', (2, 3), dtype='float')
diff --git a/redisai/command_builder.py b/redisai/command_builder.py
index 3087342..5303800 100644
--- a/redisai/command_builder.py
+++ b/redisai/command_builder.py
@@ -9,29 +9,66 @@ def loadbackend(identifier: AnyStr, path: AnyStr) -> Sequence:
     return "AI.CONFIG LOADBACKEND", identifier, path
 
 
-def modelset(
-    name: AnyStr,
-    backend: str,
-    device: str,
-    data: ByteString,
-    batch: int,
-    minbatch: int,
-    tag: AnyStr,
-    inputs: Union[AnyStr, List[AnyStr]],
-    outputs: Union[AnyStr, List[AnyStr]],
-) -> Sequence:
+def modelstore(name: AnyStr, backend: str, device: str, data: ByteString,
+               batch: int, minbatch: int, minbatchtimeout: int, tag: AnyStr,
+               inputs: Union[AnyStr, List[AnyStr]],
+               outputs: Union[AnyStr, List[AnyStr]]) -> Sequence:
+    if device.upper() not in utils.allowed_devices:
+        raise ValueError(f"Device not allowed. Use any from {utils.allowed_devices}")
+    if backend.upper() not in utils.allowed_backends:
+        raise ValueError(f"Backend not allowed. 
Use any from {utils.allowed_backends}") + args = ['AI.MODELSTORE', name, backend, device] + + if tag is not None: + args += ['TAG', tag] + if batch is not None: + args += ['BATCHSIZE', batch] + if minbatch is not None: + if batch is None: + raise ValueError( + 'Minbatch is not allowed without batch') + args += ['MINBATCHSIZE', minbatch] + if minbatchtimeout is not None: + if minbatch is None: + raise ValueError( + 'Minbatchtimeout is not allowed without minbatch') + args += ['MINBATCHTIMEOUT', minbatchtimeout] + + if backend.upper() == 'TF': + if not(all((inputs, outputs))): + raise ValueError( + 'Require keyword arguments inputs and outputs for TF models') + args += ['INPUTS', len(inputs) if isinstance(inputs, List) else 1, *utils.listify(inputs)] + args += ['OUTPUTS', len(outputs) if isinstance(outputs, List) else 1, *utils.listify(outputs)] + elif inputs is not None or outputs is not None: + raise ValueError( + 'Inputs and outputs keywords should not be specified for this backend') + chunk_size = 500 * 1024 * 1024 # TODO: this should be configurable. + data_chunks = [data[i:i + chunk_size] for i in range(0, len(data), chunk_size)] + # TODO: need a test case for this + args += ['BLOB', *data_chunks] + return args + + +def modelset(name: AnyStr, backend: str, device: str, data: ByteString, + batch: int, minbatch: int, tag: AnyStr, + inputs: Union[AnyStr, List[AnyStr]], + outputs: Union[AnyStr, List[AnyStr]]) -> Sequence: if device.upper() not in utils.allowed_devices: raise ValueError(f"Device not allowed. Use any from {utils.allowed_devices}") if backend.upper() not in utils.allowed_backends: raise ValueError(f"Backend not allowed. Use any from {utils.allowed_backends}") args = ["AI.MODELSET", name, backend, device] + if tag is not None: + args += ['TAG', tag] if batch is not None: args += ["BATCHSIZE", batch] if minbatch is not None: - args += ["MINBATCHSIZE", minbatch] - if tag is not None: - args += ["TAG", tag] + if batch is None: + raise ValueError( + 'Minbatch is not allowed without batch') + args += ['MINBATCHSIZE', minbatch] if backend.upper() == "TF": if not (all((inputs, outputs))): @@ -56,15 +93,17 @@ def modeldel(name: AnyStr) -> Sequence: return "AI.MODELDEL", name -def modelrun(name: AnyStr, inputs: List[AnyStr], outputs: List[AnyStr]) -> Sequence: - args = ( - "AI.MODELRUN", - name, - "INPUTS", - *utils.listify(inputs), - "OUTPUTS", - *utils.listify(outputs), - ) +def modelexecute(name: AnyStr, inputs: Union[AnyStr, List[AnyStr]], outputs: Union[AnyStr, List[AnyStr]], timeout: int) -> Sequence: + args = ['AI.MODELEXECUTE', name, 'INPUTS', len(utils.listify(inputs)), *utils.listify(inputs), 'OUTPUTS', + len(utils.listify(outputs)), *utils.listify(outputs)] + if timeout is not None: + args += ['TIMEOUT', timeout] + return args + + +def modelrun(name: AnyStr, inputs: Union[AnyStr, List[AnyStr]], outputs: Union[AnyStr, List[AnyStr]]) -> Sequence: + args = ('AI.MODELRUN', name, 'INPUTS', *utils.listify(inputs), 'OUTPUTS', + *utils.listify(outputs)) return args diff --git a/redisai/postprocessor.py b/redisai/postprocessor.py index c37c1ef..7bc6119 100644 --- a/redisai/postprocessor.py +++ b/redisai/postprocessor.py @@ -61,16 +61,7 @@ def infoget(res): # These functions are only doing decoding on the output from redis decoder = staticmethod(decoder) -decoding_functions = ( - "loadbackend", - "modelset", - "modeldel", - "modelrun", - "tensorset", - "scriptset", - "scriptdel", - "scriptrun", - "inforeset", -) +decoding_functions = ('loadbackend', 'modelstore', 'modelset', 
'modeldel', 'modelexecute', 'modelrun', 'tensorset',
+                      'scriptset', 'scriptdel', 'scriptrun', 'inforeset')
 for fn in decoding_functions:
     setattr(Processor, fn, decoder)
diff --git a/test/test.py b/test/test.py
index 1dcf8fa..b069b53 100644
--- a/test/test.py
+++ b/test/test.py
@@ -115,8 +115,9 @@ def test_numpy_tensor(self):
         with self.assertRaises(TypeError):
             con.tensorset("trying", stringarr)
 
-    def test_modelset_errors(self):
-        model_path = os.path.join(MODEL_DIR, tf_graph)
+    # AI.MODELSET is deprecated by AI.MODELSTORE.
+    def test_deprecated_modelset(self):
+        model_path = os.path.join(MODEL_DIR, 'graph.pb')
         model_pb = load_model(model_path)
         con = self.get_client()
         with self.assertRaises(ValueError):
@@ -130,68 +131,72 @@ def test_deprecated_modelset(self):
             con.modelset(
                 "m",
                 "tf",
                 "wrongdevice",
                 model_pb,
                 inputs=["a", "b"],
                 outputs=["mul"],
                 tag="v1.0",
             )
         with self.assertRaises(ValueError):
-            con.modelset(
-                "m",
-                "wrongbackend",
-                "cpu",
-                model_pb,
-                inputs=["a", "b"],
-                outputs=["mul"],
-                tag="v1.0",
-            )
+            con.modelset('m', 'wrongbackend', 'cpu', model_pb,
+                         inputs=['a', 'b'], outputs=['mul'], tag='v1.0')
+        con.modelset('m', 'tf', 'cpu', model_pb,
+                     inputs=['a', 'b'], outputs=['mul'], tag='v1.0')
+        model = con.modelget('m', meta_only=True)
+        self.assertEqual(model, {'backend': 'TF', 'batchsize': 0, 'device': 'cpu', 'inputs': ['a', 'b'], 'minbatchsize': 0, 'outputs': ['mul'], 'tag': 'v1.0'})
+
+    def test_modelstore_errors(self):
+        model_path = os.path.join(MODEL_DIR, 'graph.pb')
+        model_pb = load_model(model_path)
+        con = self.get_client()
+        with self.assertRaises(ValueError) as e:
+            con.modelstore('m', 'tf', 'wrongdevice', model_pb,
+                           inputs=['a', 'b'], outputs=['mul'], tag='v1.0')
+        self.assertTrue(str(e.exception).startswith("Device not allowed"))
+        with self.assertRaises(ValueError) as e:
+            con.modelstore('m', 'wrongbackend', 'cpu', model_pb,
+                           inputs=['a', 'b'], outputs=['mul'], tag='v1.0')
+        self.assertTrue(str(e.exception).startswith("Backend not allowed"))
+        with self.assertRaises(ValueError) as e:
+            con.modelstore('m', 'tf', 'cpu', model_pb,
+                           inputs=['a', 'b'], outputs=['mul'], tag='v1.0', minbatch=2)
+        self.assertEqual(str(e.exception), "Minbatch is not allowed without batch")
+        with self.assertRaises(ValueError) as e:
+            con.modelstore('m', 'tf', 'cpu', model_pb,
+                           inputs=['a', 'b'], outputs=['mul'], tag='v1.0', batch=4, minbatchtimeout=1000)
+        self.assertEqual(str(e.exception), "Minbatchtimeout is not allowed without minbatch")
+        with self.assertRaises(ValueError) as e:
+            con.modelstore('m', 'tf', 'cpu', model_pb, tag='v1.0')
+        self.assertEqual(str(e.exception), "Require keyword arguments inputs and outputs for TF models")
+        with self.assertRaises(ValueError) as e:
+            con.modelstore('m', 'torch', 'cpu', model_pb, inputs=['a', 'b'], outputs=['mul'], tag='v1.0')
+        self.assertEqual(str(e.exception), "Inputs and outputs keywords should not be specified for this backend")
 
     def test_modelget_meta(self):
         model_path = os.path.join(MODEL_DIR, tf_graph)
         model_pb = load_model(model_path)
         con = self.get_client()
-        con.modelset(
-            "m", "tf", "cpu", model_pb, inputs=["a", "b"], outputs=["mul"], tag="v1.0"
-        )
-        model = con.modelget("m", meta_only=True)
-        self.assertEqual(
-            model,
-            {
-                "backend": "TF",
-                "batchsize": 0,
-                "device": "cpu",
-                "inputs": ["a", "b"],
-                "minbatchsize": 0,
-                "outputs": ["mul"],
-                "tag": "v1.0",
-            },
-        )
+        con.modelstore('m', 'tf', 'cpu', model_pb,
+                       inputs=['a', 'b'], outputs=['mul'], tag='v1.0')
+        model = con.modelget('m', meta_only=True)
+        self.assertEqual(model, {'backend': 'TF', 'batchsize': 0, 'device': 'cpu', 'inputs': ['a', 'b'], 'minbatchsize': 
0, 'outputs': ['mul'], 'tag': 'v1.0'}) - def test_modelrun_non_list_input_output(self): - model_path = os.path.join(MODEL_DIR, tf_graph) + def test_modelexecute_non_list_input_output(self): + model_path = os.path.join(MODEL_DIR, 'graph.pb') model_pb = load_model(model_path) con = self.get_client() - con.modelset( - "m", "tf", "cpu", model_pb, inputs=["a", "b"], outputs=["mul"], tag="v1.7" - ) - con.tensorset("a", (2, 3), dtype="float") - con.tensorset("b", (2, 3), dtype="float") - ret = con.modelrun("m", ["a", "b"], "out") - self.assertEqual(ret, "OK") + con.modelstore('m', 'tf', 'cpu', model_pb, + inputs=['a', 'b'], outputs=['mul'], tag='v1.7') + con.tensorset('a', (2, 3), dtype='float') + con.tensorset('b', (2, 3), dtype='float') + ret = con.modelexecute('m', ['a', 'b'], 'out') + self.assertEqual(ret, 'OK') def test_nonasciichar(self): nonascii = "ĉ" model_path = os.path.join(MODEL_DIR, tf_graph) model_pb = load_model(model_path) con = self.get_client() - con.modelset( - "m" + nonascii, - "tf", - "cpu", - model_pb, - inputs=["a", "b"], - outputs=["mul"], - tag="v1.0", - ) - con.tensorset("a" + nonascii, (2, 3), dtype="float") - con.tensorset("b", (2, 3), dtype="float") - con.modelrun("m" + nonascii, ["a" + nonascii, "b"], ["c" + nonascii]) - tensor = con.tensorget("c" + nonascii) - self.assertTrue((np.allclose(tensor, [4.0, 9.0]))) + con.modelstore('m' + nonascii, 'tf', 'cpu', model_pb, + inputs=['a', 'b'], outputs=['mul'], tag='v1.0') + con.tensorset('a' + nonascii, (2, 3), dtype='float') + con.tensorset('b', (2, 3), dtype='float') + con.modelexecute('m' + nonascii, ['a' + nonascii, 'b'], ['c' + nonascii]) + tensor = con.tensorget('c' + nonascii) + self.assertTrue((np.allclose(tensor, [4., 9.]))) def test_run_tf_model(self): model_path = os.path.join(MODEL_DIR, tf_graph) @@ -201,45 +206,23 @@ def test_run_tf_model(self): wrong_model_pb = load_model(bad_model_path) con = self.get_client() - con.modelset( - "m", "tf", "cpu", model_pb, inputs=["a", "b"], outputs=["mul"], tag="v1.0" - ) - con.modeldel("m") - self.assertRaises(ResponseError, con.modelget, "m") - con.modelset( - "m", "tf", "cpu", model_pb, inputs=["a", "b"], outputs="mul", tag="v1.0" - ) + con.modelstore('m', 'tf', 'cpu', model_pb, + inputs=['a', 'b'], outputs=['mul'], tag='v1.0') + con.modeldel('m') + self.assertRaises(ResponseError, con.modelget, 'm') + con.modelstore('m', 'tf', 'cpu', model_pb, + inputs=['a', 'b'], outputs='mul', tag='v1.0') # wrong model - self.assertRaises( - ResponseError, - con.modelset, - "m", - "tf", - "cpu", - wrong_model_pb, - inputs=["a", "b"], - outputs=["mul"], - ) - # missing inputs/outputs - self.assertRaises(ValueError, con.modelset, "m", "tf", "cpu", wrong_model_pb) - - # wrong backend - self.assertRaises( - ResponseError, - con.modelset, - "m", - "torch", - "cpu", - model_pb, - inputs=["a", "b"], - outputs=["mul"], - ) - - con.tensorset("a", (2, 3), dtype="float") - con.tensorset("b", (2, 3), dtype="float") - con.modelrun("m", ["a", "b"], ["c"]) - tensor = con.tensorget("c") + with self.assertRaises(ResponseError) as e: + con.modelstore('m', 'tf', 'cpu', + wrong_model_pb, inputs=['a', 'b'], outputs=['mul']) + self.assertEqual(str(e.exception), "Invalid GraphDef") + + con.tensorset('a', (2, 3), dtype='float') + con.tensorset('b', (2, 3), dtype='float') + con.modelexecute('m', ['a', 'b'], ['c']) + tensor = con.tensorget('c') self.assertTrue(np.allclose([4, 9], tensor)) model_det = con.modelget("m") self.assertTrue(model_det["backend"] == "TF") @@ -276,10 +259,10 @@ def 
test_run_onnxml_model(self): mlmodel_path = os.path.join(MODEL_DIR, "boston.onnx") onnxml_model = load_model(mlmodel_path) con = self.get_client() - con.modelset("onnx_model", "onnx", "cpu", onnxml_model) + con.modelstore("onnx_model", 'onnx', 'cpu', onnxml_model) tensor = np.ones((1, 13)).astype(np.float32) con.tensorset("input", tensor) - con.modelrun("onnx_model", ["input"], ["output"]) + con.modelexecute("onnx_model", ["input"], ["output"]) # tests `convert_to_num` outtensor = con.tensorget("output", as_numpy=False) self.assertEqual(int(float(outtensor["values"][0])), 24) @@ -289,10 +272,10 @@ def test_run_onnxdl_model(self): dlmodel_path = os.path.join(MODEL_DIR, "findsquare.onnx") onnxdl_model = load_model(dlmodel_path) con = self.get_client() - con.modelset("onnx_model", "onnx", "cpu", onnxdl_model) + con.modelstore("onnx_model", 'onnx', 'cpu', onnxdl_model) tensor = np.array((2,)).astype(np.float32) con.tensorset("input", tensor) - con.modelrun("onnx_model", ["input"], ["output"]) + con.modelexecute("onnx_model", ["input"], ["output"]) outtensor = con.tensorget("output") self.assertTrue(np.allclose(outtensor, [4.0])) @@ -300,64 +283,70 @@ def test_run_pytorch_model(self): model_path = os.path.join(MODEL_DIR, torch_graph) ptmodel = load_model(model_path) con = self.get_client() - con.modelset("pt_model", "torch", "cpu", ptmodel, tag="v1.0") - con.tensorset("a", [2, 3, 2, 3], shape=(2, 2), dtype="float") - con.tensorset("b", [2, 3, 2, 3], shape=(2, 2), dtype="float") - con.modelrun("pt_model", ["a", "b"], ["output"]) - output = con.tensorget("output", as_numpy=False) - self.assertTrue(np.allclose(output["values"], [4, 6, 4, 6])) + con.modelstore("pt_model", 'torch', 'cpu', ptmodel, tag='v1.0') + con.tensorset('a', [2, 3, 2, 3], shape=(2, 2), dtype='float') + con.tensorset('b', [2, 3, 2, 3], shape=(2, 2), dtype='float') + con.modelexecute("pt_model", ["a", "b"], ["output"]) + output = con.tensorget('output', as_numpy=False) + self.assertTrue(np.allclose(output['values'], [4, 6, 4, 6])) def test_run_tflite_model(self): model_path = os.path.join(MODEL_DIR, "mnist_model_quant.tflite") tflmodel = load_model(model_path) con = self.get_client() - con.modelset("tfl_model", "tflite", "cpu", tflmodel) + con.modelstore("tfl_model", 'tflite', 'cpu', tflmodel) img = np.random.random((1, 1, 28, 28)).astype(np.float) - con.tensorset("img", img) - con.modelrun("tfl_model", ["img"], ["output1", "output2"]) - output = con.tensorget("output1") + con.tensorset('img', img) + con.modelexecute("tfl_model", ["img"], ["output1", "output2"]) + output = con.tensorget('output1') self.assertTrue(np.allclose(output, [8])) + # AI.MODELRUN is deprecated by AI.MODELEXECUTE + def test_deprecated_modelrun(self): + model_path = os.path.join(MODEL_DIR, 'graph.pb') + model_pb = load_model(model_path) + + con = self.get_client() + con.modelstore('m', 'tf', 'cpu', model_pb, + inputs=['a', 'b'], outputs=['mul'], tag='v1.0') + + con.tensorset('a', (2, 3), dtype='float') + con.tensorset('b', (2, 3), dtype='float') + con.modelrun('m', ['a', 'b'], ['c']) + tensor = con.tensorget('c') + self.assertTrue(np.allclose([4, 9], tensor)) + def test_info(self): model_path = os.path.join(MODEL_DIR, tf_graph) model_pb = load_model(model_path) con = self.get_client() - con.modelset("m", "tf", "cpu", model_pb, inputs=["a", "b"], outputs=["mul"]) - first_info = con.infoget("m") - expected = { - "key": "m", - "type": "MODEL", - "backend": "TF", - "device": "cpu", - "tag": "", - "duration": 0, - "samples": 0, - "calls": 0, - "errors": 0, - } + 
con.modelstore('m', 'tf', 'cpu', model_pb, + inputs=['a', 'b'], outputs=['mul']) + first_info = con.infoget('m') + expected = {'key': 'm', 'type': 'MODEL', 'backend': 'TF', 'device': 'cpu', + 'tag': '', 'duration': 0, 'samples': 0, 'calls': 0, 'errors': 0} self.assertEqual(first_info, expected) - con.tensorset("a", (2, 3), dtype="float") - con.tensorset("b", (2, 3), dtype="float") - con.modelrun("m", ["a", "b"], ["c"]) - con.modelrun("m", ["a", "b"], ["c"]) - second_info = con.infoget("m") - self.assertEqual(second_info["calls"], 2) # 2 model runs - con.inforeset("m") - third_info = con.infoget("m") + con.tensorset('a', (2, 3), dtype='float') + con.tensorset('b', (2, 3), dtype='float') + con.modelexecute('m', ['a', 'b'], ['c']) + con.modelexecute('m', ['a', 'b'], ['c']) + second_info = con.infoget('m') + self.assertEqual(second_info['calls'], 2) # 2 model runs + con.inforeset('m') + third_info = con.infoget('m') self.assertEqual(first_info, third_info) # before modelrun and after reset def test_model_scan(self): model_path = os.path.join(MODEL_DIR, tf_graph) model_pb = load_model(model_path) con = self.get_client() - con.modelset( - "m", "tf", "cpu", model_pb, inputs=["a", "b"], outputs=["mul"], tag="v1.2" - ) - model_path = os.path.join(MODEL_DIR, torch_graph) + con.modelstore('m', 'tf', 'cpu', model_pb, + inputs=['a', 'b'], outputs=['mul'], tag='v1.2') + model_path = os.path.join(MODEL_DIR, 'pt-minimal.pt') ptmodel = load_model(model_path) con = self.get_client() # TODO: RedisAI modelscan issue - con.modelset("pt_model", "torch", "cpu", ptmodel) + con.modelstore("pt_model", 'torch', 'cpu', ptmodel) mlist = con.modelscan() self.assertEqual(mlist, [["pt_model", ""], ["m", "v1.2"]]) @@ -381,7 +370,7 @@ def setUp(self): con = self.get_client() model_path = os.path.join(MODEL_DIR, torch_graph) ptmodel = load_model(model_path) - con.modelset("pt_model", "torch", "cpu", ptmodel, tag="v7.0") + con.modelstore("pt_model", 'torch', 'cpu', ptmodel, tag='v7.0') def test_dagrun_with_load(self): con = self.get_client() From 05d0b71bc8c05937f97ee22e6bd02a3d40f703a5 Mon Sep 17 00:00:00 2001 From: alonre24 Date: Mon, 26 Apr 2021 17:29:51 +0300 Subject: [PATCH 03/14] Add deprecated to requirements --- test-requirements.txt | 1 + 1 file changed, 1 insertion(+) diff --git a/test-requirements.txt b/test-requirements.txt index 38dac3d..252e5cc 100644 --- a/test-requirements.txt +++ b/test-requirements.txt @@ -6,3 +6,4 @@ nose codecov numpy ml2rt +deprecated \ No newline at end of file From 5cc53b8cd26a8da2f06c9dd02dc041ad1351a3b0 Mon Sep 17 00:00:00 2001 From: alonre24 Date: Mon, 26 Apr 2021 17:35:04 +0300 Subject: [PATCH 04/14] Update docker in CI from redisai/redisai to redislabs/redisai --- .circleci/config.yml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/.circleci/config.yml b/.circleci/config.yml index 78b65c4..9da13e8 100644 --- a/.circleci/config.yml +++ b/.circleci/config.yml @@ -8,7 +8,7 @@ jobs: build: docker: - image: circleci/python:3.6.9 - - image: redisai/redisai:edge + - image: redislabs/redisai:edge working_directory: ~/repo @@ -47,7 +47,7 @@ jobs: build_nightly: docker: - image: circleci/python:3.6.9 - - image: redisai/redisai:edge + - image: redislabs/redisai:edge working_directory: ~/repo From d16bc6dd543c21cc1313cb2df2993f71ebee0671 Mon Sep 17 00:00:00 2001 From: alonre24 Date: Mon, 26 Apr 2021 17:42:42 +0300 Subject: [PATCH 05/14] fix redisai:edge to redisai:edge-cpu in CI --- .circleci/config.yml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git 
a/.circleci/config.yml b/.circleci/config.yml index 9da13e8..b3e9494 100644 --- a/.circleci/config.yml +++ b/.circleci/config.yml @@ -8,7 +8,7 @@ jobs: build: docker: - image: circleci/python:3.6.9 - - image: redislabs/redisai:edge + - image: redislabs/redisai:edge-cpu working_directory: ~/repo @@ -47,7 +47,7 @@ jobs: build_nightly: docker: - image: circleci/python:3.6.9 - - image: redislabs/redisai:edge + - image: redislabs/redisai:edge-cpu working_directory: ~/repo From 8793fc8c102528284e60a936ac8839380491b601 Mon Sep 17 00:00:00 2001 From: alonre24 Date: Wed, 12 May 2021 10:07:44 +0300 Subject: [PATCH 06/14] Formatting --- redisai/command_builder.py | 38 +++++++++++++++++++++----------------- 1 file changed, 21 insertions(+), 17 deletions(-) diff --git a/redisai/command_builder.py b/redisai/command_builder.py index 5303800..f552159 100644 --- a/redisai/command_builder.py +++ b/redisai/command_builder.py @@ -1,7 +1,10 @@ from typing import Union, AnyStr, ByteString, List, Sequence + import numpy as np + from . import utils + # TODO: mypy check @@ -10,9 +13,9 @@ def loadbackend(identifier: AnyStr, path: AnyStr) -> Sequence: def modelstore(name: AnyStr, backend: str, device: str, data: ByteString, - batch: int, minbatch: int, minbatchtimeout: int, tag: AnyStr, - inputs: Union[AnyStr, List[AnyStr]], - outputs: Union[AnyStr, List[AnyStr]]) -> Sequence: + batch: int, minbatch: int, minbatchtimeout: int, tag: AnyStr, + inputs: Union[AnyStr, List[AnyStr]], + outputs: Union[AnyStr, List[AnyStr]]) -> Sequence: if device.upper() not in utils.allowed_devices: raise ValueError(f"Device not allowed. Use any from {utils.allowed_devices}") if backend.upper() not in utils.allowed_backends: @@ -35,11 +38,11 @@ def modelstore(name: AnyStr, backend: str, device: str, data: ByteString, args += ['MINBATCHTIMEOUT', minbatchtimeout] if backend.upper() == 'TF': - if not(all((inputs, outputs))): + if not (all((inputs, outputs))): raise ValueError( 'Require keyword arguments inputs and outputs for TF models') - args += ['INPUTS', len(inputs) if isinstance(inputs, List) else 1, *utils.listify(inputs)] - args += ['OUTPUTS', len(outputs) if isinstance(outputs, List) else 1, *utils.listify(outputs)] + args += ['INPUTS', len(inputs) if isinstance(inputs, List) else 1, *utils.listify(inputs)] + args += ['OUTPUTS', len(outputs) if isinstance(outputs, List) else 1, *utils.listify(outputs)] elif inputs is not None or outputs is not None: raise ValueError( 'Inputs and outputs keywords should not be specified for this backend') @@ -76,7 +79,7 @@ def modelset(name: AnyStr, backend: str, device: str, data: ByteString, args += ["INPUTS", *utils.listify(inputs)] args += ["OUTPUTS", *utils.listify(outputs)] chunk_size = 500 * 1024 * 1024 - data_chunks = [data[i : i + chunk_size] for i in range(0, len(data), chunk_size)] + data_chunks = [data[i: i + chunk_size] for i in range(0, len(data), chunk_size)] # TODO: need a test case for this args += ["BLOB", *data_chunks] return args @@ -93,8 +96,9 @@ def modeldel(name: AnyStr) -> Sequence: return "AI.MODELDEL", name -def modelexecute(name: AnyStr, inputs: Union[AnyStr, List[AnyStr]], outputs: Union[AnyStr, List[AnyStr]], timeout: int) -> Sequence: - args = ['AI.MODELEXECUTE', name, 'INPUTS', len(utils.listify(inputs)), *utils.listify(inputs), 'OUTPUTS', +def modelexecute(name: AnyStr, inputs: Union[AnyStr, List[AnyStr]], outputs: Union[AnyStr, List[AnyStr]], + timeout: int) -> Sequence: + args = ['AI.MODELEXECUTE', name, 'INPUTS', len(utils.listify(inputs)), *utils.listify(inputs), 
'OUTPUTS', len(utils.listify(outputs)), *utils.listify(outputs)] if timeout is not None: args += ['TIMEOUT', timeout] @@ -112,10 +116,10 @@ def modelscan() -> Sequence: def tensorset( - key: AnyStr, - tensor: Union[np.ndarray, list, tuple], - shape: Sequence[int] = None, - dtype: str = None, + key: AnyStr, + tensor: Union[np.ndarray, list, tuple], + shape: Sequence[int] = None, + dtype: str = None, ) -> Sequence: if np and isinstance(tensor, np.ndarray): dtype, shape, blob = utils.numpy2blob(tensor) @@ -176,10 +180,10 @@ def scriptdel(name: AnyStr) -> Sequence: def scriptrun( - name: AnyStr, - function: AnyStr, - inputs: Union[AnyStr, Sequence[AnyStr]], - outputs: Union[AnyStr, Sequence[AnyStr]], + name: AnyStr, + function: AnyStr, + inputs: Union[AnyStr, Sequence[AnyStr]], + outputs: Union[AnyStr, Sequence[AnyStr]], ) -> Sequence: args = ( "AI.SCRIPTRUN", From f1ed31a8311cfd27aaa41377971dfa349ab3e192 Mon Sep 17 00:00:00 2001 From: alonre24 Date: Wed, 12 May 2021 20:19:53 +0300 Subject: [PATCH 07/14] Add formatting transformers --- .deepsource.toml | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/.deepsource.toml b/.deepsource.toml index 25bc3d7..6079494 100644 --- a/.deepsource.toml +++ b/.deepsource.toml @@ -6,3 +6,7 @@ enabled = true [analyzers.meta] runtime_version = "3.x.x" + +[[transformers]] + name = "black" + enabled = true \ No newline at end of file From fbe93d2abbd8fe07be6fd267c82e7751ac8b5c2a Mon Sep 17 00:00:00 2001 From: alonre24 Date: Wed, 12 May 2021 20:26:39 +0300 Subject: [PATCH 08/14] Remove transformers, format locally with "black" --- .deepsource.toml | 4 - redisai/client.py | 86 +++++++----- redisai/command_builder.py | 134 ++++++++++++------ redisai/postprocessor.py | 15 +- test/test.py | 271 +++++++++++++++++++++++++------------ 5 files changed, 339 insertions(+), 171 deletions(-) diff --git a/.deepsource.toml b/.deepsource.toml index 6079494..25bc3d7 100644 --- a/.deepsource.toml +++ b/.deepsource.toml @@ -6,7 +6,3 @@ enabled = true [analyzers.meta] runtime_version = "3.x.x" - -[[transformers]] - name = "black" - enabled = true \ No newline at end of file diff --git a/redisai/client.py b/redisai/client.py index a9fdd99..8486bab 100644 --- a/redisai/client.py +++ b/redisai/client.py @@ -137,17 +137,19 @@ def loadbackend(self, identifier: AnyStr, path: AnyStr) -> str: res = self.execute_command(*args) return res if not self.enable_postprocess else processor.loadbackend(res) - def modelstore(self, - key: AnyStr, - backend: str, - device: str, - data: ByteString, - batch: int = None, - minbatch: int = None, - minbatchtimeout: int = None, - tag: AnyStr = None, - inputs: Union[AnyStr, List[AnyStr]] = None, - outputs: Union[AnyStr, List[AnyStr]] = None) -> str: + def modelstore( + self, + key: AnyStr, + backend: str, + device: str, + data: ByteString, + batch: int = None, + minbatch: int = None, + minbatchtimeout: int = None, + tag: AnyStr = None, + inputs: Union[AnyStr, List[AnyStr]] = None, + outputs: Union[AnyStr, List[AnyStr]] = None, + ) -> str: """ Set the model on provided key. @@ -195,22 +197,34 @@ def modelstore(self, ... 
inputs=['a', 'b'], outputs=['mul'], tag='v1.0') 'OK' """ - args = builder.modelstore(key, backend, device, data, - batch, minbatch, minbatchtimeout, tag, inputs, outputs) + args = builder.modelstore( + key, + backend, + device, + data, + batch, + minbatch, + minbatchtimeout, + tag, + inputs, + outputs, + ) res = self.execute_command(*args) return res if not self.enable_postprocess else processor.modelstore(res) - @deprecated(version='1.2.2', reason="Use modelstore instead") - def modelset(self, - key: AnyStr, - backend: str, - device: str, - data: ByteString, - batch: int = None, - minbatch: int = None, - tag: AnyStr = None, - inputs: Union[AnyStr, List[AnyStr]] = None, - outputs: Union[AnyStr, List[AnyStr]] = None) -> str: + @deprecated(version="1.2.2", reason="Use modelstore instead") + def modelset( + self, + key: AnyStr, + backend: str, + device: str, + data: ByteString, + batch: int = None, + minbatch: int = None, + tag: AnyStr = None, + inputs: Union[AnyStr, List[AnyStr]] = None, + outputs: Union[AnyStr, List[AnyStr]] = None, + ) -> str: """ Similar to modelstore (this is the deprecated version that will not be supported in future versions). @@ -283,11 +297,13 @@ def modeldel(self, key: AnyStr) -> str: res = self.execute_command(*args) return res if not self.enable_postprocess else processor.modeldel(res) - def modelexecute(self, - key: AnyStr, - inputs: Union[AnyStr, List[AnyStr]], - outputs: Union[AnyStr, List[AnyStr]], - timeout: int = None) -> str: + def modelexecute( + self, + key: AnyStr, + inputs: Union[AnyStr, List[AnyStr]], + outputs: Union[AnyStr, List[AnyStr]], + timeout: int = None, + ) -> str: """ Run the model using input(s) which are already in the scope and are associated to some keys. Modelexecute also needs the output key name(s) to store the output @@ -329,11 +345,13 @@ def modelexecute(self, res = self.execute_command(*args) return res if not self.enable_postprocess else processor.modelexecute(res) - @deprecated(version='1.2.2', reason="Use modelexecute instead") - def modelrun(self, - key: AnyStr, - inputs: Union[AnyStr, List[AnyStr]], - outputs: Union[AnyStr, List[AnyStr]]) -> str: + @deprecated(version="1.2.2", reason="Use modelexecute instead") + def modelrun( + self, + key: AnyStr, + inputs: Union[AnyStr, List[AnyStr]], + outputs: Union[AnyStr, List[AnyStr]], + ) -> str: """ Similar to modelexecute (this is the deprecated version that will not be supported in future versions). diff --git a/redisai/command_builder.py b/redisai/command_builder.py index f552159..84197a5 100644 --- a/redisai/command_builder.py +++ b/redisai/command_builder.py @@ -12,51 +12,74 @@ def loadbackend(identifier: AnyStr, path: AnyStr) -> Sequence: return "AI.CONFIG LOADBACKEND", identifier, path -def modelstore(name: AnyStr, backend: str, device: str, data: ByteString, - batch: int, minbatch: int, minbatchtimeout: int, tag: AnyStr, - inputs: Union[AnyStr, List[AnyStr]], - outputs: Union[AnyStr, List[AnyStr]]) -> Sequence: +def modelstore( + name: AnyStr, + backend: str, + device: str, + data: ByteString, + batch: int, + minbatch: int, + minbatchtimeout: int, + tag: AnyStr, + inputs: Union[AnyStr, List[AnyStr]], + outputs: Union[AnyStr, List[AnyStr]], +) -> Sequence: if device.upper() not in utils.allowed_devices: raise ValueError(f"Device not allowed. Use any from {utils.allowed_devices}") if backend.upper() not in utils.allowed_backends: raise ValueError(f"Backend not allowed. 
Use any from {utils.allowed_backends}") - args = ['AI.MODELSTORE', name, backend, device] + args = ["AI.MODELSTORE", name, backend, device] if tag is not None: - args += ['TAG', tag] + args += ["TAG", tag] if batch is not None: - args += ['BATCHSIZE', batch] + args += ["BATCHSIZE", batch] if minbatch is not None: if batch is None: - raise ValueError( - 'Minbatch is not allowed without batch') - args += ['MINBATCHSIZE', minbatch] + raise ValueError("Minbatch is not allowed without batch") + args += ["MINBATCHSIZE", minbatch] if minbatchtimeout is not None: if minbatch is None: - raise ValueError( - 'Minbatchtimeout is not allowed without minbatch') - args += ['MINBATCHTIMEOUT', minbatchtimeout] + raise ValueError("Minbatchtimeout is not allowed without minbatch") + args += ["MINBATCHTIMEOUT", minbatchtimeout] - if backend.upper() == 'TF': + if backend.upper() == "TF": if not (all((inputs, outputs))): raise ValueError( - 'Require keyword arguments inputs and outputs for TF models') - args += ['INPUTS', len(inputs) if isinstance(inputs, List) else 1, *utils.listify(inputs)] - args += ['OUTPUTS', len(outputs) if isinstance(outputs, List) else 1, *utils.listify(outputs)] + "Require keyword arguments inputs and outputs for TF models" + ) + args += [ + "INPUTS", + len(inputs) if isinstance(inputs, List) else 1, + *utils.listify(inputs), + ] + args += [ + "OUTPUTS", + len(outputs) if isinstance(outputs, List) else 1, + *utils.listify(outputs), + ] elif inputs is not None or outputs is not None: raise ValueError( - 'Inputs and outputs keywords should not be specified for this backend') + "Inputs and outputs keywords should not be specified for this backend" + ) chunk_size = 500 * 1024 * 1024 # TODO: this should be configurable. - data_chunks = [data[i:i + chunk_size] for i in range(0, len(data), chunk_size)] + data_chunks = [data[i : i + chunk_size] for i in range(0, len(data), chunk_size)] # TODO: need a test case for this - args += ['BLOB', *data_chunks] + args += ["BLOB", *data_chunks] return args -def modelset(name: AnyStr, backend: str, device: str, data: ByteString, - batch: int, minbatch: int, tag: AnyStr, - inputs: Union[AnyStr, List[AnyStr]], - outputs: Union[AnyStr, List[AnyStr]]) -> Sequence: +def modelset( + name: AnyStr, + backend: str, + device: str, + data: ByteString, + batch: int, + minbatch: int, + tag: AnyStr, + inputs: Union[AnyStr, List[AnyStr]], + outputs: Union[AnyStr, List[AnyStr]], +) -> Sequence: if device.upper() not in utils.allowed_devices: raise ValueError(f"Device not allowed. 
Use any from {utils.allowed_devices}") if backend.upper() not in utils.allowed_backends: @@ -64,14 +87,13 @@ def modelset(name: AnyStr, backend: str, device: str, data: ByteString, args = ["AI.MODELSET", name, backend, device] if tag is not None: - args += ['TAG', tag] + args += ["TAG", tag] if batch is not None: args += ["BATCHSIZE", batch] if minbatch is not None: if batch is None: - raise ValueError( - 'Minbatch is not allowed without batch') - args += ['MINBATCHSIZE', minbatch] + raise ValueError("Minbatch is not allowed without batch") + args += ["MINBATCHSIZE", minbatch] if backend.upper() == "TF": if not (all((inputs, outputs))): @@ -79,7 +101,7 @@ def modelset(name: AnyStr, backend: str, device: str, data: ByteString, args += ["INPUTS", *utils.listify(inputs)] args += ["OUTPUTS", *utils.listify(outputs)] chunk_size = 500 * 1024 * 1024 - data_chunks = [data[i: i + chunk_size] for i in range(0, len(data), chunk_size)] + data_chunks = [data[i : i + chunk_size] for i in range(0, len(data), chunk_size)] # TODO: need a test case for this args += ["BLOB", *data_chunks] return args @@ -96,18 +118,40 @@ def modeldel(name: AnyStr) -> Sequence: return "AI.MODELDEL", name -def modelexecute(name: AnyStr, inputs: Union[AnyStr, List[AnyStr]], outputs: Union[AnyStr, List[AnyStr]], - timeout: int) -> Sequence: - args = ['AI.MODELEXECUTE', name, 'INPUTS', len(utils.listify(inputs)), *utils.listify(inputs), 'OUTPUTS', - len(utils.listify(outputs)), *utils.listify(outputs)] +def modelexecute( + name: AnyStr, + inputs: Union[AnyStr, List[AnyStr]], + outputs: Union[AnyStr, List[AnyStr]], + timeout: int, +) -> Sequence: + args = [ + "AI.MODELEXECUTE", + name, + "INPUTS", + len(utils.listify(inputs)), + *utils.listify(inputs), + "OUTPUTS", + len(utils.listify(outputs)), + *utils.listify(outputs), + ] if timeout is not None: - args += ['TIMEOUT', timeout] + args += ["TIMEOUT", timeout] return args -def modelrun(name: AnyStr, inputs: Union[AnyStr, List[AnyStr]], outputs: Union[AnyStr, List[AnyStr]]) -> Sequence: - args = ('AI.MODELRUN', name, 'INPUTS', *utils.listify(inputs), 'OUTPUTS', - *utils.listify(outputs)) +def modelrun( + name: AnyStr, + inputs: Union[AnyStr, List[AnyStr]], + outputs: Union[AnyStr, List[AnyStr]], +) -> Sequence: + args = ( + "AI.MODELRUN", + name, + "INPUTS", + *utils.listify(inputs), + "OUTPUTS", + *utils.listify(outputs), + ) return args @@ -116,10 +160,10 @@ def modelscan() -> Sequence: def tensorset( - key: AnyStr, - tensor: Union[np.ndarray, list, tuple], - shape: Sequence[int] = None, - dtype: str = None, + key: AnyStr, + tensor: Union[np.ndarray, list, tuple], + shape: Sequence[int] = None, + dtype: str = None, ) -> Sequence: if np and isinstance(tensor, np.ndarray): dtype, shape, blob = utils.numpy2blob(tensor) @@ -180,10 +224,10 @@ def scriptdel(name: AnyStr) -> Sequence: def scriptrun( - name: AnyStr, - function: AnyStr, - inputs: Union[AnyStr, Sequence[AnyStr]], - outputs: Union[AnyStr, Sequence[AnyStr]], + name: AnyStr, + function: AnyStr, + inputs: Union[AnyStr, Sequence[AnyStr]], + outputs: Union[AnyStr, Sequence[AnyStr]], ) -> Sequence: args = ( "AI.SCRIPTRUN", diff --git a/redisai/postprocessor.py b/redisai/postprocessor.py index 7bc6119..42bd141 100644 --- a/redisai/postprocessor.py +++ b/redisai/postprocessor.py @@ -61,7 +61,18 @@ def infoget(res): # These functions are only doing decoding on the output from redis decoder = staticmethod(decoder) -decoding_functions = ('loadbackend', 'modelstore', 'modelset', 'modeldel', 'modelexecute', 'modelrun', 'tensorset', - 
'scriptset', 'scriptdel', 'scriptrun', 'inforeset')
+decoding_functions = (
+    "loadbackend",
+    "modelstore",
+    "modelset",
+    "modeldel",
+    "modelexecute",
+    "modelrun",
+    "tensorset",
+    "scriptset",
+    "scriptdel",
+    "scriptrun",
+    "inforeset",
+)
 
 for fn in decoding_functions:
     setattr(Processor, fn, decoder)
diff --git a/test/test.py b/test/test.py
index b069b53..3ae5f14 100644
--- a/test/test.py
+++ b/test/test.py
@@ -117,7 +117,7 @@ def test_numpy_tensor(self):
 
     # AI.MODELSET is deprecated by AI.MODELSTORE.
     def test_deprecated_modelset(self):
-        model_path = os.path.join(MODEL_DIR, 'graph.pb')
+        model_path = os.path.join(MODEL_DIR, "graph.pb")
         model_pb = load_model(model_path)
         con = self.get_client()
         with self.assertRaises(ValueError):
@@ -131,72 +131,158 @@ def test_deprecated_modelset(self):
             tag="v1.0",
         )
         with self.assertRaises(ValueError):
-            con.modelset('m', 'wrongbackend', 'cpu', model_pb,
-                         inputs=['a', 'b'], outputs=['mul'], tag='v1.0')
-        con.modelset('m', 'tf', 'cpu', model_pb,
-                     inputs=['a', 'b'], outputs=['mul'], tag='v1.0')
-        model = con.modelget('m', meta_only=True)
-        self.assertEqual(model, {'backend': 'TF', 'batchsize': 0, 'device': 'cpu', 'inputs': ['a', 'b'], 'minbatchsize': 0, 'outputs': ['mul'], 'tag': 'v1.0'})
+            con.modelset(
+                "m",
+                "wrongbackend",
+                "cpu",
+                model_pb,
+                inputs=["a", "b"],
+                outputs=["mul"],
+                tag="v1.0",
+            )
+        con.modelset(
+            "m", "tf", "cpu", model_pb, inputs=["a", "b"], outputs=["mul"], tag="v1.0"
+        )
+        model = con.modelget("m", meta_only=True)
+        self.assertEqual(
+            model,
+            {
+                "backend": "TF",
+                "batchsize": 0,
+                "device": "cpu",
+                "inputs": ["a", "b"],
+                "minbatchsize": 0,
+                "outputs": ["mul"],
+                "tag": "v1.0",
+            },
+        )
 
     def test_modelstore_errors(self):
-        model_path = os.path.join(MODEL_DIR, 'graph.pb')
+        model_path = os.path.join(MODEL_DIR, "graph.pb")
         model_pb = load_model(model_path)
         con = self.get_client()
         with self.assertRaises(ValueError) as e:
-            con.modelstore('m', 'tf', 'wrongdevice', model_pb,
-                           inputs=['a', 'b'], outputs=['mul'], tag='v1.0')
+            con.modelstore(
+                "m",
+                "tf",
+                "wrongdevice",
+                model_pb,
+                inputs=["a", "b"],
+                outputs=["mul"],
+                tag="v1.0",
+            )
         self.assertTrue(str(e.exception).startswith("Device not allowed"))
         with self.assertRaises(ValueError) as e:
-            con.modelstore('m', 'wrongbackend', 'cpu', model_pb,
-                           inputs=['a', 'b'], outputs=['mul'], tag='v1.0')
+            con.modelstore(
+                "m",
+                "wrongbackend",
+                "cpu",
+                model_pb,
+                inputs=["a", "b"],
+                outputs=["mul"],
+                tag="v1.0",
+            )
         self.assertTrue(str(e.exception).startswith("Backend not allowed"))
         with self.assertRaises(ValueError) as e:
-            con.modelstore('m', 'tf', 'cpu', model_pb,
-                           inputs=['a', 'b'], outputs=['mul'], tag='v1.0', minbatch=2)
+            con.modelstore(
+                "m",
+                "tf",
+                "cpu",
+                model_pb,
+                inputs=["a", "b"],
+                outputs=["mul"],
+                tag="v1.0",
+                minbatch=2,
+            )
         self.assertEqual(str(e.exception), "Minbatch is not allowed without batch")
         with self.assertRaises(ValueError) as e:
-            con.modelstore('m', 'tf', 'cpu', model_pb,
-                           inputs=['a', 'b'], outputs=['mul'], tag='v1.0', batch=4, minbatchtimeout=1000)
-        self.assertTrue(str(e.exception), "Minbatchtimeout is not allowed without minbatch")
+            con.modelstore(
+                "m",
+                "tf",
+                "cpu",
+                model_pb,
+                inputs=["a", "b"],
+                outputs=["mul"],
+                tag="v1.0",
+                batch=4,
+                minbatchtimeout=1000,
+            )
+        self.assertEqual(
+            str(e.exception), "Minbatchtimeout is not allowed without minbatch"
+        )
         with self.assertRaises(ValueError) as e:
-            con.modelstore('m', 'tf', 'cpu', model_pb, tag='v1.0')
-        self.assertTrue(str(e.exception), "Require keyword arguments inputs and outputs for TF models")
+        con.modelstore("m", "tf", "cpu", model_pb, tag="v1.0")
+        self.assertEqual(
+            str(e.exception),
+            "Require keyword arguments inputs and outputs for TF models",
+        )
         with self.assertRaises(ValueError) as e:
-            con.modelstore('m', 'torch', 'cpu', model_pb, inputs=['a', 'b'], outputs=['mul'], tag='v1.0')
-        self.assertTrue(str(e.exception), "Inputs and outputs keywords should not be specified for this backend")
+            con.modelstore(
+                "m",
+                "torch",
+                "cpu",
+                model_pb,
+                inputs=["a", "b"],
+                outputs=["mul"],
+                tag="v1.0",
+            )
+        self.assertEqual(
+            str(e.exception),
+            "Inputs and outputs keywords should not be specified for this backend",
+        )
 
     def test_modelget_meta(self):
         model_path = os.path.join(MODEL_DIR, tf_graph)
         model_pb = load_model(model_path)
         con = self.get_client()
-        con.modelstore('m', 'tf', 'cpu', model_pb,
-                       inputs=['a', 'b'], outputs=['mul'], tag='v1.0')
-        model = con.modelget('m', meta_only=True)
-        self.assertEqual(model, {'backend': 'TF', 'batchsize': 0, 'device': 'cpu', 'inputs': ['a', 'b'], 'minbatchsize': 0, 'outputs': ['mul'], 'tag': 'v1.0'})
+        con.modelstore(
+            "m", "tf", "cpu", model_pb, inputs=["a", "b"], outputs=["mul"], tag="v1.0"
+        )
+        model = con.modelget("m", meta_only=True)
+        self.assertEqual(
+            model,
+            {
+                "backend": "TF",
+                "batchsize": 0,
+                "device": "cpu",
+                "inputs": ["a", "b"],
+                "minbatchsize": 0,
+                "outputs": ["mul"],
+                "tag": "v1.0",
+            },
+        )
 
     def test_modelexecute_non_list_input_output(self):
-        model_path = os.path.join(MODEL_DIR, 'graph.pb')
+        model_path = os.path.join(MODEL_DIR, "graph.pb")
         model_pb = load_model(model_path)
         con = self.get_client()
-        con.modelstore('m', 'tf', 'cpu', model_pb,
-                       inputs=['a', 'b'], outputs=['mul'], tag='v1.7')
-        con.tensorset('a', (2, 3), dtype='float')
-        con.tensorset('b', (2, 3), dtype='float')
-        ret = con.modelexecute('m', ['a', 'b'], 'out')
-        self.assertEqual(ret, 'OK')
+        con.modelstore(
+            "m", "tf", "cpu", model_pb, inputs=["a", "b"], outputs=["mul"], tag="v1.7"
+        )
+        con.tensorset("a", (2, 3), dtype="float")
+        con.tensorset("b", (2, 3), dtype="float")
+        ret = con.modelexecute("m", ["a", "b"], "out")
+        self.assertEqual(ret, "OK")
 
     def test_nonasciichar(self):
         nonascii = "ĉ"
         model_path = os.path.join(MODEL_DIR, tf_graph)
         model_pb = load_model(model_path)
         con = self.get_client()
-        con.modelstore('m' + nonascii, 'tf', 'cpu', model_pb,
-                       inputs=['a', 'b'], outputs=['mul'], tag='v1.0')
-        con.tensorset('a' + nonascii, (2, 3), dtype='float')
-        con.tensorset('b', (2, 3), dtype='float')
-        con.modelexecute('m' + nonascii, ['a' + nonascii, 'b'], ['c' + nonascii])
-        tensor = con.tensorget('c' + nonascii)
-        self.assertTrue((np.allclose(tensor, [4., 9.])))
+        con.modelstore(
+            "m" + nonascii,
+            "tf",
+            "cpu",
+            model_pb,
+            inputs=["a", "b"],
+            outputs=["mul"],
+            tag="v1.0",
+        )
+        con.tensorset("a" + nonascii, (2, 3), dtype="float")
+        con.tensorset("b", (2, 3), dtype="float")
+        con.modelexecute("m" + nonascii, ["a" + nonascii, "b"], ["c" + nonascii])
+        tensor = con.tensorget("c" + nonascii)
+        self.assertTrue((np.allclose(tensor, [4.0, 9.0])))
 
     def test_run_tf_model(self):
         model_path = os.path.join(MODEL_DIR, tf_graph)
@@ -206,23 +292,26 @@ def test_run_tf_model(self):
         wrong_model_pb = load_model(bad_model_path)
 
         con = self.get_client()
-        con.modelstore('m', 'tf', 'cpu', model_pb,
-                       inputs=['a', 'b'], outputs=['mul'], tag='v1.0')
-        con.modeldel('m')
-        self.assertRaises(ResponseError, con.modelget, 'm')
-        con.modelstore('m', 'tf', 'cpu', model_pb,
-                       inputs=['a', 'b'], outputs='mul', tag='v1.0')
+
con.modelstore( + "m", "tf", "cpu", model_pb, inputs=["a", "b"], outputs=["mul"], tag="v1.0" + ) + con.modeldel("m") + self.assertRaises(ResponseError, con.modelget, "m") + con.modelstore( + "m", "tf", "cpu", model_pb, inputs=["a", "b"], outputs="mul", tag="v1.0" + ) # wrong model with self.assertRaises(ResponseError) as e: - con.modelstore('m', 'tf', 'cpu', - wrong_model_pb, inputs=['a', 'b'], outputs=['mul']) + con.modelstore( + "m", "tf", "cpu", wrong_model_pb, inputs=["a", "b"], outputs=["mul"] + ) self.assertEqual(str(e.exception), "Invalid GraphDef") - con.tensorset('a', (2, 3), dtype='float') - con.tensorset('b', (2, 3), dtype='float') - con.modelexecute('m', ['a', 'b'], ['c']) - tensor = con.tensorget('c') + con.tensorset("a", (2, 3), dtype="float") + con.tensorset("b", (2, 3), dtype="float") + con.modelexecute("m", ["a", "b"], ["c"]) + tensor = con.tensorget("c") self.assertTrue(np.allclose([4, 9], tensor)) model_det = con.modelget("m") self.assertTrue(model_det["backend"] == "TF") @@ -259,7 +348,7 @@ def test_run_onnxml_model(self): mlmodel_path = os.path.join(MODEL_DIR, "boston.onnx") onnxml_model = load_model(mlmodel_path) con = self.get_client() - con.modelstore("onnx_model", 'onnx', 'cpu', onnxml_model) + con.modelstore("onnx_model", "onnx", "cpu", onnxml_model) tensor = np.ones((1, 13)).astype(np.float32) con.tensorset("input", tensor) con.modelexecute("onnx_model", ["input"], ["output"]) @@ -272,7 +361,7 @@ def test_run_onnxdl_model(self): dlmodel_path = os.path.join(MODEL_DIR, "findsquare.onnx") onnxdl_model = load_model(dlmodel_path) con = self.get_client() - con.modelstore("onnx_model", 'onnx', 'cpu', onnxdl_model) + con.modelstore("onnx_model", "onnx", "cpu", onnxdl_model) tensor = np.array((2,)).astype(np.float32) con.tensorset("input", tensor) con.modelexecute("onnx_model", ["input"], ["output"]) @@ -283,70 +372,80 @@ def test_run_pytorch_model(self): model_path = os.path.join(MODEL_DIR, torch_graph) ptmodel = load_model(model_path) con = self.get_client() - con.modelstore("pt_model", 'torch', 'cpu', ptmodel, tag='v1.0') - con.tensorset('a', [2, 3, 2, 3], shape=(2, 2), dtype='float') - con.tensorset('b', [2, 3, 2, 3], shape=(2, 2), dtype='float') + con.modelstore("pt_model", "torch", "cpu", ptmodel, tag="v1.0") + con.tensorset("a", [2, 3, 2, 3], shape=(2, 2), dtype="float") + con.tensorset("b", [2, 3, 2, 3], shape=(2, 2), dtype="float") con.modelexecute("pt_model", ["a", "b"], ["output"]) - output = con.tensorget('output', as_numpy=False) - self.assertTrue(np.allclose(output['values'], [4, 6, 4, 6])) + output = con.tensorget("output", as_numpy=False) + self.assertTrue(np.allclose(output["values"], [4, 6, 4, 6])) def test_run_tflite_model(self): model_path = os.path.join(MODEL_DIR, "mnist_model_quant.tflite") tflmodel = load_model(model_path) con = self.get_client() - con.modelstore("tfl_model", 'tflite', 'cpu', tflmodel) + con.modelstore("tfl_model", "tflite", "cpu", tflmodel) img = np.random.random((1, 1, 28, 28)).astype(np.float) - con.tensorset('img', img) + con.tensorset("img", img) con.modelexecute("tfl_model", ["img"], ["output1", "output2"]) - output = con.tensorget('output1') + output = con.tensorget("output1") self.assertTrue(np.allclose(output, [8])) # AI.MODELRUN is deprecated by AI.MODELEXECUTE def test_deprecated_modelrun(self): - model_path = os.path.join(MODEL_DIR, 'graph.pb') + model_path = os.path.join(MODEL_DIR, "graph.pb") model_pb = load_model(model_path) con = self.get_client() - con.modelstore('m', 'tf', 'cpu', model_pb, - inputs=['a', 'b'], 
outputs=['mul'], tag='v1.0') + con.modelstore( + "m", "tf", "cpu", model_pb, inputs=["a", "b"], outputs=["mul"], tag="v1.0" + ) - con.tensorset('a', (2, 3), dtype='float') - con.tensorset('b', (2, 3), dtype='float') - con.modelrun('m', ['a', 'b'], ['c']) - tensor = con.tensorget('c') + con.tensorset("a", (2, 3), dtype="float") + con.tensorset("b", (2, 3), dtype="float") + con.modelrun("m", ["a", "b"], ["c"]) + tensor = con.tensorget("c") self.assertTrue(np.allclose([4, 9], tensor)) def test_info(self): model_path = os.path.join(MODEL_DIR, tf_graph) model_pb = load_model(model_path) con = self.get_client() - con.modelstore('m', 'tf', 'cpu', model_pb, - inputs=['a', 'b'], outputs=['mul']) - first_info = con.infoget('m') - expected = {'key': 'm', 'type': 'MODEL', 'backend': 'TF', 'device': 'cpu', - 'tag': '', 'duration': 0, 'samples': 0, 'calls': 0, 'errors': 0} + con.modelstore("m", "tf", "cpu", model_pb, inputs=["a", "b"], outputs=["mul"]) + first_info = con.infoget("m") + expected = { + "key": "m", + "type": "MODEL", + "backend": "TF", + "device": "cpu", + "tag": "", + "duration": 0, + "samples": 0, + "calls": 0, + "errors": 0, + } self.assertEqual(first_info, expected) - con.tensorset('a', (2, 3), dtype='float') - con.tensorset('b', (2, 3), dtype='float') - con.modelexecute('m', ['a', 'b'], ['c']) - con.modelexecute('m', ['a', 'b'], ['c']) - second_info = con.infoget('m') - self.assertEqual(second_info['calls'], 2) # 2 model runs - con.inforeset('m') - third_info = con.infoget('m') + con.tensorset("a", (2, 3), dtype="float") + con.tensorset("b", (2, 3), dtype="float") + con.modelexecute("m", ["a", "b"], ["c"]) + con.modelexecute("m", ["a", "b"], ["c"]) + second_info = con.infoget("m") + self.assertEqual(second_info["calls"], 2) # 2 model runs + con.inforeset("m") + third_info = con.infoget("m") self.assertEqual(first_info, third_info) # before modelrun and after reset def test_model_scan(self): model_path = os.path.join(MODEL_DIR, tf_graph) model_pb = load_model(model_path) con = self.get_client() - con.modelstore('m', 'tf', 'cpu', model_pb, - inputs=['a', 'b'], outputs=['mul'], tag='v1.2') - model_path = os.path.join(MODEL_DIR, 'pt-minimal.pt') + con.modelstore( + "m", "tf", "cpu", model_pb, inputs=["a", "b"], outputs=["mul"], tag="v1.2" + ) + model_path = os.path.join(MODEL_DIR, "pt-minimal.pt") ptmodel = load_model(model_path) con = self.get_client() # TODO: RedisAI modelscan issue - con.modelstore("pt_model", 'torch', 'cpu', ptmodel) + con.modelstore("pt_model", "torch", "cpu", ptmodel) mlist = con.modelscan() self.assertEqual(mlist, [["pt_model", ""], ["m", "v1.2"]]) @@ -370,7 +469,7 @@ def setUp(self): con = self.get_client() model_path = os.path.join(MODEL_DIR, torch_graph) ptmodel = load_model(model_path) - con.modelstore("pt_model", 'torch', 'cpu', ptmodel, tag='v7.0') + con.modelstore("pt_model", "torch", "cpu", ptmodel, tag="v7.0") def test_dagrun_with_load(self): con = self.get_client() From f98dfdb06badc2348a9c37458a82d5d22889abe2 Mon Sep 17 00:00:00 2001 From: alonre24 Date: Wed, 12 May 2021 20:32:23 +0300 Subject: [PATCH 09/14] More formatting locally --- redisai/client.py | 7 +++---- redisai/command_builder.py | 27 +++++++++++++++---------- redisai/dag.py | 5 ++--- redisai/pipeline.py | 5 ++--- redisai/postprocessor.py | 3 ++- redisai/utils.py | 7 ++++--- setup.py | 3 +-- test/test.py | 40 +++++++++++++++++++++++++------------- 8 files changed, 57 insertions(+), 40 deletions(-) diff --git a/redisai/client.py b/redisai/client.py index 8486bab..c74edf9 100644 --- 
a/redisai/client.py +++ b/redisai/client.py @@ -1,17 +1,16 @@ -from functools import wraps, partial -from typing import Union, AnyStr, ByteString, List, Sequence, Any import warnings +from functools import partial, wraps +from typing import Any, AnyStr, ByteString, List, Sequence, Union -from redis import StrictRedis import numpy as np from deprecated import deprecated +from redis import StrictRedis from redisai import command_builder as builder from redisai.dag import Dag from redisai.pipeline import Pipeline from redisai.postprocessor import Processor - processor = Processor() diff --git a/redisai/command_builder.py b/redisai/command_builder.py index 84197a5..b0a0a2d 100644 --- a/redisai/command_builder.py +++ b/redisai/command_builder.py @@ -1,10 +1,9 @@ -from typing import Union, AnyStr, ByteString, List, Sequence +from typing import AnyStr, ByteString, List, Sequence, Union import numpy as np from . import utils - # TODO: mypy check @@ -25,9 +24,11 @@ def modelstore( outputs: Union[AnyStr, List[AnyStr]], ) -> Sequence: if device.upper() not in utils.allowed_devices: - raise ValueError(f"Device not allowed. Use any from {utils.allowed_devices}") + raise ValueError( + f"Device not allowed. Use any from {utils.allowed_devices}") if backend.upper() not in utils.allowed_backends: - raise ValueError(f"Backend not allowed. Use any from {utils.allowed_backends}") + raise ValueError( + f"Backend not allowed. Use any from {utils.allowed_backends}") args = ["AI.MODELSTORE", name, backend, device] if tag is not None: @@ -63,7 +64,8 @@ def modelstore( "Inputs and outputs keywords should not be specified for this backend" ) chunk_size = 500 * 1024 * 1024 # TODO: this should be configurable. - data_chunks = [data[i : i + chunk_size] for i in range(0, len(data), chunk_size)] + data_chunks = [data[i: i + chunk_size] + for i in range(0, len(data), chunk_size)] # TODO: need a test case for this args += ["BLOB", *data_chunks] return args @@ -81,9 +83,11 @@ def modelset( outputs: Union[AnyStr, List[AnyStr]], ) -> Sequence: if device.upper() not in utils.allowed_devices: - raise ValueError(f"Device not allowed. Use any from {utils.allowed_devices}") + raise ValueError( + f"Device not allowed. Use any from {utils.allowed_devices}") if backend.upper() not in utils.allowed_backends: - raise ValueError(f"Backend not allowed. Use any from {utils.allowed_backends}") + raise ValueError( + f"Backend not allowed. Use any from {utils.allowed_backends}") args = ["AI.MODELSET", name, backend, device] if tag is not None: @@ -97,11 +101,13 @@ def modelset( if backend.upper() == "TF": if not (all((inputs, outputs))): - raise ValueError("Require keyword arguments input and output for TF models") + raise ValueError( + "Require keyword arguments input and output for TF models") args += ["INPUTS", *utils.listify(inputs)] args += ["OUTPUTS", *utils.listify(outputs)] chunk_size = 500 * 1024 * 1024 - data_chunks = [data[i : i + chunk_size] for i in range(0, len(data), chunk_size)] + data_chunks = [data[i: i + chunk_size] + for i in range(0, len(data), chunk_size)] # TODO: need a test case for this args += ["BLOB", *data_chunks] return args @@ -203,7 +209,8 @@ def tensorget(key: AnyStr, as_numpy: bool = True, meta_only: bool = False) -> Se def scriptset(name: AnyStr, device: str, script: str, tag: AnyStr = None) -> Sequence: if device.upper() not in utils.allowed_devices: - raise ValueError(f"Device not allowed. Use any from {utils.allowed_devices}") + raise ValueError( + f"Device not allowed. 
Use any from {utils.allowed_devices}") args = ["AI.SCRIPTSET", name, device] if tag: args += ["TAG", tag] diff --git a/redisai/dag.py b/redisai/dag.py index 1b60529..eb2e1ad 100644 --- a/redisai/dag.py +++ b/redisai/dag.py @@ -1,11 +1,10 @@ from functools import partial -from typing import AnyStr, Union, Sequence, Any, List +from typing import Any, AnyStr, List, Sequence, Union import numpy as np -from redisai.postprocessor import Processor from redisai import command_builder as builder - +from redisai.postprocessor import Processor processor = Processor() diff --git a/redisai/pipeline.py b/redisai/pipeline.py index 447f528..f6a8255 100644 --- a/redisai/pipeline.py +++ b/redisai/pipeline.py @@ -1,14 +1,13 @@ import warnings from functools import partial -from typing import AnyStr, Union, Sequence +from typing import AnyStr, Sequence, Union import numpy as np +import redis from redisai import command_builder as builder -import redis from redisai.postprocessor import Processor - processor = Processor() diff --git a/redisai/postprocessor.py b/redisai/postprocessor.py index 42bd141..6ef9699 100644 --- a/redisai/postprocessor.py +++ b/redisai/postprocessor.py @@ -42,7 +42,8 @@ def tensorget(res, as_numpy, as_numpy_mutable, meta_only): mutable=False, ) else: - target = float if rai_result["dtype"] in ("FLOAT", "DOUBLE") else int + target = float if rai_result["dtype"] in ( + "FLOAT", "DOUBLE") else int utils.recursive_bytetransform(rai_result["values"], target) return rai_result diff --git a/redisai/utils.py b/redisai/utils.py index 3723bc5..093c7fc 100644 --- a/redisai/utils.py +++ b/redisai/utils.py @@ -1,6 +1,6 @@ -from typing import Union, ByteString, Sequence, List, AnyStr, Callable -import numpy as np +from typing import AnyStr, ByteString, Callable, List, Sequence, Union +import numpy as np dtype_dict = { "float": "FLOAT", @@ -26,7 +26,8 @@ def numpy2blob(tensor: np.ndarray) -> tuple: try: dtype = dtype_dict[str(tensor.dtype)] except KeyError: - raise TypeError(f"RedisAI doesn't support tensors of type {tensor.dtype}") + raise TypeError( + f"RedisAI doesn't support tensors of type {tensor.dtype}") shape = tensor.shape blob = bytes(tensor.data) return dtype, shape, blob diff --git a/setup.py b/setup.py index 6b3dfd3..c1d1f40 100644 --- a/setup.py +++ b/setup.py @@ -1,6 +1,5 @@ #!/usr/bin/env python -from setuptools import setup, find_packages - +from setuptools import find_packages, setup with open("README.rst") as f: long_description = f.read() diff --git a/test/test.py b/test/test.py index 3ae5f14..581429c 100644 --- a/test/test.py +++ b/test/test.py @@ -1,12 +1,13 @@ -from io import StringIO +import os.path import sys +from io import StringIO from unittest import TestCase + import numpy as np -import os.path -from redisai import Client from ml2rt import load_model from redis.exceptions import ResponseError +from redisai import Client DEBUG = False tf_graph = "graph.pb" @@ -194,7 +195,8 @@ def test_modelstore_errors(self): tag="v1.0", minbatch=2, ) - self.assertEqual(str(e.exception), "Minbatch is not allowed without batch") + self.assertEqual(str(e.exception), + "Minbatch is not allowed without batch") with self.assertRaises(ValueError) as e: con.modelstore( "m", @@ -280,7 +282,8 @@ def test_nonasciichar(self): ) con.tensorset("a" + nonascii, (2, 3), dtype="float") con.tensorset("b", (2, 3), dtype="float") - con.modelexecute("m" + nonascii, ["a" + nonascii, "b"], ["c" + nonascii]) + con.modelexecute( + "m" + nonascii, ["a" + nonascii, "b"], ["c" + nonascii]) tensor = con.tensorget("c" + 
nonascii) self.assertTrue((np.allclose(tensor, [4.0, 9.0]))) @@ -324,7 +327,8 @@ def test_run_tf_model(self): def test_scripts(self): con = self.get_client() - self.assertRaises(ResponseError, con.scriptset, "ket", "cpu", "return 1") + self.assertRaises(ResponseError, con.scriptset, + "ket", "cpu", "return 1") con.scriptset("ket", "cpu", script) con.tensorset("a", (2, 3), dtype="float") con.tensorset("b", (2, 3), dtype="float") @@ -410,7 +414,8 @@ def test_info(self): model_path = os.path.join(MODEL_DIR, tf_graph) model_pb = load_model(model_path) con = self.get_client() - con.modelstore("m", "tf", "cpu", model_pb, inputs=["a", "b"], outputs=["mul"]) + con.modelstore("m", "tf", "cpu", model_pb, + inputs=["a", "b"], outputs=["mul"]) first_info = con.infoget("m") expected = { "key": "m", @@ -432,7 +437,8 @@ def test_info(self): self.assertEqual(second_info["calls"], 2) # 2 model runs con.inforeset("m") third_info = con.infoget("m") - self.assertEqual(first_info, third_info) # before modelrun and after reset + # before modelrun and after reset + self.assertEqual(first_info, third_info) def test_model_scan(self): model_path = os.path.join(MODEL_DIR, tf_graph) @@ -480,7 +486,8 @@ def test_dagrun_with_load(self): dag.modelrun("pt_model", ["a", "b"], ["output"]) dag.tensorget("output") result = dag.run() - expected = ["OK", "OK", np.array([[4.0, 6.0], [4.0, 6.0]], dtype=np.float32)] + expected = ["OK", "OK", np.array( + [[4.0, 6.0], [4.0, 6.0]], dtype=np.float32)] self.assertTrue(np.allclose(expected.pop(), result.pop())) self.assertEqual(expected, result) self.assertRaises(ResponseError, con.tensorget, "b") @@ -512,7 +519,8 @@ def test_dagrun_calling_on_return(self): .tensorget("output") .run() ) - expected = ["OK", "OK", np.array([[4.0, 6.0], [4.0, 6.0]], dtype=np.float32)] + expected = ["OK", "OK", np.array( + [[4.0, 6.0], [4.0, 6.0]], dtype=np.float32)] self.assertTrue(np.allclose(expected.pop(), result.pop())) self.assertEqual(expected, result) @@ -574,12 +582,14 @@ def test_pipeline_non_transaction(self): pipe = con.pipeline(transaction=False) pipe = pipe.tensorset("a", arr).set("native", 1) pipe = pipe.tensorget("a", as_numpy=False) - pipe = pipe.tensorget("a", as_numpy=True).tensorget("a", meta_only=True) + pipe = pipe.tensorget("a", as_numpy=True).tensorget( + "a", meta_only=True) result = pipe.execute() expected = [ b"OK", True, - {"dtype": "FLOAT", "shape": [2, 2], "values": [2.0, 3.0, 2.0, 3.0]}, + {"dtype": "FLOAT", "shape": [2, 2], + "values": [2.0, 3.0, 2.0, 3.0]}, arr, {"dtype": "FLOAT", "shape": [2, 2]}, ] @@ -595,12 +605,14 @@ def test_pipeline_transaction(self): pipe = con.pipeline(transaction=True) pipe = pipe.tensorset("a", arr).set("native", 1) pipe = pipe.tensorget("a", as_numpy=False) - pipe = pipe.tensorget("a", as_numpy=True).tensorget("a", meta_only=True) + pipe = pipe.tensorget("a", as_numpy=True).tensorget( + "a", meta_only=True) result = pipe.execute() expected = [ b"OK", True, - {"dtype": "FLOAT", "shape": [2, 2], "values": [2.0, 3.0, 2.0, 3.0]}, + {"dtype": "FLOAT", "shape": [2, 2], + "values": [2.0, 3.0, 2.0, 3.0]}, arr, {"dtype": "FLOAT", "shape": [2, 2]}, ] From 5448fc3c4333ba3bca1e2d6c8eecd054bc52b1d1 Mon Sep 17 00:00:00 2001 From: alonre24 Date: Wed, 12 May 2021 20:45:24 +0300 Subject: [PATCH 10/14] More formatting locally (to long lines) --- redisai/client.py | 18 +++++++++++------- redisai/command_builder.py | 2 +- 2 files changed, 12 insertions(+), 8 deletions(-) diff --git a/redisai/client.py b/redisai/client.py index c74edf9..24e3b6c 100644 --- 
a/redisai/client.py
+++ b/redisai/client.py
@@ -168,8 +168,9 @@ def modelstore(
         minbatch : int
             Minimum number of samples required in a batch for model execution
         minbatchtimeout : int
-            The max number of miliseconds for which the engine will not trigger an execution if the number of samples
-            is lower than minbatch (after minbatchtimeout is passed, the execution will start even if minbatch jas not reached)
+            The max number of milliseconds for which the engine will not trigger an execution
+            if the number of samples is lower than minbatch (after minbatchtimeout has passed,
+            the execution will start even if minbatch has not been reached)
         tag : AnyStr
             Any string that will be saved in RedisAI as tag for the model
         inputs : Union[AnyStr, List[AnyStr]]
@@ -225,7 +226,8 @@ def modelset(
         outputs: Union[AnyStr, List[AnyStr]] = None,
     ) -> str:
         """
-        Similar to modelstore (this is the deprecated version that will not be supported in future versions).
+        Similar to modelstore (this is the deprecated version that will not be
+        supported in future versions).
 
         Example
         -------
@@ -317,11 +319,12 @@ def modelexecute(
             Tensor(s) which is already saved in the RedisAI using a tensorset call. These
             tensors will be used as the inputs for the modelexecute
         outputs : Union[AnyStr, List[AnyStr]]
-            keys on which the outputs to be saved. If those keys exist already, modelexecute
-            will overwrite them with new values
+            Keys on which the outputs will be saved. If those keys already exist,
+            modelexecute will overwrite them with new values
         timeout : int
-            The max number on milisecinds that may pass before the request is prossced
-            (meaning that the result will not be computed after that time and TIMEDOUT is returned in that case
+            The max number of milliseconds that may pass before the request is processed
+            (meaning that the result will not be computed after that time, and TIMEDOUT
+            is returned in that case)
 
         Returns
         -------
@@ -352,7 +355,8 @@ def modelrun(
         outputs: Union[AnyStr, List[AnyStr]],
     ) -> str:
         """
-        Similar to modelexecute (this is the deprecated version that will not be supported in future versions).
+        Similar to modelexecute (this is the deprecated version that will not be
+        supported in future versions).
 
         Example
         -------
diff --git a/redisai/command_builder.py b/redisai/command_builder.py
index b0a0a2d..6811f4a 100644
--- a/redisai/command_builder.py
+++ b/redisai/command_builder.py
@@ -45,7 +45,7 @@ def modelstore(
         args += ["MINBATCHTIMEOUT", minbatchtimeout]
 
     if backend.upper() == "TF":
-        if not (all((inputs, outputs))):
+        if not all((inputs, outputs)):
             raise ValueError(
                 "Require keyword arguments inputs and outputs for TF models"
             )
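To make the batching keywords documented above concrete, here is a minimal usage sketch (it assumes a RedisAI server on localhost and a serialized TorchScript model; the file path, key name, and tag are illustrative, not taken from this patch):

    from redisai import Client

    con = Client(host="localhost", port=6379)
    model = open("mymodel.pt", "rb").read()  # illustrative model file

    # Gather up to 8 samples per batch; wait for at least 2 samples, but start
    # anyway once 500 ms elapse (BATCHSIZE / MINBATCHSIZE / MINBATCHTIMEOUT).
    con.modelstore("m", "torch", "cpu", model,
                   batch=8, minbatch=2, minbatchtimeout=500, tag="v1.0")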
From e91fce51b6b6b90e6bee008ea135a9cef21a78bc Mon Sep 17 00:00:00 2001
From: alonre24
Date: Wed, 19 May 2021 14:21:02 +0300
Subject: [PATCH 11/14] PR fixes, among them: bring back the documentation for
 deprecated modelset and modelrun. Validate that all required arguments appear
 in the new commands (+test it)
---
 redisai/client.py          | 56 ++++++++++++++++++++++++++++++++++----
 redisai/command_builder.py | 28 ++++++++-----------
 redisai/postprocessor.py   |  3 +-
 redisai/utils.py           |  3 +-
 setup.py                   |  4 +--
 test/test.py               | 21 ++++++++++++++
 6 files changed, 87 insertions(+), 28 deletions(-)

diff --git a/redisai/client.py b/redisai/client.py
index 24e3b6c..5458e4c 100644
--- a/redisai/client.py
+++ b/redisai/client.py
@@ -212,7 +212,7 @@ def modelstore(
         res = self.execute_command(*args)
         return res if not self.enable_postprocess else processor.modelstore(res)
 
-    @deprecated(version="1.2.2", reason="Use modelstore instead")
+    @deprecated(version="1.2.0", reason="Use modelstore instead")
     def modelset(
         self,
         key: AnyStr,
@@ -226,8 +226,34 @@ def modelset(
         outputs: Union[AnyStr, List[AnyStr]] = None,
     ) -> str:
         """
-        Similar to modelstore (this is the deprecated version that will not be
-        supported in future versions).
+        Set the model on provided key.
+
+        Parameters
+        ----------
+        key : AnyStr
+            Key name
+        backend : str
+            Backend name. Allowed backends are TF, TORCH, TFLITE, ONNX
+        device : str
+            Device name. Allowed devices are CPU and GPU. If multiple GPUs are available,
+            it can be specified using the format GPU:<n>. For example: GPU:0
+        data : bytes
+            Model graph read as bytes string
+        batch : int
+            Number of batches for doing auto-batching
+        minbatch : int
+            Minimum number of samples required in a batch for model execution
+        tag : AnyStr
+            Any string that will be saved in RedisAI as tag for the model
+        inputs : Union[AnyStr, List[AnyStr]]
+            Input node(s) in the graph. Required only for Tensorflow graphs
+        outputs : Union[AnyStr, List[AnyStr]]
+            Output node(s) in the graph. Required only for Tensorflow graphs
+
+        Returns
+        -------
+        str
+            'OK' if success, raise an exception otherwise
 
         Example
         -------
@@ -347,7 +373,7 @@ def modelexecute(
         res = self.execute_command(*args)
         return res if not self.enable_postprocess else processor.modelexecute(res)
 
-    @deprecated(version="1.2.2", reason="Use modelexecute instead")
+    @deprecated(version="1.2.0", reason="Use modelexecute instead")
     def modelrun(
         self,
         key: AnyStr,
@@ -355,8 +381,26 @@ def modelrun(
         outputs: Union[AnyStr, List[AnyStr]],
     ) -> str:
         """
-        Similar to modelexecute (this is the deprecated version that will not be
-        supported in future versions).
+        Run the model using input(s) which are already in the scope and are associated
+        to some keys. Modelrun also needs the output key name(s) to store the output
+        from the model. The number of outputs from the model and the number of keys
+        provided here must be the same. Otherwise, RedisAI throws an error
+
+        Parameters
+        ----------
+        key : str
+            Model key to run
+        inputs : Union[AnyStr, List[AnyStr]]
+            Tensor(s) which is already saved in the RedisAI using a tensorset call. These
+            tensors will be used as the input for the modelrun
+        outputs : Union[AnyStr, List[AnyStr]]
+            Keys on which the outputs will be saved. If those keys already exist, modelrun
+            will overwrite them with new values
+
+        Returns
+        -------
+        str
+            'OK' if success, raise an exception otherwise
 
         Example
         -------
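Since the old entry points keep working under @deprecated, callers migrating gradually get a warning rather than a failure. A small sketch of the expected behavior (con and model_pb as in the tests):

    import warnings

    with warnings.catch_warnings(record=True) as caught:
        warnings.simplefilter("always")
        con.modelset("m", "tf", "cpu", model_pb,
                     inputs=["a", "b"], outputs=["mul"], tag="v1.0")

    # The `deprecated` package emits a DeprecationWarning whose message
    # carries the reason string ("Use modelstore instead").
    assert any(issubclass(w.category, DeprecationWarning) for w in caught)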
diff --git a/redisai/command_builder.py b/redisai/command_builder.py
index 6811f4a..3f1c151 100644
--- a/redisai/command_builder.py
+++ b/redisai/command_builder.py
@@ -23,12 +23,12 @@ def modelstore(
     inputs: Union[AnyStr, List[AnyStr]],
     outputs: Union[AnyStr, List[AnyStr]],
 ) -> Sequence:
+    if name is None:
+        raise ValueError("Model name was not given")
     if device.upper() not in utils.allowed_devices:
-        raise ValueError(
-            f"Device not allowed. Use any from {utils.allowed_devices}")
+        raise ValueError(f"Device not allowed. Use any from {utils.allowed_devices}")
     if backend.upper() not in utils.allowed_backends:
-        raise ValueError(
-            f"Backend not allowed. Use any from {utils.allowed_backends}")
+        raise ValueError(f"Backend not allowed. Use any from {utils.allowed_backends}")
 
     args = ["AI.MODELSTORE", name, backend, device]
     if tag is not None:
@@ -64,8 +64,7 @@ def modelstore(
             "Inputs and outputs keywords should not be specified for this backend"
         )
     chunk_size = 500 * 1024 * 1024  # TODO: this should be configurable.
-    data_chunks = [data[i: i + chunk_size]
-                   for i in range(0, len(data), chunk_size)]
+    data_chunks = [data[i : i + chunk_size] for i in range(0, len(data), chunk_size)]
     # TODO: need a test case for this
     args += ["BLOB", *data_chunks]
     return args
@@ -83,11 +82,9 @@ def modelset(
     outputs: Union[AnyStr, List[AnyStr]],
 ) -> Sequence:
     if device.upper() not in utils.allowed_devices:
-        raise ValueError(
-            f"Device not allowed. Use any from {utils.allowed_devices}")
+        raise ValueError(f"Device not allowed. Use any from {utils.allowed_devices}")
     if backend.upper() not in utils.allowed_backends:
-        raise ValueError(
-            f"Backend not allowed. Use any from {utils.allowed_backends}")
+        raise ValueError(f"Backend not allowed. Use any from {utils.allowed_backends}")
 
     args = ["AI.MODELSET", name, backend, device]
     if tag is not None:
@@ -101,13 +98,11 @@ def modelset(
 
     if backend.upper() == "TF":
         if not (all((inputs, outputs))):
-            raise ValueError(
-                "Require keyword arguments input and output for TF models")
+            raise ValueError("Require keyword arguments input and output for TF models")
         args += ["INPUTS", *utils.listify(inputs)]
         args += ["OUTPUTS", *utils.listify(outputs)]
     chunk_size = 500 * 1024 * 1024
-    data_chunks = [data[i: i + chunk_size]
-                   for i in range(0, len(data), chunk_size)]
+    data_chunks = [data[i : i + chunk_size] for i in range(0, len(data), chunk_size)]
     # TODO: need a test case for this
     args += ["BLOB", *data_chunks]
     return args
@@ -130,6 +125,8 @@ def modelexecute(
     outputs: Union[AnyStr, List[AnyStr]],
     timeout: int,
 ) -> Sequence:
+    if name is None or inputs is None or outputs is None:
+        raise ValueError("Missing required arguments for model execute command")
     args = [
         "AI.MODELEXECUTE",
         name,
@@ -209,8 +206,7 @@ def tensorget(key: AnyStr, as_numpy: bool = True, meta_only: bool = False) -> Se
 
 def scriptset(name: AnyStr, device: str, script: str, tag: AnyStr = None) -> Sequence:
     if device.upper() not in utils.allowed_devices:
-        raise ValueError(
-            f"Device not allowed. Use any from {utils.allowed_devices}")
+        raise ValueError(f"Device not allowed. Use any from {utils.allowed_devices}")
     args = ["AI.SCRIPTSET", name, device]
     if tag:
         args += ["TAG", tag]
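As a quick illustration of what the builder now emits (a sketch; the key names are arbitrary), the argument-count fields required by AI.MODELEXECUTE are filled in automatically, and missing required arguments fail fast before anything reaches Redis:

    from redisai import command_builder as builder

    args = builder.modelexecute("m", ["a", "b"], ["out"], timeout=100)
    # ['AI.MODELEXECUTE', 'm', 'INPUTS', 2, 'a', 'b',
    #  'OUTPUTS', 1, 'out', 'TIMEOUT', 100]

    builder.modelexecute("m", None, None, None)
    # ValueError: Missing required arguments for model execute command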
Use any from {utils.allowed_devices}") args = ["AI.SCRIPTSET", name, device] if tag: args += ["TAG", tag] diff --git a/redisai/postprocessor.py b/redisai/postprocessor.py index 6ef9699..42bd141 100644 --- a/redisai/postprocessor.py +++ b/redisai/postprocessor.py @@ -42,8 +42,7 @@ def tensorget(res, as_numpy, as_numpy_mutable, meta_only): mutable=False, ) else: - target = float if rai_result["dtype"] in ( - "FLOAT", "DOUBLE") else int + target = float if rai_result["dtype"] in ("FLOAT", "DOUBLE") else int utils.recursive_bytetransform(rai_result["values"], target) return rai_result diff --git a/redisai/utils.py b/redisai/utils.py index 093c7fc..ca8007f 100644 --- a/redisai/utils.py +++ b/redisai/utils.py @@ -26,8 +26,7 @@ def numpy2blob(tensor: np.ndarray) -> tuple: try: dtype = dtype_dict[str(tensor.dtype)] except KeyError: - raise TypeError( - f"RedisAI doesn't support tensors of type {tensor.dtype}") + raise TypeError(f"RedisAI doesn't support tensors of type {tensor.dtype}") shape = tensor.shape blob = bytes(tensor.data) return dtype, shape, blob diff --git a/setup.py b/setup.py index c1d1f40..eefac3f 100644 --- a/setup.py +++ b/setup.py @@ -6,7 +6,7 @@ setup( name="redisai", - version="1.0.2", + version="1.2.0", description="RedisAI Python Client", long_description=long_description, long_description_content_type="text/x-rst", @@ -14,7 +14,7 @@ author="RedisLabs", author_email="oss@redislabs.com", packages=find_packages(), - install_requires=["redis", "hiredis", "numpy"], + install_requires=["redis", "hiredis", "numpy", "deprecated"], python_requires=">=3.6", classifiers=[ "Development Status :: 4 - Beta", diff --git a/test/test.py b/test/test.py index 581429c..d7062ad 100644 --- a/test/test.py +++ b/test/test.py @@ -162,6 +162,18 @@ def test_modelstore_errors(self): model_path = os.path.join(MODEL_DIR, "graph.pb") model_pb = load_model(model_path) con = self.get_client() + + with self.assertRaises(ValueError) as e: + con.modelstore( + None, + "TF", + "CPU", + model_pb, + inputs=["a", "b"], + outputs=["mul"] + ) + self.assertEqual(str(e.exception), "Model name was not given") + with self.assertRaises(ValueError) as e: con.modelstore( "m", @@ -304,6 +316,15 @@ def test_run_tf_model(self): "m", "tf", "cpu", model_pb, inputs=["a", "b"], outputs="mul", tag="v1.0" ) + # Required arguments ar None + with self.assertRaises(ValueError) as e: + con.modelexecute( + "m", + inputs=None, + outputs=None + ) + self.assertEqual(str(e.exception), "Missing required arguments for model execute command") + # wrong model with self.assertRaises(ResponseError) as e: con.modelstore( From 4ca3dfd3a98cbf9a8bb0301976a4637a12864193 Mon Sep 17 00:00:00 2001 From: alonre24 Date: Wed, 19 May 2021 14:31:15 +0300 Subject: [PATCH 12/14] Make fixes in tests to suit the change in "AI.modelget" --- test/test.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/test/test.py b/test/test.py index d7062ad..48e0036 100644 --- a/test/test.py +++ b/test/test.py @@ -153,6 +153,7 @@ def test_deprecated_modelset(self): "device": "cpu", "inputs": ["a", "b"], "minbatchsize": 0, + "minbatchtimeout": 0 "outputs": ["mul"], "tag": "v1.0", }, @@ -261,6 +262,7 @@ def test_modelget_meta(self): "device": "cpu", "inputs": ["a", "b"], "minbatchsize": 0, + "minbatchtimeout": 0, "outputs": ["mul"], "tag": "v1.0", }, From 034ced1533501f52a24b838ce88486ceb1aeaeda Mon Sep 17 00:00:00 2001 From: alonre24 Date: Wed, 19 May 2021 14:33:26 +0300 Subject: [PATCH 13/14] Add missing comma... 
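For context on the test change above: once the server reports MINBATCHTIMEOUT, the modelget metadata gains one more field, so a freshly stored TF model is expected to look like this (a sketch mirroring the assertions in the tests):

    con.modelget("m", meta_only=True)
    # {'backend': 'TF', 'batchsize': 0, 'device': 'cpu', 'inputs': ['a', 'b'],
    #  'minbatchsize': 0, 'minbatchtimeout': 0, 'outputs': ['mul'], 'tag': 'v1.0'}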
From 034ced1533501f52a24b838ce88486ceb1aeaeda Mon Sep 17 00:00:00 2001
From: alonre24
Date: Wed, 19 May 2021 14:33:26 +0300
Subject: [PATCH 13/14] Add missing comma...
---
 test/test.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/test/test.py b/test/test.py
index 48e0036..506c1ce 100644
--- a/test/test.py
+++ b/test/test.py
@@ -153,7 +153,7 @@ def test_deprecated_modelset(self):
                 "device": "cpu",
                 "inputs": ["a", "b"],
                 "minbatchsize": 0,
-                "minbatchtimeout": 0
+                "minbatchtimeout": 0,
                 "outputs": ["mul"],
                 "tag": "v1.0",
             },

From 40e50a88f4feffc57ddfc682b6ee1bdca3a03df3 Mon Sep 17 00:00:00 2001
From: alonre24
Date: Wed, 19 May 2021 15:53:03 +0300
Subject: [PATCH 14/14] Do not update the version yet
---
 setup.py | 5 +++--
 1 file changed, 3 insertions(+), 2 deletions(-)

diff --git a/setup.py b/setup.py
index eefac3f..f5de6bb 100644
--- a/setup.py
+++ b/setup.py
@@ -1,12 +1,13 @@
 #!/usr/bin/env python
-from setuptools import find_packages, setup
+from setuptools import setup, find_packages
+
 
 with open("README.rst") as f:
     long_description = f.read()
 
 setup(
     name="redisai",
-    version="1.2.0",
+    version="1.0.2",
     description="RedisAI Python Client",
     long_description=long_description,
     long_description_content_type="text/x-rst",
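Taken together, the series leaves the client with the following end-to-end flow (a sketch assuming the same two-input TF graph used throughout the tests, which multiplies its inputs):

    con.modelstore("m", "tf", "cpu", model_pb,
                   inputs=["a", "b"], outputs=["mul"], tag="v1.0")
    con.tensorset("a", (2, 3), dtype="float")
    con.tensorset("b", (2, 3), dtype="float")
    con.modelexecute("m", ["a", "b"], ["c"], timeout=1000)
    con.tensorget("c")  # -> array([4., 9.], dtype=float32)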