Skip to content

Commit

Permalink
feat: (WIP) adding LP batching
Browse files Browse the repository at this point in the history
  • Loading branch information
karel-rehor committed Oct 22, 2024
1 parent 86c45d7 commit 8a9d396
Show file tree
Hide file tree
Showing 3 changed files with 238 additions and 29 deletions.
179 changes: 160 additions & 19 deletions influxdb3/batching/batcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,37 +29,54 @@ import (
"github.com/InfluxCommunity/influxdb3-go/influxdb3"
)

// Batch kind identifiers, numbered consecutively from zero.
// NOTE(review): presumably these distinguish Point batching from line
// protocol (LP) batching; nothing in the visible code reads them yet.
const (
	// BatchUnknown is the zero value.
	BatchUnknown = iota
	// BatchPoints has the value 1.
	BatchPoints
	// BatchLP has the value 2.
	BatchLP
)

// Option to adapt properties of a batcher
type Option func(*Batcher)
// PBOption is a function that adjusts a PointBatcher. NewPointBatcher applies
// each supplied option before it allocates the point buffer.
type PBOption func(*PointBatcher)

// WithSize changes the batch-size emitted by the batcher
func WithSize(size int) Option {
return func(b *Batcher) {
func WithSize(size int) PBOption {
	// The option captures size and stores it on the batcher it is applied to.
	var opt PBOption = func(pb *PointBatcher) {
		pb.size = size
	}
	return opt
}

// WithCapacity changes the initial capacity of the points buffer
func WithCapacity(capacity int) Option {
return func(b *Batcher) {
func WithCapacity(capacity int) PBOption {
	// The option captures capacity; NewPointBatcher reads the stored value
	// when it allocates the points buffer.
	var opt PBOption = func(pb *PointBatcher) {
		pb.capacity = capacity
	}
	return opt
}

/*
func WithIdiom(idiom int) Option {
return func(b *Batcher) {
if idiom < 0 || idiom > BatchIdiomLP {
b.idiom = BatchIdiomUnknown
} else {
b.idiom = idiom
}
}
} */

// WithReadyCallback sets the function called when a new batch is ready. The
// batcher will wait for the callback to finish, so please return as fast as
// possible and move long-running processing to a go-routine.
func WithReadyCallback(f func()) Option {
return func(b *Batcher) {
func WithReadyCallback(f func()) PBOption {
	// The option stores f as the batcher's ready callback; a nil f is stored
	// as-is.
	var opt PBOption = func(pb *PointBatcher) {
		pb.callbackReady = f
	}
	return opt
}

// WithEmitCallback sets the function called when a new batch is ready with the
// batch of points. The batcher will wait for the callback to finish, so please
// return as fast as possible and move long-running processing to a go-routine.
func WithEmitCallback(f func([]*influxdb3.Point)) Option {
return func(b *Batcher) {
func WithEmitCallback(f func([]*influxdb3.Point)) PBOption {
	// The option stores f as the batcher's emit callback; a nil f is stored
	// as-is.
	var opt PBOption = func(pb *PointBatcher) {
		pb.callbackEmit = f
	}
	return opt
}
Expand All @@ -70,13 +87,28 @@ const DefaultBatchSize = 1000
// DefaultCapacity is the default initial capacity of the point buffer
const DefaultCapacity = 2 * DefaultBatchSize

// Batcher collects points and emits them as batches
type Batcher struct {
// Batcher is the generic batching contract: items are added, readiness is
// queried, and a batch is emitted. Item and batch types are left as any.
// NOTE(review): the PointBatcher methods visible in this file use concrete
// Point types rather than any, so PointBatcher does not satisfy this
// interface as written — confirm the intended relationship.
type Batcher interface {
	// Add hands one item to the batcher.
	Add(item any)
	// Ready reports whether a batch can be emitted.
	Ready() bool
	// Emit returns a batch.
	Emit() any
}

// BatcherCallBacks describes the two callback hooks of a batcher. Both
// methods are unexported, so only types declared in this package can
// implement the interface.
type BatcherCallBacks interface {
	// callbackReady takes no arguments and returns nothing.
	callbackReady()
	// callbackEmit receives a batch of items.
	callbackEmit(batch []any)
}

// BaseBatcher holds the settings shared by the batchers that embed it: the
// batch size, the initial buffer capacity, and the optional callbacks.
type BaseBatcher struct {
	// size is the batch size; capacity is the initial capacity used when the
	// embedding batcher allocates its buffer.
	size, capacity int

	// callbackReady, when non-nil, is invoked once a batch is ready.
	callbackReady func()
	// callbackEmit, when non-nil, is invoked with an emitted batch of points.
	callbackEmit func([]*influxdb3.Point)
}

// PointBatcher collects points and emits them as batches
type PointBatcher struct {
BaseBatcher

points []*influxdb3.Point
sync.Mutex
Expand All @@ -85,31 +117,35 @@ type Batcher struct {
// NewPointBatcher creates and initializes a new PointBatcher instance applying
// the specified options. By default, the batch size is DefaultBatchSize and the
// initial capacity is DefaultCapacity.
func NewBatcher(options ...Option) *Batcher {
func NewPointBatcher(options ...PBOption) *PointBatcher {
// Set up a batcher with the default values
b := &Batcher{
base := BaseBatcher{
size: DefaultBatchSize,
capacity: DefaultCapacity,
}
b := &PointBatcher{
BaseBatcher: base,
}

// Apply the options
for _, o := range options {
o(b)
}

// Setup the internal data
// setup internal data
b.points = make([]*influxdb3.Point, 0, b.capacity)

return b
}

// Add appends point(s) to the batcher and calls the configured callbacks, if any, once a batch is ready
func (b *Batcher) Add(p ...*influxdb3.Point) {
func (b *PointBatcher) Add(p ...*influxdb3.Point) {
b.Lock()
defer b.Unlock()

// Add the point
b.points = append(b.points, p...)
//b.addToBuffer(interfaces...)

// Call callbacks if a new batch is ready
if b.isReady() {
Expand All @@ -123,31 +159,136 @@ func (b *Batcher) Add(p ...*influxdb3.Point) {
}

// Ready tells the caller if a new batch is ready to be emitted
func (b *Batcher) Ready() bool {
func (b *PointBatcher) Ready() bool {
	// isReady does not lock, so take the lock for the duration of the check.
	b.Lock()
	defer b.Unlock()

	ready := b.isReady()
	return ready
}

func (b *Batcher) isReady() bool {
// isReady reports whether at least b.size points are buffered. It does not
// lock; Ready calls it while holding the lock.
func (b *PointBatcher) isReady() bool {
	buffered := len(b.points)
	return buffered >= b.size
}

// Emit returns a new batch of points with the provided batch size or with the
// remaining points. Please drain the points at the end of your processing to
// get the remaining points not filling up a batch.
func (b *Batcher) Emit() []*influxdb3.Point {
func (b *PointBatcher) Emit() []*influxdb3.Point {
	// emitPoints mutates b.points without locking, so hold the lock here.
	b.Lock()
	defer b.Unlock()

	batch := b.emitPoints()
	return batch
}

func (b *Batcher) emitPoints() []*influxdb3.Point {
// emitPoints removes up to b.size points from the front of the buffer and
// returns them; when fewer points are buffered, all of them are returned.
// It does not lock; Emit calls it while holding the lock.
func (b *PointBatcher) emitPoints() []*influxdb3.Point {
	l := min(b.size, len(b.points))

	// Cap the batch at its own length (full slice expression). The batch and
	// the remaining points share one backing array; without the cap, an
	// append on the returned batch would overwrite points still buffered in
	// b.points instead of reallocating.
	points := b.points[:l:l]
	b.points = b.points[l:]

	return points
}

// LPOption is a function that adjusts an LPBatcher. NewLPBatcher applies each
// supplied option before it allocates the byte buffer.
type LPOption func(*LPBatcher)

// WithBufferSize returns an option that sets the size field of an LPBatcher.
// NOTE(review): the unit of size (bytes vs. lines) is not established by the
// code visible here — confirm.
func WithBufferSize(size int) LPOption {
	var opt LPOption = func(lpb *LPBatcher) {
		lpb.size = size
	}
	return opt
}

// WithBufferCapacity returns an option that sets the capacity field of an
// LPBatcher; NewLPBatcher uses that value as the initial capacity of the byte
// buffer.
func WithBufferCapacity(capacity int) LPOption {
	var opt LPOption = func(lpb *LPBatcher) {
		lpb.capacity = capacity
	}
	return opt
}

// LPBatcher is a batcher backed by a byte buffer. It embeds BaseBatcher for
// the size and capacity settings and sync.Mutex for locking.
// NOTE(review): presumably intended for line protocol (LP) data; no Add or
// Emit methods for this type are visible here — confirm.
type LPBatcher struct {
BaseBatcher
// buffer is allocated by NewLPBatcher with the configured capacity.
buffer []byte
sync.Mutex
}

// NewLPBatcher creates an LPBatcher that starts from DefaultBatchSize and
// DefaultCapacity, applies the given options in order, and then allocates an
// empty byte buffer with the resulting capacity.
func NewLPBatcher(options ...LPOption) *LPBatcher {
	lpb := &LPBatcher{
		BaseBatcher: BaseBatcher{
			size:     DefaultBatchSize,
			capacity: DefaultCapacity,
		},
	}

	// Options run before the buffer is allocated so that a capacity override
	// takes effect.
	for i := range options {
		options[i](lpb)
	}

	lpb.buffer = make([]byte, 0, lpb.capacity)

	return lpb
}

/*
func (b *PointBatcher) emitBytes() []byte {
b.Lock()
defer b.Unlock()
return []byte(b.buffer)
}
func (b *Batcher) addToBuffer(items ...*interface{}) {
b.Lock()
defer b.Unlock()
b.buffer = append(b.buffer, items...)
//Call callbacks if a new batch is ready
if b.isReady() {
if b.callbackReady != nil {
b.callbackReady()
}
if b.callbackEmit != nil {
// ??? and if its line protocol?
b.callbackEmit(b.emitPoints())
}
}
}
*/
/*
func (b *Batcher) AddPoints(p ...*influxdb3.Point) error {
//b.Lock()
//defer b.Unlock()
if b.idiom != BatchIdiomPoints {
if len(b.buffer) == 0 {
b.idiom = BatchIdiomPoints
} else {
return errors.New("this batcher does not support the Point idiom")
}
}
interfaces := make([]*interface{}, len(p))
// Add the point
for i, point := range p {
var iface interface{} = point
interfaces[i] = &iface
}
//b.points = append(b.points, p...)
b.addToBuffer(interfaces...)
return nil
} */

/*
func (b *Batcher) AddLP(lines ...string) error {
if b.idiom != BatchIdiomLP {
if len(b.buffer) == 0 {
b.idiom = BatchIdiomLP
} else {
return errors.New("this batcher does not support the Line Protocol (LP) idiom")
}
}
interfaces := make([]*interface{}, len(lines))
for n, line := range lines {
var iface interface{} = line
interfaces[n] = &iface
}
b.addToBuffer(interfaces...)
return nil
} */
Loading

0 comments on commit 8a9d396

Please sign in to comment.