Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Redis Gears support #2675

Merged
merged 6 commits into from
Aug 16, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion .github/wordlist.txt
Original file line number Diff line number Diff line change
Expand Up @@ -52,4 +52,5 @@ uri
URI
url
variadic
RedisStack
RedisStack
RedisGears
65 changes: 65 additions & 0 deletions command.go
Original file line number Diff line number Diff line change
Expand Up @@ -3713,6 +3713,71 @@ func (cmd *MapStringStringSliceCmd) readReply(rd *proto.Reader) error {
return nil
}

//-----------------------------------------------------------------------

type MapStringInterfaceSliceCmd struct {
baseCmd

val []map[string]interface{}
}

var _ Cmder = (*MapStringInterfaceSliceCmd)(nil)

func NewMapStringInterfaceSliceCmd(ctx context.Context, args ...interface{}) *MapStringInterfaceSliceCmd {
return &MapStringInterfaceSliceCmd{
baseCmd: baseCmd{
ctx: ctx,
args: args,
},
}
}

func (cmd *MapStringInterfaceSliceCmd) SetVal(val []map[string]interface{}) {
cmd.val = val
}

func (cmd *MapStringInterfaceSliceCmd) Val() []map[string]interface{} {
return cmd.val
}

func (cmd *MapStringInterfaceSliceCmd) Result() ([]map[string]interface{}, error) {
return cmd.Val(), cmd.Err()
}

func (cmd *MapStringInterfaceSliceCmd) String() string {
return cmdString(cmd, cmd.val)
}

func (cmd *MapStringInterfaceSliceCmd) readReply(rd *proto.Reader) error {
n, err := rd.ReadArrayLen()
if err != nil {
return err
}

cmd.val = make([]map[string]interface{}, n)
for i := 0; i < n; i++ {
nn, err := rd.ReadMapLen()
if err != nil {
return err
}
cmd.val[i] = make(map[string]interface{}, nn)
for f := 0; f < nn; f++ {
k, err := rd.ReadString()
if err != nil {
return err
}
v, err := rd.ReadReply()
if err != nil {
if err != Nil {
return err
}
}
cmd.val[i][k] = v
}
}
return nil
}

//------------------------------------------------------------------------------

