Skip to content

Commit 3c464a3

Browse files
committed
Adds support for TTL in Redis State Store
1 parent b1f584d commit 3c464a3

File tree

2 files changed

+188
-6
lines changed

2 files changed

+188
-6
lines changed

state/redis/redis.go

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ const (
3030
infoReplicationDelimiter = "\r\n"
3131
maxRetries = "maxRetries"
3232
maxRetryBackoff = "maxRetryBackoff"
33+
ttlInSeconds = "ttlInSeconds"
3334
defaultBase = 10
3435
defaultBitSize = 0
3536
defaultDB = 0
@@ -230,6 +231,7 @@ func (r *StateStore) setValue(req *state.SetRequest) error {
230231
if err != nil {
231232
return err
232233
}
234+
ttl := r.parseTTL(req)
233235

234236
bt, _ := utils.Marshal(req.Value, r.json.Marshal)
235237

@@ -241,6 +243,19 @@ func (r *StateStore) setValue(req *state.SetRequest) error {
241243

242244
return fmt.Errorf("failed to set key %s: %s", req.Key, err)
243245
}
246+
switch {
247+
case ttl == -1:
248+
_, err = r.client.Do(r.ctx, "PERSIST", req.Key).Result()
249+
if err != nil {
250+
return fmt.Errorf("failed to persist key %s: %s", req.Key, err)
251+
}
252+
253+
case ttl > 0:
254+
_, err = r.client.Do(r.ctx, "EXPIRE", req.Key, ttl).Result()
255+
if err != nil {
256+
return fmt.Errorf("failed to set key %s ttl: %s", req.Key, err)
257+
}
258+
}
244259

245260
if req.Options.Consistency == state.Strong && r.replicas > 0 {
246261
_, err = r.client.Do(r.ctx, "WAIT", r.replicas, 1000).Result()
@@ -264,11 +279,18 @@ func (r *StateStore) Multi(request *state.TransactionalStateRequest) error {
264279
if o.Operation == state.Upsert {
265280
req := o.Request.(state.SetRequest)
266281
ver, err := r.parseETag(&req)
282+
ttl := r.parseTTL(&req)
267283
if err != nil {
268284
return err
269285
}
270286
bt, _ := utils.Marshal(req.Value, r.json.Marshal)
271287
pipe.Do(r.ctx, "EVAL", setQuery, 1, req.Key, ver, bt)
288+
switch {
289+
case ttl == -1:
290+
pipe.Do(r.ctx, "PERSIST", req.Key)
291+
case ttl > 0:
292+
pipe.Do(r.ctx, "EXPIRE", req.Key, ttl)
293+
}
272294
} else if o.Operation == state.Delete {
273295
req := o.Request.(state.DeleteRequest)
274296
if req.ETag == nil {
@@ -318,6 +340,19 @@ func (r *StateStore) parseETag(req *state.SetRequest) (int, error) {
318340
return ver, nil
319341
}
320342

343+
func (r *StateStore) parseTTL(req *state.SetRequest) int {
344+
if val, ok := req.Metadata[ttlInSeconds]; ok && val != "" {
345+
parsedVal, err := strconv.ParseInt(val, defaultBase, defaultBitSize)
346+
if err != nil {
347+
return 0
348+
}
349+
350+
return int(parsedVal)
351+
}
352+
353+
return 0
354+
}
355+
321356
func (r *StateStore) Close() error {
322357
r.cancel()
323358

state/redis/redis_test.go

Lines changed: 153 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,9 @@ package redis
77

88
import (
99
"context"
10+
"strconv"
1011
"testing"
12+
"time"
1113

1214
"github.com/agrea/ptr"
1315
miniredis "github.com/alicebob/miniredis/v2"
@@ -90,6 +92,47 @@ func TestParseEtag(t *testing.T) {
9092
})
9193
}
9294

95+
func TestParseTTL(t *testing.T) {
96+
store := NewRedisStateStore(logger.NewLogger("test"))
97+
t.Run("TTL Not an integer", func(t *testing.T) {
98+
ttlInSeconds := "not an integer"
99+
ttl := store.parseTTL(&state.SetRequest{
100+
Metadata: map[string]string{
101+
"ttlInSeconds": ttlInSeconds,
102+
},
103+
})
104+
assert.Equal(t, ttl, 0)
105+
})
106+
t.Run("TTL specified with wrong key", func(t *testing.T) {
107+
ttlInSeconds := 12345
108+
ttl := store.parseTTL(&state.SetRequest{
109+
Metadata: map[string]string{
110+
"expirationTime": strconv.Itoa(ttlInSeconds),
111+
},
112+
})
113+
assert.Equal(t, ttl, 0)
114+
})
115+
t.Run("TTL is a number", func(t *testing.T) {
116+
ttlInSeconds := 12345
117+
ttl := store.parseTTL(&state.SetRequest{
118+
Metadata: map[string]string{
119+
"ttlInSeconds": strconv.Itoa(ttlInSeconds),
120+
},
121+
})
122+
assert.Equal(t, ttl, ttlInSeconds)
123+
})
124+
125+
t.Run("TTL never expires", func(t *testing.T) {
126+
ttlInSeconds := -1
127+
ttl := store.parseTTL(&state.SetRequest{
128+
Metadata: map[string]string{
129+
"ttlInSeconds": strconv.Itoa(ttlInSeconds),
130+
},
131+
})
132+
assert.Equal(t, ttl, ttlInSeconds)
133+
})
134+
}
135+
93136
func TestParseConnectedSlavs(t *testing.T) {
94137
store := NewRedisStateStore(logger.NewLogger("test"))
95138

@@ -126,13 +169,35 @@ func TestTransactionalUpsert(t *testing.T) {
126169
ss.ctx, ss.cancel = context.WithCancel(context.Background())
127170

128171
err := ss.Multi(&state.TransactionalStateRequest{
129-
Operations: []state.TransactionalStateOperation{{
130-
Operation: state.Upsert,
131-
Request: state.SetRequest{
132-
Key: "weapon",
133-
Value: "deathstar",
172+
Operations: []state.TransactionalStateOperation{
173+
{
174+
Operation: state.Upsert,
175+
Request: state.SetRequest{
176+
Key: "weapon",
177+
Value: "deathstar",
178+
},
134179
},
135-
}},
180+
{
181+
Operation: state.Upsert,
182+
Request: state.SetRequest{
183+
Key: "weapon2",
184+
Value: "deathstar2",
185+
Metadata: map[string]string{
186+
"ttlInSeconds": "123",
187+
},
188+
},
189+
},
190+
{
191+
Operation: state.Upsert,
192+
Request: state.SetRequest{
193+
Key: "weapon3",
194+
Value: "deathstar3",
195+
Metadata: map[string]string{
196+
"ttlInSeconds": "-1",
197+
},
198+
},
199+
},
200+
},
136201
})
137202
assert.Equal(t, nil, err)
138203

@@ -144,6 +209,18 @@ func TestTransactionalUpsert(t *testing.T) {
144209
assert.Equal(t, nil, err)
145210
assert.Equal(t, ptr.String("1"), version)
146211
assert.Equal(t, `"deathstar"`, data)
212+
213+
res, err = c.Do(context.Background(), "TTL", "weapon").Result()
214+
assert.Equal(t, nil, err)
215+
assert.Equal(t, int64(-1), res)
216+
217+
res, err = c.Do(context.Background(), "TTL", "weapon2").Result()
218+
assert.Equal(t, nil, err)
219+
assert.Equal(t, int64(123), res)
220+
221+
res, err = c.Do(context.Background(), "TTL", "weapon3").Result()
222+
assert.Equal(t, nil, err)
223+
assert.Equal(t, int64(-1), res)
147224
}
148225

149226
func TestTransactionalDelete(t *testing.T) {
@@ -201,6 +278,76 @@ func TestPing(t *testing.T) {
201278
assert.Error(t, err)
202279
}
203280

281+
func TestSetRequestWithTTL(t *testing.T) {
282+
s, c := setupMiniredis()
283+
defer s.Close()
284+
285+
ss := &StateStore{
286+
client: c,
287+
json: jsoniter.ConfigFastest,
288+
logger: logger.NewLogger("test"),
289+
}
290+
ss.ctx, ss.cancel = context.WithCancel(context.Background())
291+
292+
t.Run("TTL specified", func(t *testing.T) {
293+
ttlInSeconds := 100
294+
ss.Set(&state.SetRequest{
295+
Key: "weapon100",
296+
Value: "deathstar100",
297+
Metadata: map[string]string{
298+
"ttlInSeconds": strconv.Itoa(ttlInSeconds),
299+
},
300+
})
301+
302+
ttl, _ := ss.client.TTL(ss.ctx, "weapon100").Result()
303+
304+
assert.Equal(t, time.Duration(ttlInSeconds)*time.Second, ttl)
305+
})
306+
307+
t.Run("TTL not specified", func(t *testing.T) {
308+
ss.Set(&state.SetRequest{
309+
Key: "weapon200",
310+
Value: "deathstar200",
311+
})
312+
313+
ttl, _ := ss.client.TTL(ss.ctx, "weapon200").Result()
314+
315+
assert.Equal(t, time.Duration(-1), ttl)
316+
})
317+
318+
t.Run("TTL Changed for Existing Key", func(t *testing.T) {
319+
ss.Set(&state.SetRequest{
320+
Key: "weapon300",
321+
Value: "deathstar300",
322+
})
323+
ttl, _ := ss.client.TTL(ss.ctx, "weapon300").Result()
324+
assert.Equal(t, time.Duration(-1), ttl)
325+
326+
// make the key no longer persistent
327+
ttlInSeconds := 123
328+
ss.Set(&state.SetRequest{
329+
Key: "weapon300",
330+
Value: "deathstar300",
331+
Metadata: map[string]string{
332+
"ttlInSeconds": strconv.Itoa(ttlInSeconds),
333+
},
334+
})
335+
ttl, _ = ss.client.TTL(ss.ctx, "weapon300").Result()
336+
assert.Equal(t, time.Duration(ttlInSeconds)*time.Second, ttl)
337+
338+
// make the key persistent again
339+
ss.Set(&state.SetRequest{
340+
Key: "weapon300",
341+
Value: "deathstar301",
342+
Metadata: map[string]string{
343+
"ttlInSeconds": strconv.Itoa(-1),
344+
},
345+
})
346+
ttl, _ = ss.client.TTL(ss.ctx, "weapon300").Result()
347+
assert.Equal(t, time.Duration(-1), ttl)
348+
})
349+
}
350+
204351
func TestTransactionalDeleteNoEtag(t *testing.T) {
205352
s, c := setupMiniredis()
206353
defer s.Close()

0 commit comments

Comments
 (0)