forked from go-eden/routine
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathroutine_storage.go
142 lines (125 loc) · 3.08 KB
/
routine_storage.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
package routine
import (
"sync"
"sync/atomic"
"time"
"unsafe"
)
// Package-level state shared by all goroutine-local storages.
var (
	// storages holds the current snapshot of the global store map
	// (map[int64]*store). The map is never mutated in place: writers build
	// a fresh copy and publish it with Store.
	storages atomic.Value

	// storageLock guards replacement of the storages snapshot and every
	// access to storageGCTimer.
	storageLock sync.Mutex

	// storageGCTimer is the timer that drives the storage garbage
	// collector; it is nil while no collection is scheduled.
	storageGCTimer *time.Timer

	// storageGCInterval is the delay between two garbage-collection rounds.
	storageGCInterval = 30 * time.Second
)
func init() {
storages.Store(map[int64]*store{})
}
// gcRunning reports whether a garbage-collection timer is currently
// installed, i.e. whether storageGCTimer is non-nil. The check is made while
// holding storageLock.
func gcRunning() bool {
	storageLock.Lock()
	running := storageGCTimer != nil
	storageLock.Unlock()
	return running
}
// store holds the values kept on behalf of one goroutine.
type store struct {
// gid is the goroutine id (the Goid() result) the store was created for.
gid int64
// count mirrors len(values); Set, Del and Clear write it with
// atomic.StoreUint32.
count uint32
// values maps a storage's key, uintptr(unsafe.Pointer(storage)), to the
// value that was set for it.
values map[uintptr]interface{}
}
// storage is a handle to one goroutine-local variable. Its identity is its
// address: Get, Set and Del use uintptr(unsafe.Pointer(t)) as the key into
// the per-goroutine values map.
type storage struct {
	// The padding byte gives the type a non-zero size. The Go spec allows
	// pointers to distinct zero-size variables to compare equal, and the gc
	// runtime returns the same address for every zero-size heap allocation,
	// so without it separate storages could collide on one key.
	_ byte
}
// Get returns the value the calling goroutine has stored under this storage,
// or nil when the goroutine's store has no entry for it.
func (t *storage) Get() (v interface{}) {
	cur := loadCurrentStore()
	// The storage's own address serves as its key.
	key := uintptr(unsafe.Pointer(t))
	v = cur.values[key]
	return v
}
// Set stores v under this storage in the calling goroutine's store and
// returns the value that was there before (nil if there was none).
//
// When the previous value was nil, Set also makes sure a garbage-collection
// timer is installed.
func (t *storage) Set(v interface{}) (oldValue interface{}) {
	var (
		cur = loadCurrentStore()
		key = uintptr(unsafe.Pointer(t)) // the storage's address is its key
	)

	prev := cur.values[key]
	cur.values[key] = v
	atomic.StoreUint32(&cur.count, uint32(len(cur.values)))

	if prev != nil {
		return prev
	}

	// Nothing (or nil) was stored before: start the GC timer if none is
	// currently installed.
	storageLock.Lock()
	defer storageLock.Unlock()
	if storageGCTimer == nil {
		storageGCTimer = time.AfterFunc(storageGCInterval, clearDeadStore)
	}
	return nil
}
// Del removes this storage's entry from the calling goroutine's store and
// returns the removed value (nil if there was no entry).
func (t *storage) Del() (v interface{}) {
	cur := loadCurrentStore()
	key := uintptr(unsafe.Pointer(t)) // the storage's address is its key

	v = cur.values[key]
	delete(cur.values, key)

	// Keep the published count in step with the map.
	remaining := uint32(len(cur.values))
	atomic.StoreUint32(&cur.count, remaining)
	return v
}
// Clear discards every value in the calling goroutine's store, not only the
// entry of this storage, by swapping in a fresh empty map and resetting the
// count to zero.
func (t *storage) Clear() {
	cur := loadCurrentStore()
	cur.values = make(map[uintptr]interface{})
	atomic.StoreUint32(&cur.count, 0)
}
// loadCurrentStore returns the store of the calling goroutine, creating and
// publishing it first if the goroutine does not have one yet.
func loadCurrentStore() (s *store) {
	gid := Goid()

	// Fast path: look the goroutine up in the published snapshot without
	// taking the lock.
	if s = storages.Load().(map[int64]*store)[gid]; s != nil {
		return s
	}

	storageLock.Lock()
	defer storageLock.Unlock()

	// Re-read under the lock so the copy below is based on the latest
	// snapshot.
	current := storages.Load().(map[int64]*store)
	if s = current[gid]; s != nil {
		return s
	}

	s = &store{
		gid:    gid,
		values: map[uintptr]interface{}{},
	}

	// The published map is never modified in place: build a copy that also
	// contains the new store, then publish the copy.
	next := make(map[int64]*store, len(current)+1)
	for id, st := range current {
		next[id] = st
	}
	next[gid] = s
	storages.Store(next)
	return s
}
// clearDeadStore removes the stores of goroutines that no longer exist and
// then decides whether the garbage-collection timer keeps running.
//
// It is the callback passed to time.AfterFunc in Set and runs entirely under
// storageLock. When no store of a live goroutine remains, storageGCTimer is
// set to nil; otherwise the timer is re-armed for another storageGCInterval.
func clearDeadStore() {
	storageLock.Lock()
	defer storageLock.Unlock()

	// Collect the ids of all goroutines reported by AllGoids into a set.
	gids := AllGoids()
	alive := make(map[int64]struct{}, len(gids))
	for _, gid := range gids {
		alive[gid] = struct{}{}
	}

	// Count the stores that belong to live and to dead goroutines.
	//
	// NOTE(review): liveness is decided solely by the goroutine being alive;
	// store.count is not consulted, so an empty store of a live goroutine
	// keeps the timer running and is collected once that goroutine exits.
	// Confirm this is the intended policy.
	storeMap := storages.Load().(map[int64]*store)
	var deadCnt, liveCnt int
	for id := range storeMap {
		if _, ok := alive[id]; ok {
			liveCnt++
		} else {
			deadCnt++
		}
	}

	// Publish a trimmed copy only when there is something to drop; the
	// published map itself is never modified in place.
	if deadCnt > 0 {
		survivors := make(map[int64]*store, liveCnt)
		for id, s := range storeMap {
			if _, ok := alive[id]; ok {
				survivors[id] = s
			}
		}
		storages.Store(survivors)
	}

	// Nothing left to watch: drop the timer.
	if liveCnt == 0 {
		storageGCTimer = nil
		return
	}

	// Schedule the next round. The timer can be nil when this function is
	// invoked directly rather than from the timer; create it in that case
	// instead of dereferencing nil.
	if storageGCTimer == nil {
		storageGCTimer = time.AfterFunc(storageGCInterval, clearDeadStore)
		return
	}
	storageGCTimer.Reset(storageGCInterval)
}