Skip to content

Commit

Permalink
Add iteration to etcd backend
Browse files Browse the repository at this point in the history
  • Loading branch information
rosstimothy committed Feb 14, 2025
1 parent a5a57d6 commit 2d169d0
Showing 1 changed file with 63 additions and 24 deletions.
87 changes: 63 additions & 24 deletions lib/backend/etcdbk/etcd.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,9 @@ import (
"crypto/x509"
"encoding/base64"
"errors"
"iter"
"log/slog"
"os"
"sort"
"strconv"
"strings"
"sync"
Expand Down Expand Up @@ -649,6 +649,60 @@ func (b *EtcdBackend) NewWatcher(ctx context.Context, watch backend.Watch) (back
return b.buf.NewWatcher(ctx, watch)
}

func (b *EtcdBackend) Iterate(ctx context.Context, startKey, endKey backend.Key, limit int, order backend.IterationOrder) (iter.Seq2[backend.Item, error], error) {
if startKey.IsZero() {
return nil, trace.BadParameter("missing parameter startKey")
}
if endKey.IsZero() {
return nil, trace.BadParameter("missing parameter endKey")
}

sort := clientv3.SortAscend
if order == backend.IterateDescending {
sort = clientv3.SortDescend
}

return func(yield func(backend.Item, error) bool) {
// etcd's range query includes the start point and excludes the end point,
// but Backend.GetRange is supposed to be inclusive at both ends, so we
// query until the very next key in lexicographic order (i.e., the same key
// followed by a 0 byte)
opts := []clientv3.OpOption{
clientv3.WithRange(b.prependPrefix(endKey) + "\x00"),
clientv3.WithSort(clientv3.SortByKey, sort),
}
if limit > 0 {
opts = append(opts, clientv3.WithLimit(int64(limit)))
}
start := b.clock.Now()
re, err := b.clients.Next().Get(ctx, b.prependPrefix(startKey), opts...)
batchReadLatencies.Observe(time.Since(start).Seconds())
batchReadRequests.Inc()
if err := convertErr(err); err != nil {
if !yield(backend.Item{}, trace.Wrap(err)) {
return
}
}

for _, kv := range re.Kvs {
value, err := unmarshal(kv.Value)
if err != nil {
if !yield(backend.Item{}, trace.Wrap(err)) {
return
}
}

if !yield(backend.Item{
Key: b.trimPrefix(kv.Key),
Value: value,
Revision: toBackendRevision(kv.ModRevision),
}, nil) {
return
}
}
}, nil
}

// GetRange returns query range
func (b *EtcdBackend) GetRange(ctx context.Context, startKey, endKey backend.Key, limit int) (*backend.GetResult, error) {
if startKey.IsZero() {
Expand All @@ -657,35 +711,20 @@ func (b *EtcdBackend) GetRange(ctx context.Context, startKey, endKey backend.Key
if endKey.IsZero() {
return nil, trace.BadParameter("missing parameter endKey")
}
// etcd's range query includes the start point and excludes the end point,
// but Backend.GetRange is supposed to be inclusive at both ends, so we
// query until the very next key in lexicographic order (i.e., the same key
// followed by a 0 byte)
opts := []clientv3.OpOption{clientv3.WithRange(b.prependPrefix(endKey) + "\x00")}
if limit > 0 {
opts = append(opts, clientv3.WithLimit(int64(limit)))
}
start := b.clock.Now()
re, err := b.clients.Next().Get(ctx, b.prependPrefix(startKey), opts...)
batchReadLatencies.Observe(time.Since(start).Seconds())
batchReadRequests.Inc()
if err := convertErr(err); err != nil {

iter, err := b.Iterate(ctx, startKey, endKey, limit, backend.IterateAscending)
if err != nil {
return nil, trace.Wrap(err)
}
items := make([]backend.Item, 0, len(re.Kvs))
for _, kv := range re.Kvs {
value, err := unmarshal(kv.Value)

var result backend.GetResult
for item, err := range iter {
if err != nil {
return nil, trace.Wrap(err)
}
items = append(items, backend.Item{
Key: b.trimPrefix(kv.Key),
Value: value,
Revision: toBackendRevision(kv.ModRevision),
})
result.Items = append(result.Items, item)
}
sort.Sort(backend.Items(items))
return &backend.GetResult{Items: items}, nil
return &result, nil
}

func toBackendRevision(rev int64) string {
Expand Down

0 comments on commit 2d169d0

Please sign in to comment.