Skip to content

Commit

Permalink
add database/retention policy context switching. log failed imports
Browse files Browse the repository at this point in the history
  • Loading branch information
corylanou committed Jul 29, 2015
1 parent c41ad57 commit fc46fd3
Showing 1 changed file with 24 additions and 15 deletions.
39 changes: 24 additions & 15 deletions cmd/influx/importer/v8.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,15 +39,15 @@ func NewV8Config(username, password, precision, writeConsistency, file, version
}

type V8 struct {
client *client.Client
database string
retentionPolicy string
config *V8Config
wg sync.WaitGroup
line, command chan string
done chan struct{}
batch []string
totalInserts, totalCommands int
client *client.Client
database string
retentionPolicy string
config *V8Config
wg sync.WaitGroup
line, command chan string
done chan struct{}
batch []string
totalInserts, failedInserts, totalCommands int
}

func NewV8(config *V8Config) *V8 {
Expand Down Expand Up @@ -82,6 +82,7 @@ func (v8 *V8) Import() error {
if v8.totalInserts > 0 {
log.Printf("Processed %d commands\n", v8.totalCommands)
log.Printf("Processed %d inserts\n", v8.totalInserts)
log.Printf("Failed %d inserts\n", v8.failedInserts)
}
}()

Expand Down Expand Up @@ -149,6 +150,12 @@ func (v8 *V8) processDDL(scanner *bufio.Scanner) {
func (v8 *V8) processDML(scanner *bufio.Scanner) {
for scanner.Scan() {
line := scanner.Text()
if strings.HasPrefix(line, "# CONTEXT-DATABASE:") {
v8.database = strings.TrimSpace(strings.Split(line, ":")[1])
}
if strings.HasPrefix(line, "# CONTEXT-RETENTION-POLICY:") {
v8.retentionPolicy = strings.TrimSpace(strings.Split(line, ":")[1])
}
if strings.HasPrefix(line, "#") {
continue
}
Expand Down Expand Up @@ -189,8 +196,12 @@ func (v8 *V8) batchAccumulator() {
case l := <-v8.line:
v8.batch = append(v8.batch, l)
if len(v8.batch) == batchSize {
v8.batchWrite()
v8.totalInserts += len(v8.batch)
if e := v8.batchWrite(); e != nil {
log.Println("error writing batch: ", e)
v8.failedInserts += len(v8.batch)
} else {
v8.totalInserts += len(v8.batch)
}
v8.batch = v8.batch[:0]
}
case <-v8.done:
Expand All @@ -200,9 +211,7 @@ func (v8 *V8) batchAccumulator() {
}
}

func (v8 *V8) batchWrite() {
func (v8 *V8) batchWrite() error {
_, e := v8.client.WriteLineProtocol(strings.Join(v8.batch, "\n"), v8.database, v8.retentionPolicy, v8.config.precision, v8.config.writeConsistency)
if e != nil {
log.Println("error writing batch: ", e)
}
return e
}

0 comments on commit fc46fd3

Please sign in to comment.