forked from leotons/estuary
-
Notifications
You must be signed in to change notification settings - Fork 0
/
offloading.go
283 lines (231 loc) · 6.96 KB
/
offloading.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
package main
import (
"context"
"fmt"
"sort"
"time"
"github.com/application-research/estuary/util"
"golang.org/x/xerrors"
)
// cacheThreshold is a ratio constant (50%). It is not referenced in the
// visible portion of this file — presumably a disk-usage threshold that
// triggers offloading elsewhere; TODO confirm against callers.
const cacheThreshold = 0.50
// offloadCandidate pairs a Content with the last recorded access time of its
// root object, used to rank removal candidates least-recently-used first.
type offloadCandidate struct {
	Content
	LastAccess time.Time
}
// collectionResult summarizes a ClearUnused pass: the space requested versus
// the total size of the contents selected, which contents were (or, on a dry
// run, would be) offloaded, and any error hit during the actual offload.
type collectionResult struct {
	SpaceRequest         int64              `json:"spaceRequest"`  // bytes the caller asked to free
	SpaceFreed           int64              `json:"spaceFreed"`    // summed size of the selected contents
	ContentsFreed        []offloadCandidate `json:"contentsFreed"` // contents chosen for offload
	CandidatesConsidered int                `json:"candidatesConsidered"`
	BlocksRemoved        int                `json:"blocksRemoved"` // blocks actually deleted (zero on dry run)
	DryRun               bool               `json:"dryRun"`
	OffloadError         string             `json:"offloadError,omitempty"` // non-fatal offload failure, if any
}
// ClearUnused frees up to spaceRequest bytes by offloading sufficiently
// replicated content that has not been fetched from us recently. loc and
// users optionally narrow the candidate set by location and owner. When
// dryrun is true, it only reports what would be removed without touching
// anything.
func (cm *ContentManager) ClearUnused(ctx context.Context, spaceRequest int64, loc string, users []uint, dryrun bool) (*collectionResult, error) {
	ctx, span := cm.tracer.Start(ctx, "clearUnused")
	defer span.End()

	// Candidates are contents with the correct number of deals made that
	// haven't been accessed in a while.
	candidates, err := cm.getRemovalCandidates(ctx, false, loc, users)
	if err != nil {
		return nil, fmt.Errorf("failed to get removal candidates: %w", err)
	}

	ranked, err := cm.getLastAccesses(ctx, candidates)
	if err != nil {
		return nil, fmt.Errorf("failed to get last accesses: %w", err)
	}

	// Take least-recently-used entries until the requested space is covered.
	remaining := spaceRequest
	var chosen []offloadCandidate
	for _, cand := range ranked {
		chosen = append(chosen, cand)
		remaining -= cand.Size
		if remaining <= 0 {
			break
		}
	}

	res := &collectionResult{
		SpaceRequest:         spaceRequest,
		SpaceFreed:           spaceRequest - remaining,
		ContentsFreed:        chosen,
		CandidatesConsidered: len(candidates),
		DryRun:               dryrun,
	}
	if dryrun {
		return res, nil
	}

	// Actually offload everything we selected.
	ids := make([]uint, 0, len(chosen))
	for _, cand := range chosen {
		ids = append(ids, cand.Content.ID)
	}

	// An offload failure is reported in the result rather than failing the
	// whole call.
	rem, err := cm.OffloadContents(ctx, ids)
	if err != nil {
		res.OffloadError = err.Error()
		log.Warnf("failed to offload contents: %s", err)
	}
	res.BlocksRemoved = rem

	return res, nil
}
// getLastAccesses looks up the last-access time for each removal candidate
// and returns the candidates sorted least-recently-used first, so callers
// can offload the coldest content before warmer content.
//
// A candidate whose last access cannot be determined is logged and skipped
// rather than failing the entire pass.
func (cm *ContentManager) getLastAccesses(ctx context.Context, candidates []removalCandidateInfo) ([]offloadCandidate, error) {
	ctx, span := cm.tracer.Start(ctx, "getLastAccesses")
	defer span.End()

	var offs []offloadCandidate
	for _, c := range candidates {
		la, err := cm.getLastAccessForContent(c.Content)
		if err != nil {
			// Fix: the original passed the whole Content struct to %d,
			// which formats as garbage; log the content ID instead.
			log.Errorf("check last access for %d: %s", c.Content.ID, err)
			continue
		}
		offs = append(offs, offloadCandidate{
			Content:    c.Content,
			LastAccess: la,
		})
	}

	// sort candidates by 'last used', oldest first
	sort.Slice(offs, func(i, j int) bool {
		return offs[i].LastAccess.Before(offs[j].LastAccess)
	})
	return offs, nil
}
// getLastAccessForContent reports the last access time recorded for the root
// object of the given content.
//
// TODO: this is only looking at the root, maybe we could find an efficient
// way to check more of the objects? Additionally, for aggregates, we should
// check each aggregated item under the root.
func (cm *ContentManager) getLastAccessForContent(cont Content) (time.Time, error) {
	var root Object
	err := cm.DB.First(&root, "cid = ?", cont.Cid).Error
	if err != nil {
		return time.Time{}, err
	}
	return root.LastAccess, nil
}
// refResult holds a single CID scanned from an object-reference query.
// NOTE(review): not referenced in the visible portion of this file —
// presumably used by queries elsewhere; confirm before removing.
type refResult struct {
	Cid util.DbCID
}
// OffloadContents marks the given contents (and, for aggregates, all of
// their children) as offloaded, then removes their data: contents homed on a
// remote shuttle get an unpin command sent, while local contents have each
// block deleted if nothing else still pins it. Returns the number of blocks
// actually removed locally.
//
// NOTE(review): an error partway through returns immediately and leaves
// earlier contents already marked offloaded in the DB — there is no rollback.
func (cm *ContentManager) OffloadContents(ctx context.Context, conts []uint) (int, error) {
	ctx, span := cm.tracer.Start(ctx, "OffloadContents")
	defer span.End()

	var local []uint
	remote := make(map[string][]uint)

	// hold the content lock for the entire marking + deletion phase
	cm.contentLk.Lock()
	defer cm.contentLk.Unlock()
	for _, c := range conts {
		var cont Content
		if err := cm.DB.First(&cont, "id = ?", c).Error; err != nil {
			return 0, err
		}

		// bucket by location: "local" is handled in this process, anything
		// else is a shuttle that will receive an unpin command below
		if cont.Location == "local" {
			local = append(local, cont.ID)
		} else {
			remote[cont.Location] = append(remote[cont.Location], cont.ID)
		}

		// a child of an aggregate cannot be offloaded on its own; only the
		// aggregate root may be offloaded (which then fans out to children)
		if cont.AggregatedIn > 0 {
			return 0, fmt.Errorf("cannot offload aggregated content")
		}

		// mark the content and its object references as offloaded
		if err := cm.DB.Model(&Content{}).Where("id = ?", c).Update("offloaded", true).Error; err != nil {
			return 0, err
		}
		if err := cm.DB.Model(&ObjRef{}).Where("content = ?", c).Update("offloaded", 1).Error; err != nil {
			return 0, err
		}

		if cont.Aggregate {
			// offloading an aggregate root also marks every child content...
			if err := cm.DB.Model(&Content{}).Where("aggregated_in = ?", c).Update("offloaded", true).Error; err != nil {
				return 0, err
			}
			// ...and all object refs belonging to those children (subquery)
			if err := cm.DB.Model(&ObjRef{}).
				Where("content in (?)",
					cm.DB.Model(Content{}).
						Where("aggregated_in = ?", c).
						Select("id")).
				Update("offloaded", 1).Error; err != nil {
				return 0, err
			}

			var children []Content
			if err := cm.DB.Find(&children, "aggregated_in = ?", c).Error; err != nil {
				return 0, err
			}

			// children are unpinned wherever the aggregate root lives
			// (note: this loop's `c` shadows the outer content-ID `c`)
			for _, c := range children {
				if cont.Location == "local" {
					local = append(local, c.ID)
				} else {
					remote[cont.Location] = append(remote[cont.Location], c.ID)
				}
			}
		}
	}

	// tell each shuttle to unpin its batch; failures are logged, not fatal
	for loc, conts := range remote {
		if err := cm.sendUnpinCmd(ctx, loc, conts); err != nil {
			log.Errorf("failed to send unpin command to shuttle: %s", err)
		}
	}

	// locally, delete each block only if no other pin still references it
	var deleteCount int
	for _, c := range local {
		objs, err := cm.objectsForPin(ctx, c)
		if err != nil {
			return 0, err
		}
		for _, o := range objs {
			del, err := cm.deleteIfNotPinnedLock(ctx, o)
			if err != nil {
				return deleteCount, err
			}
			if del {
				deleteCount++
			}
		}
	}
	return deleteCount, nil
}
// removalCandidateInfo is a Content plus a summary of its deal states, as
// produced by getRemovalCandidates.
type removalCandidateInfo struct {
	Content
	TotalDeals      int `json:"totalDeals"`      // active + in-progress + failed
	ActiveDeals     int `json:"activeDeals"`     // non-failed deals with an on-chain deal ID
	InProgressDeals int `json:"inProgressDeals"` // deals neither failed nor yet on chain
}
// getRemovalCandidates returns contents eligible for offloading: active, not
// already offloaded, and either an aggregate root or not part of an
// aggregate. Unless all is set, only contents whose active deal count has
// reached their replication target are included. loc and users optionally
// narrow the query by location and owner.
func (cm *ContentManager) getRemovalCandidates(ctx context.Context, all bool, loc string, users []uint) ([]removalCandidateInfo, error) {
	ctx, span := cm.tracer.Start(ctx, "getRemovalCandidates")
	defer span.End()

	q := cm.DB.Model(Content{}).Where("active and not offloaded and (aggregate or not aggregated_in > 0)")
	if loc != "" {
		q = q.Where("location = ?", loc)
	}
	if len(users) > 0 {
		q = q.Where("user_id in ?", users)
	}

	var conts []Content
	if err := q.Scan(&conts).Error; err != nil {
		return nil, fmt.Errorf("scanning removal candidates failed: %w", err)
	}

	var toOffload []removalCandidateInfo
	for _, cont := range conts {
		active, inProgress, failedCount, err := cm.contentIsProperlyReplicated(ctx, cont.ID)
		if err != nil {
			return nil, xerrors.Errorf("failed to check replication of %d: %w", cont.ID, err)
		}

		// skip anything not yet replicated to its target, unless forced
		if !all && active < cont.Replication {
			continue
		}

		toOffload = append(toOffload, removalCandidateInfo{
			Content:         cont,
			TotalDeals:      active + inProgress + failedCount,
			ActiveDeals:     active,
			InProgressDeals: inProgress,
		})
	}
	return toOffload, nil
}
// contentIsProperlyReplicated loads all deals recorded for content c and
// classifies each one, returning (active, inProgress, failed, error):
// failed deals are flagged as such, active deals have an on-chain deal ID,
// and everything else is still in progress.
func (cm *ContentManager) contentIsProperlyReplicated(ctx context.Context, c uint) (int, int, int, error) {
	var contentDeals []contentDeal
	if err := cm.DB.Find(&contentDeals, "content = ?", c).Error; err != nil {
		return 0, 0, 0, err
	}

	var goodCount, inprog, failed int
	for _, d := range contentDeals {
		// Cleanup: the original wrote `else if !d.Failed && d.DealID > 0`;
		// `!d.Failed` is redundant inside the else branch of `if d.Failed`.
		switch {
		case d.Failed:
			failed++
		case d.DealID > 0:
			goodCount++
		default:
			inprog++
		}
	}
	return goodCount, inprog, failed, nil
}