type KeyValuesCmd struct {
Expand Down
1 change: 1 addition & 0 deletions commands.go
Original file line number Diff line number Diff line change
Expand Up @@ -505,6 +505,7 @@ type Cmdable interface {

ModuleLoadex(ctx context.Context, conf *ModuleLoadexConfig) *StringCmd

gearsCmdable
probabilisticCmdable
}

Expand Down
161 changes: 161 additions & 0 deletions redis_gears.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,161 @@
package redis

import (
"context"
"fmt"
"strings"
)

type gearsCmdable interface {
TFunctionLoad(ctx context.Context, lib string) *StatusCmd
TFunctionLoadArgs(ctx context.Context, lib string, options *TFunctionLoadOptions) *StatusCmd
TFunctionDelete(ctx context.Context, libName string) *StatusCmd
chayim marked this conversation as resolved.
Show resolved Hide resolved
TFunctionList(ctx context.Context) *MapStringInterfaceSliceCmd
TFunctionListArgs(ctx context.Context, options *TFunctionListOptions) *MapStringInterfaceSliceCmd
TFCall(ctx context.Context, libName string, funcName string, numKeys int) *Cmd
ofekshenawa marked this conversation as resolved.
Show resolved Hide resolved
TFCallArgs(ctx context.Context, libName string, funcName string, numKeys int, options *TFCallOptions) *Cmd
TFCallASYNC(ctx context.Context, libName string, funcName string, numKeys int) *Cmd
TFCallASYNCArgs(ctx context.Context, libName string, funcName string, numKeys int, options *TFCallOptions) *Cmd
}
type TFunctionLoadOptions struct {
Replace bool
Config string
}

type TFunctionListOptions struct {
Withcode bool
Verbose int
Library string
}

type TFCallOptions struct {
Keys []string
Arguments []string
}

// TFunctionLoad - load a new JavaScript library into Redis.
// For more information - https://redis.io/commands/tfunction-load/
func (c cmdable) TFunctionLoad(ctx context.Context, lib string) *StatusCmd {
args := []interface{}{"TFUNCTION", "LOAD", lib}
cmd := NewStatusCmd(ctx, args...)
_ = c(ctx, cmd)
chayim marked this conversation as resolved.
Show resolved Hide resolved
return cmd
}

func (c cmdable) TFunctionLoadArgs(ctx context.Context, lib string, options *TFunctionLoadOptions) *StatusCmd {
args := []interface{}{"TFUNCTION", "LOAD"}
if options != nil {
if options.Replace {
args = append(args, "REPLACE")
}
if options.Config != "" {
args = append(args, "CONFIG", options.Config)
}
}
args = append(args, lib)
cmd := NewStatusCmd(ctx, args...)
_ = c(ctx, cmd)
return cmd
}

// TFunctionDelete - delete a JavaScript library from Redis.
// For more information - https://redis.io/commands/tfunction-delete/
func (c cmdable) TFunctionDelete(ctx context.Context, libName string) *StatusCmd {
args := []interface{}{"TFUNCTION", "DELETE", libName}
cmd := NewStatusCmd(ctx, args...)
_ = c(ctx, cmd)
return cmd
}

// TFunctionList - list the functions with additional information about each function.
// For more information - https://redis.io/commands/tfunction-list/
func (c cmdable) TFunctionList(ctx context.Context) *MapStringInterfaceSliceCmd {
args := []interface{}{"TFUNCTION", "LIST"}
cmd := NewMapStringInterfaceSliceCmd(ctx, args...)
_ = c(ctx, cmd)
return cmd
}

func (c cmdable) TFunctionListArgs(ctx context.Context, options *TFunctionListOptions) *MapStringInterfaceSliceCmd {
args := []interface{}{"TFUNCTION", "LIST"}
if options != nil {
if options.Withcode {
args = append(args, "WITHCODE")
}
if options.Verbose != 0 {
v := strings.Repeat("v", options.Verbose)
args = append(args, v)
}
if options.Library != "" {
args = append(args, "LIBRARY", options.Library)

}
}
cmd := NewMapStringInterfaceSliceCmd(ctx, args...)
_ = c(ctx, cmd)
return cmd
}

// TFCall - invoke a function.
// For more information - https://redis.io/commands/tfcall/
func (c cmdable) TFCall(ctx context.Context, libName string, funcName string, numKeys int) *Cmd {
lf := libName + "." + funcName
args := []interface{}{"TFCALL", lf, numKeys}
cmd := NewCmd(ctx, args...)
_ = c(ctx, cmd)
return cmd
}

func (c cmdable) TFCallArgs(ctx context.Context, libName string, funcName string, numKeys int, options *TFCallOptions) *Cmd {
lf := libName + "." + funcName
args := []interface{}{"TFCALL", lf, numKeys}
if options != nil {
if options.Keys != nil {
for _, key := range options.Keys {

args = append(args, key)
}
}
if options.Arguments != nil {
for _, key := range options.Arguments {

args = append(args, key)
}
}
}
cmd := NewCmd(ctx, args...)
_ = c(ctx, cmd)
return cmd
}

// TFCallASYNC - invoke an asynchronous JavaScript function (coroutine).
// For more information - https://redis.io/commands/TFCallASYNC/
func (c cmdable) TFCallASYNC(ctx context.Context, libName string, funcName string, numKeys int) *Cmd {
lf := fmt.Sprintf("%s.%s", libName, funcName)
args := []interface{}{"TFCALLASYNC", lf, numKeys}
cmd := NewCmd(ctx, args...)
_ = c(ctx, cmd)
return cmd
}

func (c cmdable) TFCallASYNCArgs(ctx context.Context, libName string, funcName string, numKeys int, options *TFCallOptions) *Cmd {
lf := fmt.Sprintf("%s.%s", libName, funcName)
args := []interface{}{"TFCALLASYNC", lf, numKeys}
if options != nil {
if options.Keys != nil {
for _, key := range options.Keys {

args = append(args, key)
}
}
if options.Arguments != nil {
for _, key := range options.Arguments {

args = append(args, key)
}
}
}
cmd := NewCmd(ctx, args...)
_ = c(ctx, cmd)
return cmd
}
139 changes: 139 additions & 0 deletions redis_gears_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,139 @@
package redis_test

import (
"context"
"fmt"

. "github.com/bsm/ginkgo/v2"
. "github.com/bsm/gomega"
"github.com/redis/go-redis/v9"
)

func libCode(libName string) string {
ofekshenawa marked this conversation as resolved.
Show resolved Hide resolved
return fmt.Sprintf("#!js api_version=1.0 name=%s\n redis.registerFunction('foo', ()=>{{return 'bar'}})", libName)
}

func libCodeWithConfig(libName string) string {
lib := `#!js api_version=1.0 name=%s

var last_update_field_name = "__last_update__"

if (redis.config.last_update_field_name !== undefined) {
if (typeof redis.config.last_update_field_name != 'string') {
throw "last_update_field_name must be a string";
}
last_update_field_name = redis.config.last_update_field_name
}

redis.registerFunction("hset", function(client, key, field, val){
// get the current time in ms
var curr_time = client.call("time")[0];
return client.call('hset', key, field, val, last_update_field_name, curr_time);
});`
return fmt.Sprintf(lib, libName)
}

var _ = Describe("RedisGears commands", Label("gears"), func() {
ctx := context.TODO()
var client *redis.Client

BeforeEach(func() {
client = redis.NewClient(&redis.Options{Addr: ":6379"})
Expect(client.FlushDB(ctx).Err()).NotTo(HaveOccurred())
chayim marked this conversation as resolved.
Show resolved Hide resolved
})

AfterEach(func() {
Expect(client.Close()).NotTo(HaveOccurred())
})

It("should TFunctionLoad, TFunctionLoadArgs and TFunctionDelete ", Label("gears", "tfunctionload"), func() {

resultAdd, err := client.TFunctionLoad(ctx, libCode("libtflo1")).Result()
Expect(err).NotTo(HaveOccurred())
Expect(resultAdd).To(BeEquivalentTo("OK"))
opt := &redis.TFunctionLoadOptions{Replace: true, Config: `{"last_update_field_name":"last_update"}`}
resultAdd, err = client.TFunctionLoadArgs(ctx, libCodeWithConfig("libtflo1"), opt).Result()
Expect(err).NotTo(HaveOccurred())
Expect(resultAdd).To(BeEquivalentTo("OK"))
resultAdd, err = client.TFunctionDelete(ctx, "libtflo1").Result()
Expect(err).NotTo(HaveOccurred())
Expect(resultAdd).To(BeEquivalentTo("OK"))

})
It("should TFunctionList", Label("gears", "tfunctionlist"), func() {
resultAdd, err := client.TFunctionLoad(ctx, libCode("libtfli1")).Result()
Expect(err).NotTo(HaveOccurred())
Expect(resultAdd).To(BeEquivalentTo("OK"))
resultAdd, err = client.TFunctionLoad(ctx, libCode("libtfli2")).Result()
Expect(err).NotTo(HaveOccurred())
Expect(resultAdd).To(BeEquivalentTo("OK"))
resultList, err := client.TFunctionList(ctx).Result()
Expect(err).NotTo(HaveOccurred())
Expect(resultList[0]["engine"]).To(BeEquivalentTo("js"))
opt := &redis.TFunctionListOptions{Withcode: true, Verbose: 2}
resultListArgs, err := client.TFunctionListArgs(ctx, opt).Result()
Expect(err).NotTo(HaveOccurred())
Expect(resultListArgs[0]["code"]).To(BeEquivalentTo(libCode("libtfli1")))
resultAdd, err = client.TFunctionDelete(ctx, "libtfli1").Result()
Expect(err).NotTo(HaveOccurred())
Expect(resultAdd).To(BeEquivalentTo("OK"))
resultAdd, err = client.TFunctionDelete(ctx, "libtfli2").Result()
Expect(err).NotTo(HaveOccurred())
Expect(resultAdd).To(BeEquivalentTo("OK"))
})

It("should TFCall", Label("gears", "tfcall"), func() {
var resultAdd interface{}
resultAdd, err := client.TFunctionLoad(ctx, libCode("libtfc1")).Result()
Expect(err).NotTo(HaveOccurred())
Expect(resultAdd).To(BeEquivalentTo("OK"))
resultAdd, err = client.TFCall(ctx, "libtfc1", "foo", 0).Result()
Expect(err).NotTo(HaveOccurred())
Expect(resultAdd).To(BeEquivalentTo("bar"))
resultAdd, err = client.TFunctionDelete(ctx, "libtfc1").Result()
Expect(err).NotTo(HaveOccurred())
Expect(resultAdd).To(BeEquivalentTo("OK"))
})

It("should TFCallArgs", Label("gears", "tfcallargs"), func() {
var resultAdd interface{}
resultAdd, err := client.TFunctionLoad(ctx, libCode("libtfca1")).Result()
Expect(err).NotTo(HaveOccurred())
Expect(resultAdd).To(BeEquivalentTo("OK"))
opt := &redis.TFCallOptions{Arguments: []string{"foo", "bar"}}
resultAdd, err = client.TFCallArgs(ctx, "libtfca1", "foo", 0, opt).Result()
Expect(err).NotTo(HaveOccurred())
Expect(resultAdd).To(BeEquivalentTo("bar"))
resultAdd, err = client.TFunctionDelete(ctx, "libtfca1").Result()
Expect(err).NotTo(HaveOccurred())
Expect(resultAdd).To(BeEquivalentTo("OK"))
})

It("should TFCallASYNC", Label("gears", "TFCallASYNC"), func() {
var resultAdd interface{}
resultAdd, err := client.TFunctionLoad(ctx, libCode("libtfc1")).Result()
Expect(err).NotTo(HaveOccurred())
Expect(resultAdd).To(BeEquivalentTo("OK"))
resultAdd, err = client.TFCallASYNC(ctx, "libtfc1", "foo", 0).Result()
Expect(err).NotTo(HaveOccurred())
Expect(resultAdd).To(BeEquivalentTo("bar"))
resultAdd, err = client.TFunctionDelete(ctx, "libtfc1").Result()
Expect(err).NotTo(HaveOccurred())
Expect(resultAdd).To(BeEquivalentTo("OK"))
})

It("should TFCallASYNCArgs", Label("gears", "TFCallASYNCargs"), func() {
var resultAdd interface{}
resultAdd, err := client.TFunctionLoad(ctx, libCode("libtfca1")).Result()
Expect(err).NotTo(HaveOccurred())
Expect(resultAdd).To(BeEquivalentTo("OK"))
opt := &redis.TFCallOptions{Arguments: []string{"foo", "bar"}}
resultAdd, err = client.TFCallASYNCArgs(ctx, "libtfca1", "foo", 0, opt).Result()
Expect(err).NotTo(HaveOccurred())
Expect(resultAdd).To(BeEquivalentTo("bar"))
resultAdd, err = client.TFunctionDelete(ctx, "libtfca1").Result()
Expect(err).NotTo(HaveOccurred())
Expect(resultAdd).To(BeEquivalentTo("OK"))
})

})
Loading