Skip to content

Commit

Permalink
update for flytefile and flytedirectory
Browse files Browse the repository at this point in the history
Signed-off-by: Future-Outlier <eric901201@gmail.com>
  • Loading branch information
Future-Outlier committed Mar 20, 2024
1 parent 10d8b20 commit 55549f9
Showing 1 changed file with 33 additions and 27 deletions.
60 changes: 33 additions & 27 deletions flytekit/core/container_task.py
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,8 @@ def local_execute(

from flytekit.core.promise import translate_inputs_to_literals
from flytekit.core.type_engine import TypeEngine, TypeTransformerFailedError
from flytekit.types.directory import FlyteDirectory
from flytekit.types.file import FlyteFile

try:
kwargs = translate_inputs_to_literals(
Expand All @@ -131,37 +133,38 @@ def local_execute(
logger.error(msg)
raise type(exc)(msg) from exc

container_output_dir = "/flyte/raw-container-task/outputs"
output_directory = ctx.file_access.get_random_local_directory()
volume_bindings = {
output_directory: {
"bind": container_output_dir,
"bind": self._output_data_dir,
"mode": "rw",
},
}

commands = ""
cmd_and_args = []
if self._cmd:
for cmd in self._cmd:
if cmd.startswith("{{.inputs.") and cmd.endswith("}}"):
k = cmd[len("{{.inputs.") : -len("}}")]
commands += str(native_inputs[k]).split()[0] + " "
elif cmd == self._output_data_dir:
commands += container_output_dir + " "
else:
commands += cmd + " "
cmd_and_args += self._cmd
if self._args:
for arg in self._args:
if arg.startswith("{{.inputs.") and arg.endswith("}}"):
k = arg[len("{{.inputs.") : -len("}}")]
commands += str(native_inputs[k]).split()[0] + " "
elif arg == self._output_data_dir:
commands += container_output_dir + " "
cmd_and_args += self._args

for cmd in cmd_and_args:
if cmd.startswith("{{.inputs.") and cmd.endswith("}}"):
k = cmd[len("{{.inputs.") : -len("}}")]
if type(native_inputs[k]) in [FlyteFile, FlyteDirectory]:
local_flyte_file_or_dir_path = str(native_inputs[k])
remote_flyte_file_or_dir_path = os.path.join(self._input_data_dir, k.replace(".", "/")) # type: ignore
volume_bindings[local_flyte_file_or_dir_path] = {
"bind": remote_flyte_file_or_dir_path,
"mode": "rw",
}
commands += remote_flyte_file_or_dir_path + " "
else:
commands += arg + " "
commands += str(native_inputs[k]).split()[0] + " "
else:
commands += cmd + " "

client = docker.from_env()

# If can't find the image, pull it
if client.images.list(filters={"reference": self._image}) == []:
logger.info(f"Pulling image:{self._image} for container task:{self.name}")
Expand All @@ -185,16 +188,19 @@ def local_execute(
output_dict = {}
if self._outputs:
for k, output_type in self._outputs.items():
with open(os.path.join(output_directory, k), "r") as f:
output_val = f.read()

# bool('False') is True, so we need to handle this case
if output_type == bool:
output_dict[k] = output_val == "True"
elif output_type == datetime.datetime:
output_dict[k] = datetime.datetime.fromisoformat(output_val) # type: ignore
if output_type in [FlyteFile, FlyteDirectory]:
output_dict[k] = output_type(path=os.path.join(output_directory, k))
else:
output_dict[k] = output_type(output_val)
with open(os.path.join(output_directory, k), "r") as f:
output_val = f.read()

# bool('False') is True, so we need to handle this case
if output_type == bool:
output_dict[k] = output_val == "True"
elif output_type == datetime.datetime:
output_dict[k] = datetime.datetime.fromisoformat(output_val) # type: ignore
else:
output_dict[k] = output_type(output_val)

outputs_literal_map = TypeEngine.dict_to_literal_map(ctx, output_dict)
outputs_literals = outputs_literal_map.literals
Expand Down

0 comments on commit 55549f9

Please sign in to comment.