Skip to content

Commit

Permalink
refactor: extract command writing from the GrpcToLogStreamGateway
Browse files Browse the repository at this point in the history
The GrpcToLogStreamGateway should not have to worry about any concurrency issues. By extracting the command writing to a separate class we have this logic isolated.
  • Loading branch information
remcowesterhoud committed May 12, 2022
1 parent acaa2a8 commit 40b73c1
Show file tree
Hide file tree
Showing 3 changed files with 55 additions and 38 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
/*
* Copyright Camunda Services GmbH and/or licensed to Camunda Services GmbH under
* one or more contributor license agreements. See the NOTICE file distributed
* with this work for additional information regarding copyright ownership.
* Licensed under the Zeebe Community License 1.1. You may not use this file
* except in compliance with the Zeebe Community License 1.1.
*/

package io.camunda.zeebe.process.test.engine;

import io.camunda.zeebe.logstreams.log.LogStreamRecordWriter;
import io.camunda.zeebe.protocol.impl.record.RecordMetadata;
import io.camunda.zeebe.util.buffer.BufferWriter;

/**
* This record is responsible for writing the commands to the {@link LogStreamRecordWriter} in a
* thread-safe way.
*/
record CommandWriter(LogStreamRecordWriter writer) {

void writeCommandWithKey(
final Long key, final BufferWriter bufferWriter, final RecordMetadata recordMetadata) {
synchronized (writer) {
writer.reset();
writer.key(key).metadataWriter(recordMetadata).valueWriter(bufferWriter).tryWrite();
}
}

void writeCommandWithoutKey(
final BufferWriter bufferWriter, final RecordMetadata recordMetadata) {
synchronized (writer) {
writer.reset();
writer.keyNull().metadataWriter(recordMetadata).valueWriter(bufferWriter).tryWrite();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -47,14 +47,12 @@ public static ZeebeTestEngine create(final int port) {
new SubscriptionCommandSenderFactory(
logStream.newLogStreamRecordWriter().join(), partitionId);

final CommandWriter commandWriter =
new CommandWriter(logStream.newLogStreamRecordWriter().join());
final GatewayRequestStore gatewayRequestStore = new GatewayRequestStore();
final GrpcToLogStreamGateway gateway =
new GrpcToLogStreamGateway(
logStream.newLogStreamRecordWriter().join(),
partitionId,
partitionCount,
port,
gatewayRequestStore);
commandWriter, partitionId, partitionCount, port, gatewayRequestStore);
final Server grpcServer = ServerBuilder.forPort(port).addService(gateway).build();

final GrpcResponseWriter grpcResponseWriter =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@
import io.camunda.zeebe.gateway.protocol.GatewayOuterClass.TopologyResponse;
import io.camunda.zeebe.gateway.protocol.GatewayOuterClass.UpdateJobRetriesRequest;
import io.camunda.zeebe.gateway.protocol.GatewayOuterClass.UpdateJobRetriesResponse;
import io.camunda.zeebe.logstreams.log.LogStreamRecordWriter;
import io.camunda.zeebe.msgpack.value.ValueArray;
import io.camunda.zeebe.protocol.impl.encoding.MsgPackConverter;
import io.camunda.zeebe.protocol.impl.record.RecordMetadata;
Expand All @@ -65,19 +64,18 @@
import io.camunda.zeebe.protocol.record.value.VariableDocumentUpdateSemantic;
import io.camunda.zeebe.util.VersionUtil;
import io.camunda.zeebe.util.buffer.BufferUtil;
import io.camunda.zeebe.util.buffer.BufferWriter;
import io.grpc.stub.StreamObserver;

class GrpcToLogStreamGateway extends GatewayGrpc.GatewayImplBase {

private final LogStreamRecordWriter writer;
private final CommandWriter writer;
private final int partitionId;
private final int partitionCount;
private final int port;
private final GatewayRequestStore gatewayRequestStore;

public GrpcToLogStreamGateway(
final LogStreamRecordWriter writer,
final CommandWriter writer,
final int partitionId,
final int partitionCount,
final int port,
Expand All @@ -89,22 +87,6 @@ public GrpcToLogStreamGateway(
this.gatewayRequestStore = gatewayRequestStore;
}

private void writeCommandWithKey(
final Long key, final BufferWriter bufferWriter, final RecordMetadata recordMetadata) {
synchronized (writer) {
writer.reset();
writer.key(key).metadataWriter(recordMetadata).valueWriter(bufferWriter).tryWrite();
}
}

private void writeCommandWithoutKey(
final BufferWriter bufferWriter, final RecordMetadata recordMetadata) {
synchronized (writer) {
writer.reset();
writer.keyNull().metadataWriter(recordMetadata).valueWriter(bufferWriter).tryWrite();
}
}

@Override
public void activateJobs(
final ActivateJobsRequest request,
Expand All @@ -125,7 +107,7 @@ public void activateJobs(
jobBatchRecord.setTimeout(request.getTimeout());
jobBatchRecord.setMaxJobsToActivate(request.getMaxJobsToActivate());

writeCommandWithoutKey(jobBatchRecord, recordMetadata);
writer.writeCommandWithoutKey(jobBatchRecord, recordMetadata);
}

@Override
Expand All @@ -144,7 +126,8 @@ public void cancelProcessInstance(
final ProcessInstanceRecord processInstanceRecord = new ProcessInstanceRecord();
processInstanceRecord.setProcessInstanceKey(request.getProcessInstanceKey());

writeCommandWithKey(request.getProcessInstanceKey(), processInstanceRecord, recordMetadata);
writer.writeCommandWithKey(
request.getProcessInstanceKey(), processInstanceRecord, recordMetadata);
}

@Override
Expand All @@ -167,7 +150,7 @@ public void completeJob(
jobRecord.setVariables(BufferUtil.wrapArray(MsgPackConverter.convertToMsgPack(variables)));
}

writeCommandWithKey(request.getJobKey(), jobRecord, recordMetadata);
writer.writeCommandWithKey(request.getJobKey(), jobRecord, recordMetadata);
}

@Override
Expand All @@ -185,7 +168,7 @@ public void createProcessInstance(

final ProcessInstanceCreationRecord processInstanceCreationRecord =
createProcessInstanceCreationRecord(request);
writeCommandWithoutKey(processInstanceCreationRecord, recordMetadata);
writer.writeCommandWithoutKey(processInstanceCreationRecord, recordMetadata);
}

@Override
Expand All @@ -205,7 +188,7 @@ public void createProcessInstanceWithResult(
createProcessInstanceCreationRecord(request.getRequest());
processInstanceCreationRecord.setFetchVariables(request.getFetchVariablesList());

writeCommandWithoutKey(processInstanceCreationRecord, recordMetadata);
writer.writeCommandWithoutKey(processInstanceCreationRecord, recordMetadata);
}

@Override
Expand Down Expand Up @@ -234,7 +217,7 @@ public void deployProcess(
.setResource(processRequestObject.getDefinition().toByteArray());
}));

writeCommandWithoutKey(deploymentRecord, recordMetadata);
writer.writeCommandWithoutKey(deploymentRecord, recordMetadata);
}

@Override
Expand Down Expand Up @@ -262,7 +245,7 @@ public void deployResource(
.setResourceName(resource.getName())
.setResource(resource.getContent().toByteArray())));

writeCommandWithoutKey(deploymentRecord, recordMetadata);
writer.writeCommandWithoutKey(deploymentRecord, recordMetadata);
}

