Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[batch] Stop writing to v2 billing tables #13892

Merged
merged 4 commits into from
Jan 24, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
135 changes: 12 additions & 123 deletions batch/sql/estimated-current.sql
Original file line number Diff line number Diff line change
Expand Up @@ -595,85 +595,33 @@ BEGIN
SET cur_billing_date = CAST(UTC_DATE() AS DATE);

IF msec_diff_rollup != 0 THEN
INSERT INTO aggregated_billing_project_user_resources_v2 (billing_project, user, resource_id, token, `usage`)
SELECT billing_project, `user`,
resource_id,
rand_token,
msec_diff_rollup * quantity
FROM attempt_resources
JOIN batches ON batches.id = attempt_resources.batch_id
WHERE batch_id = NEW.batch_id AND job_id = NEW.job_id AND attempt_id = NEW.attempt_id
ON DUPLICATE KEY UPDATE `usage` = `usage` + msec_diff_rollup * quantity;

INSERT INTO aggregated_billing_project_user_resources_v3 (billing_project, user, resource_id, token, `usage`)
SELECT batches.billing_project, batches.`user`,
attempt_resources.deduped_resource_id,
rand_token,
msec_diff_rollup * quantity
FROM attempt_resources
JOIN batches ON batches.id = attempt_resources.batch_id
INNER JOIN aggregated_billing_project_user_resources_v2 ON
aggregated_billing_project_user_resources_v2.billing_project = batches.billing_project AND
aggregated_billing_project_user_resources_v2.user = batches.user AND
aggregated_billing_project_user_resources_v2.resource_id = attempt_resources.resource_id AND
aggregated_billing_project_user_resources_v2.token = rand_token
WHERE attempt_resources.batch_id = NEW.batch_id AND attempt_resources.job_id = NEW.job_id AND attempt_id = NEW.attempt_id AND migrated = 1
WHERE attempt_resources.batch_id = NEW.batch_id AND attempt_resources.job_id = NEW.job_id AND attempt_id = NEW.attempt_id
ON DUPLICATE KEY UPDATE `usage` = aggregated_billing_project_user_resources_v3.`usage` + msec_diff_rollup * quantity;

INSERT INTO aggregated_job_group_resources_v2 (batch_id, resource_id, token, `usage`)
SELECT batch_id,
resource_id,
rand_token,
msec_diff_rollup * quantity
FROM attempt_resources
WHERE batch_id = NEW.batch_id AND job_id = NEW.job_id AND attempt_id = NEW.attempt_id
ON DUPLICATE KEY UPDATE `usage` = `usage` + msec_diff_rollup * quantity;

INSERT INTO aggregated_job_group_resources_v3 (batch_id, resource_id, token, `usage`)
SELECT attempt_resources.batch_id,
attempt_resources.deduped_resource_id,
rand_token,
msec_diff_rollup * quantity
FROM attempt_resources
JOIN aggregated_job_group_resources_v2 ON
aggregated_job_group_resources_v2.batch_id = attempt_resources.batch_id AND
aggregated_job_group_resources_v2.resource_id = attempt_resources.resource_id AND
aggregated_job_group_resources_v2.token = rand_token
WHERE attempt_resources.batch_id = NEW.batch_id AND attempt_resources.job_id = NEW.job_id AND attempt_id = NEW.attempt_id AND migrated = 1
WHERE attempt_resources.batch_id = NEW.batch_id AND attempt_resources.job_id = NEW.job_id AND attempt_id = NEW.attempt_id
ON DUPLICATE KEY UPDATE `usage` = aggregated_job_group_resources_v3.`usage` + msec_diff_rollup * quantity;

INSERT INTO aggregated_job_resources_v2 (batch_id, job_id, resource_id, `usage`)
SELECT batch_id, job_id,
resource_id,
msec_diff_rollup * quantity
FROM attempt_resources
WHERE batch_id = NEW.batch_id AND job_id = NEW.job_id AND attempt_id = NEW.attempt_id
ON DUPLICATE KEY UPDATE `usage` = `usage` + msec_diff_rollup * quantity;

