Skip to content

Commit

Permalink
v0.4.0; follow golint instructions
Browse files Browse the repository at this point in the history
  • Loading branch information
superisaac committed Apr 19, 2024
1 parent a15fcc4 commit fc56bea
Show file tree
Hide file tree
Showing 38 changed files with 580 additions and 563 deletions.
9 changes: 8 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
@@ -1,2 +1,9 @@
# nodemux
nodemux is the gateway and load balancer of multiple block chain nodes
nodemux is the gateway and load balancer of multiple blockchain nodes.
nodemux helps prevent node failure and release the network pressure of a single node.

## Install
```shell
% make
% make test
```
110 changes: 55 additions & 55 deletions chains/bitcoin.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,11 @@ type bitcoinBlockchainInfo struct {
BestBlockhash string `json:"bestblockhash"`
}

type bitcoinBlock struct {
Hash string
Height int
Tx []string
}
// type bitcoinBlock struct {
// Hash string
// Height int
// Tx []string
// }

// see bitcoin-cli getnetworkinfo
type bitcoinNetworkInfo struct {
Expand All @@ -36,7 +36,7 @@ func NewBitcoinChain() *BitcoinChain {
return &BitcoinChain{}
}

func (self BitcoinChain) GetClientVersion(ctx context.Context, ep *nodemuxcore.Endpoint) (string, error) {
func (c BitcoinChain) GetClientVersion(ctx context.Context, ep *nodemuxcore.Endpoint) (string, error) {
reqmsg := jsoff.NewRequestMessage(1, "getnetworkinfo", nil)
var info bitcoinNetworkInfo
err := ep.UnwrapCallRPC(ctx, reqmsg, &info)
Expand All @@ -47,56 +47,56 @@ func (self BitcoinChain) GetClientVersion(ctx context.Context, ep *nodemuxcore.E
return v, nil
}

func (self BitcoinChain) StartSync(context context.Context, m *nodemuxcore.Multiplexer, ep *nodemuxcore.Endpoint) (bool, error) {
func (c BitcoinChain) StartSync(context context.Context, m *nodemuxcore.Multiplexer, ep *nodemuxcore.Endpoint) (bool, error) {
return true, nil
}

// update txid cache from mempool
func (self BitcoinChain) updateMempoolPresenceCache(ctx context.Context, m *nodemuxcore.Multiplexer, ep *nodemuxcore.Endpoint) {
redisClient, ok := m.RedisClient(presenceCacheRedisSelector(ep.Chain))
if !ok {
return
}
reqmsg := jsoff.NewRequestMessage(
1, "getrawmempool", nil)

var txids []string
err := ep.UnwrapCallRPC(ctx, reqmsg, &txids)
if err != nil {
ep.Log().Warnf("getrawmempool error, %s", err)
return
}
presenceCacheUpdate(
ctx, redisClient,
ep.Chain,
txids,
ep.Name,
time.Second*600) // expire after 10 mins
}

func (self BitcoinChain) updateBlockPresenceCache(ctx context.Context, m *nodemuxcore.Multiplexer, ep *nodemuxcore.Endpoint, blockHash string) {
c, ok := m.RedisClient(presenceCacheRedisSelector(ep.Chain))
if !ok {
return
}
reqmsg := jsoff.NewRequestMessage(
1, "getblock", []interface{}{blockHash})

var blk bitcoinBlock
err := ep.UnwrapCallRPC(ctx, reqmsg, &blk)
if err != nil {
ep.Log().Warnf("get block error, blockhash %s, %s", blockHash, err)
return
}
presenceCacheUpdate(
ctx, c,
ep.Chain,
blk.Tx,
ep.Name,
time.Second*1800) // expire after 30 mins
}

func (self *BitcoinChain) GetBlockhead(ctx context.Context, m *nodemuxcore.Multiplexer, ep *nodemuxcore.Endpoint) (*nodemuxcore.Block, error) {
// func (c BitcoinChain) updateMempoolPresenceCache(ctx context.Context, m *nodemuxcore.Multiplexer, ep *nodemuxcore.Endpoint) {
// redisClient, ok := m.RedisClient(presenceCacheRedisSelector(ep.Chain))
// if !ok {
// return
// }
// reqmsg := jsoff.NewRequestMessage(
// 1, "getrawmempool", nil)

// var txids []string
// err := ep.UnwrapCallRPC(ctx, reqmsg, &txids)
// if err != nil {
// ep.Log().Warnf("getrawmempool error, %s", err)
// return
// }
// presenceCacheUpdate(
// ctx, redisClient,
// ep.Chain,
// txids,
// ep.Name,
// time.Second*600) // expire after 10 mins
// }

// func (c BitcoinChain) updateBlockPresenceCache(ctx context.Context, m *nodemuxcore.Multiplexer, ep *nodemuxcore.Endpoint, blockHash string) {
// client, ok := m.RedisClient(presenceCacheRedisSelector(ep.Chain))
// if !ok {
// return
// }
// reqmsg := jsoff.NewRequestMessage(
// 1, "getblock", []interface{}{blockHash})

// var blk bitcoinBlock
// err := ep.UnwrapCallRPC(ctx, reqmsg, &blk)
// if err != nil {
// ep.Log().Warnf("get block error, blockhash %s, %s", blockHash, err)
// return
// }
// presenceCacheUpdate(
// ctx, client,
// ep.Chain,
// blk.Tx,
// ep.Name,
// time.Second*1800) // expire after 30 mins
// }

func (c *BitcoinChain) GetBlockhead(ctx context.Context, m *nodemuxcore.Multiplexer, ep *nodemuxcore.Endpoint) (*nodemuxcore.Block, error) {
reqmsg := jsoff.NewRequestMessage(
1, "getblockchaininfo", nil)

Expand All @@ -113,7 +113,7 @@ func (self *BitcoinChain) GetBlockhead(ctx context.Context, m *nodemuxcore.Multi
return block, nil
}

func (self *BitcoinChain) DelegateRPC(ctx context.Context, m *nodemuxcore.Multiplexer, chain nodemuxcore.ChainRef, reqmsg *jsoff.RequestMessage, r *http.Request) (jsoff.Message, error) {
func (c *BitcoinChain) DelegateRPC(ctx context.Context, m *nodemuxcore.Multiplexer, chain nodemuxcore.ChainRef, reqmsg *jsoff.RequestMessage, r *http.Request) (jsoff.Message, error) {
//useCache := reqmsg.Method == "gettransaction" || reqmsg.Method == "getrawtransaction" || reqmsg.Method == "decoderawtransaction"
useCache, resmsgFromCache := jsonrpcCacheFetchForMethods(
ctx, m, chain, reqmsg,
Expand All @@ -137,7 +137,7 @@ func (self *BitcoinChain) DelegateRPC(ctx context.Context, m *nodemuxcore.Multip
}

if reqmsg.Method == "getblockhash" {
if h, ok := self.findBlockHeight(reqmsg); ok {
if h, ok := c.findBlockHeight(reqmsg); ok {
return m.DefaultRelayRPC(ctx, chain, reqmsg, h)
}
} else if reqmsg.Method == "getchaintips" || reqmsg.Method == "getblockchaininfo" {
Expand All @@ -152,7 +152,7 @@ func (self *BitcoinChain) DelegateRPC(ctx context.Context, m *nodemuxcore.Multip
return retmsg, err
}

func (self *BitcoinChain) findBlockHeight(reqmsg *jsoff.RequestMessage) (int, bool) {
func (c *BitcoinChain) findBlockHeight(reqmsg *jsoff.RequestMessage) (int, bool) {
// the first argument is a integer number
var bh struct {
Height int
Expand Down
8 changes: 4 additions & 4 deletions chains/cardano.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,15 +22,15 @@ func NewCardanoChain() *CardanoChain {
return &CardanoChain{}
}

func (self CardanoChain) GetClientVersion(context context.Context, ep *nodemuxcore.Endpoint) (string, error) {
func (c CardanoChain) GetClientVersion(context context.Context, ep *nodemuxcore.Endpoint) (string, error) {
return "", nil
}

func (self CardanoChain) StartSync(context context.Context, m *nodemuxcore.Multiplexer, ep *nodemuxcore.Endpoint) (bool, error) {
func (c CardanoChain) StartSync(context context.Context, m *nodemuxcore.Multiplexer, ep *nodemuxcore.Endpoint) (bool, error) {
return true, nil
}

func (self *CardanoChain) GetBlockhead(context context.Context, b *nodemuxcore.Multiplexer, ep *nodemuxcore.Endpoint) (*nodemuxcore.Block, error) {
func (c *CardanoChain) GetBlockhead(context context.Context, b *nodemuxcore.Multiplexer, ep *nodemuxcore.Endpoint) (*nodemuxcore.Block, error) {
q := "{blocks(limit:1, order_by:[{number: \"desc\"}]){number hash}}"
var res cardanoTipResult
err := ep.RequestGraphQL(context, q, nil, nil, &res)
Expand All @@ -51,7 +51,7 @@ func (self *CardanoChain) GetBlockhead(context context.Context, b *nodemuxcore.M

}

func (self *CardanoChain) DelegateGraphQL(rootCtx context.Context, b *nodemuxcore.Multiplexer, chain nodemuxcore.ChainRef, path string, w http.ResponseWriter, r *http.Request) error {
func (c *CardanoChain) DelegateGraphQL(rootCtx context.Context, b *nodemuxcore.Multiplexer, chain nodemuxcore.ChainRef, path string, w http.ResponseWriter, r *http.Request) error {
// Custom relay methods can be defined here
return b.DefaultPipeGraphQL(rootCtx, chain, path, w, r, -10)
}
8 changes: 4 additions & 4 deletions chains/casper.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,15 +23,15 @@ func NewCasperChain() *CasperChain {
return &CasperChain{}
}

func (self CasperChain) GetClientVersion(context context.Context, ep *nodemuxcore.Endpoint) (string, error) {
func (c CasperChain) GetClientVersion(context context.Context, ep *nodemuxcore.Endpoint) (string, error) {
return "", nil
}

func (self CasperChain) StartSync(context context.Context, m *nodemuxcore.Multiplexer, ep *nodemuxcore.Endpoint) (bool, error) {
func (c CasperChain) StartSync(context context.Context, m *nodemuxcore.Multiplexer, ep *nodemuxcore.Endpoint) (bool, error) {
return true, nil
}

func (self *CasperChain) GetBlockhead(context context.Context, b *nodemuxcore.Multiplexer, ep *nodemuxcore.Endpoint) (*nodemuxcore.Block, error) {
func (c *CasperChain) GetBlockhead(context context.Context, b *nodemuxcore.Multiplexer, ep *nodemuxcore.Endpoint) (*nodemuxcore.Block, error) {
reqmsg := jsoff.NewRequestMessage(
1, "chain_get_block", nil)

Expand All @@ -48,7 +48,7 @@ func (self *CasperChain) GetBlockhead(context context.Context, b *nodemuxcore.Mu
return block, nil
}

func (self *CasperChain) DelegateRPC(rootCtx context.Context, b *nodemuxcore.Multiplexer, chain nodemuxcore.ChainRef, reqmsg *jsoff.RequestMessage, r *http.Request) (jsoff.Message, error) {
func (c *CasperChain) DelegateRPC(rootCtx context.Context, b *nodemuxcore.Multiplexer, chain nodemuxcore.ChainRef, reqmsg *jsoff.RequestMessage, r *http.Request) (jsoff.Message, error) {
// Custom relay methods can be defined here
return b.DefaultRelayRPC(rootCtx, chain, reqmsg, -3)
}
8 changes: 4 additions & 4 deletions chains/conflux.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,15 +14,15 @@ func NewConfluxChain() *ConfluxChain {
return &ConfluxChain{}
}

func (self ConfluxChain) GetClientVersion(context context.Context, ep *nodemuxcore.Endpoint) (string, error) {
func (c ConfluxChain) GetClientVersion(context context.Context, ep *nodemuxcore.Endpoint) (string, error) {
return "", nil
}

func (self ConfluxChain) StartSync(context context.Context, m *nodemuxcore.Multiplexer, ep *nodemuxcore.Endpoint) (bool, error) {
func (c ConfluxChain) StartSync(context context.Context, m *nodemuxcore.Multiplexer, ep *nodemuxcore.Endpoint) (bool, error) {
return true, nil
}

func (self *ConfluxChain) GetBlockhead(context context.Context, b *nodemuxcore.Multiplexer, ep *nodemuxcore.Endpoint) (*nodemuxcore.Block, error) {
func (c *ConfluxChain) GetBlockhead(context context.Context, b *nodemuxcore.Multiplexer, ep *nodemuxcore.Endpoint) (*nodemuxcore.Block, error) {
reqmsg := jsoff.NewRequestMessage(
1, "cfx_epochNumber",
[]interface{}{"latest_mined"})
Expand All @@ -38,7 +38,7 @@ func (self *ConfluxChain) GetBlockhead(context context.Context, b *nodemuxcore.M
return block, nil
}

func (self *ConfluxChain) DelegateRPC(rootCtx context.Context, b *nodemuxcore.Multiplexer, chain nodemuxcore.ChainRef, reqmsg *jsoff.RequestMessage, r *http.Request) (jsoff.Message, error) {
func (c *ConfluxChain) DelegateRPC(rootCtx context.Context, b *nodemuxcore.Multiplexer, chain nodemuxcore.ChainRef, reqmsg *jsoff.RequestMessage, r *http.Request) (jsoff.Message, error) {
// Custom relay methods can be defined here
return b.DefaultRelayRPC(rootCtx, chain, reqmsg, -5)
}
22 changes: 11 additions & 11 deletions chains/cosmos.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,15 +29,15 @@ type cosmosBlock struct {
height int `json:"-"`
}

func (self *cosmosBlock) Height() int {
if self.height == 0 {
h, err := strconv.Atoi(self.Block.Header.Height)
func (blk *cosmosBlock) Height() int {
if blk.height == 0 {
h, err := strconv.Atoi(blk.Block.Header.Height)
if err != nil {
panic(err)
}
self.height = h
blk.height = h
}
return self.height
return blk.height
}

type cosmosNodeInfo struct {
Expand All @@ -49,8 +49,8 @@ type cosmosNodeInfo struct {
} `json:"application_version"`
}

func (self cosmosNodeInfo) String() string {
av := self.ApplicationVersion
func (info cosmosNodeInfo) String() string {
av := info.ApplicationVersion
return fmt.Sprintf("%s-%s-%s", av.AppName, av.Version, av.CosmosSDKVersion)
}

Expand All @@ -61,7 +61,7 @@ func NewCosmosChain() *CosmosChain {
return &CosmosChain{}
}

func (self CosmosChain) GetClientVersion(context context.Context, ep *nodemuxcore.Endpoint) (string, error) {
func (c CosmosChain) GetClientVersion(context context.Context, ep *nodemuxcore.Endpoint) (string, error) {
var info cosmosNodeInfo
err := ep.GetJson(context,
"/cosmos/base/tendermint/v1beta1/node_info",
Expand All @@ -72,11 +72,11 @@ func (self CosmosChain) GetClientVersion(context context.Context, ep *nodemuxcor
return info.String(), nil
}

func (self CosmosChain) StartSync(context context.Context, m *nodemuxcore.Multiplexer, ep *nodemuxcore.Endpoint) (bool, error) {
func (c CosmosChain) StartSync(context context.Context, m *nodemuxcore.Multiplexer, ep *nodemuxcore.Endpoint) (bool, error) {
return true, nil
}

func (self *CosmosChain) GetBlockhead(context context.Context, b *nodemuxcore.Multiplexer, ep *nodemuxcore.Endpoint) (*nodemuxcore.Block, error) {
func (c *CosmosChain) GetBlockhead(context context.Context, b *nodemuxcore.Multiplexer, ep *nodemuxcore.Endpoint) (*nodemuxcore.Block, error) {
var res cosmosBlock
err := ep.GetJson(context,
"/cosmos/base/tendermint/v1beta1/blocks/latest",
Expand All @@ -92,7 +92,7 @@ func (self *CosmosChain) GetBlockhead(context context.Context, b *nodemuxcore.Mu
return block, nil
}

func (self *CosmosChain) DelegateREST(rootCtx context.Context, b *nodemuxcore.Multiplexer, chain nodemuxcore.ChainRef, path string, w http.ResponseWriter, r *http.Request) error {
func (c *CosmosChain) DelegateREST(rootCtx context.Context, b *nodemuxcore.Multiplexer, chain nodemuxcore.ChainRef, path string, w http.ResponseWriter, r *http.Request) error {
// Custom relay methods can be defined here
return b.DefaultPipeREST(rootCtx, chain, path, w, r, -2)
}
8 changes: 4 additions & 4 deletions chains/eosapi.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,15 +25,15 @@ func NewEOSAPI() *EOSAPI {
return &EOSAPI{}
}

func (self EOSAPI) GetClientVersion(context context.Context, ep *nodemuxcore.Endpoint) (string, error) {
func (api EOSAPI) GetClientVersion(context context.Context, ep *nodemuxcore.Endpoint) (string, error) {
return "", nil
}

func (self EOSAPI) StartSync(context context.Context, m *nodemuxcore.Multiplexer, ep *nodemuxcore.Endpoint) (bool, error) {
func (api EOSAPI) StartSync(context context.Context, m *nodemuxcore.Multiplexer, ep *nodemuxcore.Endpoint) (bool, error) {
return true, nil
}

func (self *EOSAPI) GetBlockhead(context context.Context, b *nodemuxcore.Multiplexer, ep *nodemuxcore.Endpoint) (*nodemuxcore.Block, error) {
func (api *EOSAPI) GetBlockhead(context context.Context, b *nodemuxcore.Multiplexer, ep *nodemuxcore.Endpoint) (*nodemuxcore.Block, error) {
var chainInfo eosapiChainInfo
err := ep.PostJson(context,
"/v1/chain/get_info",
Expand All @@ -49,7 +49,7 @@ func (self *EOSAPI) GetBlockhead(context context.Context, b *nodemuxcore.Multipl
return block, nil
}

func (self *EOSAPI) DelegateREST(rootCtx context.Context, m *nodemuxcore.Multiplexer, chain nodemuxcore.ChainRef, path string, w http.ResponseWriter, r *http.Request) error {
func (api *EOSAPI) DelegateREST(rootCtx context.Context, m *nodemuxcore.Multiplexer, chain nodemuxcore.ChainRef, path string, w http.ResponseWriter, r *http.Request) error {
requiredHeight := -200
if path == "/v1/chain/get_block" {
body, err := io.ReadAll(r.Body)
Expand Down
14 changes: 7 additions & 7 deletions chains/eosrpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,9 @@ type eosrpcChainInfo struct {
LastBlockId string `json:"last_irreversible_block_id"`
}

type eosrpcChainGetBlockReq struct {
BlockNumOrId int `json:"block_num_or_id"`
}
// type eosrpcChainGetBlockReq struct {
// BlockNumOrId int `json:"block_num_or_id"`
// }

type EOSRPC struct {
}
Expand All @@ -23,15 +23,15 @@ func NewEOSRPC() *EOSRPC {
return &EOSRPC{}
}

func (self EOSRPC) GetClientVersion(context context.Context, ep *nodemuxcore.Endpoint) (string, error) {
func (c EOSRPC) GetClientVersion(context context.Context, ep *nodemuxcore.Endpoint) (string, error) {
return "", nil
}

func (self EOSRPC) StartSync(context context.Context, m *nodemuxcore.Multiplexer, ep *nodemuxcore.Endpoint) (bool, error) {
func (c EOSRPC) StartSync(context context.Context, m *nodemuxcore.Multiplexer, ep *nodemuxcore.Endpoint) (bool, error) {
return true, nil
}

func (self *EOSRPC) GetBlockhead(context context.Context, b *nodemuxcore.Multiplexer, ep *nodemuxcore.Endpoint) (*nodemuxcore.Block, error) {
func (c *EOSRPC) GetBlockhead(context context.Context, b *nodemuxcore.Multiplexer, ep *nodemuxcore.Endpoint) (*nodemuxcore.Block, error) {
reqmsg := jsoff.NewRequestMessage(
1, "get_info", nil)

Expand All @@ -48,7 +48,7 @@ func (self *EOSRPC) GetBlockhead(context context.Context, b *nodemuxcore.Multipl
return block, nil
}

func (self *EOSRPC) DelegateRPC(rootCtx context.Context, b *nodemuxcore.Multiplexer, chain nodemuxcore.ChainRef, reqmsg *jsoff.RequestMessage, r *http.Request) (jsoff.Message, error) {
func (c *EOSRPC) DelegateRPC(rootCtx context.Context, b *nodemuxcore.Multiplexer, chain nodemuxcore.ChainRef, reqmsg *jsoff.RequestMessage, r *http.Request) (jsoff.Message, error) {
// Custom relay methods can be defined here
return b.DefaultRelayRPC(rootCtx, chain, reqmsg, -300)
}
Loading

0 comments on commit fc56bea

Please sign in to comment.