From 7353a856e41ea49b3830f2b72009fc1e62a67d1e Mon Sep 17 00:00:00 2001 From: Owen Marshall Date: Thu, 31 Mar 2016 22:29:32 -0400 Subject: [PATCH 1/4] qnd cookie hacking --- roundrobin/rr.go | 44 ++++++++++++++++++++++++++++++++++++++++---- 1 file changed, 40 insertions(+), 4 deletions(-) diff --git a/roundrobin/rr.go b/roundrobin/rr.go index a8812fea..1dd54f7d 100644 --- a/roundrobin/rr.go +++ b/roundrobin/rr.go @@ -61,15 +61,51 @@ func (r *RoundRobin) Next() http.Handler { return r.next } +func (r *RoundRobin) getCookieVal(req *http.Request) (*url.URL, bool, error) { + // Before we move on to a new server, peek at our req cookie to see if we are bound. + // If so, serve to them. + cookie, err := req.Cookie("__STICKY_SVR") + switch err { + case nil: + case http.ErrNoCookie: + return nil, false, nil + default: + return nil, false, err + + } + + s_url, err := url.Parse(cookie.Value) + if err != nil { + return nil, false, err + } + + s, i := r.findServerByURL(s_url) + if i != -1 { + return s.url, true, nil + } else { + return nil, false, nil + } + +} func (r *RoundRobin) ServeHTTP(w http.ResponseWriter, req *http.Request) { - url, err := r.NextServer() + // make shallow copy of request before chaning anything to avoid side effects + newReq := *req + cookie_url, present, err := r.getCookieVal(&newReq) + if err != nil { r.errHandler.ServeHTTP(w, req, err) return } - // make shallow copy of request before chaning anything to avoid side effects - newReq := *req - newReq.URL = url + if present { + newReq.URL = cookie_url + } else { + url, err := r.NextServer() + if err != nil { + r.errHandler.ServeHTTP(w, req, err) + return + } + newReq.URL = url + } r.next.ServeHTTP(w, &newReq) } From 7095f8c49ee62ccaff43095923cce616cb1e265f Mon Sep 17 00:00:00 2001 From: Owen Marshall Date: Fri, 1 Apr 2016 10:42:05 -0400 Subject: [PATCH 2/4] Add work in progress sticky session support Sticky sessions are set through an HTTP cookie. If the cookie: * is not present, use the next server & set that as sticky * is present, * but is no longer valid, use the next server & set that as sticky * and valid, use that server without advancing .next. --- roundrobin/rr.go | 60 ++++---- roundrobin/stickysessions.go | 57 +++++++ roundrobin/stickysessions_test.go | 237 ++++++++++++++++++++++++++++++ 3 files changed, 321 insertions(+), 33 deletions(-) create mode 100644 roundrobin/stickysessions.go create mode 100644 roundrobin/stickysessions_test.go diff --git a/roundrobin/rr.go b/roundrobin/rr.go index 1dd54f7d..4f1b0a30 100644 --- a/roundrobin/rr.go +++ b/roundrobin/rr.go @@ -29,6 +29,13 @@ func ErrorHandler(h utils.ErrorHandler) LBOption { } } +func EnableStickySession(ss *StickySession) LBOption { + return func(s *RoundRobin) error { + s.ss = ss + return nil + } +} + type RoundRobin struct { mutex *sync.Mutex next http.Handler @@ -37,6 +44,7 @@ type RoundRobin struct { index int servers []*server currentWeight int + ss *StickySession } func New(next http.Handler, opts ...LBOption) (*RoundRobin, error) { @@ -45,6 +53,7 @@ func New(next http.Handler, opts ...LBOption) (*RoundRobin, error) { index: -1, mutex: &sync.Mutex{}, servers: []*server{}, + ss: nil, } for _, o := range opts { if err := o(rr); err != nil { @@ -61,49 +70,34 @@ func (r *RoundRobin) Next() http.Handler { return r.next } -func (r *RoundRobin) getCookieVal(req *http.Request) (*url.URL, bool, error) { - // Before we move on to a new server, peek at our req cookie to see if we are bound. - // If so, serve to them. - cookie, err := req.Cookie("__STICKY_SVR") - switch err { - case nil: - case http.ErrNoCookie: - return nil, false, nil - default: - return nil, false, err - - } - - s_url, err := url.Parse(cookie.Value) - if err != nil { - return nil, false, err - } - - s, i := r.findServerByURL(s_url) - if i != -1 { - return s.url, true, nil - } else { - return nil, false, nil - } - -} func (r *RoundRobin) ServeHTTP(w http.ResponseWriter, req *http.Request) { // make shallow copy of request before chaning anything to avoid side effects newReq := *req - cookie_url, present, err := r.getCookieVal(&newReq) + stuck := false + if r.ss != nil { + cookie_url, present, err := r.ss.GetBackend(&newReq, r.Servers()) - if err != nil { - r.errHandler.ServeHTTP(w, req, err) - return + if err != nil { + r.errHandler.ServeHTTP(w, req, err) + return + } + + if present { + newReq.URL = cookie_url + stuck = true + } } - if present { - newReq.URL = cookie_url - } else { + + if !stuck { url, err := r.NextServer() if err != nil { r.errHandler.ServeHTTP(w, req, err) return } + + if r.ss != nil { + r.ss.StickBackend(url, &w) + } newReq.URL = url } r.next.ServeHTTP(w, &newReq) diff --git a/roundrobin/stickysessions.go b/roundrobin/stickysessions.go new file mode 100644 index 00000000..170d583e --- /dev/null +++ b/roundrobin/stickysessions.go @@ -0,0 +1,57 @@ +// package stickysession is a mixin for load balancers that implements layer 8 (http cookie) session affinity +package roundrobin + +import ( + "net/http" + "net/url" +) + +type StickySession struct { + cookiename string +} + +func NewStickySession(c string) *StickySession { + return &StickySession{c} +} + +// GetBackend returns the backend URL stored in the sticky cookie, iff the backend is still in the valid list of servers. +func (s *StickySession) GetBackend(req *http.Request, servers []*url.URL) (*url.URL, bool, error) { + cookie, err := req.Cookie(s.cookiename) + switch err { + case nil: + case http.ErrNoCookie: + return nil, false, nil + default: + return nil, false, err + } + + s_url, err := url.Parse(cookie.Value) + if err != nil { + return nil, false, err + } + + if s.isBackendAlive(s_url, servers) { + return s_url, true, nil + } else { + return nil, false, nil + } +} + +func (s *StickySession) StickBackend(backend *url.URL, w *http.ResponseWriter) { + c := &http.Cookie{Name: s.cookiename, Value: backend.String()} + http.SetCookie(*w, c) + return +} + +func (s *StickySession) isBackendAlive(needle *url.URL, haystack []*url.URL) bool { + if len(haystack) == 0 { + return false + } + + for _, s := range haystack { + if sameURL(needle, s) { + return true + } + } + return false +} diff --git a/roundrobin/stickysessions_test.go b/roundrobin/stickysessions_test.go new file mode 100644 index 00000000..e3251ba4 --- /dev/null +++ b/roundrobin/stickysessions_test.go @@ -0,0 +1,237 @@ +package roundrobin + +import ( + "io/ioutil" + "net/http" + "net/http/httptest" + "testing" + + "github.com/vulcand/oxy/forward" + "github.com/vulcand/oxy/testutils" + + . "gopkg.in/check.v1" +) + +func TestSS(t *testing.T) { TestingT(t) } + +type SSSuite struct{} + +var _ = Suite(&SSSuite{}) + +func (s *SSSuite) TestBasic(c *C) { + a := testutils.NewResponder("a") + b := testutils.NewResponder("b") + + defer a.Close() + defer b.Close() + + fwd, err := forward.New() + c.Assert(err, IsNil) + + sticky := NewStickySession("test") + c.Assert(sticky, NotNil) + + lb, err := New(fwd, EnableStickySession(sticky)) + c.Assert(err, IsNil) + + lb.UpsertServer(testutils.ParseURI(a.URL)) + lb.UpsertServer(testutils.ParseURI(b.URL)) + + proxy := httptest.NewServer(lb) + defer proxy.Close() + + http_cli := &http.Client{} + + for i := 0; i < 10; i++ { + req, err := http.NewRequest("GET", proxy.URL, nil) + c.Assert(err, IsNil) + req.AddCookie(&http.Cookie{Name: "test", Value: a.URL}) + + resp, err := http_cli.Do(req) + c.Assert(err, IsNil) + + defer resp.Body.Close() + body, err := ioutil.ReadAll(resp.Body) + + c.Assert(err, IsNil) + c.Assert(string(body), Equals, "a") + } +} + +func (s *SSSuite) TestStickCookie(c *C) { + a := testutils.NewResponder("a") + b := testutils.NewResponder("b") + + defer a.Close() + defer b.Close() + + fwd, err := forward.New() + c.Assert(err, IsNil) + + sticky := NewStickySession("test") + c.Assert(sticky, NotNil) + + lb, err := New(fwd, EnableStickySession(sticky)) + c.Assert(err, IsNil) + + lb.UpsertServer(testutils.ParseURI(a.URL)) + lb.UpsertServer(testutils.ParseURI(b.URL)) + + proxy := httptest.NewServer(lb) + defer proxy.Close() + + resp, err := http.Get(proxy.URL) + c.Assert(err, IsNil) + + c_out := resp.Cookies()[0] + c.Assert(c_out.Name, Equals, "test") + c.Assert(c_out.Value, Equals, a.URL) +} + +func (s *SSSuite) TestRemoveRespondingServer(c *C) { + a := testutils.NewResponder("a") + b := testutils.NewResponder("b") + + defer a.Close() + defer b.Close() + + fwd, err := forward.New() + c.Assert(err, IsNil) + + sticky := NewStickySession("test") + c.Assert(sticky, NotNil) + + lb, err := New(fwd, EnableStickySession(sticky)) + c.Assert(err, IsNil) + + lb.UpsertServer(testutils.ParseURI(a.URL)) + lb.UpsertServer(testutils.ParseURI(b.URL)) + + proxy := httptest.NewServer(lb) + defer proxy.Close() + + http_cli := &http.Client{} + + for i := 0; i < 10; i++ { + req, err := http.NewRequest("GET", proxy.URL, nil) + c.Assert(err, IsNil) + req.AddCookie(&http.Cookie{Name: "test", Value: a.URL}) + + resp, err := http_cli.Do(req) + c.Assert(err, IsNil) + + defer resp.Body.Close() + body, err := ioutil.ReadAll(resp.Body) + + c.Assert(err, IsNil) + c.Assert(string(body), Equals, "a") + } + + lb.RemoveServer(testutils.ParseURI(a.URL)) + + // Now, use the organic cookie response in our next requests. + req, err := http.NewRequest("GET", proxy.URL, nil) + req.AddCookie(&http.Cookie{Name: "test", Value: a.URL}) + resp, err := http_cli.Do(req) + c.Assert(err, IsNil) + + c.Assert(resp.Cookies()[0].Name, Equals, "test") + c.Assert(resp.Cookies()[0].Value, Equals, b.URL) + + for i := 0; i < 10; i++ { + req, err := http.NewRequest("GET", proxy.URL, nil) + c.Assert(err, IsNil) + + resp, err := http_cli.Do(req) + c.Assert(err, IsNil) + + defer resp.Body.Close() + body, err := ioutil.ReadAll(resp.Body) + + c.Assert(err, IsNil) + c.Assert(string(body), Equals, "b") + } +} + +func (s *SSSuite) TestRemoveAllServers(c *C) { + a := testutils.NewResponder("a") + b := testutils.NewResponder("b") + + defer a.Close() + defer b.Close() + + fwd, err := forward.New() + c.Assert(err, IsNil) + + sticky := NewStickySession("test") + c.Assert(sticky, NotNil) + + lb, err := New(fwd, EnableStickySession(sticky)) + c.Assert(err, IsNil) + + lb.UpsertServer(testutils.ParseURI(a.URL)) + lb.UpsertServer(testutils.ParseURI(b.URL)) + + proxy := httptest.NewServer(lb) + defer proxy.Close() + + http_cli := &http.Client{} + + for i := 0; i < 10; i++ { + req, err := http.NewRequest("GET", proxy.URL, nil) + c.Assert(err, IsNil) + req.AddCookie(&http.Cookie{Name: "test", Value: a.URL}) + + resp, err := http_cli.Do(req) + c.Assert(err, IsNil) + + defer resp.Body.Close() + body, err := ioutil.ReadAll(resp.Body) + + c.Assert(err, IsNil) + c.Assert(string(body), Equals, "a") + } + + lb.RemoveServer(testutils.ParseURI(a.URL)) + lb.RemoveServer(testutils.ParseURI(b.URL)) + + // Now, use the organic cookie response in our next requests. + req, err := http.NewRequest("GET", proxy.URL, nil) + req.AddCookie(&http.Cookie{Name: "test", Value: a.URL}) + resp, err := http_cli.Do(req) + c.Assert(err, IsNil) + c.Assert(resp.StatusCode, Equals, http.StatusInternalServerError) +} + +func (s *SSSuite) TestBadCookieVal(c *C) { + a := testutils.NewResponder("a") + b := testutils.NewResponder("b") + + defer a.Close() + defer b.Close() + + fwd, err := forward.New() + c.Assert(err, IsNil) + + sticky := NewStickySession("test") + c.Assert(sticky, NotNil) + + lb, err := New(fwd, EnableStickySession(sticky)) + c.Assert(err, IsNil) + + lb.UpsertServer(testutils.ParseURI(a.URL)) + lb.UpsertServer(testutils.ParseURI(b.URL)) + + proxy := httptest.NewServer(lb) + defer proxy.Close() + + http_cli := &http.Client{} + + req, err := http.NewRequest("GET", proxy.URL, nil) + c.Assert(err, IsNil) + req.AddCookie(&http.Cookie{Name: "test", Value: "http://[::1]a"}) // error value from go's net/url tests. + + resp, err := http_cli.Do(req) + c.Assert(err, IsNil) + c.Assert(resp.StatusCode, Equals, http.StatusInternalServerError) +} From 638a9a64b16dc6ef91e2e5de404d50ff9915ea3c Mon Sep 17 00:00:00 2001 From: Owen Marshall Date: Tue, 12 Apr 2016 22:22:25 -0400 Subject: [PATCH 3/4] fix misleading comment layer 7, that is... layer 8 is something different (https://en.wikipedia.org/wiki/Layer_8) --- roundrobin/stickysessions.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/roundrobin/stickysessions.go b/roundrobin/stickysessions.go index 170d583e..8a458b87 100644 --- a/roundrobin/stickysessions.go +++ b/roundrobin/stickysessions.go @@ -1,4 +1,4 @@ -// package stickysession is a mixin for load balancers that implements layer 8 (http cookie) session affinity +// package stickysession is a mixin for load balancers that implements layer 7 (http cookie) session affinity package roundrobin import ( From c9eac1bbc76ef5e79fe3c1167bbb706e55b2b837 Mon Sep 17 00:00:00 2001 From: Owen Marshall Date: Wed, 25 May 2016 16:21:30 -0400 Subject: [PATCH 4/4] Add sticky session support to rebalancing rr --- roundrobin/rebalancer.go | 50 +++++++++++++++++++++++++++++------ roundrobin/rebalancer_test.go | 50 ++++++++++++++++++++++++++++++++++- 2 files changed, 91 insertions(+), 9 deletions(-) diff --git a/roundrobin/rebalancer.go b/roundrobin/rebalancer.go index f8a4179c..d9a8939a 100644 --- a/roundrobin/rebalancer.go +++ b/roundrobin/rebalancer.go @@ -7,9 +7,9 @@ import ( "sync" "time" + "github.com/mailgun/timetools" "github.com/vulcand/oxy/memmetrics" "github.com/vulcand/oxy/utils" - "github.com/mailgun/timetools" ) // RebalancerOption - functional option setter for rebalancer @@ -48,6 +48,9 @@ type Rebalancer struct { // creates new meters newMeter NewMeterFn + + // sticky session object + ss *StickySession } func RebalancerLogger(log utils.Logger) RebalancerOption { @@ -86,10 +89,18 @@ func RebalancerErrorHandler(h utils.ErrorHandler) RebalancerOption { } } +func RebalancerStickySession(ss *StickySession) RebalancerOption { + return func(r *Rebalancer) error { + r.ss = ss + return nil + } +} + func NewRebalancer(handler balancerHandler, opts ...RebalancerOption) (*Rebalancer, error) { rb := &Rebalancer{ mtx: &sync.Mutex{}, next: handler, + ss: nil, } for _, o := range opts { if err := o(rb); err != nil { @@ -134,18 +145,41 @@ func (rb *Rebalancer) Servers() []*url.URL { func (rb *Rebalancer) ServeHTTP(w http.ResponseWriter, req *http.Request) { pw := &utils.ProxyWriter{W: w} start := rb.clock.UtcNow() - url, err := rb.next.NextServer() - if err != nil { - rb.errHandler.ServeHTTP(w, req, err) - return - } // make shallow copy of request before changing anything to avoid side effects newReq := *req - newReq.URL = url + stuck := false + + if rb.ss != nil { + cookie_url, present, err := rb.ss.GetBackend(&newReq, rb.Servers()) + + if err != nil { + rb.errHandler.ServeHTTP(w, req, err) + return + } + + if present { + newReq.URL = cookie_url + stuck = true + } + } + + if !stuck { + url, err := rb.next.NextServer() + if err != nil { + rb.errHandler.ServeHTTP(w, req, err) + return + } + + if rb.ss != nil { + rb.ss.StickBackend(url, &w) + } + + newReq.URL = url + } rb.next.Next().ServeHTTP(pw, &newReq) - rb.recordMetrics(url, pw.Code, rb.clock.UtcNow().Sub(start)) + rb.recordMetrics(newReq.URL, pw.Code, rb.clock.UtcNow().Sub(start)) rb.adjustWeights() } diff --git a/roundrobin/rebalancer_test.go b/roundrobin/rebalancer_test.go index fd05a175..65f68cd9 100644 --- a/roundrobin/rebalancer_test.go +++ b/roundrobin/rebalancer_test.go @@ -1,15 +1,16 @@ package roundrobin import ( + "io/ioutil" "net/http" "net/http/httptest" "os" "time" + "github.com/mailgun/timetools" "github.com/vulcand/oxy/forward" "github.com/vulcand/oxy/testutils" "github.com/vulcand/oxy/utils" - "github.com/mailgun/timetools" . "gopkg.in/check.v1" ) @@ -327,6 +328,53 @@ func (s *RBSuite) TestRebalancerLive(c *C) { c.Assert(rb.servers[2].curWeight, Equals, 1) } +func (s *RBSuite) TestRebalancerStickySession(c *C) { + a, b, x := testutils.NewResponder("a"), testutils.NewResponder("b"), testutils.NewResponder("x") + defer a.Close() + defer b.Close() + defer x.Close() + + sticky := NewStickySession("test") + c.Assert(sticky, NotNil) + + fwd, err := forward.New() + c.Assert(err, IsNil) + + lb, err := New(fwd) + c.Assert(err, IsNil) + + rb, err := NewRebalancer(lb, RebalancerStickySession(sticky)) + c.Assert(err, IsNil) + + rb.UpsertServer(testutils.ParseURI(a.URL)) + rb.UpsertServer(testutils.ParseURI(b.URL)) + rb.UpsertServer(testutils.ParseURI(x.URL)) + + proxy := httptest.NewServer(rb) + defer proxy.Close() + + http_cli := &http.Client{} + for i := 0; i < 10; i++ { + req, err := http.NewRequest("GET", proxy.URL, nil) + c.Assert(err, IsNil) + req.AddCookie(&http.Cookie{Name: "test", Value: a.URL}) + + resp, err := http_cli.Do(req) + c.Assert(err, IsNil) + + defer resp.Body.Close() + body, err := ioutil.ReadAll(resp.Body) + + c.Assert(err, IsNil) + c.Assert(string(body), Equals, "a") + } + + c.Assert(rb.RemoveServer(testutils.ParseURI(a.URL)), IsNil) + c.Assert(seq(c, proxy.URL, 3), DeepEquals, []string{"b", "x", "b"}) + c.Assert(rb.RemoveServer(testutils.ParseURI(b.URL)), IsNil) + c.Assert(seq(c, proxy.URL, 3), DeepEquals, []string{"x", "x", "x"}) +} + type testMeter struct { rating float64 notReady bool