INSERT INTO aggregated_job_resources_v3 (batch_id, job_id, resource_id, `usage`)
SELECT attempt_resources.batch_id, attempt_resources.job_id,
attempt_resources.deduped_resource_id,
msec_diff_rollup * quantity
FROM attempt_resources
JOIN aggregated_job_resources_v2 ON
aggregated_job_resources_v2.batch_id = attempt_resources.batch_id AND
aggregated_job_resources_v2.job_id = attempt_resources.job_id AND
aggregated_job_resources_v2.resource_id = attempt_resources.resource_id
WHERE attempt_resources.batch_id = NEW.batch_id AND attempt_resources.job_id = NEW.job_id AND attempt_id = NEW.attempt_id AND migrated = 1
WHERE attempt_resources.batch_id = NEW.batch_id AND attempt_resources.job_id = NEW.job_id AND attempt_id = NEW.attempt_id
ON DUPLICATE KEY UPDATE `usage` = aggregated_job_resources_v3.`usage` + msec_diff_rollup * quantity;

INSERT INTO aggregated_billing_project_user_resources_by_date_v2 (billing_date, billing_project, user, resource_id, token, `usage`)
SELECT cur_billing_date,
billing_project,
`user`,
resource_id,
rand_token,
msec_diff_rollup * quantity
FROM attempt_resources
JOIN batches ON batches.id = attempt_resources.batch_id
WHERE batch_id = NEW.batch_id AND job_id = NEW.job_id AND attempt_id = NEW.attempt_id
ON DUPLICATE KEY UPDATE `usage` = `usage` + msec_diff_rollup * quantity;

INSERT INTO aggregated_billing_project_user_resources_by_date_v3 (billing_date, billing_project, user, resource_id, token, `usage`)
SELECT cur_billing_date,
batches.billing_project,
Expand All @@ -683,13 +631,7 @@ BEGIN
msec_diff_rollup * quantity
FROM attempt_resources
JOIN batches ON batches.id = attempt_resources.batch_id
JOIN aggregated_billing_project_user_resources_by_date_v2 ON
aggregated_billing_project_user_resources_by_date_v2.billing_date = cur_billing_date AND
aggregated_billing_project_user_resources_by_date_v2.billing_project = batches.billing_project AND
aggregated_billing_project_user_resources_by_date_v2.user = batches.user AND
aggregated_billing_project_user_resources_by_date_v2.resource_id = attempt_resources.resource_id AND
aggregated_billing_project_user_resources_by_date_v2.token = rand_token
WHERE attempt_resources.batch_id = NEW.batch_id AND attempt_resources.job_id = NEW.job_id AND attempt_id = NEW.attempt_id AND migrated = 1
WHERE attempt_resources.batch_id = NEW.batch_id AND attempt_resources.job_id = NEW.job_id AND attempt_id = NEW.attempt_id
ON DUPLICATE KEY UPDATE `usage` = aggregated_billing_project_user_resources_by_date_v3.`usage` + msec_diff_rollup * quantity;
END IF;
END $$
Expand Down Expand Up @@ -866,10 +808,6 @@ BEGIN
DECLARE cur_n_tokens INT;
DECLARE rand_token INT;
DECLARE cur_billing_date DATE;
DECLARE bp_user_resources_migrated BOOLEAN DEFAULT FALSE;
DECLARE bp_user_resources_by_date_migrated BOOLEAN DEFAULT FALSE;
DECLARE batch_resources_migrated BOOLEAN DEFAULT FALSE;
DECLARE job_resources_migrated BOOLEAN DEFAULT FALSE;

SELECT billing_project, user INTO cur_billing_project, cur_user
FROM batches WHERE id = NEW.batch_id;
Expand All @@ -887,74 +825,25 @@ BEGIN
SET cur_billing_date = CAST(UTC_DATE() AS DATE);

IF msec_diff_rollup != 0 THEN
INSERT INTO aggregated_billing_project_user_resources_v2 (billing_project, user, resource_id, token, `usage`)
VALUES (cur_billing_project, cur_user, NEW.resource_id, rand_token, NEW.quantity * msec_diff_rollup)
INSERT INTO aggregated_billing_project_user_resources_v3 (billing_project, user, resource_id, token, `usage`)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

