Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

mdbx: Return err early in iter.Next() #10078

Merged
merged 8 commits into from
Apr 27, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
86 changes: 47 additions & 39 deletions erigon-lib/kv/mdbx/kv_mdbx.go
Original file line number Diff line number Diff line change
Expand Up @@ -1968,7 +1968,6 @@ type cursor2iter struct {
tx *MdbxTx

fromPrefix, toPrefix, nextK, nextV []byte
err error
orderAscend order.By
limit int64
ctx context.Context
Expand Down Expand Up @@ -1998,44 +1997,51 @@ func (s *cursor2iter) init(table string, tx kv.Tx) (*cursor2iter, error) {

if s.fromPrefix == nil { // no initial position
if s.orderAscend {
s.nextK, s.nextV, s.err = s.c.First()
s.nextK, s.nextV, err = s.c.First()
} else {
s.nextK, s.nextV, s.err = s.c.Last()
s.nextK, s.nextV, err = s.c.Last()
}
return s, s.err
return s, err
}

if s.orderAscend {
s.nextK, s.nextV, s.err = s.c.Seek(s.fromPrefix)
return s, s.err
s.nextK, s.nextV, err = s.c.Seek(s.fromPrefix)
return s, err
} else {
// seek exactly to given key or previous one
s.nextK, s.nextV, s.err = s.c.SeekExact(s.fromPrefix)
if s.err != nil {
return s, s.err
s.nextK, s.nextV, err = s.c.SeekExact(s.fromPrefix)
if err != nil {
return s, err
}
if s.nextK != nil { // go to last value of this key
if casted, ok := s.c.(kv.CursorDupSort); ok {
s.nextV, s.err = casted.LastDup()
s.nextV, err = casted.LastDup()
}
} else { // key not found, go to prev one
s.nextK, s.nextV, s.err = s.c.Prev()
s.nextK, s.nextV, err = s.c.Prev()
}
return s, s.err
return s, err
}
}

func (s *cursor2iter) advance() (err error) {
if s.orderAscend {
s.nextK, s.nextV, err = s.c.Next()
} else {
s.nextK, s.nextV, err = s.c.Prev()
}
return err
}

func (s *cursor2iter) Close() {
if s.c != nil {
s.c.Close()
delete(s.tx.streams, s.id)
s.c = nil
}
}

func (s *cursor2iter) HasNext() bool {
if s.err != nil { // always true, then .Next() call will return this error
return true
}
if s.limit == 0 { // limit reached
return false
}
Expand All @@ -2051,20 +2057,19 @@ func (s *cursor2iter) HasNext() bool {
cmp := bytes.Compare(s.nextK, s.toPrefix)
return (bool(s.orderAscend) && cmp < 0) || (!bool(s.orderAscend) && cmp > 0)
}

func (s *cursor2iter) Next() (k, v []byte, err error) {
select {
case <-s.ctx.Done():
return nil, nil, s.ctx.Err()
default:
}
s.limit--
k, v, err = s.nextK, s.nextV, s.err
if s.orderAscend {
s.nextK, s.nextV, s.err = s.c.Next()
} else {
s.nextK, s.nextV, s.err = s.c.Prev()
k, v = s.nextK, s.nextV
if err = s.advance(); err != nil {
return nil, nil, err
}
return k, v, err
return k, v, nil
}

func (tx *MdbxTx) RangeDupSort(table string, key []byte, fromPrefix, toPrefix []byte, asc order.By, limit int) (iter.KV, error) {
Expand All @@ -2084,7 +2089,6 @@ type cursorDup2iter struct {

key []byte
fromPrefix, toPrefix, nextV []byte
err error
orderAscend bool
limit int64
ctx context.Context
Expand Down Expand Up @@ -2112,26 +2116,35 @@ func (s *cursorDup2iter) init(table string, tx kv.Tx) (*cursorDup2iter, error) {

if s.fromPrefix == nil { // no initial position
if s.orderAscend {
s.nextV, s.err = s.c.FirstDup()
s.nextV, err = s.c.FirstDup()
} else {
s.nextV, s.err = s.c.LastDup()
s.nextV, err = s.c.LastDup()
}
return s, s.err
return s, err
}

if s.orderAscend {
s.nextV, s.err = s.c.SeekBothRange(s.key, s.fromPrefix)
return s, s.err
s.nextV, err = s.c.SeekBothRange(s.key, s.fromPrefix)
return s, err
} else {
// seek exactly to given key or previous one
_, s.nextV, s.err = s.c.SeekBothExact(s.key, s.fromPrefix)
_, s.nextV, err = s.c.SeekBothExact(s.key, s.fromPrefix)
if s.nextV == nil { // no such key
_, s.nextV, s.err = s.c.PrevDup()
_, s.nextV, err = s.c.PrevDup()
}
return s, s.err
return s, err
}
}

func (s *cursorDup2iter) advance() (err error) {
if s.orderAscend {
_, s.nextV, err = s.c.NextDup()
} else {
_, s.nextV, err = s.c.PrevDup()
}
return err
}

func (s *cursorDup2iter) Close() {
if s.c != nil {
s.c.Close()
Expand All @@ -2140,9 +2153,6 @@ func (s *cursorDup2iter) Close() {
}
}
func (s *cursorDup2iter) HasNext() bool {
if s.err != nil { // always true, then .Next() call will return this error
return true
}
if s.limit == 0 { // limit reached
return false
}
Expand All @@ -2165,13 +2175,11 @@ func (s *cursorDup2iter) Next() (k, v []byte, err error) {
default:
}
s.limit--
v, err = s.nextV, s.err
if s.orderAscend {
_, s.nextV, s.err = s.c.NextDup()
} else {
_, s.nextV, s.err = s.c.PrevDup()
v = s.nextV
if err = s.advance(); err != nil {
return nil, nil, err
}
return s.key, v, err
return s.key, v, nil
}

func (tx *MdbxTx) ForAmount(bucket string, fromPrefix []byte, amount uint32, walker func(k, v []byte) error) error {
Expand Down
Loading