Improved logging for non local executors #149

Merged
merged 3 commits on May 9, 2024
9 changes: 3 additions & 6 deletions examples/common/functions.py
@@ -32,7 +32,6 @@ def read_initial_params_as_pydantic(
pydantic_param: ComplexParams,
envvar: str,
):
print(envvar)
assert integer == 1
assert floater == 3.14
assert stringer == "hello"
@@ -132,14 +131,14 @@ def read_processed_chunk(chunk: int, processed_python: int, processed_notebook:
A downstream step of process_chunk of map state which reads the processed chunk.
Since the process_chunk returns the chunk multiplied by 10, we assert that.
"""
assert chunk * 10 == processed_python
assert int(chunk) * 10 == processed_python
assert processed_python * 10 == processed_notebook
assert processed_notebook * 10 == processed_shell


def assert_default_reducer(
processed_python: List[int], processed_notebook: List[int], processed_shell: List[int], chunks: List[int]
) -> int:
):
"""
Demonstrates the default reducer which just returns the list of processed chunks.
"""
@@ -148,9 +147,7 @@ def assert_default_reducer(
assert processed_shell == [chunk * 1000 for chunk in chunks]


def assert_custom_reducer(
processed_python: int, processed_notebook: int, processed_shell: int, chunks: List[int]
) -> int:
def assert_custom_reducer(processed_python: int, processed_notebook: int, processed_shell: int, chunks: List[int]):
"""
Asserts the custom reducer returns the max of all the processed chunks.
"""
10 changes: 10 additions & 0 deletions examples/common/process_chunk.ipynb
@@ -15,6 +15,16 @@
"processed_python = None"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "a8dc8e64",
"metadata": {},
"outputs": [],
"source": [
"chunk = int(chunk)"
]
},
{
"cell_type": "code",
"execution_count": null,
2 changes: 1 addition & 1 deletion examples/configs/argo-config.yaml
@@ -8,7 +8,7 @@ executor:
mount_path: /mnt

run_log_store: # (4)
type: file-system
type: chunked-fs
config:
log_folder: /mnt/run_log_store
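
Note: the example config switches the run log store from `file-system` to `chunked-fs`. The likely reason this matters for Argo: a single-file run log is rewritten by every pod, so parallel branches can clobber each other, whereas the chunked store writes separate files that concurrent writers can update safely. A minimal sketch of the block, mirroring the mount path above (the comments are assumptions, not documented requirements):

```yaml
run_log_store:
  type: chunked-fs                     # safe for concurrent writers (map/parallel branches)
  config:
    log_folder: /mnt/run_log_store     # should live on the volume mounted at /mnt above
```
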

5 changes: 3 additions & 2 deletions runnable/__init__.py
@@ -12,7 +12,7 @@
dictConfig(defaults.LOGGING_CONFIG)
logger = logging.getLogger(defaults.LOGGER_NAME)

console = Console()
console = Console(record=True)
console.print(":runner: Lets go!!")

from runnable.sdk import ( # noqa
Expand All @@ -30,7 +30,8 @@
pickled,
)

os.environ["_PLOOMBER_TELEMETRY_DEBUG"] = "false"
# Needed to disable ploomber telemetry
os.environ["PLOOMBER_STATS_ENABLED"] = "false"

## TODO: Summary should be a bit better for catalog.
## If the execution fails, hint them about the retry executor.
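
Note: `Console(record=True)` is what makes the later log capture possible: a recording `rich` console keeps a transcript of everything printed, which can then be written out with `save_text`. A small standalone sketch (the file name is illustrative):

```python
from rich.console import Console

console = Console(record=True)                 # keep a transcript of everything printed
console.print(":runner: Lets go!!")
console.print("Executing the single node: step-1")

# Later, e.g. when a step fails on a non-local executor, dump the transcript to a file:
console.save_text("step-1.execution.log")      # hypothetical file name
```
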
43 changes: 26 additions & 17 deletions runnable/entrypoints.py
@@ -19,18 +19,11 @@ def get_default_configs() -> RunnableConfig:
"""
User can provide extensions as part of their code base, runnable-config.yaml provides the place to put them.
"""
user_configs = {}
user_configs: RunnableConfig = {}
if utils.does_file_exist(defaults.USER_CONFIG_FILE):
user_configs = utils.load_yaml(defaults.USER_CONFIG_FILE)
user_configs = cast(RunnableConfig, utils.load_yaml(defaults.USER_CONFIG_FILE))

if not user_configs:
return {}

user_defaults = user_configs.get("defaults", {})
if user_defaults:
return user_defaults

return {}
return user_configs


def prepare_configurations(
@@ -198,6 +191,7 @@ def execute(
run_context.progress = progress
executor.execute_graph(dag=run_context.dag) # type: ignore

# Non local executors have no run logs
if not executor._local:
executor.send_return_code(stage="traversal")
return
@@ -245,6 +239,8 @@ def execute_single_node(
"""
from runnable import nodes

console.print(f"Executing the single node: {step_name} with map variable: {map_variable}")

configuration_file = os.environ.get("RUNNABLE_CONFIGURATION_FILE", configuration_file)

run_context = prepare_configurations(
@@ -264,19 +260,32 @@

executor.prepare_for_node_execution()

if not run_context.dag:
# There are a few entry points that make graph dynamically and do not have a dag defined statically.
run_log = run_context.run_log_store.get_run_log_by_id(run_id=run_id, full=False)
run_context.dag = graph.create_graph(run_log.run_config["pipeline"])

step_internal_name = nodes.BaseNode._get_internal_name_from_command_name(step_name)
# TODO: may be make its own entry point
# if not run_context.dag:
# # There are a few entry points that make graph dynamically and do not have a dag defined statically.
# run_log = run_context.run_log_store.get_run_log_by_id(run_id=run_id, full=False)
# run_context.dag = graph.create_graph(run_log.run_config["pipeline"])
assert run_context.dag

map_variable_dict = utils.json_to_ordered_dict(map_variable)

step_internal_name = nodes.BaseNode._get_internal_name_from_command_name(step_name)
node_to_execute, _ = graph.search_node_by_internal_name(run_context.dag, step_internal_name)

logger.info("Executing the single node of : %s", node_to_execute)
executor.execute_node(node=node_to_execute, map_variable=map_variable_dict)
## This step is where we save the log file
try:
executor.execute_node(node=node_to_execute, map_variable=map_variable_dict)
except Exception: # noqa: E722
log_file_name = utils.make_log_file_name(
node=node_to_execute,
map_variable=map_variable_dict,
)
console.save_text(log_file_name)

# Put the log file in the catalog
run_context.catalog_handler.put(name=log_file_name, run_id=run_context.run_id)
os.remove(log_file_name)

executor.send_return_code(stage="execution")
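
Note: this is the consumer of the recording console. On failure, the transcript is saved to a per-node log file, pushed to the catalog so it survives the pod, and the local copy is removed. A condensed sketch of the pattern; the helper and naming scheme are stand-ins, not the exact runnable API:

```python
import os


def make_log_name(node_name: str, map_key: str = "") -> str:
    # Hypothetical naming scheme; the real helper is utils.make_log_file_name.
    return f"{node_name}{'.' + map_key if map_key else ''}.execution.log"


def run_node_with_log_capture(executor, node, console, catalog, run_id, map_variable=None):
    """Sketch of the failure path in execute_single_node, not the exact implementation."""
    try:
        executor.execute_node(node=node, map_variable=map_variable)
    except Exception:
        map_key = "-".join(str(v) for v in (map_variable or {}).values())
        log_file = make_log_name(node.name, map_key)
        console.save_text(log_file)                 # dump the recorded rich console output
        catalog.put(name=log_file, run_id=run_id)   # persist the log beyond the pod's lifetime
        os.remove(log_file)                         # tidy up the local copy
```
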

57 changes: 31 additions & 26 deletions runnable/extensions/executor/argo/implementation.py
@@ -5,7 +5,7 @@
import string
from abc import ABC, abstractmethod
from collections import OrderedDict
from typing import Any, Dict, List, Optional, Union, cast
from typing import Dict, List, Optional, Union, cast

from pydantic import (
BaseModel,
@@ -19,7 +19,7 @@
from ruamel.yaml import YAML
from typing_extensions import Annotated

from runnable import defaults, exceptions, integration, parameters, utils
from runnable import defaults, exceptions, integration, utils
from runnable.defaults import TypeMapVariable
from runnable.extensions.executor import GenericExecutor
from runnable.extensions.nodes import DagNode, MapNode, ParallelNode
@@ -378,6 +378,7 @@ def render(self, list_of_iter_values: Optional[List] = None):
self.node,
over_write_run_id=self.executor._run_id_placeholder,
map_variable=map_variable,
log_level=self.executor._log_level,
)

inputs = []
@@ -502,12 +503,16 @@ def render(self, list_of_iter_values: Optional[List] = None):
self.node = cast(MapNode, self.node)
task_template_arguments = []
dag_inputs = []
if list_of_iter_values:
for value in list_of_iter_values:
task_template_arguments.append(Argument(name=value, value="{{inputs.parameters." + value + "}}"))
dag_inputs.append(Parameter(name=value))

if not list_of_iter_values:
list_of_iter_values = []

for value in list_of_iter_values:
task_template_arguments.append(Argument(name=value, value="{{inputs.parameters." + value + "}}"))
dag_inputs.append(Parameter(name=value))

clean_name = self.executor.get_clean_name(self.node)

fan_out_template = self.executor._create_fan_out_template(
composite_node=self.node, list_of_iter_values=list_of_iter_values
)
@@ -518,9 +523,6 @@
)
fan_in_template.arguments = task_template_arguments if task_template_arguments else None

if not list_of_iter_values:
list_of_iter_values = []

list_of_iter_values.append(self.node.iterate_as)

self.executor._gather_task_templates_of_dag(
@@ -580,8 +582,12 @@ class Spec(BaseModel):
node_selector: Optional[Dict[str, str]] = Field(default_factory=dict, serialization_alias="nodeSelector")
tolerations: Optional[List[Toleration]] = Field(default=None, serialization_alias="tolerations")
parallelism: Optional[int] = Field(default=None, serialization_alias="parallelism")

# TODO: This has to be user driven
pod_gc: Dict[str, str] = Field(default={"strategy": "OnPodCompletion"}, serialization_alias="podGC")
pod_gc: Dict[str, str] = Field(
default={"strategy": "OnPodSuccess", "deleteDelayDuration": "600s"},
serialization_alias="podGC",
)

retry_strategy: Retry = Field(default=Retry(), serialization_alias="retryStrategy")
service_account_name: Optional[str] = Field(default=None, serialization_alias="serviceAccountName")
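
Note: the pod garbage-collection default moves from `OnPodCompletion` to `OnPodSuccess` plus a delete delay, so failed pods (and their logs) stay around for inspection instead of being reaped immediately. In the generated Argo `Workflow` spec this would serialize to roughly the following (field names per Argo Workflows; the 600s value comes from the diff):

```yaml
spec:
  podGC:
    strategy: OnPodSuccess          # only successful pods are garbage collected
    deleteDelayDuration: 600s       # and even those linger for 10 minutes
```
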
@@ -674,6 +680,8 @@ class ArgoExecutor(GenericExecutor):
service_name: str = "argo"
_local: bool = False

# TODO: Add logging level as option.

model_config = ConfigDict(extra="forbid")

image: str
@@ -719,6 +727,7 @@ class ArgoExecutor(GenericExecutor):
persistent_volumes: List[UserVolumeMounts] = Field(default_factory=list)

_run_id_placeholder: str = "{{workflow.parameters.run_id}}"
_log_level: str = "{{workflow.parameters.log_level}}"
_container_templates: List[ContainerTemplate] = []
_dag_templates: List[DagTemplate] = []
_clean_names: Dict[str, str] = {}
@@ -828,17 +837,7 @@ def fan_out(self, node: BaseNode, map_variable: TypeMapVariable = None):
iterate_on = self._context.run_log_store.get_parameters(self._context.run_id)[node.iterate_on]

with open("/tmp/output.txt", mode="w", encoding="utf-8") as myfile:
json.dump(iterate_on, myfile, indent=4)

def _get_parameters(self) -> Dict[str, Any]:
params = {}
if self._context.parameters_file:
# Parameters from the parameters file if defined
params.update(utils.load_yaml(self._context.parameters_file))
# parameters from environment variables supersede file based
params.update(parameters.get_user_set_parameters())

return params
json.dump(iterate_on.get_value(), myfile, indent=4)

def sanitize_name(self, name):
return name.replace(" ", "-").replace(".", "-").replace("_", "-")
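
Note: `fan_out` now calls `.get_value()` before serializing, consistent with the other `value = value.get_value()` changes below: parameters come back from the run log store as wrapper objects rather than raw values. A self-contained sketch of the idea, with an illustrative stand-in for runnable's parameter classes:

```python
import json
from dataclasses import dataclass


@dataclass
class JsonParameterStub:
    """Illustrative stand-in for runnable.datastore.JsonParameter."""
    value: object

    def get_value(self):
        return self.value


iterate_on = JsonParameterStub(value=[1, 2, 3])          # the chunks to fan out over

with open("/tmp/output.txt", mode="w", encoding="utf-8") as myfile:
    json.dump(iterate_on.get_value(), myfile, indent=4)  # the wrapper itself is not JSON-serializable
```
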
@@ -886,6 +885,7 @@ def create_container_template(

if working_on.name == self._context.dag.start_at and self.expose_parameters_as_inputs:
for key, value in self._get_parameters().items():
value = value.get_value() # type: ignore
# Get the value from work flow parameters for dynamic behavior
if isinstance(value, int) or isinstance(value, float) or isinstance(value, str):
env_var = EnvVar(
@@ -943,6 +943,7 @@ def _create_fan_out_template(self, composite_node, list_of_iter_values: Optional
node=composite_node,
run_id=self._run_id_placeholder,
map_variable=map_variable,
log_level=self._log_level,
)

outputs = []
@@ -984,6 +985,7 @@ def _create_fan_in_template(self, composite_node, list_of_iter_values: Optional[
node=composite_node,
run_id=self._run_id_placeholder,
map_variable=map_variable,
log_level=self._log_level,
)

step_config = {"command": command, "type": "task", "next": "dummy"}
@@ -1033,6 +1035,8 @@ def _gather_task_templates_of_dag(
if working_on.node_type not in ["success", "fail"] and working_on._get_on_failure_node():
failure_node = dag.get_node_by_name(working_on._get_on_failure_node())

# same logic, if a template exists, retrieve it
# if not, create a new one
render_obj = get_renderer(working_on)(executor=self, node=failure_node)
render_obj.render(list_of_iter_values=list_of_iter_values.copy())

@@ -1083,18 +1087,19 @@ def execute_graph(self, dag: Graph, map_variable: Optional[dict] = None, **kwarg
# Expose "simple" parameters as workflow arguments for dynamic behavior
if self.expose_parameters_as_inputs:
for key, value in self._get_parameters().items():
value = value.get_value() # type: ignore
if isinstance(value, dict) or isinstance(value, list):
continue
env_var = EnvVar(name=key, value=value)

env_var = EnvVar(name=key, value=value) # type: ignore
arguments.append(env_var)

run_id_var = EnvVar(name="run_id", value="{{workflow.uid}}")
log_level_var = EnvVar(name="log_level", value=defaults.LOG_LEVEL)
arguments.append(run_id_var)
arguments.append(log_level_var)

# # TODO: Experimental feature

# original_run_id_var = EnvVar(name="original_run_id")
# arguments.append(original_run_id_var)
# TODO: Can we do reruns?

for volume in self.spec.volumes:
self._container_volumes.append(ContainerVolume(name=volume.name, mount_path=volume.mount_path))
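
Note: `run_id` and the new `log_level` are registered as workflow arguments, and the node/fan-out/fan-in commands reference them through `{{workflow.parameters.*}}` placeholders, so one workflow-level setting controls the verbosity of every pod. The generated spec would carry roughly the following (the INFO value is an assumption standing in for `defaults.LOG_LEVEL`):

```yaml
arguments:
  parameters:
    - name: run_id
      value: "{{workflow.uid}}"
    - name: log_level
      value: INFO                  # assumed default; the executor uses defaults.LOG_LEVEL
```
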
@@ -202,7 +202,6 @@ def _spin_container(
f"Please provide a docker_image using executor_config of the step {node.name} or at global config"
)

print("container", self._volumes)
# TODO: Should consider using getpass.getuser() when running the docker container? Volume permissions
container = client.containers.create(
image=docker_image,
10 changes: 7 additions & 3 deletions runnable/extensions/nodes.py
@@ -15,7 +15,7 @@
field_validator,
)

from runnable import datastore, defaults, utils
from runnable import console, datastore, defaults, utils
from runnable.datastore import (
JsonParameter,
MetricParameter,
@@ -96,11 +96,10 @@ def execute(
attempt_number=attempt_number,
)

logger.debug(f"attempt_log: {attempt_log}")
logger.info(f"attempt_log: {attempt_log}")
logger.info(f"Step {self.name} completed with status: {attempt_log.status}")

step_log.status = attempt_log.status

step_log.attempts.append(attempt_log)

return step_log
@@ -347,6 +346,7 @@ def fan_in(self, map_variable: TypeMapVariable = None, **kwargs):
for internal_branch_name, _ in self.branches.items():
effective_branch_name = self._resolve_map_placeholders(internal_branch_name, map_variable=map_variable)
branch_log = self._context.run_log_store.get_branch_log(effective_branch_name, self._context.run_id)

if branch_log.status != defaults.SUCCESS:
step_success_bool = False

@@ -498,6 +498,8 @@ def fan_out(self, map_variable: TypeMapVariable = None, **kwargs):
self.internal_name + "." + str(iter_variable), map_variable=map_variable
)
branch_log = self._context.run_log_store.create_branch_log(effective_branch_name)

console.print(f"Branch log created for {effective_branch_name}: {branch_log}")
branch_log.status = defaults.PROCESSING
self._context.run_log_store.add_branch_log(branch_log, self._context.run_id)

@@ -589,6 +591,8 @@ def fan_in(self, map_variable: TypeMapVariable = None, **kwargs):
self.internal_name + "." + str(iter_variable), map_variable=map_variable
)
branch_log = self._context.run_log_store.get_branch_log(effective_branch_name, self._context.run_id)
# console.print(f"Branch log for {effective_branch_name}: {branch_log}")

if branch_log.status != defaults.SUCCESS:
step_success_bool = False
