From f87d2329f144e6bf4b259727cb3e6c5942991b01 Mon Sep 17 00:00:00 2001 From: Kaarel Moppel Date: Fri, 29 Oct 2021 18:03:59 +0300 Subject: [PATCH] Gatherer - full redesign of connection pooling, relying on *sqlx.DB. A new --max-parallel-connections-per-db param to control max simultaneous parallel queries to a monitored DB. --- ENV_VARIABLES.md | 1 + pgwatch2/logparse.go | 4 +- pgwatch2/pgwatch2.go | 503 +++++++++++++++++++++++++++---------------- pgwatch2/prom.go | 10 +- 4 files changed, 324 insertions(+), 194 deletions(-) diff --git a/ENV_VARIABLES.md b/ENV_VARIABLES.md index 818b0763..37eb8923 100644 --- a/ENV_VARIABLES.md +++ b/ENV_VARIABLES.md @@ -65,6 +65,7 @@ NB! Some variables influence multiple components. Command line parameters overri - **PW2_INSTANCE_LEVEL_CACHE_MAX_SECONDS** Max allowed staleness for instance level metric data shared between DBs of an instance. Affects 'continuous' host types only. Set to 0 to disable. Default: 30 - **PW2_DIRECT_OS_STATS** Extract OS related psutil statistics not via PL/Python wrappers but directly on host, i.e. assumes "push" setup. Default: off. - **PW2_MIN_DB_SIZE_MB** Smaller size DBs will be ignored and not monitored until they reach the threshold. Default: 0 (no size-based limiting). +- **PW2_MAX_PARALLEL_CONNECTIONS_PER_DB** Max parallel metric fetches per DB. Note the multiplication effect on multi-DB instances. Default: 2 ## Web UI diff --git a/pgwatch2/logparse.go b/pgwatch2/logparse.go index 410d4864..f84923ff 100644 --- a/pgwatch2/logparse.go +++ b/pgwatch2/logparse.go @@ -372,7 +372,7 @@ func tryDetermineLogFolder(mdb MonitoredDatabase) string { sql := `select current_setting('data_directory') as dd, current_setting('log_directory') as ld` log.Infof("[%s] Trying to determine server logs folder via SQL as host_config.logs_glob_path not specified...", mdb.DBUniqueName) - data, err, _ := DBExecReadByDbUniqueName(mdb.DBUniqueName, "", false, 0, sql) + data, err, _ := DBExecReadByDbUniqueName(mdb.DBUniqueName, "", 0, sql) if err != nil { log.Errorf("[%s] Failed to query data_directory and log_directory settings...are you superuser or have pg_monitor grant?", mdb.DBUniqueName) return "" @@ -390,7 +390,7 @@ func tryDetermineLogMessagesLanguage(mdb MonitoredDatabase) string { sql := `select current_setting('lc_messages')::varchar(2) as lc_messages;` log.Debugf("[%s] Trying to determine server log messages language...", mdb.DBUniqueName) - data, err, _ := DBExecReadByDbUniqueName(mdb.DBUniqueName, "", false, 0, sql) + data, err, _ := DBExecReadByDbUniqueName(mdb.DBUniqueName, "", 0, sql) if err != nil { log.Errorf("[%s] Failed to lc_messages settings: %s", mdb.DBUniqueName, err) return "" diff --git a/pgwatch2/pgwatch2.go b/pgwatch2/pgwatch2.go index 9f3bbe49..4c7174c0 100644 --- a/pgwatch2/pgwatch2.go +++ b/pgwatch2/pgwatch2.go @@ -217,9 +217,8 @@ const DATASTORE_POSTGRES = "postgres" const DATASTORE_PROMETHEUS = "prometheus" const PRESET_CONFIG_YAML_FILE = "preset-configs.yaml" const FILE_BASED_METRIC_HELPERS_DIR = "00_helpers" -const PG_CONN_RECYCLE_SECONDS = 1800 // applies for monitored nodes -const APPLICATION_NAME = "pgwatch2" // will be set on all opened PG connections for informative purposes -const MAX_PG_CONNECTIONS_PER_MONITORED_DB = 2 // for limiting max concurrent queries on a single DB, sql.DB maxPoolSize cannot be fully trusted +const PG_CONN_RECYCLE_SECONDS = 1800 // applies for monitored nodes +const APPLICATION_NAME = "pgwatch2" // will be set on all opened PG connections for informative purposes const 
GATHERER_STATUS_START = "START" const GATHERER_STATUS_STOP = "STOP" const METRICDB_IDENT = "metricDb" @@ -252,6 +251,7 @@ const METRIC_PSUTIL_MEM = "psutil_mem" const DEFAULT_METRICS_DEFINITION_PATH_PKG = "/etc/pgwatch2/metrics" // prebuilt packages / Docker default location const DEFAULT_METRICS_DEFINITION_PATH_DOCKER = "/pgwatch2/metrics" // prebuilt packages / Docker default location const DB_SIZE_CACHING_INTERVAL = 10 * time.Minute +const DB_METRIC_JOIN_STR = "¤¤¤" // just some unlikely string for a DB name to avoid using maps of maps for DB+metric data var dbTypeMap = map[string]bool{DBTYPE_PG: true, DBTYPE_PG_CONT: true, DBTYPE_BOUNCER: true, DBTYPE_PATRONI: true, DBTYPE_PATRONI_CONT: true, DBTYPE_PGPOOL: true, DBTYPE_PATRONI_NAMESPACE_DISCOVERY: true} var dbTypes = []string{DBTYPE_PG, DBTYPE_PG_CONT, DBTYPE_BOUNCER, DBTYPE_PATRONI, DBTYPE_PATRONI_CONT, DBTYPE_PATRONI_NAMESPACE_DISCOVERY} // used for informational purposes @@ -273,8 +273,6 @@ var monitored_db_cache map[string]MonitoredDatabase var monitored_db_cache_lock sync.RWMutex var monitored_db_conn_cache map[string]*sqlx.DB = make(map[string]*sqlx.DB) var monitored_db_conn_cache_lock = sync.RWMutex{} -var db_conn_limiting_channel = make(map[string](chan bool)) -var db_conn_limiting_channel_lock = sync.RWMutex{} var last_sql_fetch_error sync.Map var influx_host_count = 1 var InfluxConnectStrings [2]string // Max. 2 Influx metrics stores currently supported @@ -325,12 +323,19 @@ var prevCPULoadTimeStats cpu.TimesStat var prevCPULoadTimestamp time.Time // Async Prom cache -var promAsyncMetricCache = make(map[string][]MetricStoreMessage) // [dbUnique+metric]lastly_fetched_data +var promAsyncMetricCache = make(map[string]map[string][]MetricStoreMessage) // [dbUnique][metric]lastly_fetched_data var promAsyncMetricCacheLock = sync.RWMutex{} var promAsyncMode = false var lastDBSizeMB = make(map[string]int64) var lastDBSizeFetchTime = make(map[string]time.Time) // cached for DB_SIZE_CACHING_INTERVAL var lastDBSizeCheckLock sync.RWMutex +var mainLoopInitialized int32 // 0/1 + +var prevLoopMonitoredDBs []MonitoredDatabase // to be able to detect DBs removed from config +var undersizedDBs = make(map[string]bool) // DBs below the --min-db-size-mb limit, if set +var undersizedDBsLock = sync.RWMutex{} +var recoveryIgnoredDBs = make(map[string]bool) // DBs in recovery state and OnlyIfMaster specified in config +var recoveryIgnoredDBsLock = sync.RWMutex{} func IsPostgresDBType(dbType string) bool { if dbType == DBTYPE_BOUNCER || dbType == DBTYPE_PGPOOL { @@ -485,6 +490,97 @@ func InitAndTestMetricStoreConnection(connStr string, failOnErr bool) error { return nil } +// every DB under monitoring should have exactly 1 sql.DB connection assigned, that will internally limit parallel access +func InitSqlConnPoolForMonitoredDBIfNil(md MonitoredDatabase) error { + monitored_db_conn_cache_lock.Lock() + defer monitored_db_conn_cache_lock.Unlock() + + conn, ok := monitored_db_conn_cache[md.DBUniqueName] + if ok && conn != nil { + return nil + } + + if md.DBType == DBTYPE_BOUNCER { + md.DBName = "pgbouncer" + } + + conn, err := GetPostgresDBConnection(md.LibPQConnStr, md.Host, md.Port, md.DBName, md.User, md.Password, + md.SslMode, md.SslRootCAPath, md.SslClientCertPath, md.SslClientKeyPath) + if err != nil { + return err + } + + if useConnPooling { + conn.SetMaxIdleConns(opts.MaxParallelConnectionsPerDb) + } else { + conn.SetMaxIdleConns(0) + } + conn.SetMaxOpenConns(opts.MaxParallelConnectionsPerDb) + // recycling periodically makes sense as long 
sessions might bloat memory or maybe conn info (password) was changed + conn.SetConnMaxLifetime(time.Second * time.Duration(PG_CONN_RECYCLE_SECONDS)) + + monitored_db_conn_cache[md.DBUniqueName] = conn + log.Debugf("[%s] Connection pool initialized with max %d parallel connections. Conn pooling: %v", md.DBUniqueName, opts.MaxParallelConnectionsPerDb, useConnPooling) + + return nil +} + +func CloseOrLimitSqlConnPoolForMonitoredDBIfAny(dbUnique string) { + monitored_db_conn_cache_lock.Lock() + defer monitored_db_conn_cache_lock.Unlock() + + conn, ok := monitored_db_conn_cache[dbUnique] + if !ok || conn == nil { + return + } + + if (IsDBUndersized(dbUnique) || IsDBIgnoredBasedOnRecoveryState(dbUnique)) && useConnPooling { + + s := conn.Stats() + if s.MaxOpenConnections > 1 { + log.Debugf("[%s] Limiting SQL connection pool to max 1 connection due to dormant state ...", dbUnique) + conn.SetMaxIdleConns(1) + conn.SetMaxOpenConns(1) + } + + } else { // removed from config + log.Debugf("[%s] Closing SQL connection pool ...", dbUnique) + err := conn.Close() + if err != nil { + log.Errorf("[%s] Failed to close connection pool nicely. Err: %v", dbUnique, err) + } + delete(monitored_db_conn_cache, dbUnique) + } +} + +func RestoreSqlConnPoolLimitsForPreviouslyDormantDB(dbUnique string) { + if !useConnPooling { + return + } + monitored_db_conn_cache_lock.Lock() + defer monitored_db_conn_cache_lock.Unlock() + + conn, ok := monitored_db_conn_cache[dbUnique] + if !ok || conn == nil { + log.Error("DB conn to re-instate pool limits not found, should not happen") + return + } + + log.Debugf("[%s] Re-instating SQL connection pool max connections ...", dbUnique) + + conn.SetMaxIdleConns(opts.MaxParallelConnectionsPerDb) + conn.SetMaxOpenConns(opts.MaxParallelConnectionsPerDb) + +} + +func InitPGVersionInfoFetchingLockIfNil(md MonitoredDatabase) { + db_pg_version_map_lock.Lock() + if _, ok := db_get_pg_version_map_lock[md.DBUniqueName]; !ok { + db_get_pg_version_map_lock[md.DBUniqueName] = sync.RWMutex{} + } + db_pg_version_map_lock.Unlock() +} + func DBExecRead(conn *sqlx.DB, host_ident, sql string, args ...interface{}) ([](map[string]interface{}), error) { ret := make([]map[string]interface{}, 0) var rows *sqlx.Rows @@ -497,21 +593,8 @@ func DBExecRead(conn *sqlx.DB, host_ident, sql string, args ...interface{}) ([]( rows, err = conn.Queryx(sql, args...) 
if err != nil { - if !(host_ident == METRICDB_IDENT || host_ident == CONFIGDB_IDENT) { - if conn != nil { - conn.Close() - } - monitored_db_conn_cache_lock.Lock() - defer monitored_db_conn_cache_lock.Unlock() - if _, ok := monitored_db_conn_cache[host_ident]; ok { - // do not overwrite new already reopened pool (after conn.Close) - if monitored_db_conn_cache[host_ident] == conn { - monitored_db_conn_cache[host_ident] = nil - } - } - // connection problems or bad queries etc are quite common so caller should decide if to output something - log.Debug("failed to query", host_ident, "sql:", sql, "err:", err) - } + // connection problems or bad queries etc are quite common so caller should decide if to output something + log.Debug("failed to query", host_ident, "sql:", sql, "err:", err) return nil, err } defer rows.Close() @@ -533,13 +616,13 @@ func DBExecRead(conn *sqlx.DB, host_ident, sql string, args ...interface{}) ([]( return ret, err } -func DBExecReadByDbUniqueName(dbUnique, metricName string, useCache bool, stmtTimeoutOverride int64, sql string, args ...interface{}) ([](map[string]interface{}), error, time.Duration) { +func DBExecReadByDbUniqueName(dbUnique, metricName string, stmtTimeoutOverride int64, sql string, args ...interface{}) ([](map[string]interface{}), error, time.Duration) { var conn *sqlx.DB - var libPQConnStr string - var exists bool var md MonitoredDatabase var err error var duration time.Duration + var exists bool + var sqlStmtTimeout string if strings.TrimSpace(sql) == "" { return nil, errors.New("empty SQL"), duration @@ -550,73 +633,13 @@ func DBExecReadByDbUniqueName(dbUnique, metricName string, useCache bool, stmtTi return nil, err, duration } - db_conn_limiting_channel_lock.RLock() - conn_limit_channel, ok := db_conn_limiting_channel[dbUnique] - db_conn_limiting_channel_lock.RUnlock() - if !ok { - log.Fatal("db_conn_limiting_channel not initialized for ", dbUnique) - } - - //log.Debugf("Waiting for SQL token [%s:%s]...", msg.DBUniqueName, msg.MetricName) - token := <-conn_limit_channel - defer func() { - conn_limit_channel <- token - }() - - libPQConnStr = md.LibPQConnStr - if opts.AdHocConnString != "" { - libPQConnStr = opts.AdHocConnString - } - - if !useCache { - if md.DBType == DBTYPE_BOUNCER { - md.DBName = "pgbouncer" - } - - conn, err = GetPostgresDBConnection(libPQConnStr, md.Host, md.Port, md.DBName, md.User, md.Password, - md.SslMode, md.SslRootCAPath, md.SslClientCertPath, md.SslClientKeyPath) - if err != nil { - return nil, err, duration - } - defer conn.Close() - - } else { - var dbStats go_sql.DBStats - monitored_db_conn_cache_lock.RLock() - conn, exists = monitored_db_conn_cache[dbUnique] - monitored_db_conn_cache_lock.RUnlock() - if conn != nil { - dbStats = conn.Stats() - } - - if !exists || conn == nil || dbStats.OpenConnections == 0 { - - if md.DBType == DBTYPE_BOUNCER { - md.DBName = "pgbouncer" - } - - // only one metric can create connection in cache at a time - monitored_db_conn_cache_lock.Lock() - // recheck after lock acquire, pool can already be created - conn, exists = monitored_db_conn_cache[dbUnique] - if !exists || conn == nil { - conn, err = GetPostgresDBConnection(libPQConnStr, md.Host, md.Port, md.DBName, md.User, md.Password, - md.SslMode, md.SslRootCAPath, md.SslClientCertPath, md.SslClientKeyPath) - if err != nil { - monitored_db_conn_cache_lock.Unlock() - return nil, err, duration - } - - conn.SetMaxIdleConns(1) - conn.SetMaxOpenConns(MAX_PG_CONNECTIONS_PER_MONITORED_DB) - // recycling periodically makes sense as long sessions 
might bloat memory or maybe conn info (password) was changed - conn.SetConnMaxLifetime(time.Second * time.Duration(PG_CONN_RECYCLE_SECONDS)) - - monitored_db_conn_cache[dbUnique] = conn - } - monitored_db_conn_cache_lock.Unlock() - } - + monitored_db_conn_cache_lock.RLock() + // sqlx.DB itself is parallel safe + conn, exists = monitored_db_conn_cache[dbUnique] + monitored_db_conn_cache_lock.RUnlock() + if !exists || conn == nil { + log.Errorf("SQL connection for dbUnique %s not found or nil", dbUnique) // Should always be initialized in the main loop DB discovery code ... + return nil, errors.New("SQL connection not found or nil"), duration } if !adHocMode && IsPostgresDBType(md.DBType) { @@ -625,7 +648,7 @@ func DBExecReadByDbUniqueName(dbUnique, metricName string, useCache bool, stmtTi stmtTimeout = stmtTimeoutOverride } if stmtTimeout > 0 { // 0 = don't change, use DB level settings - _, err = DBExecRead(conn, dbUnique, fmt.Sprintf("SET statement_timeout TO '%ds'", stmtTimeout)) + sqlStmtTimeout = fmt.Sprintf("SET statement_timeout TO '%ds';", stmtTimeout) // bundle with SQL to avoid transaction round-trip time } if err != nil { atomic.AddUint64(&totalMetricFetchFailuresCounter, 1) @@ -634,7 +657,7 @@ func DBExecReadByDbUniqueName(dbUnique, metricName string, useCache bool, stmtTi } t1 := time.Now() - data, err := DBExecRead(conn, dbUnique, sql, args...) + data, err := DBExecRead(conn, dbUnique, sqlStmtTimeout+sql, args...) t2 := time.Now() if err != nil { atomic.AddUint64(&totalMetricFetchFailuresCounter, 1) @@ -1727,17 +1750,15 @@ func GetMonitoredDatabaseByUniqueName(name string) (MonitoredDatabase, error) { } func UpdateMonitoredDBCache(data []MonitoredDatabase) { - if len(data) > 0 { - monitored_db_cache_new := make(map[string]MonitoredDatabase) + monitored_db_cache_new := make(map[string]MonitoredDatabase) - for _, row := range data { - monitored_db_cache_new[row.DBUniqueName] = row - } - - monitored_db_cache_lock.Lock() - monitored_db_cache = monitored_db_cache_new - monitored_db_cache_lock.Unlock() + for _, row := range data { + monitored_db_cache_new[row.DBUniqueName] = row } + + monitored_db_cache_lock.Lock() + monitored_db_cache = monitored_db_cache_new + monitored_db_cache_lock.Unlock() } func ProcessRetryQueue(data_source, conn_str, conn_ident string, retry_queue *list.List, limit int) error { @@ -1900,9 +1921,7 @@ func MetricsPersister(data_store string, storage_ch <-chan []MetricStoreMessage) continue } msg := msg_arr[0] - promAsyncMetricCacheLock.Lock() - promAsyncMetricCache[msg.DBUniqueName+msg.MetricName] = msg_arr - promAsyncMetricCacheLock.Unlock() + PromAsyncCacheAddMetricData(msg.DBUniqueName, msg.MetricName, msg_arr) log.Infof("[%s:%s] Added %d rows to Prom cache", msg.DBUniqueName, msg.MetricName, len(msg.Data)) } else if data_store == DATASTORE_INFLUX { err = SendToInflux(InfluxConnectStrings[i], strconv.Itoa(i), msg_arr) @@ -1987,7 +2006,7 @@ func DBGetSizeMB(dbUnique string) (int64, error) { if !ok || lastDBSizeCheckTime.Add(DB_SIZE_CACHING_INTERVAL).Before(time.Now()) { log.Debugf("[%s] determining DB size ...", dbUnique) - data, err, _ := DBExecReadByDbUniqueName(dbUnique, "", useConnPooling, 60, sql_db_size) // can take some time on ancient FS, use 60s stmt timeout + data, err, _ := DBExecReadByDbUniqueName(dbUnique, "", 60, sql_db_size) // can take some time on ancient FS, use 60s stmt timeout if err != nil { log.Errorf("[%s] failed to determine DB size...cannot apply --min-db-size-mb flag. 
err: %v ...", dbUnique, err) return 0, err @@ -2022,13 +2041,14 @@ func DBGetPGVersion(dbUnique string, dbType string, noCache bool) (DBVersionMapE sql_extensions := `select /* pgwatch2_generated */ extname::text, (regexp_matches(extversion, $$\d+\.?\d+?$$))[1]::text as extversion from pg_extension order by 1;` pgpool_version := `SHOW POOL_VERSION` // supported from pgpool2 v3.0 - db_pg_version_map_lock.RLock() + db_pg_version_map_lock.Lock() get_ver_lock, ok := db_get_pg_version_map_lock[dbUnique] if !ok { - log.Fatal("db_get_pg_version_map_lock uninitialized") + db_get_pg_version_map_lock[dbUnique] = sync.RWMutex{} + get_ver_lock = db_get_pg_version_map_lock[dbUnique] } ver, ok = db_pg_version_map[dbUnique] - db_pg_version_map_lock.RUnlock() + db_pg_version_map_lock.Unlock() if !noCache && ok && ver.LastCheckedOn.After(time.Now().Add(time.Minute*-2)) { // use cached version for 2 min //log.Debugf("using cached postgres version %s for %s", ver.Version.String(), dbUnique) @@ -2043,7 +2063,7 @@ func DBGetPGVersion(dbUnique string, dbType string, noCache bool) (DBVersionMapE } if dbType == DBTYPE_BOUNCER { - data, err, _ := DBExecReadByDbUniqueName(dbUnique, "", false, 0, "show version") + data, err, _ := DBExecReadByDbUniqueName(dbUnique, "", 0, "show version") if err != nil { return verNew, err } @@ -2061,7 +2081,7 @@ func DBGetPGVersion(dbUnique string, dbType string, noCache bool) (DBVersionMapE verNew.Version, _ = decimal.NewFromString(matches[0]) } } else if dbType == DBTYPE_PGPOOL { - data, err, _ := DBExecReadByDbUniqueName(dbUnique, "", false, 0, pgpool_version) + data, err, _ := DBExecReadByDbUniqueName(dbUnique, "", 0, pgpool_version) if err != nil { return verNew, err } @@ -2078,12 +2098,12 @@ func DBGetPGVersion(dbUnique string, dbType string, noCache bool) (DBVersionMapE verNew.Version, _ = decimal.NewFromString(matches[0]) } } else { - data, err, _ := DBExecReadByDbUniqueName(dbUnique, "", useConnPooling, 0, sql) + data, err, _ := DBExecReadByDbUniqueName(dbUnique, "", 0, sql) if err != nil { if noCache { return ver, err } else { - log.Info("DBGetPGVersion failed, using old cached value", err) + log.Info("DBGetPGVersion failed, using old cached value. 
err:", err) return ver, nil } } @@ -2094,14 +2114,14 @@ func DBGetPGVersion(dbUnique string, dbType string, noCache bool) (DBVersionMapE if verNew.Version.GreaterThanOrEqual(decimal.NewFromFloat(10)) && addSystemIdentifier { log.Debugf("[%s] determining system identifier version (pg ver: %v)", dbUnique, verNew.VersionStr) - data, err, _ := DBExecReadByDbUniqueName(dbUnique, "", useConnPooling, 0, sql_sysid) + data, err, _ := DBExecReadByDbUniqueName(dbUnique, "", 0, sql_sysid) if err == nil && len(data) > 0 { verNew.SystemIdentifier = data[0]["system_identifier"].(string) } } log.Debugf("[%s] determining if monitoring user is a superuser...", dbUnique) - data, err, _ = DBExecReadByDbUniqueName(dbUnique, "", useConnPooling, 0, sql_su) + data, err, _ = DBExecReadByDbUniqueName(dbUnique, "", 0, sql_su) if err == nil { verNew.IsSuperuser = data[0]["rolsuper"].(bool) } @@ -2109,7 +2129,7 @@ func DBGetPGVersion(dbUnique string, dbType string, noCache bool) (DBVersionMapE if verNew.Version.GreaterThanOrEqual(MinExtensionInfoAvailable) { //log.Debugf("[%s] determining installed extensions info...", dbUnique) - data, err, _ = DBExecReadByDbUniqueName(dbUnique, "", useConnPooling, 0, sql_extensions) + data, err, _ = DBExecReadByDbUniqueName(dbUnique, "", 0, sql_extensions) if err != nil { log.Errorf("[%s] failed to determine installed extensions info: %v", dbUnique, err) } else { @@ -2242,14 +2262,14 @@ func DetectSprocChanges(dbUnique string, vme DBVersionMapEntry, storage_ch chan< return change_counts } - data, err, _ := DBExecReadByDbUniqueName(dbUnique, "sproc_hashes", useConnPooling, mvp.MetricAttrs.StatementTimeoutSeconds, mvp.Sql) + data, err, _ := DBExecReadByDbUniqueName(dbUnique, "sproc_hashes", mvp.MetricAttrs.StatementTimeoutSeconds, mvp.Sql) if err != nil { log.Error("could not read sproc_hashes from monitored host: ", dbUnique, ", err:", err) return change_counts } for _, dr := range data { - obj_ident := dr["tag_sproc"].(string) + ":" + dr["tag_oid"].(string) + obj_ident := dr["tag_sproc"].(string) + DB_METRIC_JOIN_STR + dr["tag_oid"].(string) prev_hash, ok := host_state["sproc_hashes"][obj_ident] if ok { // we have existing state if prev_hash != dr["md5"].(string) { @@ -2275,12 +2295,12 @@ func DetectSprocChanges(dbUnique string, vme DBVersionMapEntry, storage_ch chan< // turn resultset to map => [oid]=true for faster checks current_oid_map := make(map[string]bool) for _, dr := range data { - current_oid_map[dr["tag_sproc"].(string)+":"+dr["tag_oid"].(string)] = true + current_oid_map[dr["tag_sproc"].(string)+DB_METRIC_JOIN_STR+dr["tag_oid"].(string)] = true } for sproc_ident := range host_state["sproc_hashes"] { _, ok := current_oid_map[sproc_ident] if !ok { - splits := strings.Split(sproc_ident, ":") + splits := strings.Split(sproc_ident, DB_METRIC_JOIN_STR) log.Info("detected delete of sproc:", splits[0], ", oid:", splits[1]) influx_entry := make(map[string]interface{}) influx_entry["event"] = "drop" @@ -2328,7 +2348,7 @@ func DetectTableChanges(dbUnique string, vme DBVersionMapEntry, storage_ch chan< return change_counts } - data, err, _ := DBExecReadByDbUniqueName(dbUnique, "table_hashes", useConnPooling, mvp.MetricAttrs.StatementTimeoutSeconds, mvp.Sql) + data, err, _ := DBExecReadByDbUniqueName(dbUnique, "table_hashes", mvp.MetricAttrs.StatementTimeoutSeconds, mvp.Sql) if err != nil { log.Error("could not read table_hashes from monitored host:", dbUnique, ", err:", err) return change_counts @@ -2414,7 +2434,7 @@ func DetectIndexChanges(dbUnique string, vme DBVersionMapEntry, 
storage_ch chan< return change_counts } - data, err, _ := DBExecReadByDbUniqueName(dbUnique, "index_hashes", useConnPooling, mvp.MetricAttrs.StatementTimeoutSeconds, mvp.Sql) + data, err, _ := DBExecReadByDbUniqueName(dbUnique, "index_hashes", mvp.MetricAttrs.StatementTimeoutSeconds, mvp.Sql) if err != nil { log.Error("could not read index_hashes from monitored host:", dbUnique, ", err:", err) return change_counts @@ -2499,7 +2519,7 @@ func DetectPrivilegeChanges(dbUnique string, vme DBVersionMapEntry, storage_ch c } // returns rows of: object_type, tag_role, tag_object, privilege_type - data, err, _ := DBExecReadByDbUniqueName(dbUnique, "privilege_changes", useConnPooling, mvp.MetricAttrs.StatementTimeoutSeconds, mvp.Sql) + data, err, _ := DBExecReadByDbUniqueName(dbUnique, "privilege_changes", mvp.MetricAttrs.StatementTimeoutSeconds, mvp.Sql) if err != nil { log.Errorf("[%s][%s] failed to fetch object privileges info: %v", dbUnique, SPECIAL_METRIC_CHANGE_EVENTS, err) return change_counts @@ -2577,7 +2597,7 @@ func DetectConfigurationChanges(dbUnique string, vme DBVersionMapEntry, storage_ return change_counts } - data, err, _ := DBExecReadByDbUniqueName(dbUnique, "configuration_hashes", useConnPooling, mvp.MetricAttrs.StatementTimeoutSeconds, mvp.Sql) + data, err, _ := DBExecReadByDbUniqueName(dbUnique, "configuration_hashes", mvp.MetricAttrs.StatementTimeoutSeconds, mvp.Sql) if err != nil { log.Errorf("[%s][%s] could not read configuration_hashes from monitored host: %v", dbUnique, SPECIAL_METRIC_CHANGE_EVENTS, err) return change_counts @@ -2648,7 +2668,7 @@ func GetRecommendations(dbUnique string, vme DBVersionMapEntry) ([]map[string]in log.Debugf("Processing %d recommendation metrics for \"%s\"", len(reco_metrics), dbUnique) for m, mvp := range reco_metrics { - data, err, duration := DBExecReadByDbUniqueName(dbUnique, m, useConnPooling, mvp.MetricAttrs.StatementTimeoutSeconds, mvp.Sql) + data, err, duration := DBExecReadByDbUniqueName(dbUnique, m, mvp.MetricAttrs.StatementTimeoutSeconds, mvp.Sql) total_duration += duration if err != nil { if strings.Contains(err.Error(), "does not exist") { // some more exotic extensions missing is expected, don't pollute the error log @@ -2774,7 +2794,7 @@ func FetchMetricsPgpool(msg MetricFetchMessage, vme DBVersionMapEntry, mvp Metri for _, sql := range sql_lines { if strings.HasPrefix(sql, "SHOW POOL_NODES") { - data, err, dur := DBExecReadByDbUniqueName(msg.DBUniqueName, msg.MetricName, useConnPooling, 0, sql) + data, err, dur := DBExecReadByDbUniqueName(msg.DBUniqueName, msg.MetricName, 0, sql) duration = duration + dur if err != nil { log.Errorf("[%s][%s] Could not fetch PgPool statistics: %v", msg.DBUniqueName, msg.MetricName, err) @@ -2829,7 +2849,7 @@ func FetchMetricsPgpool(msg MetricFetchMessage, vme DBVersionMapEntry, mvp Metri continue } - data, err, dur := DBExecReadByDbUniqueName(msg.DBUniqueName, msg.MetricName, useConnPooling, 0, sql) + data, err, dur := DBExecReadByDbUniqueName(msg.DBUniqueName, msg.MetricName, 0, sql) duration = duration + dur if err != nil { log.Errorf("[%s][%s] Could not fetch PgPool statistics: %v", msg.DBUniqueName, msg.MetricName, err) @@ -2886,10 +2906,10 @@ func FetchMetrics(msg MetricFetchMessage, host_state map[string]map[string]strin mvp, err := GetMetricVersionProperties(msg.MetricName, vme, nil) if err != nil && msg.MetricName != RECO_METRIC_NAME { - epoch, ok := last_sql_fetch_error.Load(msg.MetricName + ":" + db_pg_version.String()) + epoch, ok := last_sql_fetch_error.Load(msg.MetricName + 
DB_METRIC_JOIN_STR + db_pg_version.String()) if !ok || ((time.Now().Unix() - epoch.(int64)) > 3600) { // complain only 1x per hour log.Infof("Failed to get SQL for metric '%s', version '%s': %v", msg.MetricName, vme.VersionStr, err) - last_sql_fetch_error.Store(msg.MetricName+":"+db_pg_version.String(), time.Now().Unix()) + last_sql_fetch_error.Store(msg.MetricName+DB_METRIC_JOIN_STR+db_pg_version.String(), time.Now().Unix()) } if strings.Contains(err.Error(), "too old") { return nil, nil @@ -2931,7 +2951,7 @@ retry_with_superuser_sql: // if 1st fetch with normal SQL fails, try with SU SQL } else if msg.DBType == DBTYPE_PGPOOL { data, _, duration = FetchMetricsPgpool(msg, vme, mvp) } else { - data, err, duration = DBExecReadByDbUniqueName(msg.DBUniqueName, msg.MetricName, useConnPooling, msg.StmtTimeoutOverride, sql) + data, err, duration = DBExecReadByDbUniqueName(msg.DBUniqueName, msg.MetricName, msg.StmtTimeoutOverride, sql) if err != nil { // let's soften errors to "info" from functions that expect the server to be a primary to reduce noise @@ -3034,8 +3054,13 @@ func ClearDBUnreachableStateIfAny(dbUnique string) { func PurgeMetricsFromPromAsyncCacheIfAny(dbUnique, metric string) { if promAsyncMode { promAsyncMetricCacheLock.Lock() - delete(promAsyncMetricCache, dbUnique+metric) - promAsyncMetricCacheLock.Unlock() + defer promAsyncMetricCacheLock.Unlock() + + if metric == "" { + delete(promAsyncMetricCache, dbUnique) // whole host removed from config + } else { + delete(promAsyncMetricCache[dbUnique], metric) + } } } @@ -3806,7 +3831,7 @@ retry: func DoesFunctionExists(dbUnique, functionName string) bool { log.Debug("Checking for function existence", dbUnique, functionName) sql := fmt.Sprintf("select /* pgwatch2_generated */ 1 from pg_proc join pg_namespace n on pronamespace = n.oid where proname = '%s' and n.nspname = 'public'", functionName) - data, err, _ := DBExecReadByDbUniqueName(dbUnique, "", useConnPooling, 0, sql) + data, err, _ := DBExecReadByDbUniqueName(dbUnique, "", 0, sql) if err != nil { log.Error("Failed to check for function existence", dbUnique, functionName, err) return false @@ -3847,7 +3872,7 @@ func TryCreateMetricsFetchingHelpers(dbUnique string) error { log.Warning("Could not find query text for", dbUnique, helperName) continue } - _, err, _ = DBExecReadByDbUniqueName(dbUnique, "", useConnPooling, 0, mvp.Sql) + _, err, _ = DBExecReadByDbUniqueName(dbUnique, "", 0, mvp.Sql) if err != nil { log.Warning("Failed to create a metric fetching helper for", dbUnique, helperName) log.Warning(err) @@ -3879,7 +3904,7 @@ func TryCreateMetricsFetchingHelpers(dbUnique string) error { log.Warning("Could not find query text for", dbUnique, metric) continue } - _, err, _ = DBExecReadByDbUniqueName(dbUnique, "", true, 0, mvp.Sql) + _, err, _ = DBExecReadByDbUniqueName(dbUnique, "", 0, mvp.Sql) if err != nil { log.Warning("Failed to create a metric fetching helper for", dbUnique, metric) log.Warning(err) @@ -4617,7 +4642,7 @@ func GetGoPsutilDiskPG(dbUnique string) ([]map[string]interface{}, error) { sqlTS := `select spcname::text as name, pg_catalog.pg_tablespace_location(oid) as location from pg_catalog.pg_tablespace where not spcname like any(array[E'pg\\_%'])` var ddDevice, ldDevice, walDevice uint64 - data, err, _ := DBExecReadByDbUniqueName(dbUnique, "", false, 0, sql) + data, err, _ := DBExecReadByDbUniqueName(dbUnique, "", 0, sql) if err != nil || len(data) == 0 { log.Errorf("Failed to determine relevant PG disk paths via SQL: %v", err) return nil, err @@ -4705,7 +4730,7 @@ 
func GetGoPsutilDiskPG(dbUnique string) ([]map[string]interface{}, error) { } } - data, err, _ = DBExecReadByDbUniqueName(dbUnique, "", false, 0, sqlTS) + data, err, _ = DBExecReadByDbUniqueName(dbUnique, "", 0, sqlTS) if err != nil { log.Infof("Failed to determine relevant PG tablespace paths via SQL: %v", err) } else if len(data) > 0 { @@ -4758,15 +4783,98 @@ func GetLoadAvgLocal() ([]map[string]interface{}, error) { } func shouldDbBeMonitoredBasedOnCurrentState(md MonitoredDatabase) bool { - vme, err := DBGetPGVersion(md.DBUniqueName, md.DBType, false) - if err == nil && vme.IsInRecovery && md.OnlyIfMaster { - return false + return !IsDBDormant(md.DBUniqueName) +} + +func ControlChannelsMapToList(control_channels map[string]chan ControlMessage) []string { + control_channel_list := make([]string, len(control_channels)) + i := 0 + for key := range control_channels { + control_channel_list[i] = key + i++ } - dbSize, err := DBGetSizeMB(md.DBUniqueName) - if err == nil && dbSize < opts.MinDbSizeMB { - return false + return control_channel_list +} + +func DoCloseResourcesForRemovedMonitoredDBIfAny(dbUnique string) { + + CloseOrLimitSqlConnPoolForMonitoredDBIfAny(dbUnique) + + PurgeMetricsFromPromAsyncCacheIfAny(dbUnique, "") +} + +func CloseResourcesForRemovedMonitoredDBs(currentDBs, prevLoopDBs []MonitoredDatabase, shutDownDueToRoleChange map[string]bool) { + var curDBsMap = make(map[string]bool) + + for _, curDB := range currentDBs { + curDBsMap[curDB.DBUniqueName] = true + } + + for _, prevDB := range prevLoopDBs { + if _, ok := curDBsMap[prevDB.DBUniqueName]; !ok { // removed from config + DoCloseResourcesForRemovedMonitoredDBIfAny(prevDB.DBUniqueName) + } + } + + // or to be ignored due to current instance state + for roleChangedDB := range shutDownDueToRoleChange { + DoCloseResourcesForRemovedMonitoredDBIfAny(roleChangedDB) } - return true +} + +func PromAsyncCacheInitIfRequired(dbUnique, metric string) { // cache structure: [dbUnique][metric]lastly_fetched_data + if opts.Datastore == DATASTORE_PROMETHEUS && promAsyncMode { + promAsyncMetricCacheLock.Lock() + defer promAsyncMetricCacheLock.Unlock() + if _, ok := promAsyncMetricCache[dbUnique]; !ok { + metricMap := make(map[string][]MetricStoreMessage) + promAsyncMetricCache[dbUnique] = metricMap + } + } +} + +func PromAsyncCacheAddMetricData(dbUnique, metric string, msgArr []MetricStoreMessage) { // cache structure: [dbUnique][metric]lastly_fetched_data + promAsyncMetricCacheLock.Lock() + defer promAsyncMetricCacheLock.Unlock() + if _, ok := promAsyncMetricCache[dbUnique]; ok { + promAsyncMetricCache[dbUnique][metric] = msgArr + } +} + +func SetUndersizedDBState(dbUnique string, state bool) { + undersizedDBsLock.Lock() + undersizedDBs[dbUnique] = state + undersizedDBsLock.Unlock() +} + +func IsDBUndersized(dbUnique string) bool { + undersizedDBsLock.RLock() + defer undersizedDBsLock.RUnlock() + undersized, ok := undersizedDBs[dbUnique] + if ok { + return undersized + } + return false +} + +func SetRecoveryIgnoredDBState(dbUnique string, state bool) { + recoveryIgnoredDBsLock.Lock() + recoveryIgnoredDBs[dbUnique] = state + recoveryIgnoredDBsLock.Unlock() +} + +func IsDBIgnoredBasedOnRecoveryState(dbUnique string) bool { + recoveryIgnoredDBsLock.RLock() + defer recoveryIgnoredDBsLock.RUnlock() + recoveryIgnored, ok := recoveryIgnoredDBs[dbUnique] + if ok { + return recoveryIgnored + } + return false +} + +func IsDBDormant(dbUnique string) bool { + return IsDBUndersized(dbUnique) || IsDBIgnoredBasedOnRecoveryState(dbUnique) } type Options 
struct { @@ -4831,6 +4939,7 @@ type Options struct { ServersRefreshLoopSeconds int `long:"servers-refresh-loop-seconds" description:"Sleep time for the main loop" env:"PW2_SERVERS_REFRESH_LOOP_SECONDS" default:"120"` InstanceLevelCacheMaxSeconds int64 `long:"instance-level-cache-max-seconds" description:"Max allowed staleness for instance level metric data shared between DBs of an instance. Affects 'continuous' host types only. Set to 0 to disable" env:"PW2_INSTANCE_LEVEL_CACHE_MAX_SECONDS" default:"30"` MinDbSizeMB int64 `long:"min-db-size-mb" description:"Smaller size DBs will be ignored and not monitored until they reach the threshold." env:"PW2_MIN_DB_SIZE_MB" default:"0"` + MaxParallelConnectionsPerDb int `long:"max-parallel-connections-per-db" description:"Max parallel metric fetches per DB. Note the multiplication effect on multi-DB instances" env:"PW2_MAX_PARALLEL_CONNECTIONS_PER_DB" default:"2"` Version bool `long:"version" description:"Show Git build version and exit" env:"PW2_VERSION"` Ping bool `long:"ping" description:"Try to connect to all configured DB-s, report errors and then exit" env:"PW2_PING"` } @@ -4879,6 +4988,10 @@ func main() { log.Fatal("--servers-refresh-loop-seconds must be greater than 1") } + if opts.MaxParallelConnectionsPerDb < 1 { + log.Fatal("--max-parallel-connections-per-db must be >= 1") + } + if len(opts.InfluxSSLSkipVerify) > 0 { var err error InfluxSkipSSLCertVerify, err = strconv.ParseBool(opts.InfluxSSLSkipVerify) @@ -5172,6 +5285,7 @@ func main() { } first_loop := true + mainLoopCount := 0 var monitored_dbs []MonitoredDatabase var last_metrics_refresh_time int64 var metrics map[string]map[decimal.Decimal]MetricVersionProperties @@ -5180,6 +5294,8 @@ func main() { for { //main loop hostsToShutDownDueToRoleChange := make(map[string]bool) // hosts went from master to standby and have "only if master" set + var control_channel_name_list []string + gatherers_shut_down := 0 if time.Now().Unix()-last_metrics_refresh_time > METRIC_DEFINITION_REFRESH_TIME { //metrics @@ -5246,9 +5362,9 @@ func main() { log.Debugf("Found %d databases to monitor from %d config items...", len(monitored_dbs), len(mc)) } else { if first_loop { - log.Fatalf("Could not read/parse monitoring config from path: %s", opts.Config) + log.Fatalf("Could not read/parse monitoring config from path: %s. err: %v", opts.Config, err) } else { - log.Errorf("Could not read/parse monitoring config from path: %s", opts.Config) + log.Errorf("Could not read/parse monitoring config from path: %s. using last valid config data. err: %v", opts.Config, err) } time.Sleep(time.Second * time.Duration(opts.ServersRefreshLoopSeconds)) continue @@ -5260,7 +5376,7 @@ func main() { if first_loop { log.Fatal("could not fetch active hosts - check config!", err) } else { - log.Error("could not fetch active hosts:", err) + log.Error("could not fetch active hosts, using last valid config data. err:", err) time.Sleep(time.Second * time.Duration(opts.ServersRefreshLoopSeconds)) continue } @@ -5297,38 +5413,26 @@ func main() { db_unique_orig := host.DBUniqueNameOrig db_type := host.DBType metric_config = host.Metrics + wasInstancePreviouslyDormant := IsDBDormant(db_unique) if host.PasswordType == "aes-gcm-256" && len(opts.AesGcmKeyphrase) == 0 && len(opts.AesGcmKeyphraseFile) == 0 { // Warn if any encrypted hosts found but no keyphrase given log.Warningf("Encrypted password type found for host \"%s\", but no decryption keyphrase specified. 
Use --aes-gcm-keyphrase or --aes-gcm-keyphrase-file params", db_unique) } - db_conn_limiting_channel_lock.RLock() - _, exists := db_conn_limiting_channel[db_unique] - db_conn_limiting_channel_lock.RUnlock() - - if !exists { // new host, initialize DB connection limiting structure - db_conn_limiting_channel_lock.Lock() - db_conn_limiting_channel[db_unique] = make(chan bool, MAX_PG_CONNECTIONS_PER_MONITORED_DB) - i := 0 - for i < MAX_PG_CONNECTIONS_PER_MONITORED_DB { - //log.Debugf("initializing db_conn_limiting_channel %d for [%s]", i, db_unique) - db_conn_limiting_channel[db_unique] <- true - i++ - } - db_conn_limiting_channel_lock.Unlock() - - db_pg_version_map_lock.Lock() - db_get_pg_version_map_lock[db_unique] = sync.RWMutex{} - db_pg_version_map_lock.Unlock() + err := InitSqlConnPoolForMonitoredDBIfNil(host) + if err != nil { + log.Warningf("Could not init SQL connection pool for %s, retrying on next main loop. Err: %v", db_unique, err) + continue } + InitPGVersionInfoFetchingLockIfNil(host) + _, connectFailedSoFar := failedInitialConnectHosts[db_unique] - if !exists || connectFailedSoFar { + if connectFailedSoFar { // idea is not to spwan any runners before we've successfully pinged the DB var err error var ver DBVersionMapEntry - metric_config = make(map[string]float64) if connectFailedSoFar { log.Infof("retrying to connect to uninitialized DB \"%s\"...", db_unique) @@ -5371,12 +5475,19 @@ func main() { } if IsPostgresDBType(host.DBType) { + var DBSizeMB int64 + if opts.MinDbSizeMB >= 8 { // an empty DB is a bit less than 8MB - DBSizeMB, _ := DBGetSizeMB(db_unique) // ignore errors, i.e. only remove from montoring when we're certain it's under the threshold - if DBSizeMB != 0 && DBSizeMB < opts.MinDbSizeMB { - log.Infof("[%s] DB will be ignored due to the --min-db-size-mb filter. Current (up to %v cached) DB size = %d MB", db_unique, DB_SIZE_CACHING_INTERVAL, DBSizeMB) - hostsToShutDownDueToRoleChange[db_unique] = true // for the case when DB size was previosly above the threshold - continue + DBSizeMB, _ = DBGetSizeMB(db_unique) // ignore errors, i.e. only remove from montoring when we're certain it's under the threshold + if DBSizeMB != 0 { + if DBSizeMB < opts.MinDbSizeMB { + log.Infof("[%s] DB will be ignored due to the --min-db-size-mb filter. 
Current (up to %v cached) DB size = %d MB", db_unique, DB_SIZE_CACHING_INTERVAL, DBSizeMB) + hostsToShutDownDueToRoleChange[db_unique] = true // for the case when DB size was previosly above the threshold + SetUndersizedDBState(db_unique, true) + continue + } else { + SetUndersizedDBState(db_unique, false) + } } } ver, err := DBGetPGVersion(db_unique, host.DBType, false) @@ -5385,6 +5496,7 @@ func main() { if ver.IsInRecovery && host.OnlyIfMaster { log.Infof("[%s] to be removed from monitoring due to 'master only' property and status change", db_unique) hostsToShutDownDueToRoleChange[db_unique] = true + SetRecoveryIgnoredDBState(db_unique, true) continue } else if lastKnownStatusInRecovery != ver.IsInRecovery { if ver.IsInRecovery && len(host.MetricsStandby) > 0 { @@ -5395,16 +5507,24 @@ func main() { log.Warningf("Switching metrics collection for \"%s\" to primary config...", db_unique) metric_config = host.Metrics hostLastKnownStatusInRecovery[db_unique] = false + SetRecoveryIgnoredDBState(db_unique, false) } } } + + if wasInstancePreviouslyDormant && !IsDBDormant(db_unique) { + RestoreSqlConnPoolLimitsForPreviouslyDormantDB(db_unique) + } } - if (opts.Datastore == DATASTORE_PROMETHEUS && !promAsyncMode) || opts.Ping { - continue // don't launch fetching threads + if opts.Ping { + continue // don't launch metric fetching threads } for metric_name := range metric_config { + if opts.Datastore == DATASTORE_PROMETHEUS && !promAsyncMode { + continue // normal (non-async, no background fetching) Prom mode means only per-scrape fetching + } metric := metric_name metric_def_ok := false @@ -5421,7 +5541,7 @@ func main() { metric_def_map_lock.RUnlock() } - var db_metric string = db_unique + ":" + metric + var db_metric string = db_unique + DB_METRIC_JOIN_STR + metric _, ch_ok := control_channels[db_metric] if metric_def_ok && !ch_ok { // initialize a new per db/per metric control channel @@ -5429,6 +5549,7 @@ func main() { host_metric_interval_map[db_metric] = interval log.Infof("starting gatherer for [%s:%s] with interval %v s", db_unique, metric, interval) control_channels[db_metric] = make(chan ControlMessage, 1) + PromAsyncCacheInitIfRequired(db_unique, metric) if opts.BatchingDelayMs > 0 { go MetricGathererLoop(db_unique, db_unique_orig, db_type, metric, metric_config, control_channels[db_metric], buffered_persist_ch) } else { @@ -5457,6 +5578,8 @@ func main() { } } + atomic.StoreInt32(&mainLoopInitialized, 1) // to hold off scraping until metric fetching runners have been initialized + if opts.Ping { if len(failedInitialConnectHosts) > 0 { log.Errorf("Could not reach %d configured DB host out of %d", len(failedInitialConnectHosts), len(monitored_dbs)) @@ -5465,11 +5588,6 @@ func main() { log.Infof("All configured %d DB hosts were reachable", len(monitored_dbs)) os.Exit(0) } - if opts.Datastore == DATASTORE_PROMETHEUS && !promAsyncMode { // special behaviour, no "ahead of time" metric collection - log.Debugf("main sleeping %ds...", opts.ServersRefreshLoopSeconds) - time.Sleep(time.Second * time.Duration(opts.ServersRefreshLoopSeconds)) - continue - } if opts.TestdataDays != 0 { log.Info("Waiting for all metrics generation goroutines to stop ...") @@ -5489,22 +5607,21 @@ func main() { } } + if mainLoopCount == 0 { + goto MainLoopSleep + } + // loop over existing channels and stop workers if DB or metric removed from config + // or state change makes it uninteresting log.Debug("checking if any workers need to be shut down...") - control_channel_list := make([]string, len(control_channels)) - i 
:= 0 - for key := range control_channels { - control_channel_list[i] = key - i++ - } - gatherers_shut_down := 0 + control_channel_name_list = ControlChannelsMapToList(control_channels) - for _, db_metric := range control_channel_list { + for _, db_metric := range control_channel_name_list { var currentMetricConfig map[string]float64 var dbInfo MonitoredDatabase var ok, dbRemovedFromConfig bool singleMetricDisabled := false - splits := strings.Split(db_metric, ":") + splits := strings.Split(db_metric, DB_METRIC_JOIN_STR) db := splits[0] metric := splits[1] //log.Debugf("Checking if need to shut down worker for [%s:%s]...", db, metric) @@ -5551,10 +5668,18 @@ func main() { PurgeMetricsFromPromAsyncCacheIfAny(db, metric) } } + if gatherers_shut_down > 0 { log.Warningf("sent STOP message to %d gatherers (it might take some minutes for them to stop though)", gatherers_shut_down) } + // Destroy conn pools, Prom async cache + CloseResourcesForRemovedMonitoredDBs(monitored_dbs, prevLoopMonitoredDBs, hostsToShutDownDueToRoleChange) + + MainLoopSleep: + mainLoopCount++ + prevLoopMonitoredDBs = monitored_dbs + log.Debugf("main sleeping %ds...", opts.ServersRefreshLoopSeconds) time.Sleep(time.Second * time.Duration(opts.ServersRefreshLoopSeconds)) } diff --git a/pgwatch2/prom.go b/pgwatch2/prom.go index a3849843..b41a47fb 100644 --- a/pgwatch2/prom.go +++ b/pgwatch2/prom.go @@ -50,6 +50,11 @@ func (e *Exporter) Collect(ch chan<- prometheus.Metric) { e.totalScrapes.Add(1) ch <- e.totalScrapes + isInitialized := atomic.LoadInt32(&mainLoopInitialized) + if isInitialized == 0 { + log.Warning("Main loop not yet initialized, not scraping DBs") + return + } monitoredDatabases := getMonitoredDatabasesSnapshot() if len(monitoredDatabases) == 0 { log.Warning("No dbs configured for monitoring. Check config") @@ -82,7 +87,7 @@ func (e *Exporter) Collect(ch chan<- prometheus.Metric) { if promAsyncMode { promAsyncMetricCacheLock.RLock() - metricStoreMessages, ok = promAsyncMetricCache[md.DBUniqueName+metric] + metricStoreMessages, ok = promAsyncMetricCache[md.DBUniqueName][metric] promAsyncMetricCacheLock.RUnlock() if !ok { log.Debugf("[%s:%s] could not find data from the prom cache. maybe gathering interval not yet reached or zero rows returned, ignoring", md.DBUniqueName, metric) @@ -127,12 +132,11 @@ func (e *Exporter) Collect(ch chan<- prometheus.Metric) { func setInstanceUpDownState(ch chan<- prometheus.Metric, md MonitoredDatabase) { log.Debugf("checking availability of configured DB [%s:%s]...", md.DBUniqueName, PROM_INSTANCE_UP_STATE_METRIC) - vme, err := DBGetPGVersion(md.DBUniqueName, md.DBType, true) + vme, err := DBGetPGVersion(md.DBUniqueName, md.DBType, !promAsyncMode) // NB! in async mode 2min cache can mask smaller downtimes! data := make(map[string]interface{}) if err != nil { data[PROM_INSTANCE_UP_STATE_METRIC] = 0 log.Errorf("[%s:%s] could not determine instance version, reporting as 'down': %v", md.DBUniqueName, PROM_INSTANCE_UP_STATE_METRIC, err) - //return } else { data[PROM_INSTANCE_UP_STATE_METRIC] = 1 }
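
Editor's note, not part of the patch: the core idea of this change is that a single *sqlx.DB handle per monitored DB, limited via SetMaxOpenConns/SetMaxIdleConns/SetConnMaxLifetime, replaces the old db_conn_limiting_channel token scheme. Below is a minimal standalone sketch of that mechanism under stated assumptions: the connection string, query and worker count are placeholders, and the "postgres" driver is assumed to be provided by a lib/pq style import; it is an illustration of the database/sql pool limits the patch relies on, not the gatherer code itself.

package main

import (
	"log"
	"sync"
	"time"

	"github.com/jmoiron/sqlx"
	_ "github.com/lib/pq" // registers the "postgres" driver (assumed available)
)

func main() {
	// Placeholder DSN - adjust host/user/db before running against a real server.
	connStr := "host=localhost port=5432 dbname=postgres user=postgres sslmode=disable"

	db, err := sqlx.Open("postgres", connStr)
	if err != nil {
		log.Fatal(err)
	}
	defer db.Close()

	// Roughly what InitSqlConnPoolForMonitoredDBIfNil configures with
	// --max-parallel-connections-per-db=2 and pooling enabled.
	db.SetMaxOpenConns(2)                   // hard cap on parallel queries against this DB
	db.SetMaxIdleConns(2)                   // keep connections around for reuse
	db.SetConnMaxLifetime(30 * time.Minute) // periodic recycling, like PG_CONN_RECYCLE_SECONDS

	// Launch more "metric fetches" than allowed connections: the pool itself
	// queues the excess callers, so no separate token channel is needed.
	var wg sync.WaitGroup
	for i := 0; i < 5; i++ {
		wg.Add(1)
		go func(worker int) {
			defer wg.Done()
			var one int
			if err := db.Get(&one, "select 1"); err != nil {
				log.Printf("worker %d: query failed: %v", worker, err)
				return
			}
			log.Printf("worker %d done, open connections: %d", worker, db.Stats().OpenConnections)
		}(i)
	}
	wg.Wait()
}

In this sketch the pool never opens more than two server connections regardless of goroutine count, which mirrors why the patch can drop MAX_PG_CONNECTIONS_PER_MONITORED_DB and the per-DB limiting channel in favor of per-pool settings driven by the new flag.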