Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement otlploggrpc gRPC client #5572

Merged
merged 7 commits into from
Jul 4, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
97 changes: 97 additions & 0 deletions exporters/otlp/otlplog/otlploggrpc/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@
package otlploggrpc // import "go.opentelemetry.io/otel/exporters/otlp/otlplog/otlploggrpc"

import (
"context"
"fmt"
"time"

"google.golang.org/genproto/googleapis/rpc/errdetails"
Expand All @@ -16,8 +18,10 @@ import (
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/status"

"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/exporters/otlp/otlplog/otlploggrpc/internal/retry"
collogpb "go.opentelemetry.io/proto/otlp/collector/logs/v1"
logpb "go.opentelemetry.io/proto/otlp/logs/v1"
)

// The methods of this type are not expected to be called concurrently.
Expand Down Expand Up @@ -107,6 +111,99 @@ func newGRPCDialOptions(cfg config) []grpc.DialOption {
return dialOpts
}

// UploadLogs sends proto logs to connected endpoint.
//
// Retryable errors from the server will be handled according to any
// RetryConfig the client was created with.
//
// The otlplog.Exporter synchronizes access to client methods, and
// ensures this is not called after the Exporter is shutdown. Only thing
// to do here is send data.
func (c *client) UploadLogs(ctx context.Context, rl []*logpb.ResourceLogs) error {
select {
case <-ctx.Done():
// Do not upload if the context is already expired.
return ctx.Err()
default:
}

ctx, cancel := c.exportContext(ctx)
defer cancel()

return c.requestFunc(ctx, func(ctx context.Context) error {
resp, err := c.lsc.Export(ctx, &collogpb.ExportLogsServiceRequest{
ResourceLogs: rl,
})
if resp != nil && resp.PartialSuccess != nil {
msg := resp.PartialSuccess.GetErrorMessage()
n := resp.PartialSuccess.GetRejectedLogRecords()
if n != 0 || msg != "" {
err := fmt.Errorf("OTLP partial success: %s (%d log records rejected)", msg, n)
otel.Handle(err)
}
}
// nil is converted to OK.
if status.Code(err) == codes.OK {
// Success.
return nil
}
return err
})
}

// Shutdown shuts down the client, freeing all resources.
//
// Any active connections to a remote endpoint are closed if they were created
// by the client. Any gRPC connection passed during creation using
// WithGRPCConn will not be closed. It is the caller's responsibility to
// handle cleanup of that resource.
//
// The otlplog.Exporter synchronizes access to client methods and
// ensures this is called only once. The only thing that needs to be done
// here is to release any computational resources the client holds.
func (c *client) Shutdown(ctx context.Context) error {
c.metadata = nil
c.requestFunc = nil
c.lsc = nil

// Release the connection if we created it.
err := ctx.Err()
if c.ourConn {
closeErr := c.conn.Close()
// A context timeout error takes precedence over this error.
if err == nil && closeErr != nil {
err = closeErr
}
}
c.conn = nil
return err
}

// exportContext returns a copy of parent with an appropriate deadline and
// cancellation function based on the clients configured export timeout.
//
// It is the callers responsibility to cancel the returned context once its
// use is complete, via the parent or directly with the returned CancelFunc, to
// ensure all resources are correctly released.
func (c *client) exportContext(parent context.Context) (context.Context, context.CancelFunc) {
var (
ctx context.Context
cancel context.CancelFunc
)

if c.exportTimeout > 0 {
ctx, cancel = context.WithTimeout(parent, c.exportTimeout)
} else {
ctx, cancel = context.WithCancel(parent)
}

if c.metadata.Len() > 0 {
ctx = metadata.NewOutgoingContext(ctx, c.metadata)
}

return ctx, cancel
}

// retryable returns if err identifies a request that can be retried and a
// duration to wait for if an explicit throttle time is included in err.
func retryable(err error) (bool, time.Duration) {
Expand Down
Loading