diff --git a/cmd/mt-split-metrics-by-ttl/main.go b/cmd/mt-split-metrics-by-ttl/main.go index dc1e80754c..486ddb8b1b 100644 --- a/cmd/mt-split-metrics-by-ttl/main.go +++ b/cmd/mt-split-metrics-by-ttl/main.go @@ -32,6 +32,8 @@ var ( cassandraUsername = flag.String("cassandra-username", "cassandra", "username for authentication") cassandraPassword = flag.String("cassandra-password", "cassandra", "password for authentication") + cassandraDisableInitialHostLookup = flag.Bool("cassandra-disable-initial-host-lookup", false, "instruct the driver to not attempt to get host info from the system.peers table") + // hard coded to default because those have no effect in the case of this tool anyway windowFactor = 20 cassandraOmitReadTimeout = 60 @@ -70,7 +72,7 @@ func main() { panic(fmt.Sprintf("Error creating directory: %s", err)) } - store, err := cassandra.NewCassandraStore(*cassandraAddrs, *cassandraKeyspace, *cassandraConsistency, *cassandraCaPath, *cassandraUsername, *cassandraPassword, *cassandraHostSelectionPolicy, *cassandraTimeout, cassandraReadConcurrency, cassandraReadConcurrency, cassandraReadQueueSize, 0, *cassandraRetries, *cqlProtocolVersion, windowFactor, cassandraOmitReadTimeout, *cassandraSSL, *cassandraAuth, *cassandraHostVerification, *cassandraCreateKeyspace, *cassandraSchemaFile, ttls) + store, err := cassandra.NewCassandraStore(*cassandraAddrs, *cassandraKeyspace, *cassandraConsistency, *cassandraCaPath, *cassandraUsername, *cassandraPassword, *cassandraHostSelectionPolicy, *cassandraTimeout, cassandraReadConcurrency, cassandraReadConcurrency, cassandraReadQueueSize, 0, *cassandraRetries, *cqlProtocolVersion, windowFactor, cassandraOmitReadTimeout, *cassandraSSL, *cassandraAuth, *cassandraHostVerification, *cassandraCreateKeyspace, *cassandraSchemaFile, ttls, *cassandraDisableInitialHostLookup) if err != nil { panic(fmt.Sprintf("Failed to instantiate cassandra: %s", err)) } diff --git a/cmd/mt-store-cat/main.go b/cmd/mt-store-cat/main.go index 8660850779..69f6c85d6f 100644 --- a/cmd/mt-store-cat/main.go +++ b/cmd/mt-store-cat/main.go @@ -53,6 +53,8 @@ var ( cassandraPassword = flag.String("cassandra-password", "cassandra", "password for authentication") cassandraOmitReadTimeout = flag.Int("cassandra-omit-read-timeout", 60, "if a read is older than this, it will directly be omitted without executing") + cassandraDisableInitialHostLookup = flag.Bool("cassandra-disable-initial-host-lookup", false, "instruct the driver to not attempt to get host info from the system.peers table") + // our own flags from = flag.String("from", "-24h", "get data from (inclusive). only for points and points-summary format") to = flag.String("to", "now", "get data until (exclusive). only for points and points-summary format") @@ -163,7 +165,7 @@ func main() { } } - store, err := cassandra.NewCassandraStore(*cassandraAddrs, *cassandraKeyspace, *cassandraConsistency, *cassandraCaPath, *cassandraUsername, *cassandraPassword, *cassandraHostSelectionPolicy, *cassandraTimeout, *cassandraReadConcurrency, *cassandraReadConcurrency, *cassandraReadQueueSize, 0, *cassandraRetries, *cqlProtocolVersion, *windowFactor, *cassandraOmitReadTimeout, *cassandraSSL, *cassandraAuth, *cassandraHostVerification, *cassandraCreateKeyspace, *cassandraSchemaFile, nil) + store, err := cassandra.NewCassandraStore(*cassandraAddrs, *cassandraKeyspace, *cassandraConsistency, *cassandraCaPath, *cassandraUsername, *cassandraPassword, *cassandraHostSelectionPolicy, *cassandraTimeout, *cassandraReadConcurrency, *cassandraReadConcurrency, *cassandraReadQueueSize, 0, *cassandraRetries, *cqlProtocolVersion, *windowFactor, *cassandraOmitReadTimeout, *cassandraSSL, *cassandraAuth, *cassandraHostVerification, *cassandraCreateKeyspace, *cassandraSchemaFile, nil, *cassandraDisableInitialHostLookup) if err != nil { log.Fatal(4, "failed to initialize cassandra. %s", err) } diff --git a/cmd/mt-update-ttl/main.go b/cmd/mt-update-ttl/main.go index 8d9ead0ec2..7358bd54b5 100644 --- a/cmd/mt-update-ttl/main.go +++ b/cmd/mt-update-ttl/main.go @@ -36,6 +36,8 @@ var ( cassandraUsername = flag.String("cassandra-username", "cassandra", "username for authentication") cassandraPassword = flag.String("cassandra-password", "cassandra", "password for authentication") + cassandraDisableInitialHostLookup = flag.Bool("cassandra-disable-initial-host-lookup", false, "instruct the driver to not attempt to get host info from the system.peers table") + startTs = flag.Int("start-timestamp", 0, "timestamp at which to start, defaults to 0") endTs = flag.Int("end-timestamp", math.MaxInt32, "timestamp at which to stop, defaults to int max") numThreads = flag.Int("threads", 1, "number of workers to use to process data") @@ -100,6 +102,7 @@ func NewCassandraStore() (*gocql.Session, error) { cluster.ProtoVersion = *cqlProtocolVersion cluster.Keyspace = *cassandraKeyspace cluster.RetryPolicy = &gocql.SimpleRetryPolicy{NumRetries: *cassandraRetries} + cluster.DisableInitialHostLookup = *cassandraDisableInitialHostLookup switch *cassandraHostSelectionPolicy { case "roundrobin": diff --git a/cmd/mt-whisper-importer-writer/main.go b/cmd/mt-whisper-importer-writer/main.go index 1045d05cc1..df98bb8d7f 100644 --- a/cmd/mt-whisper-importer-writer/main.go +++ b/cmd/mt-whisper-importer-writer/main.go @@ -94,6 +94,8 @@ var ( cassandraSchemaFile = flag.String("cassandra-schema-file", "/etc/metrictank/schema-store-cassandra.toml", "File containing the needed schemas in case database needs initializing") + cassandraDisableInitialHostLookup = flag.Bool("cassandra-disable-initial-host-lookup", false, "instruct the driver to not attempt to get host info from the system.peers table") + gitHash = "(none)" ) @@ -153,7 +155,7 @@ func main() { log.SetLevel(log.InfoLevel) } - store, err := cassandraStore.NewCassandraStore(*cassandraAddrs, *cassandraKeyspace, *cassandraConsistency, *cassandraCaPath, *cassandraUsername, *cassandraPassword, *cassandraHostSelectionPolicy, *cassandraTimeout, *cassandraReadConcurrency, *cassandraReadConcurrency, *cassandraReadQueueSize, 0, *cassandraRetries, *cqlProtocolVersion, *windowFactor, 60, *cassandraSSL, *cassandraAuth, *cassandraHostVerification, *cassandraCreateKeyspace, *cassandraSchemaFile, nil) + store, err := cassandraStore.NewCassandraStore(*cassandraAddrs, *cassandraKeyspace, *cassandraConsistency, *cassandraCaPath, *cassandraUsername, *cassandraPassword, *cassandraHostSelectionPolicy, *cassandraTimeout, *cassandraReadConcurrency, *cassandraReadConcurrency, *cassandraReadQueueSize, 0, *cassandraRetries, *cqlProtocolVersion, *windowFactor, 60, *cassandraSSL, *cassandraAuth, *cassandraHostVerification, *cassandraCreateKeyspace, *cassandraSchemaFile, nil, *cassandraDisableInitialHostLookup) if err != nil { panic(fmt.Sprintf("Failed to initialize cassandra: %q", err)) } diff --git a/docker/docker-chaos/metrictank.ini b/docker/docker-chaos/metrictank.ini index 9f49c7fe34..be4575e218 100644 --- a/docker/docker-chaos/metrictank.ini +++ b/docker/docker-chaos/metrictank.ini @@ -72,6 +72,8 @@ cassandra-auth = false cassandra-username = cassandra # password for authentication cassandra-password = cassandra +# instruct the driver to not attempt to get host info from the system.peers table +cassandra-disable-initial-host-lookup = false ## Profiling and logging ## @@ -356,6 +358,8 @@ password = cassandra create-keyspace = false # File containing the needed schemas in case database needs initializing schema-file = /etc/metrictank/schema-idx-cassandra.toml +# instruct the driver to not attempt to get host info from the system.peers table +disable-initial-host-lookup = false ### in-memory only [memory-idx] diff --git a/docker/docker-cluster/metrictank.ini b/docker/docker-cluster/metrictank.ini index 4b9e384206..23e29bafb2 100644 --- a/docker/docker-cluster/metrictank.ini +++ b/docker/docker-cluster/metrictank.ini @@ -72,6 +72,8 @@ cassandra-auth = false cassandra-username = cassandra # password for authentication cassandra-password = cassandra +# instruct the driver to not attempt to get host info from the system.peers table +cassandra-disable-initial-host-lookup = false ## Profiling and logging ## @@ -356,6 +358,8 @@ password = cassandra create-keyspace = false # File containing the needed schemas in case database needs initializing schema-file = /etc/metrictank/schema-idx-cassandra.toml +# instruct the driver to not attempt to get host info from the system.peers table +disable-initial-host-lookup = false ### in-memory only [memory-idx] diff --git a/docker/docker-cosmosdb/docker-compose.yml b/docker/docker-cosmosdb/docker-compose.yml new file mode 100644 index 0000000000..e9c7123dfb --- /dev/null +++ b/docker/docker-cosmosdb/docker-compose.yml @@ -0,0 +1,78 @@ +version: '2' + +services: + metrictank: + hostname: metrictank + image: grafana/metrictank + ports: + - "6060:6060" + - "2003:2003" + volumes: + - ../../build/metrictank:/usr/bin/metrictank + - ../../scripts/config/metrictank-docker.ini:/etc/metrictank/metrictank.ini + - ./storage-schemas.conf:/etc/metrictank/storage-schemas.conf + - ./storage-aggregation.conf:/etc/metrictank/storage-aggregation.conf + environment: + WAIT_HOSTS: kafka:9092 + WAIT_TIMEOUT: 60 + MT_STATS_ADDR: graphite:2003 + MT_CASSANDRA_ADDRS: + MT_CASSANDRA_TIMEOUT: 30000 + MT_CASSANDRA_SSL: "true" + MT_CASSANDRA_CA_PATH: "" + MT_CASSANDRA_HOST_VERIFICATION: "false" + MT_CASSANDRA_DISABLE_INITIAL_HOST_LOOKUP: "true" + MT_CASSANDRA_AUTH: "true" + MT_CASSANDRA_USERNAME: + MT_CASSANDRA_PASSWORD: + MT_CASSANDRA_IDX_HOSTS: + MT_CASSANDRA_IDX_TIMEOUT: "30s" + MT_CASSANDRA_IDX_DISABLE_INITIAL_HOST_LOOKUP: "true" + MT_CASSANDRA_IDX_NUM_CONNS: 1 + MT_CASSANDRA_IDX_SSL: "true" + MT_CASSANDRA_IDX_CA_PATH: "" + MT_CASSANDRA_IDX_HOST_VERIFICATION: "false" + MT_CASSANDRA_IDX_AUTH: "true" + MT_CASSANDRA_IDX_USERNAME: + MT_CASSANDRA_IDX_PASSWORD: + links: + - kafka + + kafka: + hostname: kafka + image: raintank/kafka:latest + environment: + ADVERTISED_HOST: kafka + NUM_PARTITIONS: 8 + ports: + - "2181:2181" + - "9092:9092" + - "9999:9999" + volumes: + - /tmp/kafka-logs + + graphite: + hostname: graphite + image: graphiteapp/graphite-statsd:latest + ports: + - "8080:80" + + grafana: + hostname: grafana + image: grafana/grafana:latest + ports: + - "3000:3000" + + tsdb-gw: + hostname: tsdb-gw + image: raintank/tsdb-gw:latest + ports: + - "8081:80" + environment: + GW_GRAPHITE_URL: http://metrictank:6060 + GW_METRICS_PUBLISH: "true" + GW_METRICS_KAFKA_COMP: snappy + GW_KAFKA_TCP_ADDR: kafka:9092 + GW_STATS_ENABLED: "true" + GW_STATS_PREFIX: "tsdb-gw.stats.dev.tsdbgw_tsdb-gw_1" + GW_STATS_ADDR: "graphite:2003" diff --git a/docker/docker-cosmosdb/storage-aggregation.conf b/docker/docker-cosmosdb/storage-aggregation.conf new file mode 100644 index 0000000000..ba13dc0d85 --- /dev/null +++ b/docker/docker-cosmosdb/storage-aggregation.conf @@ -0,0 +1,17 @@ +# This config file controls which summaries are created (using which consolidation functions) for your lower-precision archives, as defined in storage-schemas.conf +# It is an extension of http://graphite.readthedocs.io/en/latest/config-carbon.html#storage-aggregation-conf +# Note: +# * This file is optional. If it is not present, we will use avg for everything +# * Anything not matched also uses avg for everything +# * xFilesFactor is not honored yet. What it is in graphite is a floating point number between 0 and 1 specifying what fraction of the previous retention level's slots must have non-null values in order to aggregate to a non-null value. The default is 0.5. +# * aggregationMethod specifies the functions used to aggregate values for the next retention level. Legal methods are avg/average, sum, min, max, and last. The default is average. +# Unlike Graphite, you can specify multiple, as it is often handy to have different summaries available depending on what analysis you need to do. +# When using multiple, the first one is used for reading. In the future, we will add capabilities to select the different archives for reading. +# * the settings configured when metrictank starts are what is applied. So you can enable or disable archives by restarting metrictank. +# +# see https://github.com/grafana/metrictank/blob/master/docs/consolidation.md for related info. + +[default] +pattern = .* +xFilesFactor = 0.5 +aggregationMethod = avg,min,max diff --git a/docker/docker-cosmosdb/storage-schemas.conf b/docker/docker-cosmosdb/storage-schemas.conf new file mode 100644 index 0000000000..d753a66dd0 --- /dev/null +++ b/docker/docker-cosmosdb/storage-schemas.conf @@ -0,0 +1,56 @@ +# This config file sets up your retention rules. +# It is an extension of http://graphite.readthedocs.io/en/latest/config-carbon.html#storage-schemas-conf +# Note: +# * You can have 0 to N sections +# * The first match wins, starting from the top. If no match found, we default to single archive of minutely points, retained for 7 days in 2h chunks +# * The patterns are unanchored regular expressions, add '^' or '$' to match the beginning or end of a pattern. +# * When running a cluster of metrictank instances, all instances should have the same agg-settings. +# * Unlike whisper (graphite), the config doesn't stick: if you restart metrictank with updated settings, then those +# will be applied. The configured rollups will be saved by primary nodes and served in responses if they are ready. +# (note in particular that if you remove archives here, we will no longer read from them) +# * Retentions must be specified in order of increasing interval and retention +# * The reorderBuffer an optional buffer that temporarily keeps data points in memory as raw data and allows insertion at random order. The specified value is how many datapoints, based on the raw interval specified in the first defined retention, should be kept before they are flushed out. This is useful if the metric producers cannot guarantee that the data will arrive in order, but it is relatively memory intensive. If you are unsure whether you need this, better leave it disabled to not waste memory. +# +# A given rule is made up of at least 3 lines: the name, regex pattern, retentions and optionally the reorder buffer size. +# The retentions line can specify multiple retention definitions. You need one or more, space separated. +# +# There are 2 formats for a single retention definition: +# 1) 'series-interval:count-of-datapoints' legacy and not easy to read +# 2) 'series-interval:retention[:chunkspan:numchunks:ready]' more friendly format with optionally 3 extra fields +# +#Series intervals and retentions are specified using the following suffixes: +# +#s - second +#m - minute +#h - hour +#d - day +#y - year +# +# The final 3 fields are specific to metrictank and if unspecified, use sane defaults. +# See https://github.com/grafana/metrictank/blob/master/docs/memory-server.md for more details +# +# chunkspan: duration of chunks. e.g. 10min, 30min, 1h, 90min... +# must be valid value as described here https://github.com/grafana/metrictank/blob/master/docs/memory-server.md#valid-chunk-spans +# Defaults to a the smallest chunkspan that can hold at least 100 points. +# +# numchunks: number of raw chunks to keep in in-memory ring buffer +# See https://github.com/grafana/metrictank/blob/master/docs/memory-server.md for details and trade-offs, especially when compared to chunk-cache +# which may be a more effective method to cache data and alleviate workload for cassandra. +# Defaults to 2 +# +# ready: whether the archive is ready for querying. This is useful if you recently introduced a new archive, but it's still being populated +# so you rather query other archives, even if they don't have the retention to serve your queries +# Defaults to true +# +# Here's an example with multiple retentions: +# [apache_busyWorkers] +# pattern = ^servers\.www.*\.workers\.busyWorkers$ +# retentions = 1s:1d:10min:1,1m:21d,15m:5y:2h:1:false +# +# This example has 3 retention definitions, the first and last override some default options (to use 10minutely and 2hourly chunks and only keep one of them in memory +# and the last rollup is marked as not ready yet for querying. + +[default] +pattern = .* +retentions = 1s:10m:2min:2,1m:20m:5min:2 +# reorderBuffer = 20 diff --git a/docker/docker-dev-custom-cfg-kafka/metrictank.ini b/docker/docker-dev-custom-cfg-kafka/metrictank.ini index 640b915abf..47540e2c3d 100644 --- a/docker/docker-dev-custom-cfg-kafka/metrictank.ini +++ b/docker/docker-dev-custom-cfg-kafka/metrictank.ini @@ -72,6 +72,8 @@ cassandra-auth = false cassandra-username = cassandra # password for authentication cassandra-password = cassandra +# instruct the driver to not attempt to get host info from the system.peers table +cassandra-disable-initial-host-lookup = false ## Profiling and logging ## @@ -356,6 +358,8 @@ password = cassandra create-keyspace = true # File containing the needed schemas in case database needs initializing schema-file = /etc/metrictank/schema-idx-cassandra.toml +# instruct the driver to not attempt to get host info from the system.peers table +disable-initial-host-lookup = false ### in-memory only [memory-idx] diff --git a/docs/config.md b/docs/config.md index 70c55651e7..0b25372086 100644 --- a/docs/config.md +++ b/docs/config.md @@ -102,6 +102,8 @@ cassandra-auth = false cassandra-username = cassandra # password for authentication cassandra-password = cassandra +# instruct the driver to not attempt to get host info from the system.peers table +cassandra-disable-initial-host-lookup = false ``` ## Profiling and logging ## @@ -418,6 +420,8 @@ password = cassandra create-keyspace = true # File containing the needed schemas in case database needs initializing schema-file = /etc/metrictank/schema-idx-cassandra.toml +# instruct the driver to not attempt to get host info from the system.peers table +disable-initial-host-lookup = false ``` ### in-memory only diff --git a/docs/tools.md b/docs/tools.md index 9d34e4cf47..84380fa9c9 100644 --- a/docs/tools.md +++ b/docs/tools.md @@ -94,6 +94,8 @@ cass config flags: write consistency (any|one|two|three|quorum|all|local_quorum|each_quorum|local_one (default "one") -create-keyspace enable the creation of the index keyspace and tables, only one node needs this (default true) + -disable-initial-host-lookup + instruct the driver to not attempt to get host info from the system.peers table -enabled (default true) -host-verification @@ -277,6 +279,8 @@ Flags: write consistency (any|one|two|three|quorum|all|local_quorum|each_quorum|local_one (default "one") -cassandra-create-keyspace enable the creation of the metrictank keyspace (default true) + -cassandra-disable-initial-host-lookup + instruct the driver to not attempt to get host info from the system.peers table -cassandra-host-selection-policy string (default "tokenaware,hostpool-epsilon-greedy") -cassandra-host-verification @@ -335,6 +339,8 @@ Flags: write consistency (any|one|two|three|quorum|all|local_quorum|each_quorum|local_one (default "one") -cassandra-create-keyspace enable the creation of the metrictank keyspace (default true) + -cassandra-disable-initial-host-lookup + instruct the driver to not attempt to get host info from the system.peers table -cassandra-host-selection-policy string (default "tokenaware,hostpool-epsilon-greedy") -cassandra-host-verification @@ -455,6 +461,8 @@ Flags: max number of concurrent reads to cassandra. (default 20) -cassandra-consistency string write consistency (any|one|two|three|quorum|all|local_quorum|each_quorum|local_one (default "one") + -cassandra-disable-initial-host-lookup + instruct the driver to not attempt to get host info from the system.peers table -cassandra-host-selection-policy string (default "tokenaware,hostpool-epsilon-greedy") -cassandra-host-verification @@ -610,6 +618,8 @@ cass config flags: write consistency (any|one|two|three|quorum|all|local_quorum|each_quorum|local_one (default "one") -create-keyspace enable the creation of the index keyspace and tables, only one node needs this (default true) + -disable-initial-host-lookup + instruct the driver to not attempt to get host info from the system.peers table -enabled (default true) -host-verification diff --git a/idx/cassandra/cassandra.go b/idx/cassandra/cassandra.go index 71f1cfc9c2..055105cc54 100644 --- a/idx/cassandra/cassandra.go +++ b/idx/cassandra/cassandra.go @@ -3,6 +3,7 @@ package cassandra import ( "flag" "fmt" + "strconv" "strings" "sync" "time" @@ -48,27 +49,28 @@ var ( statSaveSkipped = stats.NewCounter32("idx.cassandra.save.skipped") errmetrics = cassandra.NewErrMetrics("idx.cassandra") - Enabled bool - ssl bool - auth bool - hostverification bool - createKeyspace bool - schemaFile string - keyspace string - hosts string - capath string - username string - password string - consistency string - timeout time.Duration - numConns int - writeQueueSize int - protoVer int - maxStale time.Duration - pruneInterval time.Duration - updateCassIdx bool - updateInterval time.Duration - updateInterval32 uint32 + Enabled bool + ssl bool + auth bool + hostverification bool + createKeyspace bool + schemaFile string + keyspace string + hosts string + capath string + username string + password string + consistency string + timeout time.Duration + numConns int + writeQueueSize int + protoVer int + maxStale time.Duration + pruneInterval time.Duration + updateCassIdx bool + updateInterval time.Duration + updateInterval32 uint32 + disableInitialHostLookup bool ) func ConfigSetup() *flag.FlagSet { @@ -88,7 +90,7 @@ func ConfigSetup() *flag.FlagSet { casIdx.IntVar(&protoVer, "protocol-version", 4, "cql protocol version to use") casIdx.BoolVar(&createKeyspace, "create-keyspace", true, "enable the creation of the index keyspace and tables, only one node needs this") casIdx.StringVar(&schemaFile, "schema-file", "/etc/metrictank/schema-idx-cassandra.toml", "File containing the needed schemas in case database needs initializing") - + casIdx.BoolVar(&disableInitialHostLookup, "disable-initial-host-lookup", false, "instruct the driver to not attempt to get host info from the system.peers table") casIdx.BoolVar(&ssl, "ssl", false, "enable SSL connection to cassandra") casIdx.StringVar(&capath, "ca-path", "/etc/metrictank/ca.pem", "cassandra CA certficate path when using SSL") casIdx.BoolVar(&hostverification, "host-verification", true, "host (hostname and server cert) verification when using SSL") @@ -125,8 +127,10 @@ func New() *CasIdx { cluster := gocql.NewCluster(strings.Split(hosts, ",")...) cluster.Consistency = gocql.ParseConsistency(consistency) cluster.Timeout = timeout + cluster.ConnectTimeout = cluster.Timeout cluster.NumConns = numConns cluster.ProtoVersion = protoVer + cluster.DisableInitialHostLookup = disableInitialHostLookup if ssl { cluster.SslOpts = &gocql.SslOptions{ CaPath: capath, @@ -166,10 +170,12 @@ func (c *CasIdx) InitBare() error { // create the keyspace or ensure it exists if createKeyspace { + log.Info("cassandra-idx: ensuring that keyspace %s exist.", keyspace) err = tmpSession.Query(fmt.Sprintf(schemaKeyspace, keyspace)).Exec() if err != nil { return fmt.Errorf("failed to initialize cassandra keyspace: %s", err) } + log.Info("cassandra-idx: ensuring that table metric_idx exist.") err = tmpSession.Query(fmt.Sprintf(schemaTable, keyspace)).Exec() if err != nil { return fmt.Errorf("failed to initialize cassandra table: %s", err) @@ -359,9 +365,8 @@ func (c *CasIdx) rebuildIndex() { if maxStale != 0 { staleTs = uint32(time.Now().Add(maxStale * -1).Unix()) } - for _, partition := range cluster.Manager.GetPartitions() { - defs = c.LoadPartition(partition, defs, staleTs) - } + defs = c.LoadPartitions(cluster.Manager.GetPartitions(), defs, staleTs) + num := c.MemoryIdx.Load(defs) log.Info("cassandra-idx Rebuilding Memory Index Complete. Imported %d. Took %s", num, time.Since(pre)) } @@ -371,8 +376,13 @@ func (c *CasIdx) Load(defs []schema.MetricDefinition, cutoff uint32) []schema.Me return c.load(defs, iter, cutoff) } -func (c *CasIdx) LoadPartition(partition int32, defs []schema.MetricDefinition, cutoff uint32) []schema.MetricDefinition { - iter := c.session.Query("SELECT id, orgid, partition, name, interval, unit, mtype, tags, lastupdate from metric_idx where partition=?", partition).Iter() +func (c *CasIdx) LoadPartitions(partitions []int32, defs []schema.MetricDefinition, cutoff uint32) []schema.MetricDefinition { + placeholders := make([]string, len(partitions)) + for i, p := range partitions { + placeholders[i] = strconv.Itoa(int(p)) + } + q := fmt.Sprintf("SELECT id, orgid, partition, name, interval, unit, mtype, tags, lastupdate from metric_idx where partition in (%s)", strings.Join(placeholders, ",")) + iter := c.session.Query(q).Iter() return c.load(defs, iter, cutoff) } diff --git a/metrictank-sample.ini b/metrictank-sample.ini index 2f1dcd5d51..0568456b24 100644 --- a/metrictank-sample.ini +++ b/metrictank-sample.ini @@ -75,6 +75,8 @@ cassandra-auth = false cassandra-username = cassandra # password for authentication cassandra-password = cassandra +# instruct the driver to not attempt to get host info from the system.peers table +cassandra-disable-initial-host-lookup = false ## Profiling and logging ## @@ -359,6 +361,8 @@ password = cassandra create-keyspace = true # File containing the needed schemas in case database needs initializing schema-file = /etc/metrictank/schema-idx-cassandra.toml +# instruct the driver to not attempt to get host info from the system.peers table +disable-initial-host-lookup = false ### in-memory only [memory-idx] diff --git a/metrictank.go b/metrictank.go index 8e16b72e94..82be3eece6 100644 --- a/metrictank.go +++ b/metrictank.go @@ -65,20 +65,21 @@ var ( publicOrg = flag.Int("public-org", 0, "org Id for publically (any org) accessible data. leave 0 to disable") // Cassandra: - cassandraAddrs = flag.String("cassandra-addrs", "localhost", "cassandra host (may be given multiple times as comma-separated list)") - cassandraKeyspace = flag.String("cassandra-keyspace", "metrictank", "cassandra keyspace to use for storing the metric data table") - cassandraConsistency = flag.String("cassandra-consistency", "one", "write consistency (any|one|two|three|quorum|all|local_quorum|each_quorum|local_one") - cassandraHostSelectionPolicy = flag.String("cassandra-host-selection-policy", "tokenaware,hostpool-epsilon-greedy", "") - cassandraTimeout = flag.Int("cassandra-timeout", 1000, "cassandra timeout in milliseconds") - cassandraReadConcurrency = flag.Int("cassandra-read-concurrency", 20, "max number of concurrent reads to cassandra.") - cassandraWriteConcurrency = flag.Int("cassandra-write-concurrency", 10, "max number of concurrent writes to cassandra.") - cassandraReadQueueSize = flag.Int("cassandra-read-queue-size", 200000, "max number of outstanding reads before reads will be dropped. This is important if you run queries that result in many reads in parallel.") - cassandraWriteQueueSize = flag.Int("cassandra-write-queue-size", 100000, "write queue size per cassandra worker. should be large engough to hold all at least the total number of series expected, divided by how many workers you have") - cassandraRetries = flag.Int("cassandra-retries", 0, "how many times to retry a query before failing it") - cassandraWindowFactor = flag.Int("cassandra-window-factor", 20, "size of compaction window relative to TTL") - cassandraOmitReadTimeout = flag.Int("cassandra-omit-read-timeout", 60, "if a read is older than this, it will directly be omitted without executing") - cqlProtocolVersion = flag.Int("cql-protocol-version", 4, "cql protocol version to use") - cassandraCreateKeyspace = flag.Bool("cassandra-create-keyspace", true, "enable the creation of the mdata keyspace and tables, only one node needs this") + cassandraAddrs = flag.String("cassandra-addrs", "localhost", "cassandra host (may be given multiple times as comma-separated list)") + cassandraKeyspace = flag.String("cassandra-keyspace", "metrictank", "cassandra keyspace to use for storing the metric data table") + cassandraConsistency = flag.String("cassandra-consistency", "one", "write consistency (any|one|two|three|quorum|all|local_quorum|each_quorum|local_one") + cassandraHostSelectionPolicy = flag.String("cassandra-host-selection-policy", "tokenaware,hostpool-epsilon-greedy", "") + cassandraTimeout = flag.Int("cassandra-timeout", 1000, "cassandra timeout in milliseconds") + cassandraReadConcurrency = flag.Int("cassandra-read-concurrency", 20, "max number of concurrent reads to cassandra.") + cassandraWriteConcurrency = flag.Int("cassandra-write-concurrency", 10, "max number of concurrent writes to cassandra.") + cassandraReadQueueSize = flag.Int("cassandra-read-queue-size", 200000, "max number of outstanding reads before reads will be dropped. This is important if you run queries that result in many reads in parallel.") + cassandraWriteQueueSize = flag.Int("cassandra-write-queue-size", 100000, "write queue size per cassandra worker. should be large engough to hold all at least the total number of series expected, divided by how many workers you have") + cassandraRetries = flag.Int("cassandra-retries", 0, "how many times to retry a query before failing it") + cassandraWindowFactor = flag.Int("cassandra-window-factor", 20, "size of compaction window relative to TTL") + cassandraOmitReadTimeout = flag.Int("cassandra-omit-read-timeout", 60, "if a read is older than this, it will directly be omitted without executing") + cqlProtocolVersion = flag.Int("cql-protocol-version", 4, "cql protocol version to use") + cassandraCreateKeyspace = flag.Bool("cassandra-create-keyspace", true, "enable the creation of the mdata keyspace and tables, only one node needs this") + cassandraDisableInitialHostLookup = flag.Bool("cassandra-disable-initial-host-lookup", false, "instruct the driver to not attempt to get host info from the system.peers table") cassandraSSL = flag.Bool("cassandra-ssl", false, "enable SSL connection to cassandra") cassandraCaPath = flag.String("cassandra-ca-path", "/etc/metrictank/ca.pem", "cassandra CA certificate path when using SSL") @@ -293,7 +294,7 @@ func main() { /*********************************** Initialize our backendStore ***********************************/ - store, err = cassandraStore.NewCassandraStore(*cassandraAddrs, *cassandraKeyspace, *cassandraConsistency, *cassandraCaPath, *cassandraUsername, *cassandraPassword, *cassandraHostSelectionPolicy, *cassandraTimeout, *cassandraReadConcurrency, *cassandraWriteConcurrency, *cassandraReadQueueSize, *cassandraWriteQueueSize, *cassandraRetries, *cqlProtocolVersion, *cassandraWindowFactor, *cassandraOmitReadTimeout, *cassandraSSL, *cassandraAuth, *cassandraHostVerification, *cassandraCreateKeyspace, *cassandraSchemaFile, mdata.TTLs()) + store, err = cassandraStore.NewCassandraStore(*cassandraAddrs, *cassandraKeyspace, *cassandraConsistency, *cassandraCaPath, *cassandraUsername, *cassandraPassword, *cassandraHostSelectionPolicy, *cassandraTimeout, *cassandraReadConcurrency, *cassandraWriteConcurrency, *cassandraReadQueueSize, *cassandraWriteQueueSize, *cassandraRetries, *cqlProtocolVersion, *cassandraWindowFactor, *cassandraOmitReadTimeout, *cassandraSSL, *cassandraAuth, *cassandraHostVerification, *cassandraCreateKeyspace, *cassandraSchemaFile, mdata.TTLs(), *cassandraDisableInitialHostLookup) if err != nil { log.Fatal(4, "failed to initialize cassandra. %s", err) } diff --git a/scripts/config/metrictank-docker.ini b/scripts/config/metrictank-docker.ini index eea78ad1e5..d27c22ebc5 100644 --- a/scripts/config/metrictank-docker.ini +++ b/scripts/config/metrictank-docker.ini @@ -72,6 +72,8 @@ cassandra-auth = false cassandra-username = cassandra # password for authentication cassandra-password = cassandra +# instruct the driver to not attempt to get host info from the system.peers table +cassandra-disable-initial-host-lookup = false ## Profiling and logging ## @@ -356,6 +358,8 @@ password = cassandra create-keyspace = true # File containing the needed schemas in case database needs initializing schema-file = /etc/metrictank/schema-idx-cassandra.toml +# instruct the driver to not attempt to get host info from the system.peers table +disable-initial-host-lookup = false ### in-memory only [memory-idx] diff --git a/scripts/config/metrictank-package.ini b/scripts/config/metrictank-package.ini index fd7e27cb07..72f097c36f 100644 --- a/scripts/config/metrictank-package.ini +++ b/scripts/config/metrictank-package.ini @@ -72,6 +72,8 @@ cassandra-auth = false cassandra-username = cassandra # password for authentication cassandra-password = cassandra +# instruct the driver to not attempt to get host info from the system.peers table +cassandra-disable-initial-host-lookup = false ## Profiling and logging ## @@ -356,6 +358,8 @@ password = cassandra create-keyspace = true # File containing the needed schemas in case database needs initializing schema-file = /etc/metrictank/schema-idx-cassandra.toml +# instruct the driver to not attempt to get host info from the system.peers table +disable-initial-host-lookup = false ### in-memory only [memory-idx] diff --git a/store/cassandra/store_cassandra.go b/store/cassandra/store_cassandra.go index bb23122267..b7d512a498 100644 --- a/store/cassandra/store_cassandra.go +++ b/store/cassandra/store_cassandra.go @@ -170,7 +170,7 @@ func GetTTLTable(ttl uint32, windowFactor int, nameFormat string) ttlTable { } } -func NewCassandraStore(addrs, keyspace, consistency, CaPath, Username, Password, hostSelectionPolicy string, timeout, readers, writers, readqsize, writeqsize, retries, protoVer, windowFactor, omitReadTimeout int, ssl, auth, hostVerification bool, createKeyspace bool, schemaFile string, ttls []uint32) (*CassandraStore, error) { +func NewCassandraStore(addrs, keyspace, consistency, CaPath, Username, Password, hostSelectionPolicy string, timeout, readers, writers, readqsize, writeqsize, retries, protoVer, windowFactor, omitReadTimeout int, ssl, auth, hostVerification bool, createKeyspace bool, schemaFile string, ttls []uint32, disableInitialHostLookup bool) (*CassandraStore, error) { stats.NewGauge32("store.cassandra.write_queue.size").Set(writeqsize) stats.NewGauge32("store.cassandra.num_writers").Set(writers) @@ -190,11 +190,14 @@ func NewCassandraStore(addrs, keyspace, consistency, CaPath, Username, Password, } cluster.Consistency = gocql.ParseConsistency(consistency) cluster.Timeout = time.Duration(timeout) * time.Millisecond + cluster.ConnectTimeout = cluster.Timeout cluster.NumConns = writers cluster.ProtoVersion = protoVer + cluster.DisableInitialHostLookup = disableInitialHostLookup var err error tmpSession, err := cluster.CreateSession() if err != nil { + log.Error(3, "cassandra_store: failed to create cassandra session. %s", err.Error()) return nil, err } @@ -205,11 +208,13 @@ func NewCassandraStore(addrs, keyspace, consistency, CaPath, Username, Password, // create or verify the metrictank keyspace if createKeyspace { + log.Info("cassandra_store: ensuring that keyspace %s exists.", keyspace) err = tmpSession.Query(fmt.Sprintf(schemaKeyspace, keyspace)).Exec() if err != nil { return nil, err } for _, result := range ttlTables { + log.Info("cassandra_store: ensuring that table %s exists.", result.Table) err := tmpSession.Query(fmt.Sprintf(schemaTable, keyspace, result.Table, result.WindowSize, result.WindowSize*60*60)).Exec() if err != nil { return nil, err