Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature/3506: Improve Type Conversion errors, use rich to prettify error messages #1582

Merged
merged 13 commits into from
Apr 17, 2023
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -32,3 +32,4 @@ htmlcov
*.ipynb
*dat
docs/source/_tags/
.hypothesis
1 change: 1 addition & 0 deletions dev-requirements.in
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

git+https://github.com/flyteorg/pytest-flyte@main#egg=pytest-flyte
coverage[toml]
hypothesis
joblib
mock
pytest
Expand Down
5 changes: 5 additions & 0 deletions flytekit/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -194,6 +194,8 @@
import sys
from typing import Generator

from rich import traceback

if sys.version_info < (3, 10):
from importlib_metadata import entry_points
else:
Expand Down Expand Up @@ -296,3 +298,6 @@ def load_implicit_plugins():

# Load all implicit plugins
load_implicit_plugins()

# Pretty-print exception messages
traceback.install(width=None, extra_lines=0)
41 changes: 27 additions & 14 deletions flytekit/core/base_task.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@
translate_inputs_to_literals,
)
from flytekit.core.tracker import TrackedInstance
from flytekit.core.type_engine import TypeEngine
from flytekit.core.type_engine import TypeEngine, TypeTransformerFailedError
from flytekit.deck.deck import Deck
from flytekit.loggers import logger
from flytekit.models import dynamic_job as _dynamic_job
Expand Down Expand Up @@ -245,12 +245,17 @@ def local_execute(self, ctx: FlyteContext, **kwargs) -> Union[Tuple[Promise], Pr
# Promises as essentially inputs from previous task executions
# native constants are just bound to this specific task (default values for a task input)
# Also along with promises and constants, there could be dictionary or list of promises or constants
kwargs = translate_inputs_to_literals(
ctx,
incoming_values=kwargs,
flyte_interface_types=self.interface.inputs,
native_types=self.get_input_types(), # type: ignore
)
try:
kwargs = translate_inputs_to_literals(
ctx,
incoming_values=kwargs,
flyte_interface_types=self.interface.inputs,
native_types=self.get_input_types(), # type: ignore
)
except TypeTransformerFailedError as exc:
msg = f"Failed to convert inputs of task '{self.name}':\n {exc}"
logger.error(msg)
raise TypeError(msg) from exc
input_literal_map = _literal_models.LiteralMap(literals=kwargs)

# if metadata.cache is set, check memoized version
Expand Down Expand Up @@ -515,7 +520,14 @@ def dispatch_execute(
) as exec_ctx:
# TODO We could support default values here too - but not part of the plan right now
# Translate the input literals to Python native
native_inputs = TypeEngine.literal_map_to_kwargs(exec_ctx, input_literal_map, self.python_interface.inputs)
try:
native_inputs = TypeEngine.literal_map_to_kwargs(
exec_ctx, input_literal_map, self.python_interface.inputs
)
except Exception as exc:
msg = f"Failed to convert inputs of task '{self.name}':\n {exc}"
logger.error(msg)
raise TypeError(msg) from exc

# TODO: Logger should auto inject the current context information to indicate if the task is running within
# a workflow or a subworkflow etc
Expand Down Expand Up @@ -559,19 +571,20 @@ def dispatch_execute(
# We manually construct a LiteralMap here because task inputs and outputs actually violate the assumption
# built into the IDL that all the values of a literal map are of the same type.
literals = {}
for k, v in native_outputs_as_map.items():
for i, (k, v) in enumerate(native_outputs_as_map.items()):
literal_type = self._outputs_interface[k].type
py_type = self.get_type_for_output_var(k, v)

if isinstance(v, tuple):
raise TypeError(f"Output({k}) in task{self.name} received a tuple {v}, instead of {py_type}")
raise TypeError(f"Output({k}) in task '{self.name}' received a tuple {v}, instead of {py_type}")
try:
literals[k] = TypeEngine.to_literal(exec_ctx, v, py_type, literal_type)
except Exception as e:
logger.error(f"Failed to convert return value for var {k} with error {type(e)}: {e}")
raise TypeError(
f"Failed to convert return value for var {k} for function {self.name} with error {type(e)}: {e}"
) from e
# only show the name of output key if it's user-defined (by default Flyte names these as "o<n>")
key = k if k != f"o{i}" else i
msg = f"Failed to convert outputs of task '{self.name}' at position {key}:\n {e}"
logger.error(msg)
raise TypeError(msg) from e

if self._disable_deck is False:
INPUT = "input"
Expand Down
21 changes: 15 additions & 6 deletions flytekit/core/promise.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
)
from flytekit.core.interface import Interface
from flytekit.core.node import Node
from flytekit.core.type_engine import DictTransformer, ListTransformer, TypeEngine
from flytekit.core.type_engine import DictTransformer, ListTransformer, TypeEngine, TypeTransformerFailedError
from flytekit.exceptions import user as _user_exceptions
from flytekit.models import interface as _interface_models
from flytekit.models import literals as _literal_models
Expand Down Expand Up @@ -141,7 +141,10 @@ def extract_value(
raise ValueError(f"Received unexpected keyword argument {k}")
var = flyte_interface_types[k]
t = native_types[k]
result[k] = extract_value(ctx, v, t, var.type)
try:
result[k] = extract_value(ctx, v, t, var.type)
except TypeTransformerFailedError as exc:
raise TypeTransformerFailedError(f"Failed argument '{k}': {exc}") from exc

return result

Expand Down Expand Up @@ -477,10 +480,14 @@ def create_native_named_tuple(

if isinstance(promises, Promise):
k, v = [(k, v) for k, v in entity_interface.outputs.items()][0] # get output native type
# only show the name of output key if it's user-defined (by default Flyte names these as "o<n>")
key = k if k != "o0" else 0
try:
return TypeEngine.to_python_value(ctx, promises.val, v)
except Exception as e:
raise AssertionError(f"Failed to convert value of output {k}, expected type {v}.") from e
raise TypeError(
f"Failed to convert output in position {key} of value {promises.val}, expected type {v}."
) from e

if len(cast(Tuple[Promise], promises)) == 0:
return None
Expand All @@ -490,7 +497,7 @@ def create_native_named_tuple(
named_tuple_name = entity_interface.output_tuple_name

outputs = {}
for p in cast(Tuple[Promise], promises):
for i, p in enumerate(cast(Tuple[Promise], promises)):
if not isinstance(p, Promise):
raise AssertionError(
"Workflow outputs can only be promises that are returned by tasks. Found a value of"
Expand All @@ -500,7 +507,9 @@ def create_native_named_tuple(
try:
outputs[p.var] = TypeEngine.to_python_value(ctx, p.val, t)
except Exception as e:
raise AssertionError(f"Failed to convert value of output {p.var}, expected type {t}.") from e
# only show the name of output key if it's user-defined (by default Flyte names these as "o<n>")
key = p.var if p.var != f"o{i}" else i
raise TypeError(f"Failed to convert output in position {key} of value {p.val}, expected type {t}.") from e

# Should this class be part of the Interface?
nt = collections.namedtuple(named_tuple_name, list(outputs.keys())) # type: ignore
Expand Down Expand Up @@ -1057,7 +1066,7 @@ def flyte_entity_call_handler(
for k, v in kwargs.items():
if k not in cast(SupportsNodeCreation, entity).python_interface.inputs:
raise ValueError(
f"Received unexpected keyword argument {k} in function {cast(SupportsNodeCreation, entity).name}"
f"Received unexpected keyword argument '{k}' in function '{cast(SupportsNodeCreation, entity).name}'"
)

ctx = FlyteContextManager.current_context()
Expand Down
16 changes: 12 additions & 4 deletions flytekit/core/type_engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ def type_assertions_enabled(self) -> bool:

def assert_type(self, t: Type[T], v: T):
if not hasattr(t, "__origin__") and not isinstance(v, t):
raise TypeTransformerFailedError(f"Type of Val '{v}' is not an instance of {t}")
raise TypeTransformerFailedError(f"Expected value of type {t} but got '{v}' of type {type(v)}")

@abstractmethod
def get_literal_type(self, t: Type[T]) -> LiteralType:
Expand Down Expand Up @@ -166,7 +166,9 @@ def get_literal_type(self, t: Optional[Type[T]] = None) -> LiteralType:

def to_literal(self, ctx: FlyteContext, python_val: T, python_type: Type[T], expected: LiteralType) -> Literal:
if type(python_val) != self._type:
raise TypeTransformerFailedError(f"Expected value of type {self._type} but got type {type(python_val)}")
raise TypeTransformerFailedError(
f"Expected value of type {self._type} but got '{python_val}' of type {type(python_val)}"
)
return self._to_literal_transformer(python_val)

def to_python_value(self, ctx: FlyteContext, lv: Literal, expected_python_type: Type[T]) -> T:
Expand All @@ -185,7 +187,7 @@ def to_python_value(self, ctx: FlyteContext, lv: Literal, expected_python_type:
return res
except AttributeError:
# Assume that this is because a property on `lv` was None
raise TypeTransformerFailedError(f"Cannot convert literal {lv}")
raise TypeTransformerFailedError(f"Cannot convert literal {lv} to {self._type}")

def guess_python_type(self, literal_type: LiteralType) -> Type[T]:
if literal_type.simple is not None and literal_type.simple == self._lt.simple:
Expand Down Expand Up @@ -864,7 +866,13 @@ def literal_map_to_kwargs(
raise ValueError(
f"Received more input values {len(lm.literals)}" f" than allowed by the input spec {len(python_types)}"
)
return {k: TypeEngine.to_python_value(ctx, lm.literals[k], python_types[k]) for k, v in lm.literals.items()}
kwargs = {}
for i, k in enumerate(lm.literals):
try:
kwargs[k] = TypeEngine.to_python_value(ctx, lm.literals[k], python_types[k])
except TypeTransformerFailedError as exc:
raise TypeTransformerFailedError(f"Error converting input '{k}' at position {i}:\n {exc}") from exc
return kwargs

@classmethod
def dict_to_literal_map(
Expand Down
14 changes: 11 additions & 3 deletions flytekit/core/workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@
from flytekit.core.python_auto_container import PythonAutoContainerTask
from flytekit.core.reference_entity import ReferenceEntity, WorkflowReference
from flytekit.core.tracker import extract_task_module
from flytekit.core.type_engine import TypeEngine
from flytekit.core.type_engine import TypeEngine, TypeTransformerFailedError
from flytekit.exceptions import scopes as exception_scopes
from flytekit.exceptions.user import FlyteValidationException, FlyteValueException
from flytekit.loggers import logger
Expand Down Expand Up @@ -260,7 +260,10 @@ def __call__(self, *args, **kwargs) -> Union[Tuple[Promise], Promise, VoidPromis
input_kwargs = self.python_interface.default_inputs_as_kwargs
input_kwargs.update(kwargs)
self.compile()
return flyte_entity_call_handler(self, *args, **input_kwargs)
try:
return flyte_entity_call_handler(self, *args, **input_kwargs)
except TypeError as exc:
raise TypeError(f"Encountered error while executing workflow '{self.name}':\n {exc}") from exc

def execute(self, **kwargs):
raise Exception("Should not be called")
Expand All @@ -274,7 +277,12 @@ def local_execute(self, ctx: FlyteContext, **kwargs) -> Union[Tuple[Promise], Pr
for k, v in kwargs.items():
if not isinstance(v, Promise):
t = self.python_interface.inputs[k]
kwargs[k] = Promise(var=k, val=TypeEngine.to_literal(ctx, v, t, self.interface.inputs[k].type))
try:
kwargs[k] = Promise(var=k, val=TypeEngine.to_literal(ctx, v, t, self.interface.inputs[k].type))
except TypeTransformerFailedError as exc:
raise TypeError(
f"Failed to convert input argument '{k}' of workflow '{self.name}':\n {exc}"
) from exc

# The output of this will always be a combination of Python native values and Promises containing Flyte
# Literals.
Expand Down
7 changes: 5 additions & 2 deletions flytekit/exceptions/scopes.py
Original file line number Diff line number Diff line change
Expand Up @@ -194,10 +194,13 @@ def user_entry_point(wrapped, instance, args, kwargs):
_CONTEXT_STACK.append(_USER_CONTEXT)
if _is_base_context():
# See comment at this location for system_entry_point
fn_name = wrapped.__name__
try:
return wrapped(*args, **kwargs)
except FlyteScopedException as ex:
raise ex.value
except FlyteScopedException as exc:
raise exc.type(f"Error encountered while executing '{fn_name}':\n {exc.value}") from exc
except Exception as exc:
raise type(exc)(f"Error encountered while executing '{fn_name}':\n {exc}") from exc
else:
try:
return wrapped(*args, **kwargs)
Expand Down
30 changes: 24 additions & 6 deletions flytekit/loggers.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
import logging
import os
import shutil

from pythonjsonlogger import jsonlogger
from rich.console import Console
from rich.logging import RichHandler

# Note:
# The environment variable controls exposed to affect the individual loggers should be considered to be beta.
Expand All @@ -10,6 +13,7 @@
# For now, assume this is the environment variable whose usage will remain unchanged and controls output for all
# loggers defined in this file.
LOGGING_ENV_VAR = "FLYTE_SDK_LOGGING_LEVEL"
LOGGING_FMT_ENV_VAR = "FLYTE_SDK_LOGGING_FORMAT"

# By default, the root flytekit logger to debug so everything is logged, but enable fine-tuning
logger = logging.getLogger("flytekit")
Expand All @@ -33,8 +37,18 @@
user_space_logger = child_loggers["user_space"]

# create console handler
ch = logging.StreamHandler()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is unfortunately a breaking change but I guess a minor version bump (1.6.0) is fine? We might want to highlight this in release notes.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ch.setLevel(logging.DEBUG)
try:
handler = RichHandler(
rich_tracebacks=True,
omit_repeated_times=False,
keywords=["[flytekit]"],
log_time_format="%Y-%m-%d %H:%M:%S,%f",
console=Console(width=shutil.get_terminal_size().columns),
)
except OSError:
handler = logging.StreamHandler()

handler.setLevel(logging.DEBUG)

# Root logger control
# Don't want to import the configuration library since that will cause all sorts of circular imports, let's
Expand Down Expand Up @@ -63,10 +77,14 @@
child_logger.setLevel(logging.WARNING)

# create formatter
formatter = jsonlogger.JsonFormatter(fmt="%(asctime)s %(name)s %(levelname)s %(message)s")
logging_fmt = os.environ.get(LOGGING_FMT_ENV_VAR, "json")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

so no change in the default case right?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

correct, will update the logging format in a different PR

if logging_fmt == "json":
formatter = jsonlogger.JsonFormatter(fmt="%(asctime)s %(name)s %(levelname)s %(message)s")
else:
formatter = logging.Formatter(fmt="[%(name)s] %(message)s")

# add formatter to ch
ch.setFormatter(formatter)
# add formatter to the handler
handler.setFormatter(formatter)

# add ch to logger
logger.addHandler(ch)
logger.addHandler(handler)
4 changes: 3 additions & 1 deletion flytekit/models/common.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import abc as _abc
import json as _json
import re

from flyteidl.admin import common_pb2 as _common_pb2
from google.protobuf import json_format as _json_format
Expand Down Expand Up @@ -57,7 +58,8 @@ def short_string(self):
"""
:rtype: Text
"""
return str(self.to_flyte_idl())
literal_str = re.sub(r"\s+", " ", str(self.to_flyte_idl())).strip()
return f"<FlyteLiteral {literal_str}>"

def verbose_string(self):
"""
Expand Down
1 change: 1 addition & 0 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@
"numpy",
"gitpython",
"kubernetes>=12.0.1",
"rich",
],
extras_require=extras_require,
scripts=[
Expand Down
17 changes: 10 additions & 7 deletions tests/flytekit/unit/core/flyte_functools/test_decorators.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ def test_wrapped_tasks_error(capfd):
)
out = capfd.readouterr().out

assert out.replace("\r", "").strip().split("\n") == [
assert out.replace("\r", "").strip().split("\n")[:5] == [
"before running my_task",
"try running my_task",
"error running my_task: my_task failed with input: 0",
Expand Down Expand Up @@ -74,11 +74,11 @@ def test_unwrapped_task():
capture_output=True,
)
error = completed_process.stderr
error_str = error.strip().split("\n")[-1]
assert (
"TaskFunction cannot be a nested/inner or local function."
" It should be accessible at a module level for Flyte to execute it." in error_str
)
error_str = ""
for line in error.strip().split("\n"):
if line.startswith("ValueError"):
error_str += line
assert error_str.startswith("ValueError: TaskFunction cannot be a nested/inner or local function.")


@pytest.mark.parametrize("script", ["nested_function.py", "nested_wrapped_function.py"])
Expand All @@ -90,5 +90,8 @@ def test_nested_function(script):
capture_output=True,
)
error = completed_process.stderr
error_str = error.strip().split("\n")[-1]
error_str = ""
for line in error.strip().split("\n"):
if line.startswith("ValueError"):
error_str += line
assert error_str.startswith("ValueError: TaskFunction cannot be a nested/inner or local function.")
Loading