Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

api: support IPROTO_WATCH_ONCE request type #340

Merged
merged 2 commits into from
Oct 30, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ Versioning](http://semver.org/spec/v2.0.0.html) except to the first release.
- Support `fetch_latest_metadata` option for crud requests with metadata (#335)
DifferentialOrange marked this conversation as resolved.
Show resolved Hide resolved
- Support `noreturn` option for data change crud requests (#335)
- Support `crud.schema` request (#336)
- Support `IPROTO_WATCH_ONCE` request type for Tarantool
version >= 3.0.0-alpha1 (#337)

### Changed

Expand All @@ -38,6 +40,9 @@ Versioning](http://semver.org/spec/v2.0.0.html) except to the first release.
`pool.Connect` and `pool.Add` now accept context as first argument, which
DifferentialOrange marked this conversation as resolved.
Show resolved Hide resolved
user may cancel in process. If `pool.Connect` is canceled in progress, an
error will be returned. All created connections will be closed.
- `iproto.Feature` type now used instead of `ProtocolFeature` (#337)
oleg-jukovec marked this conversation as resolved.
Show resolved Hide resolved
- `iproto.IPROTO_FEATURE_` constants now used instead of local `Feature`
constants for `protocol` (#337)

### Deprecated

Expand Down
5 changes: 5 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -234,6 +234,11 @@ now does not attempt to reconnect and tries to establish a connection only once.
Function might be canceled via context. Context accepted as first argument,
and user may cancel it in process.

#### Protocol changes

* `iproto.Feature` type used instead of `ProtocolFeature`.
* `iproto.IPROTO_FEATURE_` constants used instead of local ones.

## Contributing

See [the contributing guide](CONTRIBUTING.md) for detailed instructions on how
Expand Down
14 changes: 8 additions & 6 deletions connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -564,7 +564,8 @@ func (conn *Connection) dial(ctx context.Context) error {

// Subscribe shutdown event to process graceful shutdown.
if conn.shutdownWatcher == nil &&
isFeatureInSlice(WatchersFeature, conn.serverProtocolInfo.Features) {
isFeatureInSlice(iproto.IPROTO_FEATURE_WATCHERS,
conn.serverProtocolInfo.Features) {
watcher, werr := conn.newWatcherImpl(shutdownEventKey, shutdownEventCallback)
if werr != nil {
return werr
Expand Down Expand Up @@ -1425,7 +1426,7 @@ func subscribeWatchChannel(conn *Connection, key string) (chan watchState, error
return st, nil
}

func isFeatureInSlice(expected ProtocolFeature, actualSlice []ProtocolFeature) bool {
func isFeatureInSlice(expected iproto.Feature, actualSlice []iproto.Feature) bool {
for _, actual := range actualSlice {
if expected == actual {
return true
Expand All @@ -1436,8 +1437,8 @@ func isFeatureInSlice(expected ProtocolFeature, actualSlice []ProtocolFeature) b

// NewWatcher creates a new Watcher object for the connection.
//
// You need to require WatchersFeature to use watchers, see examples for the
// function.
// You need to require IPROTO_FEATURE_WATCHERS to use watchers, see examples
// for the function.
//
// After watcher creation, the watcher callback is invoked for the first time.
// In this case, the callback is triggered whether or not the key has already
Expand Down Expand Up @@ -1472,9 +1473,10 @@ func (conn *Connection) NewWatcher(key string, callback WatchCallback) (Watcher,
// asynchronous. We do not expect any response from a Tarantool instance
// That's why we can't just check the Tarantool response for an unsupported
// request error.
if !isFeatureInSlice(WatchersFeature, conn.opts.RequiredProtocolInfo.Features) {
if !isFeatureInSlice(iproto.IPROTO_FEATURE_WATCHERS,
conn.opts.RequiredProtocolInfo.Features) {
err := fmt.Errorf("the feature %s must be required by connection "+
"options to create a watcher", WatchersFeature)
"options to create a watcher", iproto.IPROTO_FEATURE_WATCHERS)
return nil, err
}

Expand Down
7 changes: 4 additions & 3 deletions connection_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"testing"

"github.com/stretchr/testify/require"
"github.com/tarantool/go-iproto"

. "github.com/tarantool/go-tarantool/v2"
)
Expand All @@ -12,20 +13,20 @@ func TestOptsClonePreservesRequiredProtocolFeatures(t *testing.T) {
original := Opts{
RequiredProtocolInfo: ProtocolInfo{
Version: ProtocolVersion(100),
Features: []ProtocolFeature{ProtocolFeature(99), ProtocolFeature(100)},
Features: []iproto.Feature{iproto.Feature(99), iproto.Feature(100)},
},
}

origCopy := original.Clone()

original.RequiredProtocolInfo.Features[1] = ProtocolFeature(98)
original.RequiredProtocolInfo.Features[1] = iproto.Feature(98)

require.Equal(t,
origCopy,
Opts{
RequiredProtocolInfo: ProtocolInfo{
Version: ProtocolVersion(100),
Features: []ProtocolFeature{ProtocolFeature(99), ProtocolFeature(100)},
Features: []iproto.Feature{iproto.Feature(99), iproto.Feature(100)},
},
})
}
9 changes: 5 additions & 4 deletions dial_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/tarantool/go-iproto"

"github.com/tarantool/go-tarantool/v2"
"github.com/tarantool/go-tarantool/v2/test_helpers"
Expand Down Expand Up @@ -72,8 +73,8 @@ func TestDialer_Dial_passedOpts(t *testing.T) {
RequiredProtocol: tarantool.ProtocolInfo{
Auth: tarantool.ChapSha1Auth,
Version: 33,
Features: []tarantool.ProtocolFeature{
tarantool.ErrorExtensionFeature,
Features: []iproto.Feature{
iproto.IPROTO_FEATURE_ERROR_EXTENSION,
},
},
Auth: tarantool.ChapSha1Auth,
Expand Down Expand Up @@ -302,8 +303,8 @@ func TestConn_ProtocolInfo(t *testing.T) {
info := tarantool.ProtocolInfo{
Auth: tarantool.ChapSha1Auth,
Version: 33,
Features: []tarantool.ProtocolFeature{
tarantool.ErrorExtensionFeature,
Features: []iproto.Feature{
iproto.IPROTO_FEATURE_ERROR_EXTENSION,
},
}
conn, dialer := dialIo(t, func(conn *mockIoConn) {
Expand Down
46 changes: 36 additions & 10 deletions example_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ import (
"fmt"
"time"

"github.com/tarantool/go-iproto"

"github.com/tarantool/go-tarantool/v2"
"github.com/tarantool/go-tarantool/v2/test_helpers"
)
Expand Down Expand Up @@ -626,12 +628,13 @@ func ExampleProtocolVersion() {
fmt.Println("Connector client protocol feature:", f)
}
// Output:
// Connector client protocol version: 4
// Connector client protocol feature: StreamsFeature
// Connector client protocol feature: TransactionsFeature
// Connector client protocol feature: ErrorExtensionFeature
// Connector client protocol feature: WatchersFeature
// Connector client protocol feature: PaginationFeature
// Connector client protocol version: 6
// Connector client protocol feature: IPROTO_FEATURE_STREAMS
// Connector client protocol feature: IPROTO_FEATURE_TRANSACTIONS
// Connector client protocol feature: IPROTO_FEATURE_ERROR_EXTENSION
// Connector client protocol feature: IPROTO_FEATURE_WATCHERS
// Connector client protocol feature: IPROTO_FEATURE_PAGINATION
// Connector client protocol feature: IPROTO_FEATURE_WATCH_ONCE
}

func getTestTxnOpts() tarantool.Opts {
Expand All @@ -640,9 +643,9 @@ func getTestTxnOpts() tarantool.Opts {
// Assert that server supports expected protocol features
txnOpts.RequiredProtocolInfo = tarantool.ProtocolInfo{
Version: tarantool.ProtocolVersion(1),
Features: []tarantool.ProtocolFeature{
tarantool.StreamsFeature,
tarantool.TransactionsFeature,
Features: []iproto.Feature{
iproto.IPROTO_FEATURE_STREAMS,
iproto.IPROTO_FEATURE_TRANSACTIONS,
},
}

Expand Down Expand Up @@ -1167,7 +1170,7 @@ func ExampleConnection_NewWatcher() {
Pass: "test",
// You need to require the feature to create a watcher.
RequiredProtocolInfo: tarantool.ProtocolInfo{
Features: []tarantool.ProtocolFeature{tarantool.WatchersFeature},
Features: []iproto.Feature{iproto.IPROTO_FEATURE_WATCHERS},
},
}
ctx, cancel := context.WithTimeout(context.Background(), 500*time.Millisecond)
Expand Down Expand Up @@ -1230,3 +1233,26 @@ func ExampleConnection_CloseGraceful_force() {
// Result:
// <nil> connection closed by client (0x4001)
}

func ExampleWatchOnceRequest() {
const key = "foo"
const value = "bar"

// WatchOnce request present in Tarantool since version 3.0
isLess, err := test_helpers.IsTarantoolVersionLess(3, 0, 0)
if err != nil || isLess {
return
}

conn := exampleConnect(opts)
defer conn.Close()

conn.Do(tarantool.NewBroadcastRequest(key).Value(value)).Get()

resp, err := conn.Do(tarantool.NewWatchOnceRequest(key)).Get()
if err != nil {
fmt.Printf("Failed to execute the request: %s\n", err)
} else {
fmt.Println(resp.Data)
}
}
6 changes: 6 additions & 0 deletions export_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,3 +120,9 @@ func RefImplRollbackBody(enc *msgpack.Encoder) error {
func RefImplIdBody(enc *msgpack.Encoder, protocolInfo ProtocolInfo) error {
return fillId(enc, protocolInfo)
}

// RefImplWatchOnceBody is reference implementation for filling of an watchOnce
// request's body.
func RefImplWatchOnceBody(enc *msgpack.Encoder, key string) error {
return fillWatchOnce(enc, key)
}
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ require (
github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e // indirect
github.com/shopspring/decimal v1.3.1
github.com/stretchr/testify v1.7.1
github.com/tarantool/go-iproto v0.1.0
github.com/tarantool/go-iproto v0.1.1-0.20231025103136-cb7894473931
github.com/tarantool/go-openssl v0.0.8-0.20231004103608-336ca939d2ca
github.com/vmihailenco/msgpack/v5 v5.3.5
golang.org/x/sys v0.0.0-20210119212857-b64e53b001e4 // indirect
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,8 @@ github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+
github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.7.1 h1:5TQK59W5E3v0r2duFAb7P95B6hEeOyEnHRa8MjYSMTY=
github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/tarantool/go-iproto v0.1.0 h1:zHN9AA8LDawT+JBD0/Nxgr/bIsWkkpDzpcMuaNPSIAQ=
github.com/tarantool/go-iproto v0.1.0/go.mod h1:LNCtdyZxojUed8SbOiYHoc3v9NvaZTB7p96hUySMlIo=
github.com/tarantool/go-iproto v0.1.1-0.20231025103136-cb7894473931 h1:YrsRc1sDZ6HOZccvM2eJ3Nu2TMBq7NMZMsaT5KCu5qU=
github.com/tarantool/go-iproto v0.1.1-0.20231025103136-cb7894473931/go.mod h1:LNCtdyZxojUed8SbOiYHoc3v9NvaZTB7p96hUySMlIo=
github.com/tarantool/go-openssl v0.0.8-0.20231004103608-336ca939d2ca h1:oOrBh73tDDyooIXajfr+0pfnM+89404ClAhJpTTHI7E=
github.com/tarantool/go-openssl v0.0.8-0.20231004103608-336ca939d2ca/go.mod h1:M7H4xYSbzqpW/ZRBMyH0eyqQBsnhAMfsYk5mv0yid7A=
github.com/vmihailenco/msgpack/v5 v5.3.5 h1:5gO0H1iULLWGhs2H5tbAHIZTV8/cYafcFOr9znI5mJU=
Expand Down
12 changes: 7 additions & 5 deletions pool/connection_pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ import (
"sync"
"time"

"github.com/tarantool/go-iproto"

"github.com/tarantool/go-tarantool/v2"
)

Expand Down Expand Up @@ -911,8 +913,8 @@ func (p *ConnectionPool) NewPrepared(expr string, userMode Mode) (*tarantool.Pre

// NewWatcher creates a new Watcher object for the connection pool.
//
// You need to require WatchersFeature to use watchers, see examples for the
// function.
// You need to require IPROTO_FEATURE_WATCHERS to use watchers, see examples
// for the function.
//
// The behavior is same as if Connection.NewWatcher() called for each
// connection with a suitable role.
Expand All @@ -932,14 +934,14 @@ func (p *ConnectionPool) NewWatcher(key string,
callback tarantool.WatchCallback, mode Mode) (tarantool.Watcher, error) {
watchersRequired := false
for _, feature := range p.connOpts.RequiredProtocolInfo.Features {
if tarantool.WatchersFeature == feature {
if iproto.IPROTO_FEATURE_WATCHERS == feature {
watchersRequired = true
break
}
}
if !watchersRequired {
return nil, errors.New("the feature WatchersFeature must be " +
"required by connection options to create a watcher")
return nil, errors.New("the feature IPROTO_FEATURE_WATCHERS must " +
"be required by connection options to create a watcher")
}

watcher := &poolWatcher{
Expand Down
27 changes: 14 additions & 13 deletions pool/connection_pool_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/tarantool/go-iproto"
"github.com/vmihailenco/msgpack/v5"

"github.com/tarantool/go-tarantool/v2"
Expand Down Expand Up @@ -2832,7 +2833,7 @@ func TestConnectionPool_NewWatcher_noWatchersFeature(t *testing.T) {
roles := []bool{true, false, false, true, true}

opts := connOpts.Clone()
opts.RequiredProtocolInfo.Features = []tarantool.ProtocolFeature{}
opts.RequiredProtocolInfo.Features = []iproto.Feature{}
err := test_helpers.SetClusterRO(servers, opts, roles)
require.Nilf(t, err, "fail to set roles for cluster")

Expand All @@ -2847,8 +2848,8 @@ func TestConnectionPool_NewWatcher_noWatchersFeature(t *testing.T) {
func(event tarantool.WatchEvent) {}, pool.ANY)
require.Nilf(t, watcher, "watcher must not be created")
require.NotNilf(t, err, "an error is expected")
expected := "the feature WatchersFeature must be required by connection " +
"options to create a watcher"
expected := "the feature IPROTO_FEATURE_WATCHERS must be required by " +
"connection options to create a watcher"
require.Equal(t, expected, err.Error())
}

Expand All @@ -2860,8 +2861,8 @@ func TestConnectionPool_NewWatcher_modes(t *testing.T) {
roles := []bool{true, false, false, true, true}

opts := connOpts.Clone()
opts.RequiredProtocolInfo.Features = []tarantool.ProtocolFeature{
tarantool.WatchersFeature,
opts.RequiredProtocolInfo.Features = []iproto.Feature{
iproto.IPROTO_FEATURE_WATCHERS,
}
err := test_helpers.SetClusterRO(servers, opts, roles)
require.Nilf(t, err, "fail to set roles for cluster")
Expand Down Expand Up @@ -2941,8 +2942,8 @@ func TestConnectionPool_NewWatcher_update(t *testing.T) {
roles := []bool{true, false, false, true, true}

opts := connOpts.Clone()
opts.RequiredProtocolInfo.Features = []tarantool.ProtocolFeature{
tarantool.WatchersFeature,
opts.RequiredProtocolInfo.Features = []iproto.Feature{
iproto.IPROTO_FEATURE_WATCHERS,
}
err := test_helpers.SetClusterRO(servers, opts, roles)
require.Nilf(t, err, "fail to set roles for cluster")
Expand Down Expand Up @@ -3030,8 +3031,8 @@ func TestWatcher_Unregister(t *testing.T) {
roles := []bool{true, false, false, true, true}

opts := connOpts.Clone()
opts.RequiredProtocolInfo.Features = []tarantool.ProtocolFeature{
tarantool.WatchersFeature,
opts.RequiredProtocolInfo.Features = []iproto.Feature{
iproto.IPROTO_FEATURE_WATCHERS,
}
err := test_helpers.SetClusterRO(servers, opts, roles)
require.Nilf(t, err, "fail to set roles for cluster")
Expand Down Expand Up @@ -3091,8 +3092,8 @@ func TestConnectionPool_NewWatcher_concurrent(t *testing.T) {
roles := []bool{true, false, false, true, true}

opts := connOpts.Clone()
opts.RequiredProtocolInfo.Features = []tarantool.ProtocolFeature{
tarantool.WatchersFeature,
opts.RequiredProtocolInfo.Features = []iproto.Feature{
iproto.IPROTO_FEATURE_WATCHERS,
}
err := test_helpers.SetClusterRO(servers, opts, roles)
require.Nilf(t, err, "fail to set roles for cluster")
Expand Down Expand Up @@ -3133,8 +3134,8 @@ func TestWatcher_Unregister_concurrent(t *testing.T) {
roles := []bool{true, false, false, true, true}

opts := connOpts.Clone()
opts.RequiredProtocolInfo.Features = []tarantool.ProtocolFeature{
tarantool.WatchersFeature,
opts.RequiredProtocolInfo.Features = []iproto.Feature{
iproto.IPROTO_FEATURE_WATCHERS,
}
err := test_helpers.SetClusterRO(servers, opts, roles)
require.Nilf(t, err, "fail to set roles for cluster")
Expand Down
Loading
Loading