Skip to content

Commit

Permalink
Add migration script to remove version vector (#1091)
Browse files Browse the repository at this point in the history
  • Loading branch information
chacha912 authored Dec 9, 2024
1 parent 1084b8d commit 608c7e4
Show file tree
Hide file tree
Showing 3 changed files with 145 additions and 0 deletions.
2 changes: 2 additions & 0 deletions cmd/yorkie/migration.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
"github.com/yorkie-team/yorkie/cmd/yorkie/config"
v053 "github.com/yorkie-team/yorkie/migrations/v0.5.3"
v056 "github.com/yorkie-team/yorkie/migrations/v0.5.6"
v057 "github.com/yorkie-team/yorkie/migrations/v0.5.7"
yorkiemongo "github.com/yorkie-team/yorkie/server/backend/database/mongo"
)

Expand All @@ -45,6 +46,7 @@ var (
var migrationMap = map[string]func(ctx context.Context, db *mongo.Client, dbName string, batchSize int) error{
"v0.5.3": v053.RunMigration,
"v0.5.6": v056.RunMigration,
"v0.5.7": v057.RunMigration,
}

// runMigration runs the migration for the given version.
Expand Down
36 changes: 36 additions & 0 deletions migrations/v0.5.7/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
/*
* Copyright 2024 The Yorkie Authors. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

// Package v057 provides migration for v0.5.7
package v057

import (
"context"
"fmt"

"go.mongodb.org/mongo-driver/mongo"
)

// RunMigration runs migrations for v0.5.7
func RunMigration(ctx context.Context, db *mongo.Client, databaseName string, batchSize int) error {
if err := RemoveVersionVector(ctx, db, databaseName, batchSize); err != nil {
return err
}

fmt.Println("v0.5.7 migration completed")

return nil
}
107 changes: 107 additions & 0 deletions migrations/v0.5.7/remove-version-vector.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
/*
* Copyright 2024 The Yorkie Authors. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package v057

import (
"context"
"fmt"

"go.mongodb.org/mongo-driver/bson"
"go.mongodb.org/mongo-driver/mongo"
)

func processDeletionBatch(
ctx context.Context,
collection *mongo.Collection,
documents []bson.M,
) error {
var writeModels []mongo.WriteModel

for _, doc := range documents {
writeModel := mongo.NewUpdateOneModel().SetFilter(bson.M{
"project_id": doc["project_id"],
"doc_id": doc["doc_id"],
"server_seq": doc["server_seq"],
}).SetUpdate(bson.M{
"$unset": bson.M{
"version_vector": "",
},
})
writeModels = append(writeModels, writeModel)
}

if len(writeModels) > 0 {
_, err := collection.BulkWrite(ctx, writeModels)
if err != nil {
return fmt.Errorf("execute bulk write: %w", err)
}
}

return nil
}

// RemoveVersionVector migrates the changes collection to remove version vector field.
func RemoveVersionVector(ctx context.Context, db *mongo.Client, databaseName string, batchSize int) error {
collection := db.Database(databaseName).Collection("changes")
totalCount, err := collection.CountDocuments(ctx, bson.M{})
if err != nil {
return err
}
batchCount := 1
prevPercentage := 0

cursor, err := collection.Find(ctx, bson.M{})
if err != nil {
return err
}

var documents []bson.M

for cursor.Next(ctx) {
var doc bson.M
if err := cursor.Decode(&doc); err != nil {
return fmt.Errorf("decode document: %w", err)
}

documents = append(documents, doc)

if len(documents) >= batchSize {
if err := processDeletionBatch(ctx, collection, documents); err != nil {
return err
}

percentage := int(float64(batchSize*batchCount) / float64(totalCount) * 100)
if percentage != prevPercentage {
fmt.Printf("%s.changes version vector removal %d%% completed \n", databaseName, percentage)
prevPercentage = percentage
}

documents = documents[:0]
batchCount++
}
}

if len(documents) > 0 {
if err := processDeletionBatch(ctx, collection, documents); err != nil {
return fmt.Errorf("process final batch: %w", err)
}
}

fmt.Println("remove version vector completed")

return nil
}

0 comments on commit 608c7e4

Please sign in to comment.