Skip to content
This repository was archived by the owner on Aug 23, 2023. It is now read-only.

Commit 145d324

Browse files
committed
add locks
1 parent 8798480 commit 145d324

File tree

1 file changed

+42
-14
lines changed

1 file changed

+42
-14
lines changed

store/cassandra/cassandra.go

+42-14
Original file line numberDiff line numberDiff line change
@@ -325,7 +325,9 @@ func (c *CassandraStore) processWriteQueue(queue chan *mdata.ChunkWriteRequest,
325325
case cwr := <-queue:
326326
meter.Value(len(queue))
327327
keyStr := cwr.Key.String()
328-
log.Debugf("CS: starting to save %s:%d %v", keyStr, cwr.T0, cwr.Data)
328+
if log.IsLevelEnabled(log.DebugLevel) {
329+
log.Debugf("cassandra_store: starting to save %s:%d %v", keyStr, cwr.T0, cwr.Data)
330+
}
329331
//log how long the chunk waited in the queue before we attempted to save to cassandra
330332
cassPutWaitDuration.Value(time.Now().Sub(cwr.Timestamp))
331333

@@ -340,12 +342,14 @@ func (c *CassandraStore) processWriteQueue(queue chan *mdata.ChunkWriteRequest,
340342
if cwr.Callback != nil {
341343
cwr.Callback()
342344
}
343-
log.Debugf("CS: save complete. %s:%d %v", keyStr, cwr.T0, cwr.Data)
345+
if log.IsLevelEnabled(log.DebugLevel) {
346+
log.Debugf("cassandra_store: save complete. %s:%d %v", keyStr, cwr.T0, cwr.Data)
347+
}
344348
chunkSaveOk.Inc()
345349
} else {
346350
errmetrics.Inc(err)
347351
if (attempts % 20) == 0 {
348-
log.Warnf("CS: failed to save chunk to cassandra after %d attempts. %v, %s", attempts+1, cwr.Data, err)
352+
log.Warnf("cassandra_store: failed to save chunk to cassandra after %d attempts. %v, %s", attempts+1, cwr.Data, err)
349353
}
350354
chunkSaveFail.Inc()
351355
sleepTime := 100 * attempts
@@ -358,6 +362,9 @@ func (c *CassandraStore) processWriteQueue(queue chan *mdata.ChunkWriteRequest,
358362
}
359363
case <-c.shutdown:
360364
log.Info("cassandra_store: received shutdown, exiting processWriteQueue")
365+
if c.Session != nil && !c.Session.Closed() {
366+
c.Session.Close()
367+
}
361368
return
362369
}
363370
}
@@ -446,6 +453,9 @@ func (c *CassandraStore) processReadQueue() {
446453
crr.out <- iter
447454
case <-c.shutdown:
448455
log.Info("cassandra_store: received shutdown, exiting processReadQueue")
456+
if c.Session != nil && !c.Session.Closed() {
457+
c.Session.Close()
458+
}
449459
return
450460
}
451461
}
@@ -459,24 +469,42 @@ func (c *CassandraStore) deadConnectionCheck() {
459469
for {
460470
select {
461471
case <-ticker.C:
472+
c.sessionLock.RLock()
462473
err := c.Session.Query("SELECT cql_version FROM system.local").Exec()
463-
if err != nil {
474+
if err == nil {
475+
attempts = 0
476+
} else {
464477
attempts++
465478
log.Errorf("cassandra_store: could not execute connection check query for %d attempts: %v", attempts, err)
466479
}
480+
c.sessionLock.RUnlock()
481+
467482
if attempts >= 6 {
483+
success := false
468484
attempts = 0
469-
log.Errorf("cassandra_store: creating new session to cassandra using hosts: %v", c.config.Addrs)
470-
c.Cluster.Hosts = strings.Split(c.config.Addrs, ",")
471-
c.Session.Close()
472-
c.Session, err = c.Cluster.CreateSession()
473-
if err != nil {
474-
log.Errorf("cassandra_store: error while attempting to recreate cassandra session. will retry after %d seconds: %v", time.Second*30, err)
475-
attempts++
485+
c.sessionLock.Lock()
486+
for !success {
487+
log.Errorf("cassandra_store: creating new session to cassandra using hosts: %v", c.config.Addrs)
488+
// c.Cluster.Hosts = strings.Split(c.config.Addrs, ",")
489+
if c.Session != nil && !c.Session.Closed() {
490+
c.Session.Close()
491+
c.Session = nil
492+
}
493+
c.Session, err = c.Cluster.CreateSession()
494+
if err != nil {
495+
log.Errorf("cassandra_store: error while attempting to recreate cassandra session. will retry after 5 seconds: %v", err)
496+
time.Sleep(time.Second * 5)
497+
} else {
498+
success = true
499+
}
476500
}
501+
c.sessionLock.Unlock()
477502
}
478503
case <-c.shutdown:
479504
log.Info("cassandra_store: received shutdown, exiting deadConnectionCheck")
505+
if c.Session != nil && !c.Session.Closed() {
506+
c.Session.Close()
507+
}
480508
return
481509
}
482510
}
@@ -554,6 +582,9 @@ func (c *CassandraStore) SearchTable(ctx context.Context, key schema.AMKey, tabl
554582
ctx: ctx,
555583
}
556584

585+
c.sessionLock.RLock()
586+
defer c.sessionLock.RUnlock()
587+
557588
select {
558589
case <-ctx.Done():
559590
// request has been canceled, so no need to continue queuing reads.
@@ -624,7 +655,4 @@ func (c *CassandraStore) SearchTable(ctx context.Context, key schema.AMKey, tabl
624655
func (c *CassandraStore) Stop() {
625656
close(c.shutdown)
626657
c.wg.Wait()
627-
if c.Session != nil && !c.Session.Closed() {
628-
c.Session.Close()
629-
}
630658
}

0 commit comments

Comments
 (0)