Merge pull request #415 from redis/om-ttl
feat: allow setting expiry on om entities by adding an exat field
rueian authored Dec 1, 2023
2 parents 65f7055 + 0e916e0 commit 7654a00
Showing 8 changed files with 382 additions and 12 deletions.
15 changes: 12 additions & 3 deletions om/README.md
@@ -15,9 +15,10 @@ import (
 )
 
 type Example struct {
-	Key string `json:"key" redis:",key"` // the redis:",key" is required to indicate which field is the ULID key
-	Ver int64  `json:"ver" redis:",ver"` // the redis:",ver" is required to do optimistic locking to prevent lost update
-	Str string `json:"str"`              // both NewHashRepository and NewJSONRepository use json tag as field name
+	Key  string    `json:"key" redis:",key"`   // the redis:",key" is required to indicate which field is the ULID key
+	Ver  int64     `json:"ver" redis:",ver"`   // the redis:",ver" is required to do optimistic locking to prevent lost update
+	ExAt time.Time `json:"exat" redis:",exat"` // the redis:",exat" is optional for setting record expiry with unix timestamp
+	Str  string    `json:"str"`                // both NewHashRepository and NewJSONRepository use json tag as field name
 }
 
 func main() {
@@ -31,6 +32,7 @@ func main() {
 
 	exp := repo.NewEntity()
 	exp.Str = "mystr"
+	exp.ExAt = time.Now().Add(time.Hour)
 	fmt.Println(exp.Key) // output 01FNH4FCXV9JTB9WTVFAAKGSYB
 	repo.Save(ctx, exp)  // success
 
@@ -88,6 +90,12 @@ repo1 := om.NewHashRepository("my_prefix", Example{}, c, om.WithIndexName("my_index1"))
 repo2 := om.NewHashRepository("my_prefix", Example{}, c, om.WithIndexName("my_index2"))
 ```
 
+### Object Expiry Timestamp
+
+Setting a `redis:",exat"` tag on a `time.Time` field will set `PEXPIREAT` on the record accordingly when calling `.Save()`.
+
+If the `time.Time` is zero, the expiry will be untouched when calling `.Save()`.
+
 ### Object Mapping Limitation
 
 `NewHashRepository` only accepts these field types:
@@ -96,5 +104,6 @@ repo2 := om.NewHashRepository("my_prefix", Example{}, c, om.WithIndexName("my_index2"))
 * `bool`, `*bool`
 * `[]byte`, `json.RawMessage`
 * `[]float32`, `[]float64` for vector search
+* `json.Marshaler+json.Unmarshaler`
 
 Field projection by RediSearch is not supported.
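
Taken together with the `Example` struct above, a minimal usage sketch of the new field (reusing `repo` and `ctx` from the README's `main`; illustrative, not part of the diff):

```go
exp := repo.NewEntity()
exp.Str = "mystr"
exp.ExAt = time.Now().Add(time.Hour) // Save will issue PEXPIREAT at this timestamp
if err := repo.Save(ctx, exp); err != nil {
	panic(err)
}

// A zero ExAt leaves any existing expiry untouched on later saves.
exp.ExAt = time.Time{}
if err := repo.Save(ctx, exp); err != nil {
	panic(err)
}
```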
33 changes: 28 additions & 5 deletions om/conv.go
@@ -1,6 +1,7 @@
 package om
 
 import (
+	"encoding/json"
 	"fmt"
 	"reflect"
 	"strconv"
@@ -50,7 +51,11 @@ func (r hashConv) ToHash() (fields map[string]string) {
 	fields = make(map[string]string, len(r.factory.fields))
 	for k, f := range r.factory.fields {
 		ref := r.entity.Field(f.idx)
-		if v, ok := f.conv.ValueToString(ref); ok {
+		if f.conv.ValueToString == nil {
+			if bs, err := json.Marshal(ref.Interface()); err == nil {
+				fields[k] = rueidis.BinaryString(bs)
+			}
+		} else if v, ok := f.conv.ValueToString(ref); ok {
 			fields[k] = v
 		}
 	}
@@ -63,11 +68,17 @@ func (r hashConv) FromHash(fields map[string]string) error {
 		if !ok {
 			continue
 		}
-		val, err := f.conv.StringToValue(v)
-		if err != nil {
-			return err
+		if f.conv.StringToValue == nil {
+			if err := json.Unmarshal(unsafe.Slice(unsafe.StringData(v), len(v)), r.entity.Field(f.idx).Addr().Interface()); err != nil {
+				return err
+			}
+		} else {
+			val, err := f.conv.StringToValue(v)
+			if err != nil {
+				return err
+			}
+			r.entity.Field(f.idx).Set(val)
 		}
-		r.entity.Field(f.idx).Set(val)
 	}
 	return nil
 }
@@ -124,6 +135,10 @@ var converters = struct {
 				return reflect.ValueOf(&b), nil
 			},
 		},
+		reflect.Struct: {
+			ValueToString: nil,
+			StringToValue: nil,
+		},
 	},
 	val: map[reflect.Kind]converter{
 		reflect.Int64: {
@@ -158,6 +173,10 @@ var converters = struct {
 				return reflect.ValueOf(b), nil
 			},
 		},
+		reflect.Struct: {
+			ValueToString: nil,
+			StringToValue: nil,
+		},
 	},
 	slice: map[reflect.Kind]converter{
 		reflect.Uint8: {
@@ -187,5 +206,9 @@ var converters = struct {
 				return reflect.ValueOf(rueidis.ToVector64(value)), nil
 			},
 		},
+		reflect.Struct: {
+			ValueToString: nil,
+			StringToValue: nil,
+		},
 	},
 }
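
The `reflect.Struct` entries register `nil` converters on purpose: `ToHash` and `FromHash` treat a missing converter as "fall back to `encoding/json`", which is how a `time.Time` field survives a round trip through a Redis hash. A standalone sketch of that round trip (illustrative only, not the library's code):

```go
package main

import (
	"encoding/json"
	"fmt"
	"time"
)

func main() {
	// Write path: struct-kind fields are marshaled to their JSON form.
	now := time.Now()
	bs, err := json.Marshal(now)
	if err != nil {
		panic(err)
	}

	// Read path: the stored string is unmarshaled back into the field.
	var back time.Time
	if err := json.Unmarshal(bs, &back); err != nil {
		panic(err)
	}
	fmt.Println(back.Equal(now)) // true: the instant round-trips intact
}
```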
22 changes: 20 additions & 2 deletions om/hash.go
@@ -69,13 +69,26 @@ func (r *HashRepository[T]) toExec(entity *T) (val reflect.Value, exec rueidis.LuaExec) {
 	fields := r.factory.NewConverter(val).ToHash()
 	keyVal := fields[r.schema.key.name]
 	verVal := fields[r.schema.ver.name]
+	extVal := int64(0)
+	if r.schema.ext != nil {
+		if ext, ok := val.Field(r.schema.ext.idx).Interface().(time.Time); ok && !ext.IsZero() {
+			extVal = ext.UnixMilli()
+		}
+	}
 	exec.Keys = []string{key(r.prefix, keyVal)}
-	exec.Args = make([]string, 0, len(fields)*2)
+	if extVal != 0 {
+		exec.Args = make([]string, 0, len(fields)*2+1)
+	} else {
+		exec.Args = make([]string, 0, len(fields)*2)
+	}
 	exec.Args = append(exec.Args, r.schema.ver.name, verVal) // keep the ver field as the first pair for the hashSaveScript
 	delete(fields, r.schema.ver.name)
 	for k, v := range fields {
 		exec.Args = append(exec.Args, k, v)
 	}
+	if extVal != 0 {
+		exec.Args = append(exec.Args, strconv.FormatInt(extVal, 10))
+	}
 	return
 }

@@ -184,7 +197,12 @@ local v = redis.call('HGET',KEYS[1],ARGV[1])
 if (not v or v == ARGV[2])
 then
   ARGV[2] = tostring(tonumber(ARGV[2])+1)
-  if redis.call('HSET',KEYS[1],unpack(ARGV)) then return ARGV[2] end
+  local e = (#ARGV % 2 == 1) and table.remove(ARGV) or nil
+  if redis.call('HSET',KEYS[1],unpack(ARGV))
+  then
+    if e then redis.call('PEXPIREAT',KEYS[1],e) end
+    return ARGV[2]
+  end
 end
 return nil
`)
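
The `(#ARGV % 2 == 1)` check works because `HSET` consumes field/value pairs, so a well-formed ARGV always has even length; the single trailing timestamp appended by `toExec` makes it odd, and `table.remove(ARGV)` pops it back off before `HSET` runs. A small sketch of that argument layout (illustrative values only):

```go
package main

import (
	"fmt"
	"strconv"
	"time"
)

func main() {
	// Field/value pairs destined for HSET: always an even count.
	args := []string{"ver", "1", "str", "mystr"}

	// Appending one expiry timestamp makes the count odd, which the Lua
	// script detects and strips into a PEXPIREAT call.
	exat := time.Now().Add(time.Hour).UnixMilli()
	args = append(args, strconv.FormatInt(exat, 10))

	fmt.Println(len(args)%2 == 1) // true: the last entry is the expiry
}
```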
141 changes: 141 additions & 0 deletions om/hash_test.go
@@ -280,3 +280,144 @@ func TestNewHashRepository(t *testing.T) {
 		}
 	})
 }
+
+type HashTestTTLStruct struct {
+	Key  string    `redis:",key"`
+	Ver  int64     `redis:",ver"`
+	Exat time.Time `redis:",exat"`
+}
+
+//gocyclo:ignore
+func TestNewHashRepositoryTTL(t *testing.T) {
+	ctx := context.Background()
+
+	client := setup(t)
+	client.Do(ctx, client.B().Flushall().Build())
+	defer client.Close()
+
+	repo := NewHashRepository("hashttl", HashTestTTLStruct{}, client)
+
+	t.Run("NewEntity", func(t *testing.T) {
+		e := repo.NewEntity()
+		ulid.MustParse(e.Key)
+	})
+
+	t.Run("Save", func(t *testing.T) {
+		e := repo.NewEntity()
+		e.Exat = time.Now().Add(time.Minute)
+		if err := repo.Save(ctx, e); err != nil {
+			t.Fatal(err)
+		}
+		if e.Ver != 1 {
+			t.Fatalf("ver should be incremented")
+		}
+
+		// test ErrVersionMismatch
+		e.Ver = 0
+		if err := repo.Save(ctx, e); err != ErrVersionMismatch {
+			t.Fatalf("save should fail with ErrVersionMismatch, got: %v", err)
+		}
+		e.Ver = 1 // restore
+
+		t.Run("ExpireAt", func(t *testing.T) {
+			exat, err := client.Do(ctx, client.B().Pexpiretime().Key("hashttl:"+e.Key).Build()).AsInt64()
+			if err != nil {
+				t.Fatal(err)
+			}
+			if exat != e.Exat.UnixMilli() {
+				t.Fatalf("wrong exat")
+			}
+		})
+
+		t.Run("Fetch", func(t *testing.T) {
+			ei, err := repo.Fetch(ctx, e.Key)
+			if err != nil {
+				t.Fatal(err)
+			}
+			if e == ei {
+				t.Fatalf("e's address should not be the same as ei's")
+			}
+			e.Exat = e.Exat.Truncate(time.Millisecond)
+			ei.Exat = ei.Exat.Truncate(time.Millisecond)
+			if !e.Exat.Equal(ei.Exat) {
+				t.Fatalf("ei should be the same as e %v %v", e, ei)
+			}
+		})
+
+		t.Run("FetchCache", func(t *testing.T) {
+			ei, err := repo.FetchCache(ctx, e.Key, time.Minute)
+			if err != nil {
+				t.Fatal(err)
+			}
+			if e == ei {
+				t.Fatalf("e's address should not be the same as ei's")
+			}
+			e.Exat = e.Exat.Truncate(time.Millisecond)
+			ei.Exat = ei.Exat.Truncate(time.Millisecond)
+			if !e.Exat.Equal(ei.Exat) {
+				t.Fatalf("ei should be the same as e %v %v", e, ei)
+			}
+		})
+		t.Run("Delete", func(t *testing.T) {
+			if err := repo.Remove(ctx, e.Key); err != nil {
+				t.Fatal(err)
+			}
+			ei, err := repo.Fetch(ctx, e.Key)
+			if !IsRecordNotFound(err) {
+				t.Fatalf("should not be found, but got %v", ei)
+			}
+			_, err = repo.FetchCache(ctx, e.Key, time.Minute)
+			if !IsRecordNotFound(err) {
+				t.Fatalf("should not be found, but got %v", e)
+			}
+		})
+	})
+
+	t.Run("SaveMulti", func(t *testing.T) {
+		entities := []*HashTestTTLStruct{
+			repo.NewEntity(),
+			repo.NewEntity(),
+			repo.NewEntity(),
+		}
+
+		for _, e := range entities {
+			e.Exat = time.Now().Add(time.Minute)
+		}
+
+		for i, err := range repo.SaveMulti(context.Background(), entities...) {
+			if err != nil {
+				t.Fatal(err)
+			}
+			if entities[i].Ver != 1 {
+				t.Fatalf("unexpected ver %d", entities[i].Ver)
+			}
+		}
+
+		entities[len(entities)-1].Ver = 0
+
+		for i, err := range repo.SaveMulti(context.Background(), entities...) {
+			if i == len(entities)-1 {
+				if err != ErrVersionMismatch {
+					t.Fatalf("unexpected err %v", err)
+				}
+			} else {
+				if err != nil {
+					t.Fatal(err)
+				}
+				if entities[i].Ver != 2 {
+					t.Fatalf("unexpected ver %d", entities[i].Ver)
+				}
+			}
+		}
+
+		for _, e := range entities {
+			exat, err := client.Do(ctx, client.B().Pexpiretime().Key("hashttl:"+e.Key).Build()).AsInt64()
+			if err != nil {
+				t.Fatal(err)
+			}
+			if exat != e.Exat.UnixMilli() {
+				t.Fatalf("wrong exat")
+			}
+		}
+	})
+}
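
Note that the `ExpireAt` assertions verify the stored deadline with `PEXPIRETIME`, which Redis added in 7.0, so this test needs a 7.0+ server.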
16 changes: 14 additions & 2 deletions om/json.go
@@ -75,8 +75,18 @@ func (r *JSONRepository[T]) decode(record string) (*T, error) {
 func (r *JSONRepository[T]) toExec(entity *T) (verf reflect.Value, exec rueidis.LuaExec) {
 	val := reflect.ValueOf(entity).Elem()
 	verf = val.Field(r.schema.ver.idx)
+	extVal := int64(0)
+	if r.schema.ext != nil {
+		if ext, ok := val.Field(r.schema.ext.idx).Interface().(time.Time); ok && !ext.IsZero() {
+			extVal = ext.UnixMilli()
+		}
+	}
 	exec.Keys = []string{key(r.prefix, val.Field(r.schema.key.idx).String())}
-	exec.Args = []string{r.schema.ver.name, strconv.FormatInt(verf.Int(), 10), rueidis.JSON(entity)}
+	if extVal != 0 {
+		exec.Args = []string{r.schema.ver.name, strconv.FormatInt(verf.Int(), 10), rueidis.JSON(entity), strconv.FormatInt(extVal, 10)}
+	} else {
+		exec.Args = []string{r.schema.ver.name, strconv.FormatInt(verf.Int(), 10), rueidis.JSON(entity)}
+	}
 	return
 }

@@ -174,7 +184,9 @@ local v = redis.call('JSON.GET',KEYS[1],ARGV[1])
 if (not v or v == ARGV[2])
 then
   redis.call('JSON.SET',KEYS[1],'$',ARGV[3])
-  return redis.call('JSON.NUMINCRBY',KEYS[1],ARGV[1],1)
+  local v = redis.call('JSON.NUMINCRBY',KEYS[1],ARGV[1],1)
+  if #ARGV == 4 then redis.call('PEXPIREAT',KEYS[1],ARGV[4]) end
+  return v
 end
 return nil
`)
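
Unlike the hash script, the JSON script takes fixed positional arguments, so it checks `#ARGV == 4` rather than parity; in both repositories a zero `time.Time` keeps `extVal` at 0 and no expiry argument is sent. A standalone sketch of that guard (`expiryArg` is a hypothetical helper, not the library's API):

```go
package main

import (
	"fmt"
	"strconv"
	"time"
)

// expiryArg returns the PEXPIREAT argument for a record, or "" when the
// exat field is the zero time.Time and the expiry should stay untouched.
func expiryArg(exat time.Time) string {
	if exat.IsZero() {
		return ""
	}
	return strconv.FormatInt(exat.UnixMilli(), 10)
}

func main() {
	fmt.Println(expiryArg(time.Time{}) == "")                // true -> no PEXPIREAT issued
	fmt.Println(expiryArg(time.Now().Add(time.Hour)) != "") // true -> expiry appended
}
```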