diff --git a/go/path_srv/internal/handlers/psdedupe.go b/go/path_srv/internal/handlers/psdedupe.go new file mode 100644 index 0000000000..11d7e38bca --- /dev/null +++ b/go/path_srv/internal/handlers/psdedupe.go @@ -0,0 +1,81 @@ +// Copyright 2018 Anapaya Systems +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package handlers + +import ( + "context" + "fmt" + "net" + + "github.com/scionproto/scion/go/lib/common" + "github.com/scionproto/scion/go/lib/ctrl/path_mgmt" + "github.com/scionproto/scion/go/lib/infra" + "github.com/scionproto/scion/go/lib/infra/dedupe" +) + +type segReq struct { + segReq *path_mgmt.SegReq + server net.Addr + id uint64 +} + +func (req *segReq) DedupeKey() string { + return fmt.Sprintf("%s %s", req.segReq, req.server) +} + +func (req *segReq) BroadcastKey() string { + return fmt.Sprintf("%s %s", req.segReq, req.server) +} + +type psDeduper struct { + msger infra.Messenger +} + +func (pd *psDeduper) segsRequestFunc(ctx context.Context, + request dedupe.Request) dedupe.Response { + + req := request.(*segReq) + segs, err := pd.msger.GetSegs(ctx, req.segReq, req.server, req.id) + if err != nil { + return dedupe.Response{Error: err} + } + return dedupe.Response{Data: segs} +} + +func NewDeduper(msger infra.Messenger) *dedupe.Deduper { + psd := &psDeduper{msger: msger} + return dedupe.New(psd.segsRequestFunc, 0, 0) +} + +func (h *segReqHandler) getSegsFromNetwork(ctx context.Context, + req *path_mgmt.SegReq, server net.Addr, id uint64) (*path_mgmt.SegReply, error) { + + responseC, cancelF := h.segsDeduper.Request(ctx, &segReq{ + segReq: req, + server: server, + id: id, + }) + defer cancelF() + select { + case response := <-responseC: + if response.Error != nil { + return nil, response.Error + } + return response.Data.(*path_mgmt.SegReply), nil + case <-ctx.Done(): + return nil, common.NewBasicError("Context done while waiting for Segs", + ctx.Err()) + } +} diff --git a/go/path_srv/internal/handlers/psdedupe_test.go b/go/path_srv/internal/handlers/psdedupe_test.go new file mode 100644 index 0000000000..d3c0753a51 --- /dev/null +++ b/go/path_srv/internal/handlers/psdedupe_test.go @@ -0,0 +1,74 @@ +// Copyright 2018 Anapaya Systems +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package handlers + +import ( + "context" + "net" + "testing" + "time" + + "github.com/golang/mock/gomock" + . "github.com/smartystreets/goconvey/convey" + + "github.com/scionproto/scion/go/lib/ctrl/path_mgmt" + "github.com/scionproto/scion/go/lib/infra" + "github.com/scionproto/scion/go/lib/infra/mock_infra" + "github.com/scionproto/scion/go/lib/log" + "github.com/scionproto/scion/go/lib/xtest" +) + +func TestDedupe(t *testing.T) { + Convey("getSegsFromNetwork should dedupe", t, func() { + ctrl := gomock.NewController(t) + defer ctrl.Finish() + ctx, cancelF := context.WithTimeout(context.Background(), time.Second) + defer cancelF() + msger := mock_infra.NewMockMessenger(ctrl) + ireq := &infra.Request{ + Logger: log.Root(), + } + h := &segReqHandler{ + baseHandler: newBaseHandler(ireq, HandlerArgs{}), + segsDeduper: NewDeduper(msger), + } + reply := &path_mgmt.SegReply{} + msger.EXPECT().GetSegs(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).DoAndReturn( + func(_ context.Context, _ *path_mgmt.SegReq, _ net.Addr, + _ uint64) (*path_mgmt.SegReply, error) { + + time.Sleep(20 * time.Millisecond) + return reply, nil + }, + ) + req := &path_mgmt.SegReq{ + RawSrcIA: xtest.MustParseIA("1-ff00:0:110").IAInt(), + RawDstIA: xtest.MustParseIA("1-ff00:0:211").IAInt(), + } + req2 := &path_mgmt.SegReq{ + RawSrcIA: req.RawSrcIA, + RawDstIA: req.RawDstIA, + } + Convey("Parallel", xtest.Parallel(func(sc *xtest.SC) { + r, err := h.getSegsFromNetwork(ctx, req, nil, 1) + sc.SoMsg("Should be no error", err, ShouldBeNil) + sc.SoMsg("Should return single reply", r, ShouldEqual, reply) + }, func(sc *xtest.SC) { + r, err := h.getSegsFromNetwork(ctx, req2, nil, 2) + sc.SoMsg("Should be no error", err, ShouldBeNil) + sc.SoMsg("Should return single reply", r, ShouldEqual, reply) + })) + }) +} diff --git a/go/path_srv/internal/handlers/segreq.go b/go/path_srv/internal/handlers/segreq.go index 4a83bf5e72..b6ac17ff84 100644 --- a/go/path_srv/internal/handlers/segreq.go +++ b/go/path_srv/internal/handlers/segreq.go @@ -25,6 +25,7 @@ import ( "github.com/scionproto/scion/go/lib/ctrl/path_mgmt" "github.com/scionproto/scion/go/lib/ctrl/seg" "github.com/scionproto/scion/go/lib/infra" + "github.com/scionproto/scion/go/lib/infra/dedupe" "github.com/scionproto/scion/go/lib/infra/messenger" "github.com/scionproto/scion/go/lib/pathdb/query" "github.com/scionproto/scion/go/lib/revcache" @@ -36,7 +37,8 @@ import ( type segReqHandler struct { *baseHandler - localIA addr.IA + localIA addr.IA + segsDeduper *dedupe.Deduper } func (h *segReqHandler) sendEmptySegReply(ctx context.Context, @@ -118,7 +120,7 @@ func (h *segReqHandler) fetchAndSaveSegs(ctx context.Context, msger infra.Messen queryTime := time.Now() r := &path_mgmt.SegReq{RawSrcIA: src.IAInt(), RawDstIA: dst.IAInt()} - segs, err := msger.GetSegs(ctx, r, cPSAddr, messenger.NextId()) + segs, err := h.getSegsFromNetwork(ctx, r, cPSAddr, messenger.NextId()) if err != nil { return err } diff --git a/go/path_srv/internal/handlers/segreqcore.go b/go/path_srv/internal/handlers/segreqcore.go index 8d2338d4a1..d98b4ca630 100644 --- a/go/path_srv/internal/handlers/segreqcore.go +++ b/go/path_srv/internal/handlers/segreqcore.go @@ -24,6 +24,7 @@ import ( "github.com/scionproto/scion/go/lib/ctrl/path_mgmt" "github.com/scionproto/scion/go/lib/ctrl/seg" "github.com/scionproto/scion/go/lib/infra" + "github.com/scionproto/scion/go/lib/infra/dedupe" "github.com/scionproto/scion/go/lib/pathdb/query" "github.com/scionproto/scion/go/path_srv/internal/addrutil" "github.com/scionproto/scion/go/proto" @@ -33,12 +34,13 @@ type segReqCoreHandler struct { segReqHandler } -func NewSegReqCoreHandler(args HandlerArgs) infra.Handler { +func NewSegReqCoreHandler(args HandlerArgs, segsDeduper *dedupe.Deduper) infra.Handler { f := func(r *infra.Request) { handler := &segReqCoreHandler{ segReqHandler: segReqHandler{ baseHandler: newBaseHandler(r, args), localIA: args.IA, + segsDeduper: segsDeduper, }, } handler.Handle() diff --git a/go/path_srv/internal/handlers/segreqnoncore.go b/go/path_srv/internal/handlers/segreqnoncore.go index 9bc5291cbd..4560c0c78c 100644 --- a/go/path_srv/internal/handlers/segreqnoncore.go +++ b/go/path_srv/internal/handlers/segreqnoncore.go @@ -26,6 +26,7 @@ import ( "github.com/scionproto/scion/go/lib/ctrl/path_mgmt" "github.com/scionproto/scion/go/lib/ctrl/seg" "github.com/scionproto/scion/go/lib/infra" + "github.com/scionproto/scion/go/lib/infra/dedupe" "github.com/scionproto/scion/go/lib/pathdb/query" "github.com/scionproto/scion/go/path_srv/internal/addrutil" "github.com/scionproto/scion/go/proto" @@ -35,12 +36,13 @@ type segReqNonCoreHandler struct { segReqHandler } -func NewSegReqNonCoreHandler(args HandlerArgs) infra.Handler { +func NewSegReqNonCoreHandler(args HandlerArgs, segsDeduper *dedupe.Deduper) infra.Handler { f := func(r *infra.Request) { handler := &segReqNonCoreHandler{ segReqHandler: segReqHandler{ baseHandler: newBaseHandler(r, args), localIA: args.IA, + segsDeduper: segsDeduper, }, } handler.Handle() diff --git a/go/path_srv/main.go b/go/path_srv/main.go index 71c6a4c8f8..a439bb8341 100644 --- a/go/path_srv/main.go +++ b/go/path_srv/main.go @@ -130,10 +130,11 @@ func realMain() int { } core := topo.Core var segReqHandler infra.Handler + deduper := handlers.NewDeduper(msger) if core { - segReqHandler = handlers.NewSegReqCoreHandler(args) + segReqHandler = handlers.NewSegReqCoreHandler(args, deduper) } else { - segReqHandler = handlers.NewSegReqNonCoreHandler(args) + segReqHandler = handlers.NewSegReqNonCoreHandler(args, deduper) } msger.AddHandler(infra.SegRequest, segReqHandler) msger.AddHandler(infra.SegReg, handlers.NewSegRegHandler(args))