diff --git a/cmd/agent/app/reporter/grpc/collector_proxy.go b/cmd/agent/app/reporter/grpc/collector_proxy.go index d2b20089d7d..4770a7f5580 100644 --- a/cmd/agent/app/reporter/grpc/collector_proxy.go +++ b/cmd/agent/app/reporter/grpc/collector_proxy.go @@ -31,6 +31,7 @@ import ( type ProxyBuilder struct { reporter aReporter.Reporter manager httpserver.ClientConfigManager + conn *grpc.ClientConn } // NewCollectorProxy creates ProxyBuilder @@ -54,6 +55,7 @@ func NewCollectorProxy(o *Options, logger *zap.Logger) (*ProxyBuilder, error) { conn, _ = grpc.Dial(o.CollectorHostPort[0], grpc.WithInsecure()) } return &ProxyBuilder{ + conn: conn, reporter: NewReporter(conn, logger), manager: NewSamplingManager(conn)}, nil } @@ -67,3 +69,8 @@ func (b ProxyBuilder) GetReporter() aReporter.Reporter { func (b ProxyBuilder) GetManager() httpserver.ClientConfigManager { return b.manager } + +// Close closes connections used by proxy. +func (b ProxyBuilder) Close() error { + return b.conn.Close() +} diff --git a/cmd/agent/app/reporter/grpc/collector_proxy_test.go b/cmd/agent/app/reporter/grpc/collector_proxy_test.go index c0c8d5eed0e..7c4433e2ab9 100644 --- a/cmd/agent/app/reporter/grpc/collector_proxy_test.go +++ b/cmd/agent/app/reporter/grpc/collector_proxy_test.go @@ -38,6 +38,8 @@ func TestProxyBuilder(t *testing.T) { require.NotNil(t, proxy) assert.NotNil(t, proxy.GetReporter()) assert.NotNil(t, proxy.GetManager()) + assert.Nil(t, proxy.Close()) + assert.EqualError(t, proxy.Close(), "rpc error: code = Canceled desc = grpc: the client connection is closing") } func TestMultipleCollectors(t *testing.T) { @@ -70,4 +72,5 @@ func TestMultipleCollectors(t *testing.T) { } } assert.Equal(t, true, bothServers) + require.Nil(t, proxy.Close()) } diff --git a/cmd/agent/app/reporter/tchannel/collector_proxy.go b/cmd/agent/app/reporter/tchannel/collector_proxy.go index fa855b68dd6..c4ca150975f 100644 --- a/cmd/agent/app/reporter/tchannel/collector_proxy.go +++ b/cmd/agent/app/reporter/tchannel/collector_proxy.go @@ -49,3 +49,9 @@ func (b ProxyBuilder) GetReporter() reporter.Reporter { func (b ProxyBuilder) GetManager() httpserver.ClientConfigManager { return b.manager } + +// Close closes connections used by proxy. +func (b ProxyBuilder) Close() error { + b.reporter.Channel().Close() + return nil +} diff --git a/cmd/agent/app/reporter/tchannel/collector_proxy_test.go b/cmd/agent/app/reporter/tchannel/collector_proxy_test.go index 1f320548fa6..91b0e40a726 100644 --- a/cmd/agent/app/reporter/tchannel/collector_proxy_test.go +++ b/cmd/agent/app/reporter/tchannel/collector_proxy_test.go @@ -43,4 +43,5 @@ func TestCreate(t *testing.T) { assert.Equal(t, r, b.GetReporter()) m := httpserver.NewCollectorProxy(r.CollectorServiceName(), r.Channel(), mFactory) assert.Equal(t, m, b.GetManager()) + assert.Nil(t, b.Close()) } diff --git a/cmd/agent/main.go b/cmd/agent/main.go index fa8772ecf3d..28b90e3ba4e 100644 --- a/cmd/agent/main.go +++ b/cmd/agent/main.go @@ -16,9 +16,12 @@ package main import ( "fmt" + "io" "io/ioutil" "net/http" "os" + "os/signal" + "syscall" "github.com/pkg/errors" "github.com/spf13/cobra" @@ -38,6 +41,9 @@ import ( ) func main() { + var signalsChannel = make(chan os.Signal) + signal.Notify(signalsChannel, os.Interrupt, syscall.SIGTERM) + v := viper.New() var command = &cobra.Command{ Use: "jaeger-agent", @@ -88,7 +94,13 @@ func main() { if err := agent.Run(); err != nil { return errors.Wrap(err, "Failed to run the agent") } - select {} + <-signalsChannel + logger.Info("Shutting down") + if closer, ok := cp.(io.Closer); ok { + closer.Close() + } + logger.Info("Shutdown complete") + return nil }, } diff --git a/cmd/all-in-one/main.go b/cmd/all-in-one/main.go index 02e38704083..8a1f7f5ea20 100644 --- a/cmd/all-in-one/main.go +++ b/cmd/all-in-one/main.go @@ -139,12 +139,13 @@ func main() { qOpts := new(queryApp.QueryOptions).InitFromViper(v) startAgent(aOpts, repOpts, tchannelRepOpts, grpcRepOpts, cOpts, logger, metricsFactory) - startCollector(cOpts, spanWriter, logger, metricsFactory, strategyStore, hc) + grpcServer := startCollector(cOpts, spanWriter, logger, metricsFactory, strategyStore, hc) startQuery(qOpts, spanReader, dependencyReader, logger, metricsFactory, mBldr, hc) hc.Ready() <-signalsChannel logger.Info("Shutting down") if closer, ok := spanWriter.(io.Closer); ok { + grpcServer.GracefulStop() err := closer.Close() if err != nil { logger.Error("Failed to close span writer", zap.Error(err)) @@ -236,7 +237,7 @@ func startCollector( baseFactory metrics.Factory, strategyStore strategystore.StrategyStore, hc *healthcheck.HealthCheck, -) { +) *grpc.Server { metricsFactory := baseFactory.Namespace("collector", nil) spanBuilder, err := collector.NewSpanHandlerBuilder( @@ -269,11 +270,9 @@ func startCollector( ch.Serve(listener) } - { - grpcserver.StartGRPCCollector(cOpts.CollectorGRPCPort, grpc.NewServer(), grpcHandler, strategyStore, logger, - func(err error) { - logger.Fatal("gRPC collector failed", zap.Error(err)) - }) + server, err := startGRPCServer(cOpts.CollectorGRPCPort, grpcHandler, strategyStore, logger) + if err != nil { + logger.Fatal("Could not start gRPC collector", zap.Error(err)) } { @@ -293,6 +292,23 @@ func startCollector( hc.Set(healthcheck.Unavailable) }() } + return server +} + +func startGRPCServer( + port int, + handler *collectorApp.GRPCHandler, + samplingStore strategystore.StrategyStore, + logger *zap.Logger, +) (*grpc.Server, error) { + server := grpc.NewServer() + _, err := grpcserver.StartGRPCCollector(port, server, handler, samplingStore, logger, func(err error) { + logger.Fatal("gRPC collector failed", zap.Error(err)) + }) + if err != nil { + return nil, err + } + return server, err } func startZipkinHTTPAPI( diff --git a/cmd/collector/main.go b/cmd/collector/main.go index d7d0b6a63b7..1eedaf06ac7 100644 --- a/cmd/collector/main.go +++ b/cmd/collector/main.go @@ -140,11 +140,9 @@ func main() { ch.Serve(listener) } - { - grpcserver.StartGRPCCollector(builderOpts.CollectorGRPCPort, grpc.NewServer(), grpcHandler, strategyStore, logger, - func(err error) { - logger.Fatal("gRPC collector failed", zap.Error(err)) - }) + server, err := startGRPCServer(builderOpts.CollectorGRPCPort, grpcHandler, strategyStore, logger) + if err != nil { + logger.Fatal("Could not start gRPC collector", zap.Error(err)) } { @@ -174,6 +172,7 @@ func main() { <-signalsChannel logger.Info("Shutting down") if closer, ok := spanWriter.(io.Closer); ok { + server.GracefulStop() err := closer.Close() if err != nil { logger.Error("Failed to close span writer", zap.Error(err)) @@ -207,6 +206,22 @@ func main() { } } +func startGRPCServer( + port int, + handler *app.GRPCHandler, + samplingStore strategystore.StrategyStore, + logger *zap.Logger, +) (*grpc.Server, error) { + server := grpc.NewServer() + _, err := grpcserver.StartGRPCCollector(port, server, handler, samplingStore, logger, func(err error) { + logger.Fatal("gRPC collector failed", zap.Error(err)) + }) + if err != nil { + return nil, err + } + return server, err +} + func startZipkinHTTPAPI( logger *zap.Logger, zipkinPort int,