Skip to content

Commit

Permalink
ingest/ledgerbackend: Restart captive core when a new version of core…
Browse files Browse the repository at this point in the history
… is detected on disk (#3687)

Add a ticker to stellarCoreRunner which will execute every 10 seconds. The stellarCoreRunner context will interrupt the goroutine attached to the ticker so that if captive core shuts down the ticker will also stop.

Upon every tick, we will check to see if the stellar core binary has been modified. If it has been modified we log a warning message to announce we will be shutting down captive core and then we call stellarCoreRunner.close() to terminate the captive core instance. Once stellarCoreRunner.close() is called any pending operations (PrepareRange() or GetLedger()) should return immediately with an error and the ingestion state machine will handle the error by retrying.

When the retry happens it will create a new instance of stellarCoreRunner which should use the newest stellar core binary.
  • Loading branch information
tamirms authored Jun 14, 2021
1 parent 736b047 commit 226d70a
Show file tree
Hide file tree
Showing 4 changed files with 312 additions and 1 deletion.
89 changes: 89 additions & 0 deletions ingest/ledgerbackend/file_watcher.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
package ledgerbackend

import (
"os"
"sync"
"time"

"github.com/pkg/errors"

"github.com/stellar/go/support/log"
)

// fileWatcher periodically stats a single file and invokes a callback when
// the file's modification time differs from the one recorded at construction.
type fileWatcher struct {
// pathToFile is the path handed to stat on every check.
pathToFile string
// duration is the interval of the ticker that drives loop().
duration time.Duration
// onChange is called by loop() each time fileChanged() reports true.
onChange func()
// exit stops loop() as soon as it becomes readable; the constructor wires
// it to the runner context's Done() channel.
exit <-chan struct{}
// log receives the stat-failure warnings and the change notifications.
log *log.Entry
// stat retrieves the file info; newFileWatcher passes os.Stat, tests inject a mock.
stat func(string) (os.FileInfo, error)
// lastModTime is the modification time observed when the watcher was
// created. It is the baseline every later check is compared against.
lastModTime time.Time
}

// newFileWatcher creates a watcher for the runner's captive core binary
// which uses os.Stat and polls the file every 10 seconds.
func newFileWatcher(runner *stellarCoreRunner) (*fileWatcher, error) {
	const pollInterval = 10 * time.Second
	return newFileWatcherWithOptions(runner, os.Stat, pollInterval)
}

// newFileWatcherWithOptions creates a watcher for runner.executablePath using
// the given stat function and ticker interval.
//
// The file is stat'ed once up front to record the baseline modification time;
// if that call fails the wrapped error is returned and no watcher is created.
// The watcher's onChange callback logs a warning and closes the runner, and
// does so at most once no matter how many times it is invoked.
func newFileWatcherWithOptions(
	runner *stellarCoreRunner,
	stat func(string) (os.FileInfo, error),
	tickerDuration time.Duration,
) (*fileWatcher, error) {
	initial, statErr := stat(runner.executablePath)
	if statErr != nil {
		return nil, errors.Wrap(statErr, "could not stat captive core binary")
	}

	// shutdown announces the binary change and terminates the runner.
	shutdown := func() {
		runner.log.Warnf("detected new version of captive core binary %s , aborting session.", runner.executablePath)
		closeErr := runner.close()
		if closeErr != nil {
			runner.log.Warnf("could not close captive core %v", closeErr)
		}
	}

	// closeOnce ensures repeated change notifications close the runner only once.
	var closeOnce sync.Once

	watcher := &fileWatcher{
		pathToFile:  runner.executablePath,
		duration:    tickerDuration,
		onChange:    func() { closeOnce.Do(shutdown) },
		exit:        runner.ctx.Done(),
		log:         runner.log,
		stat:        stat,
		lastModTime: initial.ModTime(),
	}
	return watcher, nil
}

// loop blocks until the exit channel becomes readable. On every tick of a
// ticker running at f.duration it checks the watched file and calls onChange
// when a change is reported. The ticker is stopped before loop returns.
func (f *fileWatcher) loop() {
	poll := time.NewTicker(f.duration)
	defer poll.Stop()

	for {
		select {
		case <-f.exit:
			return
		case <-poll.C:
			// fall through to the change check below
		}

		if f.fileChanged() {
			f.onChange()
		}
	}
}

// fileChanged stats the watched file and reports whether its modification
// time differs from the baseline stored in lastModTime. A failing stat is
// logged as a warning and treated as "no change". The baseline itself is
// never updated here.
func (f *fileWatcher) fileChanged() bool {
	current, statErr := f.stat(f.pathToFile)
	if statErr != nil {
		f.log.Warnf("could not stat %s: %v", f.pathToFile, statErr)
		return false
	}

	latest := current.ModTime()
	if f.lastModTime.Equal(latest) {
		return false
	}

	f.log.Infof(
		"detected update to %s. previous file timestamp was %v current timestamp is %v",
		f.pathToFile,
		f.lastModTime,
		latest,
	)
	return true
}
207 changes: 207 additions & 0 deletions ingest/ledgerbackend/file_watcher_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,207 @@
package ledgerbackend

import (
"context"
"fmt"
"os"
"sync"
"testing"
"time"

"github.com/stellar/go/support/log"

"github.com/stretchr/testify/assert"
)

// mockFile is a stub os.FileInfo whose only configurable property is the
// modification time; every other accessor returns its zero value.
type mockFile struct {
	modTime time.Time
}

// ModTime returns the timestamp stored in the stub.
func (f mockFile) ModTime() time.Time { return f.modTime }

// Name always returns the empty string.
func (f mockFile) Name() string { return "" }

// Size always returns 0.
func (f mockFile) Size() int64 { return 0 }

// Mode always returns the zero file mode.
func (f mockFile) Mode() os.FileMode { return os.FileMode(0) }

// IsDir always returns false.
func (f mockFile) IsDir() bool { return false }

// Sys always returns nil.
func (f mockFile) Sys() interface{} { return nil }

// mockStat is a test double for os.Stat. The embedded mutex guards the
// configured response and the call counter so they can be accessed while
// another goroutine is calling stat.
type mockStat struct {
	sync.Mutex
	// t is the test used to report assertion failures.
	t *testing.T
	// expectedPath is the path every stat call is expected to receive.
	expectedPath string
	// modTime and err make up the response handed back by stat.
	modTime time.Time
	err     error
	// callCount is the number of stat invocations so far.
	callCount int
}

// setResponse replaces the modification time and error returned by
// subsequent stat calls.
func (m *mockStat) setResponse(modTime time.Time, err error) {
	m.Lock()
	m.modTime, m.err = modTime, err
	m.Unlock()
}

// getCallCount returns how many times stat has been invoked.
func (m *mockStat) getCallCount() int {
	m.Lock()
	count := m.callCount
	m.Unlock()
	return count
}

// stat is the os.Stat replacement handed to the file watcher. It counts the
// call, asserts that the requested path matches expectedPath, and returns the
// currently configured modification time and error.
func (m *mockStat) stat(fp string) (os.FileInfo, error) {
	m.Lock()
	defer m.Unlock()

	m.callCount++
	assert.Equal(m.t, m.expectedPath, fp)

	info := mockFile{modTime: m.modTime}
	return info, m.err
}

// createFWFixtures builds the objects shared by the file watcher tests: a
// stat mock, an offline-mode stellarCoreRunner pointing at a fake binary
// path, and a fileWatcher wired to both with a one millisecond poll interval.
// It also asserts that constructing the watcher performed exactly one stat.
func createFWFixtures(t *testing.T) (*mockStat, *stellarCoreRunner, *fileWatcher) {
	const binaryPath = "/some/path"

	statMock := &mockStat{
		modTime:      time.Now(),
		expectedPath: binaryPath,
		t:            t,
	}

	toml, tomlErr := NewCaptiveCoreToml(CaptiveCoreTomlParams{})
	assert.NoError(t, tomlErr)

	config := CaptiveCoreConfig{
		BinaryPath:         binaryPath,
		HistoryArchiveURLs: []string{"http://localhost"},
		Log:                log.New(),
		Context:            context.Background(),
		Toml:               toml,
	}
	runner, runnerErr := newStellarCoreRunner(config, stellarCoreRunnerModeOffline)
	assert.NoError(t, runnerErr)

	watcher, watcherErr := newFileWatcherWithOptions(runner, statMock.stat, time.Millisecond)
	assert.NoError(t, watcherErr)
	assert.Equal(t, 1, statMock.getCallCount())

	return statMock, runner, watcher
}

// TestNewFileWatcherError verifies that a failing initial stat makes the
// constructor return a wrapped error after exactly one stat call.
func TestNewFileWatcherError(t *testing.T) {
	const binaryPath = "/some/path"

	statMock := &mockStat{
		modTime:      time.Now(),
		expectedPath: binaryPath,
		t:            t,
	}
	statMock.setResponse(time.Time{}, fmt.Errorf("test error"))

	toml, tomlErr := NewCaptiveCoreToml(CaptiveCoreTomlParams{})
	assert.NoError(t, tomlErr)

	config := CaptiveCoreConfig{
		BinaryPath:         binaryPath,
		HistoryArchiveURLs: []string{"http://localhost"},
		Log:                log.New(),
		Context:            context.Background(),
		Toml:               toml,
	}
	runner, runnerErr := newStellarCoreRunner(config, stellarCoreRunnerModeOffline)
	assert.NoError(t, runnerErr)

	_, watcherErr := newFileWatcherWithOptions(runner, statMock.stat, time.Millisecond)
	assert.EqualError(t, watcherErr, "could not stat captive core binary: test error")
	assert.Equal(t, 1, statMock.getCallCount())
}

// TestFileChanged walks fileChanged through each outcome: an unchanged
// timestamp, a stat error, a recovered stat with the original timestamp, and
// finally a different timestamp. The call counts include the one stat made
// while the fixtures were constructed.
func TestFileChanged(t *testing.T) {
	statMock, _, watcher := createFWFixtures(t)
	baseline := statMock.modTime

	// Same timestamp as at construction: no change, checked twice.
	for i := 0; i < 2; i++ {
		assert.False(t, watcher.fileChanged())
	}
	assert.Equal(t, 3, statMock.getCallCount())

	// A stat failure is reported as "not changed".
	statMock.setResponse(time.Time{}, fmt.Errorf("test error"))
	assert.False(t, watcher.fileChanged())
	assert.Equal(t, 4, statMock.getCallCount())

	// Stat works again and still returns the baseline timestamp.
	statMock.setResponse(baseline, nil)
	assert.False(t, watcher.fileChanged())
	assert.Equal(t, 5, statMock.getCallCount())

	// A different timestamp is finally detected as a change.
	statMock.setResponse(time.Now().Add(time.Hour), nil)
	assert.True(t, watcher.fileChanged())
	assert.Equal(t, 6, statMock.getCallCount())
}

// TestCloseRunnerBeforeFileWatcherLoop checks that loop returns when the
// runner has already been closed before the loop starts.
func TestCloseRunnerBeforeFileWatcherLoop(t *testing.T) {
	_, runner, watcher := createFWFixtures(t)

	closeErr := runner.close()
	assert.NoError(t, closeErr)

	// With the runner closed, the watcher's exit channel should already be
	// readable, so this call is expected to return almost immediately.
	watcher.loop()
}

// TestCloseRunnerDuringFileWatcherLoop starts loop in a goroutine, waits
// until stat has been called more than 20 times, then closes the runner and
// expects the loop goroutine to terminate (i.e. it is not leaked).
func TestCloseRunnerDuringFileWatcherLoop(t *testing.T) {
	statMock, runner, watcher := createFWFixtures(t)

	finished := make(chan struct{})
	go func() {
		defer close(finished)
		watcher.loop()
	}()

	// The loop keeps calling stat on every tick; poll the call counter until
	// enough ticks have happened, then shut the runner down.
	runnerClosed := false
	for {
		select {
		case <-finished:
			assert.True(t, runnerClosed)
			return
		default:
		}

		if statMock.getCallCount() > 20 {
			runner.close()
			runnerClosed = true
		}
	}
}

// TestFileChangesTriggerRunnerClose starts loop in a goroutine, waits until
// stat has been called more than 20 times, then changes the mocked
// modification time and expects the watcher to close the runner and exit.
func TestFileChangesTriggerRunnerClose(t *testing.T) {
	statMock, runner, watcher := createFWFixtures(t)

	finished := make(chan struct{})
	go func() {
		defer close(finished)
		watcher.loop()
	}()

	// The loop keeps calling stat on every tick; once enough ticks have
	// happened, make stat report a different timestamp.
	fileModified := false
	for {
		select {
		case <-finished:
			assert.True(t, fileModified)
			// the runner is closed if and only if runner.ctx.Err() is non-nil
			assert.Error(t, runner.ctx.Err())
			return
		default:
		}

		if statMock.getCallCount() > 20 {
			statMock.setResponse(time.Now().Add(time.Hour), nil)
			fileModified = true
		}
	}
}
14 changes: 14 additions & 0 deletions ingest/ledgerbackend/stellar_core_runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -266,6 +266,13 @@ func (r *stellarCoreRunner) catchup(from, to uint32) error {
r.started = true
r.ledgerBuffer = newBufferedLedgerMetaReader(r.pipe.Reader)
go r.ledgerBuffer.start()

if binaryWatcher, err := newFileWatcher(r); err != nil {
r.log.Warnf("could not create captive core binary watcher: %v", err)
} else {
go binaryWatcher.loop()
}

r.wg.Add(1)
go r.handleExit(cmd)

Expand Down Expand Up @@ -304,6 +311,13 @@ func (r *stellarCoreRunner) runFrom(from uint32, hash string) error {
r.started = true
r.ledgerBuffer = newBufferedLedgerMetaReader(r.pipe.Reader)
go r.ledgerBuffer.start()

if binaryWatcher, err := newFileWatcher(r); err != nil {
r.log.Warnf("could not create captive core binary watcher: %v", err)
} else {
go binaryWatcher.loop()
}

r.wg.Add(1)
go r.handleExit(cmd)

Expand Down
3 changes: 2 additions & 1 deletion services/horizon/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,9 @@ file. This project adheres to [Semantic Versioning](http://semver.org/).
### New Features
* Add new command `horizon db detect-gaps`, which detects ingestion gaps in the database. The command prints out the `db reingest` commands to run in order to fill the gaps found.

* Performance improvement: Captive Core now reuses bucket files whenever it finds existing ones in the corresponding `--captive-core-storage-path` (introduced in [v2.1.0](#v2.1.0) rather than generating a one-time temporary sub-directory ([](https://github.com/stellar/go/pull/XXXX)). **This feature requires Stellar-Core version 17.1 or later.**
* Performance improvement: Captive Core now reuses bucket files whenever it finds existing ones in the corresponding `--captive-core-storage-path` (introduced in [v2.1.0](#v2.1.0)) rather than generating a one-time temporary sub-directory ([3670](https://github.com/stellar/go/pull/3670)). **This feature requires Stellar-Core version 17.1 or later.**

* Horizon now monitors the Stellar Core binary on disk (pointed to by `--stellar-core-binary-path`/`STELLAR_CORE_BINARY_PATH`) and restarts its Captive Core subprocess if it detects changes (i.e., a changed file timestamp for the Stellar Core binary) ([3687](https://github.com/stellar/go/pull/3687)).

## v2.4.1

Expand Down

0 comments on commit 226d70a

Please sign in to comment.