Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Convert OpenTSDB to service #2823

Merged
merged 1 commit into from
Jun 8, 2015
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

- [2816](https://github.com/influxdb/influxdb/pull/2816) -- enable UDP service. Thanks @renan-
- [2824](https://github.com/influxdb/influxdb/pull/2824) -- Add missing call to WaitGroup.Done in execConn. Thanks @liyichao
- [2823](https://github.com/influxdb/influxdb/pull/2823) -- Convert OpenTSDB to a service.

## v0.9.0-rc32 [2015-06-07]

Expand Down
1 change: 1 addition & 0 deletions cmd/influxd/run/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,7 @@ func (s *Server) appendOpenTSDBService(c opentsdb.Config) {
return
}
srv := opentsdb.NewService(c)
srv.PointsWriter = s.PointsWriter
s.Services = append(s.Services, srv)
}

Expand Down
17 changes: 16 additions & 1 deletion services/opentsdb/config.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,16 @@
package opentsdb

const (
// DefaultBindAddress is the default address that the service binds to.
DefaultBindAddress = ":4242"

// DefaultDatabase is the default database used for writes.
DefaultDatabase = "opentsdb"

// DefaultRetentionPolicy is the default retention policy used for writes.
DefaultRetentionPolicy = ""
)

type Config struct {
Enabled bool `toml:"enabled"`
BindAddress string `toml:"bind-address"`
Expand All @@ -8,5 +19,9 @@ type Config struct {
}

func NewConfig() Config {
return Config{}
return Config{
BindAddress: DefaultBindAddress,
Database: DefaultDatabase,
RetentionPolicy: DefaultRetentionPolicy,
}
}
180 changes: 180 additions & 0 deletions services/opentsdb/handler.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,180 @@
package opentsdb

import (
"bufio"
"compress/gzip"
"encoding/json"
"errors"
"io"
"log"
"net"
"net/http"
"time"

"github.com/influxdb/influxdb"
"github.com/influxdb/influxdb/cluster"
"github.com/influxdb/influxdb/tsdb"
)

type Handler struct {
Database string
RetentionPolicy string

PointsWriter interface {
WritePoints(p *cluster.WritePointsRequest) error
}

Logger *log.Logger
}

func (h *Handler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
switch r.URL.Path {
case "/api/metadata/put":
w.WriteHeader(http.StatusNoContent)
case "/api/put":
h.servePut(w, r)
default:
http.NotFound(w, r)
}
}

// ServeHTTP implements OpenTSDB's HTTP /api/put endpoint
func (h *Handler) servePut(w http.ResponseWriter, r *http.Request) {
defer r.Body.Close()

// Require POST method.
if r.Method != "POST" {
http.Error(w, http.StatusText(http.StatusMethodNotAllowed), http.StatusMethodNotAllowed)
return
}

// Wrap reader if it's gzip encoded.
var br *bufio.Reader
if r.Header.Get("Content-Encoding") == "gzip" {
zr, err := gzip.NewReader(r.Body)
if err != nil {
http.Error(w, "could not read gzip, "+err.Error(), http.StatusBadRequest)
return
}

br = bufio.NewReader(zr)
} else {
br = bufio.NewReader(r.Body)
}

// Lookahead at the first byte.
f, err := br.Peek(1)
if err != nil || len(f) != 1 {
http.Error(w, "peek error: "+err.Error(), http.StatusBadRequest)
return
}

// Peek to see if this is a JSON array.
var multi bool
switch f[0] {
case '{':
case '[':
multi = true
default:
http.Error(w, "expected JSON array or hash", http.StatusBadRequest)
return
}

// Decode JSON data into slice of points.
dps := make([]point, 1)
if dec := json.NewDecoder(br); multi {
if err = dec.Decode(&dps); err != nil {
http.Error(w, "json array decode error", http.StatusBadRequest)
return
}
} else {
if err = dec.Decode(&dps[0]); err != nil {
http.Error(w, "json object decode error", http.StatusBadRequest)
return
}
}

// Convert points into TSDB points.
points := make([]tsdb.Point, 0, len(dps))
for i := range dps {
p := dps[i]

// Convert timestamp to Go time.
// If time value is over a billion then it's microseconds.
var ts time.Time
if p.Time < 10000000000 {
ts = time.Unix(p.Time, 0)
} else {
ts = time.Unix(p.Time/1000, (p.Time%1000)*1000)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm assuming we are converting to ms here? But a comment to clarify would be helpful.

}

points = append(points, tsdb.NewPoint(p.Metric, p.Tags, map[string]interface{}{"value": p.Value}, ts))
}

// Write points.
if err := h.PointsWriter.WritePoints(&cluster.WritePointsRequest{
Database: h.Database,
RetentionPolicy: h.RetentionPolicy,
ConsistencyLevel: cluster.ConsistencyLevelOne,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This looks hardcoded. Does this mean clients will have no control over consistency level with openTSDB?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I just want to convert the old server over to a service right now. We can add consistency level separately.

Points: points,
}); influxdb.IsClientError(err) {
h.Logger.Println("write series error: ", err)
http.Error(w, "write series error: "+err.Error(), http.StatusBadRequest)
return
} else if err != nil {
h.Logger.Println("write series error: ", err)
http.Error(w, "write series error: "+err.Error(), http.StatusInternalServerError)
return
}

w.WriteHeader(http.StatusNoContent)
}

// chanListener represents a listener that receives connections through a channel.
type chanListener struct {
addr net.Addr
ch chan net.Conn
}

// newChanListener returns a new instance of chanListener.
func newChanListener(addr net.Addr) *chanListener {
return &chanListener{
addr: addr,
ch: make(chan net.Conn),
}
}

func (ln *chanListener) Accept() (net.Conn, error) {
conn, ok := <-ln.ch
if !ok {
return nil, errors.New("network connection closed")
}
log.Println("TSDB listener accept ", conn)
return conn, nil
}

// Close closes the connection channel.
func (ln *chanListener) Close() error {
close(ln.ch)
return nil
}

// Addr returns the network address of the listener.
func (ln *chanListener) Addr() net.Addr { return ln.addr }

// readerConn represents a net.Conn with an assignable reader.
type readerConn struct {
net.Conn
r io.Reader
}

// Read implments the io.Reader interface.
func (conn *readerConn) Read(b []byte) (n int, err error) { return conn.r.Read(b) }

// point represents an incoming JSON data point.
type point struct {
Metric string `json:"metric"`
Time int64 `json:"timestamp"`
Value float64 `json:"value"`
Tags map[string]string `json:"tags,omitempty"`
}
Loading