Skip to content

🚨 Design workflows of slog handlers: pipeline, middleware, fanout, routing, failover, load balancing...

License

Notifications You must be signed in to change notification settings

samber/slog-multi

Folders and files

NameName
Last commit message
Last commit date

Latest commit

e7e598c Β· Feb 4, 2025

History

82 Commits
Nov 19, 2024
Oct 1, 2024
Apr 10, 2023
Aug 9, 2023
Apr 10, 2023
Nov 1, 2023
Feb 4, 2025
Apr 10, 2023
Feb 4, 2025
Feb 4, 2025
Feb 4, 2025
Aug 9, 2023
Feb 4, 2025
Jan 2, 2025
Jan 2, 2025
May 23, 2023
Aug 9, 2023
Jan 2, 2025
Jan 2, 2025
Jan 2, 2025
Jan 2, 2025
Jan 2, 2025
Dec 5, 2024
Aug 9, 2023
Oct 27, 2024
Feb 4, 2025
Feb 4, 2025
Feb 4, 2025

Repository files navigation

slog: Handler chaining, fanout, routing, failover, load balancing...

tag Go Version GoDoc Build Status Go report Coverage Contributors License

Design workflows of slog handlers:

  • Fanout: distribute log.Record to multiple slog.Handler in parallel
  • Pipe: rewrite log.Record on the fly (eg: for privacy reasons)
  • Router: forward log.Record to all matching slog.Handler
  • Failover: forward log.Record to the first available slog.Handler
  • Pool: increase log bandwidth by sending log.Record to a pool of slog.Handler
  • RecoverHandlerError: catch panics and errors from handlers

Here is a simple workflow with both pipeline and fanout:

workflow example with pipeline and fanout

Middlewares:

See also:

HTTP middlewares:

Loggers:

Log sinks:

πŸš€ Install

go get github.com/samber/slog-multi

Compatibility: go >= 1.21

No breaking changes will be made to exported APIs before v2.0.0.

Warning

Use this library carefully, log processing can be very costly (!)

πŸ’‘ Usage

GoDoc: https://pkg.go.dev/github.com/samber/slog-multi

Broadcast: slogmulti.Fanout()

Distribute logs to multiple slog.Handler in parallel.

import (
    slogmulti "github.com/samber/slog-multi"
    "log/slog"
)

func main() {
    logstash, _ := net.Dial("tcp", "logstash.acme:4242")    // use github.com/netbrain/goautosocket for auto-reconnect
    stderr := os.Stderr

    logger := slog.New(
        slogmulti.Fanout(
            slog.NewJSONHandler(logstash, &slog.HandlerOptions{}),  // pass to first handler: logstash over tcp
            slog.NewTextHandler(stderr, &slog.HandlerOptions{}),    // then to second handler: stderr
            // ...
        ),
    )

    logger.
        With(
            slog.Group("user",
                slog.String("id", "user-123"),
                slog.Time("created_at", time.Now()),
            ),
        ).
        With("environment", "dev").
        With("error", fmt.Errorf("an error")).
        Error("A message")
}

Stderr output:

time=2023-04-10T14:00:0.000000+00:00 level=ERROR msg="A message" user.id=user-123 user.created_at=2023-04-10T14:00:0.000000+00:00 environment=dev error="an error"

Netcat output:

{
	"time":"2023-04-10T14:00:0.000000+00:00",
	"level":"ERROR",
	"msg":"A message",
	"user":{
		"id":"user-123",
		"created_at":"2023-04-10T14:00:0.000000+00:00"
	},
	"environment":"dev",
	"error":"an error"
}

Routing: slogmulti.Router()

Distribute logs to all matching slog.Handler in parallel.

import (
    slogmulti "github.com/samber/slog-multi"
    slogslack "github.com/samber/slog-slack"
    "log/slog"
)

func main() {
    slackChannelUS := slogslack.Option{Level: slog.LevelError, WebhookURL: "xxx", Channel: "supervision-us"}.NewSlackHandler()
    slackChannelEU := slogslack.Option{Level: slog.LevelError, WebhookURL: "xxx", Channel: "supervision-eu"}.NewSlackHandler()
    slackChannelAPAC := slogslack.Option{Level: slog.LevelError, WebhookURL: "xxx", Channel: "supervision-apac"}.NewSlackHandler()

    logger := slog.New(
        slogmulti.Router().
            Add(slackChannelUS, recordMatchRegion("us")).
            Add(slackChannelEU, recordMatchRegion("eu")).
            Add(slackChannelAPAC, recordMatchRegion("apac")).
            Handler(),
    )

    logger.
        With("region", "us").
        With("pool", "us-east-1").
        Error("Server desynchronized")
}

