-
Notifications
You must be signed in to change notification settings - Fork 146
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Update-with-Start operation #2199
Conversation
bd04fc0
to
c7d8e94
Compare
temporal-sdk/src/main/java/io/temporal/client/WorkflowStartOperationUpdate.java
Outdated
Show resolved
Hide resolved
c7d8e94
to
7b95dee
Compare
temporal-sdk/src/main/java/io/temporal/client/WorkflowStartOperationUpdate.java
Outdated
Show resolved
Hide resolved
temporal-sdk/src/main/java/io/temporal/client/WorkflowStartOperationUpdate.java
Outdated
Show resolved
Hide resolved
7b95dee
to
24a6205
Compare
assertEquals(e.getStatus().getCode(), Status.NOT_FOUND.getCode()); | ||
} | ||
} | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I wanted to write a test for a failed server-side validation of the Update, but couldn't find any validation in the test server to leverage.
FWIW, I tried commenting out the client-side validation and made sure the server-side error is returned correctly.
24a6205
to
8e95d65
Compare
temporal-sdk/src/main/java/io/temporal/internal/client/RootWorkflowClientInvoker.java
Outdated
Show resolved
Hide resolved
264a5ef
to
b7ef808
Compare
temporal-sdk/src/main/java/io/temporal/client/WorkflowStartOperationUpdate.java
Outdated
Show resolved
Hide resolved
temporal-sdk/src/main/java/io/temporal/client/StartWorkflowOperation.java
Outdated
Show resolved
Hide resolved
temporal-sdk/src/main/java/io/temporal/client/WorkflowOptions.java
Outdated
Show resolved
Hide resolved
temporal-sdk/src/main/java/io/temporal/client/StartWorkflowOperation.java
Outdated
Show resolved
Hide resolved
temporal-sdk/src/main/java/io/temporal/client/WorkflowStartOperationUpdate.java
Outdated
Show resolved
Hide resolved
temporal-sdk/src/main/java/io/temporal/client/WorkflowStartOperationUpdate.java
Outdated
Show resolved
Hide resolved
temporal-sdk/src/main/java/io/temporal/client/WorkflowStartOperationUpdate.java
Outdated
Show resolved
Hide resolved
temporal-sdk/src/main/java/io/temporal/client/WorkflowStartOperationUpdate.java
Outdated
Show resolved
Hide resolved
temporal-sdk/src/main/java/io/temporal/client/WorkflowStartOperationUpdate.java
Outdated
Show resolved
Hide resolved
} | ||
|
||
public static final class Builder<R> { | ||
private final UpdateOptions.Builder<R> options; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The current builder setup prevents users from writing something like:
<R> void applyDefaultUpdateOptions(UpdateOptions.Builder<R> builder) { ... }
That they can use for this update and other updates. This is probably not a big deal, but at least in Go they can write this helper. I just want to make sure we acknowledge that, hopefully unlike our other SDKs, there is no shared options building reuse in Java, and we'll have to make sure every future option we add to the UpdateOptions
that applies to update with start gets an associated setter here.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Apologies, I overlooked this one yesterday. To make sure I understand this right: the idea is to provide the above method on the Update operation builder? And it would set the operation's update options to the options of builder.build()
?
temporal-sdk/src/main/java/io/temporal/client/WorkflowStartOperationUpdate.java
Outdated
Show resolved
Hide resolved
temporal-sdk/src/main/java/io/temporal/client/WorkflowOptions.java
Outdated
Show resolved
Hide resolved
temporal-sdk/src/main/java/io/temporal/client/WorkflowStartOperationUpdate.java
Outdated
Show resolved
Hide resolved
b7ef808
to
d79bd4e
Compare
temporal-sdk/src/main/java/io/temporal/client/WorkflowStartOperationUpdate.java
Outdated
Show resolved
Hide resolved
() -> | ||
service | ||
.blockingStub() | ||
.withOption(METRICS_TAGS_CALL_OPTIONS_KEY, metricsScope) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should we be including more metric tags here? start
includes the workflow type and task queue, update includes the update name. It might make sense to include them here as well.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good idea! 019690e
@@ -386,4 +386,15 @@ public GetWorkerTaskReachabilityResponse GetWorkerTaskReachability( | |||
.getWorkerTaskReachability(req), | |||
grpcRetryerOptions); | |||
} | |||
|
|||
@Override | |||
public ExecuteMultiOperationResponse executeMultiOperation(ExecuteMultiOperationRequest req) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
executeMultiOperation
is a potentially long operation correct because we won't return until the update reaches the desired stage. If so we need to take a deadline here like the update
call.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Update is throwing a WorkflowUpdateTimeoutOrCancelledException
upon timeout; I'm not quite sure if I should re-use it here or not. What do you think?
(I'll re-use it for now to move forward and we can change it, if needed)
|
||
ExecuteMultiOperationResponse response; | ||
try { | ||
response = genericClient.executeMultiOperation(request); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do we not need to call this in a loop like regular update to avoid loosing the update requests ?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good callout. We do. I don't think I did that in Go. I'll file a ticket for that.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Okay, addressed here: 3652e31 - I couldn't see a good way to test that, though?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think it is something that needs to be tested with a unit test mocking the grpc stub
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Addressed in 67bfbb4
import org.junit.Test; | ||
|
||
public class UpdateWithStartTest { | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do we have any test here that tests the update was executed before the main workflow method? Maybe I just missed it.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm not sure I follow; why would that be an Update-with-Start-specific test? Isn't that already covered by the Update-specific tests?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Because in the past there was not a good way to guarantee the update was in the first workflow task
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
My main concern is how do you know the test server is sending the update in the first workflow task when using update-with-start?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
👍 Okay, I see what you mean.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Okay, so I think the startAndSendUpdateTogetherWithNullUpdateResult
test covers this. The workflow is
public static class TestUpdatedWorkflowImpl implements TestWorkflows.TestUpdatedWorkflow {
private String state;
@Override
public String execute() {
return state;
}
@Override
public void update(String arg) {
this.state = arg;
}
}
And the test asserts that the workflow result equals the update input. If the update wasn't executed first, it would return null
instead and the test fails.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Addressed here: 2a39d5e
} | ||
|
||
if (response.getResponsesCount() != request.getOperationsCount()) { | ||
throw new Error( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We should not throw Error
from the workflow client. The SDK never throws Error
from the client, generally Error
should only be thrown if there is something critically wrong and unrecoverable within the application itself, not some downstream service. (In Workflow code we throw errors as well, but that is to avoid users catching internal exceptions and is more an exception then the rule)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Okay, that's good to know! 092a8af
* Returns a new builder for an {@link UpdateWithStartWorkflowOperation} for a one argument | ||
* request. | ||
* | ||
* @param request method reference to a proxy created through {@link |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This method reference has to be a a method annotated as an Update correct? Can we clarify that in the docs here? I don't think it is immediately obvious looking at just this class what methods it can take
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
good point! a05748d
return this.handle; | ||
} | ||
|
||
public void setUpdateHandle(WorkflowUpdateHandle<R> updateHandle) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This should not be public
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I see your calling it from the root interceptor and that is probably the root of the issue. I think this should be set at the top of the interceptor chain and I think that will also fix the visibility here
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah, I had called that out before (in the first version of the PR); that's a good idea on how to solve that. c2e0b42
49c7e10
to
092a8af
Compare
} | ||
|
||
// execute update | ||
UpdateWorkflowExecutionResponse updateResult; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
How do you know this update is always sent as part of the first workflow task?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I've responded here: #2199 (comment)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The test is good, but looking at the code it is not clear to me the test server implementation actually guarantees the update will be in the same workflow task? My specific concern is the startWorkflowExecutionImpl
and the updateWorkflowExecutionImpl
are separate calls?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I checked out your branch and added a short 1s sleep between the call startWorkflowExecutionImpl
and updateWorkflowExecutionImpl
and confirmed the update will not be delivered in the first workflow task
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Okay, I've poked around the server a little more, in particular TestWorkflowMutableStateImpl
, and can see how this happens now. I don't see an easy fix right now. The two MS updates need to happen together to guarantee they are also delivered together to the worker. But I don't see any kind of "transaction" API. I'll need to dig deeper.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Addressed here: 2a39d5e
() -> | ||
exception.set( | ||
assertThrows( | ||
WorkflowServiceException.class, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The equivalent Update test receives the WorkflowUpdateTimeoutOrCancelledException
directly here. I'm not sure why here it's wrapped inside a WorkflowServiceException
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Workflow client and test server changes LGTM
What was changed
Adds support for Update-with-Start, using the MultiOperation API (temporalio/api#367).
Why?
To allow Update-with-Start feature.