Skip to content
This repository was archived by the owner on Aug 23, 2023. It is now read-only.

Commit ff60085

Browse files
committed
backwards compatible cassandra timeout
1 parent 63a741e commit ff60085

File tree

9 files changed

+55
-22
lines changed

9 files changed

+55
-22
lines changed

cmd/mt-split-metrics-by-ttl/main.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ func main() {
2323
flag.StringVar(&storeConfig.Keyspace, "cassandra-keyspace", storeConfig.Keyspace, "cassandra keyspace to use for storing the metric data table")
2424
flag.StringVar(&storeConfig.Consistency, "cassandra-consistency", storeConfig.Consistency, "write consistency (any|one|two|three|quorum|all|local_quorum|each_quorum|local_one")
2525
flag.StringVar(&storeConfig.HostSelectionPolicy, "cassandra-host-selection-policy", storeConfig.HostSelectionPolicy, "")
26-
flag.DurationVar(&storeConfig.Timeout, "cassandra-timeout", storeConfig.Timeout, "cassandra timeout")
26+
flag.StringVar(&storeConfig.Timeout, "cassandra-timeout", storeConfig.Timeout, "cassandra timeout")
2727
flag.IntVar(&storeConfig.Retries, "cassandra-retries", storeConfig.Retries, "how many times to retry a query before failing it")
2828
flag.IntVar(&storeConfig.CqlProtocolVersion, "cql-protocol-version", storeConfig.CqlProtocolVersion, "cql protocol version to use")
2929
flag.BoolVar(&storeConfig.DisableInitialHostLookup, "cassandra-disable-initial-host-lookup", storeConfig.DisableInitialHostLookup, "instruct the driver to not attempt to get host info from the system.peers table")

cmd/mt-store-cat/main.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,7 @@ func main() {
4545
flag.StringVar(&storeConfig.Keyspace, "cassandra-keyspace", storeConfig.Keyspace, "cassandra keyspace to use for storing the metric data table")
4646
flag.StringVar(&storeConfig.Consistency, "cassandra-consistency", storeConfig.Consistency, "write consistency (any|one|two|three|quorum|all|local_quorum|each_quorum|local_one")
4747
flag.StringVar(&storeConfig.HostSelectionPolicy, "cassandra-host-selection-policy", storeConfig.HostSelectionPolicy, "")
48-
flag.DurationVar(&storeConfig.Timeout, "cassandra-timeout", storeConfig.Timeout, "cassandra timeout")
48+
flag.StringVar(&storeConfig.Timeout, "cassandra-timeout", storeConfig.Timeout, "cassandra timeout")
4949
flag.IntVar(&storeConfig.ReadConcurrency, "cassandra-read-concurrency", storeConfig.ReadConcurrency, "max number of concurrent reads to cassandra.")
5050
//flag.IntVar(&storeConfig.WriteConcurrency, "write-concurrency", storeConfig.WriteConcurrency, "max number of concurrent writes to cassandra.")
5151
flag.IntVar(&storeConfig.ReadQueueSize, "cassandra-read-queue-size", storeConfig.ReadQueueSize, "max number of outstanding reads before reads will be dropped. This is important if you run queries that result in many reads in parallel.")

cmd/mt-store-cp-experimental/main.go

+3-2
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ import (
1313
"time"
1414

1515
"github.com/gocql/gocql"
16+
"github.com/grafana/metrictank/store/cassandra"
1617
hostpool "github.com/hailocab/go-hostpool"
1718
)
1819

@@ -25,7 +26,7 @@ var (
2526
cassandraKeyspace = flag.String("cassandra-keyspace", "metrictank", "cassandra keyspace to use for storing the metric data table")
2627
cassandraConsistency = flag.String("cassandra-consistency", "one", "write consistency (any|one|two|three|quorum|all|local_quorum|each_quorum|local_one")
2728
cassandraHostSelectionPolicy = flag.String("cassandra-host-selection-policy", "tokenaware,hostpool-epsilon-greedy", "")
28-
cassandraTimeout = flag.Int("cassandra-timeout", 1000, "cassandra timeout in milliseconds")
29+
cassandraTimeout = flag.String("cassandra-timeout", "1s", "cassandra timeout")
2930
cassandraConcurrency = flag.Int("cassandra-concurrency", 20, "max number of concurrent reads to cassandra.")
3031
cassandraRetries = flag.Int("cassandra-retries", 0, "how many times to retry a query before failing it")
3132
cassandraDisableHostLookup = flag.Bool("cassandra-disable-host-lookup", false, "disable host lookup (useful if going through proxy)")
@@ -119,7 +120,7 @@ func NewCassandraStore(cassandraAddrs *string) (*gocql.Session, error) {
119120
}
120121
cluster.DisableInitialHostLookup = *cassandraDisableHostLookup
121122
cluster.Consistency = gocql.ParseConsistency(*cassandraConsistency)
122-
cluster.Timeout = time.Duration(*cassandraTimeout) * time.Millisecond
123+
cluster.Timeout = cassandra.ConvertTimeout(*cassandraTimeout)
123124
cluster.NumConns = *cassandraConcurrency
124125
cluster.ProtoVersion = *cqlProtocolVersion
125126
cluster.Keyspace = *cassandraKeyspace

cmd/mt-update-ttl/main.go

+3-2
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ import (
1212
"time"
1313

1414
"github.com/gocql/gocql"
15+
"github.com/grafana/metrictank/store/cassandra"
1516
hostpool "github.com/hailocab/go-hostpool"
1617
"github.com/raintank/dur"
1718
)
@@ -23,7 +24,7 @@ var (
2324
cassandraKeyspace = flag.String("cassandra-keyspace", "metrictank", "cassandra keyspace to use for storing the metric data table")
2425
cassandraConsistency = flag.String("cassandra-consistency", "one", "write consistency (any|one|two|three|quorum|all|local_quorum|each_quorum|local_one")
2526
cassandraHostSelectionPolicy = flag.String("cassandra-host-selection-policy", "tokenaware,hostpool-epsilon-greedy", "")
26-
cassandraTimeout = flag.Duration("cassandra-timeout", time.Second, "cassandra timeout")
27+
cassandraTimeout = flag.String("cassandra-timeout", "1s", "cassandra timeout")
2728
cassandraConcurrency = flag.Int("cassandra-concurrency", 20, "max number of concurrent reads to cassandra.")
2829
cassandraRetries = flag.Int("cassandra-retries", 0, "how many times to retry a query before failing it")
2930
cqlProtocolVersion = flag.Int("cql-protocol-version", 4, "cql protocol version to use")
@@ -97,7 +98,7 @@ func NewCassandraStore() (*gocql.Session, error) {
9798
}
9899
}
99100
cluster.Consistency = gocql.ParseConsistency(*cassandraConsistency)
100-
cluster.Timeout = time.Duration(*cassandraTimeout) * time.Millisecond
101+
cluster.Timeout = cassandra.ConvertTimeout(*cassandraTimeout)
101102
cluster.NumConns = *cassandraConcurrency
102103
cluster.ProtoVersion = *cqlProtocolVersion
103104
cluster.Keyspace = *cassandraKeyspace

cmd/mt-whisper-importer-writer/main.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -89,7 +89,7 @@ func main() {
8989
globalFlags.StringVar(&storeConfig.Keyspace, "cassandra-keyspace", storeConfig.Keyspace, "cassandra keyspace to use for storing the metric data table")
9090
globalFlags.StringVar(&storeConfig.Consistency, "cassandra-consistency", storeConfig.Consistency, "write consistency (any|one|two|three|quorum|all|local_quorum|each_quorum|local_one")
9191
globalFlags.StringVar(&storeConfig.HostSelectionPolicy, "cassandra-host-selection-policy", storeConfig.HostSelectionPolicy, "")
92-
globalFlags.DurationVar(&storeConfig.Timeout, "cassandra-timeout", storeConfig.Timeout, "cassandra timeout")
92+
globalFlags.StringVar(&storeConfig.Timeout, "cassandra-timeout", storeConfig.Timeout, "cassandra timeout")
9393
globalFlags.IntVar(&storeConfig.ReadConcurrency, "cassandra-read-concurrency", storeConfig.ReadConcurrency, "max number of concurrent reads to cassandra.")
9494
globalFlags.IntVar(&storeConfig.WriteConcurrency, "cassandra-write-concurrency", storeConfig.WriteConcurrency, "max number of concurrent writes to cassandra.")
9595
globalFlags.IntVar(&storeConfig.ReadQueueSize, "cassandra-read-queue-size", storeConfig.ReadQueueSize, "max number of outstanding reads before reads will be dropped. This is important if you run queries that result in many reads in parallel.")

docs/tools.md

+10-10
Original file line numberDiff line numberDiff line change
@@ -298,8 +298,8 @@ Flags:
298298
how many times to retry a query before failing it
299299
-cassandra-ssl
300300
enable SSL connection to cassandra
301-
-cassandra-timeout duration
302-
cassandra timeout (default 1s)
301+
-cassandra-timeout string
302+
cassandra timeout (default "1s")
303303
-cassandra-username string
304304
username for authentication (default "cassandra")
305305
-cql-protocol-version int
@@ -364,8 +364,8 @@ Flags:
364364
File containing the needed schemas in case database needs initializing (default "/etc/metrictank/schema-store-cassandra.toml")
365365
-cassandra-ssl
366366
enable SSL connection to cassandra
367-
-cassandra-timeout duration
368-
cassandra timeout (default 1s)
367+
-cassandra-timeout string
368+
cassandra timeout (default "1s")
369369
-cassandra-username string
370370
username for authentication (default "cassandra")
371371
-config string
@@ -431,8 +431,8 @@ Flags:
431431
how many times to retry a query before failing it
432432
-cassandra-ssl
433433
enable SSL connection to cassandra
434-
-cassandra-timeout duration
435-
cassandra timeout (default 1s)
434+
-cassandra-timeout string
435+
cassandra timeout (default "1s")
436436
-cassandra-username string
437437
username for authentication (default "cassandra")
438438
-cql-protocol-version int
@@ -498,8 +498,8 @@ Flags:
498498
how many times to retry a query before failing it
499499
-cassandra-ssl
500500
enable SSL connection to cassandra
501-
-cassandra-timeout int
502-
cassandra timeout in milliseconds (default 1000)
501+
-cassandra-timeout string
502+
cassandra timeout (default "1s")
503503
-cassandra-username string
504504
username for authentication (default "cassandra")
505505
-cql-protocol-version int
@@ -610,8 +610,8 @@ global config flags:
610610
File containing the needed schemas in case database needs initializing (default "/etc/metrictank/schema-store-cassandra.toml")
611611
-cassandra-ssl
612612
enable SSL connection to cassandra
613-
-cassandra-timeout duration
614-
cassandra timeout (default 1s)
613+
-cassandra-timeout string
614+
cassandra timeout (default "1s")
615615
-cassandra-username string
616616
username for authentication (default "cassandra")
617617
-cassandra-window-factor int

store/cassandra/cassandra.go

+18-1
Original file line numberDiff line numberDiff line change
@@ -109,6 +109,22 @@ func PrepareChunkData(span uint32, data []byte) []byte {
109109
return buf.Bytes()
110110
}
111111

112+
// the timeout value used to be an integer specifying the number of milliseconds
113+
// we want to convert it to a duration string, but we need to stay backwards-compatible for now
114+
func ConvertTimeout(timeout string) time.Duration {
115+
if timeoutI, err := strconv.Atoi(timeout); err == nil {
116+
log.Warn("cassandra_store: specifying the timeout as integer is deprecated, please use a duration value")
117+
return time.Duration(timeoutI) * time.Millisecond
118+
} else {
119+
timeoutD, err := time.ParseDuration(timeout)
120+
if err == nil {
121+
return timeoutD
122+
}
123+
log.Warn("cassandra_store: invalid duration value %s, assuming default (1s)", timeout)
124+
return time.Second
125+
}
126+
}
127+
112128
func NewCassandraStore(config *StoreConfig, ttls []uint32) (*CassandraStore, error) {
113129
stats.NewGauge32("store.cassandra.write_queue.size").Set(config.WriteQueueSize)
114130
stats.NewGauge32("store.cassandra.num_writers").Set(config.WriteConcurrency)
@@ -126,8 +142,9 @@ func NewCassandraStore(config *StoreConfig, ttls []uint32) (*CassandraStore, err
126142
Password: config.Password,
127143
}
128144
}
145+
146+
cluster.Timeout = ConvertTimeout(config.Timeout)
129147
cluster.Consistency = gocql.ParseConsistency(config.Consistency)
130-
cluster.Timeout = config.Timeout
131148
cluster.ConnectTimeout = cluster.Timeout
132149
cluster.NumConns = config.WriteConcurrency
133150
cluster.ProtoVersion = config.CqlProtocolVersion

store/cassandra/cassandra_test.go

+15
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import (
44
"fmt"
55
"math"
66
"testing"
7+
"time"
78
)
89

910
type testCase struct {
@@ -60,3 +61,17 @@ func TestGetTTLTables(t *testing.T) {
6061
}
6162
}
6263
}
64+
65+
func TestBackwardsCompatibleTimeout(t *testing.T) {
66+
checkTimeout := func(input string, expected time.Duration) {
67+
timeoutD := ConvertTimeout(input)
68+
if timeoutD != expected {
69+
t.Fatalf("expected time %s but got %s from input %s", expected.String(), timeoutD.String(), input)
70+
}
71+
}
72+
73+
checkTimeout("3500", time.Duration(3500)*time.Millisecond)
74+
checkTimeout("3500ms", time.Duration(3500)*time.Millisecond)
75+
checkTimeout("3.5s", time.Duration(3500)*time.Millisecond)
76+
checkTimeout("nonsense", time.Second)
77+
}

store/cassandra/config.go

+3-4
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,6 @@ package cassandra
22

33
import (
44
"flag"
5-
"time"
65

76
"github.com/rakyll/globalconf"
87
)
@@ -12,7 +11,7 @@ type StoreConfig struct {
1211
Keyspace string
1312
Consistency string
1413
HostSelectionPolicy string
15-
Timeout time.Duration
14+
Timeout string
1615
ReadConcurrency int
1716
WriteConcurrency int
1817
ReadQueueSize int
@@ -39,7 +38,7 @@ func NewStoreConfig() *StoreConfig {
3938
Keyspace: "metrictank",
4039
Consistency: "one",
4140
HostSelectionPolicy: "tokenaware,hostpool-epsilon-greedy",
42-
Timeout: time.Second,
41+
Timeout: "1s",
4342
ReadConcurrency: 20,
4443
WriteConcurrency: 10,
4544
ReadQueueSize: 200000,
@@ -68,7 +67,7 @@ func ConfigSetup() *flag.FlagSet {
6867
cas.StringVar(&CliConfig.Keyspace, "keyspace", CliConfig.Keyspace, "cassandra keyspace to use for storing the metric data table")
6968
cas.StringVar(&CliConfig.Consistency, "consistency", CliConfig.Consistency, "write consistency (any|one|two|three|quorum|all|local_quorum|each_quorum|local_one")
7069
cas.StringVar(&CliConfig.HostSelectionPolicy, "host-selection-policy", CliConfig.HostSelectionPolicy, "")
71-
cas.DurationVar(&CliConfig.Timeout, "timeout", CliConfig.Timeout, "cassandra timeout")
70+
cas.StringVar(&CliConfig.Timeout, "timeout", CliConfig.Timeout, "cassandra timeout")
7271
cas.IntVar(&CliConfig.ReadConcurrency, "read-concurrency", CliConfig.ReadConcurrency, "max number of concurrent reads to cassandra.")
7372
cas.IntVar(&CliConfig.WriteConcurrency, "write-concurrency", CliConfig.WriteConcurrency, "max number of concurrent writes to cassandra.")
7473
cas.IntVar(&CliConfig.ReadQueueSize, "read-queue-size", CliConfig.ReadQueueSize, "max number of outstanding reads before reads will be dropped. This is important if you run queries that result in many reads in parallel.")

0 commit comments

Comments
 (0)