func recordMatchRegion(region string) func(ctx context.Context, r slog.Record) bool {
    return func(ctx context.Context, r slog.Record) bool {
        ok := false

        r.Attrs(func(attr slog.Attr) bool {
            if attr.Key == "region" && attr.Value.Kind() == slog.KindString && attr.Value.String() == region {
                ok = true
                return false
            }

            return true
        })

        return ok
    }
}

Failover: slogmulti.Failover()

List multiple targets for a slog.Record instead of retrying on the same unavailable log management system.

import (
    "net"
    slogmulti "github.com/samber/slog-multi"
    "log/slog"
)


func main() {
    // ncat -l 1000 -k
    // ncat -l 1001 -k
    // ncat -l 1002 -k

    // list AZs
    // use github.com/netbrain/goautosocket for auto-reconnect
    logstash1, _ := net.Dial("tcp", "logstash.eu-west-3a.internal:1000")
    logstash2, _ := net.Dial("tcp", "logstash.eu-west-3b.internal:1000")
    logstash3, _ := net.Dial("tcp", "logstash.eu-west-3c.internal:1000")

    logger := slog.New(
        slogmulti.Failover()(
            slog.HandlerOptions{}.NewJSONHandler(logstash1, nil),    // send to this instance first
            slog.HandlerOptions{}.NewJSONHandler(logstash2, nil),    // then this instance in case of failure
            slog.HandlerOptions{}.NewJSONHandler(logstash3, nil),    // and finally this instance in case of double failure
        ),
    )

    logger.
        With(
            slog.Group("user",
                slog.String("id", "user-123"),
                slog.Time("created_at", time.Now()),
            ),
        ).
        With("environment", "dev").
        With("error", fmt.Errorf("an error")).
        Error("A message")
}

Load balancing: slogmulti.Pool()

Increase log bandwidth by sending log.Record to a pool of slog.Handler.

import (
    "net"
    slogmulti "github.com/samber/slog-multi"
    "log/slog"
)

func main() {
    // ncat -l 1000 -k
    // ncat -l 1001 -k
    // ncat -l 1002 -k

    // list AZs
    // use github.com/netbrain/goautosocket for auto-reconnect
    logstash1, _ := net.Dial("tcp", "logstash.eu-west-3a.internal:1000")
    logstash2, _ := net.Dial("tcp", "logstash.eu-west-3b.internal:1000")
    logstash3, _ := net.Dial("tcp", "logstash.eu-west-3c.internal:1000")

    logger := slog.New(
        slogmulti.Pool()(
            // a random handler will be picked
            slog.HandlerOptions{}.NewJSONHandler(logstash1, nil),
            slog.HandlerOptions{}.NewJSONHandler(logstash2, nil),
            slog.HandlerOptions{}.NewJSONHandler(logstash3, nil),
        ),
    )

    logger.
        With(
            slog.Group("user",
                slog.String("id", "user-123"),
                slog.Time("created_at", time.Now()),
            ),
        ).
        With("environment", "dev").
        With("error", fmt.Errorf("an error")).
        Error("A message")
}

Recover errors: slog.RecoverHandlerError()

Returns a slog.Handler that recovers from panics or error of the chain of handlers.

import (
	slogformatter "github.com/samber/slog-formatter"
	slogmulti "github.com/samber/slog-multi"
	"log/slog"
)

recovery := slogmulti.RecoverHandlerError(
    func(ctx context.Context, record slog.Record, err error) {
        // will be called only if subsequent handlers fail or return an error
        log.Println(err.Error())
    },
)
sink := NewSinkHandler(...)

logger := slog.New(
    slogmulti.
        Pipe(recovery).
        Handler(sink),
)

err := fmt.Errorf("an error")
logger.Error("a message",
    slog.Any("very_private_data", "abcd"),
    slog.Any("user", user),
    slog.Any("err", err))

// outputs:
// time=2023-04-10T14:00:0.000000+00:00 level=ERROR msg="a message" error.message="an error" error.type="*errors.errorString" user="John doe" very_private_data="********"

Chaining: slogmulti.Pipe()

Rewrite log.Record on the fly (eg: for privacy reason).

