@@ -2,6 +2,7 @@ package hamt
2
2
3
3
import (
4
4
"context"
5
+ "time"
5
6
//"fmt"
6
7
//"os"
7
8
@@ -29,7 +30,7 @@ type fetcher struct {
29
30
30
31
idle bool
31
32
32
- done chan int
33
+ done chan batchJob
33
34
34
35
todoFirst * job // do this job first since we are waiting for its results
35
36
todo jobStack // stack of jobs that still need to be done
@@ -43,6 +44,8 @@ type fetcher struct {
43
44
44
45
// other useful stats
45
46
cidCnt int
47
+
48
+ start time.Time
46
49
}
47
50
48
51
// batchSize must be at least as large as the largest number of cids
@@ -65,7 +68,7 @@ func startFetcher(ctx context.Context, dserv ipld.DAGService) *fetcher {
65
68
reqRes : make (chan * Shard ),
66
69
result : make (chan result ),
67
70
idle : true ,
68
- done : make (chan int ),
71
+ done : make (chan batchJob ),
69
72
jobs : make (map [* Shard ]* job ),
70
73
}
71
74
go f .mainLoop ()
@@ -109,6 +112,7 @@ type jobStack struct {
109
112
110
113
func (f * fetcher ) mainLoop () {
111
114
var want * Shard
115
+ f .start = time .Now ()
112
116
for {
113
117
select {
114
118
case id := <- f .reqRes :
@@ -142,8 +146,9 @@ func (f *fetcher) mainLoop() {
142
146
}
143
147
want = id
144
148
}
145
- case cnt := <- f .done :
146
- f .doneCnt += cnt
149
+ case bj := <- f .done :
150
+ f .doneCnt += len (bj .jobs )
151
+ f .cidCnt += len (bj .cids )
147
152
f .launch ()
148
153
if want != nil {
149
154
j := f .jobs [want ]
@@ -153,11 +158,10 @@ func (f *fetcher) mainLoop() {
153
158
}
154
159
}
155
160
log .Infof ("fetcher: batch job done" )
156
- log . Infof ( "fetcher stats (done, hits, nearMisses, misses): %d %d %d %d" , f . doneCnt , f . hits , f . nearMisses , f . misses )
161
+ f . mainLoopLogStats ( )
157
162
case <- f .ctx .Done ():
158
163
log .Infof ("fetcher: exiting" )
159
- log .Infof ("fetcher stats (done, hits, nearMisses, misses): %d %d %d %d" , f .doneCnt , f .hits , f .nearMisses , f .misses )
160
- log .Infof ("fetcher total number of CIDs retrieved: %d" , f .cidCnt )
164
+ f .mainLoopLogStats ()
161
165
return
162
166
}
163
167
}
@@ -173,7 +177,6 @@ func (f *fetcher) mainLoopAddJob(hamt *Shard) *job {
173
177
// programmer error
174
178
panic ("job size larger than batchSize" )
175
179
}
176
- f .cidCnt += len (j .cids )
177
180
f .todo .push (j )
178
181
f .jobs [j .id ] = j
179
182
return j
@@ -197,6 +200,12 @@ func (f *fetcher) mainLoopSendResult(j *job) {
197
200
}
198
201
}
199
202
203
+ func (f * fetcher ) mainLoopLogStats () {
204
+ log .Infof ("fetcher stats (cids, done, hits, nearMisses, misses): %d %d %d %d %d" , f .cidCnt , f .doneCnt , f .hits , f .nearMisses , f .misses )
205
+ elapsed := time .Now ().Sub (f .start ).Seconds ()
206
+ log .Infof ("fetcher perf (cids/sec, jobs/sec) %f %f" , float64 (f .cidCnt )/ elapsed , float64 (f .doneCnt + f .hits + f .nearMisses + f .misses )/ elapsed )
207
+ }
208
+
200
209
type batchJob struct {
201
210
cids []* cid.Cid
202
211
jobs []* job
@@ -252,7 +261,7 @@ func (f *fetcher) launch() {
252
261
for _ , job := range bj .jobs {
253
262
job .res = fetched
254
263
}
255
- f .done <- len ( bj . jobs )
264
+ f .done <- bj
256
265
}()
257
266
}
258
267
0 commit comments