From 836a06203c3db2caf2d9492c553e5864dc4fd1c7 Mon Sep 17 00:00:00 2001 From: GoncaloPT Date: Fri, 20 Jan 2023 17:48:43 +0000 Subject: [PATCH] Adding support for SchemaRegistryConfiguration new parameters: - truststoreLocation - truststorePassword Adding support for KafkaConfiguration: - saslJaasConfig Increasing MessageController maxOffset size since 1000 seems to be easily supported. --- .gitignore | 2 + .../kafdrop/config/KafkaConfiguration.java | 93 ++++++++++--------- .../config/SchemaRegistryConfiguration.java | 19 ++++ .../kafdrop/controller/MessageController.java | 15 ++- .../kafdrop/util/AvroMessageDeserializer.java | 69 +++++++++----- 5 files changed, 126 insertions(+), 72 deletions(-) diff --git a/.gitignore b/.gitignore index b8bb7eb3..6dc666a2 100644 --- a/.gitignore +++ b/.gitignore @@ -17,3 +17,5 @@ settings.xml kafka.properties* kafka.truststore.jks* kafka.keystore.jks* + +**/**prod.yml diff --git a/src/main/java/kafdrop/config/KafkaConfiguration.java b/src/main/java/kafdrop/config/KafkaConfiguration.java index cd34f497..09bf3159 100644 --- a/src/main/java/kafdrop/config/KafkaConfiguration.java +++ b/src/main/java/kafdrop/config/KafkaConfiguration.java @@ -14,50 +14,59 @@ @ConfigurationProperties(prefix = "kafka") @Data public final class KafkaConfiguration { - private static final Logger LOG = LoggerFactory.getLogger(KafkaConfiguration.class); - - private String brokerConnect; - private boolean isSecured = false; - private String saslMechanism; - private String securityProtocol; - private String truststoreFile; - private String propertiesFile; - private String keystoreFile; - - public void applyCommon(Properties properties) { - properties.setProperty(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, brokerConnect); - if (isSecured) { - LOG.warn("The 'isSecured' property is deprecated; consult README.md on the preferred way to configure security"); - properties.put(SaslConfigs.SASL_MECHANISM, saslMechanism); - } + private static final Logger LOG = LoggerFactory.getLogger(KafkaConfiguration.class); - if(isSecured || securityProtocol.equals("SSL")) { - properties.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, securityProtocol); - } + private String brokerConnect; + private boolean isSecured = false; + private String saslMechanism; + private String securityProtocol; + private String truststoreFile; + private String propertiesFile; + private String keystoreFile; - LOG.info("Checking truststore file {}", truststoreFile); - if (new File(truststoreFile).isFile()) { - LOG.info("Assigning truststore location to {}", truststoreFile); - properties.put("ssl.truststore.location", truststoreFile); - } + private String saslJaasConfig; - LOG.info("Checking keystore file {}", keystoreFile); - if (new File(keystoreFile).isFile()) { - LOG.info("Assigning keystore location to {}", keystoreFile); - properties.put("ssl.keystore.location", keystoreFile); - } + public void applyCommon(Properties properties) { + properties.setProperty(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, brokerConnect); + if (isSecured) { + LOG.warn("The 'isSecured' property is deprecated; consult README.md on the preferred way to configure security"); + properties.put(SaslConfigs.SASL_MECHANISM, saslMechanism); + } else if (!saslMechanism.isEmpty()) { + properties.put(SaslConfigs.SASL_MECHANISM, saslMechanism); + } + + if (isSecured || securityProtocol.equals("SSL") || securityProtocol.equals("SASL_SSL")) { + properties.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, securityProtocol); + } + + LOG.info("Checking truststore file {}", truststoreFile); + if (new File(truststoreFile).isFile()) { + LOG.info("Assigning truststore location to {}", truststoreFile); + properties.put("ssl.truststore.location", truststoreFile); + } + + LOG.info("Checking keystore file {}", keystoreFile); + if (new File(keystoreFile).isFile()) { + LOG.info("Assigning keystore location to {}", keystoreFile); + properties.put("ssl.keystore.location", keystoreFile); + } + if (saslJaasConfig != null) { + LOG.info("adding sasl.jaas.config. Value is hidden"); + properties.put("sasl.jaas.config", saslJaasConfig); + + } - LOG.info("Checking properties file {}", propertiesFile); - final var propertiesFile = new File(this.propertiesFile); - if (propertiesFile.isFile()) { - LOG.info("Loading properties from {}", this.propertiesFile); - final var propertyOverrides = new Properties(); - try (var propsReader = new BufferedReader(new FileReader(propertiesFile))) { - propertyOverrides.load(propsReader); - } catch (IOException e) { - throw new KafkaConfigurationException(e); - } - properties.putAll(propertyOverrides); + LOG.info("Checking properties file {}", propertiesFile); + final var propertiesFile = new File(this.propertiesFile); + if (propertiesFile.isFile()) { + LOG.info("Loading properties from {}", this.propertiesFile); + final var propertyOverrides = new Properties(); + try (var propsReader = new BufferedReader(new FileReader(propertiesFile))) { + propertyOverrides.load(propsReader); + } catch (IOException e) { + throw new KafkaConfigurationException(e); + } + properties.putAll(propertyOverrides); + } } - } -} \ No newline at end of file +} diff --git a/src/main/java/kafdrop/config/SchemaRegistryConfiguration.java b/src/main/java/kafdrop/config/SchemaRegistryConfiguration.java index f45ce0ec..45bae3cb 100644 --- a/src/main/java/kafdrop/config/SchemaRegistryConfiguration.java +++ b/src/main/java/kafdrop/config/SchemaRegistryConfiguration.java @@ -19,6 +19,9 @@ public static final class SchemaRegistryProperties { private String connect; private String auth; + private String truststoreLocation; + private String truststorePassword; + public String getConnect() { return connect; } @@ -31,6 +34,22 @@ public void setConnect(String connect) { public void setAuth(String auth) { this.auth = auth; } + public String getTruststoreLocation() { + return truststoreLocation; + } + + public String getTruststorePassword() { + return truststorePassword; + } + + public void setTruststoreLocation(String truststoreLocation) { + this.truststoreLocation = truststoreLocation; + } + + public void setTruststorePassword(String truststorePassword) { + this.truststorePassword = truststorePassword; + } + public List getConnectList() { return CONNECT_SEPARATOR.splitAsStream(this.connect) .map(String::trim) diff --git a/src/main/java/kafdrop/controller/MessageController.java b/src/main/java/kafdrop/controller/MessageController.java index b25a7b12..65be7065 100644 --- a/src/main/java/kafdrop/controller/MessageController.java +++ b/src/main/java/kafdrop/controller/MessageController.java @@ -22,6 +22,7 @@ import java.util.ArrayList; import java.util.Comparator; import java.util.List; +import java.util.Optional; import javax.validation.Valid; import javax.validation.constraints.Max; @@ -57,6 +58,7 @@ @Controller public final class MessageController { + public static final String DESC = ".desc"; private final KafkaMonitor kafkaMonitor; private final MessageInspector messageInspector; @@ -151,7 +153,7 @@ public String viewMessageForm(@PathVariable("name") String topicName, model.addAttribute("topic", topic); // pre-select a descriptor file for a specific topic if available model.addAttribute("defaultDescFile", protobufProperties.getDescFilesList().stream() - .filter(descFile -> descFile.replace(".desc", "").equals(topicName)).findFirst().orElse("")); + .filter(descFile -> descFile.replace(DESC, "").equals(topicName)).findFirst().orElse("")); model.addAttribute("defaultFormat", defaultFormat); model.addAttribute("messageFormats", MessageFormat.values()); @@ -261,15 +263,18 @@ private MessageDeserializer getDeserializer(String topicName, MessageFormat form if (format == MessageFormat.AVRO) { final var schemaRegistryUrl = schemaRegistryProperties.getConnect(); final var schemaRegistryAuth = schemaRegistryProperties.getAuth(); + final var truststoreLocation = Optional.ofNullable(schemaRegistryProperties.getTruststoreLocation()); + final var truststorePassword = Optional.ofNullable(schemaRegistryProperties.getTruststorePassword()); - deserializer = new AvroMessageDeserializer(topicName, schemaRegistryUrl, schemaRegistryAuth); + deserializer = new AvroMessageDeserializer(topicName, schemaRegistryUrl, schemaRegistryAuth, + truststoreLocation,truststorePassword); } else if (format == MessageFormat.PROTOBUF && null != descFile) { // filter the input file name - final var descFileName = descFile.replace(".desc", "") + final var descFileName = descFile.replace(DESC, "") .replace(".", "") .replace("/", ""); - final var fullDescFile = protobufProperties.getDirectory() + File.separator + descFileName + ".desc"; + final var fullDescFile = protobufProperties.getDirectory() + File.separator + descFileName + DESC; deserializer = new ProtobufMessageDeserializer(fullDescFile, msgTypeName, isAnyProto); } else if (format == MessageFormat.PROTOBUF) { final var schemaRegistryUrl = schemaRegistryProperties.getConnect(); @@ -310,7 +315,7 @@ public static class PartitionOffsetInfo { */ @NotNull @Min(1) - @Max(100) + @Max(1000) @JsonProperty("lastOffset") private Long count; diff --git a/src/main/java/kafdrop/util/AvroMessageDeserializer.java b/src/main/java/kafdrop/util/AvroMessageDeserializer.java index b01ea072..ed4ef9f2 100644 --- a/src/main/java/kafdrop/util/AvroMessageDeserializer.java +++ b/src/main/java/kafdrop/util/AvroMessageDeserializer.java @@ -7,30 +7,49 @@ public final class AvroMessageDeserializer implements MessageDeserializer { - private final String topicName; - private final KafkaAvroDeserializer deserializer; - - public AvroMessageDeserializer(String topicName, String schemaRegistryUrl, String schemaRegistryAuth) { - this.topicName = topicName; - this.deserializer = getDeserializer(schemaRegistryUrl, schemaRegistryAuth); - } - - @Override - public String deserializeMessage(ByteBuffer buffer) { - // Convert byte buffer to byte array - final var bytes = ByteUtils.convertToByteArray(buffer); - return deserializer.deserialize(topicName, bytes).toString(); - } - - private static KafkaAvroDeserializer getDeserializer(String schemaRegistryUrl, String schemaRegistryAuth) { - final var config = new HashMap(); - config.put(AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, schemaRegistryUrl); - if (schemaRegistryAuth != null) { - config.put(AbstractKafkaSchemaSerDeConfig.BASIC_AUTH_CREDENTIALS_SOURCE, "USER_INFO"); - config.put(AbstractKafkaSchemaSerDeConfig.USER_INFO_CONFIG, schemaRegistryAuth); + private final String topicName; + private final KafkaAvroDeserializer deserializer; + + public AvroMessageDeserializer(String topicName, String schemaRegistryUrl, + String schemaRegistryAuth) { + this.topicName = topicName; + this.deserializer = getDeserializer(schemaRegistryUrl, schemaRegistryAuth, + Optional.empty(), + Optional.empty()); + } + + public AvroMessageDeserializer(String topicName, String schemaRegistryUrl, + String schemaRegistryAuth, + Optional truststoreLocation, + Optional truststorePassword) { + this.topicName = topicName; + this.deserializer = getDeserializer(schemaRegistryUrl, schemaRegistryAuth, + truststoreLocation, truststorePassword); + } + + @Override + public String deserializeMessage(ByteBuffer buffer) { + // Convert byte buffer to byte array + final var bytes = ByteUtils.convertToByteArray(buffer); + return deserializer.deserialize(topicName, bytes).toString(); + } + + private static KafkaAvroDeserializer getDeserializer(String schemaRegistryUrl, String schemaRegistryAuth, + Optional truststoreLocation, + Optional truststorePassword) { + final var config = new HashMap(); + config.put(AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, schemaRegistryUrl); + if (schemaRegistryAuth != null) { + config.put(AbstractKafkaSchemaSerDeConfig.BASIC_AUTH_CREDENTIALS_SOURCE, "USER_INFO"); + config.put(AbstractKafkaSchemaSerDeConfig.USER_INFO_CONFIG, schemaRegistryAuth); + config.put(AbstractKafkaSchemaSerDeConfig.AUTO_REGISTER_SCHEMAS, false); + truststorePassword.ifPresent(passwd -> config.put("schema.registry.ssl.truststore.password", passwd)); + truststoreLocation.ifPresent(location -> config.put("schema.registry.ssl.truststore.location", location)); + + + } + final var kafkaAvroDeserializer = new KafkaAvroDeserializer(); + kafkaAvroDeserializer.configure(config, false); + return kafkaAvroDeserializer; } - final var kafkaAvroDeserializer = new KafkaAvroDeserializer(); - kafkaAvroDeserializer.configure(config, false); - return kafkaAvroDeserializer; - } }