Skip to content

Dynamic workflows, activities, signals, and queries #346

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 4 commits into from
Jul 14, 2023

Conversation

cretz
Copy link
Member

@cretz cretz commented Jul 13, 2023

What was changed

  • Add a temporalio.common.RawValue class which represents a raw payload that is never converted (but still encoded/decoded)
  • Update composite/default payload converter to pass through that type by default
  • Add dynamic=True for @workflow.defn which requires the run method to accept a single Sequence[RawValue] param
  • Add dynamic=True for @activity.defn which requires the function to accept a single Sequence[RawValue] param
  • Updated @workflow.signal to work with Sequence[RawValue] args and deprecate/warn when using the existing style (which was varargs of already-converted values which makes little sense)
  • Updated @workflow.update to work with Sequence[RawValue] args and deprecate/warn when using the existing style (which was varargs of already-converted values which makes little sense)
  • 💥 Interceptor behavior change! We now resolve signal and query handlers before invoking interceptors like all other SDKs. If a user had some advanced use case where they had a custom interceptor and expected it to be called even for unknown signals/queries, it now will not. This should be rare.

Checklist

  1. Closes [Feature Request] Support dynamic workflows, activities, signals, and queries #242

@cretz cretz requested a review from a team as a code owner July 13, 2023 17:28
Copy link
Member

@Sushisource Sushisource left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks good to me, but I do think it'd probably make sense to pass the name as the first arg to dynamic workflows/activities?

Comment on lines +543 to +544
`name` argument, and run method must accept a single parameter of `Sequence[temporalio.common.RawValue]` type. The
payload of the raw value can be converted via `workflow.payload_converter().from_payload`.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Seems like we'd want name argument? (To the workflow input)

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We surface this via contextual workflow info, so I intentionally leave it off here (same as .NET)

Comment on lines +1059 to +1061
If present, cannot have `name` argument, and the activity function must accept a single parameter of
`Sequence[temporalio.common.RawValue]`. The payload of the raw value can be converted via
`activity.payload_converter().from_payload`.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same, is this also missing name?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We surface this via contextual activity info, so I intentionally leave it off here (same as .NET)

Comment on lines +79 to +81
dynamic: If true, this activity will be dynamic. Dynamic activities have
to accept a single 'Sequence[RawValue]' parameter. This cannot be
set to true if name is present.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This doesn't appear to actually be enforced, just ignored. Would be nice to actually enforce (or better, make unrepresentable if reasonable)

Copy link
Member Author

@cretz cretz Jul 13, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is unrepresentable with the overloads if you use a type checker which everyone should. Right now, name is just ignored when dynamic is present. If we want a runtime check, I can add it.

Comment on lines +804 to 805
defn = temporalio.workflow._QueryDefinition(
name=name, fn=handler, is_method=False
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

IMO for this & signal it'd be better to use some special sigil type to represent the dynamic handler rather than just None.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We have used None to represent dynamic for signals/queries for a long time now. Not sure it's worth changing.

for converter in self.converters.values():
payload = converter.to_payload(value)
if payload is not None:
break
if payload is None:
raise RuntimeError(
f"Value at index {index} of type {type(value)} has no known converter"
Copy link
Contributor

@dandavison dandavison Jul 13, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is just code golf but FWIW this function has probably gained enough complexity now that it would be more readable as a generator:

        return list(self._payloads(values))

    def _payloads(
        self, values Sequence[Any]
    ) -> Generator[temporalio.api.common.v1.Payload]:
        for index, value in enumerate(values):
            # We intentionally attempt these serially just in case a stateful
            # converter may rely on the previous values
            # RawValue should just pass through
            if isinstance(value, temporalio.common.RawValue):
                yield value.payload
                continue
            else:
                for converter in self.converters.values():
                    payload = converter.to_payload(value)
                    if payload is not None:
                        yield payload
                        continue
            raise RuntimeError(
                f"Value at index {index} of type {type(value)} has no known converter"
            )

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure this small function deserves to be delegated to another small function. I don't think it's that complicated. Also, your second continue needs to be a break.

@cretz
Copy link
Member Author

cretz commented Jul 14, 2023

Builds are intermittently (but often) failing with python/cpython#91351 on Windows that surfaces because our sandbox importlib. This seems to be very specific to pytest runs and warnings capturing. Based on others' research there, we may disable warnings capture for pytest on Windows.

Looking at linked issues on the one linked above, it seems at least for now disabling Python bytecode during Windows tests fixes it. This is a pytest only deal, so users are not affected.

@cretz cretz force-pushed the dynamic-workflows branch 2 times, most recently from 5e5c5b0 to 30494b3 Compare July 14, 2023 18:14
@cretz cretz merged commit 83bbc36 into temporalio:main Jul 14, 2023
@cretz cretz deleted the dynamic-workflows branch July 14, 2023 20:28
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

[Feature Request] Support dynamic workflows, activities, signals, and queries
3 participants