Skip to content

Commit

Permalink
Add a zap sink to write to Cloud Logging
Browse files Browse the repository at this point in the history
* This makes it easy to send logs directly to Cloud Logging.
* Useful when you are running locally.
  • Loading branch information
jlewi committed May 4, 2024
1 parent 401afe1 commit e7b57e5
Show file tree
Hide file tree
Showing 5 changed files with 400 additions and 1 deletion.
88 changes: 88 additions & 0 deletions gcp/logging/links.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
package logging

import (
"fmt"
"net/url"
"sort"
"strings"
"time"
)

// GetLink returns a link to the Cloud Logging console that shows logs matching the given labels.
// The link isn't stable because it doesn't pin the time.
//
// project is the GCP project whose logs are queried.
// labels maps label names to the values every returned entry must match.
func GetLink(project string, labels map[string]string) string {
	// Copy the labels before merging in the project filter so we don't mutate the caller's map.
	merged := make(map[string]string, len(labels)+1)
	for k, v := range labels {
		merged[k] = v
	}
	merged["resource.labels.project_id"] = project

	path := "https://console.cloud.google.com/logs/query"
	path += ";query=" + buildLabelsQuery(merged)

	query := url.Values{
		"project": []string{project},
	}

	return path + "?" + query.Encode()
}

func buildLabelsQuery(labels map[string]string) string {
// We want the names to appear in sorted order in the link so the link is deterministic.
names := make([]string, 0, len(labels))
for n := range labels {
names = append(names, n)
}
sort.Strings(names)

labelsQuery := []string{}
for _, n := range names {
labelsQuery = append(labelsQuery, fmt.Sprintf(`%s="%s"`, n, labels[n]))
}
return url.QueryEscape(strings.Join(labelsQuery, "\n"))
}

// GetLinkAroundTime returns a link to the logs query pinned at the specified time.
// It will show a time window specified by the duration around the time.
// Duration is rounded to the nearest second, minute or hour depending on its size.
func GetLinkAroundTime(project string, labels map[string]string, t time.Time, duration time.Duration) string {
	// Copy the labels before merging in the project filter so we don't mutate the caller's map.
	merged := make(map[string]string, len(labels)+1)
	for k, v := range labels {
		merged[k] = v
	}
	merged["resource.labels.project_id"] = project

	path := "https://console.cloud.google.com/logs/query"
	path += ";query=" + buildLabelsQuery(merged)

	// Pin to a time window centered at time t and +- duration. This creates a stable link to the log entries.
	size, units := roundDuration(duration)
	// N.B. Use %.0f (not %2.f) so single-digit sizes don't get a space-padded window like "PT 5M".
	window := fmt.Sprintf("PT%.0f%s", size, units)

	// cursorTimestamp controls the cursor position in the UI; if we don't set it the console complains about
	// the query being invalid when you scroll down.
	path += ";cursorTimestamp=" + t.Format(time.RFC3339)
	path += ";aroundTime=" + t.Format(time.RFC3339)
	path += ";duration=" + url.QueryEscape(window)

	query := url.Values{
		"project": []string{project},
	}

	return path + "?" + query.Encode()
}

// roundDuration rounds the duration to the time and units used in cloud logging queries
func roundDuration(duration time.Duration) (float64, string) {
size := 1.0
units := "H"
if duration >= time.Hour {
size = duration.Round(time.Hour).Hours()
units = "H"
}

if duration < time.Hour && duration >= 5*time.Minute {
size = duration.Round(time.Minute).Minutes()
units = "M"
}

if duration < 5*time.Minute {
size = duration.Round(time.Second).Seconds()
units = "S"
}

return size, units
}
144 changes: 144 additions & 0 deletions gcp/logging/sink.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,144 @@
package logging

import (
"bufio"
"bytes"
"context"
"encoding/json"
"fmt"
"log"
"net/url"
"time"

"cloud.google.com/go/logging"
"github.com/pkg/errors"
"go.uber.org/zap"
)

