Skip to content
This repository was archived by the owner on Aug 23, 2023. It is now read-only.

Commit 313dbd4

Browse files
committed
improve error handling, logging and reporting
- if writes to cassandra fail, just continue on to the next def - keep count of defs successfully archived.
1 parent 9ef83eb commit 313dbd4

File tree

2 files changed

+45
-11
lines changed

2 files changed

+45
-11
lines changed

cmd/mt-index-prune/main.go

+11-2
Original file line numberDiff line numberDiff line change
@@ -32,12 +32,14 @@ type counters struct {
3232
total int
3333
active int
3434
deprecated int
35+
archived int
3536
}
3637

3738
func (c *counters) PrintCounters() {
3839
fmt.Println(fmt.Sprintf("Total analyzed defs: %d", c.total))
3940
fmt.Println(fmt.Sprintf("Active defs: %d", c.active))
4041
fmt.Println(fmt.Sprintf("Deprecated defs: %d", c.deprecated))
42+
fmt.Println(fmt.Sprintf("Archived defs: %d", c.archived))
4143
}
4244

4345
func main() {
@@ -68,7 +70,7 @@ func main() {
6870
fmt.Println()
6971
fmt.Println()
7072
fmt.Println("EXAMPLES:")
71-
fmt.Println("mt-index-prune --verbose --partition-count 128 cass -hosts cassandra:9042")
73+
fmt.Println("mt-index-prune --verbose --partition-from 0 --partition-to 8 cass -hosts cassandra:9042")
7274
}
7375

7476
if len(os.Args) == 2 && (os.Args[1] == "-h" || os.Args[1] == "--help") {
@@ -123,7 +125,9 @@ func main() {
123125
defCounters := counters{}
124126
defs := make([]schema.MetricDefinition, 0)
125127
deprecatedDefs := make([]schema.MetricDefinition, 0)
128+
126129
for partition := partitionFrom; (partitionTo == -1 && partition == partitionFrom) || (partitionTo > 0 && partition < partitionTo); partition++ {
130+
log.Infof("starting to process partition %d", partition)
127131
defsByNameWithTags := make(map[string][]schema.MetricDefinition)
128132
defs = cassIdx.LoadPartitions([]int32{int32(partition)}, defs, now)
129133
defCounters.total += len(defs)
@@ -161,10 +165,15 @@ func main() {
161165
}
162166

163167
if noDryRun {
164-
err = cassIdx.ArchiveDefs(deprecatedDefs)
168+
count, err := cassIdx.ArchiveDefs(deprecatedDefs)
169+
log.Infof("archiving request complete. successful=%d", count)
170+
if count != len(deprecatedDefs) {
171+
log.Warnf("some defs failed to be archived. failed=%d", len(deprecatedDefs)-count)
172+
}
165173
if err != nil {
166174
log.Warnf("Failed to archive defs: %s", err.Error())
167175
}
176+
defCounters.archived += count
168177
}
169178

170179
defs = defs[:0]

idx/cassandra/cassandra.go

+34-9
Original file line numberDiff line numberDiff line change
@@ -392,10 +392,17 @@ NAMES:
392392
return defs
393393
}
394394

395-
func (c *CasIdx) ArchiveDefs(defs []schema.MetricDefinition) error {
395+
// ArchiveDefs writes each of the provided defs to the archive table and
396+
// then deletes the defs from the metric_idx table.
397+
func (c *CasIdx) ArchiveDefs(defs []schema.MetricDefinition) (int, error) {
396398
defChan := make(chan *schema.MetricDefinition, c.cfg.numConns)
397399
g, ctx := errgroup.WithContext(context.Background())
400+
401+
// keep track of how many defs were successfully archived.
402+
success := make([]int, c.cfg.numConns)
403+
398404
for i := 0; i < c.cfg.numConns; i++ {
405+
i := i
399406
g.Go(func() error {
400407
for {
401408
select {
@@ -405,13 +412,25 @@ func (c *CasIdx) ArchiveDefs(defs []schema.MetricDefinition) error {
405412
}
406413
err := c.addDefToArchive(*def)
407414
if err != nil {
408-
return err
415+
// If we failed to add the def to the archive table then just continue on to the next def.
416+
// As we haven't yet removed this def from the metric_idx table, the next time archiving
417+
// is performed this def will be processed again. As no action is needed by an operator, we
418+
// just log this as a warning.
419+
log.Warnf("cassandra-idx: Failed add def to archive table. error=%s. def=%+v", err, *def)
420+
continue
409421
}
410422

411423
err = c.deleteDef(def.Id, def.Partition)
412424
if err != nil {
413-
return err
425+
// The next time archiving is performed this def will be processed again. Re-adding the def to the archive
426+
// table will just be treated like an update with only the archived_at field changing. As no action is needed
427+
// by an operator, we just log this as a warning.
428+
log.Warnf("cassandra-idx: Failed to remove archived def from metric_idx table. error=%s. def=%+v", err, *def)
429+
continue
414430
}
431+
432+
// increment counter of defs successfully archived
433+
success[i] = success[i] + 1
415434
case <-ctx.Done():
416435
return ctx.Err()
417436
}
@@ -422,11 +441,17 @@ func (c *CasIdx) ArchiveDefs(defs []schema.MetricDefinition) error {
422441
defChan <- &defs[i]
423442
}
424443
close(defChan)
425-
if err := g.Wait(); err != nil {
426-
return err
444+
445+
// wait for all goroutines to complete.
446+
err := g.Wait()
447+
448+
// get the count of defs successfully archived.
449+
total := 0
450+
for _, count := range success {
451+
total = total + count
427452
}
428453

429-
return nil
454+
return total, err
430455
}
431456

432457
func (c *CasIdx) processWriteQueue() {
@@ -513,9 +538,9 @@ func (c *CasIdx) addDefToArchive(def schema.MetricDefinition) error {
513538
return nil
514539
}
515540

516-
// log first failure and every 20th after that.
517-
if (attempts % 20) == 0 {
518-
log.Warnf("cassandra-idx: Failed to write def to cassandra. it will be retried. %s. the value was: %+v", err, def)
541+
// log first failure as a warning. If we reach max attempts, the error will bubble up to the caller.
542+
if attempts == 0 {
543+
log.Warnf("cassandra-idx: Failed to write def to cassandra. it will be retried. error=%s. def=%+v", err, def)
519544
}
520545
}
521546

0 commit comments

Comments
 (0)