Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make user-supplied sinks operate on URIs #606

Merged
merged 6 commits into from
Jul 19, 2018
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions config.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,10 +74,10 @@ type Config struct {
// EncoderConfig sets options for the chosen encoder. See
// zapcore.EncoderConfig for details.
EncoderConfig zapcore.EncoderConfig `json:"encoderConfig" yaml:"encoderConfig"`
// OutputPaths is a list of paths to write logging output to. See Open for
// OutputPaths is a list of URLs to write logging output to. See Open for
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we mention that filenames are also supported?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure, can do.

// details.
OutputPaths []string `json:"outputPaths" yaml:"outputPaths"`
// ErrorOutputPaths is a list of paths to write internal logger errors to.
// ErrorOutputPaths is a list of URLs to write internal logger errors to.
// The default is standard error.
//
// Note that this setting only affects internal errors; for sample code that
Expand Down
111 changes: 83 additions & 28 deletions sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,15 +24,19 @@ import (
"errors"
"fmt"
"io"
"net/url"
"os"
"strings"
"sync"

"go.uber.org/zap/zapcore"
)

const schemeFile = "file"

var (
_sinkMutex sync.RWMutex
_sinkFactories map[string]func() (Sink, error)
_sinkFactories map[string]func(*url.URL) (Sink, error) // keyed by scheme
)

func init() {
Expand All @@ -42,18 +46,10 @@ func init() {
func resetSinkRegistry() {
_sinkMutex.Lock()
defer _sinkMutex.Unlock()
_sinkFactories = map[string]func() (Sink, error){
"stdout": func() (Sink, error) { return nopCloserSink{os.Stdout}, nil },
"stderr": func() (Sink, error) { return nopCloserSink{os.Stderr}, nil },
}
}

type errSinkNotFound struct {
key string
}

func (e *errSinkNotFound) Error() string {
return fmt.Sprintf("no sink found for %q", e.key)
_sinkFactories = map[string]func(*url.URL) (Sink, error){
schemeFile: newFileSink,
}
}

// Sink defines the interface to write to and close logger destinations.
Expand All @@ -62,33 +58,92 @@ type Sink interface {
io.Closer
}

// RegisterSink adds a Sink at the given key so it can be referenced
// in config OutputPaths.
func RegisterSink(key string, sinkFactory func() (Sink, error)) error {
type nopCloserSink struct{ zapcore.WriteSyncer }

func (nopCloserSink) Close() error { return nil }

type errSinkNotFound struct {
scheme string
}

func (e *errSinkNotFound) Error() string {
return fmt.Sprintf("no sink found for scheme %q", e.scheme)
}

// RegisterSink registers a user-supplied factory for all sinks with a
// particular scheme.
//
// All schemes must be ASCII, valid under section 3.1 of RFC 3986
// (https://tools.ietf.org/html/rfc3986#section-3.1), and may not already have
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: "must not"

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good point, fixed.

// a factory registered. Zap automatically registers a factory for the "file"
// scheme.
func RegisterSink(scheme string, factory func(*url.URL) (Sink, error)) error {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Isn't this a breaking change? The factory function didn't accept a callback before.

_sinkMutex.Lock()
defer _sinkMutex.Unlock()
if key == "" {
return errors.New("sink key cannot be blank")

if scheme == "" {
return errors.New("can't register a sink factory for empty string")
}
normalized, err := normalizeScheme(scheme)
if err != nil {
return fmt.Errorf("%q is not a valid scheme: %v", scheme, err)
}
if _, ok := _sinkFactories[key]; ok {
return fmt.Errorf("sink already registered for key %q", key)
if _, ok := _sinkFactories[normalized]; ok {
return fmt.Errorf("sink factory already registered for scheme %q", normalized)
}
_sinkFactories[key] = sinkFactory
_sinkFactories[normalized] = factory
return nil
}

// newSink invokes the registered sink factory to create and return the
// sink for the given key. Returns errSinkNotFound if the key cannot be found.
func newSink(key string) (Sink, error) {
func newSink(rawURL string) (Sink, error) {
u, err := url.Parse(rawURL)
if err != nil {
return nil, fmt.Errorf("can't parse %q as a URL: %v", rawURL, err)
}
if u.Scheme == "" {
u.Scheme = schemeFile
}

_sinkMutex.RLock()
defer _sinkMutex.RUnlock()
sinkFactory, ok := _sinkFactories[key]

factory, ok := _sinkFactories[u.Scheme]
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

optionally: can we hold the RLock just while we're looking up the factory?

_sinkMutex.RLock()
factory, ok := _sinkFactories[u.Scheme]
_sinkMutex.RUnlock()

Shouldn't be an issue since it's just an RLock, but want to avoid holding on to locks while triggering the factory which is user-supplied.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Makes sense, can do.

if !ok {
return nil, &errSinkNotFound{key}
return nil, &errSinkNotFound{u.Scheme}
}
return sinkFactory()
return factory(u)
}

type nopCloserSink struct{ zapcore.WriteSyncer }
func newFileSink(u *url.URL) (Sink, error) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

optional: should we ensure there's no fragments etc? ignoring seems OK too, but it might be a little surprising

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good point.

switch u.Path {
case "stdout":
return nopCloserSink{os.Stdout}, nil
case "stderr":
return nopCloserSink{os.Stderr}, nil
}
return os.OpenFile(u.Path, os.O_WRONLY|os.O_APPEND|os.O_CREATE, 0644)
}

func (nopCloserSink) Close() error { return nil }
func normalizeScheme(s string) (string, error) {
// https://tools.ietf.org/html/rfc3986#section-3.1
s = strings.ToLower(s)
if first := s[0]; 'a' > first || 'z' < first {
return "", errors.New("must start with a letter")
}
if len(s) < 2 {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the rest of the code will work fine for len(s) == 1, any reason for this special case?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nope, just left over from previous iterations on the code. My bad.

return s, nil
}
for i := 1; i < len(s); i++ { // iterate over bytes, not runes
c := s[i]
switch {
case 'a' <= c && c <= 'z':
continue
case '0' <= c && c <= '9':
continue
case c == '.' || c == '+' || c == '-':
continue
}
return "", fmt.Errorf("may not contain %q", string(c))
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

%q works fine for characters, it does single quotes instead of double quotes, which seems fine here?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Definitely.

}
return s, nil
}
94 changes: 59 additions & 35 deletions sink_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,56 +21,80 @@
package zap

import (
"errors"
"os"
"bytes"
"io/ioutil"
"net/url"
"strings"
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"go.uber.org/zap/zapcore"
)

func TestRegisterSink(t *testing.T) {
tests := []struct {
name string
key string
factory func() (Sink, error)
wantError bool
}{
{"valid", "valid", func() (Sink, error) { return nopCloserSink{os.Stdout}, nil }, false},
{"empty", "", func() (Sink, error) { return nopCloserSink{os.Stdout}, nil }, true},
{"stdout", "stdout", func() (Sink, error) { return nopCloserSink{os.Stdout}, nil }, true},
}
const (
memScheme = "m"
nopScheme = "no-op.1234"
)
var memCalls, nopCalls int

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
err := RegisterSink(tt.key, tt.factory)
if tt.wantError {
assert.NotNil(t, err)
} else {
assert.Nil(t, err)
assert.NotNil(t, _sinkFactories[tt.key], "expected the factory to be present")
}
})
buf := bytes.NewBuffer(nil)
memFactory := func(u *url.URL) (Sink, error) {
assert.Equal(t, u.Scheme, memScheme, "Scheme didn't match registration.")
memCalls++
return nopCloserSink{zapcore.AddSync(buf)}, nil
}
nopFactory := func(u *url.URL) (Sink, error) {
assert.Equal(t, u.Scheme, nopScheme, "Scheme didn't match registration.")
nopCalls++
return nopCloserSink{zapcore.AddSync(ioutil.Discard)}, nil
}
}

func TestNewSink(t *testing.T) {
defer resetSinkRegistry()
errTestSink := errors.New("test erroring")
err := RegisterSink("errors", func() (Sink, error) { return nil, errTestSink })
assert.Nil(t, err)

require.NoError(t, RegisterSink(strings.ToUpper(memScheme), memFactory), "Failed to register scheme %q.", memScheme)
require.NoError(t, RegisterSink(nopScheme, nopFactory), "Failed to register scheme %q.", memScheme)

sink, close, err := Open(
memScheme+"://somewhere",
nopScheme+"://somewhere-else",
)
assert.NoError(t, err, "Unexpected error opening URLs with registered schemes.")

defer close()

assert.Equal(t, 1, memCalls, "Unexpected number of calls to memory factory.")
assert.Equal(t, 1, nopCalls, "Unexpected number of calls to no-op factory.")

_, err = sink.Write([]byte("foo"))
assert.NoError(t, err, "Failed to write to combined WriteSyncer.")
assert.Equal(t, "foo", buf.String(), "Unexpected buffer contents.")
}

func TestRegisterSinkErrors(t *testing.T) {
nopFactory := func(_ *url.URL) (Sink, error) {
return nopCloserSink{zapcore.AddSync(ioutil.Discard)}, nil
}
tests := []struct {
key string
err error
scheme string
err string
}{
{"stdout", nil},
{"errors", errTestSink},
{"nonexistent", &errSinkNotFound{"nonexistent"}},
{"", "empty string"},
{"FILE", "already registered"},
{"42", "not a valid scheme"},
{"http*", "not a valid scheme"},
}

for _, tt := range tests {
t.Run(tt.key, func(t *testing.T) {
_, err := newSink(tt.key)
assert.Equal(t, tt.err, err)
t.Run("scheme-"+tt.scheme, func(t *testing.T) {
defer resetSinkRegistry()

err := RegisterSink(tt.scheme, nopFactory)
if assert.Error(t, err, "expected error") {
assert.Contains(t, err.Error(), tt.err, "unexpected error")
}
})
}
}
37 changes: 15 additions & 22 deletions writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,22 +21,27 @@
package zap

import (
"fmt"
"io"
"io/ioutil"
"os"

"go.uber.org/zap/zapcore"

"go.uber.org/multierr"
)

// Open is a high-level wrapper that takes a variadic number of paths, opens or
// creates each of the specified files, and combines them into a locked
// Open is a high-level wrapper that takes a variadic number of URLs, opens or
// creates each of the specified resources, and combines them into a locked
// WriteSyncer. It also returns any error encountered and a function to close
// any opened files.
//
// Passing no paths returns a no-op WriteSyncer. The special paths "stdout" and
// Passing no URLs returns a no-op WriteSyncer. URLs without a scheme (e.g.,
// "/var/log/foo.log") are treated as though they had the "file" scheme. With
// no scheme or the explicit "file" scheme, the special paths "stdout" and
// "stderr" are interpreted as os.Stdout and os.Stderr, respectively.
//
// Users and third-party packages may register factories for other schemes
// using RegisterSink.
func Open(paths ...string) (zapcore.WriteSyncer, func(), error) {
writers, close, err := open(paths)
if err != nil {
Expand All @@ -48,36 +53,24 @@ func Open(paths ...string) (zapcore.WriteSyncer, func(), error) {
}

func open(paths []string) ([]zapcore.WriteSyncer, func(), error) {
var openErr error
writers := make([]zapcore.WriteSyncer, 0, len(paths))
closers := make([]io.Closer, 0, len(paths))
close := func() {
for _, c := range closers {
c.Close()
}
}

var openErr error
for _, path := range paths {
sink, err := newSink(path)
if err == nil {
// Using a registered sink constructor.
writers = append(writers, sink)
closers = append(closers, sink)
if err != nil {
openErr = multierr.Append(openErr, fmt.Errorf("couldn't open sink %q: %v", path, err))
continue
}
if _, ok := err.(*errSinkNotFound); ok {
// No named sink constructor, use key as path to log file.
f, e := os.OpenFile(path, os.O_WRONLY|os.O_APPEND|os.O_CREATE, 0644)
openErr = multierr.Append(openErr, e)
if e == nil {
writers = append(writers, f)
closers = append(closers, f)
}
continue
}
// Sink constructor failed.
openErr = multierr.Append(openErr, err)
writers = append(writers, sink)
closers = append(closers, sink)
}

if openErr != nil {
close()
return writers, nil, openErr
Expand Down
Loading