Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[FEAT] Adiciona cadunico e geolocalizacao utilizando lib prefeitura-rio #1

Open
wants to merge 112 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
112 commits
Select commit Hold shift + click to select a range
29c3fec
feat: add cadunico and rafactor to prefeitura-rio package
d116626 Dec 4, 2023
fab45da
feat: add cadunico and rafactor to prefeitura-rio package
d116626 Dec 4, 2023
a85938d
feat: add address model
d116626 Dec 4, 2023
e7ffa50
feat: add address model
d116626 Dec 4, 2023
e94111b
feat: add cadunico geolocate
d116626 Dec 5, 2023
d2d6eec
fix: change agent label name
d116626 Dec 5, 2023
928b12d
chore: use prefeitura-rio repo instead of PyPI source
gabriel-milan Dec 8, 2023
fd90b7c
fix: add git dep to Dockerfile
gabriel-milan Dec 8, 2023
401cf1a
fix(deps): add missing extras
gabriel-milan Dec 8, 2023
9186237
fix: move assert dependencies to inside task
gabriel-milan Dec 8, 2023
1eda42f
fix: settings
gabriel-milan Dec 8, 2023
3af6c6c
chore: bump prefeitura-rio version
gabriel-milan Dec 8, 2023
9e7b3dc
chore: pump version
d116626 Dec 8, 2023
24d679a
chore: change source_table_ref
d116626 Dec 8, 2023
fdcfea0
chore: bump prefeitura-rio version
d116626 Dec 8, 2023
ad837a1
chore: update endereco model
d116626 Dec 8, 2023
b4a7b5c
chore: change constants agent
d116626 Dec 8, 2023
d498650
chore: change constants agent
d116626 Dec 8, 2023
f866b50
chore: inject bd credential
d116626 Dec 8, 2023
71d6798
chore: pump prefeitura-rio version
d116626 Dec 8, 2023
996a311
chore: pump prefeitura-rio version
d116626 Dec 8, 2023
8c0b40d
chore: pump prefeitura-rio version
d116626 Dec 8, 2023
241b16a
chore: pump prefeitura-rio version
d116626 Dec 8, 2023
9781c24
chore: pump prefeitura-rio version
d116626 Dec 8, 2023
064d316
chore: pump prefeitura-rio version
d116626 Dec 8, 2023
a598262
chore: add state_handler
d116626 Dec 8, 2023
97b9315
chore: add state_handlers
d116626 Dec 8, 2023
9f585d9
chore: pump prefeitura-rio version
d116626 Dec 9, 2023
c194515
chore: pump prefeitura-rio version
d116626 Dec 9, 2023
85ffed4
chore: log infisical enviroment
d116626 Dec 9, 2023
5008cff
chore: log infisical enviroment
d116626 Dec 9, 2023
90fe930
chore: pump prefeitura-rio version
d116626 Dec 9, 2023
2c890c7
chore: pump prefeitura-rio version
d116626 Dec 9, 2023
1eba0dd
chore: pump prefeitura-rio version
d116626 Dec 9, 2023
9f1fa79
chore: pump prefeitura-rio version
d116626 Dec 9, 2023
565c97e
chore: pump prefeitura-rio version
d116626 Dec 9, 2023
9154040
chore: add upstream and dataframe_to_csv_task path log
d116626 Dec 9, 2023
b857470
chore: add upstream and dataframe_to_csv_task path log
d116626 Dec 9, 2023
5e22279
chore: pump prefeitura-rio version
d116626 Dec 9, 2023
4ed1cfd
chore: pump prefeitura-rio version
d116626 Dec 9, 2023
0a7811e
chore: pump prefeitura-rio version
d116626 Dec 9, 2023
3e5cf1c
chore: pump prefeitura-rio version
d116626 Dec 9, 2023
383e269
chore: pump prefeitura-rio version
d116626 Dec 9, 2023
a0b7a14
chore: add more parameters to geolocate pipeline
d116626 Dec 11, 2023
ead04cd
chore: add new flow parameters and sort models before materialization
d116626 Dec 11, 2023
14d117b
chore: some refactors
d116626 Dec 11, 2023
f6482b0
fix: Database not imported
d116626 Dec 11, 2023
24b3fcf
fix: Database not imported
d116626 Dec 11, 2023
4d5f381
fix: Database not imported
d116626 Dec 11, 2023
ea88b4d
fix: Database not imported
d116626 Dec 11, 2023
d4335a6
fix: Database not imported
d116626 Dec 11, 2023
ec37ecd
fix: Database not imported
d116626 Dec 11, 2023
4e960d4
feat: add new parameter source_table_address_query
d116626 Dec 11, 2023
c907004
feat: add new parameter source_table_address_query
d116626 Dec 11, 2023
3b79624
feat: add new parameter source_table_address_query
d116626 Dec 11, 2023
68713db
feat: refresh models
d116626 Dec 11, 2023
8266f2a
feat: refresh models
d116626 Dec 11, 2023
7969d64
feat: refresh models
d116626 Dec 11, 2023
6b79c21
feat: refresh models
d116626 Dec 11, 2023
dba2c06
feat: refresh models
d116626 Dec 11, 2023
ac50700
feat: refresh models
d116626 Dec 11, 2023
f30bdf0
feat: refresh models
d116626 Dec 11, 2023
69bb56c
feat: refresh models
d116626 Dec 11, 2023
d75dab6
chore: trigger register
d116626 Dec 11, 2023
c123373
chore: trigger register
d116626 Dec 11, 2023
6f4f692
fix: registros model
d116626 Dec 11, 2023
57396dc
fix: registros model
d116626 Dec 11, 2023
3c532c5
fix: registros model
d116626 Dec 11, 2023
0488c63
fix: registros model
d116626 Dec 11, 2023
b3ae0bf
fix: registros model
d116626 Dec 12, 2023
53acbfc
chore: update models
d116626 Dec 13, 2023
84be2b4
fix: geolocate data filter
d116626 Dec 14, 2023
caf7192
fix: geolocate data filter
d116626 Dec 14, 2023
e1f5f7d
feat: pump prefeitura-rio version
d116626 Dec 19, 2023
da63c88
feat: pump prefeitura-rio version
d116626 Dec 19, 2023
0dc7d22
chore: pump prefeitura-rio version
d116626 Dec 19, 2023
a6d59b4
chore: update layout models
d116626 Feb 2, 2024
35a4855
feat: add utils queries
d116626 Feb 6, 2024
9d89496
feat: get execute dbt model from smas template
d116626 Feb 6, 2024
0c3c507
feat: get execute dbt model from smas template
d116626 Feb 6, 2024
ac3f54c
feat: get execute dbt model from smas template
d116626 Feb 6, 2024
4321331
chore: pump wait-on-check-action version
d116626 Feb 6, 2024
46cc5b1
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Feb 6, 2024
065a461
fix: typo
d116626 Feb 6, 2024
07ad4c9
Merge branch 'staging/cadunico' of https://github.com/prefeitura-rio/…
d116626 Feb 6, 2024
6965752
chore: change template calls
d116626 Feb 7, 2024
89cdd88
chore: add new columns
d116626 Jul 29, 2024
0ee0e5f
chore: increase dbt timeout
d116626 Oct 30, 2024
90cc33f
fix: agent_label parameter now it is a list and not_set
d116626 Oct 30, 2024
8da618b
fix: agent_label parameter now it is a list and not_set
d116626 Oct 30, 2024
2baa6ac
fix: agent_label parameter now it is a list and not_set
d116626 Oct 30, 2024
c0917f4
feat: add identidade unica initial queries
d116626 Dec 10, 2024
b6afc54
chore: add identidade_unica table
d116626 Dec 11, 2024
4d97d79
chore: add identidade_unica table
d116626 Dec 11, 2024
945a763
chore: trigger identidade unica model register
d116626 Dec 11, 2024
b418a75
chore: trigger identidade unica model register
d116626 Dec 11, 2024
6fd8f8f
chore: trigger identidade unica model register
d116626 Dec 11, 2024
1f21225
chore: add column with original value for float64 decimal columns
d116626 Dec 11, 2024
2bd4177
chore: add column with original value for float64 decimal columns
d116626 Dec 11, 2024
320137a
chore: add column with original value for float64 decimal columns
d116626 Dec 11, 2024
5fe3508
chore: add escolaridade id dict
d116626 Dec 12, 2024
28cc684
feat: add more fields to identidade unica model
d116626 Dec 12, 2024
b540ac1
feat: add more fields to identidade unica model
d116626 Dec 12, 2024
35eb3e3
feat: add more fields to identidade unica model
d116626 Dec 12, 2024
34adc76
feat: add schema.yml to identidade unica model
d116626 Dec 12, 2024
84c0d9b
feat: add schema.yml to identidade unica model
d116626 Dec 12, 2024
ec49e3e
chore: use decimal ajust equal 1 for float fields
d116626 Dec 12, 2024
28601c7
chore: use decimal ajust equal 1 for float fields
d116626 Dec 12, 2024
5e42582
chore: remove the current person from members array, padronize names …
d116626 Dec 12, 2024
efa76dc
chore: deduplicate membros using cpf
d116626 Dec 12, 2024
73ce67f
chore: add app_identidade_unica.cadastros manualy to tables to be mat…
d116626 Dec 16, 2024
b6fb6e4
chore: remove app_identidade_unica.cadastros manualy to tables to be …
d116626 Dec 16, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Binary file added .DS_Store
Binary file not shown.
2 changes: 1 addition & 1 deletion .github/workflows/cd_staging.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ jobs:
echo $PREFECT_AUTH_TOML | base64 --decode > $HOME/.prefect/auth.toml

