Skip to content

Commit

Permalink
api: add pagination support
Browse files Browse the repository at this point in the history
A user could fetch a position of a last tuple using a new method
of the SelectRequest type:

selectRequest = selectRequest.FetchPos(true)

The position will be stored in a new field of the Response type:

Response.Pos

A user could specify a tuple from which selection must continue or
its position with a new method of the SelectRequest type:

selectRequest = selectRequest.After([]interface{}{23})

or

selectRequest = selectRequest.After(resp.Pos)

In action it looks like:

req := NewSelectRequest(space).Key(key).Limit(10).FetchPos(true)
for condition {
    resp, err := conn.Do(req).Get()
    // ...
    req = req.After(resp.Pos)
}

1. tarantool/tarantool#7639

Part of #246
  • Loading branch information
oleg-jukovec committed Jan 13, 2023
1 parent c973b8e commit d792580
Show file tree
Hide file tree
Showing 10 changed files with 406 additions and 73 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ Versioning](http://semver.org/spec/v2.0.0.html) except to the first release.

### Added

- Support pagination (#246)

### Changed

### Fixed
Expand Down
4 changes: 4 additions & 0 deletions const.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,16 +30,20 @@ const (
KeyLimit = 0x12
KeyOffset = 0x13
KeyIterator = 0x14
KeyFetchPos = 0x1f
KeyKey = 0x20
KeyTuple = 0x21
KeyFunctionName = 0x22
KeyUserName = 0x23
KeyExpression = 0x27
KeyAfterPos = 0x2e
KeyAfterTuple = 0x2f
KeyDefTuple = 0x28
KeyData = 0x30
KeyError24 = 0x31 /* Error in pre-2.4 format. */
KeyMetaData = 0x32
KeyBindCount = 0x34
KeyPos = 0x35
KeySQLText = 0x40
KeySQLBind = 0x41
KeySQLInfo = 0x42
Expand Down
2 changes: 1 addition & 1 deletion example_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -329,7 +329,7 @@ func ExampleProtocolVersion() {
fmt.Println("Connector client protocol features:", clientProtocolInfo.Features)
// Output:
// Connector client protocol version: 4
// Connector client protocol features: [StreamsFeature TransactionsFeature ErrorExtensionFeature WatchersFeature]
// Connector client protocol features: [StreamsFeature TransactionsFeature ErrorExtensionFeature WatchersFeature PaginationFeature]
}

func getTestTxnOpts() tarantool.Opts {
Expand Down
5 changes: 3 additions & 2 deletions export_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,9 @@ func RefImplPingBody(enc *encoder) error {

// RefImplSelectBody is reference implementation for filling of a select
// request's body.
func RefImplSelectBody(enc *encoder, space, index, offset, limit, iterator uint32, key interface{}) error {
return fillSelect(enc, space, index, offset, limit, iterator, key)
func RefImplSelectBody(enc *encoder, space, index, offset, limit, iterator uint32,
key, after interface{}, fetchPos bool) error {
return fillSelect(enc, space, index, offset, limit, iterator, key, after, fetchPos)
}

// RefImplInsertBody is reference implementation for filling of an insert
Expand Down
5 changes: 4 additions & 1 deletion protocol.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ const (
// (supported by connector).
WatchersFeature ProtocolFeature = 3
// PaginationFeature represents support of pagination
// (unsupported by connector).
// (supported by connector).
PaginationFeature ProtocolFeature = 4
)

Expand Down Expand Up @@ -83,11 +83,14 @@ var clientProtocolInfo ProtocolInfo = ProtocolInfo{
// version 2 (Tarantool 2.10.0), in connector since 1.10.0.
// Watchers were introduced in protocol version 3 (Tarantool 2.10.0), in
// connector since 1.10.0.
// Pagination were introduced in protocol version 4 (Tarantool 2.11.0), in
// connector since 1.11.0.
Features: []ProtocolFeature{
StreamsFeature,
TransactionsFeature,
ErrorExtensionFeature,
WatchersFeature,
PaginationFeature,
},
}

Expand Down
141 changes: 118 additions & 23 deletions request.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,35 +10,103 @@ import (
)

func fillSearch(enc *encoder, spaceNo, indexNo uint32, key interface{}) error {
encodeUint(enc, KeySpaceNo)
encodeUint(enc, uint64(spaceNo))
encodeUint(enc, KeyIndexNo)
encodeUint(enc, uint64(indexNo))
encodeUint(enc, KeyKey)
if err := encodeUint(enc, KeySpaceNo); err != nil {
return err
}
if err := encodeUint(enc, uint64(spaceNo)); err != nil {
return err
}
if err := encodeUint(enc, KeyIndexNo); err != nil {
return err
}
if err := encodeUint(enc, uint64(indexNo)); err != nil {
return err
}
if err := encodeUint(enc, KeyKey); err != nil {
return err
}
return enc.Encode(key)
}

func fillIterator(enc *encoder, offset, limit, iterator uint32) {
encodeUint(enc, KeyIterator)
encodeUint(enc, uint64(iterator))
encodeUint(enc, KeyOffset)
encodeUint(enc, uint64(offset))
encodeUint(enc, KeyLimit)
encodeUint(enc, uint64(limit))
func fillIterator(enc *encoder, offset, limit, iterator uint32) error {
if err := encodeUint(enc, KeyIterator); err != nil {
return err
}
if err := encodeUint(enc, uint64(iterator)); err != nil {
return err
}
if err := encodeUint(enc, KeyOffset); err != nil {
return err
}
if err := encodeUint(enc, uint64(offset)); err != nil {
return err
}
if err := encodeUint(enc, KeyLimit); err != nil {
return err
}
return encodeUint(enc, uint64(limit))
}

func fillInsert(enc *encoder, spaceNo uint32, tuple interface{}) error {
enc.EncodeMapLen(2)
encodeUint(enc, KeySpaceNo)
encodeUint(enc, uint64(spaceNo))
encodeUint(enc, KeyTuple)
if err := enc.EncodeMapLen(2); err != nil {
return err
}
if err := encodeUint(enc, KeySpaceNo); err != nil {
return err
}
if err := encodeUint(enc, uint64(spaceNo)); err != nil {
return err
}
if err := encodeUint(enc, KeyTuple); err != nil {
return err
}
return enc.Encode(tuple)
}

func fillSelect(enc *encoder, spaceNo, indexNo, offset, limit, iterator uint32, key interface{}) error {
enc.EncodeMapLen(6)
fillIterator(enc, offset, limit, iterator)
return fillSearch(enc, spaceNo, indexNo, key)
func fillSelect(enc *encoder, spaceNo, indexNo, offset, limit, iterator uint32,
key, after interface{}, fetchPos bool) error {
mapLen := 6
if fetchPos {
mapLen += 1
}
if after != nil {
mapLen += 1
}
if err := enc.EncodeMapLen(mapLen); err != nil {
return err
}
if err := fillIterator(enc, offset, limit, iterator); err != nil {
return err
}
if err := fillSearch(enc, spaceNo, indexNo, key); err != nil {
return err
}
if fetchPos {
if err := encodeUint(enc, KeyFetchPos); err != nil {
return err
}
if err := enc.EncodeBool(fetchPos); err != nil {
return err
}
}
if after != nil {
if pos, ok := after.([]byte); ok {
if err := encodeUint(enc, KeyAfterPos); err != nil {
return err
}
if err := enc.EncodeString(string(pos)); err != nil {
return err
}
} else {
if err := encodeUint(enc, KeyAfterTuple); err != nil {
return err
}
if err := enc.Encode(after); err != nil {
return err
}
}
}
return nil
}

func fillUpdate(enc *encoder, spaceNo, indexNo uint32, key, ops interface{}) error {
Expand Down Expand Up @@ -660,9 +728,9 @@ func (req *PingRequest) Context(ctx context.Context) *PingRequest {
// by a Connection.
type SelectRequest struct {
spaceIndexRequest
isIteratorSet bool
isIteratorSet, fetchPos bool
offset, limit, iterator uint32
key interface{}
key, after interface{}
}

// NewSelectRequest returns a new empty SelectRequest.
Expand All @@ -671,8 +739,10 @@ func NewSelectRequest(space interface{}) *SelectRequest {
req.requestCode = SelectRequestCode
req.setSpace(space)
req.isIteratorSet = false
req.fetchPos = false
req.iterator = IterAll
req.key = []interface{}{}
req.after = nil
req.limit = 0xFFFFFFFF
return req
}
Expand Down Expand Up @@ -716,14 +786,39 @@ func (req *SelectRequest) Key(key interface{}) *SelectRequest {
return req
}

// FetchPos determines whether to fetch positions of the last tuple. A position
// descriptor will be saved in Response.Pos value.
//
// Note: default value is false.
//
// Requires Tarantool >= 2.11.
// Since 1.11.0
func (req *SelectRequest) FetchPos(fetch bool) *SelectRequest {
req.fetchPos = fetch
return req
}

// After must contain a tuple from which selection must continue or its
// position (a value from Response.Pos).
//
// Note: default value in nil.
//
// Requires Tarantool >= 2.11.
// Since 1.11.0
func (req *SelectRequest) After(after interface{}) *SelectRequest {
req.after = after
return req
}

// Body fills an encoder with the select request body.
func (req *SelectRequest) Body(res SchemaResolver, enc *encoder) error {
spaceNo, indexNo, err := res.ResolveSpaceIndex(req.space, req.index)
if err != nil {
return err
}

return fillSelect(enc, spaceNo, indexNo, req.offset, req.limit, req.iterator, req.key)
return fillSelect(enc, spaceNo, indexNo, req.offset, req.limit, req.iterator,
req.key, req.after, req.fetchPos)
}

// Context sets a passed context to the request.
Expand Down
46 changes: 36 additions & 10 deletions request_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -319,7 +319,8 @@ func TestSelectRequestDefaultValues(t *testing.T) {
var refBuf bytes.Buffer

refEnc := NewEncoder(&refBuf)
err := RefImplSelectBody(refEnc, validSpace, defaultIndex, 0, 0xFFFFFFFF, IterAll, []interface{}{})
err := RefImplSelectBody(refEnc, validSpace, defaultIndex, 0, 0xFFFFFFFF,
IterAll, []interface{}{}, nil, false)
if err != nil {
t.Errorf("An unexpected RefImplSelectBody() error %q", err.Error())
return
Expand All @@ -334,7 +335,8 @@ func TestSelectRequestDefaultIteratorEqIfKey(t *testing.T) {
key := []interface{}{uint(18)}

refEnc := NewEncoder(&refBuf)
err := RefImplSelectBody(refEnc, validSpace, defaultIndex, 0, 0xFFFFFFFF, IterEq, key)
err := RefImplSelectBody(refEnc, validSpace, defaultIndex, 0, 0xFFFFFFFF,
IterEq, key, nil, false)
if err != nil {
t.Errorf("An unexpected RefImplSelectBody() error %q", err.Error())
return
Expand All @@ -351,7 +353,8 @@ func TestSelectRequestIteratorNotChangedIfKey(t *testing.T) {
const iter = IterGe

refEnc := NewEncoder(&refBuf)
err := RefImplSelectBody(refEnc, validSpace, defaultIndex, 0, 0xFFFFFFFF, iter, key)
err := RefImplSelectBody(refEnc, validSpace, defaultIndex, 0, 0xFFFFFFFF,
iter, key, nil, false)
if err != nil {
t.Errorf("An unexpected RefImplSelectBody() error %q", err.Error())
return
Expand All @@ -368,22 +371,45 @@ func TestSelectRequestSetters(t *testing.T) {
const limit = 5
const iter = IterLt
key := []interface{}{uint(36)}
var refBuf bytes.Buffer
afterBytes := []byte{0x1, 0x2, 0x3}
afterKey := []interface{}{uint(13)}
var refBufAfterBytes, refBufAfterKey bytes.Buffer

refEnc := NewEncoder(&refBuf)
err := RefImplSelectBody(refEnc, validSpace, validIndex, offset, limit, iter, key)
refEncAfterBytes := NewEncoder(&refBufAfterBytes)
err := RefImplSelectBody(refEncAfterBytes, validSpace, validIndex, offset,
limit, iter, key, afterBytes, true)
if err != nil {
t.Errorf("An unexpected RefImplSelectBody() error %q", err.Error())
t.Errorf("An unexpected RefImplSelectBody() error %s", err)
return
}

req := NewSelectRequest(validSpace).
refEncAfterKey := NewEncoder(&refBufAfterKey)
err = RefImplSelectBody(refEncAfterKey, validSpace, validIndex, offset,
limit, iter, key, afterKey, true)
if err != nil {
t.Errorf("An unexpected RefImplSelectBody() error %s", err)
return
}

reqAfterBytes := NewSelectRequest(validSpace).
Index(validIndex).
Offset(offset).
Limit(limit).
Iterator(iter).
Key(key)
assertBodyEqual(t, refBuf.Bytes(), req)
Key(key).
After(afterBytes).
FetchPos(true)
reqAfterKey := NewSelectRequest(validSpace).
Index(validIndex).
Offset(offset).
Limit(limit).
Iterator(iter).
Key(key).
After(afterKey).
FetchPos(true)

assertBodyEqual(t, refBufAfterBytes.Bytes(), reqAfterBytes)
assertBodyEqual(t, refBufAfterKey.Bytes(), reqAfterKey)
}

func TestInsertRequestDefaultValues(t *testing.T) {
Expand Down
17 changes: 14 additions & 3 deletions response.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,12 @@ import (
type Response struct {
RequestId uint32
Code uint32
Error string // error message
// Data contains deserialized data for untyped requests
Data []interface{}
// Error contains an error message.
Error string
// Data contains deserialized data for untyped requests.
Data []interface{}
// Pos contains a position descriptor of last selected tuple.
Pos []byte
MetaData []ColumnMetaData
SQLInfo SQLInfo
buf smallBuf
Expand Down Expand Up @@ -228,6 +231,10 @@ func (resp *Response) decodeBody() (err error) {
if !found {
return fmt.Errorf("unknown auth type %s", auth)
}
case KeyPos:
if resp.Pos, err = d.DecodeBytes(); err != nil {
return fmt.Errorf("unable to decode a position: %w", err)
}
default:
if err = d.Skip(); err != nil {
return err
Expand Down Expand Up @@ -300,6 +307,10 @@ func (resp *Response) decodeBodyTyped(res interface{}) (err error) {
if err = d.Decode(&resp.MetaData); err != nil {
return err
}
case KeyPos:
if resp.Pos, err = d.DecodeBytes(); err != nil {
return fmt.Errorf("unable to decode a position: %w", err)
}
default:
if err = d.Skip(); err != nil {
return err
Expand Down
Loading

0 comments on commit d792580

Please sign in to comment.