From 079baea1074d04f6ac29439d47b31142fe20913f Mon Sep 17 00:00:00 2001 From: Graydon Hoare Date: Wed, 20 May 2020 14:07:49 -0700 Subject: [PATCH 1/7] exp/ingest: Captive-stellar-core sketch (#2322) This is an initial sketch of support for an ingestion backend driven by a captive stellar-core subprocess replaying ledgers in memory. It's going to need a bunch of iteration before it's viable, but this PR should serve as a place to examine & discuss the approach, fix bugs, etc. --- .../ledgerbackend/captive_core_backend.go | 429 ++++++++++++++++++ .../captive_core_backend_posix.go | 66 +++ .../captive_core_backend_test.go | 28 ++ .../captive_core_backend_windows.go | 67 +++ exp/ingest/ledgerbackend/database_backend.go | 22 + exp/ingest/ledgerbackend/ledger_backend.go | 8 +- .../ledgerbackend/mock_database_backend.go | 5 + go.mod | 3 +- go.sum | 5 + .../internal/expingest/fake_ledger_backend.go | 4 + services/horizon/internal/expingest/fsm.go | 17 + services/horizon/internal/expingest/main.go | 15 +- 12 files changed, 662 insertions(+), 7 deletions(-) create mode 100644 exp/ingest/ledgerbackend/captive_core_backend.go create mode 100644 exp/ingest/ledgerbackend/captive_core_backend_posix.go create mode 100644 exp/ingest/ledgerbackend/captive_core_backend_test.go create mode 100644 exp/ingest/ledgerbackend/captive_core_backend_windows.go diff --git a/exp/ingest/ledgerbackend/captive_core_backend.go b/exp/ingest/ledgerbackend/captive_core_backend.go new file mode 100644 index 0000000000..ee77fd563b --- /dev/null +++ b/exp/ingest/ledgerbackend/captive_core_backend.go @@ -0,0 +1,429 @@ +package ledgerbackend + +import ( + "bufio" + "fmt" + "io" + "io/ioutil" + "math/rand" + "os" + "os/exec" + "path/filepath" + "regexp" + "strings" + "sync" + "time" + + "github.com/pkg/errors" + "github.com/stellar/go/network" + "github.com/stellar/go/support/historyarchive" + "github.com/stellar/go/xdr" +) + +// Ensure captiveStellarCore implements LedgerBackend +var _ LedgerBackend = (*captiveStellarCore)(nil) + +// This is a not-very-complete or well-organized sketch of code be used to +// stream LedgerCloseMeta data from a "captive" stellar-core: one running as a +// subprocess and replaying portions of history against an in-memory ledger. +// +// A captive stellar-core still needs (and allocates, in os.TempDir()) a +// temporary directory to run in: one in which its config file is stored, along +// with temporary files it downloads and decompresses, and its bucket +// state. Only the ledger will be in-memory (and we might even switch this to +// SQLite + large buffers in the future if the in-memory ledger gets too big.) +// +// Feel free to reorganize this to fit better. It's preliminary! + +// TODO: switch from history URLs to history archive interface provided from support package, to permit mocking + +// In this (crude, initial) sketch, we replay ledgers in blocks of 17,280 +// which is 24 hours worth of ledgers at 5 second intervals. +const ledgersPerProcess = 17280 +const ledgersPerCheckpoint = 64 + +// The number of checkpoints we're willing to scan over and ignore, without +// restarting a subprocess. +const numCheckpointsLeeway = 10 + +func roundDownToFirstReplayAfterCheckpointStart(ledger uint32) uint32 { + v := (ledger / ledgersPerCheckpoint) * ledgersPerCheckpoint + if v == 0 { + return 1 + } + // All other checkpoints start at the next multiple of 64 + return v +} + +type captiveStellarCore struct { + nonce string + networkPassphrase string + historyURLs []string + lastLedger *uint32 // end of current segment if offline, nil if online + cmd *exec.Cmd + metaPipe io.Reader + + nextLedgerMutex sync.Mutex + nextLedger uint32 // next ledger expected, error w/ restart if not seen +} + +// NewCaptive returns a new captiveStellarCore that is not running. Will lazily start a subprocess +// to feed it a block of streaming metadata when user calls .GetLedger(), and will kill +// and restart the subprocess if subsequent calls to .GetLedger() are discontiguous. +// +// Platform-specific pipe setup logic is in the .start() methods. +func NewCaptive(networkPassphrase string, historyURLs []string) *captiveStellarCore { + r := rand.New(rand.NewSource(time.Now().UnixNano())) + return &captiveStellarCore{ + nonce: fmt.Sprintf("captive-stellar-core-%x", r.Uint64()), + networkPassphrase: networkPassphrase, + historyURLs: historyURLs, + nextLedger: 0, + } +} + +// Each captiveStellarCore is either doing bulk offline replay or tracking +// a network as it closes ledgers online. These cases are differentiated +// by the lastLedger field of captiveStellarCore, which is nil in the online +// case (indicating there's no end to the subprocess) and non-nil in the +// offline case (indicating that the subprocess will be closed after it yields +// the last ledger in the segment). +func (c *captiveStellarCore) IsInOfflineReplayMode() bool { + return c.lastLedger != nil +} + +func (c *captiveStellarCore) IsInOnlineTrackingMode() bool { + return c.lastLedger == nil +} + +// XDR and RPC define a (minimal) framing format which our metadata arrives in: a 4-byte +// big-endian length header that has the high bit set, followed by that length worth of +// XDR data. Decoding this involves just a little more work than xdr.Unmarshal. +func unmarshalFramed(r io.Reader, v interface{}) (int, error) { + var frameLen uint32 + n, e := xdr.Unmarshal(r, &frameLen) + if e != nil { + err := errors.Wrap(e, "unmarshalling XDR frame header") + return n, err + } + if n != 4 { + err := errors.New("bad length of XDR frame header") + return n, err + } + if (frameLen & 0x80000000) != 0x80000000 { + err := errors.New("malformed XDR frame header") + return n, err + } + frameLen &= 0x7fffffff + m, e := xdr.Unmarshal(r, v) + if e != nil { + err := errors.Wrap(e, "unmarshalling framed XDR") + return n + m, err + } + if int64(m) != int64(frameLen) { + err := errors.New("bad length of XDR frame body") + return n + m, err + } + return m + n, nil +} + +// Returns the sequence number of an LCM, returning an error if the LCM is of +// an unknown version. +func peekLedgerSequence(xlcm *xdr.LedgerCloseMeta) (uint32, error) { + v0, ok := xlcm.GetV0() + if !ok { + err := errors.New("unexpected XDR LedgerCloseMeta version") + return 0, err + } + return uint32(v0.LedgerHeader.Header.LedgerSeq), nil +} + +// Note: the xdr.LedgerCloseMeta structure is _not_ the same as +// the ledgerbackend.LedgerCloseMeta structure; the latter should +// probably migrate to the former eventually. +func (c *captiveStellarCore) copyLedgerCloseMeta(xlcm *xdr.LedgerCloseMeta, lcm *LedgerCloseMeta) error { + v0, ok := xlcm.GetV0() + if !ok { + return errors.New("unexpected XDR LedgerCloseMeta version") + } + lcm.LedgerHeader = v0.LedgerHeader + envelopes := make(map[xdr.Hash]xdr.TransactionEnvelope) + for _, tx := range v0.TxSet.Txs { + hash, e := network.HashTransactionInEnvelope(tx, c.networkPassphrase) + if e != nil { + return errors.Wrap(e, "error hashing tx in LedgerCloseMeta") + } + envelopes[hash] = tx + } + for _, trm := range v0.TxProcessing { + txe, ok := envelopes[trm.Result.TransactionHash] + if !ok { + return errors.New("unknown tx hash in LedgerCloseMeta") + } + lcm.TransactionEnvelope = append(lcm.TransactionEnvelope, txe) + lcm.TransactionResult = append(lcm.TransactionResult, trm.Result) + lcm.TransactionMeta = append(lcm.TransactionMeta, trm.TxApplyProcessing) + lcm.TransactionFeeChanges = append(lcm.TransactionFeeChanges, trm.FeeProcessing) + } + for _, urm := range v0.UpgradesProcessing { + lcm.UpgradesMeta = append(lcm.UpgradesMeta, urm.Changes) + } + return nil +} + +func (c *captiveStellarCore) openOfflineReplaySubprocess(nextLedger, lastLedger uint32) error { + c.Close() + maxLedger, e := c.GetLatestLedgerSequence() + if e != nil { + return errors.Wrap(e, "getting latest ledger sequence") + } + if nextLedger > maxLedger { + err := errors.Errorf("sequence %d greater than max available %d", + nextLedger, maxLedger) + return err + } + if lastLedger > maxLedger { + lastLedger = maxLedger + } + rangeArg := fmt.Sprintf("%d/%d", lastLedger, (lastLedger-nextLedger)+1) + args := []string{"--conf", c.getConfFileName(), "catchup", rangeArg, + "--replay-in-memory"} + cmd := exec.Command("stellar-core", args...) + cmd.Dir = c.getTmpDir() + cmd.Stdout = c.getLogLineWriter() + cmd.Stderr = cmd.Stdout + c.cmd = cmd + e = c.start() + if e != nil { + err := errors.Wrap(e, "starting stellar-core subprocess") + return err + } + // The next ledger should be the first ledger of the checkpoint containing + // the requested ledger + c.nextLedgerMutex.Lock() + c.nextLedger = roundDownToFirstReplayAfterCheckpointStart(nextLedger) + c.nextLedgerMutex.Unlock() + c.lastLedger = &lastLedger + return nil +} + +func (c *captiveStellarCore) PrepareRange(from uint32, to uint32) error { + // `from-1` here because being able to read ledger `from-1` is a confirmation + // that the range is ready. This effectively makes getting ledger #1 impossible. + // TODO: should be replaced with by a tee reader with buffer or similar in the + // later stage of development. + if e := c.openOfflineReplaySubprocess(from-1, to); e != nil { + return errors.Wrap(e, "opening subprocess") + } + + if c.metaPipe == nil { + return errors.New("missing metadata pipe") + } + + _, _, err := c.GetLedger(from - 1) + if err != nil { + return errors.Wrap(err, "opening getting ledger `from-1`") + } + + return nil +} + +// We assume that we'll be called repeatedly asking for ledgers in ascending +// order, so when asked for ledger 23 we start a subprocess doing catchup +// "100023/100000", which should replay 23, 24, 25, ... 100023. The wrinkle in +// this is that core will actually replay from the _checkpoint before_ +// the implicit start ledger, so we might need to skip a few ledgers until +// we hit the one requested (this routine does so transparently if needed). +func (c *captiveStellarCore) GetLedger(sequence uint32) (bool, LedgerCloseMeta, error) { + // First, if we're open but out of range for the request, close. + if !c.IsClosed() && !c.LedgerWithinCheckpoints(sequence, numCheckpointsLeeway) { + c.Close() + } + + // Next, if we're closed, open. + if c.IsClosed() { + if e := c.openOfflineReplaySubprocess(sequence, sequence+ledgersPerProcess); e != nil { + return false, LedgerCloseMeta{}, errors.Wrap(e, "opening subprocess") + } + } + + // Check that we're where we expect to be: in range ... + if !c.LedgerWithinCheckpoints(sequence, 1) { + return false, LedgerCloseMeta{}, errors.New("unexpected subprocess next-ledger") + } + + // ... and open + if c.metaPipe == nil { + return false, LedgerCloseMeta{}, errors.New("missing metadata pipe") + } + + // Now loop along the range until we find the ledger we want. + var errOut error + for { + var xlcm xdr.LedgerCloseMeta + _, e0 := unmarshalFramed(c.metaPipe, &xlcm) + if e0 != nil { + if e0 == io.EOF { + errOut = errors.Wrap(e0, "got EOF from subprocess") + break + } else { + errOut = errors.Wrap(e0, "unmarshalling framed LedgerCloseMeta") + break + } + } + seq, e1 := peekLedgerSequence(&xlcm) + if e1 != nil { + errOut = e1 + break + } + c.nextLedgerMutex.Lock() + if seq != c.nextLedger { + // We got something unexpected; close and reset + errOut = errors.Errorf("unexpected ledger %d", seq) + c.nextLedgerMutex.Unlock() + break + } + c.nextLedger++ + c.nextLedgerMutex.Unlock() + if seq == sequence { + // Found the requested seq + var lcm LedgerCloseMeta + e2 := c.copyLedgerCloseMeta(&xlcm, &lcm) + if e2 != nil { + errOut = e2 + break + } + // If we got the _last_ ledger in a segment, close before returning. + if c.lastLedger != nil && *c.lastLedger == seq { + c.Close() + } + return true, lcm, nil + } + } + // All paths above that break out of the loop (instead of return) + // set e to non-nil: there was an error and we should close and + // reset state before retuning an error to our caller. + c.Close() + return false, LedgerCloseMeta{}, errOut +} + +func (c *captiveStellarCore) GetLatestLedgerSequence() (uint32, error) { + archive, e := historyarchive.Connect( + c.historyURLs[0], + historyarchive.ConnectOptions{}, + ) + if e != nil { + return 0, e + } + has, e := archive.GetRootHAS() + if e != nil { + return 0, e + } + return has.CurrentLedger, nil +} + +// LedgerWithinCheckpoints returns true if a given ledger is after the next ledger to be read +// from a given subprocess (so ledger will be read eventually) and no more +// than numCheckpoints checkpoints ahead of the next ledger to be read +// (so it will not be too long before ledger is read). +func (c *captiveStellarCore) LedgerWithinCheckpoints(ledger uint32, numCheckpoints uint32) bool { + return ((c.nextLedger <= ledger) && + (ledger <= (c.nextLedger + (numCheckpoints * ledgersPerCheckpoint)))) +} + +func (c *captiveStellarCore) IsClosed() bool { + c.nextLedgerMutex.Lock() + defer c.nextLedgerMutex.Unlock() + return c.nextLedger == 0 +} + +func (c *captiveStellarCore) Close() error { + if c.IsClosed() { + return nil + } + c.nextLedgerMutex.Lock() + c.nextLedger = 0 + c.nextLedgerMutex.Unlock() + + c.lastLedger = nil + var e1, e2 error + if c.metaPipe != nil { + c.metaPipe = nil + } + if c.processIsAlive() { + e1 = c.cmd.Process.Kill() + c.cmd.Wait() + c.cmd = nil + } + e2 = os.RemoveAll(c.getTmpDir()) + if e1 != nil { + return errors.Wrap(e1, "error killing subprocess") + } + if e2 != nil { + return errors.Wrap(e2, "error removing subprocess tmpdir") + } + return nil +} + +func (c *captiveStellarCore) getTmpDir() string { + return filepath.Join(os.TempDir(), c.nonce) +} + +func (c *captiveStellarCore) getConfFileName() string { + return filepath.Join(c.getTmpDir(), "stellar-core.conf") +} + +func (c *captiveStellarCore) getConf() string { + lines := []string{ + "# Generated file -- do not edit", + "RUN_STANDALONE=true", + "NODE_IS_VALIDATOR=false", + "DISABLE_XDR_FSYNC=true", + "UNSAFE_QUORUM=true", + fmt.Sprintf(`NETWORK_PASSPHRASE="%s"`, c.networkPassphrase), + fmt.Sprintf(`BUCKET_DIR_PATH="%s"`, filepath.Join(c.getTmpDir(), "buckets")), + fmt.Sprintf(`METADATA_OUTPUT_STREAM="%s"`, c.getPipeName()), + } + for i, val := range c.historyURLs { + lines = append(lines, fmt.Sprintf("[HISTORY.h%d]", i)) + lines = append(lines, fmt.Sprintf(`get="curl -sf %s/{0} -o {1}"`, val)) + } + // Add a fictional quorum -- necessary to convince core to start up; + // but not used at all for our purposes. Pubkey here is just random. + lines = append(lines, + "[QUORUM_SET]", + "THRESHOLD_PERCENT=100", + `VALIDATORS=["GCZBOIAY4HLKAJVNJORXZOZRAY2BJDBZHKPBHZCRAIUR5IHC2UHBGCQR"]`) + return strings.ReplaceAll(strings.Join(lines, "\n"), "\\", "\\\\") +} + +func (c *captiveStellarCore) getLogLineWriter() io.Writer { + r, w := io.Pipe() + br := bufio.NewReader(r) + // Strip timestamps from log lines from captive stellar-core. We emit our own. + dateRx := regexp.MustCompile("^\\d{4}-\\d{2}-\\d{2}T\\d{2}:\\d{2}:\\d{2}\\.\\d{3} ") + go func() { + for { + line, e := br.ReadString('\n') + if e != nil { + break + } + line = dateRx.ReplaceAllString(line, "") + // Leaving for debug purposes: + // fmt.Print(line) + } + }() + return w +} + +// Makes the temp directory and writes the config file to it; called by the +// platform-specific captiveStellarCore.Start() methods. +func (c *captiveStellarCore) writeConf() error { + dir := c.getTmpDir() + e := os.MkdirAll(dir, 0755) + if e != nil { + return errors.Wrap(e, "error creating subprocess tmpdir") + } + conf := c.getConf() + return ioutil.WriteFile(c.getConfFileName(), []byte(conf), 0644) +} diff --git a/exp/ingest/ledgerbackend/captive_core_backend_posix.go b/exp/ingest/ledgerbackend/captive_core_backend_posix.go new file mode 100644 index 0000000000..e85a531f7b --- /dev/null +++ b/exp/ingest/ledgerbackend/captive_core_backend_posix.go @@ -0,0 +1,66 @@ +// +build !windows + +package ledgerbackend + +import ( + "bufio" + "os" + "syscall" + + "github.com/pkg/errors" +) + +// Posix-specific methods for the captiveStellarCore type. + +func (c *captiveStellarCore) getPipeName() string { + // The exec.Cmd.ExtraFiles field carries *io.File values that are assigned + // to child process fds counting from 3, and we'll be passing exactly one + // fd: the write end of the anonymous pipe below. + return "fd:3" +} + +// Starts the subprocess and sets the c.metaPipe field +func (c *captiveStellarCore) start() error { + + // First make an anonymous pipe. + // Note io.File objects close-on-finalization. + readFile, writeFile, e := os.Pipe() + if e != nil { + return errors.Wrap(e, "error making a pipe") + } + + defer writeFile.Close() + + // Then write config file pointing to it. + e = c.writeConf() + if e != nil { + return errors.Wrap(e, "error writing conf") + } + + // Add the write-end to the set of inherited file handles. This is defined + // to be fd 3 on posix platforms. + c.cmd.ExtraFiles = []*os.File{writeFile} + e = c.cmd.Start() + if e != nil { + return errors.Wrap(e, "error starting stellar-core") + } + + // Launch a goroutine to reap immediately on exit (I think this is right, + // as we do not want zombies and we might abruptly forget / kill / close + // the process, but I'm not certain). + cmd := c.cmd + go cmd.Wait() + + c.metaPipe = bufio.NewReaderSize(readFile, 1024*1024) + return nil +} + +func (c *captiveStellarCore) processIsAlive() bool { + if c.cmd == nil { + return false + } + if c.cmd.Process == nil { + return false + } + return c.cmd.Process.Signal(syscall.Signal(0)) == nil +} diff --git a/exp/ingest/ledgerbackend/captive_core_backend_test.go b/exp/ingest/ledgerbackend/captive_core_backend_test.go new file mode 100644 index 0000000000..c68f838e8a --- /dev/null +++ b/exp/ingest/ledgerbackend/captive_core_backend_test.go @@ -0,0 +1,28 @@ +package ledgerbackend + +import ( + "testing" + + "github.com/stellar/go/support/log" + logpkg "github.com/stellar/go/support/log" + "github.com/stretchr/testify/assert" +) + +// TODO: test frame decoding +// TODO: test from static base64-encoded data + +func TestCaptiveCore(t *testing.T) { + log.SetLevel(logpkg.InfoLevel) + c := NewCaptive("Public Global Stellar Network ; September 2015", + []string{"http://history.stellar.org/prd/core-live/core_live_001"}) + seq, e := c.GetLatestLedgerSequence() + assert.NoError(t, e) + assert.Greater(t, seq, uint32(0)) + ok, lcm, e := c.GetLedger(seq - 200) + assert.NoError(t, e) + assert.Equal(t, true, ok) + assert.Equal(t, uint32(lcm.LedgerHeader.Header.LedgerSeq), seq-200) + assert.DirExists(t, c.getTmpDir()) + e = c.Close() + assert.NoError(t, e) +} diff --git a/exp/ingest/ledgerbackend/captive_core_backend_windows.go b/exp/ingest/ledgerbackend/captive_core_backend_windows.go new file mode 100644 index 0000000000..f19d97921f --- /dev/null +++ b/exp/ingest/ledgerbackend/captive_core_backend_windows.go @@ -0,0 +1,67 @@ +// +build windows + +package ledgerbackend + +import ( + "bufio" + "fmt" + "github.com/Microsoft/go-winio" + "os" +) + +// Windows-specific methods for the captiveStellarCore type. + +func (c *captiveStellarCore) getPipeName() string { + return fmt.Sprintf(`\\.\pipe\%s`, c.nonce) +} + +func (c *captiveStellarCore) start() error { + // First set up the server pipe. + listener, e := winio.ListenPipe(c.getPipeName(), nil) + if e != nil { + return e + } + + // Then write config file pointing to it. + e = c.writeConf() + if e != nil { + return e + } + + // Then start the process. + e = c.cmd.Start() + if e != nil { + return e + } + + // Launch a goroutine to reap immediately on exit (I think this is right, + // as we do not want zombies and we might abruptly forget / kill / close + // the process, but I'm not certain). + cmd := c.cmd + go func() { + cmd.Wait() + }() + + // Then accept on the server end. + connection, e := listener.Accept() + if e != nil { + return e + } + + c.metaPipe = bufio.NewReaderSize(connection, 1024*1024) + return nil +} + +func (c *captiveStellarCore) processIsAlive() bool { + if c.cmd == nil { + return false + } + if c.cmd.Process == nil { + return false + } + p, e := os.FindProcess(c.cmd.Process.Pid) + if e != nil || p == nil { + return false + } + return true +} diff --git a/exp/ingest/ledgerbackend/database_backend.go b/exp/ingest/ledgerbackend/database_backend.go index 40f8d3620f..ac5059d70d 100644 --- a/exp/ingest/ledgerbackend/database_backend.go +++ b/exp/ingest/ledgerbackend/database_backend.go @@ -39,6 +39,28 @@ func NewDatabaseBackendFromSession(session *db.Session) (*DatabaseBackend, error return &DatabaseBackend{session: session}, nil } +func (dbb *DatabaseBackend) PrepareRange(from uint32, to uint32) error { + fromExists, _, err := dbb.GetLedger(from) + if err != nil { + return errors.Wrap(err, "error getting ledger") + } + + if !fromExists { + return errors.New("`from` ledger does not exist") + } + + toExists, _, err := dbb.GetLedger(to) + if err != nil { + return errors.Wrap(err, "error getting ledger") + } + + if !toExists { + return errors.New("`to` ledger does not exist") + } + + return nil +} + // GetLatestLedgerSequence returns the most recent ledger sequence number present in the database. func (dbb *DatabaseBackend) GetLatestLedgerSequence() (uint32, error) { var ledger []ledgerHeader diff --git a/exp/ingest/ledgerbackend/ledger_backend.go b/exp/ingest/ledgerbackend/ledger_backend.go index 22faf4be61..03092b0f2d 100644 --- a/exp/ingest/ledgerbackend/ledger_backend.go +++ b/exp/ingest/ledgerbackend/ledger_backend.go @@ -1,12 +1,18 @@ package ledgerbackend -import "github.com/stellar/go/xdr" +import ( + "github.com/stellar/go/xdr" +) // LedgerBackend represents the interface to a ledger data store. type LedgerBackend interface { GetLatestLedgerSequence() (sequence uint32, err error) // The first returned value is false when the ledger does not exist in a backend. GetLedger(sequence uint32) (bool, LedgerCloseMeta, error) + // Prepares the given range (including from and to) to be loaded. Some backends + // (like captive stellar-core) need to process data before being able to stream + // ledgers. + PrepareRange(from uint32, to uint32) error Close() error } diff --git a/exp/ingest/ledgerbackend/mock_database_backend.go b/exp/ingest/ledgerbackend/mock_database_backend.go index 9a370cc972..5edd0f5b4f 100644 --- a/exp/ingest/ledgerbackend/mock_database_backend.go +++ b/exp/ingest/ledgerbackend/mock_database_backend.go @@ -15,6 +15,11 @@ func (m *MockDatabaseBackend) GetLatestLedgerSequence() (uint32, error) { return args.Get(0).(uint32), args.Error(1) } +func (m *MockDatabaseBackend) PrepareRange(from uint32, to uint32) error { + args := m.Called(from, to) + return args.Error(1) +} + func (m *MockDatabaseBackend) GetLedger(sequence uint32) (bool, LedgerCloseMeta, error) { args := m.Called(sequence) return args.Bool(0), args.Get(1).(LedgerCloseMeta), args.Error(2) diff --git a/go.mod b/go.mod index 0715db44f5..b1efe28d7d 100644 --- a/go.mod +++ b/go.mod @@ -7,6 +7,7 @@ require ( firebase.google.com/go v3.12.0+incompatible github.com/BurntSushi/toml v0.3.1 github.com/Masterminds/squirrel v0.0.0-20161115235646-20f192218cf5 + github.com/Microsoft/go-winio v0.4.14 github.com/ajg/form v0.0.0-20160822230020-523a5da1a92f // indirect github.com/asaskevich/govalidator v0.0.0-20180319081651-7d2e70ef918f github.com/aws/aws-sdk-go v1.25.25 @@ -58,7 +59,7 @@ require ( github.com/segmentio/go-loggly v0.5.1-0.20171222203950-eb91657e62b2 github.com/sergi/go-diff v0.0.0-20161205080420-83532ca1c1ca // indirect github.com/shurcooL/httpfs v0.0.0-20190707220628-8d4bc4ba7749 - github.com/sirupsen/logrus v1.2.0 + github.com/sirupsen/logrus v1.4.1 github.com/smartystreets/goconvey v0.0.0-20190731233626-505e41936337 // indirect github.com/spf13/cast v0.0.0-20150508191742-4d07383ffe94 // indirect github.com/spf13/cobra v0.0.0-20160830174925-9c28e4bbd74e diff --git a/go.sum b/go.sum index 09dc406dd6..76b0f86559 100644 --- a/go.sum +++ b/go.sum @@ -9,6 +9,8 @@ github.com/BurntSushi/toml v0.3.1 h1:WXkYYl6Yr3qBf1K79EBnL4mak0OimBfB0XUf9Vl28OQ github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= github.com/Masterminds/squirrel v0.0.0-20161115235646-20f192218cf5 h1:PPfYWScYacO3Q6JMCLkyh6Ea2Q/REDTMgmiTAeiV8Jg= github.com/Masterminds/squirrel v0.0.0-20161115235646-20f192218cf5/go.mod h1:xnKTFzjGUiZtiOagBsfnvomW+nJg2usB1ZpordQWqNM= +github.com/Microsoft/go-winio v0.4.14 h1:+hMXMk01us9KgxGb7ftKQt2Xpf5hH/yky+TDA+qxleU= +github.com/Microsoft/go-winio v0.4.14/go.mod h1:qXqCSQ3Xa7+6tgxaGTIe4Kpcdsi+P8jBhyzoq1bpyYA= github.com/Shopify/sarama v1.19.0/go.mod h1:FVkBWblsNy7DGZRfXLU0O9RCGt5g3g3yEuWXgklEdEo= github.com/Shopify/toxiproxy v2.1.4+incompatible/go.mod h1:OXgGpZ6Cli1/URJOF1DMxUHB2q5Ap20/P/eIdh4G0pI= github.com/ajg/form v0.0.0-20160822230020-523a5da1a92f h1:zvClvFQwU++UpIUBGC8YmDlfhUrweEy1R1Fj1gu5iIM= @@ -188,6 +190,8 @@ github.com/shurcooL/httpfs v0.0.0-20190707220628-8d4bc4ba7749 h1:bUGsEnyNbVPw06B github.com/shurcooL/httpfs v0.0.0-20190707220628-8d4bc4ba7749/go.mod h1:ZY1cvUeJuFPAdZ/B6v7RHavJWZn2YPVFQ1OSXhCGOkg= github.com/sirupsen/logrus v1.2.0 h1:juTguoYk5qI21pwyTXY3B3Y5cOTH3ZUyZCg1v/mihuo= github.com/sirupsen/logrus v1.2.0/go.mod h1:LxeOpSwHxABJmUn/MG1IvRgCAasNZTLOkJPxbbu5VWo= +github.com/sirupsen/logrus v1.4.1 h1:GL2rEmy6nsikmW0r8opw9JIRScdMF5hA8cOYLH7In1k= +github.com/sirupsen/logrus v1.4.1/go.mod h1:ni0Sbl8bgC9z8RoU9G6nDWqqs/fq4eDPysMBDgk/93Q= github.com/smartystreets/assertions v0.0.0-20180927180507-b2de0cb4f26d h1:zE9ykElWQ6/NYmHa3jpm/yHnI4xSofP+UP6SpjHcSeM= github.com/smartystreets/assertions v0.0.0-20180927180507-b2de0cb4f26d/go.mod h1:OnSkiWE9lh6wB0YB77sQom3nweQdgAjqCqsofrRNTgc= github.com/smartystreets/goconvey v0.0.0-20190731233626-505e41936337 h1:WN9BUFbdyOsSH/XohnWpXOlq9NBD5sGAB2FciQMUEe8= @@ -276,6 +280,7 @@ golang.org/x/sys v0.0.0-20181122145206-62eef0e2fa9b/go.mod h1:STP8DvDyc/dI5b8T5h golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190222072716-a9d3bda3a223/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20190507160741-ecd444e8653b/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20190606165138-5da285871e9c h1:+EXw7AwNOKzPFXMZ1yNjO40aWCh3PIquJB2fYlv9wcs= golang.org/x/sys v0.0.0-20190606165138-5da285871e9c/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= diff --git a/services/horizon/internal/expingest/fake_ledger_backend.go b/services/horizon/internal/expingest/fake_ledger_backend.go index 3d23d4a7af..7521603ce7 100644 --- a/services/horizon/internal/expingest/fake_ledger_backend.go +++ b/services/horizon/internal/expingest/fake_ledger_backend.go @@ -16,6 +16,10 @@ func (fakeLedgerBackend) GetLatestLedgerSequence() (uint32, error) { return 1, nil } +func (fakeLedgerBackend) PrepareRange(from uint32, to uint32) error { + return nil +} + func fakeAccount() xdr.LedgerEntryChange { account := keypair.MustRandom().Address() return xdr.LedgerEntryChange{ diff --git a/services/horizon/internal/expingest/fsm.go b/services/horizon/internal/expingest/fsm.go index d976a322e6..4f7cbc0102 100644 --- a/services/horizon/internal/expingest/fsm.go +++ b/services/horizon/internal/expingest/fsm.go @@ -631,6 +631,23 @@ func (h reingestHistoryRangeState) run(s *System) (transition, error) { return stop(), errors.Errorf("invalid range: [%d, %d]", h.fromLedger, h.toLedger) } + log.WithFields(logpkg.F{ + "from": h.fromLedger, + "to": h.toLedger, + }).Info("Preparing ledger backend to retrieve range") + startTime := time.Now() + + err := s.ledgerBackend.PrepareRange(h.fromLedger, h.toLedger) + if err != nil { + return stop(), errors.Wrap(err, "error preparing range") + } + + log.WithFields(logpkg.F{ + "from": h.fromLedger, + "to": h.toLedger, + "duration": time.Since(startTime).Seconds(), + }).Info("Range ready") + if h.force { if err := s.historyQ.Begin(); err != nil { return stop(), errors.Wrap(err, "Error starting a transaction") diff --git a/services/horizon/internal/expingest/main.go b/services/horizon/internal/expingest/main.go index 7be96bcfa9..92202cf86b 100644 --- a/services/horizon/internal/expingest/main.go +++ b/services/horizon/internal/expingest/main.go @@ -147,11 +147,16 @@ func NewSystem(config Config) (*System, error) { coreSession := config.CoreSession.Clone() coreSession.Ctx = ctx - ledgerBackend, err := ledgerbackend.NewDatabaseBackendFromSession(coreSession) - if err != nil { - cancel() - return nil, errors.Wrap(err, "error creating ledger backend") - } + // ledgerBackend, err := ledgerbackend.NewDatabaseBackendFromSession(coreSession) + // if err != nil { + // cancel() + // return nil, errors.Wrap(err, "error creating ledger backend") + // } + + ledgerBackend := ledgerbackend.NewCaptive( + config.NetworkPassphrase, + []string{config.HistoryArchiveURL}, + ) historyQ := &history.Q{config.HistorySession.Clone()} historyQ.Ctx = ctx From 1afa3b9241fab37a18740367376f12b32ebd96aa Mon Sep 17 00:00:00 2001 From: tamirms Date: Thu, 21 May 2020 17:46:02 +0200 Subject: [PATCH 2/7] services/horizon/cmd: Add configuration for captive core ingestion (#2609) * Add configuration for captive core ingestion * Use captive core path configuration when initializing ingestion system --- .../ledgerbackend/captive_core_backend.go | 6 +++-- .../captive_core_backend_test.go | 7 ++++-- go.list | 3 ++- services/horizon/cmd/db.go | 3 +++ services/horizon/cmd/ingest.go | 6 +++++ services/horizon/cmd/root.go | 22 ++++++++++++++++ services/horizon/docker/Dockerfile | 9 ++++++- services/horizon/internal/config.go | 14 ++++++----- services/horizon/internal/expingest/main.go | 25 +++++++++++-------- 9 files changed, 73 insertions(+), 22 deletions(-) diff --git a/exp/ingest/ledgerbackend/captive_core_backend.go b/exp/ingest/ledgerbackend/captive_core_backend.go index ee77fd563b..4bc858cf30 100644 --- a/exp/ingest/ledgerbackend/captive_core_backend.go +++ b/exp/ingest/ledgerbackend/captive_core_backend.go @@ -61,6 +61,7 @@ type captiveStellarCore struct { historyURLs []string lastLedger *uint32 // end of current segment if offline, nil if online cmd *exec.Cmd + executablePath string metaPipe io.Reader nextLedgerMutex sync.Mutex @@ -72,11 +73,12 @@ type captiveStellarCore struct { // and restart the subprocess if subsequent calls to .GetLedger() are discontiguous. // // Platform-specific pipe setup logic is in the .start() methods. -func NewCaptive(networkPassphrase string, historyURLs []string) *captiveStellarCore { +func NewCaptive(executablePath, networkPassphrase string, historyURLs []string) *captiveStellarCore { r := rand.New(rand.NewSource(time.Now().UnixNano())) return &captiveStellarCore{ nonce: fmt.Sprintf("captive-stellar-core-%x", r.Uint64()), networkPassphrase: networkPassphrase, + executablePath: executablePath, historyURLs: historyURLs, nextLedger: 0, } @@ -188,7 +190,7 @@ func (c *captiveStellarCore) openOfflineReplaySubprocess(nextLedger, lastLedger rangeArg := fmt.Sprintf("%d/%d", lastLedger, (lastLedger-nextLedger)+1) args := []string{"--conf", c.getConfFileName(), "catchup", rangeArg, "--replay-in-memory"} - cmd := exec.Command("stellar-core", args...) + cmd := exec.Command(c.executablePath, args...) cmd.Dir = c.getTmpDir() cmd.Stdout = c.getLogLineWriter() cmd.Stderr = cmd.Stdout diff --git a/exp/ingest/ledgerbackend/captive_core_backend_test.go b/exp/ingest/ledgerbackend/captive_core_backend_test.go index c68f838e8a..82a442a259 100644 --- a/exp/ingest/ledgerbackend/captive_core_backend_test.go +++ b/exp/ingest/ledgerbackend/captive_core_backend_test.go @@ -13,8 +13,11 @@ import ( func TestCaptiveCore(t *testing.T) { log.SetLevel(logpkg.InfoLevel) - c := NewCaptive("Public Global Stellar Network ; September 2015", - []string{"http://history.stellar.org/prd/core-live/core_live_001"}) + c := NewCaptive( + "stellar-core", + "Public Global Stellar Network ; September 2015", + []string{"http://history.stellar.org/prd/core-live/core_live_001"}, + ) seq, e := c.GetLatestLedgerSequence() assert.NoError(t, e) assert.Greater(t, seq, uint32(0)) diff --git a/go.list b/go.list index c9db790501..8778b5bac4 100644 --- a/go.list +++ b/go.list @@ -4,6 +4,7 @@ cloud.google.com/go v0.34.0 firebase.google.com/go v3.12.0+incompatible github.com/BurntSushi/toml v0.3.1 github.com/Masterminds/squirrel v0.0.0-20161115235646-20f192218cf5 +github.com/Microsoft/go-winio v0.4.14 github.com/Shopify/sarama v1.19.0 github.com/Shopify/toxiproxy v2.1.4+incompatible github.com/ajg/form v0.0.0-20160822230020-523a5da1a92f @@ -98,7 +99,7 @@ github.com/sebest/xff v0.0.0-20150611211316-7a36e3a787b5 github.com/segmentio/go-loggly v0.5.1-0.20171222203950-eb91657e62b2 github.com/sergi/go-diff v0.0.0-20161205080420-83532ca1c1ca github.com/shurcooL/httpfs v0.0.0-20190707220628-8d4bc4ba7749 -github.com/sirupsen/logrus v1.2.0 +github.com/sirupsen/logrus v1.4.1 github.com/smartystreets/assertions v0.0.0-20180927180507-b2de0cb4f26d github.com/smartystreets/goconvey v0.0.0-20190731233626-505e41936337 github.com/spf13/cast v0.0.0-20150508191742-4d07383ffe94 diff --git a/services/horizon/cmd/db.go b/services/horizon/cmd/db.go index eb658645b7..55dd716db1 100644 --- a/services/horizon/cmd/db.go +++ b/services/horizon/cmd/db.go @@ -179,6 +179,9 @@ var dbReingestRangeCmd = &cobra.Command{ HistoryArchiveURL: config.HistoryArchiveURLs[0], IngestFailedTransactions: config.IngestFailedTransactions, } + if config.EnableCaptiveCoreIngestion { + ingestConfig.StellarCorePath = config.StellarCoreBinaryPath + } system, err := expingest.NewSystem(ingestConfig) if err != nil { diff --git a/services/horizon/cmd/ingest.go b/services/horizon/cmd/ingest.go index 0d8cc8a32c..819ebdc096 100644 --- a/services/horizon/cmd/ingest.go +++ b/services/horizon/cmd/ingest.go @@ -110,6 +110,9 @@ var ingestVerifyRangeCmd = &cobra.Command{ OrderBookGraph: orderbook.NewOrderBookGraph(), IngestFailedTransactions: config.IngestFailedTransactions, } + if config.EnableCaptiveCoreIngestion { + ingestConfig.StellarCorePath = config.StellarCoreBinaryPath + } system, err := expingest.NewSystem(ingestConfig) if err != nil { @@ -188,6 +191,9 @@ var ingestStressTestCmd = &cobra.Command{ OrderBookGraph: orderbook.NewOrderBookGraph(), IngestFailedTransactions: config.IngestFailedTransactions, } + if config.EnableCaptiveCoreIngestion { + ingestConfig.StellarCorePath = config.StellarCoreBinaryPath + } system, err := expingest.NewSystem(ingestConfig) if err != nil { diff --git a/services/horizon/cmd/root.go b/services/horizon/cmd/root.go index 90f987177a..fe95b37d92 100644 --- a/services/horizon/cmd/root.go +++ b/services/horizon/cmd/root.go @@ -108,6 +108,24 @@ var dbURLConfigOption = &support.ConfigOption{ // Add a new entry here to connect a new field in the horizon.Config struct var configOpts = support.ConfigOptions{ dbURLConfigOption, + &support.ConfigOption{ + Name: "stellar-core-binary-path", + EnvVar: "STELLAR_CORE_BINARY_PATH", + OptType: types.String, + FlagDefault: "", + Required: false, + Usage: "path to stellar core binary", + ConfigKey: &config.StellarCoreBinaryPath, + }, + &support.ConfigOption{ + Name: "enable-captive-core-ingestion", + EnvVar: "ENABLE_CAPTIVE_CORE_INGESTION", + OptType: types.Bool, + FlagDefault: false, + Required: false, + Usage: "[experimental flag!] causes Horizon to ingest from a Stellar Core subprocess instead of a persistent Stellar Core database", + ConfigKey: &config.EnableCaptiveCoreIngestion, + }, &support.ConfigOption{ Name: "stellar-core-db-url", EnvVar: "STELLAR_CORE_DATABASE_URL", @@ -404,6 +422,10 @@ func initRootConfig() { stdLog.Fatalf("--history-archive-urls must be set when --ingest is set") } + if config.EnableCaptiveCoreIngestion && config.StellarCoreBinaryPath == "" { + stdLog.Fatalf("--stellar-core-binary-path must be set when --enable-captive-core-ingestion is set") + } + // Configure log file if config.LogFile != "" { logFile, err := os.OpenFile(config.LogFile, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644) diff --git a/services/horizon/docker/Dockerfile b/services/horizon/docker/Dockerfile index 192fcd8d86..ba98718d7a 100644 --- a/services/horizon/docker/Dockerfile +++ b/services/horizon/docker/Dockerfile @@ -8,8 +8,15 @@ RUN go install github.com/stellar/go/services/horizon FROM ubuntu:16.04 +ENV STELLAR_CORE_VERSION 13.0.0-1220-9ed3da29 +ENV STELLAR_CORE_BINARY_PATH /usr/local/bin/stellar-core + # ca-certificates are required to make tls connections -RUN apt-get update && apt-get install -y --no-install-recommends ca-certificates +RUN apt-get update && apt-get install -y --no-install-recommends ca-certificates wget + +RUN wget -O stellar-core.deb https://s3.amazonaws.com/stellar.org/releases/stellar-core/stellar-core-${STELLAR_CORE_VERSION}_amd64.deb +RUN dpkg -i stellar-core.deb +RUN rm stellar-core.deb COPY --from=builder /go/bin/horizon ./ diff --git a/services/horizon/internal/config.go b/services/horizon/internal/config.go index 334504b54f..74f6fa8222 100644 --- a/services/horizon/internal/config.go +++ b/services/horizon/internal/config.go @@ -11,12 +11,14 @@ import ( // Config is the configuration for horizon. It gets populated by the // app's main function and is provided to NewApp. type Config struct { - DatabaseURL string - StellarCoreDatabaseURL string - StellarCoreURL string - HistoryArchiveURLs []string - Port uint - AdminPort uint + DatabaseURL string + StellarCoreBinaryPath string + StellarCoreDatabaseURL string + StellarCoreURL string + EnableCaptiveCoreIngestion bool + HistoryArchiveURLs []string + Port uint + AdminPort uint // MaxDBConnections has a priority over all 4 values below. MaxDBConnections int diff --git a/services/horizon/internal/expingest/main.go b/services/horizon/internal/expingest/main.go index 92202cf86b..38a17e7265 100644 --- a/services/horizon/internal/expingest/main.go +++ b/services/horizon/internal/expingest/main.go @@ -57,6 +57,7 @@ type Config struct { CoreSession *db.Session StellarCoreURL string StellarCoreCursor string + StellarCorePath string NetworkPassphrase string HistorySession *db.Session @@ -147,16 +148,20 @@ func NewSystem(config Config) (*System, error) { coreSession := config.CoreSession.Clone() coreSession.Ctx = ctx - // ledgerBackend, err := ledgerbackend.NewDatabaseBackendFromSession(coreSession) - // if err != nil { - // cancel() - // return nil, errors.Wrap(err, "error creating ledger backend") - // } - - ledgerBackend := ledgerbackend.NewCaptive( - config.NetworkPassphrase, - []string{config.HistoryArchiveURL}, - ) + var ledgerBackend ledgerbackend.LedgerBackend + if len(config.StellarCorePath) > 0 { + ledgerBackend = ledgerbackend.NewCaptive( + config.StellarCorePath, + config.NetworkPassphrase, + []string{config.HistoryArchiveURL}, + ) + } else { + ledgerBackend, err = ledgerbackend.NewDatabaseBackendFromSession(coreSession) + if err != nil { + cancel() + return nil, errors.Wrap(err, "error creating ledger backend") + } + } historyQ := &history.Q{config.HistorySession.Clone()} historyQ.Ctx = ctx From 05bc2caec65bc795559b70ee1c25afe398148bc1 Mon Sep 17 00:00:00 2001 From: Bartek Nowotarski Date: Fri, 29 May 2020 16:33:55 +0200 Subject: [PATCH 3/7] exp/ingest/ledgerbackend: Add captiveStellarCore tests (#2625) This commit refactors `captiveStellarCore` ledger backend and adds tests. `stellarCoreRunner` has been extracted from `captiveStellarCore` to allow mocking stellar-core runs. This makes it possible to write tests that do not execute `stellar-core` binary. Functions marshalling and unmarshalling "framed" XDR have been moved to `xdr` package. --- .../io/single_ledger_state_reader_test.go | 10 +- .../ledgerbackend/captive_core_backend.go | 163 +++--------------- .../captive_core_backend_test.go | 143 +++++++++++++-- .../ledgerbackend/stellar_core_runner.go | 140 +++++++++++++++ ..._posix.go => stellar_core_runner_posix.go} | 9 +- ...dows.go => stellar_core_runner_windows.go} | 9 +- .../ingest_history_range_state_test.go | 10 +- .../horizon/internal/expingest/main_test.go | 24 +++ support/historyarchive/verify.go | 2 +- support/historyarchive/xdrstream.go | 20 --- support/historyarchive/xdrstream_test.go | 8 +- xdr/main.go | 53 ++++++ 12 files changed, 391 insertions(+), 200 deletions(-) create mode 100644 exp/ingest/ledgerbackend/stellar_core_runner.go rename exp/ingest/ledgerbackend/{captive_core_backend_posix.go => stellar_core_runner_posix.go} (86%) rename exp/ingest/ledgerbackend/{captive_core_backend_windows.go => stellar_core_runner_windows.go} (87%) diff --git a/exp/ingest/io/single_ledger_state_reader_test.go b/exp/ingest/io/single_ledger_state_reader_test.go index 3ca82be580..25f0209883 100644 --- a/exp/ingest/io/single_ledger_state_reader_test.go +++ b/exp/ingest/io/single_ledger_state_reader_test.go @@ -607,7 +607,7 @@ func (s *ReadBucketEntryTestSuite) TestReadEntryRetrySucceedsWithDiscard() { secondEntry := metaEntry(2) b := &bytes.Buffer{} - s.Require().NoError(historyarchive.WriteFramedXdr(b, firstEntry)) + s.Require().NoError(xdr.MarshalFramed(b, firstEntry)) writeInvalidFrame(b) s.mockArchive. @@ -643,7 +643,7 @@ func (s *ReadBucketEntryTestSuite) TestReadEntryRetryFailsWithDiscardError() { firstEntry := metaEntry(1) b := &bytes.Buffer{} - s.Require().NoError(historyarchive.WriteFramedXdr(b, firstEntry)) + s.Require().NoError(xdr.MarshalFramed(b, firstEntry)) writeInvalidFrame(b) s.mockArchive. @@ -679,7 +679,7 @@ func (s *ReadBucketEntryTestSuite) TestReadEntryRetrySucceedsAfterDiscardError() secondEntry := metaEntry(2) b := &bytes.Buffer{} - s.Require().NoError(historyarchive.WriteFramedXdr(b, firstEntry)) + s.Require().NoError(xdr.MarshalFramed(b, firstEntry)) writeInvalidFrame(b) s.mockArchive. @@ -765,7 +765,7 @@ func createInvalidXdrStream(closeError error) *historyarchive.XdrStream { func writeInvalidFrame(b *bytes.Buffer) { bufferSize := b.Len() - err := historyarchive.WriteFramedXdr(b, metaEntry(1)) + err := xdr.MarshalFramed(b, metaEntry(1)) if err != nil { panic(err) } @@ -776,7 +776,7 @@ func writeInvalidFrame(b *bytes.Buffer) { func createXdrStream(entries ...xdr.BucketEntry) *historyarchive.XdrStream { b := &bytes.Buffer{} for _, e := range entries { - err := historyarchive.WriteFramedXdr(b, e) + err := xdr.MarshalFramed(b, e) if err != nil { panic(err) } diff --git a/exp/ingest/ledgerbackend/captive_core_backend.go b/exp/ingest/ledgerbackend/captive_core_backend.go index 4bc858cf30..e08d975928 100644 --- a/exp/ingest/ledgerbackend/captive_core_backend.go +++ b/exp/ingest/ledgerbackend/captive_core_backend.go @@ -1,18 +1,8 @@ package ledgerbackend import ( - "bufio" - "fmt" "io" - "io/ioutil" - "math/rand" - "os" - "os/exec" - "path/filepath" - "regexp" - "strings" "sync" - "time" "github.com/pkg/errors" "github.com/stellar/go/network" @@ -56,13 +46,11 @@ func roundDownToFirstReplayAfterCheckpointStart(ledger uint32) uint32 { } type captiveStellarCore struct { - nonce string networkPassphrase string historyURLs []string lastLedger *uint32 // end of current segment if offline, nil if online - cmd *exec.Cmd - executablePath string - metaPipe io.Reader + + stellarCoreRunner stellarCoreRunnerInterface nextLedgerMutex sync.Mutex nextLedger uint32 // next ledger expected, error w/ restart if not seen @@ -74,13 +62,15 @@ type captiveStellarCore struct { // // Platform-specific pipe setup logic is in the .start() methods. func NewCaptive(executablePath, networkPassphrase string, historyURLs []string) *captiveStellarCore { - r := rand.New(rand.NewSource(time.Now().UnixNano())) return &captiveStellarCore{ - nonce: fmt.Sprintf("captive-stellar-core-%x", r.Uint64()), networkPassphrase: networkPassphrase, - executablePath: executablePath, historyURLs: historyURLs, nextLedger: 0, + stellarCoreRunner: &stellarCoreRunner{ + executablePath: executablePath, + networkPassphrase: networkPassphrase, + historyURLs: historyURLs, + }, } } @@ -98,37 +88,6 @@ func (c *captiveStellarCore) IsInOnlineTrackingMode() bool { return c.lastLedger == nil } -// XDR and RPC define a (minimal) framing format which our metadata arrives in: a 4-byte -// big-endian length header that has the high bit set, followed by that length worth of -// XDR data. Decoding this involves just a little more work than xdr.Unmarshal. -func unmarshalFramed(r io.Reader, v interface{}) (int, error) { - var frameLen uint32 - n, e := xdr.Unmarshal(r, &frameLen) - if e != nil { - err := errors.Wrap(e, "unmarshalling XDR frame header") - return n, err - } - if n != 4 { - err := errors.New("bad length of XDR frame header") - return n, err - } - if (frameLen & 0x80000000) != 0x80000000 { - err := errors.New("malformed XDR frame header") - return n, err - } - frameLen &= 0x7fffffff - m, e := xdr.Unmarshal(r, v) - if e != nil { - err := errors.Wrap(e, "unmarshalling framed XDR") - return n + m, err - } - if int64(m) != int64(frameLen) { - err := errors.New("bad length of XDR frame body") - return n + m, err - } - return m + n, nil -} - // Returns the sequence number of an LCM, returning an error if the LCM is of // an unknown version. func peekLedgerSequence(xlcm *xdr.LedgerCloseMeta) (uint32, error) { @@ -187,19 +146,12 @@ func (c *captiveStellarCore) openOfflineReplaySubprocess(nextLedger, lastLedger if lastLedger > maxLedger { lastLedger = maxLedger } - rangeArg := fmt.Sprintf("%d/%d", lastLedger, (lastLedger-nextLedger)+1) - args := []string{"--conf", c.getConfFileName(), "catchup", rangeArg, - "--replay-in-memory"} - cmd := exec.Command(c.executablePath, args...) - cmd.Dir = c.getTmpDir() - cmd.Stdout = c.getLogLineWriter() - cmd.Stderr = cmd.Stdout - c.cmd = cmd - e = c.start() - if e != nil { - err := errors.Wrap(e, "starting stellar-core subprocess") - return err + + err := c.stellarCoreRunner.run(nextLedger, lastLedger) + if err != nil { + return errors.Wrap(err, "error running stellar-core") } + // The next ledger should be the first ledger of the checkpoint containing // the requested ledger c.nextLedgerMutex.Lock() @@ -218,7 +170,7 @@ func (c *captiveStellarCore) PrepareRange(from uint32, to uint32) error { return errors.Wrap(e, "opening subprocess") } - if c.metaPipe == nil { + if c.stellarCoreRunner.getMetaPipe() == nil { return errors.New("missing metadata pipe") } @@ -255,7 +207,8 @@ func (c *captiveStellarCore) GetLedger(sequence uint32) (bool, LedgerCloseMeta, } // ... and open - if c.metaPipe == nil { + metaPipe := c.stellarCoreRunner.getMetaPipe() + if metaPipe == nil { return false, LedgerCloseMeta{}, errors.New("missing metadata pipe") } @@ -263,7 +216,7 @@ func (c *captiveStellarCore) GetLedger(sequence uint32) (bool, LedgerCloseMeta, var errOut error for { var xlcm xdr.LedgerCloseMeta - _, e0 := unmarshalFramed(c.metaPipe, &xlcm) + _, e0 := xdr.UnmarshalFramed(metaPipe, &xlcm) if e0 != nil { if e0 == io.EOF { errOut = errors.Wrap(e0, "got EOF from subprocess") @@ -281,7 +234,7 @@ func (c *captiveStellarCore) GetLedger(sequence uint32) (bool, LedgerCloseMeta, c.nextLedgerMutex.Lock() if seq != c.nextLedger { // We got something unexpected; close and reset - errOut = errors.Errorf("unexpected ledger %d", seq) + errOut = errors.Errorf("unexpected ledger (expected=%d actual=%d)", c.nextLedger, seq) c.nextLedgerMutex.Unlock() break } @@ -348,84 +301,10 @@ func (c *captiveStellarCore) Close() error { c.nextLedgerMutex.Unlock() c.lastLedger = nil - var e1, e2 error - if c.metaPipe != nil { - c.metaPipe = nil - } - if c.processIsAlive() { - e1 = c.cmd.Process.Kill() - c.cmd.Wait() - c.cmd = nil - } - e2 = os.RemoveAll(c.getTmpDir()) - if e1 != nil { - return errors.Wrap(e1, "error killing subprocess") - } - if e2 != nil { - return errors.Wrap(e2, "error removing subprocess tmpdir") - } - return nil -} - -func (c *captiveStellarCore) getTmpDir() string { - return filepath.Join(os.TempDir(), c.nonce) -} - -func (c *captiveStellarCore) getConfFileName() string { - return filepath.Join(c.getTmpDir(), "stellar-core.conf") -} - -func (c *captiveStellarCore) getConf() string { - lines := []string{ - "# Generated file -- do not edit", - "RUN_STANDALONE=true", - "NODE_IS_VALIDATOR=false", - "DISABLE_XDR_FSYNC=true", - "UNSAFE_QUORUM=true", - fmt.Sprintf(`NETWORK_PASSPHRASE="%s"`, c.networkPassphrase), - fmt.Sprintf(`BUCKET_DIR_PATH="%s"`, filepath.Join(c.getTmpDir(), "buckets")), - fmt.Sprintf(`METADATA_OUTPUT_STREAM="%s"`, c.getPipeName()), - } - for i, val := range c.historyURLs { - lines = append(lines, fmt.Sprintf("[HISTORY.h%d]", i)) - lines = append(lines, fmt.Sprintf(`get="curl -sf %s/{0} -o {1}"`, val)) - } - // Add a fictional quorum -- necessary to convince core to start up; - // but not used at all for our purposes. Pubkey here is just random. - lines = append(lines, - "[QUORUM_SET]", - "THRESHOLD_PERCENT=100", - `VALIDATORS=["GCZBOIAY4HLKAJVNJORXZOZRAY2BJDBZHKPBHZCRAIUR5IHC2UHBGCQR"]`) - return strings.ReplaceAll(strings.Join(lines, "\n"), "\\", "\\\\") -} -func (c *captiveStellarCore) getLogLineWriter() io.Writer { - r, w := io.Pipe() - br := bufio.NewReader(r) - // Strip timestamps from log lines from captive stellar-core. We emit our own. - dateRx := regexp.MustCompile("^\\d{4}-\\d{2}-\\d{2}T\\d{2}:\\d{2}:\\d{2}\\.\\d{3} ") - go func() { - for { - line, e := br.ReadString('\n') - if e != nil { - break - } - line = dateRx.ReplaceAllString(line, "") - // Leaving for debug purposes: - // fmt.Print(line) - } - }() - return w -} - -// Makes the temp directory and writes the config file to it; called by the -// platform-specific captiveStellarCore.Start() methods. -func (c *captiveStellarCore) writeConf() error { - dir := c.getTmpDir() - e := os.MkdirAll(dir, 0755) - if e != nil { - return errors.Wrap(e, "error creating subprocess tmpdir") + err := c.stellarCoreRunner.close() + if err != nil { + return errors.Wrap(err, "error closing stellar-core subprocess") } - conf := c.getConf() - return ioutil.WriteFile(c.getConfFileName(), []byte(conf), 0644) + return nil } diff --git a/exp/ingest/ledgerbackend/captive_core_backend_test.go b/exp/ingest/ledgerbackend/captive_core_backend_test.go index 82a442a259..5e66b09ed7 100644 --- a/exp/ingest/ledgerbackend/captive_core_backend_test.go +++ b/exp/ingest/ledgerbackend/captive_core_backend_test.go @@ -1,31 +1,138 @@ package ledgerbackend import ( + "bytes" + "encoding/hex" + "io" "testing" - "github.com/stellar/go/support/log" - logpkg "github.com/stellar/go/support/log" + "github.com/stellar/go/network" + "github.com/stellar/go/xdr" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/mock" + "github.com/stretchr/testify/require" ) // TODO: test frame decoding // TODO: test from static base64-encoded data -func TestCaptiveCore(t *testing.T) { - log.SetLevel(logpkg.InfoLevel) - c := NewCaptive( - "stellar-core", - "Public Global Stellar Network ; September 2015", - []string{"http://history.stellar.org/prd/core-live/core_live_001"}, +type stellarCoreRunnerMock struct { + mock.Mock +} + +func (m *stellarCoreRunnerMock) run(from, to uint32) error { + a := m.Called(from, to) + return a.Error(0) +} + +func (m *stellarCoreRunnerMock) getMetaPipe() io.Reader { + a := m.Called() + return a.Get(0).(io.Reader) +} + +func (m *stellarCoreRunnerMock) close() error { + a := m.Called() + return a.Error(0) +} + +func writeLedgerHeader(w io.Writer, sequence uint32) error { + opResults := []xdr.OperationResult{} + opMeta := []xdr.OperationMeta{} + + tmpHash, _ := hex.DecodeString("cde54da3901f5b9c0331d24fbb06ac9c5c5de76de9fb2d4a7b86c09e46f11d8c") + var hash [32]byte + copy(hash[:], tmpHash) + + ledgerCloseMeta := xdr.LedgerCloseMeta{ + V: 0, + V0: &xdr.LedgerCloseMetaV0{ + LedgerHeader: xdr.LedgerHeaderHistoryEntry{ + Header: xdr.LedgerHeader{ + LedgerSeq: xdr.Uint32(sequence), + }, + }, + TxSet: xdr.TransactionSet{ + Txs: []xdr.TransactionEnvelope{ + { + Type: xdr.EnvelopeTypeEnvelopeTypeTx, + V1: &xdr.TransactionV1Envelope{ + Tx: xdr.Transaction{ + SourceAccount: xdr.MustMuxedAccountAddress("GAEJJMDDCRYF752PKIJICUVL7MROJBNXDV2ZB455T7BAFHU2LCLSE2LW"), + Fee: xdr.Uint32(sequence), + }, + }, + }, + }, + }, + TxProcessing: []xdr.TransactionResultMeta{ + { + Result: xdr.TransactionResultPair{ + TransactionHash: xdr.Hash(hash), + Result: xdr.TransactionResult{ + FeeCharged: xdr.Int64(sequence), + Result: xdr.TransactionResultResult{ + Code: xdr.TransactionResultCodeTxSuccess, + Results: &opResults, + }, + }, + }, + TxApplyProcessing: xdr.TransactionMeta{ + Operations: &opMeta, + }, + }, + }, + }, + } + + return xdr.MarshalFramed(w, ledgerCloseMeta) +} + +func TestCaptiveNew(t *testing.T) { + executablePath := "/etc/stellar-core" + networkPassphrase := network.PublicNetworkPassphrase + historyURLs := []string{"http://history.stellar.org/prd/core-live/core_live_001"} + + captiveStellarCore := NewCaptive( + executablePath, + networkPassphrase, + historyURLs, ) - seq, e := c.GetLatestLedgerSequence() - assert.NoError(t, e) - assert.Greater(t, seq, uint32(0)) - ok, lcm, e := c.GetLedger(seq - 200) - assert.NoError(t, e) - assert.Equal(t, true, ok) - assert.Equal(t, uint32(lcm.LedgerHeader.Header.LedgerSeq), seq-200) - assert.DirExists(t, c.getTmpDir()) - e = c.Close() - assert.NoError(t, e) + + assert.Equal(t, networkPassphrase, captiveStellarCore.networkPassphrase) + assert.Equal(t, historyURLs, captiveStellarCore.historyURLs) + assert.Equal(t, uint32(0), captiveStellarCore.nextLedger) + + assert.Equal(t, executablePath, captiveStellarCore.stellarCoreRunner.(*stellarCoreRunner).executablePath) + assert.Equal(t, networkPassphrase, captiveStellarCore.stellarCoreRunner.(*stellarCoreRunner).networkPassphrase) + assert.Equal(t, historyURLs, captiveStellarCore.stellarCoreRunner.(*stellarCoreRunner).historyURLs) +} + +func TestCaptivePrepareRange(t *testing.T) { + var buf bytes.Buffer + + // Core will actually start with the last checkpoint before the from ledger + // and then rewind to the `from` ledger. + for i := 64; i <= 99; i++ { + err := writeLedgerHeader(&buf, uint32(i)) + require.NoError(t, err) + } + + mockRunner := &stellarCoreRunnerMock{} + // We prepare [from-1, to] range because it's not possible to rewind the reader + // and there is no other way to check if stellar-core has built the state without + // reading actual ledger. + mockRunner.On("run", uint32(99), uint32(200)).Return(nil).Once() + mockRunner.On("getMetaPipe").Return(&buf) + mockRunner.On("close").Return(nil).Once() + + captiveBackend := captiveStellarCore{ + networkPassphrase: network.PublicNetworkPassphrase, + historyURLs: []string{"http://history.stellar.org/prd/core-live/core_live_001"}, + stellarCoreRunner: mockRunner, + } + + err := captiveBackend.PrepareRange(100, 200) + assert.NoError(t, err) + err = captiveBackend.Close() + assert.NoError(t, err) } diff --git a/exp/ingest/ledgerbackend/stellar_core_runner.go b/exp/ingest/ledgerbackend/stellar_core_runner.go new file mode 100644 index 0000000000..9dee16c40f --- /dev/null +++ b/exp/ingest/ledgerbackend/stellar_core_runner.go @@ -0,0 +1,140 @@ +package ledgerbackend + +import ( + "fmt" + "io" + "io/ioutil" + "math/rand" + "os" + "os/exec" + "path/filepath" + "strings" + "time" + + "github.com/pkg/errors" +) + +type stellarCoreRunnerInterface interface { + run(from, to uint32) error + getMetaPipe() io.Reader + close() error +} + +type stellarCoreRunner struct { + executablePath string + networkPassphrase string + historyURLs []string + + cmd *exec.Cmd + metaPipe io.Reader + tempDir string +} + +func (r *stellarCoreRunner) getConf() string { + lines := []string{ + "# Generated file -- do not edit", + "RUN_STANDALONE=true", + "NODE_IS_VALIDATOR=false", + "DISABLE_XDR_FSYNC=true", + "UNSAFE_QUORUM=true", + fmt.Sprintf(`NETWORK_PASSPHRASE="%s"`, r.networkPassphrase), + fmt.Sprintf(`BUCKET_DIR_PATH="%s"`, filepath.Join(r.getTmpDir(), "buckets")), + fmt.Sprintf(`METADATA_OUTPUT_STREAM="%s"`, r.getPipeName()), + } + for i, val := range r.historyURLs { + lines = append(lines, fmt.Sprintf("[HISTORY.h%d]", i)) + lines = append(lines, fmt.Sprintf(`get="curl -sf %s/{0} -o {1}"`, val)) + } + // Add a fictional quorum -- necessary to convince core to start up; + // but not used at all for our purposes. Pubkey here is just random. + lines = append(lines, + "[QUORUM_SET]", + "THRESHOLD_PERCENT=100", + `VALIDATORS=["GCZBOIAY4HLKAJVNJORXZOZRAY2BJDBZHKPBHZCRAIUR5IHC2UHBGCQR"]`) + return strings.ReplaceAll(strings.Join(lines, "\n"), "\\", "\\\\") +} + +func (r *stellarCoreRunner) getConfFileName() string { + return filepath.Join(r.getTmpDir(), "stellar-core.conf") +} + +func (*stellarCoreRunner) getLogLineWriter() io.Writer { + _, w := io.Pipe() + // br := bufio.NewReader(r) + // // Strip timestamps from log lines from captive stellar-core. We emit our own. + // dateRx := regexp.MustCompile(`^\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}\.\d{3} `) + // go func() { + // for { + // line, e := br.ReadString('\n') + // if e != nil { + // break + // } + // line = dateRx.ReplaceAllString(line, "") + // fmt.Print(line) + // } + // }() + return w +} + +func (r *stellarCoreRunner) getTmpDir() string { + if r.tempDir != "" { + return r.tempDir + } + random := rand.New(rand.NewSource(time.Now().UnixNano())) + r.tempDir = filepath.Join(os.TempDir(), fmt.Sprintf("captive-stellar-core-%x", random.Uint64())) + return r.tempDir +} + +// Makes the temp directory and writes the config file to it; called by the +// platform-specific captiveStellarCore.Start() methods. +func (r *stellarCoreRunner) writeConf() error { + dir := r.getTmpDir() + e := os.MkdirAll(dir, 0755) + if e != nil { + return errors.Wrap(e, "error creating subprocess tmpdir") + } + conf := r.getConf() + return ioutil.WriteFile(r.getConfFileName(), []byte(conf), 0644) +} + +func (r *stellarCoreRunner) run(from, to uint32) error { + err := r.writeConf() + if err != nil { + return errors.Wrap(err, "error writing configuration") + } + + rangeArg := fmt.Sprintf("%d/%d", to, to-from+1) + args := []string{"--conf", r.getConfFileName(), "catchup", rangeArg, "--replay-in-memory"} + cmd := exec.Command(r.executablePath, args...) + cmd.Dir = r.getTmpDir() + cmd.Stdout = r.getLogLineWriter() + cmd.Stderr = cmd.Stdout + r.cmd = cmd + err = r.start() + if err != nil { + return errors.Wrap(err, "error starting stellar-core subprocess") + } + return nil +} + +func (r *stellarCoreRunner) getMetaPipe() io.Reader { + return r.metaPipe +} + +func (r *stellarCoreRunner) close() error { + var err1, err2 error + + if r.processIsAlive() { + err1 = r.cmd.Process.Kill() + r.cmd.Wait() + r.cmd = nil + } + err2 = os.RemoveAll(r.getTmpDir()) + if err1 != nil { + return errors.Wrap(err1, "error killing subprocess") + } + if err2 != nil { + return errors.Wrap(err2, "error removing subprocess tmpdir") + } + return nil +} diff --git a/exp/ingest/ledgerbackend/captive_core_backend_posix.go b/exp/ingest/ledgerbackend/stellar_core_runner_posix.go similarity index 86% rename from exp/ingest/ledgerbackend/captive_core_backend_posix.go rename to exp/ingest/ledgerbackend/stellar_core_runner_posix.go index e85a531f7b..a0a088cc52 100644 --- a/exp/ingest/ledgerbackend/captive_core_backend_posix.go +++ b/exp/ingest/ledgerbackend/stellar_core_runner_posix.go @@ -10,9 +10,9 @@ import ( "github.com/pkg/errors" ) -// Posix-specific methods for the captiveStellarCore type. +// Posix-specific methods for the stellarCoreRunner type. -func (c *captiveStellarCore) getPipeName() string { +func (c *stellarCoreRunner) getPipeName() string { // The exec.Cmd.ExtraFiles field carries *io.File values that are assigned // to child process fds counting from 3, and we'll be passing exactly one // fd: the write end of the anonymous pipe below. @@ -20,8 +20,7 @@ func (c *captiveStellarCore) getPipeName() string { } // Starts the subprocess and sets the c.metaPipe field -func (c *captiveStellarCore) start() error { - +func (c *stellarCoreRunner) start() error { // First make an anonymous pipe. // Note io.File objects close-on-finalization. readFile, writeFile, e := os.Pipe() @@ -55,7 +54,7 @@ func (c *captiveStellarCore) start() error { return nil } -func (c *captiveStellarCore) processIsAlive() bool { +func (c *stellarCoreRunner) processIsAlive() bool { if c.cmd == nil { return false } diff --git a/exp/ingest/ledgerbackend/captive_core_backend_windows.go b/exp/ingest/ledgerbackend/stellar_core_runner_windows.go similarity index 87% rename from exp/ingest/ledgerbackend/captive_core_backend_windows.go rename to exp/ingest/ledgerbackend/stellar_core_runner_windows.go index f19d97921f..8e950d67a3 100644 --- a/exp/ingest/ledgerbackend/captive_core_backend_windows.go +++ b/exp/ingest/ledgerbackend/stellar_core_runner_windows.go @@ -5,17 +5,18 @@ package ledgerbackend import ( "bufio" "fmt" - "github.com/Microsoft/go-winio" "os" + + "github.com/Microsoft/go-winio" ) -// Windows-specific methods for the captiveStellarCore type. +// Windows-specific methods for the stellarCoreRunner type. -func (c *captiveStellarCore) getPipeName() string { +func (c *stellarCoreRunner) getPipeName() string { return fmt.Sprintf(`\\.\pipe\%s`, c.nonce) } -func (c *captiveStellarCore) start() error { +func (c *stellarCoreRunner) start() error { // First set up the server pipe. listener, e := winio.ListenPipe(c.getPipeName(), nil) if e != nil { diff --git a/services/horizon/internal/expingest/ingest_history_range_state_test.go b/services/horizon/internal/expingest/ingest_history_range_state_test.go index f9ce94338f..5eb4e46672 100644 --- a/services/horizon/internal/expingest/ingest_history_range_state_test.go +++ b/services/horizon/internal/expingest/ingest_history_range_state_test.go @@ -180,6 +180,7 @@ type ReingestHistoryRangeStateTestSuite struct { suite.Suite historyQ *mockDBQ historyAdapter *adapters.MockHistoryArchiveAdapter + ledgerBackend *mockLedgerBackend runner *mockProcessorsRunner system *System } @@ -187,17 +188,21 @@ type ReingestHistoryRangeStateTestSuite struct { func (s *ReingestHistoryRangeStateTestSuite) SetupTest() { s.historyQ = &mockDBQ{} s.historyAdapter = &adapters.MockHistoryArchiveAdapter{} + s.ledgerBackend = &mockLedgerBackend{} s.runner = &mockProcessorsRunner{} s.system = &System{ ctx: context.Background(), historyQ: s.historyQ, historyAdapter: s.historyAdapter, + ledgerBackend: s.ledgerBackend, runner: s.runner, } s.historyQ.On("GetTx").Return(nil).Once() s.historyQ.On("Rollback").Return(nil).Once() s.historyQ.On("Begin").Return(nil).Once() + + s.ledgerBackend.On("PrepareRange", uint32(100), uint32(200)).Return(nil).Once() } func (s *ReingestHistoryRangeStateTestSuite) TearDownTest() { @@ -356,7 +361,6 @@ func (s *ReingestHistoryRangeStateTestSuite) TestSuccess() { func (s *ReingestHistoryRangeStateTestSuite) TestSuccessOneLedger() { s.historyQ.On("GetLastLedgerExpIngestNonBlocking").Return(uint32(0), nil).Once() - s.historyQ.On("GetTx").Return(&sqlx.Tx{}).Once() toidFrom := toid.New(100, 0, 0) @@ -368,6 +372,10 @@ func (s *ReingestHistoryRangeStateTestSuite) TestSuccessOneLedger() { s.runner.On("RunTransactionProcessorsOnLedger", uint32(100)).Return(io.StatsLedgerTransactionProcessorResults{}, nil).Once() s.historyQ.On("Commit").Return(nil).Once() + // Recreate mock in this single test to remove previous assertion. + *s.ledgerBackend = mockLedgerBackend{} + s.ledgerBackend.On("PrepareRange", uint32(100), uint32(100)).Return(nil).Once() + err := s.system.ReingestRange(100, 100, false) s.Assert().NoError(err) } diff --git a/services/horizon/internal/expingest/main_test.go b/services/horizon/internal/expingest/main_test.go index b530051d3d..237128d5b7 100644 --- a/services/horizon/internal/expingest/main_test.go +++ b/services/horizon/internal/expingest/main_test.go @@ -359,6 +359,30 @@ func (m *mockDBQ) CreateAssets(assets []xdr.Asset, batchSize int) (map[string]hi return args.Get(0).(map[string]history.Asset), args.Error(1) } +type mockLedgerBackend struct { + mock.Mock +} + +func (m *mockLedgerBackend) GetLatestLedgerSequence() (sequence uint32, err error) { + args := m.Called() + return args.Get(0).(uint32), args.Error(1) +} + +func (m *mockLedgerBackend) GetLedger(sequence uint32) (bool, ledgerbackend.LedgerCloseMeta, error) { + args := m.Called(sequence) + return args.Get(0).(bool), args.Get(1).(ledgerbackend.LedgerCloseMeta), args.Error(2) +} + +func (m *mockLedgerBackend) PrepareRange(from uint32, to uint32) error { + args := m.Called(from, to) + return args.Error(0) +} + +func (m *mockLedgerBackend) Close() error { + args := m.Called() + return args.Error(0) +} + type mockProcessorsRunner struct { mock.Mock } diff --git a/support/historyarchive/verify.go b/support/historyarchive/verify.go index 56f35d4779..378fbf4b6c 100644 --- a/support/historyarchive/verify.go +++ b/support/historyarchive/verify.go @@ -228,7 +228,7 @@ func (arch *Archive) VerifyBucketEntries(h Hash) error { var entry xdr.BucketEntry err = rdr.ReadOne(&entry) if err == nil { - err2 := WriteFramedXdr(hsh, &entry) + err2 := xdr.MarshalFramed(hsh, &entry) if err2 != nil { return err2 } diff --git a/support/historyarchive/xdrstream.go b/support/historyarchive/xdrstream.go index ed1b2bf815..60647c9c8b 100644 --- a/support/historyarchive/xdrstream.go +++ b/support/historyarchive/xdrstream.go @@ -189,23 +189,3 @@ func (x *XdrStream) BytesRead() int64 { func (x *XdrStream) Discard(n int64) (int64, error) { return io.CopyN(ioutil.Discard, x.rdr, n) } - -func WriteFramedXdr(out io.Writer, in interface{}) error { - var tmp bytes.Buffer - n, err := xdr.Marshal(&tmp, in) - if err != nil { - return err - } - un := uint32(n) - if un > 0x7fffffff { - return fmt.Errorf("Overlong write: %d bytes", n) - } - - un = un | 0x80000000 - binary.Write(out, binary.BigEndian, &un) - k, err := tmp.WriteTo(out) - if int64(n) != k { - return fmt.Errorf("Mismatched write length: %d vs. %d", n, k) - } - return err -} diff --git a/support/historyarchive/xdrstream_test.go b/support/historyarchive/xdrstream_test.go index 4edea22143..8347d56a69 100644 --- a/support/historyarchive/xdrstream_test.go +++ b/support/historyarchive/xdrstream_test.go @@ -35,7 +35,7 @@ func TestXdrStreamHash(t *testing.T) { // - uint32 representing the number of bytes of a structure, // - xdr-encoded `BucketEntry` above. b := &bytes.Buffer{} - err := WriteFramedXdr(b, bucketEntry) + err := xdr.MarshalFramed(b, bucketEntry) require.NoError(t, err) expectedHash := sha256.Sum256(b.Bytes()) @@ -82,8 +82,8 @@ func TestXdrStreamDiscard(t *testing.T) { fullStream := createXdrStream(firstEntry, secondEntry) b := &bytes.Buffer{} - require.NoError(t, WriteFramedXdr(b, firstEntry)) - require.NoError(t, WriteFramedXdr(b, secondEntry)) + require.NoError(t, xdr.MarshalFramed(b, firstEntry)) + require.NoError(t, xdr.MarshalFramed(b, secondEntry)) expectedHash := sha256.Sum256(b.Bytes()) fullStream.SetExpectedHash(expectedHash) @@ -121,7 +121,7 @@ func TestXdrStreamDiscard(t *testing.T) { func createXdrStream(entries ...xdr.BucketEntry) *XdrStream { b := &bytes.Buffer{} for _, e := range entries { - err := WriteFramedXdr(b, e) + err := xdr.MarshalFramed(b, e) if err != nil { panic(err) } diff --git a/xdr/main.go b/xdr/main.go index c236854110..ac099d16ec 100644 --- a/xdr/main.go +++ b/xdr/main.go @@ -5,9 +5,13 @@ package xdr import ( "bytes" "encoding/base64" + "encoding/binary" "fmt" "io" "strings" + + xdr "github.com/stellar/go-xdr/xdr3" + "github.com/stellar/go/support/errors" ) // Keyer represents a type that can be converted into a LedgerKey @@ -69,6 +73,55 @@ func MarshalBase64(v interface{}) (string, error) { return base64.StdEncoding.EncodeToString(raw.Bytes()), nil } +func MarshalFramed(w io.Writer, v interface{}) error { + var tmp bytes.Buffer + n, err := Marshal(&tmp, v) + if err != nil { + return err + } + un := uint32(n) + if un > 0x7fffffff { + return fmt.Errorf("Overlong write: %d bytes", n) + } + + un = un | 0x80000000 + err = binary.Write(w, binary.BigEndian, &un) + if err != nil { + return errors.Wrap(err, "error in binary.Write") + } + k, err := tmp.WriteTo(w) + if int64(n) != k { + return fmt.Errorf("Mismatched write length: %d vs. %d", n, k) + } + return err +} + +// XDR and RPC define a (minimal) framing format which our metadata arrives in: a 4-byte +// big-endian length header that has the high bit set, followed by that length worth of +// XDR data. Decoding this involves just a little more work than xdr.Unmarshal. +func UnmarshalFramed(r io.Reader, v interface{}) (int, error) { + var frameLen uint32 + n, e := Unmarshal(r, &frameLen) + if e != nil { + return n, errors.Wrap(e, "unmarshalling XDR frame header") + } + if n != 4 { + return n, errors.New("bad length of XDR frame header") + } + if (frameLen & 0x80000000) != 0x80000000 { + return n, errors.New("malformed XDR frame header") + } + frameLen &= 0x7fffffff + m, err := xdr.Unmarshal(r, v) + if err != nil { + return n + m, errors.Wrap(err, "unmarshalling framed XDR") + } + if int64(m) != int64(frameLen) { + return n + m, errors.New("bad length of XDR frame body") + } + return m + n, nil +} + type countWriter struct { Count int } From d8007f2b5f5d7dd409e94aadb1b473241bcb49e0 Mon Sep 17 00:00:00 2001 From: Alfonso Acosta Date: Fri, 5 Jun 2020 15:05:33 +0200 Subject: [PATCH 4/7] services/horizon: Write captive-core logs to /dev/null when discarding them (#2672) In macOS, captive-core blocks when writing to a pipe which isn't read from. --- .../ledgerbackend/stellar_core_runner.go | 35 ++++++++++--------- 1 file changed, 19 insertions(+), 16 deletions(-) diff --git a/exp/ingest/ledgerbackend/stellar_core_runner.go b/exp/ingest/ledgerbackend/stellar_core_runner.go index 9dee16c40f..0fc407e11d 100644 --- a/exp/ingest/ledgerbackend/stellar_core_runner.go +++ b/exp/ingest/ledgerbackend/stellar_core_runner.go @@ -1,6 +1,7 @@ package ledgerbackend import ( + "bufio" "fmt" "io" "io/ioutil" @@ -8,6 +9,7 @@ import ( "os" "os/exec" "path/filepath" + "regexp" "strings" "time" @@ -58,21 +60,21 @@ func (r *stellarCoreRunner) getConfFileName() string { return filepath.Join(r.getTmpDir(), "stellar-core.conf") } -func (*stellarCoreRunner) getLogLineWriter() io.Writer { - _, w := io.Pipe() - // br := bufio.NewReader(r) - // // Strip timestamps from log lines from captive stellar-core. We emit our own. - // dateRx := regexp.MustCompile(`^\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}\.\d{3} `) - // go func() { - // for { - // line, e := br.ReadString('\n') - // if e != nil { - // break - // } - // line = dateRx.ReplaceAllString(line, "") - // fmt.Print(line) - // } - // }() +func (*stellarCoreRunner) GetLogLineWriter() io.Writer { + r, w := io.Pipe() + br := bufio.NewReader(r) + // Strip timestamps from log lines from captive stellar-core. We emit our own. + dateRx := regexp.MustCompile(`^\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}\.\d{3} `) + go func() { + for { + line, e := br.ReadString('\n') + if e != nil { + break + } + line = dateRx.ReplaceAllString(line, "") + fmt.Print(line) + } + }() return w } @@ -107,7 +109,8 @@ func (r *stellarCoreRunner) run(from, to uint32) error { args := []string{"--conf", r.getConfFileName(), "catchup", rangeArg, "--replay-in-memory"} cmd := exec.Command(r.executablePath, args...) cmd.Dir = r.getTmpDir() - cmd.Stdout = r.getLogLineWriter() + // In order to get the full stellar core logs: + // cmd.Stdout = r.GetLogLineWriter() cmd.Stderr = cmd.Stdout r.cmd = cmd err = r.start() From 188c373a1924b97482b12384673ea94d07a3d9a6 Mon Sep 17 00:00:00 2001 From: Alfonso Acosta Date: Fri, 5 Jun 2020 15:06:07 +0200 Subject: [PATCH 5/7] Add missing Core dependencies in Dockefile (#2671) Also, upgrade the container to ubuntu 18.04 (16.04 didn't have a compatible libstdc++ library) --- services/horizon/docker/Dockerfile | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/services/horizon/docker/Dockerfile b/services/horizon/docker/Dockerfile index ba98718d7a..7e5e56318d 100644 --- a/services/horizon/docker/Dockerfile +++ b/services/horizon/docker/Dockerfile @@ -6,7 +6,7 @@ RUN go mod download COPY . ./ RUN go install github.com/stellar/go/services/horizon -FROM ubuntu:16.04 +FROM ubuntu:18.04 ENV STELLAR_CORE_VERSION 13.0.0-1220-9ed3da29 ENV STELLAR_CORE_BINARY_PATH /usr/local/bin/stellar-core @@ -14,10 +14,14 @@ ENV STELLAR_CORE_BINARY_PATH /usr/local/bin/stellar-core # ca-certificates are required to make tls connections RUN apt-get update && apt-get install -y --no-install-recommends ca-certificates wget + +RUN apt-get install -y --no-install-recommends libpqxx-4.0v5 curl RUN wget -O stellar-core.deb https://s3.amazonaws.com/stellar.org/releases/stellar-core/stellar-core-${STELLAR_CORE_VERSION}_amd64.deb RUN dpkg -i stellar-core.deb RUN rm stellar-core.deb +RUN apt-get clean + COPY --from=builder /go/bin/horizon ./ ENTRYPOINT ["./horizon"] From f67623aa5ed9d64afe7a38b0f8aab6096b4eb392 Mon Sep 17 00:00:00 2001 From: Bartek Nowotarski Date: Thu, 11 Jun 2020 19:55:10 +0200 Subject: [PATCH 6/7] exp/ingest/io: Refactor readers (#2644) * Rename `DBLedgerReader` to `LedgerTransactionReader` because it works with any ledger backend. * `LedgerTransactionReader` gets `LedgerCloseMeta` from a provided ledger backend. * `LedgerChangeReader` has been refactored and now embeds `LedgerTransactionReader`. * Updated `GetChangesFromLedgerEntryChanges` to be a public function. * Removed `ctx.Context` from readers because it's not idiomatic. * Removed unused interfaces and mocks. Part of #2187. --- exp/ingest/io/change.go | 48 +++++ exp/ingest/io/change_reader.go | 165 ------------------ exp/ingest/io/ledger_change_reader.go | 127 ++++++++++++++ exp/ingest/io/ledger_change_reader_test.go | 81 +-------- exp/ingest/io/ledger_reader.go | 144 --------------- exp/ingest/io/ledger_transaction.go | 62 +------ exp/ingest/io/ledger_transaction_reader.go | 81 +++++++++ exp/ingest/io/mock_ledger_reader.go | 46 ----- .../io/mock_ledger_transaction_processor.go | 14 -- exp/ingest/io/processors.go | 2 +- .../internal/expingest/processor_runner.go | 29 ++- 11 files changed, 283 insertions(+), 516 deletions(-) delete mode 100644 exp/ingest/io/change_reader.go create mode 100644 exp/ingest/io/ledger_change_reader.go delete mode 100644 exp/ingest/io/ledger_reader.go create mode 100644 exp/ingest/io/ledger_transaction_reader.go delete mode 100644 exp/ingest/io/mock_ledger_reader.go delete mode 100644 exp/ingest/io/mock_ledger_transaction_processor.go diff --git a/exp/ingest/io/change.go b/exp/ingest/io/change.go index 5a87301a4f..6d65cd7e74 100644 --- a/exp/ingest/io/change.go +++ b/exp/ingest/io/change.go @@ -20,6 +20,54 @@ type Change struct { Post *xdr.LedgerEntry } +// GetChangesFromLedgerEntryChanges transforms LedgerEntryChanges to []Change. +// Each `update` and `removed` is preceded with `state` and `create` changes +// are alone, without `state`. The transformation we're doing is to move each +// change (state/update, state/removed or create) to an array of pre/post pairs. +// Then: +// - for create, pre is null and post is a new entry, +// - for update, pre is previous state and post is the current state, +// - for removed, pre is previous state and post is null. +// +// stellar-core source: +// https://github.com/stellar/stellar-core/blob/e584b43/src/ledger/LedgerTxn.cpp#L582 +func GetChangesFromLedgerEntryChanges(ledgerEntryChanges xdr.LedgerEntryChanges) []Change { + changes := []Change{} + + for i, entryChange := range ledgerEntryChanges { + switch entryChange.Type { + case xdr.LedgerEntryChangeTypeLedgerEntryCreated: + created := entryChange.MustCreated() + changes = append(changes, Change{ + Type: created.Data.Type, + Pre: nil, + Post: &created, + }) + case xdr.LedgerEntryChangeTypeLedgerEntryUpdated: + state := ledgerEntryChanges[i-1].MustState() + updated := entryChange.MustUpdated() + changes = append(changes, Change{ + Type: state.Data.Type, + Pre: &state, + Post: &updated, + }) + case xdr.LedgerEntryChangeTypeLedgerEntryRemoved: + state := ledgerEntryChanges[i-1].MustState() + changes = append(changes, Change{ + Type: state.Data.Type, + Pre: &state, + Post: nil, + }) + case xdr.LedgerEntryChangeTypeLedgerEntryState: + continue + default: + panic("Invalid LedgerEntryChangeType") + } + } + + return changes +} + // LedgerEntryChangeType returns type in terms of LedgerEntryChangeType. func (c *Change) LedgerEntryChangeType() xdr.LedgerEntryChangeType { switch { diff --git a/exp/ingest/io/change_reader.go b/exp/ingest/io/change_reader.go deleted file mode 100644 index 065df2989e..0000000000 --- a/exp/ingest/io/change_reader.go +++ /dev/null @@ -1,165 +0,0 @@ -package io - -import ( - "context" - "io" - - "github.com/stellar/go/exp/ingest/ledgerbackend" - "github.com/stellar/go/xdr" -) - -// ChangeReader provides convenient, streaming access to a sequence of Changes -type ChangeReader interface { - // Read should return the next `Change` in the leader. If there are no more - // changes left it should return an `io.EOF` error. - Read() (Change, error) - // Close should be called when reading is finished. This is especially - // helpful when there are still some changes available so reader can stop - // streaming them. - Close() error -} - -// LedgerChangeReader is a ChangeReader which returns Changes from Stellar Core -// for a single ledger -type LedgerChangeReader struct { - dbReader DBLedgerReader - streamedFeeChanges bool - streamedMetaChanges bool - streamedUpgradeChanges bool - pending []Change - pendingIndex int -} - -// Ensure LedgerChangeReader implements ChangeReader -var _ ChangeReader = (*LedgerChangeReader)(nil) - -// NewLedgerChangeReader constructs a new LedgerChangeReader instance bound to the given ledger. -// Note that the returned LedgerChangeReader is not thread safe and should not be shared -// by multiple goroutines. -func NewLedgerChangeReader( - ctx context.Context, sequence uint32, backend ledgerbackend.LedgerBackend, -) (*LedgerChangeReader, error) { - reader, err := NewDBLedgerReader(ctx, sequence, backend) - if err != nil { - return nil, err - } - - return &LedgerChangeReader{dbReader: *reader}, nil -} - -// GetHeader returns the ledger header for the reader -func (r *LedgerChangeReader) GetHeader() xdr.LedgerHeaderHistoryEntry { - return r.dbReader.GetHeader() -} - -func (r *LedgerChangeReader) getNextFeeChange() (Change, error) { - if r.streamedFeeChanges { - return Change{}, io.EOF - } - - // Remember that it's possible that transaction can remove a preauth - // tx signer even when it's a failed transaction so we need to check - // failed transactions too. - for { - transaction, err := r.dbReader.Read() - if err != nil { - if err == io.EOF { - r.dbReader.rewind() - r.streamedFeeChanges = true - return Change{}, io.EOF - } else { - return Change{}, err - } - } - - changes := transaction.GetFeeChanges() - if len(changes) >= 1 { - r.pending = append(r.pending, changes[1:]...) - return changes[0], nil - } - } -} - -func (r *LedgerChangeReader) getNextMetaChange() (Change, error) { - if r.streamedMetaChanges { - return Change{}, io.EOF - } - - for { - transaction, err := r.dbReader.Read() - if err != nil { - if err == io.EOF { - r.streamedMetaChanges = true - return Change{}, io.EOF - } else { - return Change{}, err - } - } - - changes, err := transaction.GetChanges() - if err != nil { - return Change{}, err - } - if len(changes) >= 1 { - r.pending = append(r.pending, changes[1:]...) - return changes[0], nil - } - } -} - -func (r *LedgerChangeReader) getNextUpgradeChange() (Change, error) { - if r.streamedUpgradeChanges { - return Change{}, io.EOF - } - - change, err := r.dbReader.readUpgradeChange() - if err != nil { - if err == io.EOF { - r.streamedUpgradeChanges = true - return Change{}, io.EOF - } else { - return Change{}, err - } - } - - return change, nil -} - -// Read returns the next change in the stream. -// If there are no changes remaining io.EOF is returned -// as an error. -func (r *LedgerChangeReader) Read() (Change, error) { - if err := r.dbReader.ctx.Err(); err != nil { - return Change{}, err - } - - if r.pendingIndex < len(r.pending) { - next := r.pending[r.pendingIndex] - r.pendingIndex++ - if r.pendingIndex == len(r.pending) { - r.pendingIndex = 0 - r.pending = r.pending[:0] - } - return next, nil - } - - change, err := r.getNextFeeChange() - if err == nil || err != io.EOF { - return change, err - } - - change, err = r.getNextMetaChange() - if err == nil || err != io.EOF { - return change, err - } - - return r.getNextUpgradeChange() -} - -func (r *LedgerChangeReader) Close() error { - r.pending = nil - r.streamedFeeChanges = true - r.streamedMetaChanges = true - r.streamedUpgradeChanges = true - return r.dbReader.Close() -} diff --git a/exp/ingest/io/ledger_change_reader.go b/exp/ingest/io/ledger_change_reader.go new file mode 100644 index 0000000000..3879efaebe --- /dev/null +++ b/exp/ingest/io/ledger_change_reader.go @@ -0,0 +1,127 @@ +package io + +import ( + "io" + + "github.com/stellar/go/exp/ingest/ledgerbackend" +) + +// ChangeReader provides convenient, streaming access to a sequence of Changes. +type ChangeReader interface { + // Read should return the next `Change` in the leader. If there are no more + // changes left it should return an `io.EOF` error. + Read() (Change, error) + // Close should be called when reading is finished. This is especially + // helpful when there are still some changes available so reader can stop + // streaming them. + Close() error +} + +// ledgerChangeReaderState defines possible states of LedgerChangeReader. +type ledgerChangeReaderState int + +const ( + // feeChangesState is active when LedgerChangeReader is reading fee changes. + feeChangesState ledgerChangeReaderState = iota + // feeChangesState is active when LedgerChangeReader is reading transaction meta changes. + metaChangesState + // feeChangesState is active when LedgerChangeReader is reading upgrade changes. + upgradeChangesState +) + +// LedgerChangeReader is a ChangeReader which returns Changes from Stellar Core +// for a single ledger +type LedgerChangeReader struct { + *LedgerTransactionReader + state ledgerChangeReaderState + pending []Change + pendingIndex int + upgradeIndex int +} + +// Ensure LedgerChangeReader implements ChangeReader +var _ ChangeReader = (*LedgerChangeReader)(nil) + +// NewLedgerChangeReader constructs a new LedgerChangeReader instance bound to the given ledger. +// Note that the returned LedgerChangeReader is not thread safe and should not be shared +// by multiple goroutines. +func NewLedgerChangeReader(backend ledgerbackend.LedgerBackend, sequence uint32) (*LedgerChangeReader, error) { + transactionReader, err := NewLedgerTransactionReader(backend, sequence) + if err != nil { + return nil, err + } + + return &LedgerChangeReader{ + LedgerTransactionReader: transactionReader, + state: feeChangesState, + }, nil +} + +// Read returns the next change in the stream. +// If there are no changes remaining io.EOF is returned as an error. +func (r *LedgerChangeReader) Read() (Change, error) { + // Changes within a ledger should be read in the following order: + // - fee changes of all transactions, + // - transaction meta changes of all transactions, + // - upgrade changes. + // Because a single transaction can introduce many changes we read all the + // changes from a single transaction and save them in r.pending. + // When Read() is called we stream pending changes first. We also call Read() + // recursively after adding some changes (what will return them from r.pending) + // to not duplicate the code. + if r.pendingIndex < len(r.pending) { + next := r.pending[r.pendingIndex] + r.pendingIndex++ + if r.pendingIndex == len(r.pending) { + r.pendingIndex = 0 + r.pending = r.pending[:0] + } + return next, nil + } + + switch r.state { + case feeChangesState, metaChangesState: + tx, err := r.LedgerTransactionReader.Read() + if err != nil { + if err == io.EOF { + // If done streaming fee changes rewind to stream meta changes + if r.state == feeChangesState { + r.LedgerTransactionReader.Rewind() + } + r.state++ + return r.Read() + } + return Change{}, err + } + + switch r.state { + case feeChangesState: + r.pending = append(r.pending, tx.GetFeeChanges()...) + case metaChangesState: + metaChanges, err := tx.GetChanges() + if err != nil { + return Change{}, err + } + r.pending = append(r.pending, metaChanges...) + } + return r.Read() + case upgradeChangesState: + // Get upgrade changes + if r.upgradeIndex < len(r.LedgerTransactionReader.ledgerCloseMeta.UpgradesMeta) { + changes := GetChangesFromLedgerEntryChanges( + r.LedgerTransactionReader.ledgerCloseMeta.UpgradesMeta[r.upgradeIndex], + ) + r.pending = append(r.pending, changes...) + r.upgradeIndex++ + return r.Read() + } + } + + return Change{}, io.EOF +} + +// Close should be called when reading is finished. +func (r *LedgerChangeReader) Close() error { + r.pending = nil + return r.LedgerTransactionReader.Close() +} diff --git a/exp/ingest/io/ledger_change_reader_test.go b/exp/ingest/io/ledger_change_reader_test.go index 85cf27ee30..4fc58d6180 100644 --- a/exp/ingest/io/ledger_change_reader_test.go +++ b/exp/ingest/io/ledger_change_reader_test.go @@ -1,7 +1,6 @@ package io import ( - "context" "fmt" "io" "testing" @@ -26,11 +25,11 @@ func TestNewLedgerChangeReaderFails(t *testing.T) { ledgerbackend.LedgerCloseMeta{}, fmt.Errorf("ledger error"), ).Once() - _, err := NewLedgerChangeReader(context.Background(), seq, mock) + _, err := NewLedgerChangeReader(mock, seq) assert.EqualError( t, err, - "error reading ledger from backend: ledger error", + "error getting ledger from the backend: ledger error", ) } @@ -42,7 +41,7 @@ func TestNewLedgerChangeReaderLedgerDoesNotExist(t *testing.T) { ledgerbackend.LedgerCloseMeta{}, nil, ).Once() - _, err := NewLedgerChangeReader(context.Background(), seq, mock) + _, err := NewLedgerChangeReader(mock, seq) assert.Equal( t, err, @@ -69,7 +68,7 @@ func TestNewLedgerChangeReaderSucceeds(t *testing.T) { nil, ).Once() - reader, err := NewLedgerChangeReader(context.Background(), seq, mock) + reader, err := NewLedgerChangeReader(mock, seq) assert.NoError(t, err) assert.Equal(t, reader.GetHeader(), header) @@ -108,7 +107,7 @@ func assertChangesEqual( backend ledgerbackend.LedgerBackend, expected []balanceEntry, ) { - reader, err := NewLedgerChangeReader(context.Background(), sequence, backend) + reader, err := NewLedgerChangeReader(backend, sequence) assert.NoError(t, err) changes := []balanceEntry{} @@ -266,73 +265,3 @@ func TestLedgerChangeReaderOrder(t *testing.T) { assertChangesEqual(t, seq, mock, []balanceEntry{}) mock.AssertExpectations(t) } - -func TestLedgerChangeReaderContext(t *testing.T) { - mock := &ledgerbackend.MockDatabaseBackend{} - seq := uint32(123) - - ledger := ledgerbackend.LedgerCloseMeta{ - TransactionResult: []xdr.TransactionResultPair{ - xdr.TransactionResultPair{}, - xdr.TransactionResultPair{}, - }, - TransactionEnvelope: []xdr.TransactionEnvelope{ - xdr.TransactionEnvelope{}, - xdr.TransactionEnvelope{}, - }, - TransactionMeta: []xdr.TransactionMeta{ - xdr.TransactionMeta{ - V: 1, - V1: &xdr.TransactionMetaV1{ - Operations: []xdr.OperationMeta{}, - }, - }, - xdr.TransactionMeta{ - V: 1, - V1: &xdr.TransactionMetaV1{ - Operations: []xdr.OperationMeta{}, - }, - }, - }, - TransactionFeeChanges: []xdr.LedgerEntryChanges{ - xdr.LedgerEntryChanges{ - buildChange(feeAddress, 100), - }, - xdr.LedgerEntryChanges{ - buildChange(feeAddress, 300), - }, - }, - UpgradesMeta: []xdr.LedgerEntryChanges{ - xdr.LedgerEntryChanges{ - buildChange(upgradeAddress, 2), - }, - xdr.LedgerEntryChanges{ - buildChange(upgradeAddress, 3), - }, - }, - } - - mock.On("GetLedger", seq).Return(true, ledger, nil).Once() - ctx, cancel := context.WithCancel(context.Background()) - reader, err := NewLedgerChangeReader(ctx, seq, mock) - mock.AssertExpectations(t) - assert.NoError(t, err) - - cancel() - _, err = reader.Read() - assert.Equal(t, context.Canceled, err) - - mock.On("GetLedger", seq).Return(true, ledger, nil).Once() - ctx, cancel = context.WithCancel(context.Background()) - reader, err = NewLedgerChangeReader(ctx, seq, mock) - mock.AssertExpectations(t) - assert.NoError(t, err) - - change, err := reader.Read() - assert.Equal(t, balanceEntry{feeAddress, 100}, parseChange(change)) - assert.NoError(t, err) - - cancel() - _, err = reader.Read() - assert.Equal(t, context.Canceled, err) -} diff --git a/exp/ingest/io/ledger_reader.go b/exp/ingest/io/ledger_reader.go deleted file mode 100644 index 63bc271cd8..0000000000 --- a/exp/ingest/io/ledger_reader.go +++ /dev/null @@ -1,144 +0,0 @@ -package io - -import ( - "context" - "io" - - "github.com/stellar/go/exp/ingest/ledgerbackend" - "github.com/stellar/go/support/errors" - "github.com/stellar/go/xdr" -) - -// LedgerReader provides convenient, streaming access to the transactions within a ledger. -type LedgerReader interface { - GetSequence() uint32 - GetHeader() xdr.LedgerHeaderHistoryEntry - // Read should return the next transaction. If there are no more - // transactions it should return `io.EOF` error. - Read() (LedgerTransaction, error) - // Close should be called when reading is finished. This is especially - // helpful when there are still some transactions available so reader can stop - // streaming them. - Close() error -} - -// DBLedgerReader is a database-backed implementation of the io.LedgerReader interface. -// Use NewDBLedgerReader to create a new instance. -type DBLedgerReader struct { - ctx context.Context - sequence uint32 - backend ledgerbackend.LedgerBackend - header xdr.LedgerHeaderHistoryEntry - transactions []LedgerTransaction - upgradeChanges []Change - readIdx int - upgradeReadIdx int -} - -// Ensure DBLedgerReader implements LedgerReader -var _ LedgerReader = (*DBLedgerReader)(nil) - -// NewDBLedgerReader creates a new DBLedgerReader instance. -// Note that DBLedgerReader is not thread safe and should not be shared by multiple goroutines -func NewDBLedgerReader( - ctx context.Context, sequence uint32, backend ledgerbackend.LedgerBackend, -) (*DBLedgerReader, error) { - reader := &DBLedgerReader{ - ctx: ctx, - sequence: sequence, - backend: backend, - } - - err := reader.init() - if err != nil { - return nil, err - } - - return reader, nil -} - -// GetSequence returns the sequence number of the ledger data stored by this object. -func (dblrc *DBLedgerReader) GetSequence() uint32 { - return dblrc.sequence -} - -// GetHeader returns the XDR Header data associated with the stored ledger. -func (dblrc *DBLedgerReader) GetHeader() xdr.LedgerHeaderHistoryEntry { - return dblrc.header -} - -// Read returns the next transaction in the ledger, ordered by tx number, each time it is called. When there -// are no more transactions to return, an EOF error is returned. -func (dblrc *DBLedgerReader) Read() (LedgerTransaction, error) { - if err := dblrc.ctx.Err(); err != nil { - return LedgerTransaction{}, err - } - - if dblrc.readIdx < len(dblrc.transactions) { - dblrc.readIdx++ - return dblrc.transactions[dblrc.readIdx-1], nil - } - return LedgerTransaction{}, io.EOF -} - -// readUpgradeChange returns the next upgrade change in the ledger, each time it -// is called. When there are no more upgrades to return, an EOF error is returned. -func (dblrc *DBLedgerReader) readUpgradeChange() (Change, error) { - if err := dblrc.ctx.Err(); err != nil { - return Change{}, err - } - - if dblrc.upgradeReadIdx < len(dblrc.upgradeChanges) { - dblrc.upgradeReadIdx++ - return dblrc.upgradeChanges[dblrc.upgradeReadIdx-1], nil - } - return Change{}, io.EOF -} - -// Rewind resets the reader back to the first transaction in the ledger -func (dblrc *DBLedgerReader) rewind() { - dblrc.readIdx = 0 -} - -// Init pulls data from the backend to set this object up for use. -func (dblrc *DBLedgerReader) init() error { - exists, ledgerCloseMeta, err := dblrc.backend.GetLedger(dblrc.sequence) - - if err != nil { - return errors.Wrap(err, "error reading ledger from backend") - } - if !exists { - return ErrNotFound - } - - dblrc.header = ledgerCloseMeta.LedgerHeader - - dblrc.storeTransactions(ledgerCloseMeta) - - for _, upgradeChanges := range ledgerCloseMeta.UpgradesMeta { - changes := getChangesFromLedgerEntryChanges(upgradeChanges) - dblrc.upgradeChanges = append(dblrc.upgradeChanges, changes...) - } - - return nil -} - -// storeTransactions maps the close meta data into a slice of LedgerTransaction structs, to provide -// a per-transaction view of the data when Read() is called. -func (dblrc *DBLedgerReader) storeTransactions(lcm ledgerbackend.LedgerCloseMeta) { - for i := range lcm.TransactionEnvelope { - dblrc.transactions = append(dblrc.transactions, LedgerTransaction{ - Index: uint32(i + 1), // Transactions start at '1' - Envelope: lcm.TransactionEnvelope[i], - Result: lcm.TransactionResult[i], - Meta: lcm.TransactionMeta[i], - FeeChanges: lcm.TransactionFeeChanges[i], - }) - } -} - -func (dblrc *DBLedgerReader) Close() error { - dblrc.transactions = nil - dblrc.upgradeChanges = nil - return nil -} diff --git a/exp/ingest/io/ledger_transaction.go b/exp/ingest/io/ledger_transaction.go index 0f222f4f8c..766d2afe67 100644 --- a/exp/ingest/io/ledger_transaction.go +++ b/exp/ingest/io/ledger_transaction.go @@ -24,7 +24,7 @@ func (t *LedgerTransaction) txInternalError() bool { // GetFeeChanges returns a developer friendly representation of LedgerEntryChanges // connected to fees. func (t *LedgerTransaction) GetFeeChanges() []Change { - return getChangesFromLedgerEntryChanges(t.FeeChanges) + return GetChangesFromLedgerEntryChanges(t.FeeChanges) } // GetChanges returns a developer friendly representation of LedgerEntryChanges. @@ -40,7 +40,7 @@ func (t *LedgerTransaction) GetChanges() ([]Change, error) { return changes, errors.New("TransactionMeta.V=0 not supported") case 1: v1Meta := t.Meta.MustV1() - txChanges := getChangesFromLedgerEntryChanges(v1Meta.TxChanges) + txChanges := GetChangesFromLedgerEntryChanges(v1Meta.TxChanges) changes = append(changes, txChanges...) // Ignore operations meta if txInternalError https://github.com/stellar/go/issues/2111 @@ -49,7 +49,7 @@ func (t *LedgerTransaction) GetChanges() ([]Change, error) { } for _, operationMeta := range v1Meta.Operations { - opChanges := getChangesFromLedgerEntryChanges( + opChanges := GetChangesFromLedgerEntryChanges( operationMeta.Changes, ) changes = append(changes, opChanges...) @@ -57,7 +57,7 @@ func (t *LedgerTransaction) GetChanges() ([]Change, error) { case 2: v2Meta := t.Meta.MustV2() - txChangesBefore := getChangesFromLedgerEntryChanges(v2Meta.TxChangesBefore) + txChangesBefore := GetChangesFromLedgerEntryChanges(v2Meta.TxChangesBefore) changes = append(changes, txChangesBefore...) // Ignore operations meta and txChangesAfter if txInternalError @@ -67,13 +67,13 @@ func (t *LedgerTransaction) GetChanges() ([]Change, error) { } for _, operationMeta := range v2Meta.Operations { - opChanges := getChangesFromLedgerEntryChanges( + opChanges := GetChangesFromLedgerEntryChanges( operationMeta.Changes, ) changes = append(changes, opChanges...) } - txChangesAfter := getChangesFromLedgerEntryChanges(v2Meta.TxChangesAfter) + txChangesAfter := GetChangesFromLedgerEntryChanges(v2Meta.TxChangesAfter) changes = append(changes, txChangesAfter...) default: return changes, errors.New("Unsupported TransactionMeta version") @@ -120,55 +120,7 @@ func operationChanges(ops []xdr.OperationMeta, index uint32) []Change { } operationMeta := ops[index] - return getChangesFromLedgerEntryChanges( + return GetChangesFromLedgerEntryChanges( operationMeta.Changes, ) } - -// getChangesFromLedgerEntryChanges transforms LedgerEntryChanges to []Change. -// Each `update` and `removed` is preceded with `state` and `create` changes -// are alone, without `state`. The transformation we're doing is to move each -// change (state/update, state/removed or create) to an array of pre/post pairs. -// Then: -// - for create, pre is null and post is a new entry, -// - for update, pre is previous state and post is the current state, -// - for removed, pre is previous state and post is null. -// -// stellar-core source: -// https://github.com/stellar/stellar-core/blob/e584b43/src/ledger/LedgerTxn.cpp#L582 -func getChangesFromLedgerEntryChanges(ledgerEntryChanges xdr.LedgerEntryChanges) []Change { - changes := []Change{} - - for i, entryChange := range ledgerEntryChanges { - switch entryChange.Type { - case xdr.LedgerEntryChangeTypeLedgerEntryCreated: - created := entryChange.MustCreated() - changes = append(changes, Change{ - Type: created.Data.Type, - Pre: nil, - Post: &created, - }) - case xdr.LedgerEntryChangeTypeLedgerEntryUpdated: - state := ledgerEntryChanges[i-1].MustState() - updated := entryChange.MustUpdated() - changes = append(changes, Change{ - Type: state.Data.Type, - Pre: &state, - Post: &updated, - }) - case xdr.LedgerEntryChangeTypeLedgerEntryRemoved: - state := ledgerEntryChanges[i-1].MustState() - changes = append(changes, Change{ - Type: state.Data.Type, - Pre: &state, - Post: nil, - }) - case xdr.LedgerEntryChangeTypeLedgerEntryState: - continue - default: - panic("Invalid LedgerEntryChangeType") - } - } - - return changes -} diff --git a/exp/ingest/io/ledger_transaction_reader.go b/exp/ingest/io/ledger_transaction_reader.go new file mode 100644 index 0000000000..534ca86a72 --- /dev/null +++ b/exp/ingest/io/ledger_transaction_reader.go @@ -0,0 +1,81 @@ +package io + +import ( + "io" + + "github.com/stellar/go/exp/ingest/ledgerbackend" + "github.com/stellar/go/support/errors" + "github.com/stellar/go/xdr" +) + +// LedgerTransactionReader reads transactions for a given ledger sequence from a backend. +// Use NewTransactionReader to create a new instance. +type LedgerTransactionReader struct { + ledgerCloseMeta ledgerbackend.LedgerCloseMeta + transactions []LedgerTransaction + readIdx int +} + +// NewLedgerTransactionReader creates a new TransactionReader instance. +// Note that TransactionReader is not thread safe and should not be shared by multiple goroutines +func NewLedgerTransactionReader(backend ledgerbackend.LedgerBackend, sequence uint32) (*LedgerTransactionReader, error) { + exists, ledgerCloseMeta, err := backend.GetLedger(sequence) + if err != nil { + return nil, errors.Wrap(err, "error getting ledger from the backend") + } + + if !exists { + return nil, ErrNotFound + } + + reader := &LedgerTransactionReader{ledgerCloseMeta: ledgerCloseMeta} + reader.storeTransactions(ledgerCloseMeta) + return reader, nil +} + +// GetSequence returns the sequence number of the ledger data stored by this object. +func (reader *LedgerTransactionReader) GetSequence() uint32 { + return uint32(reader.ledgerCloseMeta.LedgerHeader.Header.LedgerSeq) +} + +// GetHeader returns the XDR Header data associated with the stored ledger. +func (reader *LedgerTransactionReader) GetHeader() xdr.LedgerHeaderHistoryEntry { + return reader.ledgerCloseMeta.LedgerHeader +} + +// Read returns the next transaction in the ledger, ordered by tx number, each time +// it is called. When there are no more transactions to return, an EOF error is returned. +func (reader *LedgerTransactionReader) Read() (LedgerTransaction, error) { + if reader.readIdx < len(reader.transactions) { + reader.readIdx++ + return reader.transactions[reader.readIdx-1], nil + } + return LedgerTransaction{}, io.EOF +} + +// Rewind resets the reader back to the first transaction in the ledger +func (reader *LedgerTransactionReader) Rewind() { + reader.readIdx = 0 +} + +// storeTransactions maps the close meta data into a slice of LedgerTransaction structs, to provide +// a per-transaction view of the data when Read() is called. +func (reader *LedgerTransactionReader) storeTransactions(lcm ledgerbackend.LedgerCloseMeta) { + for i := range lcm.TransactionEnvelope { + reader.transactions = append(reader.transactions, LedgerTransaction{ + Index: uint32(i + 1), // Transactions start at '1' + Envelope: lcm.TransactionEnvelope[i], + Result: lcm.TransactionResult[i], + Meta: lcm.TransactionMeta[i], + FeeChanges: lcm.TransactionFeeChanges[i], + }) + } +} + +// Close should be called when reading is finished. This is especially +// helpful when there are still some transactions available so reader can stop +// streaming them. +func (reader *LedgerTransactionReader) Close() error { + reader.transactions = nil + return nil +} diff --git a/exp/ingest/io/mock_ledger_reader.go b/exp/ingest/io/mock_ledger_reader.go deleted file mode 100644 index aa05bde538..0000000000 --- a/exp/ingest/io/mock_ledger_reader.go +++ /dev/null @@ -1,46 +0,0 @@ -package io - -import ( - "github.com/stellar/go/xdr" - "github.com/stretchr/testify/mock" -) - -var _ LedgerReader = (*MockLedgerReader)(nil) - -type MockLedgerReader struct { - mock.Mock -} - -func (m *MockLedgerReader) GetSequence() uint32 { - args := m.Called() - return args.Get(0).(uint32) -} - -func (m *MockLedgerReader) GetHeader() xdr.LedgerHeaderHistoryEntry { - args := m.Called() - return args.Get(0).(xdr.LedgerHeaderHistoryEntry) -} - -func (m *MockLedgerReader) Read() (LedgerTransaction, error) { - args := m.Called() - return args.Get(0).(LedgerTransaction), args.Error(1) -} - -func (m *MockLedgerReader) ReadUpgradeChange() (Change, error) { - args := m.Called() - return args.Get(0).(Change), args.Error(1) -} - -func (m *MockLedgerReader) GetUpgradeChanges() []Change { - args := m.Called() - return args.Get(0).([]Change) -} - -func (m *MockLedgerReader) IgnoreUpgradeChanges() { - m.Called() -} - -func (m *MockLedgerReader) Close() error { - args := m.Called() - return args.Error(0) -} diff --git a/exp/ingest/io/mock_ledger_transaction_processor.go b/exp/ingest/io/mock_ledger_transaction_processor.go deleted file mode 100644 index 4834682cc5..0000000000 --- a/exp/ingest/io/mock_ledger_transaction_processor.go +++ /dev/null @@ -1,14 +0,0 @@ -package io - -import "github.com/stretchr/testify/mock" - -var _ LedgerTransactionProcessor = (*MockLedgerTransactionProcessor)(nil) - -type MockLedgerTransactionProcessor struct { - mock.Mock -} - -func (m *MockLedgerTransactionProcessor) ProcessTransaction(transaction LedgerTransaction) error { - args := m.Called(transaction) - return args.Error(0) -} diff --git a/exp/ingest/io/processors.go b/exp/ingest/io/processors.go index 6f5271370e..ee4f2e81e1 100644 --- a/exp/ingest/io/processors.go +++ b/exp/ingest/io/processors.go @@ -16,7 +16,7 @@ type LedgerTransactionProcessor interface { func StreamLedgerTransactions( txProcessor LedgerTransactionProcessor, - reader LedgerReader, + reader *LedgerTransactionReader, ) error { for { tx, err := reader.Read() diff --git a/services/horizon/internal/expingest/processor_runner.go b/services/horizon/internal/expingest/processor_runner.go index f7bd978628..658abed151 100644 --- a/services/horizon/internal/expingest/processor_runner.go +++ b/services/horizon/internal/expingest/processor_runner.go @@ -158,20 +158,19 @@ func (s *ProcessorRunner) validateBucketList(ledgerSequence uint32) error { return errors.Wrap(err, "Error getting bucket list hash") } - ledgerReader, err := io.NewDBLedgerReader(s.ctx, ledgerSequence, s.ledgerBackend) + exists, ledgerCloseMeta, err := s.ledgerBackend.GetLedger(ledgerSequence) if err != nil { - if err == io.ErrNotFound { - return fmt.Errorf( - "cannot validate bucket hash list. Checkpoint ledger (%d) must exist in Stellar-Core database.", - ledgerSequence, - ) - } else { - return errors.Wrap(err, "Error getting ledger") - } + return errors.Wrap(err, "Error getting ledger") + } + + if !exists { + return fmt.Errorf( + "cannot validate bucket hash list. Checkpoint ledger (%d) must exist in Stellar-Core database.", + ledgerSequence, + ) } - ledgerHeader := ledgerReader.GetHeader() - ledgerBucketHashList := ledgerHeader.Header.BucketListHash + ledgerBucketHashList := ledgerCloseMeta.LedgerHeader.Header.BucketListHash if !bytes.Equal(historyBucketListHash[:], ledgerBucketHashList[:]) { return fmt.Errorf( @@ -238,7 +237,7 @@ func (s *ProcessorRunner) runChangeProcessorOnLedger( ) error { var changeReader io.ChangeReader var err error - changeReader, err = io.NewLedgerChangeReader(s.ctx, ledger, s.ledgerBackend) + changeReader, err = io.NewLedgerChangeReader(s.ledgerBackend, ledger) if err != nil { return errors.Wrap(err, "Error creating ledger change reader") } @@ -264,13 +263,13 @@ func (s *ProcessorRunner) runChangeProcessorOnLedger( func (s *ProcessorRunner) RunTransactionProcessorsOnLedger(ledger uint32) (io.StatsLedgerTransactionProcessorResults, error) { ledgerTransactionStats := io.StatsLedgerTransactionProcessor{} - ledgerReader, err := io.NewDBLedgerReader(s.ctx, ledger, s.ledgerBackend) + transactionReader, err := io.NewLedgerTransactionReader(s.ledgerBackend, ledger) if err != nil { return ledgerTransactionStats.GetResults(), errors.Wrap(err, "Error creating ledger reader") } - txProcessor := s.buildTransactionProcessor(&ledgerTransactionStats, ledgerReader.GetHeader()) - err = io.StreamLedgerTransactions(txProcessor, ledgerReader) + txProcessor := s.buildTransactionProcessor(&ledgerTransactionStats, transactionReader.GetHeader()) + err = io.StreamLedgerTransactions(txProcessor, transactionReader) if err != nil { return ledgerTransactionStats.GetResults(), errors.Wrap(err, "Error streaming changes from ledger") } From 936bc3adad5fd2abded25cbfcd404106e410caed Mon Sep 17 00:00:00 2001 From: Alfonso Acosta Date: Mon, 15 Jun 2020 19:28:36 +0200 Subject: [PATCH 7/7] services/horizon: Add read-ahead buffer for decoded LedgerCloseMeta (#2693) --- .../ledgerbackend/captive_core_backend.go | 135 ++++++++++++++---- .../stellar_core_runner_posix.go | 3 +- 2 files changed, 111 insertions(+), 27 deletions(-) diff --git a/exp/ingest/ledgerbackend/captive_core_backend.go b/exp/ingest/ledgerbackend/captive_core_backend.go index e08d975928..9a9a8ff620 100644 --- a/exp/ingest/ledgerbackend/captive_core_backend.go +++ b/exp/ingest/ledgerbackend/captive_core_backend.go @@ -3,10 +3,13 @@ package ledgerbackend import ( "io" "sync" + "time" "github.com/pkg/errors" + "github.com/stellar/go/network" "github.com/stellar/go/support/historyarchive" + "github.com/stellar/go/support/log" "github.com/stellar/go/xdr" ) @@ -27,14 +30,18 @@ var _ LedgerBackend = (*captiveStellarCore)(nil) // TODO: switch from history URLs to history archive interface provided from support package, to permit mocking -// In this (crude, initial) sketch, we replay ledgers in blocks of 17,280 -// which is 24 hours worth of ledgers at 5 second intervals. -const ledgersPerProcess = 17280 -const ledgersPerCheckpoint = 64 +const ( + // In this (crude, initial) sketch, we replay ledgers in blocks of 17,280 + // which is 24 hours worth of ledgers at 5 second intervals. + ledgersPerProcess = 17280 + ledgersPerCheckpoint = 64 + + // The number of checkpoints we're willing to scan over and ignore, without + // restarting a subprocess. + numCheckpointsLeeway = 10 -// The number of checkpoints we're willing to scan over and ignore, without -// restarting a subprocess. -const numCheckpointsLeeway = 10 + readAheadBufferSize = 2 +) func roundDownToFirstReplayAfterCheckpointStart(ledger uint32) uint32 { v := (ledger / ledgersPerCheckpoint) * ledgersPerCheckpoint @@ -45,11 +52,21 @@ func roundDownToFirstReplayAfterCheckpointStart(ledger uint32) uint32 { return v } +type metaResult struct { + *xdr.LedgerCloseMeta + err error +} + type captiveStellarCore struct { networkPassphrase string historyURLs []string lastLedger *uint32 // end of current segment if offline, nil if online + // read-ahead buffer + stop chan struct{} + wait sync.WaitGroup + metaC chan metaResult + stellarCoreRunner stellarCoreRunnerInterface nextLedgerMutex sync.Mutex @@ -158,9 +175,74 @@ func (c *captiveStellarCore) openOfflineReplaySubprocess(nextLedger, lastLedger c.nextLedger = roundDownToFirstReplayAfterCheckpointStart(nextLedger) c.nextLedgerMutex.Unlock() c.lastLedger = &lastLedger + + // read-ahead buffer + c.metaC = make(chan metaResult, readAheadBufferSize) + c.stop = make(chan struct{}) + c.wait.Add(1) + go c.sendLedgerMeta(lastLedger) return nil } +// sendLedgerMeta reads from the captive core pipe, decodes the ledger metadata +// and sends it to the metadata buffered channel +func (c *captiveStellarCore) sendLedgerMeta(untilSequence uint32) { + defer c.wait.Done() + printBufferOccupation := time.NewTicker(5 * time.Second) + defer printBufferOccupation.Stop() + for { + select { + case <-c.stop: + return + case <-printBufferOccupation.C: + log.Debug("captive core read-ahead buffer occupation:", len(c.metaC)) + default: + } + meta, err := c.readLedgerMetaFromPipe() + if err != nil { + select { + case <-c.stop: + case c.metaC <- metaResult{nil, err}: + } + return + } + select { + case <-c.stop: + return + case c.metaC <- metaResult{meta, nil}: + } + seq, err := peekLedgerSequence(meta) + if err != nil { + select { + case <-c.stop: + case c.metaC <- metaResult{nil, err}: + } + return + } + if seq >= untilSequence { + // we are done + return + } + } +} + +func (c *captiveStellarCore) readLedgerMetaFromPipe() (*xdr.LedgerCloseMeta, error) { + metaPipe := c.stellarCoreRunner.getMetaPipe() + if metaPipe == nil { + return nil, errors.New("missing metadata pipe") + } + var xlcm xdr.LedgerCloseMeta + _, e0 := xdr.UnmarshalFramed(metaPipe, &xlcm) + if e0 != nil { + if e0 == io.EOF { + return nil, errors.Wrap(e0, "got EOF from subprocess") + } else { + return nil, errors.Wrap(e0, "unmarshalling framed LedgerCloseMeta") + } + } + return &xlcm, nil +} + func (c *captiveStellarCore) PrepareRange(from uint32, to uint32) error { // `from-1` here because being able to read ledger `from-1` is a confirmation // that the range is ready. This effectively makes getting ledger #1 impossible. @@ -206,27 +288,17 @@ func (c *captiveStellarCore) GetLedger(sequence uint32) (bool, LedgerCloseMeta, return false, LedgerCloseMeta{}, errors.New("unexpected subprocess next-ledger") } - // ... and open - metaPipe := c.stellarCoreRunner.getMetaPipe() - if metaPipe == nil { - return false, LedgerCloseMeta{}, errors.New("missing metadata pipe") - } - // Now loop along the range until we find the ledger we want. var errOut error +loop: for { - var xlcm xdr.LedgerCloseMeta - _, e0 := xdr.UnmarshalFramed(metaPipe, &xlcm) - if e0 != nil { - if e0 == io.EOF { - errOut = errors.Wrap(e0, "got EOF from subprocess") - break - } else { - errOut = errors.Wrap(e0, "unmarshalling framed LedgerCloseMeta") - break - } + metaResult := <-c.metaC + if metaResult.err != nil { + errOut = metaResult.err + break loop } - seq, e1 := peekLedgerSequence(&xlcm) + + seq, e1 := peekLedgerSequence(metaResult.LedgerCloseMeta) if e1 != nil { errOut = e1 break @@ -243,7 +315,7 @@ func (c *captiveStellarCore) GetLedger(sequence uint32) (bool, LedgerCloseMeta, if seq == sequence { // Found the requested seq var lcm LedgerCloseMeta - e2 := c.copyLedgerCloseMeta(&xlcm, &lcm) + e2 := c.copyLedgerCloseMeta(metaResult.LedgerCloseMeta, &lcm) if e2 != nil { errOut = e2 break @@ -300,6 +372,19 @@ func (c *captiveStellarCore) Close() error { c.nextLedger = 0 c.nextLedgerMutex.Unlock() + if c.stop != nil { + close(c.stop) + // discard pending data in case the goroutine is blocked writing to the channel + select { + case <-c.metaC: + default: + } + // Do not close the communication channel until we know + // the goroutine is done + c.wait.Wait() + close(c.metaC) + } + c.lastLedger = nil err := c.stellarCoreRunner.close() diff --git a/exp/ingest/ledgerbackend/stellar_core_runner_posix.go b/exp/ingest/ledgerbackend/stellar_core_runner_posix.go index a0a088cc52..f0f80e8859 100644 --- a/exp/ingest/ledgerbackend/stellar_core_runner_posix.go +++ b/exp/ingest/ledgerbackend/stellar_core_runner_posix.go @@ -3,7 +3,6 @@ package ledgerbackend import ( - "bufio" "os" "syscall" @@ -50,7 +49,7 @@ func (c *stellarCoreRunner) start() error { cmd := c.cmd go cmd.Wait() - c.metaPipe = bufio.NewReaderSize(readFile, 1024*1024) + c.metaPipe = readFile return nil }