Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

complete multistore #11

Draft
wants to merge 2 commits into
base: multistore
Choose a base branch
from
Draft
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
114 changes: 85 additions & 29 deletions store/rootmulti/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ type Store struct {
keysByName map[string]types.StoreKey
stores map[types.StoreKey]types.CommitKVStore
listeners map[types.StoreKey][]types.WriteListener
removalMap map[types.StoreKey]bool

initialVersion int64
lastCommitInfo *types.CommitInfo
Expand Down Expand Up @@ -72,8 +73,18 @@ func (rs *Store) Commit() types.CommitID {
}

rs.lastCommitInfo = commitStores(version, rs.stores, nil)

// TODO persist to disk
defer rs.flushCommitInfo(rs.lastCommitInfo)

// remove remnants of removed stores
for sk := range rs.removalMap {
if _, ok := rs.stores[sk]; ok {
delete(rs.stores, sk)
delete(rs.storesParams, sk)
delete(rs.keysByName, sk.Name())
}
}
// reset the removalMap
rs.removalMap = make(map[types.StoreKey]bool)

return types.CommitID{
Version: version,
Expand Down Expand Up @@ -129,7 +140,8 @@ func (rs *Store) CacheMultiStore() types.CacheMultiStore {
// Implements interface MultiStore
// used to createQueryContext, abci_query or grpc query service.
func (rs *Store) CacheMultiStoreWithVersion(version int64) (types.CacheMultiStore, error) {
panic("rootmulti Store don't support historical query service")
// TO DO, we still need this to query current state
return nil, nil
}

// Implements interface MultiStore
Expand Down Expand Up @@ -229,18 +241,7 @@ func (rs *Store) LoadLatestVersion() error {
// Implements interface CommitMultiStore
// used by node startup with UpgradeStoreLoader
func (rs *Store) LoadLatestVersionAndUpgrade(upgrades *types.StoreUpgrades) error {
cInfo := &types.CommitInfo{}
bz, err := os.ReadFile(filepath.Join(rs.dir, CommitInfoFileName))
if err != nil {
// if file not exists, assume empty db
if !os.IsNotExist(err) {
return errors.Wrap(err, "fail to read commit info file")
}
} else {
if err := cInfo.Unmarshal(bz); err != nil {
return errors.Wrap(err, "failed unmarshal commit info")
}
}
cInfo := rs.getCommitInfo()

infos := make(map[string]types.StoreInfo)
// convert StoreInfos slice to map
Expand All @@ -252,31 +253,45 @@ func (rs *Store) LoadLatestVersionAndUpgrade(upgrades *types.StoreUpgrades) erro
newStores := make(map[types.StoreKey]types.CommitKVStore)

storesKeys := make([]types.StoreKey, 0, len(rs.storesParams))

rootStoreVersion := cInfo.Version

for key := range rs.storesParams {
storesKeys = append(storesKeys, key)
}
// deterministic iteration order for upgrades
sort.Slice(storesKeys, func(i, j int) bool {
return storesKeys[i].Name() < storesKeys[j].Name()
})

var (
commitID types.CommitID
)
if upgrades != nil {
// deterministic iteration order for upgrades
// (as the underlying store may change and
// upgrades make store changes where the execution order may matter)
sort.Slice(storesKeys, func(i, j int) bool {
return storesKeys[i].Name() < storesKeys[j].Name()
})
}

for _, key := range storesKeys {
var (
commitID types.CommitID
storeVersion int64
)
storeParams := rs.storesParams[key]

if info, ok := infos[key.Name()]; !ok {
commitID = info.CommitId
storeVersion = commitID.Version
} else {
commitID = types.CommitID{}
}

// If it has been added, set the initial version
if upgrades.IsAdded(key.Name()) || upgrades.RenamedFrom(key.Name()) != "" {
storeParams.initialVersion = uint64(cInfo.Version) + 1
} else if commitID.Version != cInfo.Version && storeParams.typ == types.StoreTypeIAVL {
return fmt.Errorf("version of store %s mismatch root store's version; expected %d got %d", key.Name(), cInfo.Version, commitID.Version)
// check if this type of store maintains versions or not
if isVersionedStore(storeParams.typ) {
// if the store is to be created at this height, set initial version
if isNewlyCreatedStore(upgrades, key) {
storeParams.initialVersion = uint64(cInfo.Version) + 1
// if the store existed at the latest height, check if store version equal to root store version
} else if storeVersion != rootStoreVersion {
return fmt.Errorf("version of store %s mismatch root store's version; expected %d got %d", key.Name(), cInfo.Version, commitID.Version)
}
}

store, err := rs.loadCommitStoreFromParams(key, commitID, storeParams)
Expand All @@ -298,6 +313,19 @@ func (rs *Store) LoadLatestVersionAndUpgrade(upgrades *types.StoreUpgrades) erro
return nil
}

// isNewlyCreatedStore check if the store is to be created at this height
func isNewlyCreatedStore(upgrades *types.StoreUpgrades, key types.StoreKey) bool {
return upgrades.IsAdded(key.Name()) || upgrades.RenamedFrom(key.Name()) != ""
}

// isVersionedStore check if this type of store has version or not
func isVersionedStore(storeType types.StoreType) bool {
if storeType == types.StoreTypeIAVL {
return true
}
return false
}

func (rs *Store) loadCommitStoreFromParams(key types.StoreKey, id types.CommitID, params storeParams) (types.CommitKVStore, error) {
switch params.typ {
case types.StoreTypeMulti:
Expand All @@ -306,7 +334,7 @@ func (rs *Store) loadCommitStoreFromParams(key types.StoreKey, id types.CommitID
dir := filepath.Join(rs.dir, key.Name())
return memiavlstore.LoadStoreWithInitialVersion(dir, rs.logger, int64(params.initialVersion))
case types.StoreTypeDB:
panic("recursive MultiStores not yet supported")
panic("db store not yet supported")
case types.StoreTypeTransient:
_, ok := key.(*types.TransientStoreKey)
if !ok {
Expand All @@ -330,7 +358,10 @@ func (rs *Store) loadCommitStoreFromParams(key types.StoreKey, id types.CommitID
// Implements interface CommitMultiStore
// not used in sdk
func (rs *Store) LoadVersionAndUpgrade(ver int64, upgrades *types.StoreUpgrades) error {
panic("rootmulti store don't support LoadVersionAndUpgrade")
if ver != 0 {
return errors.New("rootmulti store only support loading the latest version")
}
return rs.LoadLatestVersionAndUpgrade(upgrades)
}

// Implements interface CommitMultiStore
Expand Down Expand Up @@ -451,3 +482,28 @@ func commitStores(version int64, storeMap map[types.StoreKey]types.CommitKVStore
StoreInfos: storeInfos,
}
}

func (rs *Store) flushCommitInfo(cInfo *types.CommitInfo) {
file, err := os.OpenFile(filepath.Join(rs.dir, CommitInfoFileName), os.O_WRONLY, 0)
if err != nil {
panic(err)
}
cInfoBz, err := cInfo.Marshal()
_, err = file.Write(cInfoBz)
if err != nil {
panic(err)
}
}

func (rs *Store) getCommitInfo() *types.CommitInfo {
cInfo := &types.CommitInfo{}
cInfoBz, err := os.ReadFile(filepath.Join(rs.dir, CommitInfoFileName))
if err != nil {
panic(err)
} else {
if err := cInfo.Unmarshal(cInfoBz); err != nil {
panic(err)
}
}
return cInfo
}