Skip to content

Commit

Permalink
Merge pull request #4222 from influxdb/graphite_tcp_blocking
Browse files Browse the repository at this point in the history
Graphite TCP should not block system shutdown
  • Loading branch information
otoolep committed Sep 28, 2015
2 parents b079d20 + 9de3125 commit 2db82ee
Show file tree
Hide file tree
Showing 3 changed files with 77 additions and 68 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
- [#4165](https://github.com/influxdb/influxdb/pull/4165): Tag all Go runtime stats when writing to internal database.
- [#4118](https://github.com/influxdb/influxdb/issues/4118): Return consistent, correct result for SHOW MEASUREMENTS with multiple AND conditions
- [#4191](https://github.com/influxdb/influxdb/pull/4191): Correctly marshal remote mapper responses. Fixes [#4170](https://github.com/influxdb/influxdb/issues/4170)
- [#4222](https://github.com/influxdb/influxdb/pull/4222): Graphite TCP connections should not block shutdown
- [#4180](https://github.com/influxdb/influxdb/pull/4180): Cursor & SelectMapper Refactor
- [#1577](https://github.com/influxdb/influxdb/issues/1577): selectors (e.g. min, max, first, last) should have equivalents to return the actual point

Expand Down
10 changes: 8 additions & 2 deletions monitor/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -153,12 +153,18 @@ func (m *Monitor) SetLogger(l *log.Logger) {
}

// RegisterDiagnosticsClient registers a diagnostics client with the given name and tags.
func (m *Monitor) RegisterDiagnosticsClient(name string, client DiagsClient) error {
func (m *Monitor) RegisterDiagnosticsClient(name string, client DiagsClient) {
m.mu.Lock()
defer m.mu.Unlock()
m.diagRegistrations[name] = client
m.Logger.Printf(`'%s' registered for diagnostics monitoring`, name)
return nil
}

// DeregisterDiagnosticsClient removes the diagnostics client registered under
// name. Deregistering a name that has no registration is a no-op.
func (m *Monitor) DeregisterDiagnosticsClient(name string) {
	// Deleting a map entry cannot panic, so the lock is released explicitly
	// instead of through defer.
	m.mu.Lock()
	delete(m.diagRegistrations, name)
	m.mu.Unlock()
}

// Statistics returns the combined statistics for all expvar data. The given
Expand Down
134 changes: 68 additions & 66 deletions services/graphite/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,51 +24,6 @@ const (
leaderWaitTimeout = 30 * time.Second
)

// Package-level tracking of connections for diagnostics.

// monitorOnce guards the one-time registration of the package-wide Graphite
// diagnostics client.
var monitorOnce sync.Once

// tcpConnectionDiag records a TCP connection together with the time at which
// it was added to the tracking table.
type tcpConnectionDiag struct {
	conn        net.Conn
	connectTime time.Time
}

var (
	// tcpConnectionsMu guards tcpConnections.
	tcpConnectionsMu sync.Mutex

	// tcpConnections holds the tracked connections keyed by remote address.
	// It is initialized in its declaration, so no init function is required.
	tcpConnections = make(map[string]*tcpConnectionDiag)
)

// addConnection stores c in the package-level connection table under its
// remote address, stamped with the current UTC time. An existing entry for
// the same remote address is replaced.
func addConnection(c net.Conn) {
	tcpConnectionsMu.Lock()
	defer tcpConnectionsMu.Unlock()

	remote := c.RemoteAddr().String()
	tcpConnections[remote] = &tcpConnectionDiag{
		conn:        c,
		connectTime: time.Now().UTC(),
	}
}
// removeConnection drops the entry for c's remote address from the
// package-level connection table. It is a no-op if no such entry exists.
func removeConnection(c net.Conn) {
	tcpConnectionsMu.Lock()
	defer tcpConnectionsMu.Unlock()

	remote := c.RemoteAddr().String()
	delete(tcpConnections, remote)
}

// handleDiagnostics reports every tracked Graphite TCP connection as one
// diagnostics row holding its local address, remote address and connect time.
// The returned error is always nil.
func handleDiagnostics() (*monitor.Diagnostic, error) {
	tcpConnectionsMu.Lock()
	defer tcpConnectionsMu.Unlock()

	d := &monitor.Diagnostic{
		Columns: []string{"local", "remote", "connect time"},
		Rows:    make([][]interface{}, 0, len(tcpConnections)),
	}
	// Map iteration order is unspecified, so rows appear in no particular order.
	for _, v := range tcpConnections {
		d.Rows = append(d.Rows, []interface{}{
			v.conn.LocalAddr().String(),
			v.conn.RemoteAddr().String(),
			v.connectTime,
		})
	}
	return d, nil
}

// statistics gathered by the graphite package.
const (
statPointsReceived = "points_rx"
Expand All @@ -82,6 +37,15 @@ const (
statConnectionsHandled = "connections_handled"
)

// tcpConnection pairs a TCP connection with the time at which the service
// started tracking it.
type tcpConnection struct {
	conn        net.Conn
	connectTime time.Time
}

// Close closes the underlying network connection. Any error reported by the
// connection is discarded.
func (c *tcpConnection) Close() {
	_ = c.conn.Close()
}

type Service struct {
bindAddress string
database string
Expand All @@ -94,8 +58,10 @@ type Service struct {
batcher *tsdb.PointBatcher
parser *Parser

logger *log.Logger
statMap *expvar.Map
logger *log.Logger
statMap *expvar.Map
tcpConnectionsMu sync.Mutex
tcpConnections map[string]*tcpConnection

ln net.Listener
addr net.Addr
Expand All @@ -105,7 +71,8 @@ type Service struct {
done chan struct{}

Monitor interface {
RegisterDiagnosticsClient(name string, client monitor.DiagsClient) error
RegisterDiagnosticsClient(name string, client monitor.DiagsClient)
DeregisterDiagnosticsClient(name string)
}
PointsWriter interface {
WritePoints(p *cluster.WritePointsRequest) error
Expand All @@ -122,14 +89,15 @@ func NewService(c Config) (*Service, error) {
d := c.WithDefaults()

s := Service{
bindAddress: d.BindAddress,
database: d.Database,
protocol: d.Protocol,
batchSize: d.BatchSize,
batchPending: d.BatchPending,
batchTimeout: time.Duration(d.BatchTimeout),
logger: log.New(os.Stderr, "[graphite] ", log.LstdFlags),
done: make(chan struct{}),
bindAddress: d.BindAddress,
database: d.Database,
protocol: d.Protocol,
batchSize: d.BatchSize,
batchPending: d.BatchPending,
batchTimeout: time.Duration(d.BatchTimeout),
logger: log.New(os.Stderr, "[graphite] ", log.LstdFlags),
tcpConnections: make(map[string]*tcpConnection),
done: make(chan struct{}),
}

consistencyLevel, err := cluster.ParseConsistencyLevel(d.ConsistencyLevel)
Expand Down Expand Up @@ -161,14 +129,10 @@ func (s *Service) Open() error {
tags := map[string]string{"proto": s.protocol, "bind": s.bindAddress}
s.statMap = influxdb.NewStatistics(key, "graphite", tags)

// One Graphite service hooks up diagnostics for all Graphite functionality.
monitorOnce.Do(func() {
if s.Monitor == nil {
s.logger.Println("no monitor service available, no monitoring will be performed")
return
}
s.Monitor.RegisterDiagnosticsClient("graphite", monitor.DiagsClientFunc(handleDiagnostics))
})
// Register diagnostics if a Monitor service is available.
if s.Monitor != nil {
s.Monitor.RegisterDiagnosticsClient(key, s)
}

if err := s.MetaStore.WaitForLeader(leaderWaitTimeout); err != nil {
s.logger.Printf("Failed to detect a cluster leader: %s", err.Error())
Expand Down Expand Up @@ -202,9 +166,18 @@ func (s *Service) Open() error {
s.logger.Printf("Listening on %s: %s", strings.ToUpper(s.protocol), s.addr.String())
return nil
}
// closeAllConnections closes every TCP connection the service is currently
// tracking. The tracking mutex is held for the whole sweep; entries are not
// removed from the table here.
func (s *Service) closeAllConnections() {
	s.tcpConnectionsMu.Lock()
	defer s.tcpConnectionsMu.Unlock()

	for remote := range s.tcpConnections {
		s.tcpConnections[remote].Close()
	}
}

// Close stops all data processing on the Graphite input.
func (s *Service) Close() error {
s.closeAllConnections()

if s.ln != nil {
s.ln.Close()
}
Expand Down Expand Up @@ -262,11 +235,11 @@ func (s *Service) openTCPServer() (net.Addr, error) {
func (s *Service) handleTCPConnection(conn net.Conn) {
defer s.wg.Done()
defer conn.Close()
defer removeConnection(conn)
defer s.statMap.Add(statConnectionsActive, -1)
addConnection(conn)
defer s.untrackConnection(conn)
s.statMap.Add(statConnectionsActive, 1)
s.statMap.Add(statConnectionsHandled, 1)
s.trackConnection(conn)

reader := bufio.NewReader(conn)

Expand All @@ -286,6 +259,20 @@ func (s *Service) handleTCPConnection(conn net.Conn) {
}
}

// trackConnection records c in the service's connection table under its
// remote address, stamped with the current UTC time. An existing entry for
// the same remote address is replaced.
func (s *Service) trackConnection(c net.Conn) {
	s.tcpConnectionsMu.Lock()
	defer s.tcpConnectionsMu.Unlock()

	remote := c.RemoteAddr().String()
	s.tcpConnections[remote] = &tcpConnection{
		conn:        c,
		connectTime: time.Now().UTC(),
	}
}
// untrackConnection removes the entry for c's remote address from the
// service's connection table. It is a no-op if no such entry exists.
func (s *Service) untrackConnection(c net.Conn) {
	s.tcpConnectionsMu.Lock()
	defer s.tcpConnectionsMu.Unlock()

	remote := c.RemoteAddr().String()
	delete(s.tcpConnections, remote)
}

// openUDPServer opens the Graphite input in UDP mode and starts processing incoming data.
func (s *Service) openUDPServer() (net.Addr, error) {
addr, err := net.ResolveUDPAddr("udp", s.bindAddress)
Expand Down Expand Up @@ -370,3 +357,18 @@ func (s *Service) processBatches(batcher *tsdb.PointBatcher) {
}
}
}

// Diagnostics reports every TCP connection tracked by this service as one
// diagnostics row holding its local address, remote address and connect time.
// The returned error is always nil.
func (s *Service) Diagnostics() (*monitor.Diagnostic, error) {
	s.tcpConnectionsMu.Lock()
	defer s.tcpConnectionsMu.Unlock()

	d := &monitor.Diagnostic{
		Columns: []string{"local", "remote", "connect time"},
		Rows:    make([][]interface{}, 0, len(s.tcpConnections)),
	}
	// Map iteration order is unspecified, so rows appear in no particular order.
	for _, v := range s.tcpConnections {
		d.Rows = append(d.Rows, []interface{}{
			v.conn.LocalAddr().String(),
			v.conn.RemoteAddr().String(),
			v.connectTime,
		})
	}
	return d, nil
}

0 comments on commit 2db82ee

Please sign in to comment.