From b688eccb77b6ee6b275064f86cb8060e49891b96 Mon Sep 17 00:00:00 2001 From: Ben Johnson Date: Mon, 8 Jun 2015 10:20:54 -0600 Subject: [PATCH] Refactor OpenTSDB to a service This commit converts the OpenTSDB endpoint into a service. --- CHANGELOG.md | 1 + cmd/influxd/run/server.go | 1 + services/opentsdb/config.go | 17 +- services/opentsdb/handler.go | 180 ++++++++++++++ services/opentsdb/opentsdb.go | 392 ------------------------------ services/opentsdb/service.go | 233 +++++++++++++++++- services/opentsdb/service_test.go | 154 ++++++++++++ 7 files changed, 579 insertions(+), 399 deletions(-) create mode 100644 services/opentsdb/handler.go delete mode 100644 services/opentsdb/opentsdb.go create mode 100644 services/opentsdb/service_test.go diff --git a/CHANGELOG.md b/CHANGELOG.md index 62542182793..bc9a8ba7e42 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -4,6 +4,7 @@ - [2816](https://github.com/influxdb/influxdb/pull/2816) -- enable UDP service. Thanks @renan- - [2824](https://github.com/influxdb/influxdb/pull/2824) -- Add missing call to WaitGroup.Done in execConn. Thanks @liyichao +- [2823](https://github.com/influxdb/influxdb/pull/2823) -- Convert OpenTSDB to a service. ## v0.9.0-rc32 [2015-06-07] diff --git a/cmd/influxd/run/server.go b/cmd/influxd/run/server.go index 1646bf319ad..c65fd0a6a23 100644 --- a/cmd/influxd/run/server.go +++ b/cmd/influxd/run/server.go @@ -147,6 +147,7 @@ func (s *Server) appendOpenTSDBService(c opentsdb.Config) { return } srv := opentsdb.NewService(c) + srv.PointsWriter = s.PointsWriter s.Services = append(s.Services, srv) } diff --git a/services/opentsdb/config.go b/services/opentsdb/config.go index 46569d09c31..3168d288b54 100644 --- a/services/opentsdb/config.go +++ b/services/opentsdb/config.go @@ -1,5 +1,16 @@ package opentsdb +const ( + // DefaultBindAddress is the default address that the service binds to. + DefaultBindAddress = ":4242" + + // DefaultDatabase is the default database used for writes. + DefaultDatabase = "opentsdb" + + // DefaultRetentionPolicy is the default retention policy used for writes. + DefaultRetentionPolicy = "" +) + type Config struct { Enabled bool `toml:"enabled"` BindAddress string `toml:"bind-address"` @@ -8,5 +19,9 @@ type Config struct { } func NewConfig() Config { - return Config{} + return Config{ + BindAddress: DefaultBindAddress, + Database: DefaultDatabase, + RetentionPolicy: DefaultRetentionPolicy, + } } diff --git a/services/opentsdb/handler.go b/services/opentsdb/handler.go new file mode 100644 index 00000000000..7b533d5262c --- /dev/null +++ b/services/opentsdb/handler.go @@ -0,0 +1,180 @@ +package opentsdb + +import ( + "bufio" + "compress/gzip" + "encoding/json" + "errors" + "io" + "log" + "net" + "net/http" + "time" + + "github.com/influxdb/influxdb" + "github.com/influxdb/influxdb/cluster" + "github.com/influxdb/influxdb/tsdb" +) + +type Handler struct { + Database string + RetentionPolicy string + + PointsWriter interface { + WritePoints(p *cluster.WritePointsRequest) error + } + + Logger *log.Logger +} + +func (h *Handler) ServeHTTP(w http.ResponseWriter, r *http.Request) { + switch r.URL.Path { + case "/api/metadata/put": + w.WriteHeader(http.StatusNoContent) + case "/api/put": + h.servePut(w, r) + default: + http.NotFound(w, r) + } +} + +// ServeHTTP implements OpenTSDB's HTTP /api/put endpoint +func (h *Handler) servePut(w http.ResponseWriter, r *http.Request) { + defer r.Body.Close() + + // Require POST method. + if r.Method != "POST" { + http.Error(w, http.StatusText(http.StatusMethodNotAllowed), http.StatusMethodNotAllowed) + return + } + + // Wrap reader if it's gzip encoded. + var br *bufio.Reader + if r.Header.Get("Content-Encoding") == "gzip" { + zr, err := gzip.NewReader(r.Body) + if err != nil { + http.Error(w, "could not read gzip, "+err.Error(), http.StatusBadRequest) + return + } + + br = bufio.NewReader(zr) + } else { + br = bufio.NewReader(r.Body) + } + + // Lookahead at the first byte. + f, err := br.Peek(1) + if err != nil || len(f) != 1 { + http.Error(w, "peek error: "+err.Error(), http.StatusBadRequest) + return + } + + // Peek to see if this is a JSON array. + var multi bool + switch f[0] { + case '{': + case '[': + multi = true + default: + http.Error(w, "expected JSON array or hash", http.StatusBadRequest) + return + } + + // Decode JSON data into slice of points. + dps := make([]point, 1) + if dec := json.NewDecoder(br); multi { + if err = dec.Decode(&dps); err != nil { + http.Error(w, "json array decode error", http.StatusBadRequest) + return + } + } else { + if err = dec.Decode(&dps[0]); err != nil { + http.Error(w, "json object decode error", http.StatusBadRequest) + return + } + } + + // Convert points into TSDB points. + points := make([]tsdb.Point, 0, len(dps)) + for i := range dps { + p := dps[i] + + // Convert timestamp to Go time. + // If time value is over a billion then it's microseconds. + var ts time.Time + if p.Time < 10000000000 { + ts = time.Unix(p.Time, 0) + } else { + ts = time.Unix(p.Time/1000, (p.Time%1000)*1000) + } + + points = append(points, tsdb.NewPoint(p.Metric, p.Tags, map[string]interface{}{"value": p.Value}, ts)) + } + + // Write points. + if err := h.PointsWriter.WritePoints(&cluster.WritePointsRequest{ + Database: h.Database, + RetentionPolicy: h.RetentionPolicy, + ConsistencyLevel: cluster.ConsistencyLevelOne, + Points: points, + }); influxdb.IsClientError(err) { + h.Logger.Println("write series error: ", err) + http.Error(w, "write series error: "+err.Error(), http.StatusBadRequest) + return + } else if err != nil { + h.Logger.Println("write series error: ", err) + http.Error(w, "write series error: "+err.Error(), http.StatusInternalServerError) + return + } + + w.WriteHeader(http.StatusNoContent) +} + +// chanListener represents a listener that receives connections through a channel. +type chanListener struct { + addr net.Addr + ch chan net.Conn +} + +// newChanListener returns a new instance of chanListener. +func newChanListener(addr net.Addr) *chanListener { + return &chanListener{ + addr: addr, + ch: make(chan net.Conn), + } +} + +func (ln *chanListener) Accept() (net.Conn, error) { + conn, ok := <-ln.ch + if !ok { + return nil, errors.New("network connection closed") + } + log.Println("TSDB listener accept ", conn) + return conn, nil +} + +// Close closes the connection channel. +func (ln *chanListener) Close() error { + close(ln.ch) + return nil +} + +// Addr returns the network address of the listener. +func (ln *chanListener) Addr() net.Addr { return ln.addr } + +// readerConn represents a net.Conn with an assignable reader. +type readerConn struct { + net.Conn + r io.Reader +} + +// Read implments the io.Reader interface. +func (conn *readerConn) Read(b []byte) (n int, err error) { return conn.r.Read(b) } + +// point represents an incoming JSON data point. +type point struct { + Metric string `json:"metric"` + Time int64 `json:"timestamp"` + Value float64 `json:"value"` + Tags map[string]string `json:"tags,omitempty"` +} diff --git a/services/opentsdb/opentsdb.go b/services/opentsdb/opentsdb.go deleted file mode 100644 index 0bf40eba39a..00000000000 --- a/services/opentsdb/opentsdb.go +++ /dev/null @@ -1,392 +0,0 @@ -package opentsdb - -import ( - "bufio" - "bytes" - "compress/gzip" - "encoding/json" - "io" - "log" - "net" - "net/http" - "net/textproto" - "strconv" - "strings" - "sync" - "time" - - "github.com/influxdb/influxdb/tsdb" -) - -const ( - // DefaultPort represents the default OpenTSDB port. - DefaultPort = 4242 - - // DefaultDatabaseName is the default OpenTSDB database if none is specified. - DefaultDatabaseName = "opentsdb" -) - -// SeriesWriter defines the interface for the destination of the data. -type SeriesWriter interface { - WriteSeries(database, retentionPolicy string, points []tsdb.Point) (uint64, error) -} - -// Server is an InfluxDB input class to implement OpenTSDB's input protocols. -type Server struct { - writer SeriesWriter - - database string - - listener *net.TCPListener - tsdbhttp *tsdbHTTPListener - wg sync.WaitGroup - - addr net.Addr - mu sync.Mutex -} - -func NewServer(w SeriesWriter, retpol string, db string) *Server { - s := &Server{} - - s.writer = w - s.database = db - s.tsdbhttp = makeTSDBHTTPListener() - - return s -} - -func (s *Server) Addr() net.Addr { - s.mu.Lock() - defer s.mu.Unlock() - return s.addr -} - -// ListenAndServe start the OpenTSDB compatible server on the given -// ip and port. -func (s *Server) ListenAndServe(listenAddress string) { - var err error - - addr, err := net.ResolveTCPAddr("tcp", listenAddress) - if err != nil { - log.Println("TSDBServer: ResolveTCPAddr: ", err) - return - } - - s.listener, err = net.ListenTCP("tcp", addr) - if err != nil { - log.Println("TSDBServer: Listen: ", err) - return - } - - s.mu.Lock() - s.addr = s.listener.Addr() - s.mu.Unlock() - - s.wg.Add(1) - - // Set up the background HTTP server that we - // will pass http request to via a channel - mux := http.NewServeMux() - mux.HandleFunc("/api/metadata/put", func(w http.ResponseWriter, r *http.Request) { - w.WriteHeader(http.StatusNoContent) - }) - mux.Handle("/api/put", s) - httpsrv := &http.Server{} - httpsrv.Handler = mux - go httpsrv.Serve(s.tsdbhttp) - - go func() { - defer s.wg.Done() - for { - conn, err := s.listener.Accept() - if opErr, ok := err.(*net.OpError); ok && !opErr.Temporary() { - log.Println("openTSDB TCP listener closed") - return - } - if err != nil { - log.Println("error accepting openTSDB: ", err.Error()) - continue - } - s.wg.Add(1) - go s.HandleConnection(conn) - } - }() -} - -func (s *Server) Close() error { - var err error - if s.listener != nil { - err = (*s.listener).Close() - } - s.wg.Wait() - s.listener = nil - return err -} - -// HandleConnection takes each new connection and attempts to -// determine if it should be handled by the HTTP handler, if -// parsing as a HTTP request fails, we'll pass it to the -// telnet handler -func (s *Server) HandleConnection(conn net.Conn) { - var peekbuf bytes.Buffer - t := io.TeeReader(conn, &peekbuf) - r := bufio.NewReader(t) - - _, httperr := http.ReadRequest(r) - - splice := io.MultiReader(&peekbuf, conn) - bufsplice := bufio.NewReader(splice) - newc := &tsdbConn{ - Conn: conn, - rdr: bufsplice, - } - - if httperr == nil { - s.tsdbhttp.acc <- tsdbHTTPReq{ - conn: newc, - } - } else { - s.HandleTelnet(newc) - } -} - -// HandleTelnet accepts OpenTSDB's telnet protocol -// Each telnet command consists of a line of the form: -// put sys.cpu.user 1356998400 42.5 host=webserver01 cpu=0 -func (s *Server) HandleTelnet(conn net.Conn) { - reader := bufio.NewReader(conn) - tp := textproto.NewReader(reader) - - defer conn.Close() - defer s.wg.Done() - - for { - line, err := tp.ReadLine() - if err != nil { - log.Println("error reading from openTSDB connection", err.Error()) - return - } - - inputStrs := strings.Fields(line) - - if len(inputStrs) == 1 && inputStrs[0] == "version" { - conn.Write([]byte("InfluxDB TSDB proxy")) - continue - } - - if len(inputStrs) < 4 || inputStrs[0] != "put" { - log.Println("TSDBServer: malformed line, skipping: ", line) - continue - } - - measurement := inputStrs[1] - tsStr := inputStrs[2] - valueStr := inputStrs[3] - tagStrs := inputStrs[4:] - - var t time.Time - ts, err := strconv.ParseInt(tsStr, 10, 64) - if err != nil { - log.Println("TSDBServer: malformed time, skipping: ", tsStr) - } - - switch len(tsStr) { - case 10: - t = time.Unix(ts, 0) - break - case 13: - t = time.Unix(ts/1000, (ts%1000)*1000) - break - default: - log.Println("TSDBServer: time must be 10 or 13 chars, skipping: ", tsStr) - continue - } - - tags := make(map[string]string) - for t := range tagStrs { - parts := strings.SplitN(tagStrs[t], "=", 2) - if len(parts) != 2 { - log.Println("TSDBServer: malformed tag data", tagStrs[t]) - continue - } - k := parts[0] - - tags[k] = parts[1] - } - - fields := make(map[string]interface{}) - fields["value"], err = strconv.ParseFloat(valueStr, 64) - if err != nil { - log.Println("TSDBServer: could not parse value as float: ", valueStr) - continue - } - - p := tsdb.NewPoint(measurement, tags, fields, t) - - _, err = s.writer.WriteSeries(s.database, "", []tsdb.Point{p}) - if err != nil { - log.Println("TSDB cannot write data: ", err) - continue - } - } -} - -/* - tsdbDP is a struct to unmarshal OpenTSDB /api/put data into - Request is either tsdbDP, or a []tsdbDP - { - "metric": "sys.cpu.nice", - "timestamp": 1346846400, - "value": 18, - "tags": { - "host": "web01", - "dc": "lga" - } - } -*/ -type tsdbDP struct { - Metric string `json:"metric"` - Time int64 `json:"timestamp"` - Value float64 `json:"value"` - Tags map[string]string `json:"tags,omitempty"` -} - -// ServeHTTP implements OpenTSDB's HTTP /api/put endpoint -func (s *Server) ServeHTTP(w http.ResponseWriter, r *http.Request) { - defer r.Body.Close() - defer s.wg.Done() - - if r.Method != "POST" { - http.Error(w, http.StatusText(http.StatusMethodNotAllowed), http.StatusMethodNotAllowed) - return - } - - dps := make([]tsdbDP, 1) - var br *bufio.Reader - - // We need to peek and see if this is an array or a single - // DP - multi := false - - if r.Header.Get("Content-Encoding") == "gzip" { - zr, err := gzip.NewReader(r.Body) - if err != nil { - http.Error(w, "Could not read gzip, "+err.Error(), http.StatusBadRequest) - return - } - - br = bufio.NewReader(zr) - } else { - br = bufio.NewReader(r.Body) - } - - f, err := br.Peek(1) - - if err != nil || len(f) != 1 { - http.Error(w, "Could not peek at JSON data, "+err.Error(), http.StatusBadRequest) - return - } - - switch f[0] { - case '{': - case '[': - multi = true - default: - http.Error(w, "Expected JSON array or hash", http.StatusBadRequest) - return - } - - dec := json.NewDecoder(br) - - if multi { - err = dec.Decode(&dps) - } else { - err = dec.Decode(&dps[0]) - } - - if err != nil { - http.Error(w, "Could not decode JSON as TSDB data", http.StatusBadRequest) - return - } - - var idps []tsdb.Point - for dpi := range dps { - dp := dps[dpi] - - var ts time.Time - if dp.Time < 10000000000 { - ts = time.Unix(dp.Time, 0) - } else { - ts = time.Unix(dp.Time/1000, (dp.Time%1000)*1000) - } - - fields := make(map[string]interface{}) - fields["value"] = dp.Value - if err != nil { - continue - } - p := tsdb.NewPoint(dp.Metric, dp.Tags, fields, ts) - idps = append(idps, p) - } - _, err = s.writer.WriteSeries(s.database, "", idps) - if err != nil { - log.Println("TSDB cannot write data: ", err) - } - - w.WriteHeader(http.StatusNoContent) -} - -// tsdbHTTPListener is a listener that takes connects from a channel -// rather than directly from a network socket -type tsdbHTTPListener struct { - addr net.Addr - cls chan struct{} - acc chan tsdbHTTPReq -} - -// tsdbHTTPReq represents a incoming connection that we have established -// to be a valid http request. -type tsdbHTTPReq struct { - conn net.Conn - err error -} - -func (l *tsdbHTTPListener) Accept() (c net.Conn, err error) { - select { - case newc := <-l.acc: - log.Println("TSDB listener accept ", newc) - return newc.conn, newc.err - case <-l.cls: - close(l.cls) - close(l.acc) - return nil, nil - } -} - -func (l *tsdbHTTPListener) Close() error { - l.cls <- struct{}{} - return nil -} - -func (l *tsdbHTTPListener) Addr() net.Addr { - return l.addr -} - -func makeTSDBHTTPListener() *tsdbHTTPListener { - return &tsdbHTTPListener{ - acc: make(chan tsdbHTTPReq), - cls: make(chan struct{}), - } -} - -// tsdbConn is a net.Conn implmentation used to splice peeked buffer content -// to the pre-existing net.Conn that was peeked into -type tsdbConn struct { - rdr *bufio.Reader - net.Conn -} - -// Read implmeents the io.Reader interface -func (c *tsdbConn) Read(b []byte) (n int, err error) { - return c.rdr.Read(b) -} diff --git a/services/opentsdb/service.go b/services/opentsdb/service.go index 3d28d8eaf2f..1c9912be35e 100644 --- a/services/opentsdb/service.go +++ b/services/opentsdb/service.go @@ -1,11 +1,232 @@ package opentsdb -import "net" +import ( + "bufio" + "bytes" + "io" + "log" + "net" + "net/http" + "net/textproto" + "os" + "strconv" + "strings" + "sync" + "time" -type Service struct{} + "github.com/influxdb/influxdb/cluster" + "github.com/influxdb/influxdb/tsdb" +) -func NewService(c Config) *Service { return &Service{} } +// Service manages the listener and handler for an HTTP endpoint. +type Service struct { + ln net.Listener // main listener + httpln *chanListener // http channel-based listener -func (s *Service) Open() error { return nil } -func (s *Service) Close() error { return nil } -func (s *Service) Addr() net.Addr { return nil } + wg sync.WaitGroup + err chan error + + BindAddress string + Database string + RetentionPolicy string + + PointsWriter interface { + WritePoints(p *cluster.WritePointsRequest) error + } + + Logger *log.Logger +} + +// NewService returns a new instance of Service. +func NewService(c Config) *Service { + s := &Service{ + err: make(chan error), + BindAddress: c.BindAddress, + Database: c.Database, + RetentionPolicy: c.RetentionPolicy, + Logger: log.New(os.Stderr, "[opentsdb] ", log.LstdFlags), + } + return s +} + +// Open starts the service +func (s *Service) Open() error { + // Open listener. + ln, err := net.Listen("tcp", s.BindAddress) + if err != nil { + return err + } + s.ln = ln + s.httpln = newChanListener(ln.Addr()) + + s.Logger.Println("listening on:", ln.Addr().String()) + + // Begin listening for connections. + s.wg.Add(2) + go s.serveHTTP() + go s.serve() + + return nil +} + +// Close closes the underlying listener. +func (s *Service) Close() error { + if s.ln != nil { + return s.ln.Close() + } + + s.wg.Wait() + return nil +} + +// SetLogger sets the internal logger to the logger passed in. +func (s *Service) SetLogger(l *log.Logger) { s.Logger = l } + +// Err returns a channel for fatal errors that occur on the listener. +func (s *Service) Err() <-chan error { return s.err } + +// Addr returns the listener's address. Returns nil if listener is closed. +func (s *Service) Addr() net.Addr { + if s.ln == nil { + return nil + } + return s.ln.Addr() +} + +// serve serves the handler from the listener. +func (s *Service) serve() { + defer s.wg.Done() + + for { + // Wait for next connection. + conn, err := s.ln.Accept() + if opErr, ok := err.(*net.OpError); ok && !opErr.Temporary() { + s.Logger.Println("openTSDB TCP listener closed") + return + } else if err != nil { + s.Logger.Println("error accepting openTSDB: ", err.Error()) + continue + } + + // Handle connection in separate goroutine. + go s.handleConn(conn) + } +} + +// handleConn processes conn. This is run in a separate goroutine. +func (s *Service) handleConn(conn net.Conn) { + // Read header into buffer to check if it's HTTP. + var buf bytes.Buffer + r := bufio.NewReader(io.TeeReader(conn, &buf)) + + // Attempt to parse connection as HTTP. + _, err := http.ReadRequest(r) + + // Rebuild connection from buffer and remaining connection data. + bufr := bufio.NewReader(io.MultiReader(&buf, conn)) + conn = &readerConn{Conn: conn, r: bufr} + + // If no HTTP parsing error occurred then process as HTTP. + if err == nil { + s.httpln.ch <- conn + return + } + + // Otherwise handle in telnet format. + s.wg.Add(1) + s.handleTelnetConn(conn) +} + +// handleTelnetConn accepts OpenTSDB's telnet protocol. +// Each telnet command consists of a line of the form: +// put sys.cpu.user 1356998400 42.5 host=webserver01 cpu=0 +func (s *Service) handleTelnetConn(conn net.Conn) { + defer conn.Close() + defer s.wg.Done() + + // Wrap connection in a text protocol reader. + r := textproto.NewReader(bufio.NewReader(conn)) + for { + line, err := r.ReadLine() + if err != nil { + s.Logger.Println("error reading from openTSDB connection", err.Error()) + return + } + + inputStrs := strings.Fields(line) + + if len(inputStrs) == 1 && inputStrs[0] == "version" { + conn.Write([]byte("InfluxDB TSDB proxy")) + continue + } + + if len(inputStrs) < 4 || inputStrs[0] != "put" { + s.Logger.Println("TSDBServer: malformed line, skipping: ", line) + continue + } + + measurement := inputStrs[1] + tsStr := inputStrs[2] + valueStr := inputStrs[3] + tagStrs := inputStrs[4:] + + var t time.Time + ts, err := strconv.ParseInt(tsStr, 10, 64) + if err != nil { + s.Logger.Println("TSDBServer: malformed time, skipping: ", tsStr) + } + + switch len(tsStr) { + case 10: + t = time.Unix(ts, 0) + break + case 13: + t = time.Unix(ts/1000, (ts%1000)*1000) + break + default: + s.Logger.Println("TSDBServer: time must be 10 or 13 chars, skipping: ", tsStr) + continue + } + + tags := make(map[string]string) + for t := range tagStrs { + parts := strings.SplitN(tagStrs[t], "=", 2) + if len(parts) != 2 { + s.Logger.Println("TSDBServer: malformed tag data", tagStrs[t]) + continue + } + k := parts[0] + + tags[k] = parts[1] + } + + fields := make(map[string]interface{}) + fields["value"], err = strconv.ParseFloat(valueStr, 64) + if err != nil { + s.Logger.Println("TSDBServer: could not parse value as float: ", valueStr) + continue + } + + p := tsdb.NewPoint(measurement, tags, fields, t) + if err := s.PointsWriter.WritePoints(&cluster.WritePointsRequest{ + Database: s.Database, + RetentionPolicy: s.RetentionPolicy, + ConsistencyLevel: cluster.ConsistencyLevelOne, + Points: []tsdb.Point{p}, + }); err != nil { + s.Logger.Println("TSDB cannot write data: ", err) + continue + } + } +} + +// serveHTTP handles connections in HTTP format. +func (s *Service) serveHTTP() { + srv := &http.Server{Handler: &Handler{ + Database: s.Database, + RetentionPolicy: s.RetentionPolicy, + PointsWriter: s.PointsWriter, + Logger: s.Logger, + }} + srv.Serve(s.httpln) +} diff --git a/services/opentsdb/service_test.go b/services/opentsdb/service_test.go new file mode 100644 index 00000000000..5ca6dc5a0fd --- /dev/null +++ b/services/opentsdb/service_test.go @@ -0,0 +1,154 @@ +package opentsdb_test + +import ( + "io/ioutil" + "log" + "net" + "net/http" + "reflect" + "strings" + "sync/atomic" + "testing" + "time" + + "github.com/davecgh/go-spew/spew" + "github.com/influxdb/influxdb/cluster" + "github.com/influxdb/influxdb/services/opentsdb" + "github.com/influxdb/influxdb/tsdb" +) + +// Ensure a point can be written via the telnet protocol. +func TestService_Telnet(t *testing.T) { + t.Parallel() + + s := NewService("db0") + if err := s.Open(); err != nil { + t.Fatal(err) + } + defer s.Close() + + // Mock points writer. + var called int32 + s.PointsWriter.WritePointsFn = func(req *cluster.WritePointsRequest) error { + atomic.StoreInt32(&called, 1) + + if req.Database != "db0" { + t.Fatalf("unexpected database: %s", req.Database) + } else if req.RetentionPolicy != "" { + t.Fatalf("unexpected retention policy: %s", req.RetentionPolicy) + } else if !reflect.DeepEqual(req.Points, []tsdb.Point{ + tsdb.NewPoint( + "sys.cpu.user", + map[string]string{"host": "webserver01", "cpu": "0"}, + map[string]interface{}{"value": 42.5}, + time.Unix(1356998400, 0), + ), + }) { + spew.Dump(req.Points) + t.Fatalf("unexpected points: %#v", req.Points) + } + return nil + } + + // Open connection to the service. + conn, err := net.Dial("tcp", s.Addr().String()) + if err != nil { + t.Fatal(err) + } + defer conn.Close() + + // Write telnet data and close. + if _, err := conn.Write([]byte("put sys.cpu.user 1356998400 42.5 host=webserver01 cpu=0")); err != nil { + t.Fatal(err) + } + if err := conn.Close(); err != nil { + t.Fatal(err) + } + time.Sleep(10 * time.Millisecond) + + // Verify that the writer was called. + if atomic.LoadInt32(&called) == 0 { + t.Fatal("points writer not called") + } +} + +// Ensure a point can be written via the HTTP protocol. +func TestService_HTTP(t *testing.T) { + t.Parallel() + + s := NewService("db0") + if err := s.Open(); err != nil { + t.Fatal(err) + } + defer s.Close() + + // Mock points writer. + var called bool + s.PointsWriter.WritePointsFn = func(req *cluster.WritePointsRequest) error { + called = true + if req.Database != "db0" { + t.Fatalf("unexpected database: %s", req.Database) + } else if req.RetentionPolicy != "" { + t.Fatalf("unexpected retention policy: %s", req.RetentionPolicy) + } else if !reflect.DeepEqual(req.Points, []tsdb.Point{ + tsdb.NewPoint( + "sys.cpu.nice", + map[string]string{"dc": "lga", "host": "web01"}, + map[string]interface{}{"value": 18.0}, + time.Unix(1346846400, 0), + ), + }) { + spew.Dump(req.Points) + t.Fatalf("unexpected points: %#v", req.Points) + } + return nil + } + + // Write HTTP request to server. + resp, err := http.Post("http://"+s.Addr().String()+"/api/put", "application/json", strings.NewReader(`{"metric":"sys.cpu.nice", "timestamp":1346846400, "value":18, "tags":{"host":"web01", "dc":"lga"}}`)) + if err != nil { + t.Fatal(err) + } + defer resp.Body.Close() + + // Verify status and body. + if resp.StatusCode != http.StatusNoContent { + t.Fatalf("unexpected status code: %d", resp.StatusCode) + } + + // Verify that the writer was called. + if !called { + t.Fatal("points writer not called") + } +} + +type Service struct { + *opentsdb.Service + PointsWriter PointsWriter +} + +// NewService returns a new instance of Service. +func NewService(database string) *Service { + s := &Service{ + Service: opentsdb.NewService(opentsdb.Config{ + BindAddress: "127.0.0.1:0", + Database: database, + }), + } + s.Service.PointsWriter = &s.PointsWriter + + if !testing.Verbose() { + s.Logger = log.New(ioutil.Discard, "", log.LstdFlags) + } + + return s +} + +// PointsWriter represents a mock impl of PointsWriter. +type PointsWriter struct { + WritePointsFn func(*cluster.WritePointsRequest) error +} + +func (w *PointsWriter) WritePoints(p *cluster.WritePointsRequest) error { + return w.WritePointsFn(p) +}