Skip to content

Commit

Permalink
feat: Add Prometheus metric to track sql stmts per transaction (#29)
Browse files Browse the repository at this point in the history
* Added a new summary metric `transaction_sql_stmts` to track the number of exec/querys stmts executed per transaction
* Introduced a new struct `managedTx` to to hold the context and connection of a transaction
* Introduced a new field `execStmtsCounter` and `queryStmtsCounter` in the `managedConn` to track the number of exec querys in a transaction
* When SQL lib opens a transaction we are storing the connection for that transaction and context of the transaction in the `managedTx`
* Context is used to store the required grpc labels for the metric. i.e. grpc method, grpc service
* Added a UnaryInterceptor to the grpc server to store the grpc method and grpc service in the context
* On commit or rollback of the transaction we are removing the connection from the `managedTx` and resetting the counters to 0
* On commit of the transaction we are observing the `transaction_sql_stmts` metric by the number of exec/querys got executed in the transaction
  • Loading branch information
yvardhineni authored Jun 19, 2024
1 parent a9750df commit 09c77f9
Show file tree
Hide file tree
Showing 9 changed files with 394 additions and 26 deletions.
5 changes: 4 additions & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,9 @@ get:
fmt: get
go fmt ./...

tidy:
go mod tidy

# assert that there is no difference after running format
no-diff:
git diff --exit-code
Expand All @@ -25,7 +28,7 @@ test: vet get-ginkgo


# test target which includes the no-diff fail condition
ci-test: fmt no-diff test
ci-test: fmt tidy no-diff test

test-docker:
docker build -f Dockerfile.test .
Expand Down
38 changes: 37 additions & 1 deletion conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,9 @@ type managedConn struct {
reset bool
killed bool
mu sync.RWMutex

execStmtsCounter int // count the number of exec calls in a transaction
queryStmtsCounter int // count the number of query calls in a transaction
}

// BeginTx calls the underlying BeginTx method unless the supervising context
Expand All @@ -34,7 +37,12 @@ func (c *managedConn) BeginTx(ctx context.Context, opts driver.TxOptions) (drive
}

if conn, ok := c.conn.(driver.ConnBeginTx); ok {
return conn.BeginTx(ctx, opts)
tx, err := conn.BeginTx(ctx, opts)
if err != nil {
return nil, err
}

return &managedTx{tx: tx, conn: c, ctx: ctx}, nil
}

// same as is defined in go sql package to call Begin method if the TxOptions are default
Expand Down Expand Up @@ -71,6 +79,7 @@ func (c *managedConn) Exec(query string, args []driver.Value) (driver.Result, er
if !ok {
return nil, driver.ErrSkip
}
c.incExecStmtsCounter() //increment the exec counter to keep track of the number of exec calls
return conn.Exec(query, args)
}

Expand All @@ -79,6 +88,7 @@ func (c *managedConn) ExecContext(ctx context.Context, query string, args []driv
if !ok {
return nil, driver.ErrSkip
}
c.incExecStmtsCounter() //increment the exec counter to keep track of the number of exec calls
return conn.ExecContext(ctx, query, args)
}

Expand All @@ -95,6 +105,7 @@ func (c *managedConn) Query(query string, args []driver.Value) (driver.Rows, err
if !ok {
return nil, driver.ErrSkip
}
c.incQueryStmtsCounter() //increment the query counter to keep track of the number of query calls
return conn.Query(query, args)
}

Expand All @@ -103,6 +114,7 @@ func (c *managedConn) QueryContext(ctx context.Context, query string, args []dri
if !ok {
return nil, driver.ErrSkip
}
c.incQueryStmtsCounter() //increment the query counter to keep track of the number of query calls
return conn.QueryContext(ctx, query, args)
}

Expand Down Expand Up @@ -184,3 +196,27 @@ func (c *managedConn) GetKill() bool {
defer c.mu.RUnlock()
return c.killed
}

func (c *managedConn) incExecStmtsCounter() {
c.mu.Lock()
defer c.mu.Unlock()
c.execStmtsCounter++
}

func (c *managedConn) resetExecStmtsCounter() {
c.mu.Lock()
defer c.mu.Unlock()
c.execStmtsCounter = 0
}

func (c *managedConn) incQueryStmtsCounter() {
c.mu.Lock()
defer c.mu.Unlock()
c.queryStmtsCounter++
}

func (c *managedConn) resetQueryStmtsCounter() {
c.mu.Lock()
defer c.mu.Unlock()
c.queryStmtsCounter = 0
}
202 changes: 201 additions & 1 deletion conn_test.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,15 @@
package hotload

import (
"context"
"database/sql/driver"
"io"
"strings"
"sync"

. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
"sync"
"github.com/prometheus/client_golang/prometheus/testutil"
)

var _ = Describe("managedConn", func() {
Expand Down Expand Up @@ -34,3 +40,197 @@ var _ = Describe("managedConn", func() {
Consistently(readLockAcquired).Should(BeFalse())
})
})

/**** Mocks for Prometheus Metrics ****/

type mockDriverConn struct{}

type mockTx struct{}

func (mockTx) Commit() error {
return nil
}

func (mockTx) Rollback() error {
return nil
}

func (mockDriverConn) Prepare(query string) (driver.Stmt, error) {
return nil, nil
}

func (mockDriverConn) Begin() (driver.Tx, error) {
return mockTx{}, nil
}

func (mockDriverConn) Close() error {
return nil
}

func (mockDriverConn) IsValid() bool {
return true
}

func (mockDriverConn) BeginTx(ctx context.Context, opts driver.TxOptions) (driver.Tx, error) {
return mockTx{}, nil
}

func (mockDriverConn) Exec(query string, args []driver.Value) (driver.Result, error) {
return nil, nil
}

func (mockDriverConn) Query(query string, args []driver.Value) (driver.Rows, error) {
return nil, nil
}

func (mockDriverConn) ExecContext(ctx context.Context, query string, args []driver.NamedValue) (driver.Result, error) {
return nil, nil
}

func (mockDriverConn) QueryContext(ctx context.Context, query string, args []driver.NamedValue) (driver.Rows, error) {
return nil, nil
}

/**** End Mocks for Prometheus Metrics ****/

var _ = Describe("PrometheusMetrics", func() {
const help = `
# HELP transaction_sql_stmts The number of sql stmts called in a transaction by statement type per grpc service and method
# TYPE transaction_sql_stmts summary
`

var service1Metrics = `
transaction_sql_stmts_sum{grpc_method="method_1",grpc_service="service_1",stmt="exec"} 3
transaction_sql_stmts_count{grpc_method="method_1",grpc_service="service_1",stmt="exec"} 1
transaction_sql_stmts_sum{grpc_method="method_1",grpc_service="service_1",stmt="query"} 3
transaction_sql_stmts_count{grpc_method="method_1",grpc_service="service_1",stmt="query"} 1
`

var service2Metrics = `
transaction_sql_stmts_sum{grpc_method="method_2",grpc_service="service_2",stmt="exec"} 4
transaction_sql_stmts_count{grpc_method="method_2",grpc_service="service_2",stmt="exec"} 1
transaction_sql_stmts_sum{grpc_method="method_2",grpc_service="service_2",stmt="query"} 4
transaction_sql_stmts_count{grpc_method="method_2",grpc_service="service_2",stmt="query"} 1
`

var service1RerunMetrics = `
transaction_sql_stmts_sum{grpc_method="method_1",grpc_service="service_1",stmt="exec"} 4
transaction_sql_stmts_count{grpc_method="method_1",grpc_service="service_1",stmt="exec"} 2
transaction_sql_stmts_sum{grpc_method="method_1",grpc_service="service_1",stmt="query"} 4
transaction_sql_stmts_count{grpc_method="method_1",grpc_service="service_1",stmt="query"} 2
`

var noMethodMetrics = `
transaction_sql_stmts_sum{grpc_method="",grpc_service="",stmt="exec"} 1
transaction_sql_stmts_count{grpc_method="",grpc_service="",stmt="exec"} 1
transaction_sql_stmts_sum{grpc_method="",grpc_service="",stmt="query"} 1
transaction_sql_stmts_count{grpc_method="",grpc_service="",stmt="query"} 1
`

It("Should emit the correct metrics", func() {
mc := newManagedConn(context.Background(), mockDriverConn{})

ctx := ContextWithExecLabels(context.Background(), map[string]string{"grpc_method": "method_1", "grpc_service": "service_1"})

// begin a transaction
tx, err := mc.BeginTx(ctx, driver.TxOptions{})
Expect(err).ShouldNot(HaveOccurred())

// exec a statement
mc.Exec("INSERT INTO table (column) VALUES (?)", []driver.Value{"value"})

// query a statement
mc.Query("SELECT * FROM table WHERE column = ?", []driver.Value{"value"})
mc.Query("SELECT * FROM table WHERE column = ?", []driver.Value{"value"})

// exec a statement with context
mc.ExecContext(ctx, "INSERT INTO table (column) VALUES (?)", []driver.NamedValue{{Value: "value"}})
mc.ExecContext(ctx, "INSERT INTO table (column) VALUES (?)", []driver.NamedValue{{Value: "value"}})

// query a statement with context
mc.QueryContext(ctx, "SELECT * FROM table WHERE column = ?", []driver.NamedValue{{Value: "value"}})

// commit the transaction
err = tx.Commit()
Expect(err).ShouldNot(HaveOccurred())

// collect and compare metrics
err = testutil.CollectAndCompare(sqlStmtsSummary, strings.NewReader(help+service1Metrics))
Expect(err).ShouldNot(HaveOccurred())

// reset the metrics
// new context
ctx = ContextWithExecLabels(context.Background(), map[string]string{"grpc_method": "method_2", "grpc_service": "service_2"})
// begin a transaction
tx, err = mc.BeginTx(ctx, driver.TxOptions{})
Expect(err).ShouldNot(HaveOccurred())

// exec a statement
mc.Exec("INSERT INTO table (column) VALUES (?)", []driver.Value{"value"})
mc.Exec("INSERT INTO table (column) VALUES (?)", []driver.Value{"value"})

// query a statement
mc.Query("SELECT * FROM table WHERE column = ?", []driver.Value{"value"})
mc.Query("SELECT * FROM table WHERE column = ?", []driver.Value{"value"})

// exec a statement with context
mc.ExecContext(ctx, "INSERT INTO table (column) VALUES (?)", []driver.NamedValue{{Value: "value"}})
mc.ExecContext(ctx, "INSERT INTO table (column) VALUES (?)", []driver.NamedValue{{Value: "value"}})

// query a statement with context
mc.QueryContext(ctx, "SELECT * FROM table WHERE column = ?", []driver.NamedValue{{Value: "value"}})
mc.QueryContext(ctx, "SELECT * FROM table WHERE column = ?", []driver.NamedValue{{Value: "value"}})

// commit the transaction
err = tx.Commit()
Expect(err).ShouldNot(HaveOccurred())

// collect and compare metrics
err = testutil.CollectAndCompare(sqlStmtsSummary, strings.NewReader(help+service1Metrics+service2Metrics))
Expect(err).ShouldNot(HaveOccurred())

// rerun with initial metrics
ctx = ContextWithExecLabels(context.Background(), map[string]string{"grpc_method": "method_1", "grpc_service": "service_1"})
// begin a transaction
tx, err = mc.BeginTx(ctx, driver.TxOptions{})
Expect(err).ShouldNot(HaveOccurred())

// exec a statement with context
mc.ExecContext(ctx, "INSERT INTO table (column) VALUES (?)", []driver.NamedValue{{Value: "value"}})

// query a statement with context
mc.QueryContext(ctx, "SELECT * FROM table WHERE column = ?", []driver.NamedValue{{Value: "value"}})

// rollback the transaction
err = tx.Rollback()
Expect(err).ShouldNot(HaveOccurred())

// collect and compare metrics
err = testutil.CollectAndCompare(sqlStmtsSummary, strings.NewReader(help+service1RerunMetrics+service2Metrics))
Expect(err).ShouldNot(HaveOccurred())

// non labeled context
ctx = context.Background()
// begin a transaction
tx, err = mc.BeginTx(ctx, driver.TxOptions{})
Expect(err).ShouldNot(HaveOccurred())

// exec query context
mc.ExecContext(ctx, "INSERT INTO table (column) VALUES (?)", []driver.NamedValue{{Value: "value"}})

// query a statement with context
mc.QueryContext(ctx, "SELECT * FROM table WHERE column = ?", []driver.NamedValue{{Value: "value"}})

// commit the transaction
err = tx.Commit()
Expect(err).ShouldNot(HaveOccurred())

// collect and compare metrics
err = testutil.CollectAndCompare(sqlStmtsSummary, strings.NewReader(help+noMethodMetrics+service1RerunMetrics+service2Metrics))
Expect(err).ShouldNot(HaveOccurred())
})
})

func CollectAndCompareMetrics(r io.Reader) error {
return testutil.CollectAndCompare(sqlStmtsSummary, r)
}
21 changes: 15 additions & 6 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -9,17 +9,26 @@ require (
github.com/onsi/ginkgo v1.16.5
github.com/onsi/gomega v1.27.6
github.com/pkg/errors v0.9.1
github.com/prometheus/client_golang v1.18.0
github.com/sirupsen/logrus v1.9.0
)

require (
github.com/go-task/slim-sprig v0.0.0-20230315185526-52ccab3ef572 // indirect
github.com/google/go-cmp v0.5.9 // indirect
github.com/beorn7/perks v1.0.1 // indirect
github.com/cespare/xxhash/v2 v2.2.0 // indirect
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/golang/protobuf v1.5.4 // indirect
github.com/google/go-cmp v0.6.0 // indirect
github.com/kr/text v0.2.0 // indirect
github.com/matttproud/golang_protobuf_extensions/v2 v2.0.0 // indirect
github.com/nxadm/tail v1.4.8 // indirect
golang.org/x/net v0.8.0 // indirect
golang.org/x/sys v0.6.0 // indirect
golang.org/x/text v0.8.0 // indirect
golang.org/x/tools v0.7.0 // indirect
github.com/prometheus/client_model v0.5.0 // indirect
github.com/prometheus/common v0.45.0 // indirect
github.com/prometheus/procfs v0.12.0 // indirect
golang.org/x/net v0.22.0 // indirect
golang.org/x/sys v0.18.0 // indirect
golang.org/x/text v0.14.0 // indirect
google.golang.org/protobuf v1.33.0 // indirect
gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
)
Loading

0 comments on commit 09c77f9

Please sign in to comment.