Skip to content
This repository was archived by the owner on Aug 23, 2023. It is now read-only.

Commit 045e2f9

Browse files
authored
Merge pull request #1579 from grafana/recover-from-cassandra-catastrophe
add catastrophe recovery for cassandra
2 parents 069ffcc + 438c027 commit 045e2f9

File tree

18 files changed

+503
-162
lines changed

18 files changed

+503
-162
lines changed

cassandra/cassandra_session.go

+153
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,153 @@
1+
package cassandra
2+
3+
import (
4+
"fmt"
5+
"sync"
6+
"time"
7+
8+
"github.com/gocql/gocql"
9+
log "github.com/sirupsen/logrus"
10+
)
11+
12+
// Session stores a connection to Cassandra along with associated configurations
13+
type Session struct {
14+
wg sync.WaitGroup
15+
session *gocql.Session
16+
cluster *gocql.ClusterConfig
17+
shutdown chan struct{}
18+
connectionCheckTimeout time.Duration
19+
connectionCheckInterval time.Duration
20+
addrs string
21+
logPrefix string
22+
sync.RWMutex
23+
}
24+
25+
// NewSession creates and returns a Session. Upon failure it will return nil and an error.
26+
func NewSession(clusterConfig *gocql.ClusterConfig,
27+
timeout time.Duration,
28+
interval time.Duration,
29+
addrs string,
30+
logPrefix string) (*Session, error) {
31+
if clusterConfig == nil {
32+
log.Errorf("cassandra.NewSession received nil pointer for ClusterConfig")
33+
return nil, fmt.Errorf("cassandra.NewSession received nil pointer for ClusterConfig")
34+
}
35+
36+
session, err := clusterConfig.CreateSession()
37+
if err != nil {
38+
log.Errorf("cassandra.NewSession failed to create session: %v", err)
39+
return nil, err
40+
}
41+
42+
cs := &Session{
43+
session: session,
44+
cluster: clusterConfig,
45+
shutdown: make(chan struct{}),
46+
connectionCheckTimeout: timeout,
47+
connectionCheckInterval: interval,
48+
addrs: addrs,
49+
logPrefix: logPrefix,
50+
}
51+
52+
if cs.connectionCheckInterval > 0 {
53+
cs.wg.Add(1)
54+
go cs.deadConnectionRefresh()
55+
}
56+
57+
return cs, nil
58+
59+
}
60+
61+
func (s *Session) Stop() {
62+
close(s.shutdown)
63+
s.wg.Wait()
64+
}
65+
66+
// deadConnectionRefresh will run a query using the current Cassandra session every connectionCheckInterval
67+
// if it cannot query Cassandra for longer than connectionCheckTimeout it will create a new session
68+
//
69+
// We implemented this due to an issue in gocql (https://github.com/gocql/gocql/issues/831). Once that issue is resolved
70+
// we should be able to get rid of this code.
71+
func (s *Session) deadConnectionRefresh() {
72+
defer s.wg.Done()
73+
74+
log.Infof("%s: dead connection check enabled with an interval of %s", s.logPrefix, s.connectionCheckInterval.String())
75+
76+
ticker := time.NewTicker(s.connectionCheckInterval)
77+
var totaltime time.Duration
78+
var err error
79+
var oldSession *gocql.Session
80+
81+
OUTER:
82+
for {
83+
// connection to Cassandra has been down for longer than the configured timeout
84+
if totaltime >= s.connectionCheckTimeout {
85+
s.Lock()
86+
start := time.Now()
87+
for {
88+
select {
89+
case <-s.shutdown:
90+
log.Infof("%s: received shutdown, exiting deadConnectionRefresh", s.logPrefix)
91+
if s.session != nil && !s.session.Closed() {
92+
s.session.Close()
93+
}
94+
// make sure we unlock the sessionLock before returning
95+
s.Unlock()
96+
return
97+
default:
98+
log.Errorf("%s: creating new session to cassandra using hosts: %v", s.logPrefix, s.addrs)
99+
if s.session != nil && !s.session.Closed() && oldSession == nil {
100+
oldSession = s.session
101+
}
102+
s.session, err = s.cluster.CreateSession()
103+
if err != nil {
104+
log.Errorf("%s: error while attempting to recreate cassandra session. will retry after %v: %v", s.logPrefix, s.connectionCheckInterval.String(), err)
105+
time.Sleep(s.connectionCheckInterval)
106+
totaltime += s.connectionCheckInterval
107+
// continue inner loop to attempt to reconnect
108+
continue
109+
}
110+
s.Unlock()
111+
log.Errorf("%s: reconnecting to cassandra took %s", s.logPrefix, time.Since(start).String())
112+
totaltime = 0
113+
if oldSession != nil {
114+
oldSession.Close()
115+
oldSession = nil
116+
}
117+
// we connected, so go back to the normal outer loop
118+
continue OUTER
119+
}
120+
}
121+
}
122+
123+
select {
124+
case <-s.shutdown:
125+
log.Infof("%s: received shutdown, exiting deadConnectionRefresh", s.logPrefix)
126+
if s.session != nil && !s.session.Closed() {
127+
s.session.Close()
128+
}
129+
return
130+
case <-ticker.C:
131+
s.RLock()
132+
// this query should work on all cassandra deployments, but we may need to revisit this
133+
err = s.session.Query("SELECT cql_version FROM system.local").Exec()
134+
s.RUnlock()
135+
if err == nil {
136+
totaltime = 0
137+
} else {
138+
totaltime += s.connectionCheckInterval
139+
log.Errorf("%s: could not execute connection check query for %v: %v", s.logPrefix, totaltime.String(), err)
140+
}
141+
}
142+
}
143+
}
144+
145+
// CurrentSession retrieves the current active Cassandra session
146+
//
147+
// If the connection to Cassandra is down, this will block until it can be restored
148+
func (s *Session) CurrentSession() *gocql.Session {
149+
s.RLock()
150+
session := s.session
151+
s.RUnlock()
152+
return session
153+
}

