Skip to content

Commit

Permalink
index/cmd/single read from txmeta files
Browse files Browse the repository at this point in the history
  • Loading branch information
Paul Bellamy committed Apr 26, 2022
1 parent 38652e2 commit ebf37a7
Show file tree
Hide file tree
Showing 4 changed files with 120 additions and 95 deletions.
170 changes: 81 additions & 89 deletions exp/lighthorizon/index/cmd/single/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,10 @@ import (
"sync/atomic"
"time"

"github.com/aws/aws-sdk-go/aws"
"github.com/stellar/go/exp/lighthorizon/index"
"github.com/stellar/go/historyarchive"
"github.com/stellar/go/ingest"
"github.com/stellar/go/ingest/ledgerbackend"
"github.com/stellar/go/network"
"github.com/stellar/go/support/log"
"github.com/stellar/go/toid"
Expand All @@ -26,42 +26,59 @@ var (
)

func main() {
sourceUrl := flag.String("source", "gcs://horizon-archive-poc", "history archive url to read txmeta files")
targetUrl := flag.String("target", "file://indexes", "where to write indexes")
networkPassphrase := flag.String("network-passphrase", network.TestNetworkPassphrase, "network passphrase")
start := flag.Int("start", -1, "ledger to start at (default: earliest)")
end := flag.Int("end", -1, "ledger to end at (default: latest)")
modules := flag.String("modules", "accounts,transactions", "comma-separated list of modules to index (default: all)")
flag.Parse()

log.SetLevel(log.InfoLevel)

indexStore, err := index.NewS3Store(&aws.Config{Region: aws.String("us-east-1")}, "", parallel)
ctx := context.Background()

indexStore, err := index.Connect(*targetUrl)
if err != nil {
panic(err)
}

historyArchive, err := historyarchive.Connect(
// "file:///Users/Bartek/archive",
"s3://history.stellar.org/prd/core-live/core_live_001",
// Simple file os access
source, err := historyarchive.ConnectBackend(
*sourceUrl,
historyarchive.ConnectOptions{
NetworkPassphrase: network.PublicNetworkPassphrase,
S3Region: "eu-west-1",
UnsignedRequests: false,
Context: context.Background(),
NetworkPassphrase: *networkPassphrase,
},
)
if err != nil {
panic(err)
}
ledgerBackend := ledgerbackend.NewHistoryArchiveBackend(source)
defer ledgerBackend.Close()

startTime := time.Now()

startCheckpoint := uint32(0) //uint32((39680056) / 64)
endCheckpoint := uint32((39685056) / 64)
all := endCheckpoint - startCheckpoint
if *start < 2 {
*start = 2
}
if *end == -1 {
latest, err := ledgerBackend.GetLatestLedgerSequence(ctx)
if err != nil {
panic(err)
}
*end = int(latest)
}
startLedger := uint32(*start) //uint32((39680056) / 64)
endLedger := uint32(*end)
all := endLedger - startLedger

ctx := context.Background()
wg, ctx := errgroup.WithContext(ctx)

ch := make(chan uint32, parallel)

go func() {
for i := startCheckpoint; i <= endCheckpoint; i++ {
for i := startLedger; i <= endLedger; i++ {
ch <- i
}
close(ch)
Expand All @@ -70,70 +87,67 @@ func main() {
processed := uint64(0)
for i := uint32(0); i < parallel; i++ {
wg.Go(func() error {
for checkpoint := range ch {

startLedger := checkpoint * 64
if startLedger == 0 {
startLedger = 1
for ledgerSeq := range ch {
fmt.Println("Processing ledger", ledgerSeq)
ledger, err := ledgerBackend.GetLedger(ctx, ledgerSeq)
if err != nil {
log.WithField("error", err).Error("error getting ledgers")
ch <- ledgerSeq
continue
}
endLedger := checkpoint*64 - 1 + 64

// fmt.Println("Processing checkpoint", checkpoint, "ledgers", startLedger, endLedger)
checkpoint := ledgerSeq / 64

ledgers, err := historyArchive.GetLedgers(startLedger, endLedger)
reader, err := ingest.NewLedgerTransactionReaderFromLedgerCloseMeta(*networkPassphrase, ledger)
if err != nil {
log.WithField("error", err).Error("error getting ledgers")
ch <- checkpoint
continue
return err
}

for i := startLedger; i <= endLedger; i++ {
ledger, ok := ledgers[i]
if !ok {
return fmt.Errorf("no ledger %d", i)
for {
tx, err := reader.Read()
if err != nil {
if err == io.EOF {
break
}
return err
}

resultMeta := make([]xdr.TransactionResultMeta, len(ledger.TransactionResult.TxResultSet.Results))
for i, result := range ledger.TransactionResult.TxResultSet.Results {
resultMeta[i].Result = result
if strings.Contains(*modules, "transactions") {
indexStore.AddTransactionToIndexes(
toid.New(int32(ledger.LedgerSequence()), int32(tx.Index), 0).ToInt64(),
tx.Result.TransactionHash,
)
}

closeMeta := xdr.LedgerCloseMeta{
V0: &xdr.LedgerCloseMetaV0{
LedgerHeader: ledger.Header,
TxSet: ledger.Transaction.TxSet,
TxProcessing: resultMeta,
},
}

reader, err := ingest.NewLedgerTransactionReaderFromLedgerCloseMeta(network.PublicNetworkPassphrase, closeMeta)
if err != nil {
return err
}
if strings.Contains(*modules, "accounts") {
allParticipants, err := participantsForOperations(tx, false)
if err != nil {
return err
}

for {
tx, err := reader.Read()
err = indexStore.AddParticipantsToIndexes(checkpoint, "all_all", allParticipants)
if err != nil {
if err == io.EOF {
break
}
return err
}

if strings.Contains(*modules, "transactions") {
indexStore.AddTransactionToIndexes(
toid.New(int32(closeMeta.LedgerSequence()), int32(tx.Index), 0).ToInt64(),
tx.Result.TransactionHash,
)
paymentsParticipants, err := participantsForOperations(tx, true)
if err != nil {
return err
}

if strings.Contains(*modules, "accounts") {
err = indexStore.AddParticipantsToIndexes(checkpoint, "all_payments", paymentsParticipants)
if err != nil {
return err
}

if tx.Result.Successful() {
allParticipants, err := participantsForOperations(tx, false)
if err != nil {
return err
}

err = indexStore.AddParticipantsToIndexes(checkpoint, "all_all", allParticipants)
err = indexStore.AddParticipantsToIndexes(checkpoint, "successful_all", allParticipants)
if err != nil {
return err
}
Expand All @@ -143,50 +157,28 @@ func main() {
return err
}

err = indexStore.AddParticipantsToIndexes(checkpoint, "all_payments", paymentsParticipants)
err = indexStore.AddParticipantsToIndexes(checkpoint, "successful_payments", paymentsParticipants)
if err != nil {
return err
}

if tx.Result.Successful() {
allParticipants, err := participantsForOperations(tx, false)
if err != nil {
return err
}

err = indexStore.AddParticipantsToIndexes(checkpoint, "successful_all", allParticipants)
if err != nil {
return err
}

paymentsParticipants, err := participantsForOperations(tx, true)
if err != nil {
return err
}

err = indexStore.AddParticipantsToIndexes(checkpoint, "successful_payments", paymentsParticipants)
if err != nil {
return err
}
}
}
}
}
}

nprocessed := atomic.AddUint64(&processed, 1)
nprocessed := atomic.AddUint64(&processed, 1)

if nprocessed%100 == 0 {
log.Infof(
"Reading checkpoints... - %.2f%% - elapsed: %s, remaining: %s",
(float64(nprocessed)/float64(all))*100,
time.Since(startTime).Round(1*time.Second),
(time.Duration(int64(time.Since(startTime))*int64(all)/int64(nprocessed)) - time.Since(startTime)).Round(1*time.Second),
)
if nprocessed%100 == 0 {
log.Infof(
"Reading checkpoints... - %.2f%% - elapsed: %s, remaining: %s",
(float64(nprocessed)/float64(all))*100,
time.Since(startTime).Round(1*time.Second),
(time.Duration(int64(time.Since(startTime))*int64(all)/int64(nprocessed)) - time.Since(startTime)).Round(1*time.Second),
)

// Clear indexes to save memory
if err := indexStore.Flush(); err != nil {
return err
}
// Clear indexes to save memory
if err := indexStore.Flush(); err != nil {
return err
}
}
return nil
Expand Down
31 changes: 31 additions & 0 deletions exp/lighthorizon/index/connect.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
package index

import (
"errors"
"net/url"
"path/filepath"

"github.com/aws/aws-sdk-go/aws"
)

func Connect(backendUrl string) (Store, error) {
parsed, err := url.Parse(backendUrl)
if err != nil {
return nil, err
}
switch parsed.Scheme {
case "s3":
config := &aws.Config{}
query := parsed.Query()
if region := query.Get("region"); region != "" {
config.Region = aws.String(region)
}
return NewS3Store(config, parsed.Path, 20)

case "file":
return NewFileStore(filepath.Join(parsed.Host, parsed.Path), 20)

default:
return nil, errors.New("unknown URL scheme: '" + parsed.Scheme + "'")
}
}
2 changes: 2 additions & 0 deletions exp/lighthorizon/index/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ type Store interface {
MergeTransactions(prefix string, other *TrieIndex) error
}

// TODO: Use a more standardized filesystem-style backend, so we can re-use
// code
type Backend interface {
Flush(map[string]map[string]*CheckpointIndex) error
FlushAccounts([]string) error
Expand Down
12 changes: 6 additions & 6 deletions exp/lighthorizon/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import (
"flag"
"net/http"

"github.com/aws/aws-sdk-go/aws"
"github.com/stellar/go/exp/lighthorizon/actions"
"github.com/stellar/go/exp/lighthorizon/archive"
"github.com/stellar/go/exp/lighthorizon/index"
Expand All @@ -16,11 +15,12 @@ import (
)

func main() {
targetUrl := flag.String("target", "gcs://horizon-archive-poc", "history archive url to read txmeta files")
sourceUrl := flag.String("source", "gcs://horizon-archive-poc", "history archive url to read txmeta files")
indexesUrl := flag.String("indexes", "file://indexes", "url of the indexes")
networkPassphrase := flag.String("network-passphrase", network.TestNetworkPassphrase, "network passphrase")
flag.Parse()

indexStore, err := index.NewS3Store(&aws.Config{Region: aws.String("us-east-1")}, "", 20)
indexStore, err := index.Connect(*indexesUrl)
if err != nil {
panic(err)
}
Expand All @@ -29,8 +29,8 @@ func main() {
log.Info("Starting lighthorizon!")

// Simple file os access
target, err := historyarchive.ConnectBackend(
*targetUrl,
source, err := historyarchive.ConnectBackend(
*sourceUrl,
historyarchive.ConnectOptions{
Context: context.Background(),
NetworkPassphrase: *networkPassphrase,
Expand All @@ -39,7 +39,7 @@ func main() {
if err != nil {
panic(err)
}
ledgerBackend := ledgerbackend.NewHistoryArchiveBackend(target)
ledgerBackend := ledgerbackend.NewHistoryArchiveBackend(source)
defer ledgerBackend.Close()
archiveWrapper := archive.Wrapper{Archive: ledgerBackend, Passphrase: *networkPassphrase}
http.HandleFunc("/operations", actions.Operations(archiveWrapper, indexStore))
Expand Down

0 comments on commit ebf37a7

Please sign in to comment.