diff --git a/examples/common/functions.py b/examples/common/functions.py
index 489f1a64..0c475026 100644
--- a/examples/common/functions.py
+++ b/examples/common/functions.py
@@ -32,7 +32,6 @@ def read_initial_params_as_pydantic(
     pydantic_param: ComplexParams,
     envvar: str,
 ):
-    print(envvar)
     assert integer == 1
     assert floater == 3.14
     assert stringer == "hello"
@@ -132,14 +131,14 @@ def read_processed_chunk(chunk: int, processed_python: int, processed_notebook:
     A downstream step of process_chunk of map state which reads the processed chunk.
     Since the process_chunk returns the chunk multiplied by 10, we assert that.
     """
-    assert chunk * 10 == processed_python
+    assert int(chunk) * 10 == processed_python
     assert processed_python * 10 == processed_notebook
     assert processed_notebook * 10 == processed_shell


 def assert_default_reducer(
     processed_python: List[int], processed_notebook: List[int], processed_shell: List[int], chunks: List[int]
-) -> int:
+):
     """
     Demonstrates the default reducer which just returns the list of processed chunks.
     """
@@ -148,9 +147,7 @@ def assert_default_reducer(
     assert processed_shell == [chunk * 1000 for chunk in chunks]


-def assert_custom_reducer(
-    processed_python: int, processed_notebook: int, processed_shell: int, chunks: List[int]
-) -> int:
+def assert_custom_reducer(processed_python: int, processed_notebook: int, processed_shell: int, chunks: List[int]):
     """
     Asserts the custom reducer returns the max of all the processed chunks.
     """
diff --git a/examples/common/process_chunk.ipynb b/examples/common/process_chunk.ipynb
index 2ac01390..55d705c6 100644
--- a/examples/common/process_chunk.ipynb
+++ b/examples/common/process_chunk.ipynb
@@ -15,6 +15,16 @@
     "processed_python = None"
    ]
   },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "id": "a8dc8e64",
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "chunk = int(chunk)"
+   ]
+  },
   {
    "cell_type": "code",
    "execution_count": null,
diff --git a/examples/configs/argo-config.yaml b/examples/configs/argo-config.yaml
index 33a27986..95d27953 100644
--- a/examples/configs/argo-config.yaml
+++ b/examples/configs/argo-config.yaml
@@ -8,7 +8,7 @@ executor:
       mount_path: /mnt

 run_log_store: # (4)
-  type: file-system
+  type: chunked-fs
   config:
     log_folder: /mnt/run_log_store
diff --git a/runnable/__init__.py b/runnable/__init__.py
index 3adf0b74..fec5e87d 100644
--- a/runnable/__init__.py
+++ b/runnable/__init__.py
@@ -12,7 +12,7 @@ dictConfig(defaults.LOGGING_CONFIG)

 logger = logging.getLogger(defaults.LOGGER_NAME)

-console = Console()
+console = Console(record=True)
 console.print(":runner: Lets go!!")

 from runnable.sdk import (  # noqa
@@ -30,7 +30,8 @@
     pickled,
 )

-os.environ["_PLOOMBER_TELEMETRY_DEBUG"] = "false"
+# Needed to disable ploomber telemetry
+os.environ["PLOOMBER_STATS_ENABLED"] = "false"

 ## TODO: Summary should be a bit better for catalog.
 ## If the execution fails, hint them about the retry executor.
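A note on `Console(record=True)`: a recording rich `Console` remembers everything printed through it, which is what lets the failure path further down dump the whole run's output to a file. A minimal, self-contained sketch of the rich feature this change relies on:

```python
from rich.console import Console

# record=True keeps an in-memory transcript of everything printed,
# so it can be exported after the fact.
console = Console(record=True)
console.print(":runner: Lets go!!")
console.log("some progress message")

console.save_text("execution.log")  # writes everything recorded so far
```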
""" - user_configs = {} + user_configs: RunnableConfig = {} if utils.does_file_exist(defaults.USER_CONFIG_FILE): - user_configs = utils.load_yaml(defaults.USER_CONFIG_FILE) + user_configs = cast(RunnableConfig, utils.load_yaml(defaults.USER_CONFIG_FILE)) - if not user_configs: - return {} - - user_defaults = user_configs.get("defaults", {}) - if user_defaults: - return user_defaults - - return {} + return user_configs def prepare_configurations( @@ -198,6 +191,7 @@ def execute( run_context.progress = progress executor.execute_graph(dag=run_context.dag) # type: ignore + # Non local executors have no run logs if not executor._local: executor.send_return_code(stage="traversal") return @@ -245,6 +239,8 @@ def execute_single_node( """ from runnable import nodes + console.print(f"Executing the single node: {step_name} with map variable: {map_variable}") + configuration_file = os.environ.get("RUNNABLE_CONFIGURATION_FILE", configuration_file) run_context = prepare_configurations( @@ -264,19 +260,32 @@ def execute_single_node( executor.prepare_for_node_execution() - if not run_context.dag: - # There are a few entry points that make graph dynamically and do not have a dag defined statically. - run_log = run_context.run_log_store.get_run_log_by_id(run_id=run_id, full=False) - run_context.dag = graph.create_graph(run_log.run_config["pipeline"]) - - step_internal_name = nodes.BaseNode._get_internal_name_from_command_name(step_name) + # TODO: may be make its own entry point + # if not run_context.dag: + # # There are a few entry points that make graph dynamically and do not have a dag defined statically. + # run_log = run_context.run_log_store.get_run_log_by_id(run_id=run_id, full=False) + # run_context.dag = graph.create_graph(run_log.run_config["pipeline"]) + assert run_context.dag map_variable_dict = utils.json_to_ordered_dict(map_variable) + step_internal_name = nodes.BaseNode._get_internal_name_from_command_name(step_name) node_to_execute, _ = graph.search_node_by_internal_name(run_context.dag, step_internal_name) logger.info("Executing the single node of : %s", node_to_execute) - executor.execute_node(node=node_to_execute, map_variable=map_variable_dict) + ## This step is where we save the log file + try: + executor.execute_node(node=node_to_execute, map_variable=map_variable_dict) + except Exception: # noqa: E722 + log_file_name = utils.make_log_file_name( + node=node_to_execute, + map_variable=map_variable_dict, + ) + console.save_text(log_file_name) + + # Put the log file in the catalog + run_context.catalog_handler.put(name=log_file_name, run_id=run_context.run_id) + os.remove(log_file_name) executor.send_return_code(stage="execution") diff --git a/runnable/extensions/executor/argo/implementation.py b/runnable/extensions/executor/argo/implementation.py index ef894e7b..6b0193f1 100644 --- a/runnable/extensions/executor/argo/implementation.py +++ b/runnable/extensions/executor/argo/implementation.py @@ -5,7 +5,7 @@ import string from abc import ABC, abstractmethod from collections import OrderedDict -from typing import Any, Dict, List, Optional, Union, cast +from typing import Dict, List, Optional, Union, cast from pydantic import ( BaseModel, @@ -19,7 +19,7 @@ from ruamel.yaml import YAML from typing_extensions import Annotated -from runnable import defaults, exceptions, integration, parameters, utils +from runnable import defaults, exceptions, integration, utils from runnable.defaults import TypeMapVariable from runnable.extensions.executor import GenericExecutor from runnable.extensions.nodes 
diff --git a/runnable/extensions/executor/argo/implementation.py b/runnable/extensions/executor/argo/implementation.py
index ef894e7b..6b0193f1 100644
--- a/runnable/extensions/executor/argo/implementation.py
+++ b/runnable/extensions/executor/argo/implementation.py
@@ -5,7 +5,7 @@ import string
 from abc import ABC, abstractmethod
 from collections import OrderedDict
-from typing import Any, Dict, List, Optional, Union, cast
+from typing import Dict, List, Optional, Union, cast

 from pydantic import (
     BaseModel,
@@ -19,7 +19,7 @@
 from ruamel.yaml import YAML
 from typing_extensions import Annotated

-from runnable import defaults, exceptions, integration, parameters, utils
+from runnable import defaults, exceptions, integration, utils
 from runnable.defaults import TypeMapVariable
 from runnable.extensions.executor import GenericExecutor
 from runnable.extensions.nodes import DagNode, MapNode, ParallelNode
@@ -378,6 +378,7 @@ def render(self, list_of_iter_values: Optional[List] = None):
             self.node,
             over_write_run_id=self.executor._run_id_placeholder,
             map_variable=map_variable,
+            log_level=self.executor._log_level,
         )

         inputs = []
@@ -502,12 +503,16 @@ def render(self, list_of_iter_values: Optional[List] = None):
         self.node = cast(MapNode, self.node)
         task_template_arguments = []
         dag_inputs = []
-        if list_of_iter_values:
-            for value in list_of_iter_values:
-                task_template_arguments.append(Argument(name=value, value="{{inputs.parameters." + value + "}}"))
-                dag_inputs.append(Parameter(name=value))
+
+        if not list_of_iter_values:
+            list_of_iter_values = []
+
+        for value in list_of_iter_values:
+            task_template_arguments.append(Argument(name=value, value="{{inputs.parameters." + value + "}}"))
+            dag_inputs.append(Parameter(name=value))

         clean_name = self.executor.get_clean_name(self.node)
+
         fan_out_template = self.executor._create_fan_out_template(
             composite_node=self.node, list_of_iter_values=list_of_iter_values
         )
@@ -518,9 +523,6 @@ def render(self, list_of_iter_values: Optional[List] = None):
         )
         fan_in_template.arguments = task_template_arguments if task_template_arguments else None

-        if not list_of_iter_values:
-            list_of_iter_values = []
-
         list_of_iter_values.append(self.node.iterate_as)

         self.executor._gather_task_templates_of_dag(
@@ -580,8 +582,12 @@ class Spec(BaseModel):
     node_selector: Optional[Dict[str, str]] = Field(default_factory=dict, serialization_alias="nodeSelector")
     tolerations: Optional[List[Toleration]] = Field(default=None, serialization_alias="tolerations")
     parallelism: Optional[int] = Field(default=None, serialization_alias="parallelism")
+
+    # TODO: This has to be user driven
-    pod_gc: Dict[str, str] = Field(default={"strategy": "OnPodCompletion"}, serialization_alias="podGC")
+    pod_gc: Dict[str, str] = Field(
+        default={"strategy": "OnPodSuccess", "deleteDelayDuration": "600s"},
+        serialization_alias="podGC",
+    )

     retry_strategy: Retry = Field(default=Retry(), serialization_alias="retryStrategy")
     service_account_name: Optional[str] = Field(default=None, serialization_alias="serviceAccountName")
@@ -674,6 +680,8 @@ class ArgoExecutor(GenericExecutor):
     service_name: str = "argo"
     _local: bool = False

+    # TODO: Add logging level as an option.
+    model_config = ConfigDict(extra="forbid")

     image: str
@@ -719,6 +727,7 @@ class ArgoExecutor(GenericExecutor):
     persistent_volumes: List[UserVolumeMounts] = Field(default_factory=list)

     _run_id_placeholder: str = "{{workflow.parameters.run_id}}"
+    _log_level: str = "{{workflow.parameters.log_level}}"
     _container_templates: List[ContainerTemplate] = []
     _dag_templates: List[DagTemplate] = []
     _clean_names: Dict[str, str] = {}
@@ -828,17 +837,7 @@ def fan_out(self, node: BaseNode, map_variable: TypeMapVariable = None):
             iterate_on = self._context.run_log_store.get_parameters(self._context.run_id)[node.iterate_on]

         with open("/tmp/output.txt", mode="w", encoding="utf-8") as myfile:
-            json.dump(iterate_on, myfile, indent=4)
-
-    def _get_parameters(self) -> Dict[str, Any]:
-        params = {}
-        if self._context.parameters_file:
-            # Parameters from the parameters file if defined
-            params.update(utils.load_yaml(self._context.parameters_file))
-
-        # parameters from environment variables supersede file based
-        params.update(parameters.get_user_set_parameters())
-
-        return params
+            json.dump(iterate_on.get_value(), myfile, indent=4)

     def sanitize_name(self, name):
         return name.replace(" ", "-").replace(".", "-").replace("_", "-")
@@ -886,6 +885,7 @@ def create_container_template(
         if working_on.name == self._context.dag.start_at and self.expose_parameters_as_inputs:
             for key, value in self._get_parameters().items():
+                value = value.get_value()  # type: ignore
                 # Get the value from workflow parameters for dynamic behavior
                 if isinstance(value, int) or isinstance(value, float) or isinstance(value, str):
                     env_var = EnvVar(
@@ -943,6 +943,7 @@ def _create_fan_out_template(self, composite_node, list_of_iter_values: Optional
             node=composite_node,
             run_id=self._run_id_placeholder,
             map_variable=map_variable,
+            log_level=self._log_level,
         )

         outputs = []
@@ -984,6 +985,7 @@ def _create_fan_in_template(self, composite_node, list_of_iter_values: Optional[
             node=composite_node,
             run_id=self._run_id_placeholder,
             map_variable=map_variable,
+            log_level=self._log_level,
         )

         step_config = {"command": command, "type": "task", "next": "dummy"}
@@ -1033,6 +1035,8 @@ def _gather_task_templates_of_dag(
             if working_on.node_type not in ["success", "fail"] and working_on._get_on_failure_node():
                 failure_node = dag.get_node_by_name(working_on._get_on_failure_node())

+                # Same logic: if a template exists, retrieve it;
+                # if not, create a new one.
                 render_obj = get_renderer(working_on)(executor=self, node=failure_node)
                 render_obj.render(list_of_iter_values=list_of_iter_values.copy())

@@ -1083,18 +1087,19 @@ def execute_graph(self, dag: Graph, map_variable: Optional[dict] = None, **kwarg
         # Expose "simple" parameters as workflow arguments for dynamic behavior
         if self.expose_parameters_as_inputs:
             for key, value in self._get_parameters().items():
+                value = value.get_value()  # type: ignore
                 if isinstance(value, dict) or isinstance(value, list):
                     continue
-                env_var = EnvVar(name=key, value=value)
+
+                env_var = EnvVar(name=key, value=value)  # type: ignore
                 arguments.append(env_var)

         run_id_var = EnvVar(name="run_id", value="{{workflow.uid}}")
+        log_level_var = EnvVar(name="log_level", value=defaults.LOG_LEVEL)
         arguments.append(run_id_var)
+        arguments.append(log_level_var)

-        # # TODO: Experimental feature
-
-        # original_run_id_var = EnvVar(name="original_run_id")
-        # arguments.append(original_run_id_var)
+        # TODO: Can we do reruns?
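The executor now threads a `log_level` into every generated node command. Locally the level falls back to the logger's effective level, while on Argo the literal `{{workflow.parameters.log_level}}` placeholder survives into the container command and is substituted by Argo when the pod is created. A minimal sketch of that fallback (the helper name here is hypothetical; the real logic lives in `utils.get_node_execution_command` and `utils.get_fan_command` below):

```python
import logging

logger = logging.getLogger("runnable")

def resolve_log_level(requested: str = "") -> str:
    # Mirrors the `log_level or logging.getLevelName(...)` change in utils.py.
    return requested or logging.getLevelName(logger.getEffectiveLevel())

# Local runs resolve to a concrete level immediately:
assert resolve_log_level("") in {"DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL"}

# Argo runs pass the placeholder through untouched; Argo fills it in at runtime:
assert resolve_log_level("{{workflow.parameters.log_level}}") == "{{workflow.parameters.log_level}}"
```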
         for volume in self.spec.volumes:
             self._container_volumes.append(ContainerVolume(name=volume.name, mount_path=volume.mount_path))
diff --git a/runnable/extensions/executor/local_container/implementation.py b/runnable/extensions/executor/local_container/implementation.py
index be711283..9f826132 100644
--- a/runnable/extensions/executor/local_container/implementation.py
+++ b/runnable/extensions/executor/local_container/implementation.py
@@ -202,7 +202,6 @@ def _spin_container(
                 f"Please provide a docker_image using executor_config of the step {node.name} or at global config"
             )

-        print("container", self._volumes)
         # TODO: Should consider using getpass.getuser() when running the docker container? Volume permissions
         container = client.containers.create(
             image=docker_image,
diff --git a/runnable/extensions/nodes.py b/runnable/extensions/nodes.py
index dd76bd5f..45850311 100644
--- a/runnable/extensions/nodes.py
+++ b/runnable/extensions/nodes.py
@@ -15,7 +15,7 @@
     field_validator,
 )

-from runnable import datastore, defaults, utils
+from runnable import console, datastore, defaults, utils
 from runnable.datastore import (
     JsonParameter,
     MetricParameter,
@@ -96,11 +96,10 @@ def execute(
             attempt_number=attempt_number,
         )

-        logger.debug(f"attempt_log: {attempt_log}")
+        logger.info(f"attempt_log: {attempt_log}")
         logger.info(f"Step {self.name} completed with status: {attempt_log.status}")

         step_log.status = attempt_log.status
-
         step_log.attempts.append(attempt_log)

         return step_log
@@ -347,6 +346,7 @@ def fan_in(self, map_variable: TypeMapVariable = None, **kwargs):
         for internal_branch_name, _ in self.branches.items():
             effective_branch_name = self._resolve_map_placeholders(internal_branch_name, map_variable=map_variable)
             branch_log = self._context.run_log_store.get_branch_log(effective_branch_name, self._context.run_id)
+
             if branch_log.status != defaults.SUCCESS:
                 step_success_bool = False
@@ -498,6 +498,8 @@ def fan_out(self, map_variable: TypeMapVariable = None, **kwargs):
                 self.internal_name + "." + str(iter_variable), map_variable=map_variable
             )
             branch_log = self._context.run_log_store.create_branch_log(effective_branch_name)
+
+            console.print(f"Branch log created for {effective_branch_name}: {branch_log}")
             branch_log.status = defaults.PROCESSING
             self._context.run_log_store.add_branch_log(branch_log, self._context.run_id)
@@ -589,6 +591,8 @@ def fan_in(self, map_variable: TypeMapVariable = None, **kwargs):
                 self.internal_name + "." + str(iter_variable), map_variable=map_variable
             )
             branch_log = self._context.run_log_store.get_branch_log(effective_branch_name, self._context.run_id)
+            # console.print(f"Branch log for {effective_branch_name}: {branch_log}")
+
             if branch_log.status != defaults.SUCCESS:
                 step_success_bool = False
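For context, the fan-out/fan-in bookkeeping these prints trace works roughly as follows; this is a toy sketch with an in-memory dict standing in for the run log store, and all names here are hypothetical:

```python
SUCCESS, PROCESSING, FAIL = "SUCCESS", "PROCESSING", "FAIL"

branch_logs: dict[str, str] = {}

def fan_out(internal_name: str, iter_values: list) -> None:
    # One branch log per iteration value, marked PROCESSING up front.
    for value in iter_values:
        branch_logs[f"{internal_name}.{value}"] = PROCESSING

def fan_in(internal_name: str, iter_values: list) -> str:
    # The map step succeeds only if every branch reached SUCCESS.
    ok = all(branch_logs.get(f"{internal_name}.{value}") == SUCCESS for value in iter_values)
    return SUCCESS if ok else FAIL

fan_out("map_step", [1, 2, 3])
branch_logs["map_step.1"] = SUCCESS  # branches 2 and 3 never finish
assert fan_in("map_step", [1, 2, 3]) == FAIL
```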
diff --git a/runnable/tasks.py b/runnable/tasks.py
index 98d90997..fe3d0755 100644
--- a/runnable/tasks.py
+++ b/runnable/tasks.py
@@ -14,13 +14,10 @@
 from typing import Any, Dict, List, Literal, Tuple

 from pydantic import BaseModel, ConfigDict, Field, field_validator
-
-# from rich import print
-from rich.console import Console
 from stevedore import driver

 import runnable.context as context
-from runnable import defaults, exceptions, parameters, utils
+from runnable import console, defaults, exceptions, parameters, utils
 from runnable.datastore import (
     JsonParameter,
     MetricParameter,
@@ -147,41 +144,26 @@ def execution_context(self, map_variable: TypeMapVariable = None, allow_complex:
             if context_param in params:
                 params[param_name].value = params[context_param].value

+        console.log("Parameters available for the execution:")
+        console.log(params)
+
         logger.debug(f"Resolved parameters: {params}")

         if not allow_complex:
             params = {key: value for key, value in params.items() if isinstance(value, JsonParameter)}

-        log_file_name = self._context.executor._context_node.internal_name
-        if map_variable:
-            for _, value in map_variable.items():
-                log_file_name += "_" + str(value)
-
-        log_file_name = "".join(x for x in log_file_name if x.isalnum()) + ".execution.log"
-
-        log_file = open(log_file_name, "w")
-
         parameters_in = copy.deepcopy(params)

         f = io.StringIO()
-        task_console = Console(file=io.StringIO())
         try:
             with contextlib.redirect_stdout(f):
                 # with contextlib.nullcontext():
-                yield params, task_console
-                print(task_console.file.getvalue())  # type: ignore
+                yield params
         except Exception as e:  # pylint: disable=broad-except
+            console.log(e, style=defaults.error_style)
             logger.exception(e)
         finally:
-            task_console = None  # type: ignore
             print(f.getvalue())  # print to console
-            log_file.write(f.getvalue())  # Print to file
-
-            f.close()
-            log_file.close()
-
-            # Put the log file in the catalog
-            self._context.catalog_handler.put(name=log_file.name, run_id=context.run_context.run_id)
-            os.remove(log_file.name)

         # Update parameters
         # This should only update the parameters that are changed at the root level.
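The slimmed-down context manager keeps the same shape as before, minus the per-task `Console` and log file. A self-contained sketch of the pattern, assuming a plain-dict stand-in for the resolved parameters:

```python
import contextlib
import io
from contextlib import contextmanager

@contextmanager
def execution_context(params: dict):
    # stdout produced by the task body is buffered, errors are logged,
    # and the captured output is echoed in the cleanup path regardless.
    buffer = io.StringIO()
    try:
        with contextlib.redirect_stdout(buffer):
            yield params  # the task body runs here
    except Exception as e:  # broad on purpose, mirroring the diff
        print(f"task failed: {e!r}")
    finally:
        print(buffer.getvalue(), end="")  # echo captured output to the real stdout

with execution_context({"x": 1}) as p:
    print("hello from the task", p["x"])
```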
@@ -233,7 +215,7 @@ def execute_command(
         """Execute the notebook as defined by the command."""
         attempt_log = StepAttempt(status=defaults.FAIL, start_time=str(datetime.now()))

-        with self.execution_context(map_variable=map_variable) as (params, task_console), self.expose_secrets() as _:
+        with self.execution_context(map_variable=map_variable) as params, self.expose_secrets() as _:
             module, func = utils.get_module_and_attr_names(self.command)
             sys.path.insert(0, os.getcwd())  # Need to add the current directory to path
             imported_module = importlib.import_module(module)
@@ -243,9 +225,10 @@
             try:
                 filtered_parameters = parameters.filter_arguments_for_func(f, params.copy(), map_variable)
                 logger.info(f"Calling {func} from {module} with {filtered_parameters}")
+
                 user_set_parameters = f(**filtered_parameters)  # This is a tuple or single value
             except Exception as e:
-                task_console.log(e, style=defaults.error_style, markup=False)
+                console.log(e, style=defaults.error_style, markup=False)
                 raise exceptions.CommandCallError(f"Function call: {self.command} did not succeed.\n") from e

             attempt_log.input_parameters = params.copy()
@@ -289,8 +272,8 @@ def execute_command(
             except Exception as _e:
                 msg = f"Call to the function {self.command} did not succeed.\n"
                 attempt_log.message = msg
-                task_console.print_exception(show_locals=False)
-                task_console.log(_e, style=defaults.error_style)
+                console.print_exception(show_locals=False)
+                console.log(_e, style=defaults.error_style)

         attempt_log.end_time = str(datetime.now())
@@ -346,17 +329,17 @@ def execute_command(

         notebook_output_path = self.notebook_output_path

-        with self.execution_context(map_variable=map_variable, allow_complex=False) as (
-            params,
-            _,
-        ), self.expose_secrets() as _:
+        with self.execution_context(
+            map_variable=map_variable, allow_complex=False
+        ) as params, self.expose_secrets() as _:
+            copy_params = copy.deepcopy(params)
+
             if map_variable:
                 for key, value in map_variable.items():
                     notebook_output_path += "_" + str(value)
-                    params[key] = JsonParameter(kind="json", value=value)
+                    copy_params[key] = JsonParameter(kind="json", value=value)

             # Remove any {v}_unreduced parameters from the parameters
-            copy_params = copy.deepcopy(params)
             unprocessed_params = [k for k, v in copy_params.items() if not v.reduced]

             for key in list(copy_params.keys()):
@@ -397,6 +380,9 @@ def execute_command(
                 )
             except PicklingError as e:
                 logger.exception("Notebooks cannot return objects")
+                console.log("Notebooks cannot return objects", style=defaults.error_style)
+                console.log(e, style=defaults.error_style)
+
                 logger.exception(e)
                 raise

@@ -413,6 +399,9 @@
             )
             logger.exception(msg)
             logger.exception(e)
+
+            console.log(msg, style=defaults.error_style)
+
             attempt_log.status = defaults.FAIL

         attempt_log.end_time = str(datetime.now())
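The notebook change above also fixes a subtle mutation bug: map values used to be injected into `params` itself, polluting the parameters later recorded as inputs. Deep-copying first keeps the yielded `params` untouched. A toy demonstration with plain values in place of `JsonParameter` objects:

```python
import copy

params = {"chunk": 1, "other": "x"}
map_variable = {"chunk": 7}

copy_params = copy.deepcopy(params)
copy_params.update(map_variable)  # the kernel sees the map value...

assert params["chunk"] == 1       # ...while the recorded inputs do not change
```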
@@ -468,7 +457,7 @@ def execute_command(
                 subprocess_env[key] = secret_value

         try:
-            with self.execution_context(map_variable=map_variable, allow_complex=False) as (params, task_console):
+            with self.execution_context(map_variable=map_variable, allow_complex=False) as params:
                 subprocess_env.update({k: v.get_value() for k, v in params.items()})

                 # Json dumps all runnable environment variables
@@ -499,14 +488,14 @@
                 if proc.returncode != 0:
                     msg = ",".join(result[1].split("\n"))

-                    task_console.print(msg, style=defaults.error_style)
+                    console.print(msg, style=defaults.error_style)
                     raise exceptions.CommandCallError(msg)

                 # for stderr
                 for line in result[1].split("\n"):
                     if line.strip() == "":
                         continue
-                    task_console.print(line, style=defaults.warning_style)
+                    console.print(line, style=defaults.warning_style)

                 output_parameters: Dict[str, Parameter] = {}
                 metrics: Dict[str, Parameter] = {}
@@ -517,7 +506,7 @@
                         continue

                     logger.info(line)
-                    task_console.print(line)
+                    console.print(line)

                     if line.strip() == collect_delimiter:
                         # The lines from now on should be captured
@@ -558,6 +547,10 @@
             msg = f"Call to the command {self.command} did not succeed"
             logger.exception(msg)
             logger.exception(e)
+
+            console.log(msg, style=defaults.error_style)
+            console.log(e, style=defaults.error_style)
+
             attempt_log.status = defaults.FAIL

         attempt_log.end_time = str(datetime.now())
diff --git a/runnable/utils.py b/runnable/utils.py
index d45e6ac4..f097b7a9 100644
--- a/runnable/utils.py
+++ b/runnable/utils.py
@@ -4,6 +4,8 @@
 import json
 import logging
 import os
+import random
+import string
 import subprocess
 from collections import OrderedDict
 from datetime import datetime
@@ -394,6 +396,7 @@ def get_node_execution_command(
     node: BaseNode,
     map_variable: TypeMapVariable = None,
     over_write_run_id: str = "",
+    log_level: str = "",
 ) -> str:
     """A utility function to standardize execution call to a node via command line.
@@ -410,7 +413,7 @@
     if over_write_run_id:
         run_id = over_write_run_id

-    log_level = logging.getLevelName(logger.getEffectiveLevel())
+    log_level = log_level or logging.getLevelName(logger.getEffectiveLevel())

     action = f"runnable execute_single_node {run_id} " f"{node._command_friendly_name()}" f" --log-level {log_level}"
@@ -437,6 +440,7 @@ def get_fan_command(
     node: BaseNode,
     run_id: str,
     map_variable: TypeMapVariable = None,
+    log_level: str = "",
 ) -> str:
     """
     A utility function to return the fan "in or out" command
@@ -451,7 +455,7 @@
     Returns:
         str: The fan in or out command
     """
-    log_level = logging.getLevelName(logger.getEffectiveLevel())
+    log_level = log_level or logging.getLevelName(logger.getEffectiveLevel())
     action = (
         f"runnable fan {run_id} "
         f"{node._command_friendly_name()} "
@@ -614,3 +618,17 @@ def gather_variables() -> dict:
             variables[key] = value

     return variables
+
+
+def make_log_file_name(node: BaseNode, map_variable: TypeMapVariable) -> str:
+    random_tag = "".join(random.choices(string.ascii_uppercase + string.digits, k=3))
+    log_file_name = node.name
+
+    if map_variable:
+        for _, value in map_variable.items():
+            log_file_name += "_" + str(value)
+
+    log_file_name += "_" + random_tag
+    log_file_name = "".join(x for x in log_file_name if x.isalnum()) + ".execution.log"
+
+    return log_file_name
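A hypothetical usage of the new `make_log_file_name` helper, assuming it is importable as shown above; only `.name` is read from the node, so a stand-in object works for illustration. Note that the alphanumeric filter strips spaces and underscores, and the random three-character tag keeps names from colliding across retries of the same node:

```python
from collections import OrderedDict
from types import SimpleNamespace

from runnable.utils import make_log_file_name

# Stand-in node: only .name is used by the helper.
node = SimpleNamespace(name="process chunk")

name = make_log_file_name(node=node, map_variable=OrderedDict(chunk=3))
print(name)  # something like "processchunk3KQ7.execution.log"
```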