Skip to content

Commit

Permalink
Use sharedcomponent package helper for OC receiver (open-telemetry#3228)
Browse files Browse the repository at this point in the history
Signed-off-by: Bogdan Drutu <bogdandrutu@gmail.com>
  • Loading branch information
bogdandrutu authored and dashpole committed Jun 14, 2021
1 parent eae2829 commit 5ddb17d
Show file tree
Hide file tree
Showing 2 changed files with 69 additions and 90 deletions.
49 changes: 18 additions & 31 deletions receiver/opencensusreceiver/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"go.opentelemetry.io/collector/config/configgrpc"
"go.opentelemetry.io/collector/config/confignet"
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/internal/sharedcomponent"
"go.opentelemetry.io/collector/receiver/receiverhelper"
)

Expand Down Expand Up @@ -56,12 +57,17 @@ func createTracesReceiver(
cfg config.Receiver,
nextConsumer consumer.Traces,
) (component.TracesReceiver, error) {
r, err := createReceiver(cfg)
var err error
r := receivers.GetOrAdd(cfg, func() component.Component {
rCfg := cfg.(*Config)
var recv *ocReceiver
recv, err = newOpenCensusReceiver(rCfg.ID(), rCfg.NetAddr.Transport, rCfg.NetAddr.Endpoint, nil, nil, rCfg.buildOptions()...)
return recv
})
if err != nil {
return nil, err
}

r.traceConsumer = nextConsumer
r.Unwrap().(*ocReceiver).traceConsumer = nextConsumer

return r, nil
}
Expand All @@ -72,42 +78,23 @@ func createMetricsReceiver(
cfg config.Receiver,
nextConsumer consumer.Metrics,
) (component.MetricsReceiver, error) {
r, err := createReceiver(cfg)
var err error
r := receivers.GetOrAdd(cfg, func() component.Component {
rCfg := cfg.(*Config)
var recv *ocReceiver
recv, err = newOpenCensusReceiver(rCfg.ID(), rCfg.NetAddr.Transport, rCfg.NetAddr.Endpoint, nil, nil, rCfg.buildOptions()...)
return recv
})
if err != nil {
return nil, err
}

r.metricsConsumer = nextConsumer
r.Unwrap().(*ocReceiver).metricsConsumer = nextConsumer

return r, nil
}

func createReceiver(cfg config.Receiver) (*ocReceiver, error) {
rCfg := cfg.(*Config)

// There must be one receiver for both metrics and traces. We maintain a map of
// receivers per config.

// Check to see if there is already a receiver for this config.
receiver, ok := receivers[rCfg]
if !ok {
// Build the configuration options.
opts := rCfg.buildOptions()

// We don't have a receiver, so create one.
var err error
receiver, err = newOpenCensusReceiver(rCfg.ID(), rCfg.NetAddr.Transport, rCfg.NetAddr.Endpoint, nil, nil, opts...)
if err != nil {
return nil, err
}
// Remember the receiver in the map
receivers[rCfg] = receiver
}
return receiver, nil
}

