Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Version store #9

Draft
wants to merge 31 commits into
base: version-store
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
31 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
46 changes: 40 additions & 6 deletions app/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"net/http"
"os"
"path/filepath"
"sync"

"github.com/crypto-org-chain/cronos/x/cronos"
"github.com/crypto-org-chain/cronos/x/cronos/middleware"
Expand All @@ -22,12 +23,14 @@ import (
dbm "github.com/tendermint/tm-db"

"github.com/cosmos/cosmos-sdk/baseapp"
"github.com/cosmos/cosmos-sdk/client/flags"
"github.com/cosmos/cosmos-sdk/client/grpc/tmservice"
"github.com/cosmos/cosmos-sdk/codec"
"github.com/cosmos/cosmos-sdk/server/api"
"github.com/cosmos/cosmos-sdk/server/config"
servertypes "github.com/cosmos/cosmos-sdk/server/types"
"github.com/cosmos/cosmos-sdk/store/streaming"
"github.com/cosmos/cosmos-sdk/store/streaming/file"
storetypes "github.com/cosmos/cosmos-sdk/store/types"
"github.com/cosmos/cosmos-sdk/testutil/testdata"
sdk "github.com/cosmos/cosmos-sdk/types"
Expand Down Expand Up @@ -122,7 +125,8 @@ import (
gravitytypes "github.com/peggyjv/gravity-bridge/module/v2/x/gravity/types"

// this line is used by starport scaffolding # stargate/app/moduleImport

cronosappclient "github.com/crypto-org-chain/cronos/client"
cronosfile "github.com/crypto-org-chain/cronos/client/file"
"github.com/crypto-org-chain/cronos/versiondb"
"github.com/crypto-org-chain/cronos/versiondb/tmdb"
cronosclient "github.com/crypto-org-chain/cronos/x/cronos/client"
Expand All @@ -148,6 +152,8 @@ const (
//
// NOTE: In the SDK, the default value is 255.
AddrLen = 20

FileStreamerDirectory = "file_streamer"
)

// this line is used by starport scaffolding # stargate/wasm/app/enabledProposals
Expand Down Expand Up @@ -353,12 +359,33 @@ func New(
os.Exit(1)
}

// configure state listening capabilities using AppOptions
// we are doing nothing with the returned streamingServices and waitGroup in this case
streamers := cast.ToStringSlice(appOpts.Get("store.streamers"))
for _, streamerName := range streamers {
if streamerName == "file" {
streamingDir := filepath.Join(cast.ToString(appOpts.Get(flags.FlagHome)), "data", FileStreamerDirectory)
if err := os.MkdirAll(streamingDir, os.ModePerm); err != nil {
panic(err)
}

// default to exposing all
exposeStoreKeys := make([]storetypes.StoreKey, 0, len(keys))
for _, storeKey := range keys {
exposeStoreKeys = append(exposeStoreKeys, storeKey)
}
Comment on lines +372 to +374

Check failure

Code scanning / gosec

the value in the range statement should be _ unless copying a map: want: for key := range m

the value in the range statement should be _ unless copying a map: want: for key := range m
service, err := file.NewStreamingService(streamingDir, "", exposeStoreKeys, appCodec, false, true, true)
if err != nil {
panic(err)
}
bApp.SetStreamingService(service)

wg := new(sync.WaitGroup)
if err := service.Stream(wg); err != nil {
panic(err)
}
}
if streamerName == "versiondb" {
dataDir := filepath.Join(homePath, "data", "versiondb")
rootDir := cast.ToString(appOpts.Get(flags.FlagHome))
dataDir := filepath.Join(rootDir, "data", "versiondb")
if err := os.MkdirAll(dataDir, os.ModePerm); err != nil {
panic(err)
}
Expand All @@ -376,7 +403,15 @@ func New(
panic(err)
}
versionDB := tmdb.NewStore(plainDB, historyDB, changesetDB)

isGrpcOnly := cast.ToBool(appOpts.Get(cronosappclient.FlagIsGrpcOnly))
remoteGrpcUrl := cast.ToString(appOpts.Get(cronosappclient.FlagRemoteGrpcUrl))
isLocal := cast.ToBool(appOpts.Get(cronosappclient.FlagIsLocal))
remoteUrl := cast.ToString(appOpts.Get(cronosappclient.FlagRemoteUrl))
remoteWsUrl := cast.ToString(appOpts.Get(cronosappclient.FlagRemoteWsUrl))
concurrency := cast.ToInt(appOpts.Get(cronosappclient.FlagConcurrency))
if isGrpcOnly {
cronosfile.Sync(versionDB, remoteGrpcUrl, remoteUrl, remoteWsUrl, rootDir, isLocal, concurrency)
}
// default to exposing all
exposeStoreKeys := make([]storetypes.StoreKey, 0, len(keys))
for _, storeKey := range keys {
Expand All @@ -388,7 +423,6 @@ func New(
qms.MountTransientStores(tkeys)
qms.MountMemoryStores(memKeys)
bApp.SetQueryMultiStore(qms)
break
}
}

Expand Down
30 changes: 30 additions & 0 deletions client/decoder.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
package client

import (
"fmt"

"github.com/cosmos/cosmos-sdk/store/types"
sdk "github.com/cosmos/cosmos-sdk/types"
"github.com/gogo/protobuf/proto"
)

func DecodeData(data []byte) (pairs []types.StoreKVPair, err error) {
const prefixLen = 8
offset := prefixLen
dataSize := sdk.BigEndianToUint64(data[:offset])
size := len(data)
if int(dataSize)+prefixLen != size {

Check failure

Code scanning / gosec

Potential integer overflow by integer type conversion

Potential integer overflow by integer type conversion
return nil, fmt.Errorf("incomplete file: %v vs %v", dataSize, size)
}
for offset < size {
size, n := proto.DecodeVarint(data[offset:])
offset += n
pair := new(types.StoreKVPair)
if err := proto.Unmarshal(data[offset:offset+int(size)], pair); err != nil {

Check failure

Code scanning / gosec

Potential integer overflow by integer type conversion

Potential integer overflow by integer type conversion
return nil, err
}
pairs = append(pairs, *pair)
offset += int(size)

Check failure

Code scanning / gosec

Potential integer overflow by integer type conversion

Potential integer overflow by integer type conversion
}
return
}
210 changes: 210 additions & 0 deletions client/file/sync.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,210 @@
package file

import (
"encoding/binary"
"fmt"
"io"
"io/ioutil"
"net/http"
"os"
"path/filepath"
"time"

"github.com/crypto-org-chain/cronos/client"
"github.com/crypto-org-chain/cronos/versiondb/tmdb"
tmjson "github.com/tendermint/tendermint/libs/json"
tmtypes "github.com/tendermint/tendermint/types"
)

// Simplify block height for header
type Header struct {
Height int64 `json:"height,omitempty"`
}

type Block struct {
Header Header `json:"header"`
}

type GetLatestBlockResponse struct {
Block *Block `json:"block,omitempty"`
}

func Sync(versionDB *tmdb.Store, remoteGrpcUrl, remoteUrl, remoteWsUrl, rootDir string, isLocal bool, concurrency int) {

const defaultMaxRetry = 50
latestVersion, err := versionDB.GetLatestVersion()
fmt.Printf("mm-latestVersion: %+v\n", latestVersion)
if err != nil {
panic(err)
}
startBlockNum := latestVersion
if startBlockNum < 0 {
startBlockNum = 0
}
nextBlockNum := int(startBlockNum) + 1
maxBlockNum := -1
for i := 0; i < defaultMaxRetry; i++ {
if i > 0 {
time.Sleep(time.Second)
}
resp, err := http.Get(fmt.Sprintf("%s/cosmos/base/tendermint/v1beta1/blocks/latest", remoteGrpcUrl))
if err != nil {
fmt.Printf("error making http request: %s\n", err)
continue
}

var bz []byte
if result := func() bool {
defer resp.Body.Close()
bz, err = ioutil.ReadAll(resp.Body)
if err != nil {
return false
}
if resp.StatusCode != http.StatusOK {
return false
}
return true
}(); !result {
continue
}

res := new(GetLatestBlockResponse)
err = tmjson.Unmarshal(bz, res)
if err != nil {
fmt.Printf("mm-read-res-err: %+v\n", err)
continue
}
maxBlockNum = int(res.Block.Header.Height)
fmt.Printf("mm-maxBlockNum: %d\n", maxBlockNum)
break
}
if maxBlockNum < 0 {
panic(fmt.Sprintf("max retries %d reached", defaultMaxRetry))
}

interval := time.Second
directory := filepath.Join(rootDir, "data", "file_streamer")
// streamer write the file blk by blk with concurrency 1
streamer := NewBlockFileWatcher(1, maxBlockNum, func(blockNum int) string {
return GetLocalDataFileName(directory, blockNum)
}, nil, true)
streamer.Start(nextBlockNum, interval)
go func() {
chData, chErr := streamer.SubscribeData(), streamer.SubscribeError()
for {
select {
case data := <-chData:
pairs, err := client.DecodeData(data.Data)
fmt.Printf("mm-pairs: %+v, %+v\n", len(pairs), err)
if err != nil {
fmt.Println("invalid decode")
} else if err = versionDB.PutAtVersion(int64(data.BlockNum), pairs); err != nil {
fmt.Println("mm-put-at-version-panic")
panic(err)
}
data.ChResult <- err
case err := <-chErr:
// fail read
fmt.Println("mm-fail-read-panic")
panic(err)
}
}
}()

synchronizer := NewBlockFileWatcher(
concurrency,
maxBlockNum,
func(blockNum int) string {
return fmt.Sprintf("%s/%s", remoteUrl, DataFileName(blockNum))
},
func(blockNum int) bool {
path := GetLocalDataFileName(directory, blockNum)
f, err := os.Open(path)

Check failure

Code scanning / gosec

Potential file inclusion via variable

Potential file inclusion via variable
if err == nil {
defer func() {
_ = f.Close()

Check warning

Code scanning / gosec

Returned error is not propagated up the stack.

Returned error is not propagated up the stack.
}()
// valid 1st 8 bytes for downloaded file
var bytes [8]byte
if _, err = io.ReadFull(f, bytes[:]); err == nil {
size := binary.BigEndian.Uint64(bytes[:])
if info, err := f.Stat(); err == nil && size+uint64(8) == uint64(info.Size()) {

Check failure

Code scanning / gosec

Potential integer overflow by integer type conversion

Potential integer overflow by integer type conversion
return false
}
}
}
return true
},
isLocal,
)
synchronizer.Start(nextBlockNum, interval)
go func() {
// max retry for temporary io error
maxRetry := concurrency * 2
retry := 0
chData, chErr := synchronizer.SubscribeData(), synchronizer.SubscribeError()
for {
select {
case data := <-chData:
file := GetLocalDataFileName(directory, data.BlockNum)
fmt.Printf("mm-data.BlockNum: %+v\n", data.BlockNum)
if err := os.WriteFile(file, data.Data, 0600); err != nil {
fmt.Println("mm-WriteFile-panic")
panic(err)
} else {
retry = 0
fmt.Println("mm-reset-retry")
if data.BlockNum > maxBlockNum {
streamer.SetMaxBlockNum(data.BlockNum)
}
}
data.ChResult <- err
case err := <-chErr:
retry++
fmt.Println("mm-retry", retry)
if retry == maxRetry {
// data corrupt
fmt.Println("mm-data-corrupt-panic")
panic(err)
}
}
}
}()

go func() {
for i := 0; i < defaultMaxRetry; i++ {
if i > 0 {
time.Sleep(time.Second)
}
wsClient := client.NewWebsocketClient(remoteWsUrl)
chResult, err := wsClient.Subscribe()
if err != nil {
fmt.Printf("mm-subscribed[%+v]: %+v\n", i, err)
continue
}
fmt.Println("subscribing")
err = wsClient.Send("subscribe", []string{
"tm.event='NewBlockHeader'",
})
if err != nil {
fmt.Printf("mm-subscribed: %+v\n", err)
continue
}
i = 0
fmt.Println("subscribed ws")
for res := range chResult {
if res == nil || res.Data == nil {
continue
}
data, ok := res.Data.(tmtypes.EventDataNewBlockHeader)
if !ok {
continue
}
blockNum := int(data.Header.Height)
fmt.Printf("mm-set-max-blk: %+v\n", blockNum)
synchronizer.SetMaxBlockNum(blockNum)
}
}
panic(fmt.Sprintf("max retries %d reached", defaultMaxRetry))
}()
}
Loading