Skip to content

Commit

Permalink
iproto: support feature discovery
Browse files Browse the repository at this point in the history
Since version 2.10.0 Tarantool supports feature discovery [1]. Client
can send the schema version and supported features and receive
server-side schema version and supported features information to tune
its behavior.

After this patch, the request will be send on `Connect`. Connector will
use protocol version that is minimal of connector version (now it's 3)
and server version. Feature will be enabled if both client and server
supports it (for now client does not support any features from the
list). Unknown request type error response is expected for pre-2.10.0
versions. In this case, protocol version would be
ProtocolVersionUnsupported (-1) and no features would be enabled.

For now it doesn't seems like exposing protocol version and enabled
features is useful for a client so private variables are used to store
this info. Getters added in export_test for tests.

Traces of IPROTO_FEATURE_GRACEFUL_SHUTDOWN flag and protocol version 4
could be found in Tarantool source code but they were removed in the
following commits before the release and treated like they never
existed. We also ignore them here too. See [2] for more info.

1. tarantool/tarantool#6253
2. tarantool/tarantool-python#262

Closes #120
  • Loading branch information
DifferentialOrange committed Nov 11, 2022
1 parent 48cf0c7 commit 253de07
Show file tree
Hide file tree
Showing 6 changed files with 220 additions and 2 deletions.
74 changes: 74 additions & 0 deletions connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,9 @@ const (
type ConnEventKind int
type ConnLogKind int

var clientProtocolVersion int64 = 3
var clientFeatures []uint64 = []uint64{}

const (
// Connected signals that connection is established or reestablished.
Connected ConnEventKind = iota + 1
Expand Down Expand Up @@ -146,6 +149,13 @@ type Connection struct {
lenbuf [PacketLengthBytes]byte

lastStreamId uint64

// protocolVersion is IProto max protocol version supported both by
// client and server. Equal to ProtocolVersionUnsupported is server
// does not support IPROTO_ID.
protocolVersion int64
// features contains the list of features supported both by client and server.
features []uint64
}

var _ = Connector(&Connection{}) // Check compatibility with connector interface.
Expand Down Expand Up @@ -391,6 +401,10 @@ func Connect(addr string, opts Opts) (conn *Connection, err error) {
}
}

if err = conn.checkFeatures(); err != nil {
return nil, err
}

return conn, err
}

Expand Down Expand Up @@ -1163,3 +1177,63 @@ func (conn *Connection) NewStream() (*Stream, error) {
Conn: conn,
}, nil
}

func (conn *Connection) checkFeatures() error {
resp, err := conn.exchangeProtocolVersion(clientProtocolVersion, clientFeatures)

if err != nil {
if resp.Code == ErrUnknownRequestType {
// IPROTO_ID is not supported by server.
conn.protocolVersion = ProtocolVersionUnsupported
conn.features = []uint64{}
return nil
}

return err
}

if len(resp.Data) == 0 {
return fmt.Errorf("Unexpected response on protocol version exchange: no data")
}

server, ok := resp.Data[0].(*protocolVersionResponse)
if !ok {
return fmt.Errorf("Unexpected response on protocol version exchange: wrong data")
}

conn.protocolVersion = minInt64(server.protocolVersion, clientProtocolVersion)
conn.features = intersect(server.features, clientFeatures)

return nil
}

func intersect(arr1 []uint64, arr2 []uint64) []uint64 {
var res []uint64 = make([]uint64, minInt(len(arr1), len(arr2)))
var i int = 0

for _, el1 := range arr1 {
for _, el2 := range arr2 {
if el1 == el2 {
res[i] = (el1)
}
}
}

return res[0:i]
}

func minInt(v1 int, v2 int) int {
if v1 > v2 {
return v2
}

return v1
}

