diff --git a/runnable/entrypoints.py b/runnable/entrypoints.py index 3423364d..4ea5338e 100644 --- a/runnable/entrypoints.py +++ b/runnable/entrypoints.py @@ -265,6 +265,8 @@ def execute_single_node( # # There are a few entry points that make graph dynamically and do not have a dag defined statically. # run_log = run_context.run_log_store.get_run_log_by_id(run_id=run_id, full=False) # run_context.dag = graph.create_graph(run_log.run_config["pipeline"]) + assert run_context.dag + map_variable_dict = utils.json_to_ordered_dict(map_variable) step_internal_name = nodes.BaseNode._get_internal_name_from_command_name(step_name) diff --git a/runnable/extensions/executor/argo/implementation.py b/runnable/extensions/executor/argo/implementation.py index 39a6cc9d..6b0193f1 100644 --- a/runnable/extensions/executor/argo/implementation.py +++ b/runnable/extensions/executor/argo/implementation.py @@ -885,7 +885,7 @@ def create_container_template( if working_on.name == self._context.dag.start_at and self.expose_parameters_as_inputs: for key, value in self._get_parameters().items(): - value = value.get_value() + value = value.get_value() # type: ignore # Get the value from work flow parameters for dynamic behavior if isinstance(value, int) or isinstance(value, float) or isinstance(value, str): env_var = EnvVar( @@ -1087,11 +1087,11 @@ def execute_graph(self, dag: Graph, map_variable: Optional[dict] = None, **kwarg # Expose "simple" parameters as workflow arguments for dynamic behavior if self.expose_parameters_as_inputs: for key, value in self._get_parameters().items(): - value = value.get_value() + value = value.get_value() # type: ignore if isinstance(value, dict) or isinstance(value, list): continue - env_var = EnvVar(name=key, value=value) + env_var = EnvVar(name=key, value=value) # type: ignore arguments.append(env_var) run_id_var = EnvVar(name="run_id", value="{{workflow.uid}}")