Skip to content
This repository has been archived by the owner on Jan 24, 2024. It is now read-only.

Commit

Permalink
[Schema] Return 401 error when no HTTP authentication is configured
Browse files Browse the repository at this point in the history
### Motivation

When authentication is enabled, if the Schema REST requests were sent
without HTTP authentication header, the Schema Registry will return 404,
rather than 401.

### Modifications

- When `SchemaStorageException` is thrown, build the response with the
  error code and the exception message.
- Add `testSchemaNoAuth` to verify 401 unauthorized will be returned.
  • Loading branch information
BewareMyPower committed Apr 23, 2023
1 parent f0eda8e commit caddc0e
Show file tree
Hide file tree
Showing 2 changed files with 36 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import io.netty.handler.codec.http.FullHttpRequest;
import io.netty.handler.codec.http.FullHttpResponse;
import io.netty.handler.codec.http.HttpResponseStatus;
import io.streamnative.pulsar.handlers.kop.schemaregistry.model.impl.SchemaStorageException;
import java.io.DataInput;
import java.io.IOException;
import java.util.ArrayList;
Expand Down Expand Up @@ -87,8 +88,16 @@ public CompletableFuture<FullHttpResponse> processRequest(FullHttpRequest reques
return buildJsonResponse(resp, RESPONSE_CONTENT_TYPE);
}
}).exceptionally(err -> {
log.error("Error while processing request", err);
return buildJsonErrorResponse(err);
Throwable throwable = err;
while (throwable.getCause() != null) {
throwable = throwable.getCause();
}
if (throwable instanceof SchemaStorageException e) {
return buildErrorResponse(e.getHttpStatusCode(), e.getMessage());
} else {
log.error("Error while processing request", err);
return buildJsonErrorResponse(err);
}
});
} catch (IOException err) {
log.error("Cannot decode request", err);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,12 @@
import static org.testng.AssertJUnit.fail;

import com.google.common.collect.Sets;
import io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException;
import io.confluent.kafka.serializers.KafkaAvroDeserializer;
import io.confluent.kafka.serializers.KafkaAvroSerializer;
import io.confluent.kafka.serializers.KafkaAvroSerializerConfig;
import io.jsonwebtoken.SignatureAlgorithm;
import io.netty.handler.codec.http.HttpResponseStatus;
import java.time.Duration;
import java.util.Collections;
import java.util.List;
Expand Down Expand Up @@ -754,6 +756,20 @@ public void testAvroProduceAndConsumeWithAuth(boolean withTokenPrefix) throws Ex
consumer.close();
}

@Test(timeOut = 30000)
public void testSchemaNoAuth() {
final KafkaProducer<Integer, Object> producer = createAvroProducer(false, null);
try {
producer.send(new ProducerRecord<>("test-avro-wrong-auth", createAvroRecord())).get();
fail();
} catch (Exception e) {
assertTrue(e.getCause() instanceof RestClientException);
var restException = (RestClientException) e.getCause();
assertEquals(restException.getErrorCode(), HttpResponseStatus.UNAUTHORIZED.code());
assertTrue(restException.getMessage().contains("Missing AUTHORIZATION header"));
}
producer.close();
}

private IndexedRecord createAvroRecord() {
String userSchema = "{\"namespace\": \"example.avro\", \"type\": \"record\", "
Expand All @@ -766,6 +782,10 @@ private IndexedRecord createAvroRecord() {
}

private KafkaProducer<Integer, Object> createAvroProducer(boolean withTokenPrefix) {
return createAvroProducer(withTokenPrefix, userToken);
}

private KafkaProducer<Integer, Object> createAvroProducer(boolean withTokenPrefix, String schemaToken) {
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:" + getClientPort());
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
Expand All @@ -782,11 +802,11 @@ private KafkaProducer<Integer, Object> createAvroProducer(boolean withTokenPrefi
props.put("security.protocol", "SASL_PLAINTEXT");
props.put("sasl.mechanism", "PLAIN");


props.put(KafkaAvroSerializerConfig.BASIC_AUTH_CREDENTIALS_SOURCE, "USER_INFO");

props.put(KafkaAvroSerializerConfig.USER_INFO_CONFIG,
username + ":" + (withTokenPrefix ? password : userToken));
if (schemaToken != null) {
props.put(KafkaAvroSerializerConfig.BASIC_AUTH_CREDENTIALS_SOURCE, "USER_INFO");
props.put(KafkaAvroSerializerConfig.USER_INFO_CONFIG,
username + ":" + (withTokenPrefix ? password : userToken));
}

return new KafkaProducer<>(props);
}
Expand Down

0 comments on commit caddc0e

Please sign in to comment.