Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

11-to-12: Re-work logic to use the backup file for both migrate and revert #150

Merged
merged 7 commits into from
Feb 15, 2022
Merged
2 changes: 1 addition & 1 deletion fs-repo-11-to-12/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ require (
github.com/ipfs/fs-repo-migrations/tools v0.0.0-20211209222258-754a2dcb82ea
github.com/ipfs/go-cid v0.0.7
github.com/ipfs/go-datastore v0.4.5
github.com/ipfs/go-ds-badger v0.2.7-0.20211210151007-a2805355dcf5 // indirect
github.com/ipfs/go-ds-badger v0.2.7-0.20220117180822-159330558612 // indirect
github.com/ipfs/go-filestore v1.0.0
github.com/ipfs/go-ipfs v0.8.0
github.com/ipfs/go-ipfs-ds-help v1.0.0
Expand Down
4 changes: 2 additions & 2 deletions fs-repo-11-to-12/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -295,8 +295,8 @@ github.com/ipfs/go-ds-badger v0.0.7/go.mod h1:qt0/fWzZDoPW6jpQeqUjR5kBfhDNB65jd9
github.com/ipfs/go-ds-badger v0.2.1/go.mod h1:Tx7l3aTph3FMFrRS838dcSJh+jjA7cX9DrGVwx/NOwE=
github.com/ipfs/go-ds-badger v0.2.3/go.mod h1:pEYw0rgg3FIrywKKnL+Snr+w/LjJZVMTBRn4FS6UHUk=
github.com/ipfs/go-ds-badger v0.2.6/go.mod h1:02rnztVKA4aZwDuaRPTf8mpqcKmXP7mLl6JPxd14JHA=
github.com/ipfs/go-ds-badger v0.2.7-0.20211210151007-a2805355dcf5 h1:ovdpQk2ZVK6eQLzMCZy1z2tJae7yvE8xaPUw4Pr1RqI=
github.com/ipfs/go-ds-badger v0.2.7-0.20211210151007-a2805355dcf5/go.mod h1:02rnztVKA4aZwDuaRPTf8mpqcKmXP7mLl6JPxd14JHA=
github.com/ipfs/go-ds-badger v0.2.7-0.20220117180822-159330558612 h1:Uvp2/ZNlR3YmH04XJGv4YsPabhPzRPyeroLltVKBSr8=
github.com/ipfs/go-ds-badger v0.2.7-0.20220117180822-159330558612/go.mod h1:02rnztVKA4aZwDuaRPTf8mpqcKmXP7mLl6JPxd14JHA=
github.com/ipfs/go-ds-flatfs v0.4.5 h1:4QceuKEbH+HVZ2ZommstJMi3o3II+dWS3IhLaD7IGHs=
github.com/ipfs/go-ds-flatfs v0.4.5/go.mod h1:e4TesLyZoA8k1gV/yCuBTnt2PJtypn4XUlB5n8KQMZY=
github.com/ipfs/go-ds-leveldb v0.0.1/go.mod h1:feO8V3kubwsEF22n0YRQCffeb79OOYIykR4L04tMOYc=
Expand Down
110 changes: 62 additions & 48 deletions fs-repo-11-to-12/migration/migration.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,6 @@ func (m *Migration) Apply(opts migrate.Options) error {
log.Error(err)
return err
}
defer f.Close()
buf := bufio.NewWriter(f)

swapCh := make(chan Swap, 1000)
Expand All @@ -119,7 +118,7 @@ func (m *Migration) Apply(opts migrate.Options) error {
for _, prefix := range migrationPrefixes {
log.VLog(" - Adding keys in prefix %s to backup file", prefix)
cidSwapper := CidSwapper{Prefix: prefix, Store: m.dstore, SwapCh: swapCh}
total, err := cidSwapper.Run(true) // DRY RUN
total, err := cidSwapper.Prepare() // DRY RUN
if err != nil {
close(swapCh)
log.Error(err)
Expand All @@ -131,17 +130,12 @@ func (m *Migration) Apply(opts migrate.Options) error {
// Wait for our writing to finish before doing the flushing.
<-writingDone
buf.Flush()
f.Close()

// MIGRATION: Run the real migration.
for _, prefix := range migrationPrefixes {
log.VLog(" - Migrating keys in prefix %s", prefix)
cidSwapper := CidSwapper{Prefix: prefix, Store: m.dstore}
total, err := cidSwapper.Run(false) // NOT a Dry Run
if err != nil {
log.Error(err)
return err
}
log.Log("%d CIDv1 keys in %s have been migrated", total, prefix)
err = m.scanAndSwap(filepath.Join(opts.Path, backupFile), false) // revert=false
if err != nil {
log.Error(err)
return err
}

// Wrap up, we are now in repo-version 12.
Expand Down Expand Up @@ -187,21 +181,48 @@ func (m *Migration) Revert(opts migrate.Options) error {

// Open revert path for reading
backupPath := filepath.Join(opts.Path, backupFile)
err = m.scanAndSwap(backupPath, true) // revert = true
if err != nil {
log.Error(err)
return err
}

// Wrap up the Revert. We are back at version 11.
if err := repo.WriteVersion("11"); err != nil {
log.Error("failed to write version file")
return err
}

log.Log("reverted version file to version 11")

// Move the backup file out of the way.
err = os.Rename(backupPath, backupPath+".reverted")
if err != nil {
log.Error("could not rename the backup file, but migration worked: %s", err)
return err
}
return nil
}

// Receives a backup file which contains all the things that need to be
// migrated and reads every line, performing swaps in the needed direction.
func (m *Migration) scanAndSwap(backupPath string, revert bool) error {
f, err := getBackupFile(backupPath)
if err != nil {
log.Error(err)
return err
}
defer f.Close()

unswapCh := make(chan Swap, 1000)
swapCh := make(chan Swap, 1000)
scanner := bufio.NewScanner(f)
var scannerErr error

// This will send swap objects to the Unswapper on unswapCh as they
// This will send swap objects to the swapping channel as they
// are read from the backup file on disk. It will also send MFS and
// pinset pins for reversal.
// pinset pins for reversal when doing a revert.
go func() {
defer close(unswapCh)
defer close(swapCh)

// Process backup file first.
for scanner.Scan() {
Expand All @@ -222,60 +243,53 @@ func (m *Migration) Revert(opts migrate.Options) error {
break
}
mhashPath := prefix.Child(dshelp.MultihashToDsKey(cid.Hash()))
// This is the original swap object which is what we
// wanted to rebuild. Old is the old path and new is
// the new path and the unswapper will revert this.

// The swapper will move cidPath to mhashPath, and the unswapper
// will do the opposite.
sw := Swap{Old: cidPath, New: mhashPath}
unswapCh <- sw
swapCh <- sw
}
if err := scanner.Err(); err != nil {
log.Error(err)
return
}

// Process MFS/pinset. We have to do this in cases the user
// has been running with the migration for some time and made changes to
// the pinset or the MFS root.
if err := walkPinsAndMFS(unswapCh, m.dstore); err != nil {
log.Error(err)
return
if revert {
// Process MFS/pinset. We have to do this in cases the
// user has been running with the migration for some
// time and made changes to the pinset or the MFS
// root.
if err := walkPinsAndMFS(swapCh, m.dstore); err != nil {
log.Error(err)
return
}
}

}()

// The backup file contains prefixed keys, so we do not need to set
// Prefix in the CidSwapper.
cidSwapper := CidSwapper{Store: m.dstore}
total, err := cidSwapper.Revert(unswapCh)
var total uint64
if revert {
total, err = cidSwapper.Revert(swapCh)
} else {
total, err = cidSwapper.Run(swapCh)
}
if err != nil {
log.Error(err)
return err
}
// Revert will only return after unswapCh is closed, so we know

// The swapper will only return after swapCh is closed, so we know
// scannerErr is safe to read at this point.
if scannerErr != nil {
return err
}

// Wrap up the Revert. We are back at version 11.
log.Log("%d multihashes reverted to CidV1s", total)
if err := repo.WriteVersion("11"); err != nil {
log.Error("failed to write version file")
return err
}

log.Log("reverted version file to version 11")
err = f.Close()
if err != nil {
log.Error("could not close backup file")
return err
}

// Move the backup file out of the way.
err = os.Rename(backupPath, backupPath+".reverted")
if err != nil {
log.Error("could not rename the backup file, but migration worked: %s", err)
return err
if revert {
log.Log("%d multihashes swapped to CidV1s", total)
} else {
log.Log("%d CidV1s swapped to multihashes", total)
}
return nil
}
Expand Down
Loading