Skip to content

Commit

Permalink
Shared pool among clients and fixed pool resources release (#51)
Browse files Browse the repository at this point in the history
* [add] added option to create clients with shared pool. added option to releases the resources used by the pool ( single, and multihost )

* [add] added Close() test on multi-host pool

* [add] extended test for Close on multihost pool

* [add] swapped deprecated redis.NewPool to redis.Pool constructor. preserving all errors on multihost pool CLose()

* [fix] fixed CI invalid memory address or nil pointer dereference on TestNewMultiHostPool/multi-host_single_address
  • Loading branch information
filipecosta90 authored Apr 7, 2020
1 parent 7ccd0b5 commit a595a0d
Show file tree
Hide file tree
Showing 8 changed files with 321 additions and 194 deletions.
70 changes: 62 additions & 8 deletions redisearch/aggregate_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,6 @@ import (
"testing"
)

func createClient(indexName string) *Client {
value, exists := os.LookupEnv("REDISEARCH_TEST_HOST")
host := "localhost:6379"
if exists && value != "" {
host = value
}
return NewClient(host, indexName)
}

// Game struct which contains a Asin, a Description, a Title, a Price, and a list of categories
// a type and a list of social links
Expand All @@ -38,6 +30,68 @@ type Game struct {
Categories []string `json:"categories"`
}


func init() {
/* load test data */
value, exists := os.LookupEnv("REDISEARCH_RDB_LOADED")
requiresDatagen := true
if exists && value != "" {
requiresDatagen = false
}
if requiresDatagen {
c := createClient("bench.ft.aggregate")

sc := NewSchema(DefaultOptions).
AddField(NewTextField("foo"))
c.Drop()
if err := c.CreateIndex(sc); err != nil {
log.Fatal(err)
}
ndocs := 10000
docs := make([]Document, ndocs)
for i := 0; i < ndocs; i++ {
docs[i] = NewDocument(fmt.Sprintf("doc%d", i), 1).Set("foo", "hello world")
}

if err := c.IndexOptions(DefaultIndexingOptions, docs...); err != nil {
log.Fatal(err)
}
}

}

func benchmarkAggregate(c *Client, q *AggregateQuery, b *testing.B) {
for n := 0; n < b.N; n++ {
c.Aggregate(q)
}
}

func benchmarkAggregateCursor(c *Client, q *AggregateQuery, b *testing.B) {
for n := 0; n < b.N; n++ {
c.Aggregate(q)
for q.CursorHasResults() {
c.Aggregate(q)
}
}
}

func BenchmarkAgg_1(b *testing.B) {
c := createClient("bench.ft.aggregate")
q := NewAggregateQuery().
SetQuery(NewQuery("*"))
b.ResetTimer()
benchmarkAggregate(c, q, b)
}

func BenchmarkAggCursor_1(b *testing.B) {
c := createClient("bench.ft.aggregate")
q := NewAggregateQuery().
SetQuery(NewQuery("*")).
SetCursor(NewCursor())
b.ResetTimer()
benchmarkAggregateCursor(c, q, b)
}

func AddValues(c *Client) {
// Open our jsonFile
bzipfile := "../tests/games.json.bz2"
Expand Down
35 changes: 12 additions & 23 deletions redisearch/autocomplete_test.go
Original file line number Diff line number Diff line change
@@ -1,32 +1,21 @@
package redisearch_test
package redisearch

import (
"fmt"
"github.com/RediSearch/redisearch-go/redisearch"
"github.com/gomodule/redigo/redis"
"github.com/stretchr/testify/assert"
"os"
"reflect"
"testing"
)

func createAutocompleter(dictName string) *redisearch.Autocompleter {
value, exists := os.LookupEnv("REDISEARCH_TEST_HOST")
host := "localhost:6379"
if exists && value != "" {
host = value
}
return redisearch.NewAutocompleter(host, dictName)
}

func TestAutocompleter_Serialize(t *testing.T) {
fuzzy := redisearch.DefaultSuggestOptions
fuzzy := DefaultSuggestOptions
fuzzy.Fuzzy = true
withscores := redisearch.DefaultSuggestOptions
withscores := DefaultSuggestOptions
withscores.WithScores = true
withpayloads := redisearch.DefaultSuggestOptions
withpayloads := DefaultSuggestOptions
withpayloads.WithPayloads = true
all := redisearch.DefaultSuggestOptions
all := DefaultSuggestOptions
all.Fuzzy = true
all.WithScores = true
all.WithPayloads = true
Expand All @@ -36,7 +25,7 @@ func TestAutocompleter_Serialize(t *testing.T) {
}
type args struct {
prefix string
opts redisearch.SuggestOptions
opts SuggestOptions
}
tests := []struct {
name string
Expand All @@ -45,15 +34,15 @@ func TestAutocompleter_Serialize(t *testing.T) {
want redis.Args
want1 int
}{
{"default options", fields{"key1"}, args{"ab", redisearch.DefaultSuggestOptions,}, redis.Args{"key1", "ab", "MAX", 5}, 1},
{"default options", fields{"key1"}, args{"ab", DefaultSuggestOptions,}, redis.Args{"key1", "ab", "MAX", 5}, 1},
{"FUZZY", fields{"key1"}, args{"ab", fuzzy,}, redis.Args{"key1", "ab", "MAX", 5, "FUZZY"}, 1},
{"WITHSCORES", fields{"key1"}, args{"ab", withscores,}, redis.Args{"key1", "ab", "MAX", 5, "WITHSCORES"}, 2},
{"WITHPAYLOADS", fields{"key1"}, args{"ab", withpayloads,}, redis.Args{"key1", "ab", "MAX", 5, "WITHPAYLOADS"}, 2},
{"all", fields{"key1"}, args{"ab", all,}, redis.Args{"key1", "ab", "MAX", 5, "FUZZY", "WITHSCORES", "WITHPAYLOADS"}, 3},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
a := redisearch.NewAutocompleterFromPool(nil, tt.fields.name)
a := NewAutocompleterFromPool(nil, tt.fields.name)
got, got1 := a.Serialize(tt.args.prefix, tt.args.opts)
if !reflect.DeepEqual(got, tt.want) {
t.Errorf("serialize() got = %v, want %v", got, tt.want)
Expand All @@ -69,9 +58,9 @@ func TestSuggest(t *testing.T) {
a := createAutocompleter("testing")

// Add Terms to the Autocompleter
terms := make([]redisearch.Suggestion, 10)
terms := make([]Suggestion, 10)
for i := 0; i < 10; i++ {
terms[i] = redisearch.Suggestion{Term: fmt.Sprintf("foo %d", i),
terms[i] = Suggestion{Term: fmt.Sprintf("foo %d", i),
Score: 1.0, Payload: fmt.Sprintf("bar %d", i)}
}
err := a.AddTerms(terms...)
Expand All @@ -80,7 +69,7 @@ func TestSuggest(t *testing.T) {
assert.Nil(t, err)
assert.Equal(t, int64(10), suglen)
// Retrieve Terms From Autocompleter - Without Payloads / Scores
suggestions, err := a.SuggestOpts("f", redisearch.SuggestOptions{Num: 10})
suggestions, err := a.SuggestOpts("f", SuggestOptions{Num: 10})
assert.Nil(t, err)
assert.Equal(t, 10, len(suggestions))
for _, suggestion := range suggestions {
Expand All @@ -90,7 +79,7 @@ func TestSuggest(t *testing.T) {
}

// Retrieve Terms From Autocompleter - With Payloads & Scores
suggestions, err = a.SuggestOpts("f", redisearch.SuggestOptions{Num: 10, WithScores: true, WithPayloads: true})
suggestions, err = a.SuggestOpts("f", SuggestOptions{Num: 10, WithScores: true, WithPayloads: true})
assert.Nil(t, err)
assert.Equal(t, 10, len(suggestions))
for _, suggestion := range suggestions {
Expand Down
9 changes: 9 additions & 0 deletions redisearch/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,15 @@ func NewClient(addr, name string) *Client {
return ret
}

// NewAutocompleter creates a new Autocompleter with the given pool and index name
func NewClientFromPool(pool *redis.Pool, name string) *Client {
ret := &Client{
pool: pool,
name: name,
}
return ret
}

// CreateIndex configues the index and creates it on redis
func (i *Client) CreateIndex(s *Schema) (err error) {
args := redis.Args{i.name}
Expand Down
81 changes: 17 additions & 64 deletions redisearch/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,73 +2,12 @@ package redisearch

import (
"fmt"
"github.com/stretchr/testify/assert"
"log"
"os"
"reflect"
"testing"
)

func init() {
/* load test data */
value, exists := os.LookupEnv("REDISEARCH_RDB_LOADED")
requiresDatagen := true
if exists && value != "" {
requiresDatagen = false
}
if requiresDatagen {
c := createClient("bench.ft.aggregate")

sc := NewSchema(DefaultOptions).
AddField(NewTextField("foo"))
c.Drop()
if err := c.CreateIndex(sc); err != nil {
log.Fatal(err)
}
ndocs := 10000
docs := make([]Document, ndocs)
for i := 0; i < ndocs; i++ {
docs[i] = NewDocument(fmt.Sprintf("doc%d", i), 1).Set("foo", "hello world")
}

if err := c.IndexOptions(DefaultIndexingOptions, docs...); err != nil {
log.Fatal(err)
}
}

}

func benchmarkAggregate(c *Client, q *AggregateQuery, b *testing.B) {
for n := 0; n < b.N; n++ {
c.Aggregate(q)
}
}

func benchmarkAggregateCursor(c *Client, q *AggregateQuery, b *testing.B) {
for n := 0; n < b.N; n++ {
c.Aggregate(q)
for q.CursorHasResults() {
c.Aggregate(q)
}
}
}

func BenchmarkAgg_1(b *testing.B) {
c := createClient("bench.ft.aggregate")
q := NewAggregateQuery().
SetQuery(NewQuery("*"))
b.ResetTimer()
benchmarkAggregate(c, q, b)
}

func BenchmarkAggCursor_1(b *testing.B) {
c := createClient("bench.ft.aggregate")
q := NewAggregateQuery().
SetQuery(NewQuery("*")).
SetCursor(NewCursor())
b.ResetTimer()
benchmarkAggregateCursor(c, q, b)
}
"github.com/gomodule/redigo/redis"
"github.com/stretchr/testify/assert"
)

func TestClient_Get(t *testing.T) {

Expand Down Expand Up @@ -492,6 +431,20 @@ func TestClient_Config(t *testing.T) {
assert.Equal(t, "100", kvs["TIMEOUT"])
}

func TestNewClientFromPool(t *testing.T) {
host, password := getTestConnectionDetails()
pool := &redis.Pool{Dial: func() (redis.Conn, error) {
return redis.Dial("tcp", host, redis.DialPassword(password))
}, MaxIdle: maxConns}
client1 := NewClientFromPool(pool, "index1")
client2 := NewClientFromPool(pool, "index2")
assert.Equal(t, client1.pool, client2.pool)
err1 := client1.pool.Close()
err2 := client2.pool.Close()
assert.Nil(t, err1)
assert.Nil(t, err2)
}

func TestClient_GetTagVals(t *testing.T) {
c := createClient("testgettagvals")

Expand Down
9 changes: 4 additions & 5 deletions redisearch/document_test.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package redisearch_test
package redisearch

import (
"github.com/RediSearch/redisearch-go/redisearch"
"reflect"
"testing"
)
Expand All @@ -24,7 +23,7 @@ func TestEscapeTextFileString(t *testing.T) {
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
if got := redisearch.EscapeTextFileString(tt.args.value); got != tt.want {
if got := EscapeTextFileString(tt.args.value); got != tt.want {
t.Errorf("EscapeTextFileString() = %v, want %v", got, tt.want)
}
})
Expand Down Expand Up @@ -55,7 +54,7 @@ func TestDocument_EstimateSize(t *testing.T) {
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
d := &redisearch.Document{
d := &Document{
Id: tt.fields.Id,
Score: tt.fields.Score,
Payload: tt.fields.Payload,
Expand Down Expand Up @@ -90,7 +89,7 @@ func TestDocument_SetPayload(t *testing.T) {
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
d := &redisearch.Document{
d := &Document{
Id: tt.fields.Id,
Score: tt.fields.Score,
Payload: tt.fields.Payload,
Expand Down
30 changes: 24 additions & 6 deletions redisearch/pool.go
Original file line number Diff line number Diff line change
@@ -1,32 +1,33 @@
package redisearch

import (
"fmt"
"github.com/gomodule/redigo/redis"
"math/rand"
"sync"
"time"
"github.com/gomodule/redigo/redis"
)

type ConnPool interface {
Get() redis.Conn
Close() error
}

type SingleHostPool struct {
*redis.Pool
}

func NewSingleHostPool(host string) *SingleHostPool {
ret := redis.NewPool(func() (redis.Conn, error) {
// TODO: Add timeouts. and 2 separate pools for indexing and querying, with different timeouts
pool := &redis.Pool{Dial: func() (redis.Conn, error) {
return redis.Dial("tcp", host)
}, maxConns)
ret.TestOnBorrow = func(c redis.Conn, t time.Time) (err error) {
}, MaxIdle: maxConns}
pool.TestOnBorrow = func(c redis.Conn, t time.Time) (err error) {
if time.Since(t) > time.Second {
_, err = c.Do("PING")
}
return err
}
return &SingleHostPool{ret}
return &SingleHostPool{pool}
}

type MultiHostPool struct {
Expand Down Expand Up @@ -65,3 +66,20 @@ func (p *MultiHostPool) Get() redis.Conn {
return pool.Get()

}

func (p *MultiHostPool) Close() (err error) {
p.Lock()
defer p.Unlock()
for host, pool := range p.pools {
poolErr := pool.Close()
//preserve pool error if not nil but continue
if poolErr != nil {
if err == nil {
err = fmt.Errorf("Error closing pool for host %s. Got %v.", host, poolErr)
} else {
err = fmt.Errorf("%v Error closing pool for host %s. Got %v.", err, host, poolErr)
}
}
}
return
}
Loading

0 comments on commit a595a0d

Please sign in to comment.