Skip to content

Commit

Permalink
feat(CSI-308): implement database and sqlite3 backing
Browse files Browse the repository at this point in the history
  • Loading branch information
sergeyberezansky committed Nov 26, 2024
1 parent 66f8205 commit b57fee4
Show file tree
Hide file tree
Showing 5 changed files with 343 additions and 0 deletions.
5 changes: 5 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ require (
golang.org/x/sync v0.9.0
google.golang.org/grpc v1.67.1
google.golang.org/protobuf v1.35.1
gorm.io/driver/sqlite v1.5.6
gorm.io/gorm v1.25.12
k8s.io/apimachinery v0.31.2
k8s.io/helm v2.17.0+incompatible
k8s.io/mount-utils v0.31.2
Expand All @@ -34,10 +36,13 @@ require (
github.com/go-logr/stdr v1.2.2 // indirect
github.com/godbus/dbus/v5 v5.1.0 // indirect
github.com/golang/protobuf v1.5.4 // indirect
github.com/jinzhu/inflection v1.0.0 // indirect
github.com/jinzhu/now v1.1.5 // indirect
github.com/klauspost/compress v1.17.10 // indirect
github.com/kr/text v0.2.0 // indirect
github.com/mattn/go-colorable v0.1.13 // indirect
github.com/mattn/go-isatty v0.0.20 // indirect
github.com/mattn/go-sqlite3 v1.14.24 // indirect
github.com/moby/sys/mountinfo v0.7.2 // indirect
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect
github.com/opencontainers/runc v1.1.15 // indirect
Expand Down
12 changes: 12 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,10 @@ github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0=
github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/hashicorp/go-version v1.7.0 h1:5tqGy27NaOTB8yJKUZELlFAS/LTKJkrmONwQKeRZfjY=
github.com/hashicorp/go-version v1.7.0/go.mod h1:fltr4n8CU8Ke44wwGCBoEymUuxUHl09ZGVZPK5anwXA=
github.com/jinzhu/inflection v1.0.0 h1:K317FqzuhWc8YvSVlFMCCUb36O/S9MCKRDI7QkRKD/E=
github.com/jinzhu/inflection v1.0.0/go.mod h1:h+uFLlag+Qp1Va5pdKtLDYj+kHp5pxUVkryuEj+Srlc=
github.com/jinzhu/now v1.1.5 h1:/o9tlHleP7gOFmsnYNz3RGnqzefHA47wQpKrrdTIwXQ=
github.com/jinzhu/now v1.1.5/go.mod h1:d3SSVoowX0Lcu0IBviAWJpolVfI5UJVZZ7cO71lE/z8=
github.com/klauspost/compress v1.17.10 h1:oXAz+Vh0PMUvJczoi+flxpnBEPxoER1IaAnU/NMPtT0=
github.com/klauspost/compress v1.17.10/go.mod h1:pMDklpSncoRMuLFrf1W9Ss9KT+0rH90U12bZKk7uwG0=
github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE=
Expand All @@ -46,6 +50,10 @@ github.com/mattn/go-isatty v0.0.16/go.mod h1:kYGgaQfpe5nmfYZH+SKPsOc2e4SrIfOl2e/
github.com/mattn/go-isatty v0.0.19/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D7dTCTo3Y=
github.com/mattn/go-isatty v0.0.20 h1:xfD0iDuEKnDkl03q4limB+vH+GxLEtL/jb4xVJSWWEY=
github.com/mattn/go-isatty v0.0.20/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D7dTCTo3Y=
github.com/mattn/go-sqlite3 v1.14.22 h1:2gZY6PC6kBnID23Tichd1K+Z0oS6nE/XwU+Vz/5o4kU=
github.com/mattn/go-sqlite3 v1.14.22/go.mod h1:Uh1q+B4BYcTPb+yiD3kU8Ct7aC0hY9fxUwlHK0RXw+Y=
github.com/mattn/go-sqlite3 v1.14.24 h1:tpSp2G2KyMnnQu99ngJ47EIkWVmliIizyZBfPrBWDRM=
github.com/mattn/go-sqlite3 v1.14.24/go.mod h1:Uh1q+B4BYcTPb+yiD3kU8Ct7aC0hY9fxUwlHK0RXw+Y=
github.com/moby/sys/mountinfo v0.7.2 h1:1shs6aH5s4o5H2zQLn796ADW1wMrIwHsyJ2v9KouLrg=
github.com/moby/sys/mountinfo v0.7.2/go.mod h1:1YOa8w8Ih7uW0wALDUgT1dTTSBrZ+HiBLGws92L2RU4=
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 h1:C3w9PqII01/Oq1c1nUAm88MOHcQC9l5mIlSMApZMrHA=
Expand Down Expand Up @@ -121,6 +129,10 @@ gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EV
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
gorm.io/driver/sqlite v1.5.6 h1:fO/X46qn5NUEEOZtnjJRWRzZMe8nqJiQ9E+0hi+hKQE=
gorm.io/driver/sqlite v1.5.6/go.mod h1:U+J8craQU6Fzkcvu8oLeAQmi50TkwPEhHDEjQZXDah4=
gorm.io/gorm v1.25.12 h1:I0u8i2hWQItBq1WfE0o2+WuL9+8L21K9e2HHSTE/0f8=
gorm.io/gorm v1.25.12/go.mod h1:xh7N7RHfYlNc5EmcI/El95gXusucDrQnHXe0+CgWcLQ=
k8s.io/apimachinery v0.31.2 h1:i4vUt2hPK56W6mlT7Ry+AO8eEsyxMD1U44NR22CLTYw=
k8s.io/apimachinery v0.31.2/go.mod h1:rsPdaZJfTfLsNJSQzNHQvYoTmxhoOEofxtOsF3rtsMo=
k8s.io/helm v2.17.0+incompatible h1:Bpn6o1wKLYqKM3+Osh8e+1/K2g/GsQJ4F4yNF2+deao=
Expand Down
184 changes: 184 additions & 0 deletions pkg/wekafs/db/db.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,184 @@
package db

import (
"context"
"errors"
"fmt"
"github.com/container-storage-interface/spec/lib/go/csi"
"github.com/rs/zerolog/log"
"gorm.io/driver/sqlite"
"gorm.io/gorm"
"os"
"path/filepath"
)

const DBPath = "/tmp/csi-wekafs-attachments/csi-attachments.db"

type PvcAttachment struct {
ID uint `gorm:"primaryKey"` // Auto-incrementing ID
VolumeId string `gorm:"index:idx_volume_target,unique"`
TargetPath string `gorm:"index:idx_volume_target,unique"`
Node string `json:"node"`
BootID string `json:"boot_id"`
AccessType string `json:"access_type"`
}

func (pal *PvcAttachment) String() string {
return fmt.Sprintf("PVC: %s, Node: %s, TargetPath: %s, BootID: %s, AccessType: %s", pal.VolumeId, pal.Node, pal.TargetPath, pal.BootID, pal.AccessType)
}

func (pal *PvcAttachment) MatchesBootId(bootID string) bool {
return pal.BootID == bootID
}

func (pal *PvcAttachment) MatchesNode(node string) bool {
return pal.Node == node
}

func (pal *PvcAttachment) MatchesVolumeId(volumeId string) bool {
return pal.VolumeId == volumeId
}

func (pal *PvcAttachment) MatchesAccessType(accessType string) bool {
return pal.AccessType == accessType
}

func (pal *PvcAttachment) MatchesTargetPath(path string) bool {
return pal.TargetPath == path
}

func (pal *PvcAttachment) Matches(volumeId, path, node, accessType string, bootId string) bool {
return pal.MatchesVolumeId(volumeId) && pal.MatchesTargetPath(path) && pal.MatchesNode(node) && pal.MatchesBootId(bootId) && pal.MatchesAccessType(accessType)
}

func (pal *PvcAttachment) IsSingleWriter() bool {
return pal.AccessType == csi.VolumeCapability_AccessMode_SINGLE_NODE_WRITER.String() ||
pal.AccessType == csi.VolumeCapability_AccessMode_SINGLE_NODE_SINGLE_WRITER.String()
}

// GetDatabase returns a database for pod attachments that will be used on each node to satisfy the ReadWriteOncePod attachment mode
func GetDatabase(ctx context.Context) (Database, error) {
logger := log.Ctx(ctx)
directory := filepath.Dir(DBPath)
if err := EnsureDirectoryExists(directory); err != nil {
logger.Error().Err(err).Msg("Failed to create directory")
return nil, err
}
db, err := gorm.Open(sqlite.Open(DBPath), &gorm.Config{})
if err != nil {
logger.Error().Err(err).Msg("Failed to connect to the database")
}

// Auto-migrate the schema
if err := db.AutoMigrate(&PvcAttachment{}); err != nil {
logger.Error().Err(err).Msg("Failed to migrate the database")
}

return &SqliteDatabase{
db,
}, nil
}

func EnsureDirectoryExists(directory string) error {
_, err := os.Stat(directory)
if err != nil {
if os.IsNotExist(err) {
if err := os.MkdirAll(directory, 0755); err != nil {
return fmt.Errorf("failed to create directory: %w", err)
}
} else {
return fmt.Errorf("failed to stat directory: %w", err)
}
}
return nil
}

type SqliteDatabase struct {
*gorm.DB
}

func (d *SqliteDatabase) GetAttachmentsByVolumeIdOrTargetPath(ctx context.Context, volumeId, targetPath *string) (*[]PvcAttachment, error) {
if d == nil {
return nil, errors.New("database is nil")
}
query := d.Model(&PvcAttachment{})
if volumeId != nil {
query = query.Where("volume_id = ?", *volumeId)
}
if targetPath != nil {
query = query.Where("target_path = ?", *targetPath)
}
locks := &[]PvcAttachment{}

err := query.Find(locks).Error
if err != nil {
return nil, fmt.Errorf("failed to lookup records: %w", err)
}

return locks, nil
}

func (d *SqliteDatabase) CreateAttachment(ctx context.Context, attachment *PvcAttachment) error {
if d == nil {
return errors.New("database is nil")
}
if err := d.Create(attachment).Error; err != nil {
return fmt.Errorf("failed to create record: %w", err)
}
return nil
}

func (d *SqliteDatabase) UpdateAttachment(ctx context.Context, attachment *PvcAttachment) error {
if d == nil {
return errors.New("database is nil")
}
existing, err := d.GetAttachmentsByVolumeIdOrTargetPath(ctx, &attachment.VolumeId, &attachment.TargetPath)
if err != nil {
return fmt.Errorf("failed to lookup existing record: %w", err)
}
if len(*existing) == 0 {
return errors.New("no record found")
}
attachment.ID = (*existing)[0].ID

if err := d.Save(attachment).Error; err != nil {
return fmt.Errorf("failed to update record: %w", err)
}
return nil
}

func (d *SqliteDatabase) CreateOrUpdateAttachment(ctx context.Context, attachment *PvcAttachment) error {
if d == nil {
return errors.New("database is nil")
}
existing, err := d.GetAttachmentsByVolumeIdOrTargetPath(ctx, &attachment.VolumeId, &attachment.TargetPath)
if err != nil {
return fmt.Errorf("failed to lookup existing record: %w", err)
}
if len(*existing) == 0 {
return d.CreateAttachment(ctx, attachment)
}
return d.UpdateAttachment(ctx, attachment)
}

func (d *SqliteDatabase) DeleteAttachment(ctx context.Context, attachment *PvcAttachment) error {
if d == nil {
return errors.New("database is nil")
}
if err := d.Delete(attachment).Error; err != nil {
return fmt.Errorf("failed to delete record: %w", err)
}
return nil
}

func (d *SqliteDatabase) DeleteAttachmentDisregardingAccessType(volumeId, targetPath, node, bootId string) error {
// Build the delete query
result := d.Where("volume_id = ? AND target_path = ? AND node = ? AND boot_id = ?", volumeId, targetPath, node, bootId).Delete(&PvcAttachment{})
if result.Error != nil {
return fmt.Errorf("failed to delete record: %w", result.Error)
}
if result.RowsAffected == 0 {
return fmt.Errorf("no record found for VolumeId: %s and TargetPath: %s", volumeId, targetPath)
}
return nil
}
130 changes: 130 additions & 0 deletions pkg/wekafs/db/db_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,130 @@
package db

import (
"context"
"os"
"testing"

"github.com/google/uuid"
"github.com/stretchr/testify/assert"
"gorm.io/driver/sqlite"
"gorm.io/gorm"
)

func setupTestDB(t *testing.T) *SqliteDatabase {
_ = os.Remove(DBPath)
db, err := gorm.Open(sqlite.Open(":memory:"), &gorm.Config{})
assert.NoError(t, err)

err = db.AutoMigrate(&PvcAttachment{})
assert.NoError(t, err)

return &SqliteDatabase{db}
}

func TestDatabaseWrapper_GetAttachmentsByVolumeIdOrTargetPath(t *testing.T) {
db := setupTestDB(t)
ctx := context.Background()

volumeId := uuid.New().String()
targetPath := "/test/path"
attachment := &PvcAttachment{
VolumeId: volumeId,
TargetPath: targetPath,
Node: "node1",
BootID: "boot1",
AccessType: "ReadWriteOnce",
}

// same attachment exactly
attachment2 := &PvcAttachment{
VolumeId: volumeId,
TargetPath: targetPath,
Node: "node1",
BootID: "boot1",
AccessType: "ReadWriteOnce",
}

// attachment with different accesstype
attachment3 := &(*attachment2)
attachment3.AccessType = "ReadOnlyMany"

// attachment with different ID but same other values, should fail on unique constraints of volumeId and targetPath
attachment4 := &(*attachment3)
attachment4.ID = 10

err := db.CreateAttachment(ctx, attachment)
assert.NoError(t, err)

err = db.UpdateAttachment(ctx, attachment2)
assert.NoError(t, err)

err = db.UpdateAttachment(ctx, attachment3)
assert.NoError(t, err)

err = db.CreateOrUpdateAttachment(ctx, attachment3)
assert.NoError(t, err)

err = db.CreateAttachment(ctx, attachment4)
assert.Error(t, err)

// Test by volumeId
attachments, err := db.GetAttachmentsByVolumeIdOrTargetPath(ctx, &volumeId, nil)
assert.NoError(t, err)
assert.Len(t, *attachments, 1)
assert.Equal(t, *attachment3, (*attachments)[0])

// Test by targetPath
attachments, err = db.GetAttachmentsByVolumeIdOrTargetPath(ctx, nil, &targetPath)
assert.NoError(t, err)
assert.Len(t, *attachments, 1)
assert.Equal(t, *attachment3, (*attachments)[0])
}

func TestDatabaseWrapper_DeleteAttachment(t *testing.T) {
db := setupTestDB(t)
ctx := context.Background()

attachment := &PvcAttachment{
VolumeId: uuid.New().String(),
TargetPath: "/test/path",
Node: "node1",
BootID: "boot1",
AccessType: "ReadWriteOnce",
}

err := db.CreateOrUpdateAttachment(ctx, attachment)
assert.NoError(t, err)

err = db.DeleteAttachment(ctx, attachment)
assert.NoError(t, err)

var result PvcAttachment
err = db.First(&result, "volume_id = ?", attachment.VolumeId).Error
assert.Error(t, err)
assert.Equal(t, gorm.ErrRecordNotFound, err)
}

func TestDatabaseWrapper_DeleteAttachmentDisregardingAccessType(t *testing.T) {
db := setupTestDB(t)
ctx := context.Background()

attachment := &PvcAttachment{
VolumeId: uuid.New().String(),
TargetPath: "/test/path",
Node: "node1",
BootID: "boot1",
AccessType: "ReadWriteOnce",
}

err := db.CreateOrUpdateAttachment(ctx, attachment)
assert.NoError(t, err)

err = db.DeleteAttachmentDisregardingAccessType(attachment.VolumeId, attachment.TargetPath, attachment.Node, attachment.BootID)
assert.NoError(t, err)

var result PvcAttachment
err = db.First(&result, "volume_id = ?", attachment.VolumeId).Error
assert.Error(t, err)
assert.Equal(t, gorm.ErrRecordNotFound, err)
}
12 changes: 12 additions & 0 deletions pkg/wekafs/db/interfaces.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
package db

import "context"

type Database interface {
GetAttachmentsByVolumeIdOrTargetPath(ctx context.Context, volumeId, targetPath *string) (*[]PvcAttachment, error)
CreateAttachment(ctx context.Context, attachment *PvcAttachment) error
UpdateAttachment(ctx context.Context, attachment *PvcAttachment) error
CreateOrUpdateAttachment(ctx context.Context, attachment *PvcAttachment) error
DeleteAttachment(ctx context.Context, attachment *PvcAttachment) error
DeleteAttachmentDisregardingAccessType(volumeId, targetPath, node, bootId string) error
}

0 comments on commit b57fee4

Please sign in to comment.