Adding support for SchemaRegistryConfiguration (truststore password and location) and SASL JAAS config in KafkaConfiguration #465

Open
wants to merge 1 commit into base: master
2 changes: 2 additions & 0 deletions .gitignore
@@ -17,3 +17,5 @@ settings.xml
kafka.properties*
kafka.truststore.jks*
kafka.keystore.jks*

**/**prod.yml
93 changes: 51 additions & 42 deletions src/main/java/kafdrop/config/KafkaConfiguration.java
@@ -14,50 +14,59 @@
@ConfigurationProperties(prefix = "kafka")
@Data
public final class KafkaConfiguration {
private static final Logger LOG = LoggerFactory.getLogger(KafkaConfiguration.class);

private String brokerConnect;
private boolean isSecured = false;
private String saslMechanism;
private String securityProtocol;
private String truststoreFile;
private String propertiesFile;
private String keystoreFile;

public void applyCommon(Properties properties) {
properties.setProperty(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, brokerConnect);
if (isSecured) {
LOG.warn("The 'isSecured' property is deprecated; consult README.md on the preferred way to configure security");
properties.put(SaslConfigs.SASL_MECHANISM, saslMechanism);
}
private static final Logger LOG = LoggerFactory.getLogger(KafkaConfiguration.class);

if(isSecured || securityProtocol.equals("SSL")) {
properties.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, securityProtocol);
}
private String brokerConnect;
private boolean isSecured = false;
private String saslMechanism;
private String securityProtocol;
private String truststoreFile;
private String propertiesFile;
private String keystoreFile;

LOG.info("Checking truststore file {}", truststoreFile);
if (new File(truststoreFile).isFile()) {
LOG.info("Assigning truststore location to {}", truststoreFile);
properties.put("ssl.truststore.location", truststoreFile);
}
private String saslJaasConfig;

LOG.info("Checking keystore file {}", keystoreFile);
if (new File(keystoreFile).isFile()) {
LOG.info("Assigning keystore location to {}", keystoreFile);
properties.put("ssl.keystore.location", keystoreFile);
}
public void applyCommon(Properties properties) {
properties.setProperty(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, brokerConnect);
if (isSecured) {
LOG.warn("The 'isSecured' property is deprecated; consult README.md on the preferred way to configure security");
properties.put(SaslConfigs.SASL_MECHANISM, saslMechanism);
} else if (saslMechanism != null && !saslMechanism.isEmpty()) {
properties.put(SaslConfigs.SASL_MECHANISM, saslMechanism);
}

if (isSecured || securityProtocol.equals("SSL") || securityProtocol.equals("SASL_SSL")) {
properties.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, securityProtocol);
}

LOG.info("Checking truststore file {}", truststoreFile);
if (new File(truststoreFile).isFile()) {
LOG.info("Assigning truststore location to {}", truststoreFile);
properties.put("ssl.truststore.location", truststoreFile);
}

LOG.info("Checking keystore file {}", keystoreFile);
if (new File(keystoreFile).isFile()) {
LOG.info("Assigning keystore location to {}", keystoreFile);
properties.put("ssl.keystore.location", keystoreFile);
}
if (saslJaasConfig != null) {
LOG.info("Adding sasl.jaas.config (value hidden)");
properties.put("sasl.jaas.config", saslJaasConfig);
}

LOG.info("Checking properties file {}", propertiesFile);
final var propertiesFile = new File(this.propertiesFile);
if (propertiesFile.isFile()) {
LOG.info("Loading properties from {}", this.propertiesFile);
final var propertyOverrides = new Properties();
try (var propsReader = new BufferedReader(new FileReader(propertiesFile))) {
propertyOverrides.load(propsReader);
} catch (IOException e) {
throw new KafkaConfigurationException(e);
}
properties.putAll(propertyOverrides);
LOG.info("Checking properties file {}", propertiesFile);
final var propertiesFile = new File(this.propertiesFile);
if (propertiesFile.isFile()) {
LOG.info("Loading properties from {}", this.propertiesFile);
final var propertyOverrides = new Properties();
try (var propsReader = new BufferedReader(new FileReader(propertiesFile))) {
propertyOverrides.load(propsReader);
} catch (IOException e) {
throw new KafkaConfigurationException(e);
}
properties.putAll(propertyOverrides);
}
}
}
}
}
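
For reviewers, a minimal usage sketch (not part of the diff) of how the new `saslMechanism`/`saslJaasConfig` handling surfaces in the client `Properties`. It assumes the Lombok-generated setters from `@Data` and a hypothetical SASL_SSL setup with placeholder values; in Kafdrop itself these fields are bound by Spring from the `kafka.*` prefix rather than set programmatically.

```java
import java.util.Properties;

import kafdrop.config.KafkaConfiguration;

public class SaslSslWiringSketch {
  public static void main(String[] args) {
    final var config = new KafkaConfiguration();
    config.setBrokerConnect("broker-1:9093");          // placeholder bootstrap server
    config.setSecurityProtocol("SASL_SSL");            // now honoured without the deprecated isSecured flag
    config.setSaslMechanism("PLAIN");                  // applied via the new else-if branch
    config.setSaslJaasConfig(
        "org.apache.kafka.common.security.plain.PlainLoginModule required "
            + "username=\"kafdrop\" password=\"secret\";");
    config.setTruststoreFile("kafka.truststore.jks");
    config.setKeystoreFile("kafka.keystore.jks");
    config.setPropertiesFile("kafka.properties");

    final var clientProps = new Properties();
    config.applyCommon(clientProps);
    // clientProps now carries security.protocol, sasl.mechanism and sasl.jaas.config;
    // the ssl.*store.location entries are added only if the JKS files exist on disk.
  }
}
```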
19 changes: 19 additions & 0 deletions src/main/java/kafdrop/config/SchemaRegistryConfiguration.java
@@ -19,6 +19,9 @@ public static final class SchemaRegistryProperties {
private String connect;
private String auth;

private String truststoreLocation;
private String truststorePassword;

public String getConnect() {
return connect;
}
@@ -31,6 +34,22 @@ public void setConnect(String connect) {

public void setAuth(String auth) { this.auth = auth; }

public String getTruststoreLocation() {
return truststoreLocation;
}

public String getTruststorePassword() {
return truststorePassword;
}

public void setTruststoreLocation(String truststoreLocation) {
this.truststoreLocation = truststoreLocation;
}

public void setTruststorePassword(String truststorePassword) {
this.truststorePassword = truststorePassword;
}

public List<String> getConnectList() {
return CONNECT_SEPARATOR.splitAsStream(this.connect)
.map(String::trim)
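
For context (not part of the diff), a minimal sketch of how the new truststore accessors would be consumed, mirroring the `Optional` wrapping used in MessageController below. The endpoint, path and password are placeholders, and it assumes the values arrive through the class's existing `schemaregistry.*` configuration binding; absent values simply stay empty, so plain-HTTP registries keep working unchanged.

```java
import java.util.Optional;

import kafdrop.config.SchemaRegistryConfiguration.SchemaRegistryProperties;

public class TruststoreLookupSketch {
  public static void main(String[] args) {
    final var props = new SchemaRegistryProperties();
    props.setConnect("https://schema-registry.example.com:8081");               // placeholder endpoint
    props.setTruststoreLocation("/etc/kafdrop/schema-registry.truststore.jks"); // placeholder path
    props.setTruststorePassword("changeit");                                    // placeholder password

    // Wrap the nullable getters, as the controller does, so unset values are handled gracefully.
    final Optional<String> location = Optional.ofNullable(props.getTruststoreLocation());
    final Optional<String> password = Optional.ofNullable(props.getTruststorePassword());
    location.ifPresent(l -> System.out.println("Schema registry truststore: " + l));
    password.ifPresent(p -> System.out.println("Truststore password supplied"));
  }
}
```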
15 changes: 10 additions & 5 deletions src/main/java/kafdrop/controller/MessageController.java
@@ -22,6 +22,7 @@
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Optional;

import javax.validation.Valid;
import javax.validation.constraints.Max;
@@ -57,6 +58,7 @@

@Controller
public final class MessageController {
public static final String DESC = ".desc";
private final KafkaMonitor kafkaMonitor;

private final MessageInspector messageInspector;
@@ -151,7 +153,7 @@ public String viewMessageForm(@PathVariable("name") String topicName,
model.addAttribute("topic", topic);
// pre-select a descriptor file for a specific topic if available
model.addAttribute("defaultDescFile", protobufProperties.getDescFilesList().stream()
.filter(descFile -> descFile.replace(".desc", "").equals(topicName)).findFirst().orElse(""));
.filter(descFile -> descFile.replace(DESC, "").equals(topicName)).findFirst().orElse(""));

model.addAttribute("defaultFormat", defaultFormat);
model.addAttribute("messageFormats", MessageFormat.values());
@@ -261,15 +263,18 @@ private MessageDeserializer getDeserializer(String topicName, MessageFormat form
if (format == MessageFormat.AVRO) {
final var schemaRegistryUrl = schemaRegistryProperties.getConnect();
final var schemaRegistryAuth = schemaRegistryProperties.getAuth();
final var truststoreLocation = Optional.ofNullable(schemaRegistryProperties.getTruststoreLocation());
final var truststorePassword = Optional.ofNullable(schemaRegistryProperties.getTruststorePassword());

deserializer = new AvroMessageDeserializer(topicName, schemaRegistryUrl, schemaRegistryAuth);
deserializer = new AvroMessageDeserializer(topicName, schemaRegistryUrl, schemaRegistryAuth,
truststoreLocation, truststorePassword);
} else if (format == MessageFormat.PROTOBUF && null != descFile) {
// filter the input file name

final var descFileName = descFile.replace(".desc", "")
final var descFileName = descFile.replace(DESC, "")
.replace(".", "")
.replace("/", "");
final var fullDescFile = protobufProperties.getDirectory() + File.separator + descFileName + ".desc";
final var fullDescFile = protobufProperties.getDirectory() + File.separator + descFileName + DESC;
deserializer = new ProtobufMessageDeserializer(fullDescFile, msgTypeName, isAnyProto);
} else if (format == MessageFormat.PROTOBUF) {
final var schemaRegistryUrl = schemaRegistryProperties.getConnect();
@@ -310,7 +315,7 @@ public static class PartitionOffsetInfo {
*/
@NotNull
@Min(1)
@Max(100)
@Max(1000)
@JsonProperty("lastOffset")
private Long count;

69 changes: 44 additions & 25 deletions src/main/java/kafdrop/util/AvroMessageDeserializer.java
@@ -7,30 +7,49 @@


public final class AvroMessageDeserializer implements MessageDeserializer {
private final String topicName;
private final KafkaAvroDeserializer deserializer;

public AvroMessageDeserializer(String topicName, String schemaRegistryUrl, String schemaRegistryAuth) {
this.topicName = topicName;
this.deserializer = getDeserializer(schemaRegistryUrl, schemaRegistryAuth);
}

@Override
public String deserializeMessage(ByteBuffer buffer) {
// Convert byte buffer to byte array
final var bytes = ByteUtils.convertToByteArray(buffer);
return deserializer.deserialize(topicName, bytes).toString();
}

private static KafkaAvroDeserializer getDeserializer(String schemaRegistryUrl, String schemaRegistryAuth) {
final var config = new HashMap<String, Object>();
config.put(AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, schemaRegistryUrl);
if (schemaRegistryAuth != null) {
config.put(AbstractKafkaSchemaSerDeConfig.BASIC_AUTH_CREDENTIALS_SOURCE, "USER_INFO");
config.put(AbstractKafkaSchemaSerDeConfig.USER_INFO_CONFIG, schemaRegistryAuth);
private final String topicName;
private final KafkaAvroDeserializer deserializer;

public AvroMessageDeserializer(String topicName, String schemaRegistryUrl,
String schemaRegistryAuth) {
this.topicName = topicName;
this.deserializer = getDeserializer(schemaRegistryUrl, schemaRegistryAuth,
Optional.empty(),
Optional.empty());
}

public AvroMessageDeserializer(String topicName, String schemaRegistryUrl,
String schemaRegistryAuth,
Optional<String> truststoreLocation,
Optional<String> truststorePassword) {
this.topicName = topicName;
this.deserializer = getDeserializer(schemaRegistryUrl, schemaRegistryAuth,
truststoreLocation, truststorePassword);
}

@Override
public String deserializeMessage(ByteBuffer buffer) {
// Convert byte buffer to byte array
final var bytes = ByteUtils.convertToByteArray(buffer);
return deserializer.deserialize(topicName, bytes).toString();
}

private static KafkaAvroDeserializer getDeserializer(String schemaRegistryUrl, String schemaRegistryAuth,
Optional<String> truststoreLocation,
Optional<String> truststorePassword) {
final var config = new HashMap<String, Object>();
config.put(AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, schemaRegistryUrl);
if (schemaRegistryAuth != null) {
config.put(AbstractKafkaSchemaSerDeConfig.BASIC_AUTH_CREDENTIALS_SOURCE, "USER_INFO");
config.put(AbstractKafkaSchemaSerDeConfig.USER_INFO_CONFIG, schemaRegistryAuth);
config.put(AbstractKafkaSchemaSerDeConfig.AUTO_REGISTER_SCHEMAS, false);
truststorePassword.ifPresent(passwd -> config.put("schema.registry.ssl.truststore.password", passwd));
truststoreLocation.ifPresent(location -> config.put("schema.registry.ssl.truststore.location", location));
}
final var kafkaAvroDeserializer = new KafkaAvroDeserializer();
kafkaAvroDeserializer.configure(config, false);
return kafkaAvroDeserializer;
}
final var kafkaAvroDeserializer = new KafkaAvroDeserializer();
kafkaAvroDeserializer.configure(config, false);
return kafkaAvroDeserializer;
}
}
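
A usage sketch of the new constructor (not part of the diff); the topic, registry URL, credentials and truststore values below are placeholders. One thing reviewers may want to confirm: in getDeserializer above, the truststore entries are only applied inside the `schemaRegistryAuth != null` branch, so a TLS-secured registry without basic auth would not pick them up.

```java
import java.util.Optional;

import kafdrop.util.AvroMessageDeserializer;

public class AvroDeserializerSketch {
  public static void main(String[] args) {
    final var deserializer = new AvroMessageDeserializer(
        "orders",                                       // placeholder topic
        "https://schema-registry.example.com:8081",     // placeholder schema registry URL
        "kafdrop-user:kafdrop-pass",                    // basic-auth USER_INFO value
        Optional.of("/etc/kafdrop/sr.truststore.jks"),  // becomes schema.registry.ssl.truststore.location
        Optional.of("changeit"));                       // becomes schema.registry.ssl.truststore.password

    // A raw Kafka record value would then be rendered as text with:
    //   String json = deserializer.deserializeMessage(ByteBuffer.wrap(recordValueBytes));
  }
}
```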