@Override
Expand All @@ -282,7 +265,7 @@ public void failJob(
jobRecord.setRetries(request.getRetries());
jobRecord.setErrorMessage(request.getErrorMessage());

writeCommandWithKey(request.getJobKey(), jobRecord, recordMetadata);
writer.writeCommandWithKey(request.getJobKey(), jobRecord, recordMetadata);
}

@Override
Expand All @@ -302,7 +285,7 @@ public void throwError(
jobRecord.setErrorCode(BufferUtil.wrapString(request.getErrorCode()));
jobRecord.setErrorMessage(request.getErrorMessage());

writeCommandWithKey(request.getJobKey(), jobRecord, recordMetadata);
writer.writeCommandWithKey(request.getJobKey(), jobRecord, recordMetadata);
}

@Override
Expand Down Expand Up @@ -330,7 +313,7 @@ public void publishMessage(
BufferUtil.wrapArray(MsgPackConverter.convertToMsgPack(variables)));
}

writeCommandWithoutKey(messageRecord, recordMetadata);
writer.writeCommandWithoutKey(messageRecord, recordMetadata);
}

@Override
Expand All @@ -348,7 +331,7 @@ public void resolveIncident(

final IncidentRecord incidentRecord = new IncidentRecord();

writeCommandWithKey(request.getIncidentKey(), incidentRecord, recordMetadata);
writer.writeCommandWithKey(request.getIncidentKey(), incidentRecord, recordMetadata);
}

@Override
Expand Down Expand Up @@ -378,7 +361,7 @@ public void setVariables(
? VariableDocumentUpdateSemantic.LOCAL
: VariableDocumentUpdateSemantic.PROPAGATE);

writeCommandWithoutKey(variableDocumentRecord, recordMetadata);
writer.writeCommandWithoutKey(variableDocumentRecord, recordMetadata);
}

@Override
Expand Down Expand Up @@ -428,7 +411,7 @@ public void updateJobRetries(
final JobRecord jobRecord = new JobRecord();
jobRecord.setRetries(request.getRetries());

writeCommandWithKey(request.getJobKey(), jobRecord, recordMetadata);
writer.writeCommandWithKey(request.getJobKey(), jobRecord, recordMetadata);
}

private RecordMetadata prepareRecordMetadata() {
Expand Down

0 comments on commit 40b73c1

Please sign in to comment.