Skip to content
This repository was archived by the owner on Aug 23, 2023. It is now read-only.

add catastrophe recovery for cassandra #1579

Merged
merged 10 commits into from
Jan 17, 2020
Next Next commit
add catastrophe recovery to cassandra store
robert-milan committed Jan 14, 2020
commit ab690ddc7242a403fdfdb88b557c2267dd1bd2bc
114 changes: 89 additions & 25 deletions store/cassandra/cassandra.go
Original file line number Diff line number Diff line change
@@ -7,6 +7,7 @@ import (
"sort"
"strconv"
"strings"
"sync"
"time"

"github.com/grafana/metrictank/schema"
@@ -78,13 +79,17 @@ type ChunkReadRequest struct {

type CassandraStore struct {
Session *gocql.Session
Cluster *gocql.ClusterConfig
config *StoreConfig
writeQueues []chan *mdata.ChunkWriteRequest
writeQueueMeters []*stats.Range32
readQueue chan *ChunkReadRequest
TTLTables TTLTables
omitReadTimeout time.Duration
tracer opentracing.Tracer
timeout time.Duration
shutdown chan struct{}
wg sync.WaitGroup
sessionLock sync.RWMutex
}

// ConvertTimeout provides backwards compatibility for values that used to be specified as integers,
@@ -222,25 +227,32 @@ func NewCassandraStore(config *StoreConfig, ttls []uint32) (*CassandraStore, err
log.Debugf("CS: created session with config %+v", config)
c := &CassandraStore{
Session: session,
Cluster: cluster,
writeQueues: make([]chan *mdata.ChunkWriteRequest, config.WriteConcurrency),
writeQueueMeters: make([]*stats.Range32, config.WriteConcurrency),
readQueue: make(chan *ChunkReadRequest, config.ReadQueueSize),
omitReadTimeout: ConvertTimeout(config.OmitReadTimeout, time.Second),
TTLTables: ttlTables,
tracer: opentracing.NoopTracer{},
timeout: cluster.Timeout,
shutdown: make(chan struct{}),
config: config,
}

for i := 0; i < config.WriteConcurrency; i++ {
c.writeQueues[i] = make(chan *mdata.ChunkWriteRequest, config.WriteQueueSize)
c.writeQueueMeters[i] = stats.NewRange32(fmt.Sprintf("store.cassandra.write_queue.%d.items", i+1))
c.wg.Add(1)
go c.processWriteQueue(c.writeQueues[i], c.writeQueueMeters[i])
}

for i := 0; i < config.ReadConcurrency; i++ {
c.wg.Add(1)
go c.processReadQueue()
}

c.wg.Add(1)
go c.deadConnectionCheck()

return c, err
}

@@ -254,6 +266,8 @@ func NewCassandraStore(config *StoreConfig, ttls []uint32) (*CassandraStore, err
// so remember the TTL might have been up to twice as much
func (c *CassandraStore) FindExistingTables(keyspace string) error {

c.sessionLock.RLock()
defer c.sessionLock.RUnlock()
meta, err := c.Session.KeyspaceMetadata(keyspace)
if err != nil {
return err
@@ -301,6 +315,8 @@ func (c *CassandraStore) Add(cwr *mdata.ChunkWriteRequest) {
/* process writeQueue.
*/
func (c *CassandraStore) processWriteQueue(queue chan *mdata.ChunkWriteRequest, meter *stats.Range32) {
defer c.wg.Done()

tick := time.Tick(time.Duration(1) * time.Second)
for {
select {
@@ -340,6 +356,9 @@ func (c *CassandraStore) processWriteQueue(queue chan *mdata.ChunkWriteRequest,
attempts++
}
}
case <-c.shutdown:
log.Info("cassandra_store: received shutdown, exiting processWriteQueue")
return
}
}
}
@@ -382,7 +401,9 @@ func (c *CassandraStore) insertChunk(key string, t0, ttl uint32, data []byte) er

row_key := fmt.Sprintf("%s_%d", key, t0/Month_sec) // "month number" based on unix timestamp (rounded down)
pre := time.Now()
ctx, cancel := context.WithTimeout(context.Background(), c.timeout)
ctx, cancel := context.WithTimeout(context.Background(), c.Cluster.Timeout)
c.sessionLock.RLock()
defer c.sessionLock.RUnlock()
ret := c.Session.Query(table.QueryWrite, row_key, t0, data, uint32(relativeTtl)).WithContext(ctx).Exec()
cancel()
cassPutExecDuration.Value(time.Now().Sub(pre))
@@ -395,30 +416,69 @@ type readResult struct {
}

func (c *CassandraStore) processReadQueue() {
for crr := range c.readQueue {
// check to see if the request has been canceled, if so abort now.
defer c.wg.Done()

for {
select {
case <-crr.ctx.Done():
//request canceled
crr.out <- readResult{err: errCtxCanceled}
continue
default:
}
waitDuration := time.Since(crr.timestamp)
cassGetWaitDuration.Value(waitDuration)
if waitDuration > c.omitReadTimeout {
cassOmitOldRead.Inc()
crr.out <- readResult{err: errReadTooOld}
continue
case crr := <-c.readQueue:
// check to see if the request has been canceled, if so abort now.
select {
case <-crr.ctx.Done():
//request canceled
crr.out <- readResult{err: errCtxCanceled}
continue
default:
}
waitDuration := time.Since(crr.timestamp)
cassGetWaitDuration.Value(waitDuration)
if waitDuration > c.omitReadTimeout {
cassOmitOldRead.Inc()
crr.out <- readResult{err: errReadTooOld}
continue
}

pre := time.Now()
iter := readResult{
i: c.Session.Query(crr.q, crr.p...).WithContext(crr.ctx).Iter(),
err: nil,
}
cassGetExecDuration.Value(time.Since(pre))
crr.out <- iter
case <-c.shutdown:
log.Info("cassandra_store: received shutdown, exiting processReadQueue")
return
}
}
}

func (c *CassandraStore) deadConnectionCheck() {
defer c.wg.Done()

pre := time.Now()
iter := readResult{
i: c.Session.Query(crr.q, crr.p...).WithContext(crr.ctx).Iter(),
err: nil,
ticker := time.NewTicker(time.Second * 5)
attempts := 0
for {
select {
case <-ticker.C:
err := c.Session.Query("SELECT cql_version FROM system.local").Exec()
if err != nil {
attempts++
log.Errorf("cassandra_store: could not execute connection check query for %d attempts: %v", attempts, err)
}
if attempts >= 6 {
attempts = 0
log.Errorf("cassandra_store: creating new session to cassandra using hosts: %v", c.config.Addrs)
c.Cluster.Hosts = strings.Split(c.config.Addrs, ",")
c.Session.Close()
c.Session, err = c.Cluster.CreateSession()
if err != nil {
log.Errorf("cassandra_store: error while attempting to recreate cassandra session. will retry after %d seconds: %v", time.Second*30, err)
attempts++
}
}
case <-c.shutdown:
log.Info("cassandra_store: received shutdown, exiting deadConnectionCheck")
return
}
cassGetExecDuration.Value(time.Since(pre))
crr.out <- iter
}
}

@@ -482,7 +542,7 @@ func (c *CassandraStore) SearchTable(ctx context.Context, key schema.AMKey, tabl
endMonth := (end - 1) / Month_sec // ending row has to include the last point we might need (end-1)
rowKeys := make([]string, endMonth-startMonth+1)
i := 0
for num := startMonth; num <= endMonth; num += 1 {
for num := startMonth; num <= endMonth; num++ {
rowKeys[i] = fmt.Sprintf("%s_%d", key, num)
i++
}
@@ -562,5 +622,9 @@ func (c *CassandraStore) SearchTable(ctx context.Context, key schema.AMKey, tabl
}

func (c *CassandraStore) Stop() {
c.Session.Close()
close(c.shutdown)
c.wg.Wait()
if c.Session != nil && !c.Session.Closed() {
c.Session.Close()
}
}