From d688101e37b44f096c6ab8e8ee87d722bdee01f7 Mon Sep 17 00:00:00 2001 From: Jakub Daleki Date: Thu, 13 Jun 2024 15:50:06 +0200 Subject: [PATCH] implement multi topic writer --- exporter/googlecloudpubsubexporter/config.go | 2 + .../googlecloudpubsubexporter/config_test.go | 1 + .../googlecloudpubsubexporter/exporter.go | 71 +++++++++++++++++-- exporter/googlecloudpubsubexporter/factory.go | 7 +- .../testdata/config.yaml | 1 + 5 files changed, 74 insertions(+), 8 deletions(-) diff --git a/exporter/googlecloudpubsubexporter/config.go b/exporter/googlecloudpubsubexporter/config.go index 4b2979a7d9b1..9e061e123198 100644 --- a/exporter/googlecloudpubsubexporter/config.go +++ b/exporter/googlecloudpubsubexporter/config.go @@ -30,6 +30,8 @@ type Config struct { // The fully qualified resource name of the Pubsub topic Topic string `mapstructure:"topic"` + // If true, the topic will be taken from the "gcp.topic" attribute or default if not set or invalid + TopicFromAttribute bool `mapstructure:"topic_from_attribute"` // Compression of the payload (only gzip or is supported, no compression is the default) Compression string `mapstructure:"compression"` // Watermark defines the watermark (the ce-time attribute on the message) behavior diff --git a/exporter/googlecloudpubsubexporter/config_test.go b/exporter/googlecloudpubsubexporter/config_test.go index cdbb948ca8f9..cf068032947b 100644 --- a/exporter/googlecloudpubsubexporter/config_test.go +++ b/exporter/googlecloudpubsubexporter/config_test.go @@ -42,6 +42,7 @@ func TestLoadConfig(t *testing.T) { Timeout: 20 * time.Second, } customConfig.Topic = "projects/my-project/topics/otlp-topic" + customConfig.TopicFromAttribute = true customConfig.Compression = "gzip" customConfig.Watermark.Behavior = "earliest" customConfig.Watermark.AllowedDrift = time.Hour diff --git a/exporter/googlecloudpubsubexporter/exporter.go b/exporter/googlecloudpubsubexporter/exporter.go index c3c5a0ddacfb..e1315e5eb127 100644 --- a/exporter/googlecloudpubsubexporter/exporter.go +++ b/exporter/googlecloudpubsubexporter/exporter.go @@ -7,6 +7,7 @@ import ( "bytes" "compress/gzip" "context" + "errors" "fmt" "time" @@ -23,7 +24,10 @@ import ( "google.golang.org/grpc/credentials/insecure" ) -const name = "googlecloudpubsub" +const ( + name = "googlecloudpubsub" + topicAttribute = "gcp.topic" +) type pubsubExporter struct { logger *zap.Logger @@ -109,7 +113,7 @@ func (ex *pubsubExporter) generateClientOptions() (copts []option.ClientOption) return copts } -func (ex *pubsubExporter) publishMessage(ctx context.Context, encoding encoding, data []byte, watermark time.Time) error { +func (ex *pubsubExporter) publishMessage(ctx context.Context, encoding encoding, data []byte, watermark time.Time, topic string) error { id, err := uuid.NewRandom() if err != nil { return err @@ -144,7 +148,7 @@ func (ex *pubsubExporter) publishMessage(ctx context.Context, encoding encoding, } } _, err = ex.client.Publish(ctx, &pubsubpb.PublishRequest{ - Topic: ex.config.Topic, + Topic: topic, Messages: []*pubsubpb.PubsubMessage{ { Attributes: attributes, @@ -177,7 +181,7 @@ func (ex *pubsubExporter) consumeTraces(ctx context.Context, traces ptrace.Trace if err != nil { return err } - return ex.publishMessage(ctx, otlpProtoTrace, buffer, ex.tracesWatermarkFunc(traces, time.Now(), ex.config.Watermark.AllowedDrift).UTC()) + return ex.publishMessage(ctx, otlpProtoTrace, buffer, ex.tracesWatermarkFunc(traces, time.Now(), ex.config.Watermark.AllowedDrift).UTC(), ex.config.Topic) } func (ex *pubsubExporter) consumeMetrics(ctx context.Context, metrics pmetric.Metrics) error { @@ -185,7 +189,7 @@ func (ex *pubsubExporter) consumeMetrics(ctx context.Context, metrics pmetric.Me if err != nil { return err } - return ex.publishMessage(ctx, otlpProtoMetric, buffer, ex.metricsWatermarkFunc(metrics, time.Now(), ex.config.Watermark.AllowedDrift).UTC()) + return ex.publishMessage(ctx, otlpProtoMetric, buffer, ex.metricsWatermarkFunc(metrics, time.Now(), ex.config.Watermark.AllowedDrift).UTC(), ex.config.Topic) } func (ex *pubsubExporter) consumeLogs(ctx context.Context, logs plog.Logs) error { @@ -193,5 +197,60 @@ func (ex *pubsubExporter) consumeLogs(ctx context.Context, logs plog.Logs) error if err != nil { return err } - return ex.publishMessage(ctx, otlpProtoLog, buffer, ex.logsWatermarkFunc(logs, time.Now(), ex.config.Watermark.AllowedDrift).UTC()) + return ex.publishMessage(ctx, otlpProtoLog, buffer, ex.logsWatermarkFunc(logs, time.Now(), ex.config.Watermark.AllowedDrift).UTC(), ex.config.Topic) +} + +func (ex *pubsubExporter) consumeLogsDynamicTopic(ctx context.Context, logs plog.Logs) error { + var errs []error + entries := ex.createEntries(logs) + for topic, entry := range entries { + log := plog.NewLogs() + lrs := log.ResourceLogs().AppendEmpty(). + ScopeLogs().AppendEmpty(). + LogRecords() + + entry.CopyTo(lrs) + buffer, err := ex.logsMarshaler.MarshalLogs(log) + if err != nil { + errs = append(errs, err) + continue + } + err = ex.publishMessage(ctx, otlpProtoLog, buffer, ex.logsWatermarkFunc(logs, time.Now(), ex.config.Watermark.AllowedDrift).UTC(), topic) + if err != nil { + errs = append(errs, err) + } + } + + return errors.Join(errs...) +} + +func (ex *pubsubExporter) createEntries(ld plog.Logs) map[string]plog.LogRecordSlice { + entries := make(map[string]plog.LogRecordSlice) + for i := 0; i < ld.ResourceLogs().Len(); i++ { + rl := ld.ResourceLogs().At(i) + for j := 0; j < rl.ScopeLogs().Len(); j++ { + sl := rl.ScopeLogs().At(j) + for k := 0; k < sl.LogRecords().Len(); k++ { + log := sl.LogRecords().At(k) + topicAttr, ok := log.Attributes().Get(topicAttribute) + topic := ex.config.Topic + // if present and valid override with dynamic topic + if ok && topicMatcher.MatchString(topicAttr.AsString()) { + topic = topicAttr.AsString() + } + ex.appendEntry(topic, log, entries) + } + } + } + + return entries +} + +func (ex *pubsubExporter) appendEntry(topic string, log plog.LogRecord, entries map[string]plog.LogRecordSlice) { + entry, ok := entries[topic] + if !ok { + entry = plog.NewLogRecordSlice() + entries[topic] = entry + } + log.CopyTo(entry.AppendEmpty()) } diff --git a/exporter/googlecloudpubsubexporter/factory.go b/exporter/googlecloudpubsubexporter/factory.go index 4199a1e7d9c6..5b56dd7d6d05 100644 --- a/exporter/googlecloudpubsubexporter/factory.go +++ b/exporter/googlecloudpubsubexporter/factory.go @@ -131,12 +131,15 @@ func createLogsExporter( pCfg := cfg.(*Config) pubsubExporter := ensureExporter(set, pCfg) - + consumeLogsFunc := pubsubExporter.consumeLogs + if pCfg.TopicFromAttribute { + consumeLogsFunc = pubsubExporter.consumeLogsDynamicTopic + } return exporterhelper.NewLogsExporter( ctx, set, cfg, - pubsubExporter.consumeLogs, + consumeLogsFunc, exporterhelper.WithCapabilities(consumer.Capabilities{MutatesData: false}), exporterhelper.WithTimeout(pCfg.TimeoutSettings), exporterhelper.WithRetry(pCfg.BackOffConfig), diff --git a/exporter/googlecloudpubsubexporter/testdata/config.yaml b/exporter/googlecloudpubsubexporter/testdata/config.yaml index 63e43036e294..077972d45ba0 100644 --- a/exporter/googlecloudpubsubexporter/testdata/config.yaml +++ b/exporter/googlecloudpubsubexporter/testdata/config.yaml @@ -4,6 +4,7 @@ googlecloudpubsub/customname: user_agent: opentelemetry-collector-contrib {{version}} timeout: 20s topic: projects/my-project/topics/otlp-topic + topic_from_attribute: true compression: gzip watermark: behavior: earliest