Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove changes to support logging for AI #661

Merged
merged 5 commits into from
Sep 5, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
152 changes: 25 additions & 127 deletions internal/cmd/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,22 +4,17 @@ import (
"bytes"
"context"
"fmt"
"github.com/pkg/errors"
"github.com/spf13/cobra"
"go.uber.org/zap"
"os"
"path/filepath"
"strconv"
"strings"
"time"

"go.uber.org/zap/zapcore"

"github.com/pkg/errors"
"github.com/spf13/cobra"
"go.uber.org/zap"

"github.com/stateful/runme/v3/internal/runner/client"
"github.com/stateful/runme/v3/internal/tui"
"github.com/stateful/runme/v3/internal/tui/prompt"
"github.com/stateful/runme/v3/internal/version"
runnerv1 "github.com/stateful/runme/v3/pkg/api/gen/proto/go/runme/runner/v1"
"github.com/stateful/runme/v3/pkg/document"
"github.com/stateful/runme/v3/pkg/document/identity"
Expand All @@ -33,7 +28,7 @@ func getIdentityResolver() *identity.IdentityResolver {
}

func getProject() (*project.Project, error) {
logger, err := getLogger(false, false)
logger, err := getLogger(false)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -152,135 +147,38 @@ func getCodeBlocks() (document.CodeBlocks, error) {
return document.CollectCodeBlocks(node), nil
}

func getLogger(devMode bool, aiLogs bool) (*zap.Logger, error) {
if !fLogEnabled && !aiLogs {
return zap.NewNop(), nil
}

cores := make([]zapcore.Core, 0, 2)
if fLogEnabled {
consoleCore, err := createCoreForConsole(devMode)
if err != nil {
return nil, errors.WithStack(err)
}
cores = append(cores, consoleCore)
}

aiLogFile := ""
if aiLogs {
aiCore, newLogFile, err := createAICoreLogger()
if err != nil {
return nil, errors.WithStack(err)
}
aiLogFile = newLogFile
cores = append(cores, aiCore)
}

if len(cores) == 0 {
func getLogger(devMode bool) (*zap.Logger, error) {
if !fLogEnabled {
return zap.NewNop(), nil
}

// Create a multi-core logger with different encodings
core := zapcore.NewTee(cores...)

// Create the logger
newLogger := zap.New(core)
// Record the caller of the log message
newLogger = newLogger.WithOptions(zap.AddCaller())

versionInfo := version.BaseVersionInfo()

newLogger.Info("Logger initialized", zap.String("versionInfo", versionInfo), zap.Bool("devMode", devMode), zap.Bool("aiLogs", aiLogs), zap.String("aiLogFile", aiLogFile))
return newLogger, nil
}

// createCoreForConsole creates a zapcore.Core for console output.
func createCoreForConsole(devMode bool) (zapcore.Core, error) {
if !fLogEnabled {
return zapcore.NewNopCore(), nil
config := zap.Config{
Level: zap.NewAtomicLevelAt(zap.InfoLevel),
Development: false,
Sampling: &zap.SamplingConfig{
Initial: 100,
Thereafter: 100,
},
Encoding: "json",
EncoderConfig: zap.NewProductionEncoderConfig(),
OutputPaths: []string{"stderr"},
ErrorOutputPaths: []string{"stderr"},
}

encoderConfig := zap.NewProductionEncoderConfig()
lvl := zap.NewAtomicLevelAt(zap.InfoLevel)

if devMode {
lvl = zap.NewAtomicLevelAt(zap.DebugLevel)
encoderConfig = zap.NewDevelopmentEncoderConfig()
config.Level = zap.NewAtomicLevelAt(zap.DebugLevel)
config.Development = true
config.Encoding = "console"
config.EncoderConfig = zap.NewDevelopmentEncoderConfig()
}

path := "stderr"
if fLogFilePath != "" {
path = fLogFilePath
}

oFile, _, err := zap.Open(path)
if err != nil {
return nil, errors.Wrapf(err, "could not create writer for console logger")
}

var encoder zapcore.Encoder
if devMode {
encoder = zapcore.NewConsoleEncoder(encoderConfig)
} else {
encoder = zapcore.NewJSONEncoder(encoderConfig)
config.OutputPaths = []string{fLogFilePath}
config.ErrorOutputPaths = []string{fLogFilePath}
}

core := zapcore.NewCore(encoder, zapcore.AddSync(oFile), lvl)

if !devMode {
// For non-dev mode, add sampling.
core = zapcore.NewSamplerWithOptions(
core,
time.Second,
100,
100,
)
}
return core, nil
}

// createAICoreLogger creates a core logger that writes logs to files. The purpose of these logs is to capture
// AI traces that we use for retraining. Since they are supposed to be machine readable, they are always written
// in JSON format.
func createAICoreLogger() (zapcore.Core, string, error) {
// Configure encoder for JSON format
c := zap.NewProductionEncoderConfig()
// We attach the function key to the logs because that is useful for identifying the function that generated the log.
c.FunctionKey = "function"

jsonEncoder := zapcore.NewJSONEncoder(c)

configDir := getConfigDir()
if configDir == "" {
return nil, "", errors.New("could not determine config directory")
}
logDir := filepath.Join(configDir, "logs")
if _, err := os.Stat(logDir); os.IsNotExist(err) {
// Logger won't be setup yet so we can't use it.
if _, err := fmt.Fprintf(os.Stdout, "Creating log directory %s\n", logDir); err != nil {
return nil, "", errors.Wrapf(err, "could not write to stdout")
}
err := os.MkdirAll(logDir, 0o750)
if err != nil {
return nil, "", errors.Wrapf(err, "could not create log directory %s", logDir)
}
}

// We need to set a unique file name for the logs as a way of dealing with log rotation.
name := fmt.Sprintf("logs.%s.json", time.Now().Format("2006-01-02T15:04:05"))
logFile := filepath.Join(logDir, name)

// TODO(jeremy): How could we handle invoking the log closer, if there is one?
oFile, _, err := zap.Open(logFile)
if err != nil {
return nil, logFile, errors.Wrapf(err, "could not open log file %s", logFile)
}

// Force log level to be at least info. Because info is the level at which we capture the logs we need for
// tracing.
core := zapcore.NewCore(jsonEncoder, zapcore.AddSync(oFile), zapcore.InfoLevel)

return core, logFile, nil
l, err := config.Build()
return l, errors.WithStack(err)
}

func validCmdNames(cmd *cobra.Command, args []string, toComplete string) ([]string, cobra.ShellCompDirective) {
Expand Down
7 changes: 4 additions & 3 deletions internal/cmd/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ The kernel is used to run long running processes like shells and interacting wit
}
},
RunE: func(cmd *cobra.Command, args []string) error {
logger, err := getLogger(devMode, enableAILogs)
logger, err := getLogger(devMode)
if err != nil {
return err
}
Expand Down Expand Up @@ -139,10 +139,11 @@ The kernel is used to run long running processes like shells and interacting wit
cmd.Flags().StringVarP(&addr, "address", "a", defaultAddr, "Address to create unix (unix:///path/to/socket) or IP socket (localhost:7890)")
cmd.Flags().BoolVar(&devMode, "dev", false, "Enable development mode")
cmd.Flags().BoolVar(&enableRunner, "runner", true, "Enable runner service (legacy, defaults to true)")
// The ai-logs flag is no longer used; we can remove it as soon as the option has been removed from the frontend.
cmd.Flags().BoolVar(&enableAILogs, "ai-logs", false, "Enable logs to support training an AI")
cmd.Flags().StringVar(&tlsDir, "tls", defaultTLSDir, "Directory in which to generate TLS certificates & use for all incoming and outgoing messages")
cmd.Flags().StringVar(&configDir, configDirF, GetUserConfigHome(), "If ai logs is enabled logs will be written to ${config-dir}/logs")
cmd.Flags().StringVar(&configDir, configDirF, GetUserConfigHome(), "Sets the configuration directory.")
_ = cmd.Flags().MarkHidden("runner")

_ = cmd.Flags().MarkDeprecated("ai-logs", "This flag is no longer used.")
return &cmd
}
4 changes: 1 addition & 3 deletions internal/runner/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -229,15 +229,13 @@ func (r *runnerService) Execute(srv runnerv1.RunnerService_ExecuteServer) error
}
ctx := commandpkg.ContextWithExecutionInfo(srv.Context(), execInfo)

// We want to always log the request because it is used for AI training.
// see: https://github.com/stateful/runme/issues/574
if req.KnownId != "" {
logger = logger.With(zap.String("knownID", req.KnownId))
}
if req.KnownName != "" {
logger = logger.With(zap.String("knownName", req.KnownName))
}
logger.Info("received initial request", zap.Any("req", zapProto(req, logger)))
logger.Debug("received initial request", zap.Any("req", zapProto(req, logger)))

createSession := func(envs []string) (*Session, error) {
// todo(sebastian): owl store?
Expand Down
68 changes: 0 additions & 68 deletions internal/runner/service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,22 +5,17 @@ package runner
import (
"bytes"
"context"
"encoding/json"
"io"
"net"
"os"
"os/exec"
"runtime"
"strconv"
"strings"
"syscall"
"testing"
"time"

"github.com/pkg/errors"
"github.com/stateful/runme/v3/internal/ulid"
"google.golang.org/protobuf/encoding/protojson"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.uber.org/zap"
Expand Down Expand Up @@ -126,52 +121,6 @@ func getExecuteResult(
resultc <- result
}

// LogEntry is the subset of a JSON-encoded log line that the tests inspect.
// Each entry is decoded from one JSON object in the log file; keys not listed
// here are ignored by the decoder.
type LogEntry struct {
// Msg is the value of the "msg" key.
Msg string `json:"msg"`
// ID is the value of the "_id" key.
// NOTE(review): the producer of "_id" is not visible here — confirm what it identifies.
ID string `json:"_id"`
// KnownID is the value of the "knownID" key; tests match it against the request's known ID.
KnownID string `json:"knownID"`
// KnownName is the value of the "knownName" key.
KnownName string `json:"knownName"`
// Req holds the undecoded "req" payload so callers can unmarshal it later
// (the test decodes it with protojson); it is nil when the key is absent.
Req *json.RawMessage `json:"req"`
}

// readLogMessages flushes the package-level logger and then decodes every
// JSON log entry found in logFile.
//
// It returns the entries decoded so far together with a non-nil error if the
// logger cannot be synced (for a reason other than the benign stderr case
// described below), the log file cannot be read, or an entry cannot be decoded.
func readLogMessages() ([]*LogEntry, error) {
	messages := make([]*LogEntry, 0, 100)

	// Flush the log messages so that everything logged so far is on disk.
	if err := logger.Sync(); err != nil {
		// N.B. we get a bad file descriptor error when calling Sync on a logger
		// writing to stderr; that is harmless and can be ignored. errors.Is walks
		// the error chain, so EBADF is recognized even when the *os.PathError
		// carrying it has been wrapped or combined with other errors.
		ignorable := errors.Is(err, syscall.EBADF) || strings.Contains(err.Error(), "/dev/stderr")
		if !ignorable {
			return messages, errors.Wrapf(err, "failed to sync logger")
		}
	}

	// Read the log messages.
	b, err := os.ReadFile(logFile)
	if err != nil {
		return messages, errors.Wrapf(err, "failed to read log file")
	}

	// The log file is a stream of JSON objects; decode them one at a time until EOF.
	dec := json.NewDecoder(bytes.NewReader(b))
	for {
		var entry LogEntry
		if err := dec.Decode(&entry); err != nil {
			if errors.Is(err, io.EOF) {
				return messages, nil
			}
			return messages, errors.Wrapf(err, "failed to decode log entry")
		}
		messages = append(messages, &entry)
	}
}

func Test_runnerService(t *testing.T) {
lis, stop := testStartRunnerServiceServer(t)
t.Cleanup(stop)
Expand Down Expand Up @@ -234,23 +183,6 @@ func Test_runnerService(t *testing.T) {
assert.NoError(t, result.Err)
assert.Equal(t, "1\n2\n", string(result.Stdout))
assert.EqualValues(t, 0, result.ExitCode)

// Ensure there is a log message with the request
messages, err := readLogMessages()
require.NoError(t, err)

var loggedReq *runnerv1.ExecuteRequest

for _, msg := range messages {
if msg.KnownID == knownID && msg.Req != nil {
loggedReq = &runnerv1.ExecuteRequest{}
err := protojson.Unmarshal(*msg.Req, loggedReq)
require.NoError(t, err)
assert.Equal(t, "bash", loggedReq.ProgramName)
assert.Equal(t, []string{"echo 1", "sleep 1", "echo 2"}, loggedReq.Commands)
break
}
}
})

t.Run("ExecuteBasicTempFile", func(t *testing.T) {
Expand Down
Loading