Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Reuse connections on sql module #16001

Merged
merged 2 commits into from
Feb 3, 2020
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d
- Add STAN dashboard {pull}15654[15654]
- Add support for Unix socket in Memcached metricbeat module. {issue}13685[13685] {pull}15822[15822]
- Add support for processors in light modules. {issue}14740[14740] {pull}15923[15923]
- Reuse connections in SQL module. {pull}[]

*Packetbeat*

Expand Down
40 changes: 31 additions & 9 deletions x-pack/metricbeat/module/sql/query/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
package query

import (
"context"
"fmt"
"strconv"
"strings"
Expand Down Expand Up @@ -43,6 +44,8 @@ type MetricSet struct {
Driver string
Query string
ResponseFormat string

db *sqlx.DB
}

// New creates a new instance of the MetricSet. New is responsible for unpacking
Expand Down Expand Up @@ -77,19 +80,13 @@ func New(base mb.BaseMetricSet) (mb.MetricSet, error) {
// of an error set the Error field of mb.Event or simply call report.Error().
// It calls m.fetchTableMode() or m.fetchVariableMode() depending on the response
// format of the query.
func (m *MetricSet) Fetch(report mb.ReporterV2) error {
db, err := sqlx.Open(m.Driver, m.HostData().URI)
func (m *MetricSet) Fetch(ctx context.Context, report mb.ReporterV2) error {
db, err := m.DB()
if err != nil {
return errors.Wrap(err, "error opening connection")
}
defer db.Close()

err = db.Ping()
if err != nil {
return errors.Wrap(err, "error testing connection")
}

rows, err := db.Queryx(m.Query)
rows, err := db.QueryxContext(ctx, m.Query)
if err != nil {
return errors.Wrap(err, "error executing query")
}
Expand All @@ -102,6 +99,23 @@ func (m *MetricSet) Fetch(report mb.ReporterV2) error {
return m.fetchVariableMode(rows, report)
}

// DB gets a client ready to query the database
func (m *MetricSet) DB() (*sqlx.DB, error) {
if m.db == nil {
db, err := sqlx.Open(m.Driver, m.HostData().URI)
if err != nil {
return nil, errors.Wrap(err, "opening connection")
}
err = db.Ping()
if err != nil {
return nil, errors.Wrap(err, "testing connection")
}

m.db = db
}
return m.db, nil
}

// fetchTableMode scan the rows and publishes the event for querys that return the response in a table format.
func (m *MetricSet) fetchTableMode(rows *sqlx.Rows, report mb.ReporterV2) error {

Expand Down Expand Up @@ -229,3 +243,11 @@ func getValue(pval *interface{}) string {
return fmt.Sprint(v)
}
}

// Close closes the connection pool releasing its resources
func (m *MetricSet) Close() error {
if m.db == nil {
return nil
}
return errors.Wrap(m.db.Close(), "closing connection")
}