- name: Wait for Docker image to be available
uses: lewagon/wait-on-check-action@v1.3.1
uses: lewagon/wait-on-check-action@v1.3.3
with:
ref: ${{ github.event.pull_request.head.sha || github.sha }}
check-name: 'Build Docker image'
Expand Down
6 changes: 6 additions & 0 deletions Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,12 @@
# Start Python image
FROM python:${PYTHON_VERSION}

# Install git
RUN apt-get update && \

Check failure on line 8 in Dockerfile

View workflow job for this annotation

GitHub Actions / Lint

DL3008 warning: Pin versions in apt get install. Instead of `apt-get install <package>` use `apt-get install <package>=<version>`

Check failure on line 8 in Dockerfile

View workflow job for this annotation

GitHub Actions / Lint

DL3015 info: Avoid additional packages by specifying `--no-install-recommends`
apt-get install -y git && \
apt-get clean && \
rm -rf /var/lib/apt/lists/*

# Setting environment with prefect version
ARG PREFECT_VERSION=1.4.1
ENV PREFECT_VERSION $PREFECT_VERSION
Expand Down
3 changes: 3 additions & 0 deletions pipelines/cadunico/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
# -*- coding: utf-8 -*-
from pipelines.cadunico.geolocate.flows import * # noqa: F401, F403
from pipelines.cadunico.ingest_raw.flows import * # noqa: F401, F403
Empty file.
46 changes: 46 additions & 0 deletions pipelines/cadunico/geolocate/flows.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
# -*- coding: utf-8 -*-
"""
GEOLOCATE CADUNICO ADDRESS FLOW......
"""

from copy import deepcopy

from prefect.run_configs import KubernetesRun
from prefect.storage import GCS
from prefeitura_rio.pipelines_templates.geolocate.flows import utils_geolocate_flow
from prefeitura_rio.pipelines_utils.prefect import set_default_parameters
from prefeitura_rio.pipelines_utils.state_handlers import handler_inject_bd_credentials

from pipelines.cadunico.geolocate.schedules import cadunico_geolocate_schedule
from pipelines.constants import constants

cadunico_geolocate_flow = deepcopy(utils_geolocate_flow)
cadunico_geolocate_flow.state_handlers = [handler_inject_bd_credentials]
cadunico_geolocate_flow.name = "GEOLOCATE: cadunico - Geolocalização de endereços do Cadunico"
cadunico_geolocate_flow.storage = GCS(constants.GCS_FLOWS_BUCKET.value)

cadunico_geolocate_flow.run_config = KubernetesRun(
image=constants.DOCKER_IMAGE.value,
labels=[
constants.RJ_SMAS_AGENT_LABEL.value,
],
)

cadunico_geolocate_default_parameters = {
"source_dataset_id": "protecao_social_cadunico",
"source_table_id": "endereco",
"source_table_address_column": "address",
"source_table_address_query": "SELECT DISTINCT address FROM `rj-smas.protecao_social_cadunico.endereco` LIMIT 10", # noqa
"use_source_table_address_query": False,
"destination_dataset_id": "protecao_social_cadunico",
"destination_table_id": "endereco_geolocalizado",
"georeference_mode": "distinct",
"retry_request_number": 5,
"retry_request_time": 60,
"time_between_requests": 2,
}
cadunico_geolocate_flow = set_default_parameters(
cadunico_geolocate_flow, default_parameters=cadunico_geolocate_default_parameters
)

cadunico_geolocate_flow.schedule = cadunico_geolocate_schedule
34 changes: 34 additions & 0 deletions pipelines/cadunico/geolocate/schedules.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
# -*- coding: utf-8 -*-
"""
Schedules for cor
"""

from datetime import datetime, timedelta

from prefect.schedules import Schedule
from prefect.schedules.clocks import IntervalClock

from pipelines.constants import constants

cadunico_geolocate_schedule = Schedule(
clocks=[
IntervalClock(
interval=timedelta(days=1),
start_date=datetime(2021, 1, 1),
labels=[
constants.RJ_SMAS_AGENT_LABEL.value,
],
parameter_defaults={
"source_dataset_id": "protecao_social_cadunico",
"source_table_id": "endereco",
"source_table_address_column": "address",
"destination_dataset_id": "protecao_social_cadunico",
"destination_table_id": "endereco_geolocalizado",
"georeference_mode": "distinct",
"retry_request_number": 5,
"retry_request_time": 60,
"time_between_requests": 2,
},
),
],
)
Empty file.
162 changes: 162 additions & 0 deletions pipelines/cadunico/ingest_raw/flows.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,162 @@
# -*- coding: utf-8 -*-
"""
CADUNICO: INGEST RAW FLOW........
"""

from prefect import Parameter, case
from prefect.run_configs import KubernetesRun
from prefect.storage import GCS
from prefect.tasks.prefect import create_flow_run, wait_for_flow_run
from prefect.utilities.edges import unmapped
from prefeitura_rio.core import settings
from prefeitura_rio.pipelines_utils.custom import Flow
from prefeitura_rio.pipelines_utils.prefect import (
task_get_current_flow_run_labels,
task_get_flow_group_id,
)
from prefeitura_rio.pipelines_utils.state_handlers import handler_inject_bd_credentials

from pipelines.cadunico.ingest_raw.tasks import (
append_data_to_storage,
create_table_if_not_exists,
get_dbt_models_to_materialize_task,
get_existing_partitions,
get_files_to_ingest,
get_project_id_task,
ingest_file,
need_to_ingest,
)
from pipelines.constants import constants

with Flow(
name="CadUnico: Ingestão de dados brutos", state_handlers=[handler_inject_bd_credentials]
) as cadunico__ingest_raw__flow:
# Parameters
dataset_id = Parameter("dataset_id", default="protecao_social_cadunico")
table_id = Parameter("table_id", default="registro_familia")
dump_mode = Parameter("dump_mode", default="append")
biglake_table = Parameter("biglake_table", default=True)
ingested_files_output = Parameter("ingested_files_output", default="/tmp/ingested_files/")
prefix_raw_area = Parameter(
"prefix_raw_area", default="raw/protecao_social_cadunico/registro_familia"
)
prefix_staging_area = Parameter(
"prefix_staging_area", default="staging/protecao_social_cadunico/registro_familia"
)

materialize_after_dump = Parameter("materialize_after_dump", default=True, required=False)
layout_dataset_id = Parameter("layout_dataset_id", default="protecao_social_cadunico")
layout_table_id = Parameter("layout_table_id", default="layout")
layout_output_path = Parameter("layout_output_path", default="/tmp/cadunico/layout_parsed/")
force_create_models = Parameter("force_create_models", default=False, required=False)
force_materialize_harmonized_dbt_models = Parameter(
"force_materialize_harmonized_dbt_models", default=False, required=False
)

aditional_dbt_models_to_materialize = Parameter(
"aditional_dbt_models_to_materialize", default="endereco,bairro,endereco", required=False
)
# Tasks
project_id = get_project_id_task()
materialization_flow_id = task_get_flow_group_id(flow_name=settings.FLOW_NAME_EXECUTE_DBT_MODEL)
materialization_labels = task_get_current_flow_run_labels()

existing_partitions = get_existing_partitions(
prefix=prefix_staging_area, bucket_name=project_id
)
existing_partitions.set_upstream(project_id)

files_to_ingest = get_files_to_ingest(
prefix=prefix_raw_area, partitions=existing_partitions, bucket_name=project_id
)
files_to_ingest.set_upstream(existing_partitions)

need_to_ingest_bool = need_to_ingest(files_to_ingest=files_to_ingest)
need_to_ingest_bool.set_upstream(files_to_ingest)

with case(need_to_ingest_bool, True):
ingested_files = ingest_file.map(
blob=files_to_ingest, output_directory=unmapped(ingested_files_output)
)
ingested_files.set_upstream(need_to_ingest_bool)

create_table = create_table_if_not_exists(
dataset_id=dataset_id,
table_id=table_id,
biglake_table=biglake_table,
)
create_table.set_upstream(ingested_files)

append_data_to_gcs = append_data_to_storage(
data_path=ingested_files_output,
dataset_id=dataset_id,
table_id=table_id,
dump_mode=dump_mode,
biglake_table=biglake_table,
)

append_data_to_gcs.set_upstream(create_table)

with case(materialize_after_dump, True):
# PROD TABLES
dump_prod_tables_to_materialize_parameters = get_dbt_models_to_materialize_task(
dataset_id=dataset_id,
table_id=table_id,
project_id=project_id,
layout_dataset_id=layout_dataset_id,
layout_table_id=layout_table_id,
layout_output_path=layout_output_path,
force_create_models=force_create_models,
aditional_dbt_models_to_materialize=aditional_dbt_models_to_materialize,
)
dump_prod_tables_to_materialize_parameters.set_upstream(append_data_to_gcs)

dum_prod_materialization_flow_runs = create_flow_run.map(
flow_id=unmapped(materialization_flow_id),
parameters=dump_prod_tables_to_materialize_parameters,
labels=unmapped(materialization_labels),
)

dump_prod_wait_for_flow_run = wait_for_flow_run.map(
flow_run_id=dum_prod_materialization_flow_runs,
stream_states=unmapped(True),
stream_logs=unmapped(True),
raise_final_state=unmapped(True),
)
# materialize only prod dbt models
with case(force_materialize_harmonized_dbt_models, True):
# PROD TABLES
only_prod_tables_to_materialize_parameters = get_dbt_models_to_materialize_task(
dataset_id=dataset_id,
table_id=table_id,
project_id=project_id,
layout_dataset_id=layout_dataset_id,
layout_table_id=layout_table_id,
layout_output_path=layout_output_path,
force_create_models=force_create_models,
aditional_dbt_models_to_materialize=aditional_dbt_models_to_materialize,
)
only_prod_tables_to_materialize_parameters.set_upstream(need_to_ingest_bool)

only_prod_materialization_flow_runs = create_flow_run.map(
flow_id=unmapped(materialization_flow_id),
parameters=only_prod_tables_to_materialize_parameters,
labels=unmapped(materialization_labels),
)

only_prod_wait_for_flow_run = wait_for_flow_run.map(
flow_run_id=only_prod_materialization_flow_runs,
stream_states=unmapped(True),
stream_logs=unmapped(True),
raise_final_state=unmapped(True),
)


# Storage and run configs
cadunico__ingest_raw__flow.storage = GCS(constants.GCS_FLOWS_BUCKET.value)
cadunico__ingest_raw__flow.run_config = KubernetesRun(
image=constants.DOCKER_IMAGE.value,
labels=[
constants.RJ_SMAS_AGENT_LABEL.value,
],
)
Loading
Loading