Skip to content

Commit

Permalink
Merge pull request #150 from ipfs/fix/11-to-12-querying
Browse files Browse the repository at this point in the history
11-to-12: Re-work logic to use the backup file for both migrate and revert
  • Loading branch information
aschmahmann authored Feb 15, 2022
2 parents 25fed4c + fa319e8 commit 9dcf004
Show file tree
Hide file tree
Showing 7 changed files with 140 additions and 106 deletions.
2 changes: 1 addition & 1 deletion fs-repo-11-to-12/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ require (
github.com/ipfs/fs-repo-migrations/tools v0.0.0-20211209222258-754a2dcb82ea
github.com/ipfs/go-cid v0.0.7
github.com/ipfs/go-datastore v0.4.5
github.com/ipfs/go-ds-badger v0.2.7-0.20211210151007-a2805355dcf5 // indirect
github.com/ipfs/go-ds-badger v0.2.7-0.20220117180822-159330558612 // indirect
github.com/ipfs/go-filestore v1.0.0
github.com/ipfs/go-ipfs v0.8.0
github.com/ipfs/go-ipfs-ds-help v1.0.0
Expand Down
4 changes: 2 additions & 2 deletions fs-repo-11-to-12/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -295,8 +295,8 @@ github.com/ipfs/go-ds-badger v0.0.7/go.mod h1:qt0/fWzZDoPW6jpQeqUjR5kBfhDNB65jd9
github.com/ipfs/go-ds-badger v0.2.1/go.mod h1:Tx7l3aTph3FMFrRS838dcSJh+jjA7cX9DrGVwx/NOwE=
github.com/ipfs/go-ds-badger v0.2.3/go.mod h1:pEYw0rgg3FIrywKKnL+Snr+w/LjJZVMTBRn4FS6UHUk=
github.com/ipfs/go-ds-badger v0.2.6/go.mod h1:02rnztVKA4aZwDuaRPTf8mpqcKmXP7mLl6JPxd14JHA=
github.com/ipfs/go-ds-badger v0.2.7-0.20211210151007-a2805355dcf5 h1:ovdpQk2ZVK6eQLzMCZy1z2tJae7yvE8xaPUw4Pr1RqI=
github.com/ipfs/go-ds-badger v0.2.7-0.20211210151007-a2805355dcf5/go.mod h1:02rnztVKA4aZwDuaRPTf8mpqcKmXP7mLl6JPxd14JHA=
github.com/ipfs/go-ds-badger v0.2.7-0.20220117180822-159330558612 h1:Uvp2/ZNlR3YmH04XJGv4YsPabhPzRPyeroLltVKBSr8=
github.com/ipfs/go-ds-badger v0.2.7-0.20220117180822-159330558612/go.mod h1:02rnztVKA4aZwDuaRPTf8mpqcKmXP7mLl6JPxd14JHA=
github.com/ipfs/go-ds-flatfs v0.4.5 h1:4QceuKEbH+HVZ2ZommstJMi3o3II+dWS3IhLaD7IGHs=
github.com/ipfs/go-ds-flatfs v0.4.5/go.mod h1:e4TesLyZoA8k1gV/yCuBTnt2PJtypn4XUlB5n8KQMZY=
github.com/ipfs/go-ds-leveldb v0.0.1/go.mod h1:feO8V3kubwsEF22n0YRQCffeb79OOYIykR4L04tMOYc=
Expand Down
110 changes: 62 additions & 48 deletions fs-repo-11-to-12/migration/migration.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,6 @@ func (m *Migration) Apply(opts migrate.Options) error {
log.Error(err)
return err
}
defer f.Close()
buf := bufio.NewWriter(f)

swapCh := make(chan Swap, 1000)
Expand All @@ -119,7 +118,7 @@ func (m *Migration) Apply(opts migrate.Options) error {
for _, prefix := range migrationPrefixes {
log.VLog(" - Adding keys in prefix %s to backup file", prefix)
cidSwapper := CidSwapper{Prefix: prefix, Store: m.dstore, SwapCh: swapCh}
total, err := cidSwapper.Run(true) // DRY RUN
total, err := cidSwapper.Prepare() // DRY RUN
if err != nil {
close(swapCh)
log.Error(err)
Expand All @@ -131,17 +130,12 @@ func (m *Migration) Apply(opts migrate.Options) error {
// Wait for our writing to finish before doing the flushing.
<-writingDone
buf.Flush()
f.Close()

// MIGRATION: Run the real migration.
for _, prefix := range migrationPrefixes {
log.VLog(" - Migrating keys in prefix %s", prefix)
cidSwapper := CidSwapper{Prefix: prefix, Store: m.dstore}
total, err := cidSwapper.Run(false) // NOT a Dry Run
if err != nil {
log.Error(err)
return err
}
log.Log("%d CIDv1 keys in %s have been migrated", total, prefix)
err = m.scanAndSwap(filepath.Join(opts.Path, backupFile), false) // revert=false
if err != nil {
log.Error(err)
return err
}

// Wrap up, we are now in repo-version 12.
Expand Down Expand Up @@ -187,21 +181,48 @@ func (m *Migration) Revert(opts migrate.Options) error {

// Open revert path for reading
backupPath := filepath.Join(opts.Path, backupFile)
err = m.scanAndSwap(backupPath, true) // revert = true
if err != nil {
log.Error(err)
return err
}

// Wrap up the Revert. We are back at version 11.
if err := repo.WriteVersion("11"); err != nil {
log.Error("failed to write version file")
return err
}

log.Log("reverted version file to version 11")

// Move the backup file out of the way.
err = os.Rename(backupPath, backupPath+".reverted")
if err != nil {
log.Error("could not rename the backup file, but migration worked: %s", err)
return err
}
return nil
}

// Receives a backup file which contains all the things that need to be
// migrated and reads every line, performing swaps in the needed direction.
func (m *Migration) scanAndSwap(backupPath string, revert bool) error {
f, err := getBackupFile(backupPath)
if err != nil {
log.Error(err)
return err
}
defer f.Close()

unswapCh := make(chan Swap, 1000)
swapCh := make(chan Swap, 1000)
scanner := bufio.NewScanner(f)
var scannerErr error

// This will send swap objects to the Unswapper on unswapCh as they
// This will send swap objects to the swapping channel as they
// are read from the backup file on disk. It will also send MFS and
// pinset pins for reversal.
// pinset pins for reversal when doing a revert.
go func() {
defer close(unswapCh)
defer close(swapCh)

// Process backup file first.
for scanner.Scan() {
Expand All @@ -222,60 +243,53 @@ func (m *Migration) Revert(opts migrate.Options) error {
break
}
mhashPath := prefix.Child(dshelp.MultihashToDsKey(cid.Hash()))
// This is the original swap object which is what we
// wanted to rebuild. Old is the old path and new is
// the new path and the unswapper will revert this.

// The swapper will move cidPath to mhashPath, and the unswapper
// will do the opposite.
sw := Swap{Old: cidPath, New: mhashPath}
unswapCh <- sw
swapCh <- sw
}
if err := scanner.Err(); err != nil {
log.Error(err)
return
}

// Process MFS/pinset. We have to do this in cases the user
// has been running with the migration for some time and made changes to
// the pinset or the MFS root.
if err := walkPinsAndMFS(unswapCh, m.dstore); err != nil {
log.Error(err)
return
if revert {
// Process MFS/pinset. We have to do this in cases the
// user has been running with the migration for some
// time and made changes to the pinset or the MFS
// root.
if err := walkPinsAndMFS(swapCh, m.dstore); err != nil {
log.Error(err)
return
}
}

}()

// The backup file contains prefixed keys, so we do not need to set
// Prefix in the CidSwapper.
cidSwapper := CidSwapper{Store: m.dstore}
total, err := cidSwapper.Revert(unswapCh)
var total uint64
if revert {
total, err = cidSwapper.Revert(swapCh)
} else {
total, err = cidSwapper.Run(swapCh)
}
if err != nil {
log.Error(err)
return err
}
// Revert will only return after unswapCh is closed, so we know

// The swapper will only return after swapCh is closed, so we know
// scannerErr is safe to read at this point.
if scannerErr != nil {
return err
}

// Wrap up the Revert. We are back at version 11.
log.Log("%d multihashes reverted to CidV1s", total)
if err := repo.WriteVersion("11"); err != nil {
log.Error("failed to write version file")
return err
}

log.Log("reverted version file to version 11")
err = f.Close()
if err != nil {
log.Error("could not close backup file")
return err
}

// Move the backup file out of the way.
err = os.Rename(backupPath, backupPath+".reverted")
if err != nil {
log.Error("could not rename the backup file, but migration worked: %s", err)
return err
if revert {
log.Log("%d multihashes swapped to CidV1s", total)
} else {
log.Log("%d CidV1s swapped to multihashes", total)
}
return nil
}
Expand Down
Loading

0 comments on commit 9dcf004

Please sign in to comment.