Skip to content

Commit

Permalink
fixed issue with connections building up
Browse files Browse the repository at this point in the history
  • Loading branch information
joshghent committed Oct 9, 2024
1 parent c56e5e5 commit 796a9a0
Show file tree
Hide file tree
Showing 5 changed files with 37 additions and 27 deletions.
4 changes: 2 additions & 2 deletions Ango/Get code prod droplet.bru
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ meta {
}

post {
url: http://e80048okk804gs0k8o8c8css.209.97.180.192.sslip.io/api/v1/code/redeem
url: http://kw4484wowwk8cwogwscgooo0.209.97.180.192.sslip.io/api/v1/code/redeem
body: json
auth: none
}
Expand All @@ -14,6 +14,6 @@ body:json {
{
"batchid": "11111111-1111-1111-1111-111111111111",
"clientid": "217be7c8-679c-4e08-bffc-db3451bdcdbf",
"customerid": "88e92f73-e6a9-4ab6-bb04-3861e715e9f8"
"customerid": "bc22e256-e902-4449-b660-ba9bebc1e7dc"
}
}
2 changes: 1 addition & 1 deletion Ango/Get code.bru
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,6 @@ body:json {
{
"batchid": "11111111-1111-1111-1111-111111111111",
"clientid": "217be7c8-679c-4e08-bffc-db3451bdcdbf",
"customerid": "50b0b41b-c665-4409-a2bb-a4fc18828dc2"
"customerid": "fba9230a-a521-430e-aaf8-8aefbf588071"
}
}
11 changes: 11 additions & 0 deletions Ango/Healthcheck.bru
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
meta {
name: Healthcheck
type: http
seq: 6
}

get {
url: http://localhost:3000/healthcheck
body: none
auth: none
}
14 changes: 6 additions & 8 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,19 +94,16 @@ func connectToDB() (*pgxpool.Pool, error) {
}

func monitorDBConnections(pool *pgxpool.Pool) {
ticker := time.NewTicker(30 * time.Second) // Increased frequency
ticker := time.NewTicker(30 * time.Second)
defer ticker.Stop()

for range ticker.C {
stats := pool.Stat()
log.Printf("DB Pool Stats - Total: %d, Idle: %d, In Use: %d, Max: %d",
stats.TotalConns(), stats.IdleConns(), stats.AcquiredConns(), stats.MaxConns())

// Close idle connections
pool.AcquireAllIdle(context.Background())

// Check for stalled connections and reset them
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
// Check and reset stalled connections with extended timeout
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
err := pool.AcquireFunc(ctx, func(conn *pgxpool.Conn) error {
_, err := conn.Exec(ctx, "SELECT 1")
if err != nil {
Expand All @@ -120,8 +117,8 @@ func monitorDBConnections(pool *pgxpool.Pool) {
log.Printf("Error checking for stalled connections: %v", err)
}

// Check for long-running transactions
ctx, cancel = context.WithTimeout(context.Background(), 5*time.Second)
// Terminate long-running idle connections with extended timeout
ctx, cancel = context.WithTimeout(context.Background(), 10*time.Second)
_, err = pool.Exec(ctx, `
SELECT pg_terminate_backend(pid)
FROM pg_stat_activity
Expand All @@ -136,6 +133,7 @@ func monitorDBConnections(pool *pgxpool.Pool) {
}
}


func testDBConnection(db *pgxpool.Pool) error {
// Check if the required tables exist
tables := []string{"batches", "codes"}
Expand Down
33 changes: 17 additions & 16 deletions service.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,20 +55,12 @@ func getCode(ctx context.Context, req Request) (string, error) {
}
}

ctx, cancel := context.WithTimeout(ctx, 5*time.Second)
ctx, cancel := context.WithTimeout(ctx, 15*time.Second) // Extended timeout
defer cancel()

tx, err := db.Begin(ctx)
if err != nil {
return "", err
}
defer tx.Rollback(ctx)

selectCodeTime := time.Now()

// First, check if the batch is expired
// Check batch expiration outside of the transaction
var batchExpired bool
err = tx.QueryRow(ctx, `
err := db.QueryRow(ctx, `
SELECT expired
FROM batches
WHERE id = $1
Expand All @@ -83,13 +75,22 @@ func getCode(ctx context.Context, req Request) (string, error) {
return "", ErrBatchExpired
}

// If batch is not expired, proceed to select a code
// Begin transaction after initial check
tx, err := db.BeginTx(ctx, pgx.TxOptions{})
if err != nil {
return "", err
}
defer tx.Rollback(ctx) // Ensure rollback if not committed

selectCodeTime := time.Now()

// Attempt to acquire a code
var code string
err = tx.QueryRow(ctx, `
SELECT code
FROM codes
WHERE batch_id = $1 AND client_id = $2 AND customer_id IS NULL
FOR UPDATE SKIP LOCKED
FOR NO KEY UPDATE SKIP LOCKED
LIMIT 1
`, req.BatchID, req.ClientID).Scan(&code)
if err != nil {
Expand All @@ -100,7 +101,7 @@ func getCode(ctx context.Context, req Request) (string, error) {
}

if time.Since(selectCodeTime) > 100*time.Millisecond {
log.Printf("Queries for checking batch expiration and selecting code took too long (%v)ms", time.Since(selectCodeTime))
log.Printf("Queries for selecting code took too long (%v)ms", time.Since(selectCodeTime))
}

// Retrieve rules from cache or database
Expand All @@ -119,7 +120,7 @@ func getCode(ctx context.Context, req Request) (string, error) {
return "", err
}
if time.Since(updateCodesTime) > 100*time.Millisecond {
log.Printf("Query for updating codes took long (%v)ms", time.Since(updateCodesTime))
log.Printf("Query for updating codes took too long (%v)ms", time.Since(updateCodesTime))
}

insertCodeUsageTime := time.Now()
Expand All @@ -128,7 +129,7 @@ func getCode(ctx context.Context, req Request) (string, error) {
return "", err
}
if time.Since(insertCodeUsageTime) > 100*time.Millisecond {
log.Printf("Query for inserting codes took long (%v)ms", time.Since(insertCodeUsageTime))
log.Printf("Query for inserting code usage took too long (%v)ms", time.Since(insertCodeUsageTime))
}

if err = tx.Commit(ctx); err != nil {
Expand Down

0 comments on commit 796a9a0

Please sign in to comment.