Skip to content

Commit

Permalink
Added the catch of wrong content-type returning meaningful message (#714
Browse files Browse the repository at this point in the history
)

* Added the catch of wrong content-type returning meaningful message
Added test for the above use case

Signed-off-by: Paolo Patierno <ppatierno@live.com>

* Fixed tests

Signed-off-by: Paolo Patierno <ppatierno@live.com>

Signed-off-by: Paolo Patierno <ppatierno@live.com>
  • Loading branch information
ppatierno authored Nov 24, 2022
1 parent 711074c commit c3b30d9
Show file tree
Hide file tree
Showing 2 changed files with 51 additions and 16 deletions.
32 changes: 17 additions & 15 deletions src/main/java/io/strimzi/kafka/bridge/http/HttpBridge.java
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@

import io.vertx.ext.web.handler.CorsHandler;
import io.vertx.ext.web.openapi.RouterBuilder;
import io.vertx.ext.web.validation.BodyProcessorException;
import io.vertx.ext.web.validation.ParameterProcessorException;
import io.vertx.json.schema.ValidationException;
import org.apache.kafka.clients.CommonClientConfigs;
Expand Down Expand Up @@ -563,23 +564,24 @@ private void errorHandler(RoutingContext routingContext) {
if (routingContext.statusCode() == HttpResponseStatus.BAD_REQUEST.code()) {
message = HttpResponseStatus.BAD_REQUEST.reasonPhrase();
// in case of validation exception, building a meaningful error message
if (routingContext.failure() != null && routingContext.failure().getCause() instanceof ValidationException) {
ValidationException validationException = (ValidationException) routingContext.failure().getCause();
if (routingContext.failure() != null) {
StringBuilder sb = new StringBuilder();
if (validationException.inputScope() != null) {
sb.append("Validation error on: ").append(validationException.inputScope()).append(" - ");
}
sb.append(validationException.getMessage());
message = sb.toString();
}

if (routingContext.failure() != null && routingContext.failure() instanceof ParameterProcessorException) {
ParameterProcessorException validationException = (ParameterProcessorException) routingContext.failure();
StringBuilder sb = new StringBuilder();
if (validationException.getParameterName() != null) {
sb.append("Validation error on: ").append(validationException.getParameterName()).append(" - ");
if (routingContext.failure().getCause() instanceof ValidationException) {
ValidationException validationException = (ValidationException) routingContext.failure().getCause();
if (validationException.inputScope() != null) {
sb.append("Validation error on: ").append(validationException.inputScope()).append(" - ");
}
sb.append(validationException.getMessage());
} else if (routingContext.failure() instanceof ParameterProcessorException) {
ParameterProcessorException parameterException = (ParameterProcessorException) routingContext.failure();
if (parameterException.getParameterName() != null) {
sb.append("Parameter error on: ").append(parameterException.getParameterName()).append(" - ");
}
sb.append(parameterException.getMessage());
} else if (routingContext.failure() instanceof BodyProcessorException) {
BodyProcessorException bodyProcessorException = (BodyProcessorException) routingContext.failure();
sb.append(bodyProcessorException.getMessage());
}
sb.append(validationException.getMessage());
message = sb.toString();
}
} else if (routingContext.statusCode() == HttpResponseStatus.NOT_FOUND.code()) {
Expand Down
35 changes: 34 additions & 1 deletion src/test/java/io/strimzi/kafka/bridge/http/ProducerIT.java
Original file line number Diff line number Diff line change
Expand Up @@ -647,7 +647,7 @@ void sendToOneStringPartitionTest(VertxTestContext context) throws InterruptedEx

producerService()
.sendRecordsToPartitionRequest(topic, partition, root, BridgeContentType.KAFKA_JSON_JSON)
.sendJsonObject(root, verifyBadRequest(context, "Validation error on: partitionid - [Bad Request] Parsing error for parameter partitionid in location PATH: java.lang.NumberFormatException: For input string: \"" + partition + "\""));
.sendJsonObject(root, verifyBadRequest(context, "Parameter error on: partitionid - [Bad Request] Parsing error for parameter partitionid in location PATH: java.lang.NumberFormatException: For input string: \"" + partition + "\""));
}

@Test
Expand Down Expand Up @@ -939,4 +939,37 @@ void sendAsyncMessages(VertxTestContext context) throws InterruptedException, Ex

assertThat(context.awaitCompletion(TEST_TIMEOUT, TimeUnit.SECONDS), is(true));
}

@Test
void sendSimpleMessageWithWrongContentType(VertxTestContext context) throws InterruptedException, ExecutionException {
KafkaFuture<Void> future = adminClientFacade.createTopic(topic);

String value = "message-value";

JsonArray records = new JsonArray();
JsonObject json = new JsonObject();
json.put("value", value);
records.add(json);

JsonObject root = new JsonObject();
root.put("records", records);

future.get();

producerService()
.sendRecordsRequest(topic, root, "bad-content-type")
.sendJsonObject(root, ar -> {
context.verify(() -> {
assertThat(ar.succeeded(), is(true));
HttpResponse<JsonObject> response = ar.result();
HttpBridgeError error = HttpBridgeError.fromJson(response.body());
assertThat(response.statusCode(), is(HttpResponseStatus.BAD_REQUEST.code()));
assertThat(error.getCode(), is(HttpResponseStatus.BAD_REQUEST.code()));
assertThat(error.getMessage(), containsString("Cannot find body processor for content type"));
});
context.completeNow();
});

assertThat(context.awaitCompletion(TEST_TIMEOUT, TimeUnit.SECONDS), is(true));
}
}

0 comments on commit c3b30d9

Please sign in to comment.