Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Move TCP UDP start up into server.Start() #4903

Merged
merged 1 commit into from
Aug 18, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ https://github.com/elastic/beats/compare/v6.0.0-beta1...master[Check the HEAD di

- Update init scripts to use the `test config` subcommand instead of the deprecated `-configtest` flag. {issue}4600[4600]
- Get by default the credentials for connecting to Kibana from the Elasticsearch output configuration. {pull}4867[4867]
- Move TCP UDP start up into `server.Start()` {pull}4903[4903]

*Auditbeat*

Expand Down
3 changes: 2 additions & 1 deletion metricbeat/helper/server/http/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ func NewHttpServer(mb mb.BaseMetricSet) (server.Server, error) {
return h, nil
}

func (h *HttpServer) Start() {
func (h *HttpServer) Start() error {
go func() {

logp.Info("Starting http server on %s", h.server.Addr)
Expand All @@ -67,6 +67,7 @@ func (h *HttpServer) Start() {
}
}()

return nil
}

func (h *HttpServer) Stop() {
Expand Down
2 changes: 1 addition & 1 deletion metricbeat/helper/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ const (
// Server is an interface that can be used to implement servers which can accept data.
type Server interface {
// Start is used to start the server at a well defined port.
Start()
Start() error
// Stop the server.
Stop()
// Get a channel of events.
Expand Down
25 changes: 15 additions & 10 deletions metricbeat/helper/server/tcp/tcp.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,16 @@ import (
"fmt"
"net"

"github.com/pkg/errors"

"github.com/elastic/beats/libbeat/common"
"github.com/elastic/beats/libbeat/logp"
"github.com/elastic/beats/metricbeat/helper/server"
"github.com/elastic/beats/metricbeat/mb"
)

type TcpServer struct {
tcpAddr *net.TCPAddr
listener *net.TCPListener
receiveBufferSize int
done chan struct{}
Expand Down Expand Up @@ -42,25 +45,27 @@ func NewTcpServer(base mb.BaseMetricSet) (server.Server, error) {
return nil, err
}

listener, err := net.ListenTCP("tcp", addr)
if err != nil {
return nil, err
}

logp.Info("Started listening for TCP on: %s:%d", config.Host, config.Port)
return &TcpServer{
listener: listener,
tcpAddr: addr,
receiveBufferSize: config.ReceiveBufferSize,
done: make(chan struct{}),
eventQueue: make(chan server.Event),
}, nil
}

func (g *TcpServer) Start() {
go g.WatchMetrics()
func (g *TcpServer) Start() error {
listener, err := net.ListenTCP("tcp", g.tcpAddr)
if err != nil {
return errors.Wrap(err, "failed to start TCP server")
}
g.listener = listener
logp.Info("Started listening for TCP on: %s", g.tcpAddr.String())

go g.watchMetrics()
return nil
}

func (g *TcpServer) WatchMetrics() {
func (g *TcpServer) watchMetrics() {
buffer := make([]byte, g.receiveBufferSize)
for {
select {
Expand Down
14 changes: 7 additions & 7 deletions metricbeat/helper/server/tcp/tcp_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,9 @@ func GetTestTcpServer(host string, port int) (server.Server, error) {
return nil, err
}

listener, err := net.ListenTCP("tcp", addr)
if err != nil {
return nil, err
}

logp.Info("Started listening for TCP on: %s:%d", host, port)
return &TcpServer{
listener: listener,
tcpAddr: addr,
receiveBufferSize: 1024,
done: make(chan struct{}),
eventQueue: make(chan server.Event),
Expand All @@ -43,7 +38,12 @@ func TestTcpServer(t *testing.T) {
t.FailNow()
}

svc.Start()
err = svc.Start()
if err != nil {
t.Error(err)
t.FailNow()
}

defer svc.Stop()
writeToServer(t, "test1", host, port)
msg := <-svc.GetEvents()
Expand Down
26 changes: 16 additions & 10 deletions metricbeat/helper/server/udp/udp.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,16 @@ import (
"fmt"
"net"

"github.com/pkg/errors"

"github.com/elastic/beats/libbeat/common"
"github.com/elastic/beats/libbeat/logp"
"github.com/elastic/beats/metricbeat/helper/server"
"github.com/elastic/beats/metricbeat/mb"
)

type UdpServer struct {
udpaddr *net.UDPAddr
listener *net.UDPConn
receiveBufferSize int
done chan struct{}
Expand Down Expand Up @@ -43,25 +46,28 @@ func NewUdpServer(base mb.BaseMetricSet) (server.Server, error) {
return nil, err
}

listener, err := net.ListenUDP("udp", addr)
if err != nil {
return nil, err
}

logp.Info("Started listening for UDP on: %s:%d", config.Host, config.Port)
return &UdpServer{
listener: listener,
udpaddr: addr,
receiveBufferSize: config.ReceiveBufferSize,
done: make(chan struct{}),
eventQueue: make(chan server.Event),
}, nil
}

func (g *UdpServer) Start() {
go g.WatchMetrics()
func (g *UdpServer) Start() error {
listener, err := net.ListenUDP("udp", g.udpaddr)
if err != nil {
return errors.Wrap(err, "failed to start UDP server")
}

logp.Info("Started listening for UDP on: %s", g.udpaddr.String())
g.listener = listener

go g.watchMetrics()
return nil
}

func (g *UdpServer) WatchMetrics() {
func (g *UdpServer) watchMetrics() {
buffer := make([]byte, g.receiveBufferSize)
for {
select {
Expand Down
12 changes: 6 additions & 6 deletions metricbeat/helper/server/udp/udp_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,9 @@ func GetTestUdpServer(host string, port int) (server.Server, error) {
return nil, err
}

listener, err := net.ListenUDP("udp", addr)
if err != nil {
return nil, err
}

logp.Info("Started listening for UDP on: %s:%d", host, port)
return &UdpServer{
listener: listener,
udpaddr: addr,
receiveBufferSize: 1024,
done: make(chan struct{}),
eventQueue: make(chan server.Event),
Expand All @@ -44,6 +39,11 @@ func TestUdpServer(t *testing.T) {
}

svc.Start()
if err != nil {
t.Error(err)
t.FailNow()
}

defer svc.Stop()
writeToServer(t, "test1", host, port)
msg := <-svc.GetEvents()
Expand Down
10 changes: 9 additions & 1 deletion metricbeat/module/graphite/server/server.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
package server

import (
"github.com/pkg/errors"

"github.com/elastic/beats/libbeat/common/cfgwarn"
"github.com/elastic/beats/libbeat/logp"
serverhelper "github.com/elastic/beats/metricbeat/helper/server"
"github.com/elastic/beats/metricbeat/helper/server/tcp"
"github.com/elastic/beats/metricbeat/helper/server/udp"
Expand Down Expand Up @@ -61,7 +64,12 @@ func New(base mb.BaseMetricSet) (mb.MetricSet, error) {
// Run method provides the Graphite server with a reporter with which events can be reported.
func (m *MetricSet) Run(reporter mb.PushReporter) {
// Start event watcher
m.server.Start()
if err := m.server.Start(); err != nil {
err = errors.Wrap(err, "failed to start graphite server")
logp.Err("%v", err)
reporter.Error(err)
return
}

for {
select {
Expand Down