diff --git a/connector-runtime/connector-runtime-core/src/main/java/io/camunda/connector/runtime/core/inbound/correlation/InboundCorrelationHandler.java b/connector-runtime/connector-runtime-core/src/main/java/io/camunda/connector/runtime/core/inbound/correlation/InboundCorrelationHandler.java index d9b776e78d..2455da5d52 100644 --- a/connector-runtime/connector-runtime-core/src/main/java/io/camunda/connector/runtime/core/inbound/correlation/InboundCorrelationHandler.java +++ b/connector-runtime/connector-runtime-core/src/main/java/io/camunda/connector/runtime/core/inbound/correlation/InboundCorrelationHandler.java @@ -33,6 +33,7 @@ import io.camunda.zeebe.client.api.command.ClientStatusException; import io.camunda.zeebe.client.api.response.ProcessInstanceEvent; import io.camunda.zeebe.client.api.response.PublishMessageResponse; +import java.util.Optional; import java.util.UUID; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -153,7 +154,7 @@ protected CorrelationResult triggerMessageStartEvent( Object extractedVariables = extractVariables(variables, definition); try { - String correlationKey = + var correlationKey = extractCorrelationKey(correlationPoint.correlationKeyExpression(), variables); PublishMessageResponse result = zeebeClient @@ -161,7 +162,7 @@ protected CorrelationResult triggerMessageStartEvent( .messageName(correlationPoint.messageName()) // correlation key must be empty to start a new process, see: // https://docs.camunda.io/docs/components/modeler/bpmn/message-events/#message-start-events - .correlationKey(correlationKey) + .correlationKey(correlationKey.orElse("")) .messageId(messageId) .tenantId(definition.tenantId()) .variables(extractedVariables) @@ -198,18 +199,20 @@ protected CorrelationResult triggerMessage( String correlationKeyExpression, Object variables, String messageId) { - - String correlationKey = extractCorrelationKey(correlationKeyExpression, variables); - if (!isActivationConditionMet(definition, variables)) { LOG.debug("Activation condition didn't match: {}", definition.correlationPoint()); return new MessageCorrelationResult( messageName, new CorrelationErrorData(CorrelationErrorReason.ACTIVATION_CONDITION_NOT_MET)); } + String correlationKey = + extractCorrelationKey(correlationKeyExpression, variables) + .orElseThrow( + () -> + new ConnectorException( + "Correlation key not resolved: " + correlationKeyExpression)); Object extractedVariables = extractVariables(variables, definition); - try { PublishMessageResponse response = zeebeClient @@ -224,7 +227,6 @@ protected CorrelationResult triggerMessage( LOG.info("Published message with key: " + response.getMessageKey()); return new MessageCorrelationResult(messageName, response.getMessageKey()); - } catch (Exception e) { throw new ConnectorException( "Failed to publish process message for subscription: " + definition.correlationPoint(), @@ -248,15 +250,21 @@ protected boolean isActivationConditionMet( } } - protected String extractCorrelationKey(String correlationKeyExpression, Object context) { - if (correlationKeyExpression == null || correlationKeyExpression.isBlank()) { - return ""; - } - try { - return feelEngine.evaluate(correlationKeyExpression, context, String.class); - } catch (Exception e) { - throw new ConnectorInputException(e); + protected Optional extractCorrelationKey( + String correlationKeyExpression, Object context) { + Optional correlationKey; + if (correlationKeyExpression != null && !correlationKeyExpression.isBlank()) { + try { + correlationKey = + Optional.ofNullable( + feelEngine.evaluate(correlationKeyExpression, context, String.class)); + } catch (Exception e) { + correlationKey = Optional.empty(); + } + } else { + correlationKey = Optional.empty(); } + return correlationKey; } protected String extractMessageId(String messageIdExpression, Object context) { diff --git a/connector-runtime/connector-runtime-core/src/test/java/io/camunda/connector/runtime/core/inbound/correlation/InboundCorrelationHandlerTest.java b/connector-runtime/connector-runtime-core/src/test/java/io/camunda/connector/runtime/core/inbound/correlation/InboundCorrelationHandlerTest.java index 761d9c4759..a380254a53 100644 --- a/connector-runtime/connector-runtime-core/src/test/java/io/camunda/connector/runtime/core/inbound/correlation/InboundCorrelationHandlerTest.java +++ b/connector-runtime/connector-runtime-core/src/test/java/io/camunda/connector/runtime/core/inbound/correlation/InboundCorrelationHandlerTest.java @@ -161,7 +161,10 @@ void boundaryMessageEvent_shouldCallCorrectZeebeMethod() { // given var point = new BoundaryEventCorrelationPoint( - "test-boundary", "", "123", new BoundaryEventCorrelationPoint.Activity("123", "test")); + "test-boundary", + "=\"test\"", + "123", + new BoundaryEventCorrelationPoint.Activity("123", "test")); var definition = mock(InboundConnectorDefinitionImpl.class); when(definition.correlationPoint()).thenReturn(point); @@ -176,7 +179,7 @@ void boundaryMessageEvent_shouldCallCorrectZeebeMethod() { verifyNoMoreInteractions(zeebeClient); verify(dummyCommand).messageName("test-boundary"); - verify(dummyCommand).correlationKey(""); + verify(dummyCommand).correlationKey("test"); verify(dummyCommand).messageId("123"); verify(dummyCommand).send(); } @@ -521,7 +524,7 @@ void messageIdIsNull_expressionIsNull_usesRandomUuid() { var dummyCommand = spy(new PublishMessageCommandDummy()); when(zeebeClient.newPublishMessageCommand()).thenReturn(dummyCommand); // when - handler.correlate(definition, Collections.emptyMap()); + handler.correlate(definition, Collections.singletonMap("correlationKey", "testkey")); // then ArgumentCaptor messageIdCaptor = ArgumentCaptor.forClass(String.class); verify(dummyCommand).messageId(messageIdCaptor.capture()); @@ -534,7 +537,7 @@ void messageIdIsNull_expressionIsNull_usesRandomUuid() { @Test void messageIdIsNull_expressionIsProvided_usesExtractedMessageId() { // given - var point = new MessageCorrelationPoint("msg1", "=correlationKey", "=extractedId"); + var point = new MessageCorrelationPoint("msg1", "=extractedId", "=extractedId"); var definition = mock(InboundConnectorDefinitionImpl.class); when(definition.correlationPoint()).thenReturn(point); var dummyCommand = spy(new PublishMessageCommandDummy()); @@ -549,7 +552,7 @@ void messageIdIsNull_expressionIsProvided_usesExtractedMessageId() { @Test void messageIdIsProvided_usesGivenMessageId() { // given - var point = new MessageCorrelationPoint("msg1", "=correlationKey", null); + var point = new MessageCorrelationPoint("msg1", "=123", null); var definition = mock(InboundConnectorDefinitionImpl.class); when(definition.correlationPoint()).thenReturn(point); var dummyCommand = spy(new PublishMessageCommandDummy()); diff --git a/connector-runtime/connector-runtime-spring/src/main/java/io/camunda/connector/runtime/inbound/webhook/InboundWebhookRestController.java b/connector-runtime/connector-runtime-spring/src/main/java/io/camunda/connector/runtime/inbound/webhook/InboundWebhookRestController.java index a6a1657b89..fe24496b91 100644 --- a/connector-runtime/connector-runtime-spring/src/main/java/io/camunda/connector/runtime/inbound/webhook/InboundWebhookRestController.java +++ b/connector-runtime/connector-runtime-spring/src/main/java/io/camunda/connector/runtime/inbound/webhook/InboundWebhookRestController.java @@ -22,6 +22,7 @@ import static org.springframework.web.bind.annotation.RequestMethod.POST; import static org.springframework.web.bind.annotation.RequestMethod.PUT; +import io.camunda.connector.api.error.ConnectorException; import io.camunda.connector.api.inbound.webhook.MappedHttpRequest; import io.camunda.connector.api.inbound.webhook.WebhookConnectorException; import io.camunda.connector.api.inbound.webhook.WebhookConnectorException.WebhookSecurityException; @@ -31,6 +32,7 @@ import io.camunda.connector.api.inbound.webhook.WebhookResultContext; import io.camunda.connector.api.inbound.webhook.WebhookTriggerResultContext; import io.camunda.connector.feel.FeelEngineWrapperException; +import io.camunda.connector.runtime.core.error.BpmnError; import io.camunda.connector.runtime.inbound.lifecycle.ActiveInboundConnector; import io.camunda.connector.runtime.inbound.webhook.model.HttpServletRequestWebhookProcessingPayload; import jakarta.servlet.http.HttpServletRequest; @@ -105,14 +107,20 @@ private ResponseEntity processWebhook( } } } catch (Exception e) { - LOG.info("Webhook failed with exception", e); + LOG.info("Webhook: {} failed with exception", connector.context().getDefinition(), e); if (e instanceof FeelEngineWrapperException feelEngineWrapperException) { var error = new FeelExpressionErrorResponse( feelEngineWrapperException.getReason(), feelEngineWrapperException.getExpression()); connectorResponse = ResponseEntity.unprocessableEntity().body(error); - } else if (e instanceof WebhookConnectorException webhookConnectorException) { - connectorResponse = handleWebhookConnectorException(webhookConnectorException); + } else if (e instanceof ConnectorException connectorException) { + if (e instanceof WebhookConnectorException webhookConnectorException) { + connectorResponse = handleWebhookConnectorException(webhookConnectorException); + } else { + connectorResponse = + ResponseEntity.unprocessableEntity() + .body(new BpmnError("WEBHOOK_NOT_PROCESSED", connectorException.getMessage())); + } } else { connectorResponse = ResponseEntity.internalServerError().build(); } diff --git a/connectors/webhook/src/main/java/io/camunda/connector/inbound/signature/HMACSignatureValidator.java b/connectors/webhook/src/main/java/io/camunda/connector/inbound/signature/HMACSignatureValidator.java index 3d392240a3..5f94680874 100644 --- a/connectors/webhook/src/main/java/io/camunda/connector/inbound/signature/HMACSignatureValidator.java +++ b/connectors/webhook/src/main/java/io/camunda/connector/inbound/signature/HMACSignatureValidator.java @@ -6,12 +6,12 @@ */ package io.camunda.connector.inbound.signature; +import io.camunda.connector.api.error.ConnectorException; import java.io.IOException; import java.nio.charset.StandardCharsets; import java.security.InvalidKeyException; import java.security.NoSuchAlgorithmException; import java.util.Map; -import java.util.Objects; import java.util.TreeMap; import javax.crypto.Mac; import javax.crypto.spec.SecretKeySpec; @@ -41,11 +41,21 @@ public HMACSignatureValidator( this.hmacHeader = hmacHeader; this.hmacSecretKey = hmacSecretKey; this.hmacAlgo = hmacAlgo; - Objects.requireNonNull(requestBody, "Request body must not be null"); - Objects.requireNonNull(headers, "Headers must not be null"); - Objects.requireNonNull(hmacHeader, "HMAC header must not be null"); - Objects.requireNonNull(hmacSecretKey, "HMAC secret key must not be null"); - Objects.requireNonNull(hmacAlgo, "HMAC algorithm must not be null"); + if (requestBody == null) { + throw new ConnectorException("Request body must not be null"); + } + if (headers == null) { + throw new ConnectorException("Headers must not be null"); + } + if (hmacHeader == null) { + throw new ConnectorException("HMAC header must not be null"); + } + if (hmacSecretKey == null) { + throw new ConnectorException("HMAC secret key must not be null"); + } + if (hmacAlgo == null) { + throw new ConnectorException("HMAC algorithm key must not be null"); + } } public boolean isRequestValid() @@ -54,7 +64,7 @@ public boolean isRequestValid() caseInsensitiveHeaders.putAll(headers); if (!caseInsensitiveHeaders.containsKey(hmacHeader)) { - throw new IOException("Expected HMAC header " + hmacHeader + ", but was not present"); + throw new ConnectorException("Expected HMAC header " + hmacHeader + ", but was not present"); } final String providedHmac = caseInsensitiveHeaders.get(hmacHeader); LOG.debug("Given HMAC from webhook call: {}", providedHmac);