Skip to content

Commit

Permalink
Fix OpenCircuitException: open circuit (#710) (#717)
Browse files Browse the repository at this point in the history
* Fix OpenCircuitException: open circuit

While analyzing logs I see most of the `sacura` flakiness is due to
errors like:
`io.vertx.circuitbreaker.OpenCircuitException: open circuit`,
which means that the circuit breaker, after switching to its open state,
doesn't switch its state to closed. The actual reason is not yet clear.

- Increase backoff delay of sacura trigger
- Set circuit breaker max failure to a number greater than max retries
- Set circuit breaker reset timeout to backoff dealy

Part of #416

Signed-off-by: Pierangelo Di Pilato <pierangelodipilato@gmail.com>

* Reduce event frequency

Signed-off-by: Pierangelo Di Pilato <pierangelodipilato@gmail.com>

* Reduce workers

Signed-off-by: Pierangelo Di Pilato <pierangelodipilato@gmail.com>
  • Loading branch information
pierDipi authored Mar 10, 2021
1 parent 8daeb77 commit b2aa942
Show file tree
Hide file tree
Showing 7 changed files with 47 additions and 25 deletions.
4 changes: 2 additions & 2 deletions .github/workflows/knative-java-test.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,10 @@ name: Test
on:

push:
branches: [ 'master' ]
branches: [ 'main' ]

pull_request:
branches: [ 'master', 'release-*' ]
branches: [ 'main', 'release-*' ]

jobs:

Expand Down
4 changes: 2 additions & 2 deletions .github/workflows/knative-profile-data-plane.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,10 @@ name: Profiling
on:

push:
branches: [ 'master' ]
branches: [ 'main' ]

pull_request:
branches: [ 'master', 'release-*' ]
branches: [ 'main', 'release-*' ]

jobs:

Expand Down
22 changes: 11 additions & 11 deletions data-plane/THIRD-PARTY.txt
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,12 @@ Lists of 153 third-party dependencies.
(Apache 2.0) Gson (com.google.code.gson:gson:2.8.6 - https://github.com/google/gson/gson)
(Apache 2.0) error-prone annotations (com.google.errorprone:error_prone_annotations:2.3.4 - http://nexus.sonatype.org/oss-repository-hosting.html/error_prone_parent/error_prone_annotations)
(The Apache Software License, Version 2.0) Guava InternalFutureFailureAccess and InternalFutures (com.google.guava:failureaccess:1.0.1 - https://github.com/google/guava/failureaccess)
(Apache License, Version 2.0) Guava: Google Core Libraries for Java (com.google.guava:guava:29.0-android - https://github.com/google/guava/guava)
(Apache License, Version 2.0) Guava: Google Core Libraries for Java (com.google.guava:guava:30.0-android - https://github.com/google/guava/guava)
(Apache License, Version 2.0) Guava: Google Core Libraries for Java (com.google.guava:guava:30.0-jre - https://github.com/google/guava/guava)
(The Apache Software License, Version 2.0) Guava ListenableFuture only (com.google.guava:listenablefuture:9999.0-empty-to-avoid-conflict-with-guava - https://github.com/google/guava/listenablefuture)
(The Apache Software License, Version 2.0) J2ObjC Annotations (com.google.j2objc:j2objc-annotations:1.3 - https://github.com/google/j2objc/)
(3-Clause BSD License) Protocol Buffers [Core] (com.google.protobuf:protobuf-java:3.14.0 - https://developers.google.com/protocol-buffers/protobuf-java/)
(3-Clause BSD License) Protocol Buffers [Util] (com.google.protobuf:protobuf-java-util:3.14.0 - https://developers.google.com/protocol-buffers/protobuf-java-util/)
(3-Clause BSD License) Protocol Buffers [Core] (com.google.protobuf:protobuf-java:3.15.1 - https://developers.google.com/protocol-buffers/protobuf-java/)
(3-Clause BSD License) Protocol Buffers [Util] (com.google.protobuf:protobuf-java-util:3.15.1 - https://developers.google.com/protocol-buffers/protobuf-java-util/)
(Apache 2.0) OkHttp Logging Interceptor (com.squareup.okhttp3:logging-interceptor:3.12.12 - https://github.com/square/okhttp/logging-interceptor)
(Apache 2.0) MockWebServer (com.squareup.okhttp3:mockwebserver:3.12.6 - https://github.com/square/okhttp/mockwebserver)
(Apache 2.0) OkHttp (com.squareup.okhttp3:okhttp:3.14.9 - https://github.com/square/okhttp/okhttp)
Expand Down Expand Up @@ -65,8 +65,8 @@ Lists of 153 third-party dependencies.
(Apache License, Version 2.0) Fabric8 :: Kubernetes :: Server Mock (io.fabric8:kubernetes-server-mock:5.0.2 - http://fabric8.io/kubernetes-server-mock/)
(Apache License, Version 2.0) Fabric8 :: Mock Web Server (io.fabric8:mockwebserver:0.1.8 - http://fabric8.io/)
(The Apache Software License, Version 2.0) zjsonpatch (io.fabric8:zjsonpatch:0.3.0 - https://github.com/fabric8io/zjsonpatch/)
(The Apache Software License, Version 2.0) micrometer-core (io.micrometer:micrometer-core:1.6.3 - https://github.com/micrometer-metrics/micrometer)
(The Apache Software License, Version 2.0) micrometer-registry-prometheus (io.micrometer:micrometer-registry-prometheus:1.6.3 - https://github.com/micrometer-metrics/micrometer)
(The Apache Software License, Version 2.0) micrometer-core (io.micrometer:micrometer-core:1.6.4 - https://github.com/micrometer-metrics/micrometer)
(The Apache Software License, Version 2.0) micrometer-registry-prometheus (io.micrometer:micrometer-registry-prometheus:1.6.4 - https://github.com/micrometer-metrics/micrometer)
(Apache License, Version 2.0) Netty/Buffer (io.netty:netty-buffer:4.1.52.Final - https://netty.io/netty-buffer/)
(Apache License, Version 2.0) Netty/Codec (io.netty:netty-codec:4.1.52.Final - https://netty.io/netty-codec/)
(Apache License, Version 2.0) Netty/Codec/DNS (io.netty:netty-codec-dns:4.1.52.Final - https://netty.io/netty-codec-dns/)
Expand Down Expand Up @@ -109,8 +109,8 @@ Lists of 153 third-party dependencies.
(The Apache Software License, Version 2.0) Zipkin Sender: OkHttp 3 (io.zipkin.reporter2:zipkin-sender-okhttp3:2.16.0 - https://github.com/openzipkin/zipkin-reporter-java/zipkin-sender-okhttp3)
(The Apache Software License, Version 2.0) Zipkin Core Library (io.zipkin.zipkin2:zipkin:2.22.2 - https://github.com/openzipkin/zipkin/zipkin)
(Eclipse Public License 1.0) JUnit (junit:junit:4.13.1 - http://junit.org)
(Apache License, Version 2.0) Byte Buddy (without dependencies) (net.bytebuddy:byte-buddy:1.10.19 - https://bytebuddy.net/byte-buddy)
(Apache License, Version 2.0) Byte Buddy agent (net.bytebuddy:byte-buddy-agent:1.10.19 - https://bytebuddy.net/byte-buddy-agent)
(Apache License, Version 2.0) Byte Buddy (without dependencies) (net.bytebuddy:byte-buddy:1.10.20 - https://bytebuddy.net/byte-buddy)
(Apache License, Version 2.0) Byte Buddy agent (net.bytebuddy:byte-buddy-agent:1.10.20 - https://bytebuddy.net/byte-buddy-agent)
(Apache License, Version 2.0) (MIT License) Logstash Logback Encoder (net.logstash.logback:logstash-logback-encoder:6.6 - https://github.com/logstash/logstash-logback-encoder)
(The MIT License) JOpt Simple (net.sf.jopt-simple:jopt-simple:4.6 - http://pholser.github.com/jopt-simple)
(The MIT License) JOpt Simple (net.sf.jopt-simple:jopt-simple:5.0.4 - http://jopt-simple.github.io/jopt-simple)
Expand Down Expand Up @@ -138,11 +138,11 @@ Lists of 153 third-party dependencies.
(Eclipse Public License v2.0) JUnit Platform Engine API (org.junit.platform:junit-platform-engine:1.7.0 - https://junit.org/junit5/)
(Public Domain, per Creative Commons CC0) LatencyUtils (org.latencyutils:LatencyUtils:2.0.3 - http://latencyutils.github.io/LatencyUtils/)
(The Apache Software License, Version 2.0) LZ4 and xxHash (org.lz4:lz4-java:1.7.1 - https://github.com/lz4/lz4-java)
(The MIT License) mockito-core (org.mockito:mockito-core:3.7.7 - https://github.com/mockito/mockito)
(The MIT License) mockito-junit-jupiter (org.mockito:mockito-junit-jupiter:3.7.7 - https://github.com/mockito/mockito)
(The MIT License) mockito-core (org.mockito:mockito-core:3.8.0 - https://github.com/mockito/mockito)
(The MIT License) mockito-junit-jupiter (org.mockito:mockito-junit-jupiter:3.8.0 - https://github.com/mockito/mockito)
(Apache License, Version 2.0) Objenesis (org.objenesis:objenesis:3.1 - http://objenesis.org)
(GNU General Public License (GPL), version 2, with the Classpath exception) JMH Core (org.openjdk.jmh:jmh-core:1.23 - http://openjdk.java.net/projects/code-tools/jmh/jmh-core/)
(GNU General Public License (GPL), version 2, with the Classpath exception) JMH Generators: Annotation Processors (org.openjdk.jmh:jmh-generator-annprocess:1.23 - http://openjdk.java.net/projects/code-tools/jmh/jmh-generator-annprocess/)
(GNU General Public License (GPL), version 2, with the Classpath exception) JMH Core (org.openjdk.jmh:jmh-core:1.27 - http://openjdk.java.net/projects/code-tools/jmh/jmh-core/)
(GNU General Public License (GPL), version 2, with the Classpath exception) JMH Generators: Annotation Processors (org.openjdk.jmh:jmh-generator-annprocess:1.27 - http://openjdk.java.net/projects/code-tools/jmh/jmh-generator-annprocess/)
(The Apache License, Version 2.0) org.opentest4j:opentest4j (org.opentest4j:opentest4j:1.2.0 - https://github.com/ota4j-team/opentest4j)
(The New BSD License) (WTFPL) Reflections (org.reflections:reflections:0.9.12 - http://github.com/ronmamo/reflections)
(Apache-2.0) Scala Library (org.scala-lang:scala-library:2.12.12 - https://www.scala-lang.org/)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,10 @@ public static DataPlaneContract.Egress egress1() {
.setConsumerGroup("1-1234567")
.setDestination(DESTINATION)
.setFilter(DataPlaneContract.Filter.newBuilder().putAttributes("type", "dev.knative"))
.setEgressConfig(DataPlaneContract.EgressConfig.newBuilder()
.setRetry(1)
.setBackoffDelay(1000)
.build())
.build();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,11 @@
import io.vertx.kafka.client.common.KafkaClientOptions;
import io.vertx.kafka.client.consumer.KafkaConsumer;
import io.vertx.kafka.client.producer.KafkaProducer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.AbstractMap.SimpleImmutableEntry;
import java.util.HashMap;
import java.util.HashSet;
Expand All @@ -58,11 +63,13 @@
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;

import static dev.knative.eventing.kafka.broker.core.utils.Logging.keyValue;

public class HttpConsumerVerticleFactory implements ConsumerVerticleFactory {

private static final Logger logger = LoggerFactory.getLogger(HttpConsumerVerticleFactory.class);

private final static ConsumerRecordSender NO_DLQ_SENDER =
ConsumerRecordSender.create(Future.failedFuture("No DLQ set"), Future.succeededFuture());

Expand Down Expand Up @@ -200,7 +207,10 @@ private ConsumerRecordSender createConsumerRecordSender(

final var circuitBreaker = CircuitBreaker
.create(target, vertx, createCircuitBreakerOptions(egress))
.retryPolicy(computeRetryPolicy(egress));
.retryPolicy(computeRetryPolicy(egress))
.openHandler(r -> logger.info("Circuit breaker opened {}", keyValue("target", target)))
.halfOpenHandler(r -> logger.info("Circuit breaker half-opened {}", keyValue("target", target)))
.closeHandler(r -> logger.info("Circuit breaker closed {}", keyValue("target", target)));

return new HttpConsumerRecordSender(
vertx,
Expand All @@ -212,7 +222,16 @@ private ConsumerRecordSender createConsumerRecordSender(

private static CircuitBreakerOptions createCircuitBreakerOptions(final DataPlaneContract.EgressConfig egressConfig) {
if (egressConfig != null && egressConfig.getRetry() > 0) {
return new CircuitBreakerOptions().setMaxRetries(egressConfig.getRetry());
return new CircuitBreakerOptions()
// TODO reset timeout should be configurable or, at least, set by the control plane
.setResetTimeout(
egressConfig.getBackoffDelay() > 0 ?
egressConfig.getBackoffDelay() :
CircuitBreakerOptions.DEFAULT_RESET_TIMEOUT
)
// TODO max failures should be configurable or, at least, set by the control plane
.setMaxFailures(egressConfig.getRetry() * 2)
.setMaxRetries(egressConfig.getRetry());
}
return new CircuitBreakerOptions();
}
Expand All @@ -230,10 +249,9 @@ static Function<Integer, Long> computeRetryPolicy(final EgressConfig egress) {
}

private static boolean hasDeadLetterSink(final EgressConfig egressConfig) {
return !(egressConfig == null || egressConfig.getDeadLetter() == null || egressConfig.getDeadLetter().isEmpty());
return !(egressConfig == null || egressConfig.getDeadLetter().isEmpty());
}


private static OffsetManager getOffsetManager(final DeliveryGuarantee type, final KafkaConsumer<?, ?> consumer,
Consumer<Integer> commitHandler) {
return switch (type) {
Expand Down
4 changes: 2 additions & 2 deletions test/config/sacura/101-broker.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,6 @@ spec:
name: config-broker

delivery:
retry: 3
retry: 5
backoffPolicy: exponential
backoffDelay: PT0.5S
backoffDelay: PT5S
4 changes: 2 additions & 2 deletions test/config/sacura/200-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,8 @@ data:
sacura.yaml: |
sender:
target: http://kafka-broker-ingress.knative-eventing.svc.cluster.local/sacura/broker
frequency: 200
workers: 15
frequency: 100
workers: 4
keepAlive: true
receiver:
port: 8080
Expand Down

0 comments on commit b2aa942

Please sign in to comment.