diff --git a/ingest/ledgerbackend/buffered_meta_pipe_reader.go b/ingest/ledgerbackend/buffered_meta_pipe_reader.go new file mode 100644 index 0000000000..2f0f19f4ce --- /dev/null +++ b/ingest/ledgerbackend/buffered_meta_pipe_reader.go @@ -0,0 +1,162 @@ +package ledgerbackend + +import ( + "bufio" + "io" + "time" + + "github.com/pkg/errors" + "github.com/stellar/go/support/log" + "github.com/stellar/go/xdr" +) + +const ( + // The constants below define sizes of metaPipeBufferSize (binary) and + // ledgerReadAheadBufferSize (in ledgers). In general: + // + // metaPipeBufferSize >= + // ledgerReadAheadBufferSize * max over networks (average ledger size in bytes) + // + // so that meta pipe buffer always have binary data that can be unmarshaled into + // ledger buffer. + // After checking a few latest ledgers in pubnet and testnet the average size + // is: 100,000 and 5,000 bytes respectively. + + // metaPipeBufferSize defines the meta pipe buffer size. We need at least + // a couple MB to ensure there are at least a few ledgers captive core can + // unmarshal into read-ahead buffer while waiting for client to finish + // processing previous ledgers. + metaPipeBufferSize = 10 * 1024 * 1024 + // ledgerReadAheadBufferSize defines the size (in ledgers) of read ahead + // buffer that stores unmarshalled ledgers. This is especially important in + // an online mode when GetLedger calls are not blocking. In such case, clients + // usually wait for a specific time duration before checking if the ledger is + // available. When catching up and small buffer this can increase the overall + // time because ledgers are not available. + ledgerReadAheadBufferSize = 20 +) + +type metaResult struct { + *xdr.LedgerCloseMeta + err error +} + +// bufferedLedgerMetaReader is responsible for buffering meta pipe data in a +// fast and safe manner and unmarshaling it into XDR objects. +// +// It solves the following issues: +// +// * Decouples buffering from stellarCoreRunner so it can focus on running core. +// * Decouples unmarshalling and buffering of LedgerCloseMeta's from CaptiveCore. +// * By adding buffering it allows unmarshaling the ledgers available in Stellar-Core +// while previous ledger are being processed. +// * Limits memory usage in case of large ledgers are closed by the network. +// +// Internally, it keeps two buffers: bufio.Reader with binary ledger data and +// buffered channel with unmarshaled xdr.LedgerCloseMeta objects ready for +// processing. The first buffer removes overhead time connected to reading from +// a file. The second buffer allows unmarshaling binary data into XDR objects +// (which can be a bottleneck) while clients are processing previous ledgers. +// +// Finally, when a large ledger (larger than binary buffer) is closed it waits +// until xdr.LedgerCloseMeta objects channel is empty. This prevents memory +// exhaustion when network closes a series a large ledgers. +type bufferedLedgerMetaReader struct { + r *bufio.Reader + c chan metaResult + runner stellarCoreRunnerInterface +} + +// newBufferedLedgerMetaReader creates a new meta reader that will shutdown +// when stellar-core terminates. +func newBufferedLedgerMetaReader(runner stellarCoreRunnerInterface) bufferedLedgerMetaReader { + return bufferedLedgerMetaReader{ + c: make(chan metaResult, ledgerReadAheadBufferSize), + r: bufio.NewReaderSize(runner.getMetaPipe(), metaPipeBufferSize), + runner: runner, + } +} + +// readLedgerMetaFromPipe unmarshalls the next ledger from meta pipe. +// It can block for two reasons: +// * Meta pipe buffer is full so it will wait until it refills. +// * The next ledger available in the buffer exceeds the meta pipe buffer size. +// In such case the method will block until LedgerCloseMeta buffer is empty. +func (b *bufferedLedgerMetaReader) readLedgerMetaFromPipe() (*xdr.LedgerCloseMeta, error) { + frameLength, err := xdr.ReadFrameLength(b.r) + if err != nil { + select { + case <-b.runner.getProcessExitChan(): + return nil, errors.New("stellar-core process not-running") + default: + return nil, errors.Wrap(err, "error reading frame length") + } + } + + for frameLength > metaPipeBufferSize && len(b.c) > 0 { + // Wait for LedgerCloseMeta buffer to be cleared to minimize memory usage. + select { + case <-b.runner.getProcessExitChan(): + return nil, errors.New("stellar-core process not-running") + case <-time.After(time.Second): + } + } + + var xlcm xdr.LedgerCloseMeta + _, err = xdr.Unmarshal(b.r, &xlcm) + if err != nil { + if err == io.EOF { + err = errors.Wrap(err, "got EOF from subprocess") + } + err = errors.Wrap(err, "unmarshalling framed LedgerCloseMeta") + + select { + case <-b.runner.getProcessExitChan(): + return nil, errors.New("stellar-core process not-running") + default: + return nil, err + } + } + return &xlcm, nil +} + +func (b *bufferedLedgerMetaReader) GetChannel() <-chan metaResult { + return b.c +} + +func (b *bufferedLedgerMetaReader) Start(untilSequence uint32) { + printBufferOccupation := time.NewTicker(5 * time.Second) + defer printBufferOccupation.Stop() + for { + select { + case <-b.runner.getProcessExitChan(): + return + case <-printBufferOccupation.C: + log.Debug("captive core read-ahead buffer occupation:", len(b.c)) + default: + } + + meta, err := b.readLedgerMetaFromPipe() + if err != nil { + // When `GetLedger` sees the error it will close the backend. We shouldn't + // close it now because there may be some ledgers in a buffer. + select { + case b.c <- metaResult{nil, err}: + case <-b.runner.getProcessExitChan(): + } + return + } + select { + case b.c <- metaResult{meta, nil}: + case <-b.runner.getProcessExitChan(): + return + } + + if untilSequence != 0 { + if meta.LedgerSequence() >= untilSequence { + // we are done + return + } + } + } +} diff --git a/ingest/ledgerbackend/captive_core_backend.go b/ingest/ledgerbackend/captive_core_backend.go index 08d30f894c..60c3909781 100644 --- a/ingest/ledgerbackend/captive_core_backend.go +++ b/ingest/ledgerbackend/captive_core_backend.go @@ -2,24 +2,17 @@ package ledgerbackend import ( "encoding/hex" - "io" - "sync" "time" "github.com/pkg/errors" "github.com/stellar/go/historyarchive" - "github.com/stellar/go/support/log" "github.com/stellar/go/xdr" ) // Ensure CaptiveStellarCore implements LedgerBackend var _ LedgerBackend = (*CaptiveStellarCore)(nil) -const ( - readAheadBufferSize = 2 -) - func roundDownToFirstReplayAfterCheckpointStart(ledger uint32) uint32 { v := (ledger / ledgersPerCheckpoint) * ledgersPerCheckpoint if v == 0 { @@ -30,11 +23,6 @@ func roundDownToFirstReplayAfterCheckpointStart(ledger uint32) uint32 { return v } -type metaResult struct { - *xdr.LedgerCloseMeta - err error -} - // CaptiveStellarCore is a ledger backend that starts internal Stellar-Core // subprocess responsible for streaming ledger data. It provides better decoupling // than DatabaseBackend but requires some extra init time. @@ -79,12 +67,7 @@ type CaptiveStellarCore struct { historyURLs []string archive historyarchive.ArchiveInterface - // shutdown is a channel that triggers the backend shutdown by the user. - shutdown chan struct{} - // metaC is a read-ahead buffer. - metaC chan metaResult - // wait is a waiting group waiting for a read-ahead buffer to return. - wait sync.WaitGroup + ledgerBuffer bufferedLedgerMetaReader // For testing stellarCoreRunnerFactory func(configPath string) (stellarCoreRunnerInterface, error) @@ -102,10 +85,6 @@ type CaptiveStellarCore struct { nextLedger uint32 // next ledger expected, error w/ restart if not seen lastLedger *uint32 // end of current segment if offline, nil if online - processExitMutex sync.Mutex - processExit bool - processErr error - // waitIntervalPrepareRange defines a time to wait between checking if the buffer // is empty. Default 1s, lower in tests to make them faster. waitIntervalPrepareRange time.Duration @@ -186,14 +165,10 @@ func (c *CaptiveStellarCore) openOfflineReplaySubprocess(from, to uint32) error c.nextLedger = roundDownToFirstReplayAfterCheckpointStart(from) c.lastLedger = &to c.blocking = true - c.processExit = false - c.processErr = nil // read-ahead buffer - c.metaC = make(chan metaResult, readAheadBufferSize) - c.shutdown = make(chan struct{}) - c.wait.Add(1) - go c.sendLedgerMeta(to) + c.ledgerBuffer = newBufferedLedgerMetaReader(c.stellarCoreRunner) + go c.ledgerBuffer.Start(to) return nil } @@ -249,14 +224,10 @@ func (c *CaptiveStellarCore) openOnlineReplaySubprocess(from uint32) error { c.nextLedger = nextLedger c.lastLedger = nil c.blocking = false - c.processExit = false - c.processErr = nil // read-ahead buffer - c.metaC = make(chan metaResult, readAheadBufferSize) - c.shutdown = make(chan struct{}) - c.wait.Add(1) - go c.sendLedgerMeta(0) + c.ledgerBuffer = newBufferedLedgerMetaReader(c.stellarCoreRunner) + go c.ledgerBuffer.Start(0) // if nextLedger is behind - fast-forward until expected ledger if c.nextLedger < from { @@ -318,77 +289,6 @@ func (c *CaptiveStellarCore) runFromParams(from uint32) (runFrom uint32, ledgerH return } -// sendLedgerMeta reads from the captive core pipe, decodes the ledger metadata -// and sends it to the metadata buffered channel -func (c *CaptiveStellarCore) sendLedgerMeta(untilSequence uint32) { - defer c.wait.Done() - printBufferOccupation := time.NewTicker(5 * time.Second) - defer printBufferOccupation.Stop() - for { - select { - case <-c.shutdown: - return - case <-printBufferOccupation.C: - log.Debug("captive core read-ahead buffer occupation:", len(c.metaC)) - default: - } - - meta, err := c.readLedgerMetaFromPipe() - if err != nil { - select { - case processErr := <-c.stellarCoreRunner.getProcessExitChan(): - // First, check if this is an error caused by a process exit. - c.processExitMutex.Lock() - c.processExit = true - c.processErr = processErr - c.processExitMutex.Unlock() - if processErr != nil { - err = errors.Wrap(processErr, "stellar-core process exited with an error") - } else { - err = errors.New("stellar-core process exited without an error unexpectedly") - } - default: - } - // When `GetLedger` sees the error it will close the backend. We shouldn't - // close it now because there may be some ledgers in a buffer. - select { - case c.metaC <- metaResult{nil, err}: - case <-c.shutdown: - } - return - } - select { - case c.metaC <- metaResult{meta, nil}: - case <-c.shutdown: - return - } - - if untilSequence != 0 { - if meta.LedgerSequence() >= untilSequence { - // we are done - return - } - } - } -} - -func (c *CaptiveStellarCore) readLedgerMetaFromPipe() (*xdr.LedgerCloseMeta, error) { - metaPipe := c.stellarCoreRunner.getMetaPipe() - if metaPipe == nil { - return nil, errors.New("missing metadata pipe") - } - var xlcm xdr.LedgerCloseMeta - _, e0 := xdr.UnmarshalFramed(metaPipe, &xlcm) - if e0 != nil { - if e0 == io.EOF { - return nil, errors.Wrap(e0, "got EOF from subprocess") - } else { - return nil, errors.Wrap(e0, "unmarshalling framed LedgerCloseMeta") - } - } - return &xlcm, nil -} - // PrepareRange prepares the given range (including from and to) to be loaded. // Captive stellar-core backend needs to initalize Stellar-Core state to be // able to stream ledgers. @@ -423,25 +323,18 @@ func (c *CaptiveStellarCore) PrepareRange(ledgerRange Range) error { for { select { - case <-c.shutdown: - return nil + case <-c.stellarCoreRunner.getProcessExitChan(): + processErr := c.stellarCoreRunner.getProcessExitError() + if processErr != nil { + err = errors.Wrap(processErr, "stellar-core process exited with an error") + } else { + err = errors.New("stellar-core process exited unexpectedly without an error") + } + return err default: } // Wait for the first ledger or an error - if len(c.metaC) > 0 { - // If process exited return an error - c.processExitMutex.Lock() - if c.processExit { - if c.processErr != nil { - err = errors.Wrap(c.processErr, "stellar-core process exited with an error") - } else { - err = errors.New("stellar-core process exited without an error unexpectedly") - } - } - c.processExitMutex.Unlock() - if err != nil { - return err - } + if len(c.ledgerBuffer.GetChannel()) > 0 { break } time.Sleep(c.waitIntervalPrepareRange) @@ -511,17 +404,28 @@ func (c *CaptiveStellarCore) GetLedger(sequence uint32) (bool, xdr.LedgerCloseMe var errOut error loop: for { - if !c.blocking && len(c.metaC) == 0 { + if !c.blocking && len(c.ledgerBuffer.GetChannel()) == 0 { return false, xdr.LedgerCloseMeta{}, nil } - metaResult := <-c.metaC - if metaResult.err != nil { - errOut = metaResult.err + var result metaResult + select { + case <-c.stellarCoreRunner.getProcessExitChan(): + processErr := c.stellarCoreRunner.getProcessExitError() + if processErr != nil { + errOut = errors.Wrap(processErr, "stellar-core process exited with an error") + } else { + errOut = errors.New("stellar-core process exited unexpectedly without an error") + } + break loop + case result = <-c.ledgerBuffer.GetChannel(): + } + if result.err != nil { + errOut = result.err break loop } - seq := metaResult.LedgerCloseMeta.LedgerSequence() + seq := result.LedgerCloseMeta.LedgerSequence() if seq != c.nextLedger { // We got something unexpected; close and reset errOut = errors.Errorf("unexpected ledger (expected=%d actual=%d)", c.nextLedger, seq) @@ -530,7 +434,7 @@ loop: c.nextLedger++ if seq == sequence { // Found the requested seq - c.cachedMeta = metaResult.LedgerCloseMeta + c.cachedMeta = result.LedgerCloseMeta // If we got the _last_ ledger in a segment, close before returning. if c.lastLedger != nil && *c.lastLedger == seq { @@ -561,7 +465,7 @@ func (c *CaptiveStellarCore) GetLatestLedgerSequence() (uint32, error) { } if c.lastLedger == nil { - return c.nextLedger - 1 + uint32(len(c.metaC)), nil + return c.nextLedger - 1 + uint32(len(c.ledgerBuffer.GetChannel())), nil } return *c.lastLedger, nil } @@ -576,16 +480,9 @@ func (c *CaptiveStellarCore) Close() error { c.nextLedger = 0 c.lastLedger = nil - if c.shutdown != nil { - close(c.shutdown) - // Do not close the communication channel until we know - // the goroutine is done - c.wait.Wait() - close(c.metaC) - c.shutdown = nil - } - if c.stellarCoreRunner != nil { + // Closing stellarCoreRunner will automatically close bufferedLedgerMetaReader + // because it's listening for getProcessExitChan(). err := c.stellarCoreRunner.close() c.stellarCoreRunner = nil if err != nil { diff --git a/ingest/ledgerbackend/captive_core_backend_test.go b/ingest/ledgerbackend/captive_core_backend_test.go index 711d7ba628..f8bedbdda2 100644 --- a/ingest/ledgerbackend/captive_core_backend_test.go +++ b/ingest/ledgerbackend/captive_core_backend_test.go @@ -6,6 +6,7 @@ import ( "fmt" "io" "testing" + "time" "github.com/stellar/go/historyarchive" "github.com/stellar/go/network" @@ -37,9 +38,14 @@ func (m *stellarCoreRunnerMock) getMetaPipe() io.Reader { return a.Get(0).(io.Reader) } -func (m *stellarCoreRunnerMock) getProcessExitChan() <-chan error { +func (m *stellarCoreRunnerMock) getProcessExitChan() <-chan struct{} { a := m.Called() - return a.Get(0).(chan error) + return a.Get(0).(chan struct{}) +} + +func (m *stellarCoreRunnerMock) getProcessExitError() error { + a := m.Called() + return a.Error(0) } func (m *stellarCoreRunnerMock) close() error { @@ -139,7 +145,7 @@ func TestCaptivePrepareRange(t *testing.T) { mockRunner := &stellarCoreRunnerMock{} mockRunner.On("catchup", uint32(100), uint32(200)).Return(nil).Once() - mockRunner.On("getProcessExitChan").Return(make(chan error)) + mockRunner.On("getProcessExitChan").Return(make(chan struct{})) mockRunner.On("getMetaPipe").Return(&buf) mockRunner.On("close").Return(nil).Once() @@ -168,11 +174,12 @@ func TestCaptivePrepareRange(t *testing.T) { func TestCaptivePrepareRangeCrash(t *testing.T) { var buf bytes.Buffer - ch := make(chan error, 1) // we use buffered channel in tests only - ch <- errors.New("exit code -1") + ch := make(chan struct{}) + close(ch) mockRunner := &stellarCoreRunnerMock{} mockRunner.On("catchup", uint32(100), uint32(200)).Return(nil).Once() mockRunner.On("getProcessExitChan").Return(ch) + mockRunner.On("getProcessExitError").Return(errors.New("exit code -1")) mockRunner.On("getMetaPipe").Return(&buf) mockRunner.On("close").Return(nil).Once() @@ -199,11 +206,12 @@ func TestCaptivePrepareRangeCrash(t *testing.T) { func TestCaptivePrepareRangeTerminated(t *testing.T) { var buf bytes.Buffer - ch := make(chan error, 1) // we use buffered channel in tests only - ch <- nil + ch := make(chan struct{}) + close(ch) mockRunner := &stellarCoreRunnerMock{} mockRunner.On("catchup", uint32(100), uint32(200)).Return(nil).Once() mockRunner.On("getProcessExitChan").Return(ch) + mockRunner.On("getProcessExitError").Return(nil) mockRunner.On("getMetaPipe").Return(&buf) mockRunner.On("close").Return(nil).Once() @@ -224,7 +232,7 @@ func TestCaptivePrepareRangeTerminated(t *testing.T) { err := captiveBackend.PrepareRange(BoundedRange(100, 200)) assert.Error(t, err) - assert.EqualError(t, err, "stellar-core process exited without an error unexpectedly") + assert.EqualError(t, err, "stellar-core process exited unexpectedly without an error") } func TestCaptivePrepareRange_ErrClosingSession(t *testing.T) { @@ -289,7 +297,7 @@ func TestCaptivePrepareRange_ToIsAheadOfRootHAS(t *testing.T) { var buf bytes.Buffer mockRunner := &stellarCoreRunnerMock{} mockRunner.On("catchup", uint32(100), uint32(192)).Return(nil).Once() - mockRunner.On("getProcessExitChan").Return(make(chan error)) + mockRunner.On("getProcessExitChan").Return(make(chan struct{})) mockRunner.On("getMetaPipe").Return(&buf) mockRunner.On("close").Return(nil) @@ -440,7 +448,7 @@ func TestCaptivePrepareRangeUnboundedRange_ReuseSession(t *testing.T) { mockRunner.On("runFrom", uint32(62), "0000000000000000000000000000000000000000000000000000000000000000").Return(nil).Once() mockRunner.On("runFrom", uint32(63), "0000000000000000000000000000000000000000000000000000000000000000").Return(nil).Once() mockRunner.On("getMetaPipe").Return(&buf) - mockRunner.On("getProcessExitChan").Return(make(chan error)) + mockRunner.On("getProcessExitChan").Return(make(chan struct{})) mockRunner.On("close").Return(nil) mockArchive := &historyarchive.MockArchive{} @@ -473,14 +481,14 @@ func TestCaptivePrepareRangeUnboundedRange_ReuseSession(t *testing.T) { func TestGetLatestLedgerSequence(t *testing.T) { var buf bytes.Buffer - for i := 2; i <= 99; i++ { + for i := 2; i <= 200; i++ { writeLedgerHeader(&buf, uint32(i)) } mockRunner := &stellarCoreRunnerMock{} mockRunner.On("runFrom", uint32(62), "0000000000000000000000000000000000000000000000000000000000000000").Return(nil).Once() mockRunner.On("getMetaPipe").Return(&buf) - mockRunner.On("getProcessExitChan").Return(make(chan error)) + mockRunner.On("getProcessExitChan").Return(make(chan struct{})) mockRunner.On("close").Return(nil).Once() mockArchive := &historyarchive.MockArchive{} @@ -511,9 +519,8 @@ func TestGetLatestLedgerSequence(t *testing.T) { latest, err := captiveBackend.GetLatestLedgerSequence() assert.NoError(t, err) - // readAheadBufferSize is 2 so 2 ledgers are buffered: 65 and 66. - // 64 is already read and in the cache. - assert.Equal(t, uint32(66), latest) + // This should be last read ledger + ledgerReadAheadBufferSize. + assert.Equal(t, uint32(64+ledgerReadAheadBufferSize), latest) exists, _, err := captiveBackend.GetLedger(64) assert.NoError(t, err) @@ -523,8 +530,8 @@ func TestGetLatestLedgerSequence(t *testing.T) { latest, err = captiveBackend.GetLatestLedgerSequence() assert.NoError(t, err) - // readAheadBufferSize is 2 so 2 ledgers are buffered: 65 and 66 - assert.Equal(t, uint32(66), latest) + // This should be last read ledger + ledgerReadAheadBufferSize. + assert.Equal(t, uint32(64+ledgerReadAheadBufferSize), latest) mockRunner.On("close").Return(nil).Once() err = captiveBackend.Close() @@ -539,6 +546,7 @@ func TestCaptiveGetLedger(t *testing.T) { mockRunner := &stellarCoreRunnerMock{} mockRunner.On("catchup", uint32(65), uint32(66)).Return(nil) + mockRunner.On("getProcessExitChan").Return(make(chan struct{})) mockRunner.On("getMetaPipe").Return(&buf) mockRunner.On("close").Return(nil) @@ -603,6 +611,7 @@ func TestCaptiveGetLedger_NextLedgerIsDifferentToLedgerFromBuffer(t *testing.T) mockRunner := &stellarCoreRunnerMock{} mockRunner.On("catchup", uint32(65), uint32(66)).Return(nil) + mockRunner.On("getProcessExitChan").Return(make(chan struct{})) mockRunner.On("getMetaPipe").Return(&buf) mockRunner.On("close").Return(nil) @@ -633,7 +642,7 @@ func TestCaptiveGetLedger_ErrReadingMetaResult(t *testing.T) { mockRunner := &stellarCoreRunnerMock{} mockRunner.On("catchup", uint32(65), uint32(66)).Return(nil) - mockRunner.On("getProcessExitChan").Return(make(chan error)) + mockRunner.On("getProcessExitChan").Return(make(chan struct{})) mockRunner.On("getMetaPipe").Return(&buf) mockRunner.On("close").Return(nil) @@ -657,7 +666,7 @@ func TestCaptiveGetLedger_ErrReadingMetaResult(t *testing.T) { // try reading from an empty buffer _, _, err = captiveBackend.GetLedger(64) - tt.EqualError(err, "unmarshalling framed LedgerCloseMeta: unmarshalling XDR frame header: xdr:DecodeUint: EOF while decoding 4 bytes - read: '[]'") + tt.EqualError(err, "error reading frame length: unmarshalling XDR frame header: xdr:DecodeUint: EOF while decoding 4 bytes - read: '[]'") // closes if there is an error getting ledger tt.True(captiveBackend.isClosed()) @@ -671,6 +680,7 @@ func TestCaptiveGetLedger_ErrClosingAfterLastLedger(t *testing.T) { mockRunner := &stellarCoreRunnerMock{} mockRunner.On("catchup", uint32(65), uint32(66)).Return(nil) + mockRunner.On("getProcessExitChan").Return(make(chan struct{})) mockRunner.On("getMetaPipe").Return(&buf) mockRunner.On("close").Return(fmt.Errorf("transient error")) @@ -698,7 +708,7 @@ func TestCaptiveGetLedger_ErrClosingAfterLastLedger(t *testing.T) { func waitForBufferToFill(captiveBackend *CaptiveStellarCore) { for { - if len(captiveBackend.metaC) == readAheadBufferSize { + if len(captiveBackend.ledgerBuffer.c) == ledgerReadAheadBufferSize { break } } @@ -712,6 +722,7 @@ func TestGetLedgerBoundsCheck(t *testing.T) { mockRunner := &stellarCoreRunnerMock{} mockRunner.On("catchup", uint32(128), uint32(130)).Return(nil).Once() + mockRunner.On("getProcessExitChan").Return(make(chan struct{})) mockRunner.On("getMetaPipe").Return(&buf) mockRunner.On("close").Return(nil).Once() @@ -775,10 +786,11 @@ func TestGetLedgerBoundsCheck(t *testing.T) { func TestCaptiveGetLedgerTerminated(t *testing.T) { reader, writer := io.Pipe() - ch := make(chan error, 1) // we use buffered channel in tests only + ch := make(chan struct{}) mockRunner := &stellarCoreRunnerMock{} mockRunner.On("catchup", uint32(64), uint32(100)).Return(nil).Once() mockRunner.On("getProcessExitChan").Return(ch) + mockRunner.On("getProcessExitError").Return(nil) mockRunner.On("getMetaPipe").Return(reader) mockRunner.On("close").Return(nil).Once() @@ -800,18 +812,25 @@ func TestCaptiveGetLedgerTerminated(t *testing.T) { go writeLedgerHeader(writer, 64) err := captiveBackend.PrepareRange(BoundedRange(64, 100)) assert.NoError(t, err) - - ch <- nil - writer.Close() + for { + // Wait for ledger to appear in the buffer + if len(captiveBackend.ledgerBuffer.c) == 1 { + break + } + time.Sleep(100 * time.Millisecond) + } exists, meta, err := captiveBackend.GetLedger(64) assert.NoError(t, err) assert.True(t, exists) assert.Equal(t, uint32(64), meta.LedgerSequence()) + close(ch) + writer.Close() + _, _, err = captiveBackend.GetLedger(65) assert.Error(t, err) - assert.EqualError(t, err, "stellar-core process exited without an error unexpectedly") + assert.EqualError(t, err, "stellar-core process exited unexpectedly without an error") } func TestCaptiveRunFromParams(t *testing.T) { diff --git a/ingest/ledgerbackend/stellar_core_runner.go b/ingest/ledgerbackend/stellar_core_runner.go index 950fa31ccc..40dd05ad95 100644 --- a/ingest/ledgerbackend/stellar_core_runner.go +++ b/ingest/ledgerbackend/stellar_core_runner.go @@ -21,7 +21,10 @@ type stellarCoreRunnerInterface interface { catchup(from, to uint32) error runFrom(from uint32, hash string) error getMetaPipe() io.Reader - getProcessExitChan() <-chan error + // getProcessExitChan returns a channel that closes on process exit + getProcessExitChan() <-chan struct{} + // getProcessExitError returns an exit error of the process, can be nil + getProcessExitError() error close() error } @@ -38,10 +41,11 @@ type stellarCoreRunner struct { cmd *exec.Cmd // processExit channel receives an error when the process exited with an error // or nil if the process exited without an error. - processExit chan error - metaPipe io.Reader - tempDir string - nonce string + processExit chan struct{} + processExitError error + metaPipe io.Reader + tempDir string + nonce string } func newStellarCoreRunner(executablePath, configPath, networkPassphrase string, historyURLs []string) (*stellarCoreRunner, error) { @@ -61,7 +65,8 @@ func newStellarCoreRunner(executablePath, configPath, networkPassphrase string, networkPassphrase: networkPassphrase, historyURLs: historyURLs, shutdown: make(chan struct{}), - processExit: make(chan error), + processExit: make(chan struct{}), + processExitError: nil, tempDir: tempDir, nonce: fmt.Sprintf("captive-stellar-core-%x", r.Uint64()), } @@ -180,9 +185,6 @@ func (r *stellarCoreRunner) catchup(from, to uint32) error { } r.started = true - // Do not remove bufio.Reader wrapping. Turns out that each read from a pipe - // adds an overhead time so it's better to preload data to a buffer. - r.metaPipe = bufio.NewReaderSize(r.metaPipe, 1024*1024) return nil } @@ -207,9 +209,6 @@ func (r *stellarCoreRunner) runFrom(from uint32, hash string) error { } r.started = true - // Do not remove bufio.Reader wrapping. Turns out that each read from a pipe - // adds an overhead time so it's better to preload data to a buffer. - r.metaPipe = bufio.NewReaderSize(r.metaPipe, 1024*1024) return nil } @@ -217,10 +216,14 @@ func (r *stellarCoreRunner) getMetaPipe() io.Reader { return r.metaPipe } -func (r *stellarCoreRunner) getProcessExitChan() <-chan error { +func (r *stellarCoreRunner) getProcessExitChan() <-chan struct{} { return r.processExit } +func (r *stellarCoreRunner) getProcessExitError() error { + return r.processExitError +} + func (r *stellarCoreRunner) close() error { var err1, err2 error diff --git a/ingest/ledgerbackend/stellar_core_runner_posix.go b/ingest/ledgerbackend/stellar_core_runner_posix.go index f821e8f159..c1cfbcb62d 100644 --- a/ingest/ledgerbackend/stellar_core_runner_posix.go +++ b/ingest/ledgerbackend/stellar_core_runner_posix.go @@ -39,8 +39,12 @@ func (c *stellarCoreRunner) start() (io.Reader, error) { c.wg.Add(1) go func() { + err := make(chan error, 1) select { - case c.processExit <- c.cmd.Wait(): + case err <- c.cmd.Wait(): + c.processExitError = <-err + close(c.processExit) + close(err) case <-c.shutdown: } c.wg.Done() diff --git a/ingest/ledgerbackend/stellar_core_runner_windows.go b/ingest/ledgerbackend/stellar_core_runner_windows.go index 8b2d14bccd..6fa93ae138 100644 --- a/ingest/ledgerbackend/stellar_core_runner_windows.go +++ b/ingest/ledgerbackend/stellar_core_runner_windows.go @@ -31,8 +31,12 @@ func (c *stellarCoreRunner) start() (io.Reader, error) { c.wg.Add(1) go func() { + err := make(chan error, 1) select { - case c.processExit <- c.cmd.Wait(): + case err <- c.cmd.Wait(): + c.processExitError = <-err + close(c.processExit) + close(err) case <-c.shutdown: } c.wg.Done() diff --git a/xdr/main.go b/xdr/main.go index 9900173e23..de80952918 100644 --- a/xdr/main.go +++ b/xdr/main.go @@ -124,30 +124,39 @@ func MarshalFramed(w io.Writer, v interface{}) error { return err } -// XDR and RPC define a (minimal) framing format which our metadata arrives in: a 4-byte -// big-endian length header that has the high bit set, followed by that length worth of -// XDR data. Decoding this involves just a little more work than xdr.Unmarshal. -func UnmarshalFramed(r io.Reader, v interface{}) (int, error) { +// ReadFrameLength returns a length of a framed XDR object. +func ReadFrameLength(r io.Reader) (uint32, error) { var frameLen uint32 n, e := Unmarshal(r, &frameLen) if e != nil { - return n, errors.Wrap(e, "unmarshalling XDR frame header") + return 0, errors.Wrap(e, "unmarshalling XDR frame header") } if n != 4 { - return n, errors.New("bad length of XDR frame header") + return 0, errors.New("bad length of XDR frame header") } if (frameLen & 0x80000000) != 0x80000000 { - return n, errors.New("malformed XDR frame header") + return 0, errors.New("malformed XDR frame header") } frameLen &= 0x7fffffff + return frameLen, nil +} + +// XDR and RPC define a (minimal) framing format which our metadata arrives in: a 4-byte +// big-endian length header that has the high bit set, followed by that length worth of +// XDR data. Decoding this involves just a little more work than xdr.Unmarshal. +func UnmarshalFramed(r io.Reader, v interface{}) (int, error) { + frameLen, err := ReadFrameLength(r) + if err != nil { + return 0, errors.Wrap(err, "unmarshalling XDR frame header") + } m, err := xdr.Unmarshal(r, v) if err != nil { - return n + m, errors.Wrap(err, "unmarshalling framed XDR") + return 0, errors.Wrap(err, "unmarshalling framed XDR") } if int64(m) != int64(frameLen) { - return n + m, errors.New("bad length of XDR frame body") + return 0, errors.New("bad length of XDR frame body") } - return m + n, nil + return m + 4 /* frame size: uint32 */, nil } type countWriter struct {