const (
	// Scheme is the URL scheme zap uses to route output paths to this sink,
	// e.g. gcplogs:///projects/${PROJECT}/logs/${LOGNAME}.
	Scheme = "gcplogs"
	// SeverityField is the JSON payload field the sink copies into the
	// Cloud Logging entry's Severity.
	SeverityField = "severity"
	// TimeField is the JSON payload field the sink reads the entry timestamp
	// from (epoch seconds as a float).
	TimeField = "time"

	// TraceField is the field Google Cloud Logging looks for the
	// trace https://cloud.google.com/logging/docs/structured-logging
	TraceField = "logging.googleapis.com/trace"
)

// RegisterSink registers Sink as a zap sink; this allows zap to send logs to Cloud Logging.
//
// project is the GCP project.
// name is the name of the log to use.
// labels are labels added to every log entry.
//
// Returns an error if the Cloud Logging client can't be created or the sink
// can't be registered with zap.
func RegisterSink(project string, name string, labels map[string]string) error {
	var ctx = context.Background()

	client, err := logging.NewClient(ctx, project)
	if err != nil {
		// N.B. Return the error rather than calling log.Fatalf; a library should
		// never terminate the host process.
		log.Printf("Failed to create Cloud Logging client: %v", err)
		return errors.Wrapf(err, "could not create Cloud Logging client for project %s", project)
	}

	logger := client.Logger(name, logging.CommonLabels(labels))

	sink := &Sink{
		Client: client,
		Logger: logger,
	}

	// All output paths using Scheme share this one sink regardless of URL.
	return zap.RegisterSink(Scheme, func(u *url.URL) (zap.Sink, error) {
		return sink, nil
	})
}

// Sink implements zap.Sink interface. This lets zap send logs to Cloud Logging.
//
// To use the sink:
//
// 1. Call RegisterSink; this will register the sink with zap
//
// 2. Create a zap logger configuration in which the output path uses the URL
// gcplogs:///projects/${PROJECT}/logs/${LOGNAME}
//
// For example:
//
//	c := zap.NewProductionConfig()
//	c.OutputPaths = []string{"gcplogs:///projects/${PROJECT}/logs/${LOGNAME}", "stdout"}
type Sink struct {
	// Client is the Cloud Logging client backing Logger.
	// Set Client if you want the sink to take ownership of the client and call Close.
	// If you don't then you must call Client.Close yourself to flush the logs.
	Client *logging.Client
	// Logger is the Cloud Logging logger entries are written to.
	Logger *logging.Logger
}

// Write implements zap.Sink / io.Writer. It splits in into newline-delimited
// log lines, parses each as a JSON object (the format zap's JSON encoder
// emits), promotes the special severity/time/trace fields into the Cloud
// Logging entry, and sends each entry asynchronously. Lines that aren't valid
// JSON are forwarded as plain-text payloads.
//
// N.B. It assumes the input ends with a trailing newline (zap always emits
// one); otherwise the byte count won't match and an error is returned.
func (s *Sink) Write(in []byte) (n int, err error) {
	reader := bytes.NewReader(in)
	// Create a scanner to read the data line by line.
	scanner := bufio.NewScanner(reader)
	// Zap log lines can exceed bufio.Scanner's default 64KiB token limit; allow up to 1MiB.
	scanner.Buffer(make([]byte, 0, 64*1024), 1024*1024)

	bytesRead := 0

	for scanner.Scan() {
		line := scanner.Bytes()

		payload := map[string]interface{}{}

		entry := logging.Entry{}

		if err := json.Unmarshal(line, &payload); err == nil {
			entry.Payload = payload

			// We need to explicitly copy special fields out of the arbitrary payload into the Cloud Logging fields.
			// Otherwise they won't receive special treatment and show up in the UI.
			// For example, we need to explicitly copy the severity field so that in Cloud Logging we can filter
			// by the severity field.
			if severityVal, ok := payload[SeverityField]; ok {
				if severity, ok := severityVal.(string); ok {
					entry.Severity = logging.ParseSeverity(severity)
				}
			}

			if timeInterface, ok := payload[TimeField]; ok {
				if timeVal, ok := timeInterface.(float64); ok {
					// Convert the float epoch-seconds timestamp into a time.Time.
					seconds := int64(timeVal)
					fractional := timeVal - float64(seconds)
					nanoseconds := int64(fractional * 1e9)

					entry.Timestamp = time.Unix(seconds, nanoseconds)
				}
			}

			// If the trace field is present we need to copy it out of the payload and into the entry and delete
			// the entry in the payload.
			// N.B. Check the type assertion so a non-string trace value can't panic the sink.
			if traceVal, ok := payload[TraceField]; ok {
				if trace, ok := traceVal.(string); ok {
					entry.Trace = trace
					delete(payload, TraceField)
				}
			}
		} else {
			// Not JSON; forward the raw line as a text payload.
			entry.Payload = string(line)
		}
		// Log sends the records to GCP asynchronously; it's non-blocking.
		s.Logger.Log(entry)
		bytesRead += len(line)
		// N.B. The newline gets stripped by the scanner so we need to add 1.
		bytesRead += 1
	}

	if err := scanner.Err(); err != nil {
		return bytesRead, err
	}

	if bytesRead != len(in) {
		return bytesRead, errors.Errorf("unexpected number of bytes read; expected to read %d bytes but only read %d", len(in), bytesRead)
	}

	return bytesRead, nil
}

