Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Validate Checkpoint in ChangePack for PushPull API requests #959

Draft
wants to merge 20 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from 15 commits
Commits
Show all changes
20 commits
Select commit Hold shift + click to select a range
495a54f
Add HTTP health check handler for server health monitoring (#952)
taeng0204 Aug 7, 2024
4344fce
Implement validation method to ensure sequential ClientSeq in reqPack…
binary-ho Aug 9, 2024
c5abdcd
Add ErrClientSeqNotSequential to errorToCode Mappers in yorkie.connect
binary-ho Aug 9, 2024
62e7f3d
Write 'packs' test code for the following cases: sequential ClientSeq…
binary-ho Aug 9, 2024
2b58d1d
Fix linting issues in packs_test
binary-ho Aug 9, 2024
2c48484
Write 'packs' test code for the following cases: sequential ClientSeq…
binary-ho Aug 10, 2024
f41ea88
Merge branch 'main' into validate-checkpoint
binary-ho Aug 10, 2024
6d13dc9
Fix goimports issues in packs_test
binary-ho Aug 10, 2024
a348192
Fix goimports issues in packs_test
binary-ho Aug 10, 2024
e447471
Rewrite test description
binary-ho Aug 10, 2024
59392e0
Fix goimports issues in packs_test.go
binary-ho Aug 10, 2024
7712bb8
Fix push/pull test where ClientSeq is less than ClientInfo's ClientSeq
binary-ho Aug 10, 2024
be485a7
Fix goimports issue at health_test.go
binary-ho Aug 10, 2024
b2472f4
Rollback health_test.go
binary-ho Aug 10, 2024
511f8f1
Add Validation ClientSeq is Sequential With DocInfo.Checkpoint.ClientSeq
binary-ho Aug 11, 2024
1917a7b
Write Test Code about validate ClientSeq is sequential with DocInfo.C…
binary-ho Aug 11, 2024
38cc7ed
Fix validate ClientSeq Sequential With Checkpoint
binary-ho Aug 11, 2024
4871d92
Remove TODO comment about needs of Changes validate
binary-ho Aug 11, 2024
a32137e
Fix lint issues: some lines too long
binary-ho Aug 11, 2024
0217951
Change to return `CodeFailedPrecondition` Status Code to the client i…
binary-ho Aug 12, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
59 changes: 59 additions & 0 deletions server/packs/packs.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ package packs

import (
"context"
"errors"
"fmt"
"strconv"
gotime "time"
Expand All @@ -37,6 +38,14 @@ import (
"github.com/yorkie-team/yorkie/server/logging"
)

var (
// ErrClientSeqNotSequentialWithCheckpoint is returned when ClientSeq in reqPack are not sequential With DocInfo.Checkpoint.ClientSeq
ErrClientSeqNotSequentialWithCheckpoint = errors.New("ClientSeq is not sequential with DocInfo.Checkpoint.ClientSeq")

// ErrClientSeqInChangesAreNotSequential is returned when ClientSeq in reqPack.Changes are not sequential
ErrClientSeqInChangesAreNotSequential = errors.New("ClientSeq in reqPack.Changes are not sequential")
)

// PushPullKey creates a new sync.Key of PushPull for the given document.
func PushPullKey(projectID types.ID, docKey key.Key) sync.Key {
return sync.NewKey(fmt.Sprintf("pushpull-%s-%s", projectID, docKey))
Expand Down Expand Up @@ -77,6 +86,12 @@ func PushPull(

// TODO: Changes may be reordered or missing during communication on the network.
// We should check the change.pack with checkpoint to make sure the changes are in the correct order.
checkpoint := clientInfo.Checkpoint(docInfo.ID)
err := validateClientSeqSequential(reqPack.Changes, checkpoint)
if err != nil {
return nil, err
}

initialServerSeq := docInfo.ServerSeq

// 01. push changes: filter out the changes that are already saved in the database.
Expand Down Expand Up @@ -272,3 +287,47 @@ func BuildDocumentForServerSeq(

return doc, nil
}

func validateClientSeqSequential(changes []*change.Change, checkpoint change.Checkpoint) error {
if len(changes) < 1 {
return nil
}

if err := validateClientSeqSequentialWithCheckpoint(changes, checkpoint); err != nil {
return err
}

return validateClientSeqInChangesAreSequential(changes)
}

func validateClientSeqSequentialWithCheckpoint(changes []*change.Change, checkpoint change.Checkpoint) error {
expectedClientSeq := checkpoint.ClientSeq + 1
actualFirstClientSeq := changes[0].ClientSeq()

if expectedClientSeq != actualFirstClientSeq {
return fmt.Errorf(
"ClientSeq is not sequential with DocInfo.Checkpoint.ClientSeq (expected: %d, actual: %d) : %w",
expectedClientSeq,
actualFirstClientSeq,
ErrClientSeqNotSequentialWithCheckpoint,
)
}
return nil
}

func validateClientSeqInChangesAreSequential(changes []*change.Change) error {
nextClientSeq := changes[0].ClientSeq()
for _, cn := range changes[1:] {
nextClientSeq++

if nextClientSeq != cn.ClientSeq() {
return fmt.Errorf(
"ClientSeq in Changes are not sequential (expected: %d, actual: %d) : %w",
nextClientSeq,
cn.ClientSeq(),
ErrClientSeqInChangesAreNotSequential,
)
}
}
return nil
}
261 changes: 261 additions & 0 deletions server/packs/packs_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,261 @@
package packs_test

import (
"testing"

"github.com/stretchr/testify/assert"
"golang.org/x/net/context"

"github.com/yorkie-team/yorkie/api/converter"
"github.com/yorkie-team/yorkie/api/types"
api "github.com/yorkie-team/yorkie/api/yorkie/v1"
"github.com/yorkie-team/yorkie/pkg/document"
"github.com/yorkie-team/yorkie/pkg/document/change"
"github.com/yorkie-team/yorkie/pkg/document/time"
"github.com/yorkie-team/yorkie/server/backend"
"github.com/yorkie-team/yorkie/server/backend/database"
"github.com/yorkie-team/yorkie/server/clients"
"github.com/yorkie-team/yorkie/server/documents"
"github.com/yorkie-team/yorkie/server/packs"
"github.com/yorkie-team/yorkie/server/profiling/prometheus"
"github.com/yorkie-team/yorkie/server/rpc/connecthelper"
"github.com/yorkie-team/yorkie/test/helper"
)

var (
clientID = "000000000000000000000001"
)

func Test(t *testing.T) {
t.Run("push/pull sequential ClientSeq test (happy case)", func(t *testing.T) {
RunPushPullWithSequentialClientSeqTest(t)
})

t.Run("push/pull not sequential ClientSeq test", func(t *testing.T) {
RunPushPullWithNotSequentialClientSeqTest(t)
})

t.Run("push/pull ClientSeq less than ClientInfo's ClientSeq (duplicated request)", func(t *testing.T) {
RunPushPullWithClientSeqLessThanClientInfoTest(t)
})

t.Run("push/pull ServerSeq greater than DocInfo's ServerSeq", func(t *testing.T) {
RunPushPullWithServerSeqGreaterThanDocInfoTest(t)
})
}

func RunPushPullWithSequentialClientSeqTest(t *testing.T) {
ctx := context.Background()
be := setUpBackend(t)
project, _ := be.DB.FindProjectInfoByID(
ctx,
database.DefaultProjectID,
)

clientInfo, _ := clients.Activate(ctx, be.DB, project.ToProject(), clientID)
actorID, _ := time.ActorIDFromHex(clientID)

changePackWithSequentialClientSeq, _ :=
createChangePackWithSequentialClientSeq(helper.TestDocKey(t).String(), actorID.Bytes())

docInfo, _ := documents.FindDocInfoByKeyAndOwner(
ctx, be, clientInfo, changePackWithSequentialClientSeq.DocumentKey, true)
err := clientInfo.AttachDocument(
docInfo.ID, changePackWithSequentialClientSeq.IsAttached())
if err != nil {
assert.Fail(t, "failed to attach document")
}

_, err = packs.PushPull(
ctx, be, project.ToProject(), clientInfo, docInfo,
changePackWithSequentialClientSeq, packs.PushPullOptions{
Mode: types.SyncModePushPull,
Status: document.StatusAttached,
})
assert.NoError(t, err)
}

func RunPushPullWithNotSequentialClientSeqTest(t *testing.T) {
ctx := context.Background()
be := setUpBackend(t)
project, _ := be.DB.FindProjectInfoByID(
ctx,
database.DefaultProjectID,
)

clientInfo, _ := clients.Activate(ctx, be.DB, project.ToProject(), clientID)

actorID, _ := time.ActorIDFromHex(clientID)
changePackWithNotSequentialClientSeq, _ :=
createChangePackWithNotSequentialClientSeq(helper.TestDocKey(t).String(), actorID.Bytes())

docInfo, _ := documents.FindDocInfoByKeyAndOwner(ctx, be, clientInfo,
changePackWithNotSequentialClientSeq.DocumentKey, true)
err := clientInfo.AttachDocument(
docInfo.ID, changePackWithNotSequentialClientSeq.IsAttached())
if err != nil {
assert.Fail(t, "failed to attach document")
}

_, err = packs.PushPull(
ctx, be, project.ToProject(), clientInfo, docInfo,
changePackWithNotSequentialClientSeq, packs.PushPullOptions{
Mode: types.SyncModePushPull,
Status: document.StatusAttached,
})
assert.Equal(t, connecthelper.CodeOf(packs.ErrClientSeqNotSequential), connecthelper.CodeOf(err))

Check failure on line 106 in server/packs/packs_test.go

View workflow job for this annotation

GitHub Actions / build

undefined: packs.ErrClientSeqNotSequential (typecheck)
}

func RunPushPullWithClientSeqLessThanClientInfoTest(t *testing.T) {
ctx := context.Background()
be := setUpBackend(t)
project, _ := be.DB.FindProjectInfoByID(
ctx,
database.DefaultProjectID,
)

clientInfo, _ := clients.Activate(ctx, be.DB, project.ToProject(), clientID)

actorID, _ := time.ActorIDFromHex(clientID)
changePackFixture, _ :=
createChangePackFixture(helper.TestDocKey(t).String(), actorID.Bytes())

docInfo, _ := documents.FindDocInfoByKeyAndOwner(
ctx, be, clientInfo, changePackFixture.DocumentKey, true)
err := clientInfo.AttachDocument(docInfo.ID, changePackFixture.IsAttached())
if err != nil {
assert.Fail(t, "failed to attach document")
}

_, err = packs.PushPull(ctx, be, project.ToProject(),
clientInfo, docInfo, changePackFixture, packs.PushPullOptions{
Mode: types.SyncModePushPull,
Status: document.StatusAttached,
})
if err != nil {
assert.Fail(t, "failed to push pull")
}

changePackWithClientSeqLessThanClientInfo, _ :=
createChangePackWithClientSeqLessThanClientInfo(helper.TestDocKey(t).String(), actorID.Bytes())
_, err = packs.PushPull(ctx, be, project.ToProject(), clientInfo, docInfo,
changePackWithClientSeqLessThanClientInfo, packs.PushPullOptions{
Mode: types.SyncModePushPull,
Status: document.StatusAttached,
})

assert.NoError(t, err)
}

func RunPushPullWithServerSeqGreaterThanDocInfoTest(t *testing.T) {
ctx := context.Background()
be := setUpBackend(t)
project, _ := be.DB.FindProjectInfoByID(
ctx,
database.DefaultProjectID,
)

clientInfo, _ := clients.Activate(ctx, be.DB, project.ToProject(), clientID)

actorID, _ := time.ActorIDFromHex(clientID)
changePackFixture, _ :=
createChangePackFixture(helper.TestDocKey(t).String(), actorID.Bytes())

docInfo, _ := documents.FindDocInfoByKeyAndOwner(
ctx, be, clientInfo, changePackFixture.DocumentKey, true)
err := clientInfo.AttachDocument(docInfo.ID, changePackFixture.IsAttached())
if err != nil {
assert.Fail(t, "failed to attach document")
}

_, _ = packs.PushPull(ctx, be, project.ToProject(), clientInfo, docInfo,
changePackFixture, packs.PushPullOptions{
Mode: types.SyncModePushPull,
Status: document.StatusAttached,
})

changePackWithServerSeqGreaterThanDocInfo, _ :=
createChangePackWithServerSeqGreaterThanDocInfo(helper.TestDocKey(t).String())

_, err = packs.PushPull(ctx, be, project.ToProject(),
clientInfo, docInfo, changePackWithServerSeqGreaterThanDocInfo, packs.PushPullOptions{
Mode: types.SyncModePushPull,
Status: document.StatusAttached,
})

assert.Equal(t, connecthelper.CodeOf(packs.ErrInvalidServerSeq), connecthelper.CodeOf(err))
}

func createChangePackWithSequentialClientSeq(documentKey string, actorID []byte) (*change.Pack, error) {
return converter.FromChangePack(&api.ChangePack{
DocumentKey: documentKey,
Checkpoint: &api.Checkpoint{ServerSeq: 0, ClientSeq: 2},
Changes: []*api.Change{
createChange(0, 0, actorID),
createChange(1, 1, actorID),
createChange(2, 2, actorID),
},
})
}

func createChangePackWithNotSequentialClientSeq(documentKey string, actorID []byte) (*change.Pack, error) {
return converter.FromChangePack(&api.ChangePack{
DocumentKey: documentKey,
Checkpoint: &api.Checkpoint{ServerSeq: 0, ClientSeq: 0},
Changes: []*api.Change{
createChange(2, 2, actorID),
createChange(1, 1, actorID),
createChange(0, 0, actorID),
},
})
}

func createChangePackWithClientSeqLessThanClientInfo(documentKey string, actorID []byte) (*change.Pack, error) {
return converter.FromChangePack(&api.ChangePack{
DocumentKey: documentKey,
Checkpoint: &api.Checkpoint{ServerSeq: 2, ClientSeq: 0},
Changes: []*api.Change{
createChange(0, 0, actorID),
},
})
}

func createChangePackFixture(documentKey string, actorID []byte) (*change.Pack, error) {
return createChangePackWithSequentialClientSeq(documentKey, actorID)
}

func createChangePackWithServerSeqGreaterThanDocInfo(documentKey string) (*change.Pack, error) {
return converter.FromChangePack(&api.ChangePack{
DocumentKey: documentKey,
Checkpoint: &api.Checkpoint{ServerSeq: 1e9, ClientSeq: 2},
})
}

func createChange(clientSeq uint32, lamport int64, actorID []byte) *api.Change {
return &api.Change{
Id: &api.ChangeID{
ClientSeq: clientSeq,
Lamport: lamport,
ActorId: actorID,
},
}
}

func setUpBackend(
t *testing.T,
) *backend.Backend {
conf := helper.TestConfig()

metrics, err := prometheus.NewMetrics()
assert.NoError(t, err)

be, err := backend.New(
conf.Backend,
conf.Mongo,
conf.Housekeeping,
metrics,
)
assert.NoError(t, err)

return be
}
40 changes: 22 additions & 18 deletions server/rpc/connecthelper/status.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,15 +39,17 @@ import (
// errorToConnectCode maps an error to connectRPC status code.
var errorToConnectCode = map[error]connect.Code{
// InvalidArgument means the request is malformed.
converter.ErrPackRequired: connect.CodeInvalidArgument,
converter.ErrCheckpointRequired: connect.CodeInvalidArgument,
time.ErrInvalidHexString: connect.CodeInvalidArgument,
time.ErrInvalidActorID: connect.CodeInvalidArgument,
types.ErrInvalidID: connect.CodeInvalidArgument,
clients.ErrInvalidClientID: connect.CodeInvalidArgument,
clients.ErrInvalidClientKey: connect.CodeInvalidArgument,
key.ErrInvalidKey: connect.CodeInvalidArgument,
types.ErrEmptyProjectFields: connect.CodeInvalidArgument,
converter.ErrPackRequired: connect.CodeInvalidArgument,
converter.ErrCheckpointRequired: connect.CodeInvalidArgument,
time.ErrInvalidHexString: connect.CodeInvalidArgument,
time.ErrInvalidActorID: connect.CodeInvalidArgument,
types.ErrInvalidID: connect.CodeInvalidArgument,
clients.ErrInvalidClientID: connect.CodeInvalidArgument,
clients.ErrInvalidClientKey: connect.CodeInvalidArgument,
key.ErrInvalidKey: connect.CodeInvalidArgument,
types.ErrEmptyProjectFields: connect.CodeInvalidArgument,
packs.ErrClientSeqNotSequentialWithCheckpoint: connect.CodeInvalidArgument,
packs.ErrClientSeqInChangesAreNotSequential: connect.CodeInvalidArgument,

// NotFound means the requested resource does not exist.
database.ErrProjectNotFound: connect.CodeNotFound,
Expand Down Expand Up @@ -91,15 +93,17 @@ var errorToConnectCode = map[error]connect.Code{
// TODO(hackerwins): We need to add codes by hand for each error. It would be
// better to generate this map automatically.
var errorToCode = map[error]string{
converter.ErrPackRequired: "ErrPackRequired",
converter.ErrCheckpointRequired: "ErrCheckpointRequired",
time.ErrInvalidHexString: "ErrInvalidHexString",
time.ErrInvalidActorID: "ErrInvalidActorID",
types.ErrInvalidID: "ErrInvalidID",
clients.ErrInvalidClientID: "ErrInvalidClientID",
clients.ErrInvalidClientKey: "ErrInvalidClientKey",
key.ErrInvalidKey: "ErrInvalidKey",
types.ErrEmptyProjectFields: "ErrEmptyProjectFields",
converter.ErrPackRequired: "ErrPackRequired",
converter.ErrCheckpointRequired: "ErrCheckpointRequired",
time.ErrInvalidHexString: "ErrInvalidHexString",
time.ErrInvalidActorID: "ErrInvalidActorID",
types.ErrInvalidID: "ErrInvalidID",
clients.ErrInvalidClientID: "ErrInvalidClientID",
clients.ErrInvalidClientKey: "ErrInvalidClientKey",
key.ErrInvalidKey: "ErrInvalidKey",
types.ErrEmptyProjectFields: "ErrEmptyProjectFields",
packs.ErrClientSeqNotSequentialWithCheckpoint: "ErrClientSeqNotSequentialWithCheckpoint",
packs.ErrClientSeqInChangesAreNotSequential: "ErrClientSeqInChangesAreNotSequential",

database.ErrProjectNotFound: "ErrProjectNotFound",
database.ErrClientNotFound: "ErrClientNotFound",
Expand Down
Loading