Skip to content

Commit

Permalink
fix quoting issue with argo (#1456)
Browse files Browse the repository at this point in the history
* fix quoting issue with argo

* cleanup testing for payload stringification

---------

Co-authored-by: Sakari Ikonen <sakari.a.ikonen@gmail.com>
  • Loading branch information
savingoyal and saikonen authored Jun 15, 2023
1 parent f61a8fc commit 0ea964b
Showing 1 changed file with 15 additions and 17 deletions.
32 changes: 15 additions & 17 deletions metaflow/plugins/argo/argo_workflows.py
Original file line number Diff line number Diff line change
Expand Up @@ -444,18 +444,16 @@ def _process_triggers(self):
# Assign a sanitized name since we need this at many places to please
# Argo Events sensors. There is a slight possibility of name collision
# but quite unlikely for us to worry about at this point.
event["sanitized_name"] = event["name"]
if any([x in event["name"] for x in [".", "-", "@", "+"]]):
event["sanitized_name"] = "%s_%s" % (
event["name"]
.replace(".", "")
.replace("-", "")
.replace("@", "")
.replace("+", ""),
to_unicode(
base64.b32encode(sha1(to_bytes(event["name"])).digest())
)[:4].lower(),
)
event["sanitized_name"] = "%s_%s" % (
event["name"]
.replace(".", "")
.replace("-", "")
.replace("@", "")
.replace("+", ""),
to_unicode(base64.b32encode(sha1(to_bytes(event["name"])).digest()))[
:4
].lower(),
)
return triggers, options

def _compile_workflow_template(self):
Expand Down Expand Up @@ -938,10 +936,8 @@ def _container_templates(self):
]
+ [
# Parameter names can be hyphenated, hence we use
# {{foo.bar['param_name']}}. We quote the value to
# make sure whitespaces are properly handled since
# Argo Events wouldn't do that for us.
"--%s='{{workflow.parameters.%s}}'"
# {{foo.bar['param_name']}}.
"--%s={{workflow.parameters.%s}}"
% (parameter["name"], parameter["name"])
for parameter in self.parameters.values()
]
Expand Down Expand Up @@ -1274,12 +1270,13 @@ def _container_templates(self):
+ ARGO_WORKFLOWS_KUBERNETES_SECRETS.split(",")
if k
],
# Assign a volume point to pass state to the next task.
volume_mounts=[
# Assign a volume mount to pass state to the next task.
kubernetes_sdk.V1VolumeMount(
name="out", mount_path="/mnt/out"
)
]
# Support tmpfs.
+ (
[
kubernetes_sdk.V1VolumeMount(
Expand All @@ -1290,6 +1287,7 @@ def _container_templates(self):
if tmpfs_enabled
else []
)
# Support persistent volume claims.
+ (
[
kubernetes_sdk.V1VolumeMount(
Expand Down

0 comments on commit 0ea964b

Please sign in to comment.