Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: sleep when throttled (KIP-219) #2536

Merged
merged 2 commits into from
Aug 1, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 31 additions & 3 deletions broker.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,8 @@ type Broker struct {

kerberosAuthenticator GSSAPIKerberosAuth
clientSessionReauthenticationTimeMs int64

throttleTimer *time.Timer
}

// SASLMechanism specifies the SASL mechanism the client uses to authenticate with the broker
Expand Down Expand Up @@ -456,7 +458,7 @@ func (b *Broker) AsyncProduce(request *ProduceRequest, cb ProduceCallback) error
}

// Well-formed response
b.updateThrottleMetric(res)
b.handleThrottledResponse(res)
cb(res, nil)
},
}
Expand Down Expand Up @@ -999,6 +1001,9 @@ func (b *Broker) sendInternal(rb protocolBody, promise *responsePromise) error {
return err
}

// check and wait if throttled
b.waitIfThrottled()

requestTime := time.Now()
// Will be decremented in responseReceiver (except error or request with NoResponse)
b.addRequestInFlightMetrics(1)
Expand Down Expand Up @@ -1046,7 +1051,7 @@ func (b *Broker) sendAndReceive(req protocolBody, res protocolBody) error {
return err
}
if res != nil {
b.updateThrottleMetric(res)
b.handleThrottledResponse(res)
}
return nil
}
Expand Down Expand Up @@ -1645,7 +1650,7 @@ type throttleSupport interface {
throttleTime() time.Duration
}

func (b *Broker) updateThrottleMetric(resp protocolBody) {
func (b *Broker) handleThrottledResponse(resp protocolBody) {
throttledResponse, ok := resp.(throttleSupport)
if !ok {
return
Expand All @@ -1656,6 +1661,29 @@ func (b *Broker) updateThrottleMetric(resp protocolBody) {
}
DebugLogger.Printf(
"broker/%d %T throttled %v\n", b.ID(), resp, throttleTime)
b.setThrottle(throttleTime)
b.updateThrottleMetric(throttleTime)
}

func (b *Broker) setThrottle(throttleTime time.Duration) {
if b.throttleTimer != nil {
// if there is an existing timer stop/clear it
if !b.throttleTimer.Stop() {
<-b.throttleTimer.C
}
}
b.throttleTimer = time.NewTimer(throttleTime)
}

func (b *Broker) waitIfThrottled() {
if b.throttleTimer != nil {
DebugLogger.Printf("broker/%d waiting for throttle timer\n", b.ID())
<-b.throttleTimer.C
b.throttleTimer = nil
}
}

func (b *Broker) updateThrottleMetric(throttleTime time.Duration) {
if b.brokerThrottleTime != nil {
throttleTimeInMs := int64(throttleTime / time.Millisecond)
b.brokerThrottleTime.Update(throttleTimeInMs)
Expand Down
95 changes: 95 additions & 0 deletions broker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1435,3 +1435,98 @@ func BenchmarkBroker_No_Metrics_Open(b *testing.B) {
broker.Close()
}
}

func Test_handleThrottledResponse(t *testing.T) {
mb := NewMockBroker(nil, 0)
broker := NewBroker(mb.Addr())
broker.id = 0
conf := NewTestConfig()
conf.Version = V1_0_0_0
throttleTimeMs := 100
throttleTime := time.Duration(throttleTimeMs) * time.Millisecond
tests := []struct {
name string
response protocolBody
expectDelay bool
}{
{
name: "throttled response w/millisecond field",
response: &MetadataResponse{
ThrottleTimeMs: int32(throttleTimeMs),
},
expectDelay: true,
},
{
name: "not throttled response w/millisecond field",
response: &MetadataResponse{
ThrottleTimeMs: 0,
},
},
{
name: "throttled response w/time.Duration field",
response: &ProduceResponse{
ThrottleTime: throttleTime,
},
expectDelay: true,
},
{
name: "not throttled response w/time.Duration field",
response: &ProduceResponse{
ThrottleTime: time.Duration(0),
},
},
{
name: "not throttled response with no throttle time field",
response: &SaslHandshakeResponse{},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
broker.metricRegistry = metrics.NewRegistry()
broker.brokerThrottleTime = broker.registerHistogram("throttle-time-in-ms")
broker.handleThrottledResponse(tt.response)
startTime := time.Now()
broker.waitIfThrottled()
if tt.expectDelay {
if time.Since(startTime) < throttleTime {
t.Fatal("expected throttling to cause delay")
}
if broker.brokerThrottleTime.Min() != int64(throttleTimeMs) {
t.Fatal("expected throttling to update metrics")
}
} else {
if time.Since(startTime) > throttleTime {
t.Fatal("expected no throttling delay")
}
if broker.brokerThrottleTime.Count() != 0 {
t.Fatal("expected no metrics update")
}
}
})
}
t.Run("test second throttle timer overrides first", func(t *testing.T) {
broker.metricRegistry = metrics.NewRegistry()
broker.brokerThrottleTime = broker.registerHistogram("throttle-time-in-ms")
broker.handleThrottledResponse(&MetadataResponse{
ThrottleTimeMs: int32(throttleTimeMs),
})
firstTimer := broker.throttleTimer
broker.handleThrottledResponse(&MetadataResponse{
ThrottleTimeMs: int32(throttleTimeMs * 2),
})
if firstTimer.Stop() {
t.Fatal("expected first timer to be stopped")
}
startTime := time.Now()
broker.waitIfThrottled()
if time.Since(startTime) < throttleTime*2 {
t.Fatal("expected throttling to use second delay")
}
if broker.brokerThrottleTime.Min() != int64(throttleTimeMs) {
t.Fatal("expected throttling to update metrics")
}
if broker.brokerThrottleTime.Max() != int64(throttleTimeMs*2) {
t.Fatal("expected throttling to update metrics")
}
})
}
Loading