Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(gateway): switch to separate Listen/Serve #1314

Merged
merged 7 commits into from
Nov 9, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 8 additions & 4 deletions api/gateway/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,12 @@ type Server struct {
srv *http.Server
srvMux *mux.Router // http request multiplexer
listener net.Listener
started atomic.Bool

started atomic.Bool
}

// NewServer returns a new gateway Server.
func NewServer(address string, port string) *Server {
func NewServer(address, port string) *Server {
srvMux := mux.NewRouter()
srvMux.Use(setContentType)

Expand All @@ -42,13 +43,12 @@ func (s *Server) Start(context.Context) error {
log.Warn("cannot start server: already started")
return nil
}

listener, err := net.Listen("tcp", s.srv.Addr)
if err != nil {
return err
}
s.listener = listener
log.Infow("server started", "listening on", listener.Addr().String())
log.Infow("server started", "listening on", s.srv.Addr)
//nolint:errcheck
go s.srv.Serve(listener)
return nil
Expand All @@ -65,6 +65,7 @@ func (s *Server) Stop(ctx context.Context) error {
if err != nil {
return err
}
s.listener = nil
log.Info("server stopped")
return nil
}
Expand All @@ -88,5 +89,8 @@ func (s *Server) ServeHTTP(w http.ResponseWriter, r *http.Request) {

// ListenAddr returns the listen address of the server.
func (s *Server) ListenAddr() string {
if s.listener == nil {
return ""
}
return s.listener.Addr().String()
}
28 changes: 20 additions & 8 deletions api/rpc/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package rpc

import (
"context"
"net"
"net/http"
"sync/atomic"
"time"
Expand All @@ -13,16 +14,18 @@ import (
var log = logging.Logger("rpc")

type Server struct {
http *http.Server
rpc *jsonrpc.RPCServer
srv *http.Server
rpc *jsonrpc.RPCServer
listener net.Listener

started atomic.Bool
}

func NewServer(address string, port string) *Server {
func NewServer(address, port string) *Server {
rpc := jsonrpc.NewServer()
return &Server{
rpc: rpc,
http: &http.Server{
srv: &http.Server{
Addr: address + ":" + port,
Handler: rpc,
// the amount of time allowed to read request headers. set to the default 2 seconds
Expand All @@ -44,9 +47,14 @@ func (s *Server) Start(context.Context) error {
log.Warn("cannot start server: already started")
return nil
}
listener, err := net.Listen("tcp", s.srv.Addr)
if err != nil {
return err
}
s.listener = listener
log.Infow("server started", "listening on", s.srv.Addr)
//nolint:errcheck
go s.http.ListenAndServe()
log.Infow("server started", "listening on", s.http.Addr)
go s.srv.Serve(listener)
return nil
}

Expand All @@ -57,15 +65,19 @@ func (s *Server) Stop(ctx context.Context) error {
log.Warn("cannot stop server: already stopped")
return nil
}
err := s.http.Shutdown(ctx)
err := s.srv.Shutdown(ctx)
if err != nil {
return err
}
s.listener = nil
log.Info("server stopped")
return nil
}

// ListenAddr returns the listen address of the server.
func (s *Server) ListenAddr() string {
return s.http.Addr
if s.listener == nil {
return ""
}
return s.listener.Addr().String()
}
25 changes: 20 additions & 5 deletions api/rpc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"encoding/json"
"reflect"
"testing"
"time"

sdk "github.com/cosmos/cosmos-sdk/types"
"github.com/golang/mock/gomock"
Expand All @@ -28,8 +29,20 @@ func TestRPCCallsUnderlyingNode(t *testing.T) {
t.Cleanup(cancel)
nd, server := setupNodeWithModifiedRPC(t)
url := nd.RPCServer.ListenAddr()
client, err := client.NewClient(context.Background(), "http://"+url)
t.Cleanup(client.Close)
// we need to run this a few times to prevent the race where the server is not yet started
var (
rpcClient *client.Client
err error
)
for i := 0; i < 3; i++ {
distractedm1nd marked this conversation as resolved.
Show resolved Hide resolved
time.Sleep(time.Second * 1)
rpcClient, err = client.NewClient(ctx, "http://"+url)
if err == nil {
t.Cleanup(rpcClient.Close)
break
}
}
require.NotNil(t, rpcClient)
require.NoError(t, err)

expectedBalance := &state.Balance{
Expand All @@ -39,7 +52,7 @@ func TestRPCCallsUnderlyingNode(t *testing.T) {

server.State.EXPECT().Balance(gomock.Any()).Return(expectedBalance, nil).Times(1)

balance, err := client.State.Balance(ctx)
balance, err := rpcClient.State.Balance(ctx)
require.NoError(t, err)
require.Equal(t, expectedBalance, balance)
}
Expand Down Expand Up @@ -118,14 +131,16 @@ func setupNodeWithModifiedRPC(t *testing.T) (*nodebuilder.Node, *mockAPI) {
dasMock.NewMockModule(ctrl),
}

overrideRPCHandler := fx.Invoke(func(srv *rpc.Server) {
// given the behavior of fx.Invoke, this invoke will be called last as it is added at the root level module. For
// further information, check the documentation on fx.Invoke.
invokeRPC := fx.Invoke(func(srv *rpc.Server) {
srv.RegisterService("state", mockAPI.State)
srv.RegisterService("share", mockAPI.Share)
srv.RegisterService("fraud", mockAPI.Fraud)
srv.RegisterService("header", mockAPI.Header)
srv.RegisterService("das", mockAPI.Das)
})
nd := nodebuilder.TestNode(t, node.Full, overrideRPCHandler)
nd := nodebuilder.TestNode(t, node.Full, invokeRPC)
// start node
err := nd.Start(ctx)
require.NoError(t, err)
Expand Down
2 changes: 1 addition & 1 deletion nodebuilder/testing.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ func TestNodeWithConfig(t *testing.T, tp node.Type, cfg *Config, opts ...fx.Opti
require.NoError(t, err)
cfg.Core.IP = ip
cfg.Core.RPCPort = port
cfg.RPC.Port = "26655"
cfg.RPC.Port = "0"

opts = append(opts,
state.WithKeyringSigner(TestKeyringSigner(t)),
Expand Down