Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added new rp and meas flags as filters, changed internal structs for meas and fields #14

Merged
merged 2 commits into from
May 2, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
102 changes: 92 additions & 10 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -207,7 +207,7 @@ Allows the user to copy DB schemas from DB1 to DB2. DB schema are DBs and RPs.
___Syntax___

```
./bin/syncflux -action replicaschema [-master <master_id>] [-slave <slave_id>] [-db <db_regex_selector>] [-newdb <newdb_name>] [-newrp <newrp_name>]
./bin/syncflux -action replicaschema [-master <master_id>] [-slave <slave_id>] [-db <db_regex_selector>] [-newdb <newdb_name>] [-rp <rp_regex_selector>] [-newrp <newrp_name>] [-meas <meas_regex_selector>]
```

___Description of syntax___
Expand Down Expand Up @@ -282,7 +282,7 @@ Influx02 schema
```


*Example 3*: Copy schema from Influx01-DB1 to Influx02-DB3 (new db called DB3)
*Example 3*: Copy schema from Influx01-DB1 to Influx02-DB3 (new db called DB3) and only from rp1

```
Influx01 schema
Expand All @@ -297,7 +297,7 @@ Influx01 schema
```

```bash
./bin/syncflux -action "replicaschema" -master "influx01" -slave "influx02" -db "^db1$" -newdb "db3"
./bin/syncflux -action "replicaschema" -master "influx01" -slave "influx02" -db "^db1$" -newdb "db3" -rp "^rp1$"
```

The result will be that the schema of Influx01 will be replicated on Influx02
Expand All @@ -307,10 +307,9 @@ Influx02 schema
----------------
|-- db3
|-- rp1*
|-- rp2
```

*Example 4*: Copy schema from Influx01-DB1 to Influx02-DB3 (new db called DB3) and set the defaultrp to rp3
*Example 4*: Copy schema from Influx01-DB1 to Influx02-DB3 (new db called DB3) and set the defaultrp to rp3

```bash
Influx01 schema
Expand Down Expand Up @@ -338,6 +337,23 @@ Influx02 schema
|-- rp2
```

*Example 5*: Copy data and schema from Influx01-DB1 to Influx02-DB3 (new db called DB3) and only from meas "cpu.*"

```bash
Influx01 schema
----------------

|-- db1
|-- rp1*
|-- cpu
|-- mem
|-- swap
|-- ...
|-- rp2
|-- db2
|-- rp1*
|-- rp2
```

#### Copy data

Expand All @@ -347,7 +363,7 @@ Allows the user to copy DB data from master to slave. DB schema are DBs and RPs.
___Syntax___

```
./bin/syncflux -action copy [-master <master_id>] [-slave <slave_id>] [-db <db_regex_selector>] [-newdb <newdb_name>] [-newrp <newrp_name>] { [-start <start_time>] [-endtime <end_time>] , [-full] }
./bin/syncflux -action copy [-master <master_id>] [-slave <slave_id>] [-db <db_regex_selector>] [-newdb <newdb_name>] [-rp <rp_regex_selector>] [-newrp <newrp_name>] [-meas <meas_regex_selector>] { [-start <start_time>] [-endtime <end_time>] , [-full] }
```

___Description of syntax___
Expand Down Expand Up @@ -397,7 +413,7 @@ Influx02 schema
```


*Example 2*: Copy data from Influx01-DB1 to Influx02 on a time window
*Example 2*: Copy data from Influx01-DB1 to Influx02 on a time window and only from rp1

```bash
Influx01 schema
Expand All @@ -412,10 +428,10 @@ Influx01 schema
```

```bash
./bin/syncflux -action "copy" -master "influx01" -slave "influx02" -db "^db1$" -start -10h end -5h
./bin/syncflux -action "copy" -master "influx01" -slave "influx02" -db "^db1$" -rp "^rp1$" -start -10h end -5h
```

The command above will repicate all data from Influx01 to InfluxDB but only from db1 and with a time window from -10h to -5h
The command above will repicate all data from Influx01 to InfluxDB but only from db1.rp1 and with a time window from -10h to -5h

```bash
Influx02 schema
Expand Down Expand Up @@ -482,6 +498,40 @@ Influx02 schema
|-- rp2
```

*Example 5*: Copy data from Influx01-DB1 to Influx02-DB3 (new db called DB3) and only from meas "cpu.*"

```bash
Influx01 schema
----------------

