Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Graphite fixes3 #644

Closed
wants to merge 18 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
170 changes: 129 additions & 41 deletions api/graphite/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"io"
"net"
"strings"
"sync"
"time"

"github.com/influxdb/influxdb/cluster"
Expand All @@ -36,17 +37,49 @@ type Server struct {
conn net.Listener
udpConn *net.UDPConn
user *cluster.ClusterAdmin
shutdown chan bool
writeSeries chan Record
allCommitted chan bool
connClosed chan bool // lets us break from the Accept() loop, see http://zhen.org/blog/graceful-shutdown-of-go-net-dot-listeners/ (quit channel)
udpEnabled bool
writers sync.WaitGroup // allows us to make sure things writing to writeSeries are done (they do blocking calls to handleMessage()) whether udp or tcp
}

// Record holds a point to be added into the series identified by Name
type Record struct {
Name string
*protocol.Point
}

// the ingest could in theory be anything from 1 series (datapoint) every few minutes, up to millions of datapoints every second,
// and there might be multiple connections, each delivering a certain (not necessarily equal) fraction of the total.
// we want ingested points to be ingested (flushed) quickly (let's say at least within 100ms), but since coordinator.WriteSeriesData
// comes with a cost, we also want to buffer up the data, the more the better (up to a limit).
// So basically we need to trade these concepts off against each other, by committing
// a batch of series every commit_max_wait ms or every commit_capacity datapoints, whichever comes first.

// up to how many points/series to commit in 1 go?
// let's say buffer up to about 1MiB of data, at 24B per record -> 43690 records, let's make it an even 40k
const commit_capacity = 40000

// how long to wait max before flushing a commit payload
const commit_max_wait = 100 * time.Millisecond

// the write commit payload should get written in a timely fashion.
// if not, the channel that feeds the committer will queue up to max_queue series, and then
// block, creating backpressure to the client. This allows us to keep receiving metrics while
// a flush is ongoing, but not much more than we can actually handle.
const max_queue = 20000

// TODO: check that database exists and create it if not
func NewServer(config *configuration.Configuration, coord coordinator.Coordinator, clusterConfig *cluster.ClusterConfiguration) *Server {
self := &Server{}
self.listenAddress = config.GraphitePortString()
self.database = config.GraphiteDatabase
self.coordinator = coord
self.shutdown = make(chan bool, 1)
self.writeSeries = make(chan Record, max_queue)
self.allCommitted = make(chan bool, 1)
self.connClosed = make(chan bool, 1)
self.clusterConfig = clusterConfig
self.udpEnabled = config.GraphiteUdpEnabled

Expand Down Expand Up @@ -76,100 +109,162 @@ func (self *Server) ListenAndServe() {
self.udpConn, _ = net.ListenUDP("udp", udpAddress)
go self.ServeUdp(self.udpConn)
}
go self.committer()
self.Serve(self.conn)
}

func (self *Server) Serve(listener net.Listener) {
// not really sure of the use of this shutdown channel,
// as all handling is done through goroutines. maybe we should use a waitgroup
defer func() { self.shutdown <- true }()

for {
conn_in, err := listener.Accept()
if err != nil {
log.Error("GraphiteServer: Accept: ", err)
select {
case <-self.connClosed:
break
default:
}
continue
}
self.writers.Add(1)
go self.handleClient(conn_in)
}
self.writers.Wait()
close(self.writeSeries)
}

func (self *Server) ServeUdp(conn *net.UDPConn) {
var buf []byte = make([]byte, 65536)
for {
n, _, err := conn.ReadFromUDP(buf)
if err != nil {
log.Warn("Error when reading from UDP connection %s", err.Error())
log.Warn("GraphiteServer: Error when reading from UDP connection %s", err.Error())
}
self.writers.Add(1)
go self.handleUdpMessage(string(buf[:n]))
}
}

func (self *Server) handleUdpMessage(msg string) {
defer self.writers.Done()
metrics := strings.Split(msg, "\n")
for _, metric := range metrics {
reader := bufio.NewReader(strings.NewReader(metric + "\n"))
go self.handleMessage(reader)
self.handleMessage(reader)
}
}

func (self *Server) Close() {
if self.udpConn != nil {
log.Info("GraphiteService: Closing graphite UDP listener")
log.Info("GraphiteServer: Closing graphite UDP listener")
self.udpConn.Close()
}
if self.conn != nil {
log.Info("GraphiteServer: Closing graphite server")
close(self.connClosed)
self.conn.Close()
log.Info("GraphiteServer: Waiting for all graphite requests to finish before killing the process")
select {
case <-time.After(time.Second * 5):
log.Error("GraphiteServer: There seems to be a hanging graphite request. Closing anyway")
case <-self.shutdown:
log.Error("GraphiteServer: There seems to be a hanging graphite request or data flush. Closing anyway")
case <-self.allCommitted:
log.Info("GraphiteServer shut down cleanly")
}
}
}

