Skip to content

Commit

Permalink
Reuse connections on sql module (elastic#16001)
Browse files Browse the repository at this point in the history
Keep a connection open with the database instead of opening
and closing connections on each fetch. this is consistent with
other specific SQL modules we have.
  • Loading branch information
jsoriano authored Feb 3, 2020
1 parent e9b0be3 commit 583a89c
Show file tree
Hide file tree
Showing 2 changed files with 32 additions and 9 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d
- Add support for Unix socket in Memcached metricbeat module. {issue}13685[13685] {pull}15822[15822]
- Add citadel metricset for Istio Metricbeat module {pull}15990[15990]
- Add support for processors in light modules. {issue}14740[14740] {pull}15923[15923]
- Reuse connections in SQL module. {pull}16001[16001]

*Packetbeat*

Expand Down
40 changes: 31 additions & 9 deletions x-pack/metricbeat/module/sql/query/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
package query

import (
"context"
"fmt"
"strconv"
"strings"
Expand Down Expand Up @@ -43,6 +44,8 @@ type MetricSet struct {
Driver string
Query string
ResponseFormat string

db *sqlx.DB
}

// New creates a new instance of the MetricSet. New is responsible for unpacking
Expand Down Expand Up @@ -77,19 +80,13 @@ func New(base mb.BaseMetricSet) (mb.MetricSet, error) {
// of an error set the Error field of mb.Event or simply call report.Error().
// It calls m.fetchTableMode() or m.fetchVariableMode() depending on the response
// format of the query.
func (m *MetricSet) Fetch(report mb.ReporterV2) error {
db, err := sqlx.Open(m.Driver, m.HostData().URI)
func (m *MetricSet) Fetch(ctx context.Context, report mb.ReporterV2) error {
db, err := m.DB()
if err != nil {
return errors.Wrap(err, "error opening connection")
}
defer db.Close()

err = db.Ping()
if err != nil {
return errors.Wrap(err, "error testing connection")
}

rows, err := db.Queryx(m.Query)
rows, err := db.QueryxContext(ctx, m.Query)
if err != nil {
return errors.Wrap(err, "error executing query")
}
Expand All @@ -102,6 +99,23 @@ func (m *MetricSet) Fetch(report mb.ReporterV2) error {
return m.fetchVariableMode(rows, report)
}

// DB gets a client ready to query the database
func (m *MetricSet) DB() (*sqlx.DB, error) {
if m.db == nil {
db, err := sqlx.Open(m.Driver, m.HostData().URI)
if err != nil {
return nil, errors.Wrap(err, "opening connection")
}
err = db.Ping()
if err != nil {
return nil, errors.Wrap(err, "testing connection")
}

m.db = db
}
return m.db, nil
}

// fetchTableMode scan the rows and publishes the event for querys that return the response in a table format.
func (m *MetricSet) fetchTableMode(rows *sqlx.Rows, report mb.ReporterV2) error {

Expand Down Expand Up @@ -229,3 +243,11 @@ func getValue(pval *interface{}) string {
return fmt.Sprint(v)
}
}

// Close closes the connection pool releasing its resources
func (m *MetricSet) Close() error {
if m.db == nil {
return nil
}
return errors.Wrap(m.db.Close(), "closing connection")
}

0 comments on commit 583a89c

Please sign in to comment.