reduce number of times connecting to elastic #2120

Merged: 4 commits, Jul 15, 2020
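This PR moves the Elasticsearch connection out of the per-request handler and up to module scope, so the client is created once when the logger starts and is then reused for every request instead of reconnecting inside index(). Below is a minimal sketch of that pattern, not the logger's actual code: connect_elasticsearch() here is a stand-in for the helper in log_helper, and the localhost URL is an assumption.

```python
# Minimal sketch of the connection-reuse pattern this PR adopts.
# connect_elasticsearch() and the localhost URL are illustrative stand-ins
# for the real helper in log_helper, which reads its settings elsewhere.
from elasticsearch import Elasticsearch
from flask import Flask

app = Flask(__name__)


def connect_elasticsearch():
    # Stand-in for log_helper.connect_elasticsearch().
    return Elasticsearch("http://localhost:9200")


# Connect once at import time; the handler below reuses this client
# instead of opening a new connection on every request.
es = connect_elasticsearch()


@app.route("/", methods=["GET", "POST"])
def index():
    # Reuse the module-level client rather than calling
    # connect_elasticsearch() inside the handler.
    return str(es.info())
```

The elasticsearch-py client keeps its own connection pool and is designed to be shared across threads, so a single module-level instance can serve all Flask requests.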
212 changes: 144 additions & 68 deletions components/seldon-request-logger/app/default_logger.py
@@ -1,5 +1,10 @@
from flask import Flask, request
from seldon_core.utils import json_to_seldon_message, extract_request_parts, array_to_grpc_datadef, seldon_message_to_json
from seldon_core.utils import (
json_to_seldon_message,
extract_request_parts,
array_to_grpc_datadef,
seldon_message_to_json,
)
from seldon_core.proto import prediction_pb2
import numpy as np
import json
@@ -10,14 +15,16 @@

MAX_PAYLOAD_BYTES = 300000
app = Flask(__name__)
print('starting logger')
print("starting logger")
sys.stdout.flush()

log = logging.getLogger('werkzeug')
log = logging.getLogger("werkzeug")
log.setLevel(logging.ERROR)

es = log_helper.connect_elasticsearch()

@app.route("/", methods=['GET','POST'])

@app.route("/", methods=["GET", "POST"])
def index():

request_id = log_helper.extract_request_id(request.headers)
@@ -32,7 +39,16 @@ def index():

body_length = request.headers.get(log_helper.LENGTH_HEADER_NAME)
if body_length and int(body_length) > int(max_payload_bytes):
too_large_message = 'body too large for '+index_name+"/"+log_helper.DOC_TYPE_NAME+"/"+ request_id+ ' adding '+message_type
too_large_message = (
"body too large for "
+ index_name
+ "/"
+ log_helper.DOC_TYPE_NAME
+ "/"
+ request_id
+ " adding "
+ message_type
)
print(too_large_message)
sys.stdout.flush()
return too_large_message
@@ -46,113 +62,161 @@
# print('----')
# sys.stdout.flush()

es = log_helper.connect_elasticsearch()


try:

#now process and update the doc
doc = process_and_update_elastic_doc(es, message_type, body, request_id,request.headers, index_name)
# now process and update the doc
doc = process_and_update_elastic_doc(
es, message_type, body, request_id, request.headers, index_name
)

return str(doc)
except Exception as ex:
print(ex)
sys.stdout.flush()
return 'problem logging request'
return "problem logging request"


def process_and_update_elastic_doc(elastic_object, message_type, message_body, request_id, headers, index_name):
def process_and_update_elastic_doc(
elastic_object, message_type, message_body, request_id, headers, index_name
):

if message_type == 'unknown':
print('UNKNOWN REQUEST TYPE FOR '+request_id+' - NOT PROCESSING')
if message_type == "unknown":
print("UNKNOWN REQUEST TYPE FOR " + request_id + " - NOT PROCESSING")
sys.stdout.flush()

#first do any needed transformations
# first do any needed transformations
new_content_part = process_content(message_type, message_body)

#set metadata to go just in this part (request or response) and not top-level
log_helper.field_from_header(content=new_content_part,header_name='ce-time',headers=headers)
log_helper.field_from_header(content=new_content_part, header_name='ce-source', headers=headers)
# set metadata to go just in this part (request or response) and not top-level
log_helper.field_from_header(
content=new_content_part, header_name="ce-time", headers=headers
)
log_helper.field_from_header(
content=new_content_part, header_name="ce-source", headers=headers
)

doc_body = {
message_type: new_content_part
}
doc_body = {message_type: new_content_part}

log_helper.set_metadata(doc_body,headers,message_type,request_id)
log_helper.set_metadata(doc_body, headers, message_type, request_id)

# req or res might be batches of instances so split out into individual docs
if "instance" in new_content_part:

if type(new_content_part["instance"]) == type([]):
#if we've a list then this is batch
#we assume first dimension is always batch
# if we've a list then this is batch
# we assume first dimension is always batch

no_items_in_batch = len(new_content_part["instance"])
index = 0
for item in new_content_part["instance"]:

item_body = doc_body.copy()

item_body[message_type]['instance'] = item
item_request_id = build_request_id_batched(request_id,no_items_in_batch,index)
upsert_doc_to_elastic(elastic_object,message_type,item_body,item_request_id,index_name)
item_body[message_type]["instance"] = item
item_request_id = build_request_id_batched(
request_id, no_items_in_batch, index
)
upsert_doc_to_elastic(
elastic_object, message_type, item_body, item_request_id, index_name
)
index = index + 1
else:
item_request_id = build_request_id_batched(request_id, 1, 0)
upsert_doc_to_elastic(elastic_object, message_type, doc_body, item_request_id, index_name)
elif "data" in new_content_part and message_type == 'outlier':
upsert_doc_to_elastic(
elastic_object, message_type, doc_body, item_request_id, index_name
)
elif "data" in new_content_part and message_type == "outlier":
no_items_in_batch = len(doc_body[message_type]["data"]["is_outlier"])
index = 0
for item in doc_body[message_type]["data"]["is_outlier"]:
item_body = doc_body.copy()
item_body[message_type]["data"]["is_outlier"] = item
if "feature_score" in item_body[message_type]["data"] and item_body[message_type]["data"]["feature_score"] is not None and len(item_body[message_type]["data"]["feature_score"]) == no_items_in_batch:
item_body[message_type]["data"]["feature_score"] = item_body[message_type]["data"]["feature_score"][index]
if "instance_score" in item_body[message_type]["data"] and item_body[message_type]["data"]["instance_score"] is not None and len(item_body[message_type]["data"]["instance_score"]) == no_items_in_batch:
item_body[message_type]["data"]["instance_score"] = item_body[message_type]["data"]["instance_score"][index]
item_request_id = build_request_id_batched(request_id, no_items_in_batch, index)
upsert_doc_to_elastic(elastic_object, message_type, item_body, item_request_id, index_name)
if (
"feature_score" in item_body[message_type]["data"]
and item_body[message_type]["data"]["feature_score"] is not None
and len(item_body[message_type]["data"]["feature_score"])
== no_items_in_batch
):
item_body[message_type]["data"]["feature_score"] = item_body[
message_type
]["data"]["feature_score"][index]
if (
"instance_score" in item_body[message_type]["data"]
and item_body[message_type]["data"]["instance_score"] is not None
and len(item_body[message_type]["data"]["instance_score"])
== no_items_in_batch
):
item_body[message_type]["data"]["instance_score"] = item_body[
message_type
]["data"]["instance_score"][index]
item_request_id = build_request_id_batched(
request_id, no_items_in_batch, index
)
upsert_doc_to_elastic(
elastic_object, message_type, item_body, item_request_id, index_name
)
index = index + 1
else:
print('unexpected data format')
print("unexpected data format")
print(new_content_part)
return


def build_request_id_batched(request_id, no_items_in_batch, item_index):
item_request_id = request_id
if no_items_in_batch > 1:
item_request_id = item_request_id + "-item-" + str(item_index)
return item_request_id

def upsert_doc_to_elastic(elastic_object, message_type, upsert_body, request_id, index_name):

def upsert_doc_to_elastic(
elastic_object, message_type, upsert_body, request_id, index_name
):
upsert_doc = {
"doc_as_upsert": True,
"doc": upsert_body,
}
new_content = elastic_object.update(index=index_name, doc_type=log_helper.DOC_TYPE_NAME, id=request_id,
body=upsert_doc, retry_on_conflict=3, refresh=True, timeout="60s")
print('upserted to doc ' + index_name + "/" + log_helper.DOC_TYPE_NAME + "/" + request_id + ' adding ' + message_type)
new_content = elastic_object.update(
index=index_name,
doc_type=log_helper.DOC_TYPE_NAME,
id=request_id,
body=upsert_doc,
retry_on_conflict=3,
refresh=True,
timeout="60s",
)
print(
"upserted to doc "
+ index_name
+ "/"
+ log_helper.DOC_TYPE_NAME
+ "/"
+ request_id
+ " adding "
+ message_type
)
sys.stdout.flush()
return str(new_content)


# take request or response part and process it by deriving metadata
def process_content(message_type,content):
def process_content(message_type, content):

if content is None:
print('content is empty')
print("content is empty")
sys.stdout.flush()
return content

#if we have dataType then have already parsed before
# if we have dataType then have already parsed before
if "dataType" in content:
print('dataType already in content')
print("dataType already in content")
sys.stdout.flush()
return content

requestCopy = content.copy()

#extract data part out and process for req or resp - handle differently later for outlier
if message_type == 'request' or message_type == 'response':
# extract data part out and process for req or resp - handle differently later for outlier
if message_type == "request" or message_type == "response":
requestCopy = extract_data_part(content)

return requestCopy
@@ -189,8 +253,8 @@ def extract_data_part(content):

(req_features, _, req_datadef, req_datatype) = extract_request_parts(requestMsg)

#set sensible defaults for non-tabular dataTypes
#tabular should be iterable and get inferred through later block
# set sensible defaults for non-tabular dataTypes
# tabular should be iterable and get inferred through later block
if req_datatype == "strData":
copy["dataType"] = "text"
if req_datatype == "binData":
@@ -203,23 +267,25 @@ def extract_data_part(content):
if isinstance(elements, Iterable):

for i, e in enumerate(elements):
reqJson = extractRow(i, requestMsg, req_datatype, req_features, req_datadef)
reqJson = extractRow(
i, requestMsg, req_datatype, req_features, req_datadef
)
reqJson["elements"] = e
copy = reqJson

copy["instance"] = json.loads(json.dumps(req_features, cls=log_helper.NumpyEncoder))
copy["instance"] = json.loads(
json.dumps(req_features, cls=log_helper.NumpyEncoder)
)

if isinstance(req_features, np.ndarray):
set_datatype_from_numpy(req_features, copy, req_features.item(0))



#copy names into its own section of request
# copy names into its own section of request
if "data" in content:
if "names" in content["data"]:
copy["names"] = content["data"]["names"]

#should now have processed content of seldon message so don't want its various constructs on top-level anymore
# should now have processed content of seldon message so don't want its various constructs on top-level anymore
if "data" in copy:
del copy["data"]
if "strData" in copy:
@@ -229,7 +295,7 @@ def extract_data_part(content):
if "binData" in copy:
del copy["binData"]

copy['payload'] = content
copy["payload"] = content

return copy

@@ -241,7 +307,7 @@ def set_datatype_from_numpy(content_np, copy, first_element):
if first_element is not None and isinstance(first_element, (int, float)):
copy["dataType"] = "number"
if content_np.shape is not None and len(content_np.shape) > 1:
#first dim is batch so second reveals whether instance is array
# first dim is batch so second reveals whether instance is array
if content_np.shape[1] > 1:
copy["dataType"] = "tabular"
if len(content_np.shape) > 2:
@@ -250,19 +316,28 @@
copy["dataType"] = "image"



def extractRow(i:int,requestMsg: prediction_pb2.SeldonMessage,req_datatype: str,req_features: np.ndarray,req_datadef: "prediction_pb2.SeldonMessage.data"):
def extractRow(
i: int,
requestMsg: prediction_pb2.SeldonMessage,
req_datatype: str,
req_features: np.ndarray,
req_datadef: "prediction_pb2.SeldonMessage.data",
):
datatyReq = "ndarray"
dataType = "tabular"
if len(req_features.shape) == 2:
dataReq = array_to_grpc_datadef(datatyReq, np.expand_dims(req_features[i], axis=0), req_datadef.names)
dataReq = array_to_grpc_datadef(
datatyReq, np.expand_dims(req_features[i], axis=0), req_datadef.names
)
else:
if len(req_features.shape) > 2:
dataType="image"
dataType = "image"
else:
dataType="text"
req_features= np.char.decode(req_features.astype('S'),"utf-8")
dataReq = array_to_grpc_datadef(datatyReq, np.expand_dims(req_features[i], axis=0), req_datadef.names)
dataType = "text"
req_features = np.char.decode(req_features.astype("S"), "utf-8")
dataReq = array_to_grpc_datadef(
datatyReq, np.expand_dims(req_features[i], axis=0), req_datadef.names
)
requestMsg2 = prediction_pb2.SeldonMessage(data=dataReq, meta=requestMsg.meta)
reqJson = {}
reqJson["payload"] = seldon_message_to_json(requestMsg2)
@@ -272,15 +347,15 @@ def extractRow(i:int,requestMsg: prediction_pb2.SeldonMessage,req_datatype: str,
return reqJson


def createElelmentsArray(X: np.ndarray,names: list):
def createElelmentsArray(X: np.ndarray, names: list):
results = None
if isinstance(X,np.ndarray):
if isinstance(X, np.ndarray):
if len(X.shape) == 1:
results = []
for i in range(X.shape[0]):
d = {}
for num, name in enumerate(names, start=0):
if isinstance(X[i],bytes):
if isinstance(X[i], bytes):
d[name] = X[i].decode("utf-8")
else:
d[name] = X[i]
@@ -290,9 +365,10 @@ def createElelmentsArray(X: np.ndarray,names: list):
for i in range(X.shape[0]):
d = {}
for num, name in enumerate(names, start=0):
d[name] = np.expand_dims(X[i,num], axis=0).tolist()
d[name] = np.expand_dims(X[i, num], axis=0).tolist()
results.append(d)
return results


if __name__ == "__main__":
app.run(host='0.0.0.0', port=8080)
app.run(host="0.0.0.0", port=8080)