Skip to content

Commit

Permalink
feat: migrate cloudrunner internals to slog
Browse files Browse the repository at this point in the history
This is the turning point for the migration from zap to slog - all
internal logging in cloudrunner now uses the slog APIs and the slog
logging backends (TextHandler in development and JSONHandler in
production).

The zap APIs continue to work, and will be marked as deprecated when the
slog APIs have proven stable in production for a few weeks.
  • Loading branch information
odsod committed Oct 16, 2024
1 parent c02e66b commit 8a666ed
Show file tree
Hide file tree
Showing 21 changed files with 137 additions and 218 deletions.
44 changes: 44 additions & 0 deletions cloudconfig/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,12 @@ package cloudconfig
import (
"fmt"
"io"
"log/slog"
"os"
"text/tabwriter"
"time"

"go.opentelemetry.io/otel/codes"
)

// envPrefix can be set at build time to add a prefix to all environment variables loaded into the RunConfig.
Expand Down Expand Up @@ -108,3 +112,43 @@ func (c *Config) PrintUsage(w io.Writer) {
}
_ = tabs.Flush()
}

// LogValue implements [slog.LogValuer].
//
// The returned value is a group holding one attribute per entry in
// c.configSpecs: the attribute key is the spec's name and the attribute value
// wraps the spec's field specs in a fieldSpecsValue.
func (c *Config) LogValue() slog.Value {
	group := make([]slog.Attr, 0, len(c.configSpecs))
	for _, spec := range c.configSpecs {
		fields := fieldSpecsValue(spec.fieldSpecs)
		group = append(group, slog.Attr{Key: spec.name, Value: slog.AnyValue(fields)})
	}
	return slog.GroupValue(group...)
}

// fieldSpecsValue is a slice of fieldSpec that carries a LogValue method so
// the field specs can be rendered as a slog group.
type fieldSpecsValue []fieldSpec

// LogValue implements [slog.LogValuer].
//
// Each field spec is rendered as one attribute keyed by the spec's Key, and
// the attributes are returned together as a group:
//   - specs marked Secret are logged as the literal "<secret>" and their
//     value is never read;
//   - time.Duration values are logged as durations;
//   - []codes.Code and map[codes.Code]slog.Level values are converted to
//     their String() forms;
//   - everything else is logged as-is via slog.Any.
//
// NOTE(review): codes resolves to go.opentelemetry.io/otel/codes per this
// file's imports; confirm the config fields use that package's Code type,
// otherwise the two codes cases below never match.
func (fsv fieldSpecsValue) LogValue() slog.Value {
	group := make([]slog.Attr, 0, len(fsv))
	for i := range fsv {
		spec := fsv[i]
		var rendered slog.Attr
		if spec.Secret {
			// Never touch the underlying value of a secret field.
			rendered = slog.String(spec.Key, "<secret>")
		} else {
			raw := spec.Value.Interface()
			switch typed := raw.(type) {
			case time.Duration:
				rendered = slog.Duration(spec.Key, typed)
			case []codes.Code:
				names := make([]string, len(typed))
				for j, code := range typed {
					names[j] = code.String()
				}
				rendered = slog.Any(spec.Key, names)
			case map[codes.Code]slog.Level:
				levels := make(map[string]string, len(typed))
				for code, level := range typed {
					levels[code.String()] = level.String()
				}
				rendered = slog.Any(spec.Key, levels)
			default:
				rendered = slog.Any(spec.Key, raw)
			}
		}
		group = append(group, rendered)
	}
	return slog.GroupValue(group...)
}
69 changes: 0 additions & 69 deletions cloudconfig/zap.go

This file was deleted.

32 changes: 11 additions & 21 deletions cloudmux/mux.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,13 @@ import (
"context"
"errors"
"fmt"
"log/slog"
"net"
"net/http"
"strings"
"time"

"github.com/soheilhy/cmux"
"go.einride.tech/cloudrunner/cloudzap"
"go.uber.org/zap"
"golang.org/x/sync/errgroup"
"google.golang.org/grpc"
)
Expand All @@ -31,54 +30,45 @@ func ServeGRPCHTTP(
cmux.HTTP2MatchHeaderFieldSendSettings("content-type", "application/grpc+proto"),
)
httpL := m.Match(cmux.Any())
logger, ok := cloudzap.GetLogger(ctx)
if !ok {
logger = zap.NewNop()
}

var g errgroup.Group

