Skip to content

Commit

Permalink
Add UDP service back
Browse files Browse the repository at this point in the history
  • Loading branch information
renan-strauss committed Jun 3, 2015
1 parent 44d38cb commit 043613d
Show file tree
Hide file tree
Showing 6 changed files with 193 additions and 0 deletions.
2 changes: 2 additions & 0 deletions cmd/influxd/run/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"github.com/influxdb/influxdb/services/httpd"
"github.com/influxdb/influxdb/services/monitor"
"github.com/influxdb/influxdb/services/opentsdb"
"github.com/influxdb/influxdb/services/udp"
"github.com/influxdb/influxdb/tsdb"
)

Expand Down Expand Up @@ -77,6 +78,7 @@ type Config struct {
Graphites []graphite.Config `toml:"graphite"`
Collectd collectd.Config `toml:"collectd"`
OpenTSDB opentsdb.Config `toml:"opentsdb"`
UDP udp.Config `toml:"udp"`

// Snapshot SnapshotConfig `toml:"snapshot"`
Monitoring monitor.Config `toml:"monitoring"`
Expand Down
8 changes: 8 additions & 0 deletions cmd/influxd/run/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"github.com/influxdb/influxdb/services/graphite"
"github.com/influxdb/influxdb/services/httpd"
"github.com/influxdb/influxdb/services/opentsdb"
"github.com/influxdb/influxdb/services/udp"
"github.com/influxdb/influxdb/tsdb"
)

Expand Down Expand Up @@ -55,6 +56,7 @@ func NewServer(c *Config, joinURLs string) *Server {
s.appendHTTPDService(c.HTTPD)
s.appendCollectdService(c.Collectd)
s.appendOpenTSDBService(c.OpenTSDB)
s.appendUDPService(c.UDP)
for _, g := range c.Graphites {
s.appendGraphiteService(g)
}
Expand Down Expand Up @@ -96,6 +98,12 @@ func (s *Server) appendGraphiteService(c graphite.Config) {
s.Services = append(s.Services, srv)
}

func (s *Server) appendUDPService(c udp.Config) {
srv := udp.NewService(c)
srv.Server.PointsWriter = s.PointsWriter
s.Services = append(s.Services, srv)
}

// Open opens the meta and data store and all services.
func (s *Server) Open() error {
if err := func() error {
Expand Down
3 changes: 3 additions & 0 deletions etc/config.sample.toml
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,9 @@ enabled = false
enabled = false
# bind-address = "0.0.0.0"
# port = 4444
# database = "udp_database"
# batch-size = 0 # How many points to batch up internally before writing.
# batch-timeout = "0ms" # Maximum time to wait before sending batch, regardless of current size.

# Broker configuration. Brokers are nodes which participate in distributed
# consensus.
Expand Down
13 changes: 13 additions & 0 deletions services/udp/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
package udp

import "github.com/influxdb/influxdb/toml"

type Config struct {
Enabled bool `toml:"enabled"`
BindAdress string `toml:"bind-adress"`
Port int `toml:"port"`

Database string `toml:"database"`
BatchSize int `toml:"batch-size"`
BatchTimeout toml.Duration `toml:"batch-timeout"`
}
36 changes: 36 additions & 0 deletions services/udp/service.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
package udp

import (
"net"
"strconv"
"time"
)

type Service struct {
Server *Server

iface string
}

func NewService(c Config) *Service {
server := NewServer(c.Database)
server.SetBatchSize(c.BatchSize)
server.SetBatchTimeout(time.Duration(c.BatchTimeout))

return &Service{
iface: net.JoinHostPort(c.BindAdress, strconv.Itoa(c.Port)),
Server: server,
}
}

func (s *Service) Open() error {
return s.Server.ListenAndServe(s.iface)
}

func (s *Service) Close() error {
return s.Server.Close()
}

func (s *Service) Addr() net.Addr {
return s.Server.Addr()
}
131 changes: 131 additions & 0 deletions services/udp/udp.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,131 @@
package udp

import (
"log"
"net"
"os"
"sync"
"time"

"github.com/influxdb/influxdb/cluster"
"github.com/influxdb/influxdb/tsdb"
)

const (
udpBufferSize = 65536
)

type Server struct {
conn *net.UDPConn
addr *net.UDPAddr
wg sync.WaitGroup

batchSize int
batchTimeout time.Duration
database string

PointsWriter interface {
WritePoints(p *cluster.WritePointsRequest) error
}

Logger *log.Logger
}

func NewServer(db string) *Server {
return &Server{
database: db,
Logger: log.New(os.Stderr, "[udp] ", log.LstdFlags),
}
}

func (s *Server) SetBatchSize(sz int) { s.batchSize = sz }
func (s *Server) SetBatchTimeout(d time.Duration) { s.batchTimeout = d }

func (s *Server) ListenAndServe(iface string) (err error) {
s.addr, err = net.ResolveUDPAddr("udp", iface)
if err != nil {
s.Logger.Printf("Failed to resolve UDP address %s: %s", iface, err)
return err
}

s.conn, err = net.ListenUDP("udp", s.addr)
if err != nil {
s.Logger.Printf("Failed to set up UDP listener at address %s: %s", s.addr, err)
return err
}

s.Logger.Printf("Started listening on %s", iface)

batcher := tsdb.NewPointBatcher(s.batchSize, s.batchTimeout)
batcher.Start()

// Start processing batches.
var wg sync.WaitGroup
done := make(chan struct{})
wg.Add(1)
go func() {
defer wg.Done()
for {
select {
case batch := <-batcher.Out():
err = s.PointsWriter.WritePoints(&cluster.WritePointsRequest{
Database: s.database,
RetentionPolicy: "",
ConsistencyLevel: cluster.ConsistencyLevelOne,
Points: batch,
})
if err != nil {
s.Logger.Printf("Failed to write point batch to database %q: %s\n", s.database, err)
} else {
s.Logger.Printf("Wrote a batch of %d points to %s", len(batch), s.database)
}
case <-done:
return
}
}
}()

s.wg.Add(1)
go func() {
defer s.wg.Done()
for {
buf := make([]byte, udpBufferSize)
n, _, err := s.conn.ReadFromUDP(buf)
if err != nil {
s.Logger.Printf("Failed to read UDP message: %s", err)
batcher.Flush()
close(done)
wg.Wait()
return
}

points, err := tsdb.ParsePoints(buf[:n])
if err != nil {
s.Logger.Printf("Failed to parse points: %s", err)
continue
}

s.Logger.Printf("Received write for %d points on database %s", len(points), s.database)

for _, point := range points {
batcher.In() <- point
}
}
}()

return nil
}

func (s *Server) Close() error {
var err error
if s.conn != nil {
err = s.conn.Close()
}
s.wg.Done()

return err
}

func (s *Server) Addr() net.Addr {
return s.addr
}

0 comments on commit 043613d

Please sign in to comment.