Skip to content

Commit

Permalink
remove loadbalancer, add db cache, set defaults
Browse files Browse the repository at this point in the history
  • Loading branch information
LexLuthr committed Sep 6, 2024
1 parent b93e526 commit fac44dd
Show file tree
Hide file tree
Showing 7 changed files with 54 additions and 275 deletions.
176 changes: 27 additions & 149 deletions cuhttp/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,65 +5,29 @@ import (
"crypto/tls"
"fmt"
"net/http"
"net/url"
"time"

"github.com/CAFxX/httpcompression"
"github.com/go-chi/chi/v5"
"github.com/go-chi/chi/v5/middleware"
"github.com/gorilla/handlers"
logging "github.com/ipfs/go-log/v2"
"github.com/vulcand/oxy/forward"
"github.com/vulcand/oxy/roundrobin"
"golang.org/x/crypto/acme/autocert"
"golang.org/x/sync/errgroup"
"golang.org/x/xerrors"

"github.com/filecoin-project/curio/deps"
"github.com/filecoin-project/curio/deps/config"
"github.com/filecoin-project/curio/harmony/harmonydb"
)

var log = logging.Logger("cu-http")

// Config structure to hold all configurations, including compression levels
type Config struct {
DomainName string
CertCacheDir string
ListenAddr string
HTTPRedirectAddr string
ReadTimeout time.Duration
WriteTimeout time.Duration
IdleTimeout time.Duration
ReadHeaderTimeout time.Duration
EnableCORS bool
Backends []string
LoadBalancer bool
CompressionLevels CompressionConfig
}

// CompressionConfig holds the compression levels for supported types
type CompressionConfig struct {
GzipLevel int
BrotliLevel int
DeflateLevel int
}

// RouterMap is the map that allows the library user to pass in their own routes
type RouterMap map[string]http.HandlerFunc

type startTime string

func StartHTTPServer(ctx context.Context, cfg *config.HTTPConfig, deps *deps.Deps) error {
err := NewHTTPServer(ctx, cfg)
if err != nil {
return err
}
if cfg.EnableLoadBalancer {
return NewLoadBalancerServer(ctx, cfg)
}
return nil
}

// Custom middleware to add secure HTTP headers
func secureHeaders(next http.Handler) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
Expand Down Expand Up @@ -102,11 +66,13 @@ func compressionMiddleware(config *config.CompressionConfig) (func(http.Handler)
return adapter, nil
}

