
sql: implement sql stats compaction for clean up job #69302

Closed
Tracked by #64743
Azhng opened this issue Aug 24, 2021 · 3 comments
Labels
A-sql-observability (Related to observability of the SQL layer), C-enhancement (Solution expected to add code/behavior + preserve backward-compat; pg compat issues are an exception), no-issue-activity, X-stale

Comments

Azhng (Contributor) commented Aug 24, 2021

Currently, the SQL Stats cleanup job implements a very simple heuristic that deletes the rows with the oldest aggregated_ts.

Eventually, we want to implement compaction of older rows by merging multiple rows into a single row, so we can store more data for a longer duration.

Jira issue: CRDB-9536

blathers-crl bot commented Aug 24, 2021

Hi @Azhng, please add a C-ategory label to your issue. Check out the label system docs.

While you're here, please consider adding an A- label to help keep our repository tidy.

🦉 Hoot! I am a Blathers, a bot for CockroachDB. My owner is otan.

Azhng changed the title from "compaction algorithm" to "sql: implement sql stats compaction for clean up job" on Aug 24, 2021
Azhng added the C-enhancement (Solution expected to add code/behavior + preserve backward-compat; pg compat issues are an exception), A-sql-observability (Related to observability of the SQL layer), and T-sql-observability labels and removed the T-sql-observability label on Aug 24, 2021
Azhng (Contributor, Author) commented Sep 7, 2021

Adding some additional context for this issue as a follow-up to the original RFC.

As of #68401, we start enforcing a limit on the maximum number of rows in the system.{statement, transaction}_statistics tables. By default, this limit is set to 1,000,000 rows. Assuming each row costs on average about 1 KB (× replication factor), this allows us to store roughly 1 GB worth of historical statistics in each table. To put that into perspective, given a 10-node cluster, the default 1-hour flush interval, and a workload behind a load balancer that generates 1500 unique fingerprints (both user-generated queries and our own system queries), this means:

  • We generate: 1500 rows/hour × 1 KB × 10 nodes = 15 MB per hour
  • The 1 GB storage cap gives us 1 GB / 15 MB ≈ 66.7 hours (~2.7 days) of historical storage.

Note: without bumping the storage cap, this number decreases as the number of nodes in the cluster increases. Because the traffic is routed to the cluster through a load balancer, almost all of the 1500 unique fingerprints will be present in each node's in-memory store. Each node flushes its stats to disk periodically, and the primary key of the table includes the node_id in order to prevent contention between flushes. This means that in a cluster with 100 nodes, we can only retain about 6 hours worth of historical data.

It is clear that the number of nodes in the cluster becomes an amplification factor and can limit the usefulness of the new persisted SQL stats system.
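
To make the amplification concrete, here is a tiny back-of-the-envelope sketch in Go using the numbers from the example above (1,000,000-row cap, 1500 fingerprints per flush interval, ~1 KB per row); it is purely illustrative, not actual job code:

```go
package main

import "fmt"

func main() {
	const (
		rowCap              = 1_000_000 // row cap per table from the example above
		fingerprintsPerHour = 1_500     // unique fingerprints written per flush interval
	)
	// Each node flushes its own copy of (almost) every fingerprint, so the
	// number of rows written per hour scales linearly with the node count.
	for _, nodes := range []int{1, 10, 100} {
		rowsPerHour := fingerprintsPerHour * nodes
		retentionHours := float64(rowCap) / float64(rowsPerHour)
		fmt.Printf("%3d nodes: %6d rows/hour -> ~%.1f hours of retention\n",
			nodes, rowsPerHour, retentionHours)
	}
}
```

This prints roughly 666.7 hours for 1 node, 66.7 hours for 10 nodes, and 6.7 hours for 100 nodes, which is exactly the amplification described above.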

Compaction across node boundaries

A very natural first step here would be compacting statistics across different nodes. To reuse the earlier example, this means that with the same 1 GB storage cap, we can store up to 27 days (3.8 weeks) worth of historical data regardless of the cluster size!

Implementation

The implementation of this is rather simple:

  • we first use a SELECT DISTINCT ... query to select the list of distinct fingerprint IDs that were newly flushed into the system table.
  • we loop through the distinct fingerprint ID list (using follower reads) and, for each fingerprint ID, start a transaction in which:
    • we select all newly flushed statistics entries for that fingerprint ID and compute a single aggregated statistics entry.
    • we delete all of the entries we selected
    • we insert the new entry we just computed and mark it as an "aggregated" entry. This can be done either by using a special value in the node_id column for that row (e.g. math.MaxInt64) or by adding a field in the metadata JSONB column.

Since we already have a scheduled job set up for this, we can hook into that job directly.
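
As a rough structural sketch of what one such per-fingerprint transaction could look like (shown against database/sql from the outside purely for illustration; the column set, the sentinel node_id value, and the mergeStats helper are assumptions, and the real job would run through the internal executor and reuse the existing stats-merging code):

```go
package compactionsketch

import (
	"context"
	"database/sql"
	"math"
	"time"
)

// mergeStats combines two serialized statistics payloads. Stubbed here; the
// real job would reuse the existing statistics-merging logic.
func mergeStats(a, b []byte) []byte {
	if a == nil {
		return b
	}
	// A real implementation would deep-merge the two payloads.
	return a
}

// compactFingerprint merges the per-node rows for a single fingerprint and a
// single aggregation interval into one "aggregated" row, marked by a sentinel
// node_id.
func compactFingerprint(
	ctx context.Context, db *sql.DB, fingerprintID []byte, aggregatedTS time.Time,
) error {
	const sentinelNodeID = math.MaxInt32 // hypothetical marker for aggregated rows

	tx, err := db.BeginTx(ctx, nil)
	if err != nil {
		return err
	}
	defer tx.Rollback() // no-op once Commit succeeds

	// 1. Read every flushed row for this fingerprint in this interval.
	rows, err := tx.QueryContext(ctx,
		`SELECT statistics FROM system.statement_statistics
		 WHERE fingerprint_id = $1 AND aggregated_ts = $2`,
		fingerprintID, aggregatedTS)
	if err != nil {
		return err
	}
	var merged []byte
	for rows.Next() {
		var stats []byte
		if err := rows.Scan(&stats); err != nil {
			rows.Close()
			return err
		}
		merged = mergeStats(merged, stats)
	}
	rows.Close()
	if err := rows.Err(); err != nil {
		return err
	}

	// 2. Delete the per-node rows that were just folded into the aggregate.
	if _, err := tx.ExecContext(ctx,
		`DELETE FROM system.statement_statistics
		 WHERE fingerprint_id = $1 AND aggregated_ts = $2`,
		fingerprintID, aggregatedTS); err != nil {
		return err
	}

	// 3. Write back the single aggregated row.
	if _, err := tx.ExecContext(ctx,
		`INSERT INTO system.statement_statistics (fingerprint_id, node_id, aggregated_ts, statistics)
		 VALUES ($1, $2, $3, $4)`,
		fingerprintID, sentinelNodeID, aggregatedTS, merged); err != nil {
		return err
	}
	return tx.Commit()
}
```

The real schema has more columns (and separate statement and transaction tables), so treat this only as an outline of the select, merge, delete, insert shape.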

Self Throttling

Although the new implementation outlined above can avoid contending with the foreground traffic by:

  1. using follower reads to avoid transaction contention,
  2. not touching the statistics residing in the current active flush interval, and
  3. having the foreground read traffic also use follower reads,

we will still be consuming a lot more compute and IO resources to perform this compaction. In addition, we would be issuing transactions that delete a large number of keys, which has negative performance implications. Careful steps need to be taken to avoid excess CPU/IO consumption during compaction, which could cause latency spikes in the foreground traffic.

A few things we should include in the implementation above to ensure that our background job is non-disruptive (see the sketch after this list):

  1. Breaking down big transactions: we could set a maximum number of rows (perhaps sql.stats.compaction.max_removed_rows_per_txn) that we delete within a single transaction, controlled via a cluster setting. This prevents us from sending big KV batch requests, which can cause latency spikes for the foreground traffic.
  2. In between transactions, check the current cluster load. For this, we currently have a few options:
    1. The cluster QPS estimation recently shipped by @THardy98. This gives us a rough understanding of the current QPS load in the cluster, and we can use this subsystem as an indicator of whether we should proceed with more compaction. A threshold can be defined using a cluster setting: if the cluster QPS load exceeds that threshold, we back off and retry compaction after some amount of time. Alternatively, we can use the estimate to calculate a reduced value for the sql.stats.compaction.max_removed_rows_per_txn setting defined above. The cluster QPS estimation subsystem is designed to be lightweight and lives entirely in memory, so it is cheap to query.
    2. The Observability Infra team has done some work around monitoring CPU usage via cgroups. In addition, we are already tracking our disk/network IO stats in the RuntimeStatSampler. This is the most accurate way for us to understand the current system load and react to it accordingly.
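
One possible shape for the throttling loop, as a sketch only: the setting name mirrors the sql.stats.compaction.max_removed_rows_per_txn knob proposed above, while the qpsEstimate hook, the threshold, and the backoff duration are made-up placeholders:

```go
package compactionsketch

import (
	"context"
	"time"
)

const (
	maxRemovedRowsPerTxn = 1024             // placeholder default for sql.stats.compaction.max_removed_rows_per_txn
	qpsBackoffThreshold  = 5000.0           // back off if the estimated cluster QPS exceeds this (hypothetical value)
	compactionBackoff    = 10 * time.Minute // how long to wait before re-checking the load (hypothetical value)
)

// compactWithThrottling runs one small compaction transaction at a time,
// deleting at most maxRemovedRowsPerTxn rows per transaction, and checks the
// cluster QPS estimate between transactions, backing off while the cluster is
// busy with foreground traffic.
func compactWithThrottling(
	ctx context.Context,
	qpsEstimate func() float64, // e.g. backed by the in-memory QPS estimation subsystem
	runOneTxn func(limit int) (rowsRemoved int, err error),
) error {
	for {
		if qpsEstimate() > qpsBackoffThreshold {
			// The cluster is busy: wait and re-check rather than competing
			// with foreground traffic.
			select {
			case <-time.After(compactionBackoff):
				continue
			case <-ctx.Done():
				return ctx.Err()
			}
		}
		removed, err := runOneTxn(maxRemovedRowsPerTxn)
		if err != nil {
			return err
		}
		if removed < maxRemovedRowsPerTxn {
			return nil // nothing left to compact for now
		}
	}
}
```

Here runOneTxn would wrap a bounded version of the per-fingerprint transaction sketched earlier, capped at the row limit.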

Downsampling

This section discusses a potential implementation of downsampling. It builds on top of the compaction technique described in the previous section and provides more details for the original RFC.

The goal of downsampling is to use the storage space even more efficiently so we can store more statistics. With the technique described in the previous section, we can already store up to about 3.8 weeks worth of statistics with only a 1-million-row cap per table (estimated to be 1 GB × replication factor).

Definitions:

  • Data resolution (r): the length of the time interval each data point is aggregated over. As of today, this is 1 hour by default (that is, each data point represents statistics aggregated over a 1-hour period).
  • Data points per resolution (d): how many data points we store at a given resolution. For example, as of today, we store 1 million data points at resolution = 1 hour.
  • Data cap (C): the total number of data points we can store.

Currently, the maximum amount of time we can store statistics for is computed as:

T = (C / f) * r

where f is the number of statistics rows written per aggregation interval (today, roughly the number of unique fingerprints times the number of nodes; with the cross-node compaction above, just the number of unique fingerprints).

Now let's define an additional variable:

  • Resolution amplification factor (a): used to compute a new data resolution from the existing data resolution. Let r' be the new data resolution; then r' = a * r.

Idea

Currently, both r and d are static values that do not change. However, if we start to vary these two values, we can achieve greater efficiency. The main idea is that, instead of storing all statistics at exactly the same resolution, we gradually increase the coarseness of the data resolution for older statistics, with the rate of increase controlled by a.

This means we would have different segments of data stored at different data resolutions: the fresher the data, the more fine-grained the data resolution.

To reuse our previous example, assume that a workload generates 1500 unique fingerprints per hour. The total amount of time we can store statistics for becomes:

T = d * r * (1 + a + a^2 + ... + a^(L-1)) = d * r * (a^L - 1) / (a - 1)

where L is the number of resolution levels we keep, bounded by the data cap via C ≈ L * d * f.

To give a concrete example, our initial data resolution is r = 1 hour. If we set data points per resolution d = 150 and resolution amplification factor a = 2, this gives us a maximum retention period of 4650 hours (about 6.5 months). That is, 7x longer than the technique described in the previous section and 70x longer than our current retention period, while incurring the same amount of storage overhead.

In this scheme, we would store the initial 150 hours (roughly a week) of data at a 1-hour interval; once the data is older than that, it would be stored at a 2-hour interval, and we would keep doubling the coarseness of the data resolution as the statistics become staler.

Implementation

Implementing this downsampling scheme is straightforward. We can leverage the same infrastructure described in the previous section for job scheduling and self-throttling. We would compute a list of time intervals using the r, d, and a variables; these variables can be exposed as cluster settings to give users more control (or we can choose to make them non-public, since this downsampling scheme is a bit complicated). Then, for each computed time interval, we would simply read the statistics on disk that reside in that interval and perform the aggregations.
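
A small sketch of how the job could derive that list of intervals from r, d, and a; the bucket type, the fixed number of levels, and anchoring at the current time are assumptions for illustration:

```go
package compactionsketch

import "time"

// bucket describes one downsampling segment: rows whose aggregated_ts falls
// in [start, end) should be re-aggregated at the given resolution.
type bucket struct {
	start, end time.Time
	resolution time.Duration
}

// downsampleBuckets walks backwards from `now`: the newest d intervals stay
// at resolution r, the next d intervals are re-aggregated at a*r, the next d
// at a*a*r, and so on, for `levels` levels in total.
func downsampleBuckets(now time.Time, r time.Duration, d, a, levels int) []bucket {
	var buckets []bucket
	end := now
	res := r
	for i := 0; i < levels; i++ {
		start := end.Add(-time.Duration(d) * res)
		buckets = append(buckets, bucket{start: start, end: end, resolution: res})
		end = start
		res *= time.Duration(a)
	}
	return buckets
}
```

With r = 1 hour, d = 150, a = 2, and 5 levels, this walks back 150h + 300h + 600h + 1200h + 2400h = 4650h, matching the retention number in the example above.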

github-actions bot commented

We have marked this issue as stale because it has been inactive for
18 months. If this issue is still relevant, removing the stale label
or adding a comment will keep it active. Otherwise, we'll close it in
10 days to keep the issue queue tidy. Thank you for your contribution
to CockroachDB!
