Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Ln/merge mem reduction into feature pos #588

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,8 @@ jobs:
go test -run TestAssociations -v ./lib &&
go test -run TestDAOCoinLimitOrder -v ./lib &&
go test -run TestFreezingPosts -v ./lib &&
go test -run TestBalanceModelAssociations -v ./lib
go test -run TestBalanceModelAssociations -v ./lib &&
go test -run TestPGGenesisBlock -v ./lib

- name: Rollback migrations
run: go run scripts/migrate.go rollback
3 changes: 2 additions & 1 deletion integration_testing/tools.go
Original file line number Diff line number Diff line change
Expand Up @@ -399,7 +399,8 @@ func listenForBlockHeight(t *testing.T, node *cmd.Node, height uint32, signal ch
}()
}

// listenForBlockHeight busy-waits until the node's block tip reaches provided height.
// disconnectAtBlockHeight busy-waits until the node's block tip reaches provided height, and then disconnects
// from the provided bridge.
func disconnectAtBlockHeight(t *testing.T, syncingNode *cmd.Node, bridge *ConnectionBridge, height uint32) {
listener := make(chan bool)
listenForBlockHeight(t, syncingNode, height, listener)
Expand Down
10 changes: 5 additions & 5 deletions lib/block_view_bitcoin_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ func _dumpAndLoadMempool(t *testing.T, mempool *DeSoMempool) {
newMempool := NewDeSoMempool(
mempool.bc, 0, /* rateLimitFeeRateNanosPerKB */
0 /* minFeeRateNanosPerKB */, "", true,
mempool.dataDir, "")
mempool.dataDir, "", mempool.useDefaultBadgerOptions)
mempool.mempoolDir = ""
newMempool.mempoolDir = mempoolDir
newMempool.LoadTxnsFromDB()
Expand Down Expand Up @@ -370,7 +370,7 @@ func TestBitcoinExchange(t *testing.T) {
newMP := NewDeSoMempool(chain, 0, /* rateLimitFeeRateNanosPerKB */
0, /* minFeeRateNanosPerKB */
"" /*blockCypherAPIKey*/, false,
"" /*dataDir*/, "")
"" /*dataDir*/, "", true)
mempool.resetPool(newMP)

// Validating the first Bitcoin burn transaction via a UtxoView should
Expand Down Expand Up @@ -1076,7 +1076,7 @@ func TestBitcoinExchangeGlobalParams(t *testing.T) {
newMP := NewDeSoMempool(chain, 0, /* rateLimitFeeRateNanosPerKB */
0, /* minFeeRateNanosPerKB */
"" /*blockCypherAPIKey*/, false,
"" /*dataDir*/, "")
"" /*dataDir*/, "", true)
mempool.resetPool(newMP)

//// Validating the first Bitcoin burn transaction via a UtxoView should
Expand Down Expand Up @@ -1808,7 +1808,7 @@ func TestSpendOffOfUnminedTxnsBitcoinExchange(t *testing.T) {
newMP := NewDeSoMempool(chain, 0, /* rateLimitFeeRateNanosPerKB */
0, /* minFeeRateNanosPerKB */
"" /*blockCypherAPIKey*/, false,
"" /*dataDir*/, "")
"" /*dataDir*/, "", true)
mempool.resetPool(newMP)

// The amount of work on the first burn transaction should be zero.
Expand Down Expand Up @@ -2419,7 +2419,7 @@ func TestBitcoinExchangeWithAmountNanosNonZeroAtGenesis(t *testing.T) {
newMP := NewDeSoMempool(chain, 0, /* rateLimitFeeRateNanosPerKB */
0, /* minFeeRateNanosPerKB */
"" /*blockCypherAPIKey*/, false,
"" /*dataDir*/, "")
"" /*dataDir*/, "", true)
mempool.resetPool(newMP)

// The amount of work on the first burn transaction should be zero.
Expand Down
4 changes: 2 additions & 2 deletions lib/block_view_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -418,7 +418,7 @@ func (tes *transactionTestSuite) InitializeChainAndGetTestMeta(useBadger bool, u
pgPort = config.testPostgresPort
}
chain, params, embpg = NewLowDifficultyBlockchainWithParamsAndDb(tes.t, &DeSoTestnetParams,
true, pgPort)
true, pgPort, false)
mempool, miner = NewTestMiner(config.t, chain, params, true /*isSender*/)
pg = chain.postgres
db = chain.db
Expand Down Expand Up @@ -1431,7 +1431,7 @@ func TestUpdateGlobalParams(t *testing.T) {
txn := _assembleBasicTransferTxnFullySigned(t, chain, 200, 200, m0Pub, moneyPkString, m0Priv, mempool)
txn.TxnNonce.ExpirationBlockHeight = uint64(chain.blockTip().Height + 1 + 5001)
_signTxn(t, txn, m0Priv)
newMP := NewDeSoMempool(chain, 0, 0, "", true, "", "")
newMP := NewDeSoMempool(chain, 0, 0, "", true, "", "", true)
_, _, err = newMP.TryAcceptTransaction(txn, false, false)
require.Error(err)
require.Contains(err.Error(), TxErrorNonceExpirationBlockHeightOffsetExceeded)
Expand Down
2 changes: 1 addition & 1 deletion lib/block_view_types_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -263,7 +263,7 @@ func TestUtxoEntryEncodeDecode(t *testing.T) {
newMP := NewDeSoMempool(chain, 0, /* rateLimitFeeRateNanosPerKB */
0, /* minFeeRateNanosPerKB */
"" /*blockCypherAPIKey*/, false,
"" /*dataDir*/, "")
"" /*dataDir*/, "", true)
mempool.resetPool(newMP)

// Validating the first Bitcoin burn transaction via a UtxoView should
Expand Down
6 changes: 4 additions & 2 deletions lib/blockchain.go
Original file line number Diff line number Diff line change
Expand Up @@ -528,9 +528,11 @@ func (bc *Blockchain) _initChain() error {

if bc.postgres != nil {
err = bc.postgres.InitGenesisBlock(bc.params, bc.db)
} else {
err = InitDbWithDeSoGenesisBlock(bc.params, bc.db, bc.eventManager, bc.snapshot)
if err != nil {
return errors.Wrapf(err, "_initChain: Problem initializing postgres with genesis block")
}
}
err = InitDbWithDeSoGenesisBlock(bc.params, bc.db, bc.eventManager, bc.snapshot, bc.postgres)
if err != nil {
return errors.Wrapf(err, "_initChain: Problem initializing db with genesis block")
}
Expand Down
34 changes: 28 additions & 6 deletions lib/blockchain_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -184,7 +184,7 @@ func AppendToMemLog(t *testing.T, prefix string) {
f, err := os.OpenFile("mem.log", os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644)
if err == nil {
defer f.Close()
if _, err := f.WriteString(fmt.Sprintf("%s\t%s\tMemory Usage\t%v\n", prefix, t.Name(), float64(mem.Alloc)/float64(1e9))); err != nil {
if _, err := f.WriteString(fmt.Sprintf("%s\t%s\tMemory Usage\t%v\tTotal Alloc\t%v\n", prefix, t.Name(), float64(mem.Alloc)/float64(1e9), float64(mem.TotalAlloc)/float64(1e9))); err != nil {
log.Println(err)
}
}
Expand All @@ -205,11 +205,11 @@ func NewLowDifficultyBlockchainWithParams(t *testing.T, params *DeSoParams) (
// Set the number of txns per view regeneration to one while creating the txns
ReadOnlyUtxoViewRegenerationIntervalTxns = 1

chain, params, _ := NewLowDifficultyBlockchainWithParamsAndDb(t, params, len(os.Getenv("POSTGRES_URI")) > 0, 0)
chain, params, _ := NewLowDifficultyBlockchainWithParamsAndDb(t, params, len(os.Getenv("POSTGRES_URI")) > 0, 0, false)
return chain, params, chain.db
}

func NewLowDifficultyBlockchainWithParamsAndDb(t *testing.T, params *DeSoParams, usePostgres bool, postgresPort uint32) (
func NewLowDifficultyBlockchainWithParamsAndDb(t *testing.T, params *DeSoParams, usePostgres bool, postgresPort uint32, useProvidedParams bool) (
*Blockchain, *DeSoParams, *embeddedpostgres.EmbeddedPostgres) {
TestDeSoEncoderSetup(t)
AppendToMemLog(t, "START")
Expand Down Expand Up @@ -237,13 +237,16 @@ func NewLowDifficultyBlockchainWithParamsAndDb(t *testing.T, params *DeSoParams,
}

timesource := chainlib.NewMedianTime()
testParams := NewTestParams(params)
testParams := *params
if !useProvidedParams {
testParams = NewTestParams(params)
}

// Temporarily modify the seed balances to make a specific public
// key have some DeSo
var snap *Snapshot
if !usePostgres {
snap, err, _ = NewSnapshot(db, dbDir, SnapshotBlockHeightPeriod, false, false, &testParams, false)
snap, err, _ = NewSnapshot(db, dbDir, SnapshotBlockHeightPeriod, false, false, &testParams, false, true)
if err != nil {
log.Fatal(err)
}
Expand All @@ -260,6 +263,12 @@ func NewLowDifficultyBlockchainWithParamsAndDb(t *testing.T, params *DeSoParams,
snap.Stop()
CleanUpBadger(snap.SnapshotDb)
}
if embpg != nil {
err = embpg.Stop()
if err != nil {
glog.Errorf("Error stopping embedded pg: %v", err)
}
}
CleanUpBadger(db)
TestDeSoEncoderShutdown(t)
AppendToMemLog(t, "CLEANUP_END")
Expand Down Expand Up @@ -324,7 +333,7 @@ func NewTestMiner(t *testing.T, chain *Blockchain, params *DeSoParams, isSender
mempool := NewDeSoMempool(
chain, 0, /* rateLimitFeeRateNanosPerKB */
0 /* minFeeRateNanosPerKB */, "", true,
"" /*dataDir*/, "")
"" /*dataDir*/, "", true)
minerPubKeys := []string{}
if isSender {
minerPubKeys = append(minerPubKeys, senderPkString)
Expand Down Expand Up @@ -1742,3 +1751,16 @@ func TestForbiddenBlockSignaturePubKey(t *testing.T) {
require.Error(err)
require.Contains(err.Error(), RuleErrorForbiddenBlockProducerPublicKey)
}

func TestPGGenesisBlock(t *testing.T) {
// We skip this test in buildkite CI, but include it in GH actions postgres testing.
// Comment out this conditional to test locally.
if len(os.Getenv("POSTGRES_URI")) == 0 {
return
}
chain, params, _ := NewLowDifficultyBlockchainWithParamsAndDb(t, &DeSoTestnetParams, true, 5435, true)
for _, seedBalance := range params.SeedBalances {
bal := chain.postgres.GetBalance(NewPublicKey(seedBalance.PublicKey))
require.Equal(t, bal, seedBalance.AmountNanos)
}
}
9 changes: 6 additions & 3 deletions lib/db_utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -5096,7 +5096,7 @@ func DbBulkDeleteHeightHashToNodeInfo(handle *badger.DB, snap *Snapshot,
// InitDbWithGenesisBlock initializes the database to contain only the genesis
// block.
func InitDbWithDeSoGenesisBlock(params *DeSoParams, handle *badger.DB,
eventManager *EventManager, snap *Snapshot) error {
eventManager *EventManager, snap *Snapshot, postgres *Postgres) error {
// Construct a node for the genesis block. Its height is zero and it has
// no parents. Its difficulty should be set to the initial
// difficulty specified in the parameters and it should be assumed to be
Expand Down Expand Up @@ -5154,7 +5154,7 @@ func InitDbWithDeSoGenesisBlock(params *DeSoParams, handle *badger.DB,
// think things are initialized because we set the best block hash at the
// top. We should fix this at some point so that an error in this step
// wipes out the best hash.
utxoView, err := NewUtxoView(handle, params, nil, snap)
utxoView, err := NewUtxoView(handle, params, postgres, snap)
if err != nil {
return fmt.Errorf(
"InitDbWithDeSoGenesisBlock: Error initializing UtxoView")
Expand Down Expand Up @@ -5224,7 +5224,6 @@ func InitDbWithDeSoGenesisBlock(params *DeSoParams, handle *badger.DB,
UtxoOps: utxoOpsForBlock,
})
}

// Flush all the data in the view.
err = utxoView.FlushToDb(0)
if err != nil {
Expand Down Expand Up @@ -9849,6 +9848,10 @@ func PerformanceBadgerOptions(dir string) badger.Options {
return opts
}

func DefaultBadgerOptions(dir string) badger.Options {
return badger.DefaultOptions(dir)
}

// ---------------------------------------------
// Associations
// ---------------------------------------------
Expand Down
4 changes: 2 additions & 2 deletions lib/db_utils_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ func GetTestBadgerDb() (_db *badger.DB, _dir string) {
}

// Open a badgerdb in a temporary directory.
opts := PerformanceBadgerOptions(dir)
opts := DefaultBadgerOptions(dir)
opts.Dir = dir
opts.ValueDir = dir
// Turn off logging for tests.
Expand Down Expand Up @@ -196,7 +196,7 @@ func TestInitDbWithGenesisBlock(t *testing.T) {
db, _ := GetTestBadgerDb()
defer CleanUpBadger(db)

err := InitDbWithDeSoGenesisBlock(&DeSoTestnetParams, db, nil, nil)
err := InitDbWithDeSoGenesisBlock(&DeSoTestnetParams, db, nil, nil, nil)
require.NoError(err)

// Check the block index.
Expand Down
2 changes: 1 addition & 1 deletion lib/load_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,7 @@ func TestComputeMaxTPS(t *testing.T) {
newMP := NewDeSoMempool(mempool.bc, 0, /* rateLimitFeeRateNanosPerKB */
0, /* minFeeRateNanosPerKB */
"" /*blockCypherAPIKey*/, false,
"" /*dataDir*/, "")
"" /*dataDir*/, "", true)
mempool.resetPool(newMP)
{
timeStart := time.Now()
Expand Down
25 changes: 19 additions & 6 deletions lib/mempool.go
Original file line number Diff line number Diff line change
Expand Up @@ -255,6 +255,17 @@ type DeSoMempool struct {
// We pass a copy of the data dir flag to the tx pool so that we can instantiate
// temp badger db instances and dump mempool txns to them.
dataDir string

// set to true for tests. This lowers the memory requirements of the mempool
// by using DefaultBadgerOptions instead of PerformanceBadgerOptions.
useDefaultBadgerOptions bool
}

func (mp *DeSoMempool) getBadgerOptions(dir string) badger.Options {
if mp.useDefaultBadgerOptions {
return DefaultBadgerOptions(dir)
}
return PerformanceBadgerOptions(dir)
}

// See comment on RemoveUnconnectedTxn. The mempool lock must be called for writing
Expand Down Expand Up @@ -321,6 +332,7 @@ func (mp *DeSoMempool) resetPool(newPool *DeSoMempool) {
mp.backupUniversalUtxoView = newPool.backupUniversalUtxoView
mp.universalUtxoView = newPool.universalUtxoView
mp.universalTransactionList = newPool.universalTransactionList
mp.useDefaultBadgerOptions = newPool.useDefaultBadgerOptions

// We don't adjust blockCypherAPIKey or blockCypherCheckDoubleSpendChan
// since those should be unaffected
Expand Down Expand Up @@ -381,7 +393,7 @@ func (mp *DeSoMempool) UpdateAfterConnectBlock(blk *MsgDeSoBlock) (_txnsAddedToM
0, /* minFeeRateNanosPerKB */
"", /*blockCypherAPIKey*/
false, /*runReadOnlyViewUpdater*/
"" /*dataDir*/, "")
"" /*dataDir*/, "", mp.useDefaultBadgerOptions)

// Get all the transactions from the old pool object.
oldMempoolTxns, oldUnconnectedTxns, err := mp._getTransactionsOrderedByTimeAdded()
Expand Down Expand Up @@ -485,7 +497,7 @@ func (mp *DeSoMempool) UpdateAfterDisconnectBlock(blk *MsgDeSoBlock) {
newPool := NewDeSoMempool(mp.bc, 0, /* rateLimitFeeRateNanosPerKB */
0, /* minFeeRateNanosPerKB */
"" /*blockCypherAPIKey*/, false,
"" /*dataDir*/, "")
"" /*dataDir*/, "", mp.useDefaultBadgerOptions)

// Add the transactions from the block to the new pool (except for the block reward,
// which should always be the first transaction). Break out if we encounter
Expand Down Expand Up @@ -767,7 +779,7 @@ func (mp *DeSoMempool) OpenTempDBAndDumpTxns() error {
if err != nil {
return fmt.Errorf("OpenTempDBAndDumpTxns: Error making top-level dir: %v", err)
}
tempMempoolDBOpts := PerformanceBadgerOptions(tempMempoolDBDir)
tempMempoolDBOpts := mp.getBadgerOptions(tempMempoolDBDir)
tempMempoolDBOpts.ValueDir = tempMempoolDBDir
tempMempoolDB, err := badger.Open(tempMempoolDBOpts)
if err != nil {
Expand Down Expand Up @@ -2388,7 +2400,7 @@ func (mp *DeSoMempool) inefficientRemoveTransaction(tx *MsgDeSoTxn) {
newPool := NewDeSoMempool(mp.bc, 0, /* rateLimitFeeRateNanosPerKB */
0, /* minFeeRateNanosPerKB */
"" /*blockCypherAPIKey*/, false,
"" /*dataDir*/, "")
"" /*dataDir*/, "", mp.useDefaultBadgerOptions)
// At this point the block txns have been added to the new pool. Now we need to
// add the txns from the original pool. Start by fetching them in slice form.
oldMempoolTxns, oldUnconnectedTxns, err := mp._getTransactionsOrderedByTimeAdded()
Expand Down Expand Up @@ -2562,7 +2574,7 @@ func (mp *DeSoMempool) LoadTxnsFromDB() {
}

// If we make it this far, we found a mempool dump to load. Woohoo!
tempMempoolDBOpts := PerformanceBadgerOptions(savedTxnsDir)
tempMempoolDBOpts := mp.getBadgerOptions(savedTxnsDir)
tempMempoolDBOpts.ValueDir = savedTxnsDir
glog.Infof("LoadTxnsFrom: Opening new temp db %v", savedTxnsDir)
tempMempoolDB, err := badger.Open(tempMempoolDBOpts)
Expand Down Expand Up @@ -2600,7 +2612,7 @@ func (mp *DeSoMempool) Stop() {
// Create a new pool with no transactions in it.
func NewDeSoMempool(_bc *Blockchain, _rateLimitFeerateNanosPerKB uint64,
_minFeerateNanosPerKB uint64, _blockCypherAPIKey string,
_runReadOnlyViewUpdater bool, _dataDir string, _mempoolDumpDir string) *DeSoMempool {
_runReadOnlyViewUpdater bool, _dataDir string, _mempoolDumpDir string, useDefaultBadgerOptions bool) *DeSoMempool {

utxoView, _ := NewUtxoView(_bc.db, _bc.params, _bc.postgres, _bc.snapshot)
backupUtxoView, _ := NewUtxoView(_bc.db, _bc.params, _bc.postgres, _bc.snapshot)
Expand All @@ -2624,6 +2636,7 @@ func NewDeSoMempool(_bc *Blockchain, _rateLimitFeerateNanosPerKB uint64,
readOnlyUniversalTransactionMap: make(map[BlockHash]*MempoolTx),
readOnlyOutpoints: make(map[UtxoKey]*MsgDeSoTxn),
dataDir: _dataDir,
useDefaultBadgerOptions: useDefaultBadgerOptions,
}

if newPool.mempoolDir != "" {
Expand Down
10 changes: 5 additions & 5 deletions lib/mempool_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ func TestMempoolLongChainOfDependencies(t *testing.T) {
mp := NewDeSoMempool(
chain, 0, /* rateLimitFeeRateNanosPerKB */
0 /* minFeeRateNanosPerKB */, "", true,
"" /*dataDir*/, "")
"" /*dataDir*/, "", true)
_, err := mp.processTransaction(txn1, false /*allowUnconnectedTxn*/, false /*rateLimit*/, 0 /*peerID*/, true /*verifySignatures*/)
require.NoError(err)

Expand Down Expand Up @@ -118,7 +118,7 @@ func TestMempoolRateLimit(t *testing.T) {
mpNoMinFees := NewDeSoMempool(
chain, 0, /* rateLimitFeeRateNanosPerKB */
0 /* minFeeRateNanosPerKB */, "", true,
"" /*dataDir*/, "")
"" /*dataDir*/, "", true)

// Create a transaction that sends 1 DeSo to the recipient as its
// zeroth output.
Expand All @@ -134,7 +134,7 @@ func TestMempoolRateLimit(t *testing.T) {
mpWithMinFee := NewDeSoMempool(
chain, 0, /* rateLimitFeeRateNanosPerKB */
100 /* minFeeRateNanosPerKB */, "", true,
"" /*dataDir*/, "")
"" /*dataDir*/, "", true)
_, err = mpWithMinFee.processTransaction(txn1, false /*allowUnconnectedTxn*/, true /*rateLimit*/, 0 /*peerID*/, false /*verifySignatures*/)
require.Error(err)
require.Contains(err.Error(), TxErrorInsufficientFeeMinFee)
Expand Down Expand Up @@ -183,7 +183,7 @@ func TestMempoolRateLimit(t *testing.T) {
mpWithRateLimit := NewDeSoMempool(
chain, 100, /* rateLimitFeeRateNanosPerKB */
0 /* minFeeRateNanosPerKB */, "", true,
"" /*dataDir*/, "")
"" /*dataDir*/, "", true)
processingErrors := []error{}
for _, txn := range txnsCreated {
_, err := mpWithRateLimit.processTransaction(txn, false /*allowUnconnectedTxn*/, true /*rateLimit*/, 0 /*peerID*/, false /*verifySignatures*/)
Expand Down Expand Up @@ -311,7 +311,7 @@ func TestMempoolAugmentedUtxoViewTransactionChain(t *testing.T) {
mp := NewDeSoMempool(
chain, 0, /* rateLimitFeeRateNanosPerKB */
0 /* minFeeRateNanosPerKB */, "", true,
"" /*dataDir*/, "")
"" /*dataDir*/, "", true)

// Process the first transaction.
mempoolTx1, err := mp.processTransaction(txn1, false /*allowUnconnectedTxn*/, false /*rateLimit*/, 0 /*peerID*/, true /*verifySignatures*/)
Expand Down
Loading