diff --git a/Godeps/Godeps.json b/Godeps/Godeps.json index 63c0f091a887..eb41639e18cb 100644 --- a/Godeps/Godeps.json +++ b/Godeps/Godeps.json @@ -12,55 +12,68 @@ }, { "ImportPath": "github.com/elastic/libbeat/beat", - "Rev": "3ba2a2c92c8e8ead89bd51cb27281e757f6d3313" + "Comment": "v1.0.0-rc1-49-gd16e409", + "Rev": "d16e4096e4074a5fddbc82de03b00f0a40e3ad4e" }, { "ImportPath": "github.com/elastic/libbeat/cfgfile", - "Rev": "3ba2a2c92c8e8ead89bd51cb27281e757f6d3313" + "Comment": "v1.0.0-rc1-49-gd16e409", + "Rev": "d16e4096e4074a5fddbc82de03b00f0a40e3ad4e" }, { "ImportPath": "github.com/elastic/libbeat/common", - "Rev": "3ba2a2c92c8e8ead89bd51cb27281e757f6d3313" + "Comment": "v1.0.0-rc1-49-gd16e409", + "Rev": "d16e4096e4074a5fddbc82de03b00f0a40e3ad4e" }, { "ImportPath": "github.com/elastic/libbeat/logp", - "Rev": "3ba2a2c92c8e8ead89bd51cb27281e757f6d3313" + "Comment": "v1.0.0-rc1-49-gd16e409", + "Rev": "d16e4096e4074a5fddbc82de03b00f0a40e3ad4e" }, { "ImportPath": "github.com/elastic/libbeat/outputs", - "Rev": "3ba2a2c92c8e8ead89bd51cb27281e757f6d3313" + "Comment": "v1.0.0-rc1-49-gd16e409", + "Rev": "d16e4096e4074a5fddbc82de03b00f0a40e3ad4e" }, { "ImportPath": "github.com/elastic/libbeat/publisher", - "Rev": "3ba2a2c92c8e8ead89bd51cb27281e757f6d3313" + "Comment": "v1.0.0-rc1-49-gd16e409", + "Rev": "d16e4096e4074a5fddbc82de03b00f0a40e3ad4e" }, { "ImportPath": "github.com/elastic/libbeat/service", - "Rev": "3ba2a2c92c8e8ead89bd51cb27281e757f6d3313" + "Comment": "v1.0.0-rc1-49-gd16e409", + "Rev": "d16e4096e4074a5fddbc82de03b00f0a40e3ad4e" }, { "ImportPath": "github.com/elastic/libbeat/cfgfile", - "Rev": "3ba2a2c92c8e8ead89bd51cb27281e757f6d3313" + "Comment": "v1.0.0-rc1-49-gd16e409", + "Rev": "d16e4096e4074a5fddbc82de03b00f0a40e3ad4e" }, { "ImportPath": "github.com/elastic/libbeat/common", - "Rev": "3ba2a2c92c8e8ead89bd51cb27281e757f6d3313" + "Comment": "v1.0.0-rc1-49-gd16e409", + "Rev": "d16e4096e4074a5fddbc82de03b00f0a40e3ad4e" }, { "ImportPath": "github.com/elastic/libbeat/logp", - "Rev": "3ba2a2c92c8e8ead89bd51cb27281e757f6d3313" + "Comment": "v1.0.0-rc1-49-gd16e409", + "Rev": "d16e4096e4074a5fddbc82de03b00f0a40e3ad4e" }, { "ImportPath": "github.com/elastic/libbeat/outputs", - "Rev": "3ba2a2c92c8e8ead89bd51cb27281e757f6d3313" + "Comment": "v1.0.0-rc1-49-gd16e409", + "Rev": "d16e4096e4074a5fddbc82de03b00f0a40e3ad4e" }, { "ImportPath": "github.com/elastic/libbeat/publisher", - "Rev": "3ba2a2c92c8e8ead89bd51cb27281e757f6d3313" + "Comment": "v1.0.0-rc1-49-gd16e409", + "Rev": "d16e4096e4074a5fddbc82de03b00f0a40e3ad4e" }, { "ImportPath": "github.com/elastic/libbeat/service", - "Rev": "3ba2a2c92c8e8ead89bd51cb27281e757f6d3313" + "Comment": "v1.0.0-rc1-49-gd16e409", + "Rev": "d16e4096e4074a5fddbc82de03b00f0a40e3ad4e" }, { "ImportPath": "github.com/garyburd/redigo/internal", diff --git a/Godeps/_workspace/src/github.com/elastic/libbeat/common/streambuf/streambuf.go b/Godeps/_workspace/src/github.com/elastic/libbeat/common/streambuf/streambuf.go index 278f75ef396d..07b654babb8f 100644 --- a/Godeps/_workspace/src/github.com/elastic/libbeat/common/streambuf/streambuf.go +++ b/Godeps/_workspace/src/github.com/elastic/libbeat/common/streambuf/streambuf.go @@ -335,3 +335,73 @@ func (b *Buffer) CollectWithSuffix(count int, delim []byte) ([]byte, error) { b.Advance(total) return data, nil } + +// Index returns offset of seq in unprocessed buffer. +// Returns -1 if seq can not be found. +func (b *Buffer) Index(seq []byte) int { + return b.IndexFrom(0, seq) +} + +// IndexFrom returns offset of seq in unprocessed buffer start at from. +// Returns -1 if seq can not be found. +func (b *Buffer) IndexFrom(from int, seq []byte) int { + if b.err != nil { + return -1 + } + + idx := bytes.Index(b.data[b.mark+from:], seq) + if idx < 0 { + return -1 + } + + return idx + from + b.mark +} + +// IndexByte returns offset of byte in unprocessed buffer. +// Returns -1 if byte not in buffer. +func (b *Buffer) IndexByte(byte byte) int { + if b.err != nil { + return -1 + } + + idx := bytes.IndexByte(b.data[b.mark:], byte) + if idx < 0 { + return -1 + } + return idx + b.mark +} + +// CollectUntil collects all bytes until delim was found (including delim). +func (b *Buffer) CollectUntil(delim []byte) ([]byte, error) { + if b.err != nil { + return nil, b.err + } + + idx := bytes.Index(b.data[b.mark:], delim) + if idx < 0 { + return nil, b.bufferEndError() + } + + end := b.mark + idx + len(delim) + data := b.data[b.mark:end] + b.Advance(len(data)) + return data, nil +} + +// CollectUntilByte collects all bytes until delim was found (including delim). +func (b *Buffer) CollectUntilByte(delim byte) ([]byte, error) { + if b.err != nil { + return nil, b.err + } + + idx := bytes.IndexByte(b.data[b.offset:], delim) + if idx < 0 { + b.offset = b.mark + b.available + return nil, b.bufferEndError() + } + + end := b.offset + idx + 1 + data := b.data[b.mark:end] + b.Advance(len(data)) + return data, nil +} diff --git a/Godeps/_workspace/src/github.com/elastic/libbeat/logp/log.go b/Godeps/_workspace/src/github.com/elastic/libbeat/logp/log.go index 5bfac072c907..dc25c5452023 100644 --- a/Godeps/_workspace/src/github.com/elastic/libbeat/logp/log.go +++ b/Godeps/_workspace/src/github.com/elastic/libbeat/logp/log.go @@ -5,6 +5,7 @@ import ( "log" "os" "runtime/debug" + "time" ) type Priority int @@ -58,6 +59,8 @@ func send(calldepth int, level Priority, prefix string, format string, v ...inte _log.logger.Output(calldepth, fmt.Sprintf(prefix+format, v...)) } if _log.toFile { + // Creates a timestamp for the file log message and formats it + prefix = time.Now().Format(time.RFC3339) + " " + prefix _log.rotator.WriteLine([]byte(fmt.Sprintf(prefix+format, v...))) } } diff --git a/Godeps/_workspace/src/github.com/elastic/libbeat/outputs/elasticsearch/client.go b/Godeps/_workspace/src/github.com/elastic/libbeat/outputs/elasticsearch/client.go index 7f465da0a2ce..b1d962494fa6 100644 --- a/Godeps/_workspace/src/github.com/elastic/libbeat/outputs/elasticsearch/client.go +++ b/Godeps/_workspace/src/github.com/elastic/libbeat/outputs/elasticsearch/client.go @@ -63,6 +63,9 @@ func (client *Client) Clone() *Client { return newClient } +// PublishEvents sends all events to elasticsearch. On error a slice with all +// events not published or confirmed to be processed by elasticsearch will be +// returned. The input slice backing memory will be reused by return the value. func (client *Client) PublishEvents( events []common.MapStr, ) ([]common.MapStr, error) { diff --git a/Godeps/_workspace/src/github.com/elastic/libbeat/outputs/logstash/client.go b/Godeps/_workspace/src/github.com/elastic/libbeat/outputs/logstash/client.go index f17d2830f246..0b6209705302 100644 --- a/Godeps/_workspace/src/github.com/elastic/libbeat/outputs/logstash/client.go +++ b/Godeps/_workspace/src/github.com/elastic/libbeat/outputs/logstash/client.go @@ -24,17 +24,15 @@ type lumberjackClient struct { TransportClient windowSize int maxOkWindowSize int // max window size sending was successful for + maxWindowSize int timeout time.Duration countTimeoutErr int } -// TODO: make max window size configurable const ( minWindowSize int = 1 - maxWindowSize int = 1024 defaultStartMaxWindowSize int = 10 - - maxAllowedTimeoutErr int = 3 + maxAllowedTimeoutErr int = 3 ) // errors @@ -52,11 +50,16 @@ var ( codeCompressed = []byte{codeVersion, 'C'} ) -func newLumberjackClient(conn TransportClient, timeout time.Duration) *lumberjackClient { +func newLumberjackClient( + conn TransportClient, + maxWindowSize int, + timeout time.Duration, +) *lumberjackClient { return &lumberjackClient{ TransportClient: conn, windowSize: defaultStartMaxWindowSize, timeout: timeout, + maxWindowSize: maxWindowSize, } } @@ -65,11 +68,27 @@ func (l *lumberjackClient) PublishEvent(event common.MapStr) error { return err } +// PublishEvents sends all events to logstash. On error a slice with all events +// not published or confirmed to be processed by logstash will be returned. func (l *lumberjackClient) PublishEvents( events []common.MapStr, ) ([]common.MapStr, error) { + for len(events) > 0 { + n, err := l.publishWindowed(events) + events = events[n:] + if err != nil { + return events, err + } + } + return nil, nil +} + +// publishWindowed published events with current maximum window size to logstash +// returning the total number of events send (due to window size, or acks until +// failure). +func (l *lumberjackClient) publishWindowed(events []common.MapStr) (int, error) { if len(events) == 0 { - return nil, nil + return 0, nil } // prepare message payload @@ -78,7 +97,7 @@ func (l *lumberjackClient) PublishEvents( } count, payload, err := l.compressEvents(events) if err != nil { - return events, err + return 0, err } if count == 0 { @@ -86,17 +105,17 @@ func (l *lumberjackClient) PublishEvents( // as exported so no one tries to send/encode the same events once again // The compress/encode function already prints critical per failed encoding // failure. - return nil, nil + return len(events), nil } // send window size: if err = l.sendWindowSize(count); err != nil { - return l.onFail(events, err) + return l.onFail(0, err) } // send payload if err = l.sendCompressed(payload); err != nil { - return l.onFail(events, err) + return l.onFail(0, err) } // wait for ACK (accept partial ACK to reset timeout) @@ -106,7 +125,7 @@ func (l *lumberjackClient) PublishEvents( // read until all acks ackSeq, err = l.readACK() if err != nil { - return l.onFail(events[ackSeq:], err) + return l.onFail(int(ackSeq), err) } } @@ -116,10 +135,10 @@ func (l *lumberjackClient) PublishEvents( if l.maxOkWindowSize < l.windowSize { l.maxOkWindowSize = l.windowSize - if l.windowSize < maxWindowSize { + if l.windowSize < l.maxWindowSize { l.windowSize = l.windowSize + l.windowSize/2 - if l.windowSize > maxWindowSize { - l.windowSize = maxWindowSize + if l.windowSize > l.maxWindowSize { + l.windowSize = l.maxWindowSize } } } else if l.windowSize < l.maxOkWindowSize { @@ -129,36 +148,33 @@ func (l *lumberjackClient) PublishEvents( } } - return nil, nil + return len(events), nil } -func (l *lumberjackClient) onFail( - events []common.MapStr, - err error, -) ([]common.MapStr, error) { +func (l *lumberjackClient) onFail(n int, err error) (int, error) { // if timeout error, back off and ignore error nerr, ok := err.(net.Error) if !ok || !nerr.Timeout() { // no timeout error, close connection and return error _ = l.Close() - return events, err + return n, err } // if we've seen 3 consecutive timeout errors, close connection l.countTimeoutErr++ if l.countTimeoutErr == maxAllowedTimeoutErr { _ = l.Close() - return events, err + return n, err } // timeout error. reduce window size and return 0 published events. Send // mode might try to publish again with reduce window size or ask another - // client to send events (round robin load balancer) + // client to send events l.windowSize = l.windowSize / 2 if l.windowSize < minWindowSize { l.windowSize = minWindowSize } - return events, nil + return n, nil } func (l *lumberjackClient) compressEvents( diff --git a/Godeps/_workspace/src/github.com/elastic/libbeat/outputs/logstash/logstash.go b/Godeps/_workspace/src/github.com/elastic/libbeat/outputs/logstash/logstash.go index dda47e3ff8c4..fbe92370f3f6 100644 --- a/Godeps/_workspace/src/github.com/elastic/libbeat/outputs/logstash/logstash.go +++ b/Godeps/_workspace/src/github.com/elastic/libbeat/outputs/logstash/logstash.go @@ -45,6 +45,7 @@ const ( logstashDefaultTimeout = 30 * time.Second logstasDefaultMaxTimeout = 90 * time.Second defaultSendRetries = 3 + defaultMaxWindowSize = 1024 ) var waitRetry = time.Duration(1) * time.Second @@ -57,11 +58,7 @@ func (lj *logstash) init( config outputs.MothershipConfig, topologyExpire int, ) error { - useTLS := false - if config.TLS != nil { - useTLS = !config.TLS.Disabled - } - + useTLS := (config.TLS != nil) timeout := logstashDefaultTimeout if config.Timeout != 0 { timeout = time.Duration(config.Timeout) * time.Second @@ -72,6 +69,11 @@ func (lj *logstash) init( defaultPort = config.Port } + maxWindowSize := defaultMaxWindowSize + if config.BulkMaxSize != nil { + maxWindowSize = *config.BulkMaxSize + } + var clients []mode.ProtocolClient var err error if useTLS { @@ -81,15 +83,17 @@ func (lj *logstash) init( return err } - clients, err = mode.MakeClients(config, makeClientFactory(timeout, - func(host string) (TransportClient, error) { - return newTLSClient(host, defaultPort, tlsConfig) - })) + clients, err = mode.MakeClients(config, + makeClientFactory(maxWindowSize, timeout, + func(host string) (TransportClient, error) { + return newTLSClient(host, defaultPort, tlsConfig) + })) } else { - clients, err = mode.MakeClients(config, makeClientFactory(timeout, - func(host string) (TransportClient, error) { - return newTCPClient(host, defaultPort) - })) + clients, err = mode.MakeClients(config, + makeClientFactory(maxWindowSize, timeout, + func(host string) (TransportClient, error) { + return newTCPClient(host, defaultPort) + })) } if err != nil { return err @@ -131,6 +135,7 @@ func (lj *logstash) init( } func makeClientFactory( + maxWindowSize int, timeout time.Duration, makeTransp func(string) (TransportClient, error), ) func(string) (mode.ProtocolClient, error) { @@ -139,7 +144,7 @@ func makeClientFactory( if err != nil { return nil, err } - return newLumberjackClient(transp, timeout), nil + return newLumberjackClient(transp, maxWindowSize, timeout), nil } } diff --git a/Godeps/_workspace/src/github.com/elastic/libbeat/outputs/outputs.go b/Godeps/_workspace/src/github.com/elastic/libbeat/outputs/outputs.go index 7d51b3c3b3f8..9ccc91ce859b 100644 --- a/Godeps/_workspace/src/github.com/elastic/libbeat/outputs/outputs.go +++ b/Godeps/_workspace/src/github.com/elastic/libbeat/outputs/outputs.go @@ -8,7 +8,6 @@ import ( ) type MothershipConfig struct { - Enabled bool Save_topology bool Host string Port int @@ -97,7 +96,7 @@ func InitOutputs( var plugins []OutputPlugin = nil for name, plugin := range enabledOutputPlugins { config, exists := configs[name] - if !exists || !config.Enabled { + if !exists { continue } diff --git a/Godeps/_workspace/src/github.com/elastic/libbeat/outputs/tls.go b/Godeps/_workspace/src/github.com/elastic/libbeat/outputs/tls.go index f05ee9f8c5f0..01c169e20ea6 100644 --- a/Godeps/_workspace/src/github.com/elastic/libbeat/outputs/tls.go +++ b/Godeps/_workspace/src/github.com/elastic/libbeat/outputs/tls.go @@ -32,7 +32,6 @@ var ( // TLSConfig defines config file options for TLS clients. type TLSConfig struct { - Disabled bool `yaml:"disabled"` Certificate string `yaml:"certificate"` CertificateKey string `yaml:"certificate_key"` CAs []string `yaml:"certificate_authorities"` @@ -48,7 +47,7 @@ type TLSConfig struct { // will be configured. If no CAs are configured, the host CA will be used by go // built-in TLS support. func LoadTLSConfig(config *TLSConfig) (*tls.Config, error) { - if config == nil || config.Disabled { + if config == nil { return nil, nil } diff --git a/Godeps/_workspace/src/github.com/elastic/libbeat/publisher/preprocess.go b/Godeps/_workspace/src/github.com/elastic/libbeat/publisher/preprocess.go index 5aa800ea490f..6a6354bfa39b 100644 --- a/Godeps/_workspace/src/github.com/elastic/libbeat/publisher/preprocess.go +++ b/Godeps/_workspace/src/github.com/elastic/libbeat/publisher/preprocess.go @@ -153,11 +153,11 @@ func updateEventAddresses(publisher *PublisherType, event common.MapStr) bool { //get the direction of the transaction: outgoing (as client)/incoming (as server) if publisher.IsPublisherIP(dst.Ip) { - // outgoing transaction - event["direction"] = "out" - } else { - //incoming transaction + // incoming transaction event["direction"] = "in" + } else { + //outgoing transaction + event["direction"] = "out" } } diff --git a/Godeps/_workspace/src/github.com/elastic/libbeat/publisher/worker.go b/Godeps/_workspace/src/github.com/elastic/libbeat/publisher/worker.go index 40fd51a21ec0..31298e64647a 100644 --- a/Godeps/_workspace/src/github.com/elastic/libbeat/publisher/worker.go +++ b/Godeps/_workspace/src/github.com/elastic/libbeat/publisher/worker.go @@ -60,9 +60,9 @@ func (p *messageWorker) run() { } func (p *messageWorker) shutdown() { + p.handler.onStop() stopQueue(p.queue) p.ws.wg.Done() - p.handler.onStop() } func (p *messageWorker) send(m message) { diff --git a/etc/topbeat.yml b/etc/topbeat.yml index 938f87fd0e28..c3bcdb975e15 100644 --- a/etc/topbeat.yml +++ b/etc/topbeat.yml @@ -21,22 +21,18 @@ input: ############################# Output ########################################## # Configure what outputs to use when sending the data collected by the beat. -# You can enable one or multiple outputs by setting enabled option to true. +# Multiple outputs may be used. output: ### Elasticsearch as output elasticsearch: - - # Set to true to enable elasticsearch output - enabled: true - # Array of hosts to connect to. # Scheme and port can be left out and will be set to the default (http and 9200) # In case you specify and additional path, the scheme is required: http://localhost:9200/path # IPv6 addresses should always be defined as: https://[2001:db8::1]:9200 hosts: ["localhost:9200"] - # Optional protocol and basic auth credentials. These are deprectated. + # Optional protocol and basic auth credentials. These are deprecated. #protocol: "https" #username: "admin" #password: "s3cr3t" @@ -66,22 +62,18 @@ output: # The number of seconds to wait for new events between two bulk API index requests. # If `bulk_max_size` is reached before this interval expires, addition bulk index # requests are made. - #flush_interval + #flush_interval: 1 # Boolean that sets if the topology is kept in Elasticsearch. The default is # false. This option makes sense only for Packetbeat. - #save_topology: + #save_topology: false - #The time to live in seconds for the topology information that is stored in - #Elasticsearch. The default is 15 seconds. - #topology_expire: + # The time to live in seconds for the topology information that is stored in + # Elasticsearch. The default is 15 seconds. + #topology_expire: 15 # tls configuration. By default is off. #tls: - # If disabled is set to true, the tls section will be ignored and the - # host its certificate authorities will be used. - #disabled: true - # List of root certificates for HTTPS server verifications #certificate_authorities: ["/etc/pki/root/ca.pem"] @@ -109,12 +101,9 @@ output: # Configure maximum TLS version allowed for connection to logstash #max_version: 1.2 - ### Logstash as output + ### Logstash as output #logstash: - # Uncomment out this option if you want to output to Logstash. The default is false. - #enabled: true - # The Logstash hosts #hosts: ["localhost:5044"] @@ -131,8 +120,6 @@ output: # Optional TLS. By default is off. #tls: - #disabled: true - # List of root certificates for HTTPS server verifications #certificate_authorities: ["/etc/pki/root/ca.pem"] @@ -155,12 +142,8 @@ output: #curve_types: [] - ### File as output #file: - # Enabling file output - #enabled: false - # Path to the directory where to save the generated files. The option is mandatory. #path: "/tmp/topbeat" @@ -176,12 +159,10 @@ output: # is 7 files. #number_of_files: 7 + ### Console output # console: - # Enabling console output - #enabled: false - - # pretty print json event + # Pretty print json event #pretty: false diff --git a/scripts/Makefile b/scripts/Makefile index ea9238a48eca..55db0236235a 100644 --- a/scripts/Makefile +++ b/scripts/Makefile @@ -1,4 +1,3 @@ -#!/bin/bash ### VARIABLE SETUP ###