Skip to content

Commit

Permalink
tags backward compatibility in executor (#1481)
Browse files Browse the repository at this point in the history
* Fix tags propagation with Executor, closes #1474

* fix broken log formatting

* apply style fix and increase flakiness
  • Loading branch information
RafalSkolasinski authored Mar 4, 2020
1 parent a771afb commit 5eb177c
Show file tree
Hide file tree
Showing 24 changed files with 904 additions and 24 deletions.
40 changes: 32 additions & 8 deletions python/seldon_core/seldon_methods.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,14 +62,16 @@ def predict(
client_response = client_predict(
user_model, features, datadef.names, meta=meta
)
return construct_response(user_model, False, request, client_response)
return construct_response(user_model, False, request, client_response, meta)
else:
(features, meta, datadef, data_type) = extract_request_parts_json(request)
class_names = datadef["names"] if datadef and "names" in datadef else []
client_response = client_predict(
user_model, features, class_names, meta=meta
)
return construct_response_json(user_model, False, request, client_response)
return construct_response_json(
user_model, False, request, client_response, meta
)


def send_feedback(
Expand Down Expand Up @@ -162,14 +164,16 @@ def transform_input(
client_response = client_transform_input(
user_model, features, datadef.names, meta=meta
)
return construct_response(user_model, False, request, client_response)
return construct_response(user_model, False, request, client_response, meta)
else:
(features, meta, datadef, data_type) = extract_request_parts_json(request)
class_names = datadef["names"] if datadef and "names" in datadef else []
client_response = client_transform_input(
user_model, features, class_names, meta=meta
)
return construct_response_json(user_model, False, request, client_response)
return construct_response_json(
user_model, False, request, client_response, meta
)


def transform_output(
Expand Down Expand Up @@ -213,14 +217,16 @@ def transform_output(
client_response = client_transform_output(
user_model, features, datadef.names, meta=meta
)
return construct_response(user_model, False, request, client_response)
return construct_response(user_model, False, request, client_response, meta)
else:
(features, meta, datadef, data_type) = extract_request_parts_json(request)
class_names = datadef["names"] if datadef and "names" in datadef else []
client_response = client_transform_output(
user_model, features, class_names, meta=meta
)
return construct_response_json(user_model, False, request, client_response)
return construct_response_json(
user_model, False, request, client_response, meta
)


def route(
Expand Down Expand Up @@ -296,6 +302,14 @@ def aggregate(
Aggregated SeldonMessage proto
"""

def merge_meta(meta_list):
tags = {}
for meta in meta_list:
if meta:
tags.update(meta.get("tags", {}))
return {"tags": tags}

is_proto = isinstance(request, prediction_pb2.SeldonMessageList)

if hasattr(user_model, "aggregate_rest"):
Expand All @@ -314,15 +328,21 @@ def aggregate(
if is_proto:
features_list = []
names_list = []
meta_list = []

for msg in request.seldonMessages:
(features, meta, datadef, data_type) = extract_request_parts(msg)
features_list.append(features)
names_list.append(datadef.names)
meta_list.append(meta)

client_response = client_aggregate(user_model, features_list, names_list)
return construct_response(
user_model, False, request.seldonMessages[0], client_response
user_model,
False,
request.seldonMessages[0],
client_response,
merge_meta(meta_list),
)
else:
features_list = []
Expand All @@ -339,14 +359,18 @@ def aggregate(
f"Invalid request data type: {request}"
)

meta_list = []
for msg in msgs:
(features, meta, datadef, data_type) = extract_request_parts_json(msg)
class_names = datadef["names"] if datadef and "names" in datadef else []
features_list.append(features)
names_list.append(class_names)
meta_list.append(meta)

client_response = client_aggregate(user_model, features_list, names_list)
return construct_response_json(user_model, False, msgs[0], client_response)
return construct_response_json(
user_model, False, msgs[0], client_response, merge_meta(meta_list)
)


def health_status(user_model: Any) -> Union[prediction_pb2.SeldonMessage, List, Dict]:
Expand Down
33 changes: 24 additions & 9 deletions python/seldon_core/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -322,6 +322,7 @@ def construct_response_json(
is_request: bool,
client_request_raw: Union[List, Dict],
client_raw_response: Union[np.ndarray, str, bytes, dict],
meta: dict = None,
) -> Union[List, Dict]:
"""
This class converts a raw REST response into a JSON object that has the same structure as
Expand Down Expand Up @@ -410,8 +411,13 @@ def construct_response_json(
response["data"]["names"] = names

response["meta"] = {}
client_custom_tags(user_model)
tags = client_custom_tags(user_model)
if meta:
tags = meta.get("tags", {})
else:
tags = {}
custom_tags = client_custom_tags(user_model)
if custom_tags:
tags.update(custom_tags)
if tags:
response["meta"]["tags"] = tags
metrics = client_custom_metrics(user_model)
Expand All @@ -429,6 +435,7 @@ def construct_response(
is_request: bool,
client_request: prediction_pb2.SeldonMessage,
client_raw_response: Union[np.ndarray, str, bytes, dict],
meta: dict = None,
) -> prediction_pb2.SeldonMessage:
"""
Expand All @@ -449,18 +456,26 @@ def construct_response(
"""
data_type = client_request.WhichOneof("data_oneof")
meta = prediction_pb2.Meta()
meta_pb = prediction_pb2.Meta()
meta_json: Dict = {}
tags = client_custom_tags(user_model)

if meta:
tags = meta.get("tags", {})
else:
tags = {}
custom_tags = client_custom_tags(user_model)
if custom_tags:
tags.update(custom_tags)
if tags:
meta_json["tags"] = tags

metrics = client_custom_metrics(user_model)
if metrics:
meta_json["metrics"] = metrics
if client_request.meta:
if client_request.meta.puid:
meta_json["puid"] = client_request.meta.puid
json_format.ParseDict(meta_json, meta)
json_format.ParseDict(meta_json, meta_pb)
if isinstance(client_raw_response, np.ndarray) or isinstance(
client_raw_response, list
):
Expand All @@ -482,16 +497,16 @@ def construct_response(
else:
default_data_type = "ndarray"
data = array_to_grpc_datadef(default_data_type, client_raw_response, names)
return prediction_pb2.SeldonMessage(data=data, meta=meta)
return prediction_pb2.SeldonMessage(data=data, meta=meta_pb)
elif isinstance(client_raw_response, str):
return prediction_pb2.SeldonMessage(strData=client_raw_response, meta=meta)
return prediction_pb2.SeldonMessage(strData=client_raw_response, meta=meta_pb)
elif isinstance(client_raw_response, dict):
jsonDataResponse = ParseDict(
client_raw_response, prediction_pb2.SeldonMessage().jsonData
)
return prediction_pb2.SeldonMessage(jsonData=jsonDataResponse, meta=meta)
return prediction_pb2.SeldonMessage(jsonData=jsonDataResponse, meta=meta_pb)
elif isinstance(client_raw_response, (bytes, bytearray)):
return prediction_pb2.SeldonMessage(binData=client_raw_response, meta=meta)
return prediction_pb2.SeldonMessage(binData=client_raw_response, meta=meta_pb)
else:
raise SeldonMicroserviceException(
"Unknown data type returned as payload:" + client_raw_response
Expand Down
67 changes: 67 additions & 0 deletions python/tests/test_combiner_microservice.py
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,33 @@ def test_aggreate_ok_seldon_messages():
assert j["data"]["ndarray"] == [1]


def test_aggreate_combines_tags():
user_object = UserObject()
app = get_rest_microservice(user_object)
client = app.test_client()
msgs = (
"["
'{"meta":{"tags":{"input-1":"yes","common":1}}, "data":{"ndarray":[0]}}, '
'{"meta":{"tags":{"input-2":"yes","common":2}}, "data":{"ndarray":[1]}}'
"]"
)
# Note: double "{{}}" used to escape for string formatting
rv = client.get('/aggregate?json={{"seldonMessages":{}}}'.format(msgs))
logging.info(rv)
j = json.loads(rv.data)
logging.info(j)
assert rv.status_code == 200
assert j["meta"]["tags"] == {
"common": 2,
"input-1": "yes",
"input-2": "yes",
"mytag": 1,
}
assert j["meta"]["metrics"][0]["key"] == user_object.metrics()[0]["key"]
assert j["meta"]["metrics"][0]["value"] == user_object.metrics()[0]["value"]
assert j["data"]["ndarray"] == [0]


def test_aggreate_ok_list():
user_object = UserObject()
app = get_rest_microservice(user_object)
Expand Down Expand Up @@ -265,6 +292,46 @@ def test_aggregate_proto_ok():
assert j["data"]["tensor"]["values"] == [1, 2]


def test_aggregate_proto_combines_tags():
user_object = UserObject()
app = SeldonModelGRPC(user_object)

arr1 = np.array([1, 2])
meta1 = prediction_pb2.Meta()
json_format.ParseDict({"tags": {"input-1": "yes", "common": 1}}, meta1)
datadef1 = prediction_pb2.DefaultData(
tensor=prediction_pb2.Tensor(shape=(2, 1), values=arr1)
)

arr2 = np.array([3, 4])
meta2 = prediction_pb2.Meta()
json_format.ParseDict({"tags": {"input-2": "yes", "common": 2}}, meta2)
datadef2 = prediction_pb2.DefaultData(
tensor=prediction_pb2.Tensor(shape=(2, 1), values=arr2)
)

msg1 = prediction_pb2.SeldonMessage(data=datadef1, meta=meta1)
msg2 = prediction_pb2.SeldonMessage(data=datadef2, meta=meta2)
request = prediction_pb2.SeldonMessageList(seldonMessages=[msg1, msg2])
resp = app.Aggregate(request, None)
jStr = json_format.MessageToJson(resp)
j = json.loads(jStr)
logging.info(j)

assert j["meta"]["tags"] == {
"common": 2,
"input-1": "yes",
"input-2": "yes",
"mytag": 1,
}

# add default type
assert j["meta"]["metrics"][0]["key"] == user_object.metrics()[0]["key"]
assert j["meta"]["metrics"][0]["value"] == user_object.metrics()[0]["value"]
assert j["data"]["tensor"]["shape"] == [2, 1]
assert j["data"]["tensor"]["values"] == [1, 2]


def test_aggregate_proto_bin_data():
user_object = UserObject()
app = SeldonModelGRPC(user_object)
Expand Down
46 changes: 44 additions & 2 deletions python/tests/test_microservice.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import numpy as np
import signal
import unittest.mock as mock
from google.protobuf import json_format


@contextmanager
Expand Down Expand Up @@ -54,8 +55,8 @@ def start_microservice(app_location, tracing=False, grpc=False, envs={}):
)
if tracing:
cmd = cmd + ("--tracing",)
logging.info("starting:", " ".join(cmd))
logging.info("cwd:", app_location)
logging.info("starting: %s", " ".join(cmd))
logging.info("cwd: %s", app_location)
# stdout=PIPE, stderr=PIPE,
p = Popen(cmd, cwd=app_location, env=env_vars, preexec_fn=os.setsid)

Expand Down Expand Up @@ -100,6 +101,22 @@ def test_model_template_app_rest(tracing):
assert response.json() == {"data": {"ndarray": []}, "meta": {}}


@pytest.mark.parametrize("tracing", [(False), (True)])
def test_model_template_app_rest_tags(tracing):
with start_microservice(
join(dirname(__file__), "model-template-app"), tracing=tracing
):
data = '{"meta":{"tags":{"foo":"bar"}},"data":{"names":["a","b"],"ndarray":[[1.0,2.0]]}}'
response = requests.get(
"http://127.0.0.1:5000/predict", params="json=%s" % data
)
response.raise_for_status()
assert response.json() == {
"data": {"names": ["t:0", "t:1"], "ndarray": [[1.0, 2.0]]},
"meta": {"tags": {"foo": "bar"}},
}


@pytest.mark.parametrize("tracing", [(False), (True)])
def test_model_template_app_rest_submodule(tracing):
with start_microservice(
Expand Down Expand Up @@ -154,6 +171,31 @@ def test_model_template_app_grpc(tracing):
response = stub.SendFeedback(request=request)


@pytest.mark.parametrize("tracing", [(False), (True)])
def test_model_template_app_grpc_tags(tracing):
with start_microservice(
join(dirname(__file__), "model-template-app"), tracing=tracing, grpc=True
):
data = np.array([[1, 2]])
datadef = prediction_pb2.DefaultData(
tensor=prediction_pb2.Tensor(shape=data.shape, values=data.flatten())
)

meta = prediction_pb2.Meta()
json_format.ParseDict({"tags": {"foo": "bar"}}, meta)

request = prediction_pb2.SeldonMessage(data=datadef, meta=meta)
channel = grpc.insecure_channel("0.0.0.0:5000")
stub = prediction_pb2_grpc.ModelStub(channel)
response = stub.Predict(request=request)
assert response.data.tensor.shape[0] == 1
assert response.data.tensor.shape[1] == 2
assert response.data.tensor.values[0] == 1
assert response.data.tensor.values[1] == 2

assert response.meta.tags["foo"].string_value == "bar"


def test_model_template_app_tracing_config():
envs = {
"JAEGER_CONFIG_PATH": join(dirname(__file__), "tracing_config/tracing.yaml")
Expand Down
Loading

0 comments on commit 5eb177c

Please sign in to comment.