|-- db1
|-- rp1*
|-- cpu
|-- mem
|-- swap
|-- ...
|-- rp2
|-- db2
|-- rp1*
|-- rp2
```


```bash
./bin/syncflux -action "copy" -master "influx01" -slave "influx02" -db "^db1$" -newdb "db3" -mes "cpu.*"
```

The command above will replicate all data from Influx01-db1 to InfluxDB on a new DB called 'db3' and a new defaultrp called rp3

```bash
Influx02 schema
----------------
|-- db3
|-- rp3*
|-- cpu
|-- rp2
```

#### Copy data + schema

Allows the user to copy DB data from master to slave. DB schema are DBs and RPs.
Expand All @@ -490,7 +540,7 @@ Allows the user to copy DB data from master to slave. DB schema are DBs and RPs.
___Syntax___

```
./bin/syncflux -action fullcopy [-master <master_id>] [-slave <slave_id>] [-db <db_regex_selector>] [-newdb <newdb_name>] [-newrp <newrp_name>] { [-start <start_time>] [-endtime <end_time>] , [-full] }
./bin/syncflux -action fullcopy [-master <master_id>] [-slave <slave_id>] [-db <db_regex_selector>] [-newdb <newdb_name>] [-rp <rp_regex_selector>] [-newrp <newrp_name>] [-meas <meas_regex_selector>] { [-start <start_time>] [-endtime <end_time>] , [-full] }
```

___Description of syntax___
Expand Down Expand Up @@ -628,7 +678,39 @@ Influx02 schema
|-- rp2
```

*Example 5*: Copy data and schema from Influx01-DB1 to Influx02-DB3 (new db called DB3) and only from meas "cpu.*"

```bash
Influx01 schema
----------------

|-- db1
|-- rp1*
|-- cpu
|-- mem
|-- swap
|-- ...
|-- rp2
|-- db2
|-- rp1*
|-- rp2
```


```bash
./bin/syncflux -action "copy" -master "influx01" -slave "influx02" -db "^db1$" -newdb "db3" -mes "cpu.*"
```

The command above will create the schema and will replicate all data from Influx01-db1 to InfluxDB on a new DB called 'db3' and a new defaultrp called rp3

```bash
Influx02 schema
----------------
|-- db3
|-- rp3*
|-- cpu
|-- rp2
```

### Run as a HA Cluster monitor

Expand Down
14 changes: 7 additions & 7 deletions pkg/agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,11 +144,11 @@ func initCluster(master string, slave string) *HACluster {
}
}

