diff --git a/dags/veda_data_pipeline/groups/discover_group.py b/dags/veda_data_pipeline/groups/discover_group.py
index 422f7596..a17e985a 100644
--- a/dags/veda_data_pipeline/groups/discover_group.py
+++ b/dags/veda_data_pipeline/groups/discover_group.py
@@ -4,20 +4,21 @@
 from airflow.models.variable import Variable
 from airflow.models.xcom import LazyXComAccess
 from airflow.operators.dummy_operator import DummyOperator as EmptyOperator
-from airflow.decorators import task_group
+from airflow.decorators import task_group, task
+from airflow.models.baseoperator import chain
 from airflow.operators.python import BranchPythonOperator, PythonOperator, ShortCircuitOperator
 from airflow.utils.trigger_rule import TriggerRule
-from airflow_multi_dagrun.operators import TriggerMultiDagRunOperator
+from airflow.providers.amazon.aws.operators.ecs import EcsRunTaskOperator
 from veda_data_pipeline.utils.s3_discovery import (
     s3_discovery_handler, EmptyFileListError
 )
-from veda_data_pipeline.groups.processing_group import subdag_process
+from veda_data_pipeline.groups.processing_tasks import build_stac_kwargs, submit_to_stac_ingestor_task
 
 group_kwgs = {"group_id": "Discover", "tooltip": "Discover"}
 
-
-def discover_from_s3_task(ti, event={}, **kwargs):
+@task
+def discover_from_s3_task(ti=None, event={}, **kwargs):
     """Discover grouped assets/files from S3 in batches of 2800. Produce a list of such files stored on S3 to process.
     This task is used as part of the discover_group subdag and outputs data to EVENT_BUCKET.
     """
@@ -25,6 +26,7 @@ def discover_from_s3_task(ti, event={}, **kwargs):
         **event,
         **ti.dag_run.conf,
     }
+    # TODO test that this context var is available in taskflow
     last_successful_execution = kwargs.get("prev_start_date_success")
     if event.get("schedule") and last_successful_execution:
         config["last_successful_execution"] = last_successful_execution.isoformat()
@@ -43,17 +45,15 @@ def discover_from_s3_task(ti, event={}, **kwargs):
         )
     except EmptyFileListError as ex:
         print(f"Received an exception {ex}")
-        return []
+        # TODO replace short circuit
+        return {}
 
 
-def get_files_to_process(**kwargs):
+@task
+def get_files_to_process(payload, ti=None):
     """Get files from S3 produced by the discovery task.
     Used as part of both the parallel_run_process_rasters and parallel_run_process_vectors tasks.
     """
-    ti = kwargs.get("ti")
-    dynamic_group_id = ti.task_id.split(".")[0]
-    payload = ti.xcom_pull(task_ids=f"{dynamic_group_id}.discover_from_s3")
-    if isinstance(payload, LazyXComAccess):
+    if isinstance(payload, LazyXComAccess):  # if used as part of a dynamic task mapping
         payloads_xcom = payload[0].pop("payload", [])
         payload = payload[0]
     else:
@@ -66,52 +66,31 @@ def get_files_to_process(**kwargs):
         }
         for indx, payload_xcom in enumerate(payloads_xcom)]
 
-def vector_raster_choice(ti):
-    """Choose whether to process rasters or vectors based on the payload."""
-    payload = ti.dag_run.conf
-    dynamic_group_id = ti.task_id.split(".")[0]
-
-    if payload.get("vector"):
-        return f"{dynamic_group_id}.parallel_run_process_generic_vectors"
-    if payload.get("vector_eis"):
-        return f"{dynamic_group_id}.parallel_run_process_vectors"
-    return f"{dynamic_group_id}.parallel_run_process_rasters"
-
+# this task group is defined for reference, but can not be used in expanded taskgroup maps
 @task_group
 def subdag_discover(event={}):
-    discover_from_s3 = ShortCircuitOperator(
-        task_id="discover_from_s3",
-        python_callable=discover_from_s3_task,
-        op_kwargs={"text": "Discover from S3", "event": event},
-        trigger_rule=TriggerRule.NONE_FAILED,
-        provide_context=True,
-    )
-
-    raster_vector_branching = BranchPythonOperator(
-        task_id="raster_vector_branching",
-        python_callable=vector_raster_choice,
-    )
-
-    run_process_raster = subdag_process.expand(get_files_to_process())
-
-    # TODO don't let me merge this without spending more time with vector ingest
-    run_process_vector = TriggerMultiDagRunOperator(
-        task_id="parallel_run_process_vectors",
-        trigger_dag_id="veda_ingest_vector",
-        python_callable=get_files_to_process,
-    )
-    run_process_generic_vector = TriggerMultiDagRunOperator(
-        task_id="parallel_run_process_generic_vectors",
-        trigger_dag_id="veda_generic_ingest_vector",
-        python_callable=get_files_to_process,
-    )
-    # extra no-op, needed to run in dynamic mapping context
-    end_discover = EmptyOperator(task_id="end_discover", trigger_rule=TriggerRule.NONE_FAILED_MIN_ONE_SUCCESS,)
-    discover_from_s3 >> raster_vector_branching >> [run_process_raster, run_process_vector, run_process_generic_vector]
-    run_process_raster >> end_discover
-    run_process_vector >> end_discover
-    run_process_generic_vector >> end_discover
+    # define DAG using taskflow notation
+    discover_from_s3 = discover_from_s3_task(event=event)
+    get_files = get_files_to_process(payload=discover_from_s3)
+
+    chain(discover_from_s3, get_files)
+
+    build_stac_kwargs_task = build_stac_kwargs.expand(event=get_files)
+    build_stac = EcsRunTaskOperator.partial(
+        task_id="build_stac"
+    ).expand_kwargs(build_stac_kwargs_task)
+
+    submit_to_stac_ingestor_task.expand(built_stac=build_stac.output)
diff --git a/dags/veda_data_pipeline/groups/processing_group.py b/dags/veda_data_pipeline/groups/processing_group.py
deleted file mode 100644
index c9329232..00000000
--- a/dags/veda_data_pipeline/groups/processing_group.py
+++ /dev/null
@@ -1,95 +0,0 @@
-import json
-import logging
-from datetime import timedelta
-
-import smart_open
-from airflow.models.variable import Variable
-from airflow.operators.python import PythonOperator
-from airflow.providers.amazon.aws.operators.ecs import EcsRunTaskOperator
-from airflow.utils.task_group import TaskGroup
-from veda_data_pipeline.utils.submit_stac import (
-    submission_handler,
-)
-
-group_kwgs = {"group_id": "Process", "tooltip": "Process"}
-
-
-def log_task(text: str):
-    logging.info(text)
-
-
-def submit_to_stac_ingestor_task(ti):
-    """Submit STAC items to the STAC ingestor API."""
-    print("Submit STAC ingestor")
-    event = json.loads(ti.xcom_pull(task_ids=f"{group_kwgs['group_id']}.build_stac"))
-    success_file = event["payload"]["success_event_key"]
-    with smart_open.open(success_file, "r") as _file:
-        stac_items = json.loads(_file.read())
-
-    for item in stac_items:
-        submission_handler(
-            event=item,
-            endpoint="/ingestions",
-            cognito_app_secret=Variable.get("COGNITO_APP_SECRET"),
-            stac_ingestor_api_url=Variable.get("STAC_INGESTOR_API_URL"),
-        )
-    return event
-
-
-@task_group
-def subdag_process(event={}, **kwargs):
-    if not event:
-        event = kwargs.get("dag_run").conf
-    mwaa_stack_conf = Variable.get("MWAA_STACK_CONF", deserialize_json=True)
-    build_stac = EcsRunTaskOperator(
-        task_id="build_stac",
-        trigger_rule="none_failed",
-        cluster=f"{mwaa_stack_conf.get('PREFIX')}-cluster",
-        task_definition=f"{mwaa_stack_conf.get('PREFIX')}-tasks",
-        launch_type="FARGATE",
-        do_xcom_push=True,
-        execution_timeout=timedelta(minutes=60),
-        overrides={
-            "containerOverrides": [
-                {
-                    "name": f"{mwaa_stack_conf.get('PREFIX')}-veda-stac-build",
-                    "command": [
-                        "/usr/local/bin/python",
-                        "handler.py",
-                        "--payload",
-                        "{}".format("{{ event }}"),
-                    ],
-                    "environment": [
-                        {
-                            "name": "EXTERNAL_ROLE_ARN",
-                            "value": Variable.get(
-                                "ASSUME_ROLE_READ_ARN", default_var=""
-                            ),
-                        },
-                        {
-                            "name": "BUCKET",
-                            "value": "veda-data-pipelines-staging-lambda-ndjson-bucket",
-                        },
-                        {
-                            "name": "EVENT_BUCKET",
-                            "value": mwaa_stack_conf.get("EVENT_BUCKET"),
-                        },
-                    ],
-                    "memory": 2048,
-                    "cpu": 1024,
-                },
-            ],
-        },
-        network_configuration={
-            "awsvpcConfiguration": {
-                "securityGroups": mwaa_stack_conf.get("SECURITYGROUPS"),
-                "subnets": mwaa_stack_conf.get("SUBNETS"),
-            },
-        },
-        awslogs_group=mwaa_stack_conf.get("LOG_GROUP_NAME"),
-        awslogs_stream_prefix=f"ecs/{mwaa_stack_conf.get('PREFIX')}-veda-stac-build",  # prefix with container name
-    )
-    submit_to_stac_ingestor = PythonOperator(
-        task_id="submit_to_stac_ingestor",
-        python_callable=submit_to_stac_ingestor_task,
-    )
-    build_stac >> submit_to_stac_ingestor
diff --git a/dags/veda_data_pipeline/groups/processing_tasks.py b/dags/veda_data_pipeline/groups/processing_tasks.py
new file mode 100644
index 00000000..e5cd0f1f
--- /dev/null
+++ b/dags/veda_data_pipeline/groups/processing_tasks.py
@@ -0,0 +1,165 @@
+import json
+import logging
+from datetime import timedelta
+
+import smart_open
+from airflow.models.variable import Variable
+from airflow.operators.python import PythonOperator
+from airflow.providers.amazon.aws.operators.ecs import EcsRunTaskOperator
+from airflow.decorators import task_group, task
+from veda_data_pipeline.utils.submit_stac import submission_handler
+
+group_kwgs = {"group_id": "Process", "tooltip": "Process"}
+
+
+def log_task(text: str):
+    logging.info(text)
+
+
+@task()
+def submit_to_stac_ingestor_task(built_stac: str):
+    """Submit STAC items to the STAC ingestor API."""
+    event = json.loads(built_stac)
+    success_file = event["payload"]["success_event_key"]
+    with smart_open.open(success_file, "r") as _file:
+        stac_items = json.loads(_file.read())
+
+    for item in stac_items:
+        submission_handler(
+            event=item,
+            endpoint="/ingestions",
+            cognito_app_secret=Variable.get("COGNITO_APP_SECRET"),
+            stac_ingestor_api_url=Variable.get("STAC_INGESTOR_API_URL"),
+        )
+    return event
+
+
+@task
+def build_stac_kwargs(event={}):
+    """Build kwargs for the ECS operator."""
+    mwaa_stack_conf = Variable.get("MWAA_STACK_CONF", deserialize_json=True)
+    if event:
+        intermediate = {
+            **event
+        }  # this is dumb but it resolves the MappedArgument to a dict that can be JSON serialized
+        payload = json.dumps(intermediate)
+    else:
+        payload = "{{ task_instance.dag_run.conf }}"
+
+    return {
+        "overrides": {
+            "containerOverrides": [
+                {
+                    "name": f"{mwaa_stack_conf.get('PREFIX')}-veda-stac-build",
+                    "command": [
+                        "/usr/local/bin/python",
+                        "handler.py",
+                        "--payload",
+                        payload,
+                    ],
+                    "environment": [
+                        {
+                            "name": "EXTERNAL_ROLE_ARN",
+                            "value": Variable.get(
+                                "ASSUME_ROLE_READ_ARN", default_var=""
+                            ),
+                        },
+                        {
+                            "name": "BUCKET",
+                            "value": "veda-data-pipelines-staging-lambda-ndjson-bucket",
+                        },
+                        {
+                            "name": "EVENT_BUCKET",
+                            "value": mwaa_stack_conf.get("EVENT_BUCKET"),
+                        },
+                    ],
+                    "memory": 2048,
+                    "cpu": 1024,
+                },
+            ],
+        },
+        "network_configuration": {
+            "awsvpcConfiguration": {
+                "securityGroups": mwaa_stack_conf.get("SECURITYGROUPS"),
+                "subnets": mwaa_stack_conf.get("SUBNETS"),
+            },
+        },
+        "awslogs_group": mwaa_stack_conf.get("LOG_GROUP_NAME"),
+        "awslogs_stream_prefix": f"ecs/{mwaa_stack_conf.get('PREFIX')}-veda-stac-build",
+    }
+
+
+@task
+def build_vector_kwargs(event={}):
+    """Build kwargs for the ECS operator."""
+    mwaa_stack_conf = Variable.get(
+        "MWAA_STACK_CONF", default_var={}, deserialize_json=True
+    )
+    vector_ecs_conf = Variable.get(
+        "VECTOR_ECS_CONF", default_var={}, deserialize_json=True
+    )
+
+    if event:
+        intermediate = {
+            **event
+        }
+        payload = json.dumps(intermediate)
+    else:
+        payload = "{{ task_instance.dag_run.conf }}"
+
+    return {
+        "trigger_rule": "none_failed",
+        "cluster": f"{mwaa_stack_conf.get('PREFIX')}-cluster",
+        "task_definition": f"{mwaa_stack_conf.get('PREFIX')}-vector-tasks",
+        "launch_type": "FARGATE",
+        "do_xcom_push": True,
+        "execution_timeout": timedelta(minutes=120),
+        "overrides": {
+            "containerOverrides": [
+                {
+                    "name": f"{mwaa_stack_conf.get('PREFIX')}-veda-vector_ingest",
+                    "command": [
+                        "/var/lang/bin/python",
+                        "handler.py",
+                        "--payload",
+                        payload,
+                    ],
+                    "environment": [
+                        {
+                            "name": "EXTERNAL_ROLE_ARN",
+                            "value": Variable.get(
+                                "ASSUME_ROLE_READ_ARN", default_var=None
+                            ),
+                        },
+                        {
+                            "name": "AWS_REGION",
+                            "value": mwaa_stack_conf.get("AWS_REGION"),
+                        },
+                        {
+                            "name": "VECTOR_SECRET_NAME",
+                            "value": Variable.get("VECTOR_SECRET_NAME"),
+                        },
+                    ],
+                },
+            ],
+        },
+        "network_configuration": {
+            "awsvpcConfiguration": {
+                "securityGroups": vector_ecs_conf.get("VECTOR_SECURITY_GROUP"),
+                "subnets": vector_ecs_conf.get("VECTOR_SUBNETS"),
+            },
+        },
+        "awslogs_group": mwaa_stack_conf.get("LOG_GROUP_NAME"),
+        "awslogs_stream_prefix": f"ecs/{mwaa_stack_conf.get('PREFIX')}-veda-vector_ingest",
+    }
+
+
+@task_group
+def subdag_process(event={}):
+
+    build_stac = EcsRunTaskOperator.partial(
+        task_id="build_stac"
+    ).expand_kwargs(build_stac_kwargs(event=event))
+
+    submit_to_stac_ingestor = submit_to_stac_ingestor_task(built_stac=build_stac.output)
+
+    build_stac >> submit_to_stac_ingestor
\ No newline at end of file
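Review note on the pattern `processing_tasks.py` introduces: a `@task` kwargs-builder is mapped over discovered events, and its output feeds `EcsRunTaskOperator.partial(...).expand_kwargs(...)` so one ECS task is launched per chunk. Below is a minimal, self-contained sketch of that shape; the DAG id, cluster/task-definition names, and the echo command are placeholders, not values from this PR.

```python
from datetime import timedelta

import pendulum
from airflow import DAG
from airflow.decorators import task
from airflow.providers.amazon.aws.operators.ecs import EcsRunTaskOperator


@task
def demo_build_kwargs(event: dict) -> dict:
    # One dict per mapped event; expand_kwargs() turns each dict into the
    # constructor kwargs of one mapped EcsRunTaskOperator instance.
    return {
        "overrides": {
            "containerOverrides": [
                {"name": "demo-container", "command": ["echo", event["collection"]]}
            ]
        }
    }


with DAG(
    dag_id="demo_expand_kwargs",
    start_date=pendulum.today("UTC").add(days=-1),
    schedule_interval=None,
    catchup=False,
):
    events = [{"collection": "a"}, {"collection": "b"}]  # stand-in for discovery output
    kwargs_per_event = demo_build_kwargs.expand(event=events)

    EcsRunTaskOperator.partial(
        task_id="demo_run_task",
        cluster="demo-cluster",          # placeholder
        task_definition="demo-task",     # placeholder
        launch_type="FARGATE",
        execution_timeout=timedelta(minutes=5),
    ).expand_kwargs(kwargs_per_event)
```

The static arguments live in `partial()`; anything that varies per mapped task instance has to come through `expand_kwargs()`, which is why the builders return `overrides`, `network_configuration`, and the log settings rather than constructing the operator directly.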
diff --git a/dags/veda_data_pipeline/utils/s3_discovery.py b/dags/veda_data_pipeline/utils/s3_discovery.py
index 5a275701..4b164298 100644
--- a/dags/veda_data_pipeline/utils/s3_discovery.py
+++ b/dags/veda_data_pipeline/utils/s3_discovery.py
@@ -216,7 +216,7 @@ def s3_discovery_handler(event, chunk_size=2800, role_arn=None, bucket_output=No
     key = f"s3://{bucket_output}/events/{collection}"
     records = 0
     out_keys = []
-    discovered = 0
+    discovered = []
 
     kwargs = assume_role(role_arn=role_arn) if role_arn else {}
     s3client = boto3.client("s3", **kwargs)
@@ -277,13 +277,13 @@ def s3_discovery_handler(event, chunk_size=2800, role_arn=None, bucket_output=No
         if records == chunk_size:
             out_keys.append(generate_payload(s3_prefix_key=key, payload=payload))
             records = 0
-            discovered += len(payload["objects"])
+            discovered.append(len(payload["objects"]))
             payload["objects"] = []
         records += 1
 
     if payload["objects"]:
         out_keys.append(generate_payload(s3_prefix_key=key, payload=payload))
-        discovered += len(payload["objects"])
+        discovered.append(len(payload["objects"]))
     # We need to make sure the payload isn't too large for ECS overrides
     try:
         del event["assets"]
diff --git a/dags/veda_data_pipeline/utils/submit_stac.py b/dags/veda_data_pipeline/utils/submit_stac.py
index 1d4edfca..3ec47955 100644
--- a/dags/veda_data_pipeline/utils/submit_stac.py
+++ b/dags/veda_data_pipeline/utils/submit_stac.py
@@ -109,7 +109,8 @@ def submission_handler(
 
     stac_item = event
 
-    if stac_item.get("dry_run"):
+    # TODO remove debug bypass
+    if stac_item.get("dry_run") or True:
         print("Dry run, not inserting, would have inserted:")
         print(json.dumps(stac_item, indent=2))
         return
@@ -122,7 +123,6 @@ def submission_handler(
         base_url=stac_ingestor_api_url,
     )
     ingestor.submit(event=stac_item, endpoint=endpoint)
-    # print("Successfully submitted STAC item")
 
 
 if __name__ == "__main__":
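Aside on the `s3_discovery.py` hunk above: `discovered` changes from a running total into a list with one count per chunked payload written to S3. If anything downstream still expects the old integer, the total is just the sum of the list; an illustrative snippet (the values are made up):

```python
# `discovered` now records how many objects went into each chunk payload.
discovered = [2800, 2800, 1400]      # e.g. three chunks written by the handler

total_discovered = sum(discovered)   # 7000 -- equivalent to the old running total
chunk_count = len(discovered)        # 3 payload files produced
print({"discovered": total_discovered, "chunks": chunk_count})
```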
diff --git a/dags/veda_data_pipeline/veda_dataset_pipeline.py b/dags/veda_data_pipeline/veda_dataset_pipeline.py
index d456a80a..905b96b3 100644
--- a/dags/veda_data_pipeline/veda_dataset_pipeline.py
+++ b/dags/veda_data_pipeline/veda_dataset_pipeline.py
@@ -1,10 +1,16 @@
 import pendulum
+from datetime import timedelta
+
 from airflow import DAG
 from airflow.decorators import task
 from airflow.operators.dummy_operator import DummyOperator as EmptyOperator
 from airflow.utils.trigger_rule import TriggerRule
+from airflow.models.variable import Variable
+from airflow.providers.amazon.aws.operators.ecs import EcsRunTaskOperator
+
 from veda_data_pipeline.groups.collection_group import collection_task_group
-from veda_data_pipeline.groups.discover_group import subdag_discover
+from veda_data_pipeline.groups.discover_group import discover_from_s3_task, get_files_to_process
+from veda_data_pipeline.groups.processing_tasks import build_stac_kwargs, submit_to_stac_ingestor_task
 
 dag_doc_md = """
 ### Dataset Pipeline
@@ -70,11 +76,31 @@ def extract_discovery_items(**kwargs):
 }
 
 with DAG("veda_dataset_pipeline", params=template_dag_run_conf, **dag_args) as dag:
+    # ECS dependency variable
+    mwaa_stack_conf = Variable.get("MWAA_STACK_CONF", deserialize_json=True)
+
     start = EmptyOperator(task_id="start", dag=dag)
     end = EmptyOperator(task_id="end", dag=dag)
 
     collection_grp = collection_task_group()
-    discover_grp = subdag_discover.expand(event=extract_discovery_items())
+    discover = discover_from_s3_task.expand(event=extract_discovery_items())
+    discover.set_upstream(collection_grp)  # do not discover until collection exists
+    get_files = get_files_to_process(payload=discover)
+    build_stac_kwargs_task = build_stac_kwargs.expand(event=get_files)
+    # partial() is needed for the operator to be used with taskflow inputs
+    build_stac = EcsRunTaskOperator.partial(
+        task_id="build_stac",
+        execution_timeout=timedelta(minutes=60),
+        trigger_rule=TriggerRule.NONE_FAILED,
+        cluster=f"{mwaa_stack_conf.get('PREFIX')}-cluster",
+        task_definition=f"{mwaa_stack_conf.get('PREFIX')}-tasks",
+        launch_type="FARGATE",
+        do_xcom_push=True
+    ).expand_kwargs(build_stac_kwargs_task)
+    # .output is needed coming from a non-taskflow operator
+    submit_stac = submit_to_stac_ingestor_task.expand(built_stac=build_stac.output)
+
+    collection_grp.set_upstream(start)
+    submit_stac.set_downstream(end)
-    start >> collection_grp >> discover_grp >> end
diff --git a/dags/veda_data_pipeline/veda_discover_pipeline.py b/dags/veda_data_pipeline/veda_discover_pipeline.py
index 37a5d520..19bb1048 100644
--- a/dags/veda_data_pipeline/veda_discover_pipeline.py
+++ b/dags/veda_data_pipeline/veda_discover_pipeline.py
@@ -1,8 +1,15 @@
 import pendulum
+
+from datetime import timedelta
 from airflow import DAG
 from airflow.operators.dummy_operator import DummyOperator
 from airflow.utils.trigger_rule import TriggerRule
-from veda_data_pipeline.groups.discover_group import subdag_discover
+from airflow.models.variable import Variable
+from airflow.providers.amazon.aws.operators.ecs import EcsRunTaskOperator
+
+from veda_data_pipeline.groups.discover_group import discover_from_s3_task, get_files_to_process
+from veda_data_pipeline.groups.processing_tasks import build_stac_kwargs, submit_to_stac_ingestor_task
+
 
 dag_doc_md = """
 ### Discover files from S3
@@ -46,7 +53,7 @@
     "is_paused_upon_creation": False,
 }
 
-templat_dag_run_conf = {
+template_dag_run_conf = {
     "collection": "",
     "bucket": "",
     "prefix": "/",
@@ -70,23 +77,44 @@
 
 
 def get_discover_dag(id, event={}):
-    params_dag_run_conf = event or templat_dag_run_conf
+    params_dag_run_conf = event or template_dag_run_conf
     with DAG(
         id,
         schedule_interval=event.get("schedule"),
         params=params_dag_run_conf,
         **dag_args
     ) as dag:
+        # ECS dependency variable
+        mwaa_stack_conf = Variable.get("MWAA_STACK_CONF", deserialize_json=True)
+
         start = DummyOperator(task_id="Start", dag=dag)
         end = DummyOperator(
             task_id="End", trigger_rule=TriggerRule.ONE_SUCCESS, dag=dag
         )
+        # define DAG using taskflow notation
+
+        discover = discover_from_s3_task(event=event)
+        get_files = get_files_to_process(payload=discover)
+        build_stac_kwargs_task = build_stac_kwargs.expand(event=get_files)
+        # partial() is needed for the operator to be used with taskflow inputs
+        build_stac = EcsRunTaskOperator.partial(
+            task_id="build_stac",
+            execution_timeout=timedelta(minutes=60),
+            trigger_rule=TriggerRule.NONE_FAILED,
+            cluster=f"{mwaa_stack_conf.get('PREFIX')}-cluster",
+            task_definition=f"{mwaa_stack_conf.get('PREFIX')}-tasks",
+            launch_type="FARGATE",
+            do_xcom_push=True
+        ).expand_kwargs(build_stac_kwargs_task)
+        # .output is needed coming from a non-taskflow operator
+        submit_stac = submit_to_stac_ingestor_task.expand(built_stac=build_stac.output)
 
-        discover_grp = subdag_discover(event)
-
-        start >> discover_grp >> end
+        discover.set_upstream(start)
+        submit_stac.set_downstream(end)
 
         return dag
 
 
 get_discover_dag("veda_discover")
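Both DAG modules above rely on the same hand-off that the inline comments hint at: a classic (non-taskflow) operator exposes its XCom through `.output`, and a `@task` is dynamically mapped over it with `.expand()`. A stripped-down sketch of that hand-off, using `BashOperator` in place of the ECS task so it is runnable anywhere; all names here are illustrative:

```python
import pendulum
from airflow import DAG
from airflow.decorators import task
from airflow.operators.bash import BashOperator


@task
def consume(result: str):
    # Each mapped instance receives one element of the upstream operator's XCom.
    print(f"received: {result}")


with DAG(
    dag_id="demo_output_handoff",
    start_date=pendulum.today("UTC").add(days=-1),
    schedule_interval=None,
    catchup=False,
):
    # A classic operator mapped over three commands; each mapped instance
    # pushes its stdout as a return_value XCom.
    produce = BashOperator.partial(task_id="produce").expand(
        bash_command=["echo one", "echo two", "echo three"]
    )

    # .output wraps the operator's XCom as an XComArg that taskflow can map over,
    # mirroring submit_to_stac_ingestor_task.expand(built_stac=build_stac.output).
    consume.expand(result=produce.output)
```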
diff --git a/dags/veda_data_pipeline/veda_process_raster_pipeline.py b/dags/veda_data_pipeline/veda_process_raster_pipeline.py
deleted file mode 100644
index 2555c6a9..00000000
--- a/dags/veda_data_pipeline/veda_process_raster_pipeline.py
+++ /dev/null
@@ -1,52 +0,0 @@
-import pendulum
-from airflow import DAG
-from airflow.operators.dummy_operator import DummyOperator
-from airflow.utils.trigger_rule import TriggerRule
-from veda_data_pipeline.groups.processing_group import subdag_process
-
-dag_doc_md = """
-### Build and submit stac
-#### Purpose
-This DAG is supposed to be triggered by `veda_discover`. But you still can trigger this DAG manually or through an API
-
-#### Notes
-- This DAG can run with the following configuration
-```json
-{
-    "collection": "geoglam",
-    "prefix": "geoglam/",
-    "bucket": "veda-data-store-staging",
-    "filename_regex": "^(.*).tif$",
-    "discovery": "s3",
-    "datetime_range": "month",
-    "discovered": 33,
-    "payload": "s3://veda-uah-sit-mwaa-853558080719/events/geoglam/s3_discover_output_6c46b57a-7474-41fe-977a-.json"
-}
-```
-- [Supports linking to external content](https://github.com/NASA-IMPACT/veda-data-pipelines)
-"""
-
-template_dag_run_conf = {
-    "collection": "",
-    "prefix": "/",
-    "bucket": "",
-    "filename_regex": "",
-    "discovery": "",
-    "datetime_range": "|",
-    "payload": "> process_grp >> end
diff --git a/dags/veda_data_pipeline/veda_vector_pipeline.py b/dags/veda_data_pipeline/veda_vector_pipeline.py
new file mode 100644
index 00000000..eaa46a24
--- /dev/null
+++ b/dags/veda_data_pipeline/veda_vector_pipeline.py
@@ -0,0 +1,79 @@
+import pendulum
+from datetime import timedelta
+
+from airflow import DAG
+from airflow.models.variable import Variable
+from airflow.providers.amazon.aws.operators.ecs import EcsRunTaskOperator
+from airflow.operators.dummy_operator import DummyOperator
+from airflow.utils.trigger_rule import TriggerRule
+
+from veda_data_pipeline.groups.processing_tasks import build_vector_kwargs
+from veda_data_pipeline.groups.discover_group import discover_from_s3_task, get_files_to_process
+
+
+dag_doc_md = """
+### Build and submit stac
+#### Purpose
+This DAG is supposed to be triggered by `veda_discover`. But you can still trigger this DAG manually or through an API.
+
+#### Notes
+- This DAG can run with the following configuration
+```json
+{
+    "collection": "geoglam",
+    "prefix": "geoglam/",
+    "bucket": "veda-data-store-staging",
+    "filename_regex": "^(.*).tif$",
+    "discovery": "s3",
+    "datetime_range": "month",
+    "upload": false,
+    "cogify": false,
+    "discovered": 33,
+    "payload": "s3://veda-uah-sit-mwaa-853558080719/events/geoglam/s3_discover_output_6c46b57a-7474-41fe-977a-19d164531cdc.json"
+}
+```
+- [Supports linking to external content](https://github.com/NASA-IMPACT/veda-data-pipelines)
+"""
+
+template_dag_run_conf = {
+    "collection": "",
+    "prefix": "/",
+    "bucket": "",
+    "filename_regex": "",
+    "discovery": "|cmr",
+    "datetime_range": "|",
+    "upload": " | true",
+    "cogify": "false | true"
+}
+dag_args = {
+    "start_date": pendulum.today("UTC").add(days=-1),
+    "schedule_interval": None,
+    "catchup": False,
+    "doc_md": dag_doc_md,
+}
+
+with DAG(dag_id="veda_ingest_vector", params=template_dag_run_conf, **dag_args) as dag:
+    # ECS dependency variable
+    mwaa_stack_conf = Variable.get("MWAA_STACK_CONF", deserialize_json=True)
+
+    start = DummyOperator(task_id="Start", dag=dag)
+    end = DummyOperator(task_id="End", trigger_rule=TriggerRule.ONE_SUCCESS, dag=dag)
+
+    discover = discover_from_s3_task()
+    get_files = get_files_to_process(payload=discover)
+    build_vector_kwargs_task = build_vector_kwargs(event=get_files)
+    vector_ingest = EcsRunTaskOperator.partial(
+        task_id="ingest_vector",
+        execution_timeout=timedelta(minutes=60),
+        trigger_rule=TriggerRule.NONE_FAILED,
+        cluster=f"{mwaa_stack_conf.get('PREFIX')}-cluster",
+        task_definition=f"{mwaa_stack_conf.get('PREFIX')}-tasks",
+        launch_type="FARGATE",
+        do_xcom_push=True
+    ).expand_kwargs(build_vector_kwargs_task)
+
+    discover.set_upstream(start)
+    vector_ingest.set_downstream(end)
+
diff --git a/docker_tasks/build_stac/handler.py b/docker_tasks/build_stac/handler.py
index 748e7eb4..bdfd9a1a 100644
--- a/docker_tasks/build_stac/handler.py
+++ b/docker_tasks/build_stac/handler.py
@@ -148,8 +148,8 @@ def stac_handler(payload_event):
     # For cloud watch log to work the task should stay alife for at least 30 s
     start = time()
     print(f"Start at {start}")
-
-    payload_event = ast.literal_eval(args.payload)
+    print(args)
+    payload_event = json.loads(args.payload)
     building_stac_response = stac_handler(payload_event)
     response = json.dumps({**payload_event, **building_stac_response})
     end = time() - start
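The `handler.py` change closes the loop with `build_stac_kwargs`: the kwargs builder serializes the event with `json.dumps`, so the container has to parse the `--payload` argument with `json.loads`; `ast.literal_eval` only accepts Python literal syntax and rejects JSON's `true`/`false`/`null`. A small illustration (the payload values below are made up):

```python
import ast
import json

# What build_stac_kwargs now places on the container command line: strict JSON.
payload = json.dumps({"collection": "geoglam", "dry_run": False, "payload": None})

print(json.loads(payload))  # {'collection': 'geoglam', 'dry_run': False, 'payload': None}

try:
    ast.literal_eval(payload)  # "false"/"null" are not Python literals
except ValueError as err:
    print(f"literal_eval rejects the JSON payload: {err}")
```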