Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changeset/slick-badgers-behave.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"chainlink": patch
---

#added Wire up OTel logs streaming, integrate chainlink logger with otel
18 changes: 13 additions & 5 deletions core/cmd/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -261,20 +261,28 @@ func NewApp(s *Shell) *cli.App {
return err
}

// Configure a new logger with OTel atomic core support
lggrCfg := logger.Config{
LogLevel: s.Config.Log().Level(),
Dir: s.Config.Log().File().Dir(),
JsonConsole: s.Config.Log().JSONConsole(),
UnixTS: s.Config.Log().UnixTimestamps(),
LogLevel: s.Config.Log().Level(),
Dir: s.Config.Log().File().Dir(),
JsonConsole: s.Config.Log().JSONConsole(),
UnixTS: s.Config.Log().UnixTimestamps(),
//nolint:gosec // filemaxsizesmb won't exceed max int
FileMaxSizeMB: int(logFileMaxSizeMB),
FileMaxAgeDays: int(s.Config.Log().File().MaxAgeDays()),
FileMaxBackups: int(s.Config.Log().File().MaxBackups()),
SentryEnabled: s.Config.Sentry().DSN() != "",
}
l, closeFn := lggrCfg.New()

// Noop atomic core that can be swapped out later for OTel support
atomicCore := logger.NewAtomicCore()

l, closeFn := lggrCfg.NewWithCores(atomicCore)

s.Logger = l
s.CloseLogger = closeFn
// s.SetOtelCore is a hook that can be used to set the OTel core
s.SetOtelCore = atomicCore.Store