func ReplSch(master string, slave string, dbs string, newdb string, newrp string) {
func ReplSch(master string, slave string, dbs string, newdb string, rps string, newrp string, meas string) {

Cluster = initCluster(master, slave)

schema, err := Cluster.GetSchema(dbs)
schema, err := Cluster.GetSchema(dbs, rps, meas)
if err != nil {
log.Errorf("Can not copy data , error on get Schema: %s", err)
return
Expand All @@ -173,11 +173,11 @@ func ReplSch(master string, slave string, dbs string, newdb string, newrp string

}

func SchCopy(master string, slave string, dbs string, newdb string, newrp string, start time.Time, end time.Time, full bool) {
func SchCopy(master string, slave string, dbs string, newdb string, rps string, newrp string, meas string, start time.Time, end time.Time, full bool) {

Cluster = initCluster(master, slave)

schema, err := Cluster.GetSchema(dbs)
schema, err := Cluster.GetSchema(dbs, rps, meas)
if err != nil {
log.Errorf("Can not copy data , error on get Schema: %s", err)
return
Expand Down Expand Up @@ -207,11 +207,11 @@ func SchCopy(master string, slave string, dbs string, newdb string, newrp string

}

func Copy(master string, slave string, dbs string, newdb string, newrp string, start time.Time, end time.Time, full bool) {
func Copy(master string, slave string, dbs string, newdb string, rps string, newrp string, meas string, start time.Time, end time.Time, full bool) {

Cluster = initCluster(master, slave)

schema, err := Cluster.GetSchema(dbs)
schema, err := Cluster.GetSchema(dbs, rps, meas)
if err != nil {
log.Errorf("Can not copy data , error on get Schema: %s", err)
return
Expand Down Expand Up @@ -243,7 +243,7 @@ func HAMonitorStart(master string, slave string) {

Cluster = initCluster(master, slave)

schema, _ := Cluster.GetSchema("")
schema, _ := Cluster.GetSchema("", "", "")

switch MainConfig.General.InitialReplication {
case "schema":
Expand Down
28 changes: 14 additions & 14 deletions pkg/agent/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -232,10 +232,11 @@ func GetRetentionPolicies(con client.Client, db string) ([]*RetPol, error) {
return rparray, nil
}

func GetFields(c client.Client, sdb string, meas string) map[string]string {
ret := make(map[string]string)
func GetFields(c client.Client, sdb string, meas string, defrp string) map[string]*FieldSch {

cmd := "show field keys from " + meas
fields := make(map[string]*FieldSch)

cmd := "show field keys from \"" + defrp + "\"." + meas
//get measurements from database
q := client.Query{
Command: cmd,
Expand All @@ -258,16 +259,15 @@ func GetFields(c client.Client, sdb string, meas string) map[string]string {
for _, row := range values {
fieldname := row[0].(string)
fieldtype := row[1].(string)
ret[fieldname] = fieldtype
fields[fieldname] = &FieldSch{Name: fieldname, Type: fieldtype}
log.Debugf("Detected Field [%s] type [%s] on measurement [%s]", fieldname, fieldtype, meas)
}

}

return ret
return fields
}

func GetMeasurements(c client.Client, sdb string) []string {
func GetMeasurements(c client.Client, sdb string, mesafilter string) []*MeasurementSch {

cmd := "show measurements"
//get measurements from database
Expand All @@ -276,7 +276,7 @@ func GetMeasurements(c client.Client, sdb string) []string {
Database: sdb,
}

var measurements []string
var measurements []*MeasurementSch

response, err := c.Query(q)
if err != nil {
Expand All @@ -297,7 +297,8 @@ func GetMeasurements(c client.Client, sdb string) []string {

for _, row := range values {
measurement := fmt.Sprintf("%v", row[0])
measurements = append(measurements, measurement)
measurements = append(measurements, &MeasurementSch{Name: measurement, Fields: nil})

time.Sleep(3 * time.Millisecond)
}

Expand All @@ -323,7 +324,7 @@ func StrUnixNano2Time(tstamp string) (time.Time, error) {
return time.Unix(sec, nsec), nil
}

func ReadDB(c client.Client, sdb, srp, ddb, drp, cmd string, fieldmap map[string]string) (client.BatchPoints, int64, error) {
func ReadDB(c client.Client, sdb, srp, ddb, drp, cmd string, fieldmap map[string]*FieldSch) (client.BatchPoints, int64, error) {
var totalpoints int64
RWMaxRetries := MainConfig.General.RWMaxRetries
RWRetryDelay := MainConfig.General.RWRetryDelay
Expand Down Expand Up @@ -427,7 +428,7 @@ func ReadDB(c client.Client, sdb, srp, ddb, drp, cmd string, fieldmap map[string
switch vt := val.(type) {
case json.Number:
tp := fieldmap[ser.Columns[i]]
switch tp {
switch tp.Type {
case "float":
conv, err := vt.Float64()
if err != nil {
Expand Down Expand Up @@ -587,8 +588,7 @@ func SyncDBRP(src *InfluxMonitor, dst *InfluxMonitor, sdb string, ddb string, sr
var totalpoints int64
totalpoints = 0

for m, sch := range dbschema.Ftypes {

for m, sch := range dbschema.Measurements {
m := m
sch := sch

Expand All @@ -597,7 +597,7 @@ func SyncDBRP(src *InfluxMonitor, dst *InfluxMonitor, sdb string, ddb string, sr
log.Tracef("Processing measurement %s with schema #%+v", m, sch)
log.Debugf("processing Database %s Measurement %s from %d to %d", sdb, m, startsec, endsec)
getvalues := fmt.Sprintf("select * from \"%v\" where time > %vs and time < %vs group by *", m, startsec, endsec)
batchpoints, np, err := ReadDB(src.cli, sdb, srp.Name, ddb, drp.Name, getvalues, sch)
batchpoints, np, err := ReadDB(src.cli, sdb, srp.Name, ddb, drp.Name, getvalues, sch.Fields)
if err != nil {
log.Errorf("error in read %s", err)
return
Expand Down
Loading