Skip to content

Commit 7df53a1

Browse files
Adds support for TTL in Redis State Store (#990)
* Adds support for TTL in Redis State Store * return explicit errors * Refactor Redis TTL * reduce nestedness * Changing if logic for ttl to fit lint * Ignore lint for nestif Co-authored-by: Artur Souza <artursouza.ms@outlook.com>
1 parent c930c97 commit 7df53a1

File tree

2 files changed

+201
-6
lines changed

2 files changed

+201
-6
lines changed

state/redis/redis.go

Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ const (
3030
infoReplicationDelimiter = "\r\n"
3131
maxRetries = "maxRetries"
3232
maxRetryBackoff = "maxRetryBackoff"
33+
ttlInSeconds = "ttlInSeconds"
3334
defaultBase = 10
3435
defaultBitSize = 0
3536
defaultDB = 0
@@ -230,6 +231,10 @@ func (r *StateStore) setValue(req *state.SetRequest) error {
230231
if err != nil {
231232
return err
232233
}
234+
ttl, err := r.parseTTL(req)
235+
if err != nil {
236+
return fmt.Errorf("failed to parse ttl from metadata: %s", err)
237+
}
233238

234239
bt, _ := utils.Marshal(req.Value, r.json.Marshal)
235240

@@ -242,6 +247,20 @@ func (r *StateStore) setValue(req *state.SetRequest) error {
242247
return fmt.Errorf("failed to set key %s: %s", req.Key, err)
243248
}
244249

250+
if ttl != nil && *ttl > 0 {
251+
_, err = r.client.Do(r.ctx, "EXPIRE", req.Key, *ttl).Result()
252+
if err != nil {
253+
return fmt.Errorf("failed to set key %s ttl: %s", req.Key, err)
254+
}
255+
}
256+
257+
if ttl != nil && *ttl <= 0 {
258+
_, err = r.client.Do(r.ctx, "PERSIST", req.Key).Result()
259+
if err != nil {
260+
return fmt.Errorf("failed to persist key %s: %s", req.Key, err)
261+
}
262+
}
263+
245264
if req.Options.Consistency == state.Strong && r.replicas > 0 {
246265
_, err = r.client.Do(r.ctx, "WAIT", r.replicas, 1000).Result()
247266
if err != nil {
@@ -261,14 +280,25 @@ func (r *StateStore) Set(req *state.SetRequest) error {
261280
func (r *StateStore) Multi(request *state.TransactionalStateRequest) error {
262281
pipe := r.client.TxPipeline()
263282
for _, o := range request.Operations {
283+
//nolint:golint,nestif
264284
if o.Operation == state.Upsert {
265285
req := o.Request.(state.SetRequest)
266286
ver, err := r.parseETag(&req)
267287
if err != nil {
268288
return err
269289
}
290+
ttl, err := r.parseTTL(&req)
291+
if err != nil {
292+
return fmt.Errorf("failed to parse ttl from metadata: %s", err)
293+
}
270294
bt, _ := utils.Marshal(req.Value, r.json.Marshal)
271295
pipe.Do(r.ctx, "EVAL", setQuery, 1, req.Key, ver, bt)
296+
if ttl != nil && *ttl > 0 {
297+
pipe.Do(r.ctx, "EXPIRE", req.Key, *ttl)
298+
}
299+
if ttl != nil && *ttl <= 0 {
300+
pipe.Do(r.ctx, "PERSIST", req.Key)
301+
}
272302
} else if o.Operation == state.Delete {
273303
req := o.Request.(state.DeleteRequest)
274304
if req.ETag == nil {
@@ -318,6 +348,20 @@ func (r *StateStore) parseETag(req *state.SetRequest) (int, error) {
318348
return ver, nil
319349
}
320350

351+
func (r *StateStore) parseTTL(req *state.SetRequest) (*int, error) {
352+
if val, ok := req.Metadata[ttlInSeconds]; ok && val != "" {
353+
parsedVal, err := strconv.ParseInt(val, defaultBase, defaultBitSize)
354+
if err != nil {
355+
return nil, err
356+
}
357+
ttl := int(parsedVal)
358+
359+
return &ttl, nil
360+
}
361+
362+
return nil, nil
363+
}
364+
321365
func (r *StateStore) Close() error {
322366
r.cancel()
323367

state/redis/redis_test.go

Lines changed: 157 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,9 @@ package redis
77

88
import (
99
"context"
10+
"strconv"
1011
"testing"
12+
"time"
1113

1214
"github.com/agrea/ptr"
1315
miniredis "github.com/alicebob/miniredis/v2"
@@ -90,6 +92,51 @@ func TestParseEtag(t *testing.T) {
9092
})
9193
}
9294

95+
func TestParseTTL(t *testing.T) {
96+
store := NewRedisStateStore(logger.NewLogger("test"))
97+
t.Run("TTL Not an integer", func(t *testing.T) {
98+
ttlInSeconds := "not an integer"
99+
ttl, err := store.parseTTL(&state.SetRequest{
100+
Metadata: map[string]string{
101+
"ttlInSeconds": ttlInSeconds,
102+
},
103+
})
104+
assert.Error(t, err)
105+
assert.Nil(t, ttl)
106+
})
107+
t.Run("TTL specified with wrong key", func(t *testing.T) {
108+
ttlInSeconds := 12345
109+
ttl, err := store.parseTTL(&state.SetRequest{
110+
Metadata: map[string]string{
111+
"expirationTime": strconv.Itoa(ttlInSeconds),
112+
},
113+
})
114+
assert.NoError(t, err)
115+
assert.Nil(t, ttl)
116+
})
117+
t.Run("TTL is a number", func(t *testing.T) {
118+
ttlInSeconds := 12345
119+
ttl, err := store.parseTTL(&state.SetRequest{
120+
Metadata: map[string]string{
121+
"ttlInSeconds": strconv.Itoa(ttlInSeconds),
122+
},
123+
})
124+
assert.NoError(t, err)
125+
assert.Equal(t, *ttl, ttlInSeconds)
126+
})
127+
128+
t.Run("TTL never expires", func(t *testing.T) {
129+
ttlInSeconds := -1
130+
ttl, err := store.parseTTL(&state.SetRequest{
131+
Metadata: map[string]string{
132+
"ttlInSeconds": strconv.Itoa(ttlInSeconds),
133+
},
134+
})
135+
assert.NoError(t, err)
136+
assert.Equal(t, *ttl, ttlInSeconds)
137+
})
138+
}
139+
93140
func TestParseConnectedSlavs(t *testing.T) {
94141
store := NewRedisStateStore(logger.NewLogger("test"))
95142

@@ -126,13 +173,35 @@ func TestTransactionalUpsert(t *testing.T) {
126173
ss.ctx, ss.cancel = context.WithCancel(context.Background())
127174

128175
err := ss.Multi(&state.TransactionalStateRequest{
129-
Operations: []state.TransactionalStateOperation{{
130-
Operation: state.Upsert,
131-
Request: state.SetRequest{
132-
Key: "weapon",
133-
Value: "deathstar",
176+
Operations: []state.TransactionalStateOperation{
177+
{
178+
Operation: state.Upsert,
179+
Request: state.SetRequest{
180+
Key: "weapon",
181+
Value: "deathstar",
182+
},
134183
},
135-
}},
184+
{
185+
Operation: state.Upsert,
186+
Request: state.SetRequest{
187+
Key: "weapon2",
188+
Value: "deathstar2",
189+
Metadata: map[string]string{
190+
"ttlInSeconds": "123",
191+
},
192+
},
193+
},
194+
{
195+
Operation: state.Upsert,
196+
Request: state.SetRequest{
197+
Key: "weapon3",
198+
Value: "deathstar3",
199+
Metadata: map[string]string{
200+
"ttlInSeconds": "-1",
201+
},
202+
},
203+
},
204+
},
136205
})
137206
assert.Equal(t, nil, err)
138207

@@ -144,6 +213,18 @@ func TestTransactionalUpsert(t *testing.T) {
144213
assert.Equal(t, nil, err)
145214
assert.Equal(t, ptr.String("1"), version)
146215
assert.Equal(t, `"deathstar"`, data)
216+
217+
res, err = c.Do(context.Background(), "TTL", "weapon").Result()
218+
assert.Equal(t, nil, err)
219+
assert.Equal(t, int64(-1), res)
220+
221+
res, err = c.Do(context.Background(), "TTL", "weapon2").Result()
222+
assert.Equal(t, nil, err)
223+
assert.Equal(t, int64(123), res)
224+
225+
res, err = c.Do(context.Background(), "TTL", "weapon3").Result()
226+
assert.Equal(t, nil, err)
227+
assert.Equal(t, int64(-1), res)
147228
}
148229

149230
func TestTransactionalDelete(t *testing.T) {
@@ -201,6 +282,76 @@ func TestPing(t *testing.T) {
201282
assert.Error(t, err)
202283
}
203284

285+
func TestSetRequestWithTTL(t *testing.T) {
286+
s, c := setupMiniredis()
287+
defer s.Close()
288+
289+
ss := &StateStore{
290+
client: c,
291+
json: jsoniter.ConfigFastest,
292+
logger: logger.NewLogger("test"),
293+
}
294+
ss.ctx, ss.cancel = context.WithCancel(context.Background())
295+
296+
t.Run("TTL specified", func(t *testing.T) {
297+
ttlInSeconds := 100
298+
ss.Set(&state.SetRequest{
299+
Key: "weapon100",
300+
Value: "deathstar100",
301+
Metadata: map[string]string{
302+
"ttlInSeconds": strconv.Itoa(ttlInSeconds),
303+
},
304+
})
305+
306+
ttl, _ := ss.client.TTL(ss.ctx, "weapon100").Result()
307+
308+
assert.Equal(t, time.Duration(ttlInSeconds)*time.Second, ttl)
309+
})
310+
311+
t.Run("TTL not specified", func(t *testing.T) {
312+
ss.Set(&state.SetRequest{
313+
Key: "weapon200",
314+
Value: "deathstar200",
315+
})
316+
317+
ttl, _ := ss.client.TTL(ss.ctx, "weapon200").Result()
318+
319+
assert.Equal(t, time.Duration(-1), ttl)
320+
})
321+
322+
t.Run("TTL Changed for Existing Key", func(t *testing.T) {
323+
ss.Set(&state.SetRequest{
324+
Key: "weapon300",
325+
Value: "deathstar300",
326+
})
327+
ttl, _ := ss.client.TTL(ss.ctx, "weapon300").Result()
328+
assert.Equal(t, time.Duration(-1), ttl)
329+
330+
// make the key no longer persistent
331+
ttlInSeconds := 123
332+
ss.Set(&state.SetRequest{
333+
Key: "weapon300",
334+
Value: "deathstar300",
335+
Metadata: map[string]string{
336+
"ttlInSeconds": strconv.Itoa(ttlInSeconds),
337+
},
338+
})
339+
ttl, _ = ss.client.TTL(ss.ctx, "weapon300").Result()
340+
assert.Equal(t, time.Duration(ttlInSeconds)*time.Second, ttl)
341+
342+
// make the key persistent again
343+
ss.Set(&state.SetRequest{
344+
Key: "weapon300",
345+
Value: "deathstar301",
346+
Metadata: map[string]string{
347+
"ttlInSeconds": strconv.Itoa(-1),
348+
},
349+
})
350+
ttl, _ = ss.client.TTL(ss.ctx, "weapon300").Result()
351+
assert.Equal(t, time.Duration(-1), ttl)
352+
})
353+
}
354+
204355
func TestTransactionalDeleteNoEtag(t *testing.T) {
205356
s, c := setupMiniredis()
206357
defer s.Close()

0 commit comments

Comments
 (0)