Skip to content

Commit

Permalink
Merge pull request #3795 from influxdb/throttle-import
Browse files Browse the repository at this point in the history
Throttle import
  • Loading branch information
corylanou committed Aug 22, 2015
2 parents a9e3b9d + 6d5d697 commit 3036940
Show file tree
Hide file tree
Showing 3 changed files with 73 additions and 13 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ Please see the *Features* section below for full details.
- [#3721](https://github.com/influxdb/influxdb/pull/3721): interpret number literals compared against time as nanoseconds from epoch
- [#3514](https://github.com/influxdb/influxdb/issues/3514): Implement WAL outside BoltDB with compaction
- [#3544](https://github.com/influxdb/influxdb/pull/3544): Implement compression on top of BoltDB
- [#3795](https://github.com/influxdb/influxdb/pull/3795): Throttle import

### Bugfixes
- [#3405](https://github.com/influxdb/influxdb/pull/3405): Prevent database panic when fields are missing. Thanks @jhorwit2
Expand Down
14 changes: 12 additions & 2 deletions cmd/influx/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,12 @@ var (
)

const (
default_format = "column"
// defaultFormat is the default format of the results when issuing queries
defaultFormat = "column"

// defaultPPS is the default points per second that the import will throttle at
// by default it's 0, which means it will not throttle
defaultPPS = 0
)

type CommandLine struct {
Expand All @@ -46,6 +51,7 @@ type CommandLine struct {
Execute string
ShowVersion bool
Import bool
PPS int // Controls how many points per second the import will allow via throttling
Path string
Compressed bool
}
Expand All @@ -60,11 +66,12 @@ func main() {
fs.StringVar(&c.Password, "password", c.Password, `Password to connect to the server. Leaving blank will prompt for password (--password="").`)
fs.StringVar(&c.Database, "database", c.Database, "Database to connect to the server.")
fs.BoolVar(&c.Ssl, "ssl", false, "Use https for connecting to cluster.")
fs.StringVar(&c.Format, "format", default_format, "Format specifies the format of the server responses: json, csv, or column.")
fs.StringVar(&c.Format, "format", defaultFormat, "Format specifies the format of the server responses: json, csv, or column.")
fs.BoolVar(&c.Pretty, "pretty", false, "Turns on pretty print for the json format.")
fs.StringVar(&c.Execute, "execute", c.Execute, "Execute command and quit.")
fs.BoolVar(&c.ShowVersion, "version", false, "Displays the InfluxDB version.")
fs.BoolVar(&c.Import, "import", false, "Import a previous database.")
fs.IntVar(&c.PPS, "pps", defaultPPS, "How many points per second the import will allow. By default it is zero and will not throttle importing.")
fs.StringVar(&c.Path, "path", "", "path to the file to import")
fs.BoolVar(&c.Compressed, "compressed", false, "set to true if the import file is compressed")

Expand Down Expand Up @@ -93,6 +100,8 @@ func main() {
Turns on pretty print for the json format.
-import
Import a previous database export from file
-pps
How many points per second the import will allow. By default it is zero and will not throttle importing.
-path
Path to file to import
-compressed
Expand Down Expand Up @@ -169,6 +178,7 @@ Examples:
config.Version = version
config.URL = u
config.Compressed = c.Compressed
config.PPS = c.PPS

i := v8.NewImporter(config)
if err := i.Import(); err != nil {
Expand Down
71 changes: 60 additions & 11 deletions importer/v8/importer.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"net/url"
"os"
"strings"
"time"

"github.com/influxdb/influxdb/client"
)
Expand All @@ -25,6 +26,7 @@ type Config struct {
Path string
Version string
Compressed bool
PPS int
}

// NewConfig returns an initialized *Config
Expand All @@ -34,14 +36,17 @@ func NewConfig() *Config {

// Importer is the importer used for importing 0.8 data
type Importer struct {
client *client.Client
database string
retentionPolicy string
config *Config
batch []string
totalInserts int
failedInserts int
totalCommands int
client *client.Client
database string
retentionPolicy string
config *Config
batch []string
totalInserts int
failedInserts int
totalCommands int
throttlePointsWritten int
lastWrite time.Time
throttle *time.Ticker
}

// NewImporter will return an initialized Importer struct
Expand Down Expand Up @@ -108,8 +113,18 @@ func (i *Importer) Import() error {
// Get our reader
scanner := bufio.NewScanner(r)

// Process the scanner
// Process the DDL
i.processDDL(scanner)

// Set up our throttle channel. Since there is effectively no other activity at this point
// the smaller resolution gets us much closer to the requested PPS
i.throttle = time.NewTicker(time.Microsecond)
defer i.throttle.Stop()

// Prime the last write
i.lastWrite = time.Now()

// Process the DML
i.processDML(scanner)

// Check if we had any errors scanning the file
Expand All @@ -135,6 +150,7 @@ func (i *Importer) processDDL(scanner *bufio.Scanner) {
}

func (i *Importer) processDML(scanner *bufio.Scanner) {
start := time.Now()
for scanner.Scan() {
line := scanner.Text()
if strings.HasPrefix(line, "# CONTEXT-DATABASE:") {
Expand All @@ -146,7 +162,7 @@ func (i *Importer) processDML(scanner *bufio.Scanner) {
if strings.HasPrefix(line, "#") {
continue
}
i.batchAccumulator(line)
i.batchAccumulator(line, start)
}
}

Expand All @@ -166,7 +182,7 @@ func (i *Importer) queryExecutor(command string) {
i.execute(command)
}

func (i *Importer) batchAccumulator(line string) {
func (i *Importer) batchAccumulator(line string, start time.Time) {
i.batch = append(i.batch, line)
if len(i.batch) == batchSize {
if e := i.batchWrite(); e != nil {
Expand All @@ -178,10 +194,43 @@ func (i *Importer) batchAccumulator(line string) {
i.totalInserts += len(i.batch)
}
i.batch = i.batch[:0]
// Give some status feedback every 100000 lines processed
processed := i.totalInserts + i.failedInserts
if processed%100000 == 0 {
since := time.Since(start)
pps := float64(processed) / since.Seconds()
log.Printf("Processed %d lines. Time elapsed: %s. Points per second (PPS): %d", processed, since.String(), int64(pps))
}
}
}

// batchWrite writes the accumulated batch to the server using the line
// protocol, throttling to the configured points-per-second (PPS) rate when
// i.config.PPS is non-zero. It returns any error from the write; in either
// case the throttle counters are reset for the next batch.
func (i *Importer) batchWrite() error {
	// Accumulate the batch size to see how many points we have written this second
	i.throttlePointsWritten += len(i.batch)

	// While the observed rate since the last write exceeds the configured PPS,
	// wait for the next throttle tick and re-check. A loop is used instead of
	// recursion so sustained throttling cannot grow the stack without bound.
	for i.config.PPS != 0 {
		// Find out how long it has been since we last wrote data
		since := time.Since(i.lastWrite)

		// Compute the effective points per second for the current timeframe
		var currentPPS int
		if since.Seconds() > 0 {
			currentPPS = int(float64(i.throttlePointsWritten) / since.Seconds())
		} else {
			// Sub-second elapsed time: use the raw count as the rate
			currentPPS = i.throttlePointsWritten
		}

		// Under the limit — proceed with the write
		if currentPPS <= i.config.PPS {
			break
		}

		// Wait for the next tick before re-checking the rate
		<-i.throttle.C
	}

	_, e := i.client.WriteLineProtocol(strings.Join(i.batch, "\n"), i.database, i.retentionPolicy, i.config.Precision, i.config.WriteConsistency)
	i.throttlePointsWritten = 0
	i.lastWrite = time.Now()
	return e
}

0 comments on commit 3036940

Please sign in to comment.