Skip to content

Commit

Permalink
SQL storage (#137)
Browse files Browse the repository at this point in the history
* SQL storage

Signed-off-by: naveensrinivasan <172697+naveensrinivasan@users.noreply.github.com>

* SQL Implementation

Signed-off-by: naveensrinivasan <172697+naveensrinivasan@users.noreply.github.com>

* Fixed the codereview issues

Signed-off-by: naveensrinivasan <172697+naveensrinivasan@users.noreply.github.com>

* Fixed the code review comments

Signed-off-by: naveensrinivasan <172697+naveensrinivasan@users.noreply.github.com>

* Fixed the glob search for sql

Signed-off-by: naveensrinivasan <172697+naveensrinivasan@users.noreply.github.com>

* Some more code fixes

Signed-off-by: naveensrinivasan <172697+naveensrinivasan@users.noreply.github.com>

* More tweaks to the code

Signed-off-by: naveensrinivasan <172697+naveensrinivasan@users.noreply.github.com>

* Update cmd/server/server.go

Co-authored-by: coderabbitai[bot] <136622811+coderabbitai[bot]@users.noreply.github.com>

* Update pkg/storages/sql_test.go

Co-authored-by: coderabbitai[bot] <136622811+coderabbitai[bot]@users.noreply.github.com>

* Fixed the failing test

Signed-off-by: naveensrinivasan <172697+naveensrinivasan@users.noreply.github.com>

* Fixed sqlite SaveCaches

Signed-off-by: neilnaveen <42328488+neilnaveen@users.noreply.github.com>

* Fixed error message for redis, and tests

Signed-off-by: neilnaveen <42328488+neilnaveen@users.noreply.github.com>

---------

Signed-off-by: naveensrinivasan <172697+naveensrinivasan@users.noreply.github.com>
Signed-off-by: neilnaveen <42328488+neilnaveen@users.noreply.github.com>
Co-authored-by: coderabbitai[bot] <136622811+coderabbitai[bot]@users.noreply.github.com>
Co-authored-by: neilnaveen <42328488+neilnaveen@users.noreply.github.com>
  • Loading branch information
3 people authored Nov 18, 2024
1 parent 1b82676 commit a79071d
Show file tree
Hide file tree
Showing 8 changed files with 873 additions and 24 deletions.
28 changes: 27 additions & 1 deletion cmd/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,25 +26,32 @@ type options struct {
addr string
StorageType string
StorageAddr string
StoragePath string
UseInMemory bool
}

const (
defaultConcurrency = 10
defaultAddr = "localhost:8089"
redisStorageType = "redis"
sqliteStorageType = "sqlite"
)

func (o *options) AddFlags(cmd *cobra.Command) {
cmd.Flags().Int32Var(&o.concurrency, "concurrency", defaultConcurrency, "Maximum number of concurrent operations for leaderboard operations")
cmd.Flags().StringVar(&o.addr, "addr", defaultAddr, "Network address and port for the server (e.g. localhost:8089)")
cmd.Flags().StringVar(&o.StorageType, "storage-type", redisStorageType, "Type of storage to use (e.g., redis, sql)")
cmd.Flags().StringVar(&o.StorageType, "storage-type", redisStorageType, "Type of storage to use (e.g., redis, sqlite)")
cmd.Flags().StringVar(&o.StorageAddr, "storage-addr", "localhost:6379", "Address for storage backend")
cmd.Flags().StringVar(&o.StoragePath, "storage-path", "", "Path to the SQLite database file")
cmd.Flags().BoolVar(&o.UseInMemory, "use-in-memory", true, "Use in-memory SQLite database")
}

func (o *options) ProvideStorage() (graph.Storage, error) {
switch o.StorageType {
case redisStorageType:
return storages.NewRedisStorage(o.StorageAddr)
case sqliteStorageType:
return storages.NewSQLStorage(o.StoragePath, o.UseInMemory)
default:
return nil, fmt.Errorf("unknown storage type: %s", o.StorageType)
}
Expand All @@ -64,6 +71,24 @@ func (o *options) Run(cmd *cobra.Command, args []string) error {
return o.startServer(server)
}

func (o *options) PersistentPreRunE(cmd *cobra.Command, args []string) error {
if o.StorageType != redisStorageType && o.StorageType != sqliteStorageType {
return fmt.Errorf("invalid storage-type %q: must be one of [redis, sqlite]", o.StorageType)
}

if o.StorageType == sqliteStorageType && o.StoragePath == "" {
if !o.UseInMemory {
return fmt.Errorf("storage-path is required when using SQLite with file-based storage")
}
}

if o.StorageType == redisStorageType && o.StorageAddr == "" {
return fmt.Errorf("storage-addr is required when using Redis (format: host:port)")
}

return nil
}

func (o *options) setupServer() (*http.Server, error) {
if o.concurrency <= 0 {
return nil, fmt.Errorf("concurrency must be greater than zero")
Expand Down Expand Up @@ -130,6 +155,7 @@ func New() *cobra.Command {
Use: "server",
Short: "Start the minefield server for graph operations and queries",
Args: cobra.ExactArgs(0),
PersistentPreRunE: o.PersistentPreRunE,
RunE: o.Run,
DisableAutoGenTag: true,
}
Expand Down
71 changes: 71 additions & 0 deletions cmd/server/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,3 +129,74 @@ func TestSetupServer(t *testing.T) {
t.Error("Expected handler to be set, got nil")
}
}

func TestOptions_PersistentPreRunE(t *testing.T) {
tests := []struct {
name string
options *options
wantErr bool
errorMessage string
}{
{
name: "SQLite with empty StoragePath",
options: &options{
StorageType: sqliteStorageType,
StoragePath: "",
},
wantErr: true,
errorMessage: "storage-path is required when using SQLite with file-based storage",
},
{
name: "Redis with empty StorageAddr",
options: &options{
StorageType: redisStorageType,
StorageAddr: "",
},
wantErr: true,
errorMessage: "storage-addr is required when using Redis (format: host:port)",
},
{
name: "SQLite with valid StoragePath",
options: &options{
StorageType: sqliteStorageType,
StoragePath: "/path/to/sqlite.db",
},
wantErr: false,
},
{
name: "Redis with valid StorageAddr and UseInMemory disabled",
options: &options{
StorageType: redisStorageType,
StorageAddr: "localhost:6379",
UseInMemory: false,
},
wantErr: false,
},
{
name: "Unsupported StorageType",
options: &options{
StorageType: "unsupported",
},
wantErr: true,
errorMessage: `invalid storage-type "unsupported": must be one of [redis, sqlite]`,
},
}

cmd := &cobra.Command{}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
err := tt.options.PersistentPreRunE(cmd, []string{})
if (err != nil) != tt.wantErr {
t.Errorf("PersistentPreRunE() error = %v, wantErr %v", err, tt.wantErr)
}
if tt.wantErr {
if err == nil {
t.Errorf("Expected error but got none")
} else if err.Error() != tt.errorMessage {
t.Errorf("PersistentPreRunE() error message = %v, want %v", err.Error(), tt.errorMessage)
}
}
})
}
}
5 changes: 5 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ require (
github.com/zeebo/assert v1.3.1
golang.org/x/net v0.31.0
google.golang.org/protobuf v1.35.2
gorm.io/driver/sqlite v1.5.6
gorm.io/gorm v1.25.12
)

