Skip to content

Commit

Permalink
PS: implement request deduplication
Browse files Browse the repository at this point in the history
Fixes #1858
  • Loading branch information
lukedirtwalker committed Oct 23, 2018
1 parent cd32945 commit a1a18e4
Show file tree
Hide file tree
Showing 6 changed files with 168 additions and 6 deletions.
81 changes: 81 additions & 0 deletions go/path_srv/internal/handlers/psdedupe.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
// Copyright 2018 Anapaya Systems
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package handlers

import (
"context"
"fmt"
"net"

"github.com/scionproto/scion/go/lib/common"
"github.com/scionproto/scion/go/lib/ctrl/path_mgmt"
"github.com/scionproto/scion/go/lib/infra"
"github.com/scionproto/scion/go/lib/infra/dedupe"
)

type segReq struct {
segReq *path_mgmt.SegReq
server net.Addr
id uint64
}

func (req *segReq) DedupeKey() string {
return fmt.Sprintf("%s %s", req.segReq, req.server)
}

func (req *segReq) BroadcastKey() string {
return fmt.Sprintf("%s %s", req.segReq, req.server)
}

type psDeduper struct {
msger infra.Messenger
}

func (pd *psDeduper) segsRequestFunc(ctx context.Context,
request dedupe.Request) dedupe.Response {

req := request.(*segReq)
segs, err := pd.msger.GetSegs(ctx, req.segReq, req.server, req.id)
if err != nil {
return dedupe.Response{Error: err}
}
return dedupe.Response{Data: segs}
}

func NewDeduper(msger infra.Messenger) *dedupe.Deduper {
psd := &psDeduper{msger: msger}
return dedupe.New(psd.segsRequestFunc, 0, 0)
}

func (h *segReqHandler) getSegsFromNetwork(ctx context.Context,
req *path_mgmt.SegReq, server net.Addr, id uint64) (*path_mgmt.SegReply, error) {

responseC, cancelF := h.segsDeduper.Request(ctx, &segReq{
segReq: req,
server: server,
id: id,
})
defer cancelF()
select {
case response := <-responseC:
if response.Error != nil {
return nil, response.Error
}
return response.Data.(*path_mgmt.SegReply), nil
case <-ctx.Done():
return nil, common.NewBasicError("Context done while waiting for Segs",
ctx.Err())
}
}
74 changes: 74 additions & 0 deletions go/path_srv/internal/handlers/psdedupe_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
// Copyright 2018 Anapaya Systems
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package handlers

import (
"context"
"net"
"testing"
"time"

"github.com/golang/mock/gomock"
. "github.com/smartystreets/goconvey/convey"

"github.com/scionproto/scion/go/lib/ctrl/path_mgmt"
"github.com/scionproto/scion/go/lib/infra"
"github.com/scionproto/scion/go/lib/infra/mock_infra"
"github.com/scionproto/scion/go/lib/log"
"github.com/scionproto/scion/go/lib/xtest"
)

