Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
53 changes: 53 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,59 @@ To connect via TCP, set the configuration string to:
// ...
```

## Pooled Line Senders

**Warning: Experimental feature designed for use with HTTP senders ONLY**

Version 3 of the client introduces a `LineSenderPool`, which provides a mechanism
to cache previously-used `LineSender`s in memory so they can be reused without
having to allocate and instantiate new senders.

A LineSenderPool is thread-safe and can be used to concurrently Acquire and Release senders
across multiple goroutines.

Since `LineSender`s must be used in a single-threaded context, a typical pattern is to Acquire
a sender from a `LineSenderPool` at the beginning of a goroutine and use a deferred
execution block to Release the sender at the end of the goroutine.

Here is an example of the `LineSenderPool` Acquire, Release, and Close semantics:

```go
package main

import (
"context"

qdb "github.com/questdb/go-questdb-client/v3"
)

func main() {
ctx := context.TODO()

pool := qdb.PoolFromConf("http::addr=localhost:9000")
defer func() {
err := pool.Close(ctx)
if err != nil {
panic(err)
}
}()

sender, err := pool.Acquire(ctx)
if err != nil {
panic(err)
}

sender.Table("prices").
Symbol("ticker", "AAPL").
Float64Column("price", 123.45).
AtNow(ctx)

if err := pool.Release(ctx, sender); err != nil {
panic(err)
}
}
```

## Migration from v2

v2 code example:
Expand Down
179 changes: 179 additions & 0 deletions sender_pool.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,179 @@
/*******************************************************************************
* ___ _ ____ ____
* / _ \ _ _ ___ ___| |_| _ \| __ )
* | | | | | | |/ _ \/ __| __| | | | _ \
* | |_| | |_| | __/\__ \ |_| |_| | |_) |
* \__\_\\__,_|\___||___/\__|____/|____/
*
* Copyright (c) 2014-2019 Appsicle
* Copyright (c) 2019-2022 QuestDB
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
******************************************************************************/

package questdb

import (
"context"
"errors"
"fmt"
"strings"
"sync"
)

// LineSenderPool wraps a mutex-protected slice of [LineSender]. It allows a goroutine to
// Acquire a sender from the pool and Release it back to the pool when it's done being used.
//
// WARNING: This is an experimental API that is designed to work with HTTP senders ONLY.
type LineSenderPool struct {
maxSenders int
conf string

closed bool

senders []LineSender
mu *sync.Mutex
}

// LineSenderPoolOption defines line sender pool config option.
type LineSenderPoolOption func(*LineSenderPool)

// PoolFromConf instantiates a new LineSenderPool with a QuestDB configuration string.
// Any sender acquired from this pool will be initialized with the same configuration
// string that was passed into the conf argument.
//
// The default maximum number of senders is 64, but can be customized by using the
// [WithMaxSenders] option.
func PoolFromConf(conf string, opts ...LineSenderPoolOption) (*LineSenderPool, error) {
if strings.HasPrefix(conf, "tcp") {
return nil, errors.New("tcp/s not supported for pooled senders, use http/s only")
}

pool := &LineSenderPool{
maxSenders: 64,
conf: conf,
senders: []LineSender{},
mu: &sync.Mutex{},
}

for _, opt := range opts {
opt(pool)
}

return pool, nil
}

// WithMaxSenders sets the maximum number of senders in the pool.
// The default maximum number of senders is 64.
func WithMaxSenders(count int) LineSenderPoolOption {
return func(lsp *LineSenderPool) {
lsp.maxSenders = count
}
}

// Acquire obtains a LineSender from the pool. If the pool is empty, a new
// LineSender will be instantiated using the pool's config string.
func (p *LineSenderPool) Acquire(ctx context.Context) (LineSender, error) {
p.mu.Lock()
defer p.mu.Unlock()

if p.closed {
return nil, fmt.Errorf("cannot Acquire a LineSender from a closed LineSenderPool")
}

if len(p.senders) > 0 {
// Pop sender off the slice and return it
s := p.senders[len(p.senders)-1]
p.senders = p.senders[0 : len(p.senders)-1]
return s, nil
}

return LineSenderFromConf(ctx, p.conf)
}

// Release flushes the LineSender and returns it back to the pool. If the pool
// is full, the sender is closed and discarded. In cases where the sender's
// flush fails, it is not added back to the pool.
func (p *LineSenderPool) Release(ctx context.Context, s LineSender) error {
// If there is an error on flush, do not add the sender back to the pool
if err := s.Flush(ctx); err != nil {
return err
}

p.mu.Lock()
defer p.mu.Unlock()

for i := range p.senders {
if p.senders[i] == s {
return fmt.Errorf("LineSender %p has already been released back to the pool", s)
}
}

if p.closed || len(p.senders) >= p.maxSenders {
return s.Close(ctx)
}

p.senders = append(p.senders, s)

return nil
}

// Close sets the pool's status to "closed" and closes all cached LineSenders.
// When LineSenders are released back into a closed pool, they will be closed and discarded.
func (p *LineSenderPool) Close(ctx context.Context) error {
p.mu.Lock()
defer p.mu.Unlock()

p.closed = true

var senderErrors []error

for _, s := range p.senders {
senderErr := s.Close(ctx)
if senderErr != nil {
senderErrors = append(senderErrors, senderErr)

}
}

if len(senderErrors) == 0 {
return nil
}

err := fmt.Errorf("error closing one or more LineSenders in the pool")
for _, senderErr := range senderErrors {
err = fmt.Errorf("%s %w", err, senderErr)
}

return err
}

// IsClosed will return true if the pool is closed. Once a pool is closed,
// you will not be able to Acquire any new LineSenders from it. When
// LineSenders are released back into a closed pool, they will be closed and
// discarded.
func (p *LineSenderPool) IsClosed() bool {
p.mu.Lock()
defer p.mu.Unlock()

return p.closed
}

// Len returns the numbers of cached LineSenders in the pool.
func (p *LineSenderPool) Len() int {
p.mu.Lock()
defer p.mu.Unlock()

return len(p.senders)
}
Loading