diff --git a/internal/errors.go b/internal/errors.go index a1f58c3eb2510..d1e098ea441ce 100644 --- a/internal/errors.go +++ b/internal/errors.go @@ -37,3 +37,23 @@ func (e *FatalError) Error() string { func (e *FatalError) Unwrap() error { return e.Err } + +// PartialWriteError indicate that only a subset of the metrics were written +// successfully (i.e. accepted). The rejected metrics should be removed from +// the buffer without being successfully written. Please note: the metrics +// are specified as indices into the batch to be able to reference tracking +// metrics correctly. +type PartialWriteError struct { + Err error + MetricsAccept []int + MetricsReject []int + MetricsRejectErrors []error +} + +func (e *PartialWriteError) Error() string { + return e.Err.Error() +} + +func (e *PartialWriteError) Unwrap() error { + return e.Err +} diff --git a/models/buffer.go b/models/buffer.go index f20e6ee2f9c08..167f56639818f 100644 --- a/models/buffer.go +++ b/models/buffer.go @@ -10,12 +10,57 @@ import ( ) var ( - AgentMetricsWritten = selfstat.Register("agent", "metrics_written", make(map[string]string)) - AgentMetricsDropped = selfstat.Register("agent", "metrics_dropped", make(map[string]string)) + AgentMetricsWritten = selfstat.Register("agent", "metrics_written", make(map[string]string)) + AgentMetricsRejected = selfstat.Register("agent", "metrics_rejected", make(map[string]string)) + AgentMetricsDropped = selfstat.Register("agent", "metrics_dropped", make(map[string]string)) registerGob = sync.OnceFunc(func() { metric.Init() }) ) +type Transaction struct { + // Batch of metrics to write + Batch []telegraf.Metric + + // Accept denotes the indices of metrics that were successfully written + Accept []int + // Reject denotes the indices of metrics that were not written but should + // not be requeued + Reject []int + + // Marks this transaction as valid + valid bool + + // Internal state that can be used by the buffer implementation + state interface{} +} + +func (tx *Transaction) AcceptAll() { + tx.Accept = make([]int, len(tx.Batch)) + for i := range tx.Batch { + tx.Accept[i] = i + } +} + +func (tx *Transaction) KeepAll() {} + +func (tx *Transaction) InferKeep() []int { + used := make([]bool, len(tx.Batch)) + for _, idx := range tx.Accept { + used[idx] = true + } + for _, idx := range tx.Reject { + used[idx] = true + } + + keep := make([]int, 0, len(tx.Batch)) + for i := range tx.Batch { + if !used[i] { + keep = append(keep, i) + } + } + return keep +} + type Buffer interface { // Len returns the number of metrics currently in the buffer. Len() int @@ -23,19 +68,15 @@ type Buffer interface { // Add adds metrics to the buffer and returns number of dropped metrics. Add(metrics ...telegraf.Metric) int - // Batch returns a slice containing up to batchSize of the oldest metrics not - // yet dropped. Metrics are ordered from oldest to newest in the batch. The - // batch must not be modified by the client. - Batch(batchSize int) []telegraf.Metric - - // Accept marks the batch, acquired from Batch(), as successfully written. - Accept(metrics []telegraf.Metric) + // Batch starts a transaction by returning a slice of metrics up to the + // given batch-size starting from the oldest metric in the buffer. Metrics + // are ordered from oldest to newest and must not be modified by the plugin. + BeginTransaction(batchSize int) *Transaction - // Reject returns the batch, acquired from Batch(), to the buffer and marks it - // as unsent. - Reject([]telegraf.Metric) + // Flush ends a metric and persists the buffer state + EndTransaction(*Transaction) - // Stats returns the buffer statistics such as rejected, dropped and accepred metrics + // Stats returns the buffer statistics such as rejected, dropped and accepted metrics Stats() BufferStats // Close finalizes the buffer and closes all open resources @@ -45,11 +86,12 @@ type Buffer interface { // BufferStats holds common metrics used for buffer implementations. // Implementations of Buffer should embed this struct in them. type BufferStats struct { - MetricsAdded selfstat.Stat - MetricsWritten selfstat.Stat - MetricsDropped selfstat.Stat - BufferSize selfstat.Stat - BufferLimit selfstat.Stat + MetricsAdded selfstat.Stat + MetricsWritten selfstat.Stat + MetricsRejected selfstat.Stat + MetricsDropped selfstat.Stat + BufferSize selfstat.Stat + BufferLimit selfstat.Stat } // NewBuffer returns a new empty Buffer with the given capacity. @@ -84,6 +126,11 @@ func NewBufferStats(name, alias string, capacity int) BufferStats { "metrics_written", tags, ), + MetricsRejected: selfstat.Register( + "write", + "metrics_rejected", + tags, + ), MetricsDropped: selfstat.Register( "write", "metrics_dropped", @@ -115,6 +162,12 @@ func (b *BufferStats) metricWritten(m telegraf.Metric) { m.Accept() } +func (b *BufferStats) metricRejected(m telegraf.Metric) { + AgentMetricsRejected.Incr(1) + b.MetricsRejected.Incr(1) + m.Reject() +} + func (b *BufferStats) metricDropped(m telegraf.Metric) { AgentMetricsDropped.Incr(1) b.MetricsDropped.Incr(1) diff --git a/models/buffer_disk.go b/models/buffer_disk.go index 57836dbab9070..799ac24758cb1 100644 --- a/models/buffer_disk.go +++ b/models/buffer_disk.go @@ -5,6 +5,8 @@ import ( "fmt" "log" "path/filepath" + "slices" + "sort" "sync" "github.com/tidwall/wal" @@ -31,6 +33,11 @@ type DiskBuffer struct { // we have to do our best and track that the walfile "should" be empty, so that next // write, we can remove the invalid entry (also skipping this entry if it is being read). isEmpty bool + + // The mask contains offsets of metric already removed during a previous + // transaction. Metrics at those offsets should not be contained in new + // batches. + mask []int } func NewDiskBuffer(name, id, path string, stats BufferStats) (*DiskBuffer, error) { @@ -67,7 +74,11 @@ func (b *DiskBuffer) length() int { if b.isEmpty { return 0 } - // Special case for when the read index is zero, it must be empty (otherwise it would be >= 1) + + return b.entries() - len(b.mask) +} + +func (b *DiskBuffer) entries() int { if b.readIndex() == 0 { return 0 } @@ -121,28 +132,33 @@ func (b *DiskBuffer) addSingleMetric(m telegraf.Metric) bool { return false } -func (b *DiskBuffer) Batch(batchSize int) []telegraf.Metric { +func (b *DiskBuffer) BeginTransaction(batchSize int) *Transaction { b.Lock() defer b.Unlock() if b.length() == 0 { - // no metrics in the wal file, so return an empty array - return make([]telegraf.Metric, 0) + return &Transaction{} } b.batchFirst = b.readIndex() - var metrics []telegraf.Metric - b.batchSize = 0 + + metrics := make([]telegraf.Metric, 0, batchSize) + offsets := make([]int, 0, batchSize) readIndex := b.batchFirst endIndex := b.writeIndex() + offset := 0 for batchSize > 0 && readIndex < endIndex { data, err := b.file.Read(readIndex) if err != nil { panic(err) } readIndex++ + offset++ - m, err := metric.FromBytes(data) + if slices.Contains(b.mask, offset) { + // Metric is masked by a previous write and is scheduled for removal + continue + } // Validate that a tracking metric is from this instance of telegraf and skip ones from older instances. // A tracking metric can be skipped here because metric.Accept() is only called once data is successfully @@ -152,11 +168,12 @@ func (b *DiskBuffer) Batch(batchSize int) []telegraf.Metric { // - ErrSkipTracking: means that the tracking information was unable to be found for a tracking ID. // - Outside of range: means that the metric was guaranteed to be left over from the previous instance // as it was here when we opened the wal file in this instance. - if errors.Is(err, metric.ErrSkipTracking) { - // could not look up tracking information for metric, skip - continue - } + m, err := metric.FromBytes(data) if err != nil { + if errors.Is(err, metric.ErrSkipTracking) { + // could not look up tracking information for metric, skip + continue + } // non-recoverable error in deserialization, abort log.Printf("E! raw metric data: %v", data) panic(err) @@ -167,33 +184,82 @@ func (b *DiskBuffer) Batch(batchSize int) []telegraf.Metric { } metrics = append(metrics, m) + offsets = append(offsets, offset) b.batchSize++ batchSize-- } - return metrics + return &Transaction{Batch: metrics, valid: true, state: offsets} } -func (b *DiskBuffer) Accept(batch []telegraf.Metric) { +func (b *DiskBuffer) EndTransaction(tx *Transaction) { + if len(tx.Batch) == 0 { + return + } + + // Ignore invalid transactions and make sure they can only be finished once + if !tx.valid { + return + } + tx.valid = false + + // Get the metric offsets from the transaction + offsets := tx.state.([]int) + b.Lock() defer b.Unlock() - if b.batchSize == 0 || len(batch) == 0 { - // nothing to accept + // Mark metrics which should be removed in the internal mask + remove := make([]int, 0, len(tx.Accept)+len(tx.Reject)) + for _, idx := range tx.Accept { + b.metricWritten(tx.Batch[idx]) + remove = append(remove, offsets[idx]) + } + for _, idx := range tx.Reject { + b.metricRejected(tx.Batch[idx]) + remove = append(remove, offsets[idx]) + } + b.mask = append(b.mask, remove...) + sort.Ints(b.mask) + + // Remove the metrics that are marked for removal from the front of the + // WAL file. All other metrics must be kept. + if len(b.mask) == 0 || b.mask[0] != 0 { + // Mask is empty or the first index is not the front of the file, so + // exit early as there is nothing to remove return } - for _, m := range batch { - b.metricWritten(m) + + // Determine up to which index we can remove the entries from the WAL file + var removeIdx int + for i, offset := range b.mask { + if offset != i { + break + } + removeIdx = offset } - if b.length() == len(batch) { - b.emptyFile() + + // Remove the metrics in front from the WAL file + b.isEmpty = b.entries()-removeIdx-1 <= 0 + if b.isEmpty { + // WAL files cannot be fully empty but need to contain at least one + // item to not throw an error + if err := b.file.TruncateFront(b.writeIndex()); err != nil { + log.Printf("E! batch length: %d, first: %d, size: %d", len(tx.Batch), b.batchFirst, b.batchSize) + panic(err) + } } else { - err := b.file.TruncateFront(b.batchFirst + uint64(len(batch))) - if err != nil { - log.Printf("E! batch length: %d, batchFirst: %d, batchSize: %d", len(batch), b.batchFirst, b.batchSize) + if err := b.file.TruncateFront(b.batchFirst + uint64(removeIdx+1)); err != nil { + log.Printf("E! batch length: %d, first: %d, size: %d", len(tx.Batch), b.batchFirst, b.batchSize) panic(err) } } + // Truncate the mask and update the relative offsets + b.mask = b.mask[:removeIdx] + for i := range b.mask { + b.mask[i] -= removeIdx + } + // check if the original end index is still valid, clear if not if b.originalEnd < b.readIndex() { b.originalEnd = 0 @@ -203,14 +269,6 @@ func (b *DiskBuffer) Accept(batch []telegraf.Metric) { b.BufferSize.Set(int64(b.length())) } -func (b *DiskBuffer) Reject(_ []telegraf.Metric) { - // very little to do here as the disk buffer retains metrics in - // the wal file until a call to accept - b.Lock() - defer b.Unlock() - b.resetBatch() -} - func (b *DiskBuffer) Stats() BufferStats { return b.BufferStats } @@ -238,14 +296,3 @@ func (b *DiskBuffer) handleEmptyFile() { } b.isEmpty = false } - -func (b *DiskBuffer) emptyFile() { - if b.isEmpty || b.length() == 0 { - return - } - if err := b.file.TruncateFront(b.writeIndex() - 1); err != nil { - log.Printf("E! writeIndex: %d, buffer len: %d", b.writeIndex(), b.length()) - panic(err) - } - b.isEmpty = true -} diff --git a/models/buffer_disk_test.go b/models/buffer_disk_test.go index 3f04ef86d6246..15ff25a73c42b 100644 --- a/models/buffer_disk_test.go +++ b/models/buffer_disk_test.go @@ -27,9 +27,9 @@ func TestDiskBufferRetainsTrackingInformation(t *testing.T) { defer buf.Close() buf.Add(mm) - - batch := buf.Batch(1) - buf.Accept(batch) + tx := buf.BeginTransaction(1) + tx.AcceptAll() + buf.EndTransaction(tx) require.Equal(t, 1, delivered) } @@ -85,11 +85,11 @@ func TestDiskBufferTrackingDroppedFromOldWal(t *testing.T) { buf.Stats().MetricsDropped.Set(0) defer buf.Close() - batch := buf.Batch(4) + tx := buf.BeginTransaction(4) // Check that the tracking metric is skipped expected := []telegraf.Metric{ metrics[0], metrics[1], metrics[2], metrics[4], } - testutil.RequireMetricsEqual(t, expected, batch) + testutil.RequireMetricsEqual(t, expected, tx.Batch) } diff --git a/models/buffer_mem.go b/models/buffer_mem.go index 7bba4744f4e07..3c2daa89c51a3 100644 --- a/models/buffer_mem.go +++ b/models/buffer_mem.go @@ -51,67 +51,67 @@ func (b *MemoryBuffer) Add(metrics ...telegraf.Metric) int { return dropped } -func (b *MemoryBuffer) Batch(batchSize int) []telegraf.Metric { +func (b *MemoryBuffer) BeginTransaction(batchSize int) *Transaction { b.Lock() defer b.Unlock() outLen := min(b.size, batchSize) - out := make([]telegraf.Metric, outLen) if outLen == 0 { - return out + return &Transaction{} } b.batchFirst = b.first b.batchSize = outLen - batchIndex := b.batchFirst - for i := range out { - out[i] = b.buf[batchIndex] + batch := make([]telegraf.Metric, outLen) + for i := range batch { + batch[i] = b.buf[batchIndex] b.buf[batchIndex] = nil batchIndex = b.next(batchIndex) } b.first = b.nextby(b.first, b.batchSize) b.size -= outLen - return out + return &Transaction{Batch: batch, valid: true} } -func (b *MemoryBuffer) Accept(batch []telegraf.Metric) { +func (b *MemoryBuffer) EndTransaction(tx *Transaction) { b.Lock() defer b.Unlock() - for _, m := range batch { - b.metricWritten(m) - } - - b.resetBatch() - b.BufferSize.Set(int64(b.length())) -} - -func (b *MemoryBuffer) Reject(batch []telegraf.Metric) { - b.Lock() - defer b.Unlock() - - if len(batch) == 0 { + // Ignore invalid transactions and make sure they can only be finished once + if !tx.valid { return } + tx.valid = false - free := b.cap - b.size - restore := min(len(batch), free) - skip := len(batch) - restore + // Accept metrics + for _, idx := range tx.Accept { + b.metricWritten(tx.Batch[idx]) + } - b.first = b.prevby(b.first, restore) - b.size = min(b.size+restore, b.cap) + // Reject metrics + for _, idx := range tx.Reject { + b.metricRejected(tx.Batch[idx]) + } - re := b.first + // Keep metrics + keep := tx.InferKeep() + if len(keep) > 0 { + restore := min(len(keep), b.cap-b.size) + b.first = b.prevby(b.first, restore) + b.size = min(b.size+restore, b.cap) + + // Restore the metrics that fit into the buffer + current := b.first + for i := 0; i < restore; i++ { + b.buf[current] = tx.Batch[keep[i]] + current = b.next(current) + } - // Copy metrics from the batch back into the buffer - for i := range batch { - if i < skip { - b.metricDropped(batch[i]) - } else { - b.buf[re] = batch[i] - re = b.next(re) + // Drop all remaining metrics + for i := restore; i < len(keep); i++ { + b.metricDropped(tx.Batch[keep[i]]) } } diff --git a/models/buffer_mem_test.go b/models/buffer_mem_test.go index 650bd3bf65c93..8a473fcb5a0a9 100644 --- a/models/buffer_mem_test.go +++ b/models/buffer_mem_test.go @@ -24,8 +24,9 @@ func TestMemoryBufferAcceptCallsMetricAccept(t *testing.T) { }, } buf.Add(mm, mm, mm) - batch := buf.Batch(2) - buf.Accept(batch) + tx := buf.BeginTransaction(2) + tx.AcceptAll() + buf.EndTransaction(tx) require.Equal(t, 2, accept) } diff --git a/models/buffer_suite_test.go b/models/buffer_suite_test.go index 99d008096373a..80ce63bdce95b 100644 --- a/models/buffer_suite_test.go +++ b/models/buffer_suite_test.go @@ -53,6 +53,7 @@ func (s *BufferSuiteTest) newTestBuffer(capacity int) Buffer { s.Require().NoError(err) buf.Stats().MetricsAdded.Set(0) buf.Stats().MetricsWritten.Set(0) + buf.Stats().MetricsRejected.Set(0) buf.Stats().MetricsDropped.Set(0) return buf } @@ -99,16 +100,16 @@ func (s *BufferSuiteTest) TestBufferBatchLenZero() { buf := s.newTestBuffer(5) defer buf.Close() - batch := buf.Batch(0) - s.Empty(batch) + tx := buf.BeginTransaction(0) + s.Empty(tx.Batch) } func (s *BufferSuiteTest) TestBufferBatchLenBufferEmpty() { buf := s.newTestBuffer(5) defer buf.Close() - batch := buf.Batch(2) - s.Empty(batch) + tx := buf.BeginTransaction(2) + s.Empty(tx.Batch) } func (s *BufferSuiteTest) TestBufferBatchLenUnderfill() { @@ -117,8 +118,8 @@ func (s *BufferSuiteTest) TestBufferBatchLenUnderfill() { m := metric.New("cpu", map[string]string{}, map[string]interface{}{"value": 42.0}, time.Unix(0, 0)) buf.Add(m) - batch := buf.Batch(2) - s.Len(batch, 1) + tx := buf.BeginTransaction(2) + s.Len(tx.Batch, 1) } func (s *BufferSuiteTest) TestBufferBatchLenFill() { @@ -127,8 +128,8 @@ func (s *BufferSuiteTest) TestBufferBatchLenFill() { m := metric.New("cpu", map[string]string{}, map[string]interface{}{"value": 42.0}, time.Unix(0, 0)) buf.Add(m, m, m) - batch := buf.Batch(2) - s.Len(batch, 2) + tx := buf.BeginTransaction(2) + s.Len(tx.Batch, 2) } func (s *BufferSuiteTest) TestBufferBatchLenExact() { @@ -137,8 +138,8 @@ func (s *BufferSuiteTest) TestBufferBatchLenExact() { m := metric.New("cpu", map[string]string{}, map[string]interface{}{"value": 42.0}, time.Unix(0, 0)) buf.Add(m, m) - batch := buf.Batch(2) - s.Len(batch, 2) + tx := buf.BeginTransaction(2) + s.Len(tx.Batch, 2) } func (s *BufferSuiteTest) TestBufferBatchLenLargerThanBuffer() { @@ -147,8 +148,8 @@ func (s *BufferSuiteTest) TestBufferBatchLenLargerThanBuffer() { m := metric.New("cpu", map[string]string{}, map[string]interface{}{"value": 42.0}, time.Unix(0, 0)) buf.Add(m, m, m, m, m) - batch := buf.Batch(6) - s.Len(batch, 5) + tx := buf.BeginTransaction(6) + s.Len(tx.Batch, 5) } func (s *BufferSuiteTest) TestBufferBatchWrap() { @@ -157,11 +158,12 @@ func (s *BufferSuiteTest) TestBufferBatchWrap() { m := metric.New("cpu", map[string]string{}, map[string]interface{}{"value": 42.0}, time.Unix(0, 0)) buf.Add(m, m, m, m, m) - batch := buf.Batch(2) - buf.Accept(batch) + tx := buf.BeginTransaction(2) + tx.AcceptAll() + buf.EndTransaction(tx) buf.Add(m, m) - batch = buf.Batch(5) - s.Len(batch, 5) + tx = buf.BeginTransaction(5) + s.Len(tx.Batch, 5) } func (s *BufferSuiteTest) TestBufferBatchLatest() { @@ -171,13 +173,13 @@ func (s *BufferSuiteTest) TestBufferBatchLatest() { buf.Add(metric.New("cpu", map[string]string{}, map[string]interface{}{"value": 42.0}, time.Unix(1, 0))) buf.Add(metric.New("cpu", map[string]string{}, map[string]interface{}{"value": 42.0}, time.Unix(2, 0))) buf.Add(metric.New("cpu", map[string]string{}, map[string]interface{}{"value": 42.0}, time.Unix(3, 0))) - batch := buf.Batch(2) + tx := buf.BeginTransaction(2) testutil.RequireMetricsEqual(s.T(), []telegraf.Metric{ metric.New("cpu", map[string]string{}, map[string]interface{}{"value": 42.0}, time.Unix(1, 0)), metric.New("cpu", map[string]string{}, map[string]interface{}{"value": 42.0}, time.Unix(2, 0)), - }, batch) + }, tx.Batch) } func (s *BufferSuiteTest) TestBufferBatchLatestWrap() { @@ -193,13 +195,13 @@ func (s *BufferSuiteTest) TestBufferBatchLatestWrap() { buf.Add(metric.New("cpu", map[string]string{}, map[string]interface{}{"value": 42.0}, time.Unix(3, 0))) buf.Add(metric.New("cpu", map[string]string{}, map[string]interface{}{"value": 42.0}, time.Unix(4, 0))) buf.Add(metric.New("cpu", map[string]string{}, map[string]interface{}{"value": 42.0}, time.Unix(5, 0))) - batch := buf.Batch(2) + tx := buf.BeginTransaction(2) testutil.RequireMetricsEqual(s.T(), []telegraf.Metric{ metric.New("cpu", map[string]string{}, map[string]interface{}{"value": 42.0}, time.Unix(2, 0)), metric.New("cpu", map[string]string{}, map[string]interface{}{"value": 42.0}, time.Unix(3, 0)), - }, batch) + }, tx.Batch) } func (s *BufferSuiteTest) TestBufferMultipleBatch() { @@ -212,7 +214,7 @@ func (s *BufferSuiteTest) TestBufferMultipleBatch() { buf.Add(metric.New("cpu", map[string]string{}, map[string]interface{}{"value": 42.0}, time.Unix(4, 0))) buf.Add(metric.New("cpu", map[string]string{}, map[string]interface{}{"value": 42.0}, time.Unix(5, 0))) buf.Add(metric.New("cpu", map[string]string{}, map[string]interface{}{"value": 42.0}, time.Unix(6, 0))) - batch := buf.Batch(5) + tx := buf.BeginTransaction(5) testutil.RequireMetricsEqual(s.T(), []telegraf.Metric{ metric.New("cpu", map[string]string{}, map[string]interface{}{"value": 42.0}, time.Unix(1, 0)), @@ -220,14 +222,16 @@ func (s *BufferSuiteTest) TestBufferMultipleBatch() { metric.New("cpu", map[string]string{}, map[string]interface{}{"value": 42.0}, time.Unix(3, 0)), metric.New("cpu", map[string]string{}, map[string]interface{}{"value": 42.0}, time.Unix(4, 0)), metric.New("cpu", map[string]string{}, map[string]interface{}{"value": 42.0}, time.Unix(5, 0)), - }, batch) - buf.Accept(batch) - batch = buf.Batch(5) + }, tx.Batch) + tx.AcceptAll() + buf.EndTransaction(tx) + tx = buf.BeginTransaction(5) testutil.RequireMetricsEqual(s.T(), []telegraf.Metric{ metric.New("cpu", map[string]string{}, map[string]interface{}{"value": 42.0}, time.Unix(6, 0)), - }, batch) - buf.Accept(batch) + }, tx.Batch) + tx.AcceptAll() + buf.EndTransaction(tx) } func (s *BufferSuiteTest) TestBufferRejectWithRoom() { @@ -237,14 +241,15 @@ func (s *BufferSuiteTest) TestBufferRejectWithRoom() { buf.Add(metric.New("cpu", map[string]string{}, map[string]interface{}{"value": 42.0}, time.Unix(1, 0))) buf.Add(metric.New("cpu", map[string]string{}, map[string]interface{}{"value": 42.0}, time.Unix(2, 0))) buf.Add(metric.New("cpu", map[string]string{}, map[string]interface{}{"value": 42.0}, time.Unix(3, 0))) - batch := buf.Batch(2) + tx := buf.BeginTransaction(2) buf.Add(metric.New("cpu", map[string]string{}, map[string]interface{}{"value": 42.0}, time.Unix(4, 0))) buf.Add(metric.New("cpu", map[string]string{}, map[string]interface{}{"value": 42.0}, time.Unix(5, 0))) - buf.Reject(batch) + tx.KeepAll() + buf.EndTransaction(tx) s.Equal(int64(0), buf.Stats().MetricsDropped.Get()) - batch = buf.Batch(5) + tx = buf.BeginTransaction(5) testutil.RequireMetricsEqual(s.T(), []telegraf.Metric{ metric.New("cpu", map[string]string{}, map[string]interface{}{"value": 42.0}, time.Unix(1, 0)), @@ -252,7 +257,7 @@ func (s *BufferSuiteTest) TestBufferRejectWithRoom() { metric.New("cpu", map[string]string{}, map[string]interface{}{"value": 42.0}, time.Unix(3, 0)), metric.New("cpu", map[string]string{}, map[string]interface{}{"value": 42.0}, time.Unix(4, 0)), metric.New("cpu", map[string]string{}, map[string]interface{}{"value": 42.0}, time.Unix(5, 0)), - }, batch) + }, tx.Batch) } func (s *BufferSuiteTest) TestBufferRejectNothingNewFull() { @@ -264,12 +269,13 @@ func (s *BufferSuiteTest) TestBufferRejectNothingNewFull() { buf.Add(metric.New("cpu", map[string]string{}, map[string]interface{}{"value": 42.0}, time.Unix(3, 0))) buf.Add(metric.New("cpu", map[string]string{}, map[string]interface{}{"value": 42.0}, time.Unix(4, 0))) buf.Add(metric.New("cpu", map[string]string{}, map[string]interface{}{"value": 42.0}, time.Unix(5, 0))) - batch := buf.Batch(2) - buf.Reject(batch) + tx := buf.BeginTransaction(2) + tx.KeepAll() + buf.EndTransaction(tx) s.Equal(int64(0), buf.Stats().MetricsDropped.Get()) - batch = buf.Batch(5) + tx = buf.BeginTransaction(5) testutil.RequireMetricsEqual(s.T(), []telegraf.Metric{ metric.New("cpu", map[string]string{}, map[string]interface{}{"value": 42.0}, time.Unix(1, 0)), @@ -277,7 +283,7 @@ func (s *BufferSuiteTest) TestBufferRejectNothingNewFull() { metric.New("cpu", map[string]string{}, map[string]interface{}{"value": 42.0}, time.Unix(3, 0)), metric.New("cpu", map[string]string{}, map[string]interface{}{"value": 42.0}, time.Unix(4, 0)), metric.New("cpu", map[string]string{}, map[string]interface{}{"value": 42.0}, time.Unix(5, 0)), - }, batch) + }, tx.Batch) } func (s *BufferSuiteTest) TestBufferRejectNoRoom() { @@ -291,18 +297,19 @@ func (s *BufferSuiteTest) TestBufferRejectNoRoom() { buf.Add(metric.New("cpu", map[string]string{}, map[string]interface{}{"value": 42.0}, time.Unix(1, 0))) buf.Add(metric.New("cpu", map[string]string{}, map[string]interface{}{"value": 42.0}, time.Unix(2, 0))) buf.Add(metric.New("cpu", map[string]string{}, map[string]interface{}{"value": 42.0}, time.Unix(3, 0))) - batch := buf.Batch(2) + tx := buf.BeginTransaction(2) buf.Add(metric.New("cpu", map[string]string{}, map[string]interface{}{"value": 42.0}, time.Unix(4, 0))) buf.Add(metric.New("cpu", map[string]string{}, map[string]interface{}{"value": 42.0}, time.Unix(5, 0))) buf.Add(metric.New("cpu", map[string]string{}, map[string]interface{}{"value": 42.0}, time.Unix(6, 0))) buf.Add(metric.New("cpu", map[string]string{}, map[string]interface{}{"value": 42.0}, time.Unix(7, 0))) buf.Add(metric.New("cpu", map[string]string{}, map[string]interface{}{"value": 42.0}, time.Unix(8, 0))) - buf.Reject(batch) + tx.KeepAll() + buf.EndTransaction(tx) s.Equal(int64(3), buf.Stats().MetricsDropped.Get()) - batch = buf.Batch(5) + tx = buf.BeginTransaction(5) testutil.RequireMetricsEqual(s.T(), []telegraf.Metric{ metric.New("cpu", map[string]string{}, map[string]interface{}{"value": 42.0}, time.Unix(4, 0)), @@ -310,7 +317,7 @@ func (s *BufferSuiteTest) TestBufferRejectNoRoom() { metric.New("cpu", map[string]string{}, map[string]interface{}{"value": 42.0}, time.Unix(6, 0)), metric.New("cpu", map[string]string{}, map[string]interface{}{"value": 42.0}, time.Unix(7, 0)), metric.New("cpu", map[string]string{}, map[string]interface{}{"value": 42.0}, time.Unix(8, 0)), - }, batch) + }, tx.Batch) } func (s *BufferSuiteTest) TestBufferRejectRoomExact() { @@ -319,16 +326,17 @@ func (s *BufferSuiteTest) TestBufferRejectRoomExact() { buf.Add(metric.New("cpu", map[string]string{}, map[string]interface{}{"value": 42.0}, time.Unix(1, 0))) buf.Add(metric.New("cpu", map[string]string{}, map[string]interface{}{"value": 42.0}, time.Unix(2, 0))) - batch := buf.Batch(2) + tx := buf.BeginTransaction(2) buf.Add(metric.New("cpu", map[string]string{}, map[string]interface{}{"value": 42.0}, time.Unix(3, 0))) buf.Add(metric.New("cpu", map[string]string{}, map[string]interface{}{"value": 42.0}, time.Unix(4, 0))) buf.Add(metric.New("cpu", map[string]string{}, map[string]interface{}{"value": 42.0}, time.Unix(5, 0))) - buf.Reject(batch) + tx.KeepAll() + buf.EndTransaction(tx) s.Equal(int64(0), buf.Stats().MetricsDropped.Get()) - batch = buf.Batch(5) + tx = buf.BeginTransaction(5) testutil.RequireMetricsEqual(s.T(), []telegraf.Metric{ metric.New("cpu", map[string]string{}, map[string]interface{}{"value": 42.0}, time.Unix(1, 0)), @@ -336,7 +344,7 @@ func (s *BufferSuiteTest) TestBufferRejectRoomExact() { metric.New("cpu", map[string]string{}, map[string]interface{}{"value": 42.0}, time.Unix(3, 0)), metric.New("cpu", map[string]string{}, map[string]interface{}{"value": 42.0}, time.Unix(4, 0)), metric.New("cpu", map[string]string{}, map[string]interface{}{"value": 42.0}, time.Unix(5, 0)), - }, batch) + }, tx.Batch) } func (s *BufferSuiteTest) TestBufferRejectRoomOverwriteOld() { @@ -350,16 +358,17 @@ func (s *BufferSuiteTest) TestBufferRejectRoomOverwriteOld() { buf.Add(metric.New("cpu", map[string]string{}, map[string]interface{}{"value": 42.0}, time.Unix(1, 0))) buf.Add(metric.New("cpu", map[string]string{}, map[string]interface{}{"value": 42.0}, time.Unix(2, 0))) buf.Add(metric.New("cpu", map[string]string{}, map[string]interface{}{"value": 42.0}, time.Unix(3, 0))) - batch := buf.Batch(1) + tx := buf.BeginTransaction(1) buf.Add(metric.New("cpu", map[string]string{}, map[string]interface{}{"value": 42.0}, time.Unix(4, 0))) buf.Add(metric.New("cpu", map[string]string{}, map[string]interface{}{"value": 42.0}, time.Unix(5, 0))) buf.Add(metric.New("cpu", map[string]string{}, map[string]interface{}{"value": 42.0}, time.Unix(6, 0))) - buf.Reject(batch) + tx.KeepAll() + buf.EndTransaction(tx) s.Equal(int64(1), buf.Stats().MetricsDropped.Get()) - batch = buf.Batch(5) + tx = buf.BeginTransaction(5) testutil.RequireMetricsEqual(s.T(), []telegraf.Metric{ metric.New("cpu", map[string]string{}, map[string]interface{}{"value": 42.0}, time.Unix(2, 0)), @@ -367,7 +376,7 @@ func (s *BufferSuiteTest) TestBufferRejectRoomOverwriteOld() { metric.New("cpu", map[string]string{}, map[string]interface{}{"value": 42.0}, time.Unix(4, 0)), metric.New("cpu", map[string]string{}, map[string]interface{}{"value": 42.0}, time.Unix(5, 0)), metric.New("cpu", map[string]string{}, map[string]interface{}{"value": 42.0}, time.Unix(6, 0)), - }, batch) + }, tx.Batch) } func (s *BufferSuiteTest) TestBufferRejectPartialRoom() { @@ -381,16 +390,17 @@ func (s *BufferSuiteTest) TestBufferRejectPartialRoom() { buf.Add(metric.New("cpu", map[string]string{}, map[string]interface{}{"value": 42.0}, time.Unix(1, 0))) buf.Add(metric.New("cpu", map[string]string{}, map[string]interface{}{"value": 42.0}, time.Unix(2, 0))) buf.Add(metric.New("cpu", map[string]string{}, map[string]interface{}{"value": 42.0}, time.Unix(3, 0))) - batch := buf.Batch(2) + tx := buf.BeginTransaction(2) buf.Add(metric.New("cpu", map[string]string{}, map[string]interface{}{"value": 42.0}, time.Unix(4, 0))) buf.Add(metric.New("cpu", map[string]string{}, map[string]interface{}{"value": 42.0}, time.Unix(5, 0))) buf.Add(metric.New("cpu", map[string]string{}, map[string]interface{}{"value": 42.0}, time.Unix(6, 0))) buf.Add(metric.New("cpu", map[string]string{}, map[string]interface{}{"value": 42.0}, time.Unix(7, 0))) - buf.Reject(batch) + tx.KeepAll() + buf.EndTransaction(tx) s.Equal(int64(2), buf.Stats().MetricsDropped.Get()) - batch = buf.Batch(5) + tx = buf.BeginTransaction(5) testutil.RequireMetricsEqual(s.T(), []telegraf.Metric{ metric.New("cpu", map[string]string{}, map[string]interface{}{"value": 42.0}, time.Unix(3, 0)), @@ -398,7 +408,7 @@ func (s *BufferSuiteTest) TestBufferRejectPartialRoom() { metric.New("cpu", map[string]string{}, map[string]interface{}{"value": 42.0}, time.Unix(5, 0)), metric.New("cpu", map[string]string{}, map[string]interface{}{"value": 42.0}, time.Unix(6, 0)), metric.New("cpu", map[string]string{}, map[string]interface{}{"value": 42.0}, time.Unix(7, 0)), - }, batch) + }, tx.Batch) } func (s *BufferSuiteTest) TestBufferRejectNewMetricsWrapped() { @@ -412,7 +422,7 @@ func (s *BufferSuiteTest) TestBufferRejectNewMetricsWrapped() { buf.Add(metric.New("cpu", map[string]string{}, map[string]interface{}{"value": 42.0}, time.Unix(1, 0))) buf.Add(metric.New("cpu", map[string]string{}, map[string]interface{}{"value": 42.0}, time.Unix(2, 0))) buf.Add(metric.New("cpu", map[string]string{}, map[string]interface{}{"value": 42.0}, time.Unix(3, 0))) - batch := buf.Batch(2) + tx := buf.BeginTransaction(2) buf.Add(metric.New("cpu", map[string]string{}, map[string]interface{}{"value": 42.0}, time.Unix(4, 0))) buf.Add(metric.New("cpu", map[string]string{}, map[string]interface{}{"value": 42.0}, time.Unix(5, 0))) @@ -435,11 +445,12 @@ func (s *BufferSuiteTest) TestBufferRejectNewMetricsWrapped() { buf.Add(metric.New("cpu", map[string]string{}, map[string]interface{}{"value": 42.0}, time.Unix(15, 0))) // buffer: 13, 14, 15, 11, 12; batch: 2, 3 s.Equal(int64(8), buf.Stats().MetricsDropped.Get()) - buf.Reject(batch) + tx.KeepAll() + buf.EndTransaction(tx) s.Equal(int64(10), buf.Stats().MetricsDropped.Get()) - batch = buf.Batch(5) + tx = buf.BeginTransaction(5) testutil.RequireMetricsEqual(s.T(), []telegraf.Metric{ metric.New("cpu", map[string]string{}, map[string]interface{}{"value": 42.0}, time.Unix(11, 0)), @@ -447,7 +458,7 @@ func (s *BufferSuiteTest) TestBufferRejectNewMetricsWrapped() { metric.New("cpu", map[string]string{}, map[string]interface{}{"value": 42.0}, time.Unix(13, 0)), metric.New("cpu", map[string]string{}, map[string]interface{}{"value": 42.0}, time.Unix(14, 0)), metric.New("cpu", map[string]string{}, map[string]interface{}{"value": 42.0}, time.Unix(15, 0)), - }, batch) + }, tx.Batch) } func (s *BufferSuiteTest) TestBufferRejectWrapped() { @@ -467,16 +478,17 @@ func (s *BufferSuiteTest) TestBufferRejectWrapped() { buf.Add(metric.New("cpu", map[string]string{}, map[string]interface{}{"value": 42.0}, time.Unix(6, 0))) buf.Add(metric.New("cpu", map[string]string{}, map[string]interface{}{"value": 42.0}, time.Unix(7, 0))) buf.Add(metric.New("cpu", map[string]string{}, map[string]interface{}{"value": 42.0}, time.Unix(8, 0))) - batch := buf.Batch(3) + tx := buf.BeginTransaction(3) buf.Add(metric.New("cpu", map[string]string{}, map[string]interface{}{"value": 42.0}, time.Unix(9, 0))) buf.Add(metric.New("cpu", map[string]string{}, map[string]interface{}{"value": 42.0}, time.Unix(10, 0))) buf.Add(metric.New("cpu", map[string]string{}, map[string]interface{}{"value": 42.0}, time.Unix(11, 0))) buf.Add(metric.New("cpu", map[string]string{}, map[string]interface{}{"value": 42.0}, time.Unix(12, 0))) - buf.Reject(batch) + tx.KeepAll() + buf.EndTransaction(tx) - batch = buf.Batch(5) + tx = buf.BeginTransaction(5) testutil.RequireMetricsEqual(s.T(), []telegraf.Metric{ metric.New("cpu", map[string]string{}, map[string]interface{}{"value": 42.0}, time.Unix(8, 0)), @@ -484,7 +496,7 @@ func (s *BufferSuiteTest) TestBufferRejectWrapped() { metric.New("cpu", map[string]string{}, map[string]interface{}{"value": 42.0}, time.Unix(10, 0)), metric.New("cpu", map[string]string{}, map[string]interface{}{"value": 42.0}, time.Unix(11, 0)), metric.New("cpu", map[string]string{}, map[string]interface{}{"value": 42.0}, time.Unix(12, 0)), - }, batch) + }, tx.Batch) } func (s *BufferSuiteTest) TestBufferRejectAdjustFirst() { @@ -498,36 +510,39 @@ func (s *BufferSuiteTest) TestBufferRejectAdjustFirst() { buf.Add(metric.New("cpu", map[string]string{}, map[string]interface{}{"value": 42.0}, time.Unix(1, 0))) buf.Add(metric.New("cpu", map[string]string{}, map[string]interface{}{"value": 42.0}, time.Unix(2, 0))) buf.Add(metric.New("cpu", map[string]string{}, map[string]interface{}{"value": 42.0}, time.Unix(3, 0))) - batch := buf.Batch(3) + tx := buf.BeginTransaction(3) buf.Add(metric.New("cpu", map[string]string{}, map[string]interface{}{"value": 42.0}, time.Unix(4, 0))) buf.Add(metric.New("cpu", map[string]string{}, map[string]interface{}{"value": 42.0}, time.Unix(5, 0))) buf.Add(metric.New("cpu", map[string]string{}, map[string]interface{}{"value": 42.0}, time.Unix(6, 0))) - buf.Reject(batch) + tx.KeepAll() + buf.EndTransaction(tx) buf.Add(metric.New("cpu", map[string]string{}, map[string]interface{}{"value": 42.0}, time.Unix(7, 0))) buf.Add(metric.New("cpu", map[string]string{}, map[string]interface{}{"value": 42.0}, time.Unix(8, 0))) buf.Add(metric.New("cpu", map[string]string{}, map[string]interface{}{"value": 42.0}, time.Unix(9, 0))) - batch = buf.Batch(3) + tx = buf.BeginTransaction(3) buf.Add(metric.New("cpu", map[string]string{}, map[string]interface{}{"value": 42.0}, time.Unix(10, 0))) buf.Add(metric.New("cpu", map[string]string{}, map[string]interface{}{"value": 42.0}, time.Unix(11, 0))) buf.Add(metric.New("cpu", map[string]string{}, map[string]interface{}{"value": 42.0}, time.Unix(12, 0))) - buf.Reject(batch) + tx.KeepAll() + buf.EndTransaction(tx) buf.Add(metric.New("cpu", map[string]string{}, map[string]interface{}{"value": 42.0}, time.Unix(13, 0))) buf.Add(metric.New("cpu", map[string]string{}, map[string]interface{}{"value": 42.0}, time.Unix(14, 0))) buf.Add(metric.New("cpu", map[string]string{}, map[string]interface{}{"value": 42.0}, time.Unix(15, 0))) - batch = buf.Batch(3) + tx = buf.BeginTransaction(3) buf.Add(metric.New("cpu", map[string]string{}, map[string]interface{}{"value": 42.0}, time.Unix(16, 0))) buf.Add(metric.New("cpu", map[string]string{}, map[string]interface{}{"value": 42.0}, time.Unix(17, 0))) buf.Add(metric.New("cpu", map[string]string{}, map[string]interface{}{"value": 42.0}, time.Unix(18, 0))) - buf.Reject(batch) + tx.KeepAll() + buf.EndTransaction(tx) buf.Add(metric.New("cpu", map[string]string{}, map[string]interface{}{"value": 42.0}, time.Unix(19, 0))) - batch = buf.Batch(10) + tx = buf.BeginTransaction(10) testutil.RequireMetricsEqual(s.T(), []telegraf.Metric{ metric.New("cpu", map[string]string{}, map[string]interface{}{"value": 42.0}, time.Unix(10, 0)), @@ -540,7 +555,7 @@ func (s *BufferSuiteTest) TestBufferRejectAdjustFirst() { metric.New("cpu", map[string]string{}, map[string]interface{}{"value": 42.0}, time.Unix(17, 0)), metric.New("cpu", map[string]string{}, map[string]interface{}{"value": 42.0}, time.Unix(18, 0)), metric.New("cpu", map[string]string{}, map[string]interface{}{"value": 42.0}, time.Unix(19, 0)), - }, batch) + }, tx.Batch) } func (s *BufferSuiteTest) TestBufferAddDropsOverwrittenMetrics() { @@ -565,8 +580,9 @@ func (s *BufferSuiteTest) TestBufferAcceptRemovesBatch() { m := metric.New("cpu", map[string]string{}, map[string]interface{}{"value": 42.0}, time.Unix(0, 0)) buf.Add(m, m, m) - batch := buf.Batch(2) - buf.Accept(batch) + tx := buf.BeginTransaction(2) + tx.AcceptAll() + buf.EndTransaction(tx) s.Equal(1, buf.Len()) } @@ -576,8 +592,9 @@ func (s *BufferSuiteTest) TestBufferRejectLeavesBatch() { m := metric.New("cpu", map[string]string{}, map[string]interface{}{"value": 42.0}, time.Unix(0, 0)) buf.Add(m, m, m) - batch := buf.Batch(2) - buf.Reject(batch) + tx := buf.BeginTransaction(2) + tx.KeepAll() + buf.EndTransaction(tx) s.Equal(3, buf.Len()) } @@ -587,9 +604,10 @@ func (s *BufferSuiteTest) TestBufferAcceptWritesOverwrittenBatch() { m := metric.New("cpu", map[string]string{}, map[string]interface{}{"value": 42.0}, time.Unix(0, 0)) buf.Add(m, m, m, m, m) - batch := buf.Batch(5) + tx := buf.BeginTransaction(5) buf.Add(m, m, m, m, m) - buf.Accept(batch) + tx.AcceptAll() + buf.EndTransaction(tx) s.Equal(int64(0), buf.Stats().MetricsDropped.Get()) s.Equal(int64(5), buf.Stats().MetricsWritten.Get()) @@ -605,9 +623,10 @@ func (s *BufferSuiteTest) TestBufferBatchRejectDropsOverwrittenBatch() { m := metric.New("cpu", map[string]string{}, map[string]interface{}{"value": 42.0}, time.Unix(0, 0)) buf.Add(m, m, m, m, m) - batch := buf.Batch(5) + tx := buf.BeginTransaction(5) buf.Add(m, m, m, m, m) - buf.Reject(batch) + tx.KeepAll() + buf.EndTransaction(tx) s.Equal(int64(5), buf.Stats().MetricsDropped.Get()) s.Equal(int64(0), buf.Stats().MetricsWritten.Get()) @@ -619,9 +638,10 @@ func (s *BufferSuiteTest) TestBufferMetricsOverwriteBatchAccept() { m := metric.New("cpu", map[string]string{}, map[string]interface{}{"value": 42.0}, time.Unix(0, 0)) buf.Add(m, m, m, m, m) - batch := buf.Batch(3) + tx := buf.BeginTransaction(3) buf.Add(m, m, m) - buf.Accept(batch) + tx.AcceptAll() + buf.EndTransaction(tx) s.Equal(int64(0), buf.Stats().MetricsDropped.Get(), "dropped") s.Equal(int64(3), buf.Stats().MetricsWritten.Get(), "written") } @@ -636,9 +656,10 @@ func (s *BufferSuiteTest) TestBufferMetricsOverwriteBatchReject() { m := metric.New("cpu", map[string]string{}, map[string]interface{}{"value": 42.0}, time.Unix(0, 0)) buf.Add(m, m, m, m, m) - batch := buf.Batch(3) + tx := buf.BeginTransaction(3) buf.Add(m, m, m) - buf.Reject(batch) + tx.KeepAll() + buf.EndTransaction(tx) s.Equal(int64(3), buf.Stats().MetricsDropped.Get()) s.Equal(int64(0), buf.Stats().MetricsWritten.Get()) } @@ -653,9 +674,10 @@ func (s *BufferSuiteTest) TestBufferMetricsBatchAcceptRemoved() { m := metric.New("cpu", map[string]string{}, map[string]interface{}{"value": 42.0}, time.Unix(0, 0)) buf.Add(m, m, m, m, m) - batch := buf.Batch(3) + tx := buf.BeginTransaction(3) buf.Add(m, m, m, m, m) - buf.Accept(batch) + tx.AcceptAll() + buf.EndTransaction(tx) s.Equal(int64(2), buf.Stats().MetricsDropped.Get()) s.Equal(int64(3), buf.Stats().MetricsWritten.Get()) } @@ -670,10 +692,10 @@ func (s *BufferSuiteTest) TestBufferWrapWithBatch() { m := metric.New("cpu", map[string]string{}, map[string]interface{}{"value": 42.0}, time.Unix(0, 0)) buf.Add(m, m, m) - buf.Batch(3) + tx := buf.BeginTransaction(3) buf.Add(m, m, m, m, m, m) - s.Equal(int64(1), buf.Stats().MetricsDropped.Get()) + buf.EndTransaction(tx) } func (s *BufferSuiteTest) TestBufferBatchNotRemoved() { @@ -682,8 +704,9 @@ func (s *BufferSuiteTest) TestBufferBatchNotRemoved() { m := metric.New("cpu", map[string]string{}, map[string]interface{}{"value": 42.0}, time.Unix(0, 0)) buf.Add(m, m, m, m, m) - buf.Batch(2) + tx := buf.BeginTransaction(2) s.Equal(5, buf.Len()) + buf.EndTransaction(tx) } func (s *BufferSuiteTest) TestBufferBatchRejectAcceptNoop() { @@ -692,9 +715,11 @@ func (s *BufferSuiteTest) TestBufferBatchRejectAcceptNoop() { m := metric.New("cpu", map[string]string{}, map[string]interface{}{"value": 42.0}, time.Unix(0, 0)) buf.Add(m, m, m, m, m) - batch := buf.Batch(2) - buf.Reject(batch) - buf.Accept(batch) + tx := buf.BeginTransaction(2) + tx.KeepAll() + buf.EndTransaction(tx) + tx.AcceptAll() + buf.EndTransaction(tx) s.Equal(5, buf.Len()) } @@ -734,10 +759,11 @@ func (s *BufferSuiteTest) TestBufferAddCallsMetricRejectWhenNotInBatch() { }, } buf.Add(mm, mm, mm, mm, mm) - batch := buf.Batch(2) + tx := buf.BeginTransaction(2) buf.Add(mm, mm, mm, mm) s.Equal(2, reject) - buf.Reject(batch) + tx.KeepAll() + buf.EndTransaction(tx) s.Equal(4, reject) } @@ -757,10 +783,11 @@ func (s *BufferSuiteTest) TestBufferRejectCallsMetricRejectWithOverwritten() { }, } buf.Add(mm, mm, mm, mm, mm) - batch := buf.Batch(5) + tx := buf.BeginTransaction(5) buf.Add(mm, mm) s.Equal(0, reject) - buf.Reject(batch) + tx.KeepAll() + buf.EndTransaction(tx) s.Equal(2, reject) } @@ -780,13 +807,14 @@ func (s *BufferSuiteTest) TestBufferAddOverwriteAndReject() { }, } buf.Add(mm, mm, mm, mm, mm) - batch := buf.Batch(5) + tx := buf.BeginTransaction(5) buf.Add(mm, mm, mm, mm, mm) buf.Add(mm, mm, mm, mm, mm) buf.Add(mm, mm, mm, mm, mm) buf.Add(mm, mm, mm, mm, mm) s.Equal(15, reject) - buf.Reject(batch) + tx.KeepAll() + buf.EndTransaction(tx) s.Equal(20, reject) } @@ -812,7 +840,7 @@ func (s *BufferSuiteTest) TestBufferAddOverwriteAndRejectOffset() { buf.Add(mm, mm, mm) buf.Add(mm, mm, mm, mm) s.Equal(2, reject) - batch := buf.Batch(5) + tx := buf.BeginTransaction(5) buf.Add(mm, mm, mm, mm) s.Equal(2, reject) buf.Add(mm, mm, mm, mm) @@ -821,7 +849,8 @@ func (s *BufferSuiteTest) TestBufferAddOverwriteAndRejectOffset() { s.Equal(9, reject) buf.Add(mm, mm, mm, mm) s.Equal(13, reject) - buf.Accept(batch) + tx.AcceptAll() + buf.EndTransaction(tx) s.Equal(13, reject) s.Equal(5, accept) } @@ -830,14 +859,16 @@ func (s *BufferSuiteTest) TestBufferRejectEmptyBatch() { buf := s.newTestBuffer(5) defer buf.Close() - batch := buf.Batch(2) + tx := buf.BeginTransaction(2) buf.Add(metric.New("cpu", map[string]string{}, map[string]interface{}{"value": 42.0}, time.Unix(1, 0))) - buf.Reject(batch) + tx.KeepAll() + buf.EndTransaction(tx) buf.Add(metric.New("cpu", map[string]string{}, map[string]interface{}{"value": 42.0}, time.Unix(2, 0))) - batch = buf.Batch(2) - for _, m := range batch { + tx = buf.BeginTransaction(2) + for _, m := range tx.Batch { s.NotNil(m) } + buf.EndTransaction(tx) } func (s *BufferSuiteTest) TestBufferFlushedPartial() { @@ -847,10 +878,11 @@ func (s *BufferSuiteTest) TestBufferFlushedPartial() { buf.Add(metric.New("cpu", map[string]string{}, map[string]interface{}{"value": 42.0}, time.Unix(1, 0))) buf.Add(metric.New("cpu", map[string]string{}, map[string]interface{}{"value": 42.0}, time.Unix(2, 0))) buf.Add(metric.New("cpu", map[string]string{}, map[string]interface{}{"value": 42.0}, time.Unix(3, 0))) - batch := buf.Batch(2) - s.Len(batch, 2) + tx := buf.BeginTransaction(2) + s.Len(tx.Batch, 2) - buf.Accept(batch) + tx.AcceptAll() + buf.EndTransaction(tx) s.Equal(1, buf.Len()) } @@ -860,13 +892,48 @@ func (s *BufferSuiteTest) TestBufferFlushedFull() { buf.Add(metric.New("cpu", map[string]string{}, map[string]interface{}{"value": 42.0}, time.Unix(1, 0))) buf.Add(metric.New("cpu", map[string]string{}, map[string]interface{}{"value": 42.0}, time.Unix(2, 0))) - batch := buf.Batch(2) - s.Len(batch, 2) + tx := buf.BeginTransaction(2) + s.Len(tx.Batch, 2) - buf.Accept(batch) + tx.AcceptAll() + buf.EndTransaction(tx) s.Equal(0, buf.Len()) } +func (s *BufferSuiteTest) TestPartialWriteBackToFront() { + buf := s.newTestBuffer(5) + defer buf.Close() + + m := metric.New("cpu", map[string]string{}, map[string]interface{}{"value": 42.0}, time.Unix(0, 0)) + buf.Add(m, m, m, m, m) + + // Get a batch of all metrics but only reject the last one + tx := buf.BeginTransaction(5) + s.Len(tx.Batch, 5) + tx.Reject = []int{4} + buf.EndTransaction(tx) + s.Equal(4, buf.Len()) + + // Get the next batch which should miss the last metric + tx = buf.BeginTransaction(5) + s.Len(tx.Batch, 4) + tx.Accept = []int{3} + buf.EndTransaction(tx) + s.Equal(3, buf.Len()) + + // Now get the next batch and reject the remaining metrics + tx = buf.BeginTransaction(5) + s.Len(tx.Batch, 3) + tx.Accept = []int{0, 1, 2} + buf.EndTransaction(tx) + s.Equal(0, buf.Len()) + + s.Equal(int64(5), buf.Stats().MetricsAdded.Get(), "metrics added") + s.Equal(int64(4), buf.Stats().MetricsWritten.Get(), "metrics written") + s.Equal(int64(1), buf.Stats().MetricsRejected.Get(), "metrics rejected") + s.Equal(int64(0), buf.Stats().MetricsDropped.Get(), "metrics dropped") +} + type mockMetric struct { telegraf.Metric AcceptF func() diff --git a/models/running_output.go b/models/running_output.go index c8a730d572ba6..fd1622c4438de 100644 --- a/models/running_output.go +++ b/models/running_output.go @@ -301,22 +301,21 @@ func (r *RunningOutput) Write() error { atomic.StoreInt64(&r.newMetricsCount, 0) - // Only process the metrics in the buffer now. Metrics added while we are + // Only process the metrics in the buffer now. Metrics added while we are // writing will be sent on the next call. nBuffer := r.buffer.Len() nBatches := nBuffer/r.MetricBatchSize + 1 for i := 0; i < nBatches; i++ { - batch := r.buffer.Batch(r.MetricBatchSize) - if len(batch) == 0 { - break + tx := r.buffer.BeginTransaction(r.MetricBatchSize) + if len(tx.Batch) == 0 { + return nil } - - err := r.writeMetrics(batch) + err := r.writeMetrics(tx.Batch) + r.updateTransaction(tx, err) + r.buffer.EndTransaction(tx) if err != nil { - r.buffer.Reject(batch) return err } - r.buffer.Accept(batch) } return nil } @@ -334,19 +333,15 @@ func (r *RunningOutput) WriteBatch() error { r.log.Debugf("Successfully connected after %d attempts", r.retries) } - batch := r.buffer.Batch(r.MetricBatchSize) - if len(batch) == 0 { + tx := r.buffer.BeginTransaction(r.MetricBatchSize) + if len(tx.Batch) == 0 { return nil } + err := r.writeMetrics(tx.Batch) + r.updateTransaction(tx, err) + r.buffer.EndTransaction(tx) - err := r.writeMetrics(batch) - if err != nil { - r.buffer.Reject(batch) - return err - } - r.buffer.Accept(batch) - - return nil + return err } func (r *RunningOutput) writeMetrics(metrics []telegraf.Metric) error { @@ -367,6 +362,26 @@ func (r *RunningOutput) writeMetrics(metrics []telegraf.Metric) error { return err } +func (r *RunningOutput) updateTransaction(tx *Transaction, err error) { + // No error indicates all metrics were written successfully + if err == nil { + tx.AcceptAll() + return + } + + // A non-partial-write-error indicated none of the metrics were written + // successfully and we should keep them for the next write cycle + var writeErr *internal.PartialWriteError + if !errors.As(err, &writeErr) { + tx.KeepAll() + return + } + + // Transfer the accepted and rejected indices based on the write error values + tx.Accept = writeErr.MetricsAccept + tx.Reject = writeErr.MetricsReject +} + func (r *RunningOutput) LogBufferStatus() { nBuffer := r.buffer.Len() if r.Config.BufferStrategy == "disk" { diff --git a/models/running_output_test.go b/models/running_output_test.go index c045dcf0140f2..3c8b9e5951e1a 100644 --- a/models/running_output_test.go +++ b/models/running_output_test.go @@ -433,6 +433,7 @@ func TestRunningOutputInternalMetrics(t *testing.T) { "buffer_size": 0, "errors": 0, "metrics_added": 0, + "metrics_rejected": 0, "metrics_dropped": 0, "metrics_filtered": 0, "metrics_written": 0,