From fb00c268e9a17cc18418190ea252a76523bbab44 Mon Sep 17 00:00:00 2001 From: Micah Wylde Date: Tue, 9 Jul 2024 13:59:02 -0700 Subject: [PATCH] Use an inner join to prevent controller from loading partially-constructed jobs --- crates/arroyo-api/queries/api_queries.sql | 14 +++++++------- .../queries/controller_queries.sql | 2 +- 2 files changed, 8 insertions(+), 8 deletions(-) diff --git a/crates/arroyo-api/queries/api_queries.sql b/crates/arroyo-api/queries/api_queries.sql index 5a527d7d5..eb36fc5d1 100644 --- a/crates/arroyo-api/queries/api_queries.sql +++ b/crates/arroyo-api/queries/api_queries.sql @@ -131,7 +131,7 @@ VALUES (:pub_id, :organization_id, :created_by, :name, :type, :textual_repr, :ud SELECT pipelines.id, pipelines.pub_id, name, type, textual_repr, udfs, program, checkpoint_interval_micros, stop, pipelines.created_at, state, parallelism_overrides, ttl_micros FROM pipelines INNER JOIN job_configs on pipelines.id = job_configs.pipeline_id - LEFT JOIN job_statuses ON job_configs.id = job_statuses.id + INNER JOIN job_statuses ON job_configs.id = job_statuses.id WHERE pipelines.organization_id = :organization_id AND pipelines.pub_id IS NOT NULL AND ttl_micros IS NULL @@ -146,7 +146,7 @@ LIMIT cast(:limit as integer); SELECT pipelines.id, pipelines.pub_id, name, type, textual_repr, udfs, program, checkpoint_interval_micros, stop, pipelines.created_at, state, parallelism_overrides, ttl_micros FROM pipelines INNER JOIN job_configs on pipelines.id = job_configs.pipeline_id - LEFT JOIN job_statuses ON job_configs.id = job_statuses.id + INNER JOIN job_statuses ON job_configs.id = job_statuses.id WHERE pipelines.pub_id = :pub_id AND pipelines.organization_id = :organization_id; --! get_pipeline_id @@ -196,7 +196,7 @@ INSERT INTO job_statuses (pub_id, id, organization_id) VALUES (:pub_id, :id, :or --! get_jobs: (start_time?, finish_time?, state?, tasks?, textual_repr?, failure_message?, run_id?, udfs) SELECT job_configs.id as id, pipeline_name, stop, textual_repr, start_time, finish_time, state, tasks, pipeline_id, failure_message, run_id, udfs FROM job_configs - LEFT JOIN job_statuses ON job_configs.id = job_statuses.id + INNER JOIN job_statuses ON job_configs.id = job_statuses.id INNER JOIN pipelines ON pipeline_id = pipelines.id WHERE job_configs.organization_id = :organization_id AND ttl_micros IS NULL ORDER BY COALESCE(job_configs.updated_at, job_configs.created_at) DESC; @@ -204,7 +204,7 @@ ORDER BY COALESCE(job_configs.updated_at, job_configs.created_at) DESC; --! get_pipeline_jobs : DbPipelineJob(start_time?, finish_time?, state?, tasks?, failure_message?, run_id?) SELECT job_configs.id, stop, start_time, finish_time, state, tasks, failure_message, run_id, checkpoint_interval_micros, job_configs.created_at FROM job_configs - LEFT JOIN job_statuses ON job_configs.id = job_statuses.id + INNER JOIN job_statuses ON job_configs.id = job_statuses.id INNER JOIN pipelines ON pipelines.id = job_configs.pipeline_id WHERE job_configs.organization_id = :organization_id AND pipelines.pub_id = :pub_id ORDER BY job_configs.created_at DESC; @@ -212,7 +212,7 @@ ORDER BY job_configs.created_at DESC; --! get_all_jobs : DbPipelineJob(start_time?, finish_time?, state?, tasks?, failure_message?, run_id?) SELECT job_configs.id, stop, start_time, finish_time, state, tasks, failure_message, run_id, checkpoint_interval_micros, job_configs.created_at FROM job_configs - LEFT JOIN job_statuses ON job_configs.id = job_statuses.id + INNER JOIN job_statuses ON job_configs.id = job_statuses.id INNER JOIN pipelines ON pipelines.id = job_configs.pipeline_id WHERE job_configs.organization_id = :organization_id AND ttl_micros IS NULL ORDER BY job_configs.created_at DESC; @@ -220,7 +220,7 @@ ORDER BY job_configs.created_at DESC; --! get_pipeline_job : DbPipelineJob(start_time?, finish_time?, state?, tasks?, failure_message?, run_id?) SELECT job_configs.id, stop, start_time, finish_time, state, tasks, failure_message, run_id, checkpoint_interval_micros, job_configs.created_at FROM job_configs - LEFT JOIN job_statuses ON job_configs.id = job_statuses.id + INNER JOIN job_statuses ON job_configs.id = job_statuses.id INNER JOIN pipelines ON pipelines.id = job_configs.pipeline_id WHERE job_configs.organization_id = :organization_id AND job_configs.id = :job_id ORDER BY job_configs.created_at DESC; @@ -229,7 +229,7 @@ ORDER BY job_configs.created_at DESC; --! get_job_details: (start_time?, finish_time?, state?, tasks?, textual_repr?, udfs, failure_message?, run_id?) SELECT pipeline_name, stop, parallelism_overrides, state, start_time, finish_time, tasks, textual_repr, program, pipeline_id, udfs, failure_message, run_id FROM job_configs - LEFT JOIN job_statuses ON job_configs.id = job_statuses.id + INNER JOIN job_statuses ON job_configs.id = job_statuses.id INNER JOIN pipelines ON pipeline_id = pipelines.id WHERE job_configs.organization_id = :organization_id AND job_configs.id = :job_id; diff --git a/crates/arroyo-controller/queries/controller_queries.sql b/crates/arroyo-controller/queries/controller_queries.sql index 4eca2a99b..67c46f818 100644 --- a/crates/arroyo-controller/queries/controller_queries.sql +++ b/crates/arroyo-controller/queries/controller_queries.sql @@ -21,7 +21,7 @@ SELECT s.restart_nonce as status_restart_nonce, restart_mode FROM job_configs c -LEFT JOIN job_statuses s ON c.id = s.id; +INNER JOIN job_statuses s ON c.id = s.id; --! update_job_status (start_time?, finish_time?, tasks?, failure_message?, pipeline_path?, wasm_path?) UPDATE job_statuses