Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adding elastic logging for drift detector data #3560

Merged
merged 7 commits into from
Sep 2, 2021
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 37 additions & 1 deletion components/seldon-request-logger/app/default_logger.py
Original file line number Diff line number Diff line change
Expand Up @@ -206,6 +206,43 @@ def process_and_update_elastic_doc(
elastic_object, message_type, item_body, item_request_id, index_name
)
index = index + 1
elif "data" in new_content_part and message_type == "drift":
SachinVarghese marked this conversation as resolved.
Show resolved Hide resolved
item_body = doc_body.copy()

namespace = log_helper.get_header(log_helper.NAMESPACE_HEADER_NAME, headers)
inferenceservice_name = log_helper.get_header(log_helper.INFERENCESERVICE_HEADER_NAME, headers)
endpoint_name = log_helper.get_header(log_helper.ENDPOINT_HEADER_NAME, headers)
serving_engine = log_helper.serving_engine(headers)
item_body[message_type]["data"]["is_drift"] = bool(item_body[message_type]["data"]["is_drift"])
item_body[message_type]["data"]["drift_type"] = "batch"
if (
"distance" in item_body[message_type]["data"]
and item_body[message_type]["data"]["distance"] is not None
and isinstance(item_body[message_type]["data"]["distance"], list)
):
content_dist = np.array(item_body[message_type]["data"]["distance"])
x = np.expand_dims(content_dist, axis=0)
item_body[message_type]["data"]["drift_type"] = "feature"
item_body[message_type]["data"]["distance"] = createElelmentsArray(x, None, namespace, serving_engine, inferenceservice_name, endpoint_name, "request")
if (
"p_val" in item_body[message_type]["data"]
and item_body[message_type]["data"]["p_val"] is not None
and isinstance(item_body[message_type]["data"]["p_val"], list)
):
content_dist = np.array(item_body[message_type]["data"]["p_val"])
x = np.expand_dims(content_dist, axis=0)
item_body[message_type]["data"]["drift_type"] = "feature"
item_body[message_type]["data"]["p_val"] = createElelmentsArray(x, None, namespace, serving_engine, inferenceservice_name, endpoint_name, "request")
detectorName=None
ce_source = item_body[message_type]["ce-source"]
if ce_source.startswith("io.seldon.serving."):
detectorName = ce_source[len("io.seldon.serving."):]
elif ce_source.startswith("org.kubeflow.serving."):
detectorName = ce_source[len("org.kubeflow.serving."):]
index_name = log_helper.build_index_name(request.headers, message_type, False, detectorName)
upsert_doc_to_elastic(
elastic_object, message_type, item_body, request_id, index_name
)
else:
print("unexpected data format")
print(new_content_part)
Expand Down Expand Up @@ -477,7 +514,6 @@ def extractRow(
reqJson["dataType"] = dataType
return reqJson


def createElelmentsArray(X: np.ndarray, names: list, namespace_name, serving_engine, inferenceservice_name, endpoint_name, message_type):
metadata_schema = None

Expand Down
39 changes: 24 additions & 15 deletions components/seldon-request-logger/app/log_helper.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,42 +34,46 @@ def extract_request_id(headers):
request_id = headers.get(CLOUD_EVENT_ID)
return request_id


def build_index_name(headers):
def build_index_name(headers, prefix = None, suffix = True, name_override = None):
# use a fixed index name if user chooses to do so
index_name = os.getenv("INDEX_NAME")
if index_name:
return index_name

if prefix is None:
prefix = "inference"
SachinVarghese marked this conversation as resolved.
Show resolved Hide resolved
# Adding seldon_environment (dev/test/staging/prod) to index_name if defined as an environment variable
seldon_environment = os.getenv("SELDON_ENVIRONMENT")
if seldon_environment:
index_name = "inference-log-" + seldon_environment + "-" + serving_engine(headers)
index_name = prefix+"-log-" + seldon_environment + "-" + serving_engine(headers)
else:
index_name = "inference-log-" + serving_engine(headers)

index_name = prefix+"-log-" + serving_engine(headers)
# otherwise create an index per deployment
# index_name = "inference-log-" + serving_engine(headers)
namespace = clean_header(NAMESPACE_HEADER_NAME, headers)
if not namespace:
index_name = index_name + "-unknown-namespace"
else:
index_name = index_name + "-" + namespace
inference_service_name = clean_header(INFERENCESERVICE_HEADER_NAME, headers)
# won't get inference service name for older kfserving versions i.e. prior to https://github.com/kubeflow/kfserving/pull/699/
if not inference_service_name:
inference_service_name = clean_header(MODELID_HEADER_NAME, headers)

if name_override is None:
inference_service_name = clean_header(INFERENCESERVICE_HEADER_NAME, headers)
# won't get inference service name for older kfserving versions i.e. prior to https://github.com/kubeflow/kfserving/pull/699/
SachinVarghese marked this conversation as resolved.
Show resolved Hide resolved
if not inference_service_name:
inference_service_name = clean_header(MODELID_HEADER_NAME, headers)
else:
inference_service_name = name_override
if not inference_service_name:
index_name = index_name + "-unknown-inferenceservice"
else:
index_name = index_name + "-" + inference_service_name

endpoint_name = clean_header(ENDPOINT_HEADER_NAME, headers)
if not endpoint_name:
index_name = index_name + "-unknown-endpoint"
else:
index_name = index_name + "-" + endpoint_name
if suffix:
endpoint_name = clean_header(ENDPOINT_HEADER_NAME, headers)
if not endpoint_name:
index_name = index_name + "-unknown-endpoint"
else:
index_name = index_name + "-" + endpoint_name

return index_name

Expand All @@ -96,6 +100,11 @@ def parse_message_type(type_header):
or type_header == "org.kubeflow.serving.inference.outlier"
):
return "outlier"
if (
type_header == "io.seldon.serving.inference.drift"
or type_header == "org.kubeflow.serving.inference.drift"
):
return "drift"
return "unknown"


Expand Down
4 changes: 2 additions & 2 deletions components/seldon-request-logger/app/log_mapping.py
Original file line number Diff line number Diff line change
Expand Up @@ -190,7 +190,7 @@ def fetch_metadata(namespace, serving_engine, inferenceservice_name, predictor_n

if runtime_metadata is not None and runtime_metadata and \
runtime_metadata.runtime_metadata is not None and runtime_metadata.runtime_metadata:
print(runtime_metadata.runtime_metadata)
# print(runtime_metadata.runtime_metadata)
if len(runtime_metadata.runtime_metadata) == 0:
print('no runtime metadata for '+namespace+'/'+inferenceservice_name)
return None
Expand All @@ -206,7 +206,7 @@ def fetch_metadata(namespace, serving_engine, inferenceservice_name, predictor_n
return None

print('prediction schema for '+namespace+'/'+inferenceservice_name)
print(model_metadata.models[0].prediction_schema)
# print(model_metadata.models[0].prediction_schema)
SachinVarghese marked this conversation as resolved.
Show resolved Hide resolved
if model_metadata.models[0].prediction_schema:
return model_metadata.models[0].prediction_schema.to_dict()
else:
Expand Down