func NewHTTPServer(ctx context.Context, config *config.HTTPConfig) error {
func NewHTTPServer(ctx context.Context, deps *deps.Deps, config *config.HTTPConfig) error {
ch := cache{db: deps.DB}

// Set up the autocert manager for Let's Encrypt
certManager := autocert.Manager{
Cache: autocert.DirCache(config.CertCacheDir), // Directory for storing certificates
Prompt: autocert.AcceptTOS, // Automatically accept the Terms of Service
Cache: ch,
Prompt: autocert.AcceptTOS, // Automatically accept the Terms of Service
HostPolicy: autocert.HostWhitelist(config.DomainName),
}

Expand Down Expand Up @@ -188,122 +154,34 @@ func NewHTTPServer(ctx context.Context, config *config.HTTPConfig) error {
return eg.Wait()
}

// NewLoadBalancerServer sets up a load balancer server to distribute traffic to the backends
func NewLoadBalancerServer(ctx context.Context, cfg *config.HTTPConfig) error {
certManager := autocert.Manager{
Cache: autocert.DirCache(cfg.CertCacheDir), // Directory for storing certificates
Prompt: autocert.AcceptTOS, // Automatically accept the Terms of Service
HostPolicy: autocert.HostWhitelist(cfg.DomainName),
}
type cache struct {
db *harmonydb.DB
}

lb, err := NewLoadBalancer(cfg)
func (c cache) Get(ctx context.Context, key string) ([]byte, error) {
var ret []byte
err := c.db.QueryRow(ctx, `SELECT v FROM autocert_cache WHERE k = $1`, key).Scan(&ret)
if err != nil {
return xerrors.Errorf("failed to create load balancer: %w", err)
return nil, xerrors.Errorf("failed to get the value from DB for key %s: %w", key, err)
}

go healthCheck(ctx, lb, time.Duration(cfg.LoadBalanceHealthCheckInterval))

// Set up the HTTP load balancer server with proper timeouts
server := &http.Server{
Addr: cfg.LoadBalancerListenAddr,
Handler: http.HandlerFunc(lb.ServeHTTP), // Attach the load balancer to handle requests
ReadTimeout: cfg.ReadTimeout,
WriteTimeout: cfg.WriteTimeout,
IdleTimeout: cfg.IdleTimeout,
ReadHeaderTimeout: cfg.ReadHeaderTimeout,
TLSConfig: &tls.Config{
GetCertificate: certManager.GetCertificate,
},
}

eg := errgroup.Group{}
eg.Go(func() error {
log.Infof("Starting load balancer server on https://%s", cfg.DomainName)
serr := server.ListenAndServeTLS("", "")
if serr != nil {
return xerrors.Errorf("failed to start listening: %w", serr)
}
return nil
})

go func() {
<-ctx.Done()
log.Warn("Shutting down load balancer Server...")
if err := server.Shutdown(context.Background()); err != nil {
log.Errorf("shutting down web server failed: %s", err)
}
log.Warn("LoadBalancer Server graceful shutdown successful")
}()

return eg.Wait()
return ret, nil
}

// NewLoadBalancer creates a load balancer with external backends
func NewLoadBalancer(config *config.HTTPConfig) (*roundrobin.RoundRobin, error) {
fwd, _ := forward.New()
lb, _ := roundrobin.New(fwd)

// Add the backend servers to the load balancer
for _, backend := range config.LoadBalancerBackends {
burl, err := url.Parse("https://" + backend)
if err != nil {
return nil, xerrors.Errorf("Failed to parse backend URL %s: %w", backend, err)
}
err = lb.UpsertServer(burl)
if err != nil {
return nil, xerrors.Errorf("Failed to add proxy backend: %w", err)
}
func (c cache) Put(ctx context.Context, key string, data []byte) error {
_, err := c.db.Exec(ctx, `INSERT INTO autocert_cache (k, v) VALUES ($1, $2)
ON CONFLICT (k) DO UPDATE SET v = EXCLUDED.v`, key, data)
if err != nil {
return xerrors.Errorf("failed to inset key value pair in DB: %w", err)
}

return lb, nil
return nil
}

// Health check function
func healthCheck(ctx context.Context, lb *roundrobin.RoundRobin, interval time.Duration) {
var failedURLs []*url.URL

ticker := time.NewTicker(interval)
defer ticker.Stop()

for {
select {
case <-ctx.Done():
log.Infof("Shutting fown the load balancer health check")
return
case <-ticker.C:
var cf []*url.URL
// Drop failed URLs from current set
for _, u := range lb.Servers() {
// Perform a health check by sending a GET request to /health endpoint
resp, err := http.Get(u.String() + "/health") // Assuming /health is the health check endpoint
if err != nil || resp.StatusCode != http.StatusOK {
// If the server is down or the status is not OK, remove it from the load balancer
log.Infof("Backend %s is down. Removing from load balancer.", u)
_ = lb.RemoveServer(u)
cf = append(cf, u)
continue
}
}

// Add back good URLs to current set from previous failed URLs
for _, u := range failedURLs {
// Perform a health check by sending a GET request to /health endpoint
resp, err := http.Get(u.String() + "/health") // Assuming /health is the health check endpoint
if err != nil || resp.StatusCode != http.StatusOK {
// If the server is down or the status is not OK, do not add it to the load balancer
log.Infof("Backend %s is down. Not adding to load balancer.", u)
cf = append(cf, u)
continue
}

// If the server responds with OK, ensure it add it to the load balancer
err = lb.UpsertServer(u)
if err != nil {
log.Errorf("Failed to re-add backend %s: %v", u, err)
}
}

failedURLs = cf
}
func (c cache) Delete(ctx context.Context, key string) error {
_, err := c.db.Exec(ctx, `DELETE FROM autocert_cache WHERE k = $1`, key)
if err != nil {
return xerrors.Errorf("failed to delete key value pair from DB: %w", err)
}
return nil
}

var _ autocert.Cache = cache{}
40 changes: 0 additions & 40 deletions deps/config/doc_gen.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

36 changes: 14 additions & 22 deletions deps/config/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,20 @@ func DefaultCurioConfig() *CurioConfig {
AlertManagerURL: "http://localhost:9093/api/v2/alerts",
},
},
HTTP: HTTPConfig{
DomainName: "",
ListenAddr: "0.0.0.0:12310",
ReadTimeout: time.Second * 10,
WriteTimeout: time.Second * 10,
IdleTimeout: time.Minute * 2,
ReadHeaderTimeout: time.Second * 5,
EnableCORS: true,
CompressionLevels: CompressionConfig{
GzipLevel: 6,
BrotliLevel: 4,
DeflateLevel: 6,
},
},
}
}

Expand Down Expand Up @@ -555,15 +569,9 @@ type HTTPConfig struct {
// DomainName specifies the domain name that the server uses to serve HTTP requests.
DomainName string

// CertCacheDir path to the cache directory for storing SSL certificates needed for HTTPS.
CertCacheDir string

// ListenAddr is the address that the server listens for HTTP requests.
ListenAddr string

// HTTPRedirectAddr is the address to which HTTP requests are redirected. It's usually used when you want to enforce HTTPS.
HTTPRedirectAddr string

// ReadTimeout is the maximum duration for reading the entire or next request, including body, from the client.
ReadTimeout time.Duration

Expand All @@ -581,22 +589,6 @@ type HTTPConfig struct {

// CompressionLevels hold the compression level for various compression methods supported by the server
CompressionLevels CompressionConfig

// EnableLoadBalancer indicates whether load balancing between backend servers is enabled. It should only
// be enabled on one node per domain name.
EnableLoadBalancer bool

// LoadBalancerListenAddr is the listen address for load balancer. This must be different from ListenAddr of the
// HTTP server.
LoadBalancerListenAddr string

// LoadBalancerBackends holds a list of listen addresses to which HTTP requests can be routed. Current ListenAddr
// should also be added to backends if LoadBalancer is enabled
LoadBalancerBackends []string

// LoadBalanceHealthCheckInterval is the duration to check the status of all backend URLs and adjust the
// loadbalancer backend based on the results
LoadBalanceHealthCheckInterval Duration
}

// CompressionConfig holds the compression levels for supported types
Expand Down
Loading

0 comments on commit fac44dd

Please sign in to comment.