func minInt64(v1 int64, v2 int64) int64 {
if v1 > v2 {
return v2
}

return v1
}
10 changes: 10 additions & 0 deletions const.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ const (
RollbackRequestCode = 16
PingRequestCode = 64
SubscribeRequestCode = 66
IdRequestCode = 73

KeyCode = 0x00
KeySync = 0x01
Expand All @@ -41,6 +42,8 @@ const (
KeySQLBind = 0x41
KeySQLInfo = 0x42
KeyStmtID = 0x43
KeyVersion = 0x54
KeyFeatures = 0x55
KeyTimeout = 0x56
KeyTxnIsolation = 0x59

Expand Down Expand Up @@ -69,10 +72,17 @@ const (
RLimitDrop = 1
RLimitWait = 2

FeatureStreams = uint64(0)
FeatureTransactions = uint64(1)
FeatureErrorExtension = uint64(2)
FeatureWatchers = uint64(3)

OkCode = uint32(0)
PushCode = uint32(0x80)
ErrorCodeBit = 0x8000
PacketLengthBytes = 5
ErSpaceExistsCode = 0xa
IteratorCode = 0x14

ProtocolVersionUnsupported = int64(-1)
)
8 changes: 8 additions & 0 deletions export_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,3 +114,11 @@ func RefImplRollbackBody(enc *encoder) error {
func NewEncoder(w io.Writer) *encoder {
return newEncoder(w)
}

func GetProtocolVersion(conn *Connection) int64 {
return conn.protocolVersion
}

func GetFeatures(conn *Connection) []uint64 {
return conn.features
}
66 changes: 66 additions & 0 deletions request.go
Original file line number Diff line number Diff line change
Expand Up @@ -1106,3 +1106,69 @@ func (req *ExecuteRequest) Context(ctx context.Context) *ExecuteRequest {
req.ctx = ctx
return req
}

// protocolVersionRequest informs the server about supported protocol
// version and features.
type protocolVersionRequest struct {
baseRequest
// TODO: or uint32? spaceId is uint32. On the other hand,
// spaceId is casted to uint64 underneath and protocolVersionRequest
// is not a request user would send explicitly.
// Need to ask Oleg about it.
protocolVersion int64
features []uint64
}

// newProtocolVersionRequest returns a new protocolVersionRequest.
func newProtocolVersionRequest(protocolVersion int64,
features []uint64) *protocolVersionRequest {
req := new(protocolVersionRequest)
req.requestCode = IdRequestCode
req.protocolVersion = protocolVersion
req.features = features
return req
}

// exchangeProtocolVersion sends info about client protocol
// and receives info about server protocol in response.
func (conn *Connection) exchangeProtocolVersion(protocolVersion int64,
features []uint64) (resp *Response, err error) {
req := newProtocolVersionRequest(protocolVersion, features)
return conn.Do(req).Get()
}

// Body fills an encoder with the protocol version request body.
func (req *protocolVersionRequest) Body(res SchemaResolver, enc *encoder) error {
return req.fillProtocolVersionRequest(enc)
}

// Context sets a passed context to the request.
//
// Pay attention that when using context with request objects,
// the timeout option for Connection does not affect the lifetime
// of the request. For those purposes use context.WithTimeout() as
// the root context.
func (req *protocolVersionRequest) Context(ctx context.Context) *protocolVersionRequest {
req.ctx = ctx
return req
}

func (req *protocolVersionRequest) fillProtocolVersionRequest(enc *encoder) error {
enc.EncodeMapLen(2)

encodeUint(enc, KeyVersion)
encodeInt(enc, req.protocolVersion)

encodeUint(enc, KeyFeatures)

t := len(req.features)
if err := enc.EncodeArrayLen(t); err != nil {
return err
}

for _, v := range req.features {
encodeUint(enc, v)
}

return nil
}
38 changes: 36 additions & 2 deletions response.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,11 @@ type SQLInfo struct {
InfoAutoincrementIds []uint64
}

type protocolVersionResponse struct {
protocolVersion int64
features []uint64
}

func (meta *ColumnMetaData) DecodeMsgpack(d *decoder) error {
var err error
var l int
Expand Down Expand Up @@ -147,8 +152,11 @@ func (resp *Response) decodeBody() (err error) {
offset := resp.buf.Offset()
defer resp.buf.Seek(offset)

var l int
var stmtID, bindCount uint64
var l, larr int
var stmtID, bindCount, feature uint64
var protocolVersion int64
var features []uint64
isProtocolVersionResponse := false

d := newDecoder(&resp.buf)

Expand Down Expand Up @@ -190,6 +198,23 @@ func (resp *Response) decodeBody() (err error) {
if bindCount, err = d.DecodeUint64(); err != nil {
return err
}
case KeyVersion:
isProtocolVersionResponse = true
if protocolVersion, err = d.DecodeInt64(); err != nil {
return err
}
case KeyFeatures:
isProtocolVersionResponse = true
if larr, err = d.DecodeArrayLen(); err != nil {
return err
}
features = make([]uint64, larr)
for i := 0; i < larr; i++ {
if feature, err = d.DecodeUint64(); err != nil {
return err
}
features[i] = feature
}
default:
if err = d.Skip(); err != nil {
return err
Expand All @@ -204,6 +229,15 @@ func (resp *Response) decodeBody() (err error) {
}
resp.Data = []interface{}{stmt}
}

if isProtocolVersionResponse {
data := &protocolVersionResponse{
protocolVersion: protocolVersion,
features: features,
}
resp.Data = []interface{}{data}
}

if resp.Code != OkCode && resp.Code != PushCode {
resp.Code &^= ErrorCodeBit
err = Error{resp.Code, resp.Error}
Expand Down
26 changes: 26 additions & 0 deletions tarantool_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2830,6 +2830,32 @@ func TestStream_DoWithClosedConn(t *testing.T) {
}
}

func TestConnectionProtocolVersion(t *testing.T) {
conn := test_helpers.ConnectWithValidation(t, server, opts)

isProtocolVersionUnsupported, err := test_helpers.IsTarantoolVersionLess(2, 10, 0)
if err != nil {
t.Fatalf("Unexpected error has been caught: %s", err.Error())
}

protocolVersion := GetProtocolVersion(conn)
features := GetFeatures(conn)

if isProtocolVersionUnsupported {
require.Equal(t, protocolVersion, ProtocolVersionUnsupported)
require.ElementsMatch(t, features, []uint64{})
} else {
// First Tarantool protocol version (1) was introduced between
// 2.10.0-beta1 and 2.10.0-beta2. Versions 2 and 3 were also
// introduced between 2.10.0-beta1 and 2.10.0-beta2. Version 4
// was introduced between 2.10.0-beta2 and 2.10.0-rc1 and reverted
// back to version 3 in the same version interval.
// Tarantool 2.10.3 still has version 3.
require.GreaterOrEqual(t, protocolVersion, int64(3))
require.ElementsMatch(t, features, []uint64{})
}
}

// runTestMain is a body of TestMain function
// (see https://pkg.go.dev/testing#hdr-Main).
// Using defer + os.Exit is not works so TestMain body
Expand Down

0 comments on commit 253de07

Please sign in to comment.