func (self *Server) writePoints(series *protocol.Series) error {
serie := []*protocol.Series{series}
err := self.coordinator.WriteSeriesData(self.user, self.database, serie)
if err != nil {
switch err.(type) {
case AuthorizationError:
// user information got stale, get a fresh one (this should happen rarely)
self.getAuth()
err = self.coordinator.WriteSeriesData(self.user, self.database, serie)
if err != nil {
log.Warn("GraphiteServer: failed to write series after getting new auth: %s", err.Error())
func (self *Server) committer() {
defer func() { self.allCommitted <- true }()

commit := func(toCommit map[string]*protocol.Series) {
if len(toCommit) == 0 {
return
}
commitPayload := make([]*protocol.Series, len(toCommit))
i := 0
for _, serie := range toCommit {
commitPayload[i] = serie
i++
}
log.Debug("GraphiteServer committing %d series", len(toCommit))
err := self.coordinator.WriteSeriesData(self.user, self.database, commitPayload)
if err != nil {
switch err.(type) {
case AuthorizationError:
// user information got stale, get a fresh one (this should happen rarely)
self.getAuth()
err = self.coordinator.WriteSeriesData(self.user, self.database, commitPayload)
if err != nil {
log.Warn("GraphiteServer: failed to write %d series after getting new auth: %s\n", len(toCommit), err.Error())
}
default:
log.Warn("GraphiteServer: failed write %d series: %s\n", len(toCommit), err.Error())
}
default:
log.Warn("GraphiteServer: failed write series: %s", err.Error())
}
}
return err

timer := time.NewTimer(commit_max_wait)
toCommit := make(map[string]*protocol.Series)
pointsPending := 0

CommitLoop:
for {
select {
case record, ok := <-self.writeSeries:
if ok {
pointsPending += 1
if series, seen := toCommit[record.Name]; seen {
series.Points = append(series.Points, record.Point)
} else {
points := make([]*protocol.Point, 1, 1)
points[0] = record.Point
toCommit[record.Name] = &protocol.Series{
Name: &record.Name,
Fields: []string{"value"},
Points: points,
}
}
} else {
// no more input, commit whatever we have and break
commit(toCommit)
break CommitLoop
}
// if capacity reached, commit
if pointsPending == commit_capacity {
commit(toCommit)
toCommit = make(map[string]*protocol.Series)
pointsPending = 0
timer.Reset(commit_max_wait)
}
case <-timer.C:
commit(toCommit)
toCommit = make(map[string]*protocol.Series)
pointsPending = 0
timer.Reset(commit_max_wait)
}
}
}

func (self *Server) handleClient(conn net.Conn) {
defer conn.Close()
defer self.writers.Done()
reader := bufio.NewReader(conn)
for {
err := self.handleMessage(reader)
if err != nil {
if io.EOF == err {
log.Debug("Client closed graphite connection")
log.Debug("GraphiteServer: Client closed graphite connection")
return
}
log.Error(err)
return
log.Error("GraphiteServer:", err)
}
}
}

func (self *Server) handleMessage(reader *bufio.Reader) error {
func (self *Server) handleMessage(reader *bufio.Reader) (err error) {
graphiteMetric := &GraphiteMetric{}
err := graphiteMetric.Read(reader)
err = graphiteMetric.Read(reader)
if err != nil {
return err
return
}
values := []*protocol.FieldValue{}
if graphiteMetric.isInt {
Expand All @@ -183,14 +278,7 @@ func (self *Server) handleMessage(reader *bufio.Reader) error {
Values: values,
SequenceNumber: &sn,
}
series := &protocol.Series{
Name: &graphiteMetric.name,
Fields: []string{"value"},
Points: []*protocol.Point{point},
}
// little inefficient for now, later we might want to add multiple series in 1 writePoints request
if err := self.writePoints(series); err != nil {
log.Error("Error in graphite plugin: %s", err)
}
return nil
record := Record{graphiteMetric.name, point}
self.writeSeries <- record
return
}
9 changes: 5 additions & 4 deletions api/graphite/graphite_metric.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,17 +16,18 @@ type GraphiteMetric struct {
timestamp int64
}

// returns err == io.EOF when we hit EOF without any further data
func (self *GraphiteMetric) Read(reader *bufio.Reader) error {
buf, err := reader.ReadBytes('\n')
str := strings.TrimSpace(string(buf))
if err != nil {
if err != io.EOF {
return fmt.Errorf("GraphiteServer: connection closed uncleanly/broken: %s\n", err.Error())
return fmt.Errorf("connection closed uncleanly/broken: %s\n", err.Error())
}
if len(str) > 0 {
return fmt.Errorf("GraphiteServer: incomplete read, line read: '%s'. neglecting line because connection closed because of %s\n", str, err.Error())
if str == "" {
return err
}
return err
// else we got EOF but also data, so just try to process it as valid data
}
elements := strings.Fields(str)
if len(elements) != 3 {
Expand Down
6 changes: 6 additions & 0 deletions server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -242,6 +242,12 @@ func (self *Server) Stop() {
self.HttpApi.Close()
log.Info("Api server stopped")

if self.Config.GraphiteEnabled {
log.Info("Stopping GraphiteServer")
self.GraphiteApi.Close()
log.Info("GraphiteServer stopped")
}

log.Info("Stopping admin server")
self.AdminServer.Close()
log.Info("admin server stopped")
Expand Down