Skip to content

Commit

Permalink
start/stop gRPC Server based on leader election
Browse files Browse the repository at this point in the history
Signed-off-by: Zbynek Roubalik <zroubalik@gmail.com>
  • Loading branch information
zroubalik committed Nov 16, 2022
1 parent 1dbd37c commit 19cd301
Show file tree
Hide file tree
Showing 2 changed files with 48 additions and 16 deletions.
14 changes: 6 additions & 8 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -207,21 +207,19 @@ func main() {
os.Exit(1)
}

grpcServer := metricsservice.NewGrpcServer(&scaledHandler, metricsServiceAddr)
if err := mgr.Add(&grpcServer); err != nil {
setupLog.Error(err, "unable to set up Metrics Service gRPC server")
os.Exit(1)
}

setupLog.Info("Starting manager")
setupLog.Info(fmt.Sprintf("KEDA Version: %s", version.Version))
setupLog.Info(fmt.Sprintf("Git Commit: %s", version.GitCommit))
setupLog.Info(fmt.Sprintf("Go Version: %s", runtime.Version()))
setupLog.Info(fmt.Sprintf("Go OS/Arch: %s/%s", runtime.GOOS, runtime.GOARCH))
setupLog.Info(fmt.Sprintf("Running on Kubernetes %s", kubeVersion.PrettyVersion), "version", kubeVersion.Version)

go func() {
setupLog.Info("Starting Metrics Service gRPC Server", "address", metricsServiceAddr)
if err := metricsservice.StartServer(&scaledHandler, metricsServiceAddr); err != nil {
setupLog.Error(err, "unable to start Metrics Service gRPC server")
os.Exit(1)
}
}()

if err := mgr.Start(ctrl.SetupSignalHandler()); err != nil {
setupLog.Error(err, "problem running manager")
os.Exit(1)
Expand Down
50 changes: 42 additions & 8 deletions pkg/metricsservice/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,12 +31,15 @@ import (

var log = ctrl.Log.WithName("grpc_server")

type grpcServer struct {
type GrpcServer struct {
server *grpc.Server
address string
scalerHandler *scaling.ScaleHandler
api.UnimplementedMetricsServiceServer
}

func (s *grpcServer) GetMetrics(ctx context.Context, in *api.ScaledObjectRef) (*v1beta1.ExternalMetricValueList, error) {
// GetMetrics returns metrics values in form of ExternalMetricValueList for specified ScaledObject reference
func (s *GrpcServer) GetMetrics(ctx context.Context, in *api.ScaledObjectRef) (*v1beta1.ExternalMetricValueList, error) {
// TODO hit the metrics cache here first

cache, err := (*s.scalerHandler).GetScalersCacheForScaledObject(ctx, in.Name, in.Namespace)
Expand All @@ -62,25 +65,56 @@ func (s *grpcServer) GetMetrics(ctx context.Context, in *api.ScaledObjectRef) (*
return v1beta1ExtMetrics, nil
}

func newGrpcServer(scaleHandler *scaling.ScaleHandler) *grpc.Server {
// NewGrpcServer creates a new instance of GrpcServer
func NewGrpcServer(scaleHandler *scaling.ScaleHandler, address string) GrpcServer {
gsrv := grpc.NewServer()
srv := grpcServer{
srv := GrpcServer{
server: gsrv,
address: address,
scalerHandler: scaleHandler,
}

api.RegisterMetricsServiceServer(gsrv, &srv)
return gsrv
return srv
}

func StartServer(scaleHandler *scaling.ScaleHandler, address string) error {
lis, err := net.Listen("tcp", address)
func (s *GrpcServer) startServer() error {
lis, err := net.Listen("tcp", s.address)
if err != nil {
return fmt.Errorf("failed to listen: %v", err)
}

if err := newGrpcServer(scaleHandler).Serve(lis); err != nil {
if err := s.server.Serve(lis); err != nil {
return fmt.Errorf("failed to serve: %v", err)
}

return nil
}

// Start starts a new gRPC Metrics Service, this implements Runnable interface
// of controller-runtime Manager, so we can use mgr.Add() to start this component.
func (s *GrpcServer) Start(ctx context.Context) error {
errChan := make(chan error)

go func() {
log.Info("Starting Metrics Service gRPC Server", "address", s.address)
if err := s.startServer(); err != nil {
log.Error(err, "unable to start Metrics Service gRPC server", "address", s.address)
errChan <- err
}
}()

select {
case err := <-errChan:
return err
case <-ctx.Done():
return nil
}
}

// NeedLeaderElection is needed to implement LeaderElectionRunnable interface
// of controller-runtime. This assures that the component is started/stoped
// when this particular instance is selected/deselected as a leader.
func (s *GrpcServer) NeedLeaderElection() bool {
return true
}

0 comments on commit 19cd301

Please sign in to comment.