diff --git a/README.md b/README.md index 59325045a..9757d386d 100644 --- a/README.md +++ b/README.md @@ -17,7 +17,7 @@ Yorkie is an open source document store for building collaborative editing appli Yorkie consists of three main components: Client, Document and Agent. ``` - Client "A" (Go) Agent Mongo DB + Client "A" (Go) Agent MemDB or MongoDB ┌───────────────────┐ ┌────────────────────────┐ ┌───────────┐ │ Document "D-1" │◄─Changes─►│ Collection "C-1" │ │ Changes │ │ { a: 1, b: {} } │ │ ┌───────────────────┐ │◄─►│ Snapshots │ @@ -29,8 +29,8 @@ Yorkie consists of three main components: Client, Document and Agent. └───────────────────┘ │ └───────────────────┘ │ Client "C" (Admin) │ │ ┌────────────────────┐ └────────────────────────┘ -│ Query "Q-1" │ ▲ -│ db[c-1].find({a:2})├─MongoDB Query─┘ +│ Query "Q-1" │ ▲ +│ db[c-1].find({a:2})├───DB Query───┘ └────────────────────┘ ``` @@ -59,8 +59,6 @@ Yorkie: 0.1.8 ... ``` -Yorkie uses MongoDB to store its data. To start MongoDB, type `docker-compose -f docker/docker-compose.yml up -d`. - Next, let's start a Yorkie agent. Agent runs until they're told to quit and handle the communication of maintenance tasks of Agent. and start the agent: ``` diff --git a/go.mod b/go.mod index c477e1384..2a2cbf09e 100644 --- a/go.mod +++ b/go.mod @@ -9,11 +9,13 @@ require ( github.com/google/uuid v1.1.2 github.com/grpc-ecosystem/go-grpc-middleware v1.2.2 github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0 + github.com/hashicorp/go-memdb v1.3.2 github.com/moby/locker v1.0.1 github.com/prometheus/client_golang v1.11.0 github.com/rs/xid v1.2.1 github.com/spf13/cobra v1.1.3 github.com/stretchr/testify v1.7.0 + go.etcd.io/etcd/api/v3 v3.5.1 go.etcd.io/etcd/client/v3 v3.5.1 go.mongodb.org/mongo-driver v1.5.1 go.uber.org/zap v1.17.0 @@ -77,7 +79,9 @@ require ( github.com/gostaticanalysis/forcetypeassert v0.0.0-20200621232751-01d4955beaa5 // indirect github.com/gostaticanalysis/nilerr v0.1.1 // indirect github.com/hashicorp/errwrap v1.0.0 // indirect + github.com/hashicorp/go-immutable-radix v1.3.0 // indirect github.com/hashicorp/go-multierror v1.1.1 // indirect + github.com/hashicorp/golang-lru v0.5.4 // indirect github.com/hashicorp/hcl v1.0.0 // indirect github.com/inconshreveable/mousetrap v1.0.0 // indirect github.com/jgautheron/goconst v1.5.1 // indirect @@ -150,7 +154,6 @@ require ( github.com/xdg-go/stringprep v1.0.2 // indirect github.com/yeya24/promlinter v0.1.0 // indirect github.com/youmark/pkcs8 v0.0.0-20181117223130-1be2e3e5546d // indirect - go.etcd.io/etcd/api/v3 v3.5.1 // indirect go.etcd.io/etcd/client/pkg/v3 v3.5.1 // indirect go.uber.org/atomic v1.7.0 // indirect go.uber.org/multierr v1.6.0 // indirect diff --git a/go.sum b/go.sum index a0589ff36..d177ceced 100644 --- a/go.sum +++ b/go.sum @@ -355,6 +355,10 @@ github.com/hashicorp/errwrap v1.0.0 h1:hLrqtEDnRye3+sgx6z4qVLNuviH3MR5aQ0ykNJa/U github.com/hashicorp/errwrap v1.0.0/go.mod h1:YH+1FKiLXxHSkmPseP+kNlulaMuP3n2brvKWEqk/Jc4= github.com/hashicorp/go-cleanhttp v0.5.1/go.mod h1:JpRdi6/HCYpAwUzNwuwqhbovhLtngrth3wmdIIUrZ80= github.com/hashicorp/go-immutable-radix v1.0.0/go.mod h1:0y9vanUI8NX6FsYoO3zeMjhV/C5i9g4Q3DwcSNZ4P60= +github.com/hashicorp/go-immutable-radix v1.3.0 h1:8exGP7ego3OmkfksihtSouGMZ+hQrhxx+FVELeXpVPE= +github.com/hashicorp/go-immutable-radix v1.3.0/go.mod h1:0y9vanUI8NX6FsYoO3zeMjhV/C5i9g4Q3DwcSNZ4P60= +github.com/hashicorp/go-memdb v1.3.2 h1:RBKHOsnSszpU6vxq80LzC2BaQjuuvoyaQbkLTf7V7g8= +github.com/hashicorp/go-memdb v1.3.2/go.mod h1:Mluclgwib3R93Hk5fxEfiRhB+6Dar64wWh71LpNSe3g= github.com/hashicorp/go-msgpack v0.5.3/go.mod h1:ahLV/dePpqEmjfWmKiqvPkv/twdG7iPBM1vqhUKIvfM= github.com/hashicorp/go-multierror v1.0.0/go.mod h1:dHtQlpGsu+cZNNAkkCN/P3hoUDHhCYQXV3UM06sGGrk= github.com/hashicorp/go-multierror v1.1.1 h1:H5DkEtf6CXdFp0N0Em5UCwQpXMWke8IA0+lD48awMYo= @@ -363,10 +367,12 @@ github.com/hashicorp/go-rootcerts v1.0.0/go.mod h1:K6zTfqpRlCUIjkwsN4Z+hiSfzSTQa github.com/hashicorp/go-sockaddr v1.0.0/go.mod h1:7Xibr9yA9JjQq1JpNB2Vw7kxv8xerXegt+ozgdvDeDU= github.com/hashicorp/go-syslog v1.0.0/go.mod h1:qPfqrKkXGihmCqbJM2mZgkZGvKG1dFdvsLplgctolz4= github.com/hashicorp/go-uuid v1.0.0/go.mod h1:6SBZvOh/SIDV7/2o3Jml5SYk/TvGqwFJ/bN7x4byOro= +github.com/hashicorp/go-uuid v1.0.1 h1:fv1ep09latC32wFoVwnqcnKJGnMSdBanPczbHAYm1BE= github.com/hashicorp/go-uuid v1.0.1/go.mod h1:6SBZvOh/SIDV7/2o3Jml5SYk/TvGqwFJ/bN7x4byOro= github.com/hashicorp/go.net v0.0.1/go.mod h1:hjKkEWcCURg++eb33jQU7oqQcI9XDCnUzHA0oac0k90= github.com/hashicorp/golang-lru v0.5.0/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8= github.com/hashicorp/golang-lru v0.5.1/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8= +github.com/hashicorp/golang-lru v0.5.4 h1:YDjusn29QI/Das2iO9M0BHnIbxPeyuCHsjMW+lJfyTc= github.com/hashicorp/golang-lru v0.5.4/go.mod h1:iADmTwqILo4mZ8BN3D2Q6+9jd8WM5uGBxy+E8yxSoD4= github.com/hashicorp/hcl v1.0.0 h1:0Anlzjpi4vEasTeNFn2mLJgTSwt0+6sfsiTG8qcWGx4= github.com/hashicorp/hcl v1.0.0/go.mod h1:E5yfLk+7swimpb2L/Alb/PJmXilQ/rhwaUYs4T20WEQ= diff --git a/internal/cli/agent.go b/internal/cli/agent.go index fc4a23cf8..a9d7000a8 100644 --- a/internal/cli/agent.go +++ b/internal/cli/agent.go @@ -27,6 +27,7 @@ import ( "github.com/yorkie-team/yorkie/internal/log" "github.com/yorkie-team/yorkie/yorkie" + "github.com/yorkie-team/yorkie/yorkie/backend/db/mongo" "github.com/yorkie-team/yorkie/yorkie/backend/sync/etcd" ) @@ -37,7 +38,9 @@ var ( var ( flagConfPath string + mongoConnectionURI string mongoConnectionTimeout time.Duration + mongoYorkieDatabase string mongoPingTimeout time.Duration authWebhookMaxWaitInterval time.Duration @@ -58,12 +61,19 @@ func newAgentCmd() *cobra.Command { Use: "agent [options]", Short: "Starts yorkie agent", RunE: func(cmd *cobra.Command, args []string) error { - conf.Mongo.ConnectionTimeout = mongoConnectionTimeout.String() - conf.Mongo.PingTimeout = mongoPingTimeout.String() conf.Backend.AuthWebhookMaxWaitInterval = authWebhookMaxWaitInterval.String() conf.Backend.AuthWebhookCacheAuthTTL = authWebhookCacheAuthTTL.String() conf.Backend.AuthWebhookCacheUnauthTTL = authWebhookCacheUnauthTTL.String() + if mongoConnectionURI != "" { + conf.Mongo = &mongo.Config{ + ConnectionURI: mongoConnectionURI, + ConnectionTimeout: mongoConnectionTimeout.String(), + YorkieDatabase: mongoYorkieDatabase, + PingTimeout: mongoPingTimeout.String(), + } + } + if etcdEndpoints != nil { conf.ETCD = &etcd.Config{ Endpoints: etcdEndpoints, @@ -188,6 +198,12 @@ func init() { false, "Enable runtime profiling data via HTTP server.", ) + cmd.Flags().StringVar( + &mongoConnectionURI, + "mongo-connection-uri", + "", + "MongoDB's connection URI", + ) cmd.Flags().DurationVar( &mongoConnectionTimeout, "mongo-connection-timeout", @@ -195,13 +211,7 @@ func init() { "Mongo DB's connection timeout", ) cmd.Flags().StringVar( - &conf.Mongo.ConnectionURI, - "mongo-connection-uri", - yorkie.DefaultMongoConnectionURI, - "MongoDB's connection URI", - ) - cmd.Flags().StringVar( - &conf.Mongo.YorkieDatabase, + &mongoYorkieDatabase, "mongo-yorkie-database", yorkie.DefaultMongoYorkieDatabase, "Yorkie's database name in MongoDB", diff --git a/pkg/document/document.go b/pkg/document/document.go index 43964f071..e72ff22da 100644 --- a/pkg/document/document.go +++ b/pkg/document/document.go @@ -129,6 +129,11 @@ func (d *Document) ApplyChangePack(pack *change.Pack) error { return nil } +// InternalDocument returns the internal document. +func (d *Document) InternalDocument() *InternalDocument { + return d.doc +} + // Key returns the key of this document. func (d *Document) Key() *key.Key { return d.doc.key diff --git a/yorkie/backend/backend.go b/yorkie/backend/backend.go index da92fac42..ce9695524 100644 --- a/yorkie/backend/backend.go +++ b/yorkie/backend/backend.go @@ -26,10 +26,11 @@ import ( "github.com/yorkie-team/yorkie/internal/log" "github.com/yorkie-team/yorkie/pkg/cache" "github.com/yorkie-team/yorkie/yorkie/backend/db" + memdb "github.com/yorkie-team/yorkie/yorkie/backend/db/memory" "github.com/yorkie-team/yorkie/yorkie/backend/db/mongo" "github.com/yorkie-team/yorkie/yorkie/backend/sync" "github.com/yorkie-team/yorkie/yorkie/backend/sync/etcd" - "github.com/yorkie-team/yorkie/yorkie/backend/sync/memory" + memsync "github.com/yorkie-team/yorkie/yorkie/backend/sync/memory" "github.com/yorkie-team/yorkie/yorkie/profiling/prometheus" ) @@ -77,9 +78,17 @@ func New( UpdatedAt: time.Now(), } - mongoClient, err := mongo.Dial(mongoConf) - if err != nil { - return nil, err + var database db.DB + if mongoConf != nil { + database, err = mongo.Dial(mongoConf) + if err != nil { + return nil, err + } + } else { + database, err = memdb.New() + if err != nil { + return nil, err + } } var coordinator sync.Coordinator @@ -94,13 +103,19 @@ func New( coordinator = etcdClient } else { - coordinator = memory.NewCoordinator(agentInfo) + coordinator = memsync.NewCoordinator(agentInfo) + } + + dbInfo := "memory" + if mongoConf != nil { + dbInfo = mongoConf.ConnectionURI } log.Logger.Infof( - "backend created: id: %s, rpc: %s", + "backend created: id: %s, rpc: %s: db: %s", agentInfo.ID, agentInfo.RPCAddr, + dbInfo, ) lruCache, err := cache.NewLRUExpireCache(authWebhookCacheSize) @@ -111,7 +126,7 @@ func New( return &Backend{ Config: conf, agentInfo: agentInfo, - DB: mongoClient, + DB: database, Coordinator: coordinator, Metrics: metrics, AuthWebhookCache: lruCache, diff --git a/yorkie/backend/db/change_info.go b/yorkie/backend/db/change_info.go index d87913cee..6599c9aec 100644 --- a/yorkie/backend/db/change_info.go +++ b/yorkie/backend/db/change_info.go @@ -29,6 +29,7 @@ import ( // ChangeInfo is a structure representing information of a change. type ChangeInfo struct { + ID ID `bson:"_id"` DocID ID `bson:"doc_id"` ServerSeq uint64 `bson:"server_seq"` ClientSeq uint32 `bson:"client_seq"` diff --git a/yorkie/backend/db/client_info.go b/yorkie/backend/db/client_info.go index 4424ca1d3..2205057ff 100644 --- a/yorkie/backend/db/client_info.go +++ b/yorkie/backend/db/client_info.go @@ -146,6 +146,31 @@ func (i *ClientInfo) EnsureDocumentAttached(docID ID) error { return nil } +// DeepCopy returns a deep copy of this client info. +func (i *ClientInfo) DeepCopy() *ClientInfo { + if i == nil { + return nil + } + + documents := make(map[ID]*ClientDocInfo, len(i.Documents)) + for k, v := range i.Documents { + documents[k] = &ClientDocInfo{ + Status: v.Status, + ServerSeq: v.ServerSeq, + ClientSeq: v.ClientSeq, + } + } + + return &ClientInfo{ + ID: i.ID, + Key: i.Key, + Status: i.Status, + Documents: documents, + CreatedAt: i.CreatedAt, + UpdatedAt: i.UpdatedAt, + } +} + func (i *ClientInfo) hasDocument(docID ID) bool { return i.Documents != nil && i.Documents[docID] != nil } diff --git a/yorkie/backend/db/db.go b/yorkie/backend/db/db.go index 362d613be..4b59b61e2 100644 --- a/yorkie/backend/db/db.go +++ b/yorkie/backend/db/db.go @@ -33,12 +33,12 @@ func (id ID) String() string { } // Bytes returns bytes of decoded hexadecimal string representation of this ID. -func (id ID) Bytes() []byte { +func (id ID) Bytes() ([]byte, error) { decoded, err := hex.DecodeString(id.String()) if err != nil { - return nil + return nil, err } - return decoded + return decoded, nil } // IDFromBytes returns ID represented by the encoded hexadecimal string from bytes. @@ -74,17 +74,14 @@ type DB interface { createDocIfNotExist bool, ) (*DocInfo, error) - // StoreChangeInfos stores the given changes then updates the given docInfo. - StoreChangeInfos( + // CreateChangeInfos stores the given changes then updates the given docInfo. + CreateChangeInfos( ctx context.Context, docInfo *DocInfo, initialServerSeq uint64, changes []*change.Change, ) error - // CreateSnapshotInfo stores the snapshot of the given document. - CreateSnapshotInfo(ctx context.Context, docID ID, doc *document.InternalDocument) error - // FindChangesBetweenServerSeqs returns the changes between two server sequences. FindChangesBetweenServerSeqs( ctx context.Context, @@ -101,6 +98,12 @@ type DB interface { to uint64, ) ([]*ChangeInfo, error) + // CreateSnapshotInfo stores the snapshot of the given document. + CreateSnapshotInfo(ctx context.Context, docID ID, doc *document.InternalDocument) error + + // FindLastSnapshotInfo finds the last snapshot of the given document. + FindLastSnapshotInfo(ctx context.Context, docID ID) (*SnapshotInfo, error) + // UpdateAndFindMinSyncedTicket updates the given serverSeq of the given client // and returns the min synced ticket. UpdateAndFindMinSyncedTicket( @@ -109,7 +112,4 @@ type DB interface { docID ID, serverSeq uint64, ) (*time.Ticket, error) - - // FindLastSnapshotInfo finds the last snapshot of the given document. - FindLastSnapshotInfo(ctx context.Context, docID ID) (*SnapshotInfo, error) } diff --git a/yorkie/backend/db/db_test.go b/yorkie/backend/db/db_test.go index 1528174c5..1f82dfae2 100644 --- a/yorkie/backend/db/db_test.go +++ b/yorkie/backend/db/db_test.go @@ -18,6 +18,8 @@ func TestID(t *testing.T) { t.Run("get ID from bytes test", func(t *testing.T) { bytes := make([]byte, 12) ID := db.IDFromBytes(bytes) - assert.Equal(t, bytes, ID.Bytes()) + bytesID, err := ID.Bytes() + assert.NoError(t, err) + assert.Equal(t, bytes, bytesID) }) } diff --git a/yorkie/backend/db/doc_info.go b/yorkie/backend/db/doc_info.go index 2e08cb765..2080f74eb 100644 --- a/yorkie/backend/db/doc_info.go +++ b/yorkie/backend/db/doc_info.go @@ -48,3 +48,20 @@ func (info *DocInfo) GetKey() (*key.Key, error) { return docKey, nil } + +// DeepCopy creates a deep copy of this DocInfo. +func (info *DocInfo) DeepCopy() *DocInfo { + if info == nil { + return nil + } + + return &DocInfo{ + ID: info.ID, + Key: info.Key, + ServerSeq: info.ServerSeq, + Owner: info.Owner, + CreatedAt: info.CreatedAt, + AccessedAt: info.AccessedAt, + UpdatedAt: info.UpdatedAt, + } +} diff --git a/yorkie/backend/db/memory/db.go b/yorkie/backend/db/memory/db.go new file mode 100644 index 000000000..bc790585f --- /dev/null +++ b/yorkie/backend/db/memory/db.go @@ -0,0 +1,516 @@ +/* + * Copyright 2021 The Yorkie Authors. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package memory + +import ( + "context" + "fmt" + "math" + gotime "time" + + "github.com/hashicorp/go-memdb" + "go.mongodb.org/mongo-driver/bson/primitive" + + "github.com/yorkie-team/yorkie/api/converter" + "github.com/yorkie-team/yorkie/pkg/document" + "github.com/yorkie-team/yorkie/pkg/document/change" + "github.com/yorkie-team/yorkie/pkg/document/time" + "github.com/yorkie-team/yorkie/yorkie/backend/db" +) + +// DB is an in-memory database for testing or temporarily. +type DB struct { + db *memdb.MemDB +} + +// New returns a new in-memory database. +func New() (*DB, error) { + memDB, err := memdb.NewMemDB(schema) + if err != nil { + return nil, err + } + + return &DB{ + db: memDB, + }, nil +} + +// Close closes the database. +func (d *DB) Close() error { + return nil +} + +// ActivateClient activates a client. +func (d *DB) ActivateClient(ctx context.Context, key string) (*db.ClientInfo, error) { + txn := d.db.Txn(true) + defer txn.Abort() + + raw, err := txn.First(tblClients, "key", key) + if err != nil { + return nil, err + } + + clientInfo := &db.ClientInfo{ + Key: key, + Status: db.ClientActivated, + CreatedAt: gotime.Now(), + } + if raw == nil { + clientInfo.ID = newID() + } else { + clientInfo.ID = raw.(*db.ClientInfo).ID + } + + if err := txn.Insert(tblClients, clientInfo); err != nil { + return nil, err + } + + txn.Commit() + return clientInfo, nil +} + +// DeactivateClient deactivates a client. +func (d *DB) DeactivateClient(ctx context.Context, clientID db.ID) (*db.ClientInfo, error) { + txn := d.db.Txn(true) + defer txn.Abort() + + raw, err := txn.First(tblClients, "id", string(clientID)) + if err != nil { + return nil, err + } + + if raw == nil { + return nil, fmt.Errorf("%s: %w", clientID, db.ErrClientNotFound) + } + + // NOTE(hackerwins): When retrieving objects from go-memdb, references to + // the stored objects are returned instead of new objects. This can cause + // problems when directly modifying loaded objects. So, we need to DeepCopy. + clientInfo := raw.(*db.ClientInfo).DeepCopy() + clientInfo.Status = db.ClientDeactivated + clientInfo.UpdatedAt = gotime.Now() + + if err := txn.Insert(tblClients, clientInfo); err != nil { + return nil, err + } + + txn.Commit() + return clientInfo, nil +} + +// FindClientInfoByID finds a client by ID. +func (d *DB) FindClientInfoByID(ctx context.Context, clientID db.ID) (*db.ClientInfo, error) { + txn := d.db.Txn(false) + defer txn.Abort() + + raw, err := txn.First(tblClients, "id", string(clientID)) + if err != nil { + return nil, err + } + if raw == nil { + return nil, fmt.Errorf("%s: %w", clientID, db.ErrClientNotFound) + } + + return raw.(*db.ClientInfo).DeepCopy(), nil +} + +// UpdateClientInfoAfterPushPull updates the client from the given clientInfo +// after handling PushPull. +func (d *DB) UpdateClientInfoAfterPushPull( + ctx context.Context, + clientInfo *db.ClientInfo, + docInfo *db.DocInfo, +) error { + clientDocInfo := clientInfo.Documents[docInfo.ID] + attached, err := clientInfo.IsAttached(docInfo.ID) + if err != nil { + return err + } + + txn := d.db.Txn(true) + defer txn.Abort() + + raw, err := txn.First(tblClients, "id", string(clientInfo.ID)) + if err != nil { + return err + } + if raw == nil { + return fmt.Errorf("%s: %w", clientInfo.ID, db.ErrClientNotFound) + } + + loaded := raw.(*db.ClientInfo).DeepCopy() + + if !attached { + loaded.Documents[docInfo.ID] = &db.ClientDocInfo{ + ServerSeq: 0, + ClientSeq: 0, + Status: clientDocInfo.Status, + } + loaded.UpdatedAt = gotime.Now() + } else { + loadedClientDocInfo := loaded.Documents[docInfo.ID] + serverSeq := loadedClientDocInfo.ServerSeq + if clientDocInfo.ServerSeq > loadedClientDocInfo.ServerSeq { + serverSeq = clientDocInfo.ServerSeq + } + clientSeq := loadedClientDocInfo.ClientSeq + if clientDocInfo.ClientSeq > loadedClientDocInfo.ClientSeq { + clientSeq = clientDocInfo.ClientSeq + } + loaded.Documents[docInfo.ID] = &db.ClientDocInfo{ + ServerSeq: serverSeq, + ClientSeq: clientSeq, + Status: clientDocInfo.Status, + } + loaded.UpdatedAt = gotime.Now() + } + + if err := txn.Insert(tblClients, loaded); err != nil { + return err + } + txn.Commit() + + return nil +} + +// FindDocInfoByKey finds a docInfo by key. +func (d *DB) FindDocInfoByKey( + ctx context.Context, + clientInfo *db.ClientInfo, + bsonDocKey string, + createDocIfNotExist bool, +) (*db.DocInfo, error) { + txn := d.db.Txn(true) + defer txn.Abort() + + raw, err := txn.First(tblDocuments, "key", bsonDocKey) + if err != nil { + return nil, err + } + if !createDocIfNotExist && raw == nil { + return nil, fmt.Errorf("%s: %w", bsonDocKey, db.ErrDocumentNotFound) + } + + now := gotime.Now() + var docInfo *db.DocInfo + if raw == nil { + docInfo = &db.DocInfo{ + ID: newID(), + Key: bsonDocKey, + Owner: clientInfo.ID, + ServerSeq: 0, + CreatedAt: now, + AccessedAt: now, + } + if err := txn.Insert(tblDocuments, docInfo); err != nil { + return nil, err + } + txn.Commit() + } else { + docInfo = raw.(*db.DocInfo).DeepCopy() + } + + return docInfo, nil +} + +// CreateChangeInfos stores the given changes and doc info. +func (d *DB) CreateChangeInfos( + ctx context.Context, + docInfo *db.DocInfo, + initialServerSeq uint64, + changes []*change.Change, +) error { + txn := d.db.Txn(true) + defer txn.Abort() + + for _, cn := range changes { + encodedOperations, err := db.EncodeOperations(cn.Operations()) + if err != nil { + return err + } + + if err := txn.Insert(tblChanges, &db.ChangeInfo{ + ID: newID(), + DocID: docInfo.ID, + ServerSeq: cn.ServerSeq(), + Actor: db.ID(cn.ID().Actor().String()), + ClientSeq: cn.ClientSeq(), + Lamport: cn.ID().Lamport(), + Message: cn.Message(), + Operations: encodedOperations, + }); err != nil { + return err + } + } + + raw, err := txn.First(tblDocuments, "key", docInfo.Key) + if err != nil { + return err + } + if raw == nil { + return fmt.Errorf("%s: %w", docInfo.Key, db.ErrDocumentNotFound) + } + loadedDocInfo := raw.(*db.DocInfo).DeepCopy() + if loadedDocInfo.ServerSeq != initialServerSeq { + return fmt.Errorf("%s: %w", docInfo.ID, db.ErrConflictOnUpdate) + } + + loadedDocInfo.ServerSeq = docInfo.ServerSeq + loadedDocInfo.UpdatedAt = gotime.Now() + if err := txn.Insert(tblDocuments, loadedDocInfo); err != nil { + return err + } + + txn.Commit() + return nil +} + +// FindChangesBetweenServerSeqs returns the changes between two server sequences. +func (d *DB) FindChangesBetweenServerSeqs( + ctx context.Context, + docID db.ID, + from uint64, + to uint64, +) ([]*change.Change, error) { + infos, err := d.FindChangeInfosBetweenServerSeqs(ctx, docID, from, to) + if err != nil { + return nil, err + } + + var changes []*change.Change + for _, info := range infos { + c, err := info.ToChange() + if err != nil { + return nil, err + } + changes = append(changes, c) + } + + return changes, nil +} + +// FindChangeInfosBetweenServerSeqs returns the changeInfos between two server sequences. +func (d *DB) FindChangeInfosBetweenServerSeqs( + ctx context.Context, + docID db.ID, + from uint64, + to uint64, +) ([]*db.ChangeInfo, error) { + txn := d.db.Txn(false) + defer txn.Abort() + + var infos []*db.ChangeInfo + + iterator, err := txn.LowerBound( + tblChanges, + "doc_id_server_seq", + docID.String(), + from, + ) + if err != nil { + return nil, err + } + + for raw := iterator.Next(); raw != nil; raw = iterator.Next() { + info := raw.(*db.ChangeInfo) + if info.ServerSeq > to { + break + } + infos = append(infos, info) + } + return infos, nil +} + +// CreateSnapshotInfo stores the snapshot of the given document. +func (d *DB) CreateSnapshotInfo( + ctx context.Context, + docID db.ID, + doc *document.InternalDocument, +) error { + snapshot, err := converter.ObjectToBytes(doc.RootObject()) + if err != nil { + return err + } + + txn := d.db.Txn(true) + defer txn.Abort() + + if err := txn.Insert(tblSnapshots, &db.SnapshotInfo{ + ID: newID(), + DocID: docID, + ServerSeq: doc.Checkpoint().ServerSeq, + Snapshot: snapshot, + CreatedAt: gotime.Now(), + }); err != nil { + return err + } + txn.Commit() + return nil +} + +// FindLastSnapshotInfo finds the last snapshot of the given document. +func (d *DB) FindLastSnapshotInfo( + ctx context.Context, + docID db.ID, +) (*db.SnapshotInfo, error) { + txn := d.db.Txn(false) + defer txn.Abort() + + iterator, err := txn.ReverseLowerBound( + tblSnapshots, + "doc_id_server_seq", + docID.String(), + uint64(math.MaxUint64), + ) + if err != nil { + return nil, err + } + + raw := iterator.Next() + if raw == nil { + return &db.SnapshotInfo{}, nil + } + + return raw.(*db.SnapshotInfo), nil +} + +// UpdateAndFindMinSyncedTicket updates the given serverSeq of the given client +// and returns the min synced ticket. +func (d *DB) UpdateAndFindMinSyncedTicket( + ctx context.Context, + clientInfo *db.ClientInfo, + docID db.ID, + serverSeq uint64, +) (*time.Ticket, error) { + if err := d.updateSyncedSeq(clientInfo, docID, serverSeq); err != nil { + return nil, err + } + + txn := d.db.Txn(false) + defer txn.Abort() + + iterator, err := txn.LowerBound( + tblSyncedSeqs, + "doc_id_server_seq", + docID.String(), + uint64(0), + ) + if err != nil { + return nil, err + } + + raw := iterator.Next() + if raw == nil { + return time.InitialTicket, nil + } + syncedSeqInfo := raw.(*db.SyncedSeqInfo) + if syncedSeqInfo.ServerSeq == 0 { + return time.InitialTicket, nil + } + + // 03. find ticket by seq. + // TODO: We need to find a way to not access `changes` collection. + return d.findTicketByServerSeq(txn, docID, syncedSeqInfo.ServerSeq) +} + +func (d *DB) updateSyncedSeq( + clientInfo *db.ClientInfo, + docID db.ID, + serverSeq uint64, +) error { + txn := d.db.Txn(true) + defer txn.Abort() + + isAttached, err := clientInfo.IsAttached(docID) + if err != nil { + return err + } + + if isAttached { + raw, err := txn.First( + tblSyncedSeqs, + "doc_id_client_id", + docID.String(), + clientInfo.ID.String(), + ) + if err != nil { + return err + } + + syncedSeqInfo := &db.SyncedSeqInfo{ + DocID: docID, + ClientID: clientInfo.ID, + ServerSeq: serverSeq, + } + if raw == nil { + syncedSeqInfo.ID = newID() + } else { + syncedSeqInfo.ID = raw.(*db.SyncedSeqInfo).ID + } + + if err := txn.Insert(tblSyncedSeqs, syncedSeqInfo); err != nil { + return err + } + } else { + if _, err = txn.DeleteAll( + tblSyncedSeqs, + "doc_id_client_id", + docID, + clientInfo.ID, + ); err != nil { + return err + } + } + + txn.Commit() + return nil +} + +func (d *DB) findTicketByServerSeq( + txn *memdb.Txn, + docID db.ID, + serverSeq uint64, +) (*time.Ticket, error) { + raw, err := txn.First( + tblChanges, + "doc_id_server_seq", + docID.String(), + serverSeq, + ) + if err != nil { + return nil, err + } + if raw == nil { + return nil, fmt.Errorf("%s: %w", docID.String(), db.ErrDocumentNotFound) + } + + changeInfo := raw.(*db.ChangeInfo) + actorID, err := time.ActorIDFromHex(changeInfo.Actor.String()) + if err != nil { + return nil, err + } + + return time.NewTicket( + changeInfo.Lamport, + time.MaxDelimiter, + actorID, + ), nil +} + +func newID() db.ID { + return db.ID(primitive.NewObjectID().Hex()) +} diff --git a/yorkie/backend/db/memory/db_test.go b/yorkie/backend/db/memory/db_test.go new file mode 100644 index 000000000..aef5118dd --- /dev/null +++ b/yorkie/backend/db/memory/db_test.go @@ -0,0 +1,173 @@ +/* + * Copyright 2021 The Yorkie Authors. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package memory_test + +import ( + "context" + "fmt" + "testing" + + "github.com/stretchr/testify/assert" + + "github.com/yorkie-team/yorkie/pkg/document" + "github.com/yorkie-team/yorkie/pkg/document/proxy" + "github.com/yorkie-team/yorkie/pkg/document/time" + "github.com/yorkie-team/yorkie/yorkie/backend/db" + "github.com/yorkie-team/yorkie/yorkie/backend/db/memory" +) + +func TestDB(t *testing.T) { + ctx := context.Background() + memdb, err := memory.New() + assert.NoError(t, err) + + t.Run("activate/deactivate client test", func(t *testing.T) { + // try to deactivate the client with not exists ID. + _, err = memdb.DeactivateClient(ctx, db.ID("not exists")) + assert.ErrorIs(t, err, db.ErrClientNotFound) + + clientInfo, err := memdb.ActivateClient(ctx, t.Name()) + assert.NoError(t, err) + + assert.Equal(t, t.Name(), clientInfo.Key) + assert.Equal(t, db.ClientActivated, clientInfo.Status) + + // try to activate the client twice. + clientInfo, err = memdb.ActivateClient(ctx, t.Name()) + assert.NoError(t, err) + assert.Equal(t, t.Name(), clientInfo.Key) + assert.Equal(t, db.ClientActivated, clientInfo.Status) + + clientID := clientInfo.ID + + clientInfo, err = memdb.DeactivateClient(ctx, clientID) + assert.NoError(t, err) + assert.Equal(t, t.Name(), clientInfo.Key) + assert.Equal(t, db.ClientDeactivated, clientInfo.Status) + + // try to deactivate the client twice. + clientInfo, err = memdb.DeactivateClient(ctx, clientID) + assert.NoError(t, err) + assert.Equal(t, t.Name(), clientInfo.Key) + assert.Equal(t, db.ClientDeactivated, clientInfo.Status) + }) + + t.Run("activate and find client test", func(t *testing.T) { + _, err := memdb.FindClientInfoByID(ctx, db.ID("not exists")) + assert.ErrorIs(t, err, db.ErrClientNotFound) + + clientInfo, err := memdb.ActivateClient(ctx, t.Name()) + assert.NoError(t, err) + + found, err := memdb.FindClientInfoByID(ctx, clientInfo.ID) + assert.NoError(t, err) + assert.Equal(t, clientInfo.Key, found.Key) + }) + + t.Run("find docInfo test", func(t *testing.T) { + clientInfo, err := memdb.ActivateClient(ctx, t.Name()) + assert.NoError(t, err) + + bsonDocKey := fmt.Sprintf("tests$%s", t.Name()) + _, err = memdb.FindDocInfoByKey(ctx, clientInfo, bsonDocKey, false) + assert.ErrorIs(t, err, db.ErrDocumentNotFound) + + docInfo, err := memdb.FindDocInfoByKey(ctx, clientInfo, bsonDocKey, true) + assert.NoError(t, err) + assert.Equal(t, bsonDocKey, docInfo.Key) + }) + + t.Run("update clientInfo after PushPull test", func(t *testing.T) { + clientInfo, err := memdb.ActivateClient(ctx, t.Name()) + assert.NoError(t, err) + + bsonDocKey := fmt.Sprintf("tests$%s", t.Name()) + docInfo, err := memdb.FindDocInfoByKey(ctx, clientInfo, bsonDocKey, true) + assert.NoError(t, err) + + err = memdb.UpdateClientInfoAfterPushPull(ctx, clientInfo, docInfo) + assert.ErrorIs(t, err, db.ErrDocumentNeverAttached) + assert.NoError(t, clientInfo.AttachDocument(docInfo.ID)) + assert.NoError(t, memdb.UpdateClientInfoAfterPushPull(ctx, clientInfo, docInfo)) + }) + + t.Run("insert and find changes test", func(t *testing.T) { + bsonDocKey := fmt.Sprintf("tests$%s", t.Name()) + + clientInfo, _ := memdb.ActivateClient(ctx, t.Name()) + docInfo, _ := memdb.FindDocInfoByKey(ctx, clientInfo, bsonDocKey, true) + assert.NoError(t, clientInfo.AttachDocument(docInfo.ID)) + assert.NoError(t, memdb.UpdateClientInfoAfterPushPull(ctx, clientInfo, docInfo)) + + bytesID, _ := clientInfo.ID.Bytes() + actorID, _ := time.ActorIDFromBytes(bytesID) + doc := document.New("tests", t.Name()) + doc.SetActor(actorID) + assert.NoError(t, doc.Update(func(root *proxy.ObjectProxy) error { + root.SetNewArray("array") + return nil + })) + for idx := 0; idx < 10; idx++ { + assert.NoError(t, doc.Update(func(root *proxy.ObjectProxy) error { + root.GetArray("array").AddInteger(idx) + return nil + })) + } + pack := doc.CreateChangePack() + for idx, change := range pack.Changes { + change.SetServerSeq(uint64(idx)) + } + + // Store changes + err = memdb.CreateChangeInfos(ctx, docInfo, 0, pack.Changes) + assert.NoError(t, err) + + // Find changes + loadedChanges, err := memdb.FindChangesBetweenServerSeqs( + ctx, + docInfo.ID, + 6, + 10, + ) + assert.NoError(t, err) + assert.Len(t, loadedChanges, 5) + }) + + t.Run("store and find snapshots test", func(t *testing.T) { + ctx := context.Background() + bsonDocKey := fmt.Sprintf("tests$%s", t.Name()) + + clientInfo, _ := memdb.ActivateClient(ctx, t.Name()) + bytesID, _ := clientInfo.ID.Bytes() + actorID, _ := time.ActorIDFromBytes(bytesID) + docInfo, _ := memdb.FindDocInfoByKey(ctx, clientInfo, bsonDocKey, true) + + doc := document.New("tests", t.Name()) + doc.SetActor(actorID) + assert.NoError(t, doc.Update(func(root *proxy.ObjectProxy) error { + root.SetNewArray("array") + return nil + })) + + err = memdb.CreateSnapshotInfo(ctx, docInfo.ID, doc.InternalDocument()) + assert.NoError(t, err) + + snapshot, err := memdb.FindLastSnapshotInfo(ctx, docInfo.ID) + assert.NoError(t, err) + assert.NotNil(t, snapshot) + }) +} diff --git a/yorkie/backend/db/memory/indexes.go b/yorkie/backend/db/memory/indexes.go new file mode 100644 index 000000000..d45632374 --- /dev/null +++ b/yorkie/backend/db/memory/indexes.go @@ -0,0 +1,131 @@ +/* + * Copyright 2021 The Yorkie Authors. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package memory + +import "github.com/hashicorp/go-memdb" + +var ( + tblClients = "clients" + tblDocuments = "documents" + tblChanges = "changes" + tblSnapshots = "snapshots" + tblSyncedSeqs = "syncedseqs" +) + +var schema = &memdb.DBSchema{ + Tables: map[string]*memdb.TableSchema{ + tblClients: { + Name: tblClients, + Indexes: map[string]*memdb.IndexSchema{ + "id": { + Name: "id", + Unique: true, + Indexer: &memdb.StringFieldIndex{Field: "ID"}, + }, + "key": { + Name: "key", + Unique: true, + Indexer: &memdb.StringFieldIndex{Field: "Key"}, + }, + }, + }, + tblDocuments: { + Name: tblDocuments, + Indexes: map[string]*memdb.IndexSchema{ + "id": { + Name: "id", + Unique: true, + Indexer: &memdb.StringFieldIndex{Field: "ID"}, + }, + "key": { + Name: "key", + Unique: true, + Indexer: &memdb.StringFieldIndex{Field: "Key"}, + }, + }, + }, + tblChanges: { + Name: tblChanges, + Indexes: map[string]*memdb.IndexSchema{ + "id": { + Name: "id", + Unique: true, + Indexer: &memdb.StringFieldIndex{Field: "ID"}, + }, + "doc_id_server_seq": { + Name: "doc_id_server_seq", + Unique: true, + Indexer: &memdb.CompoundIndex{ + Indexes: []memdb.Indexer{ + &memdb.StringFieldIndex{Field: "DocID"}, + &memdb.UintFieldIndex{Field: "ServerSeq"}, + }, + }, + }, + }, + }, + tblSnapshots: { + Name: tblSnapshots, + Indexes: map[string]*memdb.IndexSchema{ + "id": { + Name: "id", + Unique: true, + Indexer: &memdb.StringFieldIndex{Field: "ID"}, + }, + "doc_id_server_seq": { + Name: "doc_id_server_seq", + Unique: true, + Indexer: &memdb.CompoundIndex{ + Indexes: []memdb.Indexer{ + &memdb.StringFieldIndex{Field: "DocID"}, + &memdb.UintFieldIndex{Field: "ServerSeq"}, + }, + }, + }, + }, + }, + tblSyncedSeqs: { + Name: tblSyncedSeqs, + Indexes: map[string]*memdb.IndexSchema{ + "id": { + Name: "id", + Unique: true, + Indexer: &memdb.StringFieldIndex{Field: "ID"}, + }, + "doc_id_client_id": { + Name: "doc_id_client_id", + Unique: true, + Indexer: &memdb.CompoundIndex{ + Indexes: []memdb.Indexer{ + &memdb.StringFieldIndex{Field: "DocID"}, + &memdb.StringFieldIndex{Field: "ClientID"}, + }, + }, + }, + "doc_id_server_seq": { + Name: "doc_id_server_seq", + Indexer: &memdb.CompoundIndex{ + Indexes: []memdb.Indexer{ + &memdb.StringFieldIndex{Field: "DocID"}, + &memdb.UintFieldIndex{Field: "ServerSeq"}, + }, + }, + }, + }, + }, + }, +} diff --git a/yorkie/backend/db/mongo/client.go b/yorkie/backend/db/mongo/client.go index 6659d330a..8a45d75c8 100644 --- a/yorkie/backend/db/mongo/client.go +++ b/yorkie/backend/db/mongo/client.go @@ -94,7 +94,7 @@ func (c *Client) ActivateClient(ctx context.Context, key string) (*db.ClientInfo clientInfo := db.ClientInfo{} now := gotime.Now() - res, err := c.collection(ColClients).UpdateOne(ctx, bson.M{ + res, err := c.collection(colClients).UpdateOne(ctx, bson.M{ "key": key, }, bson.M{ "$set": bson.M{ @@ -109,7 +109,7 @@ func (c *Client) ActivateClient(ctx context.Context, key string) (*db.ClientInfo var result *mongo.SingleResult if res.UpsertedCount > 0 { - result = c.collection(ColClients).FindOneAndUpdate(ctx, bson.M{ + result = c.collection(colClients).FindOneAndUpdate(ctx, bson.M{ "_id": res.UpsertedID, }, bson.M{ "$set": bson.M{ @@ -117,7 +117,7 @@ func (c *Client) ActivateClient(ctx context.Context, key string) (*db.ClientInfo }, }) } else { - result = c.collection(ColClients).FindOne(ctx, bson.M{ + result = c.collection(colClients).FindOne(ctx, bson.M{ "key": key, }) } @@ -137,7 +137,7 @@ func (c *Client) DeactivateClient(ctx context.Context, clientID db.ID) (*db.Clie } clientInfo := db.ClientInfo{} - res := c.collection(ColClients).FindOneAndUpdate(ctx, bson.M{ + res := c.collection(colClients).FindOneAndUpdate(ctx, bson.M{ "_id": encodedClientID, }, bson.M{ "$set": bson.M{ @@ -164,7 +164,7 @@ func (c *Client) FindClientInfoByID(ctx context.Context, clientID db.ID) (*db.Cl } clientInfo := db.ClientInfo{} - result := c.collection(ColClients).FindOne(ctx, bson.M{ + result := c.collection(colClients).FindOne(ctx, bson.M{ "_id": encodedClientID, }) if err := result.Decode(&clientInfo); err != nil { @@ -213,7 +213,7 @@ func (c *Client) UpdateClientInfoAfterPushPull( } } - result := c.collection(ColClients).FindOneAndUpdate(ctx, bson.M{ + result := c.collection(colClients).FindOneAndUpdate(ctx, bson.M{ "key": clientInfo.Key, }, updater) @@ -242,9 +242,8 @@ func (c *Client) FindDocInfoByKey( return nil, err } - docInfo := db.DocInfo{} now := gotime.Now() - res, err := c.collection(ColDocuments).UpdateOne(ctx, bson.M{ + res, err := c.collection(colDocuments).UpdateOne(ctx, bson.M{ "key": bsonDocKey, }, bson.M{ "$set": bson.M{ @@ -258,7 +257,7 @@ func (c *Client) FindDocInfoByKey( var result *mongo.SingleResult if res.UpsertedCount > 0 { - result = c.collection(ColDocuments).FindOneAndUpdate(ctx, bson.M{ + result = c.collection(colDocuments).FindOneAndUpdate(ctx, bson.M{ "_id": res.UpsertedID, }, bson.M{ "$set": bson.M{ @@ -268,7 +267,7 @@ func (c *Client) FindDocInfoByKey( }, }) } else { - result = c.collection(ColDocuments).FindOne(ctx, bson.M{ + result = c.collection(colDocuments).FindOne(ctx, bson.M{ "key": bsonDocKey, }) if result.Err() == mongo.ErrNoDocuments { @@ -280,6 +279,8 @@ func (c *Client) FindDocInfoByKey( return nil, result.Err() } } + + docInfo := db.DocInfo{} if err := result.Decode(&docInfo); err != nil { return nil, err } @@ -287,8 +288,8 @@ func (c *Client) FindDocInfoByKey( return &docInfo, nil } -// StoreChangeInfos stores the given changes and doc info. -func (c *Client) StoreChangeInfos( +// CreateChangeInfos stores the given changes and doc info. +func (c *Client) CreateChangeInfos( ctx context.Context, docInfo *db.DocInfo, initialServerSeq uint64, @@ -320,7 +321,7 @@ func (c *Client) StoreChangeInfos( // TODO(hackerwins): We need to handle the updates for the two collections // below atomically. - if _, err = c.collection(ColChanges).BulkWrite( + if _, err = c.collection(colChanges).BulkWrite( ctx, models, options.BulkWrite().SetOrdered(true), @@ -329,7 +330,7 @@ func (c *Client) StoreChangeInfos( return err } - res, err := c.collection(ColDocuments).UpdateOne(ctx, bson.M{ + res, err := c.collection(colDocuments).UpdateOne(ctx, bson.M{ "_id": encodedDocID, "server_seq": initialServerSeq, }, bson.M{ @@ -349,34 +350,6 @@ func (c *Client) StoreChangeInfos( return nil } -// CreateSnapshotInfo stores the snapshot of the given document. -func (c *Client) CreateSnapshotInfo( - ctx context.Context, - docID db.ID, - doc *document.InternalDocument, -) error { - encodedDocID, err := encodeID(docID) - if err != nil { - return err - } - snapshot, err := converter.ObjectToBytes(doc.RootObject()) - if err != nil { - return err - } - - if _, err := c.collection(ColSnapshots).InsertOne(ctx, bson.M{ - "doc_id": encodedDocID, - "server_seq": doc.Checkpoint().ServerSeq, - "snapshot": snapshot, - "created_at": gotime.Now(), - }); err != nil { - log.Logger.Error(err) - return err - } - - return nil -} - // FindChangesBetweenServerSeqs returns the changes between two server sequences. func (c *Client) FindChangesBetweenServerSeqs( ctx context.Context, @@ -414,7 +387,7 @@ func (c *Client) FindChangeInfosBetweenServerSeqs( } var infos []*db.ChangeInfo - cursor, err := c.collection(ColChanges).Find(ctx, bson.M{ + cursor, err := c.collection(colChanges).Find(ctx, bson.M{ "doc_id": encodedDocID, "server_seq": bson.M{ "$gte": from, @@ -434,6 +407,67 @@ func (c *Client) FindChangeInfosBetweenServerSeqs( return infos, nil } +// CreateSnapshotInfo stores the snapshot of the given document. +func (c *Client) CreateSnapshotInfo( + ctx context.Context, + docID db.ID, + doc *document.InternalDocument, +) error { + encodedDocID, err := encodeID(docID) + if err != nil { + return err + } + snapshot, err := converter.ObjectToBytes(doc.RootObject()) + if err != nil { + return err + } + + if _, err := c.collection(colSnapshots).InsertOne(ctx, bson.M{ + "doc_id": encodedDocID, + "server_seq": doc.Checkpoint().ServerSeq, + "snapshot": snapshot, + "created_at": gotime.Now(), + }); err != nil { + log.Logger.Error(err) + return err + } + + return nil +} + +// FindLastSnapshotInfo finds the last snapshot of the given document. +func (c *Client) FindLastSnapshotInfo( + ctx context.Context, + docID db.ID, +) (*db.SnapshotInfo, error) { + encodedDocID, err := encodeID(docID) + if err != nil { + return nil, err + } + + snapshotInfo := &db.SnapshotInfo{} + result := c.collection(colSnapshots).FindOne(ctx, bson.M{ + "doc_id": encodedDocID, + }, options.FindOne().SetSort(bson.M{ + "server_seq": -1, + })) + + if result.Err() == mongo.ErrNoDocuments { + return snapshotInfo, nil + } + + if result.Err() != nil { + log.Logger.Error(result.Err()) + return nil, result.Err() + } + + if err := result.Decode(snapshotInfo); err != nil { + return nil, err + } + + return snapshotInfo, nil +} + // UpdateAndFindMinSyncedTicket updates the given serverSeq of the given client // and returns the min synced ticket. func (c *Client) UpdateAndFindMinSyncedTicket( @@ -458,7 +492,7 @@ func (c *Client) UpdateAndFindMinSyncedTicket( } if isAttached { - if _, err = c.collection(ColSyncedSeqs).UpdateOne(ctx, bson.M{ + if _, err = c.collection(colSyncedSeqs).UpdateOne(ctx, bson.M{ "doc_id": encodedDocID, "client_id": encodedClientID, }, bson.M{ @@ -470,7 +504,7 @@ func (c *Client) UpdateAndFindMinSyncedTicket( return nil, err } } else { - if _, err = c.collection(ColSyncedSeqs).DeleteOne(ctx, bson.M{ + if _, err = c.collection(colSyncedSeqs).DeleteOne(ctx, bson.M{ "doc_id": encodedDocID, "client_id": encodedClientID, }, options.Delete()); err != nil { @@ -481,7 +515,7 @@ func (c *Client) UpdateAndFindMinSyncedTicket( // 02. find min synced seq of the given document. syncedSeqInfo := db.SyncedSeqInfo{} - result := c.collection(ColSyncedSeqs).FindOne(ctx, bson.M{ + result := c.collection(colSyncedSeqs).FindOne(ctx, bson.M{ "doc_id": encodedDocID, }, options.FindOne().SetSort(bson.M{ "server_seq": 1, @@ -511,39 +545,6 @@ func (c *Client) UpdateAndFindMinSyncedTicket( return ticket, nil } -// FindLastSnapshotInfo finds the last snapshot of the given document. -func (c *Client) FindLastSnapshotInfo( - ctx context.Context, - docID db.ID, -) (*db.SnapshotInfo, error) { - encodedDocID, err := encodeID(docID) - if err != nil { - return nil, err - } - - snapshotInfo := &db.SnapshotInfo{} - result := c.collection(ColSnapshots).FindOne(ctx, bson.M{ - "doc_id": encodedDocID, - }, options.FindOne().SetSort(bson.M{ - "server_seq": -1, - })) - - if result.Err() == mongo.ErrNoDocuments { - return snapshotInfo, nil - } - - if result.Err() != nil { - log.Logger.Error(result.Err()) - return nil, result.Err() - } - - if err := result.Decode(snapshotInfo); err != nil { - return nil, err - } - - return snapshotInfo, nil -} - func (c *Client) findTicketByServerSeq( ctx context.Context, docID db.ID, @@ -555,7 +556,7 @@ func (c *Client) findTicketByServerSeq( } changeInfo := db.ChangeInfo{} - result := c.collection(ColChanges).FindOne(ctx, bson.M{ + result := c.collection(colChanges).FindOne(ctx, bson.M{ "doc_id": encodedDocID, "server_seq": serverSeq, }) diff --git a/yorkie/backend/db/mongo/index.go b/yorkie/backend/db/mongo/index.go deleted file mode 100644 index 547cb4565..000000000 --- a/yorkie/backend/db/mongo/index.go +++ /dev/null @@ -1,118 +0,0 @@ -/* - * Copyright 2020 The Yorkie Authors. All rights reserved. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package mongo - -import ( - "context" - - "go.mongodb.org/mongo-driver/mongo" - "go.mongodb.org/mongo-driver/mongo/options" - "go.mongodb.org/mongo-driver/x/bsonx" - - "github.com/yorkie-team/yorkie/internal/log" -) - -// Below are names and indexes information of collections that stores Yorkie data. -var ( - ColClients = "clients" - idxClientInfos = []mongo.IndexModel{{ - Keys: bsonx.Doc{{Key: "key", Value: bsonx.Int32(1)}}, - Options: options.Index().SetUnique(true), - }} - - ColDocuments = "documents" - idxDocInfos = []mongo.IndexModel{{ - Keys: bsonx.Doc{{Key: "key", Value: bsonx.Int32(1)}}, - Options: options.Index().SetUnique(true), - }} - - ColChanges = "changes" - idxChanges = []mongo.IndexModel{{ - Keys: bsonx.Doc{ - {Key: "doc_id", Value: bsonx.Int32(1)}, - {Key: "server_seq", Value: bsonx.Int32(1)}, - }, - Options: options.Index().SetUnique(true), - }} - - ColSnapshots = "snapshots" - idxSnapshots = []mongo.IndexModel{{ - Keys: bsonx.Doc{ - {Key: "doc_id", Value: bsonx.Int32(1)}, - {Key: "server_seq", Value: bsonx.Int32(1)}, - }, - Options: options.Index().SetUnique(true), - }} - - ColSyncedSeqs = "syncedseqs" - idxSyncedSeqs = []mongo.IndexModel{{ - Keys: bsonx.Doc{ - {Key: "doc_id", Value: bsonx.Int32(1)}, - {Key: "client_id", Value: bsonx.Int32(1)}, - }, - Options: options.Index().SetUnique(true), - }, { - Keys: bsonx.Doc{ - {Key: "doc_id", Value: bsonx.Int32(1)}, - {Key: "server_seq", Value: bsonx.Int32(1)}, - }, - }} -) - -func ensureIndexes(ctx context.Context, db *mongo.Database) error { - if _, err := db.Collection(ColClients).Indexes().CreateMany( - ctx, - idxClientInfos, - ); err != nil { - log.Logger.Error(err) - return err - } - - if _, err := db.Collection(ColDocuments).Indexes().CreateMany( - ctx, - idxDocInfos, - ); err != nil { - log.Logger.Error(err) - return err - } - - if _, err := db.Collection(ColChanges).Indexes().CreateMany( - ctx, - idxChanges, - ); err != nil { - log.Logger.Error(err) - return err - } - - if _, err := db.Collection(ColSnapshots).Indexes().CreateMany( - ctx, - idxSnapshots, - ); err != nil { - log.Logger.Error(err) - return err - } - - if _, err := db.Collection(ColSyncedSeqs).Indexes().CreateMany( - ctx, - idxSyncedSeqs, - ); err != nil { - log.Logger.Error(err) - return err - } - - return nil -} diff --git a/yorkie/backend/db/mongo/indexes.go b/yorkie/backend/db/mongo/indexes.go new file mode 100644 index 000000000..ce4dbb0fb --- /dev/null +++ b/yorkie/backend/db/mongo/indexes.go @@ -0,0 +1,97 @@ +/* + * Copyright 2021 The Yorkie Authors. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package mongo + +import ( + "context" + + "go.mongodb.org/mongo-driver/mongo" + "go.mongodb.org/mongo-driver/mongo/options" + "go.mongodb.org/mongo-driver/x/bsonx" +) + +const ( + colClients = "clients" + colDocuments = "documents" + colChanges = "changes" + colSnapshots = "snapshots" + colSyncedSeqs = "syncedseqs" +) + +type collectionInfo struct { + name string + indexes []mongo.IndexModel +} + +// Below are names and indexes information of collections that stores Yorkie data. +var collectionInfos = []collectionInfo{ + { + name: colClients, + indexes: []mongo.IndexModel{{ + Keys: bsonx.Doc{{Key: "key", Value: bsonx.Int32(1)}}, + Options: options.Index().SetUnique(true), + }}, + }, { + name: colDocuments, + indexes: []mongo.IndexModel{{ + Keys: bsonx.Doc{{Key: "key", Value: bsonx.Int32(1)}}, + Options: options.Index().SetUnique(true), + }}, + }, { + name: colChanges, + indexes: []mongo.IndexModel{{ + Keys: bsonx.Doc{ + {Key: "doc_id", Value: bsonx.Int32(1)}, + {Key: "server_seq", Value: bsonx.Int32(1)}, + }, + Options: options.Index().SetUnique(true), + }}, + }, { + name: colSnapshots, + indexes: []mongo.IndexModel{{ + Keys: bsonx.Doc{ + {Key: "doc_id", Value: bsonx.Int32(1)}, + {Key: "server_seq", Value: bsonx.Int32(1)}, + }, + Options: options.Index().SetUnique(true), + }}, + }, { + name: colSyncedSeqs, + indexes: []mongo.IndexModel{{ + Keys: bsonx.Doc{ + {Key: "doc_id", Value: bsonx.Int32(1)}, + {Key: "client_id", Value: bsonx.Int32(1)}, + }, + Options: options.Index().SetUnique(true), + }, { + Keys: bsonx.Doc{ + {Key: "doc_id", Value: bsonx.Int32(1)}, + {Key: "server_seq", Value: bsonx.Int32(1)}, + }, + }}, + }, +} + +func ensureIndexes(ctx context.Context, db *mongo.Database) error { + for _, info := range collectionInfos { + _, err := db.Collection(info.name).Indexes().CreateMany(ctx, info.indexes) + if err != nil { + return err + } + } + return nil +} diff --git a/yorkie/backend/db/synced_seq_info.go b/yorkie/backend/db/synced_seq_info.go index 4518f3b87..e412c0612 100644 --- a/yorkie/backend/db/synced_seq_info.go +++ b/yorkie/backend/db/synced_seq_info.go @@ -3,6 +3,7 @@ package db // SyncedSeqInfo is a structure representing information about the synchronized // sequence for each client. type SyncedSeqInfo struct { + ID ID `bson:"_id"` DocID ID `bson:"doc_id"` ClientID ID `bson:"client_id"` ServerSeq uint64 `bson:"server_seq"` diff --git a/yorkie/config.go b/yorkie/config.go index 50bd182f9..0d9aeb4d3 100644 --- a/yorkie/config.go +++ b/yorkie/config.go @@ -104,8 +104,10 @@ func (c *Config) Validate() error { return err } - if err := c.Mongo.Validate(); err != nil { - return err + if c.Mongo != nil { + if err := c.Mongo.Validate(); err != nil { + return err + } } if c.ETCD != nil { @@ -129,22 +131,6 @@ func (c *Config) ensureDefaultValue() { c.Profiling.Port = DefaultProfilingPort } - if c.Mongo.ConnectionTimeout == "" { - c.Mongo.ConnectionTimeout = DefaultMongoConnectionTimeout.String() - } - - if c.Mongo.ConnectionURI == "" { - c.Mongo.ConnectionURI = DefaultMongoConnectionURI - } - - if c.Mongo.YorkieDatabase == "" { - c.Mongo.YorkieDatabase = DefaultMongoYorkieDatabase - } - - if c.Mongo.PingTimeout == "" { - c.Mongo.PingTimeout = DefaultMongoPingTimeout.String() - } - if c.Backend.SnapshotThreshold == 0 { c.Backend.SnapshotThreshold = DefaultSnapshotThreshold } @@ -169,6 +155,24 @@ func (c *Config) ensureDefaultValue() { c.Backend.AuthWebhookCacheUnauthTTL = DefaultAuthWebhookCacheUnauthTTL.String() } + if c.Mongo != nil { + if c.Mongo.ConnectionURI == "" { + c.Mongo.ConnectionURI = DefaultMongoConnectionURI + } + + if c.Mongo.ConnectionTimeout == "" { + c.Mongo.ConnectionTimeout = DefaultMongoConnectionTimeout.String() + } + + if c.Mongo.YorkieDatabase == "" { + c.Mongo.YorkieDatabase = DefaultMongoYorkieDatabase + } + + if c.Mongo.PingTimeout == "" { + c.Mongo.PingTimeout = DefaultMongoPingTimeout.String() + } + } + if c.ETCD != nil { if c.ETCD.DialTimeout == "" { c.ETCD.DialTimeout = etcd.DefaultDialTimeout.String() @@ -192,11 +196,5 @@ func newConfig(port int, profilingPort int, dbName string) *Config { SnapshotThreshold: DefaultSnapshotThreshold, SnapshotInterval: DefaultSnapshotInterval, }, - Mongo: &mongo.Config{ - ConnectionURI: DefaultMongoConnectionURI, - ConnectionTimeout: DefaultMongoConnectionTimeout.String(), - PingTimeout: DefaultMongoPingTimeout.String(), - YorkieDatabase: dbName, - }, } } diff --git a/yorkie/config_test.go b/yorkie/config_test.go index b943f666f..7e5007dc3 100644 --- a/yorkie/config_test.go +++ b/yorkie/config_test.go @@ -37,15 +37,6 @@ func TestNewConfigFromFile(t *testing.T) { assert.Equal(t, conf.RPC.CertFile, "") assert.Equal(t, conf.RPC.KeyFile, "") - connTimeout, err := time.ParseDuration(conf.Mongo.ConnectionTimeout) - assert.NoError(t, err) - assert.Equal(t, connTimeout, yorkie.DefaultMongoConnectionTimeout) - assert.Equal(t, conf.Mongo.ConnectionURI, yorkie.DefaultMongoConnectionURI) - assert.Equal(t, conf.Mongo.YorkieDatabase, yorkie.DefaultMongoYorkieDatabase) - - pingTimeout, err := time.ParseDuration(conf.Mongo.PingTimeout) - assert.NoError(t, err) - assert.Equal(t, pingTimeout, yorkie.DefaultMongoPingTimeout) assert.Equal(t, conf.Backend.SnapshotThreshold, uint64(yorkie.DefaultSnapshotThreshold)) assert.Equal(t, conf.Backend.SnapshotInterval, uint64(yorkie.DefaultSnapshotInterval)) assert.Equal(t, conf.Backend.AuthWebhookURL, "") diff --git a/yorkie/packs/packs.go b/yorkie/packs/packs.go index e52541c7b..93e07d6a3 100644 --- a/yorkie/packs/packs.go +++ b/yorkie/packs/packs.go @@ -82,7 +82,7 @@ func PushPull( // 03. store pushed changes, document info and checkpoint of the client to DB. if len(pushedChanges) > 0 { - if err := be.DB.StoreChangeInfos(ctx, docInfo, initialServerSeq, pushedChanges); err != nil { + if err := be.DB.CreateChangeInfos(ctx, docInfo, initialServerSeq, pushedChanges); err != nil { return nil, err } } diff --git a/yorkie/rpc/yorkie_server.go b/yorkie/rpc/yorkie_server.go index a74069461..3f264499b 100644 --- a/yorkie/rpc/yorkie_server.go +++ b/yorkie/rpc/yorkie_server.go @@ -64,9 +64,14 @@ func (s *yorkieServer) ActivateClient( return nil, err } + pbClientID, err := client.ID.Bytes() + if err != nil { + return nil, err + } + return &api.ActivateClientResponse{ ClientKey: client.Key, - ClientId: client.ID.Bytes(), + ClientId: pbClientID, }, nil } @@ -90,8 +95,13 @@ func (s *yorkieServer) DeactivateClient( return nil, err } + pbClientID, err := client.ID.Bytes() + if err != nil { + return nil, err + } + return &api.DeactivateClientResponse{ - ClientId: client.ID.Bytes(), + ClientId: pbClientID, }, nil }