
Incremental CAgg refresh using a work queue #40

Draft

feikesteenbergen wants to merge 2 commits into master from feike/producer_consumer_incr_cagg
Conversation

feikesteenbergen (Member) commented Dec 4, 2024

Continuous Aggregates, incremental parallel setup

This code explores the possibilities for doing incremental CAgg refreshes in
parallel. The setup it uses is as follows.

At a very high level these are the components:

  • a table that acts as a work queue:
    _timescaledb_additional.incremental_continuous_aggregate_refreshes
  • one (or more) producer jobs that schedule CAgg refreshes
  • one (or more) consumer jobs that process the jobs based on priority

The producer jobs can be scheduled very frequently, as no duplicate tasks will
be written to the work queue.
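Duplicate prevention comes from a (partial) unique index on the queue table,
discussed in the review thread below. A minimal sketch of why a repeated
producer insert becomes a no-op (a hypothetical statement, not the actual
producer code):

-- Hypothetical: with a partial unique index on (continuous_aggregate, window_start, window_end),
-- re-inserting a window that is already queued does nothing.
INSERT INTO _timescaledb_additional.incremental_continuous_aggregate_refreshes
    (continuous_aggregate, window_start, window_end)
VALUES
    ('public.test_cagg_incr_refresh_cagg'::regclass, '2024-11-01', '2024-11-04')
ON CONFLICT DO NOTHING;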

Producer

We have a producer procedure
(schedule_refresh_continuous_aggregate_incremental), which schedules tasks to
be picked up by the consumers.

The configuration for this call contains the following keys:

{
    "end_offset": "similar to end-offset in the policy",
    "start_offset": "similar to start-offset in the policy",
    "continuous_aggregate": "regclass / fully qualified name of the user view for the CAgg",
    "increment_size": "the size of each individual task, default: chunk_interval",
    "priority": "priority for these tasks. Lower numbers get processed earlier, default: 100"
}

Producer Examples

We schedule two sets of tasks for this CAgg. The first set covers a long
backfill window in increments of 3 days:

CALL _timescaledb_additional.schedule_refresh_continuous_aggregate_incremental(
    job_id => null,
    config => '
{
    "end_offset": "6 weeks",
    "start_offset": "3 years",
    "continuous_aggregate": "public.test_cagg_incr_refresh_cagg",
    "increment_size": "3 days"
}');
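With these offsets, the refresh window spans roughly 3 years minus 6 weeks, so
at an increment of 3 days this single call schedules on the order of 350 tasks.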

The second set covers only the most recent data, and gets the highest priority
(lower numbers are processed first):

CALL _timescaledb_additional.schedule_refresh_continuous_aggregate_incremental(
    job_id => null,
    config => '
{
    "end_offset": "1 day",
    "start_offset": "6 weeks",
    "continuous_aggregate": "public.test_cagg_incr_refresh_cagg",
    "increment_size": "1 week",
    "priority": 1
}');
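To verify what was scheduled, the work queue can be queried directly. A sketch,
assuming the queue table also stores the priority from the producer config:

SELECT continuous_aggregate, window_start, window_end, priority, scheduled
FROM _timescaledb_additional.incremental_continuous_aggregate_refreshes
WHERE worker_pid IS NULL
ORDER BY priority, window_start;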

Consumer

For the consumer(s), we schedule as many jobs as we want to be able to run in
parallel. A reasonable maximum is likely not very high, for example, 4-6. While
we can do incremental CAgg refreshes, we cannot (as of December 2024) run
parallel refreshes for the same CAgg. The number of consumers should therefore
never exceed your number of CAggs.

These jobs are designed to run continuously, so each consumer occupies a
database connection the whole time.

SELECT
    public.add_job(
        proc => '_timescaledb_additional.task_refresh_continuous_aggregate_incremental_runner'::regproc,
        -- The schedule_interval isn't strictly needed: the workers terminate on
        -- their own (max_runtime), but this ensures that once they do, they are
        -- restarted within 15 minutes or so.
        schedule_interval => interval '15 minutes',
        config => '{"max_runtime": "11 hours"}',
        initial_start => now()
    )
FROM
    generate_series(1, 4);
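The consumers set application_name while they run (see the review thread
below), so their state can be watched from pg_stat_activity:

SELECT pid, application_name, state
FROM pg_catalog.pg_stat_activity
WHERE application_name LIKE 'cagg incremental refresh consumer%';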

feikesteenbergen force-pushed the feike/producer_consumer_incr_cagg branch 3 times, most recently from db27250 to 8dbf03d on December 4, 2024 at 15:06
Comment on lines 3 to 15
CREATE TABLE IF NOT EXISTS _timescaledb_additional.incremental_continuous_aggregate_refreshes (
    continuous_aggregate regclass not null,
    window_start timestamptz not null,
    window_end timestamptz not null CHECK (window_end > window_start),
    scheduled timestamptz not null default pg_catalog.clock_timestamp(),
    worker_pid integer
);
Contributor:
Shouldn't we have a column to know when it was processed? Or will it be deleted?

feikesteenbergen (Member, Author):
> Shouldn't we have a column to know when it was processed

Let's do that, then we can create some reports afterwards.
We do need to think about retention that way though, especially if the number of tasks is large.

feikesteenbergen (Member, Author):

    started timestamptz,
    finished timestamptz,

Adding these, and changing the unique index:

) WHERE worker_pid IS NULL AND finished IS NULL;
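A sketch of what the adjusted partial unique index could look like (the exact
column list is an assumption):

CREATE UNIQUE INDEX ON _timescaledb_additional.incremental_continuous_aggregate_refreshes
    (continuous_aggregate, window_start, window_end)
WHERE worker_pid IS NULL AND finished IS NULL;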

Contributor:

We could then turn it into a hypertable, to make it easier to apply retention policies.
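A hypothetical sketch of that suggestion (not part of this PR; the partitioning
column would need to be NOT NULL):

-- Partition the queue by the finished timestamp, then drop old rows automatically.
SELECT public.create_hypertable(
    '_timescaledb_additional.incremental_continuous_aggregate_refreshes',
    'finished');
SELECT public.add_retention_policy(
    '_timescaledb_additional.incremental_continuous_aggregate_refreshes',
    drop_after => interval '1 month');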

feikesteenbergen (Member, Author):

Alas, then my trick with the unique index won't work, as unique indexes on a hypertable must include the partitioning column!

IF increment_size IS NULL THEN
    SELECT
        -- We default to the dimension interval_length if not explicitly specified
        coalesce(increment_size, interval_length * interval '1 microsecond')

increment_size is always NULL in this context, right?

-            coalesce(increment_size, interval_length * interval '1 microsecond')
+            interval_length * interval '1 microsecond'

feikesteenbergen force-pushed the feike/producer_consumer_incr_cagg branch from e000246 to 9623bdf on December 16, 2024 at 14:08
The initial idea behind this work was to create incremental *parallel*
refresh; however, explicit locks are still held on CAgg hypertables
during refresh, so it has become *only* incremental refresh.

We introduce 3 concepts:

- producer job
- consumer job
- work queue

By keeping these items separate, we can schedule work for CAgg refreshes
in smaller increments (say, 12 hours instead of 3 weeks), while also
allowing us to intervene by injecting higher-priority refreshes if needed.

For details, see README.md
feikesteenbergen force-pushed the feike/producer_consumer_incr_cagg branch from 9623bdf to 631dda8 on December 16, 2024 at 14:11
feikesteenbergen changed the title from "Wip, reword" to "Incremental CAgg refresh using a work queue" on Dec 17, 2024
Comment on lines +91 to +94
ELSE
    SET application_name TO 'cagg incremental refresh consumer - waiting for next task';
    PERFORM pg_catalog.pg_sleep(0.1);
    CONTINUE;
feikesteenbergen (Member, Author):

@fabriziomello we can also decide to RETURN here, so that we are not occupying a max_workers slot / max_background_workers slot
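A sketch of that alternative (hypothetical, not in the current diff):

ELSE
    -- Exit instead of polling; this frees the background-worker slot,
    -- and the job scheduler restarts the consumer within its
    -- schedule_interval (15 minutes in the example above).
    RETURN;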

Contributor:

yes, this is something that we definitely need to think about... then our scheduler can launch the worker again

feikesteenbergen force-pushed the feike/producer_consumer_incr_cagg branch from 6b4ae13 to 631dda8 on December 17, 2024 at 15:34

-- By serializing the picking of items from the queue, we prevent some race conditions.
LOCK TABLE _timescaledb_additional.incremental_continuous_aggregate_refreshes
    IN ACCESS EXCLUSIVE MODE;
feikesteenbergen (Member, Author):

May be worthy of a comment, but the tx that takes this lock is always short-lived.
It doesn't do heavy lifting; it populates worker_pid and commits.

fabriziomello (Contributor) commented Dec 18, 2024:

Do we really need this? We're using FOR UPDATE when picking an item from the queue. Asking because this table lock makes FOR UPDATE SKIP LOCKED useless, no?
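For reference, a sketch of picking an item with SKIP LOCKED alone, without the
table lock (hypothetical; the priority column is assumed):

SELECT continuous_aggregate, window_start, window_end
FROM _timescaledb_additional.incremental_continuous_aggregate_refreshes
WHERE worker_pid IS NULL
ORDER BY priority, window_start
LIMIT 1
FOR UPDATE SKIP LOCKED;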

Comment on lines +30 to +39
DECLARE
    p_id bigint;
    p_cagg regclass;
    p_window_start timestamptz;
    p_window_end timestamptz;
    p_start_time timestamptz;
    p_end_time timestamptz;
    p_mat_hypertable_id int;
    p_job_id int;
BEGIN
Contributor:

Do we really need a subtransaction here?