cmd/mt-store-cat/chunk.go

+6-3
Original file line numberDiff line numberDiff line change
@@ -22,14 +22,16 @@ func printChunkSummary(ctx context.Context, store *cassandra.CassandraStore, tab
2222
fmt.Println("## Table", tbl.Name)
2323
if len(metrics) == 0 {
2424
query := fmt.Sprintf("select key, ttl(data) from %s", tbl.Name)
25-
iter := store.Session.Query(query).Iter()
25+
session := store.Session.CurrentSession()
26+
iter := session.Query(query).Iter()
2627
showKeyTTL(iter, groupTTL)
2728
} else {
2829
for _, metric := range metrics {
2930
for num := startMonth; num <= endMonth; num += 1 {
3031
row_key := fmt.Sprintf("%s_%d", metric.AMKey.String(), num)
3132
query := fmt.Sprintf("select key, ttl(data) from %s where key=?", tbl.Name)
32-
iter := store.Session.Query(query, row_key).Iter()
33+
session := store.Session.CurrentSession()
34+
iter := session.Query(query, row_key).Iter()
3335
showKeyTTL(iter, groupTTL)
3436
}
3537
}
@@ -54,7 +56,8 @@ func printChunkCsv(ctx context.Context, store *cassandra.CassandraStore, table c
5456
i++
5557
}
5658
params := []interface{}{rowKeys, end}
57-
iter := store.Session.Query(query, params...).WithContext(ctx).Iter()
59+
session := store.Session.CurrentSession()
60+
iter := session.Query(query, params...).WithContext(ctx).Iter()
5861
var key string
5962
var ts int
6063
var b []byte

cmd/mt-store-cat/metrics.go

+4-2
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,8 @@ func match(prefix, substr, glob string, metric Metric) bool {
5252
// getMetrics lists all metrics from the store matching the given condition.
5353
func getMetrics(idx *cassandra.CasIdx, prefix, substr, glob string, archive schema.Archive) ([]Metric, error) {
5454
var metrics []Metric
55-
iter := idx.Session.Query(fmt.Sprintf("select id, name from %s", idx.Config.Table)).Iter()
55+
session := idx.Session.CurrentSession()
56+
iter := session.Query(fmt.Sprintf("select id, name from %s", idx.Config.Table)).Iter()
5657
var m Metric
5758
var idString string
5859
for iter.Scan(&idString, &m.name) {
@@ -80,7 +81,8 @@ func getMetrics(idx *cassandra.CasIdx, prefix, substr, glob string, archive sche
8081
func getMetric(idx *cassandra.CasIdx, amkey schema.AMKey) ([]Metric, error) {
8182
var metrics []Metric
8283
// index only stores MKey's, not AMKey's.
83-
iter := idx.Session.Query(fmt.Sprintf("select name from %s where id=? ALLOW FILTERING", idx.Config.Table), amkey.MKey.String()).Iter()
84+
session := idx.Session.CurrentSession()
85+
iter := session.Query(fmt.Sprintf("select name from %s where id=? ALLOW FILTERING", idx.Config.Table), amkey.MKey.String()).Iter()
8486

8587
var m Metric
8688
for iter.Scan(&m.name) {

cmd/mt-update-ttl/main.go

+5-3
Original file line numberDiff line numberDiff line change
@@ -134,7 +134,8 @@ func worker(id int, jobs <-chan string, wg *sync.WaitGroup, store *cassandra.Cas
134134
queryTpl := fmt.Sprintf("SELECT token(key), ts, data FROM %s where key=? AND ts>=? AND ts<?", tableIn)
135135

136136
for key := range jobs {
137-
iter := store.Session.Query(queryTpl, key, startTime, endTime).Iter()
137+
session := store.Session.CurrentSession()
138+
iter := session.Query(queryTpl, key, startTime, endTime).Iter()
138139
for iter.Scan(&token, &ts, &data) {
139140
newTTL := getTTL(int(time.Now().Unix()), ts, ttlOut)
140141
if tableIn == tableOut {
@@ -146,7 +147,7 @@ func worker(id int, jobs <-chan string, wg *sync.WaitGroup, store *cassandra.Cas
146147
log.Infof("id=%d processing rownum=%d table=%q key=%q ts=%d query=%q data='%x'\n", id, atomic.LoadUint64(&doneRows)+1, tableIn, key, ts, query, data)
147148
}
148149

149-
err := store.Session.Query(query, data, key, ts).Exec()
150+
err := session.Query(query, data, key, ts).Exec()
150151
if err != nil {
151152
log.Errorf("id=%d failed updating %s %s %d: %q", id, tableOut, key, ts, err)
152153
}
@@ -178,7 +179,8 @@ func worker(id int, jobs <-chan string, wg *sync.WaitGroup, store *cassandra.Cas
178179

179180
func update(store *cassandra.CassandraStore, ttlOut int, tableIn, tableOut string) {
180181

181-
keyItr := store.Session.Query(fmt.Sprintf("SELECT distinct key FROM %s", tableIn)).Iter()
182+
session := store.Session.CurrentSession()
183+
keyItr := session.Query(fmt.Sprintf("SELECT distinct key FROM %s", tableIn)).Iter()
182184

183185
jobs := make(chan string, 100)
184186

docker/docker-chaos/metrictank.ini

+8
Original file line numberDiff line numberDiff line change
@@ -146,6 +146,10 @@ username = cassandra
146146
password = cassandra
147147
# instruct the driver to not attempt to get host info from the system.peers table
148148
disable-initial-host-lookup = false
149+
# interval at which to perform a connection check to cassandra, set to 0 to disable.
150+
connection-check-interval = 5s
151+
# maximum total time to wait before considering a connection to cassandra invalid. This value should be higher than connection-check-interval.
152+
connection-check-timeout = 30s
149153

150154
## Bigtable backend Store Settings ##
151155
[bigtable-store]
@@ -444,6 +448,10 @@ create-keyspace = false
444448
schema-file = /etc/metrictank/schema-idx-cassandra.toml
445449
# instruct the driver to not attempt to get host info from the system.peers table
446450
disable-initial-host-lookup = false
451+
# interval at which to perform a connection check to cassandra, set to 0 to disable.
452+
connection-check-interval = 5s
453+
# maximum total time to wait before considering a connection to cassandra invalid. This value should be higher than connection-check-interval.
454+
connection-check-timeout = 30s
447455

448456
### in-memory only
449457
[memory-idx]

docker/docker-cluster-query/metrictank.ini

+8
Original file line numberDiff line numberDiff line change
@@ -146,6 +146,10 @@ username = cassandra
146146
password = cassandra
147147
# instruct the driver to not attempt to get host info from the system.peers table
148148
disable-initial-host-lookup = false
149+
# interval at which to perform a connection check to cassandra, set to 0 to disable.
150+
connection-check-interval = 5s
151+
# maximum total time to wait before considering a connection to cassandra invalid. This value should be higher than connection-check-interval.
152+
connection-check-timeout = 30s
149153

150154
## Bigtable backend Store Settings ##
151155
[bigtable-store]
@@ -444,6 +448,10 @@ create-keyspace = false
444448
schema-file = /etc/metrictank/schema-idx-cassandra.toml
445449
# instruct the driver to not attempt to get host info from the system.peers table
446450
disable-initial-host-lookup = false
451+
# interval at which to perform a connection check to cassandra, set to 0 to disable.
452+
connection-check-interval = 5s
453+
# maximum total time to wait before considering a connection to cassandra invalid. This value should be higher than connection-check-interval.
454+
connection-check-timeout = 30s
447455

448456
### in-memory only
449457
[memory-idx]

docker/docker-cluster/metrictank.ini

+8
Original file line numberDiff line numberDiff line change
@@ -146,6 +146,10 @@ username = cassandra
146146
password = cassandra
147147
# instruct the driver to not attempt to get host info from the system.peers table
148148
disable-initial-host-lookup = false
149+
# interval at which to perform a connection check to cassandra, set to 0 to disable.
150+
connection-check-interval = 5s
151+
# maximum total time to wait before considering a connection to cassandra invalid. This value should be higher than connection-check-interval.
152+
connection-check-timeout = 30s
149153

150154
## Bigtable backend Store Settings ##
151155
[bigtable-store]
@@ -444,6 +448,10 @@ create-keyspace = false
444448
schema-file = /etc/metrictank/schema-idx-cassandra.toml
445449
# instruct the driver to not attempt to get host info from the system.peers table
446450
disable-initial-host-lookup = false
451+
# interval at which to perform a connection check to cassandra, set to 0 to disable.
452+
connection-check-interval = 5s
453+
# maximum total time to wait before considering a connection to cassandra invalid. This value should be higher than connection-check-interval.
454+
connection-check-timeout = 30s
447455

448456
### in-memory only
449457
[memory-idx]

docker/docker-dev-custom-cfg-kafka/metrictank.ini

+8
Original file line numberDiff line numberDiff line change
@@ -146,6 +146,10 @@ username = cassandra
146146
password = cassandra
147147
# instruct the driver to not attempt to get host info from the system.peers table
148148
disable-initial-host-lookup = false
149+
# interval at which to perform a connection check to cassandra, set to 0 to disable.
150+
connection-check-interval = 5s
151+
# maximum total time to wait before considering a connection to cassandra invalid. This value should be higher than connection-check-interval.
152+
connection-check-timeout = 30s
149153

150154
## Bigtable backend Store Settings ##
151155
[bigtable-store]
@@ -444,6 +448,10 @@ create-keyspace = true
444448
schema-file = /etc/metrictank/schema-idx-cassandra.toml
445449
# instruct the driver to not attempt to get host info from the system.peers table
446450
disable-initial-host-lookup = false
451+
# interval at which to perform a connection check to cassandra, set to 0 to disable.
452+
connection-check-interval = 5s
453+
# maximum total time to wait before considering a connection to cassandra invalid. This value should be higher than connection-check-interval.
454+
connection-check-timeout = 30s
447455

448456
### in-memory only
449457
[memory-idx]

docs/config.md

+8
Original file line numberDiff line numberDiff line change
@@ -180,6 +180,10 @@ username = cassandra
180180
password = cassandra
181181
# instruct the driver to not attempt to get host info from the system.peers table
182182
disable-initial-host-lookup = false
183+
# interval at which to perform a connection check to cassandra, set to 0 to disable.
184+
connection-check-interval = 5s
185+
# maximum total time to wait before considering a connection to cassandra invalid. This value should be higher than connection-check-interval.
186+
connection-check-timeout = 30s
183187
```
184188

185189
## Bigtable backend Store Settings ##
@@ -515,6 +519,10 @@ create-keyspace = true
515519
schema-file = /etc/metrictank/schema-idx-cassandra.toml
516520
# instruct the driver to not attempt to get host info from the system.peers table
517521
disable-initial-host-lookup = false
522+
# interval at which to perform a connection check to cassandra, set to 0 to disable.
523+
connection-check-interval = 5s
524+
# maximum total time to wait before considering a connection to cassandra invalid. This value should be higher than connection-check-interval.
525+
connection-check-timeout = 30s
518526
```
519527

520528
### in-memory only

docs/tools.md

+8
Original file line numberDiff line numberDiff line change
@@ -126,6 +126,10 @@ cass config flags:
126126
enable cassandra user authentication
127127
-ca-path string
128128
cassandra CA certficate path when using SSL (default "/etc/metrictank/ca.pem")
129+
-connection-check-interval duration
130+
interval at which to perform a connection check to cassandra, set to 0 to disable. (default 5s)
131+
-connection-check-timeout duration
132+
maximum total time to wait before considering a connection to cassandra invalid. This value should be higher than connection-check-interval. (default 30s)
129133
-consistency string
130134
write consistency (any|one|two|three|quorum|all|local_quorum|each_quorum|local_one (default "one")
131135
-create-keyspace
@@ -277,6 +281,10 @@ cass config flags:
277281
enable cassandra user authentication
278282
-ca-path string
279283
cassandra CA certficate path when using SSL (default "/etc/metrictank/ca.pem")
284+
-connection-check-interval duration
285+
interval at which to perform a connection check to cassandra, set to 0 to disable. (default 5s)
286+
-connection-check-timeout duration
287+
maximum total time to wait before considering a connection to cassandra invalid. This value should be higher than connection-check-interval. (default 30s)
280288
-consistency string
281289
write consistency (any|one|two|three|quorum|all|local_quorum|each_quorum|local_one (default "one")
282290
-create-keyspace

0 commit comments

Comments
 (0)