From 8019a21c59d19c1386fe22876c701dfb6871204f Mon Sep 17 00:00:00 2001 From: kirqz23 Date: Mon, 13 Oct 2025 21:23:44 +0200 Subject: [PATCH 01/12] Refactor node startup logic to enable telemetry and dependencies earlier in the subcommands --- core/cmd/shell.go | 52 ++---- core/cmd/shell_local.go | 179 ++++++++++++------- core/cmd/shell_local_test.go | 277 ++++++++++++++++++++--------- core/internal/cltest/mocks.go | 21 +-- core/logger/prometheus.go | 7 +- core/services/keystore/csa_test.go | 3 + 6 files changed, 334 insertions(+), 205 deletions(-) diff --git a/core/cmd/shell.go b/core/cmd/shell.go index d2d5cb4857c..615d79e5614 100644 --- a/core/cmd/shell.go +++ b/core/cmd/shell.go @@ -50,6 +50,7 @@ import ( "github.com/smartcontractkit/chainlink/v2/core/services/llo" "github.com/smartcontractkit/chainlink/v2/core/services/llo/retirement" "github.com/smartcontractkit/chainlink/v2/core/services/periodicbackup" + "github.com/smartcontractkit/chainlink/v2/core/services/pg" "github.com/smartcontractkit/chainlink/v2/core/services/relay/evm/mercury/wsrpc" "github.com/smartcontractkit/chainlink/v2/core/services/relay/evm/mercury/wsrpc/cache" "github.com/smartcontractkit/chainlink/v2/core/services/versioning" @@ -59,7 +60,6 @@ import ( "github.com/smartcontractkit/chainlink/v2/core/sessions" "github.com/smartcontractkit/chainlink/v2/core/static" "github.com/smartcontractkit/chainlink/v2/core/store/migrate" - "github.com/smartcontractkit/chainlink/v2/core/utils" "github.com/smartcontractkit/chainlink/v2/core/web" ) @@ -139,11 +139,9 @@ func initGlobals(cfgProm config.Prometheus, cfgTracing config.Tracing, cfgTeleme return err } -var ( - // ErrorNoAPICredentialsAvailable is returned when not run from a terminal - // and no API credentials have been provided - ErrorNoAPICredentialsAvailable = errors.New("API credentials must be supplied") -) +// ErrNoAPICredentialsAvailable is returned when not run from a terminal +// and no API credentials have been provided +var ErrNoAPICredentialsAvailable = errors.New("API credentials must be supplied") // Shell for the node, local commands and remote commands. type Shell struct { @@ -167,6 +165,12 @@ type Shell struct { configFilesIsSet bool secretsFiles []string secretsFileIsSet bool + + LDB pg.LockedDB // initialized in BeforeNode + DS sqlutil.DataSource // initialized in BeforeNode + KeyStore keystore.Master // initialized in BeforeNode + + CleanupOnce sync.Once // ensures cleanup happens exactly once } func (s *Shell) errorOut(err error) cli.ExitCoder { @@ -190,42 +194,19 @@ func (s *Shell) configExitErr(validateFn func() error) cli.ExitCoder { // AppFactory implements the NewApplication method. type AppFactory interface { - NewApplication(ctx context.Context, cfg chainlink.GeneralConfig, appLggr logger.Logger, appRegisterer prometheus.Registerer, db *sqlx.DB, keyStoreAuthenticator TerminalKeyStoreAuthenticator) (chainlink.Application, error) + NewApplication(ctx context.Context, cfg chainlink.GeneralConfig, appLggr logger.Logger, appRegisterer prometheus.Registerer, ds sqlutil.DataSource, keyStore keystore.Master) (chainlink.Application, error) } // ChainlinkAppFactory is used to create a new Application. type ChainlinkAppFactory struct{} // NewApplication returns a new instance of the node with the given config. -func (n ChainlinkAppFactory) NewApplication(ctx context.Context, cfg chainlink.GeneralConfig, appLggr logger.Logger, appRegisterer prometheus.Registerer, db *sqlx.DB, keyStoreAuthenticator TerminalKeyStoreAuthenticator) (app chainlink.Application, err error) { +func (n ChainlinkAppFactory) NewApplication(ctx context.Context, cfg chainlink.GeneralConfig, appLggr logger.Logger, appRegisterer prometheus.Registerer, ds sqlutil.DataSource, keyStore keystore.Master) (app chainlink.Application, err error) { err = migrate.SetMigrationENVVars(cfg.EVMConfigs()) if err != nil { return nil, err } - err = handleNodeVersioning(ctx, db, appLggr, cfg.RootDir(), cfg.Database(), cfg.WebServer().HTTPPort()) - if err != nil { - return nil, err - } - - ds := sqlutil.WrapDataSource(db, appLggr, sqlutil.TimeoutHook(cfg.Database().DefaultQueryTimeout), sqlutil.MonitorHook(cfg.Database().LogSQL)) - keyStore := keystore.New(ds, utils.GetScryptParams(cfg), appLggr.Infof) - - err = keyStoreAuthenticator.Authenticate(ctx, keyStore, cfg.Password()) - if err != nil { - return nil, errors.Wrap(err, "error authenticating keystore") - } - - beholderAuthHeaders, csaPubKeyHex, err := keystore.BuildBeholderAuth(ctx, keyStore.CSA()) - if err != nil { - return nil, errors.Wrap(err, "failed to build Beholder auth") - } - - err = initGlobals(cfg.Prometheus(), cfg.Tracing(), cfg.Telemetry(), appLggr, csaPubKeyHex, beholderAuthHeaders) - if err != nil { - appLggr.Errorf("Failed to initialize globals: %v", err) - } - mercuryPool := wsrpc.NewPool(appLggr, cache.Config{ LatestReportTTL: cfg.Mercury().Cache().LatestReportTTL(), MaxStaleAge: cfg.Mercury().Cache().MaxStaleAge(), @@ -738,13 +719,13 @@ type DiskCookieStore struct { // Save stores a cookie. func (d DiskCookieStore) Save(cookie *http.Cookie) error { - return os.WriteFile(d.cookiePath(), []byte(cookie.String()), 0600) + return os.WriteFile(d.cookiePath(), []byte(cookie.String()), 0o600) } // Removes any stored cookie. func (d DiskCookieStore) Reset() error { // Write empty bytes - return os.WriteFile(d.cookiePath(), []byte(""), 0600) + return os.WriteFile(d.cookiePath(), []byte(""), 0o600) } // Retrieve returns any Saved cookies. @@ -785,7 +766,7 @@ func NewUserCache(subdir string, lggr func() logger.Logger) (*UserCache, error) } func (cs *UserCache) ensure() { - if err := os.MkdirAll(cs.dir, 0700); err != nil { + if err := os.MkdirAll(cs.dir, 0o700); err != nil { cs.lggr().Errorw("Failed to make user cache dir", "dir", cs.dir, "err", err) } } @@ -859,7 +840,7 @@ func (t *promptingAPIInitializer) Initialize(ctx context.Context, orm sessions.B // If there are no users in the database, prompt for initial admin user creation if len(dbUsers) == 0 { if !t.prompter.IsTerminal() { - return sessions.User{}, ErrorNoAPICredentialsAvailable + return sessions.User{}, ErrNoAPICredentialsAvailable } for { @@ -886,7 +867,6 @@ func (t *promptingAPIInitializer) Initialize(ctx context.Context, orm sessions.B // Otherwise, multiple admin users exist, prompt for which to use email := t.prompter.Prompt("Enter email of API user account to assume: ") user, err := orm.FindUser(ctx, email) - if err != nil { return sessions.User{}, err } diff --git a/core/cmd/shell_local.go b/core/cmd/shell_local.go index 9b927d5fddf..06b604ff895 100644 --- a/core/cmd/shell_local.go +++ b/core/cmd/shell_local.go @@ -24,6 +24,8 @@ import ( chain_selectors "github.com/smartcontractkit/chain-selectors" + "github.com/smartcontractkit/chainlink-common/pkg/sqlutil" + "github.com/smartcontractkit/chainlink-evm/pkg/assets" "github.com/smartcontractkit/chainlink-evm/pkg/chains/legacyevm" "github.com/smartcontractkit/chainlink-evm/pkg/gas" @@ -73,11 +75,15 @@ func initLocalSubCmds(s *Shell, safe bool) []cli.Command { }, Usage: "Run the Chainlink node", Action: s.RunNode, + Before: s.BeforeNode, + After: s.AfterNode, }, { Name: "rebroadcast-transactions", Usage: "Manually rebroadcast txs matching nonce range with the specified gas price. This is useful in emergencies e.g. high gas prices and/or network congestion to forcibly clear out the pending TX queue", Action: s.RebroadcastTransactions, + Before: s.BeforeNode, + After: s.AfterNode, Flags: []cli.Flag{ cli.Uint64Flag{ Name: "beginningNonce, beginning-nonce, b", @@ -257,6 +263,8 @@ func initLocalSubCmds(s *Shell, safe bool) []cli.Command { Name: "remove-blocks", Usage: "Deletes block range and all associated data", Action: s.RemoveBlocks, + Before: s.BeforeNode, + After: s.AfterNode, Flags: []cli.Flag{ cli.IntFlag{ Name: "start", @@ -288,24 +296,6 @@ func (s *Shell) runNode(c *cli.Context) error { ctx := s.ctx() lggr := logger.Sugared(s.Logger.Named("RunNode")) - var pwd, vrfpwd *string - if passwordFile := c.String("password"); passwordFile != "" { - p, err := utils.PasswordFromFile(passwordFile) - if err != nil { - return errors.Wrap(err, "error reading password from file") - } - pwd = &p - } - if vrfPasswordFile := c.String("vrfpassword"); len(vrfPasswordFile) != 0 { - p, err := utils.PasswordFromFile(vrfPasswordFile) - if err != nil { - return errors.Wrapf(err, "error reading VRF password from vrfpassword file \"%s\"", vrfPasswordFile) - } - vrfpwd = &p - } - - s.Config.SetPasswords(pwd, vrfpwd) - s.Config.LogConfiguration(lggr.Debugf, lggr.Warnf) if err := s.Config.Validate(); err != nil { @@ -318,15 +308,12 @@ func (s *Shell) runNode(c *cli.Context) error { lggr.Warn("Chainlink is running in DEVELOPMENT mode. This is a security risk if enabled in production.") } - if err := utils.EnsureDirAndMaxPerms(s.Config.RootDir(), os.FileMode(0700)); err != nil { + if err := utils.EnsureDirAndMaxPerms(s.Config.RootDir(), os.FileMode(0o700)); err != nil { return fmt.Errorf("failed to create root directory %q: %w", s.Config.RootDir(), err) } - cfg := s.Config - ldb := pg.NewLockedDB(cfg.AppID(), cfg.Database(), cfg.Database().Lock(), lggr) - // rootCtx will be cancelled when SIGINT|SIGTERM is received - rootCtx, cancelRootCtx := context.WithCancel(context.Background()) + rootCtx, cancelRootCtx := context.WithCancel(ctx) // cleanExit is used to skip "fail fast" routine cleanExit := make(chan struct{}) @@ -353,28 +340,12 @@ func (s *Shell) runNode(c *cli.Context) error { lggr.Criticalf("Shutdown grace period of %v exceeded, closing DB and exiting...", s.Config.ShutdownGracePeriod()) // LockedDB.Close() will release DB locks and close DB connection // Executing this explicitly because defers are not executed in case of os.Exit() - if err := ldb.Close(); err != nil { - lggr.Criticalf("Failed to close LockedDB: %v", err) - } - lggr.Debug("Closed DB") - if err := s.CloseLogger(); err != nil { - log.Printf("Failed to close Logger: %v", err) - } + s.afterNode(lggr) os.Exit(-1) }) - // Try opening DB connection and acquiring DB locks at once - if err := ldb.Open(rootCtx); err != nil { - // If not successful, we know neither locks nor connection remains opened - return s.errorOut(errors.Wrap(err, "opening db")) - } - defer lggr.ErrorIfFn(ldb.Close, "Error closing db") - - // From now on, DB locks and DB connection will be released on every return. - // Keep watching on logger.Fatal* calls and os.Exit(), because defer will not be executed. - - app, err := s.AppFactory.NewApplication(rootCtx, s.Config, s.Logger, s.Registerer, ldb.DB(), s.KeyStoreAuthenticator) + app, err := s.AppFactory.NewApplication(rootCtx, s.Config, s.Logger, s.Registerer, s.DS, s.KeyStore) if err != nil { return s.errorOut(errors.Wrap(err, "fatal error instantiating application")) } @@ -564,7 +535,7 @@ func (s *Shell) runNode(c *cli.Context) error { return errors.Wrap(err, "error creating api initializer") } if user, err = s.FallbackAPIInitializer.Initialize(ctx, authProviderORM, lggr); err != nil { - if errors.Is(err, ErrorNoAPICredentialsAvailable) { + if errors.Is(err, ErrNoAPICredentialsAvailable) { return errors.WithStack(err) } return errors.Wrap(err, "error creating fallback initializer") @@ -696,13 +667,8 @@ func (s *Shell) RebroadcastTransactions(c *cli.Context) (err error) { } lggr := logger.Sugared(s.Logger.Named("RebroadcastTransactions")) - db, err := pg.OpenUnlockedDB(ctx, s.Config.AppID(), s.Config.Database()) - if err != nil { - return s.errorOut(errors.Wrap(err, "opening DB")) - } - defer lggr.ErrorIfFn(db.Close, "Error closing db") - app, err := s.AppFactory.NewApplication(ctx, s.Config, lggr, s.Registerer, db, s.KeyStoreAuthenticator) + app, err := s.AppFactory.NewApplication(ctx, s.Config, lggr, s.Registerer, s.DS, s.KeyStore) if err != nil { return s.errorOut(errors.Wrap(err, "fatal error instantiating application")) } @@ -727,14 +693,6 @@ func (s *Shell) RebroadcastTransactions(c *cli.Context) (err error) { return err } - if c.IsSet("password") { - pwd, err2 := utils.PasswordFromFile(c.String("password")) - if err2 != nil { - return s.errorOut(fmt.Errorf("error reading password: %w", err2)) - } - s.Config.SetPasswords(&pwd, nil) - } - err = s.Config.Validate() if err != nil { return s.errorOut(fmt.Errorf("error validating configuration: %w", err)) @@ -1069,41 +1027,124 @@ func (s *Shell) RemoveBlocks(c *cli.Context) error { } lggr := logger.Sugared(s.Logger.Named("RemoveBlocks")) - ldb := pg.NewLockedDB(cfg.AppID(), cfg.Database(), cfg.Database().Lock(), lggr) ctx, cancel := context.WithCancel(context.Background()) go shutdown.HandleShutdown(func(sig string) { cancel() lggr.Info("received signal to stop - closing the database and releasing lock") - if cErr := ldb.Close(); cErr != nil { - lggr.Criticalf("Failed to close LockedDB: %v", cErr) + s.afterNode(lggr) + }) + + app, err := s.AppFactory.NewApplication(ctx, s.Config, s.Logger, s.Registerer, s.DS, s.KeyStore) + if err != nil { + return s.errorOut(errors.Wrap(err, "fatal error instantiating application")) + } + + err = app.DeleteLogPollerDataAfter(ctx, chainID, start) + if err != nil { + return s.errorOut(err) + } + + lggr.Infof("RemoveBlocks: successfully removed blocks") + + return nil +} + +// BeforeNode initializes database, keystore, logger, and beholder for node startup. +// This is used as a Before hook in CLI commands that require these components. +func (s *Shell) BeforeNode(c *cli.Context) error { + if err := s.beforeNode(c); err != nil { + return s.errorOut(err) + } + return nil +} + +// afterNode is a thread-safe helper method to close the database and logger. +// This is used in multiple places: shutdown handler, signal handler, and cleanup. +// It uses sync.Once to ensure cleanup happens exactly once, even if called concurrently. +func (s *Shell) afterNode(lggr logger.SugaredLogger) { + s.CleanupOnce.Do(func() { + if err := s.LDB.Close(); err != nil { + lggr.Criticalf("Failed to close LockedDB: %v", err) } + lggr.Debug("Closed DB") - if cErr := s.CloseLogger(); cErr != nil { - log.Printf("Failed to close Logger: %v", cErr) + if err := s.CloseLogger(); err != nil { + log.Printf("Failed to close Logger: %v", err) } }) +} + +// AfterNode cleans up resources initialized by BeforeNode. +// This is used as an After hook in CLI commands. +func (s *Shell) AfterNode(c *cli.Context) error { + s.afterNode(logger.Sugared(s.Logger)) + return nil +} + +// beforeNode performs the actual initialization of DB, keystore, and telemetry. +// It handles password loading, database connection, keystore authentication, and Beholder setup. +func (s *Shell) beforeNode(c *cli.Context) error { + ctx := s.ctx() + + var pwd, vrfpwd *string + if passwordFile := c.String("password"); passwordFile != "" { + p, err := utils.PasswordFromFile(passwordFile) + if err != nil { + return errors.Wrap(err, "error reading password from file") + } + pwd = &p + } + if vrfPasswordFile := c.String("vrfpassword"); len(vrfPasswordFile) != 0 { + p, err := utils.PasswordFromFile(vrfPasswordFile) + if err != nil { + return errors.Wrapf(err, "error reading VRF password from vrfpassword file \"%s\"", vrfPasswordFile) + } + vrfpwd = &p + } + + s.Config.SetPasswords(pwd, vrfpwd) + + lggr := s.Logger + cfg := s.Config + + ldb := pg.NewLockedDB(cfg.AppID(), cfg.Database(), cfg.Database().Lock(), lggr) + s.LDB = ldb - if err = ldb.Open(ctx); err != nil { + // Try opening DB connection and acquiring DB locks at once + if err := ldb.Open(ctx); err != nil { // If not successful, we know neither locks nor connection remains opened return s.errorOut(errors.Wrap(err, "opening db")) } - defer lggr.ErrorIfFn(ldb.Close, "Error closing db") - + db := ldb.DB() // From now on, DB locks and DB connection will be released on every return. // Keep watching on logger.Fatal* calls and os.Exit(), because defer will not be executed. - app, err := s.AppFactory.NewApplication(ctx, s.Config, s.Logger, s.Registerer, ldb.DB(), s.KeyStoreAuthenticator) + err := handleNodeVersioning(ctx, db, lggr, cfg.RootDir(), cfg.Database(), cfg.WebServer().HTTPPort()) if err != nil { - return s.errorOut(errors.Wrap(err, "fatal error instantiating application")) + return err } - err = app.DeleteLogPollerDataAfter(ctx, chainID, start) + ds := sqlutil.WrapDataSource(db, lggr, sqlutil.TimeoutHook(cfg.Database().DefaultQueryTimeout), sqlutil.MonitorHook(cfg.Database().LogSQL)) + keyStore := keystore.New(ds, utils.GetScryptParams(cfg), lggr.Infof) + s.DS = ds + s.KeyStore = keyStore + + err = s.KeyStoreAuthenticator.Authenticate(ctx, keyStore, cfg.Password()) if err != nil { - return s.errorOut(err) + return errors.Wrap(err, "error authenticating keystore") } - lggr.Infof("RemoveBlocks: successfully removed blocks") + beholderAuthHeaders, csaPubKeyHex, err := keystore.BuildBeholderAuth(ctx, keyStore.CSA()) + if err != nil { + return errors.Wrap(err, "failed to build Beholder auth") + } + + // Initialize globals with beholder and telemetry + err = initGlobals(s.Config.Prometheus(), s.Config.Tracing(), s.Config.Telemetry(), s.Logger, csaPubKeyHex, beholderAuthHeaders) + if err != nil { + return errors.Wrap(err, "failed initializing globals") + } return nil } diff --git a/core/cmd/shell_local_test.go b/core/cmd/shell_local_test.go index f74d8e95ee9..bd3f3fde984 100644 --- a/core/cmd/shell_local_test.go +++ b/core/cmd/shell_local_test.go @@ -6,6 +6,7 @@ import ( "math/big" "os" "strconv" + "sync" "testing" "time" @@ -47,6 +48,15 @@ import ( "github.com/smartcontractkit/chainlink/v2/plugins" ) +// resetShellForTest resets the Shell's cleanup guard for test isolation. +// This allows multiple initialization/cleanup cycles in tests. +func resetShellForTest(shell *cmd.Shell) { + shell.CleanupOnce = sync.Once{} + shell.LDB = nil + shell.DS = nil + shell.KeyStore = nil +} + func genTestEVMRelayers(t *testing.T, cfg chainlink.GeneralConfig, ds sqlutil.DataSource, ethKeystore keystore.Eth, csaKeystore core.Keystore) *chainlink.CoreRelayerChainInteroperators { lggr := logger.TestLogger(t) f := chainlink.RelayerFactory{ @@ -73,87 +83,6 @@ func genTestEVMRelayers(t *testing.T, cfg chainlink.GeneralConfig, ds sqlutil.Da return relayers } -func TestShell_RunNodeWithPasswords(t *testing.T) { - tests := []struct { - name string - pwdfile string - wantUnlocked bool - }{ - {"correct", "../internal/fixtures/correct_password.txt", true}, - {"incorrect", "../internal/fixtures/incorrect_password.txt", false}, - {"wrongfile", "doesntexist.txt", false}, - } - - for _, test := range tests { - t.Run(test.name, func(t *testing.T) { - cfg := configtest.NewGeneralConfig(t, func(c *chainlink.Config, s *chainlink.Secrets) { - s.Password.Keystore = models.NewSecret("dummy") - c.EVM[0].Nodes[0].Name = ptr("fake") - c.EVM[0].Nodes[0].HTTPURL = commonconfig.MustParseURL("http://fake.com") - c.EVM[0].Nodes[0].WSURL = commonconfig.MustParseURL("WSS://fake.com/ws") - // seems to be needed for config validate - c.Insecure.OCRDevelopmentMode = nil - }) - db := pgtest.NewSqlxDB(t) - keyStore := cltest.NewKeyStore(t, db) - authProviderORM := localauth.NewORM(db, time.Minute, logger.TestLogger(t), audit.NoopLogger) - - testRelayers := genTestEVMRelayers(t, cfg, db, keyStore.Eth(), &keystore.CSASigner{CSA: keyStore.CSA()}) - - // Purge the fixture users to test assumption of single admin - // initialUser user created above - pgtest.MustExec(t, db, "DELETE FROM users;") - - app := mocks.NewApplication(t) - app.On("AuthenticationProvider").Return(authProviderORM).Maybe() - app.On("BasicAdminUsersORM").Return(authProviderORM).Maybe() - app.On("GetKeyStore").Return(keyStore).Maybe() - app.On("GetRelayers").Return(testRelayers).Maybe() - app.On("Start", mock.Anything).Maybe().Return(nil) - app.On("Stop").Maybe().Return(nil) - app.On("ID").Maybe().Return(uuid.New()) - - ethClient := evmtest.NewEthClientMock(t) - ethClient.On("Dial", mock.Anything).Return(nil).Maybe() - ethClient.On("BalanceAt", mock.Anything, mock.Anything, mock.Anything).Return(big.NewInt(10), nil).Maybe() - - cltest.MustInsertRandomKey(t, keyStore.Eth()) - apiPrompt := cltest.NewMockAPIInitializer(t) - - client := cmd.Shell{ - Config: cfg, - FallbackAPIInitializer: apiPrompt, - Runner: cltest.EmptyRunner{}, - AppFactory: cltest.InstanceAppFactoryWithKeystoreMock{App: app}, - Logger: logger.TestLogger(t), - } - - set := flag.NewFlagSet("test", 0) - flagSetApplyFromAction(client.RunNode, set, "") - - require.NoError(t, set.Set("password", test.pwdfile)) - - c := cli.NewContext(nil, set, nil) - - run := func() error { - cli := cmd.NewApp(&client) - if err := cli.Before(c); err != nil { - return err - } - return client.RunNode(c) - } - - if test.wantUnlocked { - assert.NoError(t, run()) - assert.Equal(t, 1, apiPrompt.Count) - } else { - assert.Error(t, run()) - assert.Equal(t, 0, apiPrompt.Count) - } - }) - } -} - func TestShell_RunNodeWithAPICredentialsFile(t *testing.T) { tests := []struct { name string @@ -256,7 +185,7 @@ func TestShell_DiskMaxSizeBeforeRotateOptionDisablesAsExpected(t *testing.T) { Dir: t.TempDir(), FileMaxSizeMB: int(tt.logFileSize(t) / utils.MB), } - assert.NoError(t, os.MkdirAll(cfg.Dir, os.FileMode(0700))) + require.NoError(t, os.MkdirAll(cfg.Dir, os.FileMode(0o700))) lggr, closeFn := cfg.New() t.Cleanup(func() { assert.NoError(t, closeFn()) }) @@ -325,6 +254,10 @@ func TestShell_RebroadcastTransactions_Txm(t *testing.T) { ctx := cli.NewContext(nil, set, nil) + // Run before hook to initialize components with authentication + err := c.BeforeNode(ctx) + require.NoError(t, err) + for i := beginningNonce; i <= endingNonce; i++ { n := i ethClient.On("SendTransactionReturnCode", mock.Anything, mock.MatchedBy(func(tx *gethTypes.Transaction) bool { @@ -405,6 +338,10 @@ func TestShell_RebroadcastTransactions_OutsideRange_Txm(t *testing.T) { require.NoError(t, set.Set("password", "../internal/fixtures/correct_password.txt")) ctx := cli.NewContext(nil, set, nil) + // Run before hook to initialize components with authentication + err := c.BeforeNode(ctx) + require.NoError(t, err) + for i := beginningNonce; i <= endingNonce; i++ { n := i ethClient.On("SendTransactionReturnCode", mock.Anything, mock.MatchedBy(func(tx *gethTypes.Transaction) bool { @@ -478,6 +415,11 @@ func TestShell_RebroadcastTransactions_AddressCheck(t *testing.T) { require.NoError(t, set.Set("address", fromAddress.Hex())) require.NoError(t, set.Set("password", "../internal/fixtures/correct_password.txt")) c := cli.NewContext(nil, set, nil) + + // Run before hook to initialize components with authentication + err := client.BeforeNode(c) + require.NoError(t, err) + if test.shouldError { require.ErrorContains(t, client.RebroadcastTransactions(c), test.errorContains) } else { @@ -562,3 +504,174 @@ func TestShell_RemoveBlocks(t *testing.T) { require.NoError(t, err) }) } + +func TestShell_BeforeNode(t *testing.T) { + tests := []struct { + name string + pwdfile string + wantUnlocked bool + }{ + {"correct password", "../internal/fixtures/correct_password.txt", true}, + {"incorrect password", "../internal/fixtures/incorrect_password.txt", false}, + {"wrong file", "doesntexist.txt", false}, + } + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + cfg := configtest.NewGeneralConfig(t, func(c *chainlink.Config, s *chainlink.Secrets) { + s.Password.Keystore = models.NewSecret("dummy") + c.EVM[0].Nodes[0].Name = ptr("fake") + c.EVM[0].Nodes[0].HTTPURL = commonconfig.MustParseURL("http://fake.com") + c.EVM[0].Nodes[0].WSURL = commonconfig.MustParseURL("WSS://fake.com/ws") + c.Insecure.OCRDevelopmentMode = nil + }) + + shell := cmd.Shell{ + Config: cfg, + KeyStoreAuthenticator: cmd.TerminalKeyStoreAuthenticator{ + Prompter: &cltest.MockCountingPrompter{T: t, NotTerminal: true}, + }, + Logger: logger.TestLogger(t), + } + // Reset for test isolation + defer resetShellForTest(&shell) + + set := flag.NewFlagSet("test", 0) + flagSetApplyFromAction(shell.RunNode, set, "") + require.NoError(t, set.Set("password", test.pwdfile)) + + c := cli.NewContext(nil, set, nil) + + // Create full CLI app and run the Before hook first + app := cmd.NewApp(&shell) + err := app.Before(c) + if err != nil && test.wantUnlocked { + t.Fatalf("CLI Before hook failed: %v", err) + } + + // Run before hook to initialize components with authentication + err = shell.BeforeNode(c) + + if test.wantUnlocked { + require.NoError(t, err) + // Verify that shell components were initialized + assert.NotNil(t, shell.KeyStore) + assert.NotNil(t, shell.DS) + assert.NotNil(t, shell.LDB) + + // Verify keystore is unlocked by checking if we can access keys + ctx := testutils.Context(t) + isEmpty, emptyErr := shell.KeyStore.IsEmpty(ctx) + require.NoError(t, emptyErr) + if !isEmpty { + keys, keysErr := shell.KeyStore.CSA().GetAll() + require.NoError(t, keysErr) + assert.NotEmpty(t, keys) + } + } else { + require.Error(t, err) + } + + // Clean up + if shell.LDB != nil { + shell.LDB.Close() + } + }) + } +} + +func TestShell_RunNode_WithBeforeNode(t *testing.T) { + tests := []struct { + name string + pwdfile string + expectStart bool + }{ + {"correct password", "../internal/fixtures/correct_password.txt", true}, + {"incorrect password", "../internal/fixtures/incorrect_password.txt", false}, + } + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + cfg := configtest.NewGeneralConfig(t, func(c *chainlink.Config, s *chainlink.Secrets) { + s.Password.Keystore = models.NewSecret("dummy") + c.EVM[0].Nodes[0].Name = ptr("fake") + c.EVM[0].Nodes[0].HTTPURL = commonconfig.MustParseURL("http://fake.com") + c.EVM[0].Nodes[0].WSURL = commonconfig.MustParseURL("WSS://fake.com/ws") + // seems to be needed for config validate + c.Insecure.OCRDevelopmentMode = nil + }) + + db := pgtest.NewSqlxDB(t) + keyStore := cltest.NewKeyStore(t, db) + authProviderORM := localauth.NewORM(db, time.Minute, logger.TestLogger(t), audit.NoopLogger) + + testRelayers := genTestEVMRelayers(t, cfg, db, keyStore.Eth(), &keystore.CSASigner{CSA: keyStore.CSA()}) + + // Purge fixture users to test assumption of single admin + pgtest.MustExec(t, db, "DELETE FROM users;") + + app := mocks.NewApplication(t) + app.On("AuthenticationProvider").Return(authProviderORM).Maybe() + app.On("BasicAdminUsersORM").Return(authProviderORM).Maybe() + app.On("GetKeyStore").Return(keyStore).Maybe() + app.On("GetRelayers").Return(testRelayers).Maybe() + app.On("Start", mock.Anything).Maybe().Return(nil) + app.On("Stop").Maybe().Return(nil) + app.On("ID").Maybe().Return(uuid.New()) + + ethClient := clienttest.NewClient(t) + ethClient.On("Dial", mock.Anything).Return(nil).Maybe() + ethClient.On("BalanceAt", mock.Anything, mock.Anything, mock.Anything).Return(big.NewInt(10), nil).Maybe() + + cltest.MustInsertRandomKey(t, keyStore.Eth()) + apiPrompt := cltest.NewMockAPIInitializer(t) + + shell := cmd.Shell{ + Config: cfg, + FallbackAPIInitializer: apiPrompt, + Runner: cltest.EmptyRunner{}, + AppFactory: cltest.InstanceAppFactory{App: app}, + KeyStoreAuthenticator: cmd.TerminalKeyStoreAuthenticator{ + Prompter: &cltest.MockCountingPrompter{T: t, NotTerminal: true}, + }, + Logger: logger.TestLogger(t), + } + // Reset for test isolation + defer resetShellForTest(&shell) + + set := flag.NewFlagSet("test", 0) + flagSetApplyFromAction(shell.RunNode, set, "") + require.NoError(t, set.Set("password", test.pwdfile)) + + c := cli.NewContext(nil, set, nil) + + // First initialize components (this includes authentication) + cliApp := cmd.NewApp(&shell) + err := cliApp.Before(c) + require.NoError(t, err) + + err = shell.BeforeNode(c) + if test.expectStart { + require.NoError(t, err, "BeforeNode should succeed") + // Verify components are initialized + assert.NotNil(t, shell.KeyStore) + assert.NotNil(t, shell.DS) + assert.NotNil(t, shell.LDB) + + // Now test RunNode with pre-authenticated keystore + // Note: RunNode will start the app but we expect it to work since keystore is authenticated + err = shell.RunNode(c) + require.NoError(t, err, "RunNode should succeed with authenticated keystore") + assert.Equal(t, 1, apiPrompt.Count, "API should be initialized") + } else { + require.Error(t, err, "BeforeNode should fail with incorrect password") + // Don't test RunNode if BeforeNode failed + } + + // Clean up + if shell.LDB != nil { + shell.LDB.Close() + } + }) + } +} diff --git a/core/internal/cltest/mocks.go b/core/internal/cltest/mocks.go index e1cf5d90338..4e056b1391c 100644 --- a/core/internal/cltest/mocks.go +++ b/core/internal/cltest/mocks.go @@ -11,12 +11,12 @@ import ( "time" gethTypes "github.com/ethereum/go-ethereum/core/types" - "github.com/jmoiron/sqlx" "github.com/prometheus/client_golang/prometheus" "github.com/robfig/cron/v3" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "github.com/smartcontractkit/chainlink-common/pkg/sqlutil" "github.com/smartcontractkit/chainlink-common/pkg/types" evmclient "github.com/smartcontractkit/chainlink-evm/pkg/client" "github.com/smartcontractkit/chainlink-evm/pkg/config/toml" @@ -30,6 +30,7 @@ import ( "github.com/smartcontractkit/chainlink/v2/core/internal/testutils/evmtest" "github.com/smartcontractkit/chainlink/v2/core/logger" "github.com/smartcontractkit/chainlink/v2/core/services/chainlink" + "github.com/smartcontractkit/chainlink/v2/core/services/keystore" "github.com/smartcontractkit/chainlink/v2/core/sessions" "github.com/smartcontractkit/chainlink/v2/core/web" ) @@ -84,27 +85,13 @@ func (rm *RendererMock) Render(v any, headers ...string) error { return nil } -type InstanceAppFactoryWithKeystoreMock struct { - App chainlink.Application -} - -// NewApplication creates a new application with specified config and calls the authenticate function of the keystore -func (f InstanceAppFactoryWithKeystoreMock) NewApplication(ctx context.Context, cfg chainlink.GeneralConfig, lggr logger.Logger, registerer prometheus.Registerer, db *sqlx.DB, ks cmd.TerminalKeyStoreAuthenticator) (chainlink.Application, error) { - keyStore := f.App.GetKeyStore() - err := ks.Authenticate(ctx, keyStore, cfg.Password()) - if err != nil { - return nil, fmt.Errorf("error authenticating keystore: %w", err) - } - return f.App, nil -} - // InstanceAppFactory is an InstanceAppFactory type InstanceAppFactory struct { App chainlink.Application } // NewApplication creates a new application with specified config -func (f InstanceAppFactory) NewApplication(context.Context, chainlink.GeneralConfig, logger.Logger, prometheus.Registerer, *sqlx.DB, cmd.TerminalKeyStoreAuthenticator) (chainlink.Application, error) { +func (f InstanceAppFactory) NewApplication(context.Context, chainlink.GeneralConfig, logger.Logger, prometheus.Registerer, sqlutil.DataSource, keystore.Master) (chainlink.Application, error) { return f.App, nil } @@ -112,7 +99,7 @@ type seededAppFactory struct { Application chainlink.Application } -func (s seededAppFactory) NewApplication(context.Context, chainlink.GeneralConfig, logger.Logger, prometheus.Registerer, *sqlx.DB, cmd.TerminalKeyStoreAuthenticator) (chainlink.Application, error) { +func (s seededAppFactory) NewApplication(context.Context, chainlink.GeneralConfig, logger.Logger, prometheus.Registerer, sqlutil.DataSource, keystore.Master) (chainlink.Application, error) { return noopStopApplication{s.Application}, nil } diff --git a/core/logger/prometheus.go b/core/logger/prometheus.go index 4eb32c164cd..2fe3073b168 100644 --- a/core/logger/prometheus.go +++ b/core/logger/prometheus.go @@ -10,18 +10,22 @@ var warnCounter = promauto.NewCounter(prometheus.CounterOpts{ Name: "log_warn_count", Help: "Number of warning messages in log", }) + var errorCounter = promauto.NewCounter(prometheus.CounterOpts{ Name: "log_error_count", Help: "Number of error messages in log", }) + var criticalCounter = promauto.NewCounter(prometheus.CounterOpts{ Name: "log_critical_count", Help: "Number of critical messages in log", }) + var panicCounter = promauto.NewCounter(prometheus.CounterOpts{ Name: "log_panic_count", Help: "Number of panic messages in log", }) + var fatalCounter = promauto.NewCounter(prometheus.CounterOpts{ Name: "log_fatal_count", Help: "Number of fatal messages in log", @@ -42,7 +46,8 @@ func newPrometheusLoggerWithCounters( errorCounter prometheus.Counter, criticalCounter prometheus.Counter, panicCounter prometheus.Counter, - fatalCounter prometheus.Counter) Logger { + fatalCounter prometheus.Counter, +) Logger { return &prometheusLogger{ h: l.Helper(1), warnCnt: warnCounter, diff --git a/core/services/keystore/csa_test.go b/core/services/keystore/csa_test.go index 281cd0933c5..7568176c641 100644 --- a/core/services/keystore/csa_test.go +++ b/core/services/keystore/csa_test.go @@ -29,6 +29,9 @@ func Test_CSAKeyStore_E2E(t *testing.T) { require.NoError(t, keyStore.Unlock(ctx, cltest.Password)) } + // Initial cleanup to ensure clean state at start + reset() + t.Run("initializes with an empty state", func(t *testing.T) { defer reset() keys, err := ks.GetAll() From d5be742bf51c310045b24657f5daf5c56252dfc2 Mon Sep 17 00:00:00 2001 From: kirqz23 Date: Mon, 13 Oct 2025 21:24:19 +0200 Subject: [PATCH 02/12] Wire up OTel logs streaming, integrate chainlink logger with otel --- .changeset/slick-badgers-behave.md | 5 ++ core/cmd/app.go | 18 ++-- core/cmd/shell.go | 1 + core/cmd/shell_local.go | 15 ++++ core/logger/logger.go | 17 +++- core/logger/logger_test.go | 135 ++++++++++++++++++++++++++++- core/logger/zap.go | 37 ++++++++ core/logger/zap_disk_logging.go | 4 +- go.mod | 2 +- 9 files changed, 221 insertions(+), 13 deletions(-) create mode 100644 .changeset/slick-badgers-behave.md diff --git a/.changeset/slick-badgers-behave.md b/.changeset/slick-badgers-behave.md new file mode 100644 index 00000000000..f1b86823df5 --- /dev/null +++ b/.changeset/slick-badgers-behave.md @@ -0,0 +1,5 @@ +--- +"chainlink": patch +--- + +#added Wire up OTel logs streaming, integrate chainlink logger with otel diff --git a/core/cmd/app.go b/core/cmd/app.go index f105dc96030..8d978a86ae8 100644 --- a/core/cmd/app.go +++ b/core/cmd/app.go @@ -261,20 +261,28 @@ func NewApp(s *Shell) *cli.App { return err } + // Configure a new logger with OTel atomic core support lggrCfg := logger.Config{ - LogLevel: s.Config.Log().Level(), - Dir: s.Config.Log().File().Dir(), - JsonConsole: s.Config.Log().JSONConsole(), - UnixTS: s.Config.Log().UnixTimestamps(), + LogLevel: s.Config.Log().Level(), + Dir: s.Config.Log().File().Dir(), + JsonConsole: s.Config.Log().JSONConsole(), + UnixTS: s.Config.Log().UnixTimestamps(), + //nolint:gosec // filemaxsizesmb won't exceed max int FileMaxSizeMB: int(logFileMaxSizeMB), FileMaxAgeDays: int(s.Config.Log().File().MaxAgeDays()), FileMaxBackups: int(s.Config.Log().File().MaxBackups()), SentryEnabled: s.Config.Sentry().DSN() != "", } - l, closeFn := lggrCfg.New() + + // Noop atomic core that can be swapped out later for OTel support + atomicCore := logger.NewAtomicCore() + + l, closeFn := lggrCfg.NewWithCores(atomicCore) s.Logger = l s.CloseLogger = closeFn + // s.SetOtelCore is a hook that can be used to set the OTel core + s.SetOtelCore = atomicCore.Store return nil }, diff --git a/core/cmd/shell.go b/core/cmd/shell.go index 615d79e5614..bb8115eef70 100644 --- a/core/cmd/shell.go +++ b/core/cmd/shell.go @@ -150,6 +150,7 @@ type Shell struct { Logger logger.Logger // initialized in Before Registerer prometheus.Registerer // initialized in Before CloseLogger func() error // called in After + SetOtelCore func(*zapcore.Core) // reference to AtomicCore.Store AppFactory AppFactory KeyStoreAuthenticator TerminalKeyStoreAuthenticator FallbackAPIInitializer APIInitializer diff --git a/core/cmd/shell_local.go b/core/cmd/shell_local.go index 06b604ff895..cd3774581ae 100644 --- a/core/cmd/shell_local.go +++ b/core/cmd/shell_local.go @@ -24,6 +24,8 @@ import ( chain_selectors "github.com/smartcontractkit/chain-selectors" + "github.com/smartcontractkit/chainlink-common/pkg/beholder" + "github.com/smartcontractkit/chainlink-common/pkg/logger/otelzap" "github.com/smartcontractkit/chainlink-common/pkg/sqlutil" "github.com/smartcontractkit/chainlink-evm/pkg/assets" @@ -1146,5 +1148,18 @@ func (s *Shell) beforeNode(c *cli.Context) error { return errors.Wrap(err, "failed initializing globals") } + // If log streaming is enabled swap core to add Otel + if s.Config.Telemetry().LogStreamingEnabled() { + if s.SetOtelCore == nil { + return errors.New("Shell.SetOtelCore is nil") + } + otelLogger := beholder.GetLogger() + logLevel := s.Config.Log().Level() + otelCore := otelzap.NewCore(otelLogger, otelzap.WithLevel(logLevel)) + + s.SetOtelCore(&otelCore) + lggr.Info("Log streaming enabled") + } + return nil } diff --git a/core/logger/logger.go b/core/logger/logger.go index 9aef5853910..7d735a91669 100644 --- a/core/logger/logger.go +++ b/core/logger/logger.go @@ -28,9 +28,11 @@ type stderrWriter struct{} func (sw stderrWriter) Write(p []byte) (n int, err error) { return os.Stderr.Write(p) } + func (sw stderrWriter) Close() error { return nil // never close stderr } + func (sw stderrWriter) Sync() error { return os.Stderr.Sync() } @@ -174,6 +176,11 @@ type Config struct { // New returns a new Logger with pretty printing to stdout, prometheus counters, and sentry forwarding. // Tests should use TestLogger. func (c *Config) New() (Logger, func() error) { + return c.NewWithCores() +} + +// NewWithCores is like New, but includes additional zapcore.Cores. +func (c *Config) NewWithCores(cores ...zapcore.Core) (Logger, func() error) { if c.diskSpaceAvailableFn == nil { c.diskSpaceAvailableFn = diskSpaceAvailable } @@ -189,9 +196,9 @@ func (c *Config) New() (Logger, func() error) { err error ) if !c.DebugLogsToDisk() { - l, closeLogger, err = newDefaultLogger(cfg, c.UnixTS) + l, closeLogger, err = newDefaultLogger(cfg, c.UnixTS, cores...) } else { - l, closeLogger, err = newRotatingFileLogger(cfg, *c) + l, closeLogger, err = newRotatingFileLogger(cfg, *c, cores...) } if err != nil { log.Fatal(err) @@ -241,12 +248,16 @@ func newZapConfigBase() zap.Config { return cfg } -func newDefaultLogger(zcfg zap.Config, unixTS bool) (Logger, func() error, error) { +func newDefaultLogger(zcfg zap.Config, unixTS bool, cores ...zapcore.Core) (Logger, func() error, error) { core, coreCloseFn, err := newDefaultLoggingCore(zcfg, unixTS) if err != nil { return nil, nil, err } + if len(cores) > 0 { + core = zapcore.NewTee(append([]zapcore.Core{core}, cores...)...) + } + l, loggerCloseFn, err := newLoggerForCore(zcfg, core) if err != nil { coreCloseFn() diff --git a/core/logger/logger_test.go b/core/logger/logger_test.go index dc7558a5156..d171fc76afc 100644 --- a/core/logger/logger_test.go +++ b/core/logger/logger_test.go @@ -4,6 +4,13 @@ import ( "testing" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "go.opentelemetry.io/otel/log/noop" + "go.uber.org/zap" + "go.uber.org/zap/zapcore" + "go.uber.org/zap/zaptest/observer" + + "github.com/smartcontractkit/chainlink-common/pkg/logger/otelzap" ) func TestConfig(t *testing.T) { @@ -21,10 +28,134 @@ func TestStderrWriter(t *testing.T) { // Test Write n, err := sw.Write([]byte("Hello, World!")) - assert.NoError(t, err) + require.NoError(t, err) assert.Equal(t, 13, n, "Expected 13 bytes written") // Test Close err = sw.Close() - assert.NoError(t, err) + require.NoError(t, err) +} + +func TestOtelCore(t *testing.T) { + testCases := []struct { + name string + enableOtel bool + }{ + { + name: "otel integration enabled", + enableOtel: true, + }, + { + name: "otel integration disabled", + enableOtel: false, + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + cfg := Config{ + LogLevel: zapcore.InfoLevel, + } + + var logger Logger + var closeFn func() error + + if tc.enableOtel { + // Create a no-op OTel logger for testing + noopLogger := noop.NewLoggerProvider().Logger("test") + otelCore := otelzap.NewCore(noopLogger, otelzap.WithLevel(zapcore.DebugLevel)) + + logger, closeFn = cfg.NewWithCores(otelCore) + defer func() { + err := closeFn() + require.NoError(t, err) + }() + require.NotNil(t, logger) + + // Test that logger works with otel core + logger.Info("test log message with otel") + } else { + // Test that regular logger works + logger, closeFn = cfg.NewWithCores() + defer func() { + err := closeFn() + require.NoError(t, err) + }() + require.NotNil(t, logger) + + logger.Info("test log message without otel") + } + + // Test that the logger was created successfully + assert.NotNil(t, logger) + }) + } +} + +func TestAtomicCoreSwap(t *testing.T) { + // This test simulates two processes: + // 1. Process 1 creates a logger with an AtomicCore (initially noop) + // 2. Process 2 swaps in a new OTel core using SetOtelCore + // 3. Verify that subsequent logs go to both original and new cores + + // Create test cores that capture log entries + observedCore1, observedLogs1 := observer.New(zapcore.InfoLevel) + observedCore2, observedLogs2 := observer.New(zapcore.InfoLevel) + + // Process 1: Create logger with AtomicCore + atomicCore := NewAtomicCore() + + // Build the logger manually to have full control over cores + zcfg := newZapConfigBase() + zcfg.Level = zap.NewAtomicLevelAt(zapcore.InfoLevel) + + // Create a tee with observedCore1 and atomicCore + teeCore := zapcore.NewTee(observedCore1, atomicCore) + + // Create logger with the combined core + errSink, _, err := zap.Open(zcfg.ErrorOutputPaths...) + require.NoError(t, err) + + zapLog := zap.New(teeCore, zap.ErrorOutput(errSink), zap.AddCaller(), zap.AddStacktrace(zapcore.ErrorLevel)) + logger := &zapLogger{ + level: zcfg.Level, + SugaredLogger: zapLog.Sugar(), + } + + // Log before swapping - should only go to observedCore1 (atomicCore is noop) + logger.Info("before swap") + assert.Equal(t, 1, observedLogs1.Len(), "Expected 1 log in observedCore1 before swap") + assert.Equal(t, 0, observedLogs2.Len(), "Expected 0 logs in observedCore2 before swap") + + // Process 2: Swap in a new core (simulating SetOtelCore) + // In production, this would be an OTel core, but we use observedCore2 for verification + core2 := observedCore2 + atomicCore.Store(&core2) + + // Log after swapping - should go to both observedCore1 and observedCore2 + logger.Info("after swap") + assert.Equal(t, 2, observedLogs1.Len(), "Expected 2 logs in observedCore1 after swap") + assert.Equal(t, 1, observedLogs2.Len(), "Expected 1 log in observedCore2 after swap") + + // Verify the message in observedCore2 + entries := observedLogs2.All() + require.Len(t, entries, 1) + assert.Equal(t, "after swap", entries[0].Message) + assert.Equal(t, zapcore.InfoLevel, entries[0].Level) + + // Test with different log levels + logger.Debug("debug message") + // Debug is below InfoLevel, so shouldn't be logged + assert.Equal(t, 2, observedLogs1.Len(), "Debug should not be logged at Info level") + assert.Equal(t, 1, observedLogs2.Len(), "Debug should not be logged at Info level") + + logger.Warn("warning message") + assert.Equal(t, 3, observedLogs1.Len(), "Warn should be logged") + assert.Equal(t, 2, observedLogs2.Len(), "Warn should be logged in both cores") + + // Verify the second message in observedCore2 + entries = observedLogs2.All() + require.Len(t, entries, 2) + assert.Equal(t, "warning message", entries[1].Message) + assert.Equal(t, zapcore.WarnLevel, entries[1].Level) } diff --git a/core/logger/zap.go b/core/logger/zap.go index 848f2ea49c0..cdfef072638 100644 --- a/core/logger/zap.go +++ b/core/logger/zap.go @@ -2,12 +2,49 @@ package logger import ( "os" + "sync/atomic" pkgerrors "github.com/pkg/errors" "go.uber.org/zap" "go.uber.org/zap/zapcore" ) +// AtomicCore provides thread-safe core swapping using atomic operations. +// It starts as a noop core and can be atomically swapped to include additional cores. +var _ zapcore.Core = &AtomicCore{} + +type AtomicCore struct { + atomic.Pointer[zapcore.Core] +} + +// NewAtomicCore creates a new AtomicCore initialized with a noop core +func NewAtomicCore() *AtomicCore { + ac := &AtomicCore{} + noop := zapcore.NewNopCore() + ac.Store(&noop) + return ac +} + +func (d *AtomicCore) load() zapcore.Core { + p := d.Load() + if p == nil { + return zapcore.NewNopCore() + } + return *p +} + +func (d *AtomicCore) Enabled(l zapcore.Level) bool { return d.load().Enabled(l) } + +func (d *AtomicCore) With(fs []zapcore.Field) zapcore.Core { return d.load().With(fs) } + +func (d *AtomicCore) Check(e zapcore.Entry, ce *zapcore.CheckedEntry) *zapcore.CheckedEntry { + return d.load().Check(e, ce) +} + +func (d *AtomicCore) Write(e zapcore.Entry, fs []zapcore.Field) error { return d.load().Write(e, fs) } + +func (d *AtomicCore) Sync() error { return d.load().Sync() } + var _ Logger = &zapLogger{} type zapLogger struct { diff --git a/core/logger/zap_disk_logging.go b/core/logger/zap_disk_logging.go index bf27eb0bd79..68386759821 100644 --- a/core/logger/zap_disk_logging.go +++ b/core/logger/zap_disk_logging.go @@ -4,10 +4,10 @@ import ( "sync" "time" - "github.com/smartcontractkit/chainlink/v2/core/utils" - "go.uber.org/zap" "go.uber.org/zap/zapcore" + + "github.com/smartcontractkit/chainlink/v2/core/utils" ) const ( diff --git a/go.mod b/go.mod index 655ed1129f6..77f1cad2eb4 100644 --- a/go.mod +++ b/go.mod @@ -128,6 +128,7 @@ require ( go.dedis.ch/kyber/v3 v3.1.0 go.opentelemetry.io/contrib/instrumentation/github.com/gin-gonic/gin/otelgin v0.49.0 go.opentelemetry.io/otel v1.37.0 + go.opentelemetry.io/otel/log v0.13.0 go.opentelemetry.io/otel/metric v1.37.0 go.opentelemetry.io/otel/sdk/metric v1.37.0 go.opentelemetry.io/otel/trace v1.37.0 @@ -391,7 +392,6 @@ require ( go.opentelemetry.io/otel/exporters/stdout/stdoutlog v0.13.0 // indirect go.opentelemetry.io/otel/exporters/stdout/stdoutmetric v1.36.0 // indirect go.opentelemetry.io/otel/exporters/stdout/stdouttrace v1.36.0 // indirect - go.opentelemetry.io/otel/log v0.13.0 // indirect go.opentelemetry.io/otel/sdk v1.37.0 // indirect go.opentelemetry.io/otel/sdk/log v0.13.0 // indirect go.opentelemetry.io/proto/otlp v1.6.0 // indirect From 64120ea99717bf610915a9dab88d98a70c59ecfb Mon Sep 17 00:00:00 2001 From: kirqz23 Date: Tue, 14 Oct 2025 09:48:27 +0200 Subject: [PATCH 03/12] Revert CSA test --- core/services/keystore/csa_test.go | 3 --- 1 file changed, 3 deletions(-) diff --git a/core/services/keystore/csa_test.go b/core/services/keystore/csa_test.go index 7568176c641..281cd0933c5 100644 --- a/core/services/keystore/csa_test.go +++ b/core/services/keystore/csa_test.go @@ -29,9 +29,6 @@ func Test_CSAKeyStore_E2E(t *testing.T) { require.NoError(t, keyStore.Unlock(ctx, cltest.Password)) } - // Initial cleanup to ensure clean state at start - reset() - t.Run("initializes with an empty state", func(t *testing.T) { defer reset() keys, err := ks.GetAll() From 9968f6fa3a38f22776bf2f63e83bef7eb3abbe3b Mon Sep 17 00:00:00 2001 From: kirqz23 Date: Tue, 14 Oct 2025 11:03:57 +0200 Subject: [PATCH 04/12] Reorder before/after node functions --- core/cmd/shell_local.go | 64 ++++++++++++++++++++--------------------- 1 file changed, 32 insertions(+), 32 deletions(-) diff --git a/core/cmd/shell_local.go b/core/cmd/shell_local.go index cd3774581ae..23a0110caae 100644 --- a/core/cmd/shell_local.go +++ b/core/cmd/shell_local.go @@ -1052,38 +1052,6 @@ func (s *Shell) RemoveBlocks(c *cli.Context) error { return nil } -// BeforeNode initializes database, keystore, logger, and beholder for node startup. -// This is used as a Before hook in CLI commands that require these components. -func (s *Shell) BeforeNode(c *cli.Context) error { - if err := s.beforeNode(c); err != nil { - return s.errorOut(err) - } - return nil -} - -// afterNode is a thread-safe helper method to close the database and logger. -// This is used in multiple places: shutdown handler, signal handler, and cleanup. -// It uses sync.Once to ensure cleanup happens exactly once, even if called concurrently. -func (s *Shell) afterNode(lggr logger.SugaredLogger) { - s.CleanupOnce.Do(func() { - if err := s.LDB.Close(); err != nil { - lggr.Criticalf("Failed to close LockedDB: %v", err) - } - lggr.Debug("Closed DB") - - if err := s.CloseLogger(); err != nil { - log.Printf("Failed to close Logger: %v", err) - } - }) -} - -// AfterNode cleans up resources initialized by BeforeNode. -// This is used as an After hook in CLI commands. -func (s *Shell) AfterNode(c *cli.Context) error { - s.afterNode(logger.Sugared(s.Logger)) - return nil -} - // beforeNode performs the actual initialization of DB, keystore, and telemetry. // It handles password loading, database connection, keystore authentication, and Beholder setup. func (s *Shell) beforeNode(c *cli.Context) error { @@ -1163,3 +1131,35 @@ func (s *Shell) beforeNode(c *cli.Context) error { return nil } + +// BeforeNode initializes database, keystore, logger, and beholder for node startup. +// This is used as a Before hook in CLI commands that require these components. +func (s *Shell) BeforeNode(c *cli.Context) error { + if err := s.beforeNode(c); err != nil { + return s.errorOut(err) + } + return nil +} + +// afterNode is a thread-safe helper method to close the database and logger. +// This is used in multiple places: shutdown handler, signal handler, and cleanup. +// It uses sync.Once to ensure cleanup happens exactly once, even if called concurrently. +func (s *Shell) afterNode(lggr logger.SugaredLogger) { + s.CleanupOnce.Do(func() { + if err := s.LDB.Close(); err != nil { + lggr.Criticalf("Failed to close LockedDB: %v", err) + } + lggr.Debug("Closed DB") + + if err := s.CloseLogger(); err != nil { + log.Printf("Failed to close Logger: %v", err) + } + }) +} + +// AfterNode cleans up resources initialized by BeforeNode. +// This is used as an After hook in CLI commands. +func (s *Shell) AfterNode(c *cli.Context) error { + s.afterNode(logger.Sugared(s.Logger)) + return nil +} From 49aea79ae1b896a1a71c3188ca702e127cc80016 Mon Sep 17 00:00:00 2001 From: kirqz23 Date: Tue, 14 Oct 2025 11:52:57 +0200 Subject: [PATCH 05/12] Add AfterNode execution to the tests --- core/cmd/shell_local_test.go | 10 ++++------ 1 file changed, 4 insertions(+), 6 deletions(-) diff --git a/core/cmd/shell_local_test.go b/core/cmd/shell_local_test.go index bd3f3fde984..4e59df3e857 100644 --- a/core/cmd/shell_local_test.go +++ b/core/cmd/shell_local_test.go @@ -573,9 +573,8 @@ func TestShell_BeforeNode(t *testing.T) { } // Clean up - if shell.LDB != nil { - shell.LDB.Close() - } + err = shell.AfterNode(c) + require.NoError(t, err) }) } } @@ -669,9 +668,8 @@ func TestShell_RunNode_WithBeforeNode(t *testing.T) { } // Clean up - if shell.LDB != nil { - shell.LDB.Close() - } + err = shell.AfterNode(c) + require.NoError(t, err) }) } } From bdcb283466ebc3de98ad3757b38a8b84fab0c768 Mon Sep 17 00:00:00 2001 From: kirqz23 Date: Tue, 14 Oct 2025 12:37:32 +0200 Subject: [PATCH 06/12] Fix test by cleaning resources only when they are provisioned --- core/cmd/shell_local_test.go | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/core/cmd/shell_local_test.go b/core/cmd/shell_local_test.go index 4e59df3e857..f4c4fab3ece 100644 --- a/core/cmd/shell_local_test.go +++ b/core/cmd/shell_local_test.go @@ -568,13 +568,14 @@ func TestShell_BeforeNode(t *testing.T) { require.NoError(t, keysErr) assert.NotEmpty(t, keys) } + + // Clean up + err = shell.AfterNode(c) + require.NoError(t, err) + } else { require.Error(t, err) } - - // Clean up - err = shell.AfterNode(c) - require.NoError(t, err) }) } } From c32136e68acaa046e5c20e8a466ba608f483b5e9 Mon Sep 17 00:00:00 2001 From: kirqz23 Date: Tue, 14 Oct 2025 12:51:15 +0200 Subject: [PATCH 07/12] Fix lint --- core/cmd/shell_local_test.go | 1 - 1 file changed, 1 deletion(-) diff --git a/core/cmd/shell_local_test.go b/core/cmd/shell_local_test.go index f4c4fab3ece..1ec0d0317b6 100644 --- a/core/cmd/shell_local_test.go +++ b/core/cmd/shell_local_test.go @@ -572,7 +572,6 @@ func TestShell_BeforeNode(t *testing.T) { // Clean up err = shell.AfterNode(c) require.NoError(t, err) - } else { require.Error(t, err) } From f7ce8be3d4060c7c7c750c6c6696c7957819974b Mon Sep 17 00:00:00 2001 From: kirqz23 Date: Tue, 14 Oct 2025 13:29:28 +0200 Subject: [PATCH 08/12] Remove redundant check for empty keystore --- core/cmd/shell_local_test.go | 11 +++-------- 1 file changed, 3 insertions(+), 8 deletions(-) diff --git a/core/cmd/shell_local_test.go b/core/cmd/shell_local_test.go index 1ec0d0317b6..fc841fe7cf9 100644 --- a/core/cmd/shell_local_test.go +++ b/core/cmd/shell_local_test.go @@ -560,14 +560,9 @@ func TestShell_BeforeNode(t *testing.T) { assert.NotNil(t, shell.LDB) // Verify keystore is unlocked by checking if we can access keys - ctx := testutils.Context(t) - isEmpty, emptyErr := shell.KeyStore.IsEmpty(ctx) - require.NoError(t, emptyErr) - if !isEmpty { - keys, keysErr := shell.KeyStore.CSA().GetAll() - require.NoError(t, keysErr) - assert.NotEmpty(t, keys) - } + keys, keysErr := shell.KeyStore.CSA().GetAll() + require.NoError(t, keysErr) + assert.NotEmpty(t, keys) // Clean up err = shell.AfterNode(c) From 5f90e2188eed1b3617921d944d37c19b49223a08 Mon Sep 17 00:00:00 2001 From: kirqz23 Date: Tue, 14 Oct 2025 15:12:17 +0200 Subject: [PATCH 09/12] Fix test by cleaning --- core/cmd/shell_local_test.go | 13 +++++++++---- 1 file changed, 9 insertions(+), 4 deletions(-) diff --git a/core/cmd/shell_local_test.go b/core/cmd/shell_local_test.go index fc841fe7cf9..8929e90bd9a 100644 --- a/core/cmd/shell_local_test.go +++ b/core/cmd/shell_local_test.go @@ -645,6 +645,15 @@ func TestShell_RunNode_WithBeforeNode(t *testing.T) { require.NoError(t, err) err = shell.BeforeNode(c) + + // Always clean up database if it was opened, regardless of authentication success + defer func() { + if shell.LDB != nil { + err := shell.AfterNode(c) + require.NoError(t, err) + } + }() + if test.expectStart { require.NoError(t, err, "BeforeNode should succeed") // Verify components are initialized @@ -661,10 +670,6 @@ func TestShell_RunNode_WithBeforeNode(t *testing.T) { require.Error(t, err, "BeforeNode should fail with incorrect password") // Don't test RunNode if BeforeNode failed } - - // Clean up - err = shell.AfterNode(c) - require.NoError(t, err) }) } } From 99bc87f546dbacdc8613ca270531e915bce3a6a1 Mon Sep 17 00:00:00 2001 From: kirqz23 Date: Tue, 14 Oct 2025 15:18:48 +0200 Subject: [PATCH 10/12] Fix lint shadowing --- core/cmd/shell_local_test.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/core/cmd/shell_local_test.go b/core/cmd/shell_local_test.go index 8929e90bd9a..275d92a340e 100644 --- a/core/cmd/shell_local_test.go +++ b/core/cmd/shell_local_test.go @@ -649,8 +649,8 @@ func TestShell_RunNode_WithBeforeNode(t *testing.T) { // Always clean up database if it was opened, regardless of authentication success defer func() { if shell.LDB != nil { - err := shell.AfterNode(c) - require.NoError(t, err) + cleanupErr := shell.AfterNode(c) + require.NoError(t, cleanupErr) } }() From acaf12c3b124defb351b8359d062fbc63e3ef280 Mon Sep 17 00:00:00 2001 From: kirqz23 Date: Tue, 14 Oct 2025 15:39:02 +0200 Subject: [PATCH 11/12] Fix hanging test --- core/cmd/shell_local_test.go | 12 ++++++++---- 1 file changed, 8 insertions(+), 4 deletions(-) diff --git a/core/cmd/shell_local_test.go b/core/cmd/shell_local_test.go index 275d92a340e..d4c87f0a72b 100644 --- a/core/cmd/shell_local_test.go +++ b/core/cmd/shell_local_test.go @@ -552,6 +552,14 @@ func TestShell_BeforeNode(t *testing.T) { // Run before hook to initialize components with authentication err = shell.BeforeNode(c) + // Always clean up database if it was opened, regardless of authentication success + defer func() { + if shell.LDB != nil { + cleanupErr := shell.AfterNode(c) + require.NoError(t, cleanupErr) + } + }() + if test.wantUnlocked { require.NoError(t, err) // Verify that shell components were initialized @@ -563,10 +571,6 @@ func TestShell_BeforeNode(t *testing.T) { keys, keysErr := shell.KeyStore.CSA().GetAll() require.NoError(t, keysErr) assert.NotEmpty(t, keys) - - // Clean up - err = shell.AfterNode(c) - require.NoError(t, err) } else { require.Error(t, err) } From 08f1d89a22e318148d9114ddf8589a484e3b312c Mon Sep 17 00:00:00 2001 From: kirqz23 Date: Tue, 14 Oct 2025 15:44:12 +0200 Subject: [PATCH 12/12] Change cleanup in the test --- core/cmd/shell_local_test.go | 26 ++++++++++---------------- 1 file changed, 10 insertions(+), 16 deletions(-) diff --git a/core/cmd/shell_local_test.go b/core/cmd/shell_local_test.go index d4c87f0a72b..6d7cd256685 100644 --- a/core/cmd/shell_local_test.go +++ b/core/cmd/shell_local_test.go @@ -552,14 +552,6 @@ func TestShell_BeforeNode(t *testing.T) { // Run before hook to initialize components with authentication err = shell.BeforeNode(c) - // Always clean up database if it was opened, regardless of authentication success - defer func() { - if shell.LDB != nil { - cleanupErr := shell.AfterNode(c) - require.NoError(t, cleanupErr) - } - }() - if test.wantUnlocked { require.NoError(t, err) // Verify that shell components were initialized @@ -574,6 +566,11 @@ func TestShell_BeforeNode(t *testing.T) { } else { require.Error(t, err) } + // Clean up database if it was opened + if shell.LDB != nil { + cleanupErr := shell.AfterNode(c) + require.NoError(t, cleanupErr) + } }) } } @@ -650,14 +647,6 @@ func TestShell_RunNode_WithBeforeNode(t *testing.T) { err = shell.BeforeNode(c) - // Always clean up database if it was opened, regardless of authentication success - defer func() { - if shell.LDB != nil { - cleanupErr := shell.AfterNode(c) - require.NoError(t, cleanupErr) - } - }() - if test.expectStart { require.NoError(t, err, "BeforeNode should succeed") // Verify components are initialized @@ -674,6 +663,11 @@ func TestShell_RunNode_WithBeforeNode(t *testing.T) { require.Error(t, err, "BeforeNode should fail with incorrect password") // Don't test RunNode if BeforeNode failed } + // Clean up database if it was opened + if shell.LDB != nil { + cleanupErr := shell.AfterNode(c) + require.NoError(t, cleanupErr) + } }) } }