Skip to content

Commit

Permalink
chain ws to synchronizer and streamer
Browse files Browse the repository at this point in the history
subscribe latest blk via tm, sync historical via http
  • Loading branch information
mmsqe committed Oct 25, 2022
1 parent df14f59 commit 966dd7f
Show file tree
Hide file tree
Showing 5 changed files with 196 additions and 30 deletions.
91 changes: 70 additions & 21 deletions app/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"github.com/crypto-org-chain/cronos/x/cronos/middleware"

"github.com/cosmos/cosmos-sdk/client"

"github.com/cosmos/cosmos-sdk/codec/types"
"github.com/cosmos/cosmos-sdk/server"
"github.com/gorilla/mux"
Expand Down Expand Up @@ -134,6 +135,7 @@ import (
cronoskeeper "github.com/crypto-org-chain/cronos/x/cronos/keeper"
evmhandlers "github.com/crypto-org-chain/cronos/x/cronos/keeper/evmhandlers"
cronostypes "github.com/crypto-org-chain/cronos/x/cronos/types"
tmtypes "github.com/tendermint/tendermint/types"

// unnamed import of statik for swagger UI support
_ "github.com/crypto-org-chain/cronos/client/docs/statik"
Expand Down Expand Up @@ -413,12 +415,38 @@ func New(
startBlockNum = 0
}
nextBlockNum := int(startBlockNum) + 1
// TODO: maxBlockNum init from primary node
maxBlockNum := 0
interval := time.Second

directory := filepath.Join(rootDir, "data", "file_streamer")
// streamer write the file blk by blk with concurrency 1
streamer := cronosfile.NewBlockFileWatcher(1, maxBlockNum, func(blockNum int) string {
return cronosfile.GetLocalDataFileName(directory, blockNum)
}, true)
streamer.Start(nextBlockNum, interval)
go func() {
chData, chErr := streamer.SubscribeData(), streamer.SubscribeError()
for {
select {
case data := <-chData:
pairs, err := cronosfile.DecodeData(data.Data)
fmt.Printf("mm-pairs: %+v, %+v\n", len(pairs), err)
if err == nil {
versionDB.PutAtVersion(int64(data.BlockNum), pairs)
}
case err := <-chErr:
// fail read
fmt.Println("mm-fail-read-panic")
panic(err)
}
}
}()

isLocal := cast.ToBool(appOpts.Get(cronosappclient.FlagIsLocal))
remoteUrl := cast.ToString(appOpts.Get(cronosappclient.FlagRemoteUrl))
concurrency := cast.ToInt(appOpts.Get(cronosappclient.FlagConcurrency))
interval := time.Second
synchronizer := cronosfile.NewBlockFileWatcher(concurrency, func(blockNum int) string {
synchronizer := cronosfile.NewBlockFileWatcher(concurrency, maxBlockNum, func(blockNum int) string {
return fmt.Sprintf("%s/%s", remoteUrl, cronosfile.DataFileName(blockNum))
}, isLocal)
synchronizer.Start(nextBlockNum, interval)
Expand All @@ -431,41 +459,62 @@ func New(
select {
case data := <-chData:
file := cronosfile.GetLocalDataFileName(directory, data.BlockNum)
fmt.Printf("mm-file: %+v\n", file)
fmt.Printf("mm-data.BlockNum: %+v\n", data.BlockNum)
if err := os.WriteFile(file, data.Data, 0644); err != nil {
fmt.Println("mm-WriteFile-panic")
panic(err)
}
retry = 0
// fmt.Printf("mm-pairs: %+v\n", len(pairs))
// versionDB.PutAtVersion(int64(data.BlockNum), pairs)
fmt.Println("mm-reset-retry")
if data.BlockNum > maxBlockNum {
streamer.SetMaxBlockNum(data.BlockNum)
}
case err := <-chErr:
retry++
fmt.Println("mm-retry", retry)
if retry == maxRetry {
// data corrupt
fmt.Println("mm-data-corrupt-panic")
panic(err)
}
}
}
}()

// streamer write the file blk by blk with concurrency 1
streamer := cronosfile.NewBlockFileWatcher(1, func(blockNum int) string {
return cronosfile.GetLocalDataFileName(directory, blockNum)
}, true)
streamer.Start(nextBlockNum, interval)
go func() {
chData, chErr := streamer.SubscribeData(), streamer.SubscribeError()
for {
select {
case data := <-chData:
pairs, err := cronosfile.DecodeData(data.Data)
fmt.Printf("mm-pairs: %+v, %+v\n", len(pairs), err)
if err == nil {
versionDB.PutAtVersion(int64(data.BlockNum), pairs)
maxRetry := 50
for i := 0; i < maxRetry; i++ {
if i > 0 {
time.Sleep(time.Second)
}
// TODO: config remote tm
wsClient := cronosappclient.NewWebsocketClient("ws://localhost:26767/websocket")
chResult, err := wsClient.Subscribe()
if err != nil {
fmt.Printf("mm-subscribed[%+v]: %+v\n", i, err)
continue
}
fmt.Println("subscribing")
err = wsClient.Send("subscribe", []string{
"tm.event='NewBlockHeader'",
})
if err != nil {
fmt.Printf("mm-subscribed: %+v\n", err)
continue
}
i = 0
fmt.Println("subscribed ws")
for res := range chResult {
if res == nil {
continue
}
case err := <-chErr:
// fail read
panic(err)
data, ok := res.Data.(tmtypes.EventDataNewBlockHeader)
if !ok {
continue
}
blockNum := int(data.Header.Height)
fmt.Printf("mm-set-max-blk: %+v\n", blockNum)
synchronizer.SetMaxBlockNum(blockNum)
}
}
}()
Expand Down
96 changes: 96 additions & 0 deletions client/ws.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
package client

import (
"bytes"
"context"
"fmt"
"time"

tmjson "github.com/tendermint/tendermint/libs/json"
coretypes "github.com/tendermint/tendermint/rpc/core/types"
types "github.com/tendermint/tendermint/rpc/jsonrpc/types"
"nhooyr.io/websocket"
"nhooyr.io/websocket/wsjson"
)

type WebsocketClient struct {
url string
wsconn *websocket.Conn
}

func NewWebsocketClient(url string) *WebsocketClient {
return &WebsocketClient{url: url}
}

func (c *WebsocketClient) Subscribe() (<-chan *coretypes.ResultEvent, error) {
ctx, cancel := context.WithTimeout(context.Background(), time.Minute)
defer cancel()

conn, _, err := websocket.Dial(ctx, c.url, nil)
if err != nil {
return nil, err
}

c.wsconn = conn
conn.SetReadLimit(10240000)

chResult := make(chan *coretypes.ResultEvent)
go func() {
defer close(chResult)
for {
ctx, cancel := context.WithTimeout(context.Background(), time.Second*10)
_, r, err := c.wsconn.Reader(ctx)
if err != nil {
cancel()
continue
}
buf := new(bytes.Buffer)
_, err = buf.ReadFrom(r)
if err != nil {
continue
}
cancel()
bz := buf.Bytes()
res := new(types.RPCResponse)
err = tmjson.Unmarshal(bz, res)
if err != nil {
// fmt.Printf("mm-unmarshal-err: %+v\n", err)
// continue
fmt.Printf("mm-read-res-err: %+v\n", err)
chResult <- nil
break
}
ev := new(coretypes.ResultEvent)
if err := tmjson.Unmarshal(res.Result, &ev); err != nil {
fmt.Printf("mm-read-ev-err: %+v\n", err)
chResult <- nil
break
}
chResult <- ev
time.Sleep(time.Second)
}
}()
return chResult, nil
}

func (c *WebsocketClient) Send(
method string,
params []string,
) error {
ctx, cancel := context.WithTimeout(context.Background(), time.Minute)
defer cancel()

return wsjson.Write(ctx, c.wsconn, map[string]interface{}{
"jsonrpc": "2.0",
"method": method,
"params": params,
"id": 1,
})
}

func (c *WebsocketClient) Close() {
if c.wsconn != nil {
c.wsconn.Close(websocket.StatusNormalClosure, "")
c.wsconn = nil
}
}
26 changes: 22 additions & 4 deletions file/watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"net/http"
"os"
"sync"
"sync/atomic"
"time"
)

Expand Down Expand Up @@ -60,6 +61,7 @@ type BlockData struct {

type BlockFileWatcher struct {
concurrency int
maxBlockNum int64
getFilePath func(blockNum int) string
downloader fileDownloader
chData chan *BlockData
Expand All @@ -70,11 +72,13 @@ type BlockFileWatcher struct {

func NewBlockFileWatcher(
concurrency int,
maxBlockNum int,
getFilePath func(blockNum int) string,
isLocal bool,
) *BlockFileWatcher {
w := &BlockFileWatcher{
concurrency: concurrency,
maxBlockNum: int64(maxBlockNum),
getFilePath: getFilePath,
chData: make(chan *BlockData),
chError: make(chan error),
Expand Down Expand Up @@ -104,11 +108,16 @@ func (w *BlockFileWatcher) SubscribeError() <-chan error {
return w.chError
}

func (w *BlockFileWatcher) SetMaxBlockNum(num int) {
atomic.StoreInt64(&w.maxBlockNum, int64(num))
}

func (w *BlockFileWatcher) fetch(blockNum int) error {
path := w.getFilePath(blockNum)
f, err := os.Open(path)
if err == nil {
defer f.Close()
// valid 1st 8 bytes for downloaded file
var bytes [8]byte
if _, err = io.ReadFull(f, bytes[:]); err == nil {
size := binary.BigEndian.Uint64(bytes[:])
Expand All @@ -119,7 +128,7 @@ func (w *BlockFileWatcher) fetch(blockNum int) error {
f.Close()
}

// TBC: skip if exist path to avoid dup download
// download if file not exist
data, err := w.downloader.GetData(path)
fmt.Printf("mm-fetch: %+v, %+v, %+v\n", blockNum, len(data), err)
if err != nil {
Expand Down Expand Up @@ -158,8 +167,18 @@ func (w *BlockFileWatcher) Start(

default:
wg := new(sync.WaitGroup)
resultErrs := make([]error, w.concurrency)
for i := 0; i < w.concurrency; i++ {
currentBlockNum := blockNum
maxBlockNum := int(atomic.LoadInt64(&w.maxBlockNum))
concurrency := w.concurrency
if diff := maxBlockNum - currentBlockNum; diff < concurrency {
if diff <= 0 {
time.Sleep(interval)
break
}
concurrency = diff
}
resultErrs := make([]error, concurrency)
for i := 0; i < concurrency; i++ {
nextBlockNum := blockNum + i
fmt.Println("mm-start: ", nextBlockNum)
if !finishedBlockNums[nextBlockNum] {
Expand All @@ -173,7 +192,6 @@ func (w *BlockFileWatcher) Start(
}
wg.Wait()
errReached := false
currentBlockNum := blockNum
for i, err := range resultErrs {
b := currentBlockNum + i
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ require (
google.golang.org/grpc v1.50.1
google.golang.org/protobuf v1.28.1
gopkg.in/yaml.v2 v2.4.0
nhooyr.io/websocket v1.8.6
)

require (
Expand Down Expand Up @@ -191,7 +192,6 @@ require (
gopkg.in/ini.v1 v1.67.0 // indirect
gopkg.in/natefinch/npipe.v2 v2.0.0-20160621034901-c1b8fa8bdcce // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
nhooyr.io/websocket v1.8.6 // indirect
sigs.k8s.io/yaml v1.3.0 // indirect
)

Expand Down
11 changes: 7 additions & 4 deletions integration_tests/test_query_only.py
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ def network(tmp_path_factory):
procs.append(exec(base / "replica.jsonnet", path1, base_port1))
try:
wait_for_port(ports.evmrpc_port(base_port0))
wait_for_port(ports.evmrpc_ws_port(base_port0))
wait_for_port(ports.grpc_port(base_port1))
yield Network(Cronos(path0 / chain_id), Cronos(path1 / chain_id))
finally:
Expand All @@ -87,10 +88,12 @@ def grpc_call(p, address):
url = f"http://127.0.0.1:{p}/cosmos/bank/v1beta1/balances/{address}"
response = requests.get(url)
if not response.ok:
raise Exception(
f"response code: {response.status_code}, "
f"{response.reason}, {response.json()}"
)
# retry until file get synced
return -1
# raise Exception(
# f"response code: {response.status_code}, "
# f"{response.reason}, {response.json()}"
# )
result = response.json()
if result.get("code"):
raise Exception(result["raw_log"])
Expand Down

0 comments on commit 966dd7f

Please sign in to comment.