Skip to content

Commit

Permalink
api: support iproto feature discovery
Browse files Browse the repository at this point in the history
Since version 2.10.0 Tarantool supports feature discovery [1]. Client
can send the schema version and supported features and receive
server-side schema version and supported features information to tune
its behavior.

After this patch, the request will be send on `dial`, where
authentication is performed. Connector stores server info in connection
internals. After that, user may call API handles to check if it is
possible to use a feature.

Feature check iterates over lists to check if feature is
enabled. It seems that iterating over a small list is way faster than
building a map, see [2]. Benchmark tests show that this check is rather
fast (0.5 ns for client and server check on HP ProBook 440 G5) so it is
not necessary to cache it in any way.

Traces of IPROTO_FEATURE_GRACEFUL_SHUTDOWN flag and protocol version 4
could be found in Tarantool source code but they were removed in the
following commits before the release and treated like they never
existed. We also ignore them here too. See [3] for more info.

1. tarantool/tarantool#6253
2. https://stackoverflow.com/a/52710077/11646599
3. tarantool/tarantool-python#262

Closes #120
  • Loading branch information
DifferentialOrange committed Nov 15, 2022
1 parent f05dac4 commit bb61bef
Show file tree
Hide file tree
Showing 8 changed files with 323 additions and 6 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ Versioning](http://semver.org/spec/v2.0.0.html) except to the first release.

### Added

- Support IProto feature discovery (#120).

### Changed

### Fixed
Expand Down
98 changes: 93 additions & 5 deletions connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,9 @@ type Connection struct {
lenbuf [PacketLengthBytes]byte

lastStreamId uint64

serverProtocolVersion ProtocolVersion
serverFeatures []Feature
}

var _ = Connector(&Connection{}) // Check compatibility with connector interface.
Expand Down Expand Up @@ -502,6 +505,13 @@ func (conn *Connection) dial() (err error) {
conn.Greeting.Version = bytes.NewBuffer(greeting[:64]).String()
conn.Greeting.auth = bytes.NewBuffer(greeting[64:108]).String()

// IPROTO_ID requests can be processed without authentication.
// https://www.tarantool.io/en/doc/latest/dev_guide/internals/iproto/requests/#iproto-id
if err = conn.loadProtocolInfo(w, r); err != nil {
connection.Close()
return err
}

// Auth
if opts.User != "" {
scr, err := scramble(conn.Greeting.auth, opts.Pass)
Expand Down Expand Up @@ -603,33 +613,43 @@ func (conn *Connection) writeAuthRequest(w *bufio.Writer, scramble []byte) (err
return conn.writeRequest(w, req, "auth")
}

func (conn *Connection) readResponse(r io.Reader) (resp Response, err error) {
func (conn *Connection) writeProtocolInfoRequest(w *bufio.Writer, version ProtocolVersion,
features []Feature) (err error) {
req := newProtocolInfoRequest(version, features)
return conn.writeRequest(w, req, "iproto id")
}

func (conn *Connection) readResponse(r io.Reader, reqName string) (resp Response, err error) {
respBytes, err := conn.read(r)
if err != nil {
return resp, errors.New("auth: read error " + err.Error())
return resp, errors.New(reqName + ": read error " + err.Error())
}
resp = Response{buf: smallBuf{b: respBytes}}
err = resp.decodeHeader(conn.dec)
if err != nil {
return resp, errors.New("auth: decode response header error " + err.Error())
return resp, errors.New(reqName + ": decode response header error " + err.Error())
}
err = resp.decodeBody()
if err != nil {
switch err.(type) {
case Error:
return resp, err
default:
return resp, errors.New("auth: decode response body error " + err.Error())
return resp, errors.New(reqName + ": decode response body error " + err.Error())
}
}
return resp, nil
}

func (conn *Connection) readAuthResponse(r io.Reader) (err error) {
_, err = conn.readResponse(r)
_, err = conn.readResponse(r, "auth")
return err
}

func (conn *Connection) readProtocolInfoResponse(r io.Reader) (resp Response, err error) {
return conn.readResponse(r, "iproto id")
}

func (conn *Connection) createConnection(reconnect bool) (err error) {
var reconnects uint
for conn.c == nil && conn.state == connDisconnected {
Expand Down Expand Up @@ -1173,3 +1193,71 @@ func (conn *Connection) NewStream() (*Stream, error) {
Conn: conn,
}, nil
}

// loadProtocolInfo sends info about client protocol,
// receives info about server protocol in response
// and store in in connection serverProtocolInfo.
func (conn *Connection) loadProtocolInfo(w *bufio.Writer, r *bufio.Reader) error {
var resp Response
var err error

err = conn.writeProtocolInfoRequest(w, ClientProtocolVersion, ClientFeatures)
if err != nil {
return err
}

resp, err = conn.readProtocolInfoResponse(r)
if err != nil {
tarantoolError, ok := err.(Error)
if ok && tarantoolError.Code == ErrUnknownRequestType {
// IPROTO_ID requests are not supported by server.
conn.serverProtocolVersion = ProtocolVersionUnsupported
conn.serverFeatures = []Feature{}

return nil
}

return err
}

if len(resp.Data) == 0 {
return fmt.Errorf("Unexpected response on protocol info exchange: no data")
}

serverProtocolInfo, ok := resp.Data[0].(protocolInfo)
if !ok {
return fmt.Errorf("Unexpected response on protocol info exchange: wrong data")
}
conn.serverProtocolVersion = serverProtocolInfo.version
conn.serverFeatures = serverProtocolInfo.features

return nil
}

// ServerProtocolVersion returns protocol version supported by
// connected Tarantool server.
// Since 1.10.0
func (conn *Connection) ServerProtocolVersion() ProtocolVersion {
return conn.serverProtocolVersion
}

// ClientProtocolVersion returns protocol version supported by
// Go connection client.
// Since 1.10.0
func (conn *Connection) ClientProtocolVersion() ProtocolVersion {
return ClientProtocolVersion
}

// DoesServerSupportFeature checks if expected feature
// is supported by connected Tarantool server.
// Since 1.10.0
func (conn *Connection) DoesServerSupportFeature(feature Feature) bool {
return isFeatureSupported(feature, conn.serverFeatures)
}

// DoesClientSupportFeature checks if expected feature
// is supported by Go connection client.
// Since 1.10.0
func (conn *Connection) DoesClientSupportFeature(feature Feature) bool {
return isFeatureSupported(feature, ClientFeatures)
}
44 changes: 44 additions & 0 deletions connection_pool/connection_pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -1028,3 +1028,47 @@ func newErrorFuture(err error) *tarantool.Future {
fut.SetError(err)
return fut
}

// ServerProtocolVersion returns protocol version supported by
// Tarantool server for a connection selected by userMode from connPool.
// Since 1.10.0
func (connPool *ConnectionPool) ServerProtocolVersion(userMode Mode) (tarantool.ProtocolVersion, error) {
conn, err := connPool.getNextConnection(userMode)
if err != nil {
return tarantool.ProtocolVersionUnsupported, err
}
return conn.ServerProtocolVersion(), nil
}

// ClientProtocolVersion returns protocol version supported by
// Go connection client for a connection selected by userMode from connPool.
// Since 1.10.0
func (connPool *ConnectionPool) ClientProtocolVersion(userMode Mode) (tarantool.ProtocolVersion, error) {
conn, err := connPool.getNextConnection(userMode)
if err != nil {
return tarantool.ProtocolVersionUnsupported, err
}
return conn.ClientProtocolVersion(), nil
}

// DoesServerSupportFeature checks if expected feature is supported
// by Tarantool server for a connection selected by userMode from connPool.
// Since 1.10.0
func (connPool *ConnectionPool) DoesServerSupportFeature(feature tarantool.Feature, userMode Mode) (bool, error) {
conn, err := connPool.getNextConnection(userMode)
if err != nil {
return false, err
}
return conn.DoesServerSupportFeature(feature), nil
}

// DoesClientSupportFeature checks if expected feature is supported
// by Go connection client for a connection selected by userMode from connPool.
// Since 1.10.0
func (connPool *ConnectionPool) DoesClientSupportFeature(feature tarantool.Feature, userMode Mode) (bool, error) {
conn, err := connPool.getNextConnection(userMode)
if err != nil {
return false, err
}
return conn.DoesClientSupportFeature(feature), nil
}
3 changes: 3 additions & 0 deletions const.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ const (
RollbackRequestCode = 16
PingRequestCode = 64
SubscribeRequestCode = 66
IdRequestCode = 73

KeyCode = 0x00
KeySync = 0x01
Expand All @@ -41,6 +42,8 @@ const (
KeySQLBind = 0x41
KeySQLInfo = 0x42
KeyStmtID = 0x43
KeyVersion = 0x54
KeyFeatures = 0x55
KeyTimeout = 0x56
KeyTxnIsolation = 0x59

Expand Down
44 changes: 44 additions & 0 deletions protocol.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
package tarantool

type ProtocolVersion uint64
type Feature uint64

type protocolInfo struct {
version ProtocolVersion
features []Feature
}

const ProtocolVersionUnsupported ProtocolVersion = 0

const (
// Streams support.
FeatureStreams Feature = 0
// Interactive transactions support.
FeatureTransactions Feature = 1
// Support of MP_ERROR object over MessagePack.
FeatureErrorExtension Feature = 2
// Support of watchers.
FeatureWatchers Feature = 3
)

// Protocol version supported by connector. Version 3
// was introduced in Tarantool 2.10.0 and used in latest 2.10.4.
const ClientProtocolVersion ProtocolVersion = 3

// Protocol features supported by connector.
var ClientFeatures = []Feature{
FeatureStreams,
FeatureTransactions,
}

func isFeatureSupported(feature Feature, supportedFeatures []Feature) bool {
// It seems that iterating over a small list is way faster
// than building a map: https://stackoverflow.com/a/52710077/11646599
for _, supportedFeature := range supportedFeatures {
if feature == supportedFeature {
return true
}
}

return false
}
57 changes: 57 additions & 0 deletions request.go
Original file line number Diff line number Diff line change
Expand Up @@ -1106,3 +1106,60 @@ func (req *ExecuteRequest) Context(ctx context.Context) *ExecuteRequest {
req.ctx = ctx
return req
}

// protocolInfoRequest informs the server about supported protocol
// version and features.
type protocolInfoRequest struct {
baseRequest
protocolInfo
}

// newProtocolInfoRequest returns a new protocolInfoRequest.
func newProtocolInfoRequest(version ProtocolVersion,
features []Feature) *protocolInfoRequest {
req := new(protocolInfoRequest)
req.requestCode = IdRequestCode
req.version = version
req.features = features
return req
}

// Body fills an encoder with the protocol version request body.
func (req *protocolInfoRequest) Body(res SchemaResolver, enc *encoder) error {
return req.fillProtocolInfoRequest(enc)
}

// Context sets a passed context to the request.
//
// Pay attention that when using context with request objects,
// the timeout option for Connection does not affect the lifetime
// of the request. For those purposes use context.WithTimeout() as
// the root context.
func (req *protocolInfoRequest) Context(ctx context.Context) *protocolInfoRequest {
req.ctx = ctx
return req
}

func (req *protocolInfoRequest) fillProtocolInfoRequest(enc *encoder) error {
enc.EncodeMapLen(2)

encodeUint(enc, KeyVersion)
if err := enc.Encode(req.version); err != nil {
return err
}

encodeUint(enc, KeyFeatures)

t := len(req.features)
if err := enc.EncodeArrayLen(t); err != nil {
return err
}

for _, feature := range req.features {
if err := enc.Encode(feature); err != nil {
return err
}
}

return nil
}
31 changes: 30 additions & 1 deletion response.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,8 +147,14 @@ func (resp *Response) decodeBody() (err error) {
offset := resp.buf.Offset()
defer resp.buf.Seek(offset)

var l int
var l, larr int
var stmtID, bindCount uint64
var serverProtocolInfo protocolInfo = protocolInfo{
version: ProtocolVersionUnsupported,
features: []Feature{},
}
var feature Feature
isProtocolInfoResponse := false

d := newDecoder(&resp.buf)

Expand Down Expand Up @@ -190,6 +196,24 @@ func (resp *Response) decodeBody() (err error) {
if bindCount, err = d.DecodeUint64(); err != nil {
return err
}
case KeyVersion:
isProtocolInfoResponse = true
if err = d.Decode(&serverProtocolInfo.version); err != nil {
return err
}
case KeyFeatures:
isProtocolInfoResponse = true
if larr, err = d.DecodeArrayLen(); err != nil {
return err
}

serverProtocolInfo.features = make([]Feature, larr)
for i := 0; i < larr; i++ {
if err = d.Decode(&feature); err != nil {
return err
}
serverProtocolInfo.features[i] = feature
}
default:
if err = d.Skip(); err != nil {
return err
Expand All @@ -204,6 +228,11 @@ func (resp *Response) decodeBody() (err error) {
}
resp.Data = []interface{}{stmt}
}

if isProtocolInfoResponse {
resp.Data = []interface{}{serverProtocolInfo}
}

if resp.Code != OkCode && resp.Code != PushCode {
resp.Code &^= ErrorCodeBit
err = Error{resp.Code, resp.Error}
Expand Down
Loading

0 comments on commit bb61bef

Please sign in to comment.