// wait for context to be canceled and gracefully stop all servers.
g.Go(func() error {
<-ctx.Done()

logger.Debug("stopping cmux server")
slog.DebugContext(ctx, "stopping cmux server")
m.Close()

logger.Debug("stopping HTTP server")
slog.DebugContext(ctx, "stopping HTTP server")
// use a new context because the parent ctx is already canceled.
ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
defer cancel()
if err := httpServer.Shutdown(ctx); err != nil && !isClosedErr(err) {
logger.Warn("stopping http server", zap.Error(err))
slog.WarnContext(ctx, "stopping http server", slog.Any("error", err))
}

logger.Debug("stopping gRPC server")
slog.DebugContext(ctx, "stopping gRPC server")
grpcServer.GracefulStop()
logger.Debug("stopped both http and grpc server")
slog.DebugContext(ctx, "stopped both http and grpc server")
return nil
})

g.Go(func() error {
logger.Debug("serving gRPC")
slog.DebugContext(ctx, "serving gRPC")
if err := grpcServer.Serve(grpcL); err != nil && !isClosedErr(err) {
return fmt.Errorf("serve gRPC: %w", err)
}
logger.Debug("stopped serving gRPC")
slog.DebugContext(ctx, "stopped serving gRPC")
return nil
})

g.Go(func() error {
logger.Debug("serving HTTP")
slog.DebugContext(ctx, "serving HTTP")
if err := httpServer.Serve(httpL); err != nil && !isClosedErr(err) {
return fmt.Errorf("serve HTTP: %w", err)
}
logger.Debug("stopped serving HTTP")
slog.DebugContext(ctx, "stopped serving HTTP")
return nil
})

