diff --git a/.golangci.yaml b/.golangci.yaml
index b49b825ba..5a07e4280 100644
--- a/.golangci.yaml
+++ b/.golangci.yaml
@@ -2,6 +2,7 @@ run:
   timeout: 10m
   skip-dirs:
     - ci
+    - collector/benthos/internal
 
 linters-settings:
   gci:
diff --git a/collector/benthos/input/otel_log.go b/collector/benthos/input/otel_log.go
new file mode 100644
index 000000000..c0c628e12
--- /dev/null
+++ b/collector/benthos/input/otel_log.go
@@ -0,0 +1,337 @@
+package input
+
+import (
+	"context"
+	"encoding/json"
+	"errors"
+	"net"
+	"sync"
+	"time"
+
+	"github.com/benthosdev/benthos/v4/public/service"
+	"github.com/samber/lo"
+	collogspb "go.opentelemetry.io/proto/otlp/collector/logs/v1"
+	commonpb "go.opentelemetry.io/proto/otlp/common/v1"
+	logspb "go.opentelemetry.io/proto/otlp/logs/v1"
+	resourcepb "go.opentelemetry.io/proto/otlp/resource/v1"
+	"google.golang.org/grpc"
+	"google.golang.org/grpc/codes"
+	"google.golang.org/grpc/status"
+
+	"github.com/openmeterio/openmeter/collector/benthos/internal/message"
+	"github.com/openmeterio/openmeter/collector/benthos/internal/shutdown"
+)
+
+// TODO: add batching config and policy
+
+func otelLogInputConfig() *service.ConfigSpec {
+	return service.NewConfigSpec().
+		Beta().
+		Categories("Services").
+		Summary("Receive log records over the OTLP gRPC protocol.").
+		Fields(
+			service.NewStringField("address").
+				Description("OTLP gRPC endpoint"),
+			service.NewDurationField("timeout").
+				Description("Timeout for requests. If a consumed message takes longer than this to be delivered, the connection is closed, but the message may still be delivered.").
+				Default("5s"),
+		)
+}
+
+func init() {
+	err := service.RegisterBatchInput("otel_log", otelLogInputConfig(), func(conf *service.ParsedConfig, mgr *service.Resources) (service.BatchInput, error) {
+		return newOtelLogInput(conf)
+	})
+	if err != nil {
+		panic(err)
+	}
+}
+
+type otelLogInput struct {
+	collogspb.UnimplementedLogsServiceServer
+
+	address string
+	server  *grpc.Server
+
+	timeout time.Duration
+
+	handlerWG    sync.WaitGroup
+	transactions chan message.Transaction
+
+	shutSig *shutdown.Signaller
+}
+
+func newOtelLogInput(conf *service.ParsedConfig) (*otelLogInput, error) {
+	server := grpc.NewServer()
+
+	address, err := conf.FieldString("address")
+	if err != nil {
+		return nil, err
+	}
+
+	timeout, err := conf.FieldDuration("timeout")
+	if err != nil {
+		return nil, err
+	}
+
+	in := &otelLogInput{
+		address: address,
+		server:  server,
+
+		timeout: timeout,
+
+		transactions: make(chan message.Transaction),
+
+		shutSig: shutdown.NewSignaller(),
+	}
+
+	collogspb.RegisterLogsServiceServer(server, in)
+
+	return in, nil
+}
+
+func (in *otelLogInput) Connect(_ context.Context) error {
+	ln, err := net.Listen("tcp", in.address)
+	if err != nil {
+		return err
+	}
+
+	// TODO: log listening
+
+	go in.loop(ln)
+
+	return nil
+}
+
+func (in *otelLogInput) ReadBatch(ctx context.Context) (service.MessageBatch, service.AckFunc, error) {
+	select {
+	case b, open := <-in.transactions:
+		if open {
+			return b.Payload, b.Ack, nil
+		}
+		// The transaction channel is closed during shutdown: signal that no
+		// more batches will be produced.
+		return nil, nil, service.ErrEndOfInput
+	case <-ctx.Done():
+		return nil, nil, ctx.Err()
+	}
+}
+
+func (in *otelLogInput) Close(_ context.Context) error {
+	// Signal the serve loop to finish (draining handlers and closing the
+	// transaction channel), then stop the gRPC server.
+	in.shutSig.CloseAtLeisure()
+	in.server.Stop()
+
+	return nil
+}
+
+func (in *otelLogInput) Export(ctx context.Context, request *collogspb.ExportLogsServiceRequest) (*collogspb.ExportLogsServiceResponse, error) {
+	if in.shutSig.ShouldCloseAtLeisure() {
+		return nil, status.Error(codes.Unavailable, "server closing")
+	}
+
+	in.handlerWG.Add(1)
+	defer in.handlerWG.Done()
+
+	// TODO: add rate limit
+
+	msg, 
err := in.extractMessageFromRequest(request)
+	if err != nil {
+		// TODO: log the failed request read once this input has a logger
+		return nil, status.Error(codes.InvalidArgument, err.Error())
+	}
+
+	resChan := make(chan error, 1)
+	select {
+	case in.transactions <- message.NewTransaction(msg, resChan):
+	case <-time.After(in.timeout):
+		return nil, status.Error(codes.DeadlineExceeded, "request timed out")
+	case <-ctx.Done():
+		return nil, status.Error(codes.DeadlineExceeded, "request timed out")
+	case <-in.shutSig.CloseAtLeisureChan():
+		return nil, status.Error(codes.Unavailable, "server closing")
+	}
+
+	select {
+	case res, open := <-resChan:
+		if !open {
+			return nil, status.Error(codes.Unavailable, "server closing")
+		} else if res != nil {
+			var berr *service.BatchError
+			if errors.As(res, &berr) && len(msg) > berr.IndexedErrors() {
+				return &collogspb.ExportLogsServiceResponse{
+					PartialSuccess: &collogspb.ExportLogsPartialSuccess{
+						RejectedLogRecords: int64(berr.IndexedErrors()),
+						ErrorMessage:       berr.Error(),
+					},
+				}, nil
+			}
+
+			// Use res here: berr is nil when the error is not a BatchError.
+			return nil, status.Error(codes.Internal, res.Error())
+		}
+	case <-time.After(in.timeout):
+		return nil, status.Error(codes.DeadlineExceeded, "request timed out")
+	case <-ctx.Done():
+		return nil, status.Error(codes.DeadlineExceeded, "request timed out")
+	case <-in.shutSig.CloseNowChan():
+		return nil, status.Error(codes.Unavailable, "server closing")
+	}
+
+	return &collogspb.ExportLogsServiceResponse{}, nil
+}
+
+type record struct {
+	Resource *resource  `json:"resource,omitempty"`
+	Scope    *scope     `json:"scope,omitempty"`
+	Record   *logRecord `json:"record,omitempty"`
+}
+
+type resource struct {
+	Attributes             map[string]any `json:"attributes,omitempty"`
+	DroppedAttributesCount uint32         `json:"dropped_attributes_count,omitempty"`
+}
+
+func resourceFrom(pb *resourcepb.Resource) *resource {
+	if pb == nil {
+		return nil
+	}
+
+	return &resource{
+		Attributes: lo.SliceToMap(pb.GetAttributes(), func(item *commonpb.KeyValue) (string, any) {
+			return item.GetKey(), anyFrom(item.GetValue())
+		}),
+		DroppedAttributesCount: pb.GetDroppedAttributesCount(),
+	}
+}
+
+func anyFrom(pb *commonpb.AnyValue) any {
+	if pb == nil {
+		return nil
+	}
+
+	switch pb.Value.(type) {
+	case *commonpb.AnyValue_StringValue:
+		return pb.GetStringValue()
+	case *commonpb.AnyValue_BoolValue:
+		return pb.GetBoolValue()
+	case *commonpb.AnyValue_IntValue:
+		return pb.GetIntValue()
+	case *commonpb.AnyValue_DoubleValue:
+		return pb.GetDoubleValue()
+	case *commonpb.AnyValue_ArrayValue:
+		return lo.Map(pb.GetArrayValue().GetValues(), func(v *commonpb.AnyValue, _ int) any {
+			return anyFrom(v)
+		})
+	case *commonpb.AnyValue_KvlistValue:
+		return lo.SliceToMap(pb.GetKvlistValue().GetValues(), func(item *commonpb.KeyValue) (string, any) {
+			return item.GetKey(), anyFrom(item.GetValue())
+		})
+	case *commonpb.AnyValue_BytesValue:
+		return pb.GetBytesValue()
+	}
+
+	return nil
+}
+
+type scope struct {
+	Name                   string         `json:"name,omitempty"`
+	Version                string         `json:"version,omitempty"`
+	Attributes             map[string]any `json:"attributes,omitempty"`
+	DroppedAttributesCount uint32         `json:"dropped_attributes_count,omitempty"`
+}
+
+func scopeFrom(pb *commonpb.InstrumentationScope) *scope {
+	if pb == nil {
+		return nil
+	}
+
+	return &scope{
+		Name:    pb.GetName(),
+		Version: pb.GetVersion(),
+		Attributes: lo.SliceToMap(pb.GetAttributes(), func(item *commonpb.KeyValue) (string, any) {
+			return item.GetKey(), anyFrom(item.GetValue())
+		}),
+		DroppedAttributesCount: pb.GetDroppedAttributesCount(),
+	}
+}
+
+type 
logRecord struct { + TimeUnixNano uint64 `json:"time_unix_nano,omitempty"` + ObservedTimeUnixNano uint64 `json:"observed_time_unix_nano,omitempty"` + SeverityNumber int32 `json:"severity_number,omitempty"` + SeverityText string `json:"severity_text,omitempty"` + Body any `json:"body,omitempty"` + Attributes map[string]any `json:"attributes,omitempty"` + DroppedAttributesCount uint32 `json:"dropped_attributes_count,omitempty"` + Flags uint32 `json:"flags,omitempty"` + TraceId []byte `json:"trace_id,omitempty"` + SpanId []byte `json:"span_id,omitempty"` +} + +func logRecordFrom(pb *logspb.LogRecord) *logRecord { + if pb == nil { + return nil + } + + return &logRecord{ + TimeUnixNano: pb.GetTimeUnixNano(), + ObservedTimeUnixNano: pb.GetObservedTimeUnixNano(), + SeverityNumber: int32(pb.GetSeverityNumber()), + SeverityText: pb.GetSeverityText(), + Body: anyFrom(pb.GetBody()), + Attributes: lo.SliceToMap(pb.GetAttributes(), func(item *commonpb.KeyValue) (string, any) { + return item.GetKey(), anyFrom(item.GetValue()) + }), + DroppedAttributesCount: pb.GetDroppedAttributesCount(), + Flags: pb.GetFlags(), + TraceId: pb.GetTraceId(), + SpanId: pb.GetSpanId(), + } +} + +func (in *otelLogInput) extractMessageFromRequest(request *collogspb.ExportLogsServiceRequest) (service.MessageBatch, error) { + var batch service.MessageBatch + + // TODO: improve message decoding + for _, resourceLog := range request.GetResourceLogs() { + r := resourceFrom(resourceLog.GetResource()) + + for _, scopeLog := range resourceLog.GetScopeLogs() { + s := scopeFrom(scopeLog.GetScope()) + + for _, logRecord := range scopeLog.GetLogRecords() { + record := record{ + Resource: r, + Scope: s, + Record: logRecordFrom(logRecord), + } + recordByte, err := json.Marshal(record) + // recordByte, err := protojson.Marshal(record) + if err != nil { + return nil, err + } + msg := service.NewMessage(recordByte) + batch = append(batch, msg) + } + } + } + + return batch, nil +} + +func (in *otelLogInput) loop(ln net.Listener) { + defer func() { + in.server.GracefulStop() + + in.handlerWG.Wait() + + close(in.transactions) + in.shutSig.ShutdownComplete() + }() + + go func() { + // TODO: add TLS support + if err := in.server.Serve(ln); err != nil { + _ = err + // in.log.Error("Server error: %v\n", err) + } + }() + + <-in.shutSig.CloseAtLeisureChan() +} diff --git a/collector/benthos/internal/LICENSE b/collector/benthos/internal/LICENSE new file mode 100644 index 000000000..008fb0cf6 --- /dev/null +++ b/collector/benthos/internal/LICENSE @@ -0,0 +1,19 @@ +Copyright (c) 2020 Ashley Jeffs + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in +all copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. 
IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +THE SOFTWARE. diff --git a/collector/benthos/internal/message/batch.go b/collector/benthos/internal/message/batch.go new file mode 100644 index 000000000..8d7c294e2 --- /dev/null +++ b/collector/benthos/internal/message/batch.go @@ -0,0 +1,5 @@ +package message + +import "github.com/benthosdev/benthos/v4/public/service" + +type Batch = service.MessageBatch diff --git a/collector/benthos/internal/message/transaction.go b/collector/benthos/internal/message/transaction.go new file mode 100644 index 000000000..86196eeb1 --- /dev/null +++ b/collector/benthos/internal/message/transaction.go @@ -0,0 +1,96 @@ +package message + +import "context" + +// Transaction is a component that associates a batch of one or more messages +// with a mechanism that is able to propagate an acknowledgement of delivery +// back to the source of the batch. +// +// This allows batches to be routed through complex component networks of +// buffers, processing pipelines and output brokers without losing the +// association. +// +// It would not be sufficient to associate acknowledgement to the message (or +// batch of messages) itself as it would then not be possible to expand and +// split message batches (grouping, etc) without loosening delivery guarantees. +// +// The proper way to do such things would be to create a new transaction for +// each resulting batch, and only when all derivative transactions are +// acknowledged is the source transaction acknowledged in turn. +type Transaction struct { + // Payload is the message payload of this transaction. + Payload Batch + + // responseChan should receive a response at the end of a transaction (once + // the message is no longer owned by the receiver.) The response itself + // indicates whether the message has been propagated successfully. + responseChan chan<- error + + // responseFunc should be called with an error at the end of a transaction + // (once the message is no longer owned by the receiver.) The error + // indicates whether the message has been propagated successfully. + responseFunc func(context.Context, error) error + + // Used for cancelling transactions. When cancelled it is up to the receiver + // of this transaction to abort any attempt to deliver the transaction + // message. + ctx context.Context +} + +//------------------------------------------------------------------------------ + +// NewTransaction creates a new transaction object from a message payload and a +// response channel. +func NewTransaction(payload Batch, resChan chan<- error) Transaction { + return Transaction{ + Payload: payload, + responseChan: resChan, + ctx: context.Background(), + } +} + +// NewTransactionFunc creates a new transaction object that associates a message +// batch payload with a func used to acknowledge delivery of the message batch. +func NewTransactionFunc(payload Batch, fn func(context.Context, error) error) Transaction { + return Transaction{ + Payload: payload, + responseFunc: fn, + ctx: context.Background(), + } +} + +// Context returns a context that indicates the cancellation of a transaction. +// It is optional for receivers of a transaction to honour this context, and is +// worth doing in cases where the transaction is blocked (on reconnect loops, +// etc) as it is often used as a fail-fast mechanism. 
+// +// When a transaction is aborted due to cancellation it is still required that +// acknowledgment is made, and should be done so with t.Context().Err(). +func (t *Transaction) Context() context.Context { + return t.ctx +} + +// WithContext returns a copy of the transaction associated with a context used +// for cancellation. When cancelled it is up to the receiver of this transaction +// to abort any attempt to deliver the transaction message. +func (t *Transaction) WithContext(ctx context.Context) *Transaction { + newT := *t + newT.ctx = ctx + return &newT +} + +// Ack returns a delivery response back through the transaction to the message +// source. A nil error indicates that delivery has been completed successfully, +// a non-nil error indicates that the message could not be delivered and should +// be retried or nacked upstream. +func (t *Transaction) Ack(ctx context.Context, err error) error { + if t.responseFunc != nil { + return t.responseFunc(ctx, err) + } + select { + case t.responseChan <- err: + case <-ctx.Done(): + return ctx.Err() + } + return nil +} diff --git a/collector/benthos/internal/shutdown/signaler.go b/collector/benthos/internal/shutdown/signaler.go new file mode 100644 index 000000000..9a49d1dff --- /dev/null +++ b/collector/benthos/internal/shutdown/signaler.go @@ -0,0 +1,168 @@ +package shutdown + +import ( + "context" + "sync" +) + +// Signaller is a mechanism owned by components that support graceful +// shut down and is used as a way to signal from outside that any goroutines +// owned by the component should begin to close. +// +// Shutting down can happen in two tiers of urgency, the first is to terminate +// "at leisure", meaning if you're in the middle of something it's okay to do +// that first before terminating, but please do not commit to new work. +// +// The second tier is immediate, where you need to clean up resources and +// terminate as soon as possible, regardless of any tasks that you are currently +// attempting to finish. +// +// Finally, there is also a signal of having closed down, which is made by the +// component and can be used from outside to determine whether the component +// has finished terminating. +type Signaller struct { + closeAtLeisureChan chan struct{} + closeAtLeisureOnce sync.Once + + closeNowChan chan struct{} + closeNowOnce sync.Once + + hasClosedChan chan struct{} + hasClosedOnce sync.Once +} + +// NewSignaller creates a new signaller. +func NewSignaller() *Signaller { + return &Signaller{ + closeAtLeisureChan: make(chan struct{}), + closeNowChan: make(chan struct{}), + hasClosedChan: make(chan struct{}), + } +} + +// CloseAtLeisure signals to the owner of this Signaller that it should +// terminate at its own leisure, meaning it's okay to complete any tasks that +// are in progress but no new work should be started. +func (s *Signaller) CloseAtLeisure() { + s.closeAtLeisureOnce.Do(func() { + close(s.closeAtLeisureChan) + }) +} + +// CloseNow signals to the owner of this Signaller that it should terminate +// right now regardless of any in progress tasks. +func (s *Signaller) CloseNow() { + s.CloseAtLeisure() + s.closeNowOnce.Do(func() { + close(s.closeNowChan) + }) +} + +// ShutdownComplete is a signal made by the component that it and all of its +// owned resources have terminated. 
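+// It is typically deferred by the goroutine that owns the component's
+// resources, so that the signal only fires once every owned goroutine has
+// returned.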
+func (s *Signaller) ShutdownComplete() { + s.hasClosedOnce.Do(func() { + close(s.hasClosedChan) + }) +} + +//------------------------------------------------------------------------------ + +// ShouldCloseAtLeisure returns true if the signaller has received the signal to +// shut down at leisure or immediately. +func (s *Signaller) ShouldCloseAtLeisure() bool { + select { + case <-s.CloseAtLeisureChan(): + return true + default: + } + return false +} + +// CloseAtLeisureChan returns a channel that will be closed when the signal to +// shut down either at leisure or immediately has been made. +func (s *Signaller) CloseAtLeisureChan() <-chan struct{} { + return s.closeAtLeisureChan +} + +// CloseAtLeisureCtx returns a context.Context that will be terminated when +// either the provided context is cancelled or the signal to shut down +// either at leisure or immediately has been made. +func (s *Signaller) CloseAtLeisureCtx(ctx context.Context) (context.Context, context.CancelFunc) { + var cancel context.CancelFunc + ctx, cancel = context.WithCancel(ctx) + go func() { + select { + case <-ctx.Done(): + case <-s.closeAtLeisureChan: + } + cancel() + }() + return ctx, cancel +} + +// ShouldCloseNow returns true if the signaller has received the signal to shut +// down immediately. +func (s *Signaller) ShouldCloseNow() bool { + select { + case <-s.CloseNowChan(): + return true + default: + } + return false +} + +// CloseNowChan returns a channel that will be closed when the signal to shut +// down immediately has been made. +func (s *Signaller) CloseNowChan() <-chan struct{} { + return s.closeNowChan +} + +// CloseNowCtx returns a context.Context that will be terminated when either the +// provided context is cancelled or the signal to shut down immediately has been +// made. +func (s *Signaller) CloseNowCtx(ctx context.Context) (context.Context, context.CancelFunc) { + var cancel context.CancelFunc + ctx, cancel = context.WithCancel(ctx) + go func() { + select { + case <-ctx.Done(): + case <-s.closeNowChan: + } + cancel() + }() + return ctx, cancel +} + +// HasClosed returns true if the signaller has received the signal that the +// component has terminated. +func (s *Signaller) HasClosed() bool { + select { + case <-s.HasClosedChan(): + return true + default: + } + return false +} + +// HasClosedChan returns a channel that will be closed when the signal that the +// component has terminated has been made. +func (s *Signaller) HasClosedChan() <-chan struct{} { + return s.hasClosedChan +} + +// HasClosedCtx returns a context.Context that will be cancelled when either the +// provided context is cancelled or the signal that the component has shut down +// has been made. 
+func (s *Signaller) HasClosedCtx(ctx context.Context) (context.Context, context.CancelFunc) {
+	var cancel context.CancelFunc
+	ctx, cancel = context.WithCancel(ctx)
+	go func() {
+		select {
+		case <-ctx.Done():
+		case <-s.hasClosedChan:
+		}
+		cancel()
+	}()
+	return ctx, cancel
+}
diff --git a/collector/benthos/output/otel_log.go b/collector/benthos/output/otel_log.go
new file mode 100644
index 000000000..2b6d63cda
--- /dev/null
+++ b/collector/benthos/output/otel_log.go
@@ -0,0 +1,124 @@
+package output
+
+import (
+	"context"
+
+	"github.com/benthosdev/benthos/v4/public/service"
+	collogspb "go.opentelemetry.io/proto/otlp/collector/logs/v1"
+	logspb "go.opentelemetry.io/proto/otlp/logs/v1"
+	"google.golang.org/grpc"
+	"google.golang.org/grpc/credentials/insecure"
+	"google.golang.org/protobuf/encoding/protojson"
+)
+
+func otelLogOutputConfig() *service.ConfigSpec {
+	return service.NewConfigSpec().
+		Beta().
+		Categories("Services").
+		Summary("Export logs to an OTLP log collector service.").
+		Fields(
+			service.NewStringField("address").
+				Description("OTLP gRPC endpoint"),
+
+			service.NewBatchPolicyField("batching"),
+			service.NewOutputMaxInFlightField().Default(10),
+		)
+}
+
+func init() {
+	err := service.RegisterBatchOutput("otel_log", otelLogOutputConfig(),
+		func(conf *service.ParsedConfig, mgr *service.Resources) (
+			output service.BatchOutput,
+			batchPolicy service.BatchPolicy,
+			maxInFlight int,
+			err error,
+		) {
+			if maxInFlight, err = conf.FieldInt("max_in_flight"); err != nil {
+				return
+			}
+
+			if batchPolicy, err = conf.FieldBatchPolicy("batching"); err != nil {
+				return
+			}
+
+			output, err = newOtelLogOutput(conf)
+
+			return
+		})
+	if err != nil {
+		panic(err)
+	}
+}
+
+type otelLogOutput struct {
+	address string
+
+	conn   *grpc.ClientConn
+	client collogspb.LogsServiceClient
+}
+
+func newOtelLogOutput(conf *service.ParsedConfig) (*otelLogOutput, error) {
+	address, err := conf.FieldString("address")
+	if err != nil {
+		return nil, err
+	}
+
+	return &otelLogOutput{
+		address: address,
+	}, nil
+}
+
+func (out *otelLogOutput) Connect(ctx context.Context) error {
+	if out.conn != nil {
+		out.conn.Close()
+		out.conn = nil
+	}
+
+	conn, err := grpc.DialContext(ctx, out.address, grpc.WithBlock(), grpc.WithTransportCredentials(insecure.NewCredentials()))
+	if err != nil {
+		return err
+	}
+
+	out.conn = conn
+	out.client = collogspb.NewLogsServiceClient(conn)
+
+	return nil
+}
+
+func (out *otelLogOutput) WriteBatch(ctx context.Context, batch service.MessageBatch) error {
+	var resourceLogs []*logspb.ResourceLogs
+
+	for _, msg := range batch {
+		var resourceLog logspb.ResourceLogs
+
+		b, err := msg.AsBytes()
+		if err != nil {
+			return err
+		}
+
+		err = protojson.Unmarshal(b, &resourceLog)
+		if err != nil {
+			return err
+		}
+
+		resourceLogs = append(resourceLogs, &resourceLog)
+	}
+
+	req := &collogspb.ExportLogsServiceRequest{
+		ResourceLogs: resourceLogs,
+	}
+
+	_, err := out.client.Export(ctx, req)
+	if err != nil {
+		return err
+	}
+
+	return nil
+}
+
+func (out *otelLogOutput) Close(_ context.Context) error {
+	// Guard against Close being called before a successful Connect.
+	if out.conn == nil {
+		return nil
+	}
+
+	return out.conn.Close()
+}
diff --git a/examples/collectors/README.md b/examples/collectors/README.md
index 6854b0a84..a21ecfd39 100644
--- a/examples/collectors/README.md
+++ b/examples/collectors/README.md
@@ -8,6 +8,7 @@ The examples use the custom Benthos distribution in this repository.
- [Generate](generate/) random events - [HTTP server](http-server/) (forwarding events to OpenMeter) - [Kubernetes Pod execution time](kubernetes-pod-exec-time/) +- [OpenTelemetry Logs](otel-log/) ## Prerequisites diff --git a/examples/collectors/otel-log/.env.dist b/examples/collectors/otel-log/.env.dist new file mode 100644 index 000000000..39ef0f7bf --- /dev/null +++ b/examples/collectors/otel-log/.env.dist @@ -0,0 +1,15 @@ +# OpenMeter details +OPENMETER_URL=https://openmeter.cloud +OPENMETER_TOKEN= + +BATCH_SIZE=20 +BATCH_PERIOD=2s + +# Enable debug logging +DEBUG=false + +# Seeder configuration +# For more details see https://www.benthos.dev/docs/components/inputs/generate +SEEDER_INTERVAL=1s + +SEEDER_LOG=false diff --git a/examples/collectors/otel-log/README.md b/examples/collectors/otel-log/README.md new file mode 100644 index 000000000..e397ea1fe --- /dev/null +++ b/examples/collectors/otel-log/README.md @@ -0,0 +1,97 @@ +# OpenTelemetry Log + +This example demonstrates ingesting logs into OpenMeter using the OTLP protocol. + +## Table of Contents + +- [Prerequisites](#prerequisites) +- [Launch the example](#launch-the-example) +- [Checking events](#checking-events) +- [Cleanup](#cleanup) +- [Advanced configuration](#advanced-configuration) +- [Production use](#production-use) + +## Prerequisites + +This example uses [Docker](https://docker.com) and [Docker Compose](https://docs.docker.com/compose/), but you are free to run the components in any other way. + +Check out this repository if you want to run the example locally: + +```shell +git clone https://github.com/openmeterio/openmeter.git +cd openmeter/examples/collectors/otel-log +``` + +Create a new `.env` file and add the details of your OpenMeter instance: + +```shell +cp .env.dist .env +# edit .env and fill in the details +``` + +> [!TIP] +> Tweak other options in the `.env` file to change the behavior of the example. + +[
 Create a meter 
](https://openmeter.cloud/meters/create?meter=%7B%22slug%22%3A%22api_calls%22%2C%22eventType%22%3A%22api-calls%22%2C%22valueProperty%22%3A%22%24.duration_ms%22%2C%22aggregation%22%3A%22SUM%22%2C%22windowSize%22%3A%22MINUTE%22%2C%22groupBy%22%3A%5B%7B%22name%22%3A%22method%22%7D%2C%7B%22name%22%3A%22path%22%7D%2C%7B%22name%22%3A%22region%22%7D%2C%7B%22name%22%3A%22zone%22%7D%5D%7D&utm_source=github&utm_medium=link&utm_content=collectors) +using the button or [manually](https://openmeter.cloud/meters/create) with the following details: + +- Event type: `api-calls` +- Aggregation: `SUM` +- Value property: `$.duration_ms` +- Group by (optional): + - `method`: `$.method` + - `path`: `$.path` + - `region`: `$.region` + - `zone`: `$.zone` + +
<details><summary>Configuration for self-hosted OpenMeter</summary>
+
+```yaml
+# ...
+
+meters:
+  - slug: api_calls
+    eventType: api-calls
+    aggregation: SUM
+    valueProperty: $.duration_ms
+    groupBy:
+      method: $.method
+      path: $.path
+      region: $.region
+      zone: $.zone
+```
+
+</details>
+
+> [!TIP]
+> Read more about creating meters in the [documentation](https://openmeter.io/docs/getting-started/meters).
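+
+Once the stack is up (see [Launch the example](#launch-the-example) below), you can also send a hand-crafted record without the seeder. The snippet below is a minimal Go sketch (it is not part of this example) that exports a single log record carrying the attributes `config.yaml` expects; it assumes the `go.opentelemetry.io/proto/otlp` and `google.golang.org/grpc` modules and the forwarder port published in `docker-compose.yaml`:
+
+```go
+package main
+
+import (
+	"context"
+	"log"
+	"time"
+
+	collogspb "go.opentelemetry.io/proto/otlp/collector/logs/v1"
+	commonpb "go.opentelemetry.io/proto/otlp/common/v1"
+	logspb "go.opentelemetry.io/proto/otlp/logs/v1"
+	"google.golang.org/grpc"
+	"google.golang.org/grpc/credentials/insecure"
+)
+
+// str wraps a string in the OTLP attribute envelope.
+func str(key, value string) *commonpb.KeyValue {
+	return &commonpb.KeyValue{
+		Key:   key,
+		Value: &commonpb.AnyValue{Value: &commonpb.AnyValue_StringValue{StringValue: value}},
+	}
+}
+
+func main() {
+	conn, err := grpc.Dial("127.0.0.1:4317", grpc.WithTransportCredentials(insecure.NewCredentials()))
+	if err != nil {
+		log.Fatal(err)
+	}
+	defer conn.Close()
+
+	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
+	defer cancel()
+
+	// A single log record; the attribute names mirror the pipeline mapping in config.yaml.
+	_, err = collogspb.NewLogsServiceClient(conn).Export(ctx, &collogspb.ExportLogsServiceRequest{
+		ResourceLogs: []*logspb.ResourceLogs{{
+			ScopeLogs: []*logspb.ScopeLogs{{
+				LogRecords: []*logspb.LogRecord{{
+					SeverityNumber: logspb.SeverityNumber_SEVERITY_NUMBER_INFO,
+					SeverityText:   "INFO",
+					Body:           &commonpb.AnyValue{Value: &commonpb.AnyValue_StringValue{StringValue: "this is an access log"}},
+					Attributes: []*commonpb.KeyValue{
+						str("subject", "customer-1"),
+						str("time", time.Now().UTC().Format(time.RFC3339)),
+						str("method", "GET"),
+						str("path", "/"),
+						str("region", "us-east-1"),
+						str("zone", "us-east-1a"),
+						{Key: "duration", Value: &commonpb.AnyValue{Value: &commonpb.AnyValue_IntValue{IntValue: 120}}},
+					},
+				}},
+			}},
+		}},
+	})
+	if err != nil {
+		log.Fatal(err)
+	}
+}
+```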
+
+## Launch the example
+
+Launch the example (event collector and seeder):
+
+```shell
+docker compose up -d
+```
+
+## Checking events
+
+Read more in the collector examples [README](../README.md#Checking-events-in-OpenMeter).
+
+## Cleanup
+
+Stop containers:
+
+```shell
+docker compose down -v
+```
+
+## Advanced configuration
+
+Check out the configuration files and the [Benthos documentation](https://www.benthos.dev/docs/about) for more details.
+
+## Production use
+
+We are actively working on improving the documentation and the examples.
+In the meantime, feel free to contact us [in email](https://us10.list-manage.com/contact-form?u=c7d6a96403a0e5e19032ee885&form_id=fe04a7fc4851f8547cfee56763850e95) or [on Discord](https://discord.gg/nYH3ZQ3Xzq).
+
+We are more than happy to help you set up OpenMeter in your production environment.
diff --git a/examples/collectors/otel-log/config.yaml b/examples/collectors/otel-log/config.yaml
new file mode 100644
index 000000000..19e7f0d01
--- /dev/null
+++ b/examples/collectors/otel-log/config.yaml
@@ -0,0 +1,48 @@
+input:
+  otel_log:
+    address: "${OTLP_ADDRESS}"
+    timeout: 30s
+
+pipeline:
+  processors:
+    - mapping: |
+        root = {
+          "id": uuid_v4(),
+          "specversion": "1.0",
+          "type": "api-calls",
+          "source": "otlp-log",
+          "time": this.record.attributes.time,
+          "subject": this.record.attributes.subject,
+          "data": {
+            "method": this.record.attributes.method,
+            "path": this.record.attributes.path,
+            "region": this.record.attributes.region,
+            "zone": this.record.attributes.zone,
+            "duration_ms": this.record.attributes.duration,
+          },
+        }
+    - json_schema:
+        schema_path: "file://./cloudevents.spec.json"
+    - catch:
+        - log:
+            level: ERROR
+            message: "Schema validation failed due to: ${!error()}"
+        - mapping: "root = deleted()"
+
+output:
+  switch:
+    cases:
+      - check: ""
+        continue: true
+        output:
+          openmeter:
+            url: "${OPENMETER_URL:https://openmeter.cloud}"
+            token: "${OPENMETER_TOKEN:}"
+            batching:
+              count: ${BATCH_SIZE:20}
+              period: ${BATCH_PERIOD:}
+
+      - check: '"${DEBUG:false}" == "true"'
+        output:
+          stdout:
+            codec: lines
diff --git a/examples/collectors/otel-log/docker-compose.yaml b/examples/collectors/otel-log/docker-compose.yaml
new file mode 100644
index 000000000..ea8e9493e
--- /dev/null
+++ b/examples/collectors/otel-log/docker-compose.yaml
@@ -0,0 +1,26 @@
+version: "3.9"
+
+services:
+  forwarder:
+    image: ghcr.io/openmeterio/benthos-collector
+    pull_policy: always
+    command: benthos -c /etc/benthos/config.yaml
+    env_file:
+      - .env
+    environment:
+      OTLP_ADDRESS: 0.0.0.0:4317
+    ports:
+      - 127.0.0.1:4317:4317
+    volumes:
+      - ./config.yaml:/etc/benthos/config.yaml:ro
+
+  seeder:
+    image: ghcr.io/openmeterio/benthos-collector
+    pull_policy: always
+    command: benthos -c /etc/benthos/config.yaml
+    env_file:
+      - .env
+    environment:
+      OTLP_ADDRESS: forwarder:4317
+    volumes:
+      - ./seed/config.yaml:/etc/benthos/config.yaml:ro
diff --git a/examples/collectors/otel-log/seed/config.yaml b/examples/collectors/otel-log/seed/config.yaml
new file mode 100644
index 000000000..943e9682b
--- /dev/null
+++ b/examples/collectors/otel-log/seed/config.yaml
@@ -0,0 +1,95 @@
+input:
+  generate:
+    interval: "${SEEDER_INTERVAL:1s}"
+    mapping: |
+      let max_subjects = ${SEEDER_MAX_SUBJECTS:100}
+
+      let event_type = "api-calls"
+      let source = "api-gateway"
+      let methods = ["GET", "POST"]
+      let paths = ["/", "/about", "/contact", "/pricing", "/docs"]
+      let regions = ["us-east-1", "us-west-1", "us-east-2", "us-west-2"]
+      let zoneSuffixes = ["a", "b", "c", "d"]
+
+      let subject = "customer-%d".format(random_int(seed: timestamp_unix_nano()) % $max_subjects)
+      let time = (now().ts_sub_iso8601("P3D").ts_unix() + random_int(min: 60, max: 60 * 60 * 24 * 3)).ts_format()
+
+      let method = $methods.index(random_int(seed: timestamp_unix_nano()) % $methods.length())
+      let path = $paths.index(random_int(seed: timestamp_unix_nano()) % $paths.length())
+      let region = $regions.index(random_int(seed: timestamp_unix_nano()) % $regions.length())
+      let zone = "%s%s".format($region, $zoneSuffixes.index(random_int(seed: timestamp_unix_nano()) % $zoneSuffixes.length()))
+      let duration = random_int(seed: timestamp_unix_nano(), max: 1000)
+
+      root = {
+        "scope_logs": [
+          {
+            "log_records": [
+              {
+                "severity_number": 9,
+                "severity_text": "INFO",
+                "attributes": [
+                  {
+                    "key": "subject",
+                    "value": {
+                      "stringValue": $subject,
+                    },
+                  },
+                  {
+                    "key": "time",
+                    "value": {
+                      "stringValue": $time,
+                    },
+                  },
+                  {
+                    "key": "method",
+                    "value": {
+                      "stringValue": $method,
+                    },
+                  },
+                  {
+                    "key": "path",
+                    "value": {
+                      "stringValue": $path,
+                    },
+                  },
+                  {
+                    "key": "region",
+                    "value": {
+                      "stringValue": $region,
+                    },
+                  },
+                  {
+                    "key": "zone",
+                    "value": {
+                      "stringValue": $zone,
+                    },
+                  },
+                  {
+                    "key": "duration",
+                    "value": {
+                      "intValue": $duration,
+                    },
+                  },
+                ],
+                "body": {
+                  "stringValue": "this is an access log"
+                },
+              },
+            ],
+          },
+        ],
+      }
+
+output:
+  switch:
+    cases:
+      - check: ""
+        continue: true
+        output:
+          otel_log:
+            address: "${OTLP_ADDRESS}"
+
+      - check: '"${SEEDER_LOG:false}" == "true"'
+        output:
+          stdout:
+            codec: lines
diff --git a/go.mod b/go.mod
index 02f04dac8..063670644 100644
--- a/go.mod
+++ b/go.mod
@@ -47,6 +47,7 @@ require (
 	go.opentelemetry.io/otel/sdk v1.24.0
 	go.opentelemetry.io/otel/sdk/metric v1.24.0
 	go.opentelemetry.io/otel/trace v1.24.0
+	go.opentelemetry.io/proto/otlp v1.1.0
 	golang.org/x/exp v0.0.0-20230905200255-921286631fa9
 	google.golang.org/grpc v1.62.0
 	k8s.io/apimachinery v0.29.2
@@ -318,7 +319,6 @@ require (
 	go.opentelemetry.io/otel/exporters/jaeger v1.17.0 // indirect
 	go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.24.0 // indirect
 	go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp v1.21.0 // indirect
-	go.opentelemetry.io/proto/otlp v1.1.0 // indirect
 	go.uber.org/atomic v1.11.0 // indirect
 	go.uber.org/multierr v1.11.0 // indirect
 	golang.org/x/crypto v0.19.0 // indirect
diff --git a/internal/ingest/httpingest/httpingest.go b/internal/ingest/httpingest/httpingest.go
index 0fc0212d6..f6a7c5350 100644
--- a/internal/ingest/httpingest/httpingest.go
+++ b/internal/ingest/httpingest/httpingest.go
@@ -53,14 +53,20 @@ func (h Handler) ServeHTTP(w http.ResponseWriter, r *http.Request, namespace str
 	contentType := r.Header.Get("Content-Type")
 
 	var err error
+	var handled bool
 	switch contentType {
 	case "application/cloudevents+json":
-		err = h.processSingleRequest(ctx, w, r, namespace)
+		handled, err = h.processSingleRequest(ctx, w, r, namespace)
 	case "application/cloudevents-batch+json":
-		err = h.processBatchRequest(ctx, w, r, namespace)
+		handled, err = h.processBatchRequest(ctx, w, r, namespace)
 	default:
 		// this should never happen
-		err = errors.New("invalid content type: " + contentType)
+		models.NewStatusProblem(ctx, errors.New("invalid content type: "+contentType), http.StatusBadRequest).Respond(w, r)
+		handled = true
+	}
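+
+	// handled means the handler already wrote a response to the client (for
+	// example a 400 problem response for a malformed payload), so there is
+	// nothing further for ServeHTTP to do.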
+	if handled {
+		return
 	}
 
 	if err != nil {
@@ -73,38 +79,42 @@ func (h Handler) ServeHTTP(w http.ResponseWriter, r *http.Request, namespace str
 	w.WriteHeader(http.StatusNoContent)
 }
 
-func (h Handler) processBatchRequest(ctx context.Context, w http.ResponseWriter, r *http.Request, namespace string) error {
+func (h Handler) processBatchRequest(ctx context.Context, w http.ResponseWriter, r *http.Request, namespace string) (bool, error) {
 	var events api.IngestEventsApplicationCloudeventsBatchPlusJSONBody
 
 	err := json.NewDecoder(r.Body).Decode(&events)
 	if err != nil {
-		return fmt.Errorf("parsing event: %w", err)
+		models.NewStatusProblem(ctx, fmt.Errorf("parsing event: %w", err), http.StatusBadRequest).Respond(w, r)
+
+		return true, nil
 	}
 
 	for _, event := range events {
 		err = h.processEvent(ctx, event, namespace)
 		if err != nil {
-			return err
+			return false, err
 		}
 	}
 
-	return nil
+	return false, nil
 }
 
-func (h Handler) processSingleRequest(ctx context.Context, w http.ResponseWriter, r *http.Request, namespace string) error {
+func (h Handler) processSingleRequest(ctx context.Context, w http.ResponseWriter, r *http.Request, namespace string) (bool, error) {
 	var event api.IngestEventsApplicationCloudeventsPlusJSONRequestBody
 
 	err := json.NewDecoder(r.Body).Decode(&event)
 	if err != nil {
-		return fmt.Errorf("parsing event: %w", err)
+		models.NewStatusProblem(ctx, fmt.Errorf("parsing event: %w", err), http.StatusBadRequest).Respond(w, r)
+
+		return true, nil
 	}
 
 	err = h.processEvent(r.Context(), event, namespace)
 	if err != nil {
-		return err
+		return false, err
 	}
 
-	return nil
+	return false, nil
 }
 
 func (h Handler) processEvent(ctx context.Context, event event.Event, namespace string) error {
diff --git a/internal/ingest/httpingest/httpingest_test.go b/internal/ingest/httpingest/httpingest_test.go
index 10f3258aa..efe97c756 100644
--- a/internal/ingest/httpingest/httpingest_test.go
+++ b/internal/ingest/httpingest/httpingest_test.go
@@ -77,6 +77,27 @@ func TestHandler(t *testing.T) {
 	assert.Equal(t, receivedEvent.Time(), ev.Time())
 }
 
+func TestHandler_InvalidEvent(t *testing.T) {
+	collector := ingest.NewInMemoryCollector()
+	httpHandler, err := NewHandler(HandlerConfig{
+		Collector:        collector,
+		NamespaceManager: namespaceManager,
+		ErrorHandler:     errorsx.NopHandler{},
+	})
+	require.NoError(t, err)
+	handler := MockHandler{
+		handler: httpHandler,
+	}
+
+	server := httptest.NewServer(handler)
+	client := server.Client()
+
+	resp, err := client.Post(server.URL, "application/cloudevents+json", bytes.NewBuffer([]byte(`invalid`)))
+	require.NoError(t, err)
+
+	assert.Equal(t, http.StatusBadRequest, resp.StatusCode)
+}
+
 func TestBatchHandler(t *testing.T) {
 	collector := ingest.NewInMemoryCollector()
 	httpHandler, err := NewHandler(HandlerConfig{