Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[chore][receiver/sqlquery] Move reusable logic to internal package #30709

Merged
merged 7 commits into from
Feb 2, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .github/CODEOWNERS
Validating CODEOWNERS rules …
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,7 @@ internal/kubelet/ @open-telemetry/collect
internal/metadataproviders/ @open-telemetry/collector-contrib-approvers @Aneurysm9 @dashpole
internal/sharedcomponent/ @open-telemetry/collector-contrib-approvers @open-telemetry/collector-approvers
internal/splunk/ @open-telemetry/collector-contrib-approvers @dmitryax
internal/sqlquery/ @open-telemetry/collector-contrib-approvers @crobert-1 @dmitryax
internal/tools/ @open-telemetry/collector-contrib-approvers

pkg/batchperresourceattr/ @open-telemetry/collector-contrib-approvers @atoulme @dmitryax
Expand Down
1 change: 1 addition & 0 deletions .github/ISSUE_TEMPLATE/bug_report.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,7 @@ body:
- internal/metadataproviders
- internal/sharedcomponent
- internal/splunk
- internal/sqlquery
- internal/tools
- pkg/batchperresourceattr
- pkg/batchpersignal
Expand Down
1 change: 1 addition & 0 deletions .github/ISSUE_TEMPLATE/feature_request.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,7 @@ body:
- internal/metadataproviders
- internal/sharedcomponent
- internal/splunk
- internal/sqlquery
- internal/tools
- pkg/batchperresourceattr
- pkg/batchpersignal
Expand Down
1 change: 1 addition & 0 deletions .github/ISSUE_TEMPLATE/other.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,7 @@ body:
- internal/metadataproviders
- internal/sharedcomponent
- internal/splunk
- internal/sqlquery
- internal/tools
- pkg/batchperresourceattr
- pkg/batchpersignal
Expand Down
3 changes: 3 additions & 0 deletions cmd/configschema/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ require (
github.com/open-telemetry/opentelemetry-collector-contrib/internal/kubelet v0.93.0 // indirect
github.com/open-telemetry/opentelemetry-collector-contrib/internal/metadataproviders v0.93.0 // indirect
github.com/open-telemetry/opentelemetry-collector-contrib/internal/splunk v0.93.0 // indirect
github.com/open-telemetry/opentelemetry-collector-contrib/internal/sqlquery v0.93.0 // indirect
github.com/open-telemetry/opentelemetry-collector-contrib/pkg/batchperresourceattr v0.93.0 // indirect
github.com/open-telemetry/opentelemetry-collector-contrib/pkg/batchpersignal v0.93.0 // indirect
github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl v0.93.0 // indirect
Expand Down Expand Up @@ -1150,3 +1151,5 @@ replace (
github.com/open-telemetry/opentelemetry-collector-contrib/connector/servicegraphconnector => ../../connector/servicegraphconnector
github.com/open-telemetry/opentelemetry-collector-contrib/connector/spanmetricsconnector => ../../connector/spanmetricsconnector
)

replace github.com/open-telemetry/opentelemetry-collector-contrib/internal/sqlquery => ../../internal/sqlquery
1 change: 1 addition & 0 deletions cmd/otelcontribcol/builder-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -453,3 +453,4 @@ replaces:
- github.com/open-telemetry/opentelemetry-collector-contrib/extension/opampextension => ../../extension/opampextension
- github.com/open-telemetry/opentelemetry-collector-contrib/extension/solarwindsapmsettingsextension => ../../extension/solarwindsapmsettingsextension
- github.com/open-telemetry/opentelemetry-collector-contrib/receiver/namedpipereceiver => ../../receiver/namedpipereceiver
- github.com/open-telemetry/opentelemetry-collector-contrib/internal/sqlquery => ../../internal/sqlquery
3 changes: 3 additions & 0 deletions cmd/otelcontribcol/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -547,6 +547,7 @@ require (
github.com/open-telemetry/opentelemetry-collector-contrib/internal/metadataproviders v0.93.0 // indirect
github.com/open-telemetry/opentelemetry-collector-contrib/internal/sharedcomponent v0.93.0 // indirect
github.com/open-telemetry/opentelemetry-collector-contrib/internal/splunk v0.93.0 // indirect
github.com/open-telemetry/opentelemetry-collector-contrib/internal/sqlquery v0.93.0 // indirect
github.com/open-telemetry/opentelemetry-collector-contrib/pkg/batchperresourceattr v0.93.0 // indirect
github.com/open-telemetry/opentelemetry-collector-contrib/pkg/batchpersignal v0.93.0 // indirect
github.com/open-telemetry/opentelemetry-collector-contrib/pkg/experimentalmetricmetadata v0.93.0 // indirect
Expand Down Expand Up @@ -1191,3 +1192,5 @@ replace github.com/open-telemetry/opentelemetry-collector-contrib/extension/opam
replace github.com/open-telemetry/opentelemetry-collector-contrib/extension/solarwindsapmsettingsextension => ../../extension/solarwindsapmsettingsextension

replace github.com/open-telemetry/opentelemetry-collector-contrib/receiver/namedpipereceiver => ../../receiver/namedpipereceiver

replace github.com/open-telemetry/opentelemetry-collector-contrib/internal/sqlquery => ../../internal/sqlquery
3 changes: 3 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -516,6 +516,7 @@ require (
github.com/open-telemetry/opentelemetry-collector-contrib/internal/metadataproviders v0.93.0 // indirect
github.com/open-telemetry/opentelemetry-collector-contrib/internal/sharedcomponent v0.93.0 // indirect
github.com/open-telemetry/opentelemetry-collector-contrib/internal/splunk v0.93.0 // indirect
github.com/open-telemetry/opentelemetry-collector-contrib/internal/sqlquery v0.93.0 // indirect
github.com/open-telemetry/opentelemetry-collector-contrib/pkg/batchperresourceattr v0.93.0 // indirect
github.com/open-telemetry/opentelemetry-collector-contrib/pkg/batchpersignal v0.93.0 // indirect
github.com/open-telemetry/opentelemetry-collector-contrib/pkg/experimentalmetricmetadata v0.93.0 // indirect
Expand Down Expand Up @@ -1151,3 +1152,5 @@ replace (
github.com/open-telemetry/opentelemetry-collector-contrib/connector/servicegraphconnector => ./connector/servicegraphconnector
github.com/open-telemetry/opentelemetry-collector-contrib/connector/spanmetricsconnector => ./connector/spanmetricsconnector
)

replace github.com/open-telemetry/opentelemetry-collector-contrib/internal/sqlquery => ./internal/sqlquery
1 change: 1 addition & 0 deletions internal/sqlquery/Makefile
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
include ../../Makefile.Common
178 changes: 178 additions & 0 deletions internal/sqlquery/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,178 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package sqlquery // import "github.com/open-telemetry/opentelemetry-collector-contrib/internal/sqlquery"

import (
"errors"
"fmt"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/receiver/scraperhelper"
"go.uber.org/multierr"
)

type Config struct {
scraperhelper.ScraperControllerSettings `mapstructure:",squash"`
Driver string `mapstructure:"driver"`
DataSource string `mapstructure:"datasource"`
Queries []Query `mapstructure:"queries"`
StorageID *component.ID `mapstructure:"storage"`
Telemetry TelemetryConfig `mapstructure:"telemetry"`
}

func (c Config) Validate() error {
if c.Driver == "" {
return errors.New("'driver' cannot be empty")
}
if c.DataSource == "" {
return errors.New("'datasource' cannot be empty")
}
if len(c.Queries) == 0 {
return errors.New("'queries' cannot be empty")
}
for _, query := range c.Queries {
if err := query.Validate(); err != nil {
return err
}
}
return nil
}

type Query struct {
SQL string `mapstructure:"sql"`
Metrics []MetricCfg `mapstructure:"metrics"`
Logs []LogsCfg `mapstructure:"logs"`
TrackingColumn string `mapstructure:"tracking_column"`
TrackingStartValue string `mapstructure:"tracking_start_value"`
}

func (q Query) Validate() error {
var errs error
if q.SQL == "" {
errs = multierr.Append(errs, errors.New("'query.sql' cannot be empty"))
}
if len(q.Logs) == 0 && len(q.Metrics) == 0 {
errs = multierr.Append(errs, errors.New("at least one of 'query.logs' and 'query.metrics' must not be empty"))
}
for _, logs := range q.Logs {
if err := logs.Validate(); err != nil {
errs = multierr.Append(errs, err)
}
}
for _, metric := range q.Metrics {
if err := metric.Validate(); err != nil {
errs = multierr.Append(errs, err)
}
}
return errs
}

type LogsCfg struct {
BodyColumn string `mapstructure:"body_column"`
}

func (config LogsCfg) Validate() error {
var errs error
if config.BodyColumn == "" {
errs = multierr.Append(errs, errors.New("'body_column' must not be empty"))
}
return errs
}

type MetricCfg struct {
MetricName string `mapstructure:"metric_name"`
ValueColumn string `mapstructure:"value_column"`
AttributeColumns []string `mapstructure:"attribute_columns"`
Monotonic bool `mapstructure:"monotonic"`
ValueType MetricValueType `mapstructure:"value_type"`
DataType MetricType `mapstructure:"data_type"`
Aggregation MetricAggregation `mapstructure:"aggregation"`
Unit string `mapstructure:"unit"`
Description string `mapstructure:"description"`
StaticAttributes map[string]string `mapstructure:"static_attributes"`
StartTsColumn string `mapstructure:"start_ts_column"`
TsColumn string `mapstructure:"ts_column"`
}

func (c MetricCfg) Validate() error {
var errs error
if c.MetricName == "" {
errs = multierr.Append(errs, errors.New("'metric_name' cannot be empty"))
}
if c.ValueColumn == "" {
errs = multierr.Append(errs, errors.New("'value_column' cannot be empty"))
}
if err := c.ValueType.Validate(); err != nil {
errs = multierr.Append(errs, err)
}
if err := c.DataType.Validate(); err != nil {
errs = multierr.Append(errs, err)
}
if err := c.Aggregation.Validate(); err != nil {
errs = multierr.Append(errs, err)
}
if c.DataType == MetricTypeGauge && c.Aggregation != "" {
errs = multierr.Append(errs, fmt.Errorf("aggregation=%s but data_type=%s does not support aggregation", c.Aggregation, c.DataType))
}
if errs != nil && c.MetricName != "" {
errs = multierr.Append(fmt.Errorf("invalid metric config with metric_name '%s'", c.MetricName), errs)
}
return errs
}

type MetricType string

const (
MetricTypeUnspecified MetricType = ""
MetricTypeGauge MetricType = "gauge"
MetricTypeSum MetricType = "sum"
)

func (t MetricType) Validate() error {
switch t {
case MetricTypeUnspecified, MetricTypeGauge, MetricTypeSum:
return nil
}
return fmt.Errorf("metric config has unsupported data_type: '%s'", t)
}

type MetricValueType string

const (
MetricValueTypeUnspecified MetricValueType = ""
MetricValueTypeInt MetricValueType = "int"
MetricValueTypeDouble MetricValueType = "double"
)

func (t MetricValueType) Validate() error {
switch t {
case MetricValueTypeUnspecified, MetricValueTypeInt, MetricValueTypeDouble:
return nil
}
return fmt.Errorf("metric config has unsupported value_type: '%s'", t)
}

type MetricAggregation string

const (
MetricAggregationUnspecified MetricAggregation = ""
MetricAggregationCumulative MetricAggregation = "cumulative"
MetricAggregationDelta MetricAggregation = "delta"
)

func (a MetricAggregation) Validate() error {
switch a {
case MetricAggregationUnspecified, MetricAggregationCumulative, MetricAggregationDelta:
return nil
}
return fmt.Errorf("metric config has unsupported aggregation: '%s'", a)
}

type TelemetryConfig struct {
Logs TelemetryLogsConfig `mapstructure:"logs"`
}

type TelemetryLogsConfig struct {
Query bool `mapstructure:"query"`
}
93 changes: 93 additions & 0 deletions internal/sqlquery/db_client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package sqlquery // import "github.com/open-telemetry/opentelemetry-collector-contrib/internal/sqlquery"

import (
"context"

// register Db drivers
_ "github.com/SAP/go-hdb/driver"
_ "github.com/go-sql-driver/mysql"
_ "github.com/lib/pq"
_ "github.com/microsoft/go-mssqldb"
_ "github.com/microsoft/go-mssqldb/integratedauth/krb5"
_ "github.com/sijms/go-ora/v2"
_ "github.com/snowflakedb/gosnowflake"
"go.uber.org/multierr"
"go.uber.org/zap"
)

type StringMap map[string]string

type DbClient interface {
QueryRows(ctx context.Context, args ...any) ([]StringMap, error)
}

type DbSQLClient struct {
Db Db
Logger *zap.Logger
Telemetry TelemetryConfig
SQL string
}

func NewDbClient(db Db, sql string, logger *zap.Logger, telemetry TelemetryConfig) DbClient {
return DbSQLClient{
Db: db,
SQL: sql,
Logger: logger,
Telemetry: telemetry,
}
}

func (cl DbSQLClient) QueryRows(ctx context.Context, args ...any) ([]StringMap, error) {
cl.Logger.Debug("Running query", cl.prepareQueryFields(cl.SQL, args)...)
sqlRows, err := cl.Db.QueryContext(ctx, cl.SQL, args...)
if err != nil {
return nil, err
}
var out []StringMap
colTypes, err := sqlRows.ColumnTypes()
if err != nil {
return nil, err
}
scanner := newRowScanner(colTypes)
var warnings error
for sqlRows.Next() {
err = scanner.scan(sqlRows)
if err != nil {
return nil, err
}
sm, scanErr := scanner.toStringMap()
if scanErr != nil {
warnings = multierr.Append(warnings, scanErr)
}
out = append(out, sm)
}
return out, warnings
}

func (cl DbSQLClient) prepareQueryFields(sql string, args []any) []zap.Field {
var logFields []zap.Field
if cl.Telemetry.Logs.Query {
logFields = append(logFields, zap.String("query", sql))
logFields = append(logFields, zap.Any("parameters", args))
}
return logFields
}

// This is only used for testing, but need to be exposed to other packages.
type FakeDBClient struct {
RequestCounter int
StringMaps [][]StringMap
Err error
}

func (c *FakeDBClient) QueryRows(context.Context, ...any) ([]StringMap, error) {
if c.Err != nil {
return nil, c.Err
}
idx := c.RequestCounter
c.RequestCounter++
return c.StringMaps[idx], nil
}
Loading
Loading