diff --git a/go.mod b/go.mod index 6408fcd6..d0d863cd 100644 --- a/go.mod +++ b/go.mod @@ -20,6 +20,8 @@ require ( golang.org/x/sync v0.9.0 google.golang.org/grpc v1.67.1 google.golang.org/protobuf v1.35.1 + gorm.io/driver/sqlite v1.5.6 + gorm.io/gorm v1.25.12 k8s.io/apimachinery v0.31.2 k8s.io/helm v2.17.0+incompatible k8s.io/mount-utils v0.31.2 @@ -34,10 +36,13 @@ require ( github.com/go-logr/stdr v1.2.2 // indirect github.com/godbus/dbus/v5 v5.1.0 // indirect github.com/golang/protobuf v1.5.4 // indirect + github.com/jinzhu/inflection v1.0.0 // indirect + github.com/jinzhu/now v1.1.5 // indirect github.com/klauspost/compress v1.17.10 // indirect github.com/kr/text v0.2.0 // indirect github.com/mattn/go-colorable v0.1.13 // indirect github.com/mattn/go-isatty v0.0.20 // indirect + github.com/mattn/go-sqlite3 v1.14.24 // indirect github.com/moby/sys/mountinfo v0.7.2 // indirect github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect github.com/opencontainers/runc v1.1.15 // indirect diff --git a/go.sum b/go.sum index 90aa2f0a..0153ab4d 100644 --- a/go.sum +++ b/go.sum @@ -30,6 +30,10 @@ github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/hashicorp/go-version v1.7.0 h1:5tqGy27NaOTB8yJKUZELlFAS/LTKJkrmONwQKeRZfjY= github.com/hashicorp/go-version v1.7.0/go.mod h1:fltr4n8CU8Ke44wwGCBoEymUuxUHl09ZGVZPK5anwXA= +github.com/jinzhu/inflection v1.0.0 h1:K317FqzuhWc8YvSVlFMCCUb36O/S9MCKRDI7QkRKD/E= +github.com/jinzhu/inflection v1.0.0/go.mod h1:h+uFLlag+Qp1Va5pdKtLDYj+kHp5pxUVkryuEj+Srlc= +github.com/jinzhu/now v1.1.5 h1:/o9tlHleP7gOFmsnYNz3RGnqzefHA47wQpKrrdTIwXQ= +github.com/jinzhu/now v1.1.5/go.mod h1:d3SSVoowX0Lcu0IBviAWJpolVfI5UJVZZ7cO71lE/z8= github.com/klauspost/compress v1.17.10 h1:oXAz+Vh0PMUvJczoi+flxpnBEPxoER1IaAnU/NMPtT0= github.com/klauspost/compress v1.17.10/go.mod h1:pMDklpSncoRMuLFrf1W9Ss9KT+0rH90U12bZKk7uwG0= github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE= @@ -46,6 +50,10 @@ github.com/mattn/go-isatty v0.0.16/go.mod h1:kYGgaQfpe5nmfYZH+SKPsOc2e4SrIfOl2e/ github.com/mattn/go-isatty v0.0.19/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D7dTCTo3Y= github.com/mattn/go-isatty v0.0.20 h1:xfD0iDuEKnDkl03q4limB+vH+GxLEtL/jb4xVJSWWEY= github.com/mattn/go-isatty v0.0.20/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D7dTCTo3Y= +github.com/mattn/go-sqlite3 v1.14.22 h1:2gZY6PC6kBnID23Tichd1K+Z0oS6nE/XwU+Vz/5o4kU= +github.com/mattn/go-sqlite3 v1.14.22/go.mod h1:Uh1q+B4BYcTPb+yiD3kU8Ct7aC0hY9fxUwlHK0RXw+Y= +github.com/mattn/go-sqlite3 v1.14.24 h1:tpSp2G2KyMnnQu99ngJ47EIkWVmliIizyZBfPrBWDRM= +github.com/mattn/go-sqlite3 v1.14.24/go.mod h1:Uh1q+B4BYcTPb+yiD3kU8Ct7aC0hY9fxUwlHK0RXw+Y= github.com/moby/sys/mountinfo v0.7.2 h1:1shs6aH5s4o5H2zQLn796ADW1wMrIwHsyJ2v9KouLrg= github.com/moby/sys/mountinfo v0.7.2/go.mod h1:1YOa8w8Ih7uW0wALDUgT1dTTSBrZ+HiBLGws92L2RU4= github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 h1:C3w9PqII01/Oq1c1nUAm88MOHcQC9l5mIlSMApZMrHA= @@ -121,6 +129,10 @@ gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EV gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= +gorm.io/driver/sqlite v1.5.6 h1:fO/X46qn5NUEEOZtnjJRWRzZMe8nqJiQ9E+0hi+hKQE= +gorm.io/driver/sqlite v1.5.6/go.mod h1:U+J8craQU6Fzkcvu8oLeAQmi50TkwPEhHDEjQZXDah4= +gorm.io/gorm v1.25.12 h1:I0u8i2hWQItBq1WfE0o2+WuL9+8L21K9e2HHSTE/0f8= +gorm.io/gorm v1.25.12/go.mod h1:xh7N7RHfYlNc5EmcI/El95gXusucDrQnHXe0+CgWcLQ= k8s.io/apimachinery v0.31.2 h1:i4vUt2hPK56W6mlT7Ry+AO8eEsyxMD1U44NR22CLTYw= k8s.io/apimachinery v0.31.2/go.mod h1:rsPdaZJfTfLsNJSQzNHQvYoTmxhoOEofxtOsF3rtsMo= k8s.io/helm v2.17.0+incompatible h1:Bpn6o1wKLYqKM3+Osh8e+1/K2g/GsQJ4F4yNF2+deao= diff --git a/pkg/wekafs/db/db.go b/pkg/wekafs/db/db.go new file mode 100644 index 00000000..17747630 --- /dev/null +++ b/pkg/wekafs/db/db.go @@ -0,0 +1,184 @@ +package db + +import ( + "context" + "errors" + "fmt" + "github.com/container-storage-interface/spec/lib/go/csi" + "github.com/rs/zerolog/log" + "gorm.io/driver/sqlite" + "gorm.io/gorm" + "os" + "path/filepath" +) + +const DBPath = "/tmp/csi-wekafs-attachments/csi-attachments.db" + +type PvcAttachment struct { + ID uint `gorm:"primaryKey"` // Auto-incrementing ID + VolumeId string `gorm:"index:idx_volume_target,unique"` + TargetPath string `gorm:"index:idx_volume_target,unique"` + Node string `json:"node"` + BootID string `json:"boot_id"` + AccessType string `json:"access_type"` +} + +func (pal *PvcAttachment) String() string { + return fmt.Sprintf("PVC: %s, Node: %s, TargetPath: %s, BootID: %s, AccessType: %s", pal.VolumeId, pal.Node, pal.TargetPath, pal.BootID, pal.AccessType) +} + +func (pal *PvcAttachment) MatchesBootId(bootID string) bool { + return pal.BootID == bootID +} + +func (pal *PvcAttachment) MatchesNode(node string) bool { + return pal.Node == node +} + +func (pal *PvcAttachment) MatchesVolumeId(volumeId string) bool { + return pal.VolumeId == volumeId +} + +func (pal *PvcAttachment) MatchesAccessType(accessType string) bool { + return pal.AccessType == accessType +} + +func (pal *PvcAttachment) MatchesTargetPath(path string) bool { + return pal.TargetPath == path +} + +func (pal *PvcAttachment) Matches(volumeId, path, node, accessType string, bootId string) bool { + return pal.MatchesVolumeId(volumeId) && pal.MatchesTargetPath(path) && pal.MatchesNode(node) && pal.MatchesBootId(bootId) && pal.MatchesAccessType(accessType) +} + +func (pal *PvcAttachment) IsSingleWriter() bool { + return pal.AccessType == csi.VolumeCapability_AccessMode_SINGLE_NODE_WRITER.String() || + pal.AccessType == csi.VolumeCapability_AccessMode_SINGLE_NODE_SINGLE_WRITER.String() +} + +// GetDatabase returns a database for pod attachments that will be used on each node to satisfy the ReadWriteOncePod attachment mode +func GetDatabase(ctx context.Context) (Database, error) { + logger := log.Ctx(ctx) + directory := filepath.Dir(DBPath) + if err := EnsureDirectoryExists(directory); err != nil { + logger.Error().Err(err).Msg("Failed to create directory") + return nil, err + } + db, err := gorm.Open(sqlite.Open(DBPath), &gorm.Config{}) + if err != nil { + logger.Error().Err(err).Msg("Failed to connect to the database") + } + + // Auto-migrate the schema + if err := db.AutoMigrate(&PvcAttachment{}); err != nil { + logger.Error().Err(err).Msg("Failed to migrate the database") + } + + return &SqliteDatabase{ + db, + }, nil +} + +func EnsureDirectoryExists(directory string) error { + _, err := os.Stat(directory) + if err != nil { + if os.IsNotExist(err) { + if err := os.MkdirAll(directory, 0755); err != nil { + return fmt.Errorf("failed to create directory: %w", err) + } + } else { + return fmt.Errorf("failed to stat directory: %w", err) + } + } + return nil +} + +type SqliteDatabase struct { + *gorm.DB +} + +func (d *SqliteDatabase) GetAttachmentsByVolumeIdOrTargetPath(ctx context.Context, volumeId, targetPath *string) (*[]PvcAttachment, error) { + if d == nil { + return nil, errors.New("database is nil") + } + query := d.Model(&PvcAttachment{}) + if volumeId != nil { + query = query.Where("volume_id = ?", *volumeId) + } + if targetPath != nil { + query = query.Where("target_path = ?", *targetPath) + } + locks := &[]PvcAttachment{} + + err := query.Find(locks).Error + if err != nil { + return nil, fmt.Errorf("failed to lookup records: %w", err) + } + + return locks, nil +} + +func (d *SqliteDatabase) CreateAttachment(ctx context.Context, attachment *PvcAttachment) error { + if d == nil { + return errors.New("database is nil") + } + if err := d.Create(attachment).Error; err != nil { + return fmt.Errorf("failed to create record: %w", err) + } + return nil +} + +func (d *SqliteDatabase) UpdateAttachment(ctx context.Context, attachment *PvcAttachment) error { + if d == nil { + return errors.New("database is nil") + } + existing, err := d.GetAttachmentsByVolumeIdOrTargetPath(ctx, &attachment.VolumeId, &attachment.TargetPath) + if err != nil { + return fmt.Errorf("failed to lookup existing record: %w", err) + } + if len(*existing) == 0 { + return errors.New("no record found") + } + attachment.ID = (*existing)[0].ID + + if err := d.Save(attachment).Error; err != nil { + return fmt.Errorf("failed to update record: %w", err) + } + return nil +} + +func (d *SqliteDatabase) CreateOrUpdateAttachment(ctx context.Context, attachment *PvcAttachment) error { + if d == nil { + return errors.New("database is nil") + } + existing, err := d.GetAttachmentsByVolumeIdOrTargetPath(ctx, &attachment.VolumeId, &attachment.TargetPath) + if err != nil { + return fmt.Errorf("failed to lookup existing record: %w", err) + } + if len(*existing) == 0 { + return d.CreateAttachment(ctx, attachment) + } + return d.UpdateAttachment(ctx, attachment) +} + +func (d *SqliteDatabase) DeleteAttachment(ctx context.Context, attachment *PvcAttachment) error { + if d == nil { + return errors.New("database is nil") + } + if err := d.Delete(attachment).Error; err != nil { + return fmt.Errorf("failed to delete record: %w", err) + } + return nil +} + +func (d *SqliteDatabase) DeleteAttachmentDisregardingAccessType(volumeId, targetPath, node, bootId string) error { + // Build the delete query + result := d.Where("volume_id = ? AND target_path = ? AND node = ? AND boot_id = ?", volumeId, targetPath, node, bootId).Delete(&PvcAttachment{}) + if result.Error != nil { + return fmt.Errorf("failed to delete record: %w", result.Error) + } + if result.RowsAffected == 0 { + return fmt.Errorf("no record found for VolumeId: %s and TargetPath: %s", volumeId, targetPath) + } + return nil +} diff --git a/pkg/wekafs/db/db_test.go b/pkg/wekafs/db/db_test.go new file mode 100644 index 00000000..95dff249 --- /dev/null +++ b/pkg/wekafs/db/db_test.go @@ -0,0 +1,130 @@ +package db + +import ( + "context" + "os" + "testing" + + "github.com/google/uuid" + "github.com/stretchr/testify/assert" + "gorm.io/driver/sqlite" + "gorm.io/gorm" +) + +func setupTestDB(t *testing.T) *SqliteDatabase { + _ = os.Remove(DBPath) + db, err := gorm.Open(sqlite.Open(":memory:"), &gorm.Config{}) + assert.NoError(t, err) + + err = db.AutoMigrate(&PvcAttachment{}) + assert.NoError(t, err) + + return &SqliteDatabase{db} +} + +func TestDatabaseWrapper_GetAttachmentsByVolumeIdOrTargetPath(t *testing.T) { + db := setupTestDB(t) + ctx := context.Background() + + volumeId := uuid.New().String() + targetPath := "/test/path" + attachment := &PvcAttachment{ + VolumeId: volumeId, + TargetPath: targetPath, + Node: "node1", + BootID: "boot1", + AccessType: "ReadWriteOnce", + } + + // same attachment exactly + attachment2 := &PvcAttachment{ + VolumeId: volumeId, + TargetPath: targetPath, + Node: "node1", + BootID: "boot1", + AccessType: "ReadWriteOnce", + } + + // attachment with different accesstype + attachment3 := &(*attachment2) + attachment3.AccessType = "ReadOnlyMany" + + // attachment with different ID but same other values, should fail on unique constraints of volumeId and targetPath + attachment4 := &(*attachment3) + attachment4.ID = 10 + + err := db.CreateAttachment(ctx, attachment) + assert.NoError(t, err) + + err = db.UpdateAttachment(ctx, attachment2) + assert.NoError(t, err) + + err = db.UpdateAttachment(ctx, attachment3) + assert.NoError(t, err) + + err = db.CreateOrUpdateAttachment(ctx, attachment3) + assert.NoError(t, err) + + err = db.CreateAttachment(ctx, attachment4) + assert.Error(t, err) + + // Test by volumeId + attachments, err := db.GetAttachmentsByVolumeIdOrTargetPath(ctx, &volumeId, nil) + assert.NoError(t, err) + assert.Len(t, *attachments, 1) + assert.Equal(t, *attachment3, (*attachments)[0]) + + // Test by targetPath + attachments, err = db.GetAttachmentsByVolumeIdOrTargetPath(ctx, nil, &targetPath) + assert.NoError(t, err) + assert.Len(t, *attachments, 1) + assert.Equal(t, *attachment3, (*attachments)[0]) +} + +func TestDatabaseWrapper_DeleteAttachment(t *testing.T) { + db := setupTestDB(t) + ctx := context.Background() + + attachment := &PvcAttachment{ + VolumeId: uuid.New().String(), + TargetPath: "/test/path", + Node: "node1", + BootID: "boot1", + AccessType: "ReadWriteOnce", + } + + err := db.CreateOrUpdateAttachment(ctx, attachment) + assert.NoError(t, err) + + err = db.DeleteAttachment(ctx, attachment) + assert.NoError(t, err) + + var result PvcAttachment + err = db.First(&result, "volume_id = ?", attachment.VolumeId).Error + assert.Error(t, err) + assert.Equal(t, gorm.ErrRecordNotFound, err) +} + +func TestDatabaseWrapper_DeleteAttachmentDisregardingAccessType(t *testing.T) { + db := setupTestDB(t) + ctx := context.Background() + + attachment := &PvcAttachment{ + VolumeId: uuid.New().String(), + TargetPath: "/test/path", + Node: "node1", + BootID: "boot1", + AccessType: "ReadWriteOnce", + } + + err := db.CreateOrUpdateAttachment(ctx, attachment) + assert.NoError(t, err) + + err = db.DeleteAttachmentDisregardingAccessType(attachment.VolumeId, attachment.TargetPath, attachment.Node, attachment.BootID) + assert.NoError(t, err) + + var result PvcAttachment + err = db.First(&result, "volume_id = ?", attachment.VolumeId).Error + assert.Error(t, err) + assert.Equal(t, gorm.ErrRecordNotFound, err) +} diff --git a/pkg/wekafs/db/interfaces.go b/pkg/wekafs/db/interfaces.go new file mode 100644 index 00000000..49e64942 --- /dev/null +++ b/pkg/wekafs/db/interfaces.go @@ -0,0 +1,12 @@ +package db + +import "context" + +type Database interface { + GetAttachmentsByVolumeIdOrTargetPath(ctx context.Context, volumeId, targetPath *string) (*[]PvcAttachment, error) + CreateAttachment(ctx context.Context, attachment *PvcAttachment) error + UpdateAttachment(ctx context.Context, attachment *PvcAttachment) error + CreateOrUpdateAttachment(ctx context.Context, attachment *PvcAttachment) error + DeleteAttachment(ctx context.Context, attachment *PvcAttachment) error + DeleteAttachmentDisregardingAccessType(volumeId, targetPath, node, bootId string) error +}