Skip to content

Commit

Permalink
changed: Handle calls to https://insights.algolia.io in transport layer
Browse files Browse the repository at this point in the history
  • Loading branch information
aseure committed Dec 13, 2018
1 parent 713245f commit 0ef2010
Show file tree
Hide file tree
Showing 5 changed files with 29 additions and 9 deletions.
2 changes: 2 additions & 0 deletions algoliasearch/call/kind.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,11 @@ const (
Read Kind = iota
Write
Analytics
Insights
)

func IsRead(k Kind) bool { return k == Read }
func IsWrite(k Kind) bool { return k == Write }
func IsAnalytics(k Kind) bool { return k == Analytics }
func IsInsights(k Kind) bool { return k == Insights }
func IsReadWrite(k Kind) bool { return IsRead(k) || IsWrite(k) }
7 changes: 4 additions & 3 deletions algoliasearch/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,9 +47,10 @@ func (c *client) SetTimeout(connectTimeout, readTimeout int) {
// configurable.
c.SetReadTimeout(time.Duration(readTimeout) * time.Second)
}
func (c *client) SetReadTimeout(t time.Duration) { c.transport.setTimeouts(t, -1, -1) }
func (c *client) SetWriteTimeout(t time.Duration) { c.transport.setTimeouts(-1, t, -1) }
func (c *client) SetAnalyticsTimeout(t time.Duration) { c.transport.setTimeouts(-1, -1, t) }
func (c *client) SetReadTimeout(t time.Duration) { c.transport.setTimeouts(t, -1, -1, -1) }
func (c *client) SetWriteTimeout(t time.Duration) { c.transport.setTimeouts(-1, t, -1, -1) }
func (c *client) SetAnalyticsTimeout(t time.Duration) { c.transport.setTimeouts(-1, -1, t, -1) }
func (c *client) SetInsightsTimeout(t time.Duration) { c.transport.setTimeouts(-1, -1, -1, t) }

func (c *client) SetMaxIdleConnsPerHosts(maxIdleConnsPerHost int) {
c.transport.setMaxIdleConnsPerHost(maxIdleConnsPerHost)
Expand Down
13 changes: 11 additions & 2 deletions algoliasearch/retry_strategy.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ const (
DefaultReadTimeout = 5 * time.Second
DefaultWriteTimeout = 30 * time.Second
DefaultAnalyticsTimeout = 30 * time.Second
DefaultInsightsTimeout = 30 * time.Second

Success Outcome = iota
Failure
Expand Down Expand Up @@ -53,7 +54,7 @@ type RetryStrategy interface {
// SetTimeouts updates the internal timeouts for read, write (i.e.
// indexing) and analytics calls. Negative values are simply ignored,
// leaving the original timeouts unchanged.
SetTimeouts(read, write, analytics time.Duration)
SetTimeouts(read, write, analytics, insights time.Duration)
}

type retryStrategy struct {
Expand All @@ -62,6 +63,7 @@ type retryStrategy struct {
readTimeout time.Duration
writeTimeout time.Duration
analyticsTimeout time.Duration
insightsTimeout time.Duration
}

type statefulHost struct {
Expand Down Expand Up @@ -107,12 +109,14 @@ func NewRetryStrategy(appID string, providedHosts []string) *retryStrategy {
)...)
}
allHosts = append(allHosts, &statefulHost{host: "analytics.algolia.com", lastUpdate: now, accept: call.IsAnalytics})
allHosts = append(allHosts, &statefulHost{host: "insights.algolia.io", lastUpdate: now, accept: call.IsInsights})

return &retryStrategy{
hosts: allHosts,
readTimeout: DefaultReadTimeout,
writeTimeout: DefaultWriteTimeout,
analyticsTimeout: DefaultAnalyticsTimeout,
insightsTimeout: DefaultInsightsTimeout,
}
}

Expand All @@ -131,6 +135,8 @@ func (s *retryStrategy) GetTryableHosts(k call.Kind) []TryableHost {
baseTimeout = s.writeTimeout
case call.Analytics:
baseTimeout = s.analyticsTimeout
case call.Insights:
baseTimeout = s.insightsTimeout
default:
return nil
}
Expand Down Expand Up @@ -176,7 +182,7 @@ func (s *retryStrategy) Decide(h TryableHost, code int, err error) Outcome {
return Failure
}

func (s *retryStrategy) SetTimeouts(read, write, analytics time.Duration) {
func (s *retryStrategy) SetTimeouts(read, write, analytics, insights time.Duration) {
s.Lock()
defer s.Unlock()

Expand All @@ -189,6 +195,9 @@ func (s *retryStrategy) SetTimeouts(read, write, analytics time.Duration) {
if analytics > 0 {
s.analyticsTimeout = analytics
}
if insights > 0 {
s.insightsTimeout = insights
}
}

func (s *retryStrategy) markUp(host string) { s.update(host, false, false) }
Expand Down
9 changes: 7 additions & 2 deletions algoliasearch/retry_strategy_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -194,8 +194,9 @@ func TestRetryStrategy_Concurrent(t *testing.T) {
readTimeout = 1 * time.Second
writeTimeout = 2 * time.Second
analyticsTimeout = 3 * time.Second
insightsTimeout = 4 * time.Second
)
strategy.SetTimeouts(readTimeout, writeTimeout, analyticsTimeout)
strategy.SetTimeouts(readTimeout, writeTimeout, analyticsTimeout, insightsTimeout)

for i := 0; i < 1000; i++ {
wg.Add(2)
Expand All @@ -214,12 +215,16 @@ func TestRetryStrategy_Concurrent(t *testing.T) {
hosts = strategy.GetTryableHosts(call.Analytics)
require.ElementsMatch(t, expected, hosts)

expected = []TryableHost{&tryableHost{"insights.algolia.io", insightsTimeout}}
hosts = strategy.GetTryableHosts(call.Insights)
require.ElementsMatch(t, expected, hosts)

}(&wg)

go func(wg *sync.WaitGroup) {
defer wg.Done()

strategy.SetTimeouts(readTimeout, writeTimeout, analyticsTimeout)
strategy.SetTimeouts(readTimeout, writeTimeout, analyticsTimeout, insightsTimeout)
}(&wg)
}

Expand Down
7 changes: 5 additions & 2 deletions algoliasearch/transport.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ const (
write
read
analyticsCall
insightsCall
)

// Transport is responsible for the connection and the retry strategy to
Expand Down Expand Up @@ -88,6 +89,8 @@ func (t *Transport) request(method, path string, body interface{}, typeCall int,
k = call.Write
case analyticsCall:
k = call.Analytics
case insightsCall:
k = call.Insights
default:
return nil, fmt.Errorf("unsupported call type %d", typeCall)
}
Expand Down Expand Up @@ -193,8 +196,8 @@ func (t *Transport) setExtraHeader(key, value string) {
t.headers[key] = value
}

func (t *Transport) setTimeouts(read, write, analytics time.Duration) {
t.retryStrategy.SetTimeouts(read, write, analytics)
func (t *Transport) setTimeouts(read, write, analytics, insights time.Duration) {
t.retryStrategy.SetTimeouts(read, write, analytics, insights)
}

// setMaxIdleConnsPerHost sets the `MaxIdleConnsPerHost` via the given
Expand Down

0 comments on commit 0ef2010

Please sign in to comment.