Skip to content

Commit

Permalink
Cosmetic
Browse files Browse the repository at this point in the history
  • Loading branch information
syepes committed Apr 9, 2020
1 parent 51ca584 commit ebcccf2
Show file tree
Hide file tree
Showing 3 changed files with 16 additions and 54 deletions.
52 changes: 13 additions & 39 deletions libbeat/outputs/influxdb/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,17 +26,7 @@ type client struct {
timeField string
}

func newClient(
observer outputs.Observer,
addr string,
user string,
pass string,
db string,
measurement string,
timePrecision string,
tagFields []string,
timeField string,
) *client {
func newClient(observer outputs.Observer, addr string, user string, pass string, db string, measurement string, timePrecision string, tagFields []string, timeField string) *client {
hash := make(map[string]int)
for _, f := range tagFields {
if f != "" {
Expand All @@ -60,43 +50,36 @@ func newClient(
return c
}

func (c *client) Connect() error {
var err error
debugf("connect")
func (c *client) Connect() (err error) {
logp.Debug("influxdb", "Connect")

c.conn, err = influxdb.NewHTTPClient(influxdb.HTTPConfig{
Addr: c.addr,
Username: c.username,
Password: c.password,
})
c.conn, err = influxdb.NewHTTPClient(influxdb.HTTPConfig{Addr: c.addr, Username: c.username, Password: c.password})
if err != nil {
logp.Err("Failed to create HTTP conn to influxdb: %v", err)
return err
}

logp.Info("Client to influxdb has created: %v", c.addr)

return err
}

func (c *client) Close() error {
debugf("close connection")
logp.Debug("influxdb", "Close connection")
return c.conn.Close()
}

func (c *client) Publish(batch publisher.Batch) error {
if c == nil {
panic("no client")
panic("No client")
}
if batch == nil {
panic("no batch")
panic("No batch")
}

events := batch.Events()
c.observer.NewBatch(len(events))
failed, err := c.publish(events)
if err != nil {
logp.Err("publish failed:", err)
batch.RetryEvents(failed)
c.observer.Failed(len(failed))
return err
Expand All @@ -109,7 +92,6 @@ func (c *client) publish(data []publisher.Event) ([]publisher.Event, error) {
var err error

serialized := c.serializeEvents(data)

dropped := len(data) - len(serialized)
c.observer.Dropped(dropped)
if dropped > 0 {
Expand All @@ -133,12 +115,11 @@ func (c *client) publish(data []publisher.Event) ([]publisher.Event, error) {
err = c.conn.Write(bp)

if err != nil {
logp.Err("Failed to write to influxdb: %v", err)
logp.Err("Failed to write %d records to influxdb: %v", len(data[:len(serialized)]), err)
for _, event := range data[:len(serialized)] {
logp.Info("Content: ", event.Content)
logp.Debug("influxdb", "Failed record: %v", event.Content)
}
return data[:len(serialized)], err

}

c.observer.Acked(len(serialized))
Expand Down Expand Up @@ -187,15 +168,12 @@ func (c *client) scanFields(originFields map[string]interface{}) (*string, map[s
return measurement, tags, fields
}

func (c *client) serializeEvents(
data []publisher.Event,
) []*influxdb.Point {
func (c *client) serializeEvents(data []publisher.Event) []*influxdb.Point {
to := make([]*influxdb.Point, 0, len(data))

for _, d := range data {
t := d.Content.Timestamp
if timestamp, ok := d.Content.Fields[c.timeField]; ok {

if v, ok := timestamp.(int64); ok {
if c.timePrecision == "s" {
t = time.Unix(v, 0)
Expand All @@ -208,30 +186,26 @@ func (c *client) serializeEvents(
}

measurement, tags, fields := c.scanFields(d.Content.Fields)
debugf("measurement: %s\n", *measurement)
debugf("tags: %v\n", tags)
debugf("fields: %v\n", fields)
debugf("ts: %s\n", t)
logp.Debug("influxdb", "measurement: %s, tags: %v, fields: %v, ts:", *measurement, tags, fields, t)

if measurement != nil {
point, err := influxdb.NewPoint(*measurement, tags, fields, t)
if err != nil {
logp.Err("Encoding event failed with error: %v", err)
goto end
return to
}
to = append(to, point)

} else {
point, err := influxdb.NewPoint(c.measurement, tags, fields, t)
if err != nil {
logp.Err("Encoding event failed with error: %v", err)
goto end
return to
}
to = append(to, point)
}
}

end:
return to
}

Expand Down
6 changes: 2 additions & 4 deletions libbeat/outputs/influxdb/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ import (
)

type influxdbConfig struct {
LoadBalance bool `config:"loadbalance"`
Timeout time.Duration `config:"timeout"`
BulkMaxSize int `config:"bulk_max_size"`
MaxRetries int `config:"max_retries" validate:"min=-1"`
Expand All @@ -30,7 +29,6 @@ type backoff struct {

var (
defaultConfig = influxdbConfig{
LoadBalance: false,
Timeout: 5 * time.Second,
BulkMaxSize: 2048,
MaxRetries: 3,
Expand All @@ -40,8 +38,8 @@ var (
Max: 60 * time.Second,
},
Addr: "http://localhost:8086",
Db: "test_db",
Measurement: "test",
Db: "db",
Measurement: "metric",
TimePrecision: "s",
}
)
Expand Down
12 changes: 1 addition & 11 deletions libbeat/outputs/influxdb/influxdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,26 +4,18 @@ import (
"github.com/elastic/beats/v7/libbeat/beat"
"github.com/elastic/beats/v7/libbeat/common"
"github.com/elastic/beats/v7/libbeat/common/transport/tlscommon"
"github.com/elastic/beats/v7/libbeat/logp"
"github.com/elastic/beats/v7/libbeat/outputs"
)

type influxdbOut struct {
beat beat.Info
}

var debugf = logp.MakeDebug("influxdb")

func init() {
outputs.RegisterType("influxdb", makeInfluxdb)
}

func makeInfluxdb(
_ outputs.IndexManager,
beat beat.Info,
observer outputs.Observer,
cfg *common.Config,
) (outputs.Group, error) {
func makeInfluxdb(_ outputs.IndexManager, beat beat.Info, observer outputs.Observer, cfg *common.Config) (outputs.Group, error) {
var err error
config := defaultConfig
if err = cfg.Unpack(&config); err != nil {
Expand All @@ -36,7 +28,5 @@ func makeInfluxdb(
}

client := newClient(observer, config.Addr, config.Username, config.Password, config.Db, config.Measurement, config.TimePrecision, config.SendAsTags, config.SendAsTime)

// return outputs.SuccessNet(config.LoadBalance, config.BulkMaxSize, config.MaxRetries, client)
return outputs.Success(config.BulkMaxSize, config.MaxRetries, client)
}

0 comments on commit ebcccf2

Please sign in to comment.