Skip to content

api: write a connection schema getter #355

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Nov 24, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ Versioning](http://semver.org/spec/v2.0.0.html) except to the first release.
- Support `IPROTO_FEATURE_SPACE_AND_INDEX_NAMES` for Tarantool
version >= 3.0.0-alpha1 (#338). It allows to use space and index names
in requests instead of their IDs.
- `GetSchema` function to get the actual schema (#7)

### Changed

Expand All @@ -51,6 +52,9 @@ Versioning](http://semver.org/spec/v2.0.0.html) except to the first release.
instead of `crud.OptUint` (#342)
- Change all `Upsert` and `Update` requests to accept `*tarantool.Operations`
as `ops` parameters instead of `interface{}` (#348)
- Change `OverrideSchema(*Schema)` to `SetSchema(Schema)` (#7)
- Change values, stored by pointers in the `Schema`, `Space`, `Index` structs,
to be stored by their values (#7)

### Deprecated

Expand All @@ -70,6 +74,7 @@ Versioning](http://semver.org/spec/v2.0.0.html) except to the first release.
- UUID_extId (#158)
- IPROTO constants (#158)
- Code() method from the Request interface (#158)
- `Schema` field from the `Connection` struct (#7)

### Fixed

Expand Down
11 changes: 11 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -248,6 +248,13 @@ now does not attempt to reconnect and tries to establish a connection only once.
Function might be canceled via context. Context accepted as first argument,
and user may cancel it in process.

#### Connection schema

* Removed `Schema` field from the `Connection` struct. Instead, new
`GetSchema(Connector)` function was added to get the actual connection
schema on demand.
* `OverrideSchema(*Schema)` method replaced with the `SetSchema(Schema)`.

#### Protocol changes

* `iproto.Feature` type used instead of `ProtocolFeature`.
Expand All @@ -260,6 +267,10 @@ and user may cancel it in process.
interface to get information if the usage of space and index names in requests
is supported.
* `Schema` structure no longer implements `SchemaResolver` interface.
* `Spaces` and `SpacesById` fields of the `Schema` struct store spaces by value.
* `Fields` and `FieldsById` fields of the `Space` struct store fields by value.
`Index` and `IndexById` fields of the `Space` struct store indexes by value.
* `Fields` field of the `Index` struct store `IndexField` by value.

## Contributing

Expand Down
28 changes: 17 additions & 11 deletions connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -160,8 +160,6 @@ type Connection struct {
c Conn
mutex sync.Mutex
cond *sync.Cond
// Schema contains schema loaded on connection.
Schema *Schema
// schemaResolver contains a SchemaResolver implementation.
schemaResolver SchemaResolver
// requestId contains the last request ID for requests with nil context.
Expand Down Expand Up @@ -436,12 +434,14 @@ func Connect(ctx context.Context, addr string, opts Opts) (conn *Connection, err

// TODO: reload schema after reconnect.
if !conn.opts.SkipSchema {
if err = conn.loadSchema(); err != nil {
schema, err := GetSchema(conn)
if err != nil {
conn.mutex.Lock()
defer conn.mutex.Unlock()
conn.closeConnection(err, true)
return nil, err
}
conn.SetSchema(schema)
}

return conn, err
Expand Down Expand Up @@ -1302,15 +1302,21 @@ func (conn *Connection) ConfiguredTimeout() time.Duration {
return conn.opts.Timeout
}

// OverrideSchema sets Schema for the connection.
func (conn *Connection) OverrideSchema(s *Schema) {
if s != nil {
conn.mutex.Lock()
defer conn.mutex.Unlock()
conn.lockShards()
defer conn.unlockShards()
// SetSchema sets Schema for the connection.
func (conn *Connection) SetSchema(s Schema) {
sCopy := s.copy()
spaceAndIndexNamesSupported :=
isFeatureInSlice(iproto.IPROTO_FEATURE_SPACE_AND_INDEX_NAMES,
conn.serverProtocolInfo.Features)

conn.Schema = s
conn.mutex.Lock()
defer conn.mutex.Unlock()
conn.lockShards()
defer conn.unlockShards()

conn.schemaResolver = &loadedSchemaResolver{
Schema: sCopy,
SpaceAndIndexNamesSupported: spaceAndIndexNamesSupported,
}
}

Expand Down
26 changes: 23 additions & 3 deletions example_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1063,7 +1063,10 @@ func ExampleSchema() {
conn := exampleConnect(opts)
defer conn.Close()

schema := conn.Schema
schema, err := tarantool.GetSchema(conn)
if err != nil {
fmt.Printf("unexpected error: %s\n", err.Error())
}
if schema.SpacesById == nil {
fmt.Println("schema.SpacesById is nil")
}
Expand All @@ -1080,13 +1083,30 @@ func ExampleSchema() {
// Space 2 ID 616 schematest
}

// Example demonstrates how to update the connection schema.
func ExampleConnection_SetSchema() {
conn := exampleConnect(opts)
defer conn.Close()

// Get the actual schema.
schema, err := tarantool.GetSchema(conn)
if err != nil {
fmt.Printf("unexpected error: %s\n", err.Error())
}
// Update the current schema to match the actual one.
conn.SetSchema(schema)
}

// Example demonstrates how to retrieve information with space schema.
func ExampleSpace() {
conn := exampleConnect(opts)
defer conn.Close()

// Save Schema to a local variable to avoid races
schema := conn.Schema
schema, err := tarantool.GetSchema(conn)
if err != nil {
fmt.Printf("unexpected error: %s\n", err.Error())
}
if schema.SpacesById == nil {
fmt.Println("schema.SpacesById is nil")
}
Expand Down Expand Up @@ -1120,7 +1140,7 @@ func ExampleSpace() {
// Space 1 ID 617 test memtx
// Space 1 ID 0 false
// Index 0 primary
// &{0 unsigned} &{2 string}
// {0 unsigned} {2 string}
// SpaceField 1 name0 unsigned
// SpaceField 2 name3 unsigned
}
Expand Down
107 changes: 68 additions & 39 deletions schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"errors"
"fmt"

"github.com/tarantool/go-iproto"
"github.com/vmihailenco/msgpack/v5"
"github.com/vmihailenco/msgpack/v5/msgpcode"
)
Expand Down Expand Up @@ -58,9 +57,22 @@ type SchemaResolver interface {
type Schema struct {
Version uint
// Spaces is map from space names to spaces.
Spaces map[string]*Space
Spaces map[string]Space
// SpacesById is map from space numbers to spaces.
SpacesById map[uint32]*Space
SpacesById map[uint32]Space
}

func (schema *Schema) copy() Schema {
schemaCopy := *schema
schemaCopy.Spaces = make(map[string]Space, len(schema.Spaces))
for name, space := range schema.Spaces {
schemaCopy.Spaces[name] = space.copy()
}
schemaCopy.SpacesById = make(map[uint32]Space, len(schema.SpacesById))
for id, space := range schema.SpacesById {
schemaCopy.SpacesById[id] = space.copy()
}
return schemaCopy
}

// Space contains information about Tarantool's space.
Expand All @@ -72,12 +84,33 @@ type Space struct {
Temporary bool // Is this space temporary?
// Field configuration is not mandatory and not checked by Tarantool.
FieldsCount uint32
Fields map[string]*Field
FieldsById map[uint32]*Field
Fields map[string]Field
FieldsById map[uint32]Field
// Indexes is map from index names to indexes.
Indexes map[string]*Index
Indexes map[string]Index
// IndexesById is map from index numbers to indexes.
IndexesById map[uint32]*Index
IndexesById map[uint32]Index
}

func (space *Space) copy() Space {
spaceCopy := *space
spaceCopy.Fields = make(map[string]Field, len(space.Fields))
for name, field := range space.Fields {
spaceCopy.Fields[name] = field
}
spaceCopy.FieldsById = make(map[uint32]Field, len(space.FieldsById))
for id, field := range space.FieldsById {
spaceCopy.FieldsById[id] = field
}
spaceCopy.Indexes = make(map[string]Index, len(space.Indexes))
for name, index := range space.Indexes {
spaceCopy.Indexes[name] = index.copy()
}
spaceCopy.IndexesById = make(map[uint32]Index, len(space.IndexesById))
for id, index := range space.IndexesById {
spaceCopy.IndexesById[id] = index.copy()
}
return spaceCopy
}

func (space *Space) DecodeMsgpack(d *msgpack.Decoder) error {
Expand Down Expand Up @@ -135,17 +168,17 @@ func (space *Space) DecodeMsgpack(d *msgpack.Decoder) error {
return errors.New("unexpected schema format (space flags)")
}
}
space.FieldsById = make(map[uint32]*Field)
space.Fields = make(map[string]*Field)
space.IndexesById = make(map[uint32]*Index)
space.Indexes = make(map[string]*Index)
space.FieldsById = make(map[uint32]Field)
space.Fields = make(map[string]Field)
space.IndexesById = make(map[uint32]Index)
space.Indexes = make(map[string]Index)
if arrayLen >= vspaceSpFormatFieldNum {
fieldCount, err := d.DecodeArrayLen()
if err != nil {
return err
}
for i := 0; i < fieldCount; i++ {
field := &Field{}
field := Field{}
if err := field.DecodeMsgpack(d); err != nil {
return err
}
Expand Down Expand Up @@ -206,7 +239,14 @@ type Index struct {
Name string
Type string
Unique bool
Fields []*IndexField
Fields []IndexField
}

func (index *Index) copy() Index {
indexCopy := *index
indexCopy.Fields = make([]IndexField, len(index.Fields))
copy(indexCopy.Fields, index.Fields)
return indexCopy
}

func (index *Index) DecodeMsgpack(d *msgpack.Decoder) error {
Expand Down Expand Up @@ -261,9 +301,9 @@ func (index *Index) DecodeMsgpack(d *msgpack.Decoder) error {
if err != nil {
return err
}
index.Fields = make([]*IndexField, fieldCount)
index.Fields = make([]IndexField, fieldCount)
for i := 0; i < int(fieldCount); i++ {
index.Fields[i] = new(IndexField)
index.Fields[i] = IndexField{}
if index.Fields[i].Id, err = d.DecodeUint32(); err != nil {
return err
}
Expand Down Expand Up @@ -340,51 +380,40 @@ func (indexField *IndexField) DecodeMsgpack(d *msgpack.Decoder) error {
return errors.New("unexpected schema format (index fields)")
}

func (conn *Connection) loadSchema() (err error) {
schema := new(Schema)
schema.SpacesById = make(map[uint32]*Space)
schema.Spaces = make(map[string]*Space)
// GetSchema returns the actual schema for the connection.
func GetSchema(conn Connector) (Schema, error) {
schema := Schema{}
schema.SpacesById = make(map[uint32]Space)
schema.Spaces = make(map[string]Space)

// Reload spaces.
var spaces []*Space
err = conn.SelectTyped(vspaceSpId, 0, 0, maxSchemas, IterAll, []interface{}{}, &spaces)
var spaces []Space
err := conn.SelectTyped(vspaceSpId, 0, 0, maxSchemas, IterAll, []interface{}{}, &spaces)
if err != nil {
return err
return Schema{}, err
}
for _, space := range spaces {
schema.SpacesById[space.Id] = space
schema.Spaces[space.Name] = space
}

// Reload indexes.
var indexes []*Index
var indexes []Index
err = conn.SelectTyped(vindexSpId, 0, 0, maxSchemas, IterAll, []interface{}{}, &indexes)
if err != nil {
return err
return Schema{}, err
}
for _, index := range indexes {
spaceId := index.SpaceId
if _, ok := schema.SpacesById[spaceId]; ok {
schema.SpacesById[spaceId].IndexesById[index.Id] = index
schema.SpacesById[spaceId].Indexes[index.Name] = index
} else {
return errors.New("concurrent schema update")
return Schema{}, errors.New("concurrent schema update")
}
}

spaceAndIndexNamesSupported :=
isFeatureInSlice(iproto.IPROTO_FEATURE_SPACE_AND_INDEX_NAMES,
conn.serverProtocolInfo.Features)

conn.lockShards()
conn.Schema = schema
conn.schemaResolver = &loadedSchemaResolver{
Schema: schema,
SpaceAndIndexNamesSupported: spaceAndIndexNamesSupported,
}
conn.unlockShards()

return nil
return schema, nil
}

// resolveSpaceNumber tries to resolve a space number.
Expand Down Expand Up @@ -462,7 +491,7 @@ func resolveIndexNumber(i interface{}) (uint32, error) {
}

type loadedSchemaResolver struct {
Schema *Schema
Schema Schema
// SpaceAndIndexNamesSupported shows if a current Tarantool version supports
// iproto.IPROTO_FEATURE_SPACE_AND_INDEX_NAMES.
SpaceAndIndexNamesSupported bool
Expand Down
Loading