diff --git a/server/packs/packs.go b/server/packs/packs.go index f37d94655..6e2a95aeb 100644 --- a/server/packs/packs.go +++ b/server/packs/packs.go @@ -20,6 +20,7 @@ package packs import ( "context" + "errors" "fmt" "strconv" gotime "time" @@ -37,6 +38,15 @@ import ( "github.com/yorkie-team/yorkie/server/logging" ) +var ( + // ErrClientSeqNotSequentialWithCheckpoint is returned + // when ClientSeq in change not sequential With DocInfo.Checkpoint.ClientSeq + ErrClientSeqNotSequentialWithCheckpoint = errors.New("ClientSeq is not sequential with DocInfo.Checkpoint.ClientSeq") + + // ErrClientSeqInChangesAreNotSequential is returned when ClientSeq in reqPack.Changes are not sequential + ErrClientSeqInChangesAreNotSequential = errors.New("ClientSeq in reqPack.Changes are not sequential") +) + // PushPullKey creates a new sync.Key of PushPull for the given document. func PushPullKey(projectID types.ID, docKey key.Key) sync.Key { return sync.NewKey(fmt.Sprintf("pushpull-%s-%s", projectID, docKey)) @@ -75,8 +85,12 @@ func PushPull( be.Metrics.ObservePushPullResponseSeconds(gotime.Since(start).Seconds()) }() - // TODO: Changes may be reordered or missing during communication on the network. - // We should check the change.pack with checkpoint to make sure the changes are in the correct order. + checkpoint := clientInfo.Checkpoint(docInfo.ID) + err := validateClientSeqSequential(reqPack.Changes, checkpoint) + if err != nil { + return nil, err + } + initialServerSeq := docInfo.ServerSeq // 01. push changes: filter out the changes that are already saved in the database. @@ -272,3 +286,47 @@ func BuildDocumentForServerSeq( return doc, nil } + +func validateClientSeqSequential(changes []*change.Change, checkpoint change.Checkpoint) error { + if len(changes) < 1 { + return nil + } + + if err := validateClientSeqSequentialWithCheckpoint(changes, checkpoint); err != nil { + return err + } + + return validateClientSeqInChangesAreSequential(changes) +} + +func validateClientSeqSequentialWithCheckpoint(changes []*change.Change, checkpoint change.Checkpoint) error { + expectedClientSeq := checkpoint.ClientSeq + 1 + actualFirstClientSeq := changes[0].ClientSeq() + + if expectedClientSeq < actualFirstClientSeq { + return fmt.Errorf( + "ClientSeq is not sequential with DocInfo.Checkpoint.ClientSeq (expected: %d, actual: %d) : %w", + expectedClientSeq, + actualFirstClientSeq, + ErrClientSeqNotSequentialWithCheckpoint, + ) + } + return nil +} + +func validateClientSeqInChangesAreSequential(changes []*change.Change) error { + nextClientSeq := changes[0].ClientSeq() + for _, cn := range changes[1:] { + nextClientSeq++ + + if nextClientSeq != cn.ClientSeq() { + return fmt.Errorf( + "ClientSeq in Changes are not sequential (expected: %d, actual: %d) : %w", + nextClientSeq, + cn.ClientSeq(), + ErrClientSeqInChangesAreNotSequential, + ) + } + } + return nil +} diff --git a/server/packs/packs_test.go b/server/packs/packs_test.go new file mode 100644 index 000000000..2daaed0a3 --- /dev/null +++ b/server/packs/packs_test.go @@ -0,0 +1,316 @@ +package packs_test + +import ( + "testing" + + "github.com/stretchr/testify/assert" + "golang.org/x/net/context" + + "github.com/yorkie-team/yorkie/api/converter" + "github.com/yorkie-team/yorkie/api/types" + api "github.com/yorkie-team/yorkie/api/yorkie/v1" + "github.com/yorkie-team/yorkie/pkg/document" + "github.com/yorkie-team/yorkie/pkg/document/change" + "github.com/yorkie-team/yorkie/pkg/document/time" + "github.com/yorkie-team/yorkie/server/backend" + "github.com/yorkie-team/yorkie/server/backend/database" + "github.com/yorkie-team/yorkie/server/clients" + "github.com/yorkie-team/yorkie/server/documents" + "github.com/yorkie-team/yorkie/server/packs" + "github.com/yorkie-team/yorkie/server/profiling/prometheus" + "github.com/yorkie-team/yorkie/server/rpc/connecthelper" + "github.com/yorkie-team/yorkie/test/helper" +) + +var ( + clientID = "000000000000000000000001" +) + +func Test(t *testing.T) { + t.Run("push/pull sequential ClientSeq test (happy case)", func(t *testing.T) { + RunPushPullWithSequentialClientSeqTest(t) + }) + + t.Run("push/pull not sequential ClientSeq with DocInfo.Checkpoint.ClientSeq test", func(t *testing.T) { + RunPushPullWithNotSequentialClientSeqWithCheckpoint(t) + }) + + t.Run("push/pull not sequential ClientSeq in changes test", func(t *testing.T) { + RunPushPullWithNotSequentialClientSeqInChangesTest(t) + }) + + t.Run("push/pull ClientSeq less than ClientInfo's ClientSeq (duplicated request)", func(t *testing.T) { + RunPushPullWithClientSeqLessThanClientInfoTest(t) + }) + + t.Run("push/pull ServerSeq greater than DocInfo's ServerSeq", func(t *testing.T) { + RunPushPullWithServerSeqGreaterThanDocInfoTest(t) + }) +} + +func RunPushPullWithSequentialClientSeqTest(t *testing.T) { + ctx := context.Background() + be := setUpBackend(t) + project, _ := be.DB.FindProjectInfoByID( + ctx, + database.DefaultProjectID, + ) + + clientInfo, _ := clients.Activate(ctx, be.DB, project.ToProject(), clientID) + actorID, _ := time.ActorIDFromHex(clientID) + + changePackWithSequentialClientSeq, _ := + createChangePackWithSequentialClientSeq(helper.TestDocKey(t).String(), actorID.Bytes()) + + docInfo, _ := documents.FindDocInfoByKeyAndOwner( + ctx, be, clientInfo, changePackWithSequentialClientSeq.DocumentKey, true) + err := clientInfo.AttachDocument( + docInfo.ID, changePackWithSequentialClientSeq.IsAttached()) + if err != nil { + assert.Fail(t, "failed to attach document") + } + + _, err = packs.PushPull( + ctx, be, project.ToProject(), clientInfo, docInfo, + changePackWithSequentialClientSeq, packs.PushPullOptions{ + Mode: types.SyncModePushPull, + Status: document.StatusAttached, + }) + assert.NoError(t, err) +} + +func RunPushPullWithNotSequentialClientSeqInChangesTest(t *testing.T) { + ctx := context.Background() + be := setUpBackend(t) + project, _ := be.DB.FindProjectInfoByID( + ctx, + database.DefaultProjectID, + ) + + clientInfo, _ := clients.Activate(ctx, be.DB, project.ToProject(), clientID) + + actorID, _ := time.ActorIDFromHex(clientID) + changePackWithNotSequentialClientSeq, _ := + createChangePackWithNotSequentialClientSeqInChanges(helper.TestDocKey(t).String(), actorID.Bytes()) + + docInfo, _ := documents.FindDocInfoByKeyAndOwner(ctx, be, clientInfo, + changePackWithNotSequentialClientSeq.DocumentKey, true) + err := clientInfo.AttachDocument( + docInfo.ID, changePackWithNotSequentialClientSeq.IsAttached()) + if err != nil { + assert.Fail(t, "failed to attach document") + } + + _, err = packs.PushPull( + ctx, be, project.ToProject(), clientInfo, docInfo, + changePackWithNotSequentialClientSeq, packs.PushPullOptions{ + Mode: types.SyncModePushPull, + Status: document.StatusAttached, + }) + assert.Equal(t, connecthelper.CodeOf(packs.ErrClientSeqInChangesAreNotSequential), connecthelper.CodeOf(err)) +} + +func RunPushPullWithNotSequentialClientSeqWithCheckpoint(t *testing.T) { + ctx := context.Background() + be := setUpBackend(t) + project, _ := be.DB.FindProjectInfoByID( + ctx, + database.DefaultProjectID, + ) + + clientInfo, _ := clients.Activate(ctx, be.DB, project.ToProject(), clientID) + + actorID, _ := time.ActorIDFromHex(clientID) + changePackFixture, _ := + createChangePackFixture(helper.TestDocKey(t).String(), actorID.Bytes()) + + docInfo, _ := documents.FindDocInfoByKeyAndOwner( + ctx, be, clientInfo, changePackFixture.DocumentKey, true) + err := clientInfo.AttachDocument(docInfo.ID, changePackFixture.IsAttached()) + if err != nil { + assert.Fail(t, "failed to attach document") + } + + _, err = packs.PushPull(ctx, be, project.ToProject(), + clientInfo, docInfo, changePackFixture, packs.PushPullOptions{ + Mode: types.SyncModePushPull, + Status: document.StatusAttached, + }) + if err != nil { + assert.Fail(t, "failed to push pull") + } + + changePackWithNotSequentialClientSeqWithCheckpoint, _ := + createChangePackWithNotSequentialClientSeqWithCheckpoint(helper.TestDocKey(t).String(), actorID.Bytes()) + _, err = packs.PushPull(ctx, be, project.ToProject(), clientInfo, docInfo, + changePackWithNotSequentialClientSeqWithCheckpoint, packs.PushPullOptions{ + Mode: types.SyncModePushPull, + Status: document.StatusAttached, + }) + assert.Equal(t, connecthelper.CodeOf(packs.ErrClientSeqNotSequentialWithCheckpoint), connecthelper.CodeOf(err)) +} + +func RunPushPullWithClientSeqLessThanClientInfoTest(t *testing.T) { + ctx := context.Background() + be := setUpBackend(t) + project, _ := be.DB.FindProjectInfoByID( + ctx, + database.DefaultProjectID, + ) + + clientInfo, _ := clients.Activate(ctx, be.DB, project.ToProject(), clientID) + + actorID, _ := time.ActorIDFromHex(clientID) + changePackFixture, _ := + createChangePackFixture(helper.TestDocKey(t).String(), actorID.Bytes()) + + docInfo, _ := documents.FindDocInfoByKeyAndOwner( + ctx, be, clientInfo, changePackFixture.DocumentKey, true) + err := clientInfo.AttachDocument(docInfo.ID, changePackFixture.IsAttached()) + if err != nil { + assert.Fail(t, "failed to attach document") + } + + _, err = packs.PushPull(ctx, be, project.ToProject(), + clientInfo, docInfo, changePackFixture, packs.PushPullOptions{ + Mode: types.SyncModePushPull, + Status: document.StatusAttached, + }) + if err != nil { + assert.Fail(t, "failed to push pull") + } + + changePackWithClientSeqLessThanClientInfo, _ := + createChangePackWithClientSeqLessThanClientInfo(helper.TestDocKey(t).String(), actorID.Bytes()) + _, err = packs.PushPull(ctx, be, project.ToProject(), clientInfo, docInfo, + changePackWithClientSeqLessThanClientInfo, packs.PushPullOptions{ + Mode: types.SyncModePushPull, + Status: document.StatusAttached, + }) + + assert.NoError(t, err) +} + +func RunPushPullWithServerSeqGreaterThanDocInfoTest(t *testing.T) { + ctx := context.Background() + be := setUpBackend(t) + project, _ := be.DB.FindProjectInfoByID( + ctx, + database.DefaultProjectID, + ) + + clientInfo, _ := clients.Activate(ctx, be.DB, project.ToProject(), clientID) + + actorID, _ := time.ActorIDFromHex(clientID) + changePackFixture, _ := + createChangePackFixture(helper.TestDocKey(t).String(), actorID.Bytes()) + + docInfo, _ := documents.FindDocInfoByKeyAndOwner( + ctx, be, clientInfo, changePackFixture.DocumentKey, true) + err := clientInfo.AttachDocument(docInfo.ID, changePackFixture.IsAttached()) + if err != nil { + assert.Fail(t, "failed to attach document") + } + + _, _ = packs.PushPull(ctx, be, project.ToProject(), clientInfo, docInfo, + changePackFixture, packs.PushPullOptions{ + Mode: types.SyncModePushPull, + Status: document.StatusAttached, + }) + + changePackWithServerSeqGreaterThanDocInfo, _ := + createChangePackWithServerSeqGreaterThanDocInfo(helper.TestDocKey(t).String()) + + _, err = packs.PushPull(ctx, be, project.ToProject(), + clientInfo, docInfo, changePackWithServerSeqGreaterThanDocInfo, packs.PushPullOptions{ + Mode: types.SyncModePushPull, + Status: document.StatusAttached, + }) + + assert.Equal(t, connecthelper.CodeOf(packs.ErrInvalidServerSeq), connecthelper.CodeOf(err)) +} + +func createChangePackWithSequentialClientSeq(documentKey string, actorID []byte) (*change.Pack, error) { + return converter.FromChangePack(&api.ChangePack{ + DocumentKey: documentKey, + Checkpoint: &api.Checkpoint{ServerSeq: 0, ClientSeq: 3}, + Changes: []*api.Change{ + createChange(1, 1, actorID), + createChange(2, 2, actorID), + createChange(3, 3, actorID), + }, + }) +} + +func createChangePackWithNotSequentialClientSeqInChanges(documentKey string, actorID []byte) (*change.Pack, error) { + return converter.FromChangePack(&api.ChangePack{ + DocumentKey: documentKey, + Checkpoint: &api.Checkpoint{ServerSeq: 0, ClientSeq: 3}, + Changes: []*api.Change{ + createChange(1, 1, actorID), + createChange(3, 3, actorID), + createChange(2, 2, actorID), + }, + }) +} + +func createChangePackWithNotSequentialClientSeqWithCheckpoint( + documentKey string, actorID []byte) (*change.Pack, error) { + return converter.FromChangePack(&api.ChangePack{ + DocumentKey: documentKey, + Checkpoint: &api.Checkpoint{ServerSeq: 3, ClientSeq: 1e9}, + Changes: []*api.Change{ + createChange(1e9, 1e9, actorID), + }, + }) +} + +func createChangePackWithClientSeqLessThanClientInfo(documentKey string, actorID []byte) (*change.Pack, error) { + return converter.FromChangePack(&api.ChangePack{ + DocumentKey: documentKey, + Checkpoint: &api.Checkpoint{ServerSeq: 3, ClientSeq: 3}, + Changes: []*api.Change{ + createChange(0, 0, actorID), + }, + }) +} + +func createChangePackFixture(documentKey string, actorID []byte) (*change.Pack, error) { + return createChangePackWithSequentialClientSeq(documentKey, actorID) +} + +func createChangePackWithServerSeqGreaterThanDocInfo(documentKey string) (*change.Pack, error) { + return converter.FromChangePack(&api.ChangePack{ + DocumentKey: documentKey, + Checkpoint: &api.Checkpoint{ServerSeq: 1e9, ClientSeq: 3}, + }) +} + +func createChange(clientSeq uint32, lamport int64, actorID []byte) *api.Change { + return &api.Change{ + Id: &api.ChangeID{ + ClientSeq: clientSeq, + Lamport: lamport, + ActorId: actorID, + }, + } +} + +func setUpBackend( + t *testing.T, +) *backend.Backend { + conf := helper.TestConfig() + + metrics, err := prometheus.NewMetrics() + assert.NoError(t, err) + + be, err := backend.New( + conf.Backend, + conf.Mongo, + conf.Housekeeping, + metrics, + ) + assert.NoError(t, err) + + return be +} diff --git a/server/rpc/connecthelper/status.go b/server/rpc/connecthelper/status.go index 8708b401b..93a63270f 100644 --- a/server/rpc/connecthelper/status.go +++ b/server/rpc/connecthelper/status.go @@ -39,15 +39,16 @@ import ( // errorToConnectCode maps an error to connectRPC status code. var errorToConnectCode = map[error]connect.Code{ // InvalidArgument means the request is malformed. - converter.ErrPackRequired: connect.CodeInvalidArgument, - converter.ErrCheckpointRequired: connect.CodeInvalidArgument, - time.ErrInvalidHexString: connect.CodeInvalidArgument, - time.ErrInvalidActorID: connect.CodeInvalidArgument, - types.ErrInvalidID: connect.CodeInvalidArgument, - clients.ErrInvalidClientID: connect.CodeInvalidArgument, - clients.ErrInvalidClientKey: connect.CodeInvalidArgument, - key.ErrInvalidKey: connect.CodeInvalidArgument, - types.ErrEmptyProjectFields: connect.CodeInvalidArgument, + converter.ErrPackRequired: connect.CodeInvalidArgument, + converter.ErrCheckpointRequired: connect.CodeInvalidArgument, + time.ErrInvalidHexString: connect.CodeInvalidArgument, + time.ErrInvalidActorID: connect.CodeInvalidArgument, + types.ErrInvalidID: connect.CodeInvalidArgument, + clients.ErrInvalidClientID: connect.CodeInvalidArgument, + clients.ErrInvalidClientKey: connect.CodeInvalidArgument, + key.ErrInvalidKey: connect.CodeInvalidArgument, + types.ErrEmptyProjectFields: connect.CodeInvalidArgument, + packs.ErrClientSeqInChangesAreNotSequential: connect.CodeInvalidArgument, // NotFound means the requested resource does not exist. database.ErrProjectNotFound: connect.CodeNotFound, @@ -62,13 +63,14 @@ var errorToConnectCode = map[error]connect.Code{ // FailedPrecondition means the request is rejected because the state of the // system is not the desired state. - database.ErrClientNotActivated: connect.CodeFailedPrecondition, - database.ErrDocumentNotAttached: connect.CodeFailedPrecondition, - database.ErrDocumentAlreadyAttached: connect.CodeFailedPrecondition, - database.ErrDocumentAlreadyDetached: connect.CodeFailedPrecondition, - documents.ErrDocumentAttached: connect.CodeFailedPrecondition, - packs.ErrInvalidServerSeq: connect.CodeFailedPrecondition, - database.ErrConflictOnUpdate: connect.CodeFailedPrecondition, + database.ErrClientNotActivated: connect.CodeFailedPrecondition, + database.ErrDocumentNotAttached: connect.CodeFailedPrecondition, + database.ErrDocumentAlreadyAttached: connect.CodeFailedPrecondition, + database.ErrDocumentAlreadyDetached: connect.CodeFailedPrecondition, + documents.ErrDocumentAttached: connect.CodeFailedPrecondition, + packs.ErrInvalidServerSeq: connect.CodeFailedPrecondition, + database.ErrConflictOnUpdate: connect.CodeFailedPrecondition, + packs.ErrClientSeqNotSequentialWithCheckpoint: connect.CodeFailedPrecondition, // Unimplemented means the server does not implement the functionality. converter.ErrUnsupportedOperation: connect.CodeUnimplemented, @@ -91,15 +93,17 @@ var errorToConnectCode = map[error]connect.Code{ // TODO(hackerwins): We need to add codes by hand for each error. It would be // better to generate this map automatically. var errorToCode = map[error]string{ - converter.ErrPackRequired: "ErrPackRequired", - converter.ErrCheckpointRequired: "ErrCheckpointRequired", - time.ErrInvalidHexString: "ErrInvalidHexString", - time.ErrInvalidActorID: "ErrInvalidActorID", - types.ErrInvalidID: "ErrInvalidID", - clients.ErrInvalidClientID: "ErrInvalidClientID", - clients.ErrInvalidClientKey: "ErrInvalidClientKey", - key.ErrInvalidKey: "ErrInvalidKey", - types.ErrEmptyProjectFields: "ErrEmptyProjectFields", + converter.ErrPackRequired: "ErrPackRequired", + converter.ErrCheckpointRequired: "ErrCheckpointRequired", + time.ErrInvalidHexString: "ErrInvalidHexString", + time.ErrInvalidActorID: "ErrInvalidActorID", + types.ErrInvalidID: "ErrInvalidID", + clients.ErrInvalidClientID: "ErrInvalidClientID", + clients.ErrInvalidClientKey: "ErrInvalidClientKey", + key.ErrInvalidKey: "ErrInvalidKey", + types.ErrEmptyProjectFields: "ErrEmptyProjectFields", + packs.ErrClientSeqNotSequentialWithCheckpoint: "ErrClientSeqNotSequentialWithCheckpoint", + packs.ErrClientSeqInChangesAreNotSequential: "ErrClientSeqInChangesAreNotSequential", database.ErrProjectNotFound: "ErrProjectNotFound", database.ErrClientNotFound: "ErrClientNotFound",