Skip to content

Commit

Permalink
Fix tags propogation with seldon client (#3374)
Browse files Browse the repository at this point in the history
* client

* tags

* try catch

* fixes

* comment

* lint
  • Loading branch information
majolo authored Jul 8, 2021
1 parent e0a96b0 commit 87d78e7
Show file tree
Hide file tree
Showing 2 changed files with 50 additions and 40 deletions.
74 changes: 39 additions & 35 deletions python/seldon_core/batch_processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -267,7 +267,7 @@ def _send_batch_predict_multi_request(
retries: int,
batch_id: str,
payload_type: str,
) -> str:
) -> [str]:
"""
Send an request using the Seldon Client with batch context including the
unique ID of the batch and the Batch enumerated index as metadata. This
Expand Down Expand Up @@ -297,13 +297,11 @@ def _send_batch_predict_multi_request(
instance_ids = [x[1] for x in input_data]

predict_kwargs = {}
meta = {
"tags": {
"batch_id": batch_id,
}
tags = {
"batch_id": batch_id,
}

predict_kwargs["meta"] = meta
predict_kwargs["meta"] = tags
predict_kwargs["headers"] = {"Seldon-Puid": instance_ids[0]}

try:
Expand All @@ -327,7 +325,7 @@ def _send_batch_predict_multi_request(
except Exception as e:
error_resp = {
"status": {"info": "FAILURE", "reason": str(e), "status": 1},
"meta": meta,
"meta": tags,
}
print("Exception: %s" % e)
str_output = json.dumps(error_resp)
Expand All @@ -343,26 +341,34 @@ def _send_batch_predict_multi_request(
tensor_ndarray = tensor.reshape(shape)

for i in range(len(input_data)):
new_response = copy.deepcopy(response)
if payload_type == "ndarray":
# Format new responses for each original prediction request
new_response["data"]["ndarray"] = [response["data"]["ndarray"][i]]
new_response["meta"]["tags"]["tags"]["batch_index"] = indexes[i]
new_response["meta"]["tags"]["tags"]["batch_instance_id"] = instance_ids[i]
responses.append(json.dumps(new_response))
elif payload_type == "tensor":
# Format new responses for each original prediction request
new_response["data"]["tensor"]["shape"][0] = 1
new_response["data"]["tensor"]["values"] = np.ndarray.tolist(
tensor_ndarray[i]
)
new_response["meta"]["tags"]["tags"]["batch_index"] = indexes[i]
new_response["meta"]["tags"]["tags"]["batch_instance_id"] = instance_ids[i]
responses.append(json.dumps(new_response))
else:
raise RuntimeError(
"Only `ndarray` and `tensor` input are currently supported for batch size greater than 1."
)
try:
new_response = copy.deepcopy(response)
if payload_type == "ndarray":
# Format new responses for each original prediction request
new_response["data"]["ndarray"] = [response["data"]["ndarray"][i]]
new_response["meta"]["tags"]["batch_index"] = indexes[i]
new_response["meta"]["tags"]["batch_instance_id"] = instance_ids[i]
responses.append(json.dumps(new_response))
elif payload_type == "tensor":
# Format new responses for each original prediction request
new_response["data"]["tensor"]["shape"][0] = 1
new_response["data"]["tensor"]["values"] = np.ndarray.tolist(
tensor_ndarray[i]
)
new_response["meta"]["tags"]["batch_index"] = indexes[i]
new_response["meta"]["tags"]["batch_instance_id"] = instance_ids[i]
responses.append(json.dumps(new_response))
else:
raise RuntimeError(
"Only `ndarray` and `tensor` input are currently supported for batch size greater than 1."
)
except Exception as e:
error_resp = {
"status": {"info": "FAILURE", "reason": str(e), "status": 1},
"meta": tags,
}
print("Exception: %s" % e)
responses.append(json.dumps(error_resp))

return responses

Expand Down Expand Up @@ -405,14 +411,12 @@ def _send_batch_predict(
"""

predict_kwargs = {}
meta = {
"tags": {
"batch_id": batch_id,
"batch_instance_id": batch_instance_id,
"batch_index": batch_idx,
}
tags = {
"batch_id": batch_id,
"batch_instance_id": batch_instance_id,
"batch_index": batch_idx,
}
predict_kwargs["meta"] = meta
predict_kwargs["meta"] = tags
predict_kwargs["headers"] = {"Seldon-Puid": batch_instance_id}
try:
data = json.loads(input_raw)
Expand All @@ -439,7 +443,7 @@ def _send_batch_predict(
except Exception as e:
error_resp = {
"status": {"info": "FAILURE", "reason": str(e), "status": 1},
"meta": meta,
"meta": tags,
}
print("Exception: %s" % e)
str_output = json.dumps(error_resp)
Expand Down
16 changes: 11 additions & 5 deletions python/seldon_core/seldon_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -327,7 +327,7 @@ def predict(
http_path:
Custom http path for predict call to use
meta:
Custom meta map
Custom meta map, supplied as tags
client_return_type
the return type of all functions can be either dict or proto
raw_data
Expand Down Expand Up @@ -1208,6 +1208,7 @@ def rest_predict_seldon(
names: Iterable[str] = None,
client_return_type: str = "proto",
raw_data: Dict = None,
meta: Dict = {},
**kwargs,
) -> SeldonClientPrediction:
"""
Expand Down Expand Up @@ -1235,28 +1236,33 @@ def rest_predict_seldon(
the return type of all functions can be either dict or proto
raw_data
Raw payload (dictionary) given by the user
meta
Custom meta data map, supplied as tags
kwargs
Returns
-------
Seldon Client Prediction
"""
metaKV = prediction_pb2.Meta()
metaJson = {"tags": meta}
json_format.ParseDict(metaJson, metaKV)
if raw_data:
request = json_to_seldon_message(raw_data)
payload = raw_data
else:
if bin_data is not None:
request = prediction_pb2.SeldonMessage(binData=bin_data)
request = prediction_pb2.SeldonMessage(binData=bin_data, meta=metaKV)
elif str_data is not None:
request = prediction_pb2.SeldonMessage(strData=str_data)
request = prediction_pb2.SeldonMessage(strData=str_data, meta=metaKV)
elif json_data is not None:
request = json_to_seldon_message({"jsonData": json_data})
else:
if data is None:
data = np.random.rand(*shape)
datadef = array_to_grpc_datadef(payload_type, data, names=names)
request = prediction_pb2.SeldonMessage(data=datadef)
request = prediction_pb2.SeldonMessage(data=datadef, meta=metaKV)
payload = seldon_message_to_json(request)

response_raw = requests.post(
Expand Down Expand Up @@ -1808,7 +1814,7 @@ def grpc_predict_gateway(
channel_credentials
Channel credentials - see SeldonChannelCredentials
meta
Custom meta data map
Custom meta data map, supplied as tags
client_return_type
the return type of all functions can be either dict or proto
raw_data
Expand Down

0 comments on commit 87d78e7

Please sign in to comment.