Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove legacy API #807

Merged
merged 28 commits into from
Jan 15, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
28 commits
Select commit Hold shift + click to select a range
f47d921
move exceptions
wild-endeavor Jan 3, 2022
3f7d078
move constants
wild-endeavor Jan 3, 2022
2c7d593
move more out of common
wild-endeavor Jan 4, 2022
d98885c
move more, remove primitives
wild-endeavor Jan 4, 2022
0848a59
deletion of common
wild-endeavor Jan 4, 2022
05b4b7f
delete interfaces/data, i think it's been replaced with persistence
wild-endeavor Jan 4, 2022
87ae04f
remove common tests
wild-endeavor Jan 4, 2022
5f9b692
new identifier file, bring back mock stats
wild-endeavor Jan 4, 2022
dfc2e37
move cli identifiers
wild-endeavor Jan 4, 2022
1f2f773
move translator into tools
wild-endeavor Jan 4, 2022
21ff70d
data proxy replace, more test deletion
wild-endeavor Jan 5, 2022
c2de241
remove old array stuff from entrypoint, remove test_register
wild-endeavor Jan 5, 2022
145dd4a
delete everything that fails
wild-endeavor Jan 8, 2022
af45f24
add back missing init files
wild-endeavor Jan 11, 2022
8f3a4ed
bump dateutil
wild-endeavor Jan 12, 2022
f947730
make requirements
wild-endeavor Jan 12, 2022
50e6140
remove rest of six
wild-endeavor Jan 13, 2022
929d759
nit
wild-endeavor Jan 14, 2022
c2c13dc
cleanup running through all files
wild-endeavor Jan 14, 2022
544a68a
missed one six
wild-endeavor Jan 14, 2022
1efbcdf
sagemaker updates
wild-endeavor Jan 14, 2022
36da4e0
Merge remote-tracking branch 'origin/master' into legacy-removal
wild-endeavor Jan 14, 2022
2079a5a
spark add missing enum
wild-endeavor Jan 14, 2022
1ac8020
spark models
wild-endeavor Jan 14, 2022
137e2b8
nit
wild-endeavor Jan 14, 2022
6d71fd1
lint
wild-endeavor Jan 14, 2022
6522eee
min python
wild-endeavor Jan 14, 2022
9733963
sagemaker change
wild-endeavor Jan 14, 2022
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion flytekit/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,6 @@
else:
from importlib.metadata import entry_points

import flytekit.plugins # This will be deprecated, these are the old plugins, the new plugins live in plugins/
from flytekit.core.base_sql_task import SQLTask
from flytekit.core.base_task import SecurityContext, TaskMetadata, kwtypes
from flytekit.core.condition import conditional
Expand Down
119 changes: 26 additions & 93 deletions flytekit/bin/entrypoint.py
Original file line number Diff line number Diff line change
@@ -1,29 +1,23 @@
import contextlib
import datetime as _datetime
import importlib as _importlib
import logging as python_logging
import os as _os
import pathlib
import random as _random
import traceback as _traceback
from typing import List

import click as _click
from flyteidl.core import literals_pb2 as _literals_pb2

