computing correlation for aggregated results on fly #27983

Closed
panffeng opened this issue Dec 25, 2017 · 11 comments
Labels
:Analytics/Aggregations Aggregations >feature Team:Analytics Meta label for analytical engine team (ESQL/Aggs/Geo)

Comments

@panffeng

panffeng commented Dec 25, 2017

Describe the feature: Elasticsearch is often used for data and time series analysis. Yet some common functions, such as computing the correlation between two time series, are not supported.

The matrix_stats aggregation can only use fields in the documents (https://www.elastic.co/guide/en/elasticsearch/reference/current/search-aggregations-matrix-stats-aggregation.html). Frequently, though, the time series are aggregated on the fly, and writing the intermediate results back as new documents is not a good option.

A bucket script aggregation is another way to implement correlation, but the script needed to compute it would complicate the overall processing (https://www.elastic.co/guide/en/elasticsearch/reference/current/search-aggregations-pipeline-bucket-script-aggregation.html).

Currently, one has to do this calculation client-side or implement it in a plugin.

Suppose a correl aggregation were directly available and accepted aggregated results as inputs. A simple query would then return the correlation. For example, the following DSL would compute the correlation between monthly total sales and monthly red car sales. (The data set is the car sales example at https://www.elastic.co/guide/en/elasticsearch/guide/current/_aggregation_test_drive.html)

{
    "size": 0,
    "aggs": {
        "sales_per_month": {
            "date_histogram": {
                "field": "sold",
                "interval": "month"
            },
            "aggs": {
                "total_sales": {
                    "sum": {
                        "field": "price"
                    }
                },
                "red_cars": {
                    "filter": {
                        "term": {
                            "color": "red"
                        }
                    },
                    "aggs": {
                        "sales": {
                            "sum": {
                                "field": "price"
                            }
                        }
                    }
                },
                "correlation": {
                    "correl": {
                        "buckets_path": {
                            "redCarsSales": "red_cars>sales",
                            "totalSales": "total_sales"
                        },
                        "lag": 1
                    }
                }
            }
        }
    }
}
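
For comparison, here is a minimal sketch of the client-side workaround mentioned above, written in Python. It assumes the query is run without the proposed correl aggregation and that the response has the usual shape for a date_histogram with sum and filter sub-aggregations. The function names and the sample numbers are made up for illustration, and the sketch only covers zero lag.

```python
import math


def pearson(xs, ys):
    """Pearson correlation coefficient of two equal-length sequences."""
    n = len(xs)
    if n != len(ys) or n < 2:
        raise ValueError("need two series of equal length with at least 2 points")
    mean_x = sum(xs) / n
    mean_y = sum(ys) / n
    cov = sum((x - mean_x) * (y - mean_y) for x, y in zip(xs, ys))
    var_x = sum((x - mean_x) ** 2 for x in xs)
    var_y = sum((y - mean_y) ** 2 for y in ys)
    if var_x == 0 or var_y == 0:
        return float("nan")  # undefined when one series is constant
    return cov / math.sqrt(var_x * var_y)


def series_from_response(response):
    """Pull the two monthly series out of the aggregation response."""
    buckets = response["aggregations"]["sales_per_month"]["buckets"]
    red = [b["red_cars"]["sales"]["value"] for b in buckets]
    total = [b["total_sales"]["value"] for b in buckets]
    return red, total


# Hypothetical response with invented values, one bucket per month.
sample_response = {
    "aggregations": {
        "sales_per_month": {
            "buckets": [
                {"total_sales": {"value": 100.0}, "red_cars": {"doc_count": 1, "sales": {"value": 10.0}}},
                {"total_sales": {"value": 250.0}, "red_cars": {"doc_count": 2, "sales": {"value": 80.0}}},
                {"total_sales": {"value": 180.0}, "red_cars": {"doc_count": 1, "sales": {"value": 30.0}}},
                {"total_sales": {"value": 400.0}, "red_cars": {"doc_count": 3, "sales": {"value": 90.0}}},
            ]
        }
    }
}

red, total = series_from_response(sample_response)
r = pearson(red, total)
print(f"correlation(red car sales, total sales) = {r:.3f}")
```

This is the extra round trip and extra code the feature request is trying to avoid.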
@polyfractal
Contributor

Hi @panffeng! We chatted about this and we'd really like to have a generic correlation pipeline agg, but we're unsure when/if we'll get time to work on it. We'd be happy to work with you on a PR though, if you want to contribute some code.

I started an autocorrelation pipeline aggregation back in the 2.x age (#10377). In the PR, I opted to use the FFT approach since it's O(n log n) rather than the non-FFT O(n^2). But that required an external library dependency (I used JTransforms at the time), which we wanted to avoid, and so the PR was never merged.

Moving forward, I think we have some options:

  1. Implement correlation agg using the naive O(n^2) approach. Pipeline aggs generally operate on small magnitudes anyway (thousands of buckets), so the expensive runtime complexity may not be a concern in practice
  2. Implement using FFT + library dependency. The new plugin SPI work allows plugins to extend the framework and include their own dependencies without "polluting" the core with dependencies. We'd just have to modify pipeline aggs to allow extending their functionality through SPI
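
To make option 1 concrete, here is a minimal Python sketch of the naive approach. It computes the raw (unnormalized) cross-correlation sum for every non-negative lag with two nested loops, which is where the O(n^2) cost comes from. The function name and the lag convention (pairing x[t] with y[t + lag]) are assumptions made for illustration, not taken from the old PR.

```python
def cross_correlation_naive(x, y):
    """Raw cross-correlation sums c[lag] = sum_t x[t] * y[t + lag], lag = 0..n-1."""
    if len(x) != len(y):
        raise ValueError("series must have the same length")
    n = len(x)
    result = []
    for lag in range(n):            # n lags ...
        total = 0.0
        for t in range(n - lag):    # ... times up to n products each
            total += x[t] * y[t + lag]
        result.append(total)
    return result


print(cross_correlation_naive([1, 2, 3], [4, 5, 6]))  # [32.0, 17.0, 6.0]
```

For n buckets the inner statement runs n(n + 1)/2 times in total.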

@panffeng
Author

Hi @polyfractal , good to know that the correlation is a candidate feature. I would like to work with you guys for a PR.

I guess we can take a progressive approach for the two options. First, for a small batch of a few thousand buckets, the naive approach should work. Then, for a large batch of buckets, we can do an FFT of the time series, compute the inner product, and do the inverse FFT.
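
As a rough illustration of the FFT route, here is a self-contained Python sketch. It uses a small recursive radix-2 FFT instead of an external library, zero-pads both series to avoid wrap-around, multiplies the conjugate of one transform by the other element by element, and then applies the inverse transform. All names are hypothetical, and the lag convention (pairing x[t] with y[t + lag]) is an assumption.

```python
import cmath


def fft(a, invert=False):
    """Recursive radix-2 FFT; len(a) must be a power of two. No 1/n scaling."""
    n = len(a)
    if n == 1:
        return list(a)
    even = fft(a[0::2], invert)
    odd = fft(a[1::2], invert)
    sign = 1 if invert else -1
    out = [0j] * n
    for k in range(n // 2):
        w = cmath.exp(sign * 2j * cmath.pi * k / n)
        out[k] = even[k] + w * odd[k]
        out[k + n // 2] = even[k] - w * odd[k]
    return out


def cross_correlation_fft(x, y):
    """Raw cross-correlation sums c[lag] = sum_t x[t] * y[t + lag], lag = 0..n-1."""
    if len(x) != len(y):
        raise ValueError("series must have the same length")
    n = len(x)
    size = 1
    while size < 2 * n:  # pad to a power of two >= 2n so lags do not wrap around
        size *= 2
    fx = fft([complex(v) for v in x] + [0j] * (size - n))
    fy = fft([complex(v) for v in y] + [0j] * (size - n))
    # Correlation in the time domain is conj(X) * Y in the frequency domain.
    product = [a.conjugate() * b for a, b in zip(fx, fy)]
    c = fft(product, invert=True)
    return [c[lag].real / size for lag in range(n)]  # apply the 1/size scaling here


print([round(v, 6) for v in cross_correlation_fft([1, 2, 3], [4, 5, 6])])
```

Up to floating-point rounding, the result should match the sums that a double loop over all lags would produce, at O(n log n) cost.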

I checked your PR. The PR was based on 2.x. So now, we should base on current master, right? Also the PR was mainly about ACF. Now we are going to implement a generic correlation including ACF, aren't we?

@colings86 colings86 assigned polyfractal and unassigned colings86 Jan 15, 2018
@polyfractal
Contributor

Sounds good!

I guess we can take a progressive approach for the two options. First, for a small batch of a few thousand buckets, the naive approach should work. Then, for a large batch of buckets, we can do an FFT of the time series, compute the inner product, and do the inverse FFT.

This seems reasonable. I never tested the naive approach, perhaps it is fast enough... we can do some benchmarks to see if we need to go through the effort of the FFT approach. Although @colings86 reminded me that a few thousand buckets turns into millions of iterations with O(n^2), so I guess we'll see :)

I checked your PR. The PR was based on 2.x. So now, we should base on current master, right?

Correct, we'd want to target master for the new PR. If/once merged, we can backport it to the appropriate branches. No need to use that old PR code either, it's probably so hopelessly out of date it isn't even worth looking at.

Also the PR was mainly about ACF. Now we are going to implement a generic correlation including ACF, aren't we?

++ I think it makes more sense to do a generic correlation, more widely useful.

I wonder if we should make the API syntax a bit more explicit? buckets_path in other aggs lets you specify as many paths as you want (and to name them for use in scripts), but here we only want two. Maybe something like:

"correl": {
  "first_series_path": "red_cars>sales",
  "second_series_path": "total_sales",
  "lag": 1
}

Not great naming, but that's the idea. What do you think?

Also, I wonder if we should support multiple lags (e.g. "lag": [1,2,3,4,5]), so that correlogram plots can be created with a single aggregation? Not sure how much that'd complicate the code or math, haven't looked at this sort of thing in a long time :)
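
To show what a multi-lag result could look like, here is a small Python sketch that computes one Pearson coefficient per requested lag, which is the data a correlogram plot needs. The function names, the returned dict shape, and the lag convention (x[t] paired with y[t + lag]) are assumptions for illustration, not a proposed response format.

```python
import math


def pearson(xs, ys):
    """Pearson correlation; NaN when there are too few points or no variance."""
    n = len(xs)
    if n < 2:
        return float("nan")
    mean_x = sum(xs) / n
    mean_y = sum(ys) / n
    cov = sum((x - mean_x) * (y - mean_y) for x, y in zip(xs, ys))
    var_x = sum((x - mean_x) ** 2 for x in xs)
    var_y = sum((y - mean_y) ** 2 for y in ys)
    if var_x == 0 or var_y == 0:
        return float("nan")
    return cov / math.sqrt(var_x * var_y)


def correlogram(x, y, lags):
    """Map each non-negative lag to the correlation of x[t] with y[t + lag]."""
    if len(x) != len(y):
        raise ValueError("series must have the same length")
    result = {}
    for lag in lags:
        if lag < 0:
            raise ValueError("this sketch only handles non-negative lags")
        xs = x[:max(len(x) - lag, 0)]  # drop the tail of x that has no partner
        ys = y[lag:]                   # shift y forward by `lag` buckets
        result[lag] = pearson(xs, ys)
    return result


# y repeats x one bucket later, so the lag-1 correlation is (close to) 1.0.
x = [1, 3, 2, 5, 4]
y = [9, 1, 3, 2, 5]
print(correlogram(x, y, [0, 1, 2]))
```

Each extra lag costs one more pass over the buckets, so supporting a list of lags does not change the math, only the shape of the result.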

@panffeng
Author

Hi @polyfractal, I started writing code for the aggregation. I will take the suggestions about two time series and multiple lags into account in the implementation.

I ran into an issue, though. It's about the aggregation type and where the result is placed.

If the correlation is a sub-aggregation of a date_histogram, then the correlation builder will extend AbstractPipelineAggregationBuilder and the correlation result will be placed inside the result of the date histogram. Having the correlation sit among the results of the two time series would be confusing.

We can instead make the correlation a sibling aggregation of the date_histogram. Then the correlation builder will extend BucketMetricsPipelineAggregationBuilder and the correlation result will be placed as a sibling of the date histogram result. However, the reduce method in class BucketMetricsPipelineAggregator reads bucketsPaths()[0], which uses only the first buckets path. For a generic correlation, we need at least two buckets paths. So would we have to introduce a new aggregator, say MultipleBucketMetricsPipelineAggregator, for this case?

Another question is about missing values or nulls in the time series. We will just drop the pair if either of the values is missing or null, as the R cor and Excel CORREL operators do, won't we?

@colings86
Contributor

@panffeng note that for a sibling pipeline aggregator, the builder does not need to extend BucketMetricsPipelineAggregationBuilder; it can just extend AbstractPipelineAggregationBuilder. The requirement for a sibling pipeline aggregation is that the aggregator class extends SiblingPipelineAggregator.

@panffeng
Author

Hi @colings86 , thanks. I will look into the class AbstractPipelineAggregationBuilder.

@polyfractal
Contributor

Another question is about missing values or nulls in the time series. We will just drop the pair if either of the values is missing or null, as the R cor and Excel CORREL operators do, won't we?

I think we should follow how other pipeline aggs work and use the gap_policy parameter to control the behavior. Defaulting to skip seems appropriate, and would behave as you mentioned: drop the pair if either value is missing/null.

The insert_zeros gap policy probably won't get used much with this aggregation since it doesn't make sense, but it'll be consistent with the other aggs to support it. And in the future we could add more intelligent gap policies that are suited for correlation (replacing with mean, expectation-maximization, nearest-neighbor, etc.).
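
A minimal Python sketch of how the two gap policies would treat paired buckets before the correlation is computed. Missing bucket values are represented as None, the policy names follow the values documented for pipeline aggregations (skip and insert_zeros), and the function name is hypothetical.

```python
def apply_gap_policy(x, y, gap_policy="skip"):
    """Return the (x, y) pairs that would feed the correlation under a gap policy."""
    if len(x) != len(y):
        raise ValueError("series must have the same length")
    pairs = []
    for a, b in zip(x, y):
        if gap_policy == "skip":
            # Drop the whole pair when either side is missing.
            if a is None or b is None:
                continue
        elif gap_policy == "insert_zeros":
            # Treat a missing value as 0.0, which keeps the pair.
            a = 0.0 if a is None else a
            b = 0.0 if b is None else b
        else:
            raise ValueError(f"unsupported gap_policy: {gap_policy!r}")
        pairs.append((a, b))
    return pairs


x = [1.0, None, 3.0, 4.0]
y = [2.0, 5.0, None, 8.0]
print(apply_gap_policy(x, y, "skip"))          # [(1.0, 2.0), (4.0, 8.0)]
print(apply_gap_policy(x, y, "insert_zeros"))  # [(1.0, 2.0), (0.0, 5.0), (3.0, 0.0), (4.0, 8.0)]
```

The second output shows why inserting zeros is a poor fit here: the artificial zeros pair with real values and pull the correlation toward whatever those pairs suggest.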

@panffeng
Author

Hi @polyfractal, I wrote a very basic version with a basic test here: panffeng@d64f3b2

The correlation aggregator extends SiblingPipelineAggregator directly.

Multiple lags are not yet supported. Lags are used much more frequently for autocorrelation. So my plan is to compute autocorrelation with lags and to compute cross-correlation with zero lag. Does this make sense?

Also, is there a code example for the new plugin SPI you mentioned earlier? Or is it for plugins only? I want to add an implementation of the FFT approach.

Thanks.

@markharwood
Contributor

cc @elastic/es-search-aggs

@polyfractal
Contributor

Hiya @panffeng, sorry for the delay. This slipped through my inbox and I didn't notice your reply.

We don't have any examples of a plugin using a pipeline aggregation; the closest is a module that adds the matrix-stats aggregation (https://github.com/elastic/elasticsearch/tree/master/modules/aggs-matrix-stats). That would contain a lot of similar boilerplate, but probably not identical. There may also be parts that just don't work yet when trying to plug in a pipeline agg... I'm not sure, I would have to dig into the code more closely.

I quickly skimmed your commit, only note is that if possible you could try using the newer static parser style (like this: https://github.com/elastic/elasticsearch/blob/master/server/src/main/java/org/elasticsearch/search/aggregations/pipeline/movfn/MovFnPipelineAggregationBuilder.java#L62). It tends to be easier to read since it's more compact. It isn't always possible to use however, depending on what you need to do.

It'd probably be easiest to move this forward as a PR so we can comment on it directly and help out, if you're still interested in working on it.

Sorry again for the delay!

@rjernst rjernst added the Team:Analytics Meta label for analytical engine team (ESQL/Aggs/Geo) label May 4, 2020
@polyfractal
Contributor

Closing as this seems to have stalled. If this is still of interest, feel free to open a PR and the team can revisit!
