Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Register healthz interface for controller and scheduler #446

Merged
merged 1 commit into from
Sep 12, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion Gopkg.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 6 additions & 0 deletions cmd/controllers/app/options/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ const (
defaultBurst = 100
defaultWorkers = 3
defaultSchedulerName = "volcano"

defaultHealthzBindAddress = "127.0.0.1:11252"
)

// ServerOption is the main context object for the controller manager.
Expand All @@ -42,6 +44,9 @@ type ServerOption struct {
// concurrently. Larger number = faster job updating, but more CPU load.
WorkerThreads uint32
SchedulerName string
// HealthzBindAddress is the IP address and port for the health check server to serve on,
// defaulting to 127.0.0.1:11252
HealthzBindAddress string
}

// NewServerOption creates a new CMServer with a default config.
Expand All @@ -63,6 +68,7 @@ func (s *ServerOption) AddFlags(fs *pflag.FlagSet) {
fs.Uint32Var(&s.WorkerThreads, "worker-threads", defaultWorkers, "The number of threads syncing job operations concurrently. "+
"Larger number = faster job updating, but more CPU load")
fs.StringVar(&s.SchedulerName, "scheduler-name", defaultSchedulerName, "Volcano will handle pods whose .spec.SchedulerName is same as scheduler-name")
fs.StringVar(&s.HealthzBindAddress, "healthz-bind-address", defaultHealthzBindAddress, "The address to listen on for /healthz HTTP requests.")
}

