diff --git a/pom.xml b/pom.xml
index 37ef84351c..97e68a56a6 100644
--- a/pom.xml
+++ b/pom.xml
@@ -325,7 +325,7 @@
maven-javadoc-plugin
${maven-javadoc-plugin.version}
- 8
+ 17
none
diff --git a/schema-registry/src/main/java/io/streamnative/pulsar/handlers/kop/schemaregistry/HttpJsonRequestProcessor.java b/schema-registry/src/main/java/io/streamnative/pulsar/handlers/kop/schemaregistry/HttpJsonRequestProcessor.java
index 81d96326a4..51c9cfadd7 100644
--- a/schema-registry/src/main/java/io/streamnative/pulsar/handlers/kop/schemaregistry/HttpJsonRequestProcessor.java
+++ b/schema-registry/src/main/java/io/streamnative/pulsar/handlers/kop/schemaregistry/HttpJsonRequestProcessor.java
@@ -19,6 +19,7 @@
import io.netty.handler.codec.http.FullHttpRequest;
import io.netty.handler.codec.http.FullHttpResponse;
import io.netty.handler.codec.http.HttpResponseStatus;
+import io.streamnative.pulsar.handlers.kop.schemaregistry.model.impl.SchemaStorageException;
import java.io.DataInput;
import java.io.IOException;
import java.util.ArrayList;
@@ -87,8 +88,16 @@ public CompletableFuture processRequest(FullHttpRequest reques
return buildJsonResponse(resp, RESPONSE_CONTENT_TYPE);
}
}).exceptionally(err -> {
- log.error("Error while processing request", err);
- return buildJsonErrorResponse(err);
+ Throwable throwable = err;
+ while (throwable.getCause() != null) {
+ throwable = throwable.getCause();
+ }
+ if (throwable instanceof SchemaStorageException e) {
+ return buildErrorResponse(e.getHttpStatusCode(), e.getMessage());
+ } else {
+ log.error("Error while processing request", err);
+ return buildJsonErrorResponse(err);
+ }
});
} catch (IOException err) {
log.error("Cannot decode request", err);
diff --git a/tests/src/test/java/io/streamnative/pulsar/handlers/kop/KafkaAuthorizationTestBase.java b/tests/src/test/java/io/streamnative/pulsar/handlers/kop/KafkaAuthorizationTestBase.java
index bd0d8a6ee9..98f2fc645a 100644
--- a/tests/src/test/java/io/streamnative/pulsar/handlers/kop/KafkaAuthorizationTestBase.java
+++ b/tests/src/test/java/io/streamnative/pulsar/handlers/kop/KafkaAuthorizationTestBase.java
@@ -21,10 +21,12 @@
import static org.testng.AssertJUnit.fail;
import com.google.common.collect.Sets;
+import io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException;
import io.confluent.kafka.serializers.KafkaAvroDeserializer;
import io.confluent.kafka.serializers.KafkaAvroSerializer;
import io.confluent.kafka.serializers.KafkaAvroSerializerConfig;
import io.jsonwebtoken.SignatureAlgorithm;
+import io.netty.handler.codec.http.HttpResponseStatus;
import java.time.Duration;
import java.util.Collections;
import java.util.List;
@@ -754,6 +756,20 @@ public void testAvroProduceAndConsumeWithAuth(boolean withTokenPrefix) throws Ex
consumer.close();
}
+ @Test(timeOut = 30000)
+ public void testSchemaNoAuth() {
+ final KafkaProducer producer = createAvroProducer(false, false);
+ try {
+ producer.send(new ProducerRecord<>("test-avro-wrong-auth", createAvroRecord())).get();
+ fail();
+ } catch (Exception e) {
+ assertTrue(e.getCause() instanceof RestClientException);
+ var restException = (RestClientException) e.getCause();
+ assertEquals(restException.getErrorCode(), HttpResponseStatus.UNAUTHORIZED.code());
+ assertTrue(restException.getMessage().contains("Missing AUTHORIZATION header"));
+ }
+ producer.close();
+ }
private IndexedRecord createAvroRecord() {
String userSchema = "{\"namespace\": \"example.avro\", \"type\": \"record\", "
@@ -766,6 +782,10 @@ private IndexedRecord createAvroRecord() {
}
private KafkaProducer createAvroProducer(boolean withTokenPrefix) {
+ return createAvroProducer(withTokenPrefix, true);
+ }
+
+ private KafkaProducer createAvroProducer(boolean withTokenPrefix, boolean withSchemaToken) {
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:" + getClientPort());
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
@@ -782,11 +802,11 @@ private KafkaProducer createAvroProducer(boolean withTokenPrefi
props.put("security.protocol", "SASL_PLAINTEXT");
props.put("sasl.mechanism", "PLAIN");
-
- props.put(KafkaAvroSerializerConfig.BASIC_AUTH_CREDENTIALS_SOURCE, "USER_INFO");
-
- props.put(KafkaAvroSerializerConfig.USER_INFO_CONFIG,
- username + ":" + (withTokenPrefix ? password : userToken));
+ if (withSchemaToken) {
+ props.put(KafkaAvroSerializerConfig.BASIC_AUTH_CREDENTIALS_SOURCE, "USER_INFO");
+ props.put(KafkaAvroSerializerConfig.USER_INFO_CONFIG,
+ username + ":" + (withTokenPrefix ? password : userToken));
+ }
return new KafkaProducer<>(props);
}