Skip to content

Commit

Permalink
kversion: add FromApiVersions, rename some methods; kgo: fix api change
Browse files Browse the repository at this point in the history
Since this changes not-yet-released APIs, this is fine: we rename some
methods to be future compatible for Min methods.

Adds FromApiVersionsResponse to allow quickly parsing an ApiVersions
response, which quickly allows for checking and setting keys.
  • Loading branch information
twmb committed Jan 13, 2021
1 parent 303186a commit 2493ae7
Show file tree
Hide file tree
Showing 2 changed files with 40 additions and 23 deletions.
4 changes: 2 additions & 2 deletions pkg/kgo/broker.go
Original file line number Diff line number Diff line change
Expand Up @@ -246,7 +246,7 @@ func (b *broker) handleReqs() {

ourMax := req.MaxVersion()
if b.cl.cfg.maxVersions != nil {
userMax, _ := b.cl.cfg.maxVersions.LookupVersion(req.Key()) // we validated HasKey above
userMax, _ := b.cl.cfg.maxVersions.LookupMaxKeyVersion(req.Key()) // we validated HasKey above
if userMax < ourMax {
ourMax = userMax
}
Expand All @@ -264,7 +264,7 @@ func (b *broker) handleReqs() {
// lower than we desire, we fail the request for the broker is
// too old.
if b.cl.cfg.minVersions != nil {
minVersion, minVersionExists := b.cl.cfg.minVersions.LookupVersion(req.Key())
minVersion, minVersionExists := b.cl.cfg.minVersions.LookupMaxKeyVersion(req.Key())
if minVersionExists && version < minVersion {
pr.promise(nil, ErrBrokerTooOld)
continue
Expand Down
59 changes: 38 additions & 21 deletions pkg/kversion/kversion.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,13 @@ import (

// Versions is a list of versions, with each item corresponding to a Kafka key
// and each item's value corresponding to the max version supported.
//
// Minimum versions are not currently tracked because all keys have a minimum
// version of zero. The internals of a Versions may change in the future to
// support minimum versions; the outward facing API of Versions should not
// change to support this.
//
// As well, supported features may be added in the future.
type Versions struct {
// If any version is -1, then it is left out in that version.
// This was first done in version 2.7.0, where Kafka added support
Expand All @@ -22,15 +29,24 @@ type Versions struct {
k2v []int16
}

// FromApiVersionsResponse returns a Versions from a kmsg.ApiVersionsResponse.
func FromApiVersionsResponse(r *kmsg.ApiVersionsResponse) Versions {
var v Versions
for _, key := range r.ApiKeys {
v.SetMaxKeyVersion(key.ApiKey, key.MaxVersion)
}
return v
}

// HasKey returns true if the versions contains the given key.
func (vs Versions) HasKey(k int16) bool {
_, has := vs.LookupVersion(k)
_, has := vs.LookupMaxKeyVersion(k)
return has
}

// LookupVersion returns the version for the given key and whether the key
// exists. If the key does not exist, this returns (-1, false).
func (vs Versions) LookupVersion(k int16) (int16, bool) {
// LookupMaxKeyVersion returns the version for the given key and whether the
// key exists. If the key does not exist, this returns (-1, false).
func (vs Versions) LookupMaxKeyVersion(k int16) (int16, bool) {
if k < 0 {
return -1, false
}
Expand All @@ -44,6 +60,23 @@ func (vs Versions) LookupVersion(k int16) (int16, bool) {
return version, true
}

// SetMaxKeyVersion sets the max version for the given key.
//
// Setting a version to -1 unsets the key.
//
// Versions are backed by a slice; if the slice is not long enough, it is
// extended to fit the key.
func (vs *Versions) SetMaxKeyVersion(k, v int16) {
if k < 0 {
return
}
needLen := int(k + 1)
for len(vs.k2v) < needLen {
vs.k2v = append(vs.k2v, -1)
}
vs.k2v[k] = v
}

// Returns whether two versions are equal.
func (vs Versions) Equal(other Versions) bool {
// We allow the version slices to be of different lengths, so long as
Expand All @@ -67,23 +100,7 @@ func (vs Versions) Equal(other Versions) bool {
return true
}

// SetKeyVersion sets the version for the given key.
//
// Setting a version to -1 unsets the key.
//
// Versions are backed by a slice; if the slice is not long enough, it is
// extended to fit the key.
func (vs *Versions) SetKeyVersion(k, v int16) {
if k < 0 {
return
}
needLen := int(k + 1)
for len(vs.k2v) < needLen {
vs.k2v = append(vs.k2v, -1)
}
vs.k2v[k] = v
}

// Returns a string representation of the versions; the format may change.
func (vs Versions) String() string {
var buf bytes.Buffer
w := tabwriter.NewWriter(&buf, 0, 0, 2, ' ', 0)
Expand Down

0 comments on commit 2493ae7

Please sign in to comment.