Skip to content

Commit

Permalink
Fix clients.Deactivate to reflect detached documents in clientInfo …
Browse files Browse the repository at this point in the history
…to the database
  • Loading branch information
raararaara committed Jun 24, 2024
1 parent 1df48f5 commit 438170c
Show file tree
Hide file tree
Showing 4 changed files with 149 additions and 6 deletions.
5 changes: 5 additions & 0 deletions server/backend/database/database.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,11 @@ type Database interface {
// after handling PushPull.
UpdateClientInfoAfterPushPull(ctx context.Context, clientInfo *ClientInfo, docInfo *DocInfo) error

// UpdateClientDocumentInfo updates the client from the given clientInfo.
// TODO(raararaara): This method is identical to `UpdateClientInfoAfterPushPull`.
// I wrote this for testing first and plan to integrate it later.
UpdateClientDocumentInfo(ctx context.Context, clientInfo *ClientInfo, docInfo *DocInfo) error

// FindNextNCyclingProjectInfos finds the next N cycling projects from the given projectID.
FindNextNCyclingProjectInfos(
ctx context.Context,
Expand Down
61 changes: 61 additions & 0 deletions server/backend/database/memory/database.go
Original file line number Diff line number Diff line change
Expand Up @@ -619,6 +619,67 @@ func (d *DB) UpdateClientInfoAfterPushPull(
return nil
}

// UpdateClientDocumentInfo updates the client from the given clientInfo and docInfo.
func (d *DB) UpdateClientDocumentInfo(
_ context.Context,
clientInfo *database.ClientInfo,
docInfo *database.DocInfo,
) error {
docRefKey := docInfo.RefKey()
clientDocInfo := clientInfo.Documents[docRefKey.DocID]
attached, err := clientInfo.IsAttached(docRefKey.DocID)
if err != nil {
return err
}

txn := d.db.Txn(true)
defer txn.Abort()

raw, err := txn.First(tblClients, "id", clientInfo.ID.String())
if err != nil {
return fmt.Errorf("find client by id: %w", err)
}
if raw == nil {
return fmt.Errorf("%s: %w", clientInfo.ID, database.ErrDocumentNotFound)
}

loaded := raw.(*database.ClientInfo).DeepCopy()

if !attached {
loaded.Documents[docRefKey.DocID] = &database.ClientDocInfo{
Status: clientDocInfo.Status,
}
loaded.UpdatedAt = gotime.Now()
} else {
if _, ok := loaded.Documents[docRefKey.DocID]; !ok {
loaded.Documents[docRefKey.DocID] = &database.ClientDocInfo{}
}

loadedClientDocInfo := loaded.Documents[docRefKey.DocID]
serverSeq := loadedClientDocInfo.ServerSeq
if clientDocInfo.ServerSeq > loadedClientDocInfo.ServerSeq {
serverSeq = clientDocInfo.ServerSeq
}
clientSeq := loadedClientDocInfo.ClientSeq
if clientDocInfo.ClientSeq > loadedClientDocInfo.ClientSeq {
clientSeq = clientDocInfo.ClientSeq
}
loaded.Documents[docRefKey.DocID] = &database.ClientDocInfo{
ServerSeq: serverSeq,
ClientSeq: clientSeq,
Status: clientDocInfo.Status,
}
loaded.UpdatedAt = gotime.Now()
}

if err := txn.Insert(tblClients, loaded); err != nil {
return fmt.Errorf("update client: %w", err)
}
txn.Commit()

return nil
}

// FindDeactivateCandidatesPerProject finds the clients that need housekeeping per project.
func (d *DB) FindDeactivateCandidatesPerProject(
_ context.Context,
Expand Down
54 changes: 54 additions & 0 deletions server/backend/database/mongo/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -619,6 +619,60 @@ func (c *Client) UpdateClientInfoAfterPushPull(
return nil
}

// UpdateClientDocumentInfo updates the client from the given clientInfo and docInfo.
func (c *Client) UpdateClientDocumentInfo(
ctx context.Context,
clientInfo *database.ClientInfo,
docInfo *database.DocInfo,
) error {
clientDocInfoKey := getClientDocInfoKey(docInfo.ID)
clientDocInfo, ok := clientInfo.Documents[docInfo.ID]
if !ok {
return fmt.Errorf("client doc info: %w", database.ErrDocumentNeverAttached)
}

updater := bson.M{
"$max": bson.M{
clientDocInfoKey + "server_seq": clientDocInfo.ServerSeq,
clientDocInfoKey + "client_seq": clientDocInfo.ClientSeq,
},
"$set": bson.M{
clientDocInfoKey + StatusKey: clientDocInfo.Status,
"updated_at": clientInfo.UpdatedAt,
},
}

attached, err := clientInfo.IsAttached(docInfo.ID)
if err != nil {
return err
}

if !attached {
updater = bson.M{
"$set": bson.M{
clientDocInfoKey + "server_seq": 0,
clientDocInfoKey + "client_seq": 0,
clientDocInfoKey + StatusKey: clientDocInfo.Status,
"updated_at": clientInfo.UpdatedAt,
},
}
}

result := c.collection(ColClients).FindOneAndUpdate(ctx, bson.M{
"project_id": clientInfo.ProjectID,
"_id": clientInfo.ID,
}, updater)

if result.Err() != nil {
if result.Err() == mongo.ErrNoDocuments {
return fmt.Errorf("%s: %w", clientInfo.Key, database.ErrDocumentNotFound)
}
return fmt.Errorf("update client info: %w", result.Err())
}

return nil
}

// FindDeactivateCandidatesPerProject finds the clients that need housekeeping per project.
func (c *Client) FindDeactivateCandidatesPerProject(
ctx context.Context,
Expand Down
35 changes: 29 additions & 6 deletions server/clients/clients.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,9 @@ func Deactivate(
return nil, err
}

for docID, clientDocInfo := range clientInfo.Documents {
// 01. Detach documents from client.
for docID := range clientInfo.Documents {
// 01-1. Check if given document is attached.
isAttached, err := clientInfo.IsAttached(docID)
if err != nil {
return nil, err
Expand All @@ -63,15 +65,36 @@ func Deactivate(
continue
}

docInfo, err := db.FindDocInfoByRefKey(ctx, types.DocRefKey{
ProjectID: refKey.ProjectID,
DocID: docID,
})
if err != nil {
return nil, err
}

// 01-2. Modify status of given document.
if err := clientInfo.DetachDocument(docID); err != nil {
return nil, err
}

// TODO(hackerwins): We need to remove the presence of the client from the document.
// Be careful that housekeeping is executed by the leader. And documents are sharded
// by the servers in the cluster. So, we need to consider the case where the leader is
// not the same as the server that handles the document.
// 01-3. Update DB
if err := db.UpdateClientDocumentInfo(ctx, clientInfo, docInfo); err != nil {
return nil, err
}
}
updatedClientInfo, err := db.DeactivateClient(ctx, refKey)
if err != nil {
return nil, err
}

// TODO(hackerwins): We need to remove the presence of the client from the document.
// Be careful that housekeeping is executed by the leader. And documents are sharded
// by the servers in the cluster. So, we need to consider the case where the leader is
// not the same as the server that handles the document.

// 02. Update syncedSeq if
for docID, clientDocInfo := range clientInfo.Documents {
if err := db.UpdateSyncedSeq(
ctx,
clientInfo,
Expand All @@ -85,7 +108,7 @@ func Deactivate(
}
}

return db.DeactivateClient(ctx, refKey)
return updatedClientInfo, err
}

// FindActiveClientInfo find the active client info by the given ref key.
Expand Down

0 comments on commit 438170c

Please sign in to comment.