Skip to content
This repository has been archived by the owner on Jan 8, 2024. It is now read-only.

Stream entrypoint logs, distinguish app vs system logs, colorize #855

Merged
merged 8 commits into from
Nov 30, 2020
20 changes: 17 additions & 3 deletions internal/ceb/ceb.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"

"github.com/hashicorp/waypoint/internal/pkg/gatedwriter"
"github.com/hashicorp/waypoint/internal/server"
pb "github.com/hashicorp/waypoint/internal/server/gen"
"github.com/hashicorp/waypoint/internal/version"
Expand All @@ -37,10 +38,20 @@ const (
type CEB struct {
id string
deploymentId string
logger hclog.Logger
context context.Context
execIdx int64

// logger is the logger that should be used internally. Log messages
// sent here with the proper log level (Info or higher) will also be
// streamed to the server.
logger hclog.Logger

// logCh can be sent entries that will be sent to the server. If the
// server connection is severed or too many entries are sent, some may
// be dropped but the channel should always be consumed.
logCh chan *pb.LogBatch_Entry
logGatedWriter *gatedwriter.Writer

// clientMu must be held anytime reading/writing client. internally
// you probably want to use waitClient() instead of this directly.
clientMu sync.Mutex
Expand All @@ -61,7 +72,8 @@ type CEB struct {
// FIRST child command is ready to be started. This can be closed before
// any command is sent to childCmdCh. It indicates that the child process
// watcher can begin executing.
childReadyCh chan struct{}
childReadySent uint32
mitchellh marked this conversation as resolved.
Show resolved Hide resolved
childReadyCh chan struct{}

// childCmdBase is the base command to use for making any changes to the
// child; use the copyCmd() function to copy this safely to make changes.
Expand Down Expand Up @@ -92,7 +104,6 @@ func Run(ctx context.Context, os ...Option) error {
// Defaults, initialization
ceb := &CEB{
id: id,
logger: hclog.L(),
context: ctx,
}
ceb.clientCond = sync.NewCond(&ceb.clientMu)
Expand All @@ -107,6 +118,9 @@ func Run(ctx context.Context, os ...Option) error {
}
}

// Setup our system logger
ceb.initSystemLogger()

// We're disabled also if we have no client set and the server address is empty.
// This means we have nothing to connect to.
cfg.disable = cfg.disable || (ceb.client == nil && cfg.ServerAddr == "")
Expand Down
17 changes: 8 additions & 9 deletions internal/ceb/child_exec.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"os"
"os/exec"
"path/filepath"
"sync/atomic"
"syscall"
"time"

Expand Down Expand Up @@ -41,9 +42,8 @@ func (ceb *CEB) initChildCmd(ctx context.Context, cfg *config) error {
// markChildCmdReady will allow watchChildCmd to begin executing commands.
// This should be called once and should not be called concurrently.
func (ceb *CEB) markChildCmdReady() {
if ceb.childReadyCh != nil {
if atomic.CompareAndSwapUint32(&ceb.childReadySent, 0, 1) {
close(ceb.childReadyCh)
ceb.childReadyCh = nil
}
}

Expand All @@ -60,7 +60,7 @@ func (ceb *CEB) watchChildCmd(
// detect this and also exit.
defer close(doneCh)

log := ceb.logger.Named("watchChildCmd")
log := ceb.logger.Named("child")

// We need to wait for readyCh. readyCh is closed by ceb/init.go
// or ceb/config.go when we're ready to begin processing. We have to do
Expand Down Expand Up @@ -88,11 +88,11 @@ func (ceb *CEB) watchChildCmd(
return

case cmd := <-cmdCh:
log.Info("child command received")
log.Debug("child command received")

// If we have an existing process, we need to exit that first.
if currentCh != nil {
log.Info("terminating current child process")
log.Info("terminating current child process for restart")
err := ceb.termChildCmd(log, currentCmd, currentCh, false, false)
if err != nil {
// In the event terminating the child fails, we exit
Expand All @@ -117,7 +117,6 @@ func (ceb *CEB) watchChildCmd(
}

// Start our new command.
log.Info("starting new child process")
currentCh = ceb.startChildCmd(log, cmd)
currentCmd = cmd

Expand Down Expand Up @@ -186,7 +185,7 @@ func (ceb *CEB) termChildCmd(

// If we're not forcing, try a SIGTERM first.
if !force {
log.Info("sending SIGTERM")
log.Debug("sending SIGTERM")
if err := cmd.Process.Signal(syscall.SIGTERM); err != nil {
log.Warn("error sending SIGTERM, will proceed to SIGKILL", "err", err)
} else {
Expand All @@ -213,7 +212,7 @@ func (ceb *CEB) termChildCmd(
// SIGKILL. We send the signal to the negative value of the pid so
// that it goes to the entire process group, therefore killing all
// grandchildren of our child process as well.
log.Info("sending SIGKILL")
log.Debug("sending SIGKILL")
if err := syscall.Kill(-cmd.Process.Pid, syscall.SIGKILL); err != nil {
log.Warn("error sending SIGKILL", "err", err)
return err
Expand All @@ -227,7 +226,7 @@ func (ceb *CEB) termChildCmd(
// we just set error to nil.
err = nil
}
log.Info("child process exited", "wait_err", err)
log.Debug("child process exited", "wait_err", err)
return err
}

Expand Down
7 changes: 5 additions & 2 deletions internal/ceb/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,10 @@ func (ceb *CEB) handleChildCmdConfig(
return
}

log.Info("env vars changed, sending new child command")
// We don't want to log on the first load.
if len(env) > len(base.Env) || len(last.Env) > len(base.Env) {
log.Info("env vars changed, sending new child command")
}

// Update the env vars
last.Env = env
Expand Down Expand Up @@ -177,7 +180,7 @@ func (ceb *CEB) recvConfig(
}
}

log.Info("new configuration received")
log.Debug("new configuration received")
ch <- resp.Config
}
}
142 changes: 107 additions & 35 deletions internal/ceb/log.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,16 +5,73 @@ import (
"context"
"io"
"os"
"strings"

"github.com/golang/protobuf/ptypes"
"github.com/hashicorp/go-hclog"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"

"github.com/hashicorp/waypoint/internal/pkg/gatedwriter"
pb "github.com/hashicorp/waypoint/internal/server/gen"
)

// initSystemLogger builds the entrypoint's internal logger, stores it in
// ceb.logger, and wires up everything needed to forward system log lines
// to ceb.logCh:
//
//   - ceb.logger is an intercepting logger named "entrypoint" at Trace level.
//   - ceb.logGatedWriter wraps the write side of an in-memory pipe.
//   - ceb.logCh is a buffered channel fed by a logReader goroutine that
//     consumes the read side of that pipe.
//   - a sink registered on the logger writes Info-and-above messages
//     (minus those rejected by logStreamExclude) into the gated writer.
func (ceb *CEB) initSystemLogger() {
	// Start from hclog's default options so the logger is configured the
	// same way as the package default, then override name and level.
	loggerOpts := *hclog.DefaultOptions
	loggerOpts.Name = "entrypoint"
	loggerOpts.Level = hclog.Trace

	// The intercept logger is what the rest of the entrypoint logs to; it
	// supports registering extra sinks. The plain logger built from the
	// same options is handed to the reader goroutine below.
	// NOTE(review): presumably the plain logger is used there so the
	// streamer's own messages are not fed back into the stream — confirm.
	streamed := hclog.NewInterceptLogger(&loggerOpts)
	plain := hclog.New(&loggerOpts)
	ceb.logger = streamed

	// Anything written to pipeW becomes readable on pipeR. The writer is
	// gated so that log output is held back until the log stream sender
	// calls Flush on it.
	pipeR, pipeW := io.Pipe()
	ceb.logGatedWriter = gatedwriter.NewWriter(pipeW)

	// Buffered channel of entries destined for the server, filled by a
	// goroutine that reads lines off the pipe.
	ceb.logCh = make(chan *pb.LogBatch_Entry, 30)
	go ceb.logReader(
		plain.Named("system_log_streamer"),
		pipeR,
		pb.LogBatch_Entry_ENTRYPOINT,
	)

	// Sink feeding the log stream: Info and above, no color, and no
	// timestamp because logReader stamps each entry itself.
	sinkOpts := &hclog.LoggerOptions{
		Name:        "entrypoint",
		Level:       hclog.Info,
		Output:      ceb.logGatedWriter,
		Color:       hclog.ColorOff,
		DisableTime: true,
		Exclude:     ceb.logStreamExclude,
	}
	streamed.RegisterSink(hclog.NewSinkAdapter(sinkOpts))
}

// logStreamExclude reports whether a log message should be kept out of the
// streamed system logs. Only Info-level messages are ever excluded, and only
// when the message contains "request started" or "request ended" (noisy
// Horizon request logs). The level is left untouched so these messages still
// appear in the root logger's output; they are just not streamed. args is
// ignored.
func (ceb *CEB) logStreamExclude(level hclog.Level, msg string, args ...interface{}) bool {
	switch {
	case level != hclog.Info:
		// Every other level is always streamed.
		return false

	case strings.Contains(msg, "request started"):
		return true

	default:
		return strings.Contains(msg, "request ended")
	}
}

func (ceb *CEB) initLogStream(ctx context.Context, cfg *config) error {
log := ceb.logger.Named("log")

Expand All @@ -33,46 +90,18 @@ func (ceb *CEB) initLogStream(ctx context.Context, cfg *config) error {
// read from the pipe the child command will get a SIGPIPE and could
// exit/crash if it doesn't handle it. So even if we don't have a
// connection to the server, we need to be draining the pipe.
entryCh := make(chan *pb.LogBatch_Entry, 30)
go func() {
defer r.Close()
br := bufio.NewReader(r)
for {
line, err := br.ReadString('\n')
if err != nil {
log.Error("error reading logs", "error", err)
return
}

if log.IsTrace() {
log.Trace("sending line", "line", line[:len(line)-1])
}

entry := &pb.LogBatch_Entry{
Timestamp: ptypes.TimestampNow(),
Line: line,
}

// Send the entry. We never block here because blocking the
// pipe is worse. The channel is buffered to help with this.
select {
case entryCh <- entry:
default:
}
}
}()
go ceb.logReader(log, r, pb.LogBatch_Entry_APP)

// Start up our server stream. We do this in a goroutine because we don't
// want to block the child command startup on it.
go ceb.initLogStreamSender(log, ctx, entryCh)
go ceb.initLogStreamSender(log, ctx)

return nil
}

func (ceb *CEB) initLogStreamSender(
log hclog.Logger,
ctx context.Context,
entryCh <-chan *pb.LogBatch_Entry,
) error {
// wait for initial server connection
serverClient := ceb.waitClient()
Expand All @@ -91,25 +120,31 @@ func (ceb *CEB) initLogStreamSender(
log.Trace("log stream connected")

// NOTE(mitchellh): Lots of improvements we can make here one day:
// - we can coalesce entryCh receives to send less log updates
// - during reconnect we can buffer entryCh receives
// - we can coalesce channel receives to send less log updates
// - during reconnect we can buffer channel receives
go func() {
// Wait for the first notice that the child is ready. This will
// allow us to wait to send any logs until we likely called
// EntrypointConfig.
<-ceb.childReadyCh

for {
var entry *pb.LogBatch_Entry
select {
case <-ctx.Done():
return

case entry = <-entryCh:
case entry = <-ceb.logCh:
}

err := client.Send(&pb.EntrypointLogBatch{
InstanceId: ceb.id,
Lines: []*pb.LogBatch_Entry{entry},
})
if err == io.EOF || status.Code(err) == codes.Unavailable {
log.Error("log stream disconnected from server, attempting reconnect")
err = ceb.initLogStreamSender(log, ctx, entryCh)
log.Error("log stream disconnected from server, attempting reconnect",
"err", err)
err = ceb.initLogStreamSender(log, ctx)
if err == nil {
return
}
Expand All @@ -121,5 +156,42 @@ func (ceb *CEB) initLogStreamSender(
}
}()

// Open the gated writer since we should now start consuming logs.
ceb.logGatedWriter.Flush()

return nil
}

// logReader reads newline-delimited lines from r and forwards each one to
// ceb.logCh wrapped in a pb.LogBatch_Entry tagged with src and the current
// time. It should be started in its own goroutine; it returns when reading
// from r fails (including EOF) and closes r on the way out.
//
// Sends to ceb.logCh never block: if the channel's buffer is full the entry
// is dropped, because stalling the pipe we read from is worse than losing a
// log line.
func (ceb *CEB) logReader(
	log hclog.Logger,
	r io.ReadCloser,
	src pb.LogBatch_Entry_Source,
) {
	defer r.Close()

	br := bufio.NewReader(r)
	for {
		line, err := br.ReadString('\n')

		// ReadString returns whatever it read before hitting an error, so
		// a final line with no trailing newline arrives together with a
		// non-nil err. Forward it rather than dropping it.
		if len(line) > 0 {
			if log.IsTrace() {
				// TrimSuffix rather than slicing off the last byte: a
				// partial final line has no newline to remove.
				log.Trace("sending line", "line", strings.TrimSuffix(line, "\n"))
			}

			entry := &pb.LogBatch_Entry{
				Source:    src,
				Timestamp: ptypes.TimestampNow(),
				Line:      line,
			}

			// Send the entry. We never block here because blocking the
			// pipe is worse. The channel is buffered to help with this.
			select {
			case ceb.logCh <- entry:
			default:
			}
		}

		if err != nil {
			log.Error("error reading logs", "error", err)
			return
		}
	}
}
Loading