Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

upgrade package node #726

Merged
merged 6 commits into from
Nov 4, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions cmd/XDC/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,9 @@ var (
utils.RPCGlobalGasCapFlag,
utils.RPCListenAddrFlag,
utils.RPCPortFlag,
utils.RPCHttpReadTimeoutFlag,
utils.RPCHttpWriteTimeoutFlag,
utils.RPCHttpIdleTimeoutFlag,
utils.RPCApiFlag,
utils.WSEnabledFlag,
utils.WSListenAddrFlag,
Expand Down
2 changes: 2 additions & 0 deletions cmd/XDC/usage.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,9 @@ var AppHelpFlagGroups = []flagGroup{
utils.RPCGlobalGasCapFlag,
utils.RPCListenAddrFlag,
utils.RPCPortFlag,
utils.RPCHttpReadTimeoutFlag,
utils.RPCHttpWriteTimeoutFlag,
utils.RPCHttpIdleTimeoutFlag,
utils.RPCApiFlag,
utils.WSEnabledFlag,
utils.WSListenAddrFlag,
Expand Down
22 changes: 19 additions & 3 deletions cmd/utils/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -435,10 +435,20 @@ var (
Usage: "HTTP-RPC server listening port",
Value: node.DefaultHTTPPort,
}
RPCHttpReadTimeoutFlag = cli.DurationFlag{
Name: "rpcreadtimeout",
Usage: "HTTP-RPC server read timeout",
Value: rpc.DefaultHTTPTimeouts.ReadTimeout,
}
RPCHttpWriteTimeoutFlag = cli.DurationFlag{
Name: "rpcwritetimeout",
Usage: "HTTP-RPC server write timeout (default = 10s)",
Value: node.DefaultHTTPWriteTimeOut,
Usage: "HTTP-RPC server write timeout",
Value: rpc.DefaultHTTPTimeouts.WriteTimeout,
}
RPCHttpIdleTimeoutFlag = cli.DurationFlag{
Name: "rpcidletimeout",
Usage: "HTTP-RPC server idle timeout",
Value: rpc.DefaultHTTPTimeouts.IdleTimeout,
}
RPCCORSDomainFlag = cli.StringFlag{
Name: "rpccorsdomain",
Expand Down Expand Up @@ -779,8 +789,14 @@ func setHTTP(ctx *cli.Context, cfg *node.Config) {
if ctx.GlobalIsSet(RPCPortFlag.Name) {
cfg.HTTPPort = ctx.GlobalInt(RPCPortFlag.Name)
}
if ctx.GlobalIsSet(RPCHttpReadTimeoutFlag.Name) {
cfg.HTTPTimeouts.ReadTimeout = ctx.GlobalDuration(RPCHttpReadTimeoutFlag.Name)
}
if ctx.GlobalIsSet(RPCHttpWriteTimeoutFlag.Name) {
cfg.HTTPWriteTimeout = ctx.GlobalDuration(RPCHttpWriteTimeoutFlag.Name)
cfg.HTTPTimeouts.WriteTimeout = ctx.GlobalDuration(RPCHttpWriteTimeoutFlag.Name)
}
if ctx.GlobalIsSet(RPCHttpIdleTimeoutFlag.Name) {
cfg.HTTPTimeouts.IdleTimeout = ctx.GlobalDuration(RPCHttpIdleTimeoutFlag.Name)
}
if ctx.GlobalIsSet(RPCCORSDomainFlag.Name) {
cfg.HTTPCors = splitAndTrim(ctx.GlobalString(RPCCORSDomainFlag.Name))
Expand Down
2 changes: 1 addition & 1 deletion node/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,7 @@ func (api *PrivateAdminAPI) StartRPC(host *string, port *int, cors *string, apis
}
}

if err := api.node.startHTTP(fmt.Sprintf("%s:%d", *host, *port), api.node.rpcAPIs, modules, allowedOrigins, allowedVHosts); err != nil {
if err := api.node.startHTTP(fmt.Sprintf("%s:%d", *host, *port), api.node.rpcAPIs, modules, allowedOrigins, allowedVHosts, api.node.config.HTTPTimeouts); err != nil {
return false, err
}
return true, nil
Expand Down
9 changes: 5 additions & 4 deletions node/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ import (
"path/filepath"
"runtime"
"strings"
"time"

"github.com/XinFinOrg/XDPoSChain/accounts"
"github.com/XinFinOrg/XDPoSChain/accounts/keystore"
Expand All @@ -33,6 +32,7 @@ import (
"github.com/XinFinOrg/XDPoSChain/log"
"github.com/XinFinOrg/XDPoSChain/p2p"
"github.com/XinFinOrg/XDPoSChain/p2p/discover"
"github.com/XinFinOrg/XDPoSChain/rpc"
)

const (
Expand Down Expand Up @@ -100,9 +100,6 @@ type Config struct {
// for ephemeral nodes).
HTTPPort int `toml:",omitempty"`

// HTTPWriteTimeout is the write timeout for the HTTP RPC server.
HTTPWriteTimeout time.Duration `toml:",omitempty"`

// HTTPCors is the Cross-Origin Resource Sharing header to send to requesting
// clients. Please be aware that CORS is a browser enforced security, it's fully
// useless for custom HTTP clients.
Expand All @@ -122,6 +119,10 @@ type Config struct {
// exposed.
HTTPModules []string `toml:",omitempty"`

// HTTPTimeouts allows for customization of the timeout values used by the HTTP RPC
// interface.
HTTPTimeouts rpc.HTTPTimeouts

// WSHost is the host interface on which to start the websocket RPC server. If
// this field is empty, no websocket API endpoint will be started.
WSHost string `toml:",omitempty"`
Expand Down
13 changes: 6 additions & 7 deletions node/defaults.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,27 +21,26 @@ import (
"os/user"
"path/filepath"
"runtime"
"time"

"github.com/XinFinOrg/XDPoSChain/p2p"
"github.com/XinFinOrg/XDPoSChain/p2p/nat"
"github.com/XinFinOrg/XDPoSChain/rpc"
)

const (
DefaultHTTPHost = "localhost" // Default host interface for the HTTP RPC server
DefaultHTTPPort = 8545 // Default TCP port for the HTTP RPC server
DefaultHTTPWriteTimeOut = 10 * time.Second // Default write timeout for the HTTP RPC server
DefaultWSHost = "localhost" // Default host interface for the websocket RPC server
DefaultWSPort = 8546 // Default TCP port for the websocket RPC server
DefaultHTTPHost = "localhost" // Default host interface for the HTTP RPC server
DefaultHTTPPort = 8545 // Default TCP port for the HTTP RPC server
DefaultWSHost = "localhost" // Default host interface for the websocket RPC server
DefaultWSPort = 8546 // Default TCP port for the websocket RPC server
)

// DefaultConfig contains reasonable default settings.
var DefaultConfig = Config{
DataDir: DefaultDataDir(),
HTTPPort: DefaultHTTPPort,
HTTPWriteTimeout: DefaultHTTPWriteTimeOut,
HTTPModules: []string{"net", "web3"},
HTTPVirtualHosts: []string{"localhost"},
HTTPTimeouts: rpc.DefaultHTTPTimeouts,
WSPort: DefaultWSPort,
WSModules: []string{"net", "web3"},
P2P: p2p.Config{
Expand Down
128 changes: 33 additions & 95 deletions node/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ func (n *Node) Register(constructor ServiceConstructor) error {
return nil
}

// Start create a live P2P node and starts running it.
// Start creates a live P2P node and starts running it.
func (n *Node) Start() error {
n.lock.Lock()
defer n.lock.Unlock()
Expand Down Expand Up @@ -203,7 +203,7 @@ func (n *Node) Start() error {
return convertFileLockError(err)
}
// Start each of the services
started := []reflect.Type{}
var started []reflect.Type
for kind, service := range services {
// Start the next service, stopping all previous upon failure
if err := service.Start(running); err != nil {
Expand All @@ -217,7 +217,7 @@ func (n *Node) Start() error {
// Mark the service started for potential cleanup
started = append(started, kind)
}
// Lastly start the configured RPC interfaces
// Lastly, start the configured RPC interfaces
if err := n.startRPC(services); err != nil {
for _, service := range services {
service.Stop()
Expand Down Expand Up @@ -252,7 +252,7 @@ func (n *Node) openDataDir() error {
return nil
}

// startRPC is a helper method to start all the various RPC endpoint during node
// startRPC is a helper method to start all the various RPC endpoints during node
// startup. It's not meant to be called at any time afterwards as it makes certain
// assumptions about the state of the node.
func (n *Node) startRPC(services map[reflect.Type]Service) error {
Expand All @@ -269,7 +269,7 @@ func (n *Node) startRPC(services map[reflect.Type]Service) error {
n.stopInProc()
return err
}
if err := n.startHTTP(n.httpEndpoint, apis, n.config.HTTPModules, n.config.HTTPCors, n.config.HTTPVirtualHosts); err != nil {
if err := n.startHTTP(n.httpEndpoint, apis, n.config.HTTPModules, n.config.HTTPCors, n.config.HTTPVirtualHosts, n.config.HTTPTimeouts); err != nil {
n.stopIPC()
n.stopInProc()
return err
Expand All @@ -293,7 +293,7 @@ func (n *Node) startInProc(apis []rpc.API) error {
if err := handler.RegisterName(api.Namespace, api.Service); err != nil {
return err
}
n.log.Debug("InProc registered", "service", api.Service, "namespace", api.Namespace)
n.log.Debug("InProc registered", "namespace", api.Namespace)
}
n.inprocHandler = handler
return nil
Expand All @@ -320,51 +320,16 @@ func (n *Node) RegisterAPIs(apis []rpc.API) {

// startIPC initializes and starts the IPC RPC endpoint.
func (n *Node) startIPC(apis []rpc.API) error {
// Short circuit if the IPC endpoint isn't being exposed
if n.ipcEndpoint == "" {
return nil
return nil // IPC disabled.
}
// Register all the APIs exposed by the services
handler := rpc.NewServer()
for _, api := range apis {
if err := handler.RegisterName(api.Namespace, api.Service); err != nil {
return err
}
n.log.Debug("IPC registered", "service", api.Service, "namespace", api.Namespace)
}
// All APIs registered, start the IPC listener
var (
listener net.Listener
err error
)
if listener, err = rpc.CreateIPCListener(n.ipcEndpoint); err != nil {
listener, handler, err := rpc.StartIPCEndpoint(n.ipcEndpoint, apis)
if err != nil {
return err
}
go func() {
n.log.Info("IPC endpoint opened", "url", n.ipcEndpoint)

for {
conn, err := listener.Accept()
if err != nil {
// Terminate if the listener was closed
n.lock.RLock()
closed := n.ipcListener == nil
n.lock.RUnlock()
if closed {
return
}
// Not closed, just some error; report and continue
n.log.Error("IPC accept failed", "err", err)
continue
}
log.Trace("Accepted RPC connection", "conn", conn.RemoteAddr())
go handler.ServeCodec(rpc.NewCodec(conn), 0)
}
}()
// All listeners booted successfully
n.ipcListener = listener
n.ipcHandler = handler

log.Info("IPC endpoint opened", "url", n.ipcEndpoint)
return nil
}

Expand All @@ -374,7 +339,7 @@ func (n *Node) stopIPC() {
n.ipcListener.Close()
n.ipcListener = nil

n.log.Info("IPC endpoint closed", "endpoint", n.ipcEndpoint)
n.log.Info("IPC endpoint closed", "url", n.ipcEndpoint)
}
if n.ipcHandler != nil {
n.ipcHandler.Stop()
Expand All @@ -383,36 +348,18 @@ func (n *Node) stopIPC() {
}

// startHTTP initializes and starts the HTTP RPC endpoint.
func (n *Node) startHTTP(endpoint string, apis []rpc.API, modules []string, cors []string, vhosts []string) error {
func (n *Node) startHTTP(endpoint string, apis []rpc.API, modules []string, cors []string, vhosts []string, timeouts rpc.HTTPTimeouts) error {
// Short circuit if the HTTP endpoint isn't being exposed
if endpoint == "" {
return nil
}
// Generate the whitelist based on the allowed modules
whitelist := make(map[string]bool)
for _, module := range modules {
whitelist[module] = true
}
// Register all the APIs exposed by the services
handler := rpc.NewServer()
for _, api := range apis {
if whitelist[api.Namespace] || (len(whitelist) == 0 && api.Public) {
if err := handler.RegisterName(api.Namespace, api.Service); err != nil {
return err
}
n.log.Debug("HTTP registered", "service", api.Service, "namespace", api.Namespace)
}
}
// All APIs registered, start the HTTP listener
var (
listener net.Listener
err error
)
if listener, err = net.Listen("tcp", endpoint); err != nil {
listener, handler, err := rpc.StartHTTPEndpoint(endpoint, apis, modules, cors, vhosts, timeouts)
if err != nil {
return err
}
go rpc.NewHTTPServer(cors, vhosts, handler, n.config.HTTPWriteTimeout).Serve(listener)
n.log.Info("HTTP endpoint opened", "url", fmt.Sprintf("http://%s", endpoint), "cors", strings.Join(cors, ","), "vhosts", strings.Join(vhosts, ","))
n.log.Info("HTTP endpoint opened", "url", fmt.Sprintf("http://%v/", listener.Addr()),
"cors", strings.Join(cors, ","),
"vhosts", strings.Join(vhosts, ","))
// All listeners booted successfully
n.httpEndpoint = endpoint
n.httpListener = listener
Expand All @@ -424,10 +371,10 @@ func (n *Node) startHTTP(endpoint string, apis []rpc.API, modules []string, cors
// stopHTTP terminates the HTTP RPC endpoint.
func (n *Node) stopHTTP() {
if n.httpListener != nil {
url := fmt.Sprintf("http://%v/", n.httpListener.Addr())
n.httpListener.Close()
n.httpListener = nil

n.log.Info("HTTP endpoint closed", "url", fmt.Sprintf("http://%s", n.httpEndpoint))
n.log.Info("HTTP endpoint closed", "url", url)
}
if n.httpHandler != nil {
n.httpHandler.Stop()
Expand All @@ -441,32 +388,11 @@ func (n *Node) startWS(endpoint string, apis []rpc.API, modules []string, wsOrig
if endpoint == "" {
return nil
}
// Generate the whitelist based on the allowed modules
whitelist := make(map[string]bool)
for _, module := range modules {
whitelist[module] = true
}
// Register all the APIs exposed by the services
handler := rpc.NewServer()
for _, api := range apis {
if exposeAll || whitelist[api.Namespace] || (len(whitelist) == 0 && api.Public) {
if err := handler.RegisterName(api.Namespace, api.Service); err != nil {
return err
}
n.log.Debug("WebSocket registered", "service", api.Service, "namespace", api.Namespace)
}
}
// All APIs registered, start the HTTP listener
var (
listener net.Listener
err error
)
if listener, err = net.Listen("tcp", endpoint); err != nil {
listener, handler, err := rpc.StartWSEndpoint(endpoint, apis, modules, wsOrigins, exposeAll)
if err != nil {
return err
}
go rpc.NewWSServer(wsOrigins, handler).Serve(listener)
n.log.Info("WebSocket endpoint opened", "url", fmt.Sprintf("ws://%s", listener.Addr()))

// All listeners booted successfully
n.wsEndpoint = endpoint
n.wsListener = listener
Expand Down Expand Up @@ -645,11 +571,23 @@ func (n *Node) IPCEndpoint() string {

// HTTPEndpoint retrieves the current HTTP endpoint used by the protocol stack.
func (n *Node) HTTPEndpoint() string {
n.lock.Lock()
defer n.lock.Unlock()

if n.httpListener != nil {
return n.httpListener.Addr().String()
}
return n.httpEndpoint
}

// WSEndpoint retrieves the current WS endpoint used by the protocol stack.
func (n *Node) WSEndpoint() string {
n.lock.Lock()
defer n.lock.Unlock()

if n.wsListener != nil {
return n.wsListener.Addr().String()
}
return n.wsEndpoint
}

Expand Down
Loading