Skip to content

Commit

Permalink
[receiver/cloudfoundryreceiver] Refactor CreateStream in two functions
Browse files Browse the repository at this point in the history
* Original CreateStream fucntion was returning a tuple `(loggregator.EnvelopeStream, error)`, but error was always nil, because the upstream funcion from loggregator `rgc.rlpGatewayClient.Stream` does not fail.
* No need to check the error in tests and in the receiver (it was always nil)
* CreateStream splitted in 2 funcions, each applies the filters each one needs, no need to pass aditional parameters to select filters.
  • Loading branch information
jriguera committed May 24, 2024
1 parent fba8d9a commit 0a6b517
Show file tree
Hide file tree
Showing 3 changed files with 53 additions and 71 deletions.
59 changes: 23 additions & 36 deletions receiver/cloudfoundryreceiver/receiver.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,14 +28,6 @@ const (
var _ receiver.Metrics = (*cloudFoundryReceiver)(nil)
var _ receiver.Logs = (*cloudFoundryReceiver)(nil)

type telemetryType int8

const (
telemetryTypeMetrics telemetryType = iota
telemetryTypeLogs
telemetryTypeTraces
)

// newCloudFoundryReceiver implements the receiver.Metrics for Cloud Foundry protocol.
type cloudFoundryReceiver struct {
settings component.TelemetrySettings
Expand All @@ -62,7 +54,6 @@ func newCloudFoundryMetricsReceiver(
if err != nil {
return nil, err
}

result := &cloudFoundryReceiver{
settings: settings.TelemetrySettings,
config: config,
Expand All @@ -86,7 +77,6 @@ func newCloudFoundryLogsReceiver(
if err != nil {
return nil, err
}

result := &cloudFoundryReceiver{
settings: settings.TelemetrySettings,
config: config,
Expand All @@ -98,11 +88,15 @@ func newCloudFoundryLogsReceiver(
}

func (cfr *cloudFoundryReceiver) Start(ctx context.Context, host component.Host) error {
tokenProvider, tokenErr := newUAATokenProvider(cfr.settings.Logger, cfr.config.UAA.LimitedClientConfig, cfr.config.UAA.Username, string(cfr.config.UAA.Password))
tokenProvider, tokenErr := newUAATokenProvider(
cfr.settings.Logger,
cfr.config.UAA.LimitedClientConfig,
cfr.config.UAA.Username,
string(cfr.config.UAA.Password),
)
if tokenErr != nil {
return fmt.Errorf("create cloud foundry UAA token provider: %w", tokenErr)
}

streamFactory, streamErr := newEnvelopeStreamFactory(
ctx,
cfr.settings,
Expand All @@ -116,35 +110,27 @@ func (cfr *cloudFoundryReceiver) Start(ctx context.Context, host component.Host)

innerCtx, cancel := context.WithCancel(ctx)
cfr.cancel = cancel

cfr.goroutines.Add(1)

go func() {
defer cfr.goroutines.Done()
cfr.settings.Logger.Debug("cloud foundry receiver starting")
_, tokenErr = tokenProvider.ProvideToken()
if tokenErr != nil {
cfr.settings.ReportStatus(component.NewFatalErrorEvent(fmt.Errorf("cloud foundry receiver failed to fetch initial token from UAA: %w", tokenErr)))
cfr.settings.ReportStatus(
component.NewFatalErrorEvent(
fmt.Errorf("cloud foundry receiver failed to fetch initial token from UAA: %w", tokenErr),
),
)
return
}
if cfr.nextLogs != nil {
envelopeStream, err := streamFactory.CreateStream(innerCtx, cfr.config.RLPGateway.ShardID, telemetryTypeLogs)
if err != nil {
cfr.settings.ReportStatus(component.NewFatalErrorEvent(fmt.Errorf("creating RLP gateway envelope log stream receiver: %w", err)))
return
}
cfr.streamLogs(innerCtx, envelopeStream)
cfr.streamLogs(innerCtx, streamFactory.CreateMetricsStream(innerCtx, cfr.config.RLPGateway.ShardID))
} else if cfr.nextMetrics != nil {
envelopeStream, err := streamFactory.CreateStream(innerCtx, cfr.config.RLPGateway.ShardID, telemetryTypeMetrics)
if err != nil {
cfr.settings.ReportStatus(component.NewFatalErrorEvent(fmt.Errorf("creating RLP gateway envelope metrics stream receiver: %w", err)))
return
}
cfr.streamMetrics(innerCtx, envelopeStream)
cfr.streamMetrics(innerCtx, streamFactory.CreateLogsStream(innerCtx, cfr.config.RLPGateway.ShardID))
}
cfr.settings.Logger.Debug("cloudfoundry metrics streamer stopped")
}()

return nil
}

Expand All @@ -167,23 +153,23 @@ func (cfr *cloudFoundryReceiver) streamMetrics(
if envelopes == nil {
// If context has not been cancelled, then nil means the shutdown was due to an error within stream
if ctx.Err() == nil {
cfr.settings.ReportStatus(component.NewFatalErrorEvent(errors.New("RLP gateway streamer shut down due to an error")))
cfr.settings.ReportStatus(
component.NewFatalErrorEvent(
errors.New("RLP gateway metrics streamer shut down due to an error"),
),
)
}

break
}

metrics := pmetric.NewMetrics()
libraryMetrics := createLibraryMetricsSlice(metrics)

for _, envelope := range envelopes {
if envelope != nil {
// There is no concept of startTime in CF loggregator, and we do not know the uptime of the component
// from which the metric originates, so just provide receiver start time as metric start time
convertEnvelopeToMetrics(envelope, libraryMetrics, cfr.receiverStartTime)
}
}

if libraryMetrics.Len() > 0 {
obsCtx := cfr.obsrecv.StartMetricsOp(ctx)
err := cfr.nextMetrics.ConsumeMetrics(ctx, metrics)
Expand All @@ -200,21 +186,22 @@ func (cfr *cloudFoundryReceiver) streamLogs(
envelopes := stream()
if envelopes == nil {
if ctx.Err() == nil {
cfr.settings.ReportStatus(component.NewFatalErrorEvent(errors.New("RLP gateway streamer shut down due to an error")))
cfr.settings.ReportStatus(
component.NewFatalErrorEvent(
errors.New("RLP gateway log streamer shut down due to an error"),
),
)
}
break
}

logs := plog.NewLogs()
libraryLogs := createLibraryLogsSlice(logs)

observedTime := time.Now()
for _, envelope := range envelopes {
if envelope != nil {
_ = convertEnvelopeToLogs(envelope, libraryLogs, observedTime)
}
}

if libraryLogs.Len() > 0 {
obsCtx := cfr.obsrecv.StartLogsOp(ctx)
err := cfr.nextLogs.ConsumeLogs(ctx, logs)
Expand Down
51 changes: 28 additions & 23 deletions receiver/cloudfoundryreceiver/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,41 +44,46 @@ func newEnvelopeStreamFactory(
return &EnvelopeStreamFactory{gatewayClient}, nil
}

func (rgc *EnvelopeStreamFactory) CreateStream(
ctx context.Context,
shardID string,
telemetryType telemetryType) (loggregator.EnvelopeStream, error) {

newShardID := shardID
selectors := []*loggregator_v2.Selector{}
switch telemetryType {
case telemetryTypeLogs:
newShardID = shardID + "_logs"
selectors = append(selectors, &loggregator_v2.Selector{
Message: &loggregator_v2.Selector_Log{
Log: &loggregator_v2.LogSelector{},
},
})
case telemetryTypeMetrics:
newShardID = shardID + "_metrics"
selectors = append(selectors, &loggregator_v2.Selector{
func (rgc *EnvelopeStreamFactory) CreateMetricsStream(ctx context.Context, baseShardID string) loggregator.EnvelopeStream {
newShardID := baseShardID + "_metrics"
selectors := []*loggregator_v2.Selector{
{
Message: &loggregator_v2.Selector_Counter{
Counter: &loggregator_v2.CounterSelector{},
},
})
selectors = append(selectors, &loggregator_v2.Selector{
},
{
Message: &loggregator_v2.Selector_Gauge{
Gauge: &loggregator_v2.GaugeSelector{},
},
})
},
}

stream := rgc.rlpGatewayClient.Stream(ctx, &loggregator_v2.EgressBatchRequest{
ShardId: newShardID,
Selectors: selectors,
})
return stream
}

return stream, nil
func (rgc *EnvelopeStreamFactory) CreateLogsStream(ctx context.Context, baseShardID string) loggregator.EnvelopeStream {
newShardID := baseShardID + "_logs"
selectors := []*loggregator_v2.Selector{
{
Message: &loggregator_v2.Selector_Log{
Log: &loggregator_v2.LogSelector{},
},
},
// {
// Message: &loggregator_v2.Selector_Event{
// Event: &loggregator_v2.EventSelector{},
// },
// }
}
stream := rgc.rlpGatewayClient.Stream(ctx, &loggregator_v2.EgressBatchRequest{
ShardId: newShardID,
Selectors: selectors,
})
return stream
}

type authorizationProvider struct {
Expand Down
14 changes: 2 additions & 12 deletions receiver/cloudfoundryreceiver/stream_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,13 +38,8 @@ func TestValidMetricsStream(t *testing.T) {

innerCtx, cancel := context.WithCancel(context.Background())

envelopeStream, createErr := streamFactory.CreateStream(
innerCtx,
cfg.RLPGateway.ShardID,
telemetryTypeMetrics,
)
envelopeStream := streamFactory.CreateMetricsStream(innerCtx, cfg.RLPGateway.ShardID)

require.NoError(t, createErr)
require.NotNil(t, envelopeStream)

cancel()
Expand Down Expand Up @@ -76,13 +71,8 @@ func TestValidLogsStream(t *testing.T) {

innerCtx, cancel := context.WithCancel(context.Background())

envelopeStream, createErr := streamFactory.CreateStream(
innerCtx,
cfg.RLPGateway.ShardID,
telemetryTypeLogs,
)
envelopeStream := streamFactory.CreateLogsStream(innerCtx, cfg.RLPGateway.ShardID)

require.NoError(t, createErr)
require.NotNil(t, envelopeStream)

cancel()
Expand Down

0 comments on commit 0a6b517

Please sign in to comment.