From 18415a596021b8ed377f5c2bc49b74a5e7df9e2e Mon Sep 17 00:00:00 2001 From: Dylan McGannon Date: Wed, 15 Jan 2020 17:49:14 +1100 Subject: [PATCH 1/6] Perform a graceful shutdown for the function proxy on SIGINT & SIGTERM. --- pkg/function-proxy/proxy.go | 40 +++++++++++++++++++++---- pkg/function-proxy/utils/proxy-utils.go | 9 ++---- 2 files changed, 37 insertions(+), 12 deletions(-) diff --git a/pkg/function-proxy/proxy.go b/pkg/function-proxy/proxy.go index 7210b5ab1..bc3e96e53 100644 --- a/pkg/function-proxy/proxy.go +++ b/pkg/function-proxy/proxy.go @@ -23,6 +23,9 @@ import ( "net/http" "os" "os/exec" + "os/signal" + "syscall" + "time" "github.com/kubeless/kubeless/pkg/function-proxy/utils" @@ -80,10 +83,35 @@ func startNativeDaemon() { } } -func main() { - go startNativeDaemon() - http.HandleFunc("/", handler) - http.HandleFunc("/healthz", health) - http.Handle("/metrics", promhttp.Handler()) - utils.ListenAndServe() +func gracefulShutdown(server *http.Server) { + stop := make(chan os.Signal, 1) + signal.Notify(stop, os.Interrupt, syscall.SIGINT, syscall.SIGTERM) + <-stop + timeout := 10*time.Second; + ctx, cancel := context.WithTimeout(context.Background(), timeout) + defer cancel() + + log.Printf("Shuting down with timeout: %s\n", timeout) + if err := server.Shutdown(ctx); err != nil { + log.Printf("Error: %v\n", err) + } else { + log.Println("Server stopped") + } } + +func main() { + mux := http.NewServeMux() + mux.HandleFunc("/", handler) + mux.HandleFunc("/healthz", health) + mux.Handle("/metrics", promhttp.Handler()) + + server := utils.NewServer(mux) + + go func() { + if err := server.ListenAndServe(); err != http.ErrServerClosed { + panic(err) + } + }() + + gracefulShutdown(server) +} \ No newline at end of file diff --git a/pkg/function-proxy/utils/proxy-utils.go b/pkg/function-proxy/utils/proxy-utils.go index bc53050d1..da4a04d88 100644 --- a/pkg/function-proxy/utils/proxy-utils.go +++ b/pkg/function-proxy/utils/proxy-utils.go @@ -139,9 +139,6 @@ func Handler(w http.ResponseWriter, r *http.Request, h Handle) { } } -// ListenAndServe starts an HTTP server in FUNC_PORT using custom logging -func ListenAndServe() { - if err := http.ListenAndServe(fmt.Sprintf(":%s", funcPort), logReq(http.DefaultServeMux)); err != nil { - panic(err) - } -} +func NewServer(mux *http.ServeMux) *http.Server { + return &http.Server{Addr: fmt.Sprintf(":%s", funcPort), Handler: logReq(mux)} +} \ No newline at end of file From 413403e7f753d39c912db91f7c7ef5222b84bfea Mon Sep 17 00:00:00 2001 From: Dylan McGannon Date: Wed, 15 Jan 2020 18:45:48 +1100 Subject: [PATCH 2/6] Add comment to exported function NewServer. --- pkg/function-proxy/utils/proxy-utils.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/pkg/function-proxy/utils/proxy-utils.go b/pkg/function-proxy/utils/proxy-utils.go index da4a04d88..11ebafab6 100644 --- a/pkg/function-proxy/utils/proxy-utils.go +++ b/pkg/function-proxy/utils/proxy-utils.go @@ -139,6 +139,8 @@ func Handler(w http.ResponseWriter, r *http.Request, h Handle) { } } +// NewServer returns an HTTP server ready to listen on the configured port +// and with logReq mixed in for logging. func NewServer(mux *http.ServeMux) *http.Server { return &http.Server{Addr: fmt.Sprintf(":%s", funcPort), Handler: logReq(mux)} } \ No newline at end of file From 8246415094dd2b7304133c9a4f822c4637a8851b Mon Sep 17 00:00:00 2001 From: Dylan McGannon Date: Wed, 15 Jan 2020 19:37:05 +1100 Subject: [PATCH 3/6] gofmt --- pkg/function-proxy/proxy.go | 4 ++-- pkg/function-proxy/utils/proxy-utils.go | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/pkg/function-proxy/proxy.go b/pkg/function-proxy/proxy.go index bc3e96e53..1275ae3e4 100644 --- a/pkg/function-proxy/proxy.go +++ b/pkg/function-proxy/proxy.go @@ -87,7 +87,7 @@ func gracefulShutdown(server *http.Server) { stop := make(chan os.Signal, 1) signal.Notify(stop, os.Interrupt, syscall.SIGINT, syscall.SIGTERM) <-stop - timeout := 10*time.Second; + timeout := 10 * time.Second ctx, cancel := context.WithTimeout(context.Background(), timeout) defer cancel() @@ -114,4 +114,4 @@ func main() { }() gracefulShutdown(server) -} \ No newline at end of file +} diff --git a/pkg/function-proxy/utils/proxy-utils.go b/pkg/function-proxy/utils/proxy-utils.go index 11ebafab6..1490e43a2 100644 --- a/pkg/function-proxy/utils/proxy-utils.go +++ b/pkg/function-proxy/utils/proxy-utils.go @@ -143,4 +143,4 @@ func Handler(w http.ResponseWriter, r *http.Request, h Handle) { // and with logReq mixed in for logging. func NewServer(mux *http.ServeMux) *http.Server { return &http.Server{Addr: fmt.Sprintf(":%s", funcPort), Handler: logReq(mux)} -} \ No newline at end of file +} From 5d9f2432c390a6e955a759d2fd137bee86b67c64 Mon Sep 17 00:00:00 2001 From: Dylan McGannon Date: Thu, 16 Jan 2020 10:19:05 +1100 Subject: [PATCH 4/6] Read SHUTDOWN_TIMEOUT env var for the shutdown timeout value. --- pkg/function-proxy/proxy.go | 21 +-------------- pkg/function-proxy/utils/proxy-utils.go | 35 ++++++++++++++++++++++--- 2 files changed, 32 insertions(+), 24 deletions(-) diff --git a/pkg/function-proxy/proxy.go b/pkg/function-proxy/proxy.go index 1275ae3e4..2499891bb 100644 --- a/pkg/function-proxy/proxy.go +++ b/pkg/function-proxy/proxy.go @@ -23,9 +23,6 @@ import ( "net/http" "os" "os/exec" - "os/signal" - "syscall" - "time" "github.com/kubeless/kubeless/pkg/function-proxy/utils" @@ -83,22 +80,6 @@ func startNativeDaemon() { } } -func gracefulShutdown(server *http.Server) { - stop := make(chan os.Signal, 1) - signal.Notify(stop, os.Interrupt, syscall.SIGINT, syscall.SIGTERM) - <-stop - timeout := 10 * time.Second - ctx, cancel := context.WithTimeout(context.Background(), timeout) - defer cancel() - - log.Printf("Shuting down with timeout: %s\n", timeout) - if err := server.Shutdown(ctx); err != nil { - log.Printf("Error: %v\n", err) - } else { - log.Println("Server stopped") - } -} - func main() { mux := http.NewServeMux() mux.HandleFunc("/", handler) @@ -113,5 +94,5 @@ func main() { } }() - gracefulShutdown(server) + utils.GracefulShutdown(server) } diff --git a/pkg/function-proxy/utils/proxy-utils.go b/pkg/function-proxy/utils/proxy-utils.go index 1490e43a2..ea2f19ea4 100644 --- a/pkg/function-proxy/utils/proxy-utils.go +++ b/pkg/function-proxy/utils/proxy-utils.go @@ -22,17 +22,21 @@ import ( "log" "net/http" "os" + "os/signal" "strconv" + "syscall" "time" "github.com/prometheus/client_golang/prometheus" ) var ( - timeout = os.Getenv("FUNC_TIMEOUT") - funcPort = os.Getenv("FUNC_PORT") - intTimeout int - funcHistogram = prometheus.NewHistogramVec(prometheus.HistogramOpts{ + timeout = os.Getenv("FUNC_TIMEOUT") + funcPort = os.Getenv("FUNC_PORT") + shutdownTimeout = os.Getenv("SHUTDOWN_TIMEOUT") + intTimeout int + intShutdownTimeout int + funcHistogram = prometheus.NewHistogramVec(prometheus.HistogramOpts{ Name: "function_duration_seconds", Help: "Duration of user function in seconds", }, []string{"method"}) @@ -53,11 +57,18 @@ func init() { if funcPort == "" { funcPort = "8080" } + if shutdownTimeout == "" { + shutdownTimeout = "10" + } var err error intTimeout, err = strconv.Atoi(timeout) if err != nil { panic(err) } + intShutdownTimeout, err = strconv.Atoi(shutdownTimeout) + if err != nil { + panic(err) + } prometheus.MustRegister(funcHistogram, funcCalls, funcErrors) } @@ -144,3 +155,19 @@ func Handler(w http.ResponseWriter, r *http.Request, h Handle) { func NewServer(mux *http.ServeMux) *http.Server { return &http.Server{Addr: fmt.Sprintf(":%s", funcPort), Handler: logReq(mux)} } + +func GracefulShutdown(server *http.Server) { + stop := make(chan os.Signal, 1) + signal.Notify(stop, os.Interrupt, syscall.SIGINT, syscall.SIGTERM) + <-stop + timeoutDuration := time.Duration(intShutdownTimeout) * time.Second + ctx, cancel := context.WithTimeout(context.Background(), timeoutDuration) + defer cancel() + + log.Printf("Shuting down with timeout: %s\n", timeoutDuration) + if err := server.Shutdown(ctx); err != nil { + log.Printf("Error: %v\n", err) + } else { + log.Println("Server stopped") + } +} From 59da64f8735ed03b0faedd3ee1408a12f8a1c9fe Mon Sep 17 00:00:00 2001 From: Dylan McGannon Date: Thu, 16 Jan 2020 10:25:43 +1100 Subject: [PATCH 5/6] Add comment to exported function. --- pkg/function-proxy/utils/proxy-utils.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/pkg/function-proxy/utils/proxy-utils.go b/pkg/function-proxy/utils/proxy-utils.go index ea2f19ea4..77bce1c36 100644 --- a/pkg/function-proxy/utils/proxy-utils.go +++ b/pkg/function-proxy/utils/proxy-utils.go @@ -156,6 +156,8 @@ func NewServer(mux *http.ServeMux) *http.Server { return &http.Server{Addr: fmt.Sprintf(":%s", funcPort), Handler: logReq(mux)} } +// GracefulShutdown accepts a server reference and triggers a graceful shutdown +// for it when either SIGINT or SIGTERM is received. func GracefulShutdown(server *http.Server) { stop := make(chan os.Signal, 1) signal.Notify(stop, os.Interrupt, syscall.SIGINT, syscall.SIGTERM) From 7cea83af060bcd4e9cd49246b3b6826ebb3f087a Mon Sep 17 00:00:00 2001 From: Dylan McGannon Date: Thu, 16 Jan 2020 13:23:16 +1100 Subject: [PATCH 6/6] replace accidently removed function call. --- pkg/function-proxy/proxy.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/pkg/function-proxy/proxy.go b/pkg/function-proxy/proxy.go index 2499891bb..d884eb6bc 100644 --- a/pkg/function-proxy/proxy.go +++ b/pkg/function-proxy/proxy.go @@ -81,6 +81,8 @@ func startNativeDaemon() { } func main() { + go startNativeDaemon() + mux := http.NewServeMux() mux.HandleFunc("/", handler) mux.HandleFunc("/healthz", health)