Skip to content

Commit

Permalink
[receiver/cloudfoundryreceiver] WIP adding functionality to receive l…
Browse files Browse the repository at this point in the history
…ogs to the receiver

Co-authored-by: Jose Riguera <jose.riguera@springer.com>
Co-authored-by: Sam Clulow <sam.clulow@springernature.com>
  • Loading branch information
3 people committed May 2, 2024
1 parent 37b090f commit ec3815c
Show file tree
Hide file tree
Showing 4 changed files with 165 additions and 31 deletions.
14 changes: 13 additions & 1 deletion receiver/cloudfoundryreceiver/converter.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (

"code.cloudfoundry.org/go-loggregator/rpc/loggregator_v2"
"go.opentelemetry.io/collector/pdata/pcommon"
"go.opentelemetry.io/collector/pdata/plog"
"go.opentelemetry.io/collector/pdata/pmetric"
)

Expand All @@ -19,7 +20,6 @@ func convertEnvelopeToMetrics(envelope *loggregator_v2.Envelope, metricSlice pme
namePrefix := envelope.Tags["origin"] + "."

switch message := envelope.Message.(type) {
case *loggregator_v2.Envelope_Log:
case *loggregator_v2.Envelope_Counter:
metric := metricSlice.AppendEmpty()
metric.SetName(namePrefix + message.Counter.GetName())
Expand All @@ -41,6 +41,18 @@ func convertEnvelopeToMetrics(envelope *loggregator_v2.Envelope, metricSlice pme
}
}

func convertEnvelopeToLogs(envelope *loggregator_v2.Envelope, logSlice plog.LogRecordSlice, startTime time.Time) {
switch envelope.Message.(type) {
case *loggregator_v2.Envelope_Log:
// TODO: review log attributes and tags from the envelope
log := logSlice.AppendEmpty()
log.SetTimestamp(pcommon.Timestamp(envelope.GetTimestamp()))
log.Body().SetStr(string(envelope.GetLog().GetPayload()))
log.SetSeverityNumber(plog.SeverityNumber(envelope.GetLog().GetType()))
copyEnvelopeAttributes(log.Attributes(), envelope)
}
}

func copyEnvelopeAttributes(attributes pcommon.Map, envelope *loggregator_v2.Envelope) {
for key, value := range envelope.Tags {
attributes.PutStr(attributeNamePrefix+key, value)
Expand Down
15 changes: 13 additions & 2 deletions receiver/cloudfoundryreceiver/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,8 @@ func NewFactory() receiver.Factory {
return receiver.NewFactory(
metadata.Type,
createDefaultConfig,
receiver.WithMetrics(createMetricsReceiver, metadata.MetricsStability))
receiver.WithMetrics(createMetricsReceiver, metadata.MetricsStability),
receiver.WithLogs(createLogsReceiver, component.StabilityLevelBeta))
}

func createDefaultConfig() component.Config {
Expand Down Expand Up @@ -61,5 +62,15 @@ func createMetricsReceiver(
nextConsumer consumer.Metrics,
) (receiver.Metrics, error) {
c := cfg.(*Config)
return newCloudFoundryReceiver(params, *c, nextConsumer)
return newCloudFoundryMetricsReceiver(params, *c, nextConsumer)
}

func createLogsReceiver(
_ context.Context,
params receiver.CreateSettings,
cfg component.Config,
nextConsumer consumer.Logs,
) (receiver.Logs, error) {
c := cfg.(*Config)
return newCloudFoundryLogsReceiver(params, *c, nextConsumer)
}
127 changes: 112 additions & 15 deletions receiver/cloudfoundryreceiver/receiver.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,9 @@ import (
"code.cloudfoundry.org/go-loggregator"
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/pdata/plog"
"go.opentelemetry.io/collector/pdata/pmetric"
"go.opentelemetry.io/collector/pdata/ptrace"
"go.opentelemetry.io/collector/receiver"
"go.opentelemetry.io/collector/receiver/receiverhelper"
)
Expand All @@ -25,23 +27,67 @@ const (
)

var _ receiver.Metrics = (*cloudFoundryReceiver)(nil)
var _ receiver.Logs = (*cloudFoundryReceiver)(nil)

type telemetryType int64

const (
telemetryTypeMetrics telemetryType = iota
telemetryTypeLogs
telemetryTypeTraces
)

type telemetrySlice struct {
sliceType telemetryType
metricSlice pmetric.MetricSlice
logSlice plog.LogRecordSlice
traceSlice ptrace.SpanSlice
}

// newCloudFoundryReceiver implements the receiver.Metrics for Cloud Foundry protocol.
type cloudFoundryReceiver struct {
settings component.TelemetrySettings
cancel context.CancelFunc
config Config
nextConsumer consumer.Metrics
obsrecv *receiverhelper.ObsReport
goroutines sync.WaitGroup
receiverStartTime time.Time
settings component.TelemetrySettings
cancel context.CancelFunc
config Config
nextMetricsConsumer consumer.Metrics
nextLogsConsumer consumer.Logs
nextTracesConsumer consumer.Traces
obsrecv *receiverhelper.ObsReport
telemetryType telemetryType
goroutines sync.WaitGroup
receiverStartTime time.Time
}

// newCloudFoundryReceiver creates the Cloud Foundry receiver with the given parameters.
func newCloudFoundryReceiver(
func newCloudFoundryMetricsReceiver(
settings receiver.CreateSettings,
config Config,
nextConsumer consumer.Metrics) (*cloudFoundryReceiver, error) {

obsrecv, err := receiverhelper.NewObsReport(receiverhelper.ObsReportSettings{
ReceiverID: settings.ID,
Transport: transport,
ReceiverCreateSettings: settings,
})
if err != nil {
return nil, err
}

result := &cloudFoundryReceiver{
settings: settings.TelemetrySettings,
config: config,
nextMetricsConsumer: nextConsumer,
telemetryType: telemetryTypeMetrics,
obsrecv: obsrecv,
receiverStartTime: time.Now(),
}
return result, nil
}

func newCloudFoundryLogsReceiver(
settings receiver.CreateSettings,
config Config,
nextConsumer consumer.Metrics) (receiver.Metrics, error) {
nextConsumer consumer.Logs) (*cloudFoundryReceiver, error) {

obsrecv, err := receiverhelper.NewObsReport(receiverhelper.ObsReportSettings{
ReceiverID: settings.ID,
Expand All @@ -52,13 +98,15 @@ func newCloudFoundryReceiver(
return nil, err
}

return &cloudFoundryReceiver{
result := &cloudFoundryReceiver{
settings: settings.TelemetrySettings,
config: config,
nextConsumer: nextConsumer,
nextLogsConsumer: nextConsumer,
telemetryType: telemetryTypeLogs,
obsrecv: obsrecv,
receiverStartTime: time.Now(),
}, nil
}
return result, nil
}

func (cfr *cloudFoundryReceiver) Start(ctx context.Context, host component.Host) error {
Expand Down Expand Up @@ -93,13 +141,22 @@ func (cfr *cloudFoundryReceiver) Start(ctx context.Context, host component.Host)
return
}

envelopeStream, err := streamFactory.CreateStream(innerCtx, cfr.config.RLPGateway.ShardID)
envelopeStream, err := streamFactory.CreateStream(innerCtx, cfr.config.RLPGateway.ShardID, cfr.telemetryType)
if err != nil {
cfr.settings.ReportStatus(component.NewFatalErrorEvent(fmt.Errorf("creating RLP gateway envelope stream: %w", err)))
return
}

cfr.streamMetrics(innerCtx, envelopeStream)
switch cfr.telemetryType {
case telemetryTypeMetrics:
cfr.streamMetrics(innerCtx, envelopeStream)
case telemetryTypeLogs:
cfr.streamLogs(innerCtx, envelopeStream)
case telemetryTypeTraces:
//TODO
//cfr.streamTelemetry(innerCtx, envelopeStream)
}

cfr.settings.Logger.Debug("cloudfoundry metrics streamer stopped")
}()

Expand Down Expand Up @@ -144,12 +201,42 @@ func (cfr *cloudFoundryReceiver) streamMetrics(

if libraryMetrics.Len() > 0 {
obsCtx := cfr.obsrecv.StartMetricsOp(ctx)
err := cfr.nextConsumer.ConsumeMetrics(ctx, metrics)
err := cfr.nextMetricsConsumer.ConsumeMetrics(ctx, metrics)
cfr.obsrecv.EndMetricsOp(obsCtx, dataFormat, metrics.DataPointCount(), err)
}
}
}

func (cfr *cloudFoundryReceiver) streamLogs(
ctx context.Context,
stream loggregator.EnvelopeStream) {

for {
envelopes := stream()
if envelopes == nil {
if ctx.Err() == nil {
cfr.settings.ReportStatus(component.NewFatalErrorEvent(errors.New("RLP gateway streamer shut down due to an error")))
}
break
}

logs := plog.NewLogs()
libraryLogs := createLibraryLogsSlice(logs)

for _, envelope := range envelopes {
if envelope != nil {
convertEnvelopeToLogs(envelope, libraryLogs, cfr.receiverStartTime)
}
}

if libraryLogs.Len() > 0 {
obsCtx := cfr.obsrecv.StartLogsOp(ctx)
err := cfr.nextLogsConsumer.ConsumeLogs(ctx, logs)
cfr.obsrecv.EndLogsOp(obsCtx, dataFormat, logs.LogRecordCount(), err)
}
}
}

func createLibraryMetricsSlice(metrics pmetric.Metrics) pmetric.MetricSlice {
resourceMetrics := metrics.ResourceMetrics()
resourceMetric := resourceMetrics.AppendEmpty()
Expand All @@ -159,3 +246,13 @@ func createLibraryMetricsSlice(metrics pmetric.Metrics) pmetric.MetricSlice {
libraryMetrics.Scope().SetName(instrumentationLibName)
return libraryMetrics.Metrics()
}

func createLibraryLogsSlice(logs plog.Logs) plog.LogRecordSlice {
resourceLogs := logs.ResourceLogs()
resourceLog := resourceLogs.AppendEmpty()
resourceLog.Resource().Attributes()
libraryLogsSlice := resourceLog.ScopeLogs()
libraryLogs := libraryLogsSlice.AppendEmpty()
libraryLogs.Scope().SetName(instrumentationLibName)
return libraryLogs.LogRecords()
}
40 changes: 27 additions & 13 deletions receiver/cloudfoundryreceiver/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,22 +46,36 @@ func newEnvelopeStreamFactory(

func (rgc *EnvelopeStreamFactory) CreateStream(
ctx context.Context,
shardID string) (loggregator.EnvelopeStream, error) {
shardID string,
telemetryType telemetryType) (loggregator.EnvelopeStream, error) {

stream := rgc.rlpGatewayClient.Stream(ctx, &loggregator_v2.EgressBatchRequest{
ShardId: shardID,
Selectors: []*loggregator_v2.Selector{
{
Message: &loggregator_v2.Selector_Counter{
Counter: &loggregator_v2.CounterSelector{},
},
newShardID := shardID
selectors := []*loggregator_v2.Selector{}
switch telemetryType {
case telemetryTypeLogs:
newShardID = shardID + "_logs"
selectors = append(selectors, &loggregator_v2.Selector{
Message: &loggregator_v2.Selector_Log{
Log: &loggregator_v2.LogSelector{},
},
})
case telemetryTypeMetrics:
newShardID = shardID + "_metrics"
selectors = append(selectors, &loggregator_v2.Selector{
Message: &loggregator_v2.Selector_Counter{
Counter: &loggregator_v2.CounterSelector{},
},
{
Message: &loggregator_v2.Selector_Gauge{
Gauge: &loggregator_v2.GaugeSelector{},
},
})
selectors = append(selectors, &loggregator_v2.Selector{
Message: &loggregator_v2.Selector_Gauge{
Gauge: &loggregator_v2.GaugeSelector{},
},
},
})
}

stream := rgc.rlpGatewayClient.Stream(ctx, &loggregator_v2.EgressBatchRequest{
ShardId: newShardID,
Selectors: selectors,
})

return stream, nil
Expand Down

0 comments on commit ec3815c

Please sign in to comment.