Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ingest: Render captive core logs as Horizon logs #3189

Merged
merged 18 commits into from
Nov 9, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
18 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 24 additions & 10 deletions ingest/ledgerbackend/captive_core_backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"github.com/pkg/errors"

"github.com/stellar/go/historyarchive"
"github.com/stellar/go/support/log"
"github.com/stellar/go/xdr"
)

Expand Down Expand Up @@ -88,6 +89,9 @@ type CaptiveStellarCore struct {
// waitIntervalPrepareRange defines a time to wait between checking if the buffer
// is empty. Default 1s, lower in tests to make them faster.
waitIntervalPrepareRange time.Duration

// Optionally, pass along a custom logger to the underlying runner.
log *log.Entry
}

// NewCaptive returns a new CaptiveStellarCore.
Expand All @@ -105,17 +109,27 @@ func NewCaptive(executablePath, configPath, networkPassphrase string, historyURL
return nil, errors.Wrap(err, "error connecting to history archive")
}

return &CaptiveStellarCore{
archive: archive,
executablePath: executablePath,
configPath: configPath,
historyURLs: historyURLs,
networkPassphrase: networkPassphrase,
stellarCoreRunnerFactory: func(configPath2 string) (stellarCoreRunnerInterface, error) {
return newStellarCoreRunner(executablePath, configPath2, networkPassphrase, historyURLs)
},
c := &CaptiveStellarCore{
archive: archive,
executablePath: executablePath,
configPath: configPath,
historyURLs: historyURLs,
networkPassphrase: networkPassphrase,
waitIntervalPrepareRange: time.Second,
}, nil
}
c.stellarCoreRunnerFactory = func(configPath2 string) (stellarCoreRunnerInterface, error) {
runner, innerErr := newStellarCoreRunner(executablePath, configPath2, networkPassphrase, historyURLs)
if innerErr != nil {
return runner, innerErr
}
runner.setLogger(c.log)
return runner, nil
}
return c, nil
}

func (c *CaptiveStellarCore) SetStellarCoreLogger(logger *log.Entry) {
c.log = logger
}

func (c *CaptiveStellarCore) getLatestCheckpointSequence() (uint32, error) {
Expand Down
3 changes: 3 additions & 0 deletions ingest/ledgerbackend/captive_core_backend_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"github.com/stellar/go/historyarchive"
"github.com/stellar/go/network"
"github.com/stellar/go/support/errors"
"github.com/stellar/go/support/log"
"github.com/stellar/go/xdr"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
Expand Down Expand Up @@ -53,6 +54,8 @@ func (m *stellarCoreRunnerMock) close() error {
return a.Error(0)
}

func (m *stellarCoreRunnerMock) setLogger(*log.Entry) {}

func buildLedgerCloseMeta(sequence uint32) xdr.LedgerCloseMeta {
opResults := []xdr.OperationResult{}
opMeta := []xdr.OperationMeta{}
Expand Down
50 changes: 45 additions & 5 deletions ingest/ledgerbackend/stellar_core_runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"time"

"github.com/pkg/errors"
"github.com/stellar/go/support/log"
)

type stellarCoreRunnerInterface interface {
Expand All @@ -25,6 +26,7 @@ type stellarCoreRunnerInterface interface {
getProcessExitChan() <-chan struct{}
// getProcessExitError returns an exit error of the process, can be nil
getProcessExitError() error
setLogger(*log.Entry)
close() error
}

Expand All @@ -39,13 +41,17 @@ type stellarCoreRunner struct {
shutdown chan struct{}

cmd *exec.Cmd

// processExit channel receives an error when the process exited with an error
// or nil if the process exited without an error.
processExit chan struct{}
processExitError error
metaPipe io.Reader
tempDir string
nonce string

// Optionally, logging can be done to something other than stdout.
Log *log.Entry
Shaptic marked this conversation as resolved.
Show resolved Hide resolved
}

func newStellarCoreRunner(executablePath, configPath, networkPassphrase string, historyURLs []string) (*stellarCoreRunner, error) {
Expand Down Expand Up @@ -111,22 +117,52 @@ func (r *stellarCoreRunner) getConfFileName() string {
return filepath.Join(r.tempDir, "stellar-core.conf")
}

func (*stellarCoreRunner) getLogLineWriter() io.Writer {
r, w := io.Pipe()
br := bufio.NewReader(r)
func (r *stellarCoreRunner) getLogLineWriter() io.Writer {
rd, wr := io.Pipe()
br := bufio.NewReader(rd)

// Strip timestamps from log lines from captive stellar-core. We emit our own.
dateRx := regexp.MustCompile(`^\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}\.\d{3} `)
go func() {
levelRx := regexp.MustCompile(`G[A-Z]{4} \[(\w+) ([A-Z]+)\] (.*)`)
for {
line, err := br.ReadString('\n')
if err != nil {
break
}
line = dateRx.ReplaceAllString(line, "")
fmt.Print(line)

// If there's a logger, we attempt to extract metadata about the log
// entry, then redirect it to the logger. Otherwise, we just use stdout.
if r.Log == nil {
fmt.Print(line)
continue
}

matches := levelRx.FindStringSubmatch(line)
if len(matches) >= 4 {
// Extract the substrings from the log entry and trim it
category, level := matches[1], matches[2]
line = matches[3]

levelMapping := map[string]func(string, ...interface{}){
"FATAL": r.Log.Errorf,
"ERROR": r.Log.Errorf,
"WARNING": r.Log.Warnf,
"INFO": r.Log.Infof,
}

if writer, ok := levelMapping[strings.ToUpper(level)]; ok {
writer("%s: %s", category, line)
} else {
r.Log.Infof(line)
}
} else {
r.Log.Infof(line)
}
}
}()
return w
return wr
}

// Makes the temp directory and writes the config file to it; called by the
Expand Down Expand Up @@ -224,6 +260,10 @@ func (r *stellarCoreRunner) getProcessExitError() error {
return r.processExitError
}

func (r *stellarCoreRunner) setLogger(logger *log.Entry) {
r.Log = logger
}

func (r *stellarCoreRunner) close() error {
var err1, err2 error

Expand Down
7 changes: 5 additions & 2 deletions services/horizon/internal/ingest/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -171,8 +171,8 @@ func NewSystem(config Config) (System, error) {
return nil, errors.Wrap(err, "error creating captive core backend")
}
} else {
//
ledgerBackend, err = ledgerbackend.NewCaptive(
var captiveCoreBackend *ledgerbackend.CaptiveStellarCore
captiveCoreBackend, err = ledgerbackend.NewCaptive(
config.StellarCoreBinaryPath,
config.StellarCoreConfigPath,
config.NetworkPassphrase,
Expand All @@ -182,6 +182,9 @@ func NewSystem(config Config) (System, error) {
cancel()
return nil, errors.Wrap(err, "error creating captive core backend")
}
captiveCoreBackend.SetStellarCoreLogger(
log.WithField("subservice", "stellar-core"))
ledgerBackend = captiveCoreBackend
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: you can leave the old code and write:

ledgerBackend.(*ledgerbackend.CaptiveStellarCore).SetLogger(log)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Though less lines, is it cleaner? As someone newish to Go, it's quite a bit harder to read, especially with the new change about name + subservice, it becomes:

ledgerBackend.(*ledgerbackend.CaptiveStellarCore).SetStellarCoreLogger(log.WithField("subservice", "stellar-core"))

😮

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You're right!

}
} else {
coreSession := config.CoreSession.Clone()
Expand Down