from flytekit import PythonFunctionTask
from flytekit.common import constants as _constants
from flytekit.common import utils as _common_utils
from flytekit.common import utils as _utils
from flytekit.common.exceptions import scopes as _scoped_exceptions
from flytekit.common.exceptions import scopes as _scopes
from flytekit.common.exceptions import system as _system_exceptions
from flytekit.common.tasks.sdk_runnable import ExecutionParameters
from flytekit.configuration import TemporaryConfiguration as _TemporaryConfiguration
from flytekit.configuration import internal as _internal_config
from flytekit.configuration import sdk as _sdk_config
from flytekit.core import constants as _constants
from flytekit.core import utils
from flytekit.core.base_task import IgnoreOutputs, PythonTask
from flytekit.core.context_manager import (
ExecutionParameters,
ExecutionState,
FlyteContext,
FlyteContextManager,
Expand All @@ -33,9 +27,8 @@
from flytekit.core.data_persistence import FileAccessProvider
from flytekit.core.map_task import MapPythonTask
from flytekit.core.promise import VoidPromise
from flytekit.engines import loader as _engine_loader
from flytekit.interfaces import random as _flyte_random
from flytekit.interfaces.data import data_proxy as _data_proxy
from flytekit.exceptions import scopes as _scoped_exceptions
from flytekit.exceptions import scopes as _scopes
from flytekit.interfaces.stats.taggable import get_stats as _get_stats
from flytekit.loggers import entrypoint_logger as logger
from flytekit.models import dynamic_job as _dynamic_job
Expand All @@ -47,6 +40,12 @@
from flytekit.tools.module_loader import load_object_from_module


def get_version_message():
    """Build the greeting line that the entrypoint logs on startup, embedding the installed flytekit version."""
    # Imported lazily here (rather than at module top) to avoid any circular-import issue
    # between the entrypoint module and the flytekit package itself.
    import flytekit

    version = flytekit.__version__
    return "Welcome to Flyte! Version: {}".format(version)


def _compute_array_job_index():
# type: () -> int
"""
Expand All @@ -61,25 +60,6 @@ def _compute_array_job_index():
return offset + int(_os.environ.get(_os.environ.get("BATCH_JOB_ARRAY_INDEX_VAR_NAME")))


def _map_job_index_to_child_index(local_input_dir, datadir, index):
    """
    Translate a batch array-job index into the child task's input/output index.

    :param local_input_dir: local temp dir used to stage the downloaded lookup file
        (presumably an AutoDeletingTempDir — TODO confirm against caller)
    :param datadir: remote data directory that may contain an ``indexlookup.pb`` file
    :param int index: the raw array-job index from the environment
    :return: the mapped child index, or ``index`` unchanged when no lookup file exists
    :raises _system_exceptions.FlyteSystemAssertion: if the lookup array is smaller than ``index``
    """
    local_lookup_file = local_input_dir.get_named_tempfile("indexlookup.pb")
    idx_lookup_file = _os.path.join(datadir, "indexlookup.pb")

    # if the indexlookup.pb does not exist, then just return the index
    if not _data_proxy.Data.data_exists(idx_lookup_file):
        return index

    # Download the remote lookup file and parse it as a LiteralCollection proto.
    _data_proxy.Data.get_data(idx_lookup_file, local_lookup_file)
    mapping_proto = _utils.load_proto_from_file(_literals_pb2.LiteralCollection, local_lookup_file)
    if len(mapping_proto.literals) < index:
        raise _system_exceptions.FlyteSystemAssertion(
            "dynamic task index lookup array size: {} is smaller than lookup index {}".format(
                len(mapping_proto.literals), index
            )
        )
    # Each entry is expected to hold an integer scalar — the child index for this slot.
    return mapping_proto.literals[index].scalar.primitive.integer


def _dispatch_execute(
ctx: FlyteContext,
task_def: PythonTask,
Expand All @@ -101,7 +81,7 @@ def _dispatch_execute(
# Step1
local_inputs_file = _os.path.join(ctx.execution_state.working_dir, "inputs.pb")
ctx.file_access.get_data(inputs_path, local_inputs_file)
input_proto = _utils.load_proto_from_file(_literals_pb2.LiteralMap, local_inputs_file)
input_proto = utils.load_proto_from_file(_literals_pb2.LiteralMap, local_inputs_file)
idl_input_literals = _literal_models.LiteralMap.from_flyte_idl(input_proto)

# Step2
Expand Down Expand Up @@ -174,7 +154,7 @@ def _dispatch_execute(
logger.error("!! End Error Captured by Flyte !!")

for k, v in output_file_dict.items():
_common_utils.write_proto_to_file(v.to_flyte_idl(), _os.path.join(ctx.execution_state.engine_dir, k))
utils.write_proto_to_file(v.to_flyte_idl(), _os.path.join(ctx.execution_state.engine_dir, k))

ctx.file_access.put_data(ctx.execution_state.engine_dir, output_prefix, is_multipart=True)
logger.info(f"Engine folder written successfully to the output prefix {output_prefix}")
Expand Down Expand Up @@ -283,45 +263,6 @@ def _handle_annotated_task(
_dispatch_execute(ctx, task_def, inputs, output_prefix)


@_scopes.system_entry_point
def _legacy_execute_task(task_module, task_name, inputs, output_prefix, raw_output_data_prefix, test):
    """
    This function should be called for old flytekit api tasks (the only API that was available in 0.15.x and earlier)

    :param task_module: dotted module path containing the task definition (imported dynamically below)
    :param task_name: attribute name of the task within that module
    :param inputs: remote path (or directory, for array jobs) of the serialized LiteralMap inputs
    :param output_prefix: remote prefix where outputs will be written
    :param raw_output_data_prefix: remote prefix for raw output data, passed through to the engine context
    :param test: unused here — presumably a test-mode flag accepted for CLI signature parity; TODO confirm
    """
    with _TemporaryConfiguration(_internal_config.CONFIGURATION_PATH.get()):
        with _utils.AutoDeletingTempDir("input_dir") as input_dir:
            # Load user code
            task_module = _importlib.import_module(task_module)
            task_def = getattr(task_module, task_name)

            local_inputs_file = input_dir.get_named_tempfile("inputs.pb")

            # Handle inputs/outputs for array job.
            if _os.environ.get("BATCH_JOB_ARRAY_INDEX_VAR_NAME"):
                job_index = _compute_array_job_index()

                # TODO: Perhaps remove. This is a workaround to an issue we perceived with limited entropy in
                # TODO: AWS batch array jobs.
                _flyte_random.seed_flyte_random(
                    "{} {} {}".format(_random.random(), _datetime.datetime.utcnow(), job_index)
                )

                # If an ArrayTask is discoverable, the original job index may be different than the one specified in
                # the environment variable. Look up the correct input/outputs in the index lookup mapping file.
                job_index = _map_job_index_to_child_index(input_dir, inputs, job_index)

                # Array jobs keep per-child inputs/outputs under a subdirectory named by the child index.
                inputs = _os.path.join(inputs, str(job_index), "inputs.pb")
                output_prefix = _os.path.join(output_prefix, str(job_index))

            # Download and deserialize the task's inputs, then hand execution off to the legacy engine.
            _data_proxy.Data.get_data(inputs, local_inputs_file)
            input_proto = _utils.load_proto_from_file(_literals_pb2.LiteralMap, local_inputs_file)

            _engine_loader.get_engine().get_task(task_def).execute(
                _literal_models.LiteralMap.from_flyte_idl(input_proto),
                context={"output_prefix": output_prefix, "raw_output_data_prefix": raw_output_data_prefix},
            )


@_scopes.system_entry_point
def _execute_task(
inputs,
Expand Down Expand Up @@ -416,8 +357,6 @@ def _pass_through():


@_pass_through.command("pyflyte-execute")
@_click.option("--task-module", required=False)
@_click.option("--task-name", required=False)
@_click.option("--inputs", required=True)
@_click.option("--output-prefix", required=True)
@_click.option("--raw-output-data-prefix", required=False)
Expand All @@ -431,8 +370,6 @@ def _pass_through():
nargs=-1,
)
def execute_task_cmd(
task_module,
task_name,
inputs,
output_prefix,
raw_output_data_prefix,
Expand All @@ -442,7 +379,7 @@ def execute_task_cmd(
resolver,
resolver_args,
):
logger.info(_utils.get_version_message())
logger.info(get_version_message())
# We get weird errors if there are no click echo messages at all, so emit an empty string so that unit tests pass.
_click.echo("")
# Backwards compatibility - if Propeller hasn't filled this in, then it'll come through here as the original
Expand All @@ -455,21 +392,17 @@ def execute_task_cmd(
# Use the presence of the resolver to differentiate between old API tasks and new API tasks
# The addition of a new top-level command seemed out of scope at the time of this writing to pursue, given how
# pervasive this top-level command already is (plugins mostly).
if not resolver:
logger.info("No resolver found, assuming legacy API task...")
_legacy_execute_task(task_module, task_name, inputs, output_prefix, raw_output_data_prefix, test)
else:
logger.debug(f"Running task execution with resolver {resolver}...")
_execute_task(
inputs,
output_prefix,
raw_output_data_prefix,
test,
resolver,
resolver_args,
dynamic_addl_distro,
dynamic_dest_dir,
)
logger.debug(f"Running task execution with resolver {resolver}...")
_execute_task(
inputs,
output_prefix,
raw_output_data_prefix,
test,
resolver,
resolver_args,
dynamic_addl_distro,
dynamic_dest_dir,
)


@_pass_through.command("pyflyte-fast-execute")
Expand Down Expand Up @@ -528,7 +461,7 @@ def map_execute_task_cmd(
resolver,
resolver_args,
):
logger.info(_utils.get_version_message())
logger.info(get_version_message())

_execute_map_task(
inputs,
Expand Down
8 changes: 4 additions & 4 deletions flytekit/clients/friendly.py
Original file line number Diff line number Diff line change
Expand Up @@ -560,7 +560,7 @@ def create_execution(self, project, domain, name, execution_spec, inputs):
def recover_execution(self, id, name: str = None):
"""
Recreates a previously-run workflow execution that will only start executing from the last known failure point.
:param flytekit.common.core.identifier.WorkflowExecutionIdentifier id:
:param flytekit.models.core.identifier.WorkflowExecutionIdentifier id:
:param name str: Optional name to assign to the newly created execution.
:rtype: flytekit.models.core.identifier.WorkflowExecutionIdentifier
"""
Expand All @@ -572,7 +572,7 @@ def recover_execution(self, id, name: str = None):

def get_execution(self, id):
"""
:param flytekit.common.core.identifier.WorkflowExecutionIdentifier id:
:param flytekit.models.core.identifier.WorkflowExecutionIdentifier id:
:rtype: flytekit.models.execution.Execution
"""
return _execution.Execution.from_flyte_idl(
Expand Down Expand Up @@ -638,7 +638,7 @@ def list_executions_paginated(self, project, domain, limit=100, token=None, filt

def terminate_execution(self, id, cause):
"""
:param flytekit.common.core.identifier.WorkflowExecutionIdentifier id:
:param flytekit.models.core.identifier.WorkflowExecutionIdentifier id:
:param Text cause:
"""
super(SynchronousFlyteClient, self).terminate_execution(
Expand All @@ -647,7 +647,7 @@ def terminate_execution(self, id, cause):

def relaunch_execution(self, id, name=None):
"""
:param flytekit.common.core.identifier.WorkflowExecutionIdentifier id:
:param flytekit.models.core.identifier.WorkflowExecutionIdentifier id:
:param Text name: [Optional] name for the new execution. If not specified, a randomly generated name will be
used
:returns: The unique identifier for the new execution.
Expand Down
4 changes: 2 additions & 2 deletions flytekit/clients/helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,8 @@ def iterate_node_executions(
"""
This returns a generator for node executions.
:param flytekit.clients.friendly.SynchronousFlyteClient client:
:param flytekit.common.core.identifier.WorkflowExecutionIdentifier workflow_execution_identifier:
:param flytekit.common.core.identifier.TaskExecutionIdentifier task_execution_identifier:
:param flytekit.models.core.identifier.WorkflowExecutionIdentifier workflow_execution_identifier:
:param flytekit.models.core.identifier.TaskExecutionIdentifier task_execution_identifier:
:param int limit: The maximum number of elements to retrieve
:param list[flytekit.models.filters.Filter] filters:
:rtype: Iterator[flytekit.models.node_execution.NodeExecution]
Expand Down
9 changes: 3 additions & 6 deletions flytekit/clients/raw.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@
import time
from typing import List

import six as _six
from flyteidl.service import admin_pb2_grpc as _admin_service
from google.protobuf.json_format import MessageToJson as _MessageToJson
from grpc import RpcError as _RpcError
Expand All @@ -13,13 +12,13 @@

from flytekit.clis.auth import credentials as _credentials_access
from flytekit.clis.sdk_in_container import basic_auth as _basic_auth
from flytekit.common.exceptions import user as _user_exceptions
from flytekit.configuration import creds as _creds_config
from flytekit.configuration.creds import _DEPRECATED_CLIENT_CREDENTIALS_SCOPE as _DEPRECATED_SCOPE
from flytekit.configuration.creds import CLIENT_ID as _CLIENT_ID
from flytekit.configuration.creds import COMMAND as _COMMAND
from flytekit.configuration.creds import DEPRECATED_OAUTH_SCOPES, SCOPES
from flytekit.configuration.platform import AUTH as _AUTH
from flytekit.exceptions import user as _user_exceptions
from flytekit.loggers import cli_logger


Expand Down Expand Up @@ -91,9 +90,7 @@ def _refresh_credentials_from_command(flyte_client):
output = subprocess.run(command, capture_output=True, text=True, check=True)
except subprocess.CalledProcessError as e:
cli_logger.error("Failed to generate token from command {}".format(command))
raise _user_exceptions.FlyteAuthenticationException(
"Problems refreshing token with command: " + _six.text_type(e)
)
raise _user_exceptions.FlyteAuthenticationException("Problems refreshing token with command: " + str(e))
flyte_client.set_access_token(output.stdout.strip())


Expand Down Expand Up @@ -134,7 +131,7 @@ def handler(*args, **kwargs):
# Always retry auth errors.
if i == (max_retries - 1):
# Exit the loop and wrap the authentication error.
raise _user_exceptions.FlyteAuthenticationException(_six.text_type(e))
raise _user_exceptions.FlyteAuthenticationException(str(e))
cli_logger.error(f"Unauthenticated RPC error {e}, refreshing credentials and retrying\n")
refresh_handler_fn = _get_refresh_handler(_creds_config.AUTH_MODE.get())
refresh_handler_fn(args[0])
Expand Down
Loading