Skip to content

Commit

Permalink
Merge pull request #30 from easyops-cn/henryxie/mssql
Browse files Browse the repository at this point in the history
fix():MONITOR-6569 tls connection disable for mssql
  • Loading branch information
XieJCHenry authored Aug 8, 2023
2 parents 9d72572 + 91119fd commit f708478
Showing 1 changed file with 69 additions and 62 deletions.
131 changes: 69 additions & 62 deletions x-pack/metricbeat/module/mssql/database/database.go
Original file line number Diff line number Diff line change
@@ -1,20 +1,19 @@
package database


import (
"database/sql"
"fmt"
"strings"

s "github.com/elastic/beats/v7/libbeat/common/schema"
"github.com/elastic/beats/v7/metricbeat/mb"
"github.com/elastic/beats/v7/x-pack/metricbeat/module/mssql"
"github.com/elastic/elastic-agent-libs/logp"
"github.com/elastic/elastic-agent-libs/mapstr"
"strconv"
s "github.com/elastic/beats/v7/libbeat/common/schema"
)

type rowCounter = func(rows *sql.Rows, mapStr *mapstr.M) ( error)
type rowCounter = func(rows *sql.Rows, mapStr *mapstr.M) error

type databaseCounter struct {
objectName string
Expand Down Expand Up @@ -54,13 +53,21 @@ func New(base mb.BaseMetricSet) (mb.MetricSet, error) {
return nil, fmt.Errorf("unpack rowConfig failed, err=%s", err)
}

encryptParam := ""
if encryptVal, ok := rowConfig["encrypt"].(string); ok && encryptVal == "disable" {
encryptParam = "&encrypt=disable"
}

dbNameParam := "database=master"
uri := base.HostData().URI
if dbName, ok := rowConfig["database"].(string); ok {
uri = fmt.Sprintf("sqlserver://%s:%s@%s?database=%s",
base.HostData().User, base.HostData().Password,
base.HostData().Host, dbName)
if dbNameVal, ok := rowConfig["database"].(string); ok {
dbNameParam = "database=" + dbNameVal
}

uri = fmt.Sprintf("sqlserver://%s:%s@%s?%s%s",
base.HostData().User, base.HostData().Password,
base.HostData().Host, dbNameParam, encryptParam)

db, err := mssql.NewConnection(uri)
if err != nil {
return nil, fmt.Errorf("could not create connection to db %w", err)
Expand Down Expand Up @@ -188,7 +195,7 @@ func (m *MetricSet) Close() error {
return m.db.Close()
}

func (m *MetricSet) fetchTps(reporter mb.ReporterV2) (mapstr.M) {
func (m *MetricSet) fetchTps(reporter mb.ReporterV2) mapstr.M {
query := `SELECT
object_name,
instance_name,
Expand All @@ -212,7 +219,7 @@ JOIN sys.dm_exec_sessions AS s ON r1.session_id = s.session_id
GROUP BY s.database_id;`

type blockCountCounter struct {
dbName string
dbName string
blockedSessionCount *int64
}

Expand All @@ -231,13 +238,13 @@ GROUP BY s.database_id;`
for dbName, item := range mapStr {
result = append(result, mapstr.M{
"session_block_count": item,
"db_name": dbName,
"db_name": dbName,
})
}
return result
}

func (m *MetricSet) fetchDeadLockCount(reporter mb.ReporterV2) (mapstr.M) {
func (m *MetricSet) fetchDeadLockCount(reporter mb.ReporterV2) mapstr.M {
query := `SELECT
object_name,
instance_name,
Expand All @@ -252,7 +259,7 @@ WHERE
return m.fetchRow(query, reporter)
}

func (m *MetricSet) fetchLockRequestTotal(reporter mb.ReporterV2) (mapstr.M) {
func (m *MetricSet) fetchLockRequestTotal(reporter mb.ReporterV2) mapstr.M {
query := `SELECT
object_name,
instance_name,
Expand Down Expand Up @@ -309,7 +316,7 @@ WHERE
}

return mapstr.M{
"PlanCacheHitRatio": fmt.Sprintf("%f", float64(hitRatio)/float64(hitRatioBase) * 100),
"PlanCacheHitRatio": fmt.Sprintf("%f", float64(hitRatio)/float64(hitRatioBase)*100),
}
}

Expand All @@ -327,7 +334,7 @@ func (m *MetricSet) fetchMemoryPageFault(reporter mb.ReporterV2) mapstr.M {
}

func (m *MetricSet) fetchIOWait(reporter mb.ReporterV2) []mapstr.M {
query :=`SELECT
query := `SELECT
DB_NAME(fs.database_id) AS db_name,
-- CAST(SUM(fs.io_stall)/ 1000.0 AS DECIMAL(18,2)) AS total_io_stall_ms,
-- CAST(SUM(fs.io_stall_read_ms + fs.io_stall_write_ms)/1000.0 AS DECIMAL(18,2)) AS total_io_stall_read_write_ms,
Expand All @@ -338,7 +345,7 @@ INNER JOIN sys.database_files AS df ON df.file_id = fs.file_id
GROUP BY fs.database_id;`

type ioRow struct {
dbName string
dbName string
avgIOWait *float64
}
var counter rowCounter = func(rows *sql.Rows, mapStr *mapstr.M) error {
Expand Down Expand Up @@ -377,11 +384,11 @@ INNER JOIN
sys.master_files AS mf ON vfs.database_id = mf.database_id AND vfs.file_id = mf.file_id;`

type diskRWBytesCounter struct {
dbName string
diskFileName string
typeDesc string
numOfReadBytes *int64
numOfWrittenBytes *int64
dbName string
diskFileName string
typeDesc string
numOfReadBytes *int64
numOfWrittenBytes *int64
avgMilliSecondsPerIO *float64
}
var counter rowCounter = func(rows *sql.Rows, mapStr *mapstr.M) error {
Expand Down Expand Up @@ -409,8 +416,8 @@ INNER JOIN
}
dbName, diskFileName, typeDesc, metricName := keys[0], keys[1], keys[2], keys[3]
result = append(result, mapstr.M{
metricName: item,
"db_name": dbName,
metricName: item,
"db_name": dbName,
"disk_file": diskFileName,
"type_desc": typeDesc,
})
Expand All @@ -434,11 +441,11 @@ FROM
sys.dm_exec_connections;`

type dbIOCounter struct {
dbName string
connectionCount *int64
totalReads *int64
totalWrites *int64
totalReadsBytes *int64
dbName string
connectionCount *int64
totalReads *int64
totalWrites *int64
totalReadsBytes *int64
totalWrittenBytes *int64
}
var counter rowCounter = func(rows *sql.Rows, mapStr *mapstr.M) error {
Expand Down Expand Up @@ -466,7 +473,7 @@ FROM
dbName, metricName := keys[0], keys[1]
result = append(result, mapstr.M{
metricName: item,
"db_name": dbName,
"db_name": dbName,
})
}
return result
Expand All @@ -487,7 +494,7 @@ func (m *MetricSet) fetchConnectionsPct(reporter mb.ReporterV2) mapstr.M {
}
} else {
return mapstr.M{
"connections_used_pct": fmt.Sprintf("%v", float64(userConnections) / float64(maxConnections)),
"connections_used_pct": fmt.Sprintf("%v", float64(userConnections)/float64(maxConnections)),
}
}

Expand Down Expand Up @@ -539,7 +546,7 @@ func (m *MetricSet) fetchTableUsedSpace(reporter mb.ReporterV2) []mapstr.M {

dbNames := make([]string, 0, len(allDbs))
for dbName := range allDbs {
dbNames = append(dbNames, "'" + dbName + "'")
dbNames = append(dbNames, "'"+dbName+"'")
}
dbNameWhereCond := strings.Join(dbNames, ", ")

Expand All @@ -565,10 +572,10 @@ GROUP BY
t.Name, p.Rows, Schemas.TABLE_CATALOG`, dbNameWhereCond)

type tableSpaceRow struct {
dbName string
tableName string
totalSpaceKB *int64
usedSpaceKB *int64
dbName string
tableName string
totalSpaceKB *int64
usedSpaceKB *int64
unusedSpaceKB *int64
}

Expand All @@ -585,9 +592,9 @@ GROUP BY
}
key := fmt.Sprintf("%s-%s", row.dbName, row.tableName)
(*mapStr)[key] = mapstr.M{
"table_total_space": fmt.Sprintf("%v", *row.totalSpaceKB),
"table_used_space": fmt.Sprintf("%v", *row.usedSpaceKB),
"table_unused_space": fmt.Sprintf("%v", *row.unusedSpaceKB),
"table_total_space": fmt.Sprintf("%v", *row.totalSpaceKB),
"table_used_space": fmt.Sprintf("%v", *row.usedSpaceKB),
"table_unused_space": fmt.Sprintf("%v", *row.unusedSpaceKB),
"table_space_used_pct": fmt.Sprintf("%v", spaceUsedPct),
}

Expand All @@ -603,31 +610,31 @@ GROUP BY
}
dbName, tblName := keys[0], keys[1]
var (
dbTotalSpace int64
dbUsedSpace int64
dbUnusedSpace int64
dbTotalSpace int64
dbUsedSpace int64
dbUnusedSpace int64
dbSpaceUsedPct float64 = 0.00
)
for metricName, val := range item.(mapstr.M) {
result = append(result, mapstr.M{
metricName: val,
"db_name": dbName,
metricName: val,
"db_name": dbName,
"table_name": tblName,
})
if metricName == "table_total_space" {
if v, err := strconv.Atoi(val.(string)); err == nil{
if v, err := strconv.Atoi(val.(string)); err == nil {
dbTotalSpace += int64(v)
} else {
reporter.Error(fmt.Errorf("parse table %s-%s total space failed, val=%s", dbName, tblName, val))
}
} else if metricName == "table_used_space" {
if v, err := strconv.Atoi(val.(string)); err == nil{
if v, err := strconv.Atoi(val.(string)); err == nil {
dbUsedSpace += int64(v)
} else {
reporter.Error(fmt.Errorf("parse table %s-%s used space failed, val=%s", dbName, tblName, val))
}
} else if metricName == "table_unused_space" {
if v, err := strconv.Atoi(val.(string)); err == nil{
if v, err := strconv.Atoi(val.(string)); err == nil {
dbUnusedSpace += int64(v)
} else {
reporter.Error(fmt.Errorf("parse table %s-%s unused space failed, val=%s", dbName, tblName, val))
Expand All @@ -637,21 +644,21 @@ GROUP BY

result = append(result, mapstr.M{
"used_space": fmt.Sprintf("%v", dbUsedSpace),
"db_name": dbName,
"db_name": dbName,
})
result = append(result, mapstr.M{
"unused_space": fmt.Sprintf("%v", dbUnusedSpace),
"db_name": dbName,
"db_name": dbName,
})
result = append(result, mapstr.M{
"total_space": fmt.Sprintf("%v", dbTotalSpace),
"db_name": dbName,
"db_name": dbName,
})
if dbTotalSpace != 0 {
dbSpaceUsedPct = float64(dbUsedSpace) / float64(dbTotalSpace)
result = append(result, mapstr.M{
"space_used_pct": fmt.Sprintf("%v", dbSpaceUsedPct),
"db_name": dbName,
"db_name": dbName,
})
}
}
Expand All @@ -677,7 +684,7 @@ func (m *MetricSet) fetchAllDbs(reporter mb.ReporterV2) mapstr.M {
}
return nil
}
return m.fetchRowsWithRowCounter(queryAllDbs, reporter,allDbsCounter)
return m.fetchRowsWithRowCounter(queryAllDbs, reporter, allDbsCounter)
}

func (m *MetricSet) fetchTableIndexSize(reporter mb.ReporterV2) []mapstr.M {
Expand Down Expand Up @@ -705,9 +712,9 @@ func (m *MetricSet) fetchTableIndexSize(reporter mb.ReporterV2) []mapstr.M {
FROM tbl_page_count;`

type tableIndexRow struct {
dbName string
tableName string
indexName *string
dbName string
tableName string
indexName *string
indexSizeKB *int64
}

Expand Down Expand Up @@ -749,14 +756,14 @@ FROM tbl_page_count;`
for tblName, indexSize := range tables {
results = append(results, mapstr.M{
"table_index_size": fmt.Sprintf("%v", indexSize),
"table_name": tblName,
"db_name": dbName,
"table_name": tblName,
"db_name": dbName,
})
dbIndexSize+=indexSize
dbIndexSize += indexSize
}
results = append(results, mapstr.M{
"index_size": fmt.Sprintf("%v", dbIndexSize),
"db_name": dbName,
"db_name": dbName,
})
}

Expand All @@ -774,7 +781,7 @@ WHERE database_id = DB_ID()
GROUP BY database_id`

type logSizeRow struct {
dbName string
dbName string
logSizeKB *int64
}
var counter rowCounter = func(rows *sql.Rows, mapStr *mapstr.M) error {
Expand All @@ -796,7 +803,7 @@ GROUP BY database_id`
metric := item.(mapstr.M)
result = append(result, mapstr.M{
"log_size": fmt.Sprintf("%v", metric["log_size"]),
"db_name": dbName,
"db_name": dbName,
})
}

Expand Down Expand Up @@ -824,7 +831,7 @@ func (m *MetricSet) fetchRow(query string, reporter mb.ReporterV2) mapstr.M {

func (m *MetricSet) fetchRows(query string, reporter mb.ReporterV2) mapstr.M {
var (
err error
err error
rows *sql.Rows
)

Expand Down Expand Up @@ -858,9 +865,9 @@ func (m *MetricSet) fetchRows(query string, reporter mb.ReporterV2) mapstr.M {
return mapStr
}

func (m * MetricSet) fetchRowsWithRowCounter(query string, reporter mb.ReporterV2, counter rowCounter) mapstr.M {
func (m *MetricSet) fetchRowsWithRowCounter(query string, reporter mb.ReporterV2, counter rowCounter) mapstr.M {
var (
err error
err error
rows *sql.Rows
)

Expand Down

0 comments on commit f708478

Please sign in to comment.