// Close closes the underlying Cloud Logging client, flushing any buffered
// entries and releasing its resources. The sink is unusable afterwards.
func (s *Sink) Close() error {
	return s.Client.Close()
}

// Sync implements zap.Sink. It flushes any buffered entries in the Cloud
// Logging logger, blocking until they have been sent.
func (s *Sink) Sync() error {
	return s.Logger.Flush()
}
164 changes: 164 additions & 0 deletions gcp/logging/sink_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,164 @@
package logging

import (
"context"
"fmt"
"log"
"os"
"strings"
"testing"
"time"

"github.com/google/uuid"

"cloud.google.com/go/logging"
"cloud.google.com/go/logging/logadmin"
"github.com/go-logr/logr"
"github.com/go-logr/zapr"
"github.com/pkg/browser"
"github.com/pkg/errors"
"go.uber.org/zap"
"google.golang.org/api/iterator"
)

const (
	// testLogName is the Cloud Logging log name the sink test writes to.
	testLogName = "sinkTest"
	// runIDLabel is the label key used to tag entries with a unique per-run ID.
	runIDLabel = "runID"
)

// Test_Sink is an integration test: it registers the Cloud Logging sink,
// writes two log entries tagged with a unique runID label, then polls Cloud
// Logging to verify both entries arrived. It needs GCP credentials and network
// access, so it is skipped in GitHub Actions.
func Test_Sink(t *testing.T) {
	if os.Getenv("GITHUB_ACTIONS") != "" {
		t.Skipf("Test is skipped in GitHub actions")
	}
	// NOTE(review): hard-coded project; consider reading it from an env var so
	// the test can run outside this project.
	project := "foyle-dev"

	// Tag all entries from this run with a unique ID so the query below only
	// matches this run's entries.
	runID := uuid.NewString()
	labels := map[string]string{
		"testLabel": "testValue",
		runIDLabel:  runID,
	}

	if err := RegisterSink(project, testLogName, labels); err != nil {
		t.Fatalf("Failed to register the sink: %v", err)
	}

	logr, err := newLogger()
	if err != nil {
		t.Fatalf("Failed to create the logger: %v", err)
	}

	// Emit one info and one error entry; these are the two entries read back below.
	logr.Info("latest message", "field1", "value")

	logr.Error(errors.New("some error"), "Try writing an error", "field1", "value")

	queryLabels := map[string]string{}
	for k, v := range labels {
		// Need to prepend "labels." — that's how user labels are addressed in
		// the Cloud Logging query language.
		queryLabels["labels."+k] = v
	}
	link := GetLink(project, queryLabels)

	t.Logf("Stackdriver link: %v", link)

	// Flush logs before exiting
	// https://pkg.go.dev/go.uber.org/zap#Logger.Sync
	if err := zap.L().Sync(); err != nil {
		// Ignore any errors about sync'ing stdout
		if !strings.Contains(err.Error(), "sync /dev/stdout") {
			t.Fatalf("Could not sync logs: %v", err)
		}
	}

	// Set this to true if you want to open the link in a browser
	if false {
		if err := browser.OpenURL(link); err != nil {
			t.Errorf("Could not open URL: %v", err)
		}
	}

	adminClient, err := logadmin.NewClient(context.Background(), project)

	if err != nil {
		// NOTE(review): log.Fatalf kills the test binary without running
		// deferred cleanup; t.Fatalf would be better here.
		log.Fatalf("Failed to create logadmin client: %v", err)
	}
	defer adminClient.Close()

	numExpected := 2
	entries, err := readLogs(adminClient, project, runID, numExpected, 5*time.Minute)

	if err != nil {
		t.Fatalf("Error getting entries: %v", err)
	}

	for _, entry := range entries {
		t.Logf("Entry: %v", entry)
	}

	// N.B. the diagnostic log entry that shows up in the log doesn't show up here because it uses a different log name
	// projects/${PROJECT}/logs/diagnostic-log"
	if len(entries) != numExpected {
		t.Fatalf("Incorrect number of log entries; want %d got %d", numExpected, len(entries))
	}
}

