Skip to content

Commit

Permalink
feat(test): add an fvt for broker deadlock
Browse files Browse the repository at this point in the history
Just a simple test to kill broker connectivity whilst the producer is
in-use and confirm that it is able to successfully recover.

Signed-off-by: Dominic Evans <dominic.evans@uk.ibm.com>
  • Loading branch information
dnwe committed Feb 13, 2022
1 parent 6693712 commit 091d6d2
Showing 1 changed file with 62 additions and 0 deletions.
62 changes: 62 additions & 0 deletions functional_producer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ package sarama

import (
"fmt"
"math"
"os"
"strconv"
"strings"
Expand Down Expand Up @@ -335,6 +336,67 @@ func testProducingMessages(t *testing.T, config *Config) {
safeClose(t, client)
}

// TestAsyncProducerRemoteBrokerClosed ensures that the async producer can
// cleanly recover if network connectivity to the remote brokers is lost and
// then subsequently resumed.
//
// https://github.com/Shopify/sarama/issues/2129
func TestAsyncProducerRemoteBrokerClosed(t *testing.T) {
setupFunctionalTest(t)
defer teardownFunctionalTest(t)

config := NewTestConfig()
config.ClientID = t.Name()
config.Net.MaxOpenRequests = 1
config.Producer.Flush.MaxMessages = 1
config.Producer.Return.Successes = true
config.Producer.Retry.Max = math.MaxInt32
config.Producer.Retry.Backoff = time.Millisecond
config.Version, _ = ParseKafkaVersion(FunctionalTestEnv.KafkaVersion)

producer, err := NewAsyncProducer(
FunctionalTestEnv.KafkaBrokerAddrs,
config,
)
if err != nil {
t.Fatal(err)
}

// produce some more messages and ensure success
for i := 0; i < 10; i++ {
producer.Input() <- &ProducerMessage{Topic: "test.1", Key: nil, Value: StringEncoder(TestMessage)}
<-producer.Successes()
}

// shutdown all the active tcp connections
for _, proxy := range FunctionalTestEnv.Proxies {
_ = proxy.Disable()
}

// produce some more messages
for i := 10; i < 20; i++ {
producer.Input() <- &ProducerMessage{Topic: "test.1", Key: nil, Value: StringEncoder(TestMessage)}
}

// re-open the proxies
for _, proxy := range FunctionalTestEnv.Proxies {
_ = proxy.Enable()
}

// ensure the previously produced messages suceed
for i := 10; i < 20; i++ {
<-producer.Successes()
}

// produce some more messages and ensure success
for i := 20; i < 30; i++ {
producer.Input() <- &ProducerMessage{Topic: "test.1", Key: nil, Value: StringEncoder(TestMessage)}
<-producer.Successes()
}

closeProducer(t, producer)
}

func validateMetrics(t *testing.T, client Client) {
// Get the broker used by test1 topic
var broker *Broker
Expand Down

0 comments on commit 091d6d2

Please sign in to comment.