
[Mendanha] Adding gypscie preprocessing on pipeline #23

Open — wants to merge 66 commits into base: main

Commits (66), all changes:
ed034bb
adding gypscie preprocessing on flow
patriciacatandi Oct 9, 2024
5df9769
Merge branch 'staging/radar_mendanha' into staging/alertario_gypscie
patriciacatandi Oct 9, 2024
c93fbf6
adding parameters on scheduler
patriciacatandi Oct 9, 2024
d414816
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Oct 9, 2024
1d7b4cb
adding preprocessing on mendanha flow
patriciacatandi Oct 9, 2024
6e63aa7
changing path where dfr was saved
patriciacatandi Oct 9, 2024
8f37f36
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Oct 9, 2024
b3828c8
Merge branch 'main' into staging/alertario_gypscie
mergify[bot] Oct 10, 2024
94f7bbf
Merge branch 'main' into staging/alertario_gypscie
mergify[bot] Oct 10, 2024
e1decc2
Merge branch 'main' into staging/alertario_gypscie
mergify[bot] Oct 10, 2024
c69468b
Merge branch 'main' into staging/alertario_gypscie
mergify[bot] Oct 10, 2024
8edc143
Merge branch 'main' into staging/alertario_gypscie
mergify[bot] Oct 10, 2024
7f4992b
Merge branch 'main' into staging/alertario_gypscie
mergify[bot] Oct 10, 2024
af14bae
adding treatment version on gypscie register dataset
patriciacatandi Oct 17, 2024
1472377
adding treatment version on gypscie register dataset
patriciacatandi Oct 17, 2024
92f5e92
Merge branch 'staging/alertario_gypscie' of github.com:prefeitura-rio…
patriciacatandi Oct 17, 2024
e285859
adding functions to treat data on gypscie
patriciacatandi Oct 30, 2024
2ab57c9
changing gypscie tasks
patriciacatandi Oct 30, 2024
fad2954
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Oct 30, 2024
631d375
bugfix
patriciacatandi Oct 30, 2024
88c5a38
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Oct 30, 2024
a70fc67
trying to solve TypeError: object of type 'Parameter' has no len()
patriciacatandi Oct 31, 2024
ff0b24f
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Oct 31, 2024
7bae005
Merge branch 'main' into staging/alertario_gypscie
mergify[bot] Oct 31, 2024
96ed1ad
trying to solve TypeError: object of type 'Parameter' has no len()
patriciacatandi Oct 31, 2024
abdcc4b
trying to solve TypeError: object of type 'Parameter' has no len()
patriciacatandi Oct 31, 2024
be6872b
bugfix
patriciacatandi Oct 31, 2024
b849697
fixing partition column name and save_data function
patriciacatandi Oct 31, 2024
6278169
bugfix
patriciacatandi Oct 31, 2024
a46a023
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Oct 31, 2024
03b1de2
bugfix
patriciacatandi Oct 31, 2024
382527f
fixing gypscie api
patriciacatandi Oct 31, 2024
fb946ca
bugfix
patriciacatandi Oct 31, 2024
0aab912
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Oct 31, 2024
1288be1
bugfix
patriciacatandi Oct 31, 2024
0601c6d
bugfix
patriciacatandi Oct 31, 2024
8d913cc
bugfix
patriciacatandi Oct 31, 2024
f0094ef
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Oct 31, 2024
9506ed0
converting utc date to specific format
patriciacatandi Oct 31, 2024
26acf66
changing task_create_partitions
patriciacatandi Oct 31, 2024
987ab9f
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Oct 31, 2024
14e3b87
bugfix
patriciacatandi Oct 31, 2024
e5435a0
Merge branch 'staging/alertario_gypscie' of github.com:prefeitura-rio…
patriciacatandi Oct 31, 2024
e66fce7
bugfix
patriciacatandi Oct 31, 2024
bda27b7
renaming file to dados_alertario_raw
patriciacatandi Nov 1, 2024
d7c8e29
bugfix
patriciacatandi Nov 1, 2024
0b2ad3d
bugfix
patriciacatandi Nov 1, 2024
3856421
changing column type before registering dataset on gypscie
patriciacatandi Nov 1, 2024
f55b2b5
changing return of get function
patriciacatandi Nov 1, 2024
1d714c9
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Nov 1, 2024
879e360
adding task to functions
patriciacatandi Nov 1, 2024
6929b46
bugfix
patriciacatandi Nov 1, 2024
0f73172
returning a path instead of a list on task_create_partitions
patriciacatandi Nov 1, 2024
a43d044
changing path where to save table
patriciacatandi Nov 1, 2024
0649aa0
adding rename function and adapting code to treat radar data on gypscie
patriciacatandi Nov 4, 2024
b4ecf9e
forgot to add other changes
patriciacatandi Nov 4, 2024
787aaf2
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Nov 4, 2024
3c73291
trying to fix fail on flow registration
patriciacatandi Nov 4, 2024
ef7f92c
testing init
patriciacatandi Nov 4, 2024
cf69f93
chore: force register
patriciacatandi Nov 4, 2024
8be7b2f
adding utils on init
patriciacatandi Nov 4, 2024
49b39f3
removing parallelism from flow
patriciacatandi Nov 4, 2024
11356e6
changing flow run config
patriciacatandi Nov 4, 2024
6508243
rolling back modification on init
patriciacatandi Nov 4, 2024
e80364a
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Nov 4, 2024
a3cea13
fix: variable name
gabriel-milan Nov 4, 2024
2 changes: 1 addition & 1 deletion — .github/workflows/cd.yaml

@@ -71,4 +71,4 @@ jobs:

       - name: Register Prefect flows
         run: |-
-          python .github/workflows/scripts/register_flows.py --project $PREFECT__SERVER__PROJECT --path pipelines/ --schedule --filter-affected-flows
+          python .github/workflows/scripts/register_flows.py --project $PREFECT__SERVER__PROJECT --path pipelines/ --schedule --no-filter-affected-flows
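The change above flips the registration script from registering only the flows affected by the diff to registering every flow. The script itself is not shown in this PR; the following is a hypothetical sketch of how such a paired `--filter-affected-flows` / `--no-filter-affected-flows` option can be declared with argparse (flag names taken from the command line above, everything else assumed):

```python
import argparse

# Sketch only: register_flows.py's real argument handling may differ.
parser = argparse.ArgumentParser(description="Register Prefect flows")
parser.add_argument("--project")
parser.add_argument("--path")
parser.add_argument("--schedule", action="store_true")
# A paired on/off flag sharing one destination variable:
parser.add_argument(
    "--filter-affected-flows", dest="filter_affected_flows", action="store_true"
)
parser.add_argument(
    "--no-filter-affected-flows", dest="filter_affected_flows", action="store_false"
)
parser.set_defaults(filter_affected_flows=True)

# With the flag used in the updated workflow, filtering is disabled,
# so every flow under --path gets registered.
args = parser.parse_args(["--no-filter-affected-flows"])
print(args.filter_affected_flows)  # → False
```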
1 change: 1 addition & 0 deletions — pipelines/constants.py

@@ -115,6 +115,7 @@ class constants(Enum):
     }

     # Infisical
+    INFISICAL_PATH = "/gypscie_dexl"
     INFISICAL_URL = "URL"
     INFISICAL_USERNAME = "USERNAME"
     INFISICAL_PASSWORD = "PASSWORD"
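The new `INFISICAL_PATH` entry follows the repo's Enum-based constants pattern. A minimal runnable sketch of how such values are read (only the entries touched by this diff are reproduced; the real class has many more members):

```python
from enum import Enum

class constants(Enum):
    # Infisical secrets configuration, as added/present in pipelines/constants.py
    INFISICAL_PATH = "/gypscie_dexl"
    INFISICAL_URL = "URL"
    INFISICAL_USERNAME = "USERNAME"
    INFISICAL_PASSWORD = "PASSWORD"

# Enum members expose their payload through .value:
print(constants.INFISICAL_PATH.value)  # → /gypscie_dexl
```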
207 changes: 196 additions & 11 deletions — pipelines/meteorologia/precipitacao_alertario/flows.py

@@ -1,18 +1,24 @@
# -*- coding: utf-8 -*-
# pylint: disable=C0103
# pylint: disable=C0103, line-too-long
"""
Flows for precipitacao_alertario.
"""
from datetime import timedelta

from prefect import Parameter, case
from prefect.run_configs import KubernetesRun
from prefect.storage import GCS
from prefect.tasks.prefect import create_flow_run, wait_for_flow_run
from prefect import Parameter, case # pylint: disable=E0611, E0401
from prefect.run_configs import KubernetesRun # pylint: disable=E0611, E0401
from prefect.storage import GCS # pylint: disable=E0611, E0401
from prefect.tasks.prefect import ( # pylint: disable=E0611,E0401
create_flow_run,
wait_for_flow_run,
)
from prefeitura_rio.pipelines_utils.custom import Flow # pylint: disable=E0611, E0401

# pylint: disable=E0611, E0401
from prefeitura_rio.pipelines_utils.state_handlers import handler_inject_bd_credentials
from prefeitura_rio.pipelines_utils.tasks import ( # pylint: disable=E0611, E0401
create_table_and_upload_to_gcs,
get_now_datetime,
task_run_dbt_model_task,
)

@@ -23,6 +29,7 @@
from pipelines.meteorologia.precipitacao_alertario.schedules import minute_schedule
from pipelines.meteorologia.precipitacao_alertario.tasks import (
check_to_run_dbt,
convert_sp_timezone_to_utc,
download_data,
save_data,
save_last_dbt_update,
@@ -31,11 +38,31 @@
from pipelines.rj_escritorio.rain_dashboard.constants import (
constants as rain_dashboard_constants,
)
-from pipelines.utils.constants import constants as utils_constants
+from pipelines.tasks import task_create_partitions  # pylint: disable=E0611, E0401
+
+# from pipelines.utils.constants import constants as utils_constants
 from pipelines.utils.custom import wait_for_flow_run_with_timeout
-from pipelines.utils.dump_db.constants import constants as dump_db_constants
+
+# from pipelines.utils.dump_db.constants import constants as dump_db_constants
 from pipelines.utils.dump_to_gcs.constants import constants as dump_to_gcs_constants

# preprocessing imports
from pipelines.utils.gypscie.tasks import ( # pylint: disable=E0611, E0401
access_api,
add_caracterization_columns_on_dfr,
convert_columns_type,
download_datasets_from_gypscie,
execute_dataflow_on_gypscie,
get_dataflow_alertario_params,
get_dataset_info,
get_dataset_name_on_gypscie,
get_dataset_processor_info,
path_to_dfr,
register_dataset_on_gypscie,
rename_files,
unzip_files,
)

wait_for_flow_run_with_5min_timeout = wait_for_flow_run_with_timeout(timeout=timedelta(minutes=5))

with Flow(
@@ -73,6 +100,54 @@
default=dump_to_gcs_constants.MAX_BYTES_PROCESSED_PER_TABLE.value,
)

# Preprocessing gypscie parameters
preprocessing_gypscie = Parameter("preprocessing_gypscie", default=False, required=False)
# Gypscie parameters
workflow_id = Parameter("workflow_id", default=41, required=False)
environment_id = Parameter("environment_id", default=1, required=False)
domain_id = Parameter("domain_id", default=1, required=False)
project_id = Parameter("project_id", default=1, required=False)
project_name = Parameter("project_name", default="rionowcast_precipitation", required=False)
treatment_version = Parameter("treatment_version", default=1, required=False)

# Gypscie processor parameters
processor_name = Parameter("processor_name", default="etl_alertario22", required=True)
dataset_processor_id = Parameter("dataset_processor_id", default=43, required=False)  # to be changed

load_data_function_id = Parameter("load_data_function_id", default=53, required=False)
parse_date_time_function_id = Parameter(
"parse_date_time_function_id", default=54, required=False
)
drop_duplicates_function_id = Parameter(
"drop_duplicates_function_id", default=55, required=False
)
replace_inconsistent_values_function_id = Parameter(
"replace_inconsistent_values_function_id", default=56, required=False
)
add_lat_lon_function_id = Parameter("add_lat_lon_function_id", default=57, required=False)
save_data_function_id = Parameter("save_data_function_id", default=58, required=False)
rain_gauge_metadata_path = Parameter("rain_gauge_metadata_path", default=227, required=False)

# Parameters for saving data preprocessed on GCP
dataset_id_previsao_chuva = Parameter(
"dataset_id_previsao_chuva", default="clima_previsao_chuva", required=False
)
table_id_previsao_chuva = Parameter(
"table_id_previsao_chuva", default="preprocessamento_pluviometro_alertario", required=False
)

# Dataset parameters
station_type = Parameter("station_type", default="rain_gauge", required=False)
source = Parameter("source", default="alertario", required=False)

# Dataset path, if it was saved by the ETL flow; otherwise it will be None
dataset_path = Parameter("dataset_path", default=None, required=False) # dataset_path
model_version = Parameter("model_version", default=1, required=False)

#########################
# Start alertario flow #
#########################

dfr_pluviometric, dfr_meteorological = download_data()
(dfr_pluviometric, empty_data_pluviometric,) = treat_pluviometer_and_meteorological_data(
dfr=dfr_pluviometric,
@@ -88,8 +163,11 @@
)

 with case(empty_data_pluviometric, False):
-    path_pluviometric = save_data(
-        dfr_pluviometric, "pluviometric", wait=empty_data_pluviometric
+    path_pluviometric, full_path_pluviometric = save_data(
+        dfr_pluviometric,
+        data_name="pluviometric",
+        treatment_version=treatment_version,
+        wait=empty_data_pluviometric,
     )
# Create table in BigQuery
UPLOAD_TABLE = create_table_and_upload_to_gcs(
@@ -321,8 +399,8 @@

 # Save and materialize meteorological data
 with case(empty_data_meteorological, False):
-    path_meteorological = save_data(
-        dfr_meteorological, "meteorological", wait=empty_data_meteorological
+    path_meteorological, full_path_meteorological = save_data(
+        dfr_meteorological, data_name="meteorological", wait=empty_data_meteorological
     )
# Create table in BigQuery
UPLOAD_TABLE_METEOROLOGICAL = create_table_and_upload_to_gcs(
@@ -366,6 +444,113 @@
# raise_final_state=True,
# )

####################################
# Start preprocessing flow #
####################################

with case(empty_data_pluviometric, False):
with case(preprocessing_gypscie, True):
api = access_api()

dataset_info = get_dataset_info(station_type, source)

# Get processor information on gypscie
with case(dataset_processor_id, None):
dataset_processor_response, dataset_processor_id = get_dataset_processor_info(
api, processor_name
)
dfr_pluviometric_converted = convert_columns_type(
dfr_pluviometric, columns=["id_estacao"], new_types=[int]
)
dfr_pluviometric_gypscie = convert_sp_timezone_to_utc(dfr_pluviometric_converted)
path_pluviometric_gypscie, full_path_pluviometric_gypscie = save_data(
dfr_pluviometric_gypscie,
data_name="gypscie",
columns=["id_estacao", "data_medicao", "acumulado_chuva_5min"],
data_type="parquet",
suffix=False,
)
full_path_pluviometric_gypscie_ = rename_files(
full_path_pluviometric_gypscie, rename="dados_alertario_raw"
)
register_dataset_response = register_dataset_on_gypscie(
api, filepath=full_path_pluviometric_gypscie_[0], domain_id=domain_id
)

model_params = get_dataflow_alertario_params(
workflow_id=workflow_id,
environment_id=environment_id,
project_id=project_id,
rain_gauge_data_id=register_dataset_response["id"],
rain_gauge_metadata_path=rain_gauge_metadata_path,
load_data_funtion_id=load_data_function_id,
parse_date_time_function_id=parse_date_time_function_id,
drop_duplicates_function_id=drop_duplicates_function_id,
replace_inconsistent_values_function_id=replace_inconsistent_values_function_id,
add_lat_lon_function_id=add_lat_lon_function_id,
save_data_function_id=save_data_function_id,
)

# Send dataset ids to gypscie to get predictions
output_dataset_ids = execute_dataflow_on_gypscie(
api,
model_params,
)

# dataset_processor_task_id = execute_dataset_processor(
# api,
# processor_id=dataset_processor_id,
# dataset_id=[dataset_response["id"]],
# environment_id=environment_id,
# project_id=project_id,
# parameters=processor_parameters,
# )
# wait_run = task_wait_run(api, dataset_processor_task_id, flow_type="processor")
# dataset_path = download_datasets_from_gypscie(
# api, dataset_names=[dataset_response["id"]], wait=wait_run
# )
dataset_names = get_dataset_name_on_gypscie(api, output_dataset_ids) # new
ziped_dataset_paths = download_datasets_from_gypscie(api, dataset_names=dataset_names)
dataset_paths = unzip_files(ziped_dataset_paths)
dfr_gypscie_ = path_to_dfr(dataset_paths)
# output_datasets_id = get_output_dataset_ids_on_gypscie(api, dataset_processor_task_id)
dfr_gypscie = add_caracterization_columns_on_dfr(
dfr_gypscie_, model_version, update_time=True
)

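`download_datasets_from_gypscie` returns zipped archives that `unzip_files` expands before `path_to_dfr` reads them. A stdlib sketch of what an `unzip_files`-style helper could look like (behavior assumed; the real task lives in `pipelines/utils/gypscie/tasks.py` and is not shown in this diff):

```python
import zipfile
from pathlib import Path

def unzip_files(zip_paths, destination="unzipped"):
    """Extract each archive into its own subfolder and return the file paths."""
    extracted = []
    for zip_path in zip_paths:
        out_dir = Path(destination) / Path(zip_path).stem
        out_dir.mkdir(parents=True, exist_ok=True)
        with zipfile.ZipFile(zip_path) as archive:
            archive.extractall(out_dir)
            extracted.extend(str(out_dir / name) for name in archive.namelist())
    return extracted
```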
# Save pre-treated data on local file with partitions
now_datetime = get_now_datetime()
prediction_data_path, prediction_data_full_path = task_create_partitions(
data=dfr_gypscie,
partition_date_column=dataset_info["partition_date_column"],
savepath="model_prediction",
suffix=now_datetime,
wait=dfr_gypscie,
)

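`task_create_partitions` above saves the pre-treated data under date partitions before upload. A sketch of the Hive-style partition layout such a helper typically produces (the column names `ano_particao`/`mes_particao`/`data_particao` are an assumption for illustration, not taken from this diff):

```python
from datetime import datetime

def partition_path(base: str, partition_date: str, suffix: str = None) -> str:
    """Build a Hive-style partitioned path for one date (assumed layout)."""
    date = datetime.strptime(partition_date, "%Y-%m-%d")
    path = (
        f"{base}/ano_particao={date.year}/"
        f"mes_particao={date.month}/data_particao={partition_date}"
    )
    # suffix mirrors the now_datetime suffix passed to task_create_partitions
    return f"{path}/{suffix}.csv" if suffix else path

print(partition_path("model_prediction", "2024-11-04", suffix="20241104_1200"))
```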
################################
# Save preprocessing on GCP #
################################

# Upload data to BigQuery
create_table = create_table_and_upload_to_gcs(
data_path=prediction_data_path,
dataset_id=dataset_id_previsao_chuva,
table_id=table_id_previsao_chuva,
dump_mode=DUMP_MODE,
biglake_table=False,
)

# Trigger DBT flow run
with case(MATERIALIZE_AFTER_DUMP, True):
run_dbt = task_run_dbt_model_task(
dataset_id=dataset_id_previsao_chuva,
table_id=table_id_previsao_chuva,
# mode=materialization_mode,
# materialize_to_datario=materialize_to_datario,
)
run_dbt.set_upstream(create_table)

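Earlier in this branch, `convert_sp_timezone_to_utc` shifts the pluviometric timestamps from America/Sao_Paulo to UTC before the dataset is registered on Gypscie. For a single timestamp the conversion amounts to the following stdlib sketch (the real task operates on a whole dataframe column):

```python
from datetime import datetime
from zoneinfo import ZoneInfo

def sp_to_utc(naive_timestamp: datetime) -> datetime:
    """Interpret a naive timestamp as São Paulo local time and convert to UTC."""
    localized = naive_timestamp.replace(tzinfo=ZoneInfo("America/Sao_Paulo"))
    return localized.astimezone(ZoneInfo("UTC"))

# São Paulo has been UTC-3 year-round since DST was abolished in 2019
print(sp_to_utc(datetime(2024, 10, 9, 12, 0)))
```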
# to run on the cloud
cor_meteorologia_precipitacao_alertario.storage = GCS(constants.GCS_FLOWS_BUCKET.value)
cor_meteorologia_precipitacao_alertario.run_config = KubernetesRun(
22 changes: 22 additions & 0 deletions — pipelines/meteorologia/precipitacao_alertario/schedules.py

@@ -27,6 +27,28 @@
"materialize_to_datario": False,
"mode": "prod",
"dump_to_gcs": False,
"maximum_bytes_processed": None,
"preprocessing_gypscie": True,
"workflow_id": 41,
"environment_id": 1,
"domain_id": 1,
"project_id": 1,
"project_name": "rionowcast_precipitation",
"treatment_version": 1,
"processor_name": "etl_alertario22",
"dataset_processor_id": 43,
"load_data_function_id": 53,
"parse_date_time_function_id": 54,
"drop_duplicates_function_id": 55,
"replace_inconsistent_values_function_id": 56,
"add_lat_lon_function_id": 57,
"save_data_function_id": 58,
"rain_gauge_metadata_path": 227,
"dataset_id_previsao_chuva": "clima_previsao_chuva",
"table_id_previsao_chuva": "preprocessamento_pluviometro_alertario",
"station_type": "rain_gauge",
"source": "alertario",
"model_version": 1,
},
),
]