Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Bug/beacon depositors transactions doesnt end #7

Merged
merged 3 commits into from
Feb 14, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ func NewBeaconDepositorsTransactions(pCtx context.Context, iConfig *config.Beaco
ethClient: elClient,
contractABI: contractABI,
alchemyClient: alchemyClient,
routineClosed: make(chan struct{}),
routineClosed: make(chan struct{}, 1),
checkpointsProcessed: &atomic.Uint64{},
}, nil
}
Expand Down
2 changes: 1 addition & 1 deletion cmd/beacon-depositors-transactions/cmd.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ var BeaconDepositorsTransactionsCommand = &cli.Command{
Name: "db-url",
Usage: "Database where to store transactions",
EnvVars: []string{"DB_URL"},
DefaultText: "postgres://user:password@localhost:5432/goteth",
DefaultText: "postgres://user:password@localhost:5432/dbName",
},
&cli.StringFlag{
Name: "log-level",
Expand Down
2 changes: 1 addition & 1 deletion cmd/identify/cmd.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ var IdentifyCommand = &cli.Command{
Name: "db-url",
Usage: "Database where to store transactions",
EnvVars: []string{"DB_URL"},
DefaultText: "postgres://user:password@localhost:5432/goteth",
DefaultText: "postgres://user:password@localhost:5432/dbName",
},
&cli.StringFlag{
Name: "log-level",
Expand Down
9 changes: 8 additions & 1 deletion db/beacon_depositors_transactions.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"github.com/google/uuid"
pgx "github.com/jackc/pgx/v5"
"github.com/migalabs/eth-pokhar/models"
"github.com/pkg/errors"
)

// Postgres intregration variables
Expand All @@ -32,7 +33,13 @@ var (
)

func (p *PostgresDBService) ObtainCheckpointPerDepositor() ([]models.DepositorCheckpoint, error) {
rows, err := p.psqlPool.Query(p.ctx, selectCheckpointPerDepositor)
conn, err := p.psqlPool.Acquire(p.ctx)
if err != nil {
return nil, errors.Wrap(err, "error acquiring database connection")
}
defer conn.Release()

rows, err := conn.Query(p.ctx, selectCheckpointPerDepositor)
if err != nil {
rows.Close()
return nil, err
Expand Down
21 changes: 16 additions & 5 deletions db/beacon_deposits.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,12 @@ var (
)

func (p *PostgresDBService) ObtainLastDeposit() (models.BeaconDeposit, error) {
rows, err := p.psqlPool.Query(p.ctx, selectLastDeposit)
conn, err := p.psqlPool.Acquire(p.ctx)
if err != nil {
return models.BeaconDeposit{}, errors.Wrap(err, "error acquiring connection")
}
defer conn.Release()
rows, err := conn.Query(p.ctx, selectLastDeposit)
if err != nil {
rows.Close()
return models.BeaconDeposit{}, errors.Wrap(err, "error obtaining last epoch from database")
Expand Down Expand Up @@ -54,8 +59,14 @@ func (p *PostgresDBService) CopyBeaconDeposits(rowSrc []models.BeaconDeposit) in

deposits := beaconDepositToCopyData(rowSrc)

conn, err := p.psqlPool.Acquire(p.ctx)
if err != nil {
wlog.Fatalf("could not acquire connection: %s", err.Error())
}
defer conn.Release()

// Create a temporary table with a unique constraint
_, err := p.psqlPool.Exec(p.ctx, `
_, err = conn.Exec(p.ctx, `
CREATE TEMP TABLE IF NOT EXISTS `+tempTableName+` (
f_block_num bigint,
f_depositor text,
Expand All @@ -68,7 +79,7 @@ func (p *PostgresDBService) CopyBeaconDeposits(rowSrc []models.BeaconDeposit) in
}

// Copy data into the temporary table, ignoring duplicates
_, err = p.psqlPool.CopyFrom(
_, err = conn.CopyFrom(
p.ctx,
pgx.Identifier{tempTableName},
[]string{"f_block_num", "f_depositor", "f_tx_hash", "f_validator_pubkey"},
Expand All @@ -79,7 +90,7 @@ func (p *PostgresDBService) CopyBeaconDeposits(rowSrc []models.BeaconDeposit) in
}

// Insert non-duplicate rows from the temporary table into the target table
count, err := p.psqlPool.Exec(p.ctx, `
count, err := conn.Exec(p.ctx, `
INSERT INTO t_beacon_deposits (f_block_num, f_depositor, f_tx_hash, f_validator_pubkey)
SELECT f_block_num, f_depositor, f_tx_hash, f_validator_pubkey
FROM `+tempTableName+`
Expand All @@ -90,7 +101,7 @@ func (p *PostgresDBService) CopyBeaconDeposits(rowSrc []models.BeaconDeposit) in
}

// Drop the temporary table
_, err = p.psqlPool.Exec(p.ctx, `DROP TABLE IF EXISTS `+tempTableName)
_, err = conn.Exec(p.ctx, `DROP TABLE IF EXISTS `+tempTableName)
if err != nil {
wlog.Fatalf("could not drop temporary table: %s", err.Error())
}
Expand Down