Skip to content

Commit

Permalink
Add an internal sharedcomponent to be shared by receivers with shared…
Browse files Browse the repository at this point in the history
… resources (#3198)

* Add an internal sharedcomponent to be shared by receivers with shared resources

Use the new code in OTLP receiver.

Signed-off-by: Bogdan Drutu <bogdandrutu@gmail.com>

* Add comments to sharedcomponent package

Signed-off-by: Bogdan Drutu <bogdandrutu@gmail.com>
  • Loading branch information
bogdandrutu authored May 17, 2021
1 parent ce82946 commit a19d6ce
Show file tree
Hide file tree
Showing 6 changed files with 224 additions and 86 deletions.
87 changes: 87 additions & 0 deletions internal/sharedcomponent/sharedcomponent.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

// Package sharedcomponent exposes util functionality for receivers and exporters
// that need to share state between different signal types instances such as net.Listener or os.File.
package sharedcomponent

import (
"context"
"sync"

"go.opentelemetry.io/collector/component"
)

// SharedComponents a map that keeps reference of all created instances for a given configuration,
// and ensures that the shared state is started and stopped only once.
type SharedComponents struct {
comps map[interface{}]*SharedComponent
}

// NewSharedComponents returns a new empty SharedComponents.
func NewSharedComponents() *SharedComponents {
return &SharedComponents{
comps: make(map[interface{}]*SharedComponent),
}
}

// GetOrAdd returns the already created instance if exists, otherwise creates a new instance
// and adds it to the map of references.
func (scs *SharedComponents) GetOrAdd(key interface{}, create func() component.Component) *SharedComponent {
if c, ok := scs.comps[key]; ok {
return c
}
newComp := &SharedComponent{
Component: create(),
removeFunc: func() {
delete(scs.comps, key)
},
}
scs.comps[key] = newComp
return newComp
}

// SharedComponent ensures that the wrapped component is started and stopped only once.
// When stopped it is removed from the SharedComponents map.
type SharedComponent struct {
component.Component

startOnce sync.Once
stopOnce sync.Once
removeFunc func()
}

// Unwrap returns the original component.
func (r *SharedComponent) Unwrap() component.Component {
return r.Component
}

// Start implements component.Component.
func (r *SharedComponent) Start(ctx context.Context, host component.Host) error {
var err error
r.startOnce.Do(func() {
err = r.Component.Start(ctx, host)
})
return err
}

// Shutdown implements component.Component.
func (r *SharedComponent) Shutdown(ctx context.Context) error {
var err error
r.stopOnce.Do(func() {
err = r.Component.Shutdown(ctx)
r.removeFunc()
})
return err
}
79 changes: 79 additions & 0 deletions internal/sharedcomponent/sharedcomponent_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package sharedcomponent

import (
"context"
"errors"
"testing"

"github.com/stretchr/testify/assert"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/component/componenthelper"
"go.opentelemetry.io/collector/component/componenttest"
"go.opentelemetry.io/collector/config"
)

var id = config.NewID("test")

func TestNewSharedComponents(t *testing.T) {
comps := NewSharedComponents()
assert.Len(t, comps.comps, 0)
}

func TestSharedComponents_GetOrAdd(t *testing.T) {
nop := componenthelper.New()
createNop := func() component.Component { return nop }

comps := NewSharedComponents()
got := comps.GetOrAdd(id, createNop)
assert.Len(t, comps.comps, 1)
assert.Same(t, nop, got.Unwrap())
assert.Same(t, got, comps.GetOrAdd(id, createNop))

// Shutdown nop will remove
assert.NoError(t, got.Shutdown(context.Background()))
assert.Len(t, comps.comps, 0)
assert.NotSame(t, got, comps.GetOrAdd(id, createNop))
}

func TestSharedComponent(t *testing.T) {
wantErr := errors.New("my error")
calledStart := 0
calledStop := 0
comp := componenthelper.New(
componenthelper.WithStart(func(ctx context.Context, host component.Host) error {
calledStart++
return wantErr
}), componenthelper.WithShutdown(func(ctx context.Context) error {
calledStop++
return wantErr
}))
createComp := func() component.Component { return comp }

comps := NewSharedComponents()
got := comps.GetOrAdd(id, createComp)
assert.Equal(t, wantErr, got.Start(context.Background(), componenttest.NewNopHost()))
assert.Equal(t, 1, calledStart)
// Second time is not called anymore.
assert.NoError(t, got.Start(context.Background(), componenttest.NewNopHost()))
assert.Equal(t, 1, calledStart)
assert.Equal(t, wantErr, got.Shutdown(context.Background()))
assert.Equal(t, 1, calledStop)
// Second time is not called anymore.
assert.NoError(t, got.Shutdown(context.Background()))
assert.Equal(t, 1, calledStop)
}
39 changes: 14 additions & 25 deletions receiver/otlpreceiver/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,13 @@ package otlpreceiver
import (
"context"

"go.uber.org/zap"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/config"
"go.opentelemetry.io/collector/config/configgrpc"
"go.opentelemetry.io/collector/config/confighttp"
"go.opentelemetry.io/collector/config/confignet"
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/internal/sharedcomponent"
"go.opentelemetry.io/collector/receiver/receiverhelper"
)

Expand Down Expand Up @@ -73,9 +72,11 @@ func createTracesReceiver(
cfg config.Receiver,
nextConsumer consumer.Traces,
) (component.TracesReceiver, error) {
r := createReceiver(cfg, params.Logger)
r := receivers.GetOrAdd(cfg, func() component.Component {
return newOtlpReceiver(cfg.(*Config), params.Logger)
})

if err := r.registerTraceConsumer(ctx, nextConsumer); err != nil {
if err := r.Unwrap().(*otlpReceiver).registerTraceConsumer(ctx, nextConsumer); err != nil {
return nil, err
}
return r, nil
Expand All @@ -88,9 +89,11 @@ func createMetricsReceiver(
cfg config.Receiver,
consumer consumer.Metrics,
) (component.MetricsReceiver, error) {
r := createReceiver(cfg, params.Logger)
r := receivers.GetOrAdd(cfg, func() component.Component {
return newOtlpReceiver(cfg.(*Config), params.Logger)
})

if err := r.registerMetricsConsumer(ctx, consumer); err != nil {
if err := r.Unwrap().(*otlpReceiver).registerMetricsConsumer(ctx, consumer); err != nil {
return nil, err
}
return r, nil
Expand All @@ -103,34 +106,20 @@ func createLogReceiver(
cfg config.Receiver,
consumer consumer.Logs,
) (component.LogsReceiver, error) {
r := createReceiver(cfg, params.Logger)
r := receivers.GetOrAdd(cfg, func() component.Component {
return newOtlpReceiver(cfg.(*Config), params.Logger)
})

if err := r.registerLogsConsumer(ctx, consumer); err != nil {
if err := r.Unwrap().(*otlpReceiver).registerLogsConsumer(ctx, consumer); err != nil {
return nil, err
}
return r, nil
}

func createReceiver(cfg config.Receiver, logger *zap.Logger) *otlpReceiver {
rCfg := cfg.(*Config)

// There must be one receiver for both metrics and traces. We maintain a map of
// receivers per config.

// Check to see if there is already a receiver for this config.
receiver, ok := receivers[rCfg]
if !ok {
// We don't have a receiver, so create one.
receiver = newOtlpReceiver(rCfg, logger)
receivers[rCfg] = receiver
}
return receiver
}

// This is the map of already created OTLP receivers for particular configurations.
// We maintain this map because the Factory is asked trace and metric receivers separately
// when it gets CreateTracesReceiver() and CreateMetricsReceiver() but they must not
// create separate objects, they must use one otlpReceiver object per configuration.
// When the receiver is shutdown it should be removed from this map so the same configuration
// can be recreated successfully.
var receivers = map[*Config]*otlpReceiver{}
var receivers = sharedcomponent.NewSharedComponents()
3 changes: 2 additions & 1 deletion receiver/otlpreceiver/factory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,7 @@ func TestCreateTracesReceiver(t *testing.T) {
require.NotNil(t, tr)
if tt.wantErr {
assert.Error(t, tr.Start(context.Background(), componenttest.NewNopHost()))
assert.NoError(t, tr.Shutdown(context.Background()))
} else {
assert.NoError(t, tr.Start(context.Background(), componenttest.NewNopHost()))
assert.NoError(t, tr.Shutdown(context.Background()))
Expand Down Expand Up @@ -306,11 +307,11 @@ func TestCreateLogReceiver(t *testing.T) {

if tt.wantStartErr {
assert.Error(t, mr.Start(context.Background(), componenttest.NewNopHost()))
assert.NoError(t, mr.Shutdown(context.Background()))
} else {
require.NoError(t, mr.Start(context.Background(), componenttest.NewNopHost()))
assert.NoError(t, mr.Shutdown(context.Background()))
}
receivers = map[*Config]*otlpReceiver{}
})
}
}
36 changes: 8 additions & 28 deletions receiver/otlpreceiver/otlp.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ package otlpreceiver

import (
"context"
"errors"
"net"
"net/http"
"sync"
Expand Down Expand Up @@ -48,9 +47,6 @@ type otlpReceiver struct {
traceReceiver *trace.Receiver
metricsReceiver *metrics.Receiver
logReceiver *logs.Receiver

stopOnce sync.Once
startServerOnce sync.Once
shutdownWG sync.WaitGroup

logger *zap.Logger
Expand Down Expand Up @@ -175,38 +171,22 @@ func (r *otlpReceiver) startProtocolServers(host component.Host) error {
// Start runs the trace receiver on the gRPC server. Currently
// it also enables the metrics receiver too.
func (r *otlpReceiver) Start(_ context.Context, host component.Host) error {
if r.traceReceiver == nil && r.metricsReceiver == nil && r.logReceiver == nil {
return errors.New("cannot start receiver: no consumers were specified")
}

var err error
r.startServerOnce.Do(func() {
err = r.startProtocolServers(host)
})
return err
return r.startProtocolServers(host)
}

// Shutdown is a method to turn off receiving.
func (r *otlpReceiver) Shutdown(ctx context.Context) error {
var err error
r.stopOnce.Do(func() {
err = nil

if r.serverHTTP != nil {
err = r.serverHTTP.Shutdown(ctx)
}

if r.serverGRPC != nil {
r.serverGRPC.GracefulStop()
}
if r.serverHTTP != nil {
err = r.serverHTTP.Shutdown(ctx)
}

r.shutdownWG.Wait()
if r.serverGRPC != nil {
r.serverGRPC.GracefulStop()
}

// delete the receiver from the map so it doesn't leak and it becomes possible to create
// another instance with the same configuration that functions properly. Notice that an
// OTLP object can only be started and shutdown once.
delete(receivers, r.cfg)
})
r.shutdownWG.Wait()
return err
}

Expand Down
Loading

0 comments on commit a19d6ce

Please sign in to comment.