And just checking here to make absolutely sure, resource_id and deduped_resource_id are the same in this table?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In the billing tables, there is no deduped_resource_id. The v2 table is essentially the "resource_id" in attempt_resources and the v3 table "resource_id" is equivalent to the "deduped_resource_id" in the attempt resources table.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Awesome, thanks!

VALUES (cur_billing_project, cur_user, NEW.deduped_resource_id, rand_token, NEW.quantity * msec_diff_rollup)
ON DUPLICATE KEY UPDATE
`usage` = `usage` + NEW.quantity * msec_diff_rollup;

SELECT migrated INTO bp_user_resources_migrated
FROM aggregated_billing_project_user_resources_v2
WHERE billing_project = cur_billing_project AND user = cur_user AND resource_id = NEW.resource_id AND token = rand_token
FOR UPDATE;

IF bp_user_resources_migrated THEN
INSERT INTO aggregated_billing_project_user_resources_v3 (billing_project, user, resource_id, token, `usage`)
VALUES (cur_billing_project, cur_user, NEW.deduped_resource_id, rand_token, NEW.quantity * msec_diff_rollup)
ON DUPLICATE KEY UPDATE
`usage` = `usage` + NEW.quantity * msec_diff_rollup;
END IF;

INSERT INTO aggregated_job_group_resources_v2 (batch_id, resource_id, token, `usage`)
VALUES (NEW.batch_id, NEW.resource_id, rand_token, NEW.quantity * msec_diff_rollup)
INSERT INTO aggregated_job_group_resources_v3 (batch_id, resource_id, token, `usage`)
VALUES (NEW.batch_id, NEW.deduped_resource_id, rand_token, NEW.quantity * msec_diff_rollup)
ON DUPLICATE KEY UPDATE
`usage` = `usage` + NEW.quantity * msec_diff_rollup;

SELECT migrated INTO batch_resources_migrated
FROM aggregated_job_group_resources_v2
WHERE batch_id = NEW.batch_id AND resource_id = NEW.resource_id AND token = rand_token
FOR UPDATE;

IF batch_resources_migrated THEN
INSERT INTO aggregated_job_group_resources_v3 (batch_id, resource_id, token, `usage`)
VALUES (NEW.batch_id, NEW.deduped_resource_id, rand_token, NEW.quantity * msec_diff_rollup)
ON DUPLICATE KEY UPDATE
`usage` = `usage` + NEW.quantity * msec_diff_rollup;
END IF;

INSERT INTO aggregated_job_resources_v2 (batch_id, job_id, resource_id, `usage`)
VALUES (NEW.batch_id, NEW.job_id, NEW.resource_id, NEW.quantity * msec_diff_rollup)
INSERT INTO aggregated_job_resources_v3 (batch_id, job_id, resource_id, `usage`)
VALUES (NEW.batch_id, NEW.job_id, NEW.deduped_resource_id, NEW.quantity * msec_diff_rollup)
ON DUPLICATE KEY UPDATE
`usage` = `usage` + NEW.quantity * msec_diff_rollup;

SELECT migrated INTO job_resources_migrated
FROM aggregated_job_resources_v2
WHERE batch_id = NEW.batch_id AND job_id = NEW.job_id AND resource_id = NEW.resource_id
FOR UPDATE;

IF job_resources_migrated THEN
INSERT INTO aggregated_job_resources_v3 (batch_id, job_id, resource_id, `usage`)
VALUES (NEW.batch_id, NEW.job_id, NEW.deduped_resource_id, NEW.quantity * msec_diff_rollup)
ON DUPLICATE KEY UPDATE
`usage` = `usage` + NEW.quantity * msec_diff_rollup;
END IF;

INSERT INTO aggregated_billing_project_user_resources_by_date_v2 (billing_date, billing_project, user, resource_id, token, `usage`)
VALUES (cur_billing_date, cur_billing_project, cur_user, NEW.resource_id, rand_token, NEW.quantity * msec_diff_rollup)
INSERT INTO aggregated_billing_project_user_resources_by_date_v3 (billing_date, billing_project, user, resource_id, token, `usage`)
VALUES (cur_billing_date, cur_billing_project, cur_user, NEW.deduped_resource_id, rand_token, NEW.quantity * msec_diff_rollup)
ON DUPLICATE KEY UPDATE
`usage` = `usage` + NEW.quantity * msec_diff_rollup;

