Skip to content

Commit

Permalink
Merge pull request #2269 from zerowidth/fix-admin-describe-log-dir-hang
Browse files Browse the repository at this point in the history
Fix DescribeLogDirs hang in admin client
  • Loading branch information
dnwe authored Jul 22, 2022
2 parents 1dd58a4 + d2f69a0 commit e1fbb94
Show file tree
Hide file tree
Showing 2 changed files with 47 additions and 1 deletion.
2 changes: 1 addition & 1 deletion admin.go
Original file line number Diff line number Diff line change
Expand Up @@ -1050,12 +1050,12 @@ func (ca *clusterAdmin) DescribeLogDirs(brokerIds []int32) (allLogDirs map[int32
wg := sync.WaitGroup{}

for _, b := range brokerIds {
wg.Add(1)
broker, err := ca.findBroker(b)
if err != nil {
Logger.Printf("Unable to find broker with ID = %v\n", b)
continue
}
wg.Add(1)
go func(b *Broker, conf *Config) {
defer wg.Done()
_ = b.Open(conf) // Ensure that broker is opened
Expand Down
46 changes: 46 additions & 0 deletions admin_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"errors"
"strings"
"testing"
"time"
)

func TestClusterAdmin(t *testing.T) {
Expand Down Expand Up @@ -1742,3 +1743,48 @@ func TestDescribeLogDirs(t *testing.T) {
t.Fatal(err)
}
}

func TestDescribeLogDirsUnknownBroker(t *testing.T) {
seedBroker := NewMockBroker(t, 1)
defer seedBroker.Close()

seedBroker.SetHandlerByMap(map[string]MockResponse{
"MetadataRequest": NewMockMetadataResponse(t).
SetController(seedBroker.BrokerID()).
SetBroker(seedBroker.Addr(), seedBroker.BrokerID()),
"DescribeLogDirsRequest": NewMockDescribeLogDirsResponse(t).
SetLogDirs("/tmp/logs", map[string]int{"topic1": 2, "topic2": 2}),
})

config := NewTestConfig()
config.Version = V1_0_0_0

admin, err := NewClusterAdmin([]string{seedBroker.Addr()}, config)
if err != nil {
t.Fatal(err)
}

type result struct {
metadata map[int32][]DescribeLogDirsResponseDirMetadata
err error
}

res := make(chan result)

go func() {
metadata, err := admin.DescribeLogDirs([]int32{seedBroker.BrokerID() + 1})
res <- result{metadata, err}
}()

select {
case <-time.After(time.Second):
t.Fatalf("DescribeLogDirs timed out")
case returned := <-res:
if len(returned.metadata) != 0 {
t.Fatalf("Expected no results, got %v", len(returned.metadata))
}
if returned.err != nil {
t.Fatalf("Expected no error, got %v", returned.err)
}
}
}

0 comments on commit e1fbb94

Please sign in to comment.