diff --git a/cmd/mt-update-ttl/main.go b/cmd/mt-update-ttl/main.go index 45458feb6e..2d594141e9 100644 --- a/cmd/mt-update-ttl/main.go +++ b/cmd/mt-update-ttl/main.go @@ -42,7 +42,7 @@ var ( startTs = flag.Int("start-timestamp", 0, "timestamp at which to start, defaults to 0") endTs = flag.Int("end-timestamp", math.MaxInt32, "timestamp at which to stop, defaults to int max") - numThreads = flag.Int("threads", 1, "number of workers to use to process data") + numThreads = flag.Int("threads", 10, "number of workers to use to process data") verbose = flag.Bool("verbose", false, "show every record being processed") @@ -85,7 +85,7 @@ func main() { session, err := NewCassandraStore() if err != nil { - panic(fmt.Sprintf("Failed to instantiate cassandra: %s", err)) + log.Fatalf("Failed to instantiate cassandra: %s", err) } update(session, ttl, tableIn, tableOut) @@ -182,26 +182,26 @@ func worker(id int, jobs <-chan string, wg *sync.WaitGroup, session *gocql.Sessi query = fmt.Sprintf("INSERT INTO %s (data, key, ts) values(?,?,?) USING TTL %d", tableOut, newTTL) } if *verbose { - log.Printf("id=%d processing rownum=%d table=%q key=%q ts=%d query=%q data='%x'\n", id, atomic.LoadUint64(&doneRows)+1, tableIn, key, ts, query, data) + log.Infof("id=%d processing rownum=%d table=%q key=%q ts=%d query=%q data='%x'\n", id, atomic.LoadUint64(&doneRows)+1, tableIn, key, ts, query, data) } err := session.Query(query, data, key, ts).Exec() if err != nil { - fmt.Fprintf(os.Stderr, "ERROR: id=%d failed updating %s %s %d: %q", id, tableOut, key, ts, err) + log.Errorf("id=%d failed updating %s %s %d: %q", id, tableOut, key, ts, err) } doneRowsSnap := atomic.AddUint64(&doneRows, 1) if doneRowsSnap%10000 == 0 { doneKeysSnap := atomic.LoadUint64(&doneKeys) completeness := completenessEstimate(token) - log.Printf("WORKING: id=%d processed %d keys, %d rows. (last token: %d, completeness estimate %.1f%%)", id, doneKeysSnap, doneRowsSnap, token, completeness*100) + log.Infof("WORKING: id=%d processed %d keys, %d rows. (last token: %d, completeness estimate %.1f%%)", id, doneKeysSnap, doneRowsSnap, token, completeness*100) } } err := iter.Close() if err != nil { doneKeysSnap := atomic.LoadUint64(&doneKeys) doneRowsSnap := atomic.LoadUint64(&doneRows) - fmt.Fprintf(os.Stderr, "ERROR: id=%d failed querying %s: %q. processed %d keys, %d rows", id, tableIn, err, doneKeysSnap, doneRowsSnap) + log.Errorf("id=%d failed querying %s: %q. processed %d keys, %d rows", id, tableIn, err, doneKeysSnap, doneRowsSnap) } atomic.AddUint64(&doneKeys, 1) } @@ -227,11 +227,13 @@ func update(session *gocql.Session, ttl int, tableIn, tableOut string) { close(jobs) err := keyItr.Close() if err != nil { - fmt.Fprintf(os.Stderr, "ERROR: failed querying %s: %q. processed %d keys, %d rows", tableIn, err, doneKeys, doneRows) + doneKeysSnap := atomic.LoadUint64(&doneKeys) + doneRowsSnap := atomic.LoadUint64(&doneRows) + log.Errorf("failed querying %s: %q. processed %d keys, %d rows", tableIn, err, doneKeysSnap, doneRowsSnap) wg.Wait() os.Exit(2) } wg.Wait() - log.Printf("DONE. Processed %d keys, %d rows", doneKeys, doneRows) + log.Infof("DONE. Processed %d keys, %d rows", doneKeys, doneRows) } diff --git a/docs/tools.md b/docs/tools.md index b38061d846..2a68d0e351 100644 --- a/docs/tools.md +++ b/docs/tools.md @@ -528,7 +528,7 @@ Flags: -start-timestamp int timestamp at which to start, defaults to 0 -threads int - number of workers to use to process data (default 1) + number of workers to use to process data (default 10) -verbose show every record being processed ```