From 9379fb716ab10a08dbd0c6591595e5a484c7893c Mon Sep 17 00:00:00 2001 From: AnaNek Date: Wed, 9 Mar 2022 13:07:32 +0300 Subject: [PATCH] connection-pool: implement connection pool with master discovery Main features: - Return available connection from pool according to round-robin strategy. - Automatic master discovery by `mode` parameter. Additional options (configurable via `ConnectWithOpts`): * `CheckTimeout` - time interval to check for connection timeout and try to switch connection `Mode` parameter: * `ANY` (use any instance) - the request can be executed on any instance (master or replica). * `RW` (writeable instance (master)) - the request can only be executed on master. * `RO` (read only instance (replica)) - the request can only be executed on replica. * `PREFER_RO` (prefer read only instance (replica)) - if there is one, otherwise fallback to a writeable one (master). * `PREFER_RW` (prefer write only instance (master)) - if there is one, otherwise fallback to a read only one (replica). Closes #113 --- CHANGELOG.md | 1 + Makefile | 6 + connection_pool/config.lua | 35 + connection_pool/connection_pool.go | 732 +++++++++++++ connection_pool/connection_pool_test.go | 1293 +++++++++++++++++++++++ connection_pool/const.go | 52 + connection_pool/example_test.go | 531 ++++++++++ connection_pool/round_robin.go | 117 ++ go.mod | 1 + go.sum | 10 + request.go | 5 + test_helpers/main.go | 24 +- test_helpers/pool_helper.go | 248 +++++ 13 files changed, 3050 insertions(+), 5 deletions(-) create mode 100644 connection_pool/config.lua create mode 100644 connection_pool/connection_pool.go create mode 100644 connection_pool/connection_pool_test.go create mode 100644 connection_pool/const.go create mode 100644 connection_pool/example_test.go create mode 100644 connection_pool/round_robin.go create mode 100644 test_helpers/pool_helper.go diff --git a/CHANGELOG.md b/CHANGELOG.md index f0cc0b40a..6168ede00 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -16,6 +16,7 @@ Versioning](http://semver.org/spec/v2.0.0.html) except to the first release. - Support UUID type in msgpack (#90) - Go modules support (#91) - queue-utube handling (#85) +- Master discovery (#113) ### Fixed diff --git a/Makefile b/Makefile index 5af3358d0..ed94ae137 100644 --- a/Makefile +++ b/Makefile @@ -14,6 +14,12 @@ deps: clean test: go test ./... -v -p 1 +.PHONY: test-connection-pool +test-connection-pool: + @echo "Running tests in connection_pool package" + go clean -testcache + go test ./connection_pool/ -v -p 1 + .PHONY: test-multi test-multi: @echo "Running tests in multiconnection package" diff --git a/connection_pool/config.lua b/connection_pool/config.lua new file mode 100644 index 000000000..b1492dd13 --- /dev/null +++ b/connection_pool/config.lua @@ -0,0 +1,35 @@ +-- Do not set listen for now so connector won't be +-- able to send requests until everything is configured. +box.cfg{ + work_dir = os.getenv("TEST_TNT_WORK_DIR"), +} + +box.once("init", function() + box.schema.user.create('test', { password = 'test' }) + box.schema.user.grant('test', 'read,write,execute', 'universe') + + local s = box.schema.space.create('testPool', { + id = 520, + if_not_exists = true, + format = { + {name = "key", type = "string"}, + {name = "value", type = "string"}, + }, + }) + s:create_index('pk', { + type = 'tree', + parts = {{ field = 1, type = 'string' }}, + if_not_exists = true + }) +end) + +local function simple_incr(a) + return a + 1 +end + +rawset(_G, 'simple_incr', simple_incr) + +-- Set listen only when every other thing is configured. +box.cfg{ + listen = os.getenv("TEST_TNT_LISTEN"), +} diff --git a/connection_pool/connection_pool.go b/connection_pool/connection_pool.go new file mode 100644 index 000000000..d4a343911 --- /dev/null +++ b/connection_pool/connection_pool.go @@ -0,0 +1,732 @@ +// Package with methods to work with a Tarantool cluster +// considering master discovery. +// +// Main features: +// +// - Return available connection from pool according to round-robin strategy. +// +// - Automatic master discovery by mode parameter. +// +// Since: 1.6.0 +package connection_pool + +import ( + "errors" + "log" + "sync/atomic" + "time" + + "github.com/tarantool/go-tarantool" +) + +var ( + ErrEmptyAddrs = errors.New("addrs (first argument) should not be empty") + ErrWrongCheckTimeout = errors.New("wrong check timeout, must be greater than 0") + ErrNoConnection = errors.New("no active connections") + ErrTooManyArgs = errors.New("too many arguments") + ErrIncorrectResponse = errors.New("Incorrect response format") + ErrIncorrectStatus = errors.New("Incorrect instance status: status should be `running`") + ErrNoRwInstance = errors.New("Can't find rw instance in pool") + ErrNoRoInstance = errors.New("Can't find ro instance in pool") + ErrNoHealthyInstance = errors.New("Can't find healthy instance in pool") +) + +/* +Additional options (configurable via ConnectWithOpts): + +- CheckTimeout - time interval to check for connection timeout and try to switch connection. +*/ +type OptsPool struct { + // timeout for timer to reopen connections + // that have been closed by some events and + // to relocate connection between subpools + // if ro/rw role has been updated + CheckTimeout time.Duration +} + +/* +ConnectionInfo structure for information about connection statuses: + +- ConnectedNow reports if connection is established at the moment. + +- ConnRole reports master/replica role of instance. +*/ +type ConnectionInfo struct { + ConnectedNow bool + ConnRole Role +} + +/* +Main features: + +- Return available connection from pool according to round-robin strategy. + +- Automatic master discovery by mode parameter. +*/ +type ConnectionPool struct { + addrs []string + connOpts tarantool.Opts + opts OptsPool + + notify chan tarantool.ConnEvent + state State + control chan struct{} + roPool *RoundRobinStrategy + rwPool *RoundRobinStrategy + anyPool *RoundRobinStrategy +} + +// ConnectWithOpts creates pool for instances with addresses addrs +// with options opts. +func ConnectWithOpts(addrs []string, connOpts tarantool.Opts, opts OptsPool) (connPool *ConnectionPool, err error) { + if len(addrs) == 0 { + return nil, ErrEmptyAddrs + } + if opts.CheckTimeout <= 0 { + return nil, ErrWrongCheckTimeout + } + + notify := make(chan tarantool.ConnEvent, 10*len(addrs)) // x10 to accept disconnected and closed event (with a margin) + connOpts.Notify = notify + + size := len(addrs) + rwPool := NewEmptyRoundRobin(size) + roPool := NewEmptyRoundRobin(size) + anyPool := NewEmptyRoundRobin(size) + + connPool = &ConnectionPool{ + addrs: addrs, + connOpts: connOpts, + opts: opts, + notify: notify, + control: make(chan struct{}), + rwPool: rwPool, + roPool: roPool, + anyPool: anyPool, + } + + somebodyAlive := connPool.fillPools() + if !somebodyAlive { + connPool.Close() + return nil, ErrNoConnection + } + + go connPool.checker() + + return connPool, nil +} + +// ConnectWithOpts creates pool for instances with addresses addrs. +func Connect(addrs []string, connOpts tarantool.Opts) (connPool *ConnectionPool, err error) { + opts := OptsPool{ + CheckTimeout: 1 * time.Second, + } + return ConnectWithOpts(addrs, connOpts, opts) +} + +// ConnectedNow gets connected status of pool. +func (connPool *ConnectionPool) ConnectedNow(mode Mode) (bool, error) { + if connPool.getState() != connConnected { + return false, nil + } + + conn, err := connPool.getNextConnection(mode) + if err != nil || conn == nil { + return false, err + } + + return conn.ConnectedNow(), nil +} + +// ConfiguredTimeout gets timeout of current connection. +func (connPool *ConnectionPool) ConfiguredTimeout(mode Mode) (time.Duration, error) { + conn, err := connPool.getNextConnection(mode) + if err != nil { + return 0, err + } + + return conn.ConfiguredTimeout(), nil +} + +// Close closes connections in pool. +func (connPool *ConnectionPool) Close() []error { + close(connPool.control) + connPool.state = connClosed + + rwErrs := connPool.rwPool.CloseConns() + roErrs := connPool.roPool.CloseConns() + + allErrs := append(rwErrs, roErrs...) + + return allErrs +} + +// GetAddrs gets addresses of connections in pool. +func (connPool *ConnectionPool) GetAddrs() []string { + return connPool.addrs +} + +// GetPoolInfo gets information of connections (connected status, ro/rw role). +func (connPool *ConnectionPool) GetPoolInfo() map[string]*ConnectionInfo { + info := make(map[string]*ConnectionInfo) + + for _, addr := range connPool.addrs { + conn, role := connPool.getConnectionFromPool(addr) + if conn != nil { + info[addr] = &ConnectionInfo{ConnectedNow: conn.ConnectedNow(), ConnRole: role} + } + } + + return info +} + +// Ping sends empty request to Tarantool to check connection. +func (connPool *ConnectionPool) Ping(userMode Mode) (*tarantool.Response, error) { + conn, err := connPool.getNextConnection(userMode) + if err != nil { + return nil, err + } + + return conn.Ping() +} + +// Select performs select to box space. +func (connPool *ConnectionPool) Select(space, index interface{}, offset, limit, iterator uint32, key interface{}, userMode ...Mode) (resp *tarantool.Response, err error) { + conn, err := connPool.getConnByMode(ANY, userMode) + if err != nil { + return nil, err + } + + return conn.Select(space, index, offset, limit, iterator, key) +} + +// Insert performs insertion to box space. +// Tarantool will reject Insert when tuple with same primary key exists. +func (connPool *ConnectionPool) Insert(space interface{}, tuple interface{}, userMode ...Mode) (resp *tarantool.Response, err error) { + conn, err := connPool.getConnByMode(RW, userMode) + if err != nil { + return nil, err + } + + return conn.Insert(space, tuple) +} + +// Replace performs "insert or replace" action to box space. +// If tuple with same primary key exists, it will be replaced. +func (connPool *ConnectionPool) Replace(space interface{}, tuple interface{}, userMode ...Mode) (resp *tarantool.Response, err error) { + conn, err := connPool.getConnByMode(RW, userMode) + if err != nil { + return nil, err + } + + return conn.Replace(space, tuple) +} + +// Delete performs deletion of a tuple by key. +// Result will contain array with deleted tuple. +func (connPool *ConnectionPool) Delete(space, index interface{}, key interface{}, userMode ...Mode) (resp *tarantool.Response, err error) { + conn, err := connPool.getConnByMode(RW, userMode) + if err != nil { + return nil, err + } + + return conn.Delete(space, index, key) +} + +// Update performs update of a tuple by key. +// Result will contain array with updated tuple. +func (connPool *ConnectionPool) Update(space, index interface{}, key, ops interface{}, userMode ...Mode) (resp *tarantool.Response, err error) { + conn, err := connPool.getConnByMode(RW, userMode) + if err != nil { + return nil, err + } + + return conn.Update(space, index, key, ops) +} + +// Upsert performs "update or insert" action of a tuple by key. +// Result will not contain any tuple. +func (connPool *ConnectionPool) Upsert(space interface{}, tuple, ops interface{}, userMode ...Mode) (resp *tarantool.Response, err error) { + conn, err := connPool.getConnByMode(RW, userMode) + if err != nil { + return nil, err + } + + return conn.Upsert(space, tuple, ops) +} + +// Call calls registered Tarantool function. +// It uses request code for Tarantool 1.6, so result is converted to array of arrays. +func (connPool *ConnectionPool) Call(functionName string, args interface{}, userMode Mode) (resp *tarantool.Response, err error) { + conn, err := connPool.getNextConnection(userMode) + if err != nil { + return nil, err + } + + return conn.Call(functionName, args) +} + +// Call17 calls registered Tarantool function. +// It uses request code for Tarantool 1.7, so result is not converted +// (though, keep in mind, result is always array). +func (connPool *ConnectionPool) Call17(functionName string, args interface{}, userMode Mode) (resp *tarantool.Response, err error) { + conn, err := connPool.getNextConnection(userMode) + if err != nil { + return nil, err + } + + return conn.Call17(functionName, args) +} + +// Eval passes lua expression for evaluation. +func (connPool *ConnectionPool) Eval(expr string, args interface{}, userMode Mode) (resp *tarantool.Response, err error) { + conn, err := connPool.getNextConnection(userMode) + if err != nil { + return nil, err + } + + return conn.Eval(expr, args) +} + +// GetTyped performs select (with limit = 1 and offset = 0) +// to box space and fills typed result. +func (connPool *ConnectionPool) GetTyped(space, index interface{}, key interface{}, result interface{}, userMode ...Mode) (err error) { + conn, err := connPool.getConnByMode(ANY, userMode) + if err != nil { + return err + } + + return conn.GetTyped(space, index, key, result) +} + +// SelectTyped performs select to box space and fills typed result. +func (connPool *ConnectionPool) SelectTyped(space, index interface{}, offset, limit, iterator uint32, key interface{}, result interface{}, userMode ...Mode) (err error) { + conn, err := connPool.getConnByMode(ANY, userMode) + if err != nil { + return err + } + + return conn.SelectTyped(space, index, offset, limit, iterator, key, result) +} + +// InsertTyped performs insertion to box space. +// Tarantool will reject Insert when tuple with same primary key exists. +func (connPool *ConnectionPool) InsertTyped(space interface{}, tuple interface{}, result interface{}, userMode ...Mode) (err error) { + conn, err := connPool.getConnByMode(RW, userMode) + if err != nil { + return err + } + + return conn.InsertTyped(space, tuple, result) +} + +// ReplaceTyped performs "insert or replace" action to box space. +// If tuple with same primary key exists, it will be replaced. +func (connPool *ConnectionPool) ReplaceTyped(space interface{}, tuple interface{}, result interface{}, userMode ...Mode) (err error) { + conn, err := connPool.getConnByMode(RW, userMode) + if err != nil { + return err + } + + return conn.ReplaceTyped(space, tuple, result) +} + +// DeleteTyped performs deletion of a tuple by key and fills result with deleted tuple. +func (connPool *ConnectionPool) DeleteTyped(space, index interface{}, key interface{}, result interface{}, userMode ...Mode) (err error) { + conn, err := connPool.getConnByMode(RW, userMode) + if err != nil { + return err + } + + return conn.DeleteTyped(space, index, key, result) +} + +// UpdateTyped performs update of a tuple by key and fills result with updated tuple. +func (connPool *ConnectionPool) UpdateTyped(space, index interface{}, key, ops interface{}, result interface{}, userMode ...Mode) (err error) { + conn, err := connPool.getConnByMode(RW, userMode) + if err != nil { + return err + } + + return conn.UpdateTyped(space, index, key, ops, result) +} + +// CallTyped calls registered function. +// It uses request code for Tarantool 1.6, so result is converted to array of arrays. +func (connPool *ConnectionPool) CallTyped(functionName string, args interface{}, result interface{}, userMode Mode) (err error) { + conn, err := connPool.getNextConnection(userMode) + if err != nil { + return err + } + + return conn.CallTyped(functionName, args, result) +} + +// Call17Typed calls registered function. +// It uses request code for Tarantool 1.7, so result is not converted +// (though, keep in mind, result is always array). +func (connPool *ConnectionPool) Call17Typed(functionName string, args interface{}, result interface{}, userMode Mode) (err error) { + conn, err := connPool.getNextConnection(userMode) + if err != nil { + return err + } + + return conn.Call17Typed(functionName, args, result) +} + +// EvalTyped passes lua expression for evaluation. +func (connPool *ConnectionPool) EvalTyped(expr string, args interface{}, result interface{}, userMode Mode) (err error) { + conn, err := connPool.getNextConnection(userMode) + if err != nil { + return err + } + + return conn.EvalTyped(expr, args, result) +} + +// SelectAsync sends select request to Tarantool and returns Future. +func (connPool *ConnectionPool) SelectAsync(space, index interface{}, offset, limit, iterator uint32, key interface{}, userMode ...Mode) *tarantool.Future { + conn, err := connPool.getConnByMode(ANY, userMode) + if err != nil { + return tarantool.NewErrorFuture(err) + } + + return conn.SelectAsync(space, index, offset, limit, iterator, key) +} + +// InsertAsync sends insert action to Tarantool and returns Future. +// Tarantool will reject Insert when tuple with same primary key exists. +func (connPool *ConnectionPool) InsertAsync(space interface{}, tuple interface{}, userMode ...Mode) *tarantool.Future { + conn, err := connPool.getConnByMode(RW, userMode) + if err != nil { + return tarantool.NewErrorFuture(err) + } + + return conn.InsertAsync(space, tuple) +} + +// ReplaceAsync sends "insert or replace" action to Tarantool and returns Future. +// If tuple with same primary key exists, it will be replaced. +func (connPool *ConnectionPool) ReplaceAsync(space interface{}, tuple interface{}, userMode ...Mode) *tarantool.Future { + conn, err := connPool.getConnByMode(RW, userMode) + if err != nil { + return tarantool.NewErrorFuture(err) + } + + return conn.ReplaceAsync(space, tuple) +} + +// DeleteAsync sends deletion action to Tarantool and returns Future. +// Future's result will contain array with deleted tuple. +func (connPool *ConnectionPool) DeleteAsync(space, index interface{}, key interface{}, userMode ...Mode) *tarantool.Future { + conn, err := connPool.getConnByMode(RW, userMode) + if err != nil { + return tarantool.NewErrorFuture(err) + } + + return conn.DeleteAsync(space, index, key) +} + +// UpdateAsync sends deletion of a tuple by key and returns Future. +// Future's result will contain array with updated tuple. +func (connPool *ConnectionPool) UpdateAsync(space, index interface{}, key, ops interface{}, userMode ...Mode) *tarantool.Future { + conn, err := connPool.getConnByMode(RW, userMode) + if err != nil { + return tarantool.NewErrorFuture(err) + } + + return conn.UpdateAsync(space, index, key, ops) +} + +// UpsertAsync sends "update or insert" action to Tarantool and returns Future. +// Future's sesult will not contain any tuple. +func (connPool *ConnectionPool) UpsertAsync(space interface{}, tuple interface{}, ops interface{}, userMode ...Mode) *tarantool.Future { + conn, err := connPool.getConnByMode(RW, userMode) + if err != nil { + return tarantool.NewErrorFuture(err) + } + + return conn.UpsertAsync(space, tuple, ops) +} + +// CallAsync sends a call to registered Tarantool function and returns Future. +// It uses request code for Tarantool 1.6, so future's result is always array of arrays. +func (connPool *ConnectionPool) CallAsync(functionName string, args interface{}, userMode Mode) *tarantool.Future { + conn, err := connPool.getNextConnection(userMode) + if err != nil { + return tarantool.NewErrorFuture(err) + } + + return conn.CallAsync(functionName, args) +} + +// Call17Async sends a call to registered Tarantool function and returns Future. +// It uses request code for Tarantool 1.7, so future's result will not be converted +// (though, keep in mind, result is always array). +func (connPool *ConnectionPool) Call17Async(functionName string, args interface{}, userMode Mode) *tarantool.Future { + conn, err := connPool.getNextConnection(userMode) + if err != nil { + return tarantool.NewErrorFuture(err) + } + + return conn.Call17Async(functionName, args) +} + +// EvalAsync sends a lua expression for evaluation and returns Future. +func (connPool *ConnectionPool) EvalAsync(expr string, args interface{}, userMode Mode) *tarantool.Future { + conn, err := connPool.getNextConnection(userMode) + if err != nil { + return tarantool.NewErrorFuture(err) + } + + return conn.EvalAsync(expr, args) +} + +// +// private +// + +func (connPool *ConnectionPool) getConnectionRole(conn *tarantool.Connection) (Role, error) { + resp, err := conn.Call17("box.info", []interface{}{}) + if err != nil { + return unknown, err + } + if resp == nil { + return unknown, ErrIncorrectResponse + } + if len(resp.Data) < 1 { + return unknown, ErrIncorrectResponse + } + + instanceStatus, ok := resp.Data[0].(map[interface{}]interface{})["status"] + if !ok { + return unknown, ErrIncorrectResponse + } + if instanceStatus != "running" { + return unknown, ErrIncorrectStatus + } + + resp, err = conn.Call17("box.info", []interface{}{}) + if err != nil { + return unknown, err + } + if resp == nil { + return unknown, ErrIncorrectResponse + } + if len(resp.Data) < 1 { + return unknown, ErrIncorrectResponse + } + + replicaRole, ok := resp.Data[0].(map[interface{}]interface{})["ro"] + if !ok { + return unknown, ErrIncorrectResponse + } + + switch replicaRole { + case false: + return master, nil + case true: + return replica, nil + } + + return unknown, nil +} + +func (connPool *ConnectionPool) getConnectionFromPool(addr string) (*tarantool.Connection, Role) { + conn := connPool.rwPool.GetConnByAddr(addr) + if conn != nil { + return conn, master + } + + conn = connPool.roPool.GetConnByAddr(addr) + if conn != nil { + return conn, replica + } + + return connPool.anyPool.GetConnByAddr(addr), unknown +} + +func (connPool *ConnectionPool) deleteConnectionFromPool(addr string) { + _ = connPool.anyPool.DeleteConnByAddr(addr) + conn := connPool.rwPool.DeleteConnByAddr(addr) + if conn != nil { + return + } + + connPool.roPool.DeleteConnByAddr(addr) +} + +func (connPool *ConnectionPool) setConnectionToPool(addr string, conn *tarantool.Connection) error { + role, err := connPool.getConnectionRole(conn) + if err != nil { + return err + } + + connPool.anyPool.AddConn(addr, conn) + + switch role { + case master: + connPool.rwPool.AddConn(addr, conn) + case replica: + connPool.roPool.AddConn(addr, conn) + } + + return nil +} + +func (connPool *ConnectionPool) refreshConnection(addr string) { + if conn, oldRole := connPool.getConnectionFromPool(addr); conn != nil { + if !conn.ClosedNow() { + curRole, _ := connPool.getConnectionRole(conn) + if oldRole != curRole { + connPool.deleteConnectionFromPool(addr) + err := connPool.setConnectionToPool(addr, conn) + if err != nil { + conn.Close() + log.Printf("tarantool: storing connection to %s failed: %s\n", addr, err.Error()) + } + } + } + } else { + conn, _ := tarantool.Connect(addr, connPool.connOpts) + if conn != nil { + err := connPool.setConnectionToPool(addr, conn) + if err != nil { + conn.Close() + log.Printf("tarantool: storing connection to %s failed: %s\n", addr, err.Error()) + } + } + } +} + +func (connPool *ConnectionPool) checker() { + + timer := time.NewTicker(connPool.opts.CheckTimeout) + defer timer.Stop() + + for connPool.getState() != connClosed { + select { + case <-connPool.control: + return + case e := <-connPool.notify: + if connPool.getState() == connClosed { + return + } + if e.Conn.ClosedNow() { + addr := e.Conn.Addr() + if conn, _ := connPool.getConnectionFromPool(addr); conn == nil { + continue + } + conn, _ := tarantool.Connect(addr, connPool.connOpts) + if conn != nil { + err := connPool.setConnectionToPool(addr, conn) + if err != nil { + conn.Close() + log.Printf("tarantool: storing connection to %s failed: %s\n", addr, err.Error()) + } + } else { + connPool.deleteConnectionFromPool(addr) + } + } + case <-timer.C: + for _, addr := range connPool.addrs { + if connPool.getState() == connClosed { + return + } + + // Reopen connection + // Relocate connection between subpools + // if ro/rw was updated + connPool.refreshConnection(addr) + } + } + } +} + +func (connPool *ConnectionPool) fillPools() bool { + somebodyAlive := false + + for _, addr := range connPool.addrs { + conn, err := tarantool.Connect(addr, connPool.connOpts) + if err != nil { + log.Printf("tarantool: connect to %s failed: %s\n", addr, err.Error()) + } else if conn != nil { + err = connPool.setConnectionToPool(addr, conn) + if err != nil { + conn.Close() + log.Printf("tarantool: storing connection to %s failed: %s\n", addr, err.Error()) + } else if conn.ConnectedNow() { + somebodyAlive = true + } + } + } + + return somebodyAlive +} + +func (connPool *ConnectionPool) getState() uint32 { + return atomic.LoadUint32((*uint32)(&connPool.state)) +} + +func (connPool *ConnectionPool) getNextConnection(mode Mode) (*tarantool.Connection, error) { + + switch mode { + case ANY: + if connPool.anyPool.IsEmpty() { + return nil, ErrNoHealthyInstance + } + + return connPool.anyPool.GetNextConnection(), nil + + case RW: + if connPool.rwPool.IsEmpty() { + return nil, ErrNoRwInstance + } + + return connPool.rwPool.GetNextConnection(), nil + + case RO: + if connPool.roPool.IsEmpty() { + return nil, ErrNoRoInstance + } + + return connPool.roPool.GetNextConnection(), nil + + case PreferRW: + if !connPool.rwPool.IsEmpty() { + return connPool.rwPool.GetNextConnection(), nil + } + + if !connPool.roPool.IsEmpty() { + return connPool.roPool.GetNextConnection(), nil + } + + return nil, ErrNoHealthyInstance + + case PreferRO: + if !connPool.roPool.IsEmpty() { + return connPool.roPool.GetNextConnection(), nil + } + + if !connPool.rwPool.IsEmpty() { + return connPool.rwPool.GetNextConnection(), nil + } + + return nil, ErrNoHealthyInstance + } + + return nil, ErrNoHealthyInstance +} + +func (connPool *ConnectionPool) getConnByMode(defaultMode Mode, userMode []Mode) (*tarantool.Connection, error) { + if len(userMode) > 1 { + return nil, ErrTooManyArgs + } + + mode := defaultMode + if len(userMode) > 0 { + mode = userMode[0] + } + + return connPool.getNextConnection(mode) +} diff --git a/connection_pool/connection_pool_test.go b/connection_pool/connection_pool_test.go new file mode 100644 index 000000000..0f0b0a6a6 --- /dev/null +++ b/connection_pool/connection_pool_test.go @@ -0,0 +1,1293 @@ +package connection_pool_test + +import ( + "log" + "os" + "strings" + "testing" + "time" + + "github.com/stretchr/testify/require" + "github.com/tarantool/go-tarantool" + "github.com/tarantool/go-tarantool/connection_pool" + "github.com/tarantool/go-tarantool/test_helpers" +) + +var spaceNo = uint32(520) +var spaceName = "testPool" +var indexNo = uint32(0) +var indexName = "pk" + +var ports = []string{"3013", "3014", "3015", "3016", "3017"} +var host = "127.0.0.1" +var servers = []string{ + strings.Join([]string{host, ports[0]}, ":"), + strings.Join([]string{host, ports[1]}, ":"), + strings.Join([]string{host, ports[2]}, ":"), + strings.Join([]string{host, ports[3]}, ":"), + strings.Join([]string{host, ports[4]}, ":"), +} + +var connOpts = tarantool.Opts{ + Timeout: 500 * time.Millisecond, + User: "test", + Pass: "test", +} + +var defaultCountRetry = 5 +var defaultTimeoutRetry = 500 * time.Millisecond + +var instances []test_helpers.TarantoolInstance + +func TestConnError_IncorrectParams(t *testing.T) { + connPool, err := connection_pool.Connect([]string{}, tarantool.Opts{}) + require.Nilf(t, connPool, "conn is not nil with incorrect param") + require.NotNilf(t, err, "err is nil with incorrect params") + require.Equal(t, "addrs (first argument) should not be empty", err.Error()) + + connPool, err = connection_pool.Connect([]string{"err1", "err2"}, connOpts) + require.Nilf(t, connPool, "conn is not nil with incorrect param") + require.NotNilf(t, err, "err is nil with incorrect params") + require.Equal(t, "no active connections", err.Error()) + + connPool, err = connection_pool.ConnectWithOpts(servers, tarantool.Opts{}, connection_pool.OptsPool{}) + require.Nilf(t, connPool, "conn is not nil with incorrect param") + require.NotNilf(t, err, "err is nil with incorrect params") + require.Equal(t, "wrong check timeout, must be greater than 0", err.Error()) +} + +func TestConnSuccessfully(t *testing.T) { + server := servers[0] + connPool, err := connection_pool.Connect([]string{"err", server}, connOpts) + require.Nilf(t, err, "failed to connect") + require.NotNilf(t, connPool, "conn is nil after Connect") + + defer connPool.Close() + + args := test_helpers.CheckStatusesArgs{ + ConnPool: connPool, + Mode: connection_pool.ANY, + Servers: []string{server}, + ExpectedPoolStatus: true, + ExpectedStatuses: map[string]bool{ + server: true, + }, + } + + err = test_helpers.CheckPoolStatuses(args) + require.Nil(t, err) +} + +func TestReconnect(t *testing.T) { + server := servers[0] + + connPool, err := connection_pool.Connect(servers, connOpts) + require.Nilf(t, err, "failed to connect") + require.NotNilf(t, connPool, "conn is nil after Connect") + + defer connPool.Close() + + test_helpers.StopTarantoolWithCleanup(instances[0]) + + args := test_helpers.CheckStatusesArgs{ + ConnPool: connPool, + Mode: connection_pool.ANY, + Servers: []string{server}, + ExpectedPoolStatus: true, + ExpectedStatuses: map[string]bool{ + server: false, + }, + } + + err = test_helpers.Retry(test_helpers.CheckPoolStatuses, args, defaultCountRetry, defaultTimeoutRetry) + require.Nil(t, err) + + err = test_helpers.RestartTarantool(&instances[0]) + require.Nilf(t, err, "failed to restart tarantool") + + args = test_helpers.CheckStatusesArgs{ + ConnPool: connPool, + Mode: connection_pool.ANY, + Servers: []string{server}, + ExpectedPoolStatus: true, + ExpectedStatuses: map[string]bool{ + server: true, + }, + } + + err = test_helpers.Retry(test_helpers.CheckPoolStatuses, args, defaultCountRetry, defaultTimeoutRetry) + require.Nil(t, err) +} + +func TestDisconnectAll(t *testing.T) { + server1 := servers[0] + server2 := servers[1] + + connPool, err := connection_pool.Connect([]string{server1, server2}, connOpts) + require.Nilf(t, err, "failed to connect") + require.NotNilf(t, connPool, "conn is nil after Connect") + + defer connPool.Close() + + test_helpers.StopTarantoolWithCleanup(instances[0]) + test_helpers.StopTarantoolWithCleanup(instances[1]) + + args := test_helpers.CheckStatusesArgs{ + ConnPool: connPool, + Mode: connection_pool.ANY, + Servers: []string{server1, server2}, + ExpectedPoolStatus: false, + ExpectedStatuses: map[string]bool{ + server1: false, + server2: false, + }, + } + + err = test_helpers.Retry(test_helpers.CheckPoolStatuses, args, defaultCountRetry, defaultTimeoutRetry) + require.Nil(t, err) + + err = test_helpers.RestartTarantool(&instances[0]) + require.Nilf(t, err, "failed to restart tarantool") + + err = test_helpers.RestartTarantool(&instances[1]) + require.Nilf(t, err, "failed to restart tarantool") + + args = test_helpers.CheckStatusesArgs{ + ConnPool: connPool, + Mode: connection_pool.ANY, + Servers: []string{server1, server2}, + ExpectedPoolStatus: true, + ExpectedStatuses: map[string]bool{ + server1: true, + server2: true, + }, + } + + err = test_helpers.Retry(test_helpers.CheckPoolStatuses, args, defaultCountRetry, defaultTimeoutRetry) + require.Nil(t, err) +} + +func TestClose(t *testing.T) { + server1 := servers[0] + server2 := servers[1] + + connPool, err := connection_pool.Connect([]string{server1, server2}, connOpts) + require.Nilf(t, err, "failed to connect") + require.NotNilf(t, connPool, "conn is nil after Connect") + + args := test_helpers.CheckStatusesArgs{ + ConnPool: connPool, + Mode: connection_pool.ANY, + Servers: []string{server1, server2}, + ExpectedPoolStatus: true, + ExpectedStatuses: map[string]bool{ + server1: true, + server2: true, + }, + } + + err = test_helpers.CheckPoolStatuses(args) + require.Nil(t, err) + + connPool.Close() + + args = test_helpers.CheckStatusesArgs{ + ConnPool: connPool, + Mode: connection_pool.ANY, + Servers: []string{server1, server2}, + ExpectedPoolStatus: false, + ExpectedStatuses: map[string]bool{ + server1: false, + server2: false, + }, + } + + err = test_helpers.Retry(test_helpers.CheckPoolStatuses, args, defaultCountRetry, defaultTimeoutRetry) + require.Nil(t, err) +} + +func TestCall(t *testing.T) { + roles := []bool{false, true, false, false, true} + + err := test_helpers.SetClusterRO(servers, connOpts, roles) + require.Nilf(t, err, "fail to set roles for cluster") + + connPool, err := connection_pool.Connect(servers, connOpts) + require.Nilf(t, err, "failed to connect") + require.NotNilf(t, connPool, "conn is nil after Connect") + + defer connPool.Close() + + // PreferRO + resp, err := connPool.Call17("box.info", []interface{}{}, connection_pool.PreferRO) + require.Nilf(t, err, "failed to Call") + require.NotNilf(t, resp, "response is nil after Call") + require.GreaterOrEqualf(t, len(resp.Data), 1, "response.Data is empty after Call") + + val := resp.Data[0].(map[interface{}]interface{})["ro"] + ro, ok := val.(bool) + require.Truef(t, ok, "expected `true` with mode `PreferRO`") + require.Truef(t, ro, "expected `true` with mode `PreferRO`") + + // PreferRW + resp, err = connPool.Call17("box.info", []interface{}{}, connection_pool.PreferRW) + require.Nilf(t, err, "failed to Call") + require.NotNilf(t, resp, "response is nil after Call") + require.GreaterOrEqualf(t, len(resp.Data), 1, "response.Data is empty after Call") + + val = resp.Data[0].(map[interface{}]interface{})["ro"] + ro, ok = val.(bool) + require.Truef(t, ok, "expected `false` with mode `PreferRW`") + require.Falsef(t, ro, "expected `false` with mode `PreferRW`") + + // RO + resp, err = connPool.Call17("box.info", []interface{}{}, connection_pool.RO) + require.Nilf(t, err, "failed to Call") + require.NotNilf(t, resp, "response is nil after Call") + require.GreaterOrEqualf(t, len(resp.Data), 1, "response.Data is empty after Call") + + val = resp.Data[0].(map[interface{}]interface{})["ro"] + ro, ok = val.(bool) + require.Truef(t, ok, "expected `true` with mode `RO`") + require.Truef(t, ro, "expected `true` with mode `RO`") + + // RW + resp, err = connPool.Call17("box.info", []interface{}{}, connection_pool.RW) + require.Nilf(t, err, "failed to Call") + require.NotNilf(t, resp, "response is nil after Call") + require.GreaterOrEqualf(t, len(resp.Data), 1, "response.Data is empty after Call") + + val = resp.Data[0].(map[interface{}]interface{})["ro"] + ro, ok = val.(bool) + require.Truef(t, ok, "expected `false` with mode `RW`") + require.Falsef(t, ro, "expected `false` with mode `RW`") +} + +func TestEval(t *testing.T) { + roles := []bool{false, true, false, false, true} + + err := test_helpers.SetClusterRO(servers, connOpts, roles) + require.Nilf(t, err, "fail to set roles for cluster") + + connPool, err := connection_pool.Connect(servers, connOpts) + require.Nilf(t, err, "failed to connect") + require.NotNilf(t, connPool, "conn is nil after Connect") + + defer connPool.Close() + + // PreferRO + resp, err := connPool.Eval("return box.info().ro", []interface{}{}, connection_pool.PreferRO) + require.Nilf(t, err, "failed to Eval") + require.NotNilf(t, resp, "response is nil after Eval") + require.GreaterOrEqualf(t, len(resp.Data), 1, "response.Data is empty after Eval") + + val, ok := resp.Data[0].(bool) + require.Truef(t, ok, "expected `true` with mode `PreferRO`") + require.Truef(t, val, "expected `true` with mode `PreferRO`") + + // PreferRW + resp, err = connPool.Eval("return box.info().ro", []interface{}{}, connection_pool.PreferRW) + require.Nilf(t, err, "failed to Eval") + require.NotNilf(t, resp, "response is nil after Eval") + require.GreaterOrEqualf(t, len(resp.Data), 1, "response.Data is empty after Eval") + + val, ok = resp.Data[0].(bool) + require.Truef(t, ok, "expected `false` with mode `PreferRW`") + require.Falsef(t, val, "expected `false` with mode `PreferRW`") + + // RO + resp, err = connPool.Eval("return box.info().ro", []interface{}{}, connection_pool.RO) + require.Nilf(t, err, "failed to Eval") + require.NotNilf(t, resp, "response is nil after Eval") + require.GreaterOrEqualf(t, len(resp.Data), 1, "response.Data is empty after Eval") + + val, ok = resp.Data[0].(bool) + require.Truef(t, ok, "expected `true` with mode `RO`") + require.Truef(t, val, "expected `true` with mode `RO`") + + // RW + resp, err = connPool.Eval("return box.info().ro", []interface{}{}, connection_pool.RW) + require.Nilf(t, err, "failed to Eval") + require.NotNilf(t, resp, "response is nil after Eval") + require.GreaterOrEqualf(t, len(resp.Data), 1, "response.Data is empty after Eval") + + val, ok = resp.Data[0].(bool) + require.Truef(t, ok, "expected `false` with mode `RW`") + require.Falsef(t, val, "expected `false` with mode `RW`") +} + +func TestRoundRobinStrategy(t *testing.T) { + roles := []bool{false, true, false, false, true} + + allPorts := map[string]bool{ + servers[0]: true, + servers[1]: true, + servers[2]: true, + servers[3]: true, + servers[4]: true, + } + + masterPorts := map[string]bool{ + servers[0]: true, + servers[2]: true, + servers[3]: true, + } + + replicaPorts := map[string]bool{ + servers[1]: true, + servers[4]: true, + } + + serversNumber := len(servers) + + err := test_helpers.SetClusterRO(servers, connOpts, roles) + require.Nilf(t, err, "fail to set roles for cluster") + + connPool, err := connection_pool.Connect(servers, connOpts) + require.Nilf(t, err, "failed to connect") + require.NotNilf(t, connPool, "conn is nil after Connect") + + defer connPool.Close() + + // ANY + args := test_helpers.ListenOnInstanceArgs{ + ServersNumber: serversNumber, + ExpectedPorts: allPorts, + ConnPool: connPool, + Mode: connection_pool.ANY, + } + + err = test_helpers.ProcessListenOnInstance(args) + require.Nil(t, err) + + // RW + args = test_helpers.ListenOnInstanceArgs{ + ServersNumber: serversNumber, + ExpectedPorts: masterPorts, + ConnPool: connPool, + Mode: connection_pool.RW, + } + + err = test_helpers.ProcessListenOnInstance(args) + require.Nil(t, err) + + // RO + args = test_helpers.ListenOnInstanceArgs{ + ServersNumber: serversNumber, + ExpectedPorts: replicaPorts, + ConnPool: connPool, + Mode: connection_pool.RO, + } + + err = test_helpers.ProcessListenOnInstance(args) + require.Nil(t, err) + + // PreferRW + args = test_helpers.ListenOnInstanceArgs{ + ServersNumber: serversNumber, + ExpectedPorts: masterPorts, + ConnPool: connPool, + Mode: connection_pool.PreferRW, + } + + err = test_helpers.ProcessListenOnInstance(args) + require.Nil(t, err) + + // PreferRO + args = test_helpers.ListenOnInstanceArgs{ + ServersNumber: serversNumber, + ExpectedPorts: replicaPorts, + ConnPool: connPool, + Mode: connection_pool.PreferRO, + } + + err = test_helpers.ProcessListenOnInstance(args) + require.Nil(t, err) +} + +func TestRoundRobinStrategy_NoReplica(t *testing.T) { + roles := []bool{false, false, false, false, false} + serversNumber := len(servers) + + allPorts := map[string]bool{ + servers[0]: true, + servers[1]: true, + servers[2]: true, + servers[3]: true, + servers[4]: true, + } + + err := test_helpers.SetClusterRO(servers, connOpts, roles) + require.Nilf(t, err, "fail to set roles for cluster") + + connPool, err := connection_pool.Connect(servers, connOpts) + require.Nilf(t, err, "failed to connect") + require.NotNilf(t, connPool, "conn is nil after Connect") + + defer connPool.Close() + + // RO + _, err = connPool.Eval("return box.cfg.listen", []interface{}{}, connection_pool.RO) + require.NotNilf(t, err, "expected to fail after Eval, but error is nil") + require.Equal(t, "Can't find ro instance in pool", err.Error()) + + // ANY + args := test_helpers.ListenOnInstanceArgs{ + ServersNumber: serversNumber, + ExpectedPorts: allPorts, + ConnPool: connPool, + Mode: connection_pool.ANY, + } + + err = test_helpers.ProcessListenOnInstance(args) + require.Nil(t, err) + + // RW + args = test_helpers.ListenOnInstanceArgs{ + ServersNumber: serversNumber, + ExpectedPorts: allPorts, + ConnPool: connPool, + Mode: connection_pool.RW, + } + + err = test_helpers.ProcessListenOnInstance(args) + require.Nil(t, err) + + // PreferRW + args = test_helpers.ListenOnInstanceArgs{ + ServersNumber: serversNumber, + ExpectedPorts: allPorts, + ConnPool: connPool, + Mode: connection_pool.PreferRW, + } + + err = test_helpers.ProcessListenOnInstance(args) + require.Nil(t, err) + + // PreferRO + args = test_helpers.ListenOnInstanceArgs{ + ServersNumber: serversNumber, + ExpectedPorts: allPorts, + ConnPool: connPool, + Mode: connection_pool.PreferRO, + } + + err = test_helpers.ProcessListenOnInstance(args) + require.Nil(t, err) +} + +func TestRoundRobinStrategy_NoMaster(t *testing.T) { + roles := []bool{true, true, true, true, true} + serversNumber := len(servers) + + allPorts := map[string]bool{ + servers[0]: true, + servers[1]: true, + servers[2]: true, + servers[3]: true, + servers[4]: true, + } + + err := test_helpers.SetClusterRO(servers, connOpts, roles) + require.Nilf(t, err, "fail to set roles for cluster") + + connPool, err := connection_pool.Connect(servers, connOpts) + require.Nilf(t, err, "failed to connect") + require.NotNilf(t, connPool, "conn is nil after Connect") + + defer connPool.Close() + + // RW + _, err = connPool.Eval("return box.cfg.listen", []interface{}{}, connection_pool.RW) + require.NotNilf(t, err, "expected to fail after Eval, but error is nil") + require.Equal(t, "Can't find rw instance in pool", err.Error()) + + // ANY + args := test_helpers.ListenOnInstanceArgs{ + ServersNumber: serversNumber, + ExpectedPorts: allPorts, + ConnPool: connPool, + Mode: connection_pool.ANY, + } + + err = test_helpers.ProcessListenOnInstance(args) + require.Nil(t, err) + + // RO + args = test_helpers.ListenOnInstanceArgs{ + ServersNumber: serversNumber, + ExpectedPorts: allPorts, + ConnPool: connPool, + Mode: connection_pool.RO, + } + + err = test_helpers.ProcessListenOnInstance(args) + require.Nil(t, err) + + // PreferRW + args = test_helpers.ListenOnInstanceArgs{ + ServersNumber: serversNumber, + ExpectedPorts: allPorts, + ConnPool: connPool, + Mode: connection_pool.PreferRW, + } + + err = test_helpers.ProcessListenOnInstance(args) + require.Nil(t, err) + + // PreferRO + args = test_helpers.ListenOnInstanceArgs{ + ServersNumber: serversNumber, + ExpectedPorts: allPorts, + ConnPool: connPool, + Mode: connection_pool.PreferRO, + } + + err = test_helpers.ProcessListenOnInstance(args) + require.Nil(t, err) +} + +func TestUpdateInstancesRoles(t *testing.T) { + roles := []bool{false, true, false, false, true} + + allPorts := map[string]bool{ + servers[0]: true, + servers[1]: true, + servers[2]: true, + servers[3]: true, + servers[4]: true, + } + + masterPorts := map[string]bool{ + servers[0]: true, + servers[2]: true, + servers[3]: true, + } + + replicaPorts := map[string]bool{ + servers[1]: true, + servers[4]: true, + } + + serversNumber := len(servers) + + err := test_helpers.SetClusterRO(servers, connOpts, roles) + require.Nilf(t, err, "fail to set roles for cluster") + + connPool, err := connection_pool.Connect(servers, connOpts) + require.Nilf(t, err, "failed to connect") + require.NotNilf(t, connPool, "conn is nil after Connect") + + defer connPool.Close() + + // ANY + args := test_helpers.ListenOnInstanceArgs{ + ServersNumber: serversNumber, + ExpectedPorts: allPorts, + ConnPool: connPool, + Mode: connection_pool.ANY, + } + + err = test_helpers.ProcessListenOnInstance(args) + require.Nil(t, err) + + // RW + args = test_helpers.ListenOnInstanceArgs{ + ServersNumber: serversNumber, + ExpectedPorts: masterPorts, + ConnPool: connPool, + Mode: connection_pool.RW, + } + + err = test_helpers.ProcessListenOnInstance(args) + require.Nil(t, err) + + // RO + args = test_helpers.ListenOnInstanceArgs{ + ServersNumber: serversNumber, + ExpectedPorts: replicaPorts, + ConnPool: connPool, + Mode: connection_pool.RO, + } + + err = test_helpers.ProcessListenOnInstance(args) + require.Nil(t, err) + + // PreferRW + args = test_helpers.ListenOnInstanceArgs{ + ServersNumber: serversNumber, + ExpectedPorts: masterPorts, + ConnPool: connPool, + Mode: connection_pool.PreferRW, + } + + err = test_helpers.ProcessListenOnInstance(args) + require.Nil(t, err) + + // PreferRO + args = test_helpers.ListenOnInstanceArgs{ + ServersNumber: serversNumber, + ExpectedPorts: replicaPorts, + ConnPool: connPool, + Mode: connection_pool.PreferRO, + } + + err = test_helpers.ProcessListenOnInstance(args) + require.Nil(t, err) + + roles = []bool{true, false, true, true, false} + + masterPorts = map[string]bool{ + servers[1]: true, + servers[4]: true, + } + + replicaPorts = map[string]bool{ + servers[0]: true, + servers[2]: true, + servers[3]: true, + } + + err = test_helpers.SetClusterRO(servers, connOpts, roles) + require.Nilf(t, err, "fail to set roles for cluster") + + // ANY + args = test_helpers.ListenOnInstanceArgs{ + ServersNumber: serversNumber, + ExpectedPorts: allPorts, + ConnPool: connPool, + Mode: connection_pool.ANY, + } + + err = test_helpers.Retry(test_helpers.ProcessListenOnInstance, args, defaultCountRetry, defaultTimeoutRetry) + require.Nil(t, err) + + // RW + args = test_helpers.ListenOnInstanceArgs{ + ServersNumber: serversNumber, + ExpectedPorts: masterPorts, + ConnPool: connPool, + Mode: connection_pool.RW, + } + + err = test_helpers.Retry(test_helpers.ProcessListenOnInstance, args, defaultCountRetry, defaultTimeoutRetry) + require.Nil(t, err) + + // RO + args = test_helpers.ListenOnInstanceArgs{ + ServersNumber: serversNumber, + ExpectedPorts: replicaPorts, + ConnPool: connPool, + Mode: connection_pool.RO, + } + + err = test_helpers.Retry(test_helpers.ProcessListenOnInstance, args, defaultCountRetry, defaultTimeoutRetry) + require.Nil(t, err) + + // PreferRW + args = test_helpers.ListenOnInstanceArgs{ + ServersNumber: serversNumber, + ExpectedPorts: masterPorts, + ConnPool: connPool, + Mode: connection_pool.PreferRW, + } + + err = test_helpers.Retry(test_helpers.ProcessListenOnInstance, args, defaultCountRetry, defaultTimeoutRetry) + require.Nil(t, err) + + // PreferRO + args = test_helpers.ListenOnInstanceArgs{ + ServersNumber: serversNumber, + ExpectedPorts: replicaPorts, + ConnPool: connPool, + Mode: connection_pool.PreferRO, + } + + err = test_helpers.Retry(test_helpers.ProcessListenOnInstance, args, defaultCountRetry, defaultTimeoutRetry) + require.Nil(t, err) +} + +func TestInsert(t *testing.T) { + roles := []bool{true, true, false, true, true} + + err := test_helpers.SetClusterRO(servers, connOpts, roles) + require.Nilf(t, err, "fail to set roles for cluster") + + connPool, err := connection_pool.Connect(servers, connOpts) + require.Nilf(t, err, "failed to connect") + require.NotNilf(t, connPool, "conn is nil after Connect") + + defer connPool.Close() + + // Mode is `RW` by default, we have only one RW instance (servers[2]) + resp, err := connPool.Insert(spaceName, []interface{}{"rw_insert_key", "rw_insert_value"}) + require.Nilf(t, err, "failed to Insert") + require.NotNilf(t, resp, "response is nil after Insert") + require.Equalf(t, len(resp.Data), 1, "response Body len != 1 after Insert") + + tpl, ok := resp.Data[0].([]interface{}) + require.Truef(t, ok, "unexpected body of Insert") + require.Equalf(t, 2, len(tpl), "unexpected body of Insert") + + key, ok := tpl[0].(string) + require.Truef(t, ok, "unexpected body of Insert (0)") + require.Equalf(t, "rw_insert_key", key, "unexpected body of Insert (0)") + + value, ok := tpl[1].(string) + require.Truef(t, ok, "unexpected body of Insert (1)") + require.Equalf(t, "rw_insert_value", value, "unexpected body of Insert (1)") + + // Connect to servers[2] to check if tuple + // was inserted on RW instance + conn, err := tarantool.Connect(servers[2], connOpts) + require.Nilf(t, err, "failed to connect %s", servers[2]) + require.NotNilf(t, conn, "conn is nil after Connect") + + defer conn.Close() + + resp, err = conn.Select(spaceNo, indexNo, 0, 1, tarantool.IterEq, []interface{}{"rw_insert_key"}) + require.Nilf(t, err, "failed to Select") + require.NotNilf(t, resp, "response is nil after Select") + require.Equalf(t, len(resp.Data), 1, "response Body len != 1 after Select") + + tpl, ok = resp.Data[0].([]interface{}) + require.Truef(t, ok, "unexpected body of Select") + require.Equalf(t, 2, len(tpl), "unexpected body of Select") + + key, ok = tpl[0].(string) + require.Truef(t, ok, "unexpected body of Select (0)") + require.Equalf(t, "rw_insert_key", key, "unexpected body of Select (0)") + + value, ok = tpl[1].(string) + require.Truef(t, ok, "unexpected body of Select (1)") + require.Equalf(t, "rw_insert_value", value, "unexpected body of Select (1)") + + // PreferRW + resp, err = connPool.Insert(spaceName, []interface{}{"preferRW_insert_key", "preferRW_insert_value"}) + require.Nilf(t, err, "failed to Insert") + require.NotNilf(t, resp, "response is nil after Insert") + require.Equalf(t, len(resp.Data), 1, "response Body len != 1 after Insert") + + tpl, ok = resp.Data[0].([]interface{}) + require.Truef(t, ok, "unexpected body of Insert") + require.Equalf(t, 2, len(tpl), "unexpected body of Insert") + + key, ok = tpl[0].(string) + require.Truef(t, ok, "unexpected body of Insert (0)") + require.Equalf(t, "preferRW_insert_key", key, "unexpected body of Insert (0)") + + value, ok = tpl[1].(string) + require.Truef(t, ok, "unexpected body of Insert (1)") + require.Equalf(t, "preferRW_insert_value", value, "unexpected body of Insert (1)") + + resp, err = conn.Select(spaceNo, indexNo, 0, 1, tarantool.IterEq, []interface{}{"preferRW_insert_key"}) + require.Nilf(t, err, "failed to Select") + require.NotNilf(t, resp, "response is nil after Select") + require.Equalf(t, len(resp.Data), 1, "response Body len != 1 after Select") + + tpl, ok = resp.Data[0].([]interface{}) + require.Truef(t, ok, "unexpected body of Select") + require.Equalf(t, 2, len(tpl), "unexpected body of Select") + + key, ok = tpl[0].(string) + require.Truef(t, ok, "unexpected body of Select (0)") + require.Equalf(t, "preferRW_insert_key", key, "unexpected body of Select (0)") + + value, ok = tpl[1].(string) + require.Truef(t, ok, "unexpected body of Select (1)") + require.Equalf(t, "preferRW_insert_value", value, "unexpected body of Select (1)") +} + +func TestDelete(t *testing.T) { + roles := []bool{true, true, false, true, true} + + err := test_helpers.SetClusterRO(servers, connOpts, roles) + require.Nilf(t, err, "fail to set roles for cluster") + + connPool, err := connection_pool.Connect(servers, connOpts) + require.Nilf(t, err, "failed to connect") + require.NotNilf(t, connPool, "conn is nil after Connect") + + defer connPool.Close() + + // Connect to servers[2] to check if tuple + // was inserted on RW instance + conn, err := tarantool.Connect(servers[2], connOpts) + require.Nilf(t, err, "failed to connect %s", servers[2]) + require.NotNilf(t, conn, "conn is nil after Connect") + + defer conn.Close() + + resp, err := conn.Insert(spaceNo, []interface{}{"delete_key", "delete_value"}) + require.Nilf(t, err, "failed to Insert") + require.NotNilf(t, resp, "response is nil after Insert") + require.Equalf(t, len(resp.Data), 1, "response Body len != 1 after Insert") + + tpl, ok := resp.Data[0].([]interface{}) + require.Truef(t, ok, "unexpected body of Insert") + require.Equalf(t, 2, len(tpl), "unexpected body of Insert") + + key, ok := tpl[0].(string) + require.Truef(t, ok, "unexpected body of Insert (0)") + require.Equalf(t, "delete_key", key, "unexpected body of Insert (0)") + + value, ok := tpl[1].(string) + require.Truef(t, ok, "unexpected body of Insert (1)") + require.Equalf(t, "delete_value", value, "unexpected body of Insert (1)") + + // Mode is `RW` by default, we have only one RW instance (servers[2]) + resp, err = connPool.Delete(spaceName, indexNo, []interface{}{"delete_key"}) + require.Nilf(t, err, "failed to Delete") + require.NotNilf(t, resp, "response is nil after Delete") + require.Equalf(t, len(resp.Data), 1, "response Body len != 1 after Delete") + + tpl, ok = resp.Data[0].([]interface{}) + require.Truef(t, ok, "unexpected body of Delete") + require.Equalf(t, 2, len(tpl), "unexpected body of Delete") + + key, ok = tpl[0].(string) + require.Truef(t, ok, "unexpected body of Delete (0)") + require.Equalf(t, "delete_key", key, "unexpected body of Delete (0)") + + value, ok = tpl[1].(string) + require.Truef(t, ok, "unexpected body of Delete (1)") + require.Equalf(t, "delete_value", value, "unexpected body of Delete (1)") + + resp, err = conn.Select(spaceNo, indexNo, 0, 1, tarantool.IterEq, []interface{}{"delete_key"}) + require.Nilf(t, err, "failed to Select") + require.NotNilf(t, resp, "response is nil after Select") + require.Equalf(t, 0, len(resp.Data), "response Body len != 0 after Select") +} + +func TestUpsert(t *testing.T) { + roles := []bool{true, true, false, true, true} + + err := test_helpers.SetClusterRO(servers, connOpts, roles) + require.Nilf(t, err, "fail to set roles for cluster") + + connPool, err := connection_pool.Connect(servers, connOpts) + require.Nilf(t, err, "failed to connect") + require.NotNilf(t, connPool, "conn is nil after Connect") + + defer connPool.Close() + + // Connect to servers[2] to check if tuple + // was inserted on RW instance + conn, err := tarantool.Connect(servers[2], connOpts) + require.Nilf(t, err, "failed to connect %s", servers[2]) + require.NotNilf(t, conn, "conn is nil after Connect") + + defer conn.Close() + + // Mode is `RW` by default, we have only one RW instance (servers[2]) + resp, err := connPool.Upsert(spaceName, []interface{}{"upsert_key", "upsert_value"}, []interface{}{[]interface{}{"=", 1, "new_value"}}) + require.Nilf(t, err, "failed to Upsert") + require.NotNilf(t, resp, "response is nil after Upsert") + + resp, err = conn.Select(spaceNo, indexNo, 0, 1, tarantool.IterEq, []interface{}{"upsert_key"}) + require.Nilf(t, err, "failed to Select") + require.NotNilf(t, resp, "response is nil after Select") + require.Equalf(t, len(resp.Data), 1, "response Body len != 1 after Select") + + tpl, ok := resp.Data[0].([]interface{}) + require.Truef(t, ok, "unexpected body of Select") + require.Equalf(t, 2, len(tpl), "unexpected body of Select") + + key, ok := tpl[0].(string) + require.Truef(t, ok, "unexpected body of Select (0)") + require.Equalf(t, "upsert_key", key, "unexpected body of Select (0)") + + value, ok := tpl[1].(string) + require.Truef(t, ok, "unexpected body of Select (1)") + require.Equalf(t, "upsert_value", value, "unexpected body of Select (1)") + + // PreferRW + resp, err = connPool.Upsert( + spaceName, []interface{}{"upsert_key", "upsert_value"}, + []interface{}{[]interface{}{"=", 1, "new_value"}}, connection_pool.PreferRW) + + require.Nilf(t, err, "failed to Upsert") + require.NotNilf(t, resp, "response is nil after Upsert") + + resp, err = conn.Select(spaceNo, indexNo, 0, 1, tarantool.IterEq, []interface{}{"upsert_key"}) + require.Nilf(t, err, "failed to Select") + require.NotNilf(t, resp, "response is nil after Select") + require.Equalf(t, len(resp.Data), 1, "response Body len != 1 after Select") + + tpl, ok = resp.Data[0].([]interface{}) + require.Truef(t, ok, "unexpected body of Select") + require.Equalf(t, 2, len(tpl), "unexpected body of Select") + + key, ok = tpl[0].(string) + require.Truef(t, ok, "unexpected body of Select (0)") + require.Equalf(t, "upsert_key", key, "unexpected body of Select (0)") + + value, ok = tpl[1].(string) + require.Truef(t, ok, "unexpected body of Select (1)") + require.Equalf(t, "new_value", value, "unexpected body of Select (1)") +} + +func TestUpdate(t *testing.T) { + roles := []bool{true, true, false, true, true} + + err := test_helpers.SetClusterRO(servers, connOpts, roles) + require.Nilf(t, err, "fail to set roles for cluster") + + connPool, err := connection_pool.Connect(servers, connOpts) + require.Nilf(t, err, "failed to connect") + require.NotNilf(t, connPool, "conn is nil after Connect") + + defer connPool.Close() + + // Connect to servers[2] to check if tuple + // was inserted on RW instance + conn, err := tarantool.Connect(servers[2], connOpts) + require.Nilf(t, err, "failed to connect %s", servers[2]) + require.NotNilf(t, conn, "conn is nil after Connect") + + defer conn.Close() + + resp, err := conn.Insert(spaceNo, []interface{}{"update_key", "update_value"}) + require.Nilf(t, err, "failed to Insert") + require.NotNilf(t, resp, "response is nil after Insert") + require.Equalf(t, len(resp.Data), 1, "response Body len != 1 after Insert") + + tpl, ok := resp.Data[0].([]interface{}) + require.Truef(t, ok, "unexpected body of Insert") + require.Equalf(t, 2, len(tpl), "unexpected body of Insert") + + key, ok := tpl[0].(string) + require.Truef(t, ok, "unexpected body of Insert (0)") + require.Equalf(t, "update_key", key, "unexpected body of Insert (0)") + + value, ok := tpl[1].(string) + require.Truef(t, ok, "unexpected body of Insert (1)") + require.Equalf(t, "update_value", value, "unexpected body of Insert (1)") + + // Mode is `RW` by default, we have only one RW instance (servers[2]) + resp, err = connPool.Update(spaceName, indexNo, []interface{}{"update_key"}, []interface{}{[]interface{}{"=", 1, "new_value"}}) + require.Nilf(t, err, "failed to Update") + require.NotNilf(t, resp, "response is nil after Update") + + resp, err = conn.Select(spaceNo, indexNo, 0, 1, tarantool.IterEq, []interface{}{"update_key"}) + require.Nilf(t, err, "failed to Select") + require.NotNilf(t, resp, "response is nil after Select") + require.Equalf(t, len(resp.Data), 1, "response Body len != 1 after Select") + + tpl, ok = resp.Data[0].([]interface{}) + require.Truef(t, ok, "unexpected body of Select") + require.Equalf(t, 2, len(tpl), "unexpected body of Select") + + key, ok = tpl[0].(string) + require.Truef(t, ok, "unexpected body of Select (0)") + require.Equalf(t, "update_key", key, "unexpected body of Select (0)") + + value, ok = tpl[1].(string) + require.Truef(t, ok, "unexpected body of Select (1)") + require.Equalf(t, "new_value", value, "unexpected body of Select (1)") + + // PreferRW + resp, err = connPool.Update( + spaceName, indexNo, []interface{}{"update_key"}, + []interface{}{[]interface{}{"=", 1, "another_value"}}, connection_pool.PreferRW) + + require.Nilf(t, err, "failed to Update") + require.NotNilf(t, resp, "response is nil after Update") + + resp, err = conn.Select(spaceNo, indexNo, 0, 1, tarantool.IterEq, []interface{}{"update_key"}) + require.Nilf(t, err, "failed to Select") + require.NotNilf(t, resp, "response is nil after Select") + require.Equalf(t, len(resp.Data), 1, "response Body len != 1 after Select") + + tpl, ok = resp.Data[0].([]interface{}) + require.Truef(t, ok, "unexpected body of Select") + require.Equalf(t, 2, len(tpl), "unexpected body of Select") + + key, ok = tpl[0].(string) + require.Truef(t, ok, "unexpected body of Select (0)") + require.Equalf(t, "update_key", key, "unexpected body of Select (0)") + + value, ok = tpl[1].(string) + require.Truef(t, ok, "unexpected body of Select (1)") + require.Equalf(t, "another_value", value, "unexpected body of Select (1)") +} + +func TestReplace(t *testing.T) { + roles := []bool{true, true, false, true, true} + + err := test_helpers.SetClusterRO(servers, connOpts, roles) + require.Nilf(t, err, "fail to set roles for cluster") + + connPool, err := connection_pool.Connect(servers, connOpts) + require.Nilf(t, err, "failed to connect") + require.NotNilf(t, connPool, "conn is nil after Connect") + + defer connPool.Close() + + // Connect to servers[2] to check if tuple + // was inserted on RW instance + conn, err := tarantool.Connect(servers[2], connOpts) + require.Nilf(t, err, "failed to connect %s", servers[2]) + require.NotNilf(t, conn, "conn is nil after Connect") + + defer conn.Close() + + resp, err := conn.Insert(spaceNo, []interface{}{"replace_key", "replace_value"}) + require.Nilf(t, err, "failed to Insert") + require.NotNilf(t, resp, "response is nil after Insert") + require.Equalf(t, len(resp.Data), 1, "response Body len != 1 after Insert") + + tpl, ok := resp.Data[0].([]interface{}) + require.Truef(t, ok, "unexpected body of Insert") + require.Equalf(t, 2, len(tpl), "unexpected body of Insert") + + key, ok := tpl[0].(string) + require.Truef(t, ok, "unexpected body of Insert (0)") + require.Equalf(t, "replace_key", key, "unexpected body of Insert (0)") + + value, ok := tpl[1].(string) + require.Truef(t, ok, "unexpected body of Insert (1)") + require.Equalf(t, "replace_value", value, "unexpected body of Insert (1)") + + // Mode is `RW` by default, we have only one RW instance (servers[2]) + resp, err = connPool.Replace(spaceNo, []interface{}{"new_key", "new_value"}) + require.Nilf(t, err, "failed to Replace") + require.NotNilf(t, resp, "response is nil after Replace") + + resp, err = conn.Select(spaceNo, indexNo, 0, 1, tarantool.IterEq, []interface{}{"new_key"}) + require.Nilf(t, err, "failed to Select") + require.NotNilf(t, resp, "response is nil after Select") + require.Equalf(t, len(resp.Data), 1, "response Body len != 1 after Select") + + tpl, ok = resp.Data[0].([]interface{}) + require.Truef(t, ok, "unexpected body of Select") + require.Equalf(t, 2, len(tpl), "unexpected body of Select") + + key, ok = tpl[0].(string) + require.Truef(t, ok, "unexpected body of Select (0)") + require.Equalf(t, "new_key", key, "unexpected body of Select (0)") + + value, ok = tpl[1].(string) + require.Truef(t, ok, "unexpected body of Select (1)") + require.Equalf(t, "new_value", value, "unexpected body of Select (1)") + + // PreferRW + resp, err = connPool.Replace(spaceNo, []interface{}{"new_key", "new_value"}, connection_pool.PreferRW) + require.Nilf(t, err, "failed to Replace") + require.NotNilf(t, resp, "response is nil after Replace") + + resp, err = conn.Select(spaceNo, indexNo, 0, 1, tarantool.IterEq, []interface{}{"new_key"}) + require.Nilf(t, err, "failed to Select") + require.NotNilf(t, resp, "response is nil after Select") + require.Equalf(t, len(resp.Data), 1, "response Body len != 1 after Select") + + tpl, ok = resp.Data[0].([]interface{}) + require.Truef(t, ok, "unexpected body of Select") + require.Equalf(t, 2, len(tpl), "unexpected body of Select") + + key, ok = tpl[0].(string) + require.Truef(t, ok, "unexpected body of Select (0)") + require.Equalf(t, "new_key", key, "unexpected body of Select (0)") + + value, ok = tpl[1].(string) + require.Truef(t, ok, "unexpected body of Select (1)") + require.Equalf(t, "new_value", value, "unexpected body of Select (1)") +} + +func TestSelect(t *testing.T) { + roles := []bool{true, true, false, true, false} + + err := test_helpers.SetClusterRO(servers, connOpts, roles) + require.Nilf(t, err, "fail to set roles for cluster") + + connPool, err := connection_pool.Connect(servers, connOpts) + require.Nilf(t, err, "failed to connect") + require.NotNilf(t, connPool, "conn is nil after Connect") + + defer connPool.Close() + + roServers := []string{servers[0], servers[1], servers[3]} + rwServers := []string{servers[2], servers[4]} + allServers := []string{servers[0], servers[1], servers[2], servers[3], servers[4]} + + roTpl := []interface{}{"ro_select_key", "ro_select_value"} + rwTpl := []interface{}{"rw_select_key", "rw_select_value"} + anyTpl := []interface{}{"any_select_key", "any_select_value"} + + roKey := []interface{}{"ro_select_key"} + rwKey := []interface{}{"rw_select_key"} + anyKey := []interface{}{"any_select_key"} + + err = test_helpers.InsertOnInstances(roServers, connOpts, spaceNo, roTpl) + require.Nil(t, err) + + err = test_helpers.InsertOnInstances(rwServers, connOpts, spaceNo, rwTpl) + require.Nil(t, err) + + err = test_helpers.InsertOnInstances(allServers, connOpts, spaceNo, anyTpl) + require.Nil(t, err) + + //default: ANY + resp, err := connPool.Select(spaceNo, indexNo, 0, 1, tarantool.IterEq, anyKey) + require.Nilf(t, err, "failed to Select") + require.NotNilf(t, resp, "response is nil after Select") + require.Equalf(t, len(resp.Data), 1, "response Body len != 1 after Select") + + tpl, ok := resp.Data[0].([]interface{}) + require.Truef(t, ok, "unexpected body of Select") + require.Equalf(t, 2, len(tpl), "unexpected body of Select") + + key, ok := tpl[0].(string) + require.Truef(t, ok, "unexpected body of Select (0)") + require.Equalf(t, "any_select_key", key, "unexpected body of Select (0)") + + value, ok := tpl[1].(string) + require.Truef(t, ok, "unexpected body of Select (1)") + require.Equalf(t, "any_select_value", value, "unexpected body of Select (1)") + + // PreferRO + resp, err = connPool.Select(spaceNo, indexNo, 0, 1, tarantool.IterEq, roKey, connection_pool.PreferRO) + require.Nilf(t, err, "failed to Select") + require.NotNilf(t, resp, "response is nil after Select") + require.Equalf(t, len(resp.Data), 1, "response Body len != 1 after Select") + + tpl, ok = resp.Data[0].([]interface{}) + require.Truef(t, ok, "unexpected body of Select") + require.Equalf(t, 2, len(tpl), "unexpected body of Select") + + key, ok = tpl[0].(string) + require.Truef(t, ok, "unexpected body of Select (0)") + require.Equalf(t, "ro_select_key", key, "unexpected body of Select (0)") + + // PreferRW + resp, err = connPool.Select(spaceNo, indexNo, 0, 1, tarantool.IterEq, rwKey, connection_pool.PreferRW) + require.Nilf(t, err, "failed to Select") + require.NotNilf(t, resp, "response is nil after Select") + require.Equalf(t, len(resp.Data), 1, "response Body len != 1 after Select") + + tpl, ok = resp.Data[0].([]interface{}) + require.Truef(t, ok, "unexpected body of Select") + require.Equalf(t, 2, len(tpl), "unexpected body of Select") + + key, ok = tpl[0].(string) + require.Truef(t, ok, "unexpected body of Select (0)") + require.Equalf(t, "rw_select_key", key, "unexpected body of Select (0)") + + value, ok = tpl[1].(string) + require.Truef(t, ok, "unexpected body of Select (1)") + require.Equalf(t, "rw_select_value", value, "unexpected body of Select (1)") + + // RO + resp, err = connPool.Select(spaceNo, indexNo, 0, 1, tarantool.IterEq, roKey, connection_pool.RO) + require.Nilf(t, err, "failed to Select") + require.NotNilf(t, resp, "response is nil after Select") + require.Equalf(t, len(resp.Data), 1, "response Body len != 1 after Select") + + tpl, ok = resp.Data[0].([]interface{}) + require.Truef(t, ok, "unexpected body of Select") + require.Equalf(t, 2, len(tpl), "unexpected body of Select") + + key, ok = tpl[0].(string) + require.Truef(t, ok, "unexpected body of Select (0)") + require.Equalf(t, "ro_select_key", key, "unexpected body of Select (0)") + + value, ok = tpl[1].(string) + require.Truef(t, ok, "unexpected body of Select (1)") + require.Equalf(t, "ro_select_value", value, "unexpected body of Select (1)") + + // RW + resp, err = connPool.Select(spaceNo, indexNo, 0, 1, tarantool.IterEq, rwKey, connection_pool.RW) + require.Nilf(t, err, "failed to Select") + require.NotNilf(t, resp, "response is nil after Select") + require.Equalf(t, len(resp.Data), 1, "response Body len != 1 after Select") + + tpl, ok = resp.Data[0].([]interface{}) + require.Truef(t, ok, "unexpected body of Select") + require.Equalf(t, 2, len(tpl), "unexpected body of Select") + + key, ok = tpl[0].(string) + require.Truef(t, ok, "unexpected body of Select (0)") + require.Equalf(t, "rw_select_key", key, "unexpected body of Select (0)") + + value, ok = tpl[1].(string) + require.Truef(t, ok, "unexpected body of Select (1)") + require.Equalf(t, "rw_select_value", value, "unexpected body of Select (1)") +} + +func TestPing(t *testing.T) { + roles := []bool{true, true, false, true, false} + + err := test_helpers.SetClusterRO(servers, connOpts, roles) + require.Nilf(t, err, "fail to set roles for cluster") + + connPool, err := connection_pool.Connect(servers, connOpts) + require.Nilf(t, err, "failed to connect") + require.NotNilf(t, connPool, "conn is nil after Connect") + + defer connPool.Close() + + // ANY + resp, err := connPool.Ping(connection_pool.ANY) + require.Nilf(t, err, "failed to Ping") + require.NotNilf(t, resp, "response is nil after Ping") + + // RW + resp, err = connPool.Ping(connection_pool.RW) + require.Nilf(t, err, "failed to Ping") + require.NotNilf(t, resp, "response is nil after Ping") + + // RO + resp, err = connPool.Ping(connection_pool.RO) + require.Nilf(t, err, "failed to Ping") + require.NotNilf(t, resp, "response is nil after Ping") + + // PreferRW + resp, err = connPool.Ping(connection_pool.PreferRW) + require.Nilf(t, err, "failed to Ping") + require.NotNilf(t, resp, "response is nil after Ping") + + // PreferRO + resp, err = connPool.Ping(connection_pool.PreferRO) + require.Nilf(t, err, "failed to Ping") + require.NotNilf(t, resp, "response is nil after Ping") +} + +// runTestMain is a body of TestMain function +// (see https://pkg.go.dev/testing#hdr-Main). +// Using defer + os.Exit is not works so TestMain body +// is a separate function, see +// https://stackoverflow.com/questions/27629380/how-to-exit-a-go-program-honoring-deferred-calls +func runTestMain(m *testing.M) int { + initScript := "config.lua" + waitStart := 100 * time.Millisecond + var connectRetry uint = 3 + retryTimeout := 500 * time.Millisecond + workDirs := []string{ + "work_dir1", "work_dir2", + "work_dir3", "work_dir4", + "work_dir5"} + var err error + + instances, err = test_helpers.StartTarantoolInstances(servers, workDirs, test_helpers.StartOpts{ + InitScript: initScript, + User: connOpts.User, + Pass: connOpts.Pass, + WaitStart: waitStart, + ConnectRetry: connectRetry, + RetryTimeout: retryTimeout, + }) + + if err != nil { + log.Fatalf("Failed to prepare test tarantool: %s", err) + return -1 + } + + defer test_helpers.StopTarantoolInstances(instances) + + return m.Run() +} + +func TestMain(m *testing.M) { + code := runTestMain(m) + os.Exit(code) +} diff --git a/connection_pool/const.go b/connection_pool/const.go new file mode 100644 index 000000000..04690d4f5 --- /dev/null +++ b/connection_pool/const.go @@ -0,0 +1,52 @@ +package connection_pool + +type Mode uint32 +type Role uint32 +type State uint32 + +/* +Mode parameter: + +- ANY (use any instance) - the request can be executed on any instance (master or replica). + +- RW (writeable instance (master)) - the request can only be executed on master. + +- RO (read only instance (replica)) - the request can only be executed on replica. + +- PREFER_RO (prefer read only instance (replica)) - if there is one, otherwise fallback to a writeable one (master). + +- PREFER_RW (prefer write only instance (master)) - if there is one, otherwise fallback to a read only one (replica). + + Request Default mode + ---------- -------------- + | call | no default | + | eval | no default | + | ping | no default | + | insert | RW | + | delete | RW | + | replace | RW | + | update | RW | + | upsert | RW | + | select | ANY | + | get | ANY | + */ +const ( + ANY = iota + RW + RO + PreferRW + PreferRO +) + +// master/replica role +const ( + unknown = iota + master + replica +) + +// pool state +const ( + connConnected = iota + connClosed +) diff --git a/connection_pool/example_test.go b/connection_pool/example_test.go new file mode 100644 index 000000000..06341a9b2 --- /dev/null +++ b/connection_pool/example_test.go @@ -0,0 +1,531 @@ +package connection_pool_test + +import ( + "fmt" + + "github.com/tarantool/go-tarantool" + "github.com/tarantool/go-tarantool/connection_pool" + "github.com/tarantool/go-tarantool/test_helpers" +) + +type Tuple struct { + // Instruct msgpack to pack this struct as array, so no custom packer + // is needed. + _msgpack struct{} `msgpack:",asArray"` //nolint: structcheck,unused + Key string + Value string +} + +var testRoles = []bool{true, true, false, true, true} + +func examplePool(roles []bool) (*connection_pool.ConnectionPool, error) { + err := test_helpers.SetClusterRO(servers, connOpts, roles) + if err != nil { + return nil, fmt.Errorf("ConnectionPool is not established") + } + connPool, err := connection_pool.Connect(servers, connOpts) + if err != nil || connPool == nil { + return nil, fmt.Errorf("ConnectionPool is not established") + } + + return connPool, nil +} + +func ExampleConnectionPool_Select() { + pool, err := examplePool(testRoles) + if err != nil { + fmt.Println(err) + } + defer pool.Close() + + // Connect to servers[2] to check if tuple + // was inserted on RW instance + conn, err := tarantool.Connect(servers[2], connOpts) + if err != nil || conn == nil { + fmt.Printf("failed to connect to %s", servers[2]) + return + } + + // Insert a new tuple {"key1", "value1"}. + _, err = conn.Insert(spaceNo, []interface{}{"key1", "value1"}) + if err != nil { + fmt.Printf("Failed to insert: %s", err.Error()) + return + } + // Insert a new tuple {"key2", "value2"}. + _, err = conn.Insert(spaceName, &Tuple{Key: "key2", Value: "value2"}) + if err != nil { + fmt.Printf("Failed to insert: %s", err.Error()) + return + } + + resp, err := pool.Select( + spaceNo, indexNo, 0, 100, tarantool.IterEq, + []interface{}{"key1"}, connection_pool.PreferRW) + if err != nil { + fmt.Printf("error in select is %v", err) + return + } + fmt.Printf("response is %#v\n", resp.Data) + resp, err = pool.Select( + spaceNo, indexNo, 0, 100, tarantool.IterEq, + []interface{}{"key2"}, connection_pool.PreferRW) + if err != nil { + fmt.Printf("error in select is %v", err) + return + } + fmt.Printf("response is %#v\n", resp.Data) + + // Delete tuple with primary key "key1". + _, err = conn.Delete(spaceName, indexName, []interface{}{"key1"}) + if err != nil { + fmt.Printf("Failed to delete: %s", err.Error()) + } + // Delete tuple with primary key "key2". + _, err = conn.Delete(spaceNo, indexNo, []interface{}{"key2"}) + if err != nil { + fmt.Printf("Failed to delete: %s", err.Error()) + } + + // Output: + // response is []interface {}{[]interface {}{"key1", "value1"}} + // response is []interface {}{[]interface {}{"key2", "value2"}} +} + +func ExampleConnectionPool_SelectTyped() { + pool, err := examplePool(testRoles) + if err != nil { + fmt.Println(err) + } + defer pool.Close() + + // Connect to servers[2] to check if tuple + // was inserted on RW instance + conn, err := tarantool.Connect(servers[2], connOpts) + if err != nil || conn == nil { + fmt.Printf("failed to connect to %s", servers[2]) + return + } + + // Insert a new tuple {"key1", "value1"}. + _, err = conn.Insert(spaceNo, []interface{}{"key1", "value1"}) + if err != nil { + fmt.Printf("Failed to insert: %s", err.Error()) + return + } + // Insert a new tuple {"key2", "value2"}. + _, err = conn.Insert(spaceName, &Tuple{Key: "key2", Value: "value2"}) + if err != nil { + fmt.Printf("Failed to insert: %s", err.Error()) + return + } + + var res []Tuple + err = pool.SelectTyped( + spaceNo, indexNo, 0, 100, tarantool.IterEq, + []interface{}{"key1"}, &res, connection_pool.PreferRW) + if err != nil { + fmt.Printf("error in select is %v", err) + return + } + fmt.Printf("response is %v\n", res) + err = pool.SelectTyped( + spaceName, indexName, 0, 100, tarantool.IterEq, + []interface{}{"key2"}, &res, connection_pool.PreferRW) + if err != nil { + fmt.Printf("error in select is %v", err) + return + } + fmt.Printf("response is %v\n", res) + + // Delete tuple with primary key "key1". + _, err = conn.Delete(spaceName, indexName, []interface{}{"key1"}) + if err != nil { + fmt.Printf("Failed to delete: %s", err.Error()) + } + // Delete tuple with primary key "key2". + _, err = conn.Delete(spaceNo, indexNo, []interface{}{"key2"}) + if err != nil { + fmt.Printf("Failed to delete: %s", err.Error()) + } + + // Output: + // response is [{{} key1 value1}] + // response is [{{} key2 value2}] +} + +func ExampleConnectionPool_SelectAsync() { + pool, err := examplePool(testRoles) + if err != nil { + fmt.Println(err) + } + defer pool.Close() + + // Connect to servers[2] to check if tuple + // was inserted on RW instance + conn, err := tarantool.Connect(servers[2], connOpts) + if err != nil || conn == nil { + fmt.Printf("failed to connect to %s", servers[2]) + return + } + + // Insert a new tuple {"key1", "value1"}. + _, err = conn.Insert(spaceNo, []interface{}{"key1", "value1"}) + if err != nil { + fmt.Printf("Failed to insert: %s", err.Error()) + return + } + // Insert a new tuple {"key2", "value2"}. + _, err = conn.Insert(spaceName, &Tuple{Key: "key2", Value: "value2"}) + if err != nil { + fmt.Printf("Failed to insert: %s", err.Error()) + return + } + // Insert a new tuple {"key3", "value3"}. + _, err = conn.Insert(spaceNo, []interface{}{"key3", "value3"}) + if err != nil { + fmt.Printf("Failed to insert: %s", err.Error()) + return + } + + var futs [3]*tarantool.Future + futs[0] = pool.SelectAsync( + spaceName, indexName, 0, 2, tarantool.IterEq, + []interface{}{"key1"}, connection_pool.PreferRW) + futs[1] = pool.SelectAsync( + spaceName, indexName, 0, 1, tarantool.IterEq, + []interface{}{"key2"}, connection_pool.RW) + futs[2] = pool.SelectAsync( + spaceName, indexName, 0, 1,tarantool.IterEq, + []interface{}{"key3"}, connection_pool.RW) + var t []Tuple + err = futs[0].GetTyped(&t) + fmt.Println("Future", 0, "Error", err) + fmt.Println("Future", 0, "Data", t) + + resp, err := futs[1].Get() + fmt.Println("Future", 1, "Error", err) + fmt.Println("Future", 1, "Data", resp.Data) + + resp, err = futs[2].Get() + fmt.Println("Future", 2, "Error", err) + fmt.Println("Future", 2, "Data", resp.Data) + + // Delete tuple with primary key "key1". + _, err = conn.Delete(spaceName, indexName, []interface{}{"key1"}) + if err != nil { + fmt.Printf("Failed to delete: %s", err.Error()) + } + // Delete tuple with primary key "key2". + _, err = conn.Delete(spaceNo, indexNo, []interface{}{"key2"}) + if err != nil { + fmt.Printf("Failed to delete: %s", err.Error()) + } + // Delete tuple with primary key "key3". + _, err = conn.Delete(spaceNo, indexNo, []interface{}{"key3"}) + if err != nil { + fmt.Printf("Failed to delete: %s", err.Error()) + } + + // Output: + // Future 0 Error + // Future 0 Data [{{} key1 value1}] + // Future 1 Error + // Future 1 Data [[key2 value2]] + // Future 2 Error + // Future 2 Data [[key3 value3]] +} + +func ExampleConnectionPool_SelectAsync_err() { + roles := []bool{true, true, true, true, true} + pool, err := examplePool(roles) + if err != nil { + fmt.Println(err) + } + defer pool.Close() + + var futs [3]*tarantool.Future + futs[0] = pool.SelectAsync( + spaceName, indexName, 0, 2, tarantool.IterEq, + []interface{}{"key1"}, connection_pool.RW) + + err = futs[0].Err() + fmt.Println("Future", 0, "Error", err) + + // Output: + // Future 0 Error Can't find rw instance in pool +} + +func ExampleConnectionPool_Ping() { + pool, err := examplePool(testRoles) + if err != nil { + fmt.Println(err) + } + defer pool.Close() + + // Ping a Tarantool instance to check connection. + resp, err := pool.Ping(connection_pool.ANY) + fmt.Println("Ping Code", resp.Code) + fmt.Println("Ping Data", resp.Data) + fmt.Println("Ping Error", err) + // Output: + // Ping Code 0 + // Ping Data [] + // Ping Error +} + +func ExampleConnectionPool_Insert() { + pool, err := examplePool(testRoles) + if err != nil { + fmt.Println(err) + } + defer pool.Close() + + // Insert a new tuple {"key1", "value1"}. + resp, err := pool.Insert(spaceNo, []interface{}{"key1", "value1"}) + fmt.Println("Insert key1") + fmt.Println("Error", err) + fmt.Println("Code", resp.Code) + fmt.Println("Data", resp.Data) + // Insert a new tuple {"key2", "value2"}. + resp, err = pool.Insert(spaceName, &Tuple{Key: "key2", Value: "value2"}, connection_pool.PreferRW) + fmt.Println("Insert key2") + fmt.Println("Error", err) + fmt.Println("Code", resp.Code) + fmt.Println("Data", resp.Data) + + // Connect to servers[2] to check if tuple + // was inserted on RW instance + conn, err := tarantool.Connect(servers[2], connOpts) + if err != nil || conn == nil { + fmt.Printf("failed to connect to %s", servers[2]) + return + } + + // Delete tuple with primary key "key1". + _, err = conn.Delete(spaceName, indexName, []interface{}{"key1"}) + if err != nil { + fmt.Printf("Failed to delete: %s", err.Error()) + } + // Delete tuple with primary key "key2". + _, err = conn.Delete(spaceNo, indexNo, []interface{}{"key2"}) + if err != nil { + fmt.Printf("Failed to delete: %s", err.Error()) + } + // Output: + // Insert key1 + // Error + // Code 0 + // Data [[key1 value1]] + // Insert key2 + // Error + // Code 0 + // Data [[key2 value2]] +} + +func ExampleConnectionPool_Delete() { + pool, err := examplePool(testRoles) + if err != nil { + fmt.Println(err) + } + defer pool.Close() + + // Connect to servers[2] to check if tuple + // was inserted on RW instance + conn, err := tarantool.Connect(servers[2], connOpts) + if err != nil || conn == nil { + fmt.Printf("failed to connect to %s", servers[2]) + return + } + + // Insert a new tuple {"key1", "value1"}. + _, err = conn.Insert(spaceNo, []interface{}{"key1", "value1"}) + if err != nil { + fmt.Printf("Failed to insert: %s", err.Error()) + return + } + // Insert a new tuple {"key2", "value2"}. + _, err = conn.Insert(spaceNo, []interface{}{"key2", "value2"}) + if err != nil { + fmt.Printf("Failed to insert: %s", err.Error()) + return + } + + // Delete tuple with primary key {"key1"}. + resp, err := pool.Delete(spaceNo, indexNo, []interface{}{"key1"}) + fmt.Println("Delete key1") + fmt.Println("Error", err) + fmt.Println("Code", resp.Code) + fmt.Println("Data", resp.Data) + + // Delete tuple with primary key { "key2" }. + resp, err = pool.Delete(spaceName, indexName, []interface{}{"key2"}, connection_pool.PreferRW) + fmt.Println("Delete key2") + fmt.Println("Error", err) + fmt.Println("Code", resp.Code) + fmt.Println("Data", resp.Data) + // Output: + // Delete key1 + // Error + // Code 0 + // Data [[key1 value1]] + // Delete key2 + // Error + // Code 0 + // Data [[key2 value2]] +} + +func ExampleConnectionPool_Replace() { + pool, err := examplePool(testRoles) + if err != nil { + fmt.Println(err) + } + defer pool.Close() + + // Connect to servers[2] to check if tuple + // was inserted on RW instance + conn, err := tarantool.Connect(servers[2], connOpts) + if err != nil || conn == nil { + fmt.Printf("failed to connect to %s", servers[2]) + return + } + + // Insert a new tuple {"key1", "value1"}. + _, err = conn.Insert(spaceNo, []interface{}{"key1", "value1"}) + if err != nil { + fmt.Printf("Failed to insert: %s", err.Error()) + return + } + + // Replace a tuple with primary key ""key1. + // Note, Tuple is defined within tests, and has EncdodeMsgpack and + // DecodeMsgpack methods. + resp, err := pool.Replace(spaceNo, []interface{}{"key1", "new_value"}) + fmt.Println("Replace key1") + fmt.Println("Error", err) + fmt.Println("Code", resp.Code) + fmt.Println("Data", resp.Data) + resp, err = pool.Replace(spaceName, []interface{}{"key1", "another_value"}) + fmt.Println("Replace key1") + fmt.Println("Error", err) + fmt.Println("Code", resp.Code) + fmt.Println("Data", resp.Data) + resp, err = pool.Replace(spaceName, &Tuple{Key: "key1", Value: "value2"}) + fmt.Println("Replace key1") + fmt.Println("Error", err) + fmt.Println("Code", resp.Code) + fmt.Println("Data", resp.Data) + resp, err = pool.Replace(spaceName, &Tuple{Key: "key1", Value: "new_value2"}, connection_pool.PreferRW) + fmt.Println("Replace key1") + fmt.Println("Error", err) + fmt.Println("Code", resp.Code) + fmt.Println("Data", resp.Data) + + // Delete tuple with primary key "key1". + _, err = conn.Delete(spaceName, indexName, []interface{}{"key1"}) + if err != nil { + fmt.Printf("Failed to delete: %s", err.Error()) + } + + // Output: + // Replace key1 + // Error + // Code 0 + // Data [[key1 new_value]] + // Replace key1 + // Error + // Code 0 + // Data [[key1 another_value]] + // Replace key1 + // Error + // Code 0 + // Data [[key1 value2]] + // Replace key1 + // Error + // Code 0 + // Data [[key1 new_value2]] +} + +func ExampleConnectionPool_Update() { + pool, err := examplePool(testRoles) + if err != nil { + fmt.Println(err) + } + defer pool.Close() + + // Connect to servers[2] to check if tuple + // was inserted on RW instance + conn, err := tarantool.Connect(servers[2], connOpts) + if err != nil || conn == nil { + fmt.Printf("failed to connect to %s", servers[2]) + return + } + + // Insert a new tuple {"key1", "value1"}. + _, err = conn.Insert(spaceNo, []interface{}{"key1", "value1"}) + if err != nil { + fmt.Printf("Failed to insert: %s", err.Error()) + return + } + + // Update tuple with primary key { "key1" }. + resp, err := pool.Update( + spaceName, indexName, []interface{}{"key1"}, + []interface{}{[]interface{}{"=", 1, "new_value"}}, connection_pool.PreferRW) + fmt.Println("Update key1") + fmt.Println("Error", err) + fmt.Println("Code", resp.Code) + fmt.Println("Data", resp.Data) + + // Delete tuple with primary key "key1". + _, err = conn.Delete(spaceName, indexName, []interface{}{"key1"}) + if err != nil { + fmt.Printf("Failed to delete: %s", err.Error()) + } + + // Output: + // Update key1 + // Error + // Code 0 + // Data [[key1 new_value]] +} + +func ExampleConnectionPool_Call17() { + pool, err := examplePool(testRoles) + if err != nil { + fmt.Println(err) + } + defer pool.Close() + + // Call a function 'simple_incr' with arguments. + resp, err := pool.Call17("simple_incr", []interface{}{1}, connection_pool.PreferRW) + fmt.Println("Call simple_incr()") + fmt.Println("Error", err) + fmt.Println("Code", resp.Code) + fmt.Println("Data", resp.Data) + // Output: + // Call simple_incr() + // Error + // Code 0 + // Data [2] +} + +func ExampleConnectionPool_Eval() { + pool, err := examplePool(testRoles) + if err != nil { + fmt.Println(err) + } + defer pool.Close() + + // Run raw Lua code. + resp, err := pool.Eval("return 1 + 2", []interface{}{}, connection_pool.PreferRW) + fmt.Println("Eval 'return 1 + 2'") + fmt.Println("Error", err) + fmt.Println("Code", resp.Code) + fmt.Println("Data", resp.Data) + // Output: + // Eval 'return 1 + 2' + // Error + // Code 0 + // Data [3] +} diff --git a/connection_pool/round_robin.go b/connection_pool/round_robin.go new file mode 100644 index 000000000..7f3f0d098 --- /dev/null +++ b/connection_pool/round_robin.go @@ -0,0 +1,117 @@ +package connection_pool + +import ( + "sync" + "sync/atomic" + + "github.com/tarantool/go-tarantool" +) + +type RoundRobinStrategy struct { + conns []*tarantool.Connection + indexByAddr map[string]int + mutex sync.RWMutex + size int + current uint64 +} + +func (r *RoundRobinStrategy) GetConnByAddr(addr string) *tarantool.Connection { + r.mutex.RLock() + defer r.mutex.RUnlock() + + index, found := r.indexByAddr[addr] + if !found { + return nil + } + + return r.conns[index] +} + +func (r *RoundRobinStrategy) DeleteConnByAddr(addr string) *tarantool.Connection { + r.mutex.Lock() + defer r.mutex.Unlock() + + if r.size == 0 { + return nil + } + + index, found := r.indexByAddr[addr] + if !found { + return nil + } + + delete(r.indexByAddr, addr) + + conn := r.conns[index] + r.conns = append(r.conns[:index], r.conns[index+1:]...) + r.size -= 1 + + for index, conn := range r.conns { + r.indexByAddr[conn.Addr()] = index + } + + return conn +} + +func (r *RoundRobinStrategy) IsEmpty() bool { + r.mutex.RLock() + defer r.mutex.RUnlock() + + return r.size == 0 +} + +func (r *RoundRobinStrategy) CloseConns() []error { + r.mutex.Lock() + defer r.mutex.Unlock() + + errs := make([]error, len(r.conns)) + + for i, conn := range r.conns { + errs[i] = conn.Close() + } + + return errs +} + +func (r *RoundRobinStrategy) GetNextConnection() *tarantool.Connection { + r.mutex.RLock() + defer r.mutex.RUnlock() + + // We want to iterate through the elements in a circular order + // so the first element in cycle is connections[next] + // and the last one is connections[next + length]. + next := r.nextIndex() + cycleLen := len(r.conns) + next + for i := next; i < cycleLen; i++ { + idx := i % len(r.conns) + if r.conns[idx].ConnectedNow() { + if i != next { + atomic.StoreUint64(&r.current, uint64(idx)) + } + return r.conns[idx] + } + } + + return nil +} + +func NewEmptyRoundRobin(size int) *RoundRobinStrategy { + return &RoundRobinStrategy{ + conns: make([]*tarantool.Connection, 0, size), + indexByAddr: make(map[string]int), + size: 0, + } +} + +func (r *RoundRobinStrategy) AddConn(addr string, conn *tarantool.Connection) { + r.mutex.Lock() + defer r.mutex.Unlock() + + r.conns = append(r.conns, conn) + r.indexByAddr[addr] = r.size + r.size += 1 +} + +func (r *RoundRobinStrategy) nextIndex() int { + return int(atomic.AddUint64(&r.current, uint64(1)) % uint64(len(r.conns))) +} diff --git a/go.mod b/go.mod index 6914a5870..152329b1f 100644 --- a/go.mod +++ b/go.mod @@ -5,6 +5,7 @@ go 1.11 require ( github.com/google/uuid v1.3.0 github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e // indirect + github.com/stretchr/testify v1.7.1 // indirect google.golang.org/appengine v1.6.7 // indirect gopkg.in/check.v1 v1.0.0-20200227125254-8fa46927fb4f // indirect gopkg.in/vmihailenco/msgpack.v2 v2.9.2 diff --git a/go.sum b/go.sum index 8a6ae1fa0..1f9e791b4 100644 --- a/go.sum +++ b/go.sum @@ -1,3 +1,5 @@ +github.com/davecgh/go-spew v1.1.0 h1:ZDRjVQ15GmhC3fiQ8ni8+OwkZQO4DARzQgrnXU1Liz8= +github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/golang/protobuf v1.3.1 h1:YF8+flBXS5eO826T4nzqPrxfhQThhXl0YzfuUPu4SBg= github.com/golang/protobuf v1.3.1/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= github.com/google/uuid v1.3.0 h1:t6JiXgmwXMjEs8VusXIJk2BXHsn+wx8BZdTaoZ5fu7I= @@ -7,6 +9,11 @@ github.com/kr/text v0.1.0 h1:45sCR5RtlFHMR4UwH9sdQ5TC8v0qDQCHnXt+kaKSTVE= github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e h1:fD57ERR4JtEqsWbfPhv4DMiApHyliiK5xCTNVSPiaAs= github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e/go.mod h1:zD1mROLANZcx1PVRCS0qkT7pwLkGfwJo4zjcN/Tysno= +github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/testify v1.7.1 h1:5TQK59W5E3v0r2duFAb7P95B6hEeOyEnHRa8MjYSMTY= +github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= golang.org/x/net v0.0.0-20190603091049-60506f45cf65 h1:+rhAzEzT3f4JtomfC371qB+0Ola2caSKcY69NUBZrRQ= golang.org/x/net v0.0.0-20190603091049-60506f45cf65/go.mod h1:HSz+uSET+XFnRR8LxR5pz3Of3rY3CfYBVs4xY44aLks= @@ -16,7 +23,10 @@ golang.org/x/text v0.3.2/go.mod h1:bEr9sfX3Q8Zfm5fL9x+3itogRgK3+ptLWKqgva+5dAk= golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= google.golang.org/appengine v1.6.7 h1:FZR1q0exgwxzPzp/aF+VccGrSfxfPpkBqjIIEq3ru6c= google.golang.org/appengine v1.6.7/go.mod h1:8WjMMxjGQR8xUklV/ARdw2HLXBOI7O7uCIDZVag1xfc= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20200227125254-8fa46927fb4f h1:BLraFXnmrev5lT+xlilqcH8XK9/i0At2xKjWk4p6zsU= gopkg.in/check.v1 v1.0.0-20200227125254-8fa46927fb4f/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/vmihailenco/msgpack.v2 v2.9.2 h1:gjPqo9orRVlSAH/065qw3MsFCDpH7fa1KpiizXyllY4= gopkg.in/vmihailenco/msgpack.v2 v2.9.2/go.mod h1:/3Dn1Npt9+MYyLpYYXjInO/5jvMLamn+AEGwNEOatn8= +gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c h1:dUUwHk2QECo/6vqA44rthZ8ie2QXMNeKRTHCNY2nXvo= +gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/request.go b/request.go index ff8a7e8a5..6065959c4 100644 --- a/request.go +++ b/request.go @@ -457,3 +457,8 @@ func (fut *Future) Err() error { fut.wait() return fut.err } + +// NewErrorFuture returns new set empty Future with filled error field. +func NewErrorFuture(err error) *Future { + return &Future{err: err} +} diff --git a/test_helpers/main.go b/test_helpers/main.go index ad45e00d9..92ee27692 100644 --- a/test_helpers/main.go +++ b/test_helpers/main.go @@ -60,8 +60,8 @@ type TarantoolInstance struct { // Cmd is a Tarantool command. Used to kill Tarantool process. Cmd *exec.Cmd - // WorkDir is a directory with tarantool data. Cleaned up after run. - WorkDir string + // Options for restarting a tarantool instance. + Opts StartOpts } func isReady(server string, opts *tarantool.Opts) error { @@ -148,6 +148,19 @@ func IsTarantoolVersionLess(majorMin uint64, minorMin uint64, patchMin uint64) ( return false, nil } +// RestartTarantool restarts a tarantool instance for tests +// with specifies parameters (refer to StartOpts) +// which were specified in inst parameter. +// inst is a tarantool instance that was started by +// StartTarantool. Rewrites inst.Cmd.Process to stop +// instance with StopTarantool. +// Process must be stopped with StopTarantool. +func RestartTarantool(inst *TarantoolInstance) error { + startedInst, err := StartTarantool(inst.Opts) + inst.Cmd.Process = startedInst.Cmd.Process + return err +} + // StartTarantool starts a tarantool instance for tests // with specifies parameters (refer to StartOpts). // Process must be stopped with StopTarantool. @@ -174,7 +187,8 @@ func StartTarantool(startOpts StartOpts) (TarantoolInstance, error) { return inst, err } - inst.WorkDir = startOpts.WorkDir + // Options for restarting tarantool instance. + inst.Opts = startOpts // Start tarantool. err = inst.Cmd.Start() @@ -236,8 +250,8 @@ func StopTarantool(inst TarantoolInstance) { func StopTarantoolWithCleanup(inst TarantoolInstance) { StopTarantool(inst) - if inst.WorkDir != "" { - if err := os.RemoveAll(inst.WorkDir); err != nil { + if inst.Opts.WorkDir != "" { + if err := os.RemoveAll(inst.Opts.WorkDir); err != nil { log.Fatalf("Failed to clean work directory, got %s", err) } } diff --git a/test_helpers/pool_helper.go b/test_helpers/pool_helper.go new file mode 100644 index 000000000..8293b8185 --- /dev/null +++ b/test_helpers/pool_helper.go @@ -0,0 +1,248 @@ +package test_helpers + +import ( + "fmt" + "reflect" + "time" + + "github.com/tarantool/go-tarantool" + "github.com/tarantool/go-tarantool/connection_pool" +) + +type ListenOnInstanceArgs struct { + ConnPool *connection_pool.ConnectionPool + Mode connection_pool.Mode + ServersNumber int + ExpectedPorts map[string]bool +} + +type CheckStatusesArgs struct { + ConnPool *connection_pool.ConnectionPool + Servers []string + Mode connection_pool.Mode + ExpectedPoolStatus bool + ExpectedStatuses map[string]bool +} + +func compareTuples(expectedTpl []interface{}, actualTpl []interface{}) error { + if len(actualTpl) != len(expectedTpl) { + return fmt.Errorf("Unexpected body of Insert (tuple len)") + } + + for i, field := range actualTpl { + if field != expectedTpl[i] { + return fmt.Errorf("Unexpected field, expected: %v actual: %v", expectedTpl[i], field) + } + } + + return nil +} + +func CheckPoolStatuses(args interface{}) error { + checkArgs, ok := args.(CheckStatusesArgs) + if !ok { + return fmt.Errorf("incorrect args") + } + + connected, _ := checkArgs.ConnPool.ConnectedNow(checkArgs.Mode) + if connected != checkArgs.ExpectedPoolStatus { + return fmt.Errorf( + "incorrect connection pool status: expected status %t actual status %t", + checkArgs.ExpectedPoolStatus, connected) + } + + poolInfo := checkArgs.ConnPool.GetPoolInfo() + for _, server := range checkArgs.Servers { + status := poolInfo[server] != nil && poolInfo[server].ConnectedNow + if checkArgs.ExpectedStatuses[server] != status { + return fmt.Errorf( + "incorrect conn status: addr %s expected status %t actual status %t", + server, checkArgs.ExpectedStatuses[server], status) + } + } + + return nil +} + +// ProcessListenOnInstance helper calls "return box.cfg.listen" +// as many times as there are servers in the connection pool +// with specified mode. +// For RO mode expected received ports equals to replica ports. +// For RW mode expected received ports equals to master ports. +// For PreferRO mode expected received ports equals to replica +// ports or to all ports. +// For PreferRW mode expected received ports equals to master ports +// or to all ports. +func ProcessListenOnInstance(args interface{}) error { + actualPorts := map[string]bool{} + + listenArgs, ok := args.(ListenOnInstanceArgs) + if !ok { + return fmt.Errorf("incorrect args") + } + + for i := 0; i < listenArgs.ServersNumber; i++ { + resp, err := listenArgs.ConnPool.Eval("return box.cfg.listen", []interface{}{}, listenArgs.Mode) + if err != nil { + return fmt.Errorf("fail to Eval: %s", err.Error()) + } + if resp == nil { + return fmt.Errorf("response is nil after Eval") + } + if len(resp.Data) < 1 { + return fmt.Errorf("response.Data is empty after Eval") + } + + port, ok := resp.Data[0].(string) + if !ok { + return fmt.Errorf("response.Data is incorrect after Eval") + } + + actualPorts[port] = true + } + + equal := reflect.DeepEqual(actualPorts, listenArgs.ExpectedPorts) + if !equal { + return fmt.Errorf("expected ports: %v, actual ports: %v", actualPorts, listenArgs.ExpectedPorts) + } + + return nil +} + +func Retry(f func(interface{}) error, args interface{}, count int, timeout time.Duration) error { + var err error + + for i := 0; ; i++ { + err = f(args) + if err == nil { + return err + } + + if i >= (count - 1) { + break + } + + time.Sleep(timeout) + } + + return err +} + +func InsertOnInstance(server string, connOpts tarantool.Opts, space interface{}, tuple interface{}) error { + conn, err := tarantool.Connect(server, connOpts) + if err != nil { + return fmt.Errorf("Fail to connect to %s: %s", server, err.Error()) + } + if conn == nil { + return fmt.Errorf("conn is nil after Connect") + } + defer conn.Close() + + resp, err := conn.Insert(space, tuple) + if err != nil { + return fmt.Errorf("Failed to Insert: %s", err.Error()) + } + if resp == nil { + return fmt.Errorf("Response is nil after Insert") + } + if len(resp.Data) != 1 { + return fmt.Errorf("Response Body len != 1") + } + if tpl, ok := resp.Data[0].([]interface{}); !ok { + return fmt.Errorf("Unexpected body of Insert") + } else { + expectedTpl, ok := tuple.([]interface{}) + if !ok { + return fmt.Errorf("Failed to cast") + } + + err = compareTuples(expectedTpl, tpl) + if err != nil { + return err + } + } + + return nil +} + +func InsertOnInstances(servers []string, connOpts tarantool.Opts, space interface{}, tuple interface{}) error { + serversNumber := len(servers) + roles := make([]bool, serversNumber) + for i:= 0; i < serversNumber; i++{ + roles[i] = false + } + + err := SetClusterRO(servers, connOpts, roles) + if err != nil { + return fmt.Errorf("fail to set roles for cluster: %s", err.Error()) + } + + for _, server := range servers { + err := InsertOnInstance(server, connOpts, space, tuple) + if err != nil { + return err + } + } + + return nil +} + +func SetInstanceRO(server string, connOpts tarantool.Opts, isReplica bool) error { + conn, err := tarantool.Connect(server, connOpts) + if err != nil { + return err + } + + defer conn.Close() + + _, err = conn.Call("box.cfg", []interface{}{map[string]bool{"read_only": isReplica}}) + if err != nil { + return err + } + + return nil +} + +func SetClusterRO(servers []string, connOpts tarantool.Opts, roles []bool) error { + if len(servers) != len(roles) { + return fmt.Errorf("number of servers should be equal to number of roles") + } + + for i, server := range servers { + err := SetInstanceRO(server, connOpts, roles[i]) + if err != nil { + return err + } + } + + return nil +} + +func StartTarantoolInstances(servers []string, workDirs []string, opts StartOpts) ([]TarantoolInstance, error) { + if len(servers) != len(workDirs) { + return nil, fmt.Errorf("number of servers should be equal to number of workDirs") + } + + instances := make([]TarantoolInstance, 0, len(servers)) + + for i, server := range servers { + opts.Listen = server + opts.WorkDir = workDirs[i] + + instance, err := StartTarantool(opts) + if err != nil { + StopTarantoolInstances(instances) + return nil, err + } + + instances = append(instances, instance) + } + + return instances, nil +} + +func StopTarantoolInstances(instances []TarantoolInstance) { + for _, instance := range instances { + StopTarantoolWithCleanup(instance) + } +}