Skip to content

Commit

Permalink
merge: #1012
Browse files Browse the repository at this point in the history
1012: Support using process instance migration from the zeebe client r=korthout a=korthout

## Description

<!-- Please explain the changes you made here. -->

This adds support for the new `MigrateProcessInstance` RPC to ZPT, such that users can try migrating process instances from ZPT. This allows testing migrations before doing so in production.

> [!NOTE]
> This does not add assertions for the migration. That is out of scope.

Additionally, this fixes a test case that no longer added value. The test case was intended to ensure that we don't forget supporting RPCs in ZPT, but the implementation of the test did not function correctly anymore. Now it is able to correctly detect unsupported RPCs again, which [highlighted that `streamActivatedJobs` is not supported by ZPT](camunda/camunda#11231 (comment)).

## Related issues

<!-- Which issues are closed by this PR or are related -->

closes #972

<!-- Cut-off marker
_All lines under and including the cut-off marker will be removed from the merge commit message_

## Definition of Ready

Please check the items that apply, before requesting a review.

You can find more details about these items in our wiki page about [Pull Requests and Code Reviews](https://github.com/camunda/zeebe/wiki/Pull-Requests-and-Code-Reviews).

* [ ] I've reviewed my own code
* [ ] I've written a clear changelist description
* [ ] I've narrowly scoped my changes
* [ ] I've separated structural from behavioural changes
-->

## Definition of Done

<!-- Please check the items that apply, before merging or (if possible) before requesting a review. -->

_Not all items need to be done depending on the issue and the pull request._

Code changes:
* [x] The changes are backwards compatibility with previous versions
* [ ] If it fixes a bug then PRs are created to backport the fix

Testing:
* [x] There are unit/integration tests that verify all acceptance criterias of the issue
* [x] New tests are written to ensure backwards compatibility with further versions
* [ ] The behavior is tested manually

Documentation:
* [ ] Javadoc has been written
* [ ] The documentation is updated


Co-authored-by: Nico Korthout <nico.korthout@camunda.com>
  • Loading branch information
zeebe-bors-camunda[bot] and korthout authored Jan 2, 2024
2 parents fe33845 + 46a2e5c commit 67c23d8
Show file tree
Hide file tree
Showing 4 changed files with 128 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,8 @@
import io.camunda.zeebe.gateway.protocol.GatewayOuterClass.FailJobResponse;
import io.camunda.zeebe.gateway.protocol.GatewayOuterClass.FormMetadata;
import io.camunda.zeebe.gateway.protocol.GatewayOuterClass.MatchedDecisionRule;
import io.camunda.zeebe.gateway.protocol.GatewayOuterClass.MigrateProcessInstanceRequest;
import io.camunda.zeebe.gateway.protocol.GatewayOuterClass.MigrateProcessInstanceResponse;
import io.camunda.zeebe.gateway.protocol.GatewayOuterClass.ModifyProcessInstanceRequest;
import io.camunda.zeebe.gateway.protocol.GatewayOuterClass.ModifyProcessInstanceResponse;
import io.camunda.zeebe.gateway.protocol.GatewayOuterClass.ProcessMetadata;
Expand Down Expand Up @@ -100,6 +102,8 @@ class GrpcResponseMapper {
Map.entry(UpdateJobRetriesRequest.class, this::createJobUpdateRetriesResponse),
Map.entry(UpdateJobTimeoutRequest.class, this::createJobUpdateTimeOutResponse),
Map.entry(ModifyProcessInstanceRequest.class, this::createModifyProcessInstanceResponse),
Map.entry(
MigrateProcessInstanceRequest.class, this::createMigrateProcessInstanceResponse),
Map.entry(BroadcastSignalRequest.class, this::createBroadcastSignalResponse));

GeneratedMessageV3 map(
Expand Down Expand Up @@ -294,6 +298,10 @@ private GeneratedMessageV3 createModifyProcessInstanceResponse() {
return ModifyProcessInstanceResponse.newBuilder().build();
}

private GeneratedMessageV3 createMigrateProcessInstanceResponse() {
return MigrateProcessInstanceResponse.getDefaultInstance();
}

private GeneratedMessageV3 createBroadcastSignalResponse() {
final SignalRecord signal = new SignalRecord();
signal.wrap(valueBufferView);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@
import io.camunda.zeebe.gateway.protocol.GatewayOuterClass.EvaluateDecisionResponse;
import io.camunda.zeebe.gateway.protocol.GatewayOuterClass.FailJobRequest;
import io.camunda.zeebe.gateway.protocol.GatewayOuterClass.FailJobResponse;
import io.camunda.zeebe.gateway.protocol.GatewayOuterClass.MigrateProcessInstanceRequest;
import io.camunda.zeebe.gateway.protocol.GatewayOuterClass.MigrateProcessInstanceResponse;
import io.camunda.zeebe.gateway.protocol.GatewayOuterClass.ModifyProcessInstanceRequest;
import io.camunda.zeebe.gateway.protocol.GatewayOuterClass.ModifyProcessInstanceResponse;
import io.camunda.zeebe.gateway.protocol.GatewayOuterClass.Partition;
Expand Down Expand Up @@ -61,6 +63,8 @@
import io.camunda.zeebe.protocol.impl.record.value.message.MessageRecord;
import io.camunda.zeebe.protocol.impl.record.value.processinstance.ProcessInstanceCreationRecord;
import io.camunda.zeebe.protocol.impl.record.value.processinstance.ProcessInstanceCreationStartInstruction;
import io.camunda.zeebe.protocol.impl.record.value.processinstance.ProcessInstanceMigrationMappingInstruction;
import io.camunda.zeebe.protocol.impl.record.value.processinstance.ProcessInstanceMigrationRecord;
import io.camunda.zeebe.protocol.impl.record.value.processinstance.ProcessInstanceModificationActivateInstruction;
import io.camunda.zeebe.protocol.impl.record.value.processinstance.ProcessInstanceModificationRecord;
import io.camunda.zeebe.protocol.impl.record.value.processinstance.ProcessInstanceModificationTerminateInstruction;
Expand All @@ -79,6 +83,7 @@
import io.camunda.zeebe.protocol.record.intent.MessageIntent;
import io.camunda.zeebe.protocol.record.intent.ProcessInstanceCreationIntent;
import io.camunda.zeebe.protocol.record.intent.ProcessInstanceIntent;
import io.camunda.zeebe.protocol.record.intent.ProcessInstanceMigrationIntent;
import io.camunda.zeebe.protocol.record.intent.ProcessInstanceModificationIntent;
import io.camunda.zeebe.protocol.record.intent.ResourceDeletionIntent;
import io.camunda.zeebe.protocol.record.intent.SignalIntent;
Expand Down Expand Up @@ -474,6 +479,35 @@ public void modifyProcessInstance(
writer.writeCommandWithKey(request.getProcessInstanceKey(), record, recordMetadata);
}

@Override
public void migrateProcessInstance(
final MigrateProcessInstanceRequest request,
final StreamObserver<MigrateProcessInstanceResponse> responseObserver) {
final var requestId =
gatewayRequestStore.registerNewRequest(request.getClass(), responseObserver);

final var command =
new ProcessInstanceMigrationRecord()
.setProcessInstanceKey(request.getProcessInstanceKey())
.setTargetProcessDefinitionKey(
request.getMigrationPlan().getTargetProcessDefinitionKey());

request.getMigrationPlan().getMappingInstructionsList().stream()
.map(
instruction ->
new ProcessInstanceMigrationMappingInstruction()
.setSourceElementId(instruction.getSourceElementId())
.setTargetElementId(instruction.getTargetElementId()))
.forEach(command::addMappingInstruction);

writer.writeCommandWithoutKey(
command,
prepareRecordMetadata()
.requestId(requestId)
.valueType(ValueType.PROCESS_INSTANCE_MIGRATION)
.intent(ProcessInstanceMigrationIntent.MIGRATE));
}

@Override
public void updateJobTimeout(
final UpdateJobTimeoutRequest request,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -412,6 +412,65 @@ void shouldModifyProcessInstance() throws InterruptedException, TimeoutException
.isNotEmpty();
}

@Test
void shouldMigrateProcessInstance() throws InterruptedException, TimeoutException {
// given
final DeploymentEvent deployment =
zeebeClient
.newDeployResourceCommand()
.addProcessModel(
Bpmn.createExecutableProcess("sourceProcess")
.startEvent()
.serviceTask("A", b -> b.zeebeJobType("a"))
.endEvent()
.done(),
"sourceProcess.bpmn")
.addProcessModel(
Bpmn.createExecutableProcess("targetProcess")
.startEvent()
.serviceTask("B", b -> b.zeebeJobTypeExpression("b"))
.endEvent()
.done(),
"targetProcess.bpmn")
.send()
.join();

final long processInstanceKey =
zeebeClient
.newCreateInstanceCommand()
.bpmnProcessId("sourceProcess")
.latestVersion()
.send()
.join()
.getProcessInstanceKey();

zeebeEngine.waitForIdleState(Duration.ofSeconds(1));

final long targetProcessDefinitionKey =
deployment.getProcesses().stream()
.filter(p -> p.getBpmnProcessId().equals("targetProcess"))
.findFirst()
.orElseThrow()
.getProcessDefinitionKey();
zeebeClient
.newMigrateProcessInstanceCommand(processInstanceKey)
.migrationPlan(targetProcessDefinitionKey)
.addMappingInstruction("A", "B")
.send()
.join();

assertThat(
StreamSupport.stream(
RecordStream.of(zeebeEngine.getRecordStreamSource())
.processInstanceRecords()
.spliterator(),
false)
.filter(r -> r.getIntent() == ProcessInstanceIntent.ELEMENT_MIGRATED)
.filter(r -> r.getValue().getElementId().equals("B"))
.findFirst())
.isNotEmpty();
}

@Test
void shouldUpdateVariablesOnProcessInstance() {
// given
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import io.camunda.zeebe.gateway.protocol.GatewayGrpc.GatewayImplBase;
import java.lang.reflect.Method;
import java.util.Arrays;
import java.util.List;
import java.util.Optional;
import java.util.stream.Stream;
import org.junit.jupiter.params.ParameterizedTest;
Expand All @@ -21,6 +22,19 @@

class GrpcToLogStreamGatewayTest {

static final List<String> UNSUPPORTED_METHODS = List.of("streamActivatedJobs");

static final List<String> IGNORED_METHODS =
List.of(
"bindService",
"equals",
"getClass",
"hashCode",
"notify",
"notifyAll",
"toString",
"wait");

@ParameterizedTest(name = "{0}")
@MethodSource("provideMethods")
void testImplementsGatewayEndpoint(final String methodName) {
Expand All @@ -30,13 +44,21 @@ void testImplementsGatewayEndpoint(final String methodName) {
.findAny();

assertThat(optionalMethod)
.describedAs("Expected method %s to be implemented", methodName)
.describedAs(
"""
Expected method %s to be implemented. \
When this test fails, it's likely a new RPC that ZPT should support. \
Please check whether it should be supported by ZPT. \
If it should be suported, add a test case to EngineClientTest.java""",
methodName)
.isPresent();
}

private static Stream<Arguments> provideMethods() {
return Arrays.stream(GatewayImplBase.class.getDeclaredMethods())
.filter(method -> !method.getName().equals("bindService"))
.map(method -> Arguments.of(method.getName()));
static Stream<Arguments> provideMethods() {
return Arrays.stream(GatewayImplBase.class.getMethods())
.map(Method::getName)
.filter(name -> !IGNORED_METHODS.contains(name))
.filter(name -> !UNSUPPORTED_METHODS.contains(name))
.map(Arguments::of);
}
}

0 comments on commit 67c23d8

Please sign in to comment.