SELECT migrated INTO bp_user_resources_by_date_migrated
FROM aggregated_billing_project_user_resources_by_date_v2
WHERE billing_date = cur_billing_date AND billing_project = cur_billing_project AND user = cur_user
AND resource_id = NEW.resource_id AND token = rand_token
FOR UPDATE;

IF bp_user_resources_by_date_migrated THEN
INSERT INTO aggregated_billing_project_user_resources_by_date_v3 (billing_date, billing_project, user, resource_id, token, `usage`)
VALUES (cur_billing_date, cur_billing_project, cur_user, NEW.deduped_resource_id, rand_token, NEW.quantity * msec_diff_rollup)
ON DUPLICATE KEY UPDATE
`usage` = `usage` + NEW.quantity * msec_diff_rollup;
END IF;
END IF;
END $$

Expand Down
120 changes: 120 additions & 0 deletions batch/sql/remove-v2-billing-writes.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
DELIMITER $$

DROP TRIGGER IF EXISTS attempts_after_update $$
CREATE TRIGGER attempts_after_update AFTER UPDATE ON attempts
FOR EACH ROW
BEGIN
DECLARE job_cores_mcpu INT;
DECLARE cur_billing_project VARCHAR(100);
DECLARE msec_diff_rollup BIGINT;
DECLARE cur_n_tokens INT;
DECLARE rand_token INT;
DECLARE cur_billing_date DATE;

SELECT n_tokens INTO cur_n_tokens FROM globals LOCK IN SHARE MODE;
SET rand_token = FLOOR(RAND() * cur_n_tokens);

SELECT cores_mcpu INTO job_cores_mcpu FROM jobs
WHERE batch_id = NEW.batch_id AND job_id = NEW.job_id;

SELECT billing_project INTO cur_billing_project FROM batches WHERE id = NEW.batch_id;

SET msec_diff_rollup = (GREATEST(COALESCE(NEW.rollup_time - NEW.start_time, 0), 0) -
GREATEST(COALESCE(OLD.rollup_time - OLD.start_time, 0), 0));

SET cur_billing_date = CAST(UTC_DATE() AS DATE);

IF msec_diff_rollup != 0 THEN
INSERT INTO aggregated_billing_project_user_resources_v3 (billing_project, user, resource_id, token, `usage`)
SELECT batches.billing_project, batches.`user`,
attempt_resources.deduped_resource_id,
rand_token,
msec_diff_rollup * quantity
FROM attempt_resources
JOIN batches ON batches.id = attempt_resources.batch_id
WHERE attempt_resources.batch_id = NEW.batch_id AND attempt_resources.job_id = NEW.job_id AND attempt_id = NEW.attempt_id
ON DUPLICATE KEY UPDATE `usage` = aggregated_billing_project_user_resources_v3.`usage` + msec_diff_rollup * quantity;

INSERT INTO aggregated_job_group_resources_v3 (batch_id, resource_id, token, `usage`)
SELECT attempt_resources.batch_id,
attempt_resources.deduped_resource_id,
rand_token,
msec_diff_rollup * quantity
FROM attempt_resources
WHERE attempt_resources.batch_id = NEW.batch_id AND attempt_resources.job_id = NEW.job_id AND attempt_id = NEW.attempt_id
ON DUPLICATE KEY UPDATE `usage` = aggregated_job_group_resources_v3.`usage` + msec_diff_rollup * quantity;

INSERT INTO aggregated_job_resources_v3 (batch_id, job_id, resource_id, `usage`)
SELECT attempt_resources.batch_id, attempt_resources.job_id,
attempt_resources.deduped_resource_id,
msec_diff_rollup * quantity
FROM attempt_resources
WHERE attempt_resources.batch_id = NEW.batch_id AND attempt_resources.job_id = NEW.job_id AND attempt_id = NEW.attempt_id
ON DUPLICATE KEY UPDATE `usage` = aggregated_job_resources_v3.`usage` + msec_diff_rollup * quantity;

