diff --git a/beacon-depositors-transactions/beacon_depositors_transactions.go b/beacon-depositors-transactions/beacon_depositors_transactions.go index 6504051..f9e12a2 100644 --- a/beacon-depositors-transactions/beacon_depositors_transactions.go +++ b/beacon-depositors-transactions/beacon_depositors_transactions.go @@ -72,7 +72,7 @@ func NewBeaconDepositorsTransactions(pCtx context.Context, iConfig *config.Beaco ethClient: elClient, contractABI: contractABI, alchemyClient: alchemyClient, - routineClosed: make(chan struct{}), + routineClosed: make(chan struct{}, 1), checkpointsProcessed: &atomic.Uint64{}, }, nil } diff --git a/cmd/beacon-depositors-transactions/cmd.go b/cmd/beacon-depositors-transactions/cmd.go index 6c00025..e8f6a9d 100644 --- a/cmd/beacon-depositors-transactions/cmd.go +++ b/cmd/beacon-depositors-transactions/cmd.go @@ -29,7 +29,7 @@ var BeaconDepositorsTransactionsCommand = &cli.Command{ Name: "db-url", Usage: "Database where to store transactions", EnvVars: []string{"DB_URL"}, - DefaultText: "postgres://user:password@localhost:5432/goteth", + DefaultText: "postgres://user:password@localhost:5432/dbName", }, &cli.StringFlag{ Name: "log-level", diff --git a/cmd/identify/cmd.go b/cmd/identify/cmd.go index 136011d..930e724 100644 --- a/cmd/identify/cmd.go +++ b/cmd/identify/cmd.go @@ -29,7 +29,7 @@ var IdentifyCommand = &cli.Command{ Name: "db-url", Usage: "Database where to store transactions", EnvVars: []string{"DB_URL"}, - DefaultText: "postgres://user:password@localhost:5432/goteth", + DefaultText: "postgres://user:password@localhost:5432/dbName", }, &cli.StringFlag{ Name: "log-level", diff --git a/db/beacon_depositors_transactions.go b/db/beacon_depositors_transactions.go index 34525e0..4a7e84c 100644 --- a/db/beacon_depositors_transactions.go +++ b/db/beacon_depositors_transactions.go @@ -13,6 +13,7 @@ import ( "github.com/google/uuid" pgx "github.com/jackc/pgx/v5" "github.com/migalabs/eth-pokhar/models" + "github.com/pkg/errors" ) // Postgres intregration variables @@ -32,7 +33,13 @@ var ( ) func (p *PostgresDBService) ObtainCheckpointPerDepositor() ([]models.DepositorCheckpoint, error) { - rows, err := p.psqlPool.Query(p.ctx, selectCheckpointPerDepositor) + conn, err := p.psqlPool.Acquire(p.ctx) + if err != nil { + return nil, errors.Wrap(err, "error acquiring database connection") + } + defer conn.Release() + + rows, err := conn.Query(p.ctx, selectCheckpointPerDepositor) if err != nil { rows.Close() return nil, err diff --git a/db/beacon_deposits.go b/db/beacon_deposits.go index bc2d8d4..72d71e4 100644 --- a/db/beacon_deposits.go +++ b/db/beacon_deposits.go @@ -23,7 +23,12 @@ var ( ) func (p *PostgresDBService) ObtainLastDeposit() (models.BeaconDeposit, error) { - rows, err := p.psqlPool.Query(p.ctx, selectLastDeposit) + conn, err := p.psqlPool.Acquire(p.ctx) + if err != nil { + return models.BeaconDeposit{}, errors.Wrap(err, "error acquiring connection") + } + defer conn.Release() + rows, err := conn.Query(p.ctx, selectLastDeposit) if err != nil { rows.Close() return models.BeaconDeposit{}, errors.Wrap(err, "error obtaining last epoch from database") @@ -54,8 +59,14 @@ func (p *PostgresDBService) CopyBeaconDeposits(rowSrc []models.BeaconDeposit) in deposits := beaconDepositToCopyData(rowSrc) + conn, err := p.psqlPool.Acquire(p.ctx) + if err != nil { + wlog.Fatalf("could not acquire connection: %s", err.Error()) + } + defer conn.Release() + // Create a temporary table with a unique constraint - _, err := p.psqlPool.Exec(p.ctx, ` + _, err = conn.Exec(p.ctx, ` CREATE TEMP TABLE IF NOT EXISTS `+tempTableName+` ( f_block_num bigint, f_depositor text, @@ -68,7 +79,7 @@ func (p *PostgresDBService) CopyBeaconDeposits(rowSrc []models.BeaconDeposit) in } // Copy data into the temporary table, ignoring duplicates - _, err = p.psqlPool.CopyFrom( + _, err = conn.CopyFrom( p.ctx, pgx.Identifier{tempTableName}, []string{"f_block_num", "f_depositor", "f_tx_hash", "f_validator_pubkey"}, @@ -79,7 +90,7 @@ func (p *PostgresDBService) CopyBeaconDeposits(rowSrc []models.BeaconDeposit) in } // Insert non-duplicate rows from the temporary table into the target table - count, err := p.psqlPool.Exec(p.ctx, ` + count, err := conn.Exec(p.ctx, ` INSERT INTO t_beacon_deposits (f_block_num, f_depositor, f_tx_hash, f_validator_pubkey) SELECT f_block_num, f_depositor, f_tx_hash, f_validator_pubkey FROM `+tempTableName+` @@ -90,7 +101,7 @@ func (p *PostgresDBService) CopyBeaconDeposits(rowSrc []models.BeaconDeposit) in } // Drop the temporary table - _, err = p.psqlPool.Exec(p.ctx, `DROP TABLE IF EXISTS `+tempTableName) + _, err = conn.Exec(p.ctx, `DROP TABLE IF EXISTS `+tempTableName) if err != nil { wlog.Fatalf("could not drop temporary table: %s", err.Error()) }