Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Create multiple receiver verticles #985

Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions data-plane/profiler/resources/config-sacura-warmup.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,11 @@

sender:
target: http://localhost:8080/attack-ingress-single
frequency: 1000
frequency: 10000
workers: 15
keepAlive: true
receiver:
port: 8081
timeout: 0m
timeout: 2m
duration: 5m
timeout: 0m
6 changes: 3 additions & 3 deletions data-plane/profiler/resources/config-sacura.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,11 @@

sender:
target: http://localhost:8080/attack-ingress-single
frequency: 1000
frequency: 10000
workers: 15
keepAlive: true
receiver:
port: 8081
timeout: 0m
timeout: 2m
duration: 10m
timeout: 0m
timeout: 1m
29 changes: 16 additions & 13 deletions data-plane/profiler/run.sh
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
# limitations under the License.
#

set -e

# Set runtime variables
# echo 1 > /proc/sys/kernel/perf_event_paranoid
# echo 0 > /proc/sys/kernel/kptr_restrict
Expand All @@ -40,6 +42,9 @@ PROJECT_ROOT_DIR=$(dirname $0)/..
RESOURCES_DIR="$(dirname $0)"/resources
ASYNC_PROFILER_URL="https://github.com/jvm-profiling-tools/async-profiler/releases/download/v2.0/async-profiler-2.0-linux-x64.tar.gz"
KAFKA_URL="https://archive.apache.org/dist/kafka/2.6.0/kafka_2.13-2.6.0.tgz"
LOG_DIR=${LOG_DIR:-"/tmp/eventing-kafka-broker-logs/profiler"}

rm -rf "${LOG_DIR}" && mkdir -p "${LOG_DIR}"

echo "Project root dir: ${PROJECT_ROOT_DIR}"
echo "Resource dir: ${RESOURCES_DIR}"
Expand All @@ -50,28 +55,26 @@ echo "Kafka URL: ${KAFKA_URL}"
cd "${PROJECT_ROOT_DIR}" && ./mvnw package -DskipTests -Dlicense.skip -Deditorconfig.skip -B -U --no-transfer-progress && cd - || exit 1

# Download async profiler.
rm async-profiler
rm async-profiler.tgz
rm -rf async-profiler
rm -rf async-profiler.tgz
mkdir async-profiler
wget -O - ${ASYNC_PROFILER_URL} >async-profiler.tgz
tar xzvf async-profiler.tgz -C async-profiler --strip-components=1

# Download Apache Kafka.
rm kafka
rm kafka.tgz
rm -r /tmp/kafka-logs/
rm -r /tmp/zookeeper/
rm -rf kafka
rm -rf kafka.tgz
rm -rf /tmp/kafka-logs/
rm -rf /tmp/zookeeper/
mkdir kafka
wget -O - ${KAFKA_URL} >kafka.tgz
tar xzvf kafka.tgz -C kafka --strip-components=1

ls -al

# Start Zookeeper and Kafka.
cd kafka || exit 1
./bin/zookeeper-server-start.sh config/zookeeper.properties &
./bin/zookeeper-server-start.sh config/zookeeper.properties >"${LOG_DIR}/zookeeper.log" &
zookeeper_pid=$!
./bin/kafka-server-start.sh config/server.properties &
./bin/kafka-server-start.sh config/server.properties >"${LOG_DIR}/kafka.log" &
kafka_pid=$!

# Create our Kafka topic.
Expand Down Expand Up @@ -106,7 +109,7 @@ java \
-XX:+UnlockDiagnosticVMOptions \
-XX:+DebugNonSafepoints \
-Dlogback.configurationFile="${RESOURCES_DIR}"/config-logging.xml \
-jar "${PROJECT_ROOT_DIR}"/receiver/target/receiver-1.0-SNAPSHOT.jar &
-jar "${PROJECT_ROOT_DIR}"/receiver/target/receiver-1.0-SNAPSHOT.jar >"${LOG_DIR}/receiver.log" &
receiver_pid=$!

# Define expected env variables.
Expand All @@ -121,7 +124,7 @@ java \
-XX:+UnlockDiagnosticVMOptions \
-XX:+DebugNonSafepoints \
-Dlogback.configurationFile="${RESOURCES_DIR}"/config-logging.xml \
-jar "${PROJECT_ROOT_DIR}"/dispatcher/target/dispatcher-1.0-SNAPSHOT.jar &
-jar "${PROJECT_ROOT_DIR}"/dispatcher/target/dispatcher-1.0-SNAPSHOT.jar >"${LOG_DIR}/dispatcher.log" &
dispatcher_pid=$!

# Download Sacura
Expand Down Expand Up @@ -155,6 +158,6 @@ kill $dispatcher_pid
kill -9 $kafka_pid
kill -9 $zookeeper_pid

rm -r kafka kafka.tgz async-profiler async-profiler.tgz
rm -rf kafka kafka.tgz async-profiler async-profiler.tgz