if err := m.Serve(); err != nil && !isClosedErr(err) {
logger.Error("oops", zap.Error(err))
slog.ErrorContext(ctx, "oops", slog.Any("error", err))
return fmt.Errorf("serve cmux: %w", err)
}
return g.Wait()
Expand Down
3 changes: 0 additions & 3 deletions cloudmux/mux_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,6 @@ import (
"testing"
"time"

"go.einride.tech/cloudrunner/cloudzap"
"go.uber.org/zap/zaptest"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
"google.golang.org/grpc/examples/helloworld/helloworld"
Expand Down Expand Up @@ -118,7 +116,6 @@ func TestServe_GracefulHTTP(t *testing.T) {
func newTestFixture(t *testing.T) *testFixture {
t.Helper()
ctx, cancel := context.WithCancel(context.Background())
ctx = cloudzap.WithLogger(ctx, zaptest.NewLogger(t))
var lc net.ListenConfig
lis, err := lc.Listen(ctx, "tcp", ":0")
assert.NilError(t, err)
Expand Down
22 changes: 5 additions & 17 deletions cloudotel/errorhandler.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,27 +2,17 @@ package cloudotel

import (
"context"
"log/slog"

"go.einride.tech/cloudrunner/cloudzap"
"go.opentelemetry.io/otel"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
)

// NewErrorLogger returns a new otel.ErrorHandler that logs errors using the provided logger, level and message.
func NewErrorLogger(logger *zap.Logger, level zapcore.Level, message string) otel.ErrorHandler {
return errorHandler{logger: logger, level: level, message: message}
}

type errorHandler struct {
logger *zap.Logger
level zapcore.Level
message string
}

// Handle implements otel.ErrorHandler.
func (e errorHandler) Handle(err error) {
e.logger.Check(e.level, e.message).Write(zap.Error(err))
// Deprecated: This is a no-op as part of the migration from zap to slog.
func NewErrorLogger(*zap.Logger, zapcore.Level, string) otel.ErrorHandler {
return otel.ErrorHandlerFunc(func(error) {})
}

// RegisterErrorHandler registers a global OpenTelemetry error handler.
Expand All @@ -43,7 +33,5 @@ func handleError(ctx context.Context, err error) {
// https://pkg.go.dev/go.opentelemetry.io/otel/bridge/opencensus
return
}
if logger, ok := cloudzap.GetLogger(ctx); ok {
logger.Warn("otel error", zap.Error(err))
}
slog.WarnContext(ctx, "otel error", slog.Any("error", err))
}
9 changes: 4 additions & 5 deletions cloudpubsub/httphandler.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,13 @@ package cloudpubsub
import (
"context"
"encoding/json"
"log/slog"
"net/http"
"time"

"cloud.google.com/go/pubsub/apiv1/pubsubpb"
"go.einride.tech/cloudrunner/cloudrequestlog"
"go.einride.tech/cloudrunner/cloudstatus"
"go.einride.tech/cloudrunner/cloudzap"
"go.uber.org/zap"
"google.golang.org/grpc/status"
"google.golang.org/protobuf/types/known/timestamppb"
)
Expand Down Expand Up @@ -40,7 +39,7 @@ func (fn httpHandlerFn) ServeHTTP(w http.ResponseWriter, r *http.Request) {
}
if err := json.NewDecoder(r.Body).Decode(&payload); err != nil {
if fields, ok := cloudrequestlog.GetAdditionalFields(r.Context()); ok {
fields.Add(zap.Error(err))
fields.Add(slog.Any("error", err))
}
http.Error(w, http.StatusText(http.StatusBadRequest), http.StatusBadRequest)
return
Expand All @@ -53,12 +52,12 @@ func (fn httpHandlerFn) ServeHTTP(w http.ResponseWriter, r *http.Request) {
OrderingKey: payload.Message.OrderingKey,
}
if fields, ok := cloudrequestlog.GetAdditionalFields(r.Context()); ok {
fields.Add(cloudzap.ProtoMessage("pubsubMessage", &pubsubMessage))
fields.Add(slog.Any("pubsubMessage", &pubsubMessage))
}
ctx := withSubscription(r.Context(), payload.Subscription)
if err := fn(ctx, &pubsubMessage); err != nil {
if fields, ok := cloudrequestlog.GetAdditionalFields(r.Context()); ok {
fields.Add(zap.Error(err))
fields.Add(slog.Any("error", err))
}
code := status.Code(err)
httpStatus := cloudstatus.ToHTTP(code)
Expand Down
7 changes: 4 additions & 3 deletions cloudserver/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,11 @@ package cloudserver
import (
"context"
"fmt"
"log/slog"
"net/http"
"runtime/debug"

"go.einride.tech/cloudrunner/cloudrequestlog"
"go.uber.org/zap"
)

// HTTPServer provides HTTP server middleware.
Expand All @@ -17,8 +18,8 @@ func (i *Middleware) HTTPServer(next http.Handler) http.Handler {
writer.WriteHeader(http.StatusInternalServerError)
if fields, ok := cloudrequestlog.GetAdditionalFields(request.Context()); ok {
fields.Add(
zap.Stack("stack"),
zap.Error(fmt.Errorf("recovered panic: %v", r)),
slog.String("stack", string(debug.Stack())),
slog.Any("error", fmt.Errorf("recovered panic: %v", r)),
)
}
}
Expand Down
7 changes: 4 additions & 3 deletions cloudserver/middleware.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,13 @@ import (
"context"
"errors"
"fmt"
"log/slog"
"runtime"
"runtime/debug"

"go.einride.tech/cloudrunner/clouderror"
"go.einride.tech/cloudrunner/cloudrequestlog"
"go.einride.tech/cloudrunner/cloudstream"
"go.uber.org/zap"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
Expand All @@ -35,7 +36,7 @@ func (i *Middleware) GRPCUnaryServerInterceptor(
status.New(codes.Internal, "internal error"),
)
if additionalFields, ok := cloudrequestlog.GetAdditionalFields(ctx); ok {
additionalFields.Add(zap.Stack("stack"))
additionalFields.Add(slog.String("stack", string(debug.Stack())))
}
}
}()
Expand Down Expand Up @@ -70,7 +71,7 @@ func (i *Middleware) GRPCStreamServerInterceptor(
status.New(codes.Internal, "internal error"),
)
if additionalFields, ok := cloudrequestlog.GetAdditionalFields(ss.Context()); ok {
additionalFields.Add(zap.Stack("stack"))
additionalFields.Add(slog.String("stack", string(debug.Stack())))
}
}
}()
Expand Down
3 changes: 3 additions & 0 deletions cloudslog/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"go.opentelemetry.io/otel/sdk/resource"
"go.opentelemetry.io/otel/trace"
ltype "google.golang.org/genproto/googleapis/logging/type"
"google.golang.org/grpc/status"
"google.golang.org/protobuf/proto"
)

Expand Down Expand Up @@ -94,6 +95,8 @@ func (r *attrReplacer) replaceAttr(_ []string, attr slog.Attr) slog.Attr {
attr.Value = slog.AnyValue(newBuildInfoValue(value))
case *ltype.HttpRequest:
attr.Value = slog.AnyValue(newProtoValue(fixHTTPRequest(value), r.config.ProtoMessageSizeLimit))
case *status.Status:
attr.Value = slog.AnyValue(newProtoValue(value.Proto(), r.config.ProtoMessageSizeLimit))
case proto.Message:
if needsRedact(value) {
value = proto.Clone(value)
Expand Down
6 changes: 3 additions & 3 deletions cloudtrace/idhook.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,9 @@ package cloudtrace

import (
"context"
"log/slog"

"go.einride.tech/cloudrunner/cloudzap"
"go.uber.org/zap"
"go.einride.tech/cloudrunner/cloudslog"
)

// IDKey is the log entry key for trace IDs.
Expand All @@ -15,5 +15,5 @@ const IDKey = "traceId"
// The trace ID can be used to filter on logs for the same trace across multiple projects.
// Experimental: May be removed in a future update.
func IDHook(ctx context.Context, traceContext Context) context.Context {
return cloudzap.WithLoggerFields(ctx, zap.String(IDKey, traceContext.TraceID))
return cloudslog.With(ctx, slog.String(IDKey, traceContext.TraceID))
}
Loading

0 comments on commit 8a666ed

Please sign in to comment.