return nil
},
Expand Down
53 changes: 17 additions & 36 deletions core/cmd/shell.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ import (
"github.com/smartcontractkit/chainlink/v2/core/services/llo"
"github.com/smartcontractkit/chainlink/v2/core/services/llo/retirement"
"github.com/smartcontractkit/chainlink/v2/core/services/periodicbackup"
"github.com/smartcontractkit/chainlink/v2/core/services/pg"
"github.com/smartcontractkit/chainlink/v2/core/services/relay/evm/mercury/wsrpc"
"github.com/smartcontractkit/chainlink/v2/core/services/relay/evm/mercury/wsrpc/cache"
"github.com/smartcontractkit/chainlink/v2/core/services/versioning"
Expand All @@ -59,7 +60,6 @@ import (
"github.com/smartcontractkit/chainlink/v2/core/sessions"
"github.com/smartcontractkit/chainlink/v2/core/static"
"github.com/smartcontractkit/chainlink/v2/core/store/migrate"
"github.com/smartcontractkit/chainlink/v2/core/utils"
"github.com/smartcontractkit/chainlink/v2/core/web"
)

Expand Down Expand Up @@ -139,11 +139,9 @@ func initGlobals(cfgProm config.Prometheus, cfgTracing config.Tracing, cfgTeleme
return err
}

var (
// ErrorNoAPICredentialsAvailable is returned when not run from a terminal
// and no API credentials have been provided
ErrorNoAPICredentialsAvailable = errors.New("API credentials must be supplied")
)
// ErrNoAPICredentialsAvailable is returned when not run from a terminal
// and no API credentials have been provided
var ErrNoAPICredentialsAvailable = errors.New("API credentials must be supplied")

// Shell for the node, local commands and remote commands.
type Shell struct {
Expand All @@ -152,6 +150,7 @@ type Shell struct {
Logger logger.Logger // initialized in Before
Registerer prometheus.Registerer // initialized in Before
CloseLogger func() error // called in After
SetOtelCore func(*zapcore.Core) // reference to AtomicCore.Store
AppFactory AppFactory
KeyStoreAuthenticator TerminalKeyStoreAuthenticator
FallbackAPIInitializer APIInitializer
Expand All @@ -167,6 +166,12 @@ type Shell struct {
configFilesIsSet bool
secretsFiles []string
secretsFileIsSet bool

LDB pg.LockedDB // initialized in BeforeNode
DS sqlutil.DataSource // initialized in BeforeNode
KeyStore keystore.Master // initialized in BeforeNode

CleanupOnce sync.Once // ensures cleanup happens exactly once
}

func (s *Shell) errorOut(err error) cli.ExitCoder {
Expand All @@ -190,42 +195,19 @@ func (s *Shell) configExitErr(validateFn func() error) cli.ExitCoder {

// AppFactory implements the NewApplication method.
type AppFactory interface {
NewApplication(ctx context.Context, cfg chainlink.GeneralConfig, appLggr logger.Logger, appRegisterer prometheus.Registerer, db *sqlx.DB, keyStoreAuthenticator TerminalKeyStoreAuthenticator) (chainlink.Application, error)
NewApplication(ctx context.Context, cfg chainlink.GeneralConfig, appLggr logger.Logger, appRegisterer prometheus.Registerer, ds sqlutil.DataSource, keyStore keystore.Master) (chainlink.Application, error)
}

// ChainlinkAppFactory is used to create a new Application.
type ChainlinkAppFactory struct{}

// NewApplication returns a new instance of the node with the given config.
func (n ChainlinkAppFactory) NewApplication(ctx context.Context, cfg chainlink.GeneralConfig, appLggr logger.Logger, appRegisterer prometheus.Registerer, db *sqlx.DB, keyStoreAuthenticator TerminalKeyStoreAuthenticator) (app chainlink.Application, err error) {
func (n ChainlinkAppFactory) NewApplication(ctx context.Context, cfg chainlink.GeneralConfig, appLggr logger.Logger, appRegisterer prometheus.Registerer, ds sqlutil.DataSource, keyStore keystore.Master) (app chainlink.Application, err error) {
err = migrate.SetMigrationENVVars(cfg.EVMConfigs())
if err != nil {
return nil, err
}

err = handleNodeVersioning(ctx, db, appLggr, cfg.RootDir(), cfg.Database(), cfg.WebServer().HTTPPort())
if err != nil {
return nil, err
}

ds := sqlutil.WrapDataSource(db, appLggr, sqlutil.TimeoutHook(cfg.Database().DefaultQueryTimeout), sqlutil.MonitorHook(cfg.Database().LogSQL))
keyStore := keystore.New(ds, utils.GetScryptParams(cfg), appLggr.Infof)

err = keyStoreAuthenticator.Authenticate(ctx, keyStore, cfg.Password())
if err != nil {
return nil, errors.Wrap(err, "error authenticating keystore")
}

beholderAuthHeaders, csaPubKeyHex, err := keystore.BuildBeholderAuth(ctx, keyStore.CSA())
if err != nil {
return nil, errors.Wrap(err, "failed to build Beholder auth")
}

err = initGlobals(cfg.Prometheus(), cfg.Tracing(), cfg.Telemetry(), appLggr, csaPubKeyHex, beholderAuthHeaders)
if err != nil {
appLggr.Errorf("Failed to initialize globals: %v", err)
}

mercuryPool := wsrpc.NewPool(appLggr, cache.Config{
LatestReportTTL: cfg.Mercury().Cache().LatestReportTTL(),
MaxStaleAge: cfg.Mercury().Cache().MaxStaleAge(),
Expand Down Expand Up @@ -738,13 +720,13 @@ type DiskCookieStore struct {

// Save stores a cookie.
func (d DiskCookieStore) Save(cookie *http.Cookie) error {
return os.WriteFile(d.cookiePath(), []byte(cookie.String()), 0600)
return os.WriteFile(d.cookiePath(), []byte(cookie.String()), 0o600)
}

// Removes any stored cookie.
func (d DiskCookieStore) Reset() error {
// Write empty bytes
return os.WriteFile(d.cookiePath(), []byte(""), 0600)
return os.WriteFile(d.cookiePath(), []byte(""), 0o600)
}

// Retrieve returns any Saved cookies.
Expand Down Expand Up @@ -785,7 +767,7 @@ func NewUserCache(subdir string, lggr func() logger.Logger) (*UserCache, error)
}

func (cs *UserCache) ensure() {
if err := os.MkdirAll(cs.dir, 0700); err != nil {
if err := os.MkdirAll(cs.dir, 0o700); err != nil {
cs.lggr().Errorw("Failed to make user cache dir", "dir", cs.dir, "err", err)
}
}
Expand Down Expand Up @@ -859,7 +841,7 @@ func (t *promptingAPIInitializer) Initialize(ctx context.Context, orm sessions.B
// If there are no users in the database, prompt for initial admin user creation
if len(dbUsers) == 0 {
if !t.prompter.IsTerminal() {
return sessions.User{}, ErrorNoAPICredentialsAvailable
return sessions.User{}, ErrNoAPICredentialsAvailable
}

for {
Expand All @@ -886,7 +868,6 @@ func (t *promptingAPIInitializer) Initialize(ctx context.Context, orm sessions.B
// Otherwise, multiple admin users exist, prompt for which to use
email := t.prompter.Prompt("Enter email of API user account to assume: ")
user, err := orm.FindUser(ctx, email)

if err != nil {
return sessions.User{}, err
}
Expand Down
Loading
Loading