Improved features and interoperability for SQLMetrics #679

Closed
alamb opened this issue Jul 5, 2021 · 5 comments · Fixed by #908
Labels
enhancement New feature or request

@alamb
Contributor

alamb commented Jul 5, 2021

Is your feature request related to a problem or challenge? Please describe what you are trying to do.
Usecase:

  1. Connecting DataFusion metrics to existing state of the art metrics collection systems (prometheus, influxdb, opentelemetry)
  2. Gaining access to real time values of metrics (not just snapshots), for visualization as well as real time plan adjustments

When running plans with multiple operators of the same type (e.g. multiple HashJoinExec), there is currently no way to programmatically access an individual HashJoin's statistics. For example, in #662 the interface returns metrics keyed by a single string such as inputRows. For the use case of printing metrics per operator that is just fine, but for use cases that span the whole plan it falls short.

It is currently very awkward to create metrics with finer granularity (such as per partition of the hash join, or parquet metrics per file). The current string = metric interface means you have to build a compound string key (such as "metrics for file foo") and then parse that key apart again (as I did in #657).

Other metric systems such as prometheus, opentelemetry, and influxdb allow name=value pairs on each metric to address these problems. Without an equivalent, integrating DataFusion metrics with these systems will be a challenge.

The other systems allow you to get access to things like:

operator=ParquetExec,filename="my_filename",partition_number=0 rows_scanned=100
operator=ParquetExec,filename="my_other_filename",partition_number=0 rows_scanned=200

or for hash join

operator=HashJoin,partition_number=0 rows_scanned=100
operator=HashJoin,partition_number=1 rows_scanned=200
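
As a rough illustration of the name=value idea, here is a minimal sketch of a labeled metric that renders to lines like the ones above. `LabeledMetric`, `with`, and `render` are hypothetical names for this example only, not part of DataFusion, and the output format only loosely mimics what the other systems use.

```rust
use std::collections::BTreeMap;

/// Hypothetical labeled metric: a name, a value, and name=value labels.
#[derive(Debug, Clone, PartialEq)]
pub struct LabeledMetric {
    pub name: String,
    pub value: u64,
    // BTreeMap keeps labels sorted by key, so rendering is deterministic.
    pub labels: BTreeMap<String, String>,
}

impl LabeledMetric {
    pub fn new(name: &str, value: u64) -> Self {
        Self {
            name: name.to_string(),
            value,
            labels: BTreeMap::new(),
        }
    }

    /// Attach one label; any value implementing `ToString` is accepted.
    pub fn with(mut self, key: &str, value: impl ToString) -> Self {
        self.labels.insert(key.to_string(), value.to_string());
        self
    }

    /// Render as `k1=v1,k2=v2 name=value`.
    pub fn render(&self) -> String {
        let labels: Vec<String> = self
            .labels
            .iter()
            .map(|(k, v)| format!("{}={}", k, v))
            .collect();
        format!("{} {}={}", labels.join(","), self.name, self.value)
    }
}
```

With this shape a consumer can filter or group on `operator` or `partition_number` directly instead of parsing a compound key.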

Another challenge with the current metrics interface is that despite using Arc and atomic counters internally, the only external interface is to get a snapshot of the metrics. If it were to return the Arcs themselves, we could implement interactive visualizations showing how the metrics evolved over time.
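
To make the snapshot vs. live distinction concrete, here is a small sketch of a counter whose clones share one atomic value, so an observer can read it while the operator is still updating it. `LiveCounter` is a hypothetical type for illustration; it is not the existing SQLMetric implementation.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;

/// Hypothetical counter: every clone points at the same atomic value.
#[derive(Debug, Clone, Default)]
pub struct LiveCounter {
    value: Arc<AtomicUsize>,
}

impl LiveCounter {
    pub fn new() -> Self {
        Self::default()
    }

    /// Called by the operator as it processes rows.
    pub fn add(&self, n: usize) {
        self.value.fetch_add(n, Ordering::Relaxed);
    }

    /// Called by an observer at any time; reflects updates made so far.
    pub fn value(&self) -> usize {
        self.value.load(Ordering::Relaxed)
    }
}
```

A visualization could hold a clone and poll `value()` periodically, whereas a snapshot only reflects the moment it was taken.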

Describe the solution you'd like

Thus, I propose the following changes to how metrics are created (with name/value pairs):

let metrics = SQLMetric::counter("numRows")
  .with("partition", 1)
  .with("filename", "my_file");

let sub_metric = SQLMetric::counter("otherMetric")
  // inherit all name/value pairs on `metrics`
  .with_family(metrics)
  .with("new_detail", "awesome");
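
One way the builder above could behave is sketched below. This is only an assumption about the semantics: the `Metric` type here is hypothetical, `with_family` borrows the parent rather than consuming it, and it copies labels only (the counters stay independent).

```rust
use std::collections::BTreeMap;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;

/// Hypothetical metric: a name, name=value labels, and a shared counter.
#[derive(Debug, Clone)]
pub struct Metric {
    name: String,
    labels: BTreeMap<String, String>,
    value: Arc<AtomicUsize>,
}

impl Metric {
    pub fn counter(name: &str) -> Self {
        Self {
            name: name.to_string(),
            labels: BTreeMap::new(),
            value: Arc::new(AtomicUsize::new(0)),
        }
    }

    /// Add (or overwrite) a single name=value pair.
    pub fn with(mut self, key: &str, value: impl ToString) -> Self {
        self.labels.insert(key.to_string(), value.to_string());
        self
    }

    /// Copy the parent's labels; labels already set on `self` are kept.
    /// Assumption: only labels are inherited, not the counter itself.
    pub fn with_family(mut self, parent: &Metric) -> Self {
        for (k, v) in &parent.labels {
            self.labels.entry(k.clone()).or_insert_with(|| v.clone());
        }
        self
    }

    pub fn name(&self) -> &str {
        &self.name
    }

    pub fn labels(&self) -> &BTreeMap<String, String> {
        &self.labels
    }

    pub fn add(&self, n: usize) {
        self.value.fetch_add(n, Ordering::Relaxed);
    }

    pub fn value(&self) -> usize {
        self.value.load(Ordering::Relaxed)
    }
}
```

Borrowing the parent lets several sub-metrics inherit from the same family; taking it by value, as in the proposal, would also work if the parent is cloned first.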

And then the collection interface should return a list of these metrics rather than a HashMap keyed by the string id. For example, rather than

pub trait ExecutionPlan {
...

    /// Return a snapshot of the metrics collected during execution
    fn metrics(&self) -> HashMap<String, SQLMetric> {
        HashMap::new()
    }

Something like

pub trait ExecutionPlan {
...

    /// Return the metrics for this execution
    fn metrics(&self) -> Vec<Arc<SQLMetric>> {
        Vec::new()
    }
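
A self-contained sketch of what the list-returning interface might look like in use follows. The trait here is a cut-down stand-in for the real ExecutionPlan, and `Metric` and `ScanExec` are hypothetical names used only to show that the returned Arcs keep tracking later updates.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;

/// Hypothetical metric with a name and an atomic value.
pub struct Metric {
    pub name: String,
    value: AtomicUsize,
}

impl Metric {
    pub fn value(&self) -> usize {
        self.value.load(Ordering::Relaxed)
    }
}

/// Cut-down stand-in for the real trait (assumption: only `metrics` matters here).
pub trait ExecutionPlan {
    /// Return handles to the live metrics; by default an operator has none.
    fn metrics(&self) -> Vec<Arc<Metric>> {
        Vec::new()
    }
}

/// Hypothetical operator that counts the rows it scans.
pub struct ScanExec {
    rows: Arc<Metric>,
}

impl ScanExec {
    pub fn new() -> Self {
        Self {
            rows: Arc::new(Metric {
                name: "rows_scanned".to_string(),
                value: AtomicUsize::new(0),
            }),
        }
    }

    pub fn process_batch(&self, num_rows: usize) {
        self.rows.value.fetch_add(num_rows, Ordering::Relaxed);
    }
}

impl ExecutionPlan for ScanExec {
    fn metrics(&self) -> Vec<Arc<Metric>> {
        // Hand out a shared handle, not a copy of the current value.
        vec![Arc::clone(&self.rows)]
    }
}
```

Because callers receive `Arc` handles, a value read after `process_batch` reflects the new total even if the handle was obtained earlier.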


@alamb alamb added the enhancement New feature or request label Jul 5, 2021
@alamb
Contributor Author

alamb commented Jul 5, 2021

FYI @andygrove @Dandandan

@andygrove
Member

This looks great. One thing I would like to add to this is that we need a way to get aggregate metrics for a query plan when the query execution is distributed. My thought was to add an id to each operator (that is unique within one plan) so that the Ballista executors can return a hashmap of metrics to the scheduler after a query plan fragment has been executed.
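
A minimal sketch of the scheduler side of this idea, assuming each executor reports a map from operator id to named metric values. The `FragmentMetrics` alias and `merge_into` function are hypothetical and do not reflect an actual Ballista API.

```rust
use std::collections::HashMap;

/// Hypothetical report shape: operator id (unique within one plan)
/// -> metric name -> value.
pub type FragmentMetrics = HashMap<usize, HashMap<String, u64>>;

/// Merge one executor's report into the scheduler's running totals,
/// summing values that share an operator id and metric name.
pub fn merge_into(totals: &mut FragmentMetrics, reported: &FragmentMetrics) {
    for (op_id, metrics) in reported {
        let op_totals = totals.entry(*op_id).or_default();
        for (name, value) in metrics {
            *op_totals.entry(name.clone()).or_insert(0) += *value;
        }
    }
}
```

Summing is only appropriate for counter-like metrics; timings or gauges would likely need a different merge rule.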

@alamb
Contributor Author

alamb commented Jul 5, 2021

> is that we need a way to get aggregate metrics for a query plan when the query execution is distributed

Assigning a unique id (metrics id?) to each node in the plan (perhaps as a physical optimizer pass) sounds like a good idea to me 👍

@Dandandan
Contributor

This sounds great.

Another use case of programmatically accessing the statistics is for re-optimization of the query (join reordering, removing empty results from the plan, using statistics for count(*), etc.)

Given this, the minimum requirement should be that aggregating the metrics / statistics is straightforward.
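
For instance, if each metric carried its partition as structured data, totalling per-partition values by metric name could be as simple as the sketch below. `PartitionMetric` and `aggregate_by_name` are hypothetical names used for illustration only.

```rust
use std::collections::BTreeMap;

/// Hypothetical per-partition metric value.
#[derive(Debug, Clone)]
pub struct PartitionMetric {
    pub name: String,
    pub partition: usize,
    pub value: u64,
}

/// Sum values across partitions, grouped by metric name.
pub fn aggregate_by_name(metrics: &[PartitionMetric]) -> BTreeMap<String, u64> {
    let mut totals = BTreeMap::new();
    for m in metrics {
        *totals.entry(m.name.clone()).or_insert(0) += m.value;
    }
    totals
}
```

An optimizer could then consult a total such as the number of rows an operator produced without knowing how the work was partitioned.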

@alamb
Contributor Author

alamb commented Aug 16, 2021

FWIW I am making good progress on this feature and hope to have a PR up later today with a proposed implementation
