Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add Controller() method to Client interface #1063

Merged
merged 2 commits into from
Mar 16, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
77 changes: 62 additions & 15 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,9 @@ type Client interface {
// altered after it has been created.
Config() *Config

// Controller returns the cluster controller broker.
Controller() (*Broker, error)

// Brokers returns the current set of active brokers as retrieved from cluster metadata.
Brokers() []*Broker

Expand Down Expand Up @@ -97,6 +100,7 @@ type client struct {
seedBrokers []*Broker
deadSeeds []*Broker

controllerID int32 // cluster controller broker id
brokers map[int32]*Broker // maps broker ids to brokers
metadata map[string]map[int32]*PartitionMetadata // maps topics to partition ids to metadata
coordinators map[string]int32 // Maps consumer group names to coordinating broker IDs
Expand Down Expand Up @@ -379,6 +383,27 @@ func (client *client) GetOffset(topic string, partitionID int32, time int64) (in
return offset, err
}

func (client *client) Controller() (*Broker, error) {
if client.Closed() {
return nil, ErrClosedClient
}

controller := client.cachedController()
if controller == nil {
if err := client.refreshMetadata(); err != nil {
return nil, err
}
controller = client.cachedController()
}

if controller == nil {
return nil, ErrControllerNotAvailable
}

_ = controller.Open(client.conf)
return controller, nil
}

func (client *client) Coordinator(consumerGroup string) (*Broker, error) {
if client.Closed() {
return nil, ErrClosedClient
Expand Down Expand Up @@ -607,20 +632,7 @@ func (client *client) backgroundMetadataUpdater() {
for {
select {
case <-ticker.C:
topics := []string{}
if !client.conf.Metadata.Full {
if specificTopics, err := client.Topics(); err != nil {
Logger.Println("Client background metadata topic load:", err)
break
} else if len(specificTopics) == 0 {
Logger.Println("Client background metadata update: no specific topics to update")
break
} else {
topics = specificTopics
}
}

if err := client.RefreshMetadata(topics...); err != nil {
if err := client.refreshMetadata(); err != nil {
Logger.Println("Client background metadata update:", err)
}
case <-client.closer:
Expand All @@ -629,6 +641,26 @@ func (client *client) backgroundMetadataUpdater() {
}
}

func (client *client) refreshMetadata() error {
topics := []string{}

if !client.conf.Metadata.Full {
if specificTopics, err := client.Topics(); err != nil {
return err
} else if len(specificTopics) == 0 {
return ErrNoTopicsToUpdateMetadata
} else {
topics = specificTopics
}
}

if err := client.RefreshMetadata(topics...); err != nil {
return err
}

return nil
}

func (client *client) tryRefreshMetadata(topics []string, attemptsRemaining int) error {
retry := func(err error) error {
if attemptsRemaining > 0 {
Expand All @@ -645,7 +677,12 @@ func (client *client) tryRefreshMetadata(topics []string, attemptsRemaining int)
} else {
Logger.Printf("client/metadata fetching metadata for all topics from broker %s\n", broker.addr)
}
response, err := broker.GetMetadata(&MetadataRequest{Topics: topics})

req := &MetadataRequest{Topics: topics}
if client.conf.Version.IsAtLeast(V0_10_0_0) {
req.Version = 1
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I believe it is this line that is causing the tests to fail. The commit you are based on runs successfully, and this branch fails consistently even when I rerun.

Copy link
Contributor Author

@imjustfly imjustfly Mar 8, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I reset to the base commit eae91468c24263f06ae73dce45eb26252c73b1d6 and run make test, the tests still fail at:

=== RUN   TestMessageEncoding
--- FAIL: TestMessageEncoding (0.01s)
        request_test.go:34: Encoding empty gzip failed
                got  [132 99 80 148 0 1 255 255 255 255 0 0 0 23 31 139 8 0 0 0 0 0 0 255 1 0 0 255 255 0 0 0 0 0 0 0 0]
                want [97 79 149 90 0 1 255 255 255 255 0 0 0 23 31 139 8 0 0 9 110 136 0 255 1 0 0 255 255 0 0 0 0 0 0 0 0]

But fails related to consumer disappeared.

Copy link
Contributor Author

@imjustfly imjustfly Mar 8, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed the test fails related to consumer.

}
response, err := broker.GetMetadata(req)

switch err.(type) {
case nil:
Expand Down Expand Up @@ -686,6 +723,9 @@ func (client *client) updateMetadata(data *MetadataResponse, allKnownMetaData bo
for _, broker := range data.Brokers {
client.registerBroker(broker)
}

client.controllerID = data.ControllerID

if allKnownMetaData {
client.metadata = make(map[string]map[int32]*PartitionMetadata)
client.cachedPartitionsResults = make(map[string][maxPartitionIndex][]int32)
Expand Down Expand Up @@ -739,6 +779,13 @@ func (client *client) cachedCoordinator(consumerGroup string) *Broker {
return nil
}

func (client *client) cachedController() *Broker {
client.lock.RLock()
defer client.lock.RUnlock()

return client.brokers[client.controllerID]
}

func (client *client) getConsumerMetadata(consumerGroup string, attemptsRemaining int) (*FindCoordinatorResponse, error) {
retry := func(err error) (*FindCoordinatorResponse, error) {
if attemptsRemaining > 0 {
Expand Down
41 changes: 41 additions & 0 deletions client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -444,6 +444,47 @@ func TestClientResurrectDeadSeeds(t *testing.T) {
safeClose(t, c)
}

func TestClientController(t *testing.T) {
seedBroker := NewMockBroker(t, 1)
defer seedBroker.Close()
controllerBroker := NewMockBroker(t, 2)
defer controllerBroker.Close()

seedBroker.SetHandlerByMap(map[string]MockResponse{
"MetadataRequest": NewMockMetadataResponse(t).
SetController(controllerBroker.BrokerID()).
SetBroker(seedBroker.Addr(), seedBroker.BrokerID()).
SetBroker(controllerBroker.Addr(), controllerBroker.BrokerID()),
})

cfg := NewConfig()

// test kafka version greater than 0.10.0.0
cfg.Version = V0_10_0_0
client1, err := NewClient([]string{seedBroker.Addr()}, cfg)
if err != nil {
t.Fatal(err)
}
defer safeClose(t, client1)
broker, err := client1.Controller()
if err != nil {
t.Fatal(err)
}
if broker.Addr() != controllerBroker.Addr() {
t.Errorf("Expected controller to have address %s, found %s", controllerBroker.Addr(), broker.Addr())
}

// test kafka version earlier than 0.10.0.0
cfg.Version = V0_9_0_1
client2, err := NewClient([]string{seedBroker.Addr()}, cfg)
if err != nil {
t.Fatal(err)
}
defer safeClose(t, client2)
if _, err = client2.Controller(); err != ErrControllerNotAvailable {
t.Errorf("Expected Contoller() to return %s, found %s", ErrControllerNotAvailable, err)
}
}
func TestClientCoordinatorWithConsumerOffsetsTopic(t *testing.T) {
seedBroker := NewMockBroker(t, 1)
staleCoordinator := NewMockBroker(t, 2)
Expand Down
8 changes: 8 additions & 0 deletions errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,14 @@ var ErrMessageTooLarge = errors.New("kafka: message is larger than Consumer.Fetc
// a RecordBatch.
var ErrConsumerOffsetNotAdvanced = errors.New("kafka: consumer offset was not advanced after a RecordBatch")

// ErrControllerNotAvailable is returned when server didn't give correct controller id. May be kafka server's version
// is lower than 0.10.0.0.
var ErrControllerNotAvailable = errors.New("kafka: controller is not avaiable")

// ErrNoTopicsToUpdateMetadata is returned when Meta.Full is set to false but no specific topics were found to update
// the metadata.
var ErrNoTopicsToUpdateMetadata = errors.New("kafka: no specific topics to update metadata")

// PacketEncodingError is returned from a failure while encoding a Kafka packet. This can happen, for example,
// if you try to encode a string over 2^15 characters in length, since Kafka's encoding rules do not permit that.
type PacketEncodingError struct {
Expand Down
17 changes: 13 additions & 4 deletions mockresponses.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,9 +68,10 @@ func (mc *MockSequence) For(reqBody versionedDecoder) (res encoder) {

// MockMetadataResponse is a `MetadataResponse` builder.
type MockMetadataResponse struct {
leaders map[string]map[int32]int32
brokers map[string]int32
t TestReporter
controllerID int32
leaders map[string]map[int32]int32
brokers map[string]int32
t TestReporter
}

func NewMockMetadataResponse(t TestReporter) *MockMetadataResponse {
Expand All @@ -96,9 +97,17 @@ func (mmr *MockMetadataResponse) SetBroker(addr string, brokerID int32) *MockMet
return mmr
}

func (mmr *MockMetadataResponse) SetController(brokerID int32) *MockMetadataResponse {
mmr.controllerID = brokerID
return mmr
}

func (mmr *MockMetadataResponse) For(reqBody versionedDecoder) encoder {
metadataRequest := reqBody.(*MetadataRequest)
metadataResponse := &MetadataResponse{}
metadataResponse := &MetadataResponse{
Version: metadataRequest.version(),
ControllerID: mmr.controllerID,
}
for addr, brokerID := range mmr.brokers {
metadataResponse.AddBroker(addr, brokerID)
}
Expand Down