Skip to content

Commit

Permalink
Merge branch 'main' into es-leak
Browse files Browse the repository at this point in the history
  • Loading branch information
Manik2708 authored Dec 12, 2024
2 parents c61ef9c + d69dad5 commit a85a30f
Show file tree
Hide file tree
Showing 27 changed files with 571 additions and 560 deletions.
2 changes: 1 addition & 1 deletion cmd/all-in-one/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,7 @@ by default uses only in-memory database.`,
logger.Fatal("Failed to configure query service", zap.Error(err))
}

tm := tenancy.NewManager(&cOpts.GRPC.Tenancy)
tm := tenancy.NewManager(&cOpts.Tenancy)

// collector
c := collectorApp.New(&collectorApp.CollectorParams{
Expand Down
46 changes: 10 additions & 36 deletions cmd/collector/app/collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ package app
import (
"context"
"fmt"
"io"
"net/http"
"time"

Expand Down Expand Up @@ -47,13 +46,10 @@ type Collector struct {
tenancyMgr *tenancy.Manager

// state, read only
hServer *http.Server
grpcServer *grpc.Server
otlpReceiver receiver.Traces
zipkinReceiver receiver.Traces
tlsGRPCCertWatcherCloser io.Closer
tlsHTTPCertWatcherCloser io.Closer
tlsZipkinCertWatcherCloser io.Closer
hServer *http.Server
grpcServer *grpc.Server
otlpReceiver receiver.Traces
zipkinReceiver receiver.Traces
}

// CollectorParams to construct a new Jaeger Collector.
Expand Down Expand Up @@ -101,26 +97,19 @@ func (c *Collector) Start(options *flags.CollectorOptions) error {

c.spanProcessor = handlerBuilder.BuildSpanProcessor(additionalProcessors...)
c.spanHandlers = handlerBuilder.BuildHandlers(c.spanProcessor)

grpcServer, err := server.StartGRPCServer(&server.GRPCServerParams{
HostPort: options.GRPC.HostPort,
Handler: c.spanHandlers.GRPCHandler,
TLSConfig: options.GRPC.TLS,
SamplingProvider: c.samplingProvider,
Logger: c.logger,
MaxReceiveMessageLength: options.GRPC.MaxReceiveMessageLength,
MaxConnectionAge: options.GRPC.MaxConnectionAge,
MaxConnectionAgeGrace: options.GRPC.MaxConnectionAgeGrace,
Handler: c.spanHandlers.GRPCHandler,
SamplingProvider: c.samplingProvider,
Logger: c.logger,
ServerConfig: options.GRPC,
})
if err != nil {
return fmt.Errorf("could not start gRPC server: %w", err)
}
c.grpcServer = grpcServer

httpServer, err := server.StartHTTPServer(&server.HTTPServerParams{
HostPort: options.HTTP.HostPort,
ServerConfig: options.HTTP,
Handler: c.spanHandlers.JaegerBatchesHandler,
TLSConfig: options.HTTP.TLS,
HealthCheck: c.hCheck,
MetricsFactory: c.metricsFactory,
SamplingProvider: c.samplingProvider,
Expand All @@ -131,11 +120,7 @@ func (c *Collector) Start(options *flags.CollectorOptions) error {
}
c.hServer = httpServer

c.tlsGRPCCertWatcherCloser = &options.GRPC.TLS
c.tlsHTTPCertWatcherCloser = &options.HTTP.TLS
c.tlsZipkinCertWatcherCloser = &options.Zipkin.TLS

if options.Zipkin.HTTPHostPort == "" {
if options.Zipkin.Endpoint == "" {
c.logger.Info("Not listening for Zipkin HTTP traffic, port not configured")
} else {
zipkinReceiver, err := handler.StartZipkinReceiver(options, c.logger, c.spanProcessor, c.tenancyMgr)
Expand Down Expand Up @@ -209,17 +194,6 @@ func (c *Collector) Close() error {
}
}

// watchers actually never return errors from Close
if c.tlsGRPCCertWatcherCloser != nil {
_ = c.tlsGRPCCertWatcherCloser.Close()
}
if c.tlsHTTPCertWatcherCloser != nil {
_ = c.tlsHTTPCertWatcherCloser.Close()
}
if c.tlsZipkinCertWatcherCloser != nil {
_ = c.tlsZipkinCertWatcherCloser.Close()
}

return nil
}

Expand Down
69 changes: 57 additions & 12 deletions cmd/collector/app/collector_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,10 @@ import (

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.opentelemetry.io/collector/config/configgrpc"
"go.opentelemetry.io/collector/config/confighttp"
"go.opentelemetry.io/collector/config/confignet"
"go.opentelemetry.io/collector/config/configtls"
"go.uber.org/zap"

"github.com/jaegertracing/jaeger/cmd/collector/app/flags"
Expand All @@ -27,13 +31,54 @@ import (
var _ (io.Closer) = (*Collector)(nil)

func optionsForEphemeralPorts() *flags.CollectorOptions {
collectorOpts := &flags.CollectorOptions{}
collectorOpts.GRPC.HostPort = ":0"
collectorOpts.HTTP.HostPort = ":0"
collectorOpts.OTLP.Enabled = true
collectorOpts.OTLP.GRPC.HostPort = ":0"
collectorOpts.OTLP.HTTP.HostPort = ":0"
collectorOpts.Zipkin.HTTPHostPort = ":0"
collectorOpts := &flags.CollectorOptions{
HTTP: confighttp.ServerConfig{
Endpoint: ":0",
TLSSetting: &configtls.ServerConfig{},
},
GRPC: configgrpc.ServerConfig{
NetAddr: confignet.AddrConfig{
Endpoint: ":0",
Transport: confignet.TransportTypeTCP,
},
Keepalive: &configgrpc.KeepaliveServerConfig{
ServerParameters: &configgrpc.KeepaliveServerParameters{
MaxConnectionIdle: 10,
},
},
},
OTLP: struct {
Enabled bool
GRPC configgrpc.ServerConfig
HTTP confighttp.ServerConfig
}{
Enabled: true,
HTTP: confighttp.ServerConfig{
Endpoint: ":0",
TLSSetting: &configtls.ServerConfig{},
},
GRPC: configgrpc.ServerConfig{
NetAddr: confignet.AddrConfig{
Endpoint: ":0",
Transport: confignet.TransportTypeTCP,
},
Keepalive: &configgrpc.KeepaliveServerConfig{
ServerParameters: &configgrpc.KeepaliveServerParameters{
MaxConnectionIdle: 10,
},
},
},
},
Zipkin: struct {
confighttp.ServerConfig
KeepAlive bool
}{
ServerConfig: confighttp.ServerConfig{
Endpoint: ":0",
},
},
Tenancy: tenancy.Options{},
}
return collectorOpts
}

Expand Down Expand Up @@ -112,23 +157,23 @@ func TestCollector_StartErrors(t *testing.T) {
var options *flags.CollectorOptions

options = optionsForEphemeralPorts()
options.GRPC.HostPort = ":-1"
options.GRPC.NetAddr.Endpoint = ":-1"
run("gRPC", options, "could not start gRPC server")

options = optionsForEphemeralPorts()
options.HTTP.HostPort = ":-1"
options.HTTP.Endpoint = ":-1"
run("HTTP", options, "could not start HTTP server")

options = optionsForEphemeralPorts()
options.Zipkin.HTTPHostPort = ":-1"
options.Zipkin.Endpoint = ":-1"
run("Zipkin", options, "could not start Zipkin receiver")

options = optionsForEphemeralPorts()
options.OTLP.GRPC.HostPort = ":-1"
options.OTLP.GRPC.NetAddr.Endpoint = ":-1"
run("OTLP/GRPC", options, "could not start OTLP receiver")

options = optionsForEphemeralPorts()
options.OTLP.HTTP.HostPort = ":-1"
options.OTLP.HTTP.Endpoint = ":-1"
run("OTLP/HTTP", options, "could not start OTLP receiver")
}

Expand Down
Loading

0 comments on commit a85a30f

Please sign in to comment.