Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use the latest version of the Stellar ingestion library #122

Merged
merged 5 commits into from
Feb 19, 2021
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 4 additions & 3 deletions cmd/export_ledgers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ var executableName = "stellar-etl"
var archiveURL = "http://history.stellar.org/prd/core-live/core_live_001"
var latestLedger = getLastSeqNum()
var update = flag.Bool("update", false, "update the golden files of this test")
var backend, _ = utils.CreateBackend()

type cliTest struct {
name string
Expand All @@ -42,7 +41,6 @@ func TestMain(m *testing.M) {

flag.Parse()
exitCode := m.Run()
backend.Close()
os.Exit(exitCode)
}

Expand Down Expand Up @@ -148,7 +146,10 @@ func removeCoreLogging(loggerOutput string) string {
}

func getLastSeqNum() uint32 {
num, _ := backend.GetLatestLedgerSequence()
num, err := utils.GetLatestLedgerSequence()
if err != nil {
panic(err)
}
return num
}

Expand Down
6 changes: 3 additions & 3 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,8 @@ require (
github.com/fatih/structs v1.1.0 // indirect
github.com/fsnotify/fsnotify v1.4.9 // indirect
github.com/gavv/monotime v0.0.0-20190418164738-30dba4353424 // indirect
github.com/gdexlab/go-render v1.0.1
github.com/getlantern/deepcopy v0.0.0-20160317154340-7f45deb8130a
github.com/gdexlab/go-render v1.0.1 // indirect
github.com/getlantern/deepcopy v0.0.0-20160317154340-7f45deb8130a // indirect
github.com/go-chi/chi v4.1.2+incompatible // indirect
github.com/go-errors/errors v1.1.1 // indirect
github.com/go-sql-driver/mysql v1.5.0 // indirect
Expand Down Expand Up @@ -60,7 +60,7 @@ require (
github.com/spf13/jwalterweatherman v1.1.0 // indirect
github.com/spf13/pflag v1.0.5
github.com/spf13/viper v1.7.1
github.com/stellar/go v0.0.0-20201218151145-990a09044ba9
github.com/stellar/go v0.0.0-20210210172739-2df91a0a5d4f
github.com/stretchr/objx v0.3.0 // indirect
github.com/stretchr/testify v1.6.1
github.com/ugorji/go v1.1.4 // indirect
Expand Down
6 changes: 6 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -518,6 +518,12 @@ github.com/stellar/go v0.0.0-20201217225215-bb1a017ef327 h1:k0aXRJLOYUhiclG4xdDY
github.com/stellar/go v0.0.0-20201217225215-bb1a017ef327/go.mod h1:u39t8VPN26U8w6UaGoVqWN4e9rdTVNCji/Q0HlZqIbY=
github.com/stellar/go v0.0.0-20201218151145-990a09044ba9 h1:frpZn2I+F1M9kpBGX1UgMPmaad1muqa0P14uVvHcu6o=
github.com/stellar/go v0.0.0-20201218151145-990a09044ba9/go.mod h1:u39t8VPN26U8w6UaGoVqWN4e9rdTVNCji/Q0HlZqIbY=
github.com/stellar/go v0.0.0-20210119090801-9791cbfaaa0b h1:xLW3PNG4pI6ReDz0m0cMMLeGQp+ke/gNX7dTmXaEOzU=
github.com/stellar/go v0.0.0-20210119090801-9791cbfaaa0b/go.mod h1:YWQ5olwDH1GXH6tPMvmJb6Fw4hBn73JYw+hOSfky9qU=
github.com/stellar/go v0.0.0-20210208222255-f071f3b0df9a h1:e83OUc9ndYZMpCybfNsffruzJa5aHPahmqNVjk4ppu8=
github.com/stellar/go v0.0.0-20210208222255-f071f3b0df9a/go.mod h1:YWQ5olwDH1GXH6tPMvmJb6Fw4hBn73JYw+hOSfky9qU=
github.com/stellar/go v0.0.0-20210210172739-2df91a0a5d4f h1:ESi1imNB0dljhgn0/NM9dQqYzUNbXPksrBhSD6c5N7Y=
github.com/stellar/go v0.0.0-20210210172739-2df91a0a5d4f/go.mod h1:YWQ5olwDH1GXH6tPMvmJb6Fw4hBn73JYw+hOSfky9qU=
github.com/stellar/go-xdr v0.0.0-20200331223602-71a1e6d555f2 h1:K9H+A+eWe8ZlnpNha+pXbEK+jtIluQp/2dKxkK8k7OE=
github.com/stellar/go-xdr v0.0.0-20200331223602-71a1e6d555f2/go.mod h1:yoxyU/M8nl9LKeWIoBrbDPQ7Cy+4jxRcWcOayZ4BMps=
github.com/stellar/go-xdr v0.0.0-20201028102745-f80a23dac78a h1:GnM0ArRp7EDbaTiFhSp/CLgyk2cacXxdUklqJmdJs1Q=
Expand Down
22 changes: 6 additions & 16 deletions internal/input/assets.go
Original file line number Diff line number Diff line change
@@ -1,40 +1,30 @@
package input

import (
ingestio "github.com/stellar/go/ingest/io"
"io"

"github.com/stellar/go/ingest"
"github.com/stellar/go/xdr"
"github.com/stellar/stellar-etl/internal/utils"
)

// GetPaymentOperations returns a slice of payment operations that can include new assets from the ledgers in the provided range (inclusive on both ends)
func GetPaymentOperations(start, end uint32, limit int64) ([]OperationTransformInput, error) {
backend, err := utils.CreateBackend()
if err != nil {
return []OperationTransformInput{}, err
}

defer backend.Close()

latestNum, err := backend.GetLatestLedgerSequence()
if err != nil {
return []OperationTransformInput{}, err
}

err = validateLedgerRange(start, end, latestNum)
backend, err := utils.CreateBackend(start, end)
if err != nil {
return []OperationTransformInput{}, err
}

opSlice := []OperationTransformInput{}
for seq := start; seq <= end; seq++ {
txReader, err := ingestio.NewLedgerTransactionReader(backend, publicPassword, seq)
txReader, err := ingest.NewLedgerTransactionReader(backend, publicPassword, seq)
if err != nil {
return []OperationTransformInput{}, err
}

for int64(len(opSlice)) < limit || limit < 0 {
tx, err := txReader.Read()
if err == ingestio.EOF {
if err == io.EOF {
break
}

Expand Down
32 changes: 15 additions & 17 deletions internal/input/bucketlist_entries.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,62 +2,60 @@ package input

import (
"context"
"io"

"github.com/stellar/stellar-etl/internal/utils"

"github.com/stellar/go/historyarchive"
"github.com/stellar/go/ingest/adapters"
ingestio "github.com/stellar/go/ingest/io"
"github.com/stellar/go/ingest"
"github.com/stellar/go/xdr"
)

var archiveStellarURL = "http://history.stellar.org/prd/core-live/core_live_001"

// GetEntriesFromGenesis returns a slice of ledger entries of the specified type for the ledgers starting from the genesis ledger and ending at end (inclusive)
func GetEntriesFromGenesis(end uint32, entryType xdr.LedgerEntryType) ([]ingestio.Change, error) {
func GetEntriesFromGenesis(end uint32, entryType xdr.LedgerEntryType) ([]ingest.Change, error) {
archive, err := historyarchive.Connect(
archiveStellarURL,
historyarchive.ConnectOptions{Context: context.Background()},
)
if err != nil {
return []ingestio.Change{}, err
return []ingest.Change{}, err
}

historyAdapter := adapters.MakeHistoryArchiveAdapter(archive)
latestNum, err := historyAdapter.GetLatestLedgerSequence()
latestNum, err := utils.GetLatestLedgerSequence()
if err != nil {
return []ingestio.Change{}, err
return []ingest.Change{}, err
}

err = validateLedgerRange(1, end, latestNum)
if err != nil {
return []ingestio.Change{}, err
if err = utils.ValidateLedgerRange(1, end, latestNum); err != nil {
return []ingest.Change{}, err
}

checkpointSeq, err := utils.GetCheckpointNum(end, latestNum)
if err != nil {
return []ingestio.Change{}, err
return []ingest.Change{}, err
}

return readBucketList(archive, checkpointSeq, entryType)
}

func readBucketList(archive *historyarchive.Archive, checkpointSeq uint32, entryType xdr.LedgerEntryType) ([]ingestio.Change, error) {
changeReader, err := ingestio.MakeSingleLedgerStateReader(context.Background(), archive, checkpointSeq)
func readBucketList(archive *historyarchive.Archive, checkpointSeq uint32, entryType xdr.LedgerEntryType) ([]ingest.Change, error) {
changeReader, err := ingest.NewCheckpointChangeReader(context.Background(), archive, checkpointSeq)
defer changeReader.Close()
if err != nil {
return []ingestio.Change{}, err
return []ingest.Change{}, err
}

entrySlice := []ingestio.Change{}
entrySlice := []ingest.Change{}
for {
change, err := changeReader.Read()
if err == ingestio.EOF {
if err == io.EOF {
break
}

if err != nil {
return []ingestio.Change{}, err
return []ingest.Change{}, err
}

if change.Type == entryType {
Expand Down
38 changes: 11 additions & 27 deletions internal/input/changes.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,10 @@ package input

import (
"fmt"
"io"
"math"

ingestio "github.com/stellar/go/ingest/io"
ingest "github.com/stellar/go/ingest"
"github.com/stellar/go/ingest/ledgerbackend"
"github.com/stellar/go/network"
"github.com/stellar/go/support/log"
Expand All @@ -17,28 +18,12 @@ const password = network.PublicNetworkPassphrase

// ChangeBatch represents the changes in a batch of ledgers represented by the range [BatchStart, BatchEnd)
type ChangeBatch struct {
Changes []ingestio.Change
Changes []ingest.Change
BatchStart uint32
BatchEnd uint32
Type xdr.LedgerEntryType
}

func getLatestLedgerNumber() (uint32, error) {
backend, err := utils.CreateBackend()
if err != nil {
return 0, err
}

defer backend.Close()

latestNum, err := backend.GetLatestLedgerSequence()
if err != nil {
return 0, err
}

return latestNum, nil
}

// PrepareCaptiveCore creates a new captive core instance and prepares it with the given range. The range is unbounded when end = 0, and is bounded and validated otherwise
func PrepareCaptiveCore(execPath, configPath string, start, end uint32) (*ledgerbackend.CaptiveStellarCore, error) {
captiveBackend, err := ledgerbackend.NewCaptive(
Expand All @@ -57,13 +42,12 @@ func PrepareCaptiveCore(execPath, configPath string, start, end uint32) (*ledger

if end != 0 {
ledgerRange = ledgerbackend.BoundedRange(start, end)
latest, err := getLatestLedgerNumber()
latest, err := utils.GetLatestLedgerSequence()
if err != nil {
return &ledgerbackend.CaptiveStellarCore{}, err
}

err = validateLedgerRange(start, end, latest)
if err != nil {
if err = utils.ValidateLedgerRange(start, end, latest); err != nil {
return &ledgerbackend.CaptiveStellarCore{}, err
}
}
Expand Down Expand Up @@ -112,10 +96,10 @@ func closeChannels(accChannel, offChannel, trustChannel chan ChangeBatch) {
}
}

func addLedgerChangesToCache(changeReader *ingestio.LedgerChangeReader, accCache, offCache, trustCache *ingestio.LedgerEntryChangeCache) error {
func addLedgerChangesToCache(changeReader *ingest.LedgerChangeReader, accCache, offCache, trustCache *ingest.ChangeCompactor) error {
for {
change, err := changeReader.Read()
if err == ingestio.EOF {
if err == io.EOF {
return nil
}

Expand Down Expand Up @@ -147,9 +131,9 @@ func addLedgerChangesToCache(changeReader *ingestio.LedgerChangeReader, accCache

// exportBatch gets the changes from the ledgers in the range [batchStart, batchEnd), compacts them, and sends them to the proper channels
func exportBatch(batchStart, batchEnd uint32, core *ledgerbackend.CaptiveStellarCore, accChannel, offChannel, trustChannel chan ChangeBatch, logger *log.Entry) {
accChanges := ingestio.NewLedgerEntryChangeCache()
offChanges := ingestio.NewLedgerEntryChangeCache()
trustChanges := ingestio.NewLedgerEntryChangeCache()
accChanges := ingest.NewChangeCompactor()
offChanges := ingest.NewChangeCompactor()
trustChanges := ingest.NewChangeCompactor()
for seq := batchStart; seq < batchEnd; {
latestLedger, err := core.GetLatestLedgerSequence()
if err != nil {
Expand All @@ -159,7 +143,7 @@ func exportBatch(batchStart, batchEnd uint32, core *ledgerbackend.CaptiveStellar
// if this ledger is available, we process its changes and move on to the next ledger by incrementing seq.
// Otherwise, nothing is incremented and we try again on the next iteration of the loop
if seq <= latestLedger {
changeReader, err := ingestio.NewLedgerChangeReader(core, password, seq)
changeReader, err := ingest.NewLedgerChangeReader(core, password, seq)
if err != nil {
logger.Fatal(fmt.Sprintf("unable to create change reader for ledger %d: ", seq), err)
}
Expand Down
6 changes: 3 additions & 3 deletions internal/input/changes_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import (

"github.com/stretchr/testify/assert"

ingestio "github.com/stellar/go/ingest/io"
"github.com/stellar/go/ingest"
"github.com/stellar/go/xdr"
)

Expand Down Expand Up @@ -114,8 +114,8 @@ func TestSendBatchToChannel(t *testing.T) {
}

func wrapLedgerEntry(entry xdr.LedgerEntry) ChangeBatch {
changes := []ingestio.Change{
ingestio.Change{Type: entry.Data.Type, Post: &entry},
changes := []ingest.Change{
{Type: entry.Data.Type, Post: &entry},
}
return ChangeBatch{
Changes: changes,
Expand Down
24 changes: 7 additions & 17 deletions internal/input/ledger_range.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,9 @@ package input

import (
"fmt"
"github.com/stellar/go/historyarchive"
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Move to next group.

"time"

"github.com/stellar/go/ingest/ledgerbackend"
"github.com/stellar/stellar-etl/internal/utils"
)

Expand All @@ -20,7 +20,7 @@ retrieve new graphPoints as necessary. As the sequence number increases, so does
sequence numbers that correspond to a given close time fairly easily.
*/
type graph struct {
Backend *ledgerbackend.HistoryArchiveBackend
Client historyarchive.ArchiveInterface
BeginPoint graphPoint
EndPoint graphPoint
}
Expand All @@ -41,8 +41,6 @@ func GetLedgerRange(startTime, endTime time.Time) (int64, int64, error) {
return 0, 0, err
}

defer graph.close()

err = graph.limitLedgerRange(&startTime, &endTime)
if err != nil {
return 0, 0, err
Expand All @@ -63,19 +61,15 @@ func GetLedgerRange(startTime, endTime time.Time) (int64, int64, error) {
return startLedger, endLedger, nil
}

func (g graph) close() {
g.Backend.Close()
}

// createNewGraph makes a new graph with the endpoints equal to the network's endpoints
func createNewGraph() (graph, error) {
graph := graph{}
archive, err := utils.CreateBackend()
archive, err := utils.CreateHistoryArchiveClient()
if err != nil {
return graph, err
}

graph.Backend = archive
graph.Client = archive

secondLedgerPoint, err := graph.getGraphPoint(2) // the second ledger has a real close time, unlike the 1970s close time of the genesis ledger
if err != nil {
Expand All @@ -84,12 +78,12 @@ func createNewGraph() (graph, error) {

graph.BeginPoint = secondLedgerPoint

latestNum, err := graph.Backend.GetLatestLedgerSequence()
root, err := graph.Client.GetRootHAS()
if err != nil {
return graph, err
}

latestPoint, err := graph.getGraphPoint(int64(latestNum))
latestPoint, err := graph.getGraphPoint(int64(root.CurrentLedger))
if err != nil {
return graph, err
}
Expand Down Expand Up @@ -212,11 +206,7 @@ func (g graph) limitLedgerRange(start, end *time.Time) error {

// getGraphPoint gets the graphPoint representation of the ledger with the provided sequence number
func (g graph) getGraphPoint(sequence int64) (graphPoint, error) {
ok, ledger, err := g.Backend.GetLedger(uint32(sequence))
if !ok {
return graphPoint{}, fmt.Errorf("ledger %d does not exist in history archive", sequence)
}

ledger, err := g.Client.GetLedgerHeader(uint32(sequence))
if err != nil {
return graphPoint{}, fmt.Errorf(fmt.Sprintf("unable to get ledger %d: ", sequence), err)
}
Expand Down
Loading