Skip to content

Commit

Permalink
kgo: send all metadata requests through the internal mapped cache
Browse files Browse the repository at this point in the history
Issue #800 was created as a follow-up idea to strengthen caching of
metadata requests in the client. This pushes the mapped metadata caching
logic deeper into the guts of issuing metadata requests, so that no
caching is ever missed. The next commit will introduce a new API to
request potentially cached metadata.

For #800.
  • Loading branch information
twmb committed Jan 21, 2025
1 parent f30c518 commit 7945c00
Show file tree
Hide file tree
Showing 3 changed files with 24 additions and 33 deletions.
48 changes: 22 additions & 26 deletions pkg/kgo/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -858,11 +858,11 @@ func (cl *Client) fetchBrokerMetadata(ctx context.Context) error {
close(wait.done)
}()

_, _, wait.err = cl.fetchMetadata(ctx, kmsg.NewPtrMetadataRequest(), true)
_, _, wait.err = cl.fetchMetadata(ctx, kmsg.NewPtrMetadataRequest(), true, nil)
return wait.err
}

func (cl *Client) fetchMetadataForTopics(ctx context.Context, all bool, topics []string) (*broker, *kmsg.MetadataResponse, error) {
func (cl *Client) fetchMetadataForTopics(ctx context.Context, all bool, topics []string, intoMapped map[string]mappedMetadataTopic) (*broker, *kmsg.MetadataResponse, error) {
req := kmsg.NewPtrMetadataRequest()
req.AllowAutoTopicCreation = cl.cfg.allowAutoTopicCreation
if all {
Expand All @@ -876,10 +876,10 @@ func (cl *Client) fetchMetadataForTopics(ctx context.Context, all bool, topics [
req.Topics = append(req.Topics, reqTopic)
}
}
return cl.fetchMetadata(ctx, req, true)
return cl.fetchMetadata(ctx, req, true, intoMapped)
}

func (cl *Client) fetchMetadata(ctx context.Context, req *kmsg.MetadataRequest, limitRetries bool) (*broker, *kmsg.MetadataResponse, error) {
func (cl *Client) fetchMetadata(ctx context.Context, req *kmsg.MetadataRequest, limitRetries bool, intoMapped map[string]mappedMetadataTopic) (*broker, *kmsg.MetadataResponse, error) {
r := cl.retryable()

// We limit retries for internal metadata refreshes, because these do
Expand All @@ -903,6 +903,9 @@ func (cl *Client) fetchMetadata(ctx context.Context, req *kmsg.MetadataRequest,
cl.controllerIDMu.Unlock()
}
cl.updateBrokers(meta.Brokers)

// Cache the mapped metadata, and potentially store each topic in the results.
cl.storeCachedMappedMetadata(meta, intoMapped)
}
return r.last, meta, err
}
Expand Down Expand Up @@ -1354,7 +1357,7 @@ func (cl *Client) shardedRequest(ctx context.Context, req kmsg.Request) ([]Respo
case *kmsg.MetadataRequest:
// We hijack any metadata request so as to populate our
// own brokers and controller ID.
br, resp, err := cl.fetchMetadata(ctx, t, false)
br, resp, err := cl.fetchMetadata(ctx, t, false, nil)
return shards(shard(br, req, resp, err)), nil

case kmsg.AdminRequest:
Expand Down Expand Up @@ -2383,11 +2386,12 @@ func (cl *Client) maybeDeleteMappedMetadata(unknownTopic bool, ts ...string) (sh
}
}

now := time.Now()
cl.mappedMetaMu.Lock()
defer cl.mappedMetaMu.Unlock()
for _, t := range ts {
tcached, exists := cl.mappedMeta[t]
if exists && (min == 0 || time.Since(tcached.when) > min) {
if exists && (min == 0 || now.Sub(tcached.when) > min) {
shouldRetry = true
delete(cl.mappedMeta, t)
}
Expand Down Expand Up @@ -2426,34 +2430,25 @@ func (cl *Client) fetchCachedMappedMetadata(ts ...string) (map[string]mappedMeta
// this is garbage heavy, so it is only used for one-off requests in this
// package.
func (cl *Client) fetchMappedMetadata(ctx context.Context, topics []string, useCache bool) (map[string]mappedMetadataTopic, error) {
var r map[string]mappedMetadataTopic
var intoMapped map[string]mappedMetadataTopic
needed := topics
if useCache {
r, needed = cl.fetchCachedMappedMetadata(topics...)
intoMapped, needed = cl.fetchCachedMappedMetadata(topics...)
if len(needed) == 0 {
return r, nil
return intoMapped, nil
}
}
if r == nil {
r = make(map[string]mappedMetadataTopic)
if intoMapped == nil {
intoMapped = make(map[string]mappedMetadataTopic)
}

_, meta, err := cl.fetchMetadataForTopics(ctx, false, needed)
if err != nil {
return nil, err
}

// Cache the mapped metadata, and also store each topic in the results.
cl.storeCachedMappedMetadata(meta, func(entry mappedMetadataTopic) {
r[*entry.t.Topic] = entry
})

return r, nil
_, _, err := cl.fetchMetadataForTopics(ctx, false, needed, intoMapped)
return intoMapped, err
}

// storeCachedMappedMetadata caches the fetched metadata in the Client and, if
// intoMapped is non-nil, stores each topic from the MetadataResponse into it.
func (cl *Client) storeCachedMappedMetadata(meta *kmsg.MetadataResponse, onEachTopic func(_ mappedMetadataTopic)) {
func (cl *Client) storeCachedMappedMetadata(meta *kmsg.MetadataResponse, intoMapped map[string]mappedMetadataTopic) {
cl.mappedMetaMu.Lock()
defer cl.mappedMetaMu.Unlock()
if cl.mappedMeta == nil {
Expand All @@ -2476,16 +2471,17 @@ func (cl *Client) storeCachedMappedMetadata(meta *kmsg.MetadataResponse, onEachT
t.ps[partition.Partition] = partition
}

if onEachTopic != nil {
onEachTopic(t)
if intoMapped != nil {
intoMapped[*t.t.Topic] = t
}
}
if len(meta.Topics) != len(cl.mappedMeta) {
now := time.Now()
for topic, mapped := range cl.mappedMeta {
if mapped.when.Equal(when) {
continue
}
if time.Since(mapped.when) > cl.cfg.metadataMinAge {
if now.Sub(mapped.when) > cl.cfg.metadataMinAge {
delete(cl.mappedMeta, topic)
}
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/kgo/group_balancer.go
Original file line number Diff line number Diff line change
Expand Up @@ -403,7 +403,7 @@ func (g *groupConsumer) balanceGroup(proto string, members []kmsg.JoinGroupRespo
metaTopics = append(metaTopics, topic)
}

_, resp, err := g.cl.fetchMetadataForTopics(g.ctx, false, metaTopics)
_, resp, err := g.cl.fetchMetadataForTopics(g.ctx, false, metaTopics, nil)
if err != nil {
return nil, fmt.Errorf("unable to fetch metadata for group topics: %v", err)
}
Expand Down
7 changes: 1 addition & 6 deletions pkg/kgo/metadata.go
Original file line number Diff line number Diff line change
Expand Up @@ -535,16 +535,11 @@ func (mp metadataPartition) newPartition(cl *Client, isProduce bool) *topicParti
// fetchTopicMetadata fetches metadata for all reqTopics and returns new
// topicPartitionsData for each topic.
func (cl *Client) fetchTopicMetadata(all bool, reqTopics []string) (map[string]*metadataTopic, error) {
_, meta, err := cl.fetchMetadataForTopics(cl.ctx, all, reqTopics)
_, meta, err := cl.fetchMetadataForTopics(cl.ctx, all, reqTopics, nil)
if err != nil {
return nil, err
}

// Since we've fetched the metadata for some topics we can optimistically cache it
// for mapped metadata too. This may reduce the number of Metadata requests issued
// by the client.
cl.storeCachedMappedMetadata(meta, nil)

topics := make(map[string]*metadataTopic, len(meta.Topics))

// Even if metadata returns a leader epoch, we do not use it unless we
Expand Down

0 comments on commit 7945c00

Please sign in to comment.