Get WebClient config from config map (#149)
Signed-off-by: Francesco Guardiani <francescoguard@gmail.com>
slinkydeveloper authored Sep 7, 2020
1 parent 5958397 commit fa7b309
Showing 4 changed files with 77 additions and 64 deletions.
2 changes: 2 additions & 0 deletions data-plane/config/100-config-kafka-broker-data-plane.yaml
@@ -111,3 +111,5 @@ data:
    # ssl.keymanager.algorithm
    # ssl.secure.random.implementation
    # ssl.trustmanager.algorithm
  config-kafka-broker-webclient.properties: |
    idleTimeout=10000
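
This new config map entry is what the dispatcher turns into its `WebClientOptions`. Below is a minimal sketch of that mapping, assuming vertx-config's `properties` format parses the entry into a JsonObject field of the same name; the class name is illustrative only and not part of this commit:

```java
import io.vertx.core.json.JsonObject;
import io.vertx.ext.web.client.WebClientOptions;

public class WebClientConfigSketch {

  public static void main(String[] args) {
    // An entry such as `idleTimeout=10000` in the properties file becomes a numeric
    // field in the JsonObject produced by vertx-config's "properties" format.
    final JsonObject config = new JsonObject().put("idleTimeout", 10000);

    // WebClientOptions accepts a JsonObject and applies fields via its generated
    // converter, so the config map key maps onto the option of the same name.
    final WebClientOptions options = new WebClientOptions(config);

    System.out.println(options.getIdleTimeout()); // 10000
  }
}
```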
2 changes: 2 additions & 0 deletions data-plane/config/template/500-dispatcher.yaml
@@ -58,6 +58,8 @@ spec:
              value: /etc/config/config-kafka-broker-producer.properties
            - name: CONSUMER_CONFIG_FILE_PATH
              value: /etc/config/config-kafka-broker-consumer.properties
            - name: WEBCLIENT_CONFIG_FILE_PATH
              value: /etc/config/config-kafka-broker-webclient.properties
            - name: BROKERS_TRIGGERS_PATH
              value: /etc/brokers-triggers/data
            - name: BROKERS_INITIAL_CAPACITY
61 changes: 61 additions & 0 deletions
@@ -0,0 +1,61 @@
package dev.knative.eventing.kafka.broker.dispatcher;

import static net.logstash.logback.argument.StructuredArguments.keyValue;

import io.vertx.config.ConfigRetriever;
import io.vertx.config.ConfigRetrieverOptions;
import io.vertx.config.ConfigStoreOptions;
import io.vertx.core.Vertx;
import io.vertx.core.json.JsonObject;
import java.io.FileReader;
import java.io.IOException;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class Configurations {

  private static final Logger logger = LoggerFactory.getLogger(Configurations.class);

  static Properties getKafkaProperties(final String path) {
    if (path == null) {
      return new Properties();
    }

    final var props = new Properties();
    try (final var configReader = new FileReader(path)) {
      props.load(configReader);
    } catch (IOException e) {
      logger.error("failed to load configurations from file {}", keyValue("path", path), e);
    }

    return props;
  }

  static JsonObject getFileConfigurations(final Vertx vertx, String file) throws ExecutionException, InterruptedException {
    final var fileConfigs = new ConfigStoreOptions()
      .setType("file")
      .setFormat("properties")
      .setConfig(new JsonObject().put("path", file));

    return ConfigRetriever.create(vertx, new ConfigRetrieverOptions().addStore(fileConfigs))
      .getConfig()
      .toCompletionStage()
      .toCompletableFuture()
      .get();
  }

  static JsonObject getEnvConfigurations(final Vertx vertx) throws InterruptedException, ExecutionException {
    final var envConfigs = new ConfigStoreOptions()
      .setType("env")
      .setOptional(false)
      .setConfig(new JsonObject().put("raw-data", true));

    return ConfigRetriever.create(vertx, new ConfigRetrieverOptions().addStore(envConfigs))
      .getConfig()
      .toCompletionStage()
      .toCompletableFuture()
      .get();
  }
}
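
For reference, a hedged usage sketch of the new helper. It sits in the same package because the methods are package-private; the temporary file setup and class name are illustrative only and not part of the commit:

```java
package dev.knative.eventing.kafka.broker.dispatcher;

import io.vertx.core.Vertx;
import io.vertx.core.json.JsonObject;
import io.vertx.ext.web.client.WebClient;
import io.vertx.ext.web.client.WebClientOptions;
import java.nio.file.Files;
import java.nio.file.Path;

public class ConfigurationsUsageSketch {

  public static void main(String[] args) throws Exception {
    final Vertx vertx = Vertx.vertx();

    // Write a throwaway properties file mirroring the config map entry added above.
    final Path file = Files.createTempFile("webclient", ".properties");
    Files.write(file, "idleTimeout=10000\n".getBytes());

    // Load it the way the dispatcher does and feed the result to WebClientOptions.
    final JsonObject webClientConfig = Configurations.getFileConfigurations(vertx, file.toString());
    final WebClient client = WebClient.create(vertx, new WebClientOptions(webClientConfig));

    System.out.println(webClientConfig.encode()); // {"idleTimeout":10000}
    vertx.close();
  }
}
```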
76 changes: 12 additions & 64 deletions
@@ -16,25 +16,17 @@

package dev.knative.eventing.kafka.broker.dispatcher;

import static net.logstash.logback.argument.StructuredArguments.keyValue;

import dev.knative.eventing.kafka.broker.core.ObjectsCreator;
import dev.knative.eventing.kafka.broker.core.file.FileWatcher;
import dev.knative.eventing.kafka.broker.dispatcher.http.HttpConsumerVerticleFactory;
import io.cloudevents.CloudEvent;
import io.vertx.config.ConfigRetriever;
import io.vertx.config.ConfigRetrieverOptions;
import io.vertx.config.ConfigStoreOptions;
import io.vertx.core.Vertx;
import io.vertx.core.json.JsonObject;
import io.vertx.ext.web.client.WebClient;
import io.vertx.ext.web.client.WebClientOptions;
import java.io.File;
import java.io.FileReader;
import java.io.IOException;
import java.nio.file.FileSystems;
import java.util.Properties;
import java.util.concurrent.ArrayBlockingQueue;
import net.logstash.logback.encoder.LogstashEncoder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -46,6 +38,7 @@ public class Main {
private static final String BROKERS_TRIGGERS_PATH = "BROKERS_TRIGGERS_PATH";
private static final String PRODUCER_CONFIG_FILE_PATH = "PRODUCER_CONFIG_FILE_PATH";
private static final String CONSUMER_CONFIG_FILE_PATH = "CONSUMER_CONFIG_FILE_PATH";
private static final String WEBCLIENT_CONFIG_FILE_PATH = "WEBCLIENT_CONFIG_FILE_PATH";
private static final String BROKERS_INITIAL_CAPACITY = "BROKERS_INITIAL_CAPACITY";
private static final String TRIGGERS_INITIAL_CAPACITY = "TRIGGERS_INITIAL_CAPACITY";
public static final String INSTANCE_ID = "INSTANCE_ID";
@@ -55,7 +48,7 @@ public class Main {
*
* @param args command line arguments.
*/
public static void main(final String[] args) {
public static void main(final String[] args) throws Exception {

// HACK HACK HACK
// maven-shade-plugin doesn't include the LogstashEncoder class, neither by specifying the
@@ -69,34 +62,27 @@ public static void main(final String[] args) {
final var vertx = Vertx.vertx();
Runtime.getRuntime().addShutdownHook(new Thread(vertx::close));

final JsonObject json;
try {
json = getConfigurations(vertx);
} catch (InterruptedException e) {
System.exit(1);
return;
}

final var producerConfigs = config(json.getString(PRODUCER_CONFIG_FILE_PATH));
final var consumerConfigs = config(json.getString(CONSUMER_CONFIG_FILE_PATH));
final var instanceID = json.getString(INSTANCE_ID);
final JsonObject envConfig = Configurations.getEnvConfigurations(vertx);
final var producerConfig = Configurations.getKafkaProperties(envConfig.getString(PRODUCER_CONFIG_FILE_PATH));
final var consumerConfig = Configurations.getKafkaProperties(envConfig.getString(CONSUMER_CONFIG_FILE_PATH));
final var webClientConfig = Configurations.getFileConfigurations(vertx, envConfig.getString(WEBCLIENT_CONFIG_FILE_PATH));

final ConsumerRecordOffsetStrategyFactory<String, CloudEvent>
consumerRecordOffsetStrategyFactory = ConsumerRecordOffsetStrategyFactory.unordered();

final var consumerVerticleFactory = new HttpConsumerVerticleFactory(
consumerRecordOffsetStrategyFactory,
consumerConfigs,
WebClient.create(vertx, new WebClientOptions().setIdleTimeout(10000)),
consumerConfig,
WebClient.create(vertx, new WebClientOptions(webClientConfig)),
vertx,
producerConfigs
producerConfig
);

final var brokersManager = new BrokersManager<>(
vertx,
consumerVerticleFactory,
Integer.parseInt(json.getString(BROKERS_INITIAL_CAPACITY)),
Integer.parseInt(json.getString(TRIGGERS_INITIAL_CAPACITY))
Integer.parseInt(envConfig.getString(BROKERS_INITIAL_CAPACITY)),
Integer.parseInt(envConfig.getString(TRIGGERS_INITIAL_CAPACITY))
);

final var objectCreator = new ObjectsCreator(brokersManager);
@@ -105,7 +91,7 @@ public static void main(final String[] args) {
final var fw = new FileWatcher(
FileSystems.getDefault().newWatchService(),
objectCreator,
new File(json.getString(BROKERS_TRIGGERS_PATH))
new File(envConfig.getString(BROKERS_TRIGGERS_PATH))
);

fw.watch(); // block forever
@@ -115,42 +101,4 @@
}
}

private static Properties config(final String path) {
if (path == null) {
return new Properties();
}

final var consumerConfigs = new Properties();
try (final var configReader = new FileReader(path)) {
consumerConfigs.load(configReader);
} catch (IOException e) {
logger.error("failed to load configurations from file {}", keyValue("path", path), e);
}

return consumerConfigs;
}

private static JsonObject getConfigurations(final Vertx vertx) throws InterruptedException {

final var envConfigs = new ConfigStoreOptions()
.setType("env")
.setOptional(false)
.setConfig(new JsonObject().put("raw-data", true));

final var configRetrieverOptions = new ConfigRetrieverOptions()
.addStore(envConfigs);

final var configRetriever = ConfigRetriever.create(vertx, configRetrieverOptions);


final var waitConfigs = new ArrayBlockingQueue<JsonObject>(1);
configRetriever.getConfig()
.onSuccess(waitConfigs::add)
.onFailure(cause -> {
logger.error("failed to retrieve configurations", cause);
vertx.close(ignored -> System.exit(1));
});

return waitConfigs.take();
}
}
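
One practical consequence of this change is that the dispatcher's HTTP client can now be tuned from the config map rather than by editing code, since any field the `WebClientOptions` JSON converter understands can be appended to `config-kafka-broker-webclient.properties`. The sketch below illustrates the idea; the extra field names (`connectTimeout`, `maxPoolSize`) assume the standard WebClientOptions/HttpClientOptions JSON keys and are not part of this commit:

```java
import io.vertx.core.json.JsonObject;
import io.vertx.ext.web.client.WebClientOptions;

public class WebClientTuningSketch {

  public static void main(String[] args) {
    // Extra keys in config-kafka-broker-webclient.properties would surface here as
    // additional JsonObject fields alongside idleTimeout.
    final JsonObject config = new JsonObject()
        .put("idleTimeout", 10000)
        .put("connectTimeout", 2000)
        .put("maxPoolSize", 20);

    final WebClientOptions options = new WebClientOptions(config);

    System.out.println(options.getIdleTimeout());    // 10000
    System.out.println(options.getConnectTimeout()); // 2000
    System.out.println(options.getMaxPoolSize());    // 20
  }
}
```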