exit 0
Original file line number Diff line number Diff line change
Expand Up @@ -20,32 +20,33 @@
import dev.knative.eventing.kafka.broker.core.file.FileWatcher;
import dev.knative.eventing.kafka.broker.core.metrics.Metrics;
import dev.knative.eventing.kafka.broker.core.reconciler.impl.ResourcesReconcilerMessageHandler;
import dev.knative.eventing.kafka.broker.core.security.AuthProvider;
import dev.knative.eventing.kafka.broker.core.tracing.TracingConfig;
import dev.knative.eventing.kafka.broker.core.utils.Configurations;
import dev.knative.eventing.kafka.broker.core.utils.Shutdown;
import io.cloudevents.kafka.CloudEventSerializer;
import io.cloudevents.kafka.PartitionKeyExtensionInterceptor;
import io.opentelemetry.sdk.OpenTelemetrySdk;
import io.vertx.core.DeploymentOptions;
import io.vertx.core.Verticle;
import io.vertx.core.Vertx;
import io.vertx.core.VertxOptions;
import io.vertx.core.http.HttpServerOptions;
import io.vertx.core.tracing.TracingPolicy;
import io.vertx.kafka.client.producer.KafkaProducer;
import io.vertx.tracing.opentelemetry.OpenTelemetryOptions;
import net.logstash.logback.encoder.LogstashEncoder;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.File;
import java.io.IOException;
import java.nio.file.ClosedWatchServiceException;
import java.nio.file.FileSystems;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Function;
import net.logstash.logback.encoder.LogstashEncoder;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.function.Supplier;

import static dev.knative.eventing.kafka.broker.core.utils.Logging.keyValue;

Expand Down Expand Up @@ -110,34 +111,19 @@ public static void main(final String[] args) throws IOException {
producerConfigs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, CloudEventSerializer.class);
producerConfigs.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, PartitionKeyExtensionInterceptor.class.getName());

final Function<Vertx, RequestMapper> handlerFactory = v -> new RequestMapper(
v,
AuthProvider.kubernetes(),
producerConfigs,
new CloudEventRequestToRecordMapper(),
properties -> KafkaProducer.create(v, properties),
badRequestCounter,
produceEventsCounter
);

final var httpServerOptions = new HttpServerOptions(
Configurations.getPropertiesAsJson(env.getHttpServerConfigFilePath())
);
httpServerOptions.setPort(env.getIngressPort());
httpServerOptions.setTracingPolicy(TracingPolicy.PROPAGATE);

final var verticle = new ReceiverVerticle(
httpServerOptions,
handlerFactory,
h -> new SimpleProbeHandlerDecorator(
env.getLivenessProbePath(),
env.getReadinessProbePath(),
h
)
);
final Supplier<Verticle> verticle = new ReceiverVerticleSupplier(env, producerConfigs, badRequestCounter, produceEventsCounter, httpServerOptions);
slinkydeveloper marked this conversation as resolved.
Show resolved Hide resolved

final var deploymentOptions = new DeploymentOptions()
.setInstances(Runtime.getRuntime().availableProcessors());

final var waitVerticle = new CountDownLatch(1);
vertx.deployVerticle(verticle)
vertx.deployVerticle(verticle, deploymentOptions)
.onSuccess(v -> {
logger.info("Receiver started");
waitVerticle.countDown();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
/*
* Copyright © 2018 Knative Authors (knative-dev@googlegroups.com)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package dev.knative.eventing.kafka.broker.receiver;

import dev.knative.eventing.kafka.broker.core.security.AuthProvider;
import io.micrometer.core.instrument.Counter;
import io.vertx.core.Verticle;
import io.vertx.core.http.HttpServerOptions;
import io.vertx.kafka.client.producer.KafkaProducer;

import java.util.Properties;
import java.util.function.Supplier;

class ReceiverVerticleSupplier implements Supplier<Verticle> {
pierDipi marked this conversation as resolved.
Show resolved Hide resolved

private final ReceiverEnv env;
private final Properties producerConfigs;
private final Counter badRequestCounter;
private final Counter produceEventsCounter;
private final HttpServerOptions httpServerOptions;

ReceiverVerticleSupplier(final ReceiverEnv env,
final Properties producerConfigs,
final Counter badRequestCounter,
final Counter produceEventsCounter,
final HttpServerOptions httpServerOptions) {

this.env = env;
this.producerConfigs = producerConfigs;
this.badRequestCounter = badRequestCounter;
this.produceEventsCounter = produceEventsCounter;
this.httpServerOptions = httpServerOptions;
}

@Override
public Verticle get() {
return new ReceiverVerticle(
httpServerOptions,
v -> new RequestMapper(
v,
AuthProvider.kubernetes(),
producerConfigs,
new CloudEventRequestToRecordMapper(),
properties -> KafkaProducer.create(v, properties),
badRequestCounter,
produceEventsCounter
),
h -> new SimpleProbeHandlerDecorator(
env.getLivenessProbePath(),
env.getReadinessProbePath(),
h
)
);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
/*
* Copyright © 2018 Knative Authors (knative-dev@googlegroups.com)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package dev.knative.eventing.kafka.broker.receiver;

import io.micrometer.core.instrument.Counter;
import io.vertx.core.http.HttpServerOptions;
import org.junit.jupiter.api.Test;

import java.util.Properties;

import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.Mockito.mock;

public class ReceiverVerticleSupplierTest {

  /**
   * The supplier must hand out a distinct verticle on every call, since each
   * deployed instance needs its own verticle object.
   */
  @Test
  public void shouldCreateMultipleReceiverVerticleInstances() {
    // All collaborators are mocked: only instance identity matters here.
    final var factory = new ReceiverVerticleSupplier(
      mock(ReceiverEnv.class),
      mock(Properties.class),
      mock(Counter.class),
      mock(Counter.class),
      mock(HttpServerOptions.class)
    );

    final var first = factory.get();
    final var second = factory.get();

    // Two invocations must not return the same object.
    assertThat(first).isNotSameAs(second);
  }
}