diff --git a/etl-scripts/stored-procedures/generate_flat_ncd.sql b/etl-scripts/stored-procedures/generate_flat_ncd.sql new file mode 100644 index 0000000..6da0d1c --- /dev/null +++ b/etl-scripts/stored-procedures/generate_flat_ncd.sql @@ -0,0 +1,776 @@ +CREATE DEFINER=`hkorir`@`%` PROCEDURE `generate_flat_ncd`(IN query_type varchar(50), IN queue_number int, IN queue_size int, IN cycle_size int) +BEGIN + set @primary_table := "flat_ncd"; + set @query_type = query_type; + + set @total_rows_written = 0; + + set @encounter_types = "(54,55,75,76,77,78,79,83,96,99,100,104,107,108,109,131,171,172)"; + set @clinical_encounter_types = "(54,55,75,76,77,78,79,83,96,104,107,108,109,171,172)"; + set @non_clinical_encounter_types = "(131)"; + set @other_encounter_types = "(-1)"; + + set @start = now(); + set @table_version = "flat_ncd_v1.0"; + + set session sort_buffer_size=512000000; + + set @sep = " ## "; + set @boundary = "!!"; + set @last_date_created = (select max(max_date_created) from etl.flat_obs); + + create table if not exists flat_ncd ( + date_created TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP, + person_id int, + uuid varchar(100), + visit_id int, + encounter_id int, + encounter_datetime datetime, + encounter_type int, + is_clinical_encounter int, + location_id int, + location_uuid varchar(100), + death_date datetime, + prev_rtc_date datetime, + rtc_date datetime, + + lmp date, + + sbp smallint, + dbp smallint, + pulse smallint, + + fbs decimal, + rbs decimal, + hb_a1c decimal, + hb_a1c_date datetime, + + creatinine decimal, + creatinine_date datetime, + + total_cholesterol decimal, + hdl decimal, + ldl decimal, + triglycerides decimal, + lipid_panel_date datetime, + + dm_status mediumint, + htn_status mediumint, + dm_meds varchar(500), + htn_meds varchar(500), + prescriptions text, + + problems text, + + comorbidities text, + rheumatologic_disorder text, + kidney_disease text, + ckd_staging text, + cardiovascular_disorder text, + neurological_disease text, + has_past_mhd_tx text, + eligible_for_depression_care text, + anxiety_condition text, + convulsive_disorder text, + mood_disorder text, + indicated_mhd_tx text, + prev_hbp_findings text, + type_of_follow_up text, + review_of_med_history text, + psychiatric_exam_findings text, + + prev_encounter_datetime_ncd datetime, + next_encounter_datetime_ncd datetime, + prev_encounter_type_ncd mediumint, + next_encounter_type_ncd mediumint, + prev_clinical_datetime_ncd datetime, + next_clinical_datetime_ncd datetime, + prev_clinical_location_id_ncd mediumint, + next_clinical_location_id_ncd mediumint, + prev_clinical_rtc_date_ncd datetime, + next_clinical_rtc_date_ncd datetime, + + primary key encounter_id (encounter_id), + index person_date (person_id, encounter_datetime), + index person_uuid (uuid), + index location_enc_date (location_uuid,encounter_datetime), + index enc_date_location (encounter_datetime, location_uuid), + index location_id_rtc_date (location_id,rtc_date), + index location_uuid_rtc_date (location_uuid,rtc_date), + index loc_id_enc_date_next_clinical (location_id, encounter_datetime, next_clinical_datetime_ncd), + index encounter_type (encounter_type), + index date_created (date_created) + + ); + + + + if(@query_type="build") then + select 'BUILDING..........................................'; + + set @write_table = concat("flat_ncd_temp_",queue_number); + set @queue_table = concat("flat_ncd_build_queue_",queue_number); + + + SET @dyn_sql=CONCAT('Create table if not exists ',@write_table,' like ',@primary_table); + PREPARE s1 from @dyn_sql; + EXECUTE s1; + DEALLOCATE PREPARE s1; + + SET @dyn_sql=CONCAT('Create table if not exists ',@queue_table,' (select * from flat_ncd_build_queue limit ', queue_size, ');'); + PREPARE s1 from @dyn_sql; + EXECUTE s1; + DEALLOCATE PREPARE s1; + + SET @dyn_sql=CONCAT('delete t1 from flat_ncd_build_queue t1 join ',@queue_table, ' t2 using (person_id);'); + PREPARE s1 from @dyn_sql; + EXECUTE s1; + DEALLOCATE PREPARE s1; + + end if; + + + if (@query_type="sync") then + select 'SYNCING..........................................'; + set @write_table = "flat_ncd"; + set @queue_table = "flat_ncd_sync_queue"; + create table if not exists flat_ncd_sync_queue (person_id int primary key); + + + + set @last_update = null; + + select max(date_updated) into @last_update from etl.flat_log where table_name=@table_version; + select @last_update := if(@last_update,@last_update,'1900-01-01'); + + replace into flat_ncd_sync_queue + (select distinct patient_id + from amrs.encounter + where date_changed > @last_update + ); + + replace into flat_ncd_sync_queue + (select distinct person_id + from etl.flat_obs + where max_date_created > @last_update + ); + + replace into flat_ncd_sync_queue + (select distinct person_id + from etl.flat_lab_obs + where max_date_created > @last_update + ); + + replace into flat_ncd_sync_queue + (select distinct person_id + from etl.flat_orders + where max_date_created > @last_update + ); + + replace into flat_ncd_sync_queue + (select person_id from + amrs.person + where date_voided > @last_update); + + + replace into flat_ncd_sync_queue + (select person_id from + amrs.person + where date_changed > @last_update); + + + end if; + + + # Remove test patients + SET @dyn_sql=CONCAT('delete t1 FROM ',@queue_table,' t1 + join amrs.person_attribute t2 using (person_id) + where t2.person_attribute_type_id=28 and value="true" and voided=0'); + PREPARE s1 from @dyn_sql; + EXECUTE s1; + DEALLOCATE PREPARE s1; + + SET @person_ids_count = 0; + SET @dyn_sql=CONCAT('select count(*) into @person_ids_count from ',@queue_table); + PREPARE s1 from @dyn_sql; + EXECUTE s1; + DEALLOCATE PREPARE s1; + + select @person_ids_count as 'num patients to update'; + + SET @dyn_sql=CONCAT('delete t1 from ',@primary_table, ' t1 join ',@queue_table,' t2 using (person_id);'); + PREPARE s1 from @dyn_sql; + EXECUTE s1; + DEALLOCATE PREPARE s1; + + + drop temporary table if exists prescriptions; + create temporary table prescriptions (encounter_id int primary key, prescriptions text) + ( + select + encounter_id, + group_concat(obs separator ' $ ') as prescriptions + from + ( + select + t2.encounter_id, + obs_group_id, + group_concat( + case + when value_coded is not null then concat(@boundary,o.concept_id,'=',value_coded,@boundary) + when value_numeric is not null then concat(@boundary,o.concept_id,'=',value_numeric,@boundary) + when value_datetime is not null then concat(@boundary,o.concept_id,'=',date(value_datetime),@boundary) + when value_text is not null then concat(@boundary,o.concept_id,'=',value_text,@boundary) + when value_drug is not null then concat(@boundary,o.concept_id,'=',value_drug,@boundary) + when value_modifier is not null then concat(@boundary,o.concept_id,'=',value_modifier,@boundary) + end + order by o.concept_id,value_coded + separator ' ## ' + ) as obs + + from amrs.obs o + join (select encounter_id, obs_id, concept_id as grouping_concept from amrs.obs where concept_id in (7307,7334)) t2 on o.obs_group_id = t2.obs_id + group by obs_group_id + ) t + group by encounter_id + ); + + + set @total_time=0; + set @cycle_number = 0; + + + while @person_ids_count > 0 do + + set @loop_start_time = now(); + + #create temp table with a set of person ids + drop temporary table if exists flat_ncd_build_queue__0; + + SET @dyn_sql=CONCAT('create temporary table flat_ncd_build_queue__0 (person_id int primary key) (select * from ',@queue_table,' limit ',cycle_size,');'); + PREPARE s1 from @dyn_sql; + EXECUTE s1; + DEALLOCATE PREPARE s1; + + drop temporary table if exists flat_ncd_0a; + SET @dyn_sql = CONCAT( + 'create temporary table flat_ncd_0a + (select + t1.person_id, + t1.visit_id, + t1.encounter_id, + t1.encounter_datetime, + t1.encounter_type, + t1.location_id, + t1.obs, + t1.obs_datetimes, + case + when t1.encounter_type in ',@clinical_encounter_types,' then 1 + else null + end as is_clinical_encounter, + + case + when t1.encounter_type in ',@non_clinical_encounter_types,' then 20 + when t1.encounter_type in ',@clinical_encounter_types,' then 10 + when t1.encounter_type in', @other_encounter_types, ' then 5 + else 1 + end as encounter_type_sort_index, + t2.orders + from etl.flat_obs t1 + join flat_ncd_build_queue__0 t0 using (person_id) + left join etl.flat_orders t2 using(encounter_id) + );'); + + PREPARE s1 from @dyn_sql; + EXECUTE s1; + DEALLOCATE PREPARE s1; + + + insert into flat_ncd_0a + (select + t1.person_id, + null, + t1.encounter_id, + t1.test_datetime, + t1.encounter_type, + null, #t1.location_id, + t1.obs, + null, #obs_datetimes + # in any visit, there many be multiple encounters. for this dataset, we want to include only clinical encounters (e.g. not lab or triage visit) + 0 as is_clinical_encounter, + 1 as encounter_type_sort_index, + null + from etl.flat_lab_obs t1 + join flat_ncd_build_queue__0 t0 using (person_id) + ); + + drop temporary table if exists flat_ncd_0; + create temporary table flat_ncd_0(index encounter_id (encounter_id), index person_enc (person_id,encounter_datetime)) + (select * from flat_ncd_0a + order by person_id, date(encounter_datetime), encounter_type_sort_index + ); + + + + set @prev_id = null; + set @cur_id = null; + set @prev_encounter_date = null; + set @cur_encounter_date = null; + set @enrollment_date = null; + set @cur_location = null; + set @cur_rtc_date = null; + set @prev_rtc_date = null; + + set @death_date = null; + + + #TO DO + # screened for cervical ca + # exposed infant + + drop temporary table if exists flat_ncd_1; + create temporary table flat_ncd_1 (index encounter_id (encounter_id)) + (select + obs, + encounter_type_sort_index, + @prev_id := @cur_id as prev_id, + @cur_id := t1.person_id as cur_id, + t1.person_id, + p.uuid, + t1.visit_id, + t1.encounter_id, + @prev_encounter_date := date(@cur_encounter_date) as prev_encounter_date, + @cur_encounter_date := date(encounter_datetime) as cur_encounter_date, + t1.encounter_datetime, + t1.encounter_type, + t1.is_clinical_encounter, + + death_date, + case + when location_id then @cur_location := location_id + when @prev_id = @cur_id then @cur_location + else null + end as location_id, + + case + when @prev_id=@cur_id then @prev_rtc_date := @cur_rtc_date + else @prev_rtc_date := null + end as prev_rtc_date, + + # 5096 = return visit date + case + when obs regexp "!!5096=" then @cur_rtc_date := etl.GetValues(obs,5096) + when @prev_id = @cur_id then if(@cur_rtc_date > encounter_datetime,@cur_rtc_date,null) + else @cur_rtc_date := null + end as cur_rtc_date, + + @lmp := etl.GetValues(obs,1836) as lmp, + + @sbp := etl.GetValues(obs,5085) as sbp, + + @dbp := etl.GetValues(obs,5086) as dbp, + + @pulse := etl.GetValues(obs,5087) as pulse, + + @fbs := etl.GetValues(obs,6252) as fbs, + @rbs := etl.GetValues(obs,887) as rbs, + + case + when obs regexp "!!6126=" then @hb_a1c := etl.GetValues(obs,6126) + when @prev_id = @cur_id then @hb_a1c + else @hb_a1c := null + end as hb_a1c, + + case + when obs regexp "!!6126=" then @hb_a1c_date := ifnull(etl.GetValues(obs_datetimes,6126),encounter_datetime) + when @prev_id=@cur_id then @hb_a1c_date + else @hb_a1c_date:=null + end as hb_a1c_date, + + case + when obs regexp "!!790=" then @creatinine := etl.GetValues(obs,790) + when @prev_id = @cur_id then @creatinine + else @creatinine := null + end as creatinine, + + case + when obs regexp "!!790=" then @creatinine_date := ifnull(etl.GetValues(obs_datetimes,790),encounter_datetime) + when @prev_id=@cur_id then @creatinine_date + else @creatinine_date:=null + end as creatinine_date, + + + case + when obs regexp "!!1006=" then @total_cholesterol := etl.GetValues(obs,1006) + when @prev_id = @cur_id then @total_cholesterol + else @total_cholesterol := null + end as total_cholesterol, + + case + when obs regexp "!!1007=" then @hdl := etl.GetValues(obs,1007) + when @prev_id = @cur_id then @hdl + else @hdl := null + end as hdl, + + case + when obs regexp "!!1008=" then @ldl := etl.GetValues(obs,1008) + when @prev_id = @cur_id then @ldl + else @ldl := null + end as ldl, + + case + when obs regexp "!!1009=" then @triglycerides := etl.GetValues(obs,1009) + when @prev_id = @cur_id then @triglycerides + else @triglycerides := null + end as triglycerides, + + case + when obs regexp "!!1006=" then @lipid_panel_date := ifnull(etl.GetValues(obs_datetimes,1006),encounter_datetime) + when @prev_id=@cur_id then @lipid_panel_date + else @lipid_panel_date:=null + end as lipid_panel_date, + + + @dm_status := etl.GetValues(obs,7287) as dm_status, + @htn_status := etl.GetValues(obs,7288) as htn_status, + @dm_meds := nullif(concat_ws(' ## ', + etl.GetValues(obs,7290), + etl.GetValues(obs,7304) + ),'') as dm_meds, + @htn_meds := nullif(concat_ws(' ## ', + etl.GetValues(obs,7291), + etl.GetValues(obs,7332), + etl.GetValues(obs,10241) + ),'') as htn_meds, + + t2.prescriptions as prescriptions, + + @problems := nullif(concat_ws(' ## ', + etl.GetValues(obs,6042 ), + etl.GetValues(obs,11679), + etl.GetValues(obs,6796), + etl.GetValues(obs,2072), + etl.GetValues(obs,6097), + etl.GetValues(obs,6461) + ), '') as problems, + @comorbidities := etl.GetValues(obs,10239 ) as comorbidities, + @rheumatologic_disorder := etl.GetValues(obs,12293) as rheumatologic_disorder, + @kidney_disease := etl.GetValues(obs, 6033) as kidney_disease, + @ckd_staging := etl.GetValues(obs,10101) as ckd_staging, + @cardiovascular_disorder := etl.GetValues(obs, 7971) as cardiovascular_disorder, + @neurological_disease := etl.GetValues(obs, 1129) as neurological_disease, + @has_past_mhd_tx := etl.GetValues(obs, 10280) as has_past_mhd_tx, + @eligible_for_depression_care := etl.GetValues(obs, 10293) as eligible_for_depression_care, + @anxiety_condition := etl.GetValues(obs, 11231) as anxiety_condition, + @convulsive_disorder := etl.GetValues(obs, 11791) as convulsive_disorder, + @mood_disorder := etl.GetValues(obs, 11279) as mood_disorder, + @indicated_mhd_tx := etl.GetValues(obs, 7781) as indicated_mhd_tx, + @prev_hbp_findings := etl.GetValues(obs, 9092) as prev_hbp_findings, + @type_of_follow_up := etl.GetValues(obs, 2332) as type_of_follow_up, + @review_of_med_history := etl.GetValues(obs, 6245) as review_of_med_history, + @psychiatric_exam_findings := etl.GetValues(obs, 1130) as psychiatric_exam_findings + + from flat_ncd_0 t1 + join amrs.person p using (person_id) + left outer join prescriptions t2 using (encounter_id) + ); + + + set @prev_id = null; + set @cur_id = null; + set @prev_encounter_datetime = null; + set @cur_encounter_datetime = null; + + set @prev_clinical_datetime = null; + set @cur_clinical_datetime = null; + + set @next_encounter_type = null; + set @cur_encounter_type = null; + + set @prev_clinical_location_id = null; + set @cur_clinical_location_id = null; + + + alter table flat_ncd_1 drop prev_id, drop cur_id; + + drop table if exists flat_ncd_2; + create temporary table flat_ncd_2 + (select *, + @prev_id := @cur_id as prev_id, + @cur_id := person_id as cur_id, + + case + when @prev_id = @cur_id then @prev_encounter_datetime := @cur_encounter_datetime + else @prev_encounter_datetime := null + end as next_encounter_datetime_ncd, + + @cur_encounter_datetime := encounter_datetime as cur_encounter_datetime, + + case + when @prev_id=@cur_id then @next_encounter_type := @cur_encounter_type + else @next_encounter_type := null + end as next_encounter_type_ncd, + + @cur_encounter_type := encounter_type as cur_encounter_type, + + case + when @prev_id = @cur_id then @prev_clinical_datetime := @cur_clinical_datetime + else @prev_clinical_datetime := null + end as next_clinical_datetime_ncd, + + case + when @prev_id = @cur_id then @prev_clinical_location_id := @cur_clinical_location_id + else @prev_clinical_location_id := null + end as next_clinical_location_id_ncd, + + case + when is_clinical_encounter then @cur_clinical_datetime := encounter_datetime + when @prev_id = @cur_id then @cur_clinical_datetime + else @cur_clinical_datetime := null + end as cur_clinic_datetime, + + case + when is_clinical_encounter then @cur_clinical_location_id := location_id + when @prev_id = @cur_id then @cur_clinical_location_id + else @cur_clinical_location_id := null + end as cur_clinic_location_id, + + case + when @prev_id = @cur_id then @prev_clinical_rtc_date := @cur_clinical_rtc_date + else @prev_clinical_rtc_date := null + end as next_clinical_rtc_date_ncd, + + case + when is_clinical_encounter then @cur_clinical_rtc_date := cur_rtc_date + when @prev_id = @cur_id then @cur_clinical_rtc_date + else @cur_clinical_rtc_date:= null + end as cur_clinical_rtc_date + + from flat_ncd_1 + order by person_id, date(encounter_datetime) desc, encounter_type_sort_index desc + ); + + alter table flat_ncd_2 drop prev_id, drop cur_id, drop cur_encounter_type, drop cur_encounter_datetime, drop cur_clinical_rtc_date; + + + set @prev_id = null; + set @cur_id = null; + set @prev_encounter_type = null; + set @cur_encounter_type = null; + set @prev_encounter_datetime = null; + set @cur_encounter_datetime = null; + set @prev_clinical_datetime = null; + set @cur_clinical_datetime = null; + set @prev_clinical_location_id = null; + set @cur_clinical_location_id = null; + + drop temporary table if exists flat_ncd_3; + create temporary table flat_ncd_3 (prev_encounter_datetime datetime, prev_encounter_type int, index person_enc (person_id, encounter_datetime desc)) + (select + *, + @prev_id := @cur_id as prev_id, + @cur_id := t1.person_id as cur_id, + + case + when @prev_id=@cur_id then @prev_encounter_type := @cur_encounter_type + else @prev_encounter_type:=null + end as prev_encounter_type_ncd, + @cur_encounter_type := encounter_type as cur_encounter_type, + + case + when @prev_id=@cur_id then @prev_encounter_datetime := @cur_encounter_datetime + else @prev_encounter_datetime := null + end as prev_encounter_datetime_ncd, + + @cur_encounter_datetime := encounter_datetime as cur_encounter_datetime, + + case + when @prev_id = @cur_id then @prev_clinical_datetime := @cur_clinical_datetime + else @prev_clinical_datetime := null + end as prev_clinical_datetime_ncd, + + case + when @prev_id = @cur_id then @prev_clinical_location_id := @cur_clinical_location_id + else @prev_clinical_location_id := null + end as prev_clinical_location_id_ncd, + + case + when is_clinical_encounter then @cur_clinical_datetime := encounter_datetime + when @prev_id = @cur_id then @cur_clinical_datetime + else @cur_clinical_datetime := null + end as cur_clinical_datetime, + + case + when is_clinical_encounter then @cur_clinical_location_id := location_id + when @prev_id = @cur_id then @cur_clinical_location_id + else @cur_clinical_location_id := null + end as cur_clinical_location_id, + + case + when @prev_id = @cur_id then @prev_clinical_rtc_date := @cur_clinical_rtc_date + else @prev_clinical_rtc_date := null + end as prev_clinical_rtc_date_ncd, + + case + when is_clinical_encounter then @cur_clinical_rtc_date := cur_rtc_date + when @prev_id = @cur_id then @cur_clinical_rtc_date + else @cur_clinical_rtc_date:= null + end as cur_clinic_rtc_date + + from flat_ncd_2 t1 + order by person_id, date(encounter_datetime), encounter_type_sort_index + ); + + + + select count(*) into @new_encounter_rows from flat_ncd_3; + + select @new_encounter_rows; + set @total_rows_written = @total_rows_written + @new_encounter_rows; + select @total_rows_written; + + + #add data to table + SET @dyn_sql=CONCAT('replace into ',@write_table, + '(select + null, + person_id, + t1.uuid, + visit_id, + encounter_id, + encounter_datetime, + encounter_type, + is_clinical_encounter, + location_id, + t2.uuid as location_uuid, + death_date, + prev_rtc_date, + cur_rtc_date, + lmp, + sbp, + dbp, + pulse, + fbs, + rbs, + hb_a1c, + hb_a1c_date, + creatinine, + creatinine_date, + total_cholesterol, + hdl, + ldl, + triglycerides, + lipid_panel_date, + dm_status, + htn_status, + dm_meds, + htn_meds, + prescriptions, + problems, + comorbidities, + rheumatologic_disorder, + kidney_disease, + ckd_staging, + cardiovascular_disorder, + neurological_disease, + has_past_mhd_tx, + eligible_for_depression_care, + anxiety_condition, + convulsive_disorder, + mood_disorder, + indicated_mhd_tx, + prev_hbp_findings, + type_of_follow_up, + review_of_med_history, + psychiatric_exam_findings, + + prev_encounter_datetime_ncd, + next_encounter_datetime_ncd, + prev_encounter_type_ncd, + next_encounter_type_ncd, + prev_clinical_datetime_ncd, + next_clinical_datetime_ncd, + prev_clinical_location_id_ncd, + next_clinical_location_id_ncd, + prev_clinical_rtc_date_ncd, + next_clinical_rtc_date_ncd + + from flat_ncd_3 t1 + join amrs.location t2 using (location_id))'); + + PREPARE s1 from @dyn_sql; + EXECUTE s1; + DEALLOCATE PREPARE s1; + + + #delete from @queue_table where person_id in (select person_id from flat_ncd_build_queue__0); + + SET @dyn_sql=CONCAT('delete t1 from ',@queue_table,' t1 join flat_ncd_build_queue__0 t2 using (person_id);'); + PREPARE s1 from @dyn_sql; + EXECUTE s1; + DEALLOCATE PREPARE s1; + + + #select @person_ids_count := (select count(*) from flat_ncd_build_queue_2); + SET @dyn_sql=CONCAT('select count(*) into @person_ids_count from ',@queue_table,';'); + PREPARE s1 from @dyn_sql; + EXECUTE s1; + DEALLOCATE PREPARE s1; + + #select @person_ids_count as remaining_in_build_queue; + + set @cycle_length = timestampdiff(second,@loop_start_time,now()); + #select concat('Cycle time: ',@cycle_length,' seconds'); + set @total_time = @total_time + @cycle_length; + set @cycle_number = @cycle_number + 1; + + #select ceil(@person_ids_count / cycle_size) as remaining_cycles; + set @remaining_time = ceil((@total_time / @cycle_number) * ceil(@person_ids_count / cycle_size) / 60); + #select concat("Estimated time remaining: ", @remaining_time,' minutes'); + + select @person_ids_count as 'persons remaining', @cycle_length as 'Cycle time (s)', ceil(@person_ids_count / cycle_size) as remaining_cycles, @remaining_time as 'Est time remaining (min)'; + + end while; + + if(@query_type="build") then + SET @dyn_sql=CONCAT('drop table ',@queue_table,';'); + PREPARE s1 from @dyn_sql; + EXECUTE s1; + DEALLOCATE PREPARE s1; + + SET @total_rows_to_write=0; + SET @dyn_sql=CONCAT("Select count(*) into @total_rows_to_write from ",@write_table); + PREPARE s1 from @dyn_sql; + EXECUTE s1; + DEALLOCATE PREPARE s1; + + set @start_write = now(); + select concat(@start_write, " : Writing ",@total_rows_to_write, ' to ',@primary_table); + + SET @dyn_sql=CONCAT('replace into ', @primary_table, + '(select * from ',@write_table,');'); + PREPARE s1 from @dyn_sql; + EXECUTE s1; + DEALLOCATE PREPARE s1; + + set @finish_write = now(); + set @time_to_write = timestampdiff(second,@start_write,@finish_write); + select concat(@finish_write, ' : Completed writing rows. Time to write to primary table: ', @time_to_write, ' seconds '); + + SET @dyn_sql=CONCAT('drop table ',@write_table,';'); + PREPARE s1 from @dyn_sql; + EXECUTE s1; + DEALLOCATE PREPARE s1; + + + end if; + + + set @ave_cycle_length = ceil(@total_time/@cycle_number); + select CONCAT('Average Cycle Length: ', @ave_cycle_length, ' second(s)'); + + set @end = now(); + insert into etl.flat_log values (@start,@last_date_created,@table_version,timestampdiff(second,@start,@end)); + select concat(@table_version," : Time to complete: ",timestampdiff(minute, @start, @end)," minutes"); + + END \ No newline at end of file diff --git a/etl-scripts/stored-procedures/generate_ncd_monthly_report_dataset.sql b/etl-scripts/stored-procedures/generate_ncd_monthly_report_dataset.sql new file mode 100644 index 0000000..8abb393 --- /dev/null +++ b/etl-scripts/stored-procedures/generate_ncd_monthly_report_dataset.sql @@ -0,0 +1,663 @@ +CREATE DEFINER=`hkorir`@`%` PROCEDURE `generate_ncd_monthly_report_dataset`(IN query_type varchar(50), IN queue_number int, IN queue_size int, IN cycle_size int, IN start_date varchar(50)) +BEGIN + + set @start = now(); + set @table_version = "ncd_monthly_report_dataset_v1.0"; + set @last_date_created = (select max(date_created) from etl.flat_ncd); + + set @sep = " ## "; + set @lab_encounter_type = 99999; + set @death_encounter_type = 31; + + CREATE TABLE IF NOT EXISTS ncd_monthly_report_dataset ( + `date_created` TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP NOT NULL, + `elastic_id` BIGINT, + `endDate` DATE, + `encounter_id` INT, + `person_id` INT, + `person_uuid` VARCHAR(100), + `birthdate` DATE, + `age` DOUBLE, + `gender` VARCHAR(1), + `location_id` INT, + `location_uuid` VARCHAR(100), + `clinic` VARCHAR(250), + `encounter_datetime` DATETIME, + `visit_this_month` TINYINT, + `is_hypertensive` TINYINT, + `htn_state` TINYINT, + `is_diabetic` TINYINT, + `dm_state` TINYINT, + `has_mhd` TINYINT, + `is_depressive_mhd` TINYINT, + `is_anxiety_mhd` TINYINT, + `is_bipolar_and_related_mhd` TINYINT, + `is_personality_mhd` TINYINT, + `is_feeding_and_eating_mhd` TINYINT, + `is_ocd_mhd` TINYINT, + `has_kd` TINYINT, + `is_ckd` TINYINT, + `ckd_stage` INT, + `has_cvd` TINYINT, + `is_heart_failure_cvd` TINYINT, + `is_myocardinal_infarction` TINYINT, + `has_neurological_disorder` TINYINT, + `has_stroke` TINYINT, + `is_stroke_haemorrhagic` TINYINT, + `is_stroke_ischaemic` TINYINT, + `has_migraine` TINYINT, + `has_seizure` TINYINT, + `has_epilepsy` TINYINT, + `has_convulsive_disorder` TINYINT, + `has_rheumatologic_disorder` TINYINT, + `has_arthritis` TINYINT, + `has_SLE` TINYINT, + PRIMARY KEY elastic_id (elastic_id), + INDEX person_enc_date (person_id , encounter_datetime), + INDEX person_report_date (person_id , endDate), + INDEX endDate_location_id (endDate , location_id), + INDEX date_created (date_created), + INDEX status_change (location_id , endDate) + ); + + if (query_type = "build") then + select "BUILDING......................."; + set @queue_table = concat("ncd_monthly_report_dataset_build_queue_",queue_number); + + SET @dyn_sql=CONCAT('Create table if not exists ',@queue_table,'(person_id int primary key) (select * from ncd_monthly_report_dataset_build_queue limit ', queue_size, ');'); + PREPARE s1 from @dyn_sql; + EXECUTE s1; + DEALLOCATE PREPARE s1; + + SET @dyn_sql=CONCAT('delete t1 from ncd_monthly_report_dataset_build_queue t1 join ',@queue_table, ' t2 using (person_id)'); + PREPARE s1 from @dyn_sql; + EXECUTE s1; + DEALLOCATE PREPARE s1; + end if; + + + if (query_type = "sync") then + set @queue_table = "ncd_monthly_report_dataset_sync_queue"; + CREATE TABLE IF NOT EXISTS ncd_monthly_report_dataset_sync_queue ( + person_id INT PRIMARY KEY + ); + + SELECT @last_update:=(SELECT MAX(date_updated) FROM etl.flat_log WHERE table_name = @table_version); + SELECT @last_update := IF(@last_update,@last_update,'1900-01-01'); + + replace into ncd_monthly_report_dataset_sync_queue + (select distinct person_id from flat_ncd where date_created >= @last_update); + end if; + + + SET @num_ids := 0; + SET @dyn_sql=CONCAT('select count(*) into @num_ids from ',@queue_table,';'); + PREPARE s1 from @dyn_sql; + EXECUTE s1; + DEALLOCATE PREPARE s1; + + + SET @person_ids_count = 0; + SET @dyn_sql=CONCAT('select count(*) into @person_ids_count from ',@queue_table); + PREPARE s1 from @dyn_sql; + EXECUTE s1; + DEALLOCATE PREPARE s1; + + + SET @dyn_sql=CONCAT('delete t1 from ncd_monthly_report_dataset t1 join ',@queue_table,' t2 using (person_id);'); + PREPARE s1 from @dyn_sql; + EXECUTE s1; + DEALLOCATE PREPARE s1; + + set @total_time=0; + set @cycle_number = 0; + + while @person_ids_count > 0 do + + set @loop_start_time = now(); + + drop temporary table if exists ncd_monthly_report_dataset_build_queue__0; + create temporary table ncd_monthly_report_dataset_build_queue__0 (person_id int primary key); + + SET @dyn_sql=CONCAT('insert into ncd_monthly_report_dataset_build_queue__0 (select * from ',@queue_table,' limit ',cycle_size,');'); + PREPARE s1 from @dyn_sql; + EXECUTE s1; + DEALLOCATE PREPARE s1; + + + set @age =null; + set @status = null; + drop temporary table if exists ncd_monthly_report_dataset_0; + create temporary table ncd_monthly_report_dataset_0 + ( + select + concat(date_format(endDate,"%Y%m"),person_id) as elastic_id, + endDate, + encounter_id, + person_id, + t3.uuid as person_uuid, + date(birthdate) as birthdate, + case + when timestampdiff(year,birthdate,endDate) > 0 then @age := round(timestampdiff(year,birthdate,endDate),0) + else @age :=round(timestampdiff(month,birthdate,endDate)/12,2) + end as age, + t3.gender, + t2.location_id, + t2.location_uuid, + encounter_datetime, + + if(encounter_datetime between date_format(endDate,"%Y-%m-01") and endDate,1,0) as visit_this_month, + + encounter_type, + + case + when htn_status = 7285 or htn_status = 7286 then 1 + when (comorbidities regexp '[[:<:]]903[[:>:]]') then 1 + when (prev_hbp_findings regexp '[[:<:]]1065[[:>:]]') then 1 + when (htn_meds is not null) then 1 + when (problems regexp '[[:<:]]903[[:>:]]') then 1 + when (review_of_med_history regexp '[[:<:]]903[[:>:]]') then 1 + else 0 + end as is_hypertensive, + + case + when ((sbp < 130) and (dbp < 80)) then 1 + when ((sbp >= 130) and (dbp >= 80)) then 2 + else 3 + end as htn_state, + + case + when dm_status = 7281 or dm_status = 7282 then 1 + when (comorbidities regexp '[[:<:]]175[[:>:]]') then 1 + when (dm_meds is not null) then 1 + when (problems regexp '[[:<:]]9324|175[[:>:]]') then 1 + when (review_of_med_history regexp '[[:<:]]175[[:>:]]') then 1 + else 0 + end as is_diabetic, + + case + when (hb_a1c <= 7) then 1 + when (hb_a1c > 7) then 2 + else 3 + end as dm_state, + + case + when (comorbidities regexp '[[:<:]]10860[[:>:]]') then 1 + when (indicated_mhd_tx is not null) then 1 + when (has_past_mhd_tx regexp '[[:<:]]1065[[:>:]]') then 1 + when (review_of_med_history regexp '[[:<:]]77|207[[:>:]]') then 1 + when (eligible_for_depression_care regexp '[[:<:]]1065[[:>:]]') then 1 + when (mood_disorder is not null) AND (mood_disorder not regexp '[[:<:]]1115[[:>:]]') then 1 + when (anxiety_condition is not null) AND (anxiety_condition not regexp '[[:<:]]1115[[:>:]]')then 1 + when (psychiatric_exam_findings is not null) AND (psychiatric_exam_findings not regexp '[[:<:]]1115[[:>:]]') then 1 + else 0 + end as has_mhd, + + case + when (eligible_for_depression_care regexp '[[:<:]]1065[[:>:]]') then 1 + when (mood_disorder regexp '[[:<:]]11278[[:>:]]') then 1 + when (indicated_mhd_tx regexp '[[:<:]]207[[:>:]]') then 1 + when (review_of_med_history regexp '[[:<:]]207[[:>:]]') then 1 + when (psychiatric_exam_findings regexp '[[:<:]]207[[:>:]]') then 1 + else 0 + end as is_depressive_mhd, + + case + when (anxiety_condition is not null) AND (anxiety_condition not regexp '[[:<:]]1115[[:>:]]') then 1 + when (indicated_mhd_tx regexp '[[:<:]]1443[[:>:]]') then 1 + when (review_of_med_history regexp '[[:<:]]207[[:>:]]') then 1 + when (psychiatric_exam_findings regexp '[[:<:]]1443[[:>:]]') then 1 + else 0 + end as is_anxiety_mhd, + + case + when (mood_disorder regexp '[[:<:]]7763[[:>:]]') then 1 + when (indicated_mhd_tx regexp '[[:<:]]7763[[:>:]]') then 1 + else 0 + end as is_bipolar_and_related_mhd, + + case + when (mood_disorder regexp '[[:<:]]7763[[:>:]]') then 1 + when (indicated_mhd_tx regexp '[[:<:]]11281[[:>:]]') then 1 + when (problems regexp '[[:<:]]467[[:>:]]') then 1 + else 0 + end as is_personality_mhd, + + null as is_feeding_and_eating_mhd, + + null as is_ocd_mhd, + + case + when (comorbidities regexp '[[:<:]]77[[:>:]]') then 1 + when (kidney_disease regexp '[[:<:]]1065[[:>:]]') then 1 + when (problems regexp '[[:<:]]8078|11684[[:>:]]') then 1 + when (review_of_med_history regexp '[[:<:]]6033|8078[[:>:]]') then 1 + else 0 + end as has_kd, + + case + when (problems regexp '[[:<:]]8078[[:>:]]') then 1 + when (review_of_med_history regexp '[[:<:]]8078[[:>:]]') then 1 + when (ckd_staging is not null) then 1 + else 0 + end as is_ckd, + + ckd_staging as ckd_stage, + + case + when (cardiovascular_disorder is not null) AND (cardiovascular_disorder not regexp '[[:<:]]1115[[:>:]]') then 1 + when (comorbidities regexp '[[:<:]]7971[[:>:]]') then 1 + when (review_of_med_history regexp '[[:<:]]7971|6237[[:>:]]') then 1 + else 0 + end as has_cvd, + + case + when (cardiovascular_disorder regexp '[[:<:]]1456[[:>:]]') then 1 + when (indicated_mhd_tx regexp '[[:<:]]1456[[:>:]]') then 1 + when (review_of_med_history regexp '[[:<:]]7971[[:>:]]') then 1 + else 0 + end as is_heart_failure_cvd, + + case + when (cardiovascular_disorder regexp '[[:<:]]1535[[:>:]]') then 1 + else 0 + end as is_myocardinal_infarction, + + case + when (neurological_disease is not null) AND (neurological_disease not regexp '[[:<:]]1115[[:>:]]') then 1 + when (indicated_mhd_tx regexp '[[:<:]]1456[[:>:]]') then 1 + when (review_of_med_history regexp '[[:<:]]7971[[:>:]]') then 1 + else 0 + end as has_neurological_disorder, + + case + when (cardiovascular_disorder regexp '[[:<:]]1878[[:>:]]') AND (cardiovascular_disorder not regexp '[[:<:]]1115[[:>:]]') then 1 + when (indicated_mhd_tx regexp '[[:<:]]1456[[:>:]]') then 1 + when (review_of_med_history regexp '[[:<:]]7971[[:>:]]') then 1 + else 0 + end as has_stroke, + + null as is_stroke_haemorrhagic, + + null as is_stroke_ischaemic, + + case + when (problems regexp '[[:<:]]1477[[:>:]]') then 1 + when (neurological_disease regexp '[[:<:]]1477[[:>:]]') then 1 + else 0 + end as has_migraine, + + case + when (problems regexp '[[:<:]]206[[:>:]]') then 1 + when (neurological_disease regexp '[[:<:]]206[[:>:]]') then 1 + when (convulsive_disorder regexp '[[:<:]]206[[:>:]]') then 1 + else 0 + end as has_seizure, + + case + when (problems regexp '[[:<:]]155|11687[[:>:]]') then 1 + when (neurological_disease regexp '[[:<:]]155[[:>:]]') then 1 + when (convulsive_disorder regexp '[[:<:]]155[[:>:]]') then 1 + when (indicated_mhd_tx regexp '[[:<:]]155[[:>:]]') then 1 + else 0 + end as has_epilepsy, + + case + when (neurological_disease regexp '[[:<:]]10806[[:>:]]') then 1 + when (convulsive_disorder regexp '[[:<:]]155|10806[[:>:]]') then 1 + else 0 + end as has_convulsive_disorder, + + case + when (rheumatologic_disorder is not null) AND (rheumatologic_disorder not regexp '[[:<:]]1115[[:>:]]') then 1 + when (comorbidities regexp '[[:<:]]12293[[:>:]]') then 1 + else 0 + end as has_rheumatologic_disorder, + + case + when (rheumatologic_disorder regexp '[[:<:]]116[[:>:]]') then 1 + else 0 + end as has_arthritis, + + case + when (rheumatologic_disorder regexp '[[:<:]]12292[[:>:]]') then 1 + else 0 + end as has_SLE + + from etl.dates t1 + join etl.flat_ncd t2 + join amrs.person t3 using (person_id) + join etl.ncd_monthly_report_dataset_build_queue__0 t5 using (person_id) + + where + t2.encounter_datetime < date_add(endDate, interval 1 day) + -- and (t2.next_clinical_datetime_ncd is null or t2.next_clinical_datetime_ncd >= date_add(t1.endDate, interval 1 day) ) + -- and t2.is_clinical_encounter=1 + and t1.endDate between start_date and date_add(now(),interval 2 year) + order by person_id, endDate, sbp desc, dbp desc, hb_a1c desc + ); + + + set @prev_id = null; + set @cur_id = null; + + set @is_hypertensive = null; + set @has_mhd = null; + set @is_depressive_mhd = null; + set @is_anxiety_mhd = null; + set @is_bipolar_and_related_mhd = null; + set @is_personality_mhd = null; + set @is_feeding_and_eating_mhd = null; + set @is_ocd_mhd = null; + set @has_kd = null; + set @is_ckd = null; + set @ckd_stage = null; + set @has_cvd = null; + set @is_heart_failure_cvd = null; + set @is_myocardinal_infarction = null; + set @has_neurological_disorder = null; + set @has_stroke = null; + set @is_stroke_haemorrhagic = null; + set @is_stroke_ischaemic = null; + set @has_migraine = null; + set @has_seizure = null; + set @has_epilepsy = null; + set @has_convulsive_disorder = null; + set @has_rheumatologic_disorder = null; + set @has_arthritis = null; + set @has_SLE = null; + + drop temporary table if exists ncd_monthly_report_dataset_1; + create temporary table ncd_monthly_report_dataset_1 + ( + select + @prev_id := @cur_id as prev_id, + @cur_id := person_id as cur_id, + elastic_id, + endDate, + encounter_id, + person_id, + person_uuid, + birthdate, + age, + gender, + location_id, + location_uuid, + encounter_datetime, + + visit_this_month, + + encounter_type, + case + when (@prev_id = @cur_id AND (is_hypertensive = 1 OR @is_hypertensive = 1)) then @is_hypertensive := 1 + else @is_hypertensive := is_hypertensive + end as is_hypertensive, + htn_state, + case + when (@prev_id = @cur_id AND (is_diabetic = 1 OR @is_diabetic = 1)) then @is_diabetic := 1 + else @is_diabetic := is_diabetic + end as is_diabetic, + dm_state, + case + when (@prev_id = @cur_id AND (has_mhd = 1 OR @has_mhd = 1)) then @has_mhd := 1 + else @has_mhd := has_mhd + end as has_mhd, + case + when (@prev_id = @cur_id AND (is_depressive_mhd = 1 OR @is_depressive_mhd = 1)) then @is_depressive_mhd := 1 + else @is_depressive_mhd := is_depressive_mhd + end as is_depressive_mhd, + case + when (@prev_id = @cur_id AND (is_anxiety_mhd = 1 OR @is_anxiety_mhd = 1)) then @is_anxiety_mhd := 1 + else @is_anxiety_mhd := is_anxiety_mhd + end as is_anxiety_mhd, + + case + when (@prev_id = @cur_id AND (is_bipolar_and_related_mhd = 1 OR @is_bipolar_and_related_mhd = 1)) then @is_bipolar_and_related_mhd := 1 + else @is_bipolar_and_related_mhd := is_bipolar_and_related_mhd + end as is_bipolar_and_related_mhd, + + case + when (@prev_id = @cur_id AND (is_personality_mhd = 1 OR @is_personality_mhd = 1)) then @is_personality_mhd := 1 + else @is_personality_mhd := is_personality_mhd + end as is_personality_mhd, + + null as is_feeding_and_eating_mhd, + + null as is_ocd_mhd, + + case + when (@prev_id = @cur_id AND (has_kd = 1 OR @has_kd = 1)) then @has_kd := 1 + else @has_kd := has_kd + end as has_kd, + + case + when (@prev_id = @cur_id AND (is_ckd = 1 OR @is_ckd = 1)) then @is_ckd := 1 + else @is_ckd := is_ckd + end as is_ckd, + + case + when (@prev_id = @cur_id AND (@ckd_stage is null) AND (ckd_stage is not null)) then @ckd_stage := ckd_stage + when (@prev_id = @cur_id AND (@ckd_stage is not null) AND (ckd_stage is null)) then @ckd_stage + else @ckd_stage := ckd_stage + end as ckd_stage, + + case + when (@prev_id = @cur_id AND (has_cvd = 1 OR @has_cvd = 1)) then @has_cvd := 1 + else @has_cvd := has_cvd + end as has_cvd, + + case + when (@prev_id = @cur_id AND (is_heart_failure_cvd = 1 OR @is_heart_failure_cvd = 1)) then @is_heart_failure_cvd := 1 + else @is_heart_failure_cvd := is_heart_failure_cvd + end as is_heart_failure_cvd, + + case + when (@prev_id = @cur_id AND (is_myocardinal_infarction = 1 OR @is_myocardinal_infarction = 1)) then @is_myocardinal_infarction := 1 + else @is_myocardinal_infarction := is_myocardinal_infarction + end as is_myocardinal_infarction, + + case + when (@prev_id = @cur_id AND (has_neurological_disorder = 1 OR @has_neurological_disorder = 1)) then @has_neurological_disorder := 1 + else @has_neurological_disorder := has_neurological_disorder + end as has_neurological_disorder, + + case + when (@prev_id = @cur_id AND (has_stroke = 1 OR @has_stroke = 1)) then @has_stroke := 1 + else @has_stroke := has_stroke + end as has_stroke, + + is_stroke_haemorrhagic, + + is_stroke_ischaemic, + + case + when (@prev_id = @cur_id AND (has_migraine = 1 OR @has_migraine = 1)) then @has_migraine := 1 + else @has_migraine := has_migraine + end as has_migraine, + + case + when (@prev_id = @cur_id AND (has_seizure = 1 OR @has_seizure = 1)) then @has_seizure := 1 + else @has_seizure := has_seizure + end as has_seizure, + + case + when (@prev_id = @cur_id AND (has_epilepsy = 1 OR @has_epilepsy = 1)) then @has_epilepsy := 1 + else @has_epilepsy := has_epilepsy + end as has_epilepsy, + + case + when (@prev_id = @cur_id AND (has_convulsive_disorder = 1 OR @has_convulsive_disorder = 1)) then @has_convulsive_disorder := 1 + else @has_convulsive_disorder := has_convulsive_disorder + end as has_convulsive_disorder, + + case + when (@prev_id = @cur_id AND (has_rheumatologic_disorder = 1 OR @has_rheumatologic_disorder = 1)) then @has_rheumatologic_disorder := 1 + else @has_rheumatologic_disorder := has_rheumatologic_disorder + end as has_rheumatologic_disorder, + + case + when (@prev_id = @cur_id AND (has_arthritis = 1 OR @has_arthritis = 1)) then @has_arthritis := 1 + else @has_arthritis := has_arthritis + end as has_arthritis, + + case + when (@prev_id = @cur_id AND (has_SLE = 1 OR @has_SLE = 1)) then @has_SLE := 1 + else @has_SLE := has_SLE + end as has_SLE + + from ncd_monthly_report_dataset_0 + order by person_id, encounter_datetime, endDate + ); + + alter table ncd_monthly_report_dataset_1 drop prev_id, drop cur_id; + + set @prev_id = null; + set @cur_id = null; + set @prev_location_id = null; + set @cur_location_id = null; + drop temporary table if exists ncd_monthly_report_dataset_2; + create temporary table ncd_monthly_report_dataset_2 + (select + *, + @prev_id := @cur_id as prev_id, + @cur_id := person_id as cur_id, + + case + when @prev_id=@cur_id then @prev_location_id := @cur_location_id + else @prev_location_id := null + end as next_location_id, + + @cur_location_id := location_id as cur_location_id + + from ncd_monthly_report_dataset_1 + order by person_id, endDate desc + ); + + alter table ncd_monthly_report_dataset_2 drop prev_id, drop cur_id, drop cur_location_id; + + set @prev_id = null; + set @cur_id = null; + set @cur_location_id = null; + set @prev_location_id = null; + drop temporary table if exists ncd_monthly_report_dataset_3; + create temporary table ncd_monthly_report_dataset_3 + (select + *, + @prev_id := @cur_id as prev_id, + @cur_id := person_id as cur_id, + + case + when @prev_id=@cur_id then @prev_location_id := @cur_location_id + else @prev_location_id := null + end as prev_location_id, + + @cur_location_id := location_id as cur_location_id + + from ncd_monthly_report_dataset_2 + order by person_id, endDate + ); + + SELECT NOW(); + SELECT COUNT(*) AS num_rows_to_be_inserted FROM ncd_monthly_report_dataset_3; + + #add data to table + replace into ncd_monthly_report_dataset + (select + null, + elastic_id, + endDate, + encounter_id, + person_id, + person_uuid, + birthdate, + age, + gender, + location_id, + location_uuid, + t2.name as clinic, + encounter_datetime, + visit_this_month, + + is_hypertensive, + htn_state, + + is_diabetic, + dm_state, + + has_mhd, + is_depressive_mhd, + is_anxiety_mhd, + is_bipolar_and_related_mhd, + is_personality_mhd, + is_feeding_and_eating_mhd, + is_ocd_mhd, + + has_kd, + is_ckd, + ckd_stage, + + has_cvd, + is_heart_failure_cvd, + is_myocardinal_infarction, + + has_neurological_disorder, + has_stroke, + is_stroke_haemorrhagic, + is_stroke_ischaemic, + + has_migraine, + has_seizure, + has_epilepsy, + has_convulsive_disorder, + + has_rheumatologic_disorder, + has_arthritis, + has_SLE + + from ncd_monthly_report_dataset_3 t1 + join amrs.location t2 using (location_id) + ); + + + SET @dyn_sql=CONCAT('delete t1 from ',@queue_table,' t1 join ncd_monthly_report_dataset_build_queue__0 t2 using (person_id);'); + PREPARE s1 from @dyn_sql; + EXECUTE s1; + DEALLOCATE PREPARE s1; + + SET @dyn_sql=CONCAT('select count(*) into @person_ids_count from ',@queue_table,';'); + PREPARE s1 from @dyn_sql; + EXECUTE s1; + DEALLOCATE PREPARE s1; + + + set @cycle_length = timestampdiff(second,@loop_start_time,now()); + + set @total_time = @total_time + @cycle_length; + set @cycle_number = @cycle_number + 1; + + set @remaining_time = ceil((@total_time / @cycle_number) * ceil(@person_ids_count / cycle_size) / 60); + set @num_in_nmrd := (select count(*) from ncd_monthly_report_dataset); + + SELECT + @num_in_nmrd AS num_in_nmrd, + @person_ids_count AS num_remaining, + @cycle_length AS 'Cycle time (s)', + CEIL(@person_ids_count / cycle_size) AS remaining_cycles, + @remaining_time AS 'Est time remaining (min)'; + end while; + + if(query_type = "build") then + SET @dyn_sql=CONCAT('drop table ',@queue_table,';'); + PREPARE s1 from @dyn_sql; + EXECUTE s1; + DEALLOCATE PREPARE s1; + end if; + + set @end = now(); + + #log the operation for next starting point + insert into etl.flat_log values (@start,@last_date_created,@table_version,timestampdiff(second,@start,@end)); + + SELECT CONCAT(@table_version, ' : Time to complete: ',TIMESTAMPDIFF(MINUTE, @start, @end),' minutes'); + +END \ No newline at end of file