diff --git a/curiosrc/alertmanager/alerts.go b/curiosrc/alertmanager/alerts.go new file mode 100644 index 00000000000..34e8583a237 --- /dev/null +++ b/curiosrc/alertmanager/alerts.go @@ -0,0 +1,288 @@ +package alertmanager + +import ( + "database/sql" + "fmt" + "strings" + + "github.com/BurntSushi/toml" + "golang.org/x/xerrors" + + "github.com/filecoin-project/go-address" + "github.com/filecoin-project/go-state-types/abi" + + "github.com/filecoin-project/lotus/node/config" +) + +// balanceCheck retrieves the machine details from the database and performs balance checks on unique addresses. +// It populates the alert map with any errors encountered during the process and with any alerts related to low wallet balance and missing wallets. +// The alert map key is "Balance Check". +// It queries the database for the configuration of each layer and decodes it using the toml.Decode function. +// It then iterates over the addresses in the configuration and curates a list of unique addresses. +// If an address is not found in the chain node, it adds an alert to the alert map. +// If the balance of an address is below 5 Fil, it adds an alert to the alert map. +// If there are any errors encountered during the process, the err field of the alert map is populated. +func balanceCheck(al *alerts) { + Name := "Balance Check" + al.alertMap[Name] = &alertOut{} + + // MachineDetails represents the structure of data received from the SQL query. + type machineDetail struct { + ID int + HostAndPort string + Layers string + } + var machineDetails []machineDetail + + // Get all layers in use + err := al.db.Select(al.ctx, &machineDetails, ` + SELECT m.id, m.host_and_port, d.layers + FROM harmony_machines m + LEFT JOIN harmony_machine_details d ON m.id = d.machine_id;`) + if err != nil { + al.alertMap[Name].err = xerrors.Errorf("getting config layers for all machines: %w", err) + return + } + + // UniqueLayers takes an array of MachineDetails and returns a slice of unique layers. 
+ + layerMap := make(map[string]bool) + var uniqueLayers []string + + // Get unique layers in use + for _, machine := range machineDetails { + machine := machine + // Split the Layers field into individual layers + layers := strings.Split(machine.Layers, ",") + for _, layer := range layers { + layer = strings.TrimSpace(layer) + if _, exists := layerMap[layer]; !exists && layer != "" { + layerMap[layer] = true + uniqueLayers = append(uniqueLayers, layer) + } + } + } + + addrMap := make(map[string]bool) + var uniqueAddrs []string + + // Get all unique addresses + for _, layer := range uniqueLayers { + text := "" + cfg := config.DefaultCurioConfig() + err := al.db.QueryRow(al.ctx, `SELECT config FROM harmony_config WHERE title=$1`, layer).Scan(&text) + if err != nil { + if strings.Contains(err.Error(), sql.ErrNoRows.Error()) { + al.alertMap[Name].err = xerrors.Errorf("missing layer '%s' ", layer) + return + } + al.alertMap[Name].err = fmt.Errorf("could not read layer '%s': %w", layer, err) + return + } + + _, err = toml.Decode(text, cfg) + if err != nil { + al.alertMap[Name].err = fmt.Errorf("could not read layer, bad toml %s: %w", layer, err) + return + } + + for i := range cfg.Addresses { + prec := cfg.Addresses[i].PreCommitControl + com := cfg.Addresses[i].CommitControl + term := cfg.Addresses[i].TerminateControl + if prec != nil { + for j := range prec { + if _, ok := addrMap[prec[j]]; !ok && prec[j] != "" { + addrMap[prec[j]] = true + uniqueAddrs = append(uniqueAddrs, prec[j]) + } + } + } + if com != nil { + for j := range com { + if _, ok := addrMap[com[j]]; !ok && com[j] != "" { + addrMap[com[j]] = true + uniqueAddrs = append(uniqueAddrs, com[j]) + } + } + } + if term != nil { + for j := range term { + if _, ok := addrMap[term[j]]; !ok && term[j] != "" { + addrMap[term[j]] = true + uniqueAddrs = append(uniqueAddrs, term[j]) + } + } + } + } + } + + var ret string + + for _, addrStr := range uniqueAddrs { + addr, err := address.NewFromString(addrStr) + if err != 
nil { + al.alertMap[Name].err = xerrors.Errorf("failed to parse address: %w", err) + return + } + + has, err := al.api.WalletHas(al.ctx, addr) + if err != nil { + al.alertMap[Name].err = err + return + } + + if !has { + ret += fmt.Sprintf("Wallet %s was not found in chain node. ", addrStr) + } + + balance, err := al.api.WalletBalance(al.ctx, addr) + if err != nil { + al.alertMap[Name].err = err + return + } + + if abi.TokenAmount(al.cfg.MinimumWalletBalance).GreaterThanEqual(balance) { + ret += fmt.Sprintf("Balance for wallet %s is below the minimum wallet balance. ", addrStr) + } + } + if ret != "" { + al.alertMap[Name].alertString = ret + } + return +} + +// taskFailureCheck retrieves the task failure counts from the database for a specific time period. +// It then checks for specific sealing tasks and tasks with more than 5 failures to generate alerts. +func taskFailureCheck(al *alerts) { + Name := "TaskFailures" + al.alertMap[Name] = &alertOut{} + + type taskFailure struct { + Machine string `db:"completed_by_host_and_port"` + Name string `db:"name"` + Failures int `db:"failed_tasks_count"` + } + + var taskFailures []taskFailure + + err := al.db.Select(al.ctx, &taskFailures, ` + SELECT completed_by_host_and_port, name, COUNT(*) AS failed_tasks_count + FROM harmony_task_history + WHERE result = FALSE + AND work_end >= NOW() - $1::interval + GROUP BY completed_by_host_and_port, name + ORDER BY completed_by_host_and_port, name;`, AlertMangerInterval.Minutes()) + if err != nil { + al.alertMap[Name].err = xerrors.Errorf("getting failed task count: %w", err) + return + } + + mmap := make(map[string]int) + tmap := make(map[string]int) + + if len(taskFailures) > 0 { + for _, tf := range taskFailures { + _, ok := tmap[tf.Name] + if !ok { + tmap[tf.Name] = tf.Failures + } else { + tmap[tf.Name] += tf.Failures + } + _, ok = mmap[tf.Machine] + if !ok { + mmap[tf.Machine] = tf.Failures + } else { + mmap[tf.Machine] += tf.Failures + } + } + } + + sealingTasks := []string{"SDR", "TreeD", "TreeRC", 
"PreCommitSubmit", "PoRep", "Finalize", "MoveStorage", "CommitSubmit", "WdPost", "ParkPiece"} + contains := func(s []string, e string) bool { + for _, a := range s { + if a == e { + return true + } + } + return false + } + + // Alerts for any sealing pipeline failures. Other tasks should have at least 5 failures for an alert + for name, count := range tmap { + if contains(sealingTasks, name) { + al.alertMap[Name].alertString += fmt.Sprintf("Task: %s, Failures: %d. ", name, count) + } + if count > 5 && !contains(sealingTasks, name) { + al.alertMap[Name].alertString += fmt.Sprintf("Task: %s, Failures: %d. ", name, count) + } + } + + // Alert if a machine failed more than 5 tasks + for name, count := range mmap { + if count > 5 { + al.alertMap[Name].alertString += fmt.Sprintf("Machine: %s, Failures: %d. ", name, count) + } + } + + return +} + +// permanentStorageCheck checks the available storage space for sealing sectors. +// It retrieves the storage paths and the sectors being sealed from the database. +// It calculates the total required storage space by summing the sector sizes of the sectors being sealed. +// It compares the total required space with the total available space. 
+func permanentStorageCheck(al *alerts) { + Name := "PermanentStorageSpace" + al.alertMap[Name] = &alertOut{} + // Get all storage path for permanent storages + type storage struct { + ID string `db:"storage_id"` + Available int64 `db:"available"` + } + + var storages []storage + + err := al.db.Select(al.ctx, &storages, ` + SELECT storage_id, available + FROM storage_path + WHERE can_store = TRUE;`) + if err != nil { + al.alertMap[Name].err = xerrors.Errorf("getting storage details: %w", err) + return + } + + type sector struct { + Miner abi.ActorID `db:"sp_id"` + Number abi.SectorNumber `db:"sector_number"` + Proof abi.RegisteredSealProof `db:"reg_seal_proof"` + } + + var sectors []sector + + err = al.db.Select(al.ctx, &sectors, ` + SELECT sp_id, sector_number, reg_seal_proof + FROM sectors_sdr_pipeline + WHERE after_finalize = FALSE;`) + if err != nil { + al.alertMap[Name].err = xerrors.Errorf("getting sectors being sealed: %w", err) + return + } + + var totalRequiredSpace int64 + for _, sec := range sectors { + sec := sec + sectorSize, err := sec.Proof.SectorSize() + if err != nil { + sectorSize = 64 << 30 // unknown proof type; assume worst-case 64GiB sector + } + totalRequiredSpace += int64(sectorSize) + } + + var totalAvailableSpace int64 + for _, storage := range storages { + totalAvailableSpace += storage.Available + } + + if totalAvailableSpace < totalRequiredSpace { + al.alertMap[Name].alertString = fmt.Sprintf("Insufficient storage space for sealing sectors. Required: %d bytes, Available: %d bytes", totalRequiredSpace, totalAvailableSpace) + } +} diff --git a/curiosrc/alertmanager/task_alert.go b/curiosrc/alertmanager/task_alert.go index 4a7f5206a6c..9f8f0c3c944 100644 --- a/curiosrc/alertmanager/task_alert.go +++ b/curiosrc/alertmanager/task_alert.go @@ -1,23 +1,23 @@ +// Nobody associated with this software's development has any business relationship to pagerduty. +// This is provided as a convenient trampoline to SP's alert system of choice. 
+ package alertmanager import ( "bytes" "context" - "database/sql" "encoding/json" "fmt" "io" "net/http" - "strings" "time" - "github.com/BurntSushi/toml" logging "github.com/ipfs/go-log/v2" "golang.org/x/xerrors" "github.com/filecoin-project/go-address" - "github.com/filecoin-project/go-state-types/abi" + "github.com/filecoin-project/lotus/api" "github.com/filecoin-project/lotus/chain/types" "github.com/filecoin-project/lotus/lib/harmony/harmonydb" "github.com/filecoin-project/lotus/lib/harmony/harmonytask" @@ -26,14 +26,13 @@ import ( "github.com/filecoin-project/lotus/storage/ctladdr" ) -var MinimumBalance = types.MustParseFIL("5 FIL") - -const AlertMangerInterval = 10 * time.Minute +const AlertMangerInterval = time.Hour var log = logging.Logger("curio/alertmanager") type AlertAPI interface { ctladdr.NodeApi + StateMinerInfo(ctx context.Context, actor address.Address, tsk types.TipSetKey) (api.MinerInfo, error) } type AlertTask struct { @@ -51,6 +50,7 @@ type alerts struct { ctx context.Context api AlertAPI db *harmonydb.DB + cfg config.CurioAlerting alertMap map[string]*alertOut } @@ -75,6 +75,7 @@ type alertFunc func(al *alerts) var alertFuncs = []alertFunc{ balanceCheck, taskFailureCheck, + permanentStorageCheck, } func NewAlertTask(api AlertAPI, db *harmonydb.DB, alertingCfg config.CurioAlerting) *AlertTask { @@ -94,6 +95,7 @@ func (a *AlertTask) Do(taskID harmonytask.TaskID, stillOwned func() bool) (done ctx: ctx, api: a.api, db: a.db, + cfg: a.cfg, alertMap: alMap, } @@ -176,7 +178,7 @@ func (a *AlertTask) sendAlert(data *pdPayload) error { return fmt.Errorf("error marshaling JSON: %w", err) } - req, err := http.NewRequest("POST", a.cfg.PagerDutyEventURL, bytes.NewBuffer(jsonData)) + req, err := http.NewRequest("POST", a.cfg.PagerDutyEventURL.String(), bytes.NewBuffer(jsonData)) if err != nil { return fmt.Errorf("error creating request: %w", err) } @@ -191,7 +193,7 @@ func (a *AlertTask) sendAlert(data *pdPayload) error { time.Sleep(time.Duration(2*i) * 
time.Second) // Exponential backoff continue } - defer resp.Body.Close() + defer func() { _ = resp.Body.Close() }() switch resp.StatusCode { case 202: @@ -216,180 +218,3 @@ func (a *AlertTask) sendAlert(data *pdPayload) error { } return fmt.Errorf("after retries, last error: %w", err) } - -// balanceCheck retrieves the machine details from the database and performs balance checks on unique addresses. -// It populates the alert map with any errors encountered during the process and with any alerts related to low wallet balance and missing wallets. -// The alert map key is "Balance Check". -// It queries the database for the configuration of each layer and decodes it using the toml.Decode function. -// It then iterates over the addresses in the configuration and curates a list of unique addresses. -// If an address is not found in the chain node, it adds an alert to the alert map. -// If the balance of an address is below 5 Fil, it adds an alert to the alert map. -// If there are any errors encountered during the process, the err field of the alert map is populated. -func balanceCheck(al *alerts) { - Name := "Balance Check" - al.alertMap[Name] = &alertOut{} - - // MachineDetails represents the structure of data received from the SQL query. - type machineDetail struct { - ID int - HostAndPort string - Layers string - } - var machineDetails []machineDetail - - // Get all layers in use - err := al.db.Select(al.ctx, &machineDetails, ` - SELECT m.id, m.host_and_port, d.layers - FROM harmony_machines m - LEFT JOIN harmony_machine_details d ON m.id = d.machine_id;`) - if err != nil { - al.alertMap[Name].err = xerrors.Errorf("getting config layers for all machines: %w", err) - return - } - - // UniqueLayers takes an array of MachineDetails and returns a slice of unique layers. 
- - layerMap := make(map[string]bool) - var uniqueLayers []string - - // Get unique layers in use - for _, machine := range machineDetails { - machine := machine - // Split the Layers field into individual layers - layers := strings.Split(machine.Layers, ",") - for _, layer := range layers { - layer = strings.TrimSpace(layer) - if _, exists := layerMap[layer]; !exists && layer != "" { - layerMap[layer] = true - uniqueLayers = append(uniqueLayers, layer) - } - } - } - - addrMap := make(map[string]bool) - var uniqueAddrs []string - - // Get all unique addresses - for _, layer := range uniqueLayers { - text := "" - cfg := config.DefaultCurioConfig() - err := al.db.QueryRow(al.ctx, `SELECT config FROM harmony_config WHERE title=$1`, layer).Scan(&text) - if err != nil { - if strings.Contains(err.Error(), sql.ErrNoRows.Error()) { - al.alertMap[Name].err = xerrors.Errorf("missing layer '%s' ", layer) - return - } - al.alertMap[Name].err = fmt.Errorf("could not read layer '%s': %w", layer, err) - return - } - - _, err = toml.Decode(text, cfg) - if err != nil { - al.alertMap[Name].err = fmt.Errorf("could not read layer, bad toml %s: %w", layer, err) - return - } - - for i := range cfg.Addresses { - prec := cfg.Addresses[i].PreCommitControl - com := cfg.Addresses[i].CommitControl - term := cfg.Addresses[i].TerminateControl - if prec != nil { - for j := range prec { - if _, ok := addrMap[prec[j]]; !ok && prec[j] != "" { - addrMap[prec[j]] = true - uniqueAddrs = append(uniqueAddrs, prec[j]) - } - } - } - if com != nil { - for j := range com { - if _, ok := addrMap[com[j]]; !ok && com[j] != "" { - addrMap[com[j]] = true - uniqueAddrs = append(uniqueAddrs, com[j]) - } - } - } - if term != nil { - for j := range term { - if _, ok := addrMap[term[j]]; !ok && term[j] != "" { - addrMap[term[j]] = true - uniqueAddrs = append(uniqueAddrs, term[j]) - } - } - } - } - } - - var ret string - - for _, addrStr := range uniqueAddrs { - addr, err := address.NewFromString(addrStr) - if err != 
nil { - al.alertMap[Name].err = xerrors.Errorf("failed to parse address: %w", err) - return - } - - has, err := al.api.WalletHas(al.ctx, addr) - if err != nil { - al.alertMap[Name].err = err - return - } - - if !has { - ret += fmt.Sprintf("Wallet %s was not found in chain node. ", addrStr) - } - - balance, err := al.api.WalletBalance(al.ctx, addr) - if err != nil { - al.alertMap[Name].err = err - } - - if abi.TokenAmount(MinimumBalance).GreaterThanEqual(balance) { - ret += fmt.Sprintf("Balance for wallet %s is below 5 Fil. ", addrStr) - } - } - if ret != "" { - al.alertMap[Name].alertString = ret - } - return -} - -// taskFailureCheck retrieves the count of failed tasks for all machines in the task history table. -// It populates the alert map with the machine name and the number of failures if it exceeds 10. -func taskFailureCheck(al *alerts) { - Name := "TaskFailures" - al.alertMap[Name] = &alertOut{} - - type taskFailure struct { - Machine string `db:"completed_by_host_and_port"` - Failures int `db:"failed_tasks_count"` - } - - var taskFailures []taskFailure - - // Get all layers in use - err := al.db.Select(al.ctx, &taskFailures, ` - SELECT completed_by_host_and_port, COUNT(*) AS failed_tasks_count - FROM harmony_task_history - WHERE result = FALSE - AND work_end >= NOW() - $1::interval - GROUP BY completed_by_host_and_port - ORDER BY failed_tasks_count DESC;`, AlertMangerInterval.Minutes()) - if err != nil { - al.alertMap[Name].err = xerrors.Errorf("getting failed task count for all machines: %w", err) - return - } - - if len(taskFailures) > 0 { - for _, tf := range taskFailures { - if tf.Failures > 10 { - al.alertMap[Name].alertString += fmt.Sprintf("Machine: %s, Failures: %d. 
", tf.Machine, tf.Failures) - } - } - } - return -} - -func storageCheck(al *alerts) { - -} diff --git a/curiosrc/gc/storage_endpoint_gc.go b/curiosrc/gc/storage_endpoint_gc.go index 45783f35367..d49c51a1bb3 100644 --- a/curiosrc/gc/storage_endpoint_gc.go +++ b/curiosrc/gc/storage_endpoint_gc.go @@ -209,6 +209,7 @@ func (s *StorageEndpointGC) Do(taskID harmonytask.TaskID, stillOwned func() bool // Remove dead URLs from storage_path entries and handle path cleanup for _, du := range deadURLs { + du := du // Fetch the current URLs for the storage path var URLs string err = tx.QueryRow("SELECT urls FROM storage_path WHERE storage_id = $1", du.StorageID).Scan(&URLs) diff --git a/node/config/def.go b/node/config/def.go index 24ea440574d..f2b70c8dfb6 100644 --- a/node/config/def.go +++ b/node/config/def.go @@ -2,6 +2,7 @@ package config import ( "encoding" + "net/url" "os" "strconv" "time" @@ -328,6 +329,14 @@ const ( ResourceFilteringDisabled = ResourceFilteringStrategy("disabled") ) +func PGDutyURL() *url.URL { + ret, err := url.Parse("https://events.pagerduty.com/v2/enqueue") + if err != nil { + return &url.URL{} + } + return ret +} + func DefaultCurioConfig() *CurioConfig { return &CurioConfig{ Subsystems: CurioSubsystemsConfig{ @@ -369,8 +378,9 @@ func DefaultCurioConfig() *CurioConfig { MaxQueuePoRep: 0, // default don't use this limit }, Alerting: CurioAlerting{ - PagerDutyEventURL: "https://events.pagerduty.com/v2/enqueue", + PagerDutyEventURL: PGDutyURL(), PageDutyIntegrationKey: "", + MinimumWalletBalance: types.MustParseFIL("5"), }, } } diff --git a/node/config/doc_gen.go b/node/config/doc_gen.go index eece466c7cc..1f7358bba07 100644 --- a/node/config/doc_gen.go +++ b/node/config/doc_gen.go @@ -186,10 +186,11 @@ over the worker address if this flag is set.`, "CurioAlerting": { { Name: "PagerDutyEventURL", - Type: "string", + Type: "*url.URL", Comment: `PagerDutyEventURL is URL for PagerDuty's Events API v2 URL. 
Events sent to this API URL are ultimately -routed to a PagerDuty service and processed.`, +routed to a PagerDuty service and processed. +The default is sufficient for integration with the stock commercial Pager Duty company's service.`, }, { Name: "PageDutyIntegrationKey", @@ -198,6 +199,13 @@ routed to a PagerDuty service and processed.`, Comment: `PageDutyIntegrationKey is the integration key for a pager duty service. You can find this unique service identifier in the integration page for the service.`, }, + { + Name: "MinimumWalletBalance", + Type: "types.FIL", + + Comment: `MinimumWalletBalance is the minimum balance all active wallets. If the balance is below this value, an +alerts will be triggered for the wallet`, + }, }, "CurioConfig": { { diff --git a/node/config/types.go b/node/config/types.go index da56e803be4..149c6b869df 100644 --- a/node/config/types.go +++ b/node/config/types.go @@ -1,6 +1,8 @@ package config import ( + "net/url" + "github.com/ipfs/go-cid" "github.com/filecoin-project/lotus/chain/types" @@ -1114,9 +1116,14 @@ type FaultReporterConfig struct { type CurioAlerting struct { // PagerDutyEventURL is URL for PagerDuty's Events API v2 URL. Events sent to this API URL are ultimately // routed to a PagerDuty service and processed. - PagerDutyEventURL string + // The default is sufficient for integration with the stock commercial Pager Duty company's service. + PagerDutyEventURL *url.URL // PageDutyIntegrationKey is the integration key for a pager duty service. You can find this unique service // identifier in the integration page for the service. PageDutyIntegrationKey string + + // MinimumWalletBalance is the minimum balance all active wallets. If the balance is below this value, an + // alerts will be triggered for the wallet + MinimumWalletBalance types.FIL }