-
Notifications
You must be signed in to change notification settings - Fork 65
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Datastore copy util #469
Datastore copy util #469
Changes from 1 commit
bdd8cab
dca7247
2e427a5
821769a
b3f3ce3
9d9b807
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,127 @@ | ||
package main | ||
|
||
import ( | ||
"context" | ||
"net/url" | ||
"os" | ||
|
||
ds "github.com/ipfs/go-datastore" | ||
"github.com/ipfs/go-datastore/query" | ||
logging "github.com/ipfs/go-log/v2" | ||
"github.com/namsral/flag" | ||
badger "github.com/textileio/go-ds-badger" | ||
mongods "github.com/textileio/go-ds-mongo" | ||
) | ||
|
||
var log = logging.Logger("ds-copy") | ||
|
||
func main() { | ||
fs := flag.NewFlagSet(os.Args[0], 0) | ||
|
||
fromBadgerRepo := fs.String("fromBadgerRepo", "", "Source badger repo path") | ||
toBadgerRepo := fs.String("toBadgerRepo", "", "Destination badger repo path") | ||
|
||
fromMongoUri := fs.String("fromMongoUri", "", "Source MongoDB URI") | ||
fromMongoDatabase := fs.String("fromMongoDatabase", "", "Source MongoDB database") | ||
fromMongoCollection := fs.String("fromMongoCollection", "", "Source MongoDB collection") | ||
toMongoUri := fs.String("toMongoUri", "", "Destination MongoDB URI") | ||
toMongoDatabase := fs.String("toMongoDatabase", "", "Destination MongoDB database") | ||
toMongoCollection := fs.String("toMongoCollection", "", "Destination MongoDB collection") | ||
|
||
verbose := fs.Bool("verbose", false, "More verbose output") | ||
if err := fs.Parse(os.Args[1:]); err != nil { | ||
log.Fatal(err) | ||
} | ||
|
||
logging.SetupLogging(logging.Config{ | ||
Format: logging.ColorizedOutput, | ||
Stderr: true, | ||
Level: logging.LevelError, | ||
}) | ||
if err := logging.SetLogLevel("ds-copy", "info"); err != nil { | ||
log.Fatal(err) | ||
} | ||
|
||
if len(*fromBadgerRepo) != 0 && len(*fromMongoUri) != 0 { | ||
log.Fatal("multiple sources specified") | ||
} | ||
if len(*fromBadgerRepo) == 0 && len(*fromMongoUri) == 0 { | ||
log.Fatal("source not specified") | ||
} | ||
if len(*toBadgerRepo) != 0 && len(*toMongoUri) != 0 { | ||
log.Fatal("multiple destinations specified") | ||
} | ||
if len(*toBadgerRepo) == 0 && len(*toMongoUri) == 0 { | ||
log.Fatal("destination not specified") | ||
} | ||
|
||
var from, to ds.Datastore | ||
var err error | ||
if len(*fromBadgerRepo) != 0 { | ||
from, err = badger.NewDatastore(*fromBadgerRepo, &badger.DefaultOptions) | ||
if err != nil { | ||
log.Fatalf("connecting to badger source: %v", err) | ||
} | ||
log.Infof("connected to badger source: %s", *fromBadgerRepo) | ||
} | ||
if len(*toBadgerRepo) != 0 { | ||
to, err = badger.NewDatastore(*toBadgerRepo, &badger.DefaultOptions) | ||
if err != nil { | ||
log.Fatalf("connecting to badger destination: %v", err) | ||
} | ||
log.Infof("connected to badger destination: %s", *toBadgerRepo) | ||
} | ||
|
||
ctx, cancel := context.WithCancel(context.Background()) | ||
defer cancel() | ||
if len(*fromMongoUri) != 0 { | ||
uri, err := url.Parse(*fromMongoUri) | ||
if err != nil { | ||
log.Fatalf("parsing source mongo URI: %v", err) | ||
} | ||
if len(*fromMongoDatabase) == 0 { | ||
log.Fatal("source mongo database not specified") | ||
} | ||
if len(*fromMongoCollection) == 0 { | ||
log.Fatal("source mongo collection not specified") | ||
} | ||
from, err = mongods.New(ctx, *fromMongoUri, *fromMongoDatabase, mongods.WithCollName(*fromMongoCollection)) | ||
if err != nil { | ||
log.Fatalf("connecting to mongo source: %v", err) | ||
} | ||
log.Infof("connected to mongo source: %s", uri.Redacted()) | ||
} | ||
if len(*toMongoUri) != 0 { | ||
uri, err := url.Parse(*toMongoUri) | ||
if err != nil { | ||
log.Fatalf("parsing destination mongo URI: %v", err) | ||
} | ||
if len(*toMongoDatabase) == 0 { | ||
log.Fatal("destination mongo database not specified") | ||
} | ||
if len(*toMongoCollection) == 0 { | ||
log.Fatal("destination mongo collection not specified") | ||
} | ||
to, err = mongods.New(ctx, *toMongoUri, *toMongoDatabase, mongods.WithCollName(*toMongoCollection)) | ||
if err != nil { | ||
log.Fatalf("connecting to mongo destination: %v", err) | ||
} | ||
log.Infof("connected to mongo destination: %s", uri.Redacted()) | ||
} | ||
|
||
results, err := from.Query(query.Query{}) | ||
if err != nil { | ||
log.Fatalf("querying source: %v", err) | ||
} | ||
defer results.Close() | ||
for r := range results.Next() { | ||
if err := to.Put(ds.NewKey(r.Key), r.Value); err != nil { | ||
log.Fatalf("copying %s: %v", r.Key, err) | ||
} | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I discovered while working and testing Powergate migrations that this is too slow for copying (at least for my taste). Here is what I did for migrations; which reduced times by some orders of magnitude. Nothing crazy, ignoring the migration logic, basically some rate-limiting to Read-and-Put up to 1000 in concurrently instead of serially. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Nice, copied your approach there |
||
if *verbose { | ||
log.Infof("copied %s", r.Key) | ||
} | ||
} | ||
|
||
log.Info("done") | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Between these two lines we need to add:
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ah yeah, thanks!