INSERT INTO aggregated_billing_project_user_resources_by_date_v3 (billing_date, billing_project, user, resource_id, token, `usage`)
SELECT cur_billing_date,
batches.billing_project,
batches.`user`,
attempt_resources.deduped_resource_id,
rand_token,
msec_diff_rollup * quantity
FROM attempt_resources
JOIN batches ON batches.id = attempt_resources.batch_id
WHERE attempt_resources.batch_id = NEW.batch_id AND attempt_resources.job_id = NEW.job_id AND attempt_id = NEW.attempt_id
ON DUPLICATE KEY UPDATE `usage` = aggregated_billing_project_user_resources_by_date_v3.`usage` + msec_diff_rollup * quantity;
END IF;
END $$

DROP TRIGGER IF EXISTS attempt_resources_after_insert $$
CREATE TRIGGER attempt_resources_after_insert AFTER INSERT ON attempt_resources
FOR EACH ROW
BEGIN
DECLARE cur_start_time BIGINT;
DECLARE cur_rollup_time BIGINT;
DECLARE cur_billing_project VARCHAR(100);
DECLARE cur_user VARCHAR(100);
DECLARE msec_diff_rollup BIGINT;
DECLARE cur_n_tokens INT;
DECLARE rand_token INT;
DECLARE cur_billing_date DATE;

SELECT billing_project, user INTO cur_billing_project, cur_user
FROM batches WHERE id = NEW.batch_id;

SELECT n_tokens INTO cur_n_tokens FROM globals LOCK IN SHARE MODE;
SET rand_token = FLOOR(RAND() * cur_n_tokens);

SELECT start_time, rollup_time INTO cur_start_time, cur_rollup_time
FROM attempts
WHERE batch_id = NEW.batch_id AND job_id = NEW.job_id AND attempt_id = NEW.attempt_id
LOCK IN SHARE MODE;

SET msec_diff_rollup = GREATEST(COALESCE(cur_rollup_time - cur_start_time, 0), 0);

SET cur_billing_date = CAST(UTC_DATE() AS DATE);

IF msec_diff_rollup != 0 THEN
INSERT INTO aggregated_billing_project_user_resources_v3 (billing_project, user, resource_id, token, `usage`)
VALUES (cur_billing_project, cur_user, NEW.deduped_resource_id, rand_token, NEW.quantity * msec_diff_rollup)
ON DUPLICATE KEY UPDATE
`usage` = `usage` + NEW.quantity * msec_diff_rollup;

INSERT INTO aggregated_job_group_resources_v3 (batch_id, resource_id, token, `usage`)
VALUES (NEW.batch_id, NEW.deduped_resource_id, rand_token, NEW.quantity * msec_diff_rollup)
ON DUPLICATE KEY UPDATE
`usage` = `usage` + NEW.quantity * msec_diff_rollup;

INSERT INTO aggregated_job_resources_v3 (batch_id, job_id, resource_id, `usage`)
VALUES (NEW.batch_id, NEW.job_id, NEW.deduped_resource_id, NEW.quantity * msec_diff_rollup)
ON DUPLICATE KEY UPDATE
`usage` = `usage` + NEW.quantity * msec_diff_rollup;

INSERT INTO aggregated_billing_project_user_resources_by_date_v3 (billing_date, billing_project, user, resource_id, token, `usage`)
VALUES (cur_billing_date, cur_billing_project, cur_user, NEW.deduped_resource_id, rand_token, NEW.quantity * msec_diff_rollup)
ON DUPLICATE KEY UPDATE
`usage` = `usage` + NEW.quantity * msec_diff_rollup;
END IF;
END $$

DELIMITER ;
3 changes: 3 additions & 0 deletions build.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -2358,6 +2358,9 @@ steps:
- name: rename-job-groups-tables
script: /io/sql/rename-job-groups-tables.sql
online: false # this must be offline
- name: remove-v2-billing-writes
script: /io/sql/remove-v2-billing-writes.sql
online: true
inputs:
- from: /repo/batch/sql
to: /io/sql
Expand Down