// This is the map of already created OpenCensus receivers for particular configurations.
// We maintain this map because the Factory is asked trace and metric receivers separately
// when it gets CreateTracesReceiver() and CreateMetricsReceiver() but they must not
// create separate objects, they must use one ocReceiver object per configuration.
var receivers = map[*Config]*ocReceiver{}
var receivers = sharedcomponent.NewSharedComponents()
110 changes: 51 additions & 59 deletions receiver/opencensusreceiver/opencensus.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,8 +56,6 @@ type ocReceiver struct {
traceConsumer consumer.Traces
metricsConsumer consumer.Metrics

stopOnce sync.Once
startServerOnce sync.Once
startTracesReceiverOnce sync.Once
startMetricsReceiverOnce sync.Once

Expand Down Expand Up @@ -191,22 +189,21 @@ func (ocr *ocReceiver) Shutdown(context.Context) error {
defer ocr.mu.Unlock()

var err error
ocr.stopOnce.Do(func() {
if ocr.serverHTTP != nil {
_ = ocr.serverHTTP.Close()
}
if ocr.serverHTTP != nil {
err = ocr.serverHTTP.Close()
}

if ocr.ln != nil {
_ = ocr.ln.Close()
}
if ocr.ln != nil {
_ = ocr.ln.Close()
}

// TODO: @(odeke-em) investigate what utility invoking (*grpc.Server).Stop()
// gives us yet we invoke (net.Listener).Close().
// Sure (*grpc.Server).Stop() enables proper shutdown but imposes
// a painful and artificial wait time that goes into 20+seconds yet most of our
// tests and code should be reactive in less than even 1second.
// ocr.serverGRPC.Stop()

// TODO: @(odeke-em) investigate what utility invoking (*grpc.Server).Stop()
// gives us yet we invoke (net.Listener).Close().
// Sure (*grpc.Server).Stop() enables proper shutdown but imposes
// a painful and artificial wait time that goes into 20+seconds yet most of our
// tests and code should be reactive in less than even 1second.
// ocr.serverGRPC.Stop()
})
return err
}

Expand All @@ -227,50 +224,45 @@ func (ocr *ocReceiver) httpServer() *http.Server {
}

func (ocr *ocReceiver) startServer(host component.Host) error {
var err error
ocr.startServerOnce.Do(func() {
// Register the grpc-gateway on the HTTP server mux
c := context.Background()
opts := []grpc.DialOption{grpc.WithInsecure()}
endpoint := ocr.ln.Addr().String()

_, ok := ocr.ln.(*net.UnixListener)
if ok {
endpoint = "unix:" + endpoint
}
// Register the grpc-gateway on the HTTP server mux
c := context.Background()
opts := []grpc.DialOption{grpc.WithInsecure()}
endpoint := ocr.ln.Addr().String()

_, ok := ocr.ln.(*net.UnixListener)
if ok {
endpoint = "unix:" + endpoint
}

err = agenttracepb.RegisterTraceServiceHandlerFromEndpoint(c, ocr.gatewayMux, endpoint, opts)
if err != nil {
return
}
if err := agenttracepb.RegisterTraceServiceHandlerFromEndpoint(c, ocr.gatewayMux, endpoint, opts); err != nil {
return err
}

err = agentmetricspb.RegisterMetricsServiceHandlerFromEndpoint(c, ocr.gatewayMux, endpoint, opts)
if err != nil {
return
}
if err := agentmetricspb.RegisterMetricsServiceHandlerFromEndpoint(c, ocr.gatewayMux, endpoint, opts); err != nil {
return err
}

// Start the gRPC and HTTP/JSON (grpc-gateway) servers on the same port.
m := cmux.New(ocr.ln)
grpcL := m.MatchWithWriters(
cmux.HTTP2MatchHeaderFieldSendSettings("content-type", "application/grpc"),
cmux.HTTP2MatchHeaderFieldSendSettings("content-type", "application/grpc+proto"))

httpL := m.Match(cmux.Any())
go func() {
if errGrpc := ocr.serverGRPC.Serve(grpcL); errGrpc != nil {
host.ReportFatalError(errGrpc)
}
}()
go func() {
if errHTTP := ocr.httpServer().Serve(httpL); errHTTP != http.ErrServerClosed {
host.ReportFatalError(errHTTP)
}
}()
go func() {
if errServe := m.Serve(); errServe != nil {
host.ReportFatalError(errServe)
}
}()
})
return err
// Start the gRPC and HTTP/JSON (grpc-gateway) servers on the same port.
m := cmux.New(ocr.ln)
grpcL := m.MatchWithWriters(
cmux.HTTP2MatchHeaderFieldSendSettings("content-type", "application/grpc"),
cmux.HTTP2MatchHeaderFieldSendSettings("content-type", "application/grpc+proto"))

httpL := m.Match(cmux.Any())
go func() {
if errGrpc := ocr.serverGRPC.Serve(grpcL); errGrpc != nil {
host.ReportFatalError(errGrpc)
}
}()
go func() {
if errHTTP := ocr.httpServer().Serve(httpL); errHTTP != http.ErrServerClosed {
host.ReportFatalError(errHTTP)
}
}()
go func() {
if errServe := m.Serve(); errServe != nil {
host.ReportFatalError(errServe)
}
}()
return nil
}

0 comments on commit 5ddb17d

Please sign in to comment.