Skip to content

Commit

Permalink
[batch] Stop writing to v2 billing tables (hail-is#13892)
Browse files Browse the repository at this point in the history
This PR modifies the billing triggers to stop writing to the v2 billing
tables as well as remove the check for whether the equivalent v2 rows
have been "migrated" when writing to the v3 tables. Stacked on hail-is#13891.
  • Loading branch information
jigold authored Jan 24, 2024
1 parent 0411c89 commit b7bde56
Show file tree
Hide file tree
Showing 3 changed files with 135 additions and 123 deletions.
135 changes: 12 additions & 123 deletions batch/sql/estimated-current.sql
Original file line number Diff line number Diff line change
Expand Up @@ -595,85 +595,33 @@ BEGIN
SET cur_billing_date = CAST(UTC_DATE() AS DATE);

IF msec_diff_rollup != 0 THEN
INSERT INTO aggregated_billing_project_user_resources_v2 (billing_project, user, resource_id, token, `usage`)
SELECT billing_project, `user`,
resource_id,
rand_token,
msec_diff_rollup * quantity
FROM attempt_resources
JOIN batches ON batches.id = attempt_resources.batch_id
WHERE batch_id = NEW.batch_id AND job_id = NEW.job_id AND attempt_id = NEW.attempt_id
ON DUPLICATE KEY UPDATE `usage` = `usage` + msec_diff_rollup * quantity;

INSERT INTO aggregated_billing_project_user_resources_v3 (billing_project, user, resource_id, token, `usage`)
SELECT batches.billing_project, batches.`user`,
attempt_resources.deduped_resource_id,
rand_token,
msec_diff_rollup * quantity
FROM attempt_resources
JOIN batches ON batches.id = attempt_resources.batch_id
INNER JOIN aggregated_billing_project_user_resources_v2 ON
aggregated_billing_project_user_resources_v2.billing_project = batches.billing_project AND
aggregated_billing_project_user_resources_v2.user = batches.user AND
aggregated_billing_project_user_resources_v2.resource_id = attempt_resources.resource_id AND
aggregated_billing_project_user_resources_v2.token = rand_token
WHERE attempt_resources.batch_id = NEW.batch_id AND attempt_resources.job_id = NEW.job_id AND attempt_id = NEW.attempt_id AND migrated = 1
WHERE attempt_resources.batch_id = NEW.batch_id AND attempt_resources.job_id = NEW.job_id AND attempt_id = NEW.attempt_id
ON DUPLICATE KEY UPDATE `usage` = aggregated_billing_project_user_resources_v3.`usage` + msec_diff_rollup * quantity;

INSERT INTO aggregated_job_group_resources_v2 (batch_id, resource_id, token, `usage`)
SELECT batch_id,
resource_id,
rand_token,
msec_diff_rollup * quantity
FROM attempt_resources
WHERE batch_id = NEW.batch_id AND job_id = NEW.job_id AND attempt_id = NEW.attempt_id
ON DUPLICATE KEY UPDATE `usage` = `usage` + msec_diff_rollup * quantity;

INSERT INTO aggregated_job_group_resources_v3 (batch_id, resource_id, token, `usage`)
SELECT attempt_resources.batch_id,
attempt_resources.deduped_resource_id,
rand_token,
msec_diff_rollup * quantity
FROM attempt_resources
JOIN aggregated_job_group_resources_v2 ON
aggregated_job_group_resources_v2.batch_id = attempt_resources.batch_id AND
aggregated_job_group_resources_v2.resource_id = attempt_resources.resource_id AND
aggregated_job_group_resources_v2.token = rand_token
WHERE attempt_resources.batch_id = NEW.batch_id AND attempt_resources.job_id = NEW.job_id AND attempt_id = NEW.attempt_id AND migrated = 1
WHERE attempt_resources.batch_id = NEW.batch_id AND attempt_resources.job_id = NEW.job_id AND attempt_id = NEW.attempt_id
ON DUPLICATE KEY UPDATE `usage` = aggregated_job_group_resources_v3.`usage` + msec_diff_rollup * quantity;

INSERT INTO aggregated_job_resources_v2 (batch_id, job_id, resource_id, `usage`)
SELECT batch_id, job_id,
resource_id,
msec_diff_rollup * quantity
FROM attempt_resources
WHERE batch_id = NEW.batch_id AND job_id = NEW.job_id AND attempt_id = NEW.attempt_id
ON DUPLICATE KEY UPDATE `usage` = `usage` + msec_diff_rollup * quantity;

INSERT INTO aggregated_job_resources_v3 (batch_id, job_id, resource_id, `usage`)
SELECT attempt_resources.batch_id, attempt_resources.job_id,
attempt_resources.deduped_resource_id,
msec_diff_rollup * quantity
FROM attempt_resources
JOIN aggregated_job_resources_v2 ON
aggregated_job_resources_v2.batch_id = attempt_resources.batch_id AND
aggregated_job_resources_v2.job_id = attempt_resources.job_id AND
aggregated_job_resources_v2.resource_id = attempt_resources.resource_id
WHERE attempt_resources.batch_id = NEW.batch_id AND attempt_resources.job_id = NEW.job_id AND attempt_id = NEW.attempt_id AND migrated = 1
WHERE attempt_resources.batch_id = NEW.batch_id AND attempt_resources.job_id = NEW.job_id AND attempt_id = NEW.attempt_id
ON DUPLICATE KEY UPDATE `usage` = aggregated_job_resources_v3.`usage` + msec_diff_rollup * quantity;

INSERT INTO aggregated_billing_project_user_resources_by_date_v2 (billing_date, billing_project, user, resource_id, token, `usage`)
SELECT cur_billing_date,
billing_project,
`user`,
resource_id,
rand_token,
msec_diff_rollup * quantity
FROM attempt_resources
JOIN batches ON batches.id = attempt_resources.batch_id
WHERE batch_id = NEW.batch_id AND job_id = NEW.job_id AND attempt_id = NEW.attempt_id
ON DUPLICATE KEY UPDATE `usage` = `usage` + msec_diff_rollup * quantity;

INSERT INTO aggregated_billing_project_user_resources_by_date_v3 (billing_date, billing_project, user, resource_id, token, `usage`)
SELECT cur_billing_date,
batches.billing_project,
Expand All @@ -683,13 +631,7 @@ BEGIN
msec_diff_rollup * quantity
FROM attempt_resources
JOIN batches ON batches.id = attempt_resources.batch_id
JOIN aggregated_billing_project_user_resources_by_date_v2 ON
aggregated_billing_project_user_resources_by_date_v2.billing_date = cur_billing_date AND
aggregated_billing_project_user_resources_by_date_v2.billing_project = batches.billing_project AND
aggregated_billing_project_user_resources_by_date_v2.user = batches.user AND
aggregated_billing_project_user_resources_by_date_v2.resource_id = attempt_resources.resource_id AND
aggregated_billing_project_user_resources_by_date_v2.token = rand_token
WHERE attempt_resources.batch_id = NEW.batch_id AND attempt_resources.job_id = NEW.job_id AND attempt_id = NEW.attempt_id AND migrated = 1
WHERE attempt_resources.batch_id = NEW.batch_id AND attempt_resources.job_id = NEW.job_id AND attempt_id = NEW.attempt_id
ON DUPLICATE KEY UPDATE `usage` = aggregated_billing_project_user_resources_by_date_v3.`usage` + msec_diff_rollup * quantity;
END IF;
END $$
Expand Down Expand Up @@ -866,10 +808,6 @@ BEGIN
DECLARE cur_n_tokens INT;
DECLARE rand_token INT;
DECLARE cur_billing_date DATE;
DECLARE bp_user_resources_migrated BOOLEAN DEFAULT FALSE;
DECLARE bp_user_resources_by_date_migrated BOOLEAN DEFAULT FALSE;
DECLARE batch_resources_migrated BOOLEAN DEFAULT FALSE;
DECLARE job_resources_migrated BOOLEAN DEFAULT FALSE;

SELECT billing_project, user INTO cur_billing_project, cur_user
FROM batches WHERE id = NEW.batch_id;
Expand All @@ -887,74 +825,25 @@ BEGIN
SET cur_billing_date = CAST(UTC_DATE() AS DATE);

IF msec_diff_rollup != 0 THEN
INSERT INTO aggregated_billing_project_user_resources_v2 (billing_project, user, resource_id, token, `usage`)
VALUES (cur_billing_project, cur_user, NEW.resource_id, rand_token, NEW.quantity * msec_diff_rollup)
INSERT INTO aggregated_billing_project_user_resources_v3 (billing_project, user, resource_id, token, `usage`)
VALUES (cur_billing_project, cur_user, NEW.deduped_resource_id, rand_token, NEW.quantity * msec_diff_rollup)
ON DUPLICATE KEY UPDATE
`usage` = `usage` + NEW.quantity * msec_diff_rollup;

SELECT migrated INTO bp_user_resources_migrated
FROM aggregated_billing_project_user_resources_v2
WHERE billing_project = cur_billing_project AND user = cur_user AND resource_id = NEW.resource_id AND token = rand_token
FOR UPDATE;

IF bp_user_resources_migrated THEN
INSERT INTO aggregated_billing_project_user_resources_v3 (billing_project, user, resource_id, token, `usage`)
VALUES (cur_billing_project, cur_user, NEW.deduped_resource_id, rand_token, NEW.quantity * msec_diff_rollup)
ON DUPLICATE KEY UPDATE
`usage` = `usage` + NEW.quantity * msec_diff_rollup;
END IF;

INSERT INTO aggregated_job_group_resources_v2 (batch_id, resource_id, token, `usage`)
VALUES (NEW.batch_id, NEW.resource_id, rand_token, NEW.quantity * msec_diff_rollup)
INSERT INTO aggregated_job_group_resources_v3 (batch_id, resource_id, token, `usage`)
VALUES (NEW.batch_id, NEW.deduped_resource_id, rand_token, NEW.quantity * msec_diff_rollup)
ON DUPLICATE KEY UPDATE
`usage` = `usage` + NEW.quantity * msec_diff_rollup;

SELECT migrated INTO batch_resources_migrated
FROM aggregated_job_group_resources_v2
WHERE batch_id = NEW.batch_id AND resource_id = NEW.resource_id AND token = rand_token
FOR UPDATE;

IF batch_resources_migrated THEN
INSERT INTO aggregated_job_group_resources_v3 (batch_id, resource_id, token, `usage`)
VALUES (NEW.batch_id, NEW.deduped_resource_id, rand_token, NEW.quantity * msec_diff_rollup)
ON DUPLICATE KEY UPDATE
`usage` = `usage` + NEW.quantity * msec_diff_rollup;
END IF;

INSERT INTO aggregated_job_resources_v2 (batch_id, job_id, resource_id, `usage`)
VALUES (NEW.batch_id, NEW.job_id, NEW.resource_id, NEW.quantity * msec_diff_rollup)
INSERT INTO aggregated_job_resources_v3 (batch_id, job_id, resource_id, `usage`)
VALUES (NEW.batch_id, NEW.job_id, NEW.deduped_resource_id, NEW.quantity * msec_diff_rollup)
ON DUPLICATE KEY UPDATE
`usage` = `usage` + NEW.quantity * msec_diff_rollup;

SELECT migrated INTO job_resources_migrated
FROM aggregated_job_resources_v2
WHERE batch_id = NEW.batch_id AND job_id = NEW.job_id AND resource_id = NEW.resource_id
FOR UPDATE;

IF job_resources_migrated THEN
INSERT INTO aggregated_job_resources_v3 (batch_id, job_id, resource_id, `usage`)
VALUES (NEW.batch_id, NEW.job_id, NEW.deduped_resource_id, NEW.quantity * msec_diff_rollup)
ON DUPLICATE KEY UPDATE
`usage` = `usage` + NEW.quantity * msec_diff_rollup;
END IF;

INSERT INTO aggregated_billing_project_user_resources_by_date_v2 (billing_date, billing_project, user, resource_id, token, `usage`)
VALUES (cur_billing_date, cur_billing_project, cur_user, NEW.resource_id, rand_token, NEW.quantity * msec_diff_rollup)
INSERT INTO aggregated_billing_project_user_resources_by_date_v3 (billing_date, billing_project, user, resource_id, token, `usage`)
VALUES (cur_billing_date, cur_billing_project, cur_user, NEW.deduped_resource_id, rand_token, NEW.quantity * msec_diff_rollup)
ON DUPLICATE KEY UPDATE
`usage` = `usage` + NEW.quantity * msec_diff_rollup;

SELECT migrated INTO bp_user_resources_by_date_migrated
FROM aggregated_billing_project_user_resources_by_date_v2
WHERE billing_date = cur_billing_date AND billing_project = cur_billing_project AND user = cur_user
AND resource_id = NEW.resource_id AND token = rand_token
FOR UPDATE;

IF bp_user_resources_by_date_migrated THEN
INSERT INTO aggregated_billing_project_user_resources_by_date_v3 (billing_date, billing_project, user, resource_id, token, `usage`)
VALUES (cur_billing_date, cur_billing_project, cur_user, NEW.deduped_resource_id, rand_token, NEW.quantity * msec_diff_rollup)
ON DUPLICATE KEY UPDATE
`usage` = `usage` + NEW.quantity * msec_diff_rollup;
END IF;
END IF;
END $$

Expand Down
120 changes: 120 additions & 0 deletions batch/sql/remove-v2-billing-writes.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
DELIMITER $$

DROP TRIGGER IF EXISTS attempts_after_update $$
CREATE TRIGGER attempts_after_update AFTER UPDATE ON attempts
FOR EACH ROW
BEGIN
DECLARE job_cores_mcpu INT;
DECLARE cur_billing_project VARCHAR(100);
DECLARE msec_diff_rollup BIGINT;
DECLARE cur_n_tokens INT;
DECLARE rand_token INT;
DECLARE cur_billing_date DATE;

SELECT n_tokens INTO cur_n_tokens FROM globals LOCK IN SHARE MODE;
SET rand_token = FLOOR(RAND() * cur_n_tokens);

SELECT cores_mcpu INTO job_cores_mcpu FROM jobs
WHERE batch_id = NEW.batch_id AND job_id = NEW.job_id;

SELECT billing_project INTO cur_billing_project FROM batches WHERE id = NEW.batch_id;

SET msec_diff_rollup = (GREATEST(COALESCE(NEW.rollup_time - NEW.start_time, 0), 0) -
GREATEST(COALESCE(OLD.rollup_time - OLD.start_time, 0), 0));

SET cur_billing_date = CAST(UTC_DATE() AS DATE);

IF msec_diff_rollup != 0 THEN
INSERT INTO aggregated_billing_project_user_resources_v3 (billing_project, user, resource_id, token, `usage`)
SELECT batches.billing_project, batches.`user`,
attempt_resources.deduped_resource_id,
rand_token,
msec_diff_rollup * quantity
FROM attempt_resources
JOIN batches ON batches.id = attempt_resources.batch_id
WHERE attempt_resources.batch_id = NEW.batch_id AND attempt_resources.job_id = NEW.job_id AND attempt_id = NEW.attempt_id
ON DUPLICATE KEY UPDATE `usage` = aggregated_billing_project_user_resources_v3.`usage` + msec_diff_rollup * quantity;

INSERT INTO aggregated_job_group_resources_v3 (batch_id, resource_id, token, `usage`)
SELECT attempt_resources.batch_id,
attempt_resources.deduped_resource_id,
rand_token,
msec_diff_rollup * quantity
FROM attempt_resources
WHERE attempt_resources.batch_id = NEW.batch_id AND attempt_resources.job_id = NEW.job_id AND attempt_id = NEW.attempt_id
ON DUPLICATE KEY UPDATE `usage` = aggregated_job_group_resources_v3.`usage` + msec_diff_rollup * quantity;

INSERT INTO aggregated_job_resources_v3 (batch_id, job_id, resource_id, `usage`)
SELECT attempt_resources.batch_id, attempt_resources.job_id,
attempt_resources.deduped_resource_id,
msec_diff_rollup * quantity
FROM attempt_resources
WHERE attempt_resources.batch_id = NEW.batch_id AND attempt_resources.job_id = NEW.job_id AND attempt_id = NEW.attempt_id
ON DUPLICATE KEY UPDATE `usage` = aggregated_job_resources_v3.`usage` + msec_diff_rollup * quantity;

INSERT INTO aggregated_billing_project_user_resources_by_date_v3 (billing_date, billing_project, user, resource_id, token, `usage`)
SELECT cur_billing_date,
batches.billing_project,
batches.`user`,
attempt_resources.deduped_resource_id,
rand_token,
msec_diff_rollup * quantity
FROM attempt_resources
JOIN batches ON batches.id = attempt_resources.batch_id
WHERE attempt_resources.batch_id = NEW.batch_id AND attempt_resources.job_id = NEW.job_id AND attempt_id = NEW.attempt_id
ON DUPLICATE KEY UPDATE `usage` = aggregated_billing_project_user_resources_by_date_v3.`usage` + msec_diff_rollup * quantity;
END IF;
END $$

DROP TRIGGER IF EXISTS attempt_resources_after_insert $$
CREATE TRIGGER attempt_resources_after_insert AFTER INSERT ON attempt_resources
FOR EACH ROW
BEGIN
DECLARE cur_start_time BIGINT;
DECLARE cur_rollup_time BIGINT;
DECLARE cur_billing_project VARCHAR(100);
DECLARE cur_user VARCHAR(100);
DECLARE msec_diff_rollup BIGINT;
DECLARE cur_n_tokens INT;
DECLARE rand_token INT;
DECLARE cur_billing_date DATE;

SELECT billing_project, user INTO cur_billing_project, cur_user
FROM batches WHERE id = NEW.batch_id;

SELECT n_tokens INTO cur_n_tokens FROM globals LOCK IN SHARE MODE;
SET rand_token = FLOOR(RAND() * cur_n_tokens);

SELECT start_time, rollup_time INTO cur_start_time, cur_rollup_time
FROM attempts
WHERE batch_id = NEW.batch_id AND job_id = NEW.job_id AND attempt_id = NEW.attempt_id
LOCK IN SHARE MODE;

SET msec_diff_rollup = GREATEST(COALESCE(cur_rollup_time - cur_start_time, 0), 0);

SET cur_billing_date = CAST(UTC_DATE() AS DATE);

IF msec_diff_rollup != 0 THEN
INSERT INTO aggregated_billing_project_user_resources_v3 (billing_project, user, resource_id, token, `usage`)
VALUES (cur_billing_project, cur_user, NEW.deduped_resource_id, rand_token, NEW.quantity * msec_diff_rollup)
ON DUPLICATE KEY UPDATE
`usage` = `usage` + NEW.quantity * msec_diff_rollup;

INSERT INTO aggregated_job_group_resources_v3 (batch_id, resource_id, token, `usage`)
VALUES (NEW.batch_id, NEW.deduped_resource_id, rand_token, NEW.quantity * msec_diff_rollup)
ON DUPLICATE KEY UPDATE
`usage` = `usage` + NEW.quantity * msec_diff_rollup;

INSERT INTO aggregated_job_resources_v3 (batch_id, job_id, resource_id, `usage`)
VALUES (NEW.batch_id, NEW.job_id, NEW.deduped_resource_id, NEW.quantity * msec_diff_rollup)
ON DUPLICATE KEY UPDATE
`usage` = `usage` + NEW.quantity * msec_diff_rollup;

INSERT INTO aggregated_billing_project_user_resources_by_date_v3 (billing_date, billing_project, user, resource_id, token, `usage`)
VALUES (cur_billing_date, cur_billing_project, cur_user, NEW.deduped_resource_id, rand_token, NEW.quantity * msec_diff_rollup)
ON DUPLICATE KEY UPDATE
`usage` = `usage` + NEW.quantity * msec_diff_rollup;
END IF;
END $$

DELIMITER ;
3 changes: 3 additions & 0 deletions build.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -2358,6 +2358,9 @@ steps:
- name: rename-job-groups-tables
script: /io/sql/rename-job-groups-tables.sql
online: false # this must be offline
- name: remove-v2-billing-writes
script: /io/sql/remove-v2-billing-writes.sql
online: true
inputs:
- from: /repo/batch/sql
to: /io/sql
Expand Down

0 comments on commit b7bde56

Please sign in to comment.