func main() {
    // first middleware: format go `error` type into an object {error: "*myCustomErrorType", message: "could not reach https://a.b/c"}
    errorFormattingMiddleware := slogmulti.NewHandleInlineMiddleware(errorFormattingMiddleware)

    // second middleware: remove PII
    gdprMiddleware := NewGDPRMiddleware()

    // final handler
    sink := slog.NewJSONHandler(os.Stderr, &slog.HandlerOptions{})

    logger := slog.New(
        slogmulti.
            Pipe(errorFormattingMiddleware).
            Pipe(gdprMiddleware).
            // ...
            Handler(sink),
    )

    logger.
        With(
            slog.Group("user",
                slog.String("id", "user-123"),
                slog.String("email", "user-123"),
                slog.Time("created_at", time.Now()),
            ),
        ).
        With("environment", "dev").
        Error("A message",
            slog.String("foo", "bar"),
            slog.Any("error", fmt.Errorf("an error")),
        )
}

Stderr output:

{
    "time":"2023-04-10T14:00:0.000000+00:00",
    "level":"ERROR",
    "msg":"A message",
    "user":{
        "id":"*******",
        "email":"*******",
        "created_at":"*******"
    },
    "environment":"dev",
    "foo":"bar",
    "error":{
        "type":"*myCustomErrorType",
        "message":"an error"
    }
}

Custom middleware

Middleware must match the following prototype:

type Middleware func(slog.Handler) slog.Handler

The example above uses:

Note: WithAttrs and WithGroup methods of custom middleware must return a new instance, instead of this.

Inline handler

An "inline handler" (aka. lambda), is a shortcut to implement slog.Handler, that hooks a single method and proxies others.

mdw := slogmulti.NewHandleInlineHandler(
    // simulate "Handle()"
    func(ctx context.Context, groups []string, attrs []slog.Attr, record slog.Record) error {
        // [...]
        return nil
    },
)
mdw := slogmulti.NewInlineHandler(
    // simulate "Enabled()"
    func(ctx context.Context, groups []string, attrs []slog.Attr, level slog.Level) bool {
        // [...]
        return true
    },
    // simulate "Handle()"
    func(ctx context.Context, groups []string, attrs []slog.Attr, record slog.Record) error {
        // [...]
        return nil
    },
)

Inline middleware

An "inline middleware" (aka. lambda), is a shortcut to implement middleware, that hooks a single method and proxies others.

// hook `logger.Enabled` method
mdw := slogmulti.NewEnabledInlineMiddleware(func(ctx context.Context, level slog.Level, next func(context.Context, slog.Level) bool) bool{
    // [...]
    return next(ctx, level)
})
// hook `logger.Handle` method
mdw := slogmulti.NewHandleInlineMiddleware(func(ctx context.Context, record slog.Record, next func(context.Context, slog.Record) error) error {
    // [...]
    return next(ctx, record)
})
// hook `logger.WithAttrs` method
mdw := slogmulti.NewWithAttrsInlineMiddleware(func(attrs []slog.Attr, next func([]slog.Attr) slog.Handler) slog.Handler{
    // [...]
    return next(attrs)
})
// hook `logger.WithGroup` method
mdw := slogmulti.NewWithGroupInlineMiddleware(func(name string, next func(string) slog.Handler) slog.Handler{
    // [...]
    return next(name)
})

A super inline middleware that hooks all methods.

Warning: you would rather implement your own middleware.

mdw := slogmulti.NewInlineMiddleware(
    func(ctx context.Context, level slog.Level, next func(context.Context, slog.Level) bool) bool{
        // [...]
        return next(ctx, level)
    },
    func(ctx context.Context, record slog.Record, next func(context.Context, slog.Record) error) error{
        // [...]
        return next(ctx, record)
    },
    func(attrs []slog.Attr, next func([]slog.Attr) slog.Handler) slog.Handler{
        // [...]
        return next(attrs)
    },
    func(name string, next func(string) slog.Handler) slog.Handler{
        // [...]
        return next(name)
    },
)

🀝 Contributing

Don't hesitate ;)

# Install some dev dependencies
make tools

# Run tests
make test
# or
make watch-test

πŸ‘€ Contributors

Contributors

πŸ’« Show your support

Give a ⭐️ if this project helped you!

GitHub Sponsors

πŸ“ License

Copyright Β© 2023 Samuel Berthe.

This project is MIT licensed.