// CheckOptionOrDie checks the LockObjectNamespace
Expand Down
13 changes: 7 additions & 6 deletions cmd/controllers/app/options/options_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,12 +36,13 @@ func TestAddFlags(t *testing.T) {

// This is a snapshot of expected options parsed by args.
expected := &ServerOption{
Master: "127.0.0.1",
KubeAPIQPS: defaultQPS,
KubeAPIBurst: 200,
PrintVersion: false,
WorkerThreads: defaultWorkers,
SchedulerName: defaultSchedulerName,
Master: "127.0.0.1",
KubeAPIQPS: defaultQPS,
KubeAPIBurst: 200,
PrintVersion: false,
WorkerThreads: defaultWorkers,
SchedulerName: defaultSchedulerName,
HealthzBindAddress: "127.0.0.1:11252",
}

if !reflect.DeepEqual(expected, s) {
Expand Down
5 changes: 5 additions & 0 deletions cmd/controllers/app/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ import (
"k8s.io/client-go/tools/record"

"volcano.sh/volcano/cmd/controllers/app/options"
"volcano.sh/volcano/pkg/apis/helpers"
vkclient "volcano.sh/volcano/pkg/client/clientset/versioned"
"volcano.sh/volcano/pkg/controllers/garbagecollector"
"volcano.sh/volcano/pkg/controllers/job"
Expand Down Expand Up @@ -80,6 +81,10 @@ func Run(opt *options.ServerOption) error {
return err
}

if err := helpers.StartHealthz(opt.HealthzBindAddress, "volcano-controller"); err != nil {
return err
}

run := startControllers(config, opt)

if !opt.EnableLeaderElection {
Expand Down
6 changes: 6 additions & 0 deletions cmd/scheduler/app/options/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@ const (
defaultQueue = "default"
defaultListenAddress = ":8080"

defaultHealthzBindAddress = "127.0.0.1:11251"

defaultQPS = 50.0
defaultBurst = 100
)
Expand All @@ -48,6 +50,9 @@ type ServerOption struct {
EnablePriorityClass bool
KubeAPIBurst int
KubeAPIQPS float32
// HealthzBindAddress is the IP address and port for the health check server to serve on,
// defaulting to 127.0.0.1:11251
HealthzBindAddress string
}

// ServerOpts server options
Expand Down Expand Up @@ -78,6 +83,7 @@ func (s *ServerOption) AddFlags(fs *pflag.FlagSet) {
"Enable PriorityClass to provide the capacity of preemption at pod group level; to disable it, set it false")
fs.Float32Var(&s.KubeAPIQPS, "kube-api-qps", defaultQPS, "QPS to use while talking with kubernetes apiserver")
fs.IntVar(&s.KubeAPIBurst, "kube-api-burst", defaultBurst, "Burst to use while talking with kubernetes apiserver")
fs.StringVar(&s.HealthzBindAddress, "healthz-bind-address", defaultHealthzBindAddress, "The address to listen on for /healthz HTTP requests.")
}

// CheckOptionOrDie check lock-object-namespace when LeaderElection is enabled
Expand Down
13 changes: 7 additions & 6 deletions cmd/scheduler/app/options/options_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,12 +37,13 @@ func TestAddFlags(t *testing.T) {

// This is a snapshot of expected options parsed by args.
expected := &ServerOption{
SchedulerName: defaultSchedulerName,
SchedulePeriod: 5 * time.Minute,
DefaultQueue: defaultQueue,
ListenAddress: defaultListenAddress,
KubeAPIBurst: defaultBurst,
KubeAPIQPS: defaultQPS,
SchedulerName: defaultSchedulerName,
SchedulePeriod: 5 * time.Minute,
DefaultQueue: defaultQueue,
ListenAddress: defaultListenAddress,
KubeAPIBurst: defaultBurst,
KubeAPIQPS: defaultQPS,
HealthzBindAddress: "127.0.0.1:11251",
}

if !reflect.DeepEqual(expected, s) {
Expand Down
6 changes: 6 additions & 0 deletions cmd/scheduler/app/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,9 @@ import (

"github.com/golang/glog"
"github.com/prometheus/client_golang/prometheus/promhttp"

"volcano.sh/volcano/cmd/scheduler/app/options"
"volcano.sh/volcano/pkg/apis/helpers"
"volcano.sh/volcano/pkg/scheduler"
"volcano.sh/volcano/pkg/version"

Expand Down Expand Up @@ -96,6 +98,10 @@ func Run(opt *options.ServerOption) error {
glog.Fatalf("Prometheus Http Server failed %s", http.ListenAndServe(opt.ListenAddress, nil))
}()

if err := helpers.StartHealthz(opt.HealthzBindAddress, "volcano-scheduler"); err != nil {
return err
}

run := func(ctx context.Context) {
sched.Run(ctx.Done())
<-ctx.Done()
Expand Down
80 changes: 80 additions & 0 deletions pkg/apis/helpers/helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,15 @@ limitations under the License.
package helpers

import (
"context"
"fmt"
"net"
"net/http"
"os"
"os/signal"
"syscall"
"time"

"github.com/golang/glog"

"k8s.io/api/core/v1"
Expand All @@ -25,6 +34,9 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/types"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apiserver/pkg/server/healthz"
"k8s.io/apiserver/pkg/server/mux"
"k8s.io/client-go/kubernetes"

vkbatchv1 "volcano.sh/volcano/pkg/apis/batch/v1alpha1"
Expand Down Expand Up @@ -148,3 +160,71 @@ func GeneratePodgroupName(pod *v1.Pod) string {

return pgName
}

// StartHealthz register healthz interface
func StartHealthz(healthzBindAddress, name string) error {
listener, err := net.Listen("tcp", healthzBindAddress)
if err != nil {
return fmt.Errorf("failed to create listener: %v", err)
}

pathRecorderMux := mux.NewPathRecorderMux(name)
healthz.InstallHandler(pathRecorderMux)

server := &http.Server{
Addr: listener.Addr().String(),
Handler: pathRecorderMux,
MaxHeaderBytes: 1 << 20,
}

return runServer(server, listener)
}

func runServer(server *http.Server, ln net.Listener) error {
if ln == nil || server == nil {
return fmt.Errorf("listener and server must not be nil")
}

stopCh := make(chan os.Signal)
signal.Notify(stopCh, syscall.SIGTERM, syscall.SIGINT)

go func() {
<-stopCh
ctx, cancel := context.WithTimeout(context.Background(), 0)
server.Shutdown(ctx)
cancel()
}()

go func() {
defer utilruntime.HandleCrash()

var listener net.Listener
listener = tcpKeepAliveListener{ln.(*net.TCPListener)}

err := server.Serve(listener)
msg := fmt.Sprintf("Stopped listening on %s", listener.Addr().String())
select {
case <-stopCh:
glog.Info(msg)
default:
glog.Fatalf("%s due to error: %v", msg, err)
}
}()

return nil
}

type tcpKeepAliveListener struct {
*net.TCPListener
}

// Accept waits for and returns the next connection to the listener.
func (ln tcpKeepAliveListener) Accept() (net.Conn, error) {
tc, err := ln.AcceptTCP()
if err != nil {
return nil, err
}
tc.SetKeepAlive(true)
tc.SetKeepAlivePeriod(3 * time.Minute)
return tc, nil
}
21 changes: 21 additions & 0 deletions vendor/k8s.io/apiserver/pkg/server/healthz/doc.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading