Skip to content

Commit

Permalink
/services/horizon/ingest: add SKIP_TXMETA (stellar#5208)
Browse files Browse the repository at this point in the history
* Revert "services/horizon: Add DISABLE_SOROBAN_INGEST flag to skip soroban ingestion processing (stellar#5176)"

This reverts commit bfaf9e1.

* stellar#5189: added optional SKIP_TXMETA parameter to not persist tx meta in transaction model, removed DISABLE_SOROBAN_INGEST, use SKIP_META instead
  • Loading branch information
sreuland committed Feb 19, 2024
1 parent 798da16 commit cae5b85
Show file tree
Hide file tree
Showing 24 changed files with 634 additions and 625 deletions.
9 changes: 9 additions & 0 deletions services/horizon/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,15 @@
All notable changes to this project will be documented in this
file. This project adheres to [Semantic Versioning](http://semver.org/).

## 2.28.3

### Added
- New optional config `SKIP_TXMETA` ([5189](https://github.com/stellar/go/issues/5189)). Defaults to `FALSE`, when `TRUE` the following will occur:
* history_transactions.tx_meta column will have serialized xdr that equates to empty for any protocol version, such as for `xdr.TransactionMeta.V3`, `Operations`, `TxChangesAfter`, `TxChangesBefore` will be empty arrays and `SorobanMeta` will be nil.

### Breaking Changes
- Removed `DISABLE_SOROBAN_INGEST` configuration parameter, use the new `SKIP_TXMETA` parameter instead.

## 2.28.2

### Fixed
Expand Down
2 changes: 1 addition & 1 deletion services/horizon/cmd/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -420,7 +420,7 @@ func runDBReingestRange(ledgerRanges []history.LedgerRange, reingestForce bool,
RoundingSlippageFilter: config.RoundingSlippageFilter,
EnableIngestionFiltering: config.EnableIngestionFiltering,
MaxLedgerPerFlush: maxLedgersPerFlush,
SkipSorobanIngestion: config.SkipSorobanIngestion,
SkipTxmeta: config.SkipTxmeta,
}

if ingestConfig.HistorySession, err = db.Open("postgres", config.DatabaseURL); err != nil {
Expand Down
4 changes: 2 additions & 2 deletions services/horizon/internal/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,6 @@ type Config struct {
Network string
// DisableTxSub disables transaction submission functionality for Horizon.
DisableTxSub bool
// SkipSorobanIngestion skips Soroban related ingestion processing.
SkipSorobanIngestion bool
// SkipTxmeta, when enabled, will not store meta xdr in history transaction table
SkipTxmeta bool
}
10 changes: 5 additions & 5 deletions services/horizon/internal/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,8 +60,8 @@ const (
EnableIngestionFilteringFlagName = "exp-enable-ingestion-filtering"
// DisableTxSubFlagName is the command line flag for disabling transaction submission feature of Horizon
DisableTxSubFlagName = "disable-tx-sub"
// SkipSorobanIngestionFlagName is the command line flag for disabling Soroban related ingestion processing
SkipSorobanIngestionFlagName = "disable-soroban-ingest"
// SkipTxmeta is the command line flag for disabling persistence of tx meta in history transaction table
SkipTxmeta = "skip-txmeta"

// StellarPubnet is a constant representing the Stellar public network
StellarPubnet = "pubnet"
Expand Down Expand Up @@ -740,12 +740,12 @@ func Flags() (*Config, support.ConfigOptions) {
UsedInCommands: IngestionCommands,
},
&support.ConfigOption{
Name: SkipSorobanIngestionFlagName,
ConfigKey: &config.SkipSorobanIngestion,
Name: SkipTxmeta,
ConfigKey: &config.SkipTxmeta,
OptType: types.Bool,
FlagDefault: false,
Required: false,
Usage: "excludes Soroban data during ingestion processing",
Usage: "excludes tx meta from persistence on transaction history",
UsedInCommands: IngestionCommands,
},
}
Expand Down
3 changes: 1 addition & 2 deletions services/horizon/internal/ingest/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,8 +109,7 @@ type Config struct {

EnableIngestionFiltering bool
MaxLedgerPerFlush uint32

SkipSorobanIngestion bool
SkipTxmeta bool
}

const (
Expand Down
14 changes: 4 additions & 10 deletions services/horizon/internal/ingest/processor_runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,6 @@ func buildChangeProcessor(
source ingestionSource,
ledgerSequence uint32,
networkPassphrase string,
skipSorobanIngestion bool,
) *groupChangeProcessors {
statsChangeProcessor := &statsChangeProcessor{
StatsChangeProcessor: changeStats,
Expand Down Expand Up @@ -145,13 +144,13 @@ func (s *ProcessorRunner) buildTransactionProcessor(ledgersProcessor *processors

processors := []horizonTransactionProcessor{
statsLedgerTransactionProcessor,
processors.NewEffectProcessor(accountLoader, s.historyQ.NewEffectBatchInsertBuilder(), s.config.NetworkPassphrase, s.config.SkipSorobanIngestion),
processors.NewEffectProcessor(accountLoader, s.historyQ.NewEffectBatchInsertBuilder(), s.config.NetworkPassphrase),
ledgersProcessor,
processors.NewOperationProcessor(s.historyQ.NewOperationBatchInsertBuilder(), s.config.NetworkPassphrase, s.config.SkipSorobanIngestion),
processors.NewOperationProcessor(s.historyQ.NewOperationBatchInsertBuilder(), s.config.NetworkPassphrase),
tradeProcessor,
processors.NewParticipantsProcessor(accountLoader,
s.historyQ.NewTransactionParticipantsBatchInsertBuilder(), s.historyQ.NewOperationParticipantBatchInsertBuilder()),
processors.NewTransactionProcessor(s.historyQ.NewTransactionBatchInsertBuilder(), s.config.SkipSorobanIngestion),
processors.NewTransactionProcessor(s.historyQ.NewTransactionBatchInsertBuilder(), s.config.SkipTxmeta),
processors.NewClaimableBalancesTransactionProcessor(cbLoader,
s.historyQ.NewTransactionClaimableBalanceBatchInsertBuilder(), s.historyQ.NewOperationClaimableBalanceBatchInsertBuilder()),
processors.NewLiquidityPoolsTransactionProcessor(lpLoader,
Expand All @@ -173,10 +172,7 @@ func (s *ProcessorRunner) buildFilteredOutProcessor() *groupTransactionProcessor
// when in online mode, the submission result processor must always run (regardless of filtering)
var p []horizonTransactionProcessor
if s.config.EnableIngestionFiltering {
txSubProc := processors.NewTransactionFilteredTmpProcessor(
s.historyQ.NewTransactionFilteredTmpBatchInsertBuilder(),
s.config.SkipSorobanIngestion,
)
txSubProc := processors.NewTransactionFilteredTmpProcessor(s.historyQ.NewTransactionFilteredTmpBatchInsertBuilder(), s.config.SkipTxmeta)
p = append(p, txSubProc)
}

Expand Down Expand Up @@ -239,7 +235,6 @@ func (s *ProcessorRunner) RunHistoryArchiveIngestion(
historyArchiveSource,
checkpointLedger,
s.config.NetworkPassphrase,
s.config.SkipSorobanIngestion,
)

if checkpointLedger == 1 {
Expand Down Expand Up @@ -498,7 +493,6 @@ func (s *ProcessorRunner) RunAllProcessorsOnLedger(ledger xdr.LedgerCloseMeta) (
ledgerSource,
ledger.LedgerSequence(),
s.config.NetworkPassphrase,
s.config.SkipSorobanIngestion,
)
err = s.runChangeProcessorOnLedger(groupChangeProcessors, ledger)
if err != nil {
Expand Down
5 changes: 2 additions & 3 deletions services/horizon/internal/ingest/processor_runner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -180,7 +180,7 @@ func TestProcessorRunnerBuildChangeProcessor(t *testing.T) {
}

stats := &ingest.StatsChangeProcessor{}
processor := buildChangeProcessor(runner.historyQ, stats, ledgerSource, 123, "", false)
processor := buildChangeProcessor(runner.historyQ, stats, ledgerSource, 123, "")
assert.IsType(t, &groupChangeProcessors{}, processor)

assert.IsType(t, &statsChangeProcessor{}, processor.processors[0])
Expand All @@ -201,7 +201,7 @@ func TestProcessorRunnerBuildChangeProcessor(t *testing.T) {
filters: &MockFilters{},
}

processor = buildChangeProcessor(runner.historyQ, stats, historyArchiveSource, 456, "", false)
processor = buildChangeProcessor(runner.historyQ, stats, historyArchiveSource, 456, "")
assert.IsType(t, &groupChangeProcessors{}, processor)

assert.IsType(t, &statsChangeProcessor{}, processor.processors[0])
Expand Down Expand Up @@ -271,7 +271,6 @@ func TestProcessorRunnerWithFilterEnabled(t *testing.T) {
config := Config{
NetworkPassphrase: network.PublicNetworkPassphrase,
EnableIngestionFiltering: true,
SkipSorobanIngestion: false,
}

q := &mockDBQ{}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ func (s *ClaimableBalancesTransactionProcessorTestSuiteLedger) TestEmptyClaimabl

func (s *ClaimableBalancesTransactionProcessorTestSuiteLedger) testOperationInserts(balanceID xdr.ClaimableBalanceId, body xdr.OperationBody, change xdr.LedgerEntryChange) {
// Setup the transaction
txn := createTransaction(true, 1)
txn := createTransaction(true, 1, 2)
txn.Envelope.Operations()[0].Body = body
txn.UnsafeMeta.V = 2
txn.UnsafeMeta.V2.Operations = []xdr.OperationMeta{
Expand Down
22 changes: 2 additions & 20 deletions services/horizon/internal/ingest/processors/effects_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,20 +28,17 @@ type EffectProcessor struct {
accountLoader *history.AccountLoader
batch history.EffectBatchInsertBuilder
network string
skipSoroban bool
}

func NewEffectProcessor(
accountLoader *history.AccountLoader,
batch history.EffectBatchInsertBuilder,
network string,
skipSoroban bool,
) *EffectProcessor {
return &EffectProcessor{
accountLoader: accountLoader,
batch: batch,
network: network,
skipSoroban: skipSoroban,
}
}

Expand All @@ -53,29 +50,14 @@ func (p *EffectProcessor) ProcessTransaction(
return nil
}

elidedTransaction := transaction

if p.skipSoroban &&
elidedTransaction.UnsafeMeta.V == 3 &&
elidedTransaction.UnsafeMeta.V3.SorobanMeta != nil {
elidedTransaction.UnsafeMeta.V3 = &xdr.TransactionMetaV3{
Ext: xdr.ExtensionPoint{},
TxChangesBefore: xdr.LedgerEntryChanges{},
Operations: []xdr.OperationMeta{},
TxChangesAfter: xdr.LedgerEntryChanges{},
SorobanMeta: nil,
}
}

for opi, op := range elidedTransaction.Envelope.Operations() {
for opi, op := range transaction.Envelope.Operations() {
operation := transactionOperationWrapper{
index: uint32(opi),
transaction: elidedTransaction,
transaction: transaction,
operation: op,
ledgerSequence: uint32(lcm.LedgerSequence()),
network: p.network,
}

if err := operation.ingestEffects(p.accountLoader, p.batch); err != nil {
return errors.Wrapf(err, "reading operation %v effects", operation.ID())
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,6 @@ func (s *EffectsProcessorTestSuiteLedger) SetupTest() {
s.accountLoader,
s.mockBatchInsertBuilder,
networkPassphrase,
false,
)

s.txs = []ingest.LedgerTransaction{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ func TestLedgersProcessorTestSuiteLedger(t *testing.T) {
suite.Run(t, new(LedgersProcessorTestSuiteLedger))
}

func createTransaction(successful bool, numOps int) ingest.LedgerTransaction {
func createTransaction(successful bool, numOps int, metaVer int32) ingest.LedgerTransaction {
code := xdr.TransactionResultCodeTxSuccess
if !successful {
code = xdr.TransactionResultCodeTxFailed
Expand All @@ -50,6 +50,32 @@ func createTransaction(successful bool, numOps int) ingest.LedgerTransaction {
})
}
sourceAID := xdr.MustAddress("GAUJETIZVEP2NRYLUESJ3LS66NVCEGMON4UDCBCSBEVPIID773P2W6AY")

var txMeta xdr.TransactionMeta
switch metaVer {
case 3:
txMeta = xdr.TransactionMeta{
V: 3,
V3: &xdr.TransactionMetaV3{
Operations: make([]xdr.OperationMeta, numOps, numOps),
},
}
case 2:
txMeta = xdr.TransactionMeta{
V: 2,
V2: &xdr.TransactionMetaV2{
Operations: make([]xdr.OperationMeta, numOps, numOps),
},
}
case 1:
txMeta = xdr.TransactionMeta{
V: 1,
V1: &xdr.TransactionMetaV1{
Operations: make([]xdr.OperationMeta, numOps, numOps),
},
}
}

return ingest.LedgerTransaction{
Result: xdr.TransactionResultPair{
TransactionHash: xdr.Hash{},
Expand All @@ -70,12 +96,7 @@ func createTransaction(successful bool, numOps int) ingest.LedgerTransaction {
},
},
},
UnsafeMeta: xdr.TransactionMeta{
V: 2,
V2: &xdr.TransactionMetaV2{
Operations: make([]xdr.OperationMeta, numOps, numOps),
},
},
UnsafeMeta: txMeta,
}
}

Expand All @@ -94,9 +115,9 @@ func (s *LedgersProcessorTestSuiteLedger) SetupTest() {
)

s.txs = []ingest.LedgerTransaction{
createTransaction(true, 1),
createTransaction(false, 3),
createTransaction(true, 4),
createTransaction(true, 1, 1),
createTransaction(false, 3, 2),
createTransaction(true, 4, 3),
}

s.successCount = 2
Expand Down Expand Up @@ -127,8 +148,8 @@ func (s *LedgersProcessorTestSuiteLedger) TestInsertLedgerSucceeds() {
},
}
nextTransactions := []ingest.LedgerTransaction{
createTransaction(true, 1),
createTransaction(false, 2),
createTransaction(true, 1, 2),
createTransaction(false, 2, 2),
}
for _, tx := range nextTransactions {
err := s.processor.ProcessTransaction(xdr.LedgerCloseMeta{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ func (s *LiquidityPoolsTransactionProcessorTestSuiteLedger) TestEmptyLiquidityPo

func (s *LiquidityPoolsTransactionProcessorTestSuiteLedger) testOperationInserts(poolID xdr.PoolId, body xdr.OperationBody, change xdr.LedgerEntryChange) {
// Setup the transaction
txn := createTransaction(true, 1)
txn := createTransaction(true, 1, 2)
txn.Envelope.Operations()[0].Body = body
txn.UnsafeMeta.V = 2
txn.UnsafeMeta.V2.Operations = []xdr.OperationMeta{
Expand Down
Loading

0 comments on commit cae5b85

Please sign in to comment.