func TestDedupe(t *testing.T) {
Convey("getSegsFromNetwork should dedupe", t, func() {
ctrl := gomock.NewController(t)
defer ctrl.Finish()
ctx, cancelF := context.WithTimeout(context.Background(), time.Second)
defer cancelF()
msger := mock_infra.NewMockMessenger(ctrl)
ireq := &infra.Request{
Logger: log.Root(),
}
h := &segReqHandler{
baseHandler: newBaseHandler(ireq, HandlerArgs{}),
segsDeduper: NewDeduper(msger),
}
reply := &path_mgmt.SegReply{}
msger.EXPECT().GetSegs(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).DoAndReturn(
func(_ context.Context, _ *path_mgmt.SegReq, _ net.Addr,
_ uint64) (*path_mgmt.SegReply, error) {

time.Sleep(20 * time.Millisecond)
return reply, nil
},
)
req := &path_mgmt.SegReq{
RawSrcIA: xtest.MustParseIA("1-ff00:0:110").IAInt(),
RawDstIA: xtest.MustParseIA("1-ff00:0:211").IAInt(),
}
req2 := &path_mgmt.SegReq{
RawSrcIA: req.RawSrcIA,
RawDstIA: req.RawDstIA,
}
Convey("Parallel", xtest.Parallel(func(sc *xtest.SC) {
r, err := h.getSegsFromNetwork(ctx, req, nil, 1)
sc.SoMsg("Should be no error", err, ShouldBeNil)
sc.SoMsg("Should return single reply", r, ShouldEqual, reply)
}, func(sc *xtest.SC) {
r, err := h.getSegsFromNetwork(ctx, req2, nil, 2)
sc.SoMsg("Should be no error", err, ShouldBeNil)
sc.SoMsg("Should return single reply", r, ShouldEqual, reply)
}))
})
}
6 changes: 4 additions & 2 deletions go/path_srv/internal/handlers/segreq.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"github.com/scionproto/scion/go/lib/ctrl/path_mgmt"
"github.com/scionproto/scion/go/lib/ctrl/seg"
"github.com/scionproto/scion/go/lib/infra"
"github.com/scionproto/scion/go/lib/infra/dedupe"
"github.com/scionproto/scion/go/lib/infra/messenger"
"github.com/scionproto/scion/go/lib/pathdb/query"
"github.com/scionproto/scion/go/lib/revcache"
Expand All @@ -36,7 +37,8 @@ import (

type segReqHandler struct {
*baseHandler
localIA addr.IA
localIA addr.IA
segsDeduper *dedupe.Deduper
}

func (h *segReqHandler) sendEmptySegReply(ctx context.Context,
Expand Down Expand Up @@ -118,7 +120,7 @@ func (h *segReqHandler) fetchAndSaveSegs(ctx context.Context, msger infra.Messen

queryTime := time.Now()
r := &path_mgmt.SegReq{RawSrcIA: src.IAInt(), RawDstIA: dst.IAInt()}
segs, err := msger.GetSegs(ctx, r, cPSAddr, messenger.NextId())
segs, err := h.getSegsFromNetwork(ctx, r, cPSAddr, messenger.NextId())
if err != nil {
return err
}
Expand Down
4 changes: 3 additions & 1 deletion go/path_srv/internal/handlers/segreqcore.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"github.com/scionproto/scion/go/lib/ctrl/path_mgmt"
"github.com/scionproto/scion/go/lib/ctrl/seg"
"github.com/scionproto/scion/go/lib/infra"
"github.com/scionproto/scion/go/lib/infra/dedupe"
"github.com/scionproto/scion/go/lib/pathdb/query"
"github.com/scionproto/scion/go/path_srv/internal/addrutil"
"github.com/scionproto/scion/go/proto"
Expand All @@ -33,12 +34,13 @@ type segReqCoreHandler struct {
segReqHandler
}

func NewSegReqCoreHandler(args HandlerArgs) infra.Handler {
func NewSegReqCoreHandler(args HandlerArgs, segsDeduper *dedupe.Deduper) infra.Handler {
f := func(r *infra.Request) {
handler := &segReqCoreHandler{
segReqHandler: segReqHandler{
baseHandler: newBaseHandler(r, args),
localIA: args.IA,
segsDeduper: segsDeduper,
},
}
handler.Handle()
Expand Down
4 changes: 3 additions & 1 deletion go/path_srv/internal/handlers/segreqnoncore.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"github.com/scionproto/scion/go/lib/ctrl/path_mgmt"
"github.com/scionproto/scion/go/lib/ctrl/seg"
"github.com/scionproto/scion/go/lib/infra"
"github.com/scionproto/scion/go/lib/infra/dedupe"
"github.com/scionproto/scion/go/lib/pathdb/query"
"github.com/scionproto/scion/go/path_srv/internal/addrutil"
"github.com/scionproto/scion/go/proto"
Expand All @@ -35,12 +36,13 @@ type segReqNonCoreHandler struct {
segReqHandler
}

func NewSegReqNonCoreHandler(args HandlerArgs) infra.Handler {
func NewSegReqNonCoreHandler(args HandlerArgs, segsDeduper *dedupe.Deduper) infra.Handler {
f := func(r *infra.Request) {
handler := &segReqNonCoreHandler{
segReqHandler: segReqHandler{
baseHandler: newBaseHandler(r, args),
localIA: args.IA,
segsDeduper: segsDeduper,
},
}
handler.Handle()
Expand Down
5 changes: 3 additions & 2 deletions go/path_srv/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,10 +130,11 @@ func realMain() int {
}
core := topo.Core
var segReqHandler infra.Handler
deduper := handlers.NewDeduper(msger)
if core {
segReqHandler = handlers.NewSegReqCoreHandler(args)
segReqHandler = handlers.NewSegReqCoreHandler(args, deduper)
} else {
segReqHandler = handlers.NewSegReqNonCoreHandler(args)
segReqHandler = handlers.NewSegReqNonCoreHandler(args, deduper)
}
msger.AddHandler(infra.SegRequest, segReqHandler)
msger.AddHandler(infra.SegReg, handlers.NewSegRegHandler(args))
Expand Down

0 comments on commit a1a18e4

Please sign in to comment.