Skip to content
This repository was archived by the owner on Aug 23, 2023. It is now read-only.

Commit 61fc5f9

Browse files
authoredMay 28, 2018
Merge pull request #922 from grafana/cosmosdb
update to work with cosmosdb
2 parents 1e16dca + 35a0843 commit 61fc5f9

File tree

18 files changed

+260
-46
lines changed

18 files changed

+260
-46
lines changed
 

‎cmd/mt-split-metrics-by-ttl/main.go

+3-1
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,8 @@ var (
3232
cassandraUsername = flag.String("cassandra-username", "cassandra", "username for authentication")
3333
cassandraPassword = flag.String("cassandra-password", "cassandra", "password for authentication")
3434

35+
cassandraDisableInitialHostLookup = flag.Bool("cassandra-disable-initial-host-lookup", false, "instruct the driver to not attempt to get host info from the system.peers table")
36+
3537
// hard coded to default because those have no effect in the case of this tool anyway
3638
windowFactor = 20
3739
cassandraOmitReadTimeout = 60
@@ -70,7 +72,7 @@ func main() {
7072
panic(fmt.Sprintf("Error creating directory: %s", err))
7173
}
7274

73-
store, err := cassandra.NewCassandraStore(*cassandraAddrs, *cassandraKeyspace, *cassandraConsistency, *cassandraCaPath, *cassandraUsername, *cassandraPassword, *cassandraHostSelectionPolicy, *cassandraTimeout, cassandraReadConcurrency, cassandraReadConcurrency, cassandraReadQueueSize, 0, *cassandraRetries, *cqlProtocolVersion, windowFactor, cassandraOmitReadTimeout, *cassandraSSL, *cassandraAuth, *cassandraHostVerification, *cassandraCreateKeyspace, *cassandraSchemaFile, ttls)
75+
store, err := cassandra.NewCassandraStore(*cassandraAddrs, *cassandraKeyspace, *cassandraConsistency, *cassandraCaPath, *cassandraUsername, *cassandraPassword, *cassandraHostSelectionPolicy, *cassandraTimeout, cassandraReadConcurrency, cassandraReadConcurrency, cassandraReadQueueSize, 0, *cassandraRetries, *cqlProtocolVersion, windowFactor, cassandraOmitReadTimeout, *cassandraSSL, *cassandraAuth, *cassandraHostVerification, *cassandraCreateKeyspace, *cassandraSchemaFile, ttls, *cassandraDisableInitialHostLookup)
7476
if err != nil {
7577
panic(fmt.Sprintf("Failed to instantiate cassandra: %s", err))
7678
}

‎cmd/mt-store-cat/main.go

+3-1
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,8 @@ var (
5353
cassandraPassword = flag.String("cassandra-password", "cassandra", "password for authentication")
5454
cassandraOmitReadTimeout = flag.Int("cassandra-omit-read-timeout", 60, "if a read is older than this, it will directly be omitted without executing")
5555

56+
cassandraDisableInitialHostLookup = flag.Bool("cassandra-disable-initial-host-lookup", false, "instruct the driver to not attempt to get host info from the system.peers table")
57+
5658
// our own flags
5759
from = flag.String("from", "-24h", "get data from (inclusive). only for points and points-summary format")
5860
to = flag.String("to", "now", "get data until (exclusive). only for points and points-summary format")
@@ -163,7 +165,7 @@ func main() {
163165
}
164166
}
165167

166-
store, err := cassandra.NewCassandraStore(*cassandraAddrs, *cassandraKeyspace, *cassandraConsistency, *cassandraCaPath, *cassandraUsername, *cassandraPassword, *cassandraHostSelectionPolicy, *cassandraTimeout, *cassandraReadConcurrency, *cassandraReadConcurrency, *cassandraReadQueueSize, 0, *cassandraRetries, *cqlProtocolVersion, *windowFactor, *cassandraOmitReadTimeout, *cassandraSSL, *cassandraAuth, *cassandraHostVerification, *cassandraCreateKeyspace, *cassandraSchemaFile, nil)
168+
store, err := cassandra.NewCassandraStore(*cassandraAddrs, *cassandraKeyspace, *cassandraConsistency, *cassandraCaPath, *cassandraUsername, *cassandraPassword, *cassandraHostSelectionPolicy, *cassandraTimeout, *cassandraReadConcurrency, *cassandraReadConcurrency, *cassandraReadQueueSize, 0, *cassandraRetries, *cqlProtocolVersion, *windowFactor, *cassandraOmitReadTimeout, *cassandraSSL, *cassandraAuth, *cassandraHostVerification, *cassandraCreateKeyspace, *cassandraSchemaFile, nil, *cassandraDisableInitialHostLookup)
167169
if err != nil {
168170
log.Fatal(4, "failed to initialize cassandra. %s", err)
169171
}

‎cmd/mt-update-ttl/main.go

+3
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,8 @@ var (
3636
cassandraUsername = flag.String("cassandra-username", "cassandra", "username for authentication")
3737
cassandraPassword = flag.String("cassandra-password", "cassandra", "password for authentication")
3838

39+
cassandraDisableInitialHostLookup = flag.Bool("cassandra-disable-initial-host-lookup", false, "instruct the driver to not attempt to get host info from the system.peers table")
40+
3941
startTs = flag.Int("start-timestamp", 0, "timestamp at which to start, defaults to 0")
4042
endTs = flag.Int("end-timestamp", math.MaxInt32, "timestamp at which to stop, defaults to int max")
4143
numThreads = flag.Int("threads", 1, "number of workers to use to process data")
@@ -100,6 +102,7 @@ func NewCassandraStore() (*gocql.Session, error) {
100102
cluster.ProtoVersion = *cqlProtocolVersion
101103
cluster.Keyspace = *cassandraKeyspace
102104
cluster.RetryPolicy = &gocql.SimpleRetryPolicy{NumRetries: *cassandraRetries}
105+
cluster.DisableInitialHostLookup = *cassandraDisableInitialHostLookup
103106

104107
switch *cassandraHostSelectionPolicy {
105108
case "roundrobin":

‎cmd/mt-whisper-importer-writer/main.go

+3-1
Original file line numberDiff line numberDiff line change
@@ -94,6 +94,8 @@ var (
9494

9595
cassandraSchemaFile = flag.String("cassandra-schema-file", "/etc/metrictank/schema-store-cassandra.toml", "File containing the needed schemas in case database needs initializing")
9696

97+
cassandraDisableInitialHostLookup = flag.Bool("cassandra-disable-initial-host-lookup", false, "instruct the driver to not attempt to get host info from the system.peers table")
98+
9799
gitHash = "(none)"
98100
)
99101

@@ -153,7 +155,7 @@ func main() {
153155
log.SetLevel(log.InfoLevel)
154156
}
155157

156-
store, err := cassandraStore.NewCassandraStore(*cassandraAddrs, *cassandraKeyspace, *cassandraConsistency, *cassandraCaPath, *cassandraUsername, *cassandraPassword, *cassandraHostSelectionPolicy, *cassandraTimeout, *cassandraReadConcurrency, *cassandraReadConcurrency, *cassandraReadQueueSize, 0, *cassandraRetries, *cqlProtocolVersion, *windowFactor, 60, *cassandraSSL, *cassandraAuth, *cassandraHostVerification, *cassandraCreateKeyspace, *cassandraSchemaFile, nil)
158+
store, err := cassandraStore.NewCassandraStore(*cassandraAddrs, *cassandraKeyspace, *cassandraConsistency, *cassandraCaPath, *cassandraUsername, *cassandraPassword, *cassandraHostSelectionPolicy, *cassandraTimeout, *cassandraReadConcurrency, *cassandraReadConcurrency, *cassandraReadQueueSize, 0, *cassandraRetries, *cqlProtocolVersion, *windowFactor, 60, *cassandraSSL, *cassandraAuth, *cassandraHostVerification, *cassandraCreateKeyspace, *cassandraSchemaFile, nil, *cassandraDisableInitialHostLookup)
157159
if err != nil {
158160
panic(fmt.Sprintf("Failed to initialize cassandra: %q", err))
159161
}

‎docker/docker-chaos/metrictank.ini

+4
Original file line numberDiff line numberDiff line change
@@ -72,6 +72,8 @@ cassandra-auth = false
7272
cassandra-username = cassandra
7373
# password for authentication
7474
cassandra-password = cassandra
75+
# instruct the driver to not attempt to get host info from the system.peers table
76+
cassandra-disable-initial-host-lookup = false
7577

7678
## Profiling and logging ##
7779

@@ -356,6 +358,8 @@ password = cassandra
356358
create-keyspace = false
357359
# File containing the needed schemas in case database needs initializing
358360
schema-file = /etc/metrictank/schema-idx-cassandra.toml
361+
# instruct the driver to not attempt to get host info from the system.peers table
362+
disable-initial-host-lookup = false
359363

360364
### in-memory only
361365
[memory-idx]

‎docker/docker-cluster/metrictank.ini

+4
Original file line numberDiff line numberDiff line change
@@ -72,6 +72,8 @@ cassandra-auth = false
7272
cassandra-username = cassandra
7373
# password for authentication
7474
cassandra-password = cassandra
75+
# instruct the driver to not attempt to get host info from the system.peers table
76+
cassandra-disable-initial-host-lookup = false
7577

7678
## Profiling and logging ##
7779

@@ -356,6 +358,8 @@ password = cassandra
356358
create-keyspace = false
357359
# File containing the needed schemas in case database needs initializing
358360
schema-file = /etc/metrictank/schema-idx-cassandra.toml
361+
# instruct the driver to not attempt to get host info from the system.peers table
362+
disable-initial-host-lookup = false
359363

360364
### in-memory only
361365
[memory-idx]
+78
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,78 @@
1+
version: '2'
2+
3+
services:
4+
metrictank:
5+
hostname: metrictank
6+
image: grafana/metrictank
7+
ports:
8+
- "6060:6060"
9+
- "2003:2003"
10+
volumes:
11+
- ../../build/metrictank:/usr/bin/metrictank
12+
- ../../scripts/config/metrictank-docker.ini:/etc/metrictank/metrictank.ini
13+
- ./storage-schemas.conf:/etc/metrictank/storage-schemas.conf
14+
- ./storage-aggregation.conf:/etc/metrictank/storage-aggregation.conf
15+
environment:
16+
WAIT_HOSTS: kafka:9092
17+
WAIT_TIMEOUT: 60
18+
MT_STATS_ADDR: graphite:2003
19+
MT_CASSANDRA_ADDRS: <addr>
20+
MT_CASSANDRA_TIMEOUT: 30000
21+
MT_CASSANDRA_SSL: "true"
22+
MT_CASSANDRA_CA_PATH: ""
23+
MT_CASSANDRA_HOST_VERIFICATION: "false"
24+
MT_CASSANDRA_DISABLE_INITIAL_HOST_LOOKUP: "true"
25+
MT_CASSANDRA_AUTH: "true"
26+
MT_CASSANDRA_USERNAME: <user>
27+
MT_CASSANDRA_PASSWORD: <pass>
28+
MT_CASSANDRA_IDX_HOSTS: <addr>
29+
MT_CASSANDRA_IDX_TIMEOUT: "30s"
30+
MT_CASSANDRA_IDX_DISABLE_INITIAL_HOST_LOOKUP: "true"
31+
MT_CASSANDRA_IDX_NUM_CONNS: 1
32+
MT_CASSANDRA_IDX_SSL: "true"
33+
MT_CASSANDRA_IDX_CA_PATH: ""
34+
MT_CASSANDRA_IDX_HOST_VERIFICATION: "false"
35+
MT_CASSANDRA_IDX_AUTH: "true"
36+
MT_CASSANDRA_IDX_USERNAME: <user>
37+
MT_CASSANDRA_IDX_PASSWORD: <pass>
38+
links:
39+
- kafka
40+
41+
kafka:
42+
hostname: kafka
43+
image: raintank/kafka:latest
44+
environment:
45+
ADVERTISED_HOST: kafka
46+
NUM_PARTITIONS: 8
47+
ports:
48+
- "2181:2181"
49+
- "9092:9092"
50+
- "9999:9999"
51+
volumes:
52+
- /tmp/kafka-logs
53+
54+
graphite:
55+
hostname: graphite
56+
image: graphiteapp/graphite-statsd:latest
57+
ports:
58+
- "8080:80"
59+
60+
grafana:
61+
hostname: grafana
62+
image: grafana/grafana:latest
63+
ports:
64+
- "3000:3000"
65+
66+
tsdb-gw:
67+
hostname: tsdb-gw
68+
image: raintank/tsdb-gw:latest
69+
ports:
70+
- "8081:80"
71+
environment:
72+
GW_GRAPHITE_URL: http://metrictank:6060
73+
GW_METRICS_PUBLISH: "true"
74+
GW_METRICS_KAFKA_COMP: snappy
75+
GW_KAFKA_TCP_ADDR: kafka:9092
76+
GW_STATS_ENABLED: "true"
77+
GW_STATS_PREFIX: "tsdb-gw.stats.dev.tsdbgw_tsdb-gw_1"
78+
GW_STATS_ADDR: "graphite:2003"
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
# This config file controls which summaries are created (using which consolidation functions) for your lower-precision archives, as defined in storage-schemas.conf
2+
# It is an extension of http://graphite.readthedocs.io/en/latest/config-carbon.html#storage-aggregation-conf
3+
# Note:
4+
# * This file is optional. If it is not present, we will use avg for everything
5+
# * Anything not matched also uses avg for everything
6+
# * xFilesFactor is not honored yet. What it is in graphite is a floating point number between 0 and 1 specifying what fraction of the previous retention level's slots must have non-null values in order to aggregate to a non-null value. The default is 0.5.
7+
# * aggregationMethod specifies the functions used to aggregate values for the next retention level. Legal methods are avg/average, sum, min, max, and last. The default is average.
8+
# Unlike Graphite, you can specify multiple, as it is often handy to have different summaries available depending on what analysis you need to do.
9+
# When using multiple, the first one is used for reading. In the future, we will add capabilities to select the different archives for reading.
10+
# * the settings configured when metrictank starts are what is applied. So you can enable or disable archives by restarting metrictank.
11+
#
12+
# see https://github.com/grafana/metrictank/blob/master/docs/consolidation.md for related info.
13+
14+
[default]
15+
pattern = .*
16+
xFilesFactor = 0.5
17+
aggregationMethod = avg,min,max
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,56 @@
1+
# This config file sets up your retention rules.
2+
# It is an extension of http://graphite.readthedocs.io/en/latest/config-carbon.html#storage-schemas-conf
3+
# Note:
4+
# * You can have 0 to N sections
5+
# * The first match wins, starting from the top. If no match found, we default to single archive of minutely points, retained for 7 days in 2h chunks
6+
# * The patterns are unanchored regular expressions, add '^' or '$' to match the beginning or end of a pattern.
7+
# * When running a cluster of metrictank instances, all instances should have the same agg-settings.
8+
# * Unlike whisper (graphite), the config doesn't stick: if you restart metrictank with updated settings, then those
9+
# will be applied. The configured rollups will be saved by primary nodes and served in responses if they are ready.
10+
# (note in particular that if you remove archives here, we will no longer read from them)
11+
# * Retentions must be specified in order of increasing interval and retention
12+
# * The reorderBuffer an optional buffer that temporarily keeps data points in memory as raw data and allows insertion at random order. The specified value is how many datapoints, based on the raw interval specified in the first defined retention, should be kept before they are flushed out. This is useful if the metric producers cannot guarantee that the data will arrive in order, but it is relatively memory intensive. If you are unsure whether you need this, better leave it disabled to not waste memory.
13+
#
14+
# A given rule is made up of at least 3 lines: the name, regex pattern, retentions and optionally the reorder buffer size.
15+
# The retentions line can specify multiple retention definitions. You need one or more, space separated.
16+
#
17+
# There are 2 formats for a single retention definition:
18+
# 1) 'series-interval:count-of-datapoints' legacy and not easy to read
19+
# 2) 'series-interval:retention[:chunkspan:numchunks:ready]' more friendly format with optionally 3 extra fields
20+
#
21+
#Series intervals and retentions are specified using the following suffixes:
22+
#
23+
#s - second
24+
#m - minute
25+
#h - hour
26+
#d - day
27+
#y - year
28+
#
29+
# The final 3 fields are specific to metrictank and if unspecified, use sane defaults.
30+
# See https://github.com/grafana/metrictank/blob/master/docs/memory-server.md for more details
31+
#
32+
# chunkspan: duration of chunks. e.g. 10min, 30min, 1h, 90min...
33+
# must be valid value as described here https://github.com/grafana/metrictank/blob/master/docs/memory-server.md#valid-chunk-spans
34+
# Defaults to a the smallest chunkspan that can hold at least 100 points.
35+
#
36+
# numchunks: number of raw chunks to keep in in-memory ring buffer
37+
# See https://github.com/grafana/metrictank/blob/master/docs/memory-server.md for details and trade-offs, especially when compared to chunk-cache
38+
# which may be a more effective method to cache data and alleviate workload for cassandra.
39+
# Defaults to 2
40+
#
41+
# ready: whether the archive is ready for querying. This is useful if you recently introduced a new archive, but it's still being populated
42+
# so you rather query other archives, even if they don't have the retention to serve your queries
43+
# Defaults to true
44+
#
45+
# Here's an example with multiple retentions:
46+
# [apache_busyWorkers]
47+
# pattern = ^servers\.www.*\.workers\.busyWorkers$
48+
# retentions = 1s:1d:10min:1,1m:21d,15m:5y:2h:1:false
49+
#
50+
# This example has 3 retention definitions, the first and last override some default options (to use 10minutely and 2hourly chunks and only keep one of them in memory
51+
# and the last rollup is marked as not ready yet for querying.
52+
53+
[default]
54+
pattern = .*
55+
retentions = 1s:10m:2min:2,1m:20m:5min:2
56+
# reorderBuffer = 20

‎docker/docker-dev-custom-cfg-kafka/metrictank.ini

+4
Original file line numberDiff line numberDiff line change
@@ -72,6 +72,8 @@ cassandra-auth = false
7272
cassandra-username = cassandra
7373
# password for authentication
7474
cassandra-password = cassandra
75+
# instruct the driver to not attempt to get host info from the system.peers table
76+
cassandra-disable-initial-host-lookup = false
7577

7678
## Profiling and logging ##
7779

@@ -356,6 +358,8 @@ password = cassandra
356358
create-keyspace = true
357359
# File containing the needed schemas in case database needs initializing
358360
schema-file = /etc/metrictank/schema-idx-cassandra.toml
361+
# instruct the driver to not attempt to get host info from the system.peers table
362+
disable-initial-host-lookup = false
359363

360364
### in-memory only
361365
[memory-idx]

‎docs/config.md

+4
Original file line numberDiff line numberDiff line change
@@ -102,6 +102,8 @@ cassandra-auth = false
102102
cassandra-username = cassandra
103103
# password for authentication
104104
cassandra-password = cassandra
105+
# instruct the driver to not attempt to get host info from the system.peers table
106+
cassandra-disable-initial-host-lookup = false
105107
```
106108

107109
## Profiling and logging ##
@@ -418,6 +420,8 @@ password = cassandra
418420
create-keyspace = true
419421
# File containing the needed schemas in case database needs initializing
420422
schema-file = /etc/metrictank/schema-idx-cassandra.toml
423+
# instruct the driver to not attempt to get host info from the system.peers table
424+
disable-initial-host-lookup = false
421425
```
422426

423427
### in-memory only

‎docs/tools.md

+10
Original file line numberDiff line numberDiff line change
@@ -94,6 +94,8 @@ cass config flags:
9494
write consistency (any|one|two|three|quorum|all|local_quorum|each_quorum|local_one (default "one")
9595
-create-keyspace
9696
enable the creation of the index keyspace and tables, only one node needs this (default true)
97+
-disable-initial-host-lookup
98+
instruct the driver to not attempt to get host info from the system.peers table
9799
-enabled
98100
(default true)
99101
-host-verification
@@ -279,6 +281,8 @@ Flags:
279281
write consistency (any|one|two|three|quorum|all|local_quorum|each_quorum|local_one (default "one")
280282
-cassandra-create-keyspace
281283
enable the creation of the metrictank keyspace (default true)
284+
-cassandra-disable-initial-host-lookup
285+
instruct the driver to not attempt to get host info from the system.peers table
282286
-cassandra-host-selection-policy string
283287
(default "tokenaware,hostpool-epsilon-greedy")
284288
-cassandra-host-verification
@@ -337,6 +341,8 @@ Flags:
337341
write consistency (any|one|two|three|quorum|all|local_quorum|each_quorum|local_one (default "one")
338342
-cassandra-create-keyspace
339343
enable the creation of the metrictank keyspace (default true)
344+
-cassandra-disable-initial-host-lookup
345+
instruct the driver to not attempt to get host info from the system.peers table
340346
-cassandra-host-selection-policy string
341347
(default "tokenaware,hostpool-epsilon-greedy")
342348
-cassandra-host-verification
@@ -457,6 +463,8 @@ Flags:
457463
max number of concurrent reads to cassandra. (default 20)
458464
-cassandra-consistency string
459465
write consistency (any|one|two|three|quorum|all|local_quorum|each_quorum|local_one (default "one")
466+
-cassandra-disable-initial-host-lookup
467+
instruct the driver to not attempt to get host info from the system.peers table
460468
-cassandra-host-selection-policy string
461469
(default "tokenaware,hostpool-epsilon-greedy")
462470
-cassandra-host-verification
@@ -612,6 +620,8 @@ cass config flags:
612620
write consistency (any|one|two|three|quorum|all|local_quorum|each_quorum|local_one (default "one")
613621
-create-keyspace
614622
enable the creation of the index keyspace and tables, only one node needs this (default true)
623+
-disable-initial-host-lookup
624+
instruct the driver to not attempt to get host info from the system.peers table
615625
-enabled
616626
(default true)
617627
-host-verification

‎idx/cassandra/cassandra.go

+37-27
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package cassandra
33
import (
44
"flag"
55
"fmt"
6+
"strconv"
67
"strings"
78
"sync"
89
"time"
@@ -48,27 +49,28 @@ var (
4849
statSaveSkipped = stats.NewCounter32("idx.cassandra.save.skipped")
4950
errmetrics = cassandra.NewErrMetrics("idx.cassandra")
5051

51-
Enabled bool
52-
ssl bool
53-
auth bool
54-
hostverification bool
55-
createKeyspace bool
56-
schemaFile string
57-
keyspace string
58-
hosts string
59-
capath string
60-
username string
61-
password string
62-
consistency string
63-
timeout time.Duration
64-
numConns int
65-
writeQueueSize int
66-
protoVer int
67-
maxStale time.Duration
68-
pruneInterval time.Duration
69-
updateCassIdx bool
70-
updateInterval time.Duration
71-
updateInterval32 uint32
52+
Enabled bool
53+
ssl bool
54+
auth bool
55+
hostverification bool
56+
createKeyspace bool
57+
schemaFile string
58+
keyspace string
59+
hosts string
60+
capath string
61+
username string
62+
password string
63+
consistency string
64+
timeout time.Duration
65+
numConns int
66+
writeQueueSize int
67+
protoVer int
68+
maxStale time.Duration
69+
pruneInterval time.Duration
70+
updateCassIdx bool
71+
updateInterval time.Duration
72+
updateInterval32 uint32
73+
disableInitialHostLookup bool
7274
)
7375

7476
func ConfigSetup() *flag.FlagSet {
@@ -88,7 +90,7 @@ func ConfigSetup() *flag.FlagSet {
8890
casIdx.IntVar(&protoVer, "protocol-version", 4, "cql protocol version to use")
8991
casIdx.BoolVar(&createKeyspace, "create-keyspace", true, "enable the creation of the index keyspace and tables, only one node needs this")
9092
casIdx.StringVar(&schemaFile, "schema-file", "/etc/metrictank/schema-idx-cassandra.toml", "File containing the needed schemas in case database needs initializing")
91-
93+
casIdx.BoolVar(&disableInitialHostLookup, "disable-initial-host-lookup", false, "instruct the driver to not attempt to get host info from the system.peers table")
9294
casIdx.BoolVar(&ssl, "ssl", false, "enable SSL connection to cassandra")
9395
casIdx.StringVar(&capath, "ca-path", "/etc/metrictank/ca.pem", "cassandra CA certficate path when using SSL")
9496
casIdx.BoolVar(&hostverification, "host-verification", true, "host (hostname and server cert) verification when using SSL")
@@ -125,8 +127,10 @@ func New() *CasIdx {
125127
cluster := gocql.NewCluster(strings.Split(hosts, ",")...)
126128
cluster.Consistency = gocql.ParseConsistency(consistency)
127129
cluster.Timeout = timeout
130+
cluster.ConnectTimeout = cluster.Timeout
128131
cluster.NumConns = numConns
129132
cluster.ProtoVersion = protoVer
133+
cluster.DisableInitialHostLookup = disableInitialHostLookup
130134
if ssl {
131135
cluster.SslOpts = &gocql.SslOptions{
132136
CaPath: capath,
@@ -166,10 +170,12 @@ func (c *CasIdx) InitBare() error {
166170

167171
// create the keyspace or ensure it exists
168172
if createKeyspace {
173+
log.Info("cassandra-idx: ensuring that keyspace %s exist.", keyspace)
169174
err = tmpSession.Query(fmt.Sprintf(schemaKeyspace, keyspace)).Exec()
170175
if err != nil {
171176
return fmt.Errorf("failed to initialize cassandra keyspace: %s", err)
172177
}
178+
log.Info("cassandra-idx: ensuring that table metric_idx exist.")
173179
err = tmpSession.Query(fmt.Sprintf(schemaTable, keyspace)).Exec()
174180
if err != nil {
175181
return fmt.Errorf("failed to initialize cassandra table: %s", err)
@@ -359,9 +365,8 @@ func (c *CasIdx) rebuildIndex() {
359365
if maxStale != 0 {
360366
staleTs = uint32(time.Now().Add(maxStale * -1).Unix())
361367
}
362-
for _, partition := range cluster.Manager.GetPartitions() {
363-
defs = c.LoadPartition(partition, defs, staleTs)
364-
}
368+
defs = c.LoadPartitions(cluster.Manager.GetPartitions(), defs, staleTs)
369+
365370
num := c.MemoryIdx.Load(defs)
366371
log.Info("cassandra-idx Rebuilding Memory Index Complete. Imported %d. Took %s", num, time.Since(pre))
367372
}
@@ -371,8 +376,13 @@ func (c *CasIdx) Load(defs []schema.MetricDefinition, cutoff uint32) []schema.Me
371376
return c.load(defs, iter, cutoff)
372377
}
373378

374-
func (c *CasIdx) LoadPartition(partition int32, defs []schema.MetricDefinition, cutoff uint32) []schema.MetricDefinition {
375-
iter := c.session.Query("SELECT id, orgid, partition, name, interval, unit, mtype, tags, lastupdate from metric_idx where partition=?", partition).Iter()
379+
func (c *CasIdx) LoadPartitions(partitions []int32, defs []schema.MetricDefinition, cutoff uint32) []schema.MetricDefinition {
380+
placeholders := make([]string, len(partitions))
381+
for i, p := range partitions {
382+
placeholders[i] = strconv.Itoa(int(p))
383+
}
384+
q := fmt.Sprintf("SELECT id, orgid, partition, name, interval, unit, mtype, tags, lastupdate from metric_idx where partition in (%s)", strings.Join(placeholders, ","))
385+
iter := c.session.Query(q).Iter()
376386
return c.load(defs, iter, cutoff)
377387
}
378388

‎metrictank-sample.ini

+4
Original file line numberDiff line numberDiff line change
@@ -75,6 +75,8 @@ cassandra-auth = false
7575
cassandra-username = cassandra
7676
# password for authentication
7777
cassandra-password = cassandra
78+
# instruct the driver to not attempt to get host info from the system.peers table
79+
cassandra-disable-initial-host-lookup = false
7880

7981
## Profiling and logging ##
8082

@@ -359,6 +361,8 @@ password = cassandra
359361
create-keyspace = true
360362
# File containing the needed schemas in case database needs initializing
361363
schema-file = /etc/metrictank/schema-idx-cassandra.toml
364+
# instruct the driver to not attempt to get host info from the system.peers table
365+
disable-initial-host-lookup = false
362366

363367
### in-memory only
364368
[memory-idx]

‎metrictank.go

+16-15
Original file line numberDiff line numberDiff line change
@@ -65,20 +65,21 @@ var (
6565
publicOrg = flag.Int("public-org", 0, "org Id for publically (any org) accessible data. leave 0 to disable")
6666

6767
// Cassandra:
68-
cassandraAddrs = flag.String("cassandra-addrs", "localhost", "cassandra host (may be given multiple times as comma-separated list)")
69-
cassandraKeyspace = flag.String("cassandra-keyspace", "metrictank", "cassandra keyspace to use for storing the metric data table")
70-
cassandraConsistency = flag.String("cassandra-consistency", "one", "write consistency (any|one|two|three|quorum|all|local_quorum|each_quorum|local_one")
71-
cassandraHostSelectionPolicy = flag.String("cassandra-host-selection-policy", "tokenaware,hostpool-epsilon-greedy", "")
72-
cassandraTimeout = flag.Int("cassandra-timeout", 1000, "cassandra timeout in milliseconds")
73-
cassandraReadConcurrency = flag.Int("cassandra-read-concurrency", 20, "max number of concurrent reads to cassandra.")
74-
cassandraWriteConcurrency = flag.Int("cassandra-write-concurrency", 10, "max number of concurrent writes to cassandra.")
75-
cassandraReadQueueSize = flag.Int("cassandra-read-queue-size", 200000, "max number of outstanding reads before reads will be dropped. This is important if you run queries that result in many reads in parallel.")
76-
cassandraWriteQueueSize = flag.Int("cassandra-write-queue-size", 100000, "write queue size per cassandra worker. should be large engough to hold all at least the total number of series expected, divided by how many workers you have")
77-
cassandraRetries = flag.Int("cassandra-retries", 0, "how many times to retry a query before failing it")
78-
cassandraWindowFactor = flag.Int("cassandra-window-factor", 20, "size of compaction window relative to TTL")
79-
cassandraOmitReadTimeout = flag.Int("cassandra-omit-read-timeout", 60, "if a read is older than this, it will directly be omitted without executing")
80-
cqlProtocolVersion = flag.Int("cql-protocol-version", 4, "cql protocol version to use")
81-
cassandraCreateKeyspace = flag.Bool("cassandra-create-keyspace", true, "enable the creation of the mdata keyspace and tables, only one node needs this")
68+
cassandraAddrs = flag.String("cassandra-addrs", "localhost", "cassandra host (may be given multiple times as comma-separated list)")
69+
cassandraKeyspace = flag.String("cassandra-keyspace", "metrictank", "cassandra keyspace to use for storing the metric data table")
70+
cassandraConsistency = flag.String("cassandra-consistency", "one", "write consistency (any|one|two|three|quorum|all|local_quorum|each_quorum|local_one")
71+
cassandraHostSelectionPolicy = flag.String("cassandra-host-selection-policy", "tokenaware,hostpool-epsilon-greedy", "")
72+
cassandraTimeout = flag.Int("cassandra-timeout", 1000, "cassandra timeout in milliseconds")
73+
cassandraReadConcurrency = flag.Int("cassandra-read-concurrency", 20, "max number of concurrent reads to cassandra.")
74+
cassandraWriteConcurrency = flag.Int("cassandra-write-concurrency", 10, "max number of concurrent writes to cassandra.")
75+
cassandraReadQueueSize = flag.Int("cassandra-read-queue-size", 200000, "max number of outstanding reads before reads will be dropped. This is important if you run queries that result in many reads in parallel.")
76+
cassandraWriteQueueSize = flag.Int("cassandra-write-queue-size", 100000, "write queue size per cassandra worker. should be large engough to hold all at least the total number of series expected, divided by how many workers you have")
77+
cassandraRetries = flag.Int("cassandra-retries", 0, "how many times to retry a query before failing it")
78+
cassandraWindowFactor = flag.Int("cassandra-window-factor", 20, "size of compaction window relative to TTL")
79+
cassandraOmitReadTimeout = flag.Int("cassandra-omit-read-timeout", 60, "if a read is older than this, it will directly be omitted without executing")
80+
cqlProtocolVersion = flag.Int("cql-protocol-version", 4, "cql protocol version to use")
81+
cassandraCreateKeyspace = flag.Bool("cassandra-create-keyspace", true, "enable the creation of the mdata keyspace and tables, only one node needs this")
82+
cassandraDisableInitialHostLookup = flag.Bool("cassandra-disable-initial-host-lookup", false, "instruct the driver to not attempt to get host info from the system.peers table")
8283

8384
cassandraSSL = flag.Bool("cassandra-ssl", false, "enable SSL connection to cassandra")
8485
cassandraCaPath = flag.String("cassandra-ca-path", "/etc/metrictank/ca.pem", "cassandra CA certificate path when using SSL")
@@ -293,7 +294,7 @@ func main() {
293294
/***********************************
294295
Initialize our backendStore
295296
***********************************/
296-
store, err = cassandraStore.NewCassandraStore(*cassandraAddrs, *cassandraKeyspace, *cassandraConsistency, *cassandraCaPath, *cassandraUsername, *cassandraPassword, *cassandraHostSelectionPolicy, *cassandraTimeout, *cassandraReadConcurrency, *cassandraWriteConcurrency, *cassandraReadQueueSize, *cassandraWriteQueueSize, *cassandraRetries, *cqlProtocolVersion, *cassandraWindowFactor, *cassandraOmitReadTimeout, *cassandraSSL, *cassandraAuth, *cassandraHostVerification, *cassandraCreateKeyspace, *cassandraSchemaFile, mdata.TTLs())
297+
store, err = cassandraStore.NewCassandraStore(*cassandraAddrs, *cassandraKeyspace, *cassandraConsistency, *cassandraCaPath, *cassandraUsername, *cassandraPassword, *cassandraHostSelectionPolicy, *cassandraTimeout, *cassandraReadConcurrency, *cassandraWriteConcurrency, *cassandraReadQueueSize, *cassandraWriteQueueSize, *cassandraRetries, *cqlProtocolVersion, *cassandraWindowFactor, *cassandraOmitReadTimeout, *cassandraSSL, *cassandraAuth, *cassandraHostVerification, *cassandraCreateKeyspace, *cassandraSchemaFile, mdata.TTLs(), *cassandraDisableInitialHostLookup)
297298
if err != nil {
298299
log.Fatal(4, "failed to initialize cassandra. %s", err)
299300
}

‎scripts/config/metrictank-docker.ini

+4
Original file line numberDiff line numberDiff line change
@@ -72,6 +72,8 @@ cassandra-auth = false
7272
cassandra-username = cassandra
7373
# password for authentication
7474
cassandra-password = cassandra
75+
# instruct the driver to not attempt to get host info from the system.peers table
76+
cassandra-disable-initial-host-lookup = false
7577

7678
## Profiling and logging ##
7779

@@ -356,6 +358,8 @@ password = cassandra
356358
create-keyspace = true
357359
# File containing the needed schemas in case database needs initializing
358360
schema-file = /etc/metrictank/schema-idx-cassandra.toml
361+
# instruct the driver to not attempt to get host info from the system.peers table
362+
disable-initial-host-lookup = false
359363

360364
### in-memory only
361365
[memory-idx]

‎scripts/config/metrictank-package.ini

+4
Original file line numberDiff line numberDiff line change
@@ -72,6 +72,8 @@ cassandra-auth = false
7272
cassandra-username = cassandra
7373
# password for authentication
7474
cassandra-password = cassandra
75+
# instruct the driver to not attempt to get host info from the system.peers table
76+
cassandra-disable-initial-host-lookup = false
7577

7678
## Profiling and logging ##
7779

@@ -356,6 +358,8 @@ password = cassandra
356358
create-keyspace = true
357359
# File containing the needed schemas in case database needs initializing
358360
schema-file = /etc/metrictank/schema-idx-cassandra.toml
361+
# instruct the driver to not attempt to get host info from the system.peers table
362+
disable-initial-host-lookup = false
359363

360364
### in-memory only
361365
[memory-idx]

‎store/cassandra/store_cassandra.go

+6-1
Original file line numberDiff line numberDiff line change
@@ -170,7 +170,7 @@ func GetTTLTable(ttl uint32, windowFactor int, nameFormat string) ttlTable {
170170
}
171171
}
172172

173-
func NewCassandraStore(addrs, keyspace, consistency, CaPath, Username, Password, hostSelectionPolicy string, timeout, readers, writers, readqsize, writeqsize, retries, protoVer, windowFactor, omitReadTimeout int, ssl, auth, hostVerification bool, createKeyspace bool, schemaFile string, ttls []uint32) (*CassandraStore, error) {
173+
func NewCassandraStore(addrs, keyspace, consistency, CaPath, Username, Password, hostSelectionPolicy string, timeout, readers, writers, readqsize, writeqsize, retries, protoVer, windowFactor, omitReadTimeout int, ssl, auth, hostVerification bool, createKeyspace bool, schemaFile string, ttls []uint32, disableInitialHostLookup bool) (*CassandraStore, error) {
174174

175175
stats.NewGauge32("store.cassandra.write_queue.size").Set(writeqsize)
176176
stats.NewGauge32("store.cassandra.num_writers").Set(writers)
@@ -190,11 +190,14 @@ func NewCassandraStore(addrs, keyspace, consistency, CaPath, Username, Password,
190190
}
191191
cluster.Consistency = gocql.ParseConsistency(consistency)
192192
cluster.Timeout = time.Duration(timeout) * time.Millisecond
193+
cluster.ConnectTimeout = cluster.Timeout
193194
cluster.NumConns = writers
194195
cluster.ProtoVersion = protoVer
196+
cluster.DisableInitialHostLookup = disableInitialHostLookup
195197
var err error
196198
tmpSession, err := cluster.CreateSession()
197199
if err != nil {
200+
log.Error(3, "cassandra_store: failed to create cassandra session. %s", err.Error())
198201
return nil, err
199202
}
200203

@@ -205,11 +208,13 @@ func NewCassandraStore(addrs, keyspace, consistency, CaPath, Username, Password,
205208

206209
// create or verify the metrictank keyspace
207210
if createKeyspace {
211+
log.Info("cassandra_store: ensuring that keyspace %s exists.", keyspace)
208212
err = tmpSession.Query(fmt.Sprintf(schemaKeyspace, keyspace)).Exec()
209213
if err != nil {
210214
return nil, err
211215
}
212216
for _, result := range ttlTables {
217+
log.Info("cassandra_store: ensuring that table %s exists.", result.Table)
213218
err := tmpSession.Query(fmt.Sprintf(schemaTable, keyspace, result.Table, result.WindowSize, result.WindowSize*60*60)).Exec()
214219
if err != nil {
215220
return nil, err

0 commit comments

Comments
 (0)
This repository has been archived.