From d6f40cdb554ee6218201434b4fa6c88033e51c99 Mon Sep 17 00:00:00 2001 From: Nathaniel Cook Date: Tue, 17 May 2016 09:13:32 -0600 Subject: [PATCH 1/7] initial http subscriber support --- services/subscriber/http.go | 36 ++++++++ services/subscriber/service.go | 148 ++++++++++++++++++--------------- 2 files changed, 115 insertions(+), 69 deletions(-) create mode 100644 services/subscriber/http.go diff --git a/services/subscriber/http.go b/services/subscriber/http.go new file mode 100644 index 00000000000..44d53f9717a --- /dev/null +++ b/services/subscriber/http.go @@ -0,0 +1,36 @@ +package subscriber + +import ( + "github.com/influxdata/influxdb/client/v2" + "github.com/influxdata/influxdb/coordinator" +) + +// HTTP supports writing points over HTTP using the line protocol. +type HTTP struct { + c client.Client +} + +// NewHTTP returns a new HTTP points writer with default options. +func NewHTTP(addr string) (*HTTP, error) { + conf := client.HTTPConfig{ + Addr: addr, + } + c, err := client.NewHTTPClient(conf) + if err != nil { + return nil, err + } + return &HTTP{c: c}, nil +} + +// WritePoints writes points over HTTP transport. +func (h *HTTP) WritePoints(p *coordinator.WritePointsRequest) (err error) { + bp, _ := client.NewBatchPoints(client.BatchPointsConfig{ + Database: p.Database, + RetentionPolicy: p.RetentionPolicy, + }) + for _, p := range p.Points { + bp.AddPoint(client.NewPointFrom(p)) + } + err = h.c.Write(bp) + return +} diff --git a/services/subscriber/service.go b/services/subscriber/service.go index 1e6c071bbe0..36b4bcba365 100644 --- a/services/subscriber/service.go +++ b/services/subscriber/service.go @@ -1,6 +1,7 @@ package subscriber // import "github.com/influxdata/influxdb/services/subscriber" import ( + "errors" "expvar" "fmt" "io" @@ -38,13 +39,13 @@ type subEntry struct { // to defined third party destinations. // Subscriptions are defined per database and retention policy. type Service struct { - subs map[subEntry]PointsWriter MetaClient interface { Databases() []meta.DatabaseInfo WaitForDataChanged() chan struct{} } NewPointsWriter func(u url.URL) (PointsWriter, error) Logger *log.Logger + update chan struct{} statMap *expvar.Map points chan *coordinator.WritePointsRequest wg sync.WaitGroup @@ -56,13 +57,11 @@ type Service struct { // NewService returns a subscriber service with given settings func NewService(c Config) *Service { return &Service{ - subs: make(map[subEntry]PointsWriter), NewPointsWriter: newPointsWriter, Logger: log.New(os.Stderr, "[subscriber] ", log.LstdFlags), statMap: influxdb.NewStatistics("subscriber", "subscriber", nil), - points: make(chan *coordinator.WritePointsRequest), + points: make(chan *coordinator.WritePointsRequest, 100), closed: true, - closing: make(chan struct{}), } } @@ -70,21 +69,17 @@ func NewService(c Config) *Service { func (s *Service) Open() error { s.mu.Lock() defer s.mu.Unlock() - - s.closing = make(chan struct{}) - if s.MetaClient == nil { - panic("no meta store") + return errors.New("no meta store") } s.closed = false - // Perform initial update - s.Update() + s.closing = make(chan struct{}) + s.update = make(chan struct{}) - s.wg.Add(1) - go s.writePoints() - // Do not wait for this goroutine since it block until a meta change occurs. + s.wg.Add(2) + go s.run() go s.waitForMetaUpdates() s.Logger.Println("opened service") @@ -95,14 +90,10 @@ func (s *Service) Open() error { func (s *Service) Close() error { s.mu.Lock() defer s.mu.Unlock() - close(s.points) s.closed = true - select { - case <-s.closing: - // do nothing - default: - close(s.closing) - } + + close(s.points) + close(s.closing) s.wg.Wait() s.Logger.Println("closed service") @@ -116,61 +107,30 @@ func (s *Service) SetLogOutput(w io.Writer) { } func (s *Service) waitForMetaUpdates() { + defer s.wg.Done() for { ch := s.MetaClient.WaitForDataChanged() select { case <-ch: - //Check that we haven't been closed before performing update. - s.mu.Lock() - if s.closed { - s.mu.Unlock() - s.Logger.Println("service closed not updating") - return + err := s.Update() + if err != nil { + s.Logger.Println("error updating subscriptions:", err) } - s.mu.Unlock() - s.Update() case <-s.closing: return } } - } // Update will start new and stop deleted subscriptions. func (s *Service) Update() error { - dbis := s.MetaClient.Databases() - allEntries := make(map[subEntry]bool, 0) - // Add in new subscriptions - for _, dbi := range dbis { - for _, rpi := range dbi.RetentionPolicies { - for _, si := range rpi.Subscriptions { - se := subEntry{ - db: dbi.Name, - rp: rpi.Name, - name: si.Name, - } - allEntries[se] = true - if _, ok := s.subs[se]; ok { - continue - } - sub, err := s.createSubscription(se, si.Mode, si.Destinations) - if err != nil { - return err - } - s.subs[se] = sub - } - } - } - - // Remove deleted subs - for se := range s.subs { - if !allEntries[se] { - delete(s.subs, se) - s.Logger.Println("deleted old subscription for", se.db, se.rp) - } + // signal update + select { + case s.update <- struct{}{}: + return nil + case <-s.closing: + return errors.New("service closed cannot update") } - - return nil } func (s *Service) createSubscription(se subEntry, mode string, destinations []string) (PointsWriter, error) { @@ -219,20 +179,68 @@ func (s *Service) Points() chan<- *coordinator.WritePointsRequest { } // read points off chan and write them -func (s *Service) writePoints() { +func (s *Service) run() { defer s.wg.Done() - for p := range s.points { - for se, sub := range s.subs { - if p.Database == se.db && p.RetentionPolicy == se.rp { - err := sub.WritePoints(p) + subs := make(map[subEntry]PointsWriter) + // Perform initial update + s.updateSubs(subs) + for { + select { + case <-s.update: + s.updateSubs(subs) + case p, ok := <-s.points: + if !ok { + return + } + for se, sub := range subs { + if p.Database == se.db && p.RetentionPolicy == se.rp { + err := sub.WritePoints(p) + if err != nil { + s.Logger.Println(err) + s.statMap.Add(statWriteFailures, 1) + } + } + } + s.statMap.Add(statPointsWritten, int64(len(p.Points))) + } + } +} + +func (s *Service) updateSubs(subs map[subEntry]PointsWriter) error { + dbis := s.MetaClient.Databases() + allEntries := make(map[subEntry]bool, 0) + // Add in new subscriptions + for _, dbi := range dbis { + for _, rpi := range dbi.RetentionPolicies { + for _, si := range rpi.Subscriptions { + se := subEntry{ + db: dbi.Name, + rp: rpi.Name, + name: si.Name, + } + allEntries[se] = true + if _, ok := subs[se]; ok { + continue + } + sub, err := s.createSubscription(se, si.Mode, si.Destinations) if err != nil { - s.Logger.Println(err) - s.statMap.Add(statWriteFailures, 1) + return err } + subs[se] = sub + s.Logger.Println("added new subscription for", se.db, se.rp) } } - s.statMap.Add(statPointsWritten, int64(len(p.Points))) } + + // Remove deleted subs + for se := range subs { + if !allEntries[se] { + delete(subs, se) + s.Logger.Println("deleted old subscription for", se.db, se.rp) + } + } + + return nil } // BalanceMode sets what balance mode to use on a subscription. @@ -281,6 +289,8 @@ func newPointsWriter(u url.URL) (PointsWriter, error) { switch u.Scheme { case "udp": return NewUDP(u.Host), nil + case "http", "https": + return NewHTTP(u.Host) default: return nil, fmt.Errorf("unknown destination scheme %s", u.Scheme) } From 18d0bfe056adf3fe2df204dc1c816694f775d709 Mon Sep 17 00:00:00 2001 From: Nathaniel Cook Date: Tue, 17 May 2016 13:24:23 -0600 Subject: [PATCH 2/7] fix some logging issues --- services/subscriber/http.go | 5 ++++- services/subscriber/service.go | 8 +++++--- 2 files changed, 9 insertions(+), 4 deletions(-) diff --git a/services/subscriber/http.go b/services/subscriber/http.go index 44d53f9717a..c1208e90a84 100644 --- a/services/subscriber/http.go +++ b/services/subscriber/http.go @@ -1,6 +1,8 @@ package subscriber import ( + "time" + "github.com/influxdata/influxdb/client/v2" "github.com/influxdata/influxdb/coordinator" ) @@ -13,7 +15,8 @@ type HTTP struct { // NewHTTP returns a new HTTP points writer with default options. func NewHTTP(addr string) (*HTTP, error) { conf := client.HTTPConfig{ - Addr: addr, + Addr: addr, + Timeout: 30 * time.Second, } c, err := client.NewHTTPClient(conf) if err != nil { diff --git a/services/subscriber/service.go b/services/subscriber/service.go index 36b4bcba365..27f0148aee6 100644 --- a/services/subscriber/service.go +++ b/services/subscriber/service.go @@ -165,7 +165,6 @@ func (s *Service) createSubscription(se subEntry, mode string, destinations []st key := strings.Join([]string{"subscriber", se.db, se.rp, se.name, dest}, ":") statMaps[i] = influxdb.NewStatistics(key, "subscriber", tags) } - s.Logger.Println("created new subscription for", se.db, se.rp) return &balancewriter{ bm: bm, writers: writers, @@ -187,7 +186,10 @@ func (s *Service) run() { for { select { case <-s.update: - s.updateSubs(subs) + err := s.updateSubs(subs) + if err != nil { + s.Logger.Println("failed to update subscriptions:", err) + } case p, ok := <-s.points: if !ok { return @@ -290,7 +292,7 @@ func newPointsWriter(u url.URL) (PointsWriter, error) { case "udp": return NewUDP(u.Host), nil case "http", "https": - return NewHTTP(u.Host) + return NewHTTP(u.String()) default: return nil, fmt.Errorf("unknown destination scheme %s", u.Scheme) } From 904e8cc825aab0124e7ac04c4c6824e0bee9ce98 Mon Sep 17 00:00:00 2001 From: Nathaniel Cook Date: Tue, 17 May 2016 13:36:03 -0600 Subject: [PATCH 3/7] add config for http timeout --- cmd/influxd/run/config.go | 4 ++++ services/subscriber/config.go | 25 ++++++++++++++++++++- services/subscriber/http.go | 4 ++-- services/subscriber/service.go | 40 +++++++++++++++++++--------------- 4 files changed, 52 insertions(+), 21 deletions(-) diff --git a/cmd/influxd/run/config.go b/cmd/influxd/run/config.go index 27b6f35f424..ba9d4ed255f 100644 --- a/cmd/influxd/run/config.go +++ b/cmd/influxd/run/config.go @@ -168,6 +168,10 @@ func (c *Config) Validate() error { return err } + if err := c.Subscriber.Validate(); err != nil { + return err + } + for _, g := range c.GraphiteInputs { if err := g.Validate(); err != nil { return fmt.Errorf("invalid graphite config: %v", err) diff --git a/services/subscriber/config.go b/services/subscriber/config.go index 5512678479c..ff92077c2e1 100644 --- a/services/subscriber/config.go +++ b/services/subscriber/config.go @@ -1,12 +1,35 @@ package subscriber +import ( + "errors" + "time" + + "github.com/influxdata/influxdb/toml" +) + +const ( + DefaultHTTPTimeout = 30 * time.Second +) + // Config represents a configuration of the subscriber service. type Config struct { // Whether to enable to Subscriber service Enabled bool `toml:"enabled"` + + HTTPTimeout toml.Duration `toml:"http-timeout"` } // NewConfig returns a new instance of a subscriber config. func NewConfig() Config { - return Config{Enabled: true} + return Config{ + Enabled: true, + HTTPTimeout: toml.Duration(DefaultHTTPTimeout), + } +} + +func (c Config) Validate() error { + if c.HTTPTimeout <= 0 { + return errors.New("http-timeout must be greater than 0") + } + return nil } diff --git a/services/subscriber/http.go b/services/subscriber/http.go index c1208e90a84..57b97949aab 100644 --- a/services/subscriber/http.go +++ b/services/subscriber/http.go @@ -13,10 +13,10 @@ type HTTP struct { } // NewHTTP returns a new HTTP points writer with default options. -func NewHTTP(addr string) (*HTTP, error) { +func NewHTTP(addr string, timeout time.Duration) (*HTTP, error) { conf := client.HTTPConfig{ Addr: addr, - Timeout: 30 * time.Second, + Timeout: timeout, } c, err := client.NewHTTPClient(conf) if err != nil { diff --git a/services/subscriber/service.go b/services/subscriber/service.go index 27f0148aee6..f5ae310898f 100644 --- a/services/subscriber/service.go +++ b/services/subscriber/service.go @@ -10,6 +10,7 @@ import ( "os" "strings" "sync" + "time" "github.com/influxdata/influxdb" "github.com/influxdata/influxdb/coordinator" @@ -52,17 +53,20 @@ type Service struct { closed bool closing chan struct{} mu sync.Mutex + conf Config } // NewService returns a subscriber service with given settings func NewService(c Config) *Service { - return &Service{ - NewPointsWriter: newPointsWriter, - Logger: log.New(os.Stderr, "[subscriber] ", log.LstdFlags), - statMap: influxdb.NewStatistics("subscriber", "subscriber", nil), - points: make(chan *coordinator.WritePointsRequest, 100), - closed: true, + s := &Service{ + Logger: log.New(os.Stderr, "[subscriber] ", log.LstdFlags), + statMap: influxdb.NewStatistics("subscriber", "subscriber", nil), + points: make(chan *coordinator.WritePointsRequest, 100), + closed: true, + conf: c, } + s.NewPointsWriter = s.newPointsWriter + return s } // Open starts the subscription service. @@ -245,6 +249,18 @@ func (s *Service) updateSubs(subs map[subEntry]PointsWriter) error { return nil } +// Creates a PointsWriter from the given URL +func (s *Service) newPointsWriter(u url.URL) (PointsWriter, error) { + switch u.Scheme { + case "udp": + return NewUDP(u.Host), nil + case "http", "https": + return NewHTTP(u.String(), time.Duration(s.conf.HTTPTimeout)) + default: + return nil, fmt.Errorf("unknown destination scheme %s", u.Scheme) + } +} + // BalanceMode sets what balance mode to use on a subscription. // valid options are currently ALL or ANY type BalanceMode int @@ -285,15 +301,3 @@ func (b *balancewriter) WritePoints(p *coordinator.WritePointsRequest) error { } return lastErr } - -// Creates a PointsWriter from the given URL -func newPointsWriter(u url.URL) (PointsWriter, error) { - switch u.Scheme { - case "udp": - return NewUDP(u.Host), nil - case "http", "https": - return NewHTTP(u.String()) - default: - return nil, fmt.Errorf("unknown destination scheme %s", u.Scheme) - } -} From 73076fa0a3a04eb30061b4ee14c9595fe6306aa4 Mon Sep 17 00:00:00 2001 From: Nathaniel Cook Date: Tue, 17 May 2016 13:39:24 -0600 Subject: [PATCH 4/7] update example config --- etc/config.sample.toml | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/etc/config.sample.toml b/etc/config.sample.toml index 89ae82bb60d..c1e06149816 100644 --- a/etc/config.sample.toml +++ b/etc/config.sample.toml @@ -172,6 +172,18 @@ reporting-disabled = false # https-private-key = "" max-row-limit = 10000 +### +### [subsciber] +### +### Controls the subscriptions, which can be used to fork a copy of all data +### received by the InfluxDB host. +### + +[subsciber] + enabled = true + http-timeout = "30s" + + ### ### [[graphite]] ### From 663373c4e14647941beb3060786c2cdf393add88 Mon Sep 17 00:00:00 2001 From: Nathaniel Cook Date: Tue, 17 May 2016 14:44:48 -0600 Subject: [PATCH 5/7] changelog --- CHANGELOG.md | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index a82785fe725..25c7eb33e20 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -16,6 +16,7 @@ - [#2926](https://github.com/influxdata/influxdb/issues/2926): Support bound parameters in the parser. - [#1310](https://github.com/influxdata/influxdb/issues/1310): Add https-private-key option to httpd config. - [#6621](https://github.com/influxdata/influxdb/pull/6621): Add Holt-Winter forecasting function. +- [#6655](https://github.com/influxdata/influxdb/issues/6655): Add HTTP(s) based subscriptions. ### Bugfixes From a7cac1337bf5881570e0761e96492547232f4f09 Mon Sep 17 00:00:00 2001 From: Nathaniel Cook Date: Wed, 18 May 2016 14:11:25 -0600 Subject: [PATCH 6/7] writes happen concurrently via chanWriter --- services/subscriber/http.go | 4 +- services/subscriber/service.go | 90 ++++++++++++++++++++++++++-------- 2 files changed, 72 insertions(+), 22 deletions(-) diff --git a/services/subscriber/http.go b/services/subscriber/http.go index 57b97949aab..807ec322a65 100644 --- a/services/subscriber/http.go +++ b/services/subscriber/http.go @@ -31,8 +31,8 @@ func (h *HTTP) WritePoints(p *coordinator.WritePointsRequest) (err error) { Database: p.Database, RetentionPolicy: p.RetentionPolicy, }) - for _, p := range p.Points { - bp.AddPoint(client.NewPointFrom(p)) + for _, pt := range p.Points { + bp.AddPoint(client.NewPointFrom(pt)) } err = h.c.Write(bp) return diff --git a/services/subscriber/service.go b/services/subscriber/service.go index f5ae310898f..1d2888ca51a 100644 --- a/services/subscriber/service.go +++ b/services/subscriber/service.go @@ -54,18 +54,22 @@ type Service struct { closing chan struct{} mu sync.Mutex conf Config + + failures *expvar.Int } // NewService returns a subscriber service with given settings func NewService(c Config) *Service { s := &Service{ - Logger: log.New(os.Stderr, "[subscriber] ", log.LstdFlags), - statMap: influxdb.NewStatistics("subscriber", "subscriber", nil), - points: make(chan *coordinator.WritePointsRequest, 100), - closed: true, - conf: c, + Logger: log.New(os.Stderr, "[subscriber] ", log.LstdFlags), + statMap: influxdb.NewStatistics("subscriber", "subscriber", nil), + points: make(chan *coordinator.WritePointsRequest, 100), + closed: true, + conf: c, + failures: &expvar.Int{}, } s.NewPointsWriter = s.newPointsWriter + s.statMap.Set(statWriteFailures, s.failures) return s } @@ -83,14 +87,21 @@ func (s *Service) Open() error { s.update = make(chan struct{}) s.wg.Add(2) - go s.run() - go s.waitForMetaUpdates() + go func() { + defer s.wg.Done() + s.run() + }() + go func() { + defer s.wg.Done() + s.waitForMetaUpdates() + }() s.Logger.Println("opened service") return nil } // Close terminates the subscription service +// Will panic if called multiple times or without first opening the service. func (s *Service) Close() error { s.mu.Lock() defer s.mu.Unlock() @@ -111,7 +122,6 @@ func (s *Service) SetLogOutput(w io.Writer) { } func (s *Service) waitForMetaUpdates() { - defer s.wg.Done() for { ch := s.MetaClient.WaitForDataChanged() select { @@ -183,28 +193,30 @@ func (s *Service) Points() chan<- *coordinator.WritePointsRequest { // read points off chan and write them func (s *Service) run() { - defer s.wg.Done() - subs := make(map[subEntry]PointsWriter) + var wg sync.WaitGroup + subs := make(map[subEntry]chanWriter) // Perform initial update - s.updateSubs(subs) + s.updateSubs(subs, &wg) for { select { case <-s.update: - err := s.updateSubs(subs) + err := s.updateSubs(subs, &wg) if err != nil { s.Logger.Println("failed to update subscriptions:", err) } case p, ok := <-s.points: if !ok { + // Close out all chanWriters + for _, cw := range subs { + cw.Close() + } + // Wait for them to finish + wg.Wait() return } - for se, sub := range subs { + for se, cw := range subs { if p.Database == se.db && p.RetentionPolicy == se.rp { - err := sub.WritePoints(p) - if err != nil { - s.Logger.Println(err) - s.statMap.Add(statWriteFailures, 1) - } + cw.writeRequests <- p } } s.statMap.Add(statPointsWritten, int64(len(p.Points))) @@ -212,7 +224,7 @@ func (s *Service) run() { } } -func (s *Service) updateSubs(subs map[subEntry]PointsWriter) error { +func (s *Service) updateSubs(subs map[subEntry]chanWriter, wg *sync.WaitGroup) error { dbis := s.MetaClient.Databases() allEntries := make(map[subEntry]bool, 0) // Add in new subscriptions @@ -232,7 +244,18 @@ func (s *Service) updateSubs(subs map[subEntry]PointsWriter) error { if err != nil { return err } - subs[se] = sub + cw := chanWriter{ + writeRequests: make(chan *coordinator.WritePointsRequest), + pw: sub, + failures: s.failures, + logger: s.Logger, + } + wg.Add(1) + go func() { + defer wg.Done() + cw.Run() + }() + subs[se] = cw s.Logger.Println("added new subscription for", se.db, se.rp) } } @@ -241,6 +264,10 @@ func (s *Service) updateSubs(subs map[subEntry]PointsWriter) error { // Remove deleted subs for se := range subs { if !allEntries[se] { + // Close the chanWriter + subs[se].Close() + + // Remove it from the set delete(subs, se) s.Logger.Println("deleted old subscription for", se.db, se.rp) } @@ -261,6 +288,29 @@ func (s *Service) newPointsWriter(u url.URL) (PointsWriter, error) { } } +// Sends WritePointsRequest to a PointsWriter received over a channel. +type chanWriter struct { + writeRequests chan *coordinator.WritePointsRequest + pw PointsWriter + failures *expvar.Int + logger *log.Logger +} + +// Close the chanWriter +func (c chanWriter) Close() { + close(c.writeRequests) +} + +func (c chanWriter) Run() { + for wr := range c.writeRequests { + err := c.pw.WritePoints(wr) + if err != nil { + c.logger.Println(err) + c.failures.Add(1) + } + } +} + // BalanceMode sets what balance mode to use on a subscription. // valid options are currently ALL or ANY type BalanceMode int From d460be370694ee82cb4346f1a1d2912185090b1f Mon Sep 17 00:00:00 2001 From: Nathaniel Cook Date: Thu, 19 May 2016 10:53:04 -0600 Subject: [PATCH 7/7] buffered chans, correct stats, drop to slow backends --- services/subscriber/service.go | 30 ++++++++++++++++++++---------- 1 file changed, 20 insertions(+), 10 deletions(-) diff --git a/services/subscriber/service.go b/services/subscriber/service.go index 1d2888ca51a..9790c4fb2ed 100644 --- a/services/subscriber/service.go +++ b/services/subscriber/service.go @@ -55,21 +55,24 @@ type Service struct { mu sync.Mutex conf Config - failures *expvar.Int + failures *expvar.Int + pointsWritten *expvar.Int } // NewService returns a subscriber service with given settings func NewService(c Config) *Service { s := &Service{ - Logger: log.New(os.Stderr, "[subscriber] ", log.LstdFlags), - statMap: influxdb.NewStatistics("subscriber", "subscriber", nil), - points: make(chan *coordinator.WritePointsRequest, 100), - closed: true, - conf: c, - failures: &expvar.Int{}, + Logger: log.New(os.Stderr, "[subscriber] ", log.LstdFlags), + statMap: influxdb.NewStatistics("subscriber", "subscriber", nil), + points: make(chan *coordinator.WritePointsRequest, 100), + closed: true, + conf: c, + failures: &expvar.Int{}, + pointsWritten: &expvar.Int{}, } s.NewPointsWriter = s.newPointsWriter s.statMap.Set(statWriteFailures, s.failures) + s.statMap.Set(statPointsWritten, s.pointsWritten) return s } @@ -216,10 +219,13 @@ func (s *Service) run() { } for se, cw := range subs { if p.Database == se.db && p.RetentionPolicy == se.rp { - cw.writeRequests <- p + select { + case cw.writeRequests <- p: + default: + s.failures.Add(1) + } } } - s.statMap.Add(statPointsWritten, int64(len(p.Points))) } } } @@ -245,9 +251,10 @@ func (s *Service) updateSubs(subs map[subEntry]chanWriter, wg *sync.WaitGroup) e return err } cw := chanWriter{ - writeRequests: make(chan *coordinator.WritePointsRequest), + writeRequests: make(chan *coordinator.WritePointsRequest, 100), pw: sub, failures: s.failures, + pointsWritten: s.pointsWritten, logger: s.Logger, } wg.Add(1) @@ -292,6 +299,7 @@ func (s *Service) newPointsWriter(u url.URL) (PointsWriter, error) { type chanWriter struct { writeRequests chan *coordinator.WritePointsRequest pw PointsWriter + pointsWritten *expvar.Int failures *expvar.Int logger *log.Logger } @@ -307,6 +315,8 @@ func (c chanWriter) Run() { if err != nil { c.logger.Println(err) c.failures.Add(1) + } else { + c.pointsWritten.Add(int64(len(wr.Points))) } } }