Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Node/CCQ/Server: Clean restart #3598

Merged
merged 3 commits into from
Dec 11, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions devnet/query-server.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,8 @@ spec:
- --bootstrap
- /dns4/guardian-0.guardian/udp/8996/quic/p2p/12D3KooWL3XJ9EMCyZvmmGXL2LMiVBtrVa2BuESsJiXkSj7333Jw
- --logLevel=info
- --shutdownDelay1
bruce-riley marked this conversation as resolved.
Show resolved Hide resolved
- "0"
ports:
- containerPort: 6069
name: rest
Expand Down
1 change: 1 addition & 0 deletions node/cmd/ccq/p2p.go
Original file line number Diff line number Diff line change
Expand Up @@ -198,6 +198,7 @@ func runP2P(ctx context.Context, priv crypto.PrivKey, port uint, networkID, boot
)
default:
logger.Error("failed to write query response to channel, dropping it", zap.String("peerId", peerId), zap.Any("requestId", requestSignature))
// Leave the request in the pending map. It will get cleaned up if it times out.
}
} else {
logger.Info("waiting for more query responses",
Expand Down
6 changes: 6 additions & 0 deletions node/cmd/ccq/pending_request.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,3 +59,9 @@ func (p *PendingResponses) Remove(r *PendingResponse) {
defer p.mu.Unlock()
delete(p.pendingResponses, signature)
}

// NumPending reports how many entries are currently held in the
// pendingResponses map. The count is read while holding p.mu.
func (p *PendingResponses) NumPending() int {
	p.mu.Lock()
	count := len(p.pendingResponses)
	p.mu.Unlock()
	return count
}
36 changes: 33 additions & 3 deletions node/cmd/ccq/query_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"os"
"os/signal"
"syscall"
"time"

"github.com/certusone/wormhole/node/pkg/common"
"github.com/certusone/wormhole/node/pkg/telemetry"
Expand Down Expand Up @@ -41,6 +42,8 @@ var (
telemetryNodeName *string
statusAddr *string
promRemoteURL *string
shutdownDelay1 *uint
shutdownDelay2 *uint
)

const DEV_NETWORK_ID = "/wormhole/dev"
Expand All @@ -61,6 +64,12 @@ func init() {
telemetryNodeName = QueryServerCmd.Flags().String("telemetryNodeName", "", "Node name used in telemetry")
statusAddr = QueryServerCmd.Flags().String("statusAddr", "[::]:6060", "Listen address for status server (disabled if blank)")
promRemoteURL = QueryServerCmd.Flags().String("promRemoteURL", "", "Prometheus remote write URL (Grafana)")

// The default health check monitoring is every five seconds, with a five second timeout, and you have to miss two, for 20 seconds total.
shutdownDelay1 = QueryServerCmd.Flags().Uint("shutdownDelay1", 25, "Seconds to delay after disabling health check on shutdown")

// The guardians will wait up to 60 seconds before giving up on a request.
shutdownDelay2 = QueryServerCmd.Flags().Uint("shutdownDelay2", 65, "Seconds to wait after delay1 for pending requests to complete")
}

var QueryServerCmd = &cobra.Command{
Expand Down Expand Up @@ -175,11 +184,12 @@ func runQueryServer(cmd *cobra.Command, args []string) {
}()

// Start the status server
var statServer *statusServer
if *statusAddr != "" {
statServer = NewStatusServer(*statusAddr, logger, env)
go func() {
ss := NewStatusServer(*statusAddr, logger, env)
logger.Sugar().Infof("Status server listening on %s", *statusAddr)
err := ss.ListenAndServe()
err := statServer.httpServer.ListenAndServe()
if err != nil && err != http.ErrServerClosed {
logger.Fatal("Status server closed unexpectedly", zap.Error(err))
}
Expand Down Expand Up @@ -209,7 +219,27 @@ func runQueryServer(cmd *cobra.Command, args []string) {
signal.Notify(sigterm, syscall.SIGTERM)
go func() {
<-sigterm
logger.Info("Received sigterm. exiting.")
if statServer != nil && *shutdownDelay1 != 0 {
bruce-riley marked this conversation as resolved.
Show resolved Hide resolved
logger.Info("Received sigterm. disabling health checks and pausing.")
statServer.disableHealth()
time.Sleep(time.Duration(*shutdownDelay1) * time.Second)
numPending := 0
logger.Info("Waiting for any outstanding requests to complete before shutting down.")
for count := 0; count < int(*shutdownDelay2); count++ {
time.Sleep(time.Second)
numPending = pendingResponses.NumPending()
if numPending == 0 {
break
}
}
if numPending == 0 {
logger.Info("Done waiting. shutting down.")
} else {
logger.Error("Gave up waiting for pending requests to finish. shutting down anyway.", zap.Int("numStillPending", numPending))
bruce-riley marked this conversation as resolved.
Show resolved Hide resolved
}
} else {
logger.Info("Received sigterm. exiting.")
}
cancel()
}()

Expand Down
22 changes: 18 additions & 4 deletions node/cmd/ccq/status.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"fmt"
"net/http"
"sync/atomic"
"time"

"github.com/certusone/wormhole/node/pkg/common"
Expand All @@ -14,26 +15,39 @@ import (
)

type statusServer struct {
logger *zap.Logger
env common.Environment
logger *zap.Logger
env common.Environment
httpServer *http.Server
healthEnabled atomic.Bool
}

func NewStatusServer(addr string, logger *zap.Logger, env common.Environment) *http.Server {
func NewStatusServer(addr string, logger *zap.Logger, env common.Environment) *statusServer {
s := &statusServer{
logger: logger,
env: env,
}
s.healthEnabled.Store(true)
r := mux.NewRouter()
r.HandleFunc("/health", s.handleHealth).Methods("GET")
r.Handle("/metrics", promhttp.Handler())
return &http.Server{
s.httpServer = &http.Server{
Addr: addr,
Handler: r,
ReadHeaderTimeout: 5 * time.Second,
}
return s
}

// disableHealth stores false in healthEnabled. Once that flag is false,
// handleHealth answers every GET /health request with 503 Service
// Unavailable ("shutting down") instead of 200 "ok".
func (s *statusServer) disableHealth() {
s.healthEnabled.Store(false)
}

func (s *statusServer) handleHealth(w http.ResponseWriter, r *http.Request) {
if !s.healthEnabled.Load() {
s.logger.Info("ignoring health check")
http.Error(w, "shutting down", http.StatusServiceUnavailable)
return
bruce-riley marked this conversation as resolved.
Show resolved Hide resolved
}
s.logger.Debug("health check")
w.WriteHeader(http.StatusOK)
fmt.Fprintf(w, "ok")
Expand Down
Loading