diff --git a/python/seldon_core/seldon_methods.py b/python/seldon_core/seldon_methods.py
index 7dc27e6fb3..860a8b71fb 100644
--- a/python/seldon_core/seldon_methods.py
+++ b/python/seldon_core/seldon_methods.py
@@ -62,14 +62,16 @@ def predict(
         client_response = client_predict(
             user_model, features, datadef.names, meta=meta
         )
-        return construct_response(user_model, False, request, client_response)
+        return construct_response(user_model, False, request, client_response, meta)
     else:
         (features, meta, datadef, data_type) = extract_request_parts_json(request)
         class_names = datadef["names"] if datadef and "names" in datadef else []
         client_response = client_predict(
             user_model, features, class_names, meta=meta
         )
-        return construct_response_json(user_model, False, request, client_response)
+        return construct_response_json(
+            user_model, False, request, client_response, meta
+        )
 
 
 def send_feedback(
@@ -162,14 +164,16 @@ def transform_input(
         client_response = client_transform_input(
             user_model, features, datadef.names, meta=meta
         )
-        return construct_response(user_model, False, request, client_response)
+        return construct_response(user_model, False, request, client_response, meta)
     else:
         (features, meta, datadef, data_type) = extract_request_parts_json(request)
         class_names = datadef["names"] if datadef and "names" in datadef else []
         client_response = client_transform_input(
             user_model, features, class_names, meta=meta
         )
-        return construct_response_json(user_model, False, request, client_response)
+        return construct_response_json(
+            user_model, False, request, client_response, meta
+        )
 
 
 def transform_output(
@@ -213,14 +217,16 @@ def transform_output(
         client_response = client_transform_output(
             user_model, features, datadef.names, meta=meta
         )
-        return construct_response(user_model, False, request, client_response)
+        return construct_response(user_model, False, request, client_response, meta)
     else:
         (features, meta, datadef, data_type) = extract_request_parts_json(request)
         class_names = datadef["names"] if datadef and "names" in datadef else []
         client_response = client_transform_output(
             user_model, features, class_names, meta=meta
         )
-        return construct_response_json(user_model, False, request, client_response)
+        return construct_response_json(
+            user_model, False, request, client_response, meta
+        )
 
 
 def route(
@@ -296,6 +302,14 @@ def aggregate(
         Aggregated SeldonMessage proto
 
     """
+
+    def merge_meta(meta_list):
+        tags = {}
+        for meta in meta_list:
+            if meta:
+                tags.update(meta.get("tags", {}))
+        return {"tags": tags}
+
     is_proto = isinstance(request, prediction_pb2.SeldonMessageList)
 
     if hasattr(user_model, "aggregate_rest"):
@@ -314,15 +328,21 @@ def aggregate(
     if is_proto:
         features_list = []
         names_list = []
+        meta_list = []
         for msg in request.seldonMessages:
             (features, meta, datadef, data_type) = extract_request_parts(msg)
             features_list.append(features)
             names_list.append(datadef.names)
+            meta_list.append(meta)
 
         client_response = client_aggregate(user_model, features_list, names_list)
 
         return construct_response(
-            user_model, False, request.seldonMessages[0], client_response
+            user_model,
+            False,
+            request.seldonMessages[0],
+            client_response,
+            merge_meta(meta_list),
         )
     else:
         features_list = []
@@ -339,14 +359,18 @@ def aggregate(
                 f"Invalid request data type: {request}"
            )
 
+        meta_list = []
         for msg in msgs:
             (features, meta, datadef, data_type) = extract_request_parts_json(msg)
             class_names = datadef["names"] if datadef and "names" in datadef else []
             features_list.append(features)
             names_list.append(class_names)
+            meta_list.append(meta)
 
         client_response = client_aggregate(user_model, features_list, names_list)
-        return construct_response_json(user_model, False, msgs[0], client_response)
+        return construct_response_json(
+            user_model, False, msgs[0], client_response, merge_meta(meta_list)
+        )
 
 
 def health_status(user_model: Any) -> Union[prediction_pb2.SeldonMessage, List, Dict]:
diff --git a/python/seldon_core/utils.py b/python/seldon_core/utils.py
index 4bd212f325..ce8ae63d70 100644
--- a/python/seldon_core/utils.py
+++ b/python/seldon_core/utils.py
@@ -322,6 +322,7 @@ def construct_response_json(
     is_request: bool,
     client_request_raw: Union[List, Dict],
     client_raw_response: Union[np.ndarray, str, bytes, dict],
+    meta: dict = None,
 ) -> Union[List, Dict]:
     """
     This class converts a raw REST response into a JSON object that has the same structure as
@@ -410,8 +411,13 @@ def construct_response_json(
             response["data"]["names"] = names
 
     response["meta"] = {}
-    client_custom_tags(user_model)
-    tags = client_custom_tags(user_model)
+    if meta:
+        tags = meta.get("tags", {})
+    else:
+        tags = {}
+    custom_tags = client_custom_tags(user_model)
+    if custom_tags:
+        tags.update(custom_tags)
     if tags:
         response["meta"]["tags"] = tags
     metrics = client_custom_metrics(user_model)
@@ -429,6 +435,7 @@ def construct_response(
     is_request: bool,
     client_request: prediction_pb2.SeldonMessage,
     client_raw_response: Union[np.ndarray, str, bytes, dict],
+    meta: dict = None,
 ) -> prediction_pb2.SeldonMessage:
     """
 
@@ -449,18 +456,26 @@ def construct_response(
     """
     data_type = client_request.WhichOneof("data_oneof")
 
-    meta = prediction_pb2.Meta()
+    meta_pb = prediction_pb2.Meta()
     meta_json: Dict = {}
-    tags = client_custom_tags(user_model)
+
+    if meta:
+        tags = meta.get("tags", {})
+    else:
+        tags = {}
+    custom_tags = client_custom_tags(user_model)
+    if custom_tags:
+        tags.update(custom_tags)
     if tags:
         meta_json["tags"] = tags
+
     metrics = client_custom_metrics(user_model)
     if metrics:
         meta_json["metrics"] = metrics
     if client_request.meta:
         if client_request.meta.puid:
             meta_json["puid"] = client_request.meta.puid
-    json_format.ParseDict(meta_json, meta)
+    json_format.ParseDict(meta_json, meta_pb)
     if isinstance(client_raw_response, np.ndarray) or isinstance(
         client_raw_response, list
     ):
@@ -482,16 +497,16 @@ def construct_response(
         else:
             default_data_type = "ndarray"
         data = array_to_grpc_datadef(default_data_type, client_raw_response, names)
-        return prediction_pb2.SeldonMessage(data=data, meta=meta)
+        return prediction_pb2.SeldonMessage(data=data, meta=meta_pb)
     elif isinstance(client_raw_response, str):
-        return prediction_pb2.SeldonMessage(strData=client_raw_response, meta=meta)
+        return prediction_pb2.SeldonMessage(strData=client_raw_response, meta=meta_pb)
     elif isinstance(client_raw_response, dict):
         jsonDataResponse = ParseDict(
             client_raw_response, prediction_pb2.SeldonMessage().jsonData
         )
-        return prediction_pb2.SeldonMessage(jsonData=jsonDataResponse, meta=meta)
+        return prediction_pb2.SeldonMessage(jsonData=jsonDataResponse, meta=meta_pb)
     elif isinstance(client_raw_response, (bytes, bytearray)):
-        return prediction_pb2.SeldonMessage(binData=client_raw_response, meta=meta)
+        return prediction_pb2.SeldonMessage(binData=client_raw_response, meta=meta_pb)
     else:
         raise SeldonMicroserviceException(
             "Unknown data type returned as payload:" + client_raw_response
diff --git a/python/tests/test_combiner_microservice.py b/python/tests/test_combiner_microservice.py
index 0d238b5019..30892d645b 100644
--- a/python/tests/test_combiner_microservice.py
+++ b/python/tests/test_combiner_microservice.py @@ -96,6 +96,33 @@ def test_aggreate_ok_seldon_messages(): assert j["data"]["ndarray"] == [1] +def test_aggreate_combines_tags(): + user_object = UserObject() + app = get_rest_microservice(user_object) + client = app.test_client() + msgs = ( + "[" + '{"meta":{"tags":{"input-1":"yes","common":1}}, "data":{"ndarray":[0]}}, ' + '{"meta":{"tags":{"input-2":"yes","common":2}}, "data":{"ndarray":[1]}}' + "]" + ) + # Note: double "{{}}" used to escape for string formatting + rv = client.get('/aggregate?json={{"seldonMessages":{}}}'.format(msgs)) + logging.info(rv) + j = json.loads(rv.data) + logging.info(j) + assert rv.status_code == 200 + assert j["meta"]["tags"] == { + "common": 2, + "input-1": "yes", + "input-2": "yes", + "mytag": 1, + } + assert j["meta"]["metrics"][0]["key"] == user_object.metrics()[0]["key"] + assert j["meta"]["metrics"][0]["value"] == user_object.metrics()[0]["value"] + assert j["data"]["ndarray"] == [0] + + def test_aggreate_ok_list(): user_object = UserObject() app = get_rest_microservice(user_object) @@ -265,6 +292,46 @@ def test_aggregate_proto_ok(): assert j["data"]["tensor"]["values"] == [1, 2] +def test_aggregate_proto_combines_tags(): + user_object = UserObject() + app = SeldonModelGRPC(user_object) + + arr1 = np.array([1, 2]) + meta1 = prediction_pb2.Meta() + json_format.ParseDict({"tags": {"input-1": "yes", "common": 1}}, meta1) + datadef1 = prediction_pb2.DefaultData( + tensor=prediction_pb2.Tensor(shape=(2, 1), values=arr1) + ) + + arr2 = np.array([3, 4]) + meta2 = prediction_pb2.Meta() + json_format.ParseDict({"tags": {"input-2": "yes", "common": 2}}, meta2) + datadef2 = prediction_pb2.DefaultData( + tensor=prediction_pb2.Tensor(shape=(2, 1), values=arr2) + ) + + msg1 = prediction_pb2.SeldonMessage(data=datadef1, meta=meta1) + msg2 = prediction_pb2.SeldonMessage(data=datadef2, meta=meta2) + request = prediction_pb2.SeldonMessageList(seldonMessages=[msg1, msg2]) + resp = app.Aggregate(request, None) + jStr = json_format.MessageToJson(resp) + j = json.loads(jStr) + logging.info(j) + + assert j["meta"]["tags"] == { + "common": 2, + "input-1": "yes", + "input-2": "yes", + "mytag": 1, + } + + # add default type + assert j["meta"]["metrics"][0]["key"] == user_object.metrics()[0]["key"] + assert j["meta"]["metrics"][0]["value"] == user_object.metrics()[0]["value"] + assert j["data"]["tensor"]["shape"] == [2, 1] + assert j["data"]["tensor"]["values"] == [1, 2] + + def test_aggregate_proto_bin_data(): user_object = UserObject() app = SeldonModelGRPC(user_object) diff --git a/python/tests/test_microservice.py b/python/tests/test_microservice.py index 548b2d6982..93067cfc74 100644 --- a/python/tests/test_microservice.py +++ b/python/tests/test_microservice.py @@ -15,6 +15,7 @@ import numpy as np import signal import unittest.mock as mock +from google.protobuf import json_format @contextmanager @@ -54,8 +55,8 @@ def start_microservice(app_location, tracing=False, grpc=False, envs={}): ) if tracing: cmd = cmd + ("--tracing",) - logging.info("starting:", " ".join(cmd)) - logging.info("cwd:", app_location) + logging.info("starting: %s", " ".join(cmd)) + logging.info("cwd: %s", app_location) # stdout=PIPE, stderr=PIPE, p = Popen(cmd, cwd=app_location, env=env_vars, preexec_fn=os.setsid) @@ -100,6 +101,22 @@ def test_model_template_app_rest(tracing): assert response.json() == {"data": {"ndarray": []}, "meta": {}} +@pytest.mark.parametrize("tracing", [(False), (True)]) +def test_model_template_app_rest_tags(tracing): + with 
start_microservice( + join(dirname(__file__), "model-template-app"), tracing=tracing + ): + data = '{"meta":{"tags":{"foo":"bar"}},"data":{"names":["a","b"],"ndarray":[[1.0,2.0]]}}' + response = requests.get( + "http://127.0.0.1:5000/predict", params="json=%s" % data + ) + response.raise_for_status() + assert response.json() == { + "data": {"names": ["t:0", "t:1"], "ndarray": [[1.0, 2.0]]}, + "meta": {"tags": {"foo": "bar"}}, + } + + @pytest.mark.parametrize("tracing", [(False), (True)]) def test_model_template_app_rest_submodule(tracing): with start_microservice( @@ -154,6 +171,31 @@ def test_model_template_app_grpc(tracing): response = stub.SendFeedback(request=request) +@pytest.mark.parametrize("tracing", [(False), (True)]) +def test_model_template_app_grpc_tags(tracing): + with start_microservice( + join(dirname(__file__), "model-template-app"), tracing=tracing, grpc=True + ): + data = np.array([[1, 2]]) + datadef = prediction_pb2.DefaultData( + tensor=prediction_pb2.Tensor(shape=data.shape, values=data.flatten()) + ) + + meta = prediction_pb2.Meta() + json_format.ParseDict({"tags": {"foo": "bar"}}, meta) + + request = prediction_pb2.SeldonMessage(data=datadef, meta=meta) + channel = grpc.insecure_channel("0.0.0.0:5000") + stub = prediction_pb2_grpc.ModelStub(channel) + response = stub.Predict(request=request) + assert response.data.tensor.shape[0] == 1 + assert response.data.tensor.shape[1] == 2 + assert response.data.tensor.values[0] == 1 + assert response.data.tensor.values[1] == 2 + + assert response.meta.tags["foo"].string_value == "bar" + + def test_model_template_app_tracing_config(): envs = { "JAEGER_CONFIG_PATH": join(dirname(__file__), "tracing_config/tracing.yaml") diff --git a/python/tests/test_model_microservice.py b/python/tests/test_model_microservice.py index 2771c0db61..5f6daf3ca4 100644 --- a/python/tests/test_model_microservice.py +++ b/python/tests/test_model_microservice.py @@ -607,12 +607,29 @@ def test_model_gets_meta(): rv = client.get('/predict?json={"meta":{"puid": "abc"},"data":{"ndarray":[]}}') j = json.loads(rv.data) logging.info(j) + assert rv.status_code == 200 assert j["meta"]["tags"] == {"inc_meta": {"puid": "abc"}} assert j["meta"]["metrics"][0]["key"] == user_object.metrics()[0]["key"] assert j["meta"]["metrics"][0]["value"] == user_object.metrics()[0]["value"] +def test_model_passes_through_tags(): + user_object = UserObject() + app = get_rest_microservice(user_object) + client = app.test_client() + rv = client.get( + '/predict?json={"meta":{"tags":{"foo":"bar"}},"data":{"ndarray":[]}}' + ) + j = json.loads(rv.data) + logging.info(j) + + assert rv.status_code == 200 + assert j["meta"]["tags"] == {"foo": "bar", "mytag": 1} + assert j["meta"]["metrics"][0]["key"] == user_object.metrics()[0]["key"] + assert j["meta"]["metrics"][0]["value"] == user_object.metrics()[0]["value"] + + def test_model_seldon_json_ok(): user_object = UserObject() app = get_rest_microservice(user_object) @@ -681,6 +698,27 @@ def test_proto_ok(): assert j["data"]["tensor"]["values"] == [1, 2] +def test_proto_passes_through_tags(): + user_object = UserObject() + app = SeldonModelGRPC(user_object) + arr = np.array([1, 2]) + datadef = prediction_pb2.DefaultData( + tensor=prediction_pb2.Tensor(shape=(2, 1), values=arr), + ) + meta = prediction_pb2.Meta() + json_format.ParseDict({"tags": {"foo": "bar"}}, meta) + request = prediction_pb2.SeldonMessage(data=datadef, meta=meta) + resp = app.Predict(request, None) + jStr = json_format.MessageToJson(resp) + j = json.loads(jStr) + 
logging.info(j) + assert j["meta"]["tags"] == {"foo": "bar", "mytag": 1} + assert j["meta"]["metrics"][0]["key"] == user_object.metrics()[0]["key"] + assert j["meta"]["metrics"][0]["value"] == user_object.metrics()[0]["value"] + assert j["data"]["tensor"]["shape"] == [2, 1] + assert j["data"]["tensor"]["values"] == [1, 2] + + def test_proto_lowlevel(): user_object = UserObjectLowLevelGrpc() app = SeldonModelGRPC(user_object) diff --git a/python/tests/test_router_microservice.py b/python/tests/test_router_microservice.py index ab806f9ca7..e2039b01e5 100644 --- a/python/tests/test_router_microservice.py +++ b/python/tests/test_router_microservice.py @@ -20,7 +20,7 @@ def __init__(self, metrics_ok=True, ret_meta=False): self.ret_meta = ret_meta def route(self, X, features_names, **kwargs): - print("Route called") + logging.info("Route called") if self.ret_meta: self.inc_meta = kwargs.get("meta") return 22 @@ -133,7 +133,7 @@ def test_router_gets_meta(): client = app.test_client() rv = client.get('/route?json={"meta":{"puid": "abc"}, "data":{"ndarray":[2]}}') j = json.loads(rv.data) - print(j) + logging.info(j) assert rv.status_code == 200 assert j["meta"]["tags"] == {"inc_meta": {"puid": "abc"}} assert j["meta"]["metrics"][0]["key"] == user_object.metrics()[0]["key"] @@ -146,7 +146,7 @@ def test_router_meta_to_nonmeta_model(): client = app.test_client() rv = client.get('/route?json={"meta":{"puid": "abc"}, "data":{"ndarray":[2]}}') j = json.loads(rv.data) - print(j) + logging.info(j) assert rv.status_code == 200 assert j["data"]["ndarray"] == [[22]] diff --git a/python/tests/test_transformer_microservice.py b/python/tests/test_transformer_microservice.py index 98e27213c4..c357f4aa02 100644 --- a/python/tests/test_transformer_microservice.py +++ b/python/tests/test_transformer_microservice.py @@ -318,6 +318,36 @@ def test_transform_output_gets_meta(): assert j["meta"]["metrics"][0]["value"] == user_object.metrics()[0]["value"] +def test_transform_input_passes_through_tags(): + user_object = UserObject() + app = get_rest_microservice(user_object) + client = app.test_client() + rv = client.get( + '/transform-input?json={"meta":{"tags":{"foo":"bar"}},"data":{"ndarray":[]}}' + ) + j = json.loads(rv.data) + logging.info(j) + assert rv.status_code == 200 + assert j["meta"]["tags"] == {"foo": "bar", "mytag": 1} + assert j["meta"]["metrics"][0]["key"] == user_object.metrics()[0]["key"] + assert j["meta"]["metrics"][0]["value"] == user_object.metrics()[0]["value"] + + +def test_transform_output_passes_through_tags(): + user_object = UserObject() + app = get_rest_microservice(user_object) + client = app.test_client() + rv = client.get( + '/transform-output?json={"meta":{"tags":{"foo":"bar"}},"data":{"ndarray":[]}}' + ) + j = json.loads(rv.data) + logging.info(j) + assert rv.status_code == 200 + assert j["meta"]["tags"] == {"foo": "bar", "mytag": 1} + assert j["meta"]["metrics"][0]["key"] == user_object.metrics()[0]["key"] + assert j["meta"]["metrics"][0]["value"] == user_object.metrics()[0]["value"] + + def test_transformer_output_ok(): user_object = UserObject() app = get_rest_microservice(user_object) @@ -586,6 +616,48 @@ def test_transform_output_proto_gets_meta(): assert j["data"]["tensor"]["values"] == [1, 2] +def test_transform_proto_input_passes_through_tags(): + user_object = UserObject() + app = SeldonModelGRPC(user_object) + arr = np.array([1, 2]) + datadef = prediction_pb2.DefaultData( + tensor=prediction_pb2.Tensor(shape=(2, 1), values=arr) + ) + meta = prediction_pb2.Meta() + 
json_format.ParseDict({"tags": {"foo": "bar"}}, meta) + request = prediction_pb2.SeldonMessage(data=datadef, meta=meta) + resp = app.TransformInput(request, None) + jStr = json_format.MessageToJson(resp) + j = json.loads(jStr) + logging.info(j) + assert j["meta"]["tags"] == {"foo": "bar", "mytag": 1} + assert j["meta"]["metrics"][0]["key"] == user_object.metrics()[0]["key"] + assert j["meta"]["metrics"][0]["value"] == user_object.metrics()[0]["value"] + assert j["data"]["tensor"]["shape"] == [2, 1] + assert j["data"]["tensor"]["values"] == [1, 2] + + +def test_transform_proto_output_passes_through_tags(): + user_object = UserObject() + app = SeldonModelGRPC(user_object) + arr = np.array([1, 2]) + datadef = prediction_pb2.DefaultData( + tensor=prediction_pb2.Tensor(shape=(2, 1), values=arr) + ) + meta = prediction_pb2.Meta() + json_format.ParseDict({"tags": {"foo": "bar"}}, meta) + request = prediction_pb2.SeldonMessage(data=datadef, meta=meta) + resp = app.TransformOutput(request, None) + jStr = json_format.MessageToJson(resp) + j = json.loads(jStr) + logging.info(j) + assert j["meta"]["tags"] == {"foo": "bar", "mytag": 1} + assert j["meta"]["metrics"][0]["key"] == user_object.metrics()[0]["key"] + assert j["meta"]["metrics"][0]["value"] == user_object.metrics()[0]["value"] + assert j["data"]["tensor"]["shape"] == [2, 1] + assert j["data"]["tensor"]["values"] == [1, 2] + + def test_unimplemented_transform_input_raw_on_seldon_component(): class CustomSeldonComponent(SeldonComponent): def transform_input(self, X, features_names, **kwargs): diff --git a/testing/resources/tags_combiner_grpc.json b/testing/resources/tags_combiner_grpc.json new file mode 100644 index 0000000000..6d070f6a90 --- /dev/null +++ b/testing/resources/tags_combiner_grpc.json @@ -0,0 +1,67 @@ +{ + "apiVersion": "machinelearning.seldon.io/v1alpha2", + "kind": "SeldonDeployment", + "metadata": { + "name": "mymodel-tags-combiner" + }, + "spec": { + "name": "mymodel-tags-combiner", + "oauth_key": "oauth-key", + "oauth_secret": "oauth-secret", + "predictors": [ + { + "componentSpecs": [ + { + "spec": { + "containers": [ + { + "image": "seldonio/test_tags_one_grpc:0.1", + "imagePullPolicy": "Never", + "name": "model-one" + }, + { + "image": "seldonio/test_tags_two_grpc:0.1", + "imagePullPolicy": "Never", + "name": "model-two" + }, + { + "image": "seldonio/test_tags_combiner_grpc:0.1", + "imagePullPolicy": "Never", + "name": "combiner" + } + ], + "terminationGracePeriodSeconds": 1 + } + } + ], + "graph": { + "children": [ + { + "children": [], + "endpoint": { + "type": "GRPC" + }, + "name": "model-one", + "type": "MODEL" + }, + { + "children": [], + "endpoint": { + "type": "GRPC" + }, + "name": "model-two", + "type": "MODEL" + } + ], + "endpoint": { + "type": "GRPC" + }, + "name": "combiner", + "type": "COMBINER" + }, + "name": "mymodel-tags-graph", + "replicas": 1 + } + ] + } +} diff --git a/testing/resources/tags_combiner_rest.json b/testing/resources/tags_combiner_rest.json new file mode 100644 index 0000000000..31cc393d47 --- /dev/null +++ b/testing/resources/tags_combiner_rest.json @@ -0,0 +1,67 @@ +{ + "apiVersion": "machinelearning.seldon.io/v1alpha2", + "kind": "SeldonDeployment", + "metadata": { + "name": "mymodel-tags-combiner" + }, + "spec": { + "name": "mymodel-tags-combiner", + "oauth_key": "oauth-key", + "oauth_secret": "oauth-secret", + "predictors": [ + { + "componentSpecs": [ + { + "spec": { + "containers": [ + { + "image": "seldonio/test_tags_one_rest:0.1", + "imagePullPolicy": "Never", + "name": "model-one" + 
}, + { + "image": "seldonio/test_tags_two_rest:0.1", + "imagePullPolicy": "Never", + "name": "model-two" + }, + { + "image": "seldonio/test_tags_combiner_rest:0.1", + "imagePullPolicy": "Never", + "name": "combiner" + } + ], + "terminationGracePeriodSeconds": 1 + } + } + ], + "graph": { + "children": [ + { + "children": [], + "endpoint": { + "type": "REST" + }, + "name": "model-one", + "type": "MODEL" + }, + { + "children": [], + "endpoint": { + "type": "REST" + }, + "name": "model-two", + "type": "MODEL" + } + ], + "endpoint": { + "type": "REST" + }, + "name": "combiner", + "type": "COMBINER" + }, + "name": "mymodel-tags-graph", + "replicas": 1 + } + ] + } +} diff --git a/testing/resources/tags_graph_grpc.json b/testing/resources/tags_graph_grpc.json new file mode 100644 index 0000000000..c6ac1f327d --- /dev/null +++ b/testing/resources/tags_graph_grpc.json @@ -0,0 +1,54 @@ +{ + "apiVersion": "machinelearning.seldon.io/v1alpha2", + "kind": "SeldonDeployment", + "metadata": { + "name": "mymodel-tags-graph" + }, + "spec": { + "name": "mymodel-tags-graph", + "oauth_key": "oauth-key", + "oauth_secret": "oauth-secret", + "predictors": [ + { + "componentSpecs": [ + { + "spec": { + "containers": [ + { + "image": "seldonio/test_tags_one_grpc:0.1", + "imagePullPolicy": "Never", + "name": "model-one" + }, + { + "image": "seldonio/test_tags_two_grpc:0.1", + "imagePullPolicy": "Never", + "name": "model-two" + } + ], + "terminationGracePeriodSeconds": 1 + } + } + ], + "graph": { + "children": [ + { + "children": [], + "endpoint": { + "type": "GRPC" + }, + "name": "model-two", + "type": "MODEL" + } + ], + "endpoint": { + "type": "GRPC" + }, + "name": "model-one", + "type": "MODEL" + }, + "name": "mymodel-tags-graph", + "replicas": 1 + } + ] + } +} diff --git a/testing/resources/tags_graph_rest.json b/testing/resources/tags_graph_rest.json new file mode 100644 index 0000000000..8336ab6fcd --- /dev/null +++ b/testing/resources/tags_graph_rest.json @@ -0,0 +1,54 @@ +{ + "apiVersion": "machinelearning.seldon.io/v1alpha2", + "kind": "SeldonDeployment", + "metadata": { + "name": "mymodel-tags-graph" + }, + "spec": { + "name": "mymodel-tags-graph", + "oauth_key": "oauth-key", + "oauth_secret": "oauth-secret", + "predictors": [ + { + "componentSpecs": [ + { + "spec": { + "containers": [ + { + "image": "seldonio/test_tags_one_rest:0.1", + "imagePullPolicy": "Never", + "name": "model-one" + }, + { + "image": "seldonio/test_tags_two_rest:0.1", + "imagePullPolicy": "Never", + "name": "model-two" + } + ], + "terminationGracePeriodSeconds": 1 + } + } + ], + "graph": { + "children": [ + { + "children": [], + "endpoint": { + "type": "REST" + }, + "name": "model-two", + "type": "MODEL" + } + ], + "endpoint": { + "type": "REST" + }, + "name": "model-one", + "type": "MODEL" + }, + "name": "mymodel-tags-graph", + "replicas": 1 + } + ] + } +} diff --git a/testing/resources/tags_single_grpc.json b/testing/resources/tags_single_grpc.json new file mode 100644 index 0000000000..5cdb17b8f8 --- /dev/null +++ b/testing/resources/tags_single_grpc.json @@ -0,0 +1,40 @@ +{ + "apiVersion": "machinelearning.seldon.io/v1alpha2", + "kind": "SeldonDeployment", + "metadata": { + "name": "mymodel-tags-single" + }, + "spec": { + "name": "mymodel-tags-single", + "oauth_key": "oauth-key", + "oauth_secret": "oauth-secret", + "predictors": [ + { + "componentSpecs": [ + { + "spec": { + "containers": [ + { + "image": "seldonio/test_tags_one_grpc:0.1", + "imagePullPolicy": "Never", + "name": "model" + } + ], + "terminationGracePeriodSeconds": 1 + 
} + } + ], + "graph": { + "children": [], + "endpoint": { + "type": "GRPC" + }, + "name": "model", + "type": "MODEL" + }, + "name": "mymodel-tags-single", + "replicas": 1 + } + ] + } +} diff --git a/testing/resources/tags_single_rest.json b/testing/resources/tags_single_rest.json new file mode 100644 index 0000000000..9932835f32 --- /dev/null +++ b/testing/resources/tags_single_rest.json @@ -0,0 +1,40 @@ +{ + "apiVersion": "machinelearning.seldon.io/v1alpha2", + "kind": "SeldonDeployment", + "metadata": { + "name": "mymodel-tags-single" + }, + "spec": { + "name": "mymodel-tags-single", + "oauth_key": "oauth-key", + "oauth_secret": "oauth-secret", + "predictors": [ + { + "componentSpecs": [ + { + "spec": { + "containers": [ + { + "image": "seldonio/test_tags_one_rest:0.1", + "imagePullPolicy": "Never", + "name": "model" + } + ], + "terminationGracePeriodSeconds": 1 + } + } + ], + "graph": { + "children": [], + "endpoint": { + "type": "REST" + }, + "name": "model", + "type": "MODEL" + }, + "name": "mymodel-tags-single", + "replicas": 1 + } + ] + } +} diff --git a/testing/s2i/python-features/tags/Combiner.py b/testing/s2i/python-features/tags/Combiner.py new file mode 100644 index 0000000000..55814a2c27 --- /dev/null +++ b/testing/s2i/python-features/tags/Combiner.py @@ -0,0 +1,11 @@ +import logging +import numpy as np + + +class Combiner(object): + def aggregate(self, X, features_names=[]): + logging.info(X) + return np.array(X).tolist() + + def tags(self): + return {"combiner": "yes"} diff --git a/testing/s2i/python-features/tags/ModelOne.py b/testing/s2i/python-features/tags/ModelOne.py new file mode 100644 index 0000000000..6508d6669c --- /dev/null +++ b/testing/s2i/python-features/tags/ModelOne.py @@ -0,0 +1,12 @@ +import logging + + +class ModelOne: + def predict(self, X, feature_names, meta): + logging.info(X) + logging.info(feature_names) + logging.info(meta) + return ["model-1"] + + def tags(self): + return {"model-1": "yes", "common": 1} diff --git a/testing/s2i/python-features/tags/ModelTwo.py b/testing/s2i/python-features/tags/ModelTwo.py new file mode 100644 index 0000000000..6b8799f678 --- /dev/null +++ b/testing/s2i/python-features/tags/ModelTwo.py @@ -0,0 +1,12 @@ +import logging + + +class ModelTwo: + def predict(self, X, feature_names, meta): + logging.info(X) + logging.info(feature_names) + logging.info(meta) + return ["model-2"] + + def tags(self): + return {"model-2": "yes", "common": 2} diff --git a/testing/s2i/python-features/tags/environment_combiner_grpc b/testing/s2i/python-features/tags/environment_combiner_grpc new file mode 100644 index 0000000000..f158f60f3e --- /dev/null +++ b/testing/s2i/python-features/tags/environment_combiner_grpc @@ -0,0 +1,4 @@ +MODEL_NAME=Combiner +API_TYPE=GRPC +SERVICE_TYPE=MODEL +PERSISTENCE=0 diff --git a/testing/s2i/python-features/tags/environment_combiner_rest b/testing/s2i/python-features/tags/environment_combiner_rest new file mode 100644 index 0000000000..ca2d1b6205 --- /dev/null +++ b/testing/s2i/python-features/tags/environment_combiner_rest @@ -0,0 +1,4 @@ +MODEL_NAME=Combiner +API_TYPE=REST +SERVICE_TYPE=MODEL +PERSISTENCE=0 diff --git a/testing/s2i/python-features/tags/environment_one_grpc b/testing/s2i/python-features/tags/environment_one_grpc new file mode 100644 index 0000000000..0a7ebca5b8 --- /dev/null +++ b/testing/s2i/python-features/tags/environment_one_grpc @@ -0,0 +1,4 @@ +MODEL_NAME=ModelOne +API_TYPE=GRPC +SERVICE_TYPE=MODEL +PERSISTENCE=0 diff --git a/testing/s2i/python-features/tags/environment_one_rest 
b/testing/s2i/python-features/tags/environment_one_rest new file mode 100644 index 0000000000..a35ae55a67 --- /dev/null +++ b/testing/s2i/python-features/tags/environment_one_rest @@ -0,0 +1,4 @@ +MODEL_NAME=ModelOne +API_TYPE=REST +SERVICE_TYPE=MODEL +PERSISTENCE=0 diff --git a/testing/s2i/python-features/tags/environment_two_grpc b/testing/s2i/python-features/tags/environment_two_grpc new file mode 100644 index 0000000000..aa2d146a49 --- /dev/null +++ b/testing/s2i/python-features/tags/environment_two_grpc @@ -0,0 +1,4 @@ +MODEL_NAME=ModelTwo +API_TYPE=GRPC +SERVICE_TYPE=MODEL +PERSISTENCE=0 diff --git a/testing/s2i/python-features/tags/environment_two_rest b/testing/s2i/python-features/tags/environment_two_rest new file mode 100644 index 0000000000..5993937406 --- /dev/null +++ b/testing/s2i/python-features/tags/environment_two_rest @@ -0,0 +1,4 @@ +MODEL_NAME=ModelTwo +API_TYPE=REST +SERVICE_TYPE=MODEL +PERSISTENCE=0 diff --git a/testing/scripts/test_rolling_updates.py b/testing/scripts/test_rolling_updates.py index 8160e84e4d..d30f2e33df 100644 --- a/testing/scripts/test_rolling_updates.py +++ b/testing/scripts/test_rolling_updates.py @@ -27,7 +27,7 @@ def to_resources_path(file_name): ) -@pytest.mark.flaky +@pytest.mark.flaky(max_runs=3) @with_api_gateways class TestRollingHttp(object): # Test updating a model to a multi predictor model @@ -226,7 +226,7 @@ def test_rolling_update10(self, namespace, api_gateway): run(f"kubectl delete -f ../resources/graph6svc.json -n {namespace}", shell=True) -@pytest.mark.flaky +@pytest.mark.flaky(max_runs=3) @with_api_gateways @pytest.mark.parametrize( "from_deployment,to_deployment", diff --git a/testing/scripts/test_tags.py b/testing/scripts/test_tags.py new file mode 100644 index 0000000000..e3eebf6c76 --- /dev/null +++ b/testing/scripts/test_tags.py @@ -0,0 +1,241 @@ +import pytest +import time +from google.protobuf import json_format +import json +from subprocess import run +import numpy as np +from seldon_e2e_utils import ( + wait_for_status, + wait_for_rollout, + rest_request_ambassador, + initial_rest_request, + grpc_request_ambassador, + initial_grpc_request, + retry_run, + API_AMBASSADOR, +) +import logging + + +S2I_CREATE = """cd ../s2i/python-features/tags && \ + s2i build -E environment_{model}_{api_type} . 
\ + seldonio/seldon-core-s2i-python3:{s2i_python_version} \ + seldonio/test_tags_{model}_{api_type}:0.1 +""" +IMAGE_NAME = "seldonio/test_tags_{model}_{api_type}:0.1" + + +def create_s2i_image(s2i_python_version, model, api_type): + cmd = S2I_CREATE.format( + s2i_python_version=s2i_python_version, model=model, api_type=api_type + ) + + logging.info(cmd) + run(cmd, shell=True, check=True) + + +def kind_push_s2i_image(model, api_type): + img = get_image_name(model, api_type) + cmd = "kind load docker-image " + img + logging.info(cmd) + run(cmd, shell=True, check=True) + + +def get_image_name(model, api_type): + return IMAGE_NAME.format(model=model, api_type=api_type) + + +def create_push_s2i_image(s2i_python_version, model, api_type): + create_s2i_image(s2i_python_version, model, api_type) + kind_push_s2i_image(model, api_type) + + +@pytest.mark.sequential +@pytest.mark.usefixtures("s2i_python_version") +class TestPythonS2i(object): + def test_build_model_one_rest(self, s2i_python_version): + create_s2i_image(s2i_python_version, "one", "rest") + img = get_image_name("one", "rest") + run("docker run -d --rm --name 'model-one-rest' " + img, shell=True, check=True) + time.sleep(2) + run("docker rm -f model-one-rest", shell=True, check=True) + + def test_build_model_two_rest(self, s2i_python_version): + create_s2i_image(s2i_python_version, "two", "rest") + img = get_image_name("two", "rest") + run("docker run -d --rm --name 'model-two-rest' " + img, shell=True, check=True) + time.sleep(2) + run("docker rm -f model-two-rest", shell=True, check=True) + + def test_build_model_one_grpc(self, s2i_python_version): + create_s2i_image(s2i_python_version, "one", "grpc") + img = get_image_name("one", "grpc") + run("docker run -d --rm --name 'model-one-grpc' " + img, shell=True, check=True) + time.sleep(2) + run("docker rm -f model-one-grpc", shell=True, check=True) + + def test_build_model_two_grpc(self, s2i_python_version): + create_s2i_image(s2i_python_version, "two", "grpc") + img = get_image_name("two", "grpc") + run("docker run -d --rm --name 'model-two-grpc' " + img, shell=True, check=True) + time.sleep(2) + run("docker rm -f model-two-grpc", shell=True, check=True) + + def test_build_combiner_rest(self, s2i_python_version): + create_s2i_image(s2i_python_version, "combiner", "rest") + img = get_image_name("combiner", "rest") + run("docker run -d --rm --name 'combiner-rest' " + img, shell=True, check=True) + time.sleep(2) + run("docker rm -f combiner-rest", shell=True, check=True) + + def test_build_combiner_grpc(self, s2i_python_version): + create_s2i_image(s2i_python_version, "combiner", "grpc") + img = get_image_name("combiner", "grpc") + run("docker run -d --rm --name 'combiner-grpc' " + img, shell=True, check=True) + time.sleep(2) + run("docker rm -f combiner-grpc", shell=True, check=True) + + +@pytest.mark.sequential +@pytest.mark.usefixtures("namespace") +@pytest.mark.usefixtures("s2i_python_version") +class TestPythonS2iK8s(object): + def test_model_single_rest(self, namespace, s2i_python_version): + create_push_s2i_image(s2i_python_version, "one", "rest") + retry_run(f"kubectl apply -f ../resources/tags_single_rest.json -n {namespace}") + wait_for_status("mymodel-tags-single", namespace) + wait_for_rollout("mymodel-tags-single", namespace) + r = initial_rest_request("mymodel-tags-single", namespace) + arr = np.array([[1, 2, 3]]) + r = rest_request_ambassador( + "mymodel-tags-single", namespace, API_AMBASSADOR, data=arr + ) + res = r.json() + logging.info(res) + assert r.status_code == 200 + 
assert res["data"]["ndarray"] == ["model-1"] + assert res["meta"]["tags"] == {"common": 1, "model-1": "yes"} + run( + f"kubectl delete -f ../resources/tags_single_rest.json -n {namespace}", + shell=True, + ) + + def test_model_graph_rest(self, namespace, s2i_python_version): + create_push_s2i_image(s2i_python_version, "one", "rest") + create_push_s2i_image(s2i_python_version, "two", "rest") + retry_run(f"kubectl apply -f ../resources/tags_graph_rest.json -n {namespace}") + wait_for_status("mymodel-tags-graph", namespace) + wait_for_rollout("mymodel-tags-graph", namespace) + r = initial_rest_request("mymodel-tags-graph", namespace) + arr = np.array([[1, 2, 3]]) + r = rest_request_ambassador( + "mymodel-tags-graph", namespace, API_AMBASSADOR, data=arr + ) + res = r.json() + logging.info(res) + assert r.status_code == 200 + assert res["data"]["ndarray"] == ["model-2"] + assert res["meta"]["tags"] == {"common": 2, "model-1": "yes", "model-2": "yes"} + run( + f"kubectl delete -f ../resources/tags_graph_rest.json -n {namespace}", + shell=True, + ) + + def test_model_combiner_rest(self, namespace, s2i_python_version): + create_push_s2i_image(s2i_python_version, "one", "rest") + create_push_s2i_image(s2i_python_version, "two", "rest") + create_push_s2i_image(s2i_python_version, "combiner", "rest") + retry_run( + f"kubectl apply -f ../resources/tags_combiner_rest.json -n {namespace}" + ) + wait_for_status("mymodel-tags-combiner", namespace) + wait_for_rollout("mymodel-tags-combiner", namespace) + r = initial_rest_request("mymodel-tags-combiner", namespace) + arr = np.array([[1, 2, 3]]) + r = rest_request_ambassador( + "mymodel-tags-combiner", namespace, API_AMBASSADOR, data=arr + ) + res = r.json() + logging.info(res) + assert r.status_code == 200 + assert res["data"]["ndarray"] == [["model-1"], ["model-2"]] + assert res["meta"]["tags"] == { + "combiner": "yes", + "common": 2, + "model-1": "yes", + "model-2": "yes", + } + run( + f"kubectl delete -f ../resources/tags_combiner_rest.json -n {namespace}", + shell=True, + ) + + def test_model_single_grpc(self, namespace, s2i_python_version): + create_push_s2i_image(s2i_python_version, "one", "grpc") + retry_run(f"kubectl apply -f ../resources/tags_single_grpc.json -n {namespace}") + wait_for_status("mymodel-tags-single", namespace) + wait_for_rollout("mymodel-tags-single", namespace) + r = initial_grpc_request("mymodel-tags-single", namespace) + arr = np.array([[1, 2, 3]]) + r = grpc_request_ambassador( + "mymodel-tags-single", namespace, API_AMBASSADOR, data=arr + ) + res = json.loads(json_format.MessageToJson(r)) + logging.info(res) + # assert r.status_code == 200 + assert res["data"]["ndarray"] == ["model-1"] + assert res["meta"]["tags"] == {"common": 1, "model-1": "yes"} + run( + f"kubectl delete -f ../resources/tags_single_grpc.json -n {namespace}", + shell=True, + ) + + def test_model_graph_grpc(self, namespace, s2i_python_version): + create_push_s2i_image(s2i_python_version, "one", "grpc") + create_push_s2i_image(s2i_python_version, "two", "grpc") + retry_run(f"kubectl apply -f ../resources/tags_graph_grpc.json -n {namespace}") + wait_for_status("mymodel-tags-graph", namespace) + wait_for_rollout("mymodel-tags-graph", namespace) + r = initial_grpc_request("mymodel-tags-graph", namespace) + arr = np.array([[1, 2, 3]]) + r = grpc_request_ambassador( + "mymodel-tags-graph", namespace, API_AMBASSADOR, data=arr + ) + res = json.loads(json_format.MessageToJson(r)) + logging.info(res) + # assert r.status_code == 200 + assert res["data"]["ndarray"] == 
["model-2"] + assert res["meta"]["tags"] == {"common": 2, "model-1": "yes", "model-2": "yes"} + run( + f"kubectl delete -f ../resources/tags_graph_grpc.json -n {namespace}", + shell=True, + ) + + def test_model_combiner_grpc(self, namespace, s2i_python_version): + create_push_s2i_image(s2i_python_version, "one", "grpc") + create_push_s2i_image(s2i_python_version, "two", "grpc") + create_push_s2i_image(s2i_python_version, "combiner", "grpc") + retry_run( + f"kubectl apply -f ../resources/tags_combiner_grpc.json -n {namespace}" + ) + wait_for_status("mymodel-tags-combiner", namespace) + wait_for_rollout("mymodel-tags-combiner", namespace) + r = initial_grpc_request("mymodel-tags-combiner", namespace) + arr = np.array([[1, 2, 3]]) + r = grpc_request_ambassador( + "mymodel-tags-combiner", namespace, API_AMBASSADOR, data=arr + ) + res = json.loads(json_format.MessageToJson(r)) + logging.info(res) + # assert r.status_code == 200 + assert res["data"]["ndarray"] == [["model-1"], ["model-2"]] + assert res["meta"]["tags"] == { + "combiner": "yes", + "common": 2, + "model-1": "yes", + "model-2": "yes", + } + run( + f"kubectl delete -f ../resources/tags_combiner_grpc.json -n {namespace}", + shell=True, + )