From 91d0151634a134ae6faad90f654afdf6ca24b5d1 Mon Sep 17 00:00:00 2001 From: Prudhvi Chaitanya Dhulipalla Date: Tue, 23 Apr 2019 22:25:29 -0700 Subject: [PATCH 1/2] Upgrade datadog to 2.2.0 for UDS bug fixes --- Gopkg.lock | 6 +- Gopkg.toml | 2 +- .../DataDog/datadog-go/statsd/options.go | 109 +++++++++++ .../DataDog/datadog-go/statsd/statsd.go | 171 +++++++++++++----- .../DataDog/datadog-go/statsd/udp.go | 33 ++++ .../DataDog/datadog-go/statsd/uds.go | 56 ------ .../DataDog/datadog-go/statsd/uds_async.go | 113 ++++++++++++ .../DataDog/datadog-go/statsd/uds_blocking.go | 92 ++++++++++ 8 files changed, 475 insertions(+), 107 deletions(-) create mode 100644 vendor/github.com/DataDog/datadog-go/statsd/options.go create mode 100644 vendor/github.com/DataDog/datadog-go/statsd/uds_async.go create mode 100644 vendor/github.com/DataDog/datadog-go/statsd/uds_blocking.go diff --git a/Gopkg.lock b/Gopkg.lock index d60975040..c4e811d0d 100644 --- a/Gopkg.lock +++ b/Gopkg.lock @@ -2,12 +2,12 @@ [[projects]] - digest = "1:6d83aad9c98e13079ad8a4dbc740892edf5384bcd07f1d789bb66cd8ecfae2cd" + digest = "1:36975300f25d328c37e87d801e2e34038c53edcf15be8c2afb147f00de409386" name = "github.com/DataDog/datadog-go" packages = ["statsd"] pruneopts = "NUT" - revision = "e67964b4021ad3a334e748e8811eb3cd6becbc6e" - version = "2.1.0" + revision = "8a13fa761f51039f900738876f2837198fba804f" + version = "2.2.0" [[projects]] digest = "1:e30fbdce732588b475a444cbeabbcbef1c6c08b8d4f3efc142853551a1b0ab13" diff --git a/Gopkg.toml b/Gopkg.toml index f43b4d4fe..773fe4916 100644 --- a/Gopkg.toml +++ b/Gopkg.toml @@ -22,7 +22,7 @@ [[constraint]] name = "github.com/DataDog/datadog-go" - version = "2.1.0" + version = "2.2.0" [[constraint]] name = "github.com/Shopify/sarama" diff --git a/vendor/github.com/DataDog/datadog-go/statsd/options.go b/vendor/github.com/DataDog/datadog-go/statsd/options.go new file mode 100644 index 000000000..2c5a59cd5 --- /dev/null +++ b/vendor/github.com/DataDog/datadog-go/statsd/options.go @@ -0,0 +1,109 @@ +package statsd + +import "time" + +var ( + // DefaultNamespace is the default value for the Namespace option + DefaultNamespace = "" + // DefaultTags is the default value for the Tags option + DefaultTags = []string{} + // DefaultBuffered is the default value for the Buffered option + DefaultBuffered = false + // DefaultMaxMessagesPerPayload is the default value for the MaxMessagesPerPayload option + DefaultMaxMessagesPerPayload = 16 + // DefaultAsyncUDS is the default value for the AsyncUDS option + DefaultAsyncUDS = false + // DefaultWriteTimeoutUDS is the default value for the WriteTimeoutUDS option + DefaultWriteTimeoutUDS = 1 * time.Millisecond +) + +// Options contains the configuration options for a client. +type Options struct { + // Namespace to prepend to all metrics, events and service checks name. + Namespace string + // Tags are global tags to be applied to every metrics, events and service checks. + Tags []string + // Buffered allows to pack multiple DogStatsD messages in one payload. Messages will be buffered + // until the total size of the payload exceeds MaxMessagesPerPayload metrics, events and/or service + // checks or after 100ms since the payload startedto be built. + Buffered bool + // MaxMessagesPerPayload is the maximum number of metrics, events and/or service checks a single payload will contain. + // Note that this option only takes effect when the client is buffered. + MaxMessagesPerPayload int + // AsyncUDS allows to switch between async and blocking mode for UDS. + // Blocking mode allows for error checking but does not guarentee that calls won't block the execution. + AsyncUDS bool + // WriteTimeoutUDS is the timeout after which a UDS packet is dropped. + WriteTimeoutUDS time.Duration +} + +func resolveOptions(options []Option) (*Options, error) { + o := &Options{ + Namespace: DefaultNamespace, + Tags: DefaultTags, + Buffered: DefaultBuffered, + MaxMessagesPerPayload: DefaultMaxMessagesPerPayload, + AsyncUDS: DefaultAsyncUDS, + WriteTimeoutUDS: DefaultWriteTimeoutUDS, + } + + for _, option := range options { + err := option(o) + if err != nil { + return nil, err + } + } + + return o, nil +} + +// Option is a client option. Can return an error if validation fails. +type Option func(*Options) error + +// WithNamespace sets the Namespace option. +func WithNamespace(namespace string) Option { + return func(o *Options) error { + o.Namespace = namespace + return nil + } +} + +// WithTags sets the Tags option. +func WithTags(tags []string) Option { + return func(o *Options) error { + o.Tags = tags + return nil + } +} + +// Buffered sets the Buffered option. +func Buffered() Option { + return func(o *Options) error { + o.Buffered = true + return nil + } +} + +// WithMaxMessagesPerPayload sets the MaxMessagesPerPayload option. +func WithMaxMessagesPerPayload(maxMessagesPerPayload int) Option { + return func(o *Options) error { + o.MaxMessagesPerPayload = maxMessagesPerPayload + return nil + } +} + +// WithAsyncUDS sets the AsyncUDS option. +func WithAsyncUDS() Option { + return func(o *Options) error { + o.AsyncUDS = true + return nil + } +} + +// WithWriteTimeoutUDS sets the WriteTimeoutUDS option. +func WithWriteTimeoutUDS(writeTimeoutUDS time.Duration) Option { + return func(o *Options) error { + o.WriteTimeoutUDS = writeTimeoutUDS + return nil + } +} diff --git a/vendor/github.com/DataDog/datadog-go/statsd/statsd.go b/vendor/github.com/DataDog/datadog-go/statsd/statsd.go index 0ae4dac03..71a113cfc 100644 --- a/vendor/github.com/DataDog/datadog-go/statsd/statsd.go +++ b/vendor/github.com/DataDog/datadog-go/statsd/statsd.go @@ -29,6 +29,7 @@ import ( "fmt" "io" "math/rand" + "os" "strconv" "strings" "sync" @@ -60,6 +61,12 @@ traffic instead of UDP. */ const UnixAddressPrefix = "unix://" +// Client-side entity ID injection for container tagging +const ( + entityIDEnvName = "DD_ENTITY_ID" + entityIDTagName = "dd.internal.entity_id" +) + /* Stat suffixes */ @@ -96,7 +103,7 @@ type Client struct { // BufferLength is the length of the buffer in commands. bufferLength int flushTime time.Duration - commands []string + commands [][]byte buffer bytes.Buffer stop chan struct{} sync.Mutex @@ -104,82 +111,116 @@ type Client struct { // New returns a pointer to a new Client given an addr in the format "hostname:port" or // "unix:///path/to/socket". -func New(addr string) (*Client, error) { - if strings.HasPrefix(addr, UnixAddressPrefix) { - w, err := newUdsWriter(addr[len(UnixAddressPrefix)-1:]) - if err != nil { - return nil, err - } - return NewWithWriter(w) +func New(addr string, options ...Option) (*Client, error) { + o, err := resolveOptions(options) + if err != nil { + return nil, err + } + + var w statsdWriter + + if !strings.HasPrefix(addr, UnixAddressPrefix) { + w, err = newUDPWriter(addr) + } else if o.AsyncUDS { + w, err = newAsyncUdsWriter(addr[len(UnixAddressPrefix)-1:]) + } else { + w, err = newBlockingUdsWriter(addr[len(UnixAddressPrefix)-1:]) } - w, err := newUDPWriter(addr) if err != nil { return nil, err } - return NewWithWriter(w) + w.SetWriteTimeout(o.WriteTimeoutUDS) + + c := Client{ + Namespace: o.Namespace, + Tags: o.Tags, + writer: w, + } + + // Inject DD_ENTITY_ID as a constant tag if found + entityID := os.Getenv(entityIDEnvName) + if entityID != "" { + entityTag := fmt.Sprintf("%s:%s", entityIDTagName, entityID) + c.Tags = append(c.Tags, entityTag) + } + + if o.Buffered { + c.bufferLength = o.MaxMessagesPerPayload + c.commands = make([][]byte, 0, o.MaxMessagesPerPayload) + c.flushTime = time.Millisecond * 100 + c.stop = make(chan struct{}, 1) + go c.watch() + } + + return &c, nil } // NewWithWriter creates a new Client with given writer. Writer is a // io.WriteCloser + SetWriteTimeout(time.Duration) error func NewWithWriter(w statsdWriter) (*Client, error) { client := &Client{writer: w, SkipErrors: false} + + // Inject DD_ENTITY_ID as a constant tag if found + entityID := os.Getenv(entityIDEnvName) + if entityID != "" { + entityTag := fmt.Sprintf("%s:%s", entityIDTagName, entityID) + client.Tags = append(client.Tags, entityTag) + } + return client, nil } // NewBuffered returns a Client that buffers its output and sends it in chunks. // Buflen is the length of the buffer in number of commands. +// +// When addr is empty, the client will default to a UDP client and use the DD_AGENT_HOST +// and (optionally) the DD_DOGSTATSD_PORT environment variables to build the target address. func NewBuffered(addr string, buflen int) (*Client, error) { - client, err := New(addr) - if err != nil { - return nil, err - } - client.bufferLength = buflen - client.commands = make([]string, 0, buflen) - client.flushTime = time.Millisecond * 100 - client.stop = make(chan struct{}, 1) - go client.watch() - return client, nil + return New(addr, Buffered(), WithMaxMessagesPerPayload(buflen)) } // format a message from its name, value, tags and rate. Also adds global // namespace and tags. -func (c *Client) format(name string, value interface{}, suffix []byte, tags []string, rate float64) string { - var buf bytes.Buffer +func (c *Client) format(name string, value interface{}, suffix []byte, tags []string, rate float64) []byte { + // preallocated buffer, stack allocated as long as it doesn't escape + buf := make([]byte, 0, 200) + if c.Namespace != "" { - buf.WriteString(c.Namespace) + buf = append(buf, c.Namespace...) } - buf.WriteString(name) - buf.WriteString(":") + buf = append(buf, name...) + buf = append(buf, ':') switch val := value.(type) { case float64: - buf.Write(strconv.AppendFloat([]byte{}, val, 'f', 6, 64)) + buf = strconv.AppendFloat(buf, val, 'f', 6, 64) case int64: - buf.Write(strconv.AppendInt([]byte{}, val, 10)) + buf = strconv.AppendInt(buf, val, 10) case string: - buf.WriteString(val) + buf = append(buf, val...) default: // do nothing } - buf.Write(suffix) + buf = append(buf, suffix...) if rate < 1 { - buf.WriteString(`|@`) - buf.WriteString(strconv.FormatFloat(rate, 'f', -1, 64)) + buf = append(buf, "|@"...) + buf = strconv.AppendFloat(buf, rate, 'f', -1, 64) } - writeTagString(&buf, c.Tags, tags) + buf = appendTagString(buf, c.Tags, tags) - return buf.String() + // non-zeroing copy to avoid referencing a larger than necessary underlying array + return append([]byte(nil), buf...) } // SetWriteTimeout allows the user to set a custom UDS write timeout. Not supported for UDP. func (c *Client) SetWriteTimeout(d time.Duration) error { if c == nil { - return nil + return fmt.Errorf("Client is nil") } return c.writer.SetWriteTimeout(d) } @@ -203,7 +244,7 @@ func (c *Client) watch() { } } -func (c *Client) append(cmd string) error { +func (c *Client) append(cmd []byte) error { c.Lock() defer c.Unlock() c.commands = append(c.commands, cmd) @@ -216,7 +257,7 @@ func (c *Client) append(cmd string) error { return nil } -func (c *Client) joinMaxSize(cmds []string, sep string, maxSize int) ([][]byte, []int) { +func (c *Client) joinMaxSize(cmds [][]byte, sep string, maxSize int) ([][]byte, []int) { c.buffer.Reset() //clear buffer var frames [][]byte @@ -236,13 +277,13 @@ func (c *Client) joinMaxSize(cmds []string, sep string, maxSize int) ([][]byte, if elem != 0 { c.buffer.Write(sepBytes) } - c.buffer.WriteString(cmd) + c.buffer.Write(cmd) elem++ } else { frames = append(frames, copyAndResetBuffer(&c.buffer)) ncmds = append(ncmds, elem) // if cmd is bigger than maxSize it will get flushed on next loop - c.buffer.WriteString(cmd) + c.buffer.Write(cmd) elem = 1 } } @@ -266,7 +307,7 @@ func copyAndResetBuffer(buf *bytes.Buffer) []byte { // Flush forces a flush of the pending commands in the buffer func (c *Client) Flush() error { if c == nil { - return nil + return fmt.Errorf("Client is nil") } c.Lock() defer c.Unlock() @@ -298,7 +339,7 @@ func (c *Client) flushLocked() error { return err } -func (c *Client) sendMsg(msg string) error { +func (c *Client) sendMsg(msg []byte) error { // return an error if message is bigger than MaxUDPPayloadSize if len(msg) > MaxUDPPayloadSize { return errors.New("message size exceeds MaxUDPPayloadSize") @@ -309,7 +350,7 @@ func (c *Client) sendMsg(msg string) error { return c.append(msg) } - _, err := c.writer.Write([]byte(msg)) + _, err := c.writer.Write(msg) if c.SkipErrors { return nil @@ -320,7 +361,7 @@ func (c *Client) sendMsg(msg string) error { // send handles sampling and sends the message over UDP. It also adds global namespace prefixes and tags. func (c *Client) send(name string, value interface{}, suffix []byte, tags []string, rate float64) error { if c == nil { - return nil + return fmt.Errorf("Client is nil") } if rate < 1 && rand.Float64() > rate { return nil @@ -378,13 +419,13 @@ func (c *Client) TimeInMilliseconds(name string, value float64, tags []string, r // Event sends the provided Event. func (c *Client) Event(e *Event) error { if c == nil { - return nil + return fmt.Errorf("Client is nil") } stat, err := e.Encode(c.Tags...) if err != nil { return err } - return c.sendMsg(stat) + return c.sendMsg([]byte(stat)) } // SimpleEvent sends an event with the provided title and text. @@ -396,13 +437,13 @@ func (c *Client) SimpleEvent(title, text string) error { // ServiceCheck sends the provided ServiceCheck. func (c *Client) ServiceCheck(sc *ServiceCheck) error { if c == nil { - return nil + return fmt.Errorf("Client is nil") } stat, err := sc.Encode(c.Tags...) if err != nil { return err } - return c.sendMsg(stat) + return c.sendMsg([]byte(stat)) } // SimpleServiceCheck sends an serviceCheck with the provided name and status. @@ -414,7 +455,7 @@ func (c *Client) SimpleServiceCheck(name string, status ServiceCheckStatus) erro // Close the client connection. func (c *Client) Close() error { if c == nil { - return nil + return fmt.Errorf("Client is nil") } select { case c.stop <- struct{}{}: @@ -678,3 +719,39 @@ func writeTagString(w io.Writer, tagList1, tagList2 []string) { io.WriteString(w, removeNewlines(tag)) } } + +func appendTagString(buf []byte, tagList1, tagList2 []string) []byte { + if len(tagList1) == 0 { + if len(tagList2) == 0 { + return buf + } + tagList1 = tagList2 + tagList2 = nil + } + + buf = append(buf, "|#"...) + buf = appendWithoutNewlines(buf, tagList1[0]) + for _, tag := range tagList1[1:] { + buf = append(buf, ',') + buf = appendWithoutNewlines(buf, tag) + } + for _, tag := range tagList2 { + buf = append(buf, ',') + buf = appendWithoutNewlines(buf, tag) + } + return buf +} + +func appendWithoutNewlines(buf []byte, s string) []byte { + // fastpath for strings without newlines + if strings.IndexByte(s, '\n') == -1 { + return append(buf, s...) + } + + for _, b := range []byte(s) { + if b != '\n' { + buf = append(buf, b) + } + } + return buf +} diff --git a/vendor/github.com/DataDog/datadog-go/statsd/udp.go b/vendor/github.com/DataDog/datadog-go/statsd/udp.go index 8af522c5b..9ddff421c 100644 --- a/vendor/github.com/DataDog/datadog-go/statsd/udp.go +++ b/vendor/github.com/DataDog/datadog-go/statsd/udp.go @@ -2,10 +2,18 @@ package statsd import ( "errors" + "fmt" "net" + "os" "time" ) +const ( + autoHostEnvName = "DD_AGENT_HOST" + autoPortEnvName = "DD_DOGSTATSD_PORT" + defaultUDPPort = "8125" +) + // udpWriter is an internal class wrapping around management of UDP connection type udpWriter struct { conn net.Conn @@ -13,6 +21,13 @@ type udpWriter struct { // New returns a pointer to a new udpWriter given an addr in the format "hostname:port". func newUDPWriter(addr string) (*udpWriter, error) { + if addr == "" { + addr = addressFromEnvironment() + } + if addr == "" { + return nil, errors.New("No address passed and autodetection from environment failed") + } + udpAddr, err := net.ResolveUDPAddr("udp", addr) if err != nil { return nil, err @@ -38,3 +53,21 @@ func (w *udpWriter) Write(data []byte) (int, error) { func (w *udpWriter) Close() error { return w.conn.Close() } + +func (w *udpWriter) remoteAddr() net.Addr { + return w.conn.RemoteAddr() +} + +func addressFromEnvironment() string { + autoHost := os.Getenv(autoHostEnvName) + if autoHost == "" { + return "" + } + + autoPort := os.Getenv(autoPortEnvName) + if autoPort == "" { + autoPort = defaultUDPPort + } + + return fmt.Sprintf("%s:%s", autoHost, autoPort) +} diff --git a/vendor/github.com/DataDog/datadog-go/statsd/uds.go b/vendor/github.com/DataDog/datadog-go/statsd/uds.go index 31154ab4d..cc2537e00 100644 --- a/vendor/github.com/DataDog/datadog-go/statsd/uds.go +++ b/vendor/github.com/DataDog/datadog-go/statsd/uds.go @@ -1,7 +1,6 @@ package statsd import ( - "net" "time" ) @@ -10,58 +9,3 @@ UDSTimeout holds the default timeout for UDS socket writes, as they can get blocking when the receiving buffer is full. */ const defaultUDSTimeout = 1 * time.Millisecond - -// udsWriter is an internal class wrapping around management of UDS connection -type udsWriter struct { - // Address to send metrics to, needed to allow reconnection on error - addr net.Addr - // Established connection object, or nil if not connected yet - conn net.Conn - // write timeout - writeTimeout time.Duration -} - -// New returns a pointer to a new udsWriter given a socket file path as addr. -func newUdsWriter(addr string) (*udsWriter, error) { - udsAddr, err := net.ResolveUnixAddr("unixgram", addr) - if err != nil { - return nil, err - } - // Defer connection to first Write - writer := &udsWriter{addr: udsAddr, conn: nil, writeTimeout: defaultUDSTimeout} - return writer, nil -} - -// SetWriteTimeout allows the user to set a custom write timeout -func (w *udsWriter) SetWriteTimeout(d time.Duration) error { - w.writeTimeout = d - return nil -} - -// Write data to the UDS connection with write timeout and minimal error handling: -// create the connection if nil, and destroy it if the statsd server has disconnected -func (w *udsWriter) Write(data []byte) (int, error) { - // Try connecting (first packet or connection lost) - if w.conn == nil { - conn, err := net.Dial(w.addr.Network(), w.addr.String()) - if err != nil { - return 0, err - } - w.conn = conn - } - w.conn.SetWriteDeadline(time.Now().Add(w.writeTimeout)) - n, e := w.conn.Write(data) - if e != nil { - // Statsd server disconnected, retry connecting at next packet - w.conn = nil - return 0, e - } - return n, e -} - -func (w *udsWriter) Close() error { - if w.conn != nil { - return w.conn.Close() - } - return nil -} diff --git a/vendor/github.com/DataDog/datadog-go/statsd/uds_async.go b/vendor/github.com/DataDog/datadog-go/statsd/uds_async.go new file mode 100644 index 000000000..39d4ccb23 --- /dev/null +++ b/vendor/github.com/DataDog/datadog-go/statsd/uds_async.go @@ -0,0 +1,113 @@ +package statsd + +import ( + "fmt" + "net" + "time" +) + +// asyncUdsWriter is an internal class wrapping around management of UDS connection +type asyncUdsWriter struct { + // Address to send metrics to, needed to allow reconnection on error + addr net.Addr + // Established connection object, or nil if not connected yet + conn net.Conn + // write timeout + writeTimeout time.Duration + // datagramQueue is the queue of datagrams ready to be sent + datagramQueue chan []byte + stopChan chan struct{} +} + +// New returns a pointer to a new asyncUdsWriter given a socket file path as addr. +func newAsyncUdsWriter(addr string) (*asyncUdsWriter, error) { + udsAddr, err := net.ResolveUnixAddr("unixgram", addr) + if err != nil { + return nil, err + } + + writer := &asyncUdsWriter{ + addr: udsAddr, + conn: nil, + writeTimeout: defaultUDSTimeout, + // 8192 * 8KB = 65.5MB + datagramQueue: make(chan []byte, 8192), + stopChan: make(chan struct{}, 1), + } + + go writer.sendLoop() + return writer, nil +} + +func (w *asyncUdsWriter) sendLoop() { + for { + select { + case datagram := <-w.datagramQueue: + w.write(datagram) + case <-w.stopChan: + return + } + } +} + +// SetWriteTimeout allows the user to set a custom write timeout +func (w *asyncUdsWriter) SetWriteTimeout(d time.Duration) error { + w.writeTimeout = d + return nil +} + +// Write data to the UDS connection with write timeout and minimal error handling: +// create the connection if nil, and destroy it if the statsd server has disconnected +func (w *asyncUdsWriter) Write(data []byte) (int, error) { + select { + case w.datagramQueue <- data: + return len(data), nil + default: + return 0, fmt.Errorf("uds datagram queue is full (the agent might not be able to keep up)") + } +} + +// write writes the given data to the UDS. +// This function is **not** thread safe. +func (w *asyncUdsWriter) write(data []byte) (int, error) { + conn, err := w.ensureConnection() + if err != nil { + return 0, err + } + + conn.SetWriteDeadline(time.Now().Add(w.writeTimeout)) + n, err := conn.Write(data) + + if e, isNetworkErr := err.(net.Error); !isNetworkErr || !e.Temporary() { + // err is not temporary, Statsd server disconnected, retry connecting at next packet + w.unsetConnection() + return 0, e + } + + return n, err +} + +func (w *asyncUdsWriter) Close() error { + close(w.stopChan) + if w.conn != nil { + return w.conn.Close() + } + return nil +} + +func (w *asyncUdsWriter) ensureConnection() (net.Conn, error) { + if w.conn != nil { + return w.conn, nil + } + + newConn, err := net.Dial(w.addr.Network(), w.addr.String()) + if err != nil { + return nil, err + } + w.conn = newConn + return newConn, nil +} + +func (w *asyncUdsWriter) unsetConnection() { + w.conn = nil +} diff --git a/vendor/github.com/DataDog/datadog-go/statsd/uds_blocking.go b/vendor/github.com/DataDog/datadog-go/statsd/uds_blocking.go new file mode 100644 index 000000000..70ee99ab3 --- /dev/null +++ b/vendor/github.com/DataDog/datadog-go/statsd/uds_blocking.go @@ -0,0 +1,92 @@ +package statsd + +import ( + "net" + "sync" + "time" +) + +// blockingUdsWriter is an internal class wrapping around management of UDS connection +type blockingUdsWriter struct { + // Address to send metrics to, needed to allow reconnection on error + addr net.Addr + // Established connection object, or nil if not connected yet + conn net.Conn + // write timeout + writeTimeout time.Duration + sync.RWMutex // used to lock conn / writer can replace it +} + +// New returns a pointer to a new blockingUdsWriter given a socket file path as addr. +func newBlockingUdsWriter(addr string) (*blockingUdsWriter, error) { + udsAddr, err := net.ResolveUnixAddr("unixgram", addr) + if err != nil { + return nil, err + } + // Defer connection to first Write + writer := &blockingUdsWriter{addr: udsAddr, conn: nil, writeTimeout: defaultUDSTimeout} + return writer, nil +} + +// SetWriteTimeout allows the user to set a custom write timeout +func (w *blockingUdsWriter) SetWriteTimeout(d time.Duration) error { + w.writeTimeout = d + return nil +} + +// Write data to the UDS connection with write timeout and minimal error handling: +// create the connection if nil, and destroy it if the statsd server has disconnected +func (w *blockingUdsWriter) Write(data []byte) (int, error) { + conn, err := w.ensureConnection() + if err != nil { + return 0, err + } + + conn.SetWriteDeadline(time.Now().Add(w.writeTimeout)) + n, e := conn.Write(data) + + if err, isNetworkErr := e.(net.Error); !isNetworkErr || !err.Temporary() { + // Statsd server disconnected, retry connecting at next packet + w.unsetConnection() + return 0, e + } + return n, e +} + +func (w *blockingUdsWriter) Close() error { + if w.conn != nil { + return w.conn.Close() + } + return nil +} + +func (w *blockingUdsWriter) ensureConnection() (net.Conn, error) { + // Check if we've already got a socket we can use + w.RLock() + currentConn := w.conn + w.RUnlock() + + if currentConn != nil { + return currentConn, nil + } + + // Looks like we might need to connect - try again with write locking. + w.Lock() + defer w.Unlock() + if w.conn != nil { + return w.conn, nil + } + + newConn, err := net.Dial(w.addr.Network(), w.addr.String()) + if err != nil { + return nil, err + } + w.conn = newConn + return newConn, nil +} + +func (w *blockingUdsWriter) unsetConnection() { + w.Lock() + defer w.Unlock() + w.conn = nil +} From c0f2e208c9967d3669551cd80dfd249ccf5593b7 Mon Sep 17 00:00:00 2001 From: Prudhvi Chaitanya Dhulipalla Date: Fri, 5 Jul 2019 14:14:25 -0700 Subject: [PATCH 2/2] Fixing specs after upgrading datadog --- Gopkg.lock | 5 ++--- Gopkg.toml | 2 +- scopedstatsd/client_test.go | 6 +++-- .../DataDog/datadog-go/statsd/statsd.go | 22 ++++++++++++++----- 4 files changed, 23 insertions(+), 12 deletions(-) diff --git a/Gopkg.lock b/Gopkg.lock index c4e811d0d..0ff626f26 100644 --- a/Gopkg.lock +++ b/Gopkg.lock @@ -2,12 +2,11 @@ [[projects]] - digest = "1:36975300f25d328c37e87d801e2e34038c53edcf15be8c2afb147f00de409386" + digest = "1:6c364e65eec127e77ec8b0696f1422890af3c19a252f2b72b98784ab24b9752e" name = "github.com/DataDog/datadog-go" packages = ["statsd"] pruneopts = "NUT" - revision = "8a13fa761f51039f900738876f2837198fba804f" - version = "2.2.0" + revision = "f6e76752dd64e7329d6b314f92a08748a78c2250" [[projects]] digest = "1:e30fbdce732588b475a444cbeabbcbef1c6c08b8d4f3efc142853551a1b0ab13" diff --git a/Gopkg.toml b/Gopkg.toml index 773fe4916..a954dd6bd 100644 --- a/Gopkg.toml +++ b/Gopkg.toml @@ -22,7 +22,7 @@ [[constraint]] name = "github.com/DataDog/datadog-go" - version = "2.2.0" + revision = "f6e76752dd64e7329d6b314f92a08748a78c2250" [[constraint]] name = "github.com/Shopify/sarama" diff --git a/scopedstatsd/client_test.go b/scopedstatsd/client_test.go index 44d3438d1..503465268 100644 --- a/scopedstatsd/client_test.go +++ b/scopedstatsd/client_test.go @@ -3,6 +3,8 @@ package scopedstatsd import ( "testing" + "github.com/pkg/errors" + "github.com/stretchr/testify/assert" ) @@ -10,7 +12,7 @@ func TestEnsure(t *testing.T) { var theNilOne Client = nil ensured := Ensure(theNilOne) assert.NotNil(t, ensured) - assert.NoError(t, ensured.Count("hi", 0, nil, 1.0)) + assert.Error(t, errors.New("statsd client is nil"), ensured.Count("hi", 0, nil, 1.0)) } func TestDoesSomething(t *testing.T) { @@ -49,7 +51,7 @@ func TestDoesSomething(t *testing.T) { }, } for _, fn := range testFuncs { - assert.NoError(t, fn()) + assert.Error(t, errors.New("statsd client is nil"), fn()) } }) } diff --git a/vendor/github.com/DataDog/datadog-go/statsd/statsd.go b/vendor/github.com/DataDog/datadog-go/statsd/statsd.go index 71a113cfc..2557f2d4a 100644 --- a/vendor/github.com/DataDog/datadog-go/statsd/statsd.go +++ b/vendor/github.com/DataDog/datadog-go/statsd/statsd.go @@ -67,6 +67,16 @@ const ( entityIDTagName = "dd.internal.entity_id" ) +type noClientErr string + +// ErrNoClient is returned if statsd reporting methods are invoked on +// a nil client. +const ErrNoClient = noClientErr("statsd client is nil") + +func (e noClientErr) Error() string { + return string(e) +} + /* Stat suffixes */ @@ -220,7 +230,7 @@ func (c *Client) format(name string, value interface{}, suffix []byte, tags []st // SetWriteTimeout allows the user to set a custom UDS write timeout. Not supported for UDP. func (c *Client) SetWriteTimeout(d time.Duration) error { if c == nil { - return fmt.Errorf("Client is nil") + return ErrNoClient } return c.writer.SetWriteTimeout(d) } @@ -307,7 +317,7 @@ func copyAndResetBuffer(buf *bytes.Buffer) []byte { // Flush forces a flush of the pending commands in the buffer func (c *Client) Flush() error { if c == nil { - return fmt.Errorf("Client is nil") + return ErrNoClient } c.Lock() defer c.Unlock() @@ -361,7 +371,7 @@ func (c *Client) sendMsg(msg []byte) error { // send handles sampling and sends the message over UDP. It also adds global namespace prefixes and tags. func (c *Client) send(name string, value interface{}, suffix []byte, tags []string, rate float64) error { if c == nil { - return fmt.Errorf("Client is nil") + return ErrNoClient } if rate < 1 && rand.Float64() > rate { return nil @@ -419,7 +429,7 @@ func (c *Client) TimeInMilliseconds(name string, value float64, tags []string, r // Event sends the provided Event. func (c *Client) Event(e *Event) error { if c == nil { - return fmt.Errorf("Client is nil") + return ErrNoClient } stat, err := e.Encode(c.Tags...) if err != nil { @@ -437,7 +447,7 @@ func (c *Client) SimpleEvent(title, text string) error { // ServiceCheck sends the provided ServiceCheck. func (c *Client) ServiceCheck(sc *ServiceCheck) error { if c == nil { - return fmt.Errorf("Client is nil") + return ErrNoClient } stat, err := sc.Encode(c.Tags...) if err != nil { @@ -455,7 +465,7 @@ func (c *Client) SimpleServiceCheck(name string, status ServiceCheckStatus) erro // Close the client connection. func (c *Client) Close() error { if c == nil { - return fmt.Errorf("Client is nil") + return ErrNoClient } select { case c.stop <- struct{}{}: