From 738c6ee779d059e9a80b2e045c42929c2b9c8d61 Mon Sep 17 00:00:00 2001 From: Tom White Date: Sat, 1 Jul 2023 13:16:37 +0100 Subject: [PATCH] Don't materialize input iterables to `map_unordered` (#242) (although Lithops will still materialize internally, see #239) --- cubed/runtime/executors/lithops.py | 4 ++-- cubed/runtime/executors/modal.py | 2 +- cubed/runtime/executors/modal_async.py | 2 +- cubed/runtime/executors/python_async.py | 2 +- 4 files changed, 5 insertions(+), 5 deletions(-) diff --git a/cubed/runtime/executors/lithops.py b/cubed/runtime/executors/lithops.py index 6f1510f1..b4b3ec6d 100644 --- a/cubed/runtime/executors/lithops.py +++ b/cubed/runtime/executors/lithops.py @@ -195,7 +195,7 @@ def execute_dag(dag, callbacks=None, array_names=None, resume=None, **kwargs): for _, stats in map_unordered( executor, run_func, - list(stage.mappable), + stage.mappable, func=stage.function, config=pipeline.config, name=name, @@ -229,7 +229,7 @@ def stage_func(lithops_function_executor): for _, stats in map_unordered( lithops_function_executor, sf, - list(stage.mappable), + stage.mappable, use_backups=use_backups, return_stats=True, ): diff --git a/cubed/runtime/executors/modal.py b/cubed/runtime/executors/modal.py index e694aa2a..8c489ec9 100644 --- a/cubed/runtime/executors/modal.py +++ b/cubed/runtime/executors/modal.py @@ -107,7 +107,7 @@ def execute_dag( if stage.mappable is not None: task_create_tstamp = time.time() for _, stats in app_function.map( - list(stage.mappable), + stage.mappable, order_outputs=False, kwargs=dict(func=stage.function, config=pipeline.config), ): diff --git a/cubed/runtime/executors/modal_async.py b/cubed/runtime/executors/modal_async.py index d95875ff..e7fc74b3 100644 --- a/cubed/runtime/executors/modal_async.py +++ b/cubed/runtime/executors/modal_async.py @@ -135,7 +135,7 @@ async def async_execute_dag( if stage.mappable is not None: async for _, stats in map_unordered( app_function, - list(stage.mappable), + stage.mappable, return_stats=True, name=name, func=stage.function, diff --git a/cubed/runtime/executors/python_async.py b/cubed/runtime/executors/python_async.py index 743945d7..ce9a43e9 100644 --- a/cubed/runtime/executors/python_async.py +++ b/cubed/runtime/executors/python_async.py @@ -72,7 +72,7 @@ def pipeline_to_stream(concurrent_executor, name, pipeline, **kwargs): map_unordered, concurrent_executor, run_func, - list(stage.mappable), + stage.mappable, return_stats=True, name=name, func=stage.function,