From 62f3b4486a644adc4c0c632ab4cfc03ea5123c62 Mon Sep 17 00:00:00 2001 From: David Norton Date: Thu, 26 May 2016 12:32:56 -0400 Subject: [PATCH] batch SELECT INTO writes --- CHANGELOG.md | 1 + coordinator/points_writer_test.go | 118 ++++++++++++++++++++++++++++++ coordinator/statement_executor.go | 101 +++++++++++++++++++++++-- 3 files changed, 213 insertions(+), 7 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index d99e7e7715a..43f3fbe41a7 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -52,6 +52,7 @@ - [#6727](https://github.com/influxdata/influxdb/issues/6727): queries with strings that look like dates end up with date types, not string types - [#6250](https://github.com/influxdata/influxdb/issues/6250): Slow startup time - [#6753](https://github.com/influxdata/influxdb/issues/6753): Prevent panic if there are no values. +- [#6685](https://github.com/influxdata/influxdb/issues/6685): Batch SELECT INTO / CQ writes ## v0.13.0 [2016-05-12] diff --git a/coordinator/points_writer_test.go b/coordinator/points_writer_test.go index 6e0cc7a7976..511b48b2df2 100644 --- a/coordinator/points_writer_test.go +++ b/coordinator/points_writer_test.go @@ -251,6 +251,124 @@ func TestPointsWriter_WritePoints(t *testing.T) { } } +type fakePointsWriter struct { + WritePointsIntoFn func(*coordinator.IntoWriteRequest) error +} + +func (f *fakePointsWriter) WritePointsInto(req *coordinator.IntoWriteRequest) error { + return f.WritePointsIntoFn(req) +} + +func TestBufferedPointsWriter(t *testing.T) { + db := "db0" + rp := "rp0" + capacity := 10000 + + writePointsIntoCnt := 0 + pointsWritten := []models.Point{} + + reset := func() { + writePointsIntoCnt = 0 + pointsWritten = pointsWritten[:0] + } + + fakeWriter := &fakePointsWriter{ + WritePointsIntoFn: func(req *coordinator.IntoWriteRequest) error { + writePointsIntoCnt++ + pointsWritten = append(pointsWritten, req.Points...)
+ return nil + }, + } + + w := coordinator.NewBufferedPointsWriter(fakeWriter, db, rp, capacity) + + // Test that capacity and length are correct for new buffered writer. + if w.Cap() != capacity { + t.Fatalf("exp %d, got %d", capacity, w.Cap()) + } else if w.Len() != 0 { + t.Fatalf("exp %d, got %d", 0, w.Len()) + } + + // Test flushing an empty buffer. + if err := w.Flush(); err != nil { + t.Fatal(err) + } else if writePointsIntoCnt > 0 { + t.Fatalf("exp 0, got %d", writePointsIntoCnt) + } + + // Test writing zero points. + if err := w.WritePointsInto(&coordinator.IntoWriteRequest{ + Database: db, + RetentionPolicy: rp, + Points: []models.Point{}, + }); err != nil { + t.Fatal(err) + } else if writePointsIntoCnt > 0 { + t.Fatalf("exp 0, got %d", writePointsIntoCnt) + } else if w.Len() > 0 { + t.Fatalf("exp 0, got %d", w.Len()) + } + + // Test writing single large bunch of points points. + req := coordinator.WritePointsRequest{ + Database: db, + RetentionPolicy: rp, + } + + numPoints := int(float64(capacity) * 5.5) + for i := 0; i < numPoints; i++ { + req.AddPoint("cpu", float64(i), time.Unix(0, 0).Add(time.Duration(i)*time.Second), nil) + } + + r := coordinator.IntoWriteRequest(req) + if err := w.WritePointsInto(&r); err != nil { + t.Fatal(err) + } else if writePointsIntoCnt != 5 { + t.Fatalf("exp 5, got %d", writePointsIntoCnt) + } else if w.Len() != capacity/2 { + t.Fatalf("exp %d, got %d", capacity/2, w.Len()) + } else if len(pointsWritten) != numPoints-capacity/2 { + t.Fatalf("exp %d, got %d", numPoints-capacity/2, len(pointsWritten)) + } + + if err := w.Flush(); err != nil { + t.Fatal(err) + } else if writePointsIntoCnt != 6 { + t.Fatalf("exp 6, got %d", writePointsIntoCnt) + } else if w.Len() != 0 { + t.Fatalf("exp 0, got %d", w.Len()) + } else if len(pointsWritten) != numPoints { + t.Fatalf("exp %d, got %d", numPoints, len(pointsWritten)) + } else if !reflect.DeepEqual(r.Points, pointsWritten) { + t.Fatal("points don't match") + } + + reset() + + // Test 
writing points one at a time. + for i, _ := range r.Points { + if err := w.WritePointsInto(&coordinator.IntoWriteRequest{ + Database: db, + RetentionPolicy: rp, + Points: r.Points[i : i+1], + }); err != nil { + t.Fatal(err) + } + } + + if err := w.Flush(); err != nil { + t.Fatal(err) + } else if writePointsIntoCnt != 6 { + t.Fatalf("exp 6, got %d", writePointsIntoCnt) + } else if w.Len() != 0 { + t.Fatalf("exp 0, got %d", w.Len()) + } else if len(pointsWritten) != numPoints { + t.Fatalf("exp %d, got %d", numPoints, len(pointsWritten)) + } else if !reflect.DeepEqual(r.Points, pointsWritten) { + t.Fatal("points don't match") + } +} + var shardID uint64 type fakeShardWriter struct { diff --git a/coordinator/statement_executor.go b/coordinator/statement_executor.go index 16c841aff6e..c4906280524 100644 --- a/coordinator/statement_executor.go +++ b/coordinator/statement_executor.go @@ -17,6 +17,10 @@ import ( "github.com/influxdata/influxdb/tsdb" ) +type pointsWriter interface { + WritePointsInto(*IntoWriteRequest) error +} + // StatementExecutor executes a statement in the query. type StatementExecutor struct { MetaClient MetaClient @@ -28,9 +32,7 @@ type StatementExecutor struct { Monitor *monitor.Monitor // Used for rewriting points back into system for SELECT INTO statements. - PointsWriter interface { - WritePointsInto(*IntoWriteRequest) error - } + PointsWriter pointsWriter // Select statement limits MaxSelectPointN int @@ -507,6 +509,12 @@ func (e *StatementExecutor) executeSelectStatement(stmt *influxql.SelectStatemen // Emit rows to the results channel. 
var writeN int64 var emitted bool + + var pointsWriter *BufferedPointsWriter + if stmt.Target != nil { + pointsWriter = NewBufferedPointsWriter(e.PointsWriter, stmt.Target.Measurement.Database, stmt.Target.Measurement.RetentionPolicy, 10000) + } + for { row, err := em.Emit() if err != nil { @@ -523,7 +531,7 @@ func (e *StatementExecutor) executeSelectStatement(stmt *influxql.SelectStatemen // Write points back into system for INTO statements. if stmt.Target != nil { - if err := e.writeInto(stmt, row); err != nil { + if err := e.writeInto(pointsWriter, stmt, row); err != nil { return err } writeN += int64(len(row.Values)) @@ -545,8 +553,12 @@ func (e *StatementExecutor) executeSelectStatement(stmt *influxql.SelectStatemen emitted = true } - // Emit write count if an INTO statement. + // Flush remaining points and emit write count if an INTO statement. if stmt.Target != nil { + if err := pointsWriter.Flush(); err != nil { + return err + } + var messages []*influxql.Message if ctx.ReadOnly { messages = append(messages, influxql.ReadOnlyWarning(stmt.String())) @@ -779,7 +791,82 @@ func (e *StatementExecutor) executeShowUsersStatement(q *influxql.ShowUsersState return []*models.Row{row}, nil } -func (e *StatementExecutor) writeInto(stmt *influxql.SelectStatement, row *models.Row) error { +type BufferedPointsWriter struct { + w pointsWriter + buf []models.Point + database string + retentionPolicy string +} + +func NewBufferedPointsWriter(w pointsWriter, database, retentionPolicy string, capacity int) *BufferedPointsWriter { + return &BufferedPointsWriter{ + w: w, + buf: make([]models.Point, 0, capacity), + database: database, + retentionPolicy: retentionPolicy, + } +} + +func (w *BufferedPointsWriter) WritePointsInto(req *IntoWriteRequest) error { + // Make sure we're buffering points only for the expected destination.
+ if req.Database != w.database || req.RetentionPolicy != w.retentionPolicy { + return fmt.Errorf("writer for %s.%s can't write into %s.%s", w.database, w.retentionPolicy, req.Database, req.RetentionPolicy) + } + + for i := 0; i < len(req.Points); { + // Get the available space in the buffer. + avail := cap(w.buf) - len(w.buf) + + // Calculate number of points to copy into the buffer. + n := len(req.Points[i:]) + if n > avail { + n = avail + } + + // Copy points into buffer. + w.buf = append(w.buf, req.Points[i:n+i]...) + + // Advance the index by number of points copied. + i += n + + // If buffer is full, flush points to underlying writer. + if len(w.buf) == cap(w.buf) { + if err := w.Flush(); err != nil { + return err + } + } + } + + return nil +} + +// Flush writes all buffered points to the underlying writer. +func (w *BufferedPointsWriter) Flush() error { + if len(w.buf) == 0 { + return nil + } + + if err := w.w.WritePointsInto(&IntoWriteRequest{ + Database: w.database, + RetentionPolicy: w.retentionPolicy, + Points: w.buf, + }); err != nil { + return err + } + + // Clear the buffer. + w.buf = w.buf[:0] + + return nil +} + +// Len returns the number of points buffered. +func (w *BufferedPointsWriter) Len() int { return len(w.buf) } + +// Cap returns the capacity (in points) of the buffer. +func (w *BufferedPointsWriter) Cap() int { return cap(w.buf) } + +func (e *StatementExecutor) writeInto(w pointsWriter, stmt *influxql.SelectStatement, row *models.Row) error { if stmt.Target.Measurement.Database == "" { return errNoDatabaseInTarget } @@ -801,7 +888,7 @@ func (e *StatementExecutor) writeInto(stmt *influxql.SelectStatement, row *model return err } - if err := e.PointsWriter.WritePointsInto(&IntoWriteRequest{ + if err := w.WritePointsInto(&IntoWriteRequest{ Database: stmt.Target.Measurement.Database, RetentionPolicy: stmt.Target.Measurement.RetentionPolicy, Points: points,