Skip to content

Commit

Permalink
changes
Browse files Browse the repository at this point in the history
Signed-off-by: chahatsagarmain <chahatsagar2003@gmail.com>
  • Loading branch information
chahatsagarmain committed Dec 3, 2024
1 parent 624bb69 commit a55d3a2
Show file tree
Hide file tree
Showing 9 changed files with 117 additions and 86 deletions.
20 changes: 5 additions & 15 deletions cmd/collector/app/collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,6 @@ import (
"net/http"
"time"

"go.opentelemetry.io/collector/config/configgrpc"
"go.opentelemetry.io/collector/config/confignet"
"go.opentelemetry.io/collector/receiver"
"go.uber.org/zap"
"google.golang.org/grpc"
Expand Down Expand Up @@ -104,23 +102,15 @@ func (c *Collector) Start(options *flags.CollectorOptions) error {
Handler: c.spanHandlers.GRPCHandler,
SamplingProvider: c.samplingProvider,
Logger: c.logger,
ServerConfig: &configgrpc.ServerConfig{
NetAddr: confignet.AddrConfig{
Endpoint: options.GRPC.NetAddr.Endpoint,
},
TLSSetting: options.GRPC.TLSSetting,
MaxRecvMsgSizeMiB: options.GRPC.MaxRecvMsgSizeMiB,
Keepalive: options.GRPC.Keepalive,
},
ServerConfig: options.GRPC,
})
if err != nil {
return fmt.Errorf("could not start gRPC server: %w", err)
}
c.grpcServer = grpcServer
httpServer, err := server.StartHTTPServer(&server.HTTPServerParams{
HostPort: options.HTTP.Endpoint,
ServerConfig: options.HTTP,
Handler: c.spanHandlers.JaegerBatchesHandler,
TLSConfig: options.HTTP.TLSSetting,
HealthCheck: c.hCheck,
MetricsFactory: c.metricsFactory,
SamplingProvider: c.samplingProvider,
Expand Down Expand Up @@ -171,11 +161,11 @@ func (c *Collector) Close() error {

// Stop HTTP server
if c.hServer != nil {
timeout, cancel := context.WithTimeout(context.Background(), 5*time.Second)
if err := c.hServer.Shutdown(timeout); err != nil {
// timeout, cancel := context.WithTimeout(context.Background(), 5*time.Second)
if err := c.hServer.Close(); err != nil {
c.logger.Fatal("failed to stop the main HTTP server", zap.Error(err))
}
defer cancel()
// defer cancel()
}

// Stop Zipkin receiver
Expand Down
16 changes: 14 additions & 2 deletions cmd/collector/app/collector_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,9 @@ import (
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.opentelemetry.io/collector/config/configgrpc"
"go.opentelemetry.io/collector/config/confighttp"
"go.opentelemetry.io/collector/config/confignet"
"go.opentelemetry.io/collector/config/configtls"
"go.uber.org/zap"

"github.com/jaegertracing/jaeger/cmd/collector/app/flags"
Expand All @@ -36,15 +39,24 @@ func optionsForEphemeralPorts() *flags.CollectorOptions {
MaxConnectionIdle: 10,
},
}
collectorOpts.HTTP.Endpoint = ":0"
collectorOpts.GRPC.NetAddr.Transport = confignet.TransportTypeTCP
collectorOpts.HTTP = confighttp.ServerConfig{
Endpoint: ":0",
TLSSetting: &configtls.ServerConfig{},
}
collectorOpts.OTLP.Enabled = true
collectorOpts.OTLP.GRPC.NetAddr.Endpoint = ":0"
collectorOpts.OTLP.GRPC.Keepalive = &configgrpc.KeepaliveServerConfig{
ServerParameters: &configgrpc.KeepaliveServerParameters{
MaxConnectionIdle: 10,
},
}
collectorOpts.OTLP.HTTP.Endpoint = ":0"
collectorOpts.OTLP.GRPC.NetAddr.Transport = confignet.TransportTypeTCP

collectorOpts.OTLP.HTTP = confighttp.ServerConfig{
Endpoint: ":0",
TLSSetting: &configtls.ServerConfig{},
}
collectorOpts.Zipkin.Endpoint = ":0"
collectorOpts.Tenancy = tenancy.Options{}
return collectorOpts
Expand Down
25 changes: 15 additions & 10 deletions cmd/collector/app/flags/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,8 +87,15 @@ var otlpServerFlagsCfg = struct {
},
}

var tlsZipkinFlagsConfig = tlscfg.ServerFlagsConfig{
Prefix: "collector.zipkin",
var zipkinServerFlagsCfg = struct {
HTTP serverFlagsConfig
}{
HTTP: serverFlagsConfig{
prefix: "collector.zipkin",
tls: tlscfg.ServerFlagsConfig{
Prefix: "collector.zipkin",
},
},
}

var corsZipkinFlags = corscfg.Flags{
Expand Down Expand Up @@ -150,7 +157,7 @@ func AddFlags(flagSet *flag.FlagSet) {

flagSet.String(flagZipkinHTTPHostPort, "", "The host:port (e.g. 127.0.0.1:9411 or :9411) of the collector's Zipkin server (disabled by default)")
flagSet.Bool(flagZipkinKeepAliveEnabled, true, "KeepAlive configures allow Keep-Alive for Zipkin HTTP server (enabled by default)")
tlsZipkinFlagsConfig.AddFlags(flagSet)
zipkinServerFlagsCfg.HTTP.tls.AddFlags(flagSet)
corsZipkinFlags.AddFlags(flagSet)

tenancy.AddFlags(flagSet)
Expand Down Expand Up @@ -205,7 +212,7 @@ func initGRPCFromViper(v *viper.Viper, _ *zap.Logger, opts *configgrpc.ServerCon
}
opts.TLSSetting = tlsOpts.ToOtelServerConfig()
opts.NetAddr.Endpoint = ports.FormatHostPort(v.GetString(cfg.prefix + "." + flagSuffixHostPort))
opts.MaxRecvMsgSizeMiB = v.GetInt(cfg.prefix+"."+flagSuffixGRPCMaxReceiveMessageLength) * 1024 * 1024
opts.MaxRecvMsgSizeMiB = v.GetInt(cfg.prefix+"."+flagSuffixGRPCMaxReceiveMessageLength) / (1024 * 1024)
opts.Keepalive = &configgrpc.KeepaliveServerConfig{
ServerParameters: &configgrpc.KeepaliveServerParameters{
MaxConnectionAge: v.GetDuration(cfg.prefix + "." + flagSuffixGRPCMaxConnectionAge),
Expand Down Expand Up @@ -239,17 +246,15 @@ func (cOpts *CollectorOptions) InitFromViper(v *viper.Viper, logger *zap.Logger)
}
corsOpts := corsOTLPFlags.InitFromViper(v)
cOpts.OTLP.HTTP.CORS = corsOpts.ToOTELCorsConfig()
if err := initGRPCFromViper(v, logger, &cOpts.GRPC, otlpServerFlagsCfg.GRPC); err != nil {
if err := initGRPCFromViper(v, logger, &cOpts.OTLP.GRPC, otlpServerFlagsCfg.GRPC); err != nil {
return cOpts, fmt.Errorf("failed to parse OTLP/gRPC server options: %w", err)
}

// cOpts.Zipkin. = v.GetBool(flagZipkinKeepAliveEnabled)
cOpts.Zipkin.Endpoint = ports.FormatHostPort(v.GetString(flagZipkinHTTPHostPort))
tlsZipkin, err := tlsZipkinFlagsConfig.InitFromViper(v)
if err != nil {
return cOpts, fmt.Errorf("failed to parse Zipkin TLS options: %w", err)

if err := initHTTPFromViper(v, logger, &cOpts.Zipkin, zipkinServerFlagsCfg.HTTP); err != nil {
return cOpts, fmt.Errorf("failed to parse Zipkin server options: %w", err)
}
cOpts.Zipkin.TLSSetting = tlsZipkin.ToOtelServerConfig()
corsOpts = corsZipkinFlags.InitFromViper(v)
cOpts.Zipkin.CORS = corsOpts.ToOTELCorsConfig()

Expand Down
4 changes: 2 additions & 2 deletions cmd/collector/app/flags/flags_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ func TestCollectorOptionsWithFlags_CheckMaxReceiveMessageLength(t *testing.T) {
_, err := c.InitFromViper(v, zap.NewNop())
require.NoError(t, err)

assert.Equal(t, 8388608, c.GRPC.MaxRecvMsgSizeMiB)
assert.Equal(t, 8, c.GRPC.MaxRecvMsgSizeMiB)
}

func TestCollectorOptionsWithFlags_CheckMaxConnectionAge(t *testing.T) {
Expand Down Expand Up @@ -174,7 +174,7 @@ func TestCollectorOptionsWithFlags_CheckFullTenancy(t *testing.T) {
// })
// c.InitFromViper(v, zap.NewNop())

// assert.False(t, c.Zipkin.KeepAlive)
// assert.False(t, c.Zipkin.)
// }

func TestMain(m *testing.M) {
Expand Down
1 change: 1 addition & 0 deletions cmd/collector/app/handler/otlp_receiver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ func optionsWithPorts(port string) *flags.CollectorOptions {
Endpoint: port,
},
}
opts.OTLP.GRPC.NetAddr.Transport = confignet.TransportTypeTCP
return opts
}

Expand Down
29 changes: 20 additions & 9 deletions cmd/collector/app/server/grpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,11 @@ import (
"fmt"
"net"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/config/configgrpc"
"go.opentelemetry.io/collector/config/configtelemetry"
"go.opentelemetry.io/otel/metric"
"go.opentelemetry.io/otel/metric/noop"
"go.uber.org/zap"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
Expand All @@ -25,7 +29,7 @@ import (

// GRPCServerParams to construct a new Jaeger Collector gRPC Server
type GRPCServerParams struct {
*configgrpc.ServerConfig
configgrpc.ServerConfig
Handler *handler.GRPCHandler
SamplingProvider samplingstrategy.Provider
Logger *zap.Logger
Expand All @@ -38,16 +42,16 @@ type GRPCServerParams struct {
// StartGRPCServer based on the given parameters
func StartGRPCServer(params *GRPCServerParams) (*grpc.Server, error) {
var server *grpc.Server
var grpcOpts []grpc.ServerOption
var grpcOpts []configgrpc.ToServerOption

if params.MaxRecvMsgSizeMiB > 0 {
grpcOpts = append(grpcOpts, grpc.MaxRecvMsgSize(params.MaxRecvMsgSizeMiB))
grpcOpts = append(grpcOpts, configgrpc.WithGrpcServerOption(grpc.MaxRecvMsgSize(params.MaxRecvMsgSizeMiB)))
}
if params.Keepalive != nil {
grpcOpts = append(grpcOpts, grpc.KeepaliveParams(keepalive.ServerParameters{
grpcOpts = append(grpcOpts, configgrpc.WithGrpcServerOption(grpc.KeepaliveParams(keepalive.ServerParameters{
MaxConnectionAge: params.Keepalive.ServerParameters.MaxConnectionAge,
MaxConnectionAgeGrace: params.Keepalive.ServerParameters.MaxConnectionAgeGrace,
}))
})))
}

if params.TLSSetting != nil {
Expand All @@ -58,13 +62,20 @@ func StartGRPCServer(params *GRPCServerParams) (*grpc.Server, error) {
}

creds := credentials.NewTLS(tlsCfg)
grpcOpts = append(grpcOpts, grpc.Creds(creds))
grpcOpts = append(grpcOpts, configgrpc.WithGrpcServerOption(grpc.Creds(creds)))
}

server = grpc.NewServer(grpcOpts...)
server, err := params.ToServer(context.Background(), nil, component.TelemetrySettings{
Logger: params.Logger,
LeveledMeterProvider: func(_ configtelemetry.Level) metric.MeterProvider {
return noop.NewMeterProvider()
},
}, grpcOpts...)
if err != nil {
return nil, err
}
reflection.Register(server)
fmt.Printf("grpc endpoint %v \n", params.NetAddr.Endpoint)
listener, err := net.Listen("tcp", params.NetAddr.Endpoint)
listener, err := params.NetAddr.Listen(context.Background())
if err != nil {
return nil, fmt.Errorf("failed to listen on gRPC port: %w", err)
}
Expand Down
22 changes: 17 additions & 5 deletions cmd/collector/app/server/grpc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,10 @@ import (
func TestFailToListen(t *testing.T) {
logger, _ := zap.NewDevelopment()
server, err := StartGRPCServer(&GRPCServerParams{
ServerConfig: &configgrpc.ServerConfig{
ServerConfig: configgrpc.ServerConfig{
NetAddr: confignet.AddrConfig{
Endpoint: ":-1",
Endpoint: ":-1",
Transport: confignet.TransportTypeTCP,
},
},
Handler: handler.NewGRPCHandler(logger, &mockSpanProcessor{}, &tenancy.Manager{}),
Expand Down Expand Up @@ -72,8 +73,11 @@ func TestSpanCollector(t *testing.T) {
Handler: handler.NewGRPCHandler(logger, &mockSpanProcessor{}, &tenancy.Manager{}),
SamplingProvider: &mockSamplingProvider{},
Logger: logger,
ServerConfig: &configgrpc.ServerConfig{
MaxRecvMsgSizeMiB: 1,
ServerConfig: configgrpc.ServerConfig{
MaxRecvMsgSizeMiB: 2,
NetAddr: confignet.AddrConfig{
Transport: confignet.TransportTypeTCP,
},
},
}

Expand Down Expand Up @@ -105,7 +109,10 @@ func TestCollectorStartWithTLS(t *testing.T) {
Handler: handler.NewGRPCHandler(logger, &mockSpanProcessor{}, &tenancy.Manager{}),
SamplingProvider: &mockSamplingProvider{},
Logger: logger,
ServerConfig: &configgrpc.ServerConfig{
ServerConfig: configgrpc.ServerConfig{
NetAddr: confignet.AddrConfig{
Transport: confignet.TransportTypeTCP,
},
TLSSetting: opts.ToOtelServerConfig(),
},
}
Expand All @@ -120,6 +127,11 @@ func TestCollectorReflection(t *testing.T) {
Handler: handler.NewGRPCHandler(logger, &mockSpanProcessor{}, &tenancy.Manager{}),
SamplingProvider: &mockSamplingProvider{},
Logger: logger,
ServerConfig: configgrpc.ServerConfig{
NetAddr: confignet.AddrConfig{
Transport: confignet.TransportTypeTCP,
},
},
}

server, err := StartGRPCServer(params)
Expand Down
47 changes: 19 additions & 28 deletions cmd/collector/app/server/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,13 @@ import (
"context"
"net"
"net/http"
"time"

"github.com/gorilla/mux"
"go.opentelemetry.io/collector/config/configtls"
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/config/confighttp"
"go.opentelemetry.io/collector/config/configtelemetry"
"go.opentelemetry.io/otel/metric"
"go.opentelemetry.io/otel/metric/noop"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"

Expand All @@ -25,43 +28,31 @@ import (

// HTTPServerParams to construct a new Jaeger Collector HTTP Server
type HTTPServerParams struct {
TLSConfig *configtls.ServerConfig
HostPort string
confighttp.ServerConfig
Handler handler.JaegerBatchesHandler
SamplingProvider samplingstrategy.Provider
MetricsFactory metrics.Factory
HealthCheck *healthcheck.HealthCheck
Logger *zap.Logger

// ReadTimeout sets the respective parameter of http.Server
ReadTimeout time.Duration
// ReadHeaderTimeout sets the respective parameter of http.Server
ReadHeaderTimeout time.Duration
// IdleTimeout sets the respective parameter of http.Server
IdleTimeout time.Duration
}

// StartHTTPServer based on the given parameters
func StartHTTPServer(params *HTTPServerParams) (*http.Server, error) {
params.Logger.Info("Starting jaeger-collector HTTP server", zap.String("http host-port", params.HostPort))
params.Logger.Info("Starting jaeger-collector HTTP server", zap.String("http host-port", params.Endpoint))

errorLog, _ := zap.NewStdLogAt(params.Logger, zapcore.ErrorLevel)
server := &http.Server{
Addr: params.HostPort,
ReadTimeout: params.ReadTimeout,
ReadHeaderTimeout: params.ReadHeaderTimeout,
IdleTimeout: params.IdleTimeout,
ErrorLog: errorLog,
}
if params.TLSConfig != nil {
tlsCfg, err := params.TLSConfig.LoadTLSConfig(context.Background())
if err != nil {
return nil, err
}
server.TLSConfig = tlsCfg
server, err := params.ToServer(context.Background(), nil, component.TelemetrySettings{
Logger: params.Logger,
LeveledMeterProvider: func(_ configtelemetry.Level) metric.MeterProvider {
return noop.NewMeterProvider()
},
}, nil)
server.ErrorLog = errorLog
if err != nil {
return nil, err
}

listener, err := net.Listen("tcp", params.HostPort)
listener, err := params.ToListener(context.Background())
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -91,8 +82,8 @@ func serveHTTP(server *http.Server, listener net.Listener, params *HTTPServerPar
server.Handler = httpmetrics.Wrap(recoveryHandler(r), params.MetricsFactory, params.Logger)
go func() {
var err error
if params.TLSConfig != nil {
err = server.ServeTLS(listener, "", "")
if params.TLSSetting != nil {
err = server.ServeTLS(listener,params.TLSSetting.CertFile, params.TLSSetting.KeyFile)
} else {
err = server.Serve(listener)
}
Expand Down
Loading

0 comments on commit a55d3a2

Please sign in to comment.