generated from devnw/oss-template
-
-
Notifications
You must be signed in to change notification settings - Fork 0
/
cache.go
156 lines (130 loc) · 2.63 KB
/
cache.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
package ttl
import (
"context"
"fmt"
"time"
)
type newvalue[V any] struct {
v V
timeout time.Duration
}
type rw[V any] struct {
ctx context.Context
cancel context.CancelFunc
read <-chan V
write chan<- newvalue[V]
}
func (c *Cache[K, V]) write(key K, value *rw[V]) error {
if c.values == nil || value == nil {
return fmt.Errorf("invalid cache instance")
}
c.valuesMu.Lock()
defer c.valuesMu.Unlock()
c.values[key] = value
return nil
}
func (c *Cache[K, V]) cleanup() {
c.valuesMu.Lock()
defer c.valuesMu.Unlock()
if c.values == nil {
return
}
// Cancel contexts
for _, value := range c.values {
if value == nil || value.cancel == nil {
continue
}
value.cancel()
}
// Nil the map out so nothing can write
// to the cache
c.values = nil
}
func (c *Cache[K, V]) set(
key K,
value V,
timeout time.Duration,
extend bool,
) *rw[V] {
ctx, cancel := context.WithCancel(c.ctx)
outgoing := make(chan V)
incoming := make(chan newvalue[V])
out := &rw[V]{
ctx: ctx,
cancel: cancel,
read: outgoing,
write: incoming,
}
go c.rwloop(
ctx,
key,
value,
outgoing,
incoming,
timeout,
extend,
)
return out
}
func (c *Cache[K, V]) rwloop(
ctx context.Context,
key K,
value V,
outgoing chan<- V,
incoming <-chan newvalue[V],
timeout time.Duration,
extend bool,
) {
defer func() {
// Recover from any panic (most likely closed channel)
// NOTE: This is ignored on purpose because the next
// defer removes this key from the cache
_ = recover()
select {
case <-ctx.Done():
return
default:
c.Delete(ctx, key) // Cleanup the map entry
}
}()
defer close(outgoing)
// Create the internal timer if the timeout is non-nil
// and assign the internal `C` channel to the timer channel
// for use in the select. Otherwise leave the timer channel
// nil so that it never trips the select statement because
// this specific key/value should be persistent.
t := time.NewTimer(timeout)
for {
select {
case <-ctx.Done():
return
case <-t.C:
return
case v, ok := <-incoming:
if !ok {
continue
}
value = v.v
timeout = v.timeout
resetTimer(t, timeout)
case outgoing <- value:
// Only extend the timer on read
// if it is configured to do so
if !extend {
continue
}
resetTimer(t, timeout)
}
}
}
// resetTimer resets the timer instance using the
// duration passed in. This uses the recommended
// set of calls from the go doc for `time.Timer.Reset`
// to ensure the the `C` channel is drained and doesn't
// immediately read on reset
func resetTimer(t *time.Timer, d time.Duration) {
if !t.Stop() {
<-t.C
}
t.Reset(d)
}