// newLogger builds a zap-backed logr.Logger that writes structured JSON to
// stdout and to the registered Cloud Logging sink, and installs it as the
// global zap logger. It returns an error if the zap logger can't be built.
func newLogger() (logr.Logger, error) {
	// We need to use a production config because we want to use the JSON encoder.
	c := zap.NewProductionConfig()
	// Configure the encoder to use the fields Cloud Logging expects.
	// https://cloud.google.com/logging/docs/structured-logging
	c.EncoderConfig.LevelKey = SeverityField
	c.EncoderConfig.TimeKey = TimeField
	c.EncoderConfig.MessageKey = "message"
	c.Level = zap.NewAtomicLevelAt(zap.DebugLevel)

	stackdriver := Scheme + ":///loggingsink/test"
	c.OutputPaths = []string{"stdout", stackdriver}

	newLogger, err := c.Build()
	if err != nil {
		// N.B. Return the error instead of panicking; the caller already
		// checks the error return.
		return logr.Logger{}, errors.Wrapf(err, "failed to build zap logger")
	}

	zap.ReplaceGlobals(newLogger)

	logR := zapr.NewLogger(newLogger)
	return logR, nil
}

// readLogs polls Cloud Logging until at least minExpected entries tagged with
// runID are available or the timeout expires. Entries are returned newest
// first.
func readLogs(client *logadmin.Client, projectID string, runID string, minExpected int, timeout time.Duration) ([]*logging.Entry, error) {
	ctx := context.Background()

	deadline := time.Now().Add(timeout)
	pollInterval := 10 * time.Second

	// Restrict the query to this test's log and this run's unique ID.
	filter := fmt.Sprintf(`logName = "projects/%s/logs/%s" AND labels.%s = "%s"`, projectID, testLogName, runIDLabel, runID)

	for {
		var found []*logging.Entry

		it := client.Entries(ctx,
			logadmin.Filter(filter),
			// Get most recent entries first.
			logadmin.NewestFirst(),
		)

		// Fetch up to twice the expected count of the most recent entries.
		for len(found) < 2*minExpected {
			entry, err := it.Next()
			if err == iterator.Done {
				break
			}
			if err != nil {
				return nil, err
			}
			found = append(found, entry)
		}
		if len(found) >= minExpected {
			return found, nil
		}

		// Give up if waiting another poll interval would pass the deadline.
		if time.Now().Add(pollInterval).After(deadline) {
			return nil, errors.New("Timed out waiting for entries")
		}
		time.Sleep(pollInterval)
	}
}
Loading

0 comments on commit e7b57e5

Please sign in to comment.