Skip to content

Commit

Permalink
chore(webhook): Provide proper error response in case of correlation …
Browse files Browse the repository at this point in the history
…key expression failures (#1272)
  • Loading branch information
sbuettner authored Oct 11, 2023
1 parent 897d326 commit 58ebdc9
Show file tree
Hide file tree
Showing 4 changed files with 59 additions and 30 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import io.camunda.zeebe.client.api.command.ClientStatusException;
import io.camunda.zeebe.client.api.response.ProcessInstanceEvent;
import io.camunda.zeebe.client.api.response.PublishMessageResponse;
import java.util.Optional;
import java.util.UUID;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -153,15 +154,15 @@ protected CorrelationResult<CorrelatedMessageStart> triggerMessageStartEvent(
Object extractedVariables = extractVariables(variables, definition);

try {
String correlationKey =
var correlationKey =
extractCorrelationKey(correlationPoint.correlationKeyExpression(), variables);
PublishMessageResponse result =
zeebeClient
.newPublishMessageCommand()
.messageName(correlationPoint.messageName())
// correlation key must be empty to start a new process, see:
// https://docs.camunda.io/docs/components/modeler/bpmn/message-events/#message-start-events
.correlationKey(correlationKey)
.correlationKey(correlationKey.orElse(""))
.messageId(messageId)
.tenantId(definition.tenantId())
.variables(extractedVariables)
Expand Down Expand Up @@ -198,18 +199,20 @@ protected CorrelationResult<CorrelatedMessage> triggerMessage(
String correlationKeyExpression,
Object variables,
String messageId) {

String correlationKey = extractCorrelationKey(correlationKeyExpression, variables);

if (!isActivationConditionMet(definition, variables)) {
LOG.debug("Activation condition didn't match: {}", definition.correlationPoint());
return new MessageCorrelationResult(
messageName,
new CorrelationErrorData(CorrelationErrorReason.ACTIVATION_CONDITION_NOT_MET));
}
String correlationKey =
extractCorrelationKey(correlationKeyExpression, variables)
.orElseThrow(
() ->
new ConnectorException(
"Correlation key not resolved: " + correlationKeyExpression));

Object extractedVariables = extractVariables(variables, definition);

try {
PublishMessageResponse response =
zeebeClient
Expand All @@ -224,7 +227,6 @@ protected CorrelationResult<CorrelatedMessage> triggerMessage(

LOG.info("Published message with key: " + response.getMessageKey());
return new MessageCorrelationResult(messageName, response.getMessageKey());

} catch (Exception e) {
throw new ConnectorException(
"Failed to publish process message for subscription: " + definition.correlationPoint(),
Expand All @@ -248,15 +250,21 @@ protected boolean isActivationConditionMet(
}
}

protected String extractCorrelationKey(String correlationKeyExpression, Object context) {
if (correlationKeyExpression == null || correlationKeyExpression.isBlank()) {
return "";
}
try {
return feelEngine.evaluate(correlationKeyExpression, context, String.class);
} catch (Exception e) {
throw new ConnectorInputException(e);
protected Optional<String> extractCorrelationKey(
String correlationKeyExpression, Object context) {
Optional<String> correlationKey;
if (correlationKeyExpression != null && !correlationKeyExpression.isBlank()) {
try {
correlationKey =
Optional.ofNullable(
feelEngine.evaluate(correlationKeyExpression, context, String.class));
} catch (Exception e) {
correlationKey = Optional.empty();
}
} else {
correlationKey = Optional.empty();
}
return correlationKey;
}

protected String extractMessageId(String messageIdExpression, Object context) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,10 @@ void boundaryMessageEvent_shouldCallCorrectZeebeMethod() {
// given
var point =
new BoundaryEventCorrelationPoint(
"test-boundary", "", "123", new BoundaryEventCorrelationPoint.Activity("123", "test"));
"test-boundary",
"=\"test\"",
"123",
new BoundaryEventCorrelationPoint.Activity("123", "test"));
var definition = mock(InboundConnectorDefinitionImpl.class);
when(definition.correlationPoint()).thenReturn(point);

Expand All @@ -176,7 +179,7 @@ void boundaryMessageEvent_shouldCallCorrectZeebeMethod() {
verifyNoMoreInteractions(zeebeClient);

verify(dummyCommand).messageName("test-boundary");
verify(dummyCommand).correlationKey("");
verify(dummyCommand).correlationKey("test");
verify(dummyCommand).messageId("123");
verify(dummyCommand).send();
}
Expand Down Expand Up @@ -521,7 +524,7 @@ void messageIdIsNull_expressionIsNull_usesRandomUuid() {
var dummyCommand = spy(new PublishMessageCommandDummy());
when(zeebeClient.newPublishMessageCommand()).thenReturn(dummyCommand);
// when
handler.correlate(definition, Collections.emptyMap());
handler.correlate(definition, Collections.singletonMap("correlationKey", "testkey"));
// then
ArgumentCaptor<String> messageIdCaptor = ArgumentCaptor.forClass(String.class);
verify(dummyCommand).messageId(messageIdCaptor.capture());
Expand All @@ -534,7 +537,7 @@ void messageIdIsNull_expressionIsNull_usesRandomUuid() {
@Test
void messageIdIsNull_expressionIsProvided_usesExtractedMessageId() {
// given
var point = new MessageCorrelationPoint("msg1", "=correlationKey", "=extractedId");
var point = new MessageCorrelationPoint("msg1", "=extractedId", "=extractedId");
var definition = mock(InboundConnectorDefinitionImpl.class);
when(definition.correlationPoint()).thenReturn(point);
var dummyCommand = spy(new PublishMessageCommandDummy());
Expand All @@ -549,7 +552,7 @@ void messageIdIsNull_expressionIsProvided_usesExtractedMessageId() {
@Test
void messageIdIsProvided_usesGivenMessageId() {
// given
var point = new MessageCorrelationPoint("msg1", "=correlationKey", null);
var point = new MessageCorrelationPoint("msg1", "=123", null);
var definition = mock(InboundConnectorDefinitionImpl.class);
when(definition.correlationPoint()).thenReturn(point);
var dummyCommand = spy(new PublishMessageCommandDummy());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import static org.springframework.web.bind.annotation.RequestMethod.POST;
import static org.springframework.web.bind.annotation.RequestMethod.PUT;

import io.camunda.connector.api.error.ConnectorException;
import io.camunda.connector.api.inbound.webhook.MappedHttpRequest;
import io.camunda.connector.api.inbound.webhook.WebhookConnectorException;
import io.camunda.connector.api.inbound.webhook.WebhookConnectorException.WebhookSecurityException;
Expand All @@ -31,6 +32,7 @@
import io.camunda.connector.api.inbound.webhook.WebhookResultContext;
import io.camunda.connector.api.inbound.webhook.WebhookTriggerResultContext;
import io.camunda.connector.feel.FeelEngineWrapperException;
import io.camunda.connector.runtime.core.error.BpmnError;
import io.camunda.connector.runtime.inbound.lifecycle.ActiveInboundConnector;
import io.camunda.connector.runtime.inbound.webhook.model.HttpServletRequestWebhookProcessingPayload;
import jakarta.servlet.http.HttpServletRequest;
Expand Down Expand Up @@ -105,14 +107,20 @@ private ResponseEntity<?> processWebhook(
}
}
} catch (Exception e) {
LOG.info("Webhook failed with exception", e);
LOG.info("Webhook: {} failed with exception", connector.context().getDefinition(), e);
if (e instanceof FeelEngineWrapperException feelEngineWrapperException) {
var error =
new FeelExpressionErrorResponse(
feelEngineWrapperException.getReason(), feelEngineWrapperException.getExpression());
connectorResponse = ResponseEntity.unprocessableEntity().body(error);
} else if (e instanceof WebhookConnectorException webhookConnectorException) {
connectorResponse = handleWebhookConnectorException(webhookConnectorException);
} else if (e instanceof ConnectorException connectorException) {
if (e instanceof WebhookConnectorException webhookConnectorException) {
connectorResponse = handleWebhookConnectorException(webhookConnectorException);
} else {
connectorResponse =
ResponseEntity.unprocessableEntity()
.body(new BpmnError("WEBHOOK_NOT_PROCESSED", connectorException.getMessage()));
}
} else {
connectorResponse = ResponseEntity.internalServerError().build();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,12 @@
*/
package io.camunda.connector.inbound.signature;

import io.camunda.connector.api.error.ConnectorException;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.security.InvalidKeyException;
import java.security.NoSuchAlgorithmException;
import java.util.Map;
import java.util.Objects;
import java.util.TreeMap;
import javax.crypto.Mac;
import javax.crypto.spec.SecretKeySpec;
Expand Down Expand Up @@ -41,11 +41,21 @@ public HMACSignatureValidator(
this.hmacHeader = hmacHeader;
this.hmacSecretKey = hmacSecretKey;
this.hmacAlgo = hmacAlgo;
Objects.requireNonNull(requestBody, "Request body must not be null");
Objects.requireNonNull(headers, "Headers must not be null");
Objects.requireNonNull(hmacHeader, "HMAC header must not be null");
Objects.requireNonNull(hmacSecretKey, "HMAC secret key must not be null");
Objects.requireNonNull(hmacAlgo, "HMAC algorithm must not be null");
if (requestBody == null) {
throw new ConnectorException("Request body must not be null");
}
if (headers == null) {
throw new ConnectorException("Headers must not be null");
}
if (hmacHeader == null) {
throw new ConnectorException("HMAC header must not be null");
}
if (hmacSecretKey == null) {
throw new ConnectorException("HMAC secret key must not be null");
}
if (hmacAlgo == null) {
throw new ConnectorException("HMAC algorithm key must not be null");
}
}

public boolean isRequestValid()
Expand All @@ -54,7 +64,7 @@ public boolean isRequestValid()
caseInsensitiveHeaders.putAll(headers);

if (!caseInsensitiveHeaders.containsKey(hmacHeader)) {
throw new IOException("Expected HMAC header " + hmacHeader + ", but was not present");
throw new ConnectorException("Expected HMAC header " + hmacHeader + ", but was not present");
}
final String providedHmac = caseInsensitiveHeaders.get(hmacHeader);
LOG.debug("Given HMAC from webhook call: {}", providedHmac);
Expand Down

0 comments on commit 58ebdc9

Please sign in to comment.