Binary output artifacts getting encoded as string #2223

Closed
knkski opened this issue Sep 24, 2019 · 6 comments
knkski commented Sep 24, 2019

What happened:

Tried using the output artifacts as mentioned in #1654 (comment)

What did you expect to happen:

I expected to be able to pass binary artifacts between tasks.

What steps did you take:

I'm hitting an issue where bytes get encoded as a string somewhere (b"b'\\x00\\x01'" instead of b'\x00\x01') when running this pipeline:

from typing import NamedTuple
from kfp import dsl, components
from kubernetes import client


EXTRA_CODE = '''
import subprocess, sys
subprocess.call([sys.executable, '-m', 'pip', 'install', 'git+https://github.com/kubeflow/pipelines.git#egg=kfp&subdirectory=sdk/python/'])
from kfp import components
'''


def task1_fn() -> NamedTuple('Task1', [('foo', bytes)]):
    return bytes(i for i in range(2)),


def task2_fn(file: components.InputBinaryFile(bytes)):
    assert file.read() == b'\x00\x01'  # We're actually getting `b"b'\\x00\\x01'"`


@dsl.pipeline(name='Artifact Passing Test')
def artifact_pipeline():
    task1 = components.func_to_container_op(task1_fn, base_image='python:3.7')
    task2 = components.func_to_container_op(task2_fn, base_image='python:3.7', extra_code=EXTRA_CODE)
    t1 = task1()
    t2 = task2(t1.outputs['foo'])

    t1.add_volume(client.V1Volume(name='outputs', empty_dir=client.V1EmptyDirVolumeSource()))
    t1.container.add_volume_mount(client.V1VolumeMount(name='outputs', mount_path='/tmp/outputs'))

Anything else you would like to add:

Also, this approach raises the question of whether it handles multi-GB or larger files: simply returning bytes from a function looks like it could run out of memory. An example use case is a pipeline that works on very large files, where each step iterates through the file to do its work instead of loading it all into memory.
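For illustration, here is a minimal sketch of the chunked processing that use case has in mind, assuming each step receives and produces ordinary binary file handles (the helper name and chunk size are invented for this sketch):

def copy_in_chunks(src, dst, chunk_size=1 << 20):
    # Read and write 1 MiB at a time so memory use stays flat
    # regardless of how large the file is.
    while True:
        chunk = src.read(chunk_size)
        if not chunk:
            break
        dst.write(chunk)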

Ark-kun commented Sep 24, 2019

> Tried using the output artifacts as mentioned in #1654 (comment)

By "support" I meant the technical ability of the underlying DSL and compiler: the ability to write a program that reads and writes files, create a component.yaml for it, load it, use it in a pipeline, compile the pipeline, and have the system pass the data files for you. That did not yet include improved support in Lightweight components; the improved support comes to Lightweight components this week.

The problem is two-fold:

  1. Automatic serialization of arguments, default values, and output values. The default serializer is str; other serializers must be added to the SDK explicitly. I'm not sure what the best way to serialize binary data into a string would be. It's not technically needed in this particular case, but how would you serialize binary data for default values and arguments?

  2. The ability to produce and consume binary data. As mentioned previously, this ability exists, but it will only come to Lightweight components in 1-2 days.
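As an aside, the doubled-up value in the report is exactly what passing bytes through the default str serializer produces, and base64 is one plausible way to represent binary data as a string. This is only an illustration, not SDK code:

import base64

value = bytes(i for i in range(2))  # b'\x00\x01'

# str() yields the repr of the bytes object; reading that text back
# as binary gives the artifact from the report.
assert str(value).encode() == b"b'\\x00\\x01'"

# One possible string serialization for binary data: base64.
serialized = base64.b64encode(value).decode('ascii')  # 'AAE='
assert base64.b64decode(serialized) == value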

> InputBinaryFile

Wow, you're on the bleeding edge =). The last piece you need is even more bleeding edge: #2221. Both should be released in a couple of days.

def task1_fn(file: OutputBinaryFile(bytes)):
    file.write(b'\x00\x01')

Ark-kun commented Sep 24, 2019

P.S.

  1. Why do you install the kfp package inside the container? kfp should not be needed at runtime.
  2. Why do you add that installation code through extra_code instead of including it in the function?
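For reference, if an install really were needed at runtime, it could go inside the function body, since the function source is what gets copied into the component. A minimal sketch ('some-package' is a placeholder):

def task2_fn(file: InputBinaryFile(bytes)):
    # Runtime-only setup can live in the function body.
    import subprocess, sys
    subprocess.call([sys.executable, '-m', 'pip', 'install', 'some-package'])

    assert file.read() == b'\x00\x01'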

Ark-kun commented Sep 24, 2019

> components.InputBinaryFile

Oh, I see why you're doing this. That syntax won't work - you need to use plain InputBinaryFile without the module name (at runtime there is a plain InputBinaryFile class in scope).
I can try fixing this limitation (by removing the annotations), but it might not be trivial.
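Concretely, the annotation must name the class exactly as it appears in the generated runtime code (the generated component later in this thread defines a plain InputBinaryFile class):

# Fails at runtime: the copied code has no `components` module in scope.
def task2_fn(file: components.InputBinaryFile(bytes)): ...

# Works: the generated code defines a plain InputBinaryFile class.
def task2_fn(file: InputBinaryFile(bytes)): ...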

Ark-kun commented Sep 24, 2019

import kfp
from kfp.components import InputBinaryFile, OutputBinaryFile, func_to_container_op
from kubernetes import client

def task1_fn(file: OutputBinaryFile(bytes)):
    file.write(b'\x00\x01')

def task2_fn(file: InputBinaryFile(bytes)):
    assert file.read() == b'\x00\x01'

#@dsl.pipeline(name='Artifact Passing Test')
def artifact_pipeline():
    task1 = func_to_container_op(task1_fn, base_image='python:3.7')
    task2 = func_to_container_op(task2_fn, base_image='python:3.7')
    t1 = task1()
    t2 = task2(t1.outputs['file'])  # the output name comes from task1_fn's 'file' parameter

    t1.add_volume(client.V1Volume(name='outputs', empty_dir=client.V1EmptyDirVolumeSource()))
    t1.container.add_volume_mount(client.V1VolumeMount(name='outputs', mount_path='/tmp/outputs'))

kfp.run_pipeline_func_on_cluster(artifact_pipeline, arguments={})

knkski commented Sep 25, 2019

Awesome, thanks for the help! It's working for me now. And no, I don't need components.InputBinaryFile vs plain InputBinaryFile; I just wasn't aware that one would work and the other wouldn't.

knkski closed this as completed Sep 25, 2019
Ark-kun commented Sep 25, 2019

> I just wasn't aware that one would work and the other wouldn't.

Generally you should not need to be aware of this. But the current implementation of Lightweight components uses code copying and requires the functions to be self-contained (unless code pickling is used). While this does not put many limitations on the function body, it puts severe limitations on the type annotations used in the function signature. At the moment only NamedTuple, Input* and Output* are explicitly supported, and they must be referenced without the module name.
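To illustrate what self-contained means here (hash_fn is invented for this sketch): everything the function uses, including imports, must live inside its body, because only the function's source is copied into the component.

def hash_fn(file: InputBinaryFile(bytes)):
    # Imports go inside the body: the copied source has no outer scope.
    import hashlib
    print(hashlib.sha256(file.read()).hexdigest())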

You can easily see the generated code inside the component file, which you can optionally create. The code is plain Python without any special libraries.

kfp.components.func_to_container_op(task2_fn, base_image='python:3.7', output_component_file='task2.component.yaml')
# or the less common
kfp.components.func_to_component_text(task2_fn, base_image='python:3.7')

The generated task2.component.yaml looks like this:
name: Task2 fn
inputs:
- name: file
  type: bytes
outputs: []
implementation:
  container:
    image: python:3.7
    command:
    - python3
    - -u
    - -c
    - |
      class InputBinaryFile:
          '''When creating component from function, InputBinaryFile should be used as function parameter annotation to tell the system to pass the *binary data stream* object (`io.BytesIO`) to the function instead of passing the actual data.'''
          def __init__(self, type=None):
              self.type = type

      def task2_fn(file: InputBinaryFile(bytes)):
          assert file.read() == b'\x00\x01'

      import argparse
      _parser = argparse.ArgumentParser(prog='Task2 fn', description='')
      _parser.add_argument("--file", dest="file", type=argparse.FileType('rb'), required=True, default=argparse.SUPPRESS)
      _parsed_args = vars(_parser.parse_args())
      _output_files = _parsed_args.pop("_output_paths", [])

      _outputs = task2_fn(**_parsed_args)

      if not hasattr(_outputs, '__getitem__') or isinstance(_outputs, str):
          _outputs = [_outputs]

      _output_serializers = [

      ]

      import os
      for idx, output_file in enumerate(_output_files):
          try:
              os.makedirs(os.path.dirname(output_file))
          except OSError:
              pass
          with open(output_file, 'w') as f:
              f.write(_output_serializers[idx](_outputs[idx]))
    args:
    - --file
    - {inputPath: file}

P.S. The component file can be used for sharing: it can be put anywhere and then loaded using kfp.components.load_component_from_url or kfp.components.load_component_from_file.
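For example (the URL is a placeholder):

import kfp.components

task2_op = kfp.components.load_component_from_file('task2.component.yaml')
# or, once the file is published somewhere:
task2_op = kfp.components.load_component_from_url('https://example.com/task2.component.yaml')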

Ark-kun self-assigned this Sep 27, 2019