require (
Expand All @@ -39,7 +41,10 @@ require (
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect
github.com/google/uuid v1.6.0 // indirect
github.com/inconshreveable/mousetrap v1.1.0 // indirect
github.com/jinzhu/inflection v1.0.0 // indirect
github.com/jinzhu/now v1.1.5 // indirect
github.com/mattn/go-runewidth v0.0.9 // indirect
github.com/mattn/go-sqlite3 v1.14.24 // indirect
github.com/mschoch/smat v0.2.0 // indirect
github.com/onsi/gomega v1.30.0 // indirect
github.com/sirupsen/logrus v1.9.3 // indirect
Expand Down
10 changes: 10 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,10 @@ github.com/hexops/gotextdiff v1.0.3 h1:gitA9+qJrrTCsiCl7+kh75nPqQt1cx4ZkudSTLoUq
github.com/hexops/gotextdiff v1.0.3/go.mod h1:pSWU5MAI3yDq+fZBTazCSJysOMbxWL1BSow5/V2vxeg=
github.com/inconshreveable/mousetrap v1.1.0 h1:wN+x4NVGpMsO7ErUn/mUI3vEoE6Jt13X2s0bqwp9tc8=
github.com/inconshreveable/mousetrap v1.1.0/go.mod h1:vpF70FUmC8bwa3OWnCshd2FqLfsEA9PFc4w1p2J65bw=
github.com/jinzhu/inflection v1.0.0 h1:K317FqzuhWc8YvSVlFMCCUb36O/S9MCKRDI7QkRKD/E=
github.com/jinzhu/inflection v1.0.0/go.mod h1:h+uFLlag+Qp1Va5pdKtLDYj+kHp5pxUVkryuEj+Srlc=
github.com/jinzhu/now v1.1.5 h1:/o9tlHleP7gOFmsnYNz3RGnqzefHA47wQpKrrdTIwXQ=
github.com/jinzhu/now v1.1.5/go.mod h1:d3SSVoowX0Lcu0IBviAWJpolVfI5UJVZZ7cO71lE/z8=
github.com/kr/pretty v0.2.1/go.mod h1:ipq/a2n7PKx3OHsz4KJII5eveXtPO4qwEXGdVfWzfnI=
github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE=
github.com/kr/pretty v0.3.1/go.mod h1:hoEshYVHaxMs3cyo3Yncou5ZscifuDolrwPKZanG3xk=
Expand All @@ -58,6 +62,8 @@ github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY=
github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE=
github.com/mattn/go-runewidth v0.0.9 h1:Lm995f3rfxdpd6TSmuVCHVb/QhupuXlYr8sCI/QdE+0=
github.com/mattn/go-runewidth v0.0.9/go.mod h1:H031xJmbD/WCDINGzjvQ9THkh0rPKHF+m2gUSrubnMI=
github.com/mattn/go-sqlite3 v1.14.24 h1:tpSp2G2KyMnnQu99ngJ47EIkWVmliIizyZBfPrBWDRM=
github.com/mattn/go-sqlite3 v1.14.24/go.mod h1:Uh1q+B4BYcTPb+yiD3kU8Ct7aC0hY9fxUwlHK0RXw+Y=
github.com/mschoch/smat v0.2.0 h1:8imxQsjDm8yFEAVBe7azKmKSgzSkZXDuKkSq9374khM=
github.com/mschoch/smat v0.2.0/go.mod h1:kc9mz7DoBKqDyiRL7VZN8KvXQMWeTaVnttLRXOlotKw=
github.com/nxadm/tail v1.4.8 h1:nPr65rt6Y5JFSKQO7qToXr7pePgD6Gwiw05lkbyAQTE=
Expand Down Expand Up @@ -174,6 +180,10 @@ gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7/go.mod h1:dt/ZhP58zS4L8KSrWD
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
gorm.io/driver/sqlite v1.5.6 h1:fO/X46qn5NUEEOZtnjJRWRzZMe8nqJiQ9E+0hi+hKQE=
gorm.io/driver/sqlite v1.5.6/go.mod h1:U+J8craQU6Fzkcvu8oLeAQmi50TkwPEhHDEjQZXDah4=
gorm.io/gorm v1.25.12 h1:I0u8i2hWQItBq1WfE0o2+WuL9+8L21K9e2HHSTE/0f8=
gorm.io/gorm v1.25.12/go.mod h1:xh7N7RHfYlNc5EmcI/El95gXusucDrQnHXe0+CgWcLQ=
sigs.k8s.io/release-utils v0.8.4 h1:4QVr3UgbyY/d9p74LBhg0njSVQofUsAZqYOzVZBhdBw=
sigs.k8s.io/release-utils v0.8.4/go.mod h1:m1bHfscTemQp+z+pLCZnkXih9n0+WukIUU70n6nFnU0=
sigs.k8s.io/yaml v1.4.0/go.mod h1:Ejl7/uTz7PSA4eKMyQCUTnhZYNmLIl+5c2lQPGR2BPY=
46 changes: 23 additions & 23 deletions pkg/storages/redis_storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ func NewRedisStorage(addr string) (graph.Storage, error) {
}

func (r *RedisStorage) GenerateID() (uint32, error) {
id, err := r.Client.Incr(context.Background(), "id_counter").Result()
id, err := r.Client.Incr(context.Background(), IDCounterKey).Result()
if err != nil {
return 0, fmt.Errorf("failed to generate ID: %w", err)
}
Expand All @@ -37,20 +37,20 @@ func (r *RedisStorage) SaveNode(node *graph.Node) error {
if err != nil {
return fmt.Errorf("failed to marshal node: %w", err)
}
if err := r.Client.Set(context.Background(), fmt.Sprintf("node:%d", node.ID), data, 0).Err(); err != nil {
if err := r.Client.Set(context.Background(), fmt.Sprintf("%s%d", NodeKeyPrefix, node.ID), data, 0).Err(); err != nil {
return fmt.Errorf("failed to save node data: %w", err)
}
if err := r.Client.Set(context.Background(), fmt.Sprintf("name_to_id:%s", node.Name), strconv.Itoa(int(node.ID)), 0).Err(); err != nil {
if err := r.Client.Set(context.Background(), fmt.Sprintf("%s%s", NameToIDKey, node.Name), strconv.Itoa(int(node.ID)), 0).Err(); err != nil {
return fmt.Errorf("failed to save node name to ID mapping: %w", err)
}
if err := r.AddNodeToCachedStack(node.ID); err != nil {
return fmt.Errorf("failed to add node ID to to_be_cached set: %w", err)
return fmt.Errorf("failed to add node ID to %s set: %w", CacheStackKey, err)
}
return nil
}

func (r *RedisStorage) NameToID(name string) (uint32, error) {
id, err := r.Client.Get(context.Background(), fmt.Sprintf("name_to_id:%s", name)).Result()
id, err := r.Client.Get(context.Background(), fmt.Sprintf("%s%s", NameToIDKey, name)).Result()
if err != nil {
return 0, fmt.Errorf("failed to get ID for name %s: %w", name, err)
}
Expand All @@ -64,7 +64,7 @@ func (r *RedisStorage) NameToID(name string) (uint32, error) {

func (r *RedisStorage) GetNode(id uint32) (*graph.Node, error) {
ctx := context.Background()
data, err := r.Client.Get(ctx, fmt.Sprintf("node:%d", id)).Result()
data, err := r.Client.Get(ctx, fmt.Sprintf("%s%d", NodeKeyPrefix, id)).Result()
if err != nil {
return nil, fmt.Errorf("failed to get node data for ID %d: %w", id, err)
}
Expand All @@ -77,15 +77,15 @@ func (r *RedisStorage) GetNode(id uint32) (*graph.Node, error) {

func (r *RedisStorage) GetNodesByGlob(pattern string) ([]*graph.Node, error) {
// Use pattern matching for Redis keys
keys, err := r.Client.Keys(context.Background(), fmt.Sprintf("name_to_id:%s", pattern)).Result()
keys, err := r.Client.Keys(context.Background(), fmt.Sprintf("%s%s", NameToIDKey, pattern)).Result()
if err != nil {
return nil, fmt.Errorf("failed to get nodes by pattern %s: %w", pattern, err)
}

nodes := make([]*graph.Node, 0, len(keys))
for _, key := range keys {
// Extract the name from the key
name := strings.TrimPrefix(key, "name_to_id:")
name := strings.TrimPrefix(key, NameToIDKey)

// Get the ID using the name
id, err := r.NameToID(name)
Expand All @@ -104,13 +104,13 @@ func (r *RedisStorage) GetNodesByGlob(pattern string) ([]*graph.Node, error) {
}

func (r *RedisStorage) GetAllKeys() ([]uint32, error) {
keys, err := r.Client.Keys(context.Background(), "node:*").Result()
keys, err := r.Client.Keys(context.Background(), fmt.Sprintf("%s*", NodeKeyPrefix)).Result()
if err != nil {
return nil, fmt.Errorf("failed to get all keys: %w", err)
}
var result []uint32
for _, key := range keys {
id, err := strconv.ParseUint(strings.TrimPrefix(key, "node:"), 10, 32)
id, err := strconv.ParseUint(strings.TrimPrefix(key, NodeKeyPrefix), 10, 32)
if err != nil {
return nil, fmt.Errorf("failed to parse key %s: %w", key, err)
}
Expand All @@ -125,21 +125,21 @@ func (r *RedisStorage) SaveCache(cache *graph.NodeCache) error {
if err != nil {
return fmt.Errorf("failed to marshal cache: %w", err)
}
return r.Client.Set(ctx, fmt.Sprintf("cache:%d", cache.ID), data, 0).Err()
return r.Client.Set(ctx, fmt.Sprintf("%s%d", CacheKeyPrefix, cache.ID), data, 0).Err()
}

func (r *RedisStorage) ToBeCached() ([]uint32, error) {
ctx := context.Background()
data, err := r.Client.LRange(ctx, "to_be_cached", 0, -1).Result()
data, err := r.Client.LRange(ctx, CacheStackKey, 0, -1).Result()
if err != nil {
return nil, fmt.Errorf("failed to get to_be_cached data: %w", err)
return nil, fmt.Errorf("failed to get %s data: %w", CacheStackKey, err)
}

result := make([]uint32, 0, len(data))
for _, item := range data {
id, err := strconv.ParseUint(item, 10, 32)
if err != nil {
return nil, fmt.Errorf("failed to parse item %s in to_be_cached: %w", item, err)
return nil, fmt.Errorf("failed to parse item %s in %s: %w", item, CacheStackKey, err)
}
result = append(result, uint32(id))
}
Expand All @@ -149,7 +149,7 @@ func (r *RedisStorage) ToBeCached() ([]uint32, error) {

func (r *RedisStorage) AddNodeToCachedStack(nodeID uint32) error {
ctx := context.Background()
err := r.Client.RPush(ctx, "to_be_cached", nodeID).Err()
err := r.Client.RPush(ctx, CacheStackKey, nodeID).Err()
if err != nil {
return fmt.Errorf("failed to add node %d to cached stack: %w", nodeID, err)
}
Expand All @@ -158,7 +158,7 @@ func (r *RedisStorage) AddNodeToCachedStack(nodeID uint32) error {

func (r *RedisStorage) ClearCacheStack() error {
ctx := context.Background()
err := r.Client.Del(ctx, "to_be_cached").Err()
err := r.Client.Del(ctx, CacheStackKey).Err()
if err != nil {
return fmt.Errorf("failed to clear cache stack: %w", err)
}
Expand All @@ -167,7 +167,7 @@ func (r *RedisStorage) ClearCacheStack() error {

func (r *RedisStorage) GetCache(nodeID uint32) (*graph.NodeCache, error) {
ctx := context.Background()
data, err := r.Client.Get(ctx, fmt.Sprintf("cache:%d", nodeID)).Result()
data, err := r.Client.Get(ctx, fmt.Sprintf("%s%d", CacheKeyPrefix, nodeID)).Result()
if err != nil {
return nil, fmt.Errorf("failed to get cache for node %d: %w", nodeID, err)
}
Expand All @@ -184,7 +184,7 @@ func (r *RedisStorage) GetNodes(ids []uint32) (map[uint32]*graph.Node, error) {

cmds := make([]*redis.StringCmd, len(ids))
for i, id := range ids {
cmds[i] = pipe.Get(ctx, fmt.Sprintf("node:%d", id))
cmds[i] = pipe.Get(ctx, fmt.Sprintf("%s%d", NodeKeyPrefix, id))
}

_, err := pipe.Exec(ctx)
Expand Down Expand Up @@ -220,7 +220,7 @@ func (r *RedisStorage) SaveCaches(caches []*graph.NodeCache) error {
if err != nil {
return fmt.Errorf("failed to marshal cache: %w", err)
}
pipe.Set(ctx, fmt.Sprintf("cache:%d", cache.ID), data, 0)
pipe.Set(ctx, fmt.Sprintf("%s%d", CacheKeyPrefix, cache.ID), data, 0)
}

_, err := pipe.Exec(ctx)
Expand All @@ -236,7 +236,7 @@ func (r *RedisStorage) GetCaches(ids []uint32) (map[uint32]*graph.NodeCache, err

cmds := make([]*redis.StringCmd, len(ids))
for i, id := range ids {
cmds[i] = pipe.Get(ctx, fmt.Sprintf("cache:%d", id))
cmds[i] = pipe.Get(ctx, fmt.Sprintf("%s%d", CacheKeyPrefix, id))
}

_, err := pipe.Exec(ctx)
Expand Down Expand Up @@ -270,7 +270,7 @@ func (r *RedisStorage) RemoveAllCaches() error {

for {
var keys []string
keys, cursor, err = r.Client.Scan(ctx, cursor, "cache:*", 1000).Result()
keys, cursor, err = r.Client.Scan(ctx, cursor, fmt.Sprintf("%s*", CacheKeyPrefix), 1000).Result()
if err != nil {
return fmt.Errorf("failed to scan cache keys: %w", err)
}
Expand All @@ -280,8 +280,8 @@ func (r *RedisStorage) RemoveAllCaches() error {

// Extract IDs and add them to the cache stack
for _, key := range keys {
id := strings.TrimPrefix(key, "cache:")
pipe.RPush(ctx, "to_be_cached", id)
id := strings.TrimPrefix(key, CacheKeyPrefix)
pipe.RPush(ctx, CacheStackKey, id)
}

// Delete the cache entries
Expand Down
Loading

0 comments on commit a79071d

Please sign in to comment.