42
42
43
43
startTs = flag .Int ("start-timestamp" , 0 , "timestamp at which to start, defaults to 0" )
44
44
endTs = flag .Int ("end-timestamp" , math .MaxInt32 , "timestamp at which to stop, defaults to int max" )
45
- numThreads = flag .Int ("threads" , 1 , "number of workers to use to process data" )
45
+ numThreads = flag .Int ("threads" , 10 , "number of workers to use to process data" )
46
46
47
47
verbose = flag .Bool ("verbose" , false , "show every record being processed" )
48
48
@@ -85,7 +85,7 @@ func main() {
85
85
session , err := NewCassandraStore ()
86
86
87
87
if err != nil {
88
- panic ( fmt . Sprintf ("Failed to instantiate cassandra: %s" , err ) )
88
+ log . Fatalf ("Failed to instantiate cassandra: %s" , err )
89
89
}
90
90
91
91
update (session , ttl , tableIn , tableOut )
@@ -182,26 +182,26 @@ func worker(id int, jobs <-chan string, wg *sync.WaitGroup, session *gocql.Sessi
182
182
query = fmt .Sprintf ("INSERT INTO %s (data, key, ts) values(?,?,?) USING TTL %d" , tableOut , newTTL )
183
183
}
184
184
if * verbose {
185
- log .Printf ("id=%d processing rownum=%d table=%q key=%q ts=%d query=%q data='%x'\n " , id , atomic .LoadUint64 (& doneRows )+ 1 , tableIn , key , ts , query , data )
185
+ log .Infof ("id=%d processing rownum=%d table=%q key=%q ts=%d query=%q data='%x'\n " , id , atomic .LoadUint64 (& doneRows )+ 1 , tableIn , key , ts , query , data )
186
186
}
187
187
188
188
err := session .Query (query , data , key , ts ).Exec ()
189
189
if err != nil {
190
- fmt . Fprintf ( os . Stderr , "ERROR: id=%d failed updating %s %s %d: %q" , id , tableOut , key , ts , err )
190
+ log . Errorf ( " id=%d failed updating %s %s %d: %q" , id , tableOut , key , ts , err )
191
191
}
192
192
193
193
doneRowsSnap := atomic .AddUint64 (& doneRows , 1 )
194
194
if doneRowsSnap % 10000 == 0 {
195
195
doneKeysSnap := atomic .LoadUint64 (& doneKeys )
196
196
completeness := completenessEstimate (token )
197
- log .Printf ("WORKING: id=%d processed %d keys, %d rows. (last token: %d, completeness estimate %.1f%%)" , id , doneKeysSnap , doneRowsSnap , token , completeness * 100 )
197
+ log .Infof ("WORKING: id=%d processed %d keys, %d rows. (last token: %d, completeness estimate %.1f%%)" , id , doneKeysSnap , doneRowsSnap , token , completeness * 100 )
198
198
}
199
199
}
200
200
err := iter .Close ()
201
201
if err != nil {
202
202
doneKeysSnap := atomic .LoadUint64 (& doneKeys )
203
203
doneRowsSnap := atomic .LoadUint64 (& doneRows )
204
- fmt . Fprintf ( os . Stderr , "ERROR: id=%d failed querying %s: %q. processed %d keys, %d rows" , id , tableIn , err , doneKeysSnap , doneRowsSnap )
204
+ log . Errorf ( " id=%d failed querying %s: %q. processed %d keys, %d rows" , id , tableIn , err , doneKeysSnap , doneRowsSnap )
205
205
}
206
206
atomic .AddUint64 (& doneKeys , 1 )
207
207
}
@@ -227,11 +227,13 @@ func update(session *gocql.Session, ttl int, tableIn, tableOut string) {
227
227
close (jobs )
228
228
err := keyItr .Close ()
229
229
if err != nil {
230
- fmt .Fprintf (os .Stderr , "ERROR: failed querying %s: %q. processed %d keys, %d rows" , tableIn , err , doneKeys , doneRows )
230
+ doneKeysSnap := atomic .LoadUint64 (& doneKeys )
231
+ doneRowsSnap := atomic .LoadUint64 (& doneRows )
232
+ log .Errorf ("failed querying %s: %q. processed %d keys, %d rows" , tableIn , err , doneKeysSnap , doneRowsSnap )
231
233
wg .Wait ()
232
234
os .Exit (2 )
233
235
}
234
236
235
237
wg .Wait ()
236
- log .Printf ("DONE. Processed %d keys, %d rows" , doneKeys , doneRows )
238
+ log .Infof ("DONE. Processed %d keys, %d rows" , doneKeys , doneRows )
237
239
}
0 commit comments