-
Couldn't load subscription status.
- Fork 10
feat(client): add sender pool #47
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Merged
Merged
Changes from 2 commits
Commits
Show all changes
21 commits
Select commit
Hold shift + click to select a range
4bcc562
initial feature commit
sklarsa 1afb3f5
adds a test
sklarsa fc9b6f8
chore(ci): fix haproxy configuration for archlinux (#46)
sklarsa 55d78d9
address some pr comments around lifecycle
sklarsa 03b3329
chore(test): move test utils into an internal package
sklarsa 1b7eff2
add license to new files
sklarsa 14dda3f
fix error wrapping
sklarsa 0486e47
Merge branch 'steve/move-test-clients' into steve/pooled-senders
sklarsa da7e244
godocs
sklarsa 2e10c4c
adds sender tests
sklarsa 14c94a6
fix assertions
sklarsa f80d111
another try
sklarsa 2421757
Revert "chore(test): move test utils into an internal package"
sklarsa 660f91e
move everything into 1 package
sklarsa e956e1a
exit test reader goroutines
sklarsa f7bd180
Apply suggestions from code review
sklarsa b28d3d4
a few more suggestions
sklarsa ebe9478
Merge branch 'main' into steve/pooled-senders
sklarsa d3a4164
Remove extra new lines
puzpuzpuz 8f70e34
return error with tcp connection
sklarsa 283fbe2
Minor tweaks
puzpuzpuz File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,75 @@ | ||
| package pool | ||
|
|
||
| import ( | ||
| "context" | ||
| "fmt" | ||
| "sync" | ||
|
|
||
| qdb "github.com/questdb/go-questdb-client/v3" | ||
| ) | ||
|
|
||
| type LineSenderPool struct { | ||
sklarsa marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| maxSenders int | ||
| conf string | ||
|
|
||
| senders []qdb.LineSender | ||
| mu *sync.Mutex | ||
| } | ||
|
|
||
| type LineSenderPoolOption func(*LineSenderPool) | ||
|
|
||
| func FromConf(conf string, opts ...LineSenderPoolOption) *LineSenderPool { | ||
| pool := &LineSenderPool{ | ||
| maxSenders: 64, | ||
| conf: conf, | ||
| senders: []qdb.LineSender{}, | ||
| mu: &sync.Mutex{}, | ||
| } | ||
|
|
||
| for _, opt := range opts { | ||
| opt(pool) | ||
| } | ||
|
|
||
| return pool | ||
| } | ||
|
|
||
| func WithMaxSenders(count int) LineSenderPoolOption { | ||
| return func(lsp *LineSenderPool) { | ||
| lsp.maxSenders = count | ||
| } | ||
| } | ||
|
|
||
| func (p *LineSenderPool) Acquire(ctx context.Context) (qdb.LineSender, error) { | ||
| p.mu.Lock() | ||
| defer p.mu.Unlock() | ||
|
|
||
| if len(p.senders) > 0 { | ||
| // Pop sender off the slice and return it | ||
| s := p.senders[len(p.senders)-1] | ||
| p.senders = p.senders[0 : len(p.senders)-1] | ||
| return s, nil | ||
| } | ||
|
|
||
| return qdb.LineSenderFromConf(ctx, p.conf) | ||
|
|
||
| } | ||
|
|
||
| func (p *LineSenderPool) Release(ctx context.Context, s qdb.LineSender) error { | ||
| err := s.Flush(ctx) | ||
sklarsa marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
|
|
||
| p.mu.Lock() | ||
| defer p.mu.Unlock() | ||
|
|
||
| for i := range p.senders { | ||
| if p.senders[i] == s { | ||
| return fmt.Errorf("LineSender %p is has already been released back to the pool", s) | ||
| } | ||
| } | ||
|
|
||
| if len(p.senders) < p.maxSenders { | ||
| p.senders = append(p.senders, s) | ||
| } | ||
sklarsa marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
|
|
||
| return err | ||
|
|
||
| } | ||
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,106 @@ | ||
| package pool_test | ||
sklarsa marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
|
|
||
| import ( | ||
| "context" | ||
| "testing" | ||
|
|
||
| "github.com/questdb/go-questdb-client/v3/pool" | ||
| "github.com/stretchr/testify/assert" | ||
| ) | ||
|
|
||
| func TestBasicBehavior(t *testing.T) { | ||
| p := pool.FromConf("http::addr=localhost:1234") | ||
| ctx := context.Background() | ||
|
|
||
| // Start with an empty pool, allocate a new sender | ||
| s1, err := p.Acquire(ctx) | ||
| assert.NoError(t, err) | ||
|
|
||
| // Release the sender and add it to the pool | ||
| assert.NoError(t, p.Release(ctx, s1)) | ||
|
|
||
| // Acquiring a sender will return the initial one from the pool | ||
| s2, err := p.Acquire(ctx) | ||
| assert.NoError(t, err) | ||
| assert.Same(t, s1, s2) | ||
|
|
||
| // Acquiring another sender will create a new one | ||
| s3, err := p.Acquire(ctx) | ||
| assert.NoError(t, err) | ||
| assert.NotSame(t, s1, s3) | ||
|
|
||
| // Releasing the new sender will add it back to the pool | ||
| assert.NoError(t, p.Release(ctx, s3)) | ||
|
|
||
| // Releasing the original sender will add it to the end of the pool slice | ||
| assert.NoError(t, p.Release(ctx, s2)) | ||
|
|
||
| // Acquiring a new sender will pop the original one off the slice | ||
| s4, err := p.Acquire(ctx) | ||
| assert.NoError(t, err) | ||
| assert.Same(t, s1, s4) | ||
|
|
||
| // Acquiring another sender will pop the second one off the slice | ||
| s5, err := p.Acquire(ctx) | ||
| assert.NoError(t, err) | ||
| assert.Same(t, s3, s5) | ||
|
|
||
| } | ||
|
|
||
| func TestDoubleReleaseShouldFail(t *testing.T) { | ||
| p := pool.FromConf("http::addr=localhost:1234") | ||
| ctx := context.Background() | ||
|
|
||
| // Start with an empty pool, allocate a new sender | ||
| s1, err := p.Acquire(ctx) | ||
| assert.NoError(t, err) | ||
|
|
||
| // Release the sender | ||
| assert.NoError(t, p.Release(ctx, s1)) | ||
|
|
||
| // Try to release the sender again. This should fail because it already exists in the slice | ||
| assert.Error(t, p.Release(ctx, s1)) | ||
|
|
||
| } | ||
|
|
||
| func TestMaxPoolSize(t *testing.T) { | ||
| // Create a pool with 2 max senders | ||
| p := pool.FromConf("http::addr=localhost:1234", pool.WithMaxSenders(2)) | ||
| ctx := context.Background() | ||
|
|
||
| // Allocate 3 senders | ||
| s1, err := p.Acquire(ctx) | ||
| assert.NoError(t, err) | ||
|
|
||
| s2, err := p.Acquire(ctx) | ||
| assert.NoError(t, err) | ||
|
|
||
| s3, err := p.Acquire(ctx) | ||
| assert.NoError(t, err) | ||
|
|
||
| // Release all senders in reverse order | ||
| // Internal slice will look like: [ s3 , s2 ] | ||
| assert.NoError(t, p.Release(ctx, s3)) | ||
| assert.NoError(t, p.Release(ctx, s2)) | ||
| assert.NoError(t, p.Release(ctx, s1)) | ||
|
|
||
| // Acquire 3 more senders. | ||
|
|
||
| // The first one will be s2 (senders get popped off the slice) | ||
| s, err := p.Acquire(ctx) | ||
| assert.NoError(t, err) | ||
| assert.Same(t, s, s2) | ||
|
|
||
| // The next will be s3 | ||
| s, err = p.Acquire(ctx) | ||
| assert.NoError(t, err) | ||
| assert.Same(t, s, s3) | ||
|
|
||
| // The final one will not be s1, s2, or s3 because the slice is empty | ||
| s, err = p.Acquire(ctx) | ||
| assert.NoError(t, err) | ||
| assert.NotSame(t, s, s1) | ||
| assert.NotSame(t, s, s2) | ||
| assert.NotSame(t, s, s3) | ||
|
|
||
| } | ||
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
Uh oh!
There was an error while loading. Please reload this page.