Use flow-go Components for composing #682

Open
wants to merge 8 commits into base: feature/local-tx-reexecution
1 change: 0 additions & 1 deletion Makefile
@@ -99,7 +99,6 @@ generate:
mockery --dir=storage --name=TraceIndexer --output=storage/mocks
mockery --all --dir=services/traces --output=services/traces/mocks
mockery --all --dir=services/ingestion --output=services/ingestion/mocks
mockery --dir=models --name=Engine --output=models/mocks

.PHONY: ci
ci: check-tidy test e2e-test
80 changes: 60 additions & 20 deletions api/profiler.go
@@ -7,54 +7,94 @@ import (
"net/http"
_ "net/http/pprof"
"strconv"
"time"

"github.com/onflow/flow-go/module/component"
"github.com/onflow/flow-go/module/irrecoverable"

"github.com/rs/zerolog"
)

type ProfileServer struct {
logger zerolog.Logger
log zerolog.Logger
server *http.Server
endpoint string

startupCompleted chan struct{}
Review comment (Contributor): What do you think about using a component manager here? I think we should stick to the idempotent Ready()/Done() pattern.
}
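
The ComponentManager approach the reviewer mentions would make the idempotent Ready()/Done() wiring come for free instead of being hand-rolled with a startupCompleted channel. A minimal sketch of that alternative, assuming the same http.Server field as above — newProfileComponent and its internals are illustrative, not code from this PR:

```go
package api

import (
	"context"
	"errors"
	"net/http"
	"time"

	"github.com/onflow/flow-go/module/component"
	"github.com/onflow/flow-go/module/irrecoverable"
)

// newProfileComponent wraps an http.Server in a flow-go ComponentManager,
// which supplies idempotent Ready()/Done() signaling on top of the worker.
func newProfileComponent(srv *http.Server) component.Component {
	return component.NewComponentManagerBuilder().
		AddWorker(func(ctx irrecoverable.SignalerContext, ready component.ReadyFunc) {
			errCh := make(chan error, 1)
			go func() { errCh <- srv.ListenAndServe() }()
			// Simplified readiness: a production version would bind the
			// listener first and only then signal ready.
			ready()

			select {
			case <-ctx.Done():
				// Parent is shutting down: attempt a bounded graceful stop,
				// then force-close if the deadline is exceeded.
				shutdownCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
				defer cancel()
				if err := srv.Shutdown(shutdownCtx); err != nil {
					_ = srv.Close()
				}
			case err := <-errCh:
				if err != nil && !errors.Is(err, http.ErrServerClosed) {
					ctx.Throw(err) // irrecoverable: escalate to the supervisor
				}
			}
		}).
		Build()
}
```

The manager returned by Build() satisfies component.Component, so callers would get the same Start/Ready/Done surface this diff implements manually.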

var _ component.Component = (*ProfileServer)(nil)

func NewProfileServer(
logger zerolog.Logger,
host string,
port int,
) *ProfileServer {
endpoint := net.JoinHostPort(host, strconv.Itoa(port))
return &ProfileServer{
logger: logger,
server: &http.Server{Addr: endpoint},
endpoint: endpoint,
log: logger,
server: &http.Server{Addr: endpoint},
endpoint: endpoint,
startupCompleted: make(chan struct{}),
}
}

func (s *ProfileServer) ListenAddr() string {
return s.endpoint
}
func (s *ProfileServer) Start(ctx irrecoverable.SignalerContext) {
defer close(s.startupCompleted)

s.server.BaseContext = func(_ net.Listener) context.Context {
return ctx
}

func (s *ProfileServer) Start() {
go func() {
err := s.server.ListenAndServe()
if err != nil {
s.log.Info().Msgf("Profiler server started: %s", s.endpoint)

if err := s.server.ListenAndServe(); err != nil {
// http.ErrServerClosed is returned when Close or Shutdown is called
// we don't consider this an error, so log it at debug level instead
if errors.Is(err, http.ErrServerClosed) {
s.logger.Warn().Msg("Profiler server shutdown")
return
s.log.Debug().Err(err).Msg("Profiler server shutdown")
} else {
s.log.Err(err).Msg("error running profiler server")
}
s.logger.Err(err).Msg("failed to start Profiler server")
panic(err)
}
}()
}

func (s *ProfileServer) Stop() error {
ctx, cancel := context.WithTimeout(context.Background(), shutdownTimeout)
defer cancel()
func (s *ProfileServer) Ready() <-chan struct{} {
ready := make(chan struct{})

go func() {
<-s.startupCompleted
close(ready)
}()

return s.server.Shutdown(ctx)
return ready
}

func (s *ProfileServer) Close() error {
return s.server.Close()
func (s *ProfileServer) Done() <-chan struct{} {
done := make(chan struct{})
go func() {
<-s.startupCompleted
defer close(done)

ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()

err := s.server.Shutdown(ctx)
if err == nil {
s.log.Info().Msg("Profiler server graceful shutdown completed")
}

if errors.Is(err, ctx.Err()) {
s.log.Warn().Msg("Profiler server graceful shutdown timed out")
err := s.server.Close()
if err != nil {
s.log.Err(err).Msg("error closing profiler server")
}
} else {
s.log.Err(err).Msg("error shutting down profiler server")
}
}()
return done
}
113 changes: 73 additions & 40 deletions api/server.go
@@ -19,6 +19,10 @@ import (
"time"

"github.com/onflow/go-ethereum/core"

"github.com/onflow/flow-go/module/component"
"github.com/onflow/flow-go/module/irrecoverable"

gethVM "github.com/onflow/go-ethereum/core/vm"
gethLog "github.com/onflow/go-ethereum/log"
"github.com/onflow/go-ethereum/rpc"
@@ -57,8 +61,12 @@ type Server struct {

config config.Config
collector metrics.Collector

startupCompleted chan struct{}
}

var _ component.Component = (*Server)(nil)

const (
shutdownTimeout = 5 * time.Second
batchRequestLimit = 50
@@ -79,10 +87,11 @@ func NewServer(
gethLog.SetDefault(gethLog.NewLogger(zeroSlog))

return &Server{
logger: logger,
timeouts: rpc.DefaultHTTPTimeouts,
config: cfg,
collector: collector,
logger: logger,
timeouts: rpc.DefaultHTTPTimeouts,
config: cfg,
collector: collector,
startupCompleted: make(chan struct{}),
}
}

@@ -179,9 +188,10 @@ func (h *Server) disableWS() bool {
}

// Start starts the HTTP server if it is enabled and not already running.
func (h *Server) Start() error {
func (h *Server) Start(ctx irrecoverable.SignalerContext) {
defer close(h.startupCompleted)
if h.endpoint == "" || h.listener != nil {
return nil // already running or not configured
return // already running or not configured
}

// Initialize the server.
@@ -192,16 +202,21 @@ func (h *Server) Start() error {
h.server.ReadHeaderTimeout = h.timeouts.ReadHeaderTimeout
h.server.WriteTimeout = h.timeouts.WriteTimeout
h.server.IdleTimeout = h.timeouts.IdleTimeout
h.server.BaseContext = func(_ net.Listener) context.Context {
return ctx
}
}

listenConfig := net.ListenConfig{}
// Start the server.
listener, err := net.Listen("tcp", h.endpoint)
listener, err := listenConfig.Listen(ctx, "tcp", h.endpoint)
if err != nil {
// If the server fails to start, we need to clear out the RPC and WS
// configurations so they can be configured another time.
h.disableRPC()
h.disableWS()
return err
ctx.Throw(err)
return
}

h.listener = listener
@@ -213,7 +228,7 @@ return
return
}
h.logger.Err(err).Msg("failed to start API server")
panic(err)
ctx.Throw(err)
}
}()

@@ -225,8 +240,17 @@
url := fmt.Sprintf("ws://%v", listener.Addr())
h.logger.Info().Msgf("JSON-RPC over WebSocket enabled: %s", url)
}
}

return nil
func (h *Server) Ready() <-chan struct{} {
ready := make(chan struct{})

go func() {
<-h.startupCompleted
close(ready)
}()

return ready
}

// disableRPC stops the JSON-RPC over HTTP handler.
@@ -296,41 +320,50 @@ func (h *Server) ServeHTTP(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusNotFound)
}

// Stop shuts down the HTTP server.
func (h *Server) Stop() {
if h.listener == nil {
return // not running
}
// Done shuts down the HTTP server.
func (h *Server) Done() <-chan struct{} {
done := make(chan struct{})

// Shut down the server.
httpHandler := h.httpHandler
if httpHandler != nil {
httpHandler.server.Stop()
h.httpHandler = nil
}
go func() {
defer close(done)

wsHandler := h.wsHandler
if wsHandler != nil {
wsHandler.server.Stop()
h.wsHandler = nil
}
if h.listener == nil {
return // not running
}

ctx, cancel := context.WithTimeout(context.Background(), shutdownTimeout)
defer cancel()
err := h.server.Shutdown(ctx)
if err != nil && err == ctx.Err() {
h.logger.Warn().Msg("HTTP server graceful shutdown timed out")
h.server.Close()
}
// Shut down the server.
httpHandler := h.httpHandler
if httpHandler != nil {
httpHandler.server.Stop()
h.httpHandler = nil
}

wsHandler := h.wsHandler
if wsHandler != nil {
wsHandler.server.Stop()
h.wsHandler = nil
}

ctx, cancel := context.WithTimeout(context.Background(), shutdownTimeout)
defer cancel()
err := h.server.Shutdown(ctx)
if err != nil && err == ctx.Err() {
h.logger.Warn().Msg("HTTP server graceful shutdown timed out")
h.server.Close()
}

h.listener.Close()
h.logger.Info().Msgf(
"HTTP server stopped, endpoint: %s", h.listener.Addr(),
)
h.listener.Close()
h.logger.Info().Msgf(
"HTTP server stopped, endpoint: %s", h.listener.Addr(),
)

// Clear out everything to allow re-configuring it later.
h.host, h.port, h.endpoint = "", 0, ""
h.server, h.listener = nil, nil

}()

// Clear out everything to allow re-configuring it later.
h.host, h.port, h.endpoint = "", 0, ""
h.server, h.listener = nil, nil
return done
}

// CheckTimeouts ensures that timeout values are meaningful
14 changes: 3 additions & 11 deletions api/stream.go
@@ -178,7 +178,7 @@ func newSubscription[T any](

rpcSub := notifier.CreateSubscription()

subs := models.NewSubscription(logger, callback(notifier, rpcSub))
subs := models.NewSubscription(callback(notifier, rpcSub))

l := logger.With().
Str("gateway-subscription-id", fmt.Sprintf("%p", subs)).
@@ -190,16 +190,8 @@
go func() {
defer publisher.Unsubscribe(subs)

for {
select {
case err := <-subs.Error():
l.Debug().Err(err).Msg("subscription returned error")
return
case err := <-rpcSub.Err():
l.Debug().Err(err).Msg("client unsubscribed")
return
}
}
err := <-rpcSub.Err()
l.Debug().Err(err).Msg("client unsubscribed")
}()

l.Info().Msg("new heads subscription created")