diff --git a/src/main/java/io/strimzi/kafka/bridge/http/HttpBridge.java b/src/main/java/io/strimzi/kafka/bridge/http/HttpBridge.java index e2ccb6e79..2e5e9d87e 100644 --- a/src/main/java/io/strimzi/kafka/bridge/http/HttpBridge.java +++ b/src/main/java/io/strimzi/kafka/bridge/http/HttpBridge.java @@ -33,6 +33,7 @@ import io.vertx.ext.web.handler.CorsHandler; import io.vertx.ext.web.openapi.RouterBuilder; +import io.vertx.ext.web.validation.BodyProcessorException; import io.vertx.ext.web.validation.ParameterProcessorException; import io.vertx.json.schema.ValidationException; import org.apache.kafka.clients.CommonClientConfigs; @@ -563,23 +564,24 @@ private void errorHandler(RoutingContext routingContext) { if (routingContext.statusCode() == HttpResponseStatus.BAD_REQUEST.code()) { message = HttpResponseStatus.BAD_REQUEST.reasonPhrase(); // in case of validation exception, building a meaningful error message - if (routingContext.failure() != null && routingContext.failure().getCause() instanceof ValidationException) { - ValidationException validationException = (ValidationException) routingContext.failure().getCause(); + if (routingContext.failure() != null) { StringBuilder sb = new StringBuilder(); - if (validationException.inputScope() != null) { - sb.append("Validation error on: ").append(validationException.inputScope()).append(" - "); - } - sb.append(validationException.getMessage()); - message = sb.toString(); - } - - if (routingContext.failure() != null && routingContext.failure() instanceof ParameterProcessorException) { - ParameterProcessorException validationException = (ParameterProcessorException) routingContext.failure(); - StringBuilder sb = new StringBuilder(); - if (validationException.getParameterName() != null) { - sb.append("Validation error on: ").append(validationException.getParameterName()).append(" - "); + if (routingContext.failure().getCause() instanceof ValidationException) { + ValidationException validationException = (ValidationException) routingContext.failure().getCause(); + if (validationException.inputScope() != null) { + sb.append("Validation error on: ").append(validationException.inputScope()).append(" - "); + } + sb.append(validationException.getMessage()); + } else if (routingContext.failure() instanceof ParameterProcessorException) { + ParameterProcessorException parameterException = (ParameterProcessorException) routingContext.failure(); + if (parameterException.getParameterName() != null) { + sb.append("Parameter error on: ").append(parameterException.getParameterName()).append(" - "); + } + sb.append(parameterException.getMessage()); + } else if (routingContext.failure() instanceof BodyProcessorException) { + BodyProcessorException bodyProcessorException = (BodyProcessorException) routingContext.failure(); + sb.append(bodyProcessorException.getMessage()); } - sb.append(validationException.getMessage()); message = sb.toString(); } } else if (routingContext.statusCode() == HttpResponseStatus.NOT_FOUND.code()) { diff --git a/src/test/java/io/strimzi/kafka/bridge/http/ProducerIT.java b/src/test/java/io/strimzi/kafka/bridge/http/ProducerIT.java index 6dd275308..8bb6b9d7c 100644 --- a/src/test/java/io/strimzi/kafka/bridge/http/ProducerIT.java +++ b/src/test/java/io/strimzi/kafka/bridge/http/ProducerIT.java @@ -647,7 +647,7 @@ void sendToOneStringPartitionTest(VertxTestContext context) throws InterruptedEx producerService() .sendRecordsToPartitionRequest(topic, partition, root, BridgeContentType.KAFKA_JSON_JSON) - .sendJsonObject(root, verifyBadRequest(context, "Validation error on: partitionid - [Bad Request] Parsing error for parameter partitionid in location PATH: java.lang.NumberFormatException: For input string: \"" + partition + "\"")); + .sendJsonObject(root, verifyBadRequest(context, "Parameter error on: partitionid - [Bad Request] Parsing error for parameter partitionid in location PATH: java.lang.NumberFormatException: For input string: \"" + partition + "\"")); } @Test @@ -939,4 +939,37 @@ void sendAsyncMessages(VertxTestContext context) throws InterruptedException, Ex assertThat(context.awaitCompletion(TEST_TIMEOUT, TimeUnit.SECONDS), is(true)); } + + @Test + void sendSimpleMessageWithWrongContentType(VertxTestContext context) throws InterruptedException, ExecutionException { + KafkaFuture future = adminClientFacade.createTopic(topic); + + String value = "message-value"; + + JsonArray records = new JsonArray(); + JsonObject json = new JsonObject(); + json.put("value", value); + records.add(json); + + JsonObject root = new JsonObject(); + root.put("records", records); + + future.get(); + + producerService() + .sendRecordsRequest(topic, root, "bad-content-type") + .sendJsonObject(root, ar -> { + context.verify(() -> { + assertThat(ar.succeeded(), is(true)); + HttpResponse response = ar.result(); + HttpBridgeError error = HttpBridgeError.fromJson(response.body()); + assertThat(response.statusCode(), is(HttpResponseStatus.BAD_REQUEST.code())); + assertThat(error.getCode(), is(HttpResponseStatus.BAD_REQUEST.code())); + assertThat(error.getMessage(), containsString("Cannot find body processor for content type")); + }); + context.completeNow(); + }); + + assertThat(context.awaitCompletion(TEST_TIMEOUT, TimeUnit.SECONDS), is(true)); + } }