Execution inputs as container arguments for processing jobs #197

Open

francescocamussoni opened this issue Jun 1, 2023 · 6 comments

francescocamussoni commented Jun 1, 2023

I'm trying to use execution inputs as container arguments for my processing job:

from stepfunctions.inputs import ExecutionInput
from stepfunctions.steps import ProcessingStep

execution_input = ExecutionInput(
    schema={
        "IngestaJobName": str,
        "PreprocessingJobName": str,
        "InferenceJobName": str,
        "Fecha": str,
    }
)
# Call step
ingesta_step = ProcessingStep(
    inference_config["ingesta_step_name"],
    processor=ingesta_processor,
    job_name=execution_input['IngestaJobName'],
    inputs=inputs_ingesta,
    outputs=outputs_ingesta,
    container_arguments=["--fecha", "$$.Execution.Input['Fecha']"],
    container_entrypoint=["python3", "/opt/ml/processing/input/code/"+inference_config["ingesta_function"]], 
)

I've also tried replacing container_arguments with ["--fecha", execution_input["Fecha"]], but neither approach works.

Use Case

When I launch a new execution of my state machine, it would be useful to pass some execution inputs through as container arguments, so that parameters of interest can control the step's behaviour directly from the execution input, without updating the state machine definition.


This is a 🚀 Feature Request

@francescocamussoni changed the title from "short issue description" to "Execution inputs as container arguments for processing jobs" on Jun 1, 2023

wong-a commented Jun 2, 2023

> I've also tried replacing container_arguments with ["--fecha", execution_input["Fecha"]], but neither approach works.

Can you elaborate on the behaviour you are seeing?


francescocamussoni commented Jun 2, 2023

With ["--fecha", execution_input["Fecha"]], I get this error: TypeError: Object of type ExecutionInput is not JSON serializable

When I execute

branching_workflow = Workflow(
    name=pipeline_name_step,
    definition=workflow_graph,
    role=role,
    execution_input=execution_input
).create()

This is the complete error:

---------------------------------------------------------------------------
TypeError                                 Traceback (most recent call last)
Input In [282], in <cell line: 2>()
      1 # Create or update your StateMachine Workflow
----> 2 branch_workflow = branching_workflow.create()

File /usr/local/lib/python3.8/site-packages/stepfunctions/workflow/stepfunctions.py:205, in Workflow.create(self)
    202     return self.state_machine_arn
    204 try:
--> 205     self.state_machine_arn = self._create()
    206 except self.client.exceptions.StateMachineAlreadyExists as e:
    207     self.state_machine_arn = self._extract_state_machine_arn(e)

File /usr/local/lib/python3.8/site-packages/stepfunctions/workflow/stepfunctions.py:215, in Workflow._create(self)
    212 def _create(self):
    213     response = self.client.create_state_machine(
    214         name=self.name,
--> 215         definition=self.definition.to_json(pretty=self.format_json),
    216         roleArn=self.role,
    217         tags=self.tags
    218     )
    219     logger.info("Workflow created successfully on AWS Step Functions.")
    220     return response['stateMachineArn']

File /usr/local/lib/python3.8/site-packages/stepfunctions/steps/states.py:91, in Block.to_json(self, pretty)
     82 """Serialize to a JSON formatted string.
     83 
     84 Args:
   (...)
     88     str: JSON formatted string representation of the block.
     89 """
     90 if pretty:
---> 91     return json.dumps(self.to_dict(), indent=4)
     93 return json.dumps(self.to_dict())

File /usr/local/lib/python3.8/json/__init__.py:234, in dumps(obj, skipkeys, ensure_ascii, check_circular, allow_nan, cls, indent, separators, default, sort_keys, **kw)
    232 if cls is None:
    233     cls = JSONEncoder
--> 234 return cls(
    235     skipkeys=skipkeys, ensure_ascii=ensure_ascii,
    236     check_circular=check_circular, allow_nan=allow_nan, indent=indent,
    237     separators=separators, default=default, sort_keys=sort_keys,
    238     **kw).encode(obj)

File /usr/local/lib/python3.8/json/encoder.py:201, in JSONEncoder.encode(self, o)
    199 chunks = self.iterencode(o, _one_shot=True)
    200 if not isinstance(chunks, (list, tuple)):
--> 201     chunks = list(chunks)
    202 return ''.join(chunks)

File /usr/local/lib/python3.8/json/encoder.py:431, in _make_iterencode.<locals>._iterencode(o, _current_indent_level)
    429     yield from _iterencode_list(o, _current_indent_level)
    430 elif isinstance(o, dict):
--> 431     yield from _iterencode_dict(o, _current_indent_level)
    432 else:
    433     if markers is not None:

File /usr/local/lib/python3.8/json/encoder.py:405, in _make_iterencode.<locals>._iterencode_dict(dct, _current_indent_level)
    403         else:
    404             chunks = _iterencode(value, _current_indent_level)
--> 405         yield from chunks
    406 if newline_indent is not None:
    407     _current_indent_level -= 1

File /usr/local/lib/python3.8/json/encoder.py:405, in _make_iterencode.<locals>._iterencode_dict(dct, _current_indent_level)
    403         else:
    404             chunks = _iterencode(value, _current_indent_level)
--> 405         yield from chunks
    406 if newline_indent is not None:
    407     _current_indent_level -= 1

    [... skipping similar frames: _make_iterencode.<locals>._iterencode_dict at line 405 (2 times)]

File /usr/local/lib/python3.8/json/encoder.py:405, in _make_iterencode.<locals>._iterencode_dict(dct, _current_indent_level)
    403         else:
    404             chunks = _iterencode(value, _current_indent_level)
--> 405         yield from chunks
    406 if newline_indent is not None:
    407     _current_indent_level -= 1

File /usr/local/lib/python3.8/json/encoder.py:325, in _make_iterencode.<locals>._iterencode_list(lst, _current_indent_level)
    323         else:
    324             chunks = _iterencode(value, _current_indent_level)
--> 325         yield from chunks
    326 if newline_indent is not None:
    327     _current_indent_level -= 1

File /usr/local/lib/python3.8/json/encoder.py:438, in _make_iterencode.<locals>._iterencode(o, _current_indent_level)
    436         raise ValueError("Circular reference detected")
    437     markers[markerid] = o
--> 438 o = _default(o)
    439 yield from _iterencode(o, _current_indent_level)
    440 if markers is not None:

File /usr/local/lib/python3.8/json/encoder.py:179, in JSONEncoder.default(self, o)
    160 def default(self, o):
    161     """Implement this method in a subclass such that it returns
    162     a serializable object for ``o``, or calls the base implementation
    163     (to raise a ``TypeError``).
   (...)
    177 
    178     """
--> 179     raise TypeError(f'Object of type {o.__class__.__name__} '
    180                     f'is not JSON serializable')

TypeError: Object of type ExecutionInput is not JSON serializable

On the other hand, with ["--fecha", "$$.Execution.Input['Fecha']"], I get $$.Execution.Input['Fecha'] as a literal string inside the .py of the processing job; it seems that the path isn't being resolved as a placeholder.
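One way to check this (a debugging suggestion on my part, not something from the SDK docs): print the generated state machine definition and look at how ContainerArguments is emitted. In Amazon States Language, a field is only evaluated as JsonPath when its key carries the .$ suffix; without it, Step Functions passes the string through as a literal.

# Inspect the generated definition. If "ContainerArguments" appears as a
# plain key (no ".$" suffix), the strings inside it are treated as
# literals, which would match the behaviour described above.
print(branching_workflow.definition.to_json(pretty=True))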



wong-a commented Jun 5, 2023

Thanks for the additional details. The problem is likely that ProcessingStep passes container_arguments directly to `sagemaker.workflow.airflow#processing_config`:

if isinstance(job_name, str):
    processing_parameters = processing_config(processor=processor, inputs=inputs, outputs=outputs, container_arguments=container_arguments, container_entrypoint=container_entrypoint, kms_key_id=kms_key_id, job_name=job_name)
else:
    processing_parameters = processing_config(processor=processor, inputs=inputs, outputs=outputs, container_arguments=container_arguments, container_entrypoint=container_entrypoint, kms_key_id=kms_key_id)

The work should be similar to #155, with some slight differences because container_arguments is an array instead of an object.
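My reading of the traceback above (a sketch, not a confirmed root cause): because container_arguments is forwarded verbatim, the raw placeholder object ends up inside the state's Parameters dict, and json.dumps has no encoder for it:

import json
from stepfunctions.inputs import ExecutionInput

execution_input = ExecutionInput(schema={"Fecha": str})

# The placeholder is embedded as-is instead of being rendered into a
# JsonPath string, so serializing the definition fails:
parameters = {"ContainerArguments": ["--fecha", execution_input["Fecha"]]}
json.dumps(parameters)  # TypeError: Object of type ExecutionInput is not JSON serializable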

francescocamussoni (Author) commented:

Hi Wong, thank you for your response.

I don't know how to use parameters inside the processing job. I've tried it (screenshots below), but it's the same as defining a container argument, so I get the same error.

[screenshots of the attempt and the resulting error]

liyenhsu commented:
Hi @francescocamussoni, did you resolve this? I'm having the exact same issue. Thanks!

liyenhsu commented:
I have figured it out: the entire container_arguments needs to be a single placeholder for a list. So, using the example from the original post above, we should do the following:

from stepfunctions.inputs import ExecutionInput

execution_input = ExecutionInput(
    schema={
        "IngestaJobName": str,
        "PreprocessingJobName": str,
        "InferenceJobName": str,
        "args": list
    }
)

ingesta_step = ProcessingStep(
    inference_config["ingesta_step_name"],
    processor=ingesta_processor,
    job_name=execution_input["IngestaJobName"],
    inputs=inputs_ingesta,
    outputs=outputs_ingesta,
    container_arguments=execution_input["args"],
    container_entrypoint=["python3", "/opt/ml/processing/input/code/"+inference_config["ingesta_function"]], 
)

Then, when calling Workflow.execute(inputs=inputs), pass the entire argument list inside the inputs dictionary:

inputs = {"args": ["--fecha", "value-for-fecha"], ...}
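For completeness, a hypothetical end-to-end launch could look like this (the job names and the date value are made up for illustration):

inputs = {
    "IngestaJobName": "ingesta-job-001",             # hypothetical job names
    "PreprocessingJobName": "preprocessing-job-001",
    "InferenceJobName": "inference-job-001",
    "args": ["--fecha", "2023-06-01"],               # the whole argument list is a single input
}
execution = branching_workflow.execute(inputs=inputs)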
