fix(): MONITOR-6569 tls connection disable for mssql #30

Merged 1 commit on Aug 8, 2023
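This change appends `&encrypt=disable` to the SQL Server connection URI whenever the metricset config sets `encrypt: disable`, so the driver skips TLS against servers that do not support or require it. Below is a minimal, self-contained sketch of the resulting URI and how it would be opened with go-mssqldb (the driver the mssql module is built on); the credentials, host, and config values are illustrative assumptions, not values from this PR.

// Sketch only: mirrors the URI construction added in New(), not the module code itself.
package main

import (
	"database/sql"
	"fmt"
	"log"

	// Registers the "sqlserver" driver used by the beats mssql module.
	_ "github.com/denisenkom/go-mssqldb"
)

func main() {
	// Hypothetical stand-ins for base.HostData() and the module's row config.
	user, password, host := "sa", "example-password", "localhost:1433"
	dbName := "master"

	// Appended only when the config contains `encrypt: disable`, per the diff below.
	encryptParam := "&encrypt=disable"

	uri := fmt.Sprintf("sqlserver://%s:%s@%s?database=%s%s",
		user, password, host, dbName, encryptParam)

	db, err := sql.Open("sqlserver", uri)
	if err != nil {
		log.Fatalf("could not create connection to db: %v", err)
	}
	defer db.Close()

	// Ping forces a real handshake; it would fail against a server that requires TLS.
	if err := db.Ping(); err != nil {
		log.Fatalf("ping failed: %v", err)
	}
	log.Println("connected with encrypt=disable (unencrypted connection)")
}

In the patched metricset the same parameter is appended to the URI built from base.HostData(), which mssql.NewConnection then opens.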
131 changes: 69 additions & 62 deletions x-pack/metricbeat/module/mssql/database/database.go
@@ -1,20 +1,19 @@
package database


import (
"database/sql"
"fmt"
"strings"

s "github.com/elastic/beats/v7/libbeat/common/schema"
"github.com/elastic/beats/v7/metricbeat/mb"
"github.com/elastic/beats/v7/x-pack/metricbeat/module/mssql"
"github.com/elastic/elastic-agent-libs/logp"
"github.com/elastic/elastic-agent-libs/mapstr"
"strconv"
s "github.com/elastic/beats/v7/libbeat/common/schema"
)

type rowCounter = func(rows *sql.Rows, mapStr *mapstr.M) ( error)
type rowCounter = func(rows *sql.Rows, mapStr *mapstr.M) error

type databaseCounter struct {
objectName string
@@ -54,13 +53,21 @@ func New(base mb.BaseMetricSet) (mb.MetricSet, error) {
return nil, fmt.Errorf("unpack rowConfig failed, err=%s", err)
}

encryptParam := ""
if encryptVal, ok := rowConfig["encrypt"].(string); ok && encryptVal == "disable" {
encryptParam = "&encrypt=disable"
}

dbNameParam := "database=master"
uri := base.HostData().URI
if dbName, ok := rowConfig["database"].(string); ok {
uri = fmt.Sprintf("sqlserver://%s:%s@%s?database=%s",
base.HostData().User, base.HostData().Password,
base.HostData().Host, dbName)
if dbNameVal, ok := rowConfig["database"].(string); ok {
dbNameParam = "database=" + dbNameVal
}

uri = fmt.Sprintf("sqlserver://%s:%s@%s?%s%s",
base.HostData().User, base.HostData().Password,
base.HostData().Host, dbNameParam, encryptParam)

db, err := mssql.NewConnection(uri)
if err != nil {
return nil, fmt.Errorf("could not create connection to db %w", err)
@@ -188,7 +195,7 @@ func (m *MetricSet) Close() error {
return m.db.Close()
}

func (m *MetricSet) fetchTps(reporter mb.ReporterV2) (mapstr.M) {
func (m *MetricSet) fetchTps(reporter mb.ReporterV2) mapstr.M {
query := `SELECT
object_name,
instance_name,
@@ -212,7 +219,7 @@ JOIN sys.dm_exec_sessions AS s ON r1.session_id = s.session_id
GROUP BY s.database_id;`

type blockCountCounter struct {
dbName string
dbName string
blockedSessionCount *int64
}

@@ -231,13 +238,13 @@ GROUP BY s.database_id;`
for dbName, item := range mapStr {
result = append(result, mapstr.M{
"session_block_count": item,
"db_name": dbName,
"db_name": dbName,
})
}
return result
}

func (m *MetricSet) fetchDeadLockCount(reporter mb.ReporterV2) (mapstr.M) {
func (m *MetricSet) fetchDeadLockCount(reporter mb.ReporterV2) mapstr.M {
query := `SELECT
object_name,
instance_name,
@@ -252,7 +259,7 @@ WHERE
return m.fetchRow(query, reporter)
}

func (m *MetricSet) fetchLockRequestTotal(reporter mb.ReporterV2) (mapstr.M) {
func (m *MetricSet) fetchLockRequestTotal(reporter mb.ReporterV2) mapstr.M {
query := `SELECT
object_name,
instance_name,
@@ -309,7 +316,7 @@ WHERE
}

return mapstr.M{
"PlanCacheHitRatio": fmt.Sprintf("%f", float64(hitRatio)/float64(hitRatioBase) * 100),
"PlanCacheHitRatio": fmt.Sprintf("%f", float64(hitRatio)/float64(hitRatioBase)*100),
}
}

@@ -327,7 +334,7 @@ func (m *MetricSet) fetchMemoryPageFault(reporter mb.ReporterV2) mapstr.M {
}

func (m *MetricSet) fetchIOWait(reporter mb.ReporterV2) []mapstr.M {
query :=`SELECT
query := `SELECT
DB_NAME(fs.database_id) AS db_name,
-- CAST(SUM(fs.io_stall)/ 1000.0 AS DECIMAL(18,2)) AS total_io_stall_ms,
-- CAST(SUM(fs.io_stall_read_ms + fs.io_stall_write_ms)/1000.0 AS DECIMAL(18,2)) AS total_io_stall_read_write_ms,
@@ -338,7 +345,7 @@ INNER JOIN sys.database_files AS df ON df.file_id = fs.file_id
GROUP BY fs.database_id;`

type ioRow struct {
dbName string
dbName string
avgIOWait *float64
}
var counter rowCounter = func(rows *sql.Rows, mapStr *mapstr.M) error {
@@ -377,11 +384,11 @@ INNER JOIN
sys.master_files AS mf ON vfs.database_id = mf.database_id AND vfs.file_id = mf.file_id;`

type diskRWBytesCounter struct {
dbName string
diskFileName string
typeDesc string
numOfReadBytes *int64
numOfWrittenBytes *int64
dbName string
diskFileName string
typeDesc string
numOfReadBytes *int64
numOfWrittenBytes *int64
avgMilliSecondsPerIO *float64
}
var counter rowCounter = func(rows *sql.Rows, mapStr *mapstr.M) error {
@@ -409,8 +416,8 @@ INNER JOIN
}
dbName, diskFileName, typeDesc, metricName := keys[0], keys[1], keys[2], keys[3]
result = append(result, mapstr.M{
metricName: item,
"db_name": dbName,
metricName: item,
"db_name": dbName,
"disk_file": diskFileName,
"type_desc": typeDesc,
})
@@ -434,11 +441,11 @@ FROM
sys.dm_exec_connections;`

type dbIOCounter struct {
dbName string
connectionCount *int64
totalReads *int64
totalWrites *int64
totalReadsBytes *int64
dbName string
connectionCount *int64
totalReads *int64
totalWrites *int64
totalReadsBytes *int64
totalWrittenBytes *int64
}
var counter rowCounter = func(rows *sql.Rows, mapStr *mapstr.M) error {
Expand Down Expand Up @@ -466,7 +473,7 @@ FROM
dbName, metricName := keys[0], keys[1]
result = append(result, mapstr.M{
metricName: item,
"db_name": dbName,
"db_name": dbName,
})
}
return result
@@ -487,7 +494,7 @@ func (m *MetricSet) fetchConnectionsPct(reporter mb.ReporterV2) mapstr.M {
}
} else {
return mapstr.M{
"connections_used_pct": fmt.Sprintf("%v", float64(userConnections) / float64(maxConnections)),
"connections_used_pct": fmt.Sprintf("%v", float64(userConnections)/float64(maxConnections)),
}
}

@@ -539,7 +546,7 @@ func (m *MetricSet) fetchTableUsedSpace(reporter mb.ReporterV2) []mapstr.M {

dbNames := make([]string, 0, len(allDbs))
for dbName := range allDbs {
dbNames = append(dbNames, "'" + dbName + "'")
dbNames = append(dbNames, "'"+dbName+"'")
}
dbNameWhereCond := strings.Join(dbNames, ", ")

@@ -565,10 +572,10 @@ GROUP BY
t.Name, p.Rows, Schemas.TABLE_CATALOG`, dbNameWhereCond)

type tableSpaceRow struct {
dbName string
tableName string
totalSpaceKB *int64
usedSpaceKB *int64
dbName string
tableName string
totalSpaceKB *int64
usedSpaceKB *int64
unusedSpaceKB *int64
}

@@ -585,9 +592,9 @@ GROUP BY
}
key := fmt.Sprintf("%s-%s", row.dbName, row.tableName)
(*mapStr)[key] = mapstr.M{
"table_total_space": fmt.Sprintf("%v", *row.totalSpaceKB),
"table_used_space": fmt.Sprintf("%v", *row.usedSpaceKB),
"table_unused_space": fmt.Sprintf("%v", *row.unusedSpaceKB),
"table_total_space": fmt.Sprintf("%v", *row.totalSpaceKB),
"table_used_space": fmt.Sprintf("%v", *row.usedSpaceKB),
"table_unused_space": fmt.Sprintf("%v", *row.unusedSpaceKB),
"table_space_used_pct": fmt.Sprintf("%v", spaceUsedPct),
}

@@ -603,31 +610,31 @@ GROUP BY
}
dbName, tblName := keys[0], keys[1]
var (
dbTotalSpace int64
dbUsedSpace int64
dbUnusedSpace int64
dbTotalSpace int64
dbUsedSpace int64
dbUnusedSpace int64
dbSpaceUsedPct float64 = 0.00
)
for metricName, val := range item.(mapstr.M) {
result = append(result, mapstr.M{
metricName: val,
"db_name": dbName,
metricName: val,
"db_name": dbName,
"table_name": tblName,
})
if metricName == "table_total_space" {
if v, err := strconv.Atoi(val.(string)); err == nil{
if v, err := strconv.Atoi(val.(string)); err == nil {
dbTotalSpace += int64(v)
} else {
reporter.Error(fmt.Errorf("parse table %s-%s total space failed, val=%s", dbName, tblName, val))
}
} else if metricName == "table_used_space" {
if v, err := strconv.Atoi(val.(string)); err == nil{
if v, err := strconv.Atoi(val.(string)); err == nil {
dbUsedSpace += int64(v)
} else {
reporter.Error(fmt.Errorf("parse table %s-%s used space failed, val=%s", dbName, tblName, val))
}
} else if metricName == "table_unused_space" {
if v, err := strconv.Atoi(val.(string)); err == nil{
if v, err := strconv.Atoi(val.(string)); err == nil {
dbUnusedSpace += int64(v)
} else {
reporter.Error(fmt.Errorf("parse table %s-%s unused space failed, val=%s", dbName, tblName, val))
@@ -637,21 +644,21 @@ GROUP BY

result = append(result, mapstr.M{
"used_space": fmt.Sprintf("%v", dbUsedSpace),
"db_name": dbName,
"db_name": dbName,
})
result = append(result, mapstr.M{
"unused_space": fmt.Sprintf("%v", dbUnusedSpace),
"db_name": dbName,
"db_name": dbName,
})
result = append(result, mapstr.M{
"total_space": fmt.Sprintf("%v", dbTotalSpace),
"db_name": dbName,
"db_name": dbName,
})
if dbTotalSpace != 0 {
dbSpaceUsedPct = float64(dbUsedSpace) / float64(dbTotalSpace)
result = append(result, mapstr.M{
"space_used_pct": fmt.Sprintf("%v", dbSpaceUsedPct),
"db_name": dbName,
"db_name": dbName,
})
}
}
Expand All @@ -677,7 +684,7 @@ func (m *MetricSet) fetchAllDbs(reporter mb.ReporterV2) mapstr.M {
}
return nil
}
return m.fetchRowsWithRowCounter(queryAllDbs, reporter,allDbsCounter)
return m.fetchRowsWithRowCounter(queryAllDbs, reporter, allDbsCounter)
}

func (m *MetricSet) fetchTableIndexSize(reporter mb.ReporterV2) []mapstr.M {
@@ -705,9 +712,9 @@ func (m *MetricSet) fetchTableIndexSize(reporter mb.ReporterV2) []mapstr.M {
FROM tbl_page_count;`

type tableIndexRow struct {
dbName string
tableName string
indexName *string
dbName string
tableName string
indexName *string
indexSizeKB *int64
}

@@ -749,14 +756,14 @@ FROM tbl_page_count;`
for tblName, indexSize := range tables {
results = append(results, mapstr.M{
"table_index_size": fmt.Sprintf("%v", indexSize),
"table_name": tblName,
"db_name": dbName,
"table_name": tblName,
"db_name": dbName,
})
dbIndexSize+=indexSize
dbIndexSize += indexSize
}
results = append(results, mapstr.M{
"index_size": fmt.Sprintf("%v", dbIndexSize),
"db_name": dbName,
"db_name": dbName,
})
}

Expand All @@ -774,7 +781,7 @@ WHERE database_id = DB_ID()
GROUP BY database_id`

type logSizeRow struct {
dbName string
dbName string
logSizeKB *int64
}
var counter rowCounter = func(rows *sql.Rows, mapStr *mapstr.M) error {
@@ -796,7 +803,7 @@ GROUP BY database_id`
metric := item.(mapstr.M)
result = append(result, mapstr.M{
"log_size": fmt.Sprintf("%v", metric["log_size"]),
"db_name": dbName,
"db_name": dbName,
})
}

@@ -824,7 +831,7 @@ func (m *MetricSet) fetchRow(query string, reporter mb.ReporterV2) mapstr.M {

func (m *MetricSet) fetchRows(query string, reporter mb.ReporterV2) mapstr.M {
var (
err error
err error
rows *sql.Rows
)

@@ -858,9 +865,9 @@ func (m *MetricSet) fetchRows(query string, reporter mb.ReporterV2) mapstr.M {
return mapStr
}

func (m * MetricSet) fetchRowsWithRowCounter(query string, reporter mb.ReporterV2, counter rowCounter) mapstr.M {
func (m *MetricSet) fetchRowsWithRowCounter(query string, reporter mb.ReporterV2, counter rowCounter) mapstr.M {
var (
err error
err error
rows *sql.Rows
)
