forked from singularperturbation/sphinx
-
Notifications
You must be signed in to change notification settings - Fork 1
/
sphinx.go
280 lines (235 loc) · 6.49 KB
/
sphinx.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
// Package sphinx is a Sphinx API library (not SphinxQL) that works with version 2.0.8 of Sphinx
package sphinx
import (
"bytes"
"fmt"
"io"
"log"
"net"
"time"
"sync"
"github.com/fatih/pool"
)
// Config collects the settings a SphinxClient uses to reach the Sphinx API
// listener and to size its connection pool.
type Config struct {
	// Address of the Sphinx API endpoint; combined as "Host:Port" when dialing.
	Host string
	Port int

	// ConnectTimeout is passed to net.DialTimeout for each new connection and
	// is also used as the deadline for the initial protocol handshake.
	ConnectTimeout time.Duration

	// Initial and maximum number of pooled connections.
	StartingPoolSize int
	MaxPoolSize      int

	// MaxQueryTime is a query-specific option: Query copies it onto every
	// SphinxQuery it sends. Convert to milliseconds before sending.
	MaxQueryTime time.Duration

	// MaxReconnectAttempts bounds how many retries are made after connection
	// errors before the error is handed back to the caller.
	MaxReconnectAttempts int
}
// NewDefaultConfig returns a Config with defaults for a Sphinx server running
// on the same machine:
//   - Host "0.0.0.0", Port 9312 (the default Sphinx API port).
//     NOTE(review): "0.0.0.0" is the wildcard address; dialing it reaches the
//     local host on Linux/macOS but may not be portable — "127.0.0.1" would
//     state the intent explicitly.
//   - A 1 second connect timeout.
//   - MaxQueryTime left at 0.
//   - A connection pool that starts with 1 connection and grows to at most 30.
//   - Up to 10 reconnect attempts after connection errors.
func NewDefaultConfig() *Config {
	return &Config{
		Host:                 "0.0.0.0",
		Port:                 9312, // Default Sphinx API port
		ConnectTimeout:       time.Second * 1,
		MaxQueryTime:         0,
		StartingPoolSize:     1,
		MaxPoolSize:          30,
		MaxReconnectAttempts: 10,
	}
}
// SphinxClient talks to a Sphinx server through a pool of API connections.
// Init stores the configuration and builds the pool; Close releases it.
//
// mu is locked around increments and resets of ReconnectAttemptCount.
// NOTE(review): ConnectionPool is replaced by PoolInit without holding mu, so
// concurrent queries that trigger a pool rebuild are not fully synchronized.
type SphinxClient struct {
	Config         Config    // connection and pool settings
	ConnectionPool pool.Pool // pool of handshaken connections to the server

	// ReconnectAttemptCount counts retries made since the last successful
	// query; Query resets it to 0 on success.
	ReconnectAttemptCount int

	mu sync.Mutex
}
// Limits bounds which matches a query returns and how much work it does.
type Limits struct {
	Offset     uint32 // distance from the beginning of the result set
	Limit      uint32 // maximum number of matches to return
	Cutoff     uint32 // stop searching once this many matches are found
	MaxMatches uint32 // maximum number of matches to return (DefaultQuery uses 1000)
}
// FieldWeight pairs a name with an integer weight. SphinxQuery uses it for
// both per-field weights and per-index weights.
type FieldWeight struct {
	FieldName   string // field (or index) the weight applies to
	FieldWeight uint32 // weight value
}
// FilterValue mirrors, approximately, the st_filter struct from the C client
// (sphinxclient.c): one attribute filter attached to a query.
type FilterValue struct {
	Attribute string // attribute the filter applies to
	Type      Filter // kind of filter

	// Presumably used by value-list filters — confirm against the encoder.
	Values []uint64

	// Presumably the integer-range bounds.
	Min uint64
	Max uint64

	// Presumably the float-range bounds.
	Fmin float32
	Fmax float32

	// Exclude presumably inverts the filter when non-zero, like the C
	// client's exclude flag — verify against the request builder.
	Exclude uint32
}
// SphinxQuery describes a single search request: what to look for, where,
// and how matches are ranked, sorted, filtered and limited.
type SphinxQuery struct {
	Keywords string // full-text query
	Index    string // index (or indexes) to search

	// Discrete matching options.
	MatchType MatchMode
	RankType  RankMode

	// Sorting options.
	SortType     SortMode
	SortByString string

	// Attribute filters.
	Filters []FilterValue

	// Offsets and maximum result counts.
	QueryLimits Limits

	FieldWeights []FieldWeight
	// IndexWeights reuses FieldWeight; FieldName presumably holds the index
	// name here — confirm with the request builder.
	IndexWeights []FieldWeight

	// Document ID bounds.
	MinID uint64
	MaxID uint64

	// MaxQueryTime is overwritten from Config.MaxQueryTime by Query.
	MaxQueryTime time.Duration
	Comment      string
}
// DefaultQuery returns a new SphinxQuery that targets DefaultIndex and asks
// for up to 20 results from offset 0, with MaxMatches 1000 and Cutoff 0.
// All other fields keep their zero values.
func DefaultQuery() *SphinxQuery {
	q := new(SphinxQuery)
	q.Index = DefaultIndex
	q.QueryLimits = Limits{Limit: 20, MaxMatches: 1000}
	return q
}
// Init prepares s for use. It stores a copy of config, or of
// NewDefaultConfig() when config is nil, and clears the reconnect counter.
// It then builds the connection pool via PoolInit and returns PoolInit's
// error.
func (s *SphinxClient) Init(config *Config) error {
	cfg := config
	if cfg == nil {
		cfg = NewDefaultConfig()
	}
	s.Config, s.ReconnectAttemptCount = *cfg, 0
	return s.PoolInit()
}
// PoolInit creates (or replaces) s.ConnectionPool from the pool sizes in
// s.Config. Each new pooled connection goes through two steps:
//   - It is dialed over TCP to "Host:Port" with s.Config.ConnectTimeout.
//   - It is passed to rawInitializeSphinxConnection. When ConnectTimeout is
//     positive, it also serves as the deadline for this handshake. The
//     deadline is cleared afterwards so it does not affect later queries.
//
// A connection whose handshake fails is closed rather than leaked.
// s.ConnectionPool is assigned whatever pool.NewChannelPool returned, even
// on error, and that error is returned.
func (s *SphinxClient) PoolInit() error {
	// handshake runs the Sphinx protocol greeting on a freshly dialed
	// connection.
	handshake := func(conn net.Conn) error {
		// A zero timeout means "no timeout" for DialTimeout; using it as a
		// deadline would instead expire immediately and fail every handshake.
		if timeout := s.Config.ConnectTimeout; timeout > 0 {
			if err := conn.SetDeadline(time.Now().Add(timeout)); err != nil {
				return err
			}
		}
		log.Println("Initializing sphinx connection")
		if err := rawInitializeSphinxConnection(conn); err != nil {
			return err
		}
		// Pooled connections are long-lived: clear the handshake deadline.
		return conn.SetDeadline(time.Time{})
	}

	// Factory used by the pool to open each new connection.
	sphinxConnFactory := func() (net.Conn, error) {
		conn, err := net.DialTimeout(
			"tcp",
			fmt.Sprintf("%s:%d", s.Config.Host, s.Config.Port),
			s.Config.ConnectTimeout,
		)
		if err != nil {
			return nil, err
		}
		if err := handshake(conn); err != nil {
			// Callers of a factory normally ignore the conn when err != nil,
			// so close it here. Best effort: the handshake error is the one
			// worth reporting.
			_ = conn.Close()
			return nil, err
		}
		return conn, nil
	}

	p, err := pool.NewChannelPool(s.Config.StartingPoolSize, s.Config.MaxPoolSize, sphinxConnFactory)
	s.ConnectionPool = p
	return err
}
// Close shuts down the client's connection pool, which closes the
// connections it holds. It does nothing when no pool has been created.
// The ConnectionPool field itself is left as-is.
func (s *SphinxClient) Close() {
	if p := s.ConnectionPool; p != nil {
		p.Close()
	}
}
// RemoveBadConnection marks c as unusable when it is a *pool.PoolConn, so the
// pool does not hand it out again (per fatih/pool's MarkUnusable semantics).
// Other net.Conn values are left untouched.
//
// If the pool then reports no idle connections (Len() == 0), the whole pool
// is closed and rebuilt with PoolInit, and PoolInit's error is returned.
// When no pool exists yet there is nothing to rebuild, and nil is returned.
func (s *SphinxClient) RemoveBadConnection(c net.Conn) error {
	// The pool hands out its wrapper type behind the net.Conn interface.
	if poolConn, ok := c.(*pool.PoolConn); ok {
		log.Printf("Removing bad connection from pool")
		poolConn.MarkUnusable()
	}

	// Exported method: guard against being called before the pool exists
	// (or after a failed PoolInit left it nil) instead of panicking.
	if s.ConnectionPool == nil {
		return nil
	}

	// Too many bad connections, reinitialize the pool.
	// NOTE(review): Len counts idle connections, so this also triggers when
	// every healthy connection is merely checked out by other queries.
	if s.ConnectionPool.Len() == 0 {
		s.Close()
		return s.PoolInit()
	}
	return nil
}
// HandleConnectionError is Query's recovery path for I/O failures on c.
//
// The failed connection is always passed to RemoveBadConnection, so it is
// not reused. If fewer than Config.MaxReconnectAttempts retries have been
// consumed since the last successful query, the attempt counter is
// incremented and q is re-run through Query. Otherwise the original err is
// returned. If RemoveBadConnection fails while a retry is still allowed, its
// error is returned instead of retrying.
func (s *SphinxClient) HandleConnectionError(q *SphinxQuery, c net.Conn, err error) (*SphinxResult, error) {
	// Check and increment under one lock so concurrent failures cannot race
	// past MaxReconnectAttempts or read the counter unsynchronized.
	s.mu.Lock()
	retry := s.ReconnectAttemptCount < s.Config.MaxReconnectAttempts
	if retry {
		s.ReconnectAttemptCount++
	}
	s.mu.Unlock()

	// Evict the connection even when giving up; otherwise the caller's
	// deferred Close would hand the broken connection back to the pool.
	removeErr := s.RemoveBadConnection(c)
	if !retry {
		return nil, err
	}
	if removeErr != nil {
		return nil, removeErr
	}
	return s.Query(q)
}
// Query sends q to the Sphinx server over a pooled connection and decodes the
// response into a SphinxResult.
//
// If no pool exists yet, one is created with PoolInit first. A PoolInit
// error is returned as-is; otherwise the query proceeds. q.MaxQueryTime is
// overwritten with s.Config.MaxQueryTime before the request is built, so the
// caller's q is mutated.
//
// Failures writing the request, reading the response header, or reading the
// response body are handed to HandleConnectionError, which may retry the
// query. After a complete round trip, ReconnectAttemptCount is reset to 0,
// even if decoding the response then fails.
// TODO: Decompose this into functions, remove debugging statements
func (s *SphinxClient) Query(q *SphinxQuery) (*SphinxResult, error) {
	if s.ConnectionPool == nil {
		// Lazily create the pool, then carry on with the query rather than
		// returning an empty result with a nil error.
		if err := s.PoolInit(); err != nil {
			return nil, err
		}
	}

	// Build request first to avoid contention over connections in pool.
	q.MaxQueryTime = s.Config.MaxQueryTime
	headerBuf, requestBuf, err := buildRequest(q)
	if err != nil {
		return nil, err
	}
	log.Println("Request for query built")

	conn, err := s.ConnectionPool.Get()
	if err != nil {
		// No connection was obtained, so there is nothing to evict.
		return nil, err
	}
	// For pooled connections this hands the connection back to the pool,
	// unless it was marked unusable (per fatih/pool).
	defer conn.Close()

	if _, err = headerBuf.WriteTo(conn); err != nil {
		return s.HandleConnectionError(q, conn, err)
	}
	if _, err = requestBuf.WriteTo(conn); err != nil {
		return s.HandleConnectionError(q, conn, err)
	}
	log.Println("Wrote request to server")

	responseHeader, err := readHeader(conn)
	if err != nil {
		return s.HandleConnectionError(q, conn, err)
	}

	// Now need to read the remainder of the response into the buffer.
	// FIXME: Check len to make sure reasonable
	responseBytes := make([]byte, responseHeader.len)
	if _, err = io.ReadFull(conn, responseBytes); err != nil {
		// A short read leaves the rest of the response on the socket; reusing
		// this connection would desynchronize the next query, so evict it.
		return s.HandleConnectionError(q, conn, err)
	}

	result, err := getResultFromBuffer(responseHeader, bytes.NewBuffer(responseBytes))

	// The round trip succeeded, so reset the reconnect attempt count.
	s.mu.Lock()
	s.ReconnectAttemptCount = 0
	s.mu.Unlock()

	return result, err
}
// NewSearch builds a query ready to pass to Query. It starts from
// DefaultQuery and fills in keywords and comment. It searches index, or
// DefaultIndex when index is empty.
func NewSearch(keywords, index, comment string) *SphinxQuery {
	search := DefaultQuery()
	search.Keywords, search.Comment = keywords, comment

	search.Index = DefaultIndex
	if index != "" {
		search.Index = index
	}
	return search
}