Skip to content
This repository has been archived by the owner on Feb 15, 2019. It is now read-only.

Add sticky session support to containous/oxy's round robin #4

Merged
merged 4 commits into from
Jul 20, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
50 changes: 42 additions & 8 deletions roundrobin/rebalancer.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,9 @@ import (
"sync"
"time"

"github.com/mailgun/timetools"
"github.com/vulcand/oxy/memmetrics"
"github.com/vulcand/oxy/utils"
"github.com/mailgun/timetools"
)

// RebalancerOption - functional option setter for rebalancer
Expand Down Expand Up @@ -48,6 +48,9 @@ type Rebalancer struct {

// creates new meters
newMeter NewMeterFn

// sticky session object
ss *StickySession
}

func RebalancerLogger(log utils.Logger) RebalancerOption {
Expand Down Expand Up @@ -86,10 +89,18 @@ func RebalancerErrorHandler(h utils.ErrorHandler) RebalancerOption {
}
}

func RebalancerStickySession(ss *StickySession) RebalancerOption {
return func(r *Rebalancer) error {
r.ss = ss
return nil
}
}

func NewRebalancer(handler balancerHandler, opts ...RebalancerOption) (*Rebalancer, error) {
rb := &Rebalancer{
mtx: &sync.Mutex{},
next: handler,
ss: nil,
}
for _, o := range opts {
if err := o(rb); err != nil {
Expand Down Expand Up @@ -134,18 +145,41 @@ func (rb *Rebalancer) Servers() []*url.URL {
func (rb *Rebalancer) ServeHTTP(w http.ResponseWriter, req *http.Request) {
pw := &utils.ProxyWriter{W: w}
start := rb.clock.UtcNow()
url, err := rb.next.NextServer()
if err != nil {
rb.errHandler.ServeHTTP(w, req, err)
return
}

// make shallow copy of request before changing anything to avoid side effects
newReq := *req
newReq.URL = url
stuck := false

if rb.ss != nil {
cookie_url, present, err := rb.ss.GetBackend(&newReq, rb.Servers())

if err != nil {
rb.errHandler.ServeHTTP(w, req, err)
return
}

if present {
newReq.URL = cookie_url
stuck = true
}
}

if !stuck {
url, err := rb.next.NextServer()
if err != nil {
rb.errHandler.ServeHTTP(w, req, err)
return
}

if rb.ss != nil {
rb.ss.StickBackend(url, &w)
}

newReq.URL = url
}
rb.next.Next().ServeHTTP(pw, &newReq)

rb.recordMetrics(url, pw.Code, rb.clock.UtcNow().Sub(start))
rb.recordMetrics(newReq.URL, pw.Code, rb.clock.UtcNow().Sub(start))
rb.adjustWeights()
}

Expand Down
50 changes: 49 additions & 1 deletion roundrobin/rebalancer_test.go
Original file line number Diff line number Diff line change
@@ -1,15 +1,16 @@
package roundrobin

import (
"io/ioutil"
"net/http"
"net/http/httptest"
"os"
"time"

"github.com/mailgun/timetools"
"github.com/vulcand/oxy/forward"
"github.com/vulcand/oxy/testutils"
"github.com/vulcand/oxy/utils"
"github.com/mailgun/timetools"

. "gopkg.in/check.v1"
)
Expand Down Expand Up @@ -327,6 +328,53 @@ func (s *RBSuite) TestRebalancerLive(c *C) {
c.Assert(rb.servers[2].curWeight, Equals, 1)
}

func (s *RBSuite) TestRebalancerStickySession(c *C) {
a, b, x := testutils.NewResponder("a"), testutils.NewResponder("b"), testutils.NewResponder("x")
defer a.Close()
defer b.Close()
defer x.Close()

sticky := NewStickySession("test")
c.Assert(sticky, NotNil)

fwd, err := forward.New()
c.Assert(err, IsNil)

lb, err := New(fwd)
c.Assert(err, IsNil)

rb, err := NewRebalancer(lb, RebalancerStickySession(sticky))
c.Assert(err, IsNil)

rb.UpsertServer(testutils.ParseURI(a.URL))
rb.UpsertServer(testutils.ParseURI(b.URL))
rb.UpsertServer(testutils.ParseURI(x.URL))

proxy := httptest.NewServer(rb)
defer proxy.Close()

http_cli := &http.Client{}
for i := 0; i < 10; i++ {
req, err := http.NewRequest("GET", proxy.URL, nil)
c.Assert(err, IsNil)
req.AddCookie(&http.Cookie{Name: "test", Value: a.URL})

resp, err := http_cli.Do(req)
c.Assert(err, IsNil)

defer resp.Body.Close()
body, err := ioutil.ReadAll(resp.Body)

c.Assert(err, IsNil)
c.Assert(string(body), Equals, "a")
}

c.Assert(rb.RemoveServer(testutils.ParseURI(a.URL)), IsNil)
c.Assert(seq(c, proxy.URL, 3), DeepEquals, []string{"b", "x", "b"})
c.Assert(rb.RemoveServer(testutils.ParseURI(b.URL)), IsNil)
c.Assert(seq(c, proxy.URL, 3), DeepEquals, []string{"x", "x", "x"})
}

type testMeter struct {
rating float64
notReady bool
Expand Down
42 changes: 36 additions & 6 deletions roundrobin/rr.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,13 @@ func ErrorHandler(h utils.ErrorHandler) LBOption {
}
}

func EnableStickySession(ss *StickySession) LBOption {
return func(s *RoundRobin) error {
s.ss = ss
return nil
}
}

type RoundRobin struct {
mutex *sync.Mutex
next http.Handler
Expand All @@ -37,6 +44,7 @@ type RoundRobin struct {
index int
servers []*server
currentWeight int
ss *StickySession
}

func New(next http.Handler, opts ...LBOption) (*RoundRobin, error) {
Expand All @@ -45,6 +53,7 @@ func New(next http.Handler, opts ...LBOption) (*RoundRobin, error) {
index: -1,
mutex: &sync.Mutex{},
servers: []*server{},
ss: nil,
}
for _, o := range opts {
if err := o(rr); err != nil {
Expand All @@ -62,14 +71,35 @@ func (r *RoundRobin) Next() http.Handler {
}

func (r *RoundRobin) ServeHTTP(w http.ResponseWriter, req *http.Request) {
url, err := r.NextServer()
if err != nil {
r.errHandler.ServeHTTP(w, req, err)
return
}
// make shallow copy of request before chaning anything to avoid side effects
newReq := *req
newReq.URL = url
stuck := false
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hum, there is an issue here I think.
You don't call url, err := r.NextServer() if stuck is false.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Glad I saw your comment in my other PR, I didn't see this comment!

I've retraced the logic of that section. The flow is this:

  • Look to see if we have a sticky session
    ** if so, pull the server out of the cookie and store it.
    ** if not, fall through to the next session
  • Look to see if we use sticky sessions & do not have a server || if we do not use sticky sessions:
    ** call NextServer() to get the approp. URL
    ** if we have sticky sessions, stick that backend
  • Return the calculated backend URL

So we only call NextServer() if we have failed to retrieve a server URL from the cookie - that's in the if !stuck { stanza. We won't call it if stuck = true, which is only if we have retrieved the server from the cookie -- and, validated that it is still a valid server, which we handle in ss.GetBackend.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@emilevauge let me know if that clears it up! alternatively, if I'm missing something here - which I may be ;) - let me know and I can revise

if r.ss != nil {
cookie_url, present, err := r.ss.GetBackend(&newReq, r.Servers())

if err != nil {
r.errHandler.ServeHTTP(w, req, err)
return
}

if present {
newReq.URL = cookie_url
stuck = true
}
}

if !stuck {
url, err := r.NextServer()
if err != nil {
r.errHandler.ServeHTTP(w, req, err)
return
}

if r.ss != nil {
r.ss.StickBackend(url, &w)
}
newReq.URL = url
}
r.next.ServeHTTP(w, &newReq)
}

Expand Down
57 changes: 57 additions & 0 deletions roundrobin/stickysessions.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
// package stickysession is a mixin for load balancers that implements layer 7 (http cookie) session affinity
package roundrobin

import (
"net/http"
"net/url"
)

type StickySession struct {
cookiename string
}

func NewStickySession(c string) *StickySession {
return &StickySession{c}
}

// GetBackend returns the backend URL stored in the sticky cookie, iff the backend is still in the valid list of servers.
func (s *StickySession) GetBackend(req *http.Request, servers []*url.URL) (*url.URL, bool, error) {
cookie, err := req.Cookie(s.cookiename)
switch err {
case nil:
case http.ErrNoCookie:
return nil, false, nil
default:
return nil, false, err
}

s_url, err := url.Parse(cookie.Value)
if err != nil {
return nil, false, err
}

if s.isBackendAlive(s_url, servers) {
return s_url, true, nil
} else {
return nil, false, nil
}
}

func (s *StickySession) StickBackend(backend *url.URL, w *http.ResponseWriter) {
c := &http.Cookie{Name: s.cookiename, Value: backend.String()}
http.SetCookie(*w, c)
return
}

func (s *StickySession) isBackendAlive(needle *url.URL, haystack []*url.URL) bool {
if len(haystack) == 0 {
return false
}

for _, s := range haystack {
if sameURL(needle, s) {
return true
}
}
return false
}
Loading