Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add batching to Graphite inputs #2683

Merged
merged 6 commits into from
May 28, 2015
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -4,6 +4,7 @@
- [1997](https://github.com/influxdb/influxdb/pull/1997): Update SELECT * to return tag values.
- [2599](https://github.com/influxdb/influxdb/issues/2599): Add "epoch" URL param and return JSON time values as epoch instead of date strings.
- [2682](https://github.com/influxdb/influxdb/issues/2682): Adding pr checklist to CONTRIBUTING.md
- [2683](https://github.com/influxdb/influxdb/issues/2683): Add batching support to Graphite inputs.

### Bugfixes
- [2635](https://github.com/influxdb/influxdb/issues/2635): Fix querying against boolean field in WHERE clause.
52 changes: 30 additions & 22 deletions batcher.go
Original file line number Diff line number Diff line change
@@ -27,38 +27,46 @@ type PointBatcherStats struct {
TimeoutTotal uint64 // Nubmer of timeouts that occurred.
}

// Start starts the batching process. It should be called from a goroutine.
func (b *PointBatcher) Start(in <-chan Point, out chan<- []Point) {
// Start starts the batching process. Returns the in and out channels for points
// and point-batches respectively.
func (b *PointBatcher) Start() (chan<- Point, <-chan []Point) {
var timer *time.Timer
var batch []Point
var timerCh <-chan time.Time

for {
select {
case p := <-in:
atomic.AddUint64(&b.stats.PointTotal, 1)
if batch == nil {
batch = make([]Point, 0, b.size)
timer = time.NewTimer(b.duration)
timerCh = timer.C
}
in := make(chan Point)
out := make(chan []Point)

go func() {
for {
select {
case p := <-in:
atomic.AddUint64(&b.stats.PointTotal, 1)
if batch == nil {
batch = make([]Point, 0, b.size)
timer = time.NewTimer(b.duration)
timerCh = timer.C
}

batch = append(batch, p)
if len(batch) >= b.size { // 0 means send immediately.
atomic.AddUint64(&b.stats.SizeTotal, 1)
out <- batch
atomic.AddUint64(&b.stats.BatchTotal, 1)
batch = nil
timerCh = nil
}

batch = append(batch, p)
if len(batch) == b.size {
atomic.AddUint64(&b.stats.SizeTotal, 1)
case <-timerCh:
atomic.AddUint64(&b.stats.TimeoutTotal, 1)
out <- batch
atomic.AddUint64(&b.stats.BatchTotal, 1)
batch = nil
timerCh = nil
}

case <-timerCh:
atomic.AddUint64(&b.stats.TimeoutTotal, 1)
out <- batch
atomic.AddUint64(&b.stats.BatchTotal, 1)
batch = nil
}
}
}()

return in, out
}

// Stats returns a PointBatcherStats object for the PointBatcher. While the each statistic should be
15 changes: 3 additions & 12 deletions batcher_test.go
Original file line number Diff line number Diff line change
@@ -13,10 +13,7 @@ func TestBatch_Size(t *testing.T) {
t.Fatal("failed to create batcher for size test")
}

in := make(chan Point)
out := make(chan []Point)

go batcher.Start(in, out)
in, out := batcher.Start()

var p Point
go func() {
@@ -39,10 +36,7 @@ func TestBatch_Timeout(t *testing.T) {
t.Fatal("failed to create batcher for timeout test")
}

in := make(chan Point)
out := make(chan []Point)

go batcher.Start(in, out)
in, out := batcher.Start()

var p Point
go func() {
@@ -65,10 +59,7 @@ func TestBatch_MultipleBatches(t *testing.T) {
t.Fatal("failed to create batcher for size test")
}

in := make(chan Point)
out := make(chan []Point)

go batcher.Start(in, out)
in, out := batcher.Start()

var p Point
var b []Point
3 changes: 3 additions & 0 deletions cmd/influxd/config.go
Original file line number Diff line number Diff line change
@@ -510,6 +510,9 @@ type Graphite struct {
Protocol string `toml:"protocol"`
NamePosition string `toml:"name-position"`
NameSeparator string `toml:"name-separator"`

BatchSize int `toml:"batch-size"`
BatchTimeout Duration `toml:"batch-timeout"`
}

// ConnnectionString returns the connection string for this Graphite config in the form host:port.
4 changes: 4 additions & 0 deletions cmd/influxd/run.go
Original file line number Diff line number Diff line change
@@ -414,6 +414,10 @@ func (cmd *RunCommand) Open(config *Config, join string) *Node {
log.Fatalf("failed to initialize %s Graphite server: %s", graphiteConfig.Protocol, err.Error())
}

// Configure batching.
g.SetBatchSize(graphiteConfig.BatchSize)
g.SetBatchTimeout(time.Duration(graphiteConfig.BatchTimeout))

err = g.ListenAndServe(graphiteConfig.ConnectionString())
if err != nil {
log.Fatalf("failed to start %s Graphite server: %s", graphiteConfig.Protocol, err.Error())
2 changes: 2 additions & 0 deletions etc/config.sample.toml
Original file line number Diff line number Diff line change
@@ -48,6 +48,8 @@ enabled = false
# name-position = "last"
# name-separator = "-"
# database = "" # store graphite data in this database
# batch-size = 0 # How many points to batch up internally before writing.
# batch-timeout = 0ms # Maximum time to wait before sending batch, regardless of current size.

# Configure the collectd input.
[collectd]
2 changes: 2 additions & 0 deletions graphite/graphite.go
Original file line number Diff line number Diff line change
@@ -38,6 +38,8 @@ type SeriesWriter interface {

// Server defines the interface all Graphite servers support.
type Server interface {
SetBatchSize(sz int)
SetBatchTimeout(t time.Duration)
ListenAndServe(iface string) error
Host() string
Close() error
42 changes: 34 additions & 8 deletions graphite/graphite_tcp.go
Original file line number Diff line number Diff line change
@@ -7,6 +7,7 @@ import (
"os"
"strings"
"sync"
"time"

"github.com/influxdb/influxdb"
)
@@ -16,9 +17,12 @@ type TCPServer struct {
writer SeriesWriter
parser *Parser
database string
listener *net.Listener

wg sync.WaitGroup
batchSize int
batchTimeout time.Duration

listener *net.Listener
wg sync.WaitGroup

Logger *log.Logger

@@ -36,6 +40,9 @@ func NewTCPServer(p *Parser, w SeriesWriter, db string) *TCPServer {
}
}

func (t *TCPServer) SetBatchSize(sz int) { t.batchSize = sz }
func (t *TCPServer) SetBatchTimeout(d time.Duration) { t.batchTimeout = d }

// ListenAndServe instructs the TCPServer to start processing Graphite data
// on the given interface. iface must be in the form host:port
func (t *TCPServer) ListenAndServe(iface string) error {
@@ -95,11 +102,35 @@ func (t *TCPServer) handleConnection(conn net.Conn) {
defer conn.Close()
defer t.wg.Done()

batcher := influxdb.NewPointBatcher(t.batchSize, t.batchTimeout)
in, out := batcher.Start()
reader := bufio.NewReader(conn)

// Start processing batches.
var wg sync.WaitGroup
done := make(chan struct{})
wg.Add(1)
go func() {
defer wg.Done()
for {
select {
case batch := <-out:
_, e := t.writer.WriteSeries(t.database, "", batch)
if e != nil {
t.Logger.Printf("failed to write point batch to database %q: %s\n", t.database, e)
}
case <-done:
return
}
}
}()

for {
// Read up to the next newline.
buf, err := reader.ReadBytes('\n')
if err != nil {
close(done)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need some type of flusher on the batcher so that when we shut down we can ensure it drains the batch before we exit?

wg.Wait()
return
}

@@ -112,11 +143,6 @@ func (t *TCPServer) handleConnection(conn net.Conn) {
t.Logger.Printf("unable to parse data: %s", err)
continue
}

// Send the data to the writer.
_, e := t.writer.WriteSeries(t.database, "", []influxdb.Point{point})
if e != nil {
t.Logger.Printf("failed to write data point to database %q: %s\n", t.database, e)
}
in <- point
}
}
38 changes: 32 additions & 6 deletions graphite/graphite_udp.go
Original file line number Diff line number Diff line change
@@ -6,6 +6,7 @@ import (
"os"
"strings"
"sync"
"time"

"github.com/influxdb/influxdb"
)
@@ -23,6 +24,9 @@ type UDPServer struct {
addr *net.UDPAddr
wg sync.WaitGroup

batchSize int
batchTimeout time.Duration

Logger *log.Logger

host string
@@ -39,6 +43,9 @@ func NewUDPServer(p *Parser, w SeriesWriter, db string) *UDPServer {
return &u
}

func (u *UDPServer) SetBatchSize(sz int) { u.batchSize = sz }
func (u *UDPServer) SetBatchTimeout(d time.Duration) { u.batchTimeout = d }

// ListenAndServer instructs the UDPServer to start processing Graphite data
// on the given interface. iface must be in the form host:port.
func (u *UDPServer) ListenAndServe(iface string) error {
@@ -60,26 +67,45 @@ func (u *UDPServer) ListenAndServe(iface string) error {

u.host = u.addr.String()

batcher := influxdb.NewPointBatcher(u.batchSize, u.batchTimeout)
in, out := batcher.Start()

// Start processing batches.
var wg sync.WaitGroup
done := make(chan struct{})
wg.Add(1)
go func() {
defer wg.Done()
for {
select {
case batch := <-out:
_, e := u.writer.WriteSeries(u.database, "", batch)
if e != nil {
u.Logger.Printf("failed to write point batch to database %q: %s\n", u.database, e)
}
case <-done:
return
}
}
}()

buf := make([]byte, udpBufferSize)
u.wg.Add(1)
go func() {
defer u.wg.Done()
for {
n, _, err := conn.ReadFromUDP(buf)
if err != nil {
close(done)
wg.Wait()
return
}
for _, line := range strings.Split(string(buf[:n]), "\n") {
point, err := u.parser.Parse(line)
if err != nil {
continue
}

// Send the data to the writer.
_, e := u.writer.WriteSeries(u.database, "", []influxdb.Point{point})
if e != nil {
u.Logger.Printf("failed to write data point: %s\n", e)
}
in <- point
}
}
}()