Model Monitoring Inference Aggregator block #818
base: main
Conversation
    description="Reference data to extract property from",
    examples=["$steps.my_step.predictions"],
)
frequency: Union[
I considered using the RateLimiter block, but this export block would always require that as a dependency. I'm not sure if there's a pattern for making another block required to run, so I just added this to the params for now; open to doing it another way.
the name is misleading, as it suggests compaction of predictions to be sent in batches, which does not take place
Yeah, maybe something like SamplePredictions, ReportPredictionsSample, or ModelMonitoringPredictionsSampler
"model_type": detections.data.get("prediction_type", [""])[i], | ||
} | ||
results.append(formatted_det) | ||
elif isinstance(detections, dict): |
In what case would predictions be returned as a dict? I saw in other blocks that predictions will be either of type sv.Detections or dict.
I have a general concern with this PR: what is the quality of data acquired in that way? Is that meaningful monitoring at the end of the day? What do we want to display as the result of running monitoring on a week-long stream? How would people understand that something is wrong with the model?
I mean, we have a stream running and we sub-sample predictions based on time, suggesting an update once every few seconds. We do this because it is basically unfeasible to push all predictions through the wire - but maybe, in this scenario, it makes sense to compute aggregates of predictions on the client end and submit compacted results once per interval? Roughly like the sketch below.
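To make that concrete, here is a rough sketch of client-side aggregation (the IntervalAggregator name, the prediction dict shape, and the 10-second interval are illustrative assumptions, not the block's actual API):

import time
from collections import defaultdict
from typing import Optional


class IntervalAggregator:
    """Accumulates per-class stats in memory and emits a compact summary once per interval."""

    def __init__(self, interval_seconds: float = 10.0):
        self._interval = interval_seconds
        self._last_flush = time.monotonic()
        self._counts = defaultdict(int)
        self._confidence_sums = defaultdict(float)

    def add(self, predictions: list) -> None:
        # predictions are assumed to be dicts with "class" and "confidence" keys
        for p in predictions:
            self._counts[p["class"]] += 1
            self._confidence_sums[p["class"]] += p["confidence"]

    def maybe_flush(self) -> Optional[dict]:
        # returns a compact aggregate once the interval has elapsed, else None
        if time.monotonic() - self._last_flush < self._interval:
            return None
        summary = {
            cls: {
                "count": self._counts[cls],
                "mean_confidence": self._confidence_sums[cls] / self._counts[cls],
            }
            for cls in self._counts
        }
        self._counts.clear()
        self._confidence_sums.clear()
        self._last_flush = time.monotonic()
        return summary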
def get_manifest(cls) -> Type[WorkflowBlockManifest]:
    return BlockManifest

def is_in_reporting_range(self, frequency: int) -> bool:
please order usage first, then declaration: the run(...) method first, followed by the helpers it calls
def is_in_reporting_range(self, frequency: int) -> bool:
    now = datetime.now()
    last_report_time_str = self._cache.get(LAST_REPORT_TIME_CACHE_KEY)
    print(
please remove print statements
    self,
    fire_and_forget: bool,
    predictions: Union[sv.Detections, dict],
    frequency: int = 3,
defaults should not be set here, but in the manifest (see the sketch below)
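For illustration, the default could live on the manifest field instead. A sketch, assuming the pydantic Field pattern used by other block manifests; the WorkflowParameterSelector helper and INTEGER_KIND are assumptions drawn from that pattern:

    # in BlockManifest - the default belongs here, not in run(...)
    frequency: Union[int, WorkflowParameterSelector(kind=[INTEGER_KIND])] = Field(
        default=3,
        description="Number of seconds between reports to Model Monitoring",
        examples=[3, "$inputs.frequency"],
    )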
BLOCK_NAME = "Roboflow Model Monitoring Exporter"


class BlockManifest(WorkflowBlockManifest):
please follow migration guide: https://inference.roboflow.com/workflows/execution_engine_changelog/#execution-engine-v130-inference-v0270
)
type: Literal[
    "roboflow_core/roboflow_model_monitoring_exporter@v1",
    BLOCK_NAME,
please remove the name aliasing; this is not needed for new blocks (see below)
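For a new block that means a single canonical identifier, roughly like this (a sketch; the final block name is still under discussion above):

class BlockManifest(WorkflowBlockManifest):
    # one canonical identifier, no legacy alias
    type: Literal["roboflow_core/roboflow_model_monitoring_exporter@v1"]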
"error_status": False, | ||
"message": "Not in reporting range, skipping report. (Ok)", | ||
} | ||
if self._api_key is None: |
the API key should be validated first; a fail-fast approach gives clearer error handling (see the sketch below)
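A sketch of the fail-fast shape, reusing the run(...) signature from the diff (the error type and message are illustrative):

def run(
    self,
    fire_and_forget: bool,
    predictions: Union[sv.Detections, dict],
    frequency: int,
) -> dict:
    # fail fast: validate credentials before doing any other work
    if self._api_key is None:
        raise ValueError("Roboflow API key is required to run this block")
    if not self.is_in_reporting_range(frequency):
        return {
            "error_status": False,
            "message": "Not in reporting range, skipping report. (Ok)",
        }
    ...  # report predictions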
# TODO: maybe make this a helper or decorator, it's used in multiple places
def get_workspace_name(
I will keep this in mind, and if I see a recurring pattern of usage, I will move it to common inference utils.
def is_in_reporting_range(self, frequency: int) -> bool:
    now = datetime.now()
    last_report_time_str = self._cache.get(LAST_REPORT_TIME_CACHE_KEY)
a global key? how would that work on the hosted platform?
This is particularly meant for inferences made by InferencePipeline in edge deployments. Model Monitoring currently does not get any data from InferencePipeline, because the large volume of requests made by IP would overload our system. So the goal here is not to have comprehensive inference-results data or an aggregation of it (that's coming later), but more of a health/status check at a regular interval that IP is running and making inferences. Is there a recommended way of doing this?
Aggregating is fine, but we were trying to defer doing aggregations until we had a better idea of what type of aggregation makes sense for this use case. I thought the Aggregator block could be used in conjunction with this, but maybe not?
is model monitoring in its given form even acceptable to run on video?
Today we need to close the topic, otherwise
self._cache = cache
self._background_tasks = background_tasks
self._thread_pool_executor = thread_pool_executor
self._last_report_time_cache_key = "roboflow_model_monitoring_last_report_time"
that would have the side effect of all blocks of this type sharing the last-report-time state; is that the desired outcome?
Oh no, the intent was for each instance of the block to have its own last_report_time. Should I add a random string to the key name to make that happen?
if the expectation is that it works on the hosted platform, I would use Redis with the following key structure:
workflows:steps_cache:roboflow_core/model_monitoring_inference_aggregator@v1:{uuid4()}:last_report_time
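Roughly like this (a sketch; the class name follows the proposed block name, the WorkflowBlock base class is assumed from other blocks, and uuid4() is generated once per block instance so parallel steps never share last-report state):

from uuid import uuid4


class ModelMonitoringInferenceAggregatorBlockV1(WorkflowBlock):
    def __init__(self, cache, api_key=None):
        self._cache = cache
        self._api_key = api_key
        # unique per instance: two blocks of the same type get distinct keys
        self._last_report_time_cache_key = (
            "workflows:steps_cache:"
            "roboflow_core/model_monitoring_inference_aggregator@v1:"
            f"{uuid4()}:last_report_time"
        )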
    last_report_time = now
else:
    last_report_time = datetime.fromisoformat(last_report_time_str)
time_elapsed = int((now - last_report_time).total_seconds())
the type conversion to int does not seem to be needed
)


def format_sv_detections_for_model_monitoring(
please change the name to reflect that both sv.Detections and classification (cls) results are handled
) -> List[Prediction]:
    results = []
    if isinstance(detections, sv.Detections):
        num_detections = len(detections.data.get("detection_id", []))
there is an __iter__ on sv.Detections;
for detection in detections:
would be easier and less error-prone than fetching lists of size 1, which seems to fail with a strange error if other blocks break the contract; see the sketch below
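For example (a sketch; the class_name / prediction_type data fields are assumptions drawn from the diff above):

import supervision as sv


def format_detections(detections: sv.Detections) -> list:
    results = []
    # sv.Detections is iterable: each item is a tuple of
    # (xyxy, mask, confidence, class_id, tracker_id, data)
    for _xyxy, _mask, confidence, _class_id, _tracker_id, data in detections:
        results.append(
            {
                "class_name": data.get("class_name", ""),
                "confidence": float(confidence) if confidence is not None else None,
                "model_type": data.get("prediction_type", ""),
            }
        )
    return results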
if system_info:
    for key, value in system_info.items():
        inference_data[key] = value
inference_data["inference_results"] = predictions
I have a strange feeling it may fail on serialisation: predictions is a List of dataclasses, which does not get automatically converted into JSON AFAIK
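If that is the case, a minimal guard before serialisation could look like this (a sketch, assuming Prediction is a standard dataclass):

import json
from dataclasses import asdict, is_dataclass

# convert dataclass predictions so the payload survives json.dumps,
# which raises TypeError on raw dataclass instances
inference_data["inference_results"] = [
    asdict(p) if is_dataclass(p) else p for p in predictions
]
payload = json.dumps(inference_data)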
LONG_DESCRIPTION = """
This block periodically reports an aggregated sample of inference results to Roboflow Model Monitoring.

It aggregates predictions in memory between reports and then sends a representative sample of predictions at a regular interval specified by the `frequency` parameter.
I would elaborate on what we understand by "representative"; it seems to be "the most confident prediction for a given class", e.g. something like the sketch below
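A sketch of that interpretation (the prediction dict shape here is hypothetical):

def sample_most_confident_per_class(predictions: list) -> list:
    # keep only the single highest-confidence prediction seen for each class
    best = {}
    for p in predictions:
        cls = p["class_name"]
        if cls not in best or p["confidence"] > best[cls]["confidence"]:
            best[cls] = p
    return list(best.values())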