Skip to content
This repository has been archived by the owner on Jan 24, 2024. It is now read-only.

Commit

Permalink
[Schema] Return 401 error when no HTTP authentication is configured (#…
Browse files Browse the repository at this point in the history
…1802)

[Schema] Return 401 error when no HTTP authentication is configured

### Motivation

When authentication is enabled, if the Schema REST requests were sent
without HTTP authentication header, the Schema Registry will return 404,
rather than 401.

### Modifications

- When `SchemaStorageException` is thrown, build the response with the
error code and the exception message.
- Add `testSchemaNoAuth` to verify 401 unauthorized will be returned.
  • Loading branch information
BewareMyPower authored Apr 23, 2023
1 parent 4c22dfa commit c41ad06
Show file tree
Hide file tree
Showing 3 changed files with 37 additions and 8 deletions.
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -325,7 +325,7 @@
<artifactId>maven-javadoc-plugin</artifactId>
<version>${maven-javadoc-plugin.version}</version>
<configuration>
<source>8</source>
<source>17</source>
<doclint>none</doclint>
</configuration>
</plugin>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import io.netty.handler.codec.http.FullHttpRequest;
import io.netty.handler.codec.http.FullHttpResponse;
import io.netty.handler.codec.http.HttpResponseStatus;
import io.streamnative.pulsar.handlers.kop.schemaregistry.model.impl.SchemaStorageException;
import java.io.DataInput;
import java.io.IOException;
import java.util.ArrayList;
Expand Down Expand Up @@ -87,8 +88,16 @@ public CompletableFuture<FullHttpResponse> processRequest(FullHttpRequest reques
return buildJsonResponse(resp, RESPONSE_CONTENT_TYPE);
}
}).exceptionally(err -> {
log.error("Error while processing request", err);
return buildJsonErrorResponse(err);
Throwable throwable = err;
while (throwable.getCause() != null) {
throwable = throwable.getCause();
}
if (throwable instanceof SchemaStorageException e) {
return buildErrorResponse(e.getHttpStatusCode(), e.getMessage());
} else {
log.error("Error while processing request", err);
return buildJsonErrorResponse(err);
}
});
} catch (IOException err) {
log.error("Cannot decode request", err);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,12 @@
import static org.testng.AssertJUnit.fail;

import com.google.common.collect.Sets;
import io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException;
import io.confluent.kafka.serializers.KafkaAvroDeserializer;
import io.confluent.kafka.serializers.KafkaAvroSerializer;
import io.confluent.kafka.serializers.KafkaAvroSerializerConfig;
import io.jsonwebtoken.SignatureAlgorithm;
import io.netty.handler.codec.http.HttpResponseStatus;
import java.time.Duration;
import java.util.Collections;
import java.util.List;
Expand Down Expand Up @@ -754,6 +756,20 @@ public void testAvroProduceAndConsumeWithAuth(boolean withTokenPrefix) throws Ex
consumer.close();
}

@Test(timeOut = 30000)
public void testSchemaNoAuth() {
final KafkaProducer<Integer, Object> producer = createAvroProducer(false, false);
try {
producer.send(new ProducerRecord<>("test-avro-wrong-auth", createAvroRecord())).get();
fail();
} catch (Exception e) {
assertTrue(e.getCause() instanceof RestClientException);
var restException = (RestClientException) e.getCause();
assertEquals(restException.getErrorCode(), HttpResponseStatus.UNAUTHORIZED.code());
assertTrue(restException.getMessage().contains("Missing AUTHORIZATION header"));
}
producer.close();
}

private IndexedRecord createAvroRecord() {
String userSchema = "{\"namespace\": \"example.avro\", \"type\": \"record\", "
Expand All @@ -766,6 +782,10 @@ private IndexedRecord createAvroRecord() {
}

private KafkaProducer<Integer, Object> createAvroProducer(boolean withTokenPrefix) {
return createAvroProducer(withTokenPrefix, true);
}

private KafkaProducer<Integer, Object> createAvroProducer(boolean withTokenPrefix, boolean withSchemaToken) {
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:" + getClientPort());
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
Expand All @@ -782,11 +802,11 @@ private KafkaProducer<Integer, Object> createAvroProducer(boolean withTokenPrefi
props.put("security.protocol", "SASL_PLAINTEXT");
props.put("sasl.mechanism", "PLAIN");


props.put(KafkaAvroSerializerConfig.BASIC_AUTH_CREDENTIALS_SOURCE, "USER_INFO");

props.put(KafkaAvroSerializerConfig.USER_INFO_CONFIG,
username + ":" + (withTokenPrefix ? password : userToken));
if (withSchemaToken) {
props.put(KafkaAvroSerializerConfig.BASIC_AUTH_CREDENTIALS_SOURCE, "USER_INFO");
props.put(KafkaAvroSerializerConfig.USER_INFO_CONFIG,
username + ":" + (withTokenPrefix ? password : userToken));
}

return new KafkaProducer<>(props);
}
Expand Down

0 comments on commit c41ad06

Please sign in to comment.