diff --git a/pkg/device/sysfs_linux.go b/pkg/device/sysfs_linux.go index 1c3f90e1..a4a306af 100644 --- a/pkg/device/sysfs_linux.go +++ b/pkg/device/sysfs_linux.go @@ -20,8 +20,10 @@ package device import ( "bufio" + "errors" "fmt" "os" + "path" "strconv" "strings" ) @@ -106,3 +108,49 @@ func getHolders(name string) ([]string, error) { func getDMName(name string) (string, error) { return readFirstLine("/sys/class/block/" + name + "/dm/name") } + +// GetStat returns statistics for a given device name. +func GetStat(name string) (stats []uint64, err error) { + line, err := readFirstLine("/sys/class/block/" + name + "/stat") + if err != nil { + return nil, err + } + + for _, token := range strings.Split(line, " ") { + token = strings.TrimSpace(token) + ui64, err := strconv.ParseUint(token, 10, 64) + if err != nil { + return nil, err + } + stats = append(stats, ui64) + } + + return stats, nil +} + +// GetHardwareSectorSize returns hardware sector size of associated drive. +func GetHardwareSectorSize(name string) (uint64, error) { + if _, err := os.Lstat("/sys/block/" + name); err != nil { + if !errors.Is(err, os.ErrNotExist) { + return 0, err + } + + partPath := "/sys/class/block/" + name + if _, err = os.Stat(partPath + "/partition"); err != nil { + return 0, err + } + + linkPath, err := os.Readlink(partPath) + if err != nil { + return 0, err + } + + name = path.Base(path.Dir(linkPath)) + } + + s, err := readFirstLine("/sys/block/" + name + "/queue/hw_sector_size") + if err != nil || s == "" { + return 0, err + } + return strconv.ParseUint(s, 10, 64) +} diff --git a/pkg/metrics/collector.go b/pkg/metrics/collector.go index 2567eabd..d82a8a8d 100644 --- a/pkg/metrics/collector.go +++ b/pkg/metrics/collector.go @@ -18,17 +18,57 @@ package metrics import ( "context" + "fmt" directpvtypes "github.com/minio/directpv/pkg/apis/directpv.min.io/types" "github.com/minio/directpv/pkg/client" "github.com/minio/directpv/pkg/consts" + "github.com/minio/directpv/pkg/device" "github.com/minio/directpv/pkg/sys" "github.com/minio/directpv/pkg/types" + "github.com/minio/directpv/pkg/utils" "github.com/minio/directpv/pkg/xfs" "github.com/prometheus/client_golang/prometheus" "k8s.io/klog/v2" ) +type driveStats struct { + readSectorBytes float64 + readTicks float64 + writeSectorBytes float64 + writeTicks float64 + timeInQueue float64 +} + +func getDriveStats(driveName string) (*driveStats, error) { + stat, err := device.GetStat(driveName) + switch { + case err != nil: + return nil, err + case len(stat) == 0: + return nil, fmt.Errorf("unable to read stat from drive %v", driveName) + case len(stat) < 10: + return nil, fmt.Errorf("invalid stat format from drive %v", driveName) + } + + hardwareSectorSize, err := device.GetHardwareSectorSize(driveName) + switch { + case err != nil: + return nil, err + case hardwareSectorSize == 0: + hardwareSectorSize = 512 // Use default value + } + + // Refer https://www.kernel.org/doc/Documentation/block/stat.txt for meaning of each field. + return &driveStats{ + readSectorBytes: float64(stat[2] * hardwareSectorSize), + readTicks: float64(stat[3]), + writeSectorBytes: float64(stat[6] * hardwareSectorSize), + writeTicks: float64(stat[7]), + timeInQueue: float64(stat[9]), + }, nil +} + type metricsCollector struct { nodeID directpvtypes.NodeID desc *prometheus.Desc @@ -95,21 +135,138 @@ func (c *metricsCollector) publishVolumeStats(ctx context.Context, volume *types ) } +func (c *metricsCollector) publishDriveStats(drive *types.Drive, ch chan<- prometheus.Metric) { + deviceID, err := c.getDeviceByFSUUID(drive.Status.FSUUID) + if err != nil { + klog.ErrorS( + err, + "unable to find device by FSUUID; "+ + "either device is removed or run command "+ + "`sudo udevadm control --reload-rules && sudo udevadm trigger`"+ + " on the host to reload", + "FSUUID", drive.Status.FSUUID) + client.Eventf( + drive, client.EventTypeWarning, client.EventReasonMetrics, + "unable to find device by FSUUID %v; "+ + "either device is removed or run command "+ + "`sudo udevadm control --reload-rules && sudo udevadm trigger`"+ + " on the host to reload", drive.Status.FSUUID) + + return + } + deviceName := utils.TrimDevPrefix(deviceID) + + status := float64(1) // Online + driveStat, err := getDriveStats(deviceName) + if err != nil { + klog.ErrorS(err, "unable to read drive statistics") + status = float64(0) // Offline + } + + // Metrics + ch <- prometheus.MustNewConstMetric( + prometheus.NewDesc( + prometheus.BuildFQName(consts.AppName, "stats", "drive_ready"), + "Drive Online/Offline Status", + []string{"drive"}, nil), + prometheus.GaugeValue, + status, drive.Name, + ) + ch <- prometheus.MustNewConstMetric( + prometheus.NewDesc( + prometheus.BuildFQName(consts.AppName, "stats", "drive_total_bytes_read"), + "Total number of bytes read from the drive", + []string{"drive"}, nil), + prometheus.GaugeValue, + driveStat.readSectorBytes, drive.Name, + ) + + ch <- prometheus.MustNewConstMetric( + prometheus.NewDesc( + prometheus.BuildFQName(consts.AppName, "stats", "drive_total_bytes_written"), + "Total number of bytes written to the drive", + []string{"drive"}, nil), + prometheus.GaugeValue, + driveStat.writeSectorBytes, drive.Name, + ) + + // Drive Read/Write Latency + ch <- prometheus.MustNewConstMetric( + prometheus.NewDesc( + prometheus.BuildFQName(consts.AppName, "stats", "drive_read_latency_seconds"), + "Drive Read Latency", + []string{"drive"}, nil), + prometheus.GaugeValue, + driveStat.readTicks/1000, drive.Name, + ) + + ch <- prometheus.MustNewConstMetric( + prometheus.NewDesc( + prometheus.BuildFQName(consts.AppName, "stats", "drive_write_latency_seconds"), + "Drive Write Latency", + []string{"drive"}, nil), + prometheus.GaugeValue, + driveStat.writeTicks/1000, drive.Name, + ) + + // Drive Read/Write Throughput + ch <- prometheus.MustNewConstMetric( + prometheus.NewDesc( + prometheus.BuildFQName(consts.AppName, "stats", "drive_read_throughput_bytes_per_second"), + "Drive Read Throughput", + []string{"drive"}, nil), + prometheus.GaugeValue, + 1000*driveStat.readSectorBytes/driveStat.readTicks, drive.Name, + ) + + ch <- prometheus.MustNewConstMetric( + prometheus.NewDesc( + prometheus.BuildFQName(consts.AppName, "stats", "drive_write_throughput_bytes_per_second"), + "Drive Write Throughput", + []string{"drive"}, nil), + prometheus.GaugeValue, + 1000*driveStat.writeSectorBytes/driveStat.writeTicks, drive.Name, + ) + + // Wait Time + ch <- prometheus.MustNewConstMetric( + prometheus.NewDesc( + prometheus.BuildFQName(consts.AppName, "stats", "drive_wait_time_seconds"), + "Drive Wait Time", + []string{"drive"}, nil), + prometheus.GaugeValue, + driveStat.timeInQueue/1000, drive.Name, + ) +} + // Collect is called by Prometheus registry when collecting metrics. func (c *metricsCollector) Collect(ch chan<- prometheus.Metric) { ctx, cancelFunc := context.WithCancel(context.Background()) defer cancelFunc() - resultCh := client.NewVolumeLister(). + // Collecting volume statistics + volumeResultCh := client.NewVolumeLister(). NodeSelector([]directpvtypes.LabelValue{directpvtypes.ToLabelValue(string(c.nodeID))}). List(ctx) - for result := range resultCh { + for result := range volumeResultCh { if result.Err != nil { - return + break } if result.Volume.Status.TargetPath != "" { c.publishVolumeStats(ctx, &result.Volume, ch) } } + + // Collecting drive statistics + driveResultCh := client.NewDriveLister(). + NodeSelector([]directpvtypes.LabelValue{directpvtypes.ToLabelValue(string(c.nodeID))}). + List(ctx) + for result := range driveResultCh { + if result.Err != nil { + break + } + + c.publishDriveStats(&result.Drive, ch) + } }