diff --git a/schema-registry/src/main/java/io/streamnative/pulsar/handlers/kop/schemaregistry/HttpJsonRequestProcessor.java b/schema-registry/src/main/java/io/streamnative/pulsar/handlers/kop/schemaregistry/HttpJsonRequestProcessor.java index 81d96326a4..51c9cfadd7 100644 --- a/schema-registry/src/main/java/io/streamnative/pulsar/handlers/kop/schemaregistry/HttpJsonRequestProcessor.java +++ b/schema-registry/src/main/java/io/streamnative/pulsar/handlers/kop/schemaregistry/HttpJsonRequestProcessor.java @@ -19,6 +19,7 @@ import io.netty.handler.codec.http.FullHttpRequest; import io.netty.handler.codec.http.FullHttpResponse; import io.netty.handler.codec.http.HttpResponseStatus; +import io.streamnative.pulsar.handlers.kop.schemaregistry.model.impl.SchemaStorageException; import java.io.DataInput; import java.io.IOException; import java.util.ArrayList; @@ -87,8 +88,16 @@ public CompletableFuture processRequest(FullHttpRequest reques return buildJsonResponse(resp, RESPONSE_CONTENT_TYPE); } }).exceptionally(err -> { - log.error("Error while processing request", err); - return buildJsonErrorResponse(err); + Throwable throwable = err; + while (throwable.getCause() != null) { + throwable = throwable.getCause(); + } + if (throwable instanceof SchemaStorageException e) { + return buildErrorResponse(e.getHttpStatusCode(), e.getMessage()); + } else { + log.error("Error while processing request", err); + return buildJsonErrorResponse(err); + } }); } catch (IOException err) { log.error("Cannot decode request", err); diff --git a/tests/src/test/java/io/streamnative/pulsar/handlers/kop/KafkaAuthorizationTestBase.java b/tests/src/test/java/io/streamnative/pulsar/handlers/kop/KafkaAuthorizationTestBase.java index bd0d8a6ee9..4d7004a396 100644 --- a/tests/src/test/java/io/streamnative/pulsar/handlers/kop/KafkaAuthorizationTestBase.java +++ b/tests/src/test/java/io/streamnative/pulsar/handlers/kop/KafkaAuthorizationTestBase.java @@ -21,10 +21,12 @@ import static org.testng.AssertJUnit.fail; import com.google.common.collect.Sets; +import io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException; import io.confluent.kafka.serializers.KafkaAvroDeserializer; import io.confluent.kafka.serializers.KafkaAvroSerializer; import io.confluent.kafka.serializers.KafkaAvroSerializerConfig; import io.jsonwebtoken.SignatureAlgorithm; +import io.netty.handler.codec.http.HttpResponseStatus; import java.time.Duration; import java.util.Collections; import java.util.List; @@ -754,6 +756,20 @@ public void testAvroProduceAndConsumeWithAuth(boolean withTokenPrefix) throws Ex consumer.close(); } + @Test(timeOut = 30000) + public void testSchemaNoAuth() { + final KafkaProducer producer = createAvroProducer(false, null); + try { + producer.send(new ProducerRecord<>("test-avro-wrong-auth", createAvroRecord())).get(); + fail(); + } catch (Exception e) { + assertTrue(e.getCause() instanceof RestClientException); + var restException = (RestClientException) e.getCause(); + assertEquals(restException.getErrorCode(), HttpResponseStatus.UNAUTHORIZED.code()); + assertTrue(restException.getMessage().contains("Missing AUTHORIZATION header")); + } + producer.close(); + } private IndexedRecord createAvroRecord() { String userSchema = "{\"namespace\": \"example.avro\", \"type\": \"record\", " @@ -766,6 +782,10 @@ private IndexedRecord createAvroRecord() { } private KafkaProducer createAvroProducer(boolean withTokenPrefix) { + return createAvroProducer(withTokenPrefix, userToken); + } + + private KafkaProducer createAvroProducer(boolean withTokenPrefix, String schemaToken) { Properties props = new Properties(); props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:" + getClientPort()); props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class); @@ -782,11 +802,11 @@ private KafkaProducer createAvroProducer(boolean withTokenPrefi props.put("security.protocol", "SASL_PLAINTEXT"); props.put("sasl.mechanism", "PLAIN"); - - props.put(KafkaAvroSerializerConfig.BASIC_AUTH_CREDENTIALS_SOURCE, "USER_INFO"); - - props.put(KafkaAvroSerializerConfig.USER_INFO_CONFIG, - username + ":" + (withTokenPrefix ? password : userToken)); + if (schemaToken != null) { + props.put(KafkaAvroSerializerConfig.BASIC_AUTH_CREDENTIALS_SOURCE, "USER_INFO"); + props.put(KafkaAvroSerializerConfig.USER_INFO_CONFIG, + username + ":" + (withTokenPrefix ? password : userToken)); + } return new KafkaProducer<>(props); }