Skip to content

Commit

Permalink
PS: Add new lookup logic (not enabled yet) (#2978)
Browse files Browse the repository at this point in the history
Add all the bits needed to use the new lookup logic via the segfetcher module in the PS.

Contributes #2454
  • Loading branch information
lukedirtwalker authored Aug 9, 2019
1 parent c9a8c6b commit e3f6385
Show file tree
Hide file tree
Showing 22 changed files with 1,513 additions and 13 deletions.
8 changes: 4 additions & 4 deletions go/lib/infra/modules/segfetcher/fetcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,8 @@ type FetcherConfig struct {
PathDB pathdb.PathDB
// RevCache is the revocation cache to use.
RevCache revcache.RevCache
// Messenger is the messenger to use.
Messenger infra.Messenger
// RequestAPI is the request api to use.
RequestAPI RequestAPI
// DstProvider provides destinations to fetch segments from
DstProvider DstProvider
// Validator is used to validate requests.
Expand All @@ -62,7 +62,7 @@ func (cfg FetcherConfig) New() *Fetcher {
Validator: cfg.Validator,
Splitter: cfg.Splitter,
Resolver: NewResolver(cfg.PathDB),
Requester: &DefaultRequester{API: cfg.Messenger, DstProvider: cfg.DstProvider},
Requester: &DefaultRequester{API: cfg.RequestAPI, DstProvider: cfg.DstProvider},
ReplyHandler: &SegReplyHandler{
Verifier: &SegVerifier{Verifier: cfg.VerificationFactory.NewVerifier()},
Storage: &DefaultStorage{PathDB: cfg.PathDB, RevCache: cfg.RevCache},
Expand Down Expand Up @@ -90,7 +90,7 @@ type Fetcher struct {
// cache the segments are fetched from the remote server.
func (f *Fetcher) FetchSegs(ctx context.Context, req Request) (Segments, error) {
if f.Validator != nil {
if err := f.Validator.Validate(req); err != nil {
if err := f.Validator.Validate(ctx, req); err != nil {
return Segments{}, err
}
}
Expand Down
8 changes: 4 additions & 4 deletions go/lib/infra/modules/segfetcher/fetcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,21 +78,21 @@ func TestFetcher(t *testing.T) {
}{
"Invalid request": {
PrepareFetcher: func(f *TestableFetcher) {
f.Validator.EXPECT().Validate(gomock.Any()).Return(testErr)
f.Validator.EXPECT().Validate(gomock.Any(), gomock.Any()).Return(testErr)
},
ErrorAssertion: require.Error,
},
"Splitter error": {
PrepareFetcher: func(f *TestableFetcher) {
f.Validator.EXPECT().Validate(gomock.Any())
f.Validator.EXPECT().Validate(gomock.Any(), gomock.Any())
f.Splitter.EXPECT().Split(gomock.Any(), gomock.Any()).
Return(segfetcher.RequestSet{}, testErr)
},
ErrorAssertion: require.Error,
},
"Resolver error": {
PrepareFetcher: func(f *TestableFetcher) {
f.Validator.EXPECT().Validate(gomock.Any())
f.Validator.EXPECT().Validate(gomock.Any(), gomock.Any())
f.Splitter.EXPECT().Split(gomock.Any(), gomock.Any())
f.Resolver.EXPECT().Resolve(gomock.Any(), gomock.Any(), gomock.Any()).
Return(segfetcher.Segments{}, segfetcher.RequestSet{}, testErr)
Expand All @@ -101,7 +101,7 @@ func TestFetcher(t *testing.T) {
},
"Immediately resolved": {
PrepareFetcher: func(f *TestableFetcher) {
f.Validator.EXPECT().Validate(gomock.Any())
f.Validator.EXPECT().Validate(gomock.Any(), gomock.Any())
reqSet := segfetcher.RequestSet{
Up: segfetcher.Request{Src: non_core_111, Dst: core_130},
}
Expand Down
8 changes: 4 additions & 4 deletions go/lib/infra/modules/segfetcher/mock_segfetcher/segfetcher.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 3 additions & 1 deletion go/lib/infra/modules/segfetcher/validator.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,10 @@

package segfetcher

import "context"

// Validator validates a request.
type Validator interface {
// Validate should return an error if the given request is not valid.
Validate(r Request) error
Validate(ctx context.Context, r Request) error
}
1 change: 1 addition & 0 deletions go/path_srv/internal/handlers/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ go_library(
"//go/lib/infra:go_default_library",
"//go/lib/infra/dedupe:go_default_library",
"//go/lib/infra/messenger:go_default_library",
"//go/lib/infra/modules/segfetcher:go_default_library",
"//go/lib/infra/modules/segverifier:go_default_library",
"//go/lib/log:go_default_library",
"//go/lib/pathdb:go_default_library",
Expand Down
2 changes: 2 additions & 0 deletions go/path_srv/internal/handlers/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"github.com/scionproto/scion/go/lib/ctrl/path_mgmt"
"github.com/scionproto/scion/go/lib/ctrl/seg"
"github.com/scionproto/scion/go/lib/infra"
"github.com/scionproto/scion/go/lib/infra/modules/segfetcher"
"github.com/scionproto/scion/go/lib/infra/modules/segverifier"
"github.com/scionproto/scion/go/lib/log"
"github.com/scionproto/scion/go/lib/pathdb"
Expand All @@ -52,6 +53,7 @@ type HandlerArgs struct {
QueryInterval time.Duration
IA addr.IA
TopoProvider topology.Provider
SegRequestAPI segfetcher.RequestAPI
}

type baseHandler struct {
Expand Down
62 changes: 62 additions & 0 deletions go/path_srv/internal/segreq/BUILD.bazel
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")

go_library(
name = "go_default_library",
srcs = [
"db.go",
"doc.go",
"handler.go",
"helpers.go",
"provider.go",
"splitter.go",
"validator.go",
],
importpath = "github.com/scionproto/scion/go/path_srv/internal/segreq",
visibility = ["//go/path_srv:__subpackages__"],
deps = [
"//go/lib/addr:go_default_library",
"//go/lib/common:go_default_library",
"//go/lib/ctrl/path_mgmt:go_default_library",
"//go/lib/ctrl/seg:go_default_library",
"//go/lib/infra:go_default_library",
"//go/lib/infra/modules/segfetcher:go_default_library",
"//go/lib/log:go_default_library",
"//go/lib/pathdb:go_default_library",
"//go/lib/pathdb/query:go_default_library",
"//go/lib/pathpol:go_default_library",
"//go/lib/revcache:go_default_library",
"//go/lib/snet/addrutil:go_default_library",
"//go/lib/topology:go_default_library",
"//go/path_srv/internal/handlers:go_default_library",
"//go/path_srv/internal/segutil:go_default_library",
"//go/proto:go_default_library",
],
)

go_test(
name = "go_default_test",
srcs = [
"db_test.go",
"helpers_test.go",
"provider_test.go",
"splitter_test.go",
"validator_test.go",
],
embed = [":go_default_library"],
deps = [
"//go/lib/addr:go_default_library",
"//go/lib/ctrl/path_mgmt:go_default_library",
"//go/lib/ctrl/seg:go_default_library",
"//go/lib/infra/mock_infra:go_default_library",
"//go/lib/infra/modules/segfetcher:go_default_library",
"//go/lib/pathdb/mock_pathdb:go_default_library",
"//go/lib/pathdb/query:go_default_library",
"//go/lib/revcache:go_default_library",
"//go/lib/revcache/mock_revcache:go_default_library",
"//go/lib/xtest:go_default_library",
"//go/path_srv/internal/segreq/mock_segreq:go_default_library",
"@com_github_golang_mock//gomock:go_default_library",
"@com_github_stretchr_testify//assert:go_default_library",
"@com_github_stretchr_testify//require:go_default_library",
],
)
133 changes: 133 additions & 0 deletions go/path_srv/internal/segreq/db.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,133 @@
// Copyright 2019 Anapaya Systems
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package segreq

import (
"context"
"time"

"github.com/scionproto/scion/go/lib/addr"
"github.com/scionproto/scion/go/lib/pathdb"
"github.com/scionproto/scion/go/lib/pathdb/query"
"github.com/scionproto/scion/go/lib/pathpol"
"github.com/scionproto/scion/go/proto"
)

// LocalInfo indicates whether something is always local.
type LocalInfo interface {
IsSegLocal(ctx context.Context, src, dst addr.IA) (bool, error)
IsParamsLocal(*query.Params) bool
}

// PathDB is a wrapper around the path db that handles retries and changes
// GetNextQuery behavior for usage in segfetcher.
type PathDB struct {
pathdb.PathDB
LocalInfo LocalInfo
RetrySleep time.Duration
}

// Get implements the path db's get function. It retries the underlying
// connection for local segments. For example a non-core path server will retry
// for local up segments since there is a chance it will receive them from the
// beacon server. A core path server will retry on core segments since there is
// a chance it receives them from the beacon server.
func (db *PathDB) Get(ctx context.Context, params *query.Params) (query.Results, error) {
res, err := db.PathDB.Get(ctx, params)
if err == nil && db.LocalInfo.IsParamsLocal(params) {
for err == nil && len(query.Results(res).Segs()) == 0 {
select {
case <-ctx.Done():
return res, ctx.Err()
case <-time.After(db.RetrySleep):
}
res, err = db.PathDB.Get(ctx, params)
}
}
return res, err
}

func (db *PathDB) GetNextQuery(ctx context.Context, src, dst addr.IA,
policy *pathpol.Policy) (time.Time, error) {
if local, err := db.LocalInfo.IsSegLocal(ctx, src, dst); err != nil {
return time.Time{}, err
} else if local {
return time.Now().Add(24 * time.Hour), nil
}
return db.PathDB.GetNextQuery(ctx, src, dst, policy)
}

// CoreLocalInfo implements local info for core PSes.
type CoreLocalInfo struct {
CoreChecker CoreChecker
LocalIA addr.IA
}

// IsSegLocal returns whether the segments described by src and dst would be a
// core segments or a local down segment.
func (i *CoreLocalInfo) IsSegLocal(ctx context.Context, src, dst addr.IA) (bool, error) {
// All local core and down segments.
if dst.I == i.LocalIA.I {
return true, nil
}
// All core segments
isCore, err := i.CoreChecker.IsCore(ctx, dst)
if err != nil {
return false, err
}
return isCore, nil
}

// IsParamsLocal returns whether params is a core segment request.
func (i *CoreLocalInfo) IsParamsLocal(params *query.Params) bool {
if len(params.SegTypes) != 1 {
return false
}
if params.SegTypes[0] == proto.PathSegType_core {
return true
}
if params.SegTypes[0] == proto.PathSegType_down {
for _, ia := range params.StartsAt {
if ia.I != i.LocalIA.I {
return false
}
}
for _, ia := range params.EndsAt {
if ia.I != i.LocalIA.I {
return false
}
}
return true
}
return false
}

// NonCoreLocalInfo is the local info for non core PSes.
type NonCoreLocalInfo struct {
LocalIA addr.IA
}

// IsSegLocal checks if the segment described by src and dst is an up segment
// to the local core.
func (i *NonCoreLocalInfo) IsSegLocal(ctx context.Context, src, dst addr.IA) (bool, error) {
// The validator should make sure that if we are at the source it can only
// be an up segment.
return i.LocalIA.Equal(src), nil
}

// IsParamsLocal returns whether params is a up segments request.
func (i *NonCoreLocalInfo) IsParamsLocal(params *query.Params) bool {
return len(params.SegTypes) == 1 && params.SegTypes[0] == proto.PathSegType_up
}
Loading

0 comments on commit e3f6385

Please sign in to comment.