Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
139 changes: 133 additions & 6 deletions redisai/client.py
Original file line number Diff line number Diff line change
@@ -1,16 +1,16 @@
from functools import wraps, partial
from typing import Union, AnyStr, ByteString, List, Sequence, Any
import warnings
from functools import partial, wraps
from typing import Any, AnyStr, ByteString, List, Sequence, Union

from redis import StrictRedis
import numpy as np
from deprecated import deprecated
from redis import StrictRedis

from redisai import command_builder as builder
from redisai.dag import Dag
from redisai.pipeline import Pipeline
from redisai.postprocessor import Processor


processor = Processor()


Expand Down Expand Up @@ -96,7 +96,7 @@ def dag(
-------
>>> con.tensorset('tensor', ...)
'OK'
>>> con.modelset('model', ...)
>>> con.modelstore('model', ...)
'OK'
>>> dag = con.dag(load=['tensor'], persist=['output'])
>>> dag.tensorset('another', ...)
Expand Down Expand Up @@ -136,6 +136,83 @@ def loadbackend(self, identifier: AnyStr, path: AnyStr) -> str:
res = self.execute_command(*args)
return res if not self.enable_postprocess else processor.loadbackend(res)

def modelstore(
self,
key: AnyStr,
backend: str,
device: str,
data: ByteString,
batch: int = None,
minbatch: int = None,
minbatchtimeout: int = None,
tag: AnyStr = None,
inputs: Union[AnyStr, List[AnyStr]] = None,
outputs: Union[AnyStr, List[AnyStr]] = None,
) -> str:
"""
Set the model on provided key.

Parameters
----------
key : AnyStr
Key name
backend : str
Backend name. Allowed backends are TF, TORCH, TFLITE, ONNX
device : str
Device name. Allowed devices are CPU and GPU. If multiple GPUs are available,
it can be specified using the format GPU:<gpu number>. For example: GPU:0
data : bytes
Model graph read as bytes string
batch : int
Number of batches for doing auto-batching
minbatch : int
Minimum number of samples required in a batch for model execution
minbatchtimeout : int
The max number of miliseconds for which the engine will not trigger an execution
if the number of samples is lower than minbatch (after minbatchtimeout is passed,
the execution will start even if minbatch jas not reached)
tag : AnyStr
Any string that will be saved in RedisAI as tag for the model
inputs : Union[AnyStr, List[AnyStr]]
Input node(s) in the graph. Required only Tensorflow graphs
outputs : Union[AnyStr, List[AnyStr]]
Output node(s) in the graph Required only for Tensorflow graphs

Returns
-------
str
'OK' if success, raise an exception otherwise

Example
-------
>>> # Torch model
>>> model_path = os.path.join('path/to/TorchScriptModel.pt')
>>> model = open(model_path, 'rb').read()
>>> con.modeltore("model", 'torch', 'cpu', model, tag='v1.0')
'OK'
>>> # Tensorflow model
>>> model_path = os.path.join('/path/to/tf_frozen_graph.pb')
>>> model = open(model_path, 'rb').read()
>>> con.modelstore('m', 'tf', 'cpu', model,
... inputs=['a', 'b'], outputs=['mul'], tag='v1.0')
'OK'
"""
args = builder.modelstore(
key,
backend,
device,
data,
batch,
minbatch,
minbatchtimeout,
tag,
inputs,
outputs,
)
res = self.execute_command(*args)
return res if not self.enable_postprocess else processor.modelstore(res)

@deprecated(version="1.2.0", reason="Use modelstore instead")
def modelset(
self,
key: AnyStr,
Expand Down Expand Up @@ -247,6 +324,56 @@ def modeldel(self, key: AnyStr) -> str:
res = self.execute_command(*args)
return res if not self.enable_postprocess else processor.modeldel(res)

def modelexecute(
self,
key: AnyStr,
inputs: Union[AnyStr, List[AnyStr]],
outputs: Union[AnyStr, List[AnyStr]],
timeout: int = None,
) -> str:
"""
Run the model using input(s) which are already in the scope and are associated
to some keys. Modelexecute also needs the output key name(s) to store the output
from the model. The number of outputs from the model and the number of keys
provided here must be same. Otherwise, RedisAI throws an error

Parameters
----------
key : str
Model key to run
inputs : Union[AnyStr, List[AnyStr]]
Tensor(s) which is already saved in the RedisAI using a tensorset call. These
tensors will be used as the inputs for the modelexecute
outputs : Union[AnyStr, List[AnyStr]]
keys on which the outputs to be saved. If those keys exist already,
modelexecute will overwrite them with new values
timeout : int
The max number on milisecinds that may pass before the request is prossced
(meaning that the result will not be computed after that time and TIMEDOUT
is returned in that case

Returns
-------
str
'OK' if success, raise an exception otherwise

Example
-------
>>> con.modelstore('m', 'tf', 'cpu', model_pb,
... inputs=['a', 'b'], outputs=['mul'], tag='v1.0')
'OK'
>>> con.tensorset('a', (2, 3), dtype='float')
'OK'
>>> con.tensorset('b', (2, 3), dtype='float')
'OK'
>>> con.modelexecute('m', ['a', 'b'], ['c'])
'OK'
"""
args = builder.modelexecute(key, inputs, outputs, timeout)
res = self.execute_command(*args)
return res if not self.enable_postprocess else processor.modelexecute(res)

@deprecated(version="1.2.0", reason="Use modelexecute instead")
def modelrun(
self,
key: AnyStr,
Expand Down Expand Up @@ -277,7 +404,7 @@ def modelrun(

Example
-------
>>> con.modelset('m', 'tf', 'cpu', model_pb,
>>> con.modelstore('m', 'tf', 'cpu', model_pb,
... inputs=['a', 'b'], outputs=['mul'], tag='v1.0')
'OK'
>>> con.tensorset('a', (2, 3), dtype='float')
Expand Down
98 changes: 94 additions & 4 deletions redisai/command_builder.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
from typing import Union, AnyStr, ByteString, List, Sequence
from typing import AnyStr, ByteString, List, Sequence, Union

import numpy as np

from . import utils

# TODO: mypy check
Expand All @@ -9,29 +11,90 @@ def loadbackend(identifier: AnyStr, path: AnyStr) -> Sequence:
return "AI.CONFIG LOADBACKEND", identifier, path


def modelset(
def modelstore(
name: AnyStr,
backend: str,
device: str,
data: ByteString,
batch: int,
minbatch: int,
minbatchtimeout: int,
tag: AnyStr,
inputs: Union[AnyStr, List[AnyStr]],
outputs: Union[AnyStr, List[AnyStr]],
) -> Sequence:
if name is None:
raise ValueError("Model name was not given")
if device.upper() not in utils.allowed_devices:
raise ValueError(f"Device not allowed. Use any from {utils.allowed_devices}")
if backend.upper() not in utils.allowed_backends:
raise ValueError(f"Backend not allowed. Use any from {utils.allowed_backends}")
args = ["AI.MODELSET", name, backend, device]
args = ["AI.MODELSTORE", name, backend, device]

if tag is not None:
args += ["TAG", tag]
if batch is not None:
args += ["BATCHSIZE", batch]
if minbatch is not None:
if batch is None:
raise ValueError("Minbatch is not allowed without batch")
args += ["MINBATCHSIZE", minbatch]
if minbatchtimeout is not None:
if minbatch is None:
raise ValueError("Minbatchtimeout is not allowed without minbatch")
args += ["MINBATCHTIMEOUT", minbatchtimeout]

if backend.upper() == "TF":
if not all((inputs, outputs)):
raise ValueError(
"Require keyword arguments inputs and outputs for TF models"
)
args += [
"INPUTS",
len(inputs) if isinstance(inputs, List) else 1,
*utils.listify(inputs),
]
args += [
"OUTPUTS",
len(outputs) if isinstance(outputs, List) else 1,
*utils.listify(outputs),
]
elif inputs is not None or outputs is not None:
raise ValueError(
"Inputs and outputs keywords should not be specified for this backend"
)
chunk_size = 500 * 1024 * 1024 # TODO: this should be configurable.
data_chunks = [data[i : i + chunk_size] for i in range(0, len(data), chunk_size)]
# TODO: need a test case for this
args += ["BLOB", *data_chunks]
return args


def modelset(
name: AnyStr,
backend: str,
device: str,
data: ByteString,
batch: int,
minbatch: int,
tag: AnyStr,
inputs: Union[AnyStr, List[AnyStr]],
outputs: Union[AnyStr, List[AnyStr]],
) -> Sequence:
if device.upper() not in utils.allowed_devices:
raise ValueError(f"Device not allowed. Use any from {utils.allowed_devices}")
if backend.upper() not in utils.allowed_backends:
raise ValueError(f"Backend not allowed. Use any from {utils.allowed_backends}")
args = ["AI.MODELSET", name, backend, device]

if tag is not None:
args += ["TAG", tag]
if batch is not None:
args += ["BATCHSIZE", batch]
if minbatch is not None:
if batch is None:
raise ValueError("Minbatch is not allowed without batch")
args += ["MINBATCHSIZE", minbatch]

if backend.upper() == "TF":
if not (all((inputs, outputs))):
Expand All @@ -56,7 +119,34 @@ def modeldel(name: AnyStr) -> Sequence:
return "AI.MODELDEL", name


def modelrun(name: AnyStr, inputs: List[AnyStr], outputs: List[AnyStr]) -> Sequence:
def modelexecute(
name: AnyStr,
inputs: Union[AnyStr, List[AnyStr]],
outputs: Union[AnyStr, List[AnyStr]],
timeout: int,
) -> Sequence:
if name is None or inputs is None or outputs is None:
raise ValueError("Missing required arguments for model execute command")
args = [
"AI.MODELEXECUTE",
name,
"INPUTS",
len(utils.listify(inputs)),
*utils.listify(inputs),
"OUTPUTS",
len(utils.listify(outputs)),
*utils.listify(outputs),
]
if timeout is not None:
args += ["TIMEOUT", timeout]
return args


def modelrun(
name: AnyStr,
inputs: Union[AnyStr, List[AnyStr]],
outputs: Union[AnyStr, List[AnyStr]],
) -> Sequence:
args = (
"AI.MODELRUN",
name,
Expand Down
5 changes: 2 additions & 3 deletions redisai/dag.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,10 @@
from functools import partial
from typing import AnyStr, Union, Sequence, Any, List
from typing import Any, AnyStr, List, Sequence, Union

import numpy as np

from redisai.postprocessor import Processor
from redisai import command_builder as builder

from redisai.postprocessor import Processor

processor = Processor()

Expand Down
5 changes: 2 additions & 3 deletions redisai/pipeline.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,13 @@
import warnings
from functools import partial
from typing import AnyStr, Union, Sequence
from typing import AnyStr, Sequence, Union

import numpy as np
import redis

from redisai import command_builder as builder
import redis
from redisai.postprocessor import Processor


processor = Processor()


Expand Down
2 changes: 2 additions & 0 deletions redisai/postprocessor.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,8 +63,10 @@ def infoget(res):
decoder = staticmethod(decoder)
decoding_functions = (
"loadbackend",
"modelstore",
"modelset",
"modeldel",
"modelexecute",
"modelrun",
"tensorset",
"scriptset",
Expand Down
4 changes: 2 additions & 2 deletions redisai/utils.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
from typing import Union, ByteString, Sequence, List, AnyStr, Callable
import numpy as np
from typing import AnyStr, ByteString, Callable, List, Sequence, Union

import numpy as np

dtype_dict = {
"float": "FLOAT",
Expand Down
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
author="RedisLabs",
author_email="oss@redislabs.com",
packages=find_packages(),
install_requires=["redis", "hiredis", "numpy"],
install_requires=["redis", "hiredis", "numpy", "deprecated"],
python_requires=">=3.6",
classifiers=[
"Development Status :: 4 - Beta",
Expand Down
1 change: 1 addition & 0 deletions test-requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -6,3 +6,4 @@ nose
codecov
numpy
ml2rt
deprecated
Loading