Skip to content
This repository was archived by the owner on Aug 23, 2023. It is now read-only.

Commit 9669930

Browse files
committed
configure the importer-writer the same way as MT
this makes the configuration of the importer-writer consistent with the way how MT and other MT-tools are configured. this makes sense to do now because with the added support for the bigtable store and index the configuration would get too complex if we'd try to come up with a way to configure all of that via cli arguments, so its easier and more consistent to just do it in the same way like everything else.
1 parent 632e0d3 commit 9669930

File tree

2 files changed

+78
-216
lines changed

2 files changed

+78
-216
lines changed

cmd/mt-whisper-importer-writer/main.go

+72-109
Original file line numberDiff line numberDiff line change
@@ -1,16 +1,18 @@
11
package main
22

33
import (
4-
"errors"
54
"flag"
65
"fmt"
7-
"math"
86
"net/http"
97
"os"
108
"strings"
119
"time"
1210

13-
"github.com/gocql/gocql"
11+
"github.com/grafana/globalconf"
12+
"github.com/raintank/dur"
13+
"github.com/raintank/schema"
14+
log "github.com/sirupsen/logrus"
15+
1416
"github.com/grafana/metrictank/cluster"
1517
"github.com/grafana/metrictank/cluster/partitioner"
1618
"github.com/grafana/metrictank/idx"
@@ -20,54 +22,17 @@ import (
2022
"github.com/grafana/metrictank/mdata"
2123
bigTableStore "github.com/grafana/metrictank/store/bigtable"
2224
cassandraStore "github.com/grafana/metrictank/store/cassandra"
23-
"github.com/raintank/dur"
24-
"github.com/raintank/schema"
25-
log "github.com/sirupsen/logrus"
2625
)
2726

2827
var (
29-
globalFlags = flag.NewFlagSet("global config flags", flag.ExitOnError)
30-
31-
exitOnError = globalFlags.Bool(
32-
"exit-on-error",
33-
true,
34-
"Exit with a message when there's an error",
35-
)
36-
verbose = globalFlags.Bool(
37-
"verbose",
38-
false,
39-
"More detailed logging",
40-
)
41-
httpEndpoint = globalFlags.String(
42-
"http-endpoint",
43-
"127.0.0.1:8080",
44-
"The http endpoint to listen on",
45-
)
46-
ttlsStr = globalFlags.String(
47-
"ttls",
48-
"35d",
49-
"list of ttl strings used by MT separated by ','",
50-
)
51-
partitionScheme = globalFlags.String(
52-
"partition-scheme",
53-
"bySeries",
54-
"method used for partitioning metrics. This should match the settings of tsdb-gw. (byOrg|bySeries)",
55-
)
56-
uriPath = globalFlags.String(
57-
"uri-path",
58-
"/chunks",
59-
"the URI on which we expect chunks to get posted",
60-
)
61-
numPartitions = globalFlags.Int(
62-
"num-partitions",
63-
1,
64-
"Number of Partitions",
65-
)
66-
overwriteChunks = globalFlags.Bool(
67-
"overwrite-chunks",
68-
true,
69-
"If true existing chunks may be overwritten",
70-
)
28+
confFile = flag.String("config", "/etc/metrictank/metrictank.ini", "configuration file path")
29+
exitOnError = flag.Bool("exit-on-error", false, "Exit with a message when there's an error")
30+
httpEndpoint = flag.String("http-endpoint", "127.0.0.1:8080", "The http endpoint to listen on")
31+
ttlsStr = flag.String("ttls", "35d", "list of ttl strings used by MT separated by ','")
32+
partitionScheme = flag.String("partition-scheme", "bySeries", "method used for partitioning metrics. This should match the settings of tsdb-gw. (byOrg|bySeries)")
33+
uriPath = flag.String("uri-path", "/chunks", "the URI on which we expect chunks to get posted")
34+
numPartitions = flag.Int("num-partitions", 1, "Number of Partitions")
35+
logLevel = flag.String("log-level", "info", "log level. panic|fatal|error|warning|info|debug")
7136

7237
version = "(none)"
7338
)
@@ -87,71 +52,69 @@ func init() {
8752
}
8853

8954
func main() {
90-
storeConfig := cassandraStore.NewStoreConfig()
91-
// we don't use the cassandraStore's writeQueue, so we hard code this to 0.
92-
storeConfig.WriteQueueSize = 0
93-
94-
// flags from cassandra/config.go, Cassandra
95-
globalFlags.StringVar(&storeConfig.Addrs, "cassandra-addrs", storeConfig.Addrs, "cassandra host (may be given multiple times as comma-separated list)")
96-
globalFlags.StringVar(&storeConfig.Keyspace, "cassandra-keyspace", storeConfig.Keyspace, "cassandra keyspace to use for storing the metric data table")
97-
globalFlags.StringVar(&storeConfig.Consistency, "cassandra-consistency", storeConfig.Consistency, "write consistency (any|one|two|three|quorum|all|local_quorum|each_quorum|local_one")
98-
globalFlags.StringVar(&storeConfig.HostSelectionPolicy, "cassandra-host-selection-policy", storeConfig.HostSelectionPolicy, "")
99-
globalFlags.StringVar(&storeConfig.Timeout, "cassandra-timeout", storeConfig.Timeout, "cassandra timeout")
100-
globalFlags.IntVar(&storeConfig.ReadConcurrency, "cassandra-read-concurrency", storeConfig.ReadConcurrency, "max number of concurrent reads to cassandra.")
101-
globalFlags.IntVar(&storeConfig.WriteConcurrency, "cassandra-write-concurrency", storeConfig.WriteConcurrency, "max number of concurrent writes to cassandra.")
102-
globalFlags.IntVar(&storeConfig.ReadQueueSize, "cassandra-read-queue-size", storeConfig.ReadQueueSize, "max number of outstanding reads before reads will be dropped. This is important if you run queries that result in many reads in parallel.")
103-
//flag.IntVar(&storeConfig.WriteQueueSize, "write-queue-size", storeConfig.WriteQueueSize, "write queue size per cassandra worker. should be large engough to hold all at least the total number of series expected, divided by how many workers you have")
104-
globalFlags.IntVar(&storeConfig.Retries, "cassandra-retries", storeConfig.Retries, "how many times to retry a query before failing it")
105-
globalFlags.IntVar(&storeConfig.WindowFactor, "cassandra-window-factor", storeConfig.WindowFactor, "size of compaction window relative to TTL")
106-
globalFlags.StringVar(&storeConfig.OmitReadTimeout, "cassandra-omit-read-timeout", storeConfig.OmitReadTimeout, "if a read is older than this, it will directly be omitted without executing")
107-
globalFlags.IntVar(&storeConfig.CqlProtocolVersion, "cql-protocol-version", storeConfig.CqlProtocolVersion, "cql protocol version to use")
108-
globalFlags.BoolVar(&storeConfig.CreateKeyspace, "cassandra-create-keyspace", storeConfig.CreateKeyspace, "enable the creation of the mdata keyspace and tables, only one node needs this")
109-
globalFlags.BoolVar(&storeConfig.DisableInitialHostLookup, "cassandra-disable-initial-host-lookup", storeConfig.DisableInitialHostLookup, "instruct the driver to not attempt to get host info from the system.peers table")
110-
globalFlags.BoolVar(&storeConfig.SSL, "cassandra-ssl", storeConfig.SSL, "enable SSL connection to cassandra")
111-
globalFlags.StringVar(&storeConfig.CaPath, "cassandra-ca-path", storeConfig.CaPath, "cassandra CA certificate path when using SSL")
112-
globalFlags.BoolVar(&storeConfig.HostVerification, "cassandra-host-verification", storeConfig.HostVerification, "host (hostname and server cert) verification when using SSL")
113-
globalFlags.BoolVar(&storeConfig.Auth, "cassandra-auth", storeConfig.Auth, "enable cassandra authentication")
114-
globalFlags.StringVar(&storeConfig.Username, "cassandra-username", storeConfig.Username, "username for authentication")
115-
globalFlags.StringVar(&storeConfig.Password, "cassandra-password", storeConfig.Password, "password for authentication")
116-
globalFlags.StringVar(&storeConfig.SchemaFile, "cassandra-schema-file", storeConfig.SchemaFile, "File containing the needed schemas in case database needs initializing")
117-
118-
cassFlags := cassandra.ConfigSetup()
119-
120-
flag.Usage = func() {
121-
fmt.Println("mt-whisper-importer-writer")
122-
fmt.Println()
123-
fmt.Println("Opens an endpoint to send data to, which then gets stored in the MT internal DB(s)")
124-
fmt.Println()
125-
fmt.Printf("Usage:\n\n")
126-
fmt.Printf(" mt-whisper-importer-writer [global config flags] <idxtype> [idx config flags] \n\n")
127-
fmt.Printf("global config flags:\n\n")
128-
globalFlags.PrintDefaults()
129-
fmt.Println()
130-
fmt.Printf("idxtype: only 'cass' supported for now\n\n")
131-
fmt.Printf("cass config flags:\n\n")
132-
cassFlags.PrintDefaults()
133-
fmt.Println()
134-
fmt.Println("EXAMPLES:")
135-
fmt.Println("mt-whisper-importer-writer -cassandra-addrs=192.168.0.1 -cassandra-keyspace=mydata -exit-on-error=true -fake-avg-aggregates=true -http-endpoint=0.0.0.0:8080 -num-partitions=8 -partition-scheme=bySeries -ttls=8d,2y -uri-path=/chunks -verbose=true -cassandra-window-factor=20 cass -hosts=192.168.0.1:9042 -keyspace=mydata")
136-
}
137-
138-
if len(os.Args) == 2 && (os.Args[1] == "-h" || os.Args[1] == "--help") {
139-
flag.Usage()
140-
os.Exit(0)
141-
}
142-
143-
var cassI int
144-
for i, v := range os.Args {
145-
if v == "cass" {
146-
cassI = i
147-
}
55+
flag.Parse()
56+
57+
// Only try and parse the conf file if it exists
58+
path := ""
59+
if _, err := os.Stat(*confFile); err == nil {
60+
path = *confFile
14861
}
149-
if cassI == 0 {
150-
fmt.Println("only indextype 'cass' supported")
151-
flag.Usage()
62+
config, err := globalconf.NewWithOptions(&globalconf.Options{
63+
Filename: path,
64+
EnvPrefix: "MT_",
65+
})
66+
if err != nil {
67+
fmt.Fprintf(os.Stderr, "FATAL: configuration file error: %s", err)
15268
os.Exit(1)
15369
}
15470

71+
mdata.ConfigSetup()
72+
cassandra.ConfigSetup()
73+
cassandraStore.ConfigSetup()
74+
bigtable.ConfigSetup()
75+
bigTableStore.ConfigSetup()
76+
77+
config.ParseAll()
78+
79+
formatter := &logger.TextFormatter{}
80+
formatter.TimestampFormat = "2006-01-02 15:04:05.000"
81+
log.SetFormatter(formatter)
82+
lvl, err := log.ParseLevel(*logLevel)
83+
if err != nil {
84+
log.Fatalf("failed to parse log-level, %s", err.Error())
85+
}
86+
log.SetLevel(lvl)
87+
log.Infof("logging level set to '%s'", *logLevel)
88+
89+
// the specified port is not relevant as we don't use clustering with this tool
90+
cluster.Init("mt-whisper-importer-writer", version, time.Now(), "http", int(80))
91+
92+
mdata.ConfigProcess()
93+
cassandra.ConfigProcess()
94+
bigtable.ConfigProcess()
95+
bigTableStore.ConfigProcess(mdata.MaxChunkSpan())
96+
97+
splits := strings.Split(*ttlsStr, ",")
98+
ttls := make([]uint32, 0)
99+
for _, split := range splits {
100+
ttls = append(ttls, dur.MustParseNDuration("ttl", split))
101+
}
102+
103+
if (cassandraStore.CliConfig.Enabled && bigTableStore.CliConfig.Enabled) || !(cassandraStore.CliConfig.Enabled || bigTableStore.CliConfig.Enabled) {
104+
log.Fatalf("exactly 1 backend store plugin must be enabled. cassandra: %t bigtable: %t", cassandraStore.CliConfig.Enabled, bigTableStore.CliConfig.Enabled)
105+
}
106+
if (cassandra.CliConfig.Enabled && bigtable.CliConfig.Enabled) || !(cassandra.CliConfig.Enabled || bigtable.CliConfig.Enabled) {
107+
log.Fatalf("exactly 1 backend index plugin must be enabled. cassandra: %t bigtable: %t", cassandra.CliConfig.Enabled, bigtable.CliConfig.Enabled)
108+
}
109+
110+
var index idx.MetricIndex
111+
if cassandra.CliConfig.Enabled {
112+
index = cassandra.New(cassandra.CliConfig)
113+
}
114+
if bigtable.CliConfig.Enabled {
115+
index = bigtable.New(bigtable.CliConfig)
116+
}
117+
155118
var store mdata.Store
156119
if cassandraStore.CliConfig.Enabled {
157120
store, err = cassandraStore.NewCassandraStore(cassandraStore.CliConfig, mdata.TTLs())

docs/tools.md

+6-107
Original file line numberDiff line numberDiff line change
@@ -702,123 +702,22 @@ Usage of ./mt-whisper-importer-reader:
702702
## mt-whisper-importer-writer
703703

704704
```
705-
mt-whisper-importer-writer
706-
707-
Opens an endpoint to send data to, which then gets stored in the MT internal DB(s)
708-
709-
Usage:
710-
711-
mt-whisper-importer-writer [global config flags] <idxtype> [idx config flags]
712-
713-
global config flags:
714-
715-
-cassandra-addrs string
716-
cassandra host (may be given multiple times as comma-separated list) (default "localhost")
717-
-cassandra-auth
718-
enable cassandra authentication
719-
-cassandra-ca-path string
720-
cassandra CA certificate path when using SSL (default "/etc/metrictank/ca.pem")
721-
-cassandra-consistency string
722-
write consistency (any|one|two|three|quorum|all|local_quorum|each_quorum|local_one (default "one")
723-
-cassandra-create-keyspace
724-
enable the creation of the mdata keyspace and tables, only one node needs this (default true)
725-
-cassandra-disable-initial-host-lookup
726-
instruct the driver to not attempt to get host info from the system.peers table
727-
-cassandra-host-selection-policy string
728-
(default "tokenaware,hostpool-epsilon-greedy")
729-
-cassandra-host-verification
730-
host (hostname and server cert) verification when using SSL (default true)
731-
-cassandra-keyspace string
732-
cassandra keyspace to use for storing the metric data table (default "metrictank")
733-
-cassandra-omit-read-timeout string
734-
if a read is older than this, it will directly be omitted without executing (default "60s")
735-
-cassandra-password string
736-
password for authentication (default "cassandra")
737-
-cassandra-read-concurrency int
738-
max number of concurrent reads to cassandra. (default 20)
739-
-cassandra-read-queue-size int
740-
max number of outstanding reads before reads will be dropped. This is important if you run queries that result in many reads in parallel. (default 200000)
741-
-cassandra-retries int
742-
how many times to retry a query before failing it
743-
-cassandra-schema-file string
744-
File containing the needed schemas in case database needs initializing (default "/etc/metrictank/schema-store-cassandra.toml")
745-
-cassandra-ssl
746-
enable SSL connection to cassandra
747-
-cassandra-timeout string
748-
cassandra timeout (default "1s")
749-
-cassandra-username string
750-
username for authentication (default "cassandra")
751-
-cassandra-window-factor int
752-
size of compaction window relative to TTL (default 20)
753-
-cassandra-write-concurrency int
754-
max number of concurrent writes to cassandra. (default 10)
755-
-cql-protocol-version int
756-
cql protocol version to use (default 4)
705+
Usage of ./mt-whisper-importer-writer:
706+
-config string
707+
configuration file path (default "/etc/metrictank/metrictank.ini")
757708
-exit-on-error
758-
Exit with a message when there's an error (default true)
709+
Exit with a message when there's an error
759710
-http-endpoint string
760711
The http endpoint to listen on (default "127.0.0.1:8080")
712+
-log-level string
713+
log level. panic|fatal|error|warning|info|debug (default "info")
761714
-num-partitions int
762715
Number of Partitions (default 1)
763-
-overwrite-chunks
764-
If true existing chunks may be overwritten (default true)
765716
-partition-scheme string
766717
method used for partitioning metrics. This should match the settings of tsdb-gw. (byOrg|bySeries) (default "bySeries")
767718
-ttls string
768719
list of ttl strings used by MT separated by ',' (default "35d")
769720
-uri-path string
770721
the URI on which we expect chunks to get posted (default "/chunks")
771-
-verbose
772-
More detailed logging
773-
774-
idxtype: only 'cass' supported for now
775-
776-
cass config flags:
777-
778-
-auth
779-
enable cassandra user authentication
780-
-ca-path string
781-
cassandra CA certficate path when using SSL (default "/etc/metrictank/ca.pem")
782-
-consistency string
783-
write consistency (any|one|two|three|quorum|all|local_quorum|each_quorum|local_one (default "one")
784-
-create-keyspace
785-
enable the creation of the index keyspace and tables, only one node needs this (default true)
786-
-disable-initial-host-lookup
787-
instruct the driver to not attempt to get host info from the system.peers table
788-
-enabled
789-
(default true)
790-
-host-verification
791-
host (hostname and server cert) verification when using SSL (default true)
792-
-hosts string
793-
comma separated list of cassandra addresses in host:port form (default "localhost:9042")
794-
-init-load-concurrency int
795-
Number of partitions to load concurrently on startup. (default 1)
796-
-keyspace string
797-
Cassandra keyspace to store metricDefinitions in. (default "metrictank")
798-
-num-conns int
799-
number of concurrent connections to cassandra (default 10)
800-
-password string
801-
password for authentication (default "cassandra")
802-
-protocol-version int
803-
cql protocol version to use (default 4)
804-
-prune-interval duration
805-
Interval at which the index should be checked for stale series. (default 3h0m0s)
806-
-schema-file string
807-
File containing the needed schemas in case database needs initializing (default "/etc/metrictank/schema-idx-cassandra.toml")
808-
-ssl
809-
enable SSL connection to cassandra
810-
-timeout duration
811-
cassandra request timeout (default 1s)
812-
-update-cassandra-index
813-
synchronize index changes to cassandra. not all your nodes need to do this. (default true)
814-
-update-interval duration
815-
frequency at which we should update the metricDef lastUpdate field, use 0s for instant updates (default 3h0m0s)
816-
-username string
817-
username for authentication (default "cassandra")
818-
-write-queue-size int
819-
Max number of metricDefs allowed to be unwritten to cassandra (default 100000)
820-
821-
EXAMPLES:
822-
mt-whisper-importer-writer -cassandra-addrs=192.168.0.1 -cassandra-keyspace=mydata -exit-on-error=true -fake-avg-aggregates=true -http-endpoint=0.0.0.0:8080 -num-partitions=8 -partition-scheme=bySeries -ttls=8d,2y -uri-path=/chunks -verbose=true -cassandra-window-factor=20 cass -hosts=192.168.0.1:9042 -keyspace=mydata
823722
```
824723

0 commit comments

Comments
 (0)