Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Infra: Write static topology updates to file system #2430

Merged
merged 6 commits into from
Feb 8, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 18 additions & 4 deletions acceptance/discovery_infra_fetches_static_acceptance/test
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
#!/bin/bash

# This test checks that the infra services fetch the static topology
# from the discovery service.
# from the discovery service, and that it writes the update to disk.

PROGRAM=`basename "$0"`
COMMAND="$1"
Expand All @@ -14,9 +14,12 @@ test_setup() {
set -e
base_setup

for cfg in gen/ISD1/AS$AS_FILE/*/{cs,ps}config.toml; do
set_log_lvl "$cfg"
set_interval "$cfg" "static"
for elem in gen/ISD1/AS$AS_FILE/{cs,ps}*; do
for cfg in $elem/*.toml; do
set_log_lvl "$cfg"
set_interval "$cfg" "static"
sed -i -e "/\[discovery.static]/a Filename = \"/share/cache/${elem##*/}-topo.json\"" $cfg
done
done

base_start_scion
Expand All @@ -27,14 +30,25 @@ test_run() {
# Start serving static topology.
jq ".BorderRouters[].InternalAddrs.IPv4.PublicOverlay = {Addr: \"127.42.42.42\", OverlayPort: 39999} | .Timestamp = $( date +%s) | .TTL = 3" $TOPO | sponge $STATIC_FULL
sleep 6
# Check that the mock ds serves the file
check_file "static"
# Check that the logs contain setting and writing the topo.
check_logs "ps$IA_FILE-1"
check_logs "cs$IA_FILE-1"
# Check that the written file does not differ from the served file.
check_diff "ps$IA_FILE-1"
check_diff "cs$IA_FILE-1"
}

check_logs() {
grep -q "\[discovery\] Set topology .* Mode=static" "logs/$1.log" || \
{ echo "Setting static topology not found in logs. id=$1"; return 1; }
grep -q "\[discovery\] Topology written to filesystem .* Mode=static" "logs/$1.log" || \
{ echo "Writing static topology not found in logs. id=$1"; return 1; }
}

check_diff () {
diff -q $STATIC_FULL gen-cache/$1-topo.json
}

shift
Expand Down
2 changes: 2 additions & 0 deletions go/cert_srv/internal/config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ func TestSampleCorrect(t *testing.T) {
cfg.Discovery.Dynamic.Https = true
cfg.Discovery.Static.Enable = true
cfg.Discovery.Static.Https = true
cfg.Discovery.Static.Filename = "topology.json"
_, err := toml.Decode(Sample, &cfg)
SoMsg("err", err, ShouldBeNil)

Expand All @@ -53,6 +54,7 @@ func TestSampleCorrect(t *testing.T) {
SoMsg("Discovery.Static.Timeout correct", cfg.Discovery.Static.Timeout.Duration,
ShouldEqual, idiscovery.DefaultFetchTimeout)
SoMsg("Discovery.Static.Https correct", cfg.Discovery.Static.Https, ShouldBeFalse)
SoMsg("Discovery.Static.Filename correct", cfg.Discovery.Static.Filename, ShouldBeBlank)
SoMsg("Discovery.Dynamic.Enable correct", cfg.Discovery.Dynamic.Enable, ShouldBeFalse)
SoMsg("Discovery.Dynamic.Interval correct", cfg.Discovery.Dynamic.Interval.Duration,
ShouldEqual, idiscovery.DefaultDynamicFetchInterval)
Expand Down
4 changes: 4 additions & 0 deletions go/cert_srv/internal/config/sample.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,10 @@ const Sample = `[general]
# Require https connection. (default false)
Https = false

# Filename where the updated static topologies are written. In case of the
# empty string, the updated topologies are not written. (default "")
Filename = ""

[discovery.dynamic]
# Enable periodic fetching of the dynamic topology. (default false)
Enable = false
Expand Down
2 changes: 1 addition & 1 deletion go/examples/discovery_client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ func realMain() int {
Update: func(topo *topology.Topo) {
log.Info("Fetched new topology", "ia", topo.ISD_AS, "ts", topo.Timestamp)
},
Raw: func(raw common.RawBytes) {
Raw: func(raw common.RawBytes, _ *topology.Topo) {
writeOnce.Do(func() {
fmt.Println(string(raw))
if *out == "" {
Expand Down
6 changes: 3 additions & 3 deletions go/lib/discovery/topofetcher/fetcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,8 @@ var _ discovery.Fetcher = (*Fetcher)(nil)
// Callbacks are used to inform the client. The functions are called when
// an associated event occurs. If the function is nil, it is ignored.
type Callbacks struct {
// Raw is called with the raw body from the discovery service response.
Raw func(common.RawBytes)
// Raw is called with the raw body from the discovery service response and the parsed topology.
Raw func(common.RawBytes, *topology.Topo)
// Update is called with the parsed topology from the discovery service response.
Update func(*topology.Topo)
// Error is called with any error that occurs.
Expand Down Expand Up @@ -102,7 +102,7 @@ func (f *Fetcher) run(ctx context.Context) error {
}
// Notify the client.
if f.Callbacks.Raw != nil {
f.Callbacks.Raw(raw)
f.Callbacks.Raw(raw, topo)
}
if f.Callbacks.Update != nil {
f.Callbacks.Update(topo)
Expand Down
3 changes: 3 additions & 0 deletions go/lib/infra/modules/idiscovery/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,9 @@ func (c *Config) InitDefaults() {

type StaticConfig struct {
FetchConfig
// Filename indicates the file that the static topology is written to on updates.
// The empty string indicates that the static topology is not written.
Filename string
}

func (s *StaticConfig) InitDefaults() {
Expand Down
72 changes: 48 additions & 24 deletions go/lib/infra/modules/idiscovery/idiscovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,10 @@
// By default changes to the semi-mutable section of static topologies is
// not allowed. It can be enabled by providing a custom topo handler.
//
// The periodic.Runner for the static topology can be instructed to
// write updated versions to the file system. To enable this, set
// the filename in StaticConfig.
//
// A periodic.Task with a customized TopoHandler can be created with
// NewFetcher, when the client package requires more control.
package idiscovery
Expand All @@ -37,6 +41,7 @@ import (
"github.com/scionproto/scion/go/lib/log"
"github.com/scionproto/scion/go/lib/periodic"
"github.com/scionproto/scion/go/lib/topology"
"github.com/scionproto/scion/go/lib/util"
)

// TopoHandler handles a topology fetched from the discovery service, and
Expand Down Expand Up @@ -93,29 +98,31 @@ func StartRunners(cfg Config, file discovery.File, handlers TopoHandlers,
var err error
r := Runners{}
if cfg.Static.Enable {
r.Static, err = startPeriodic(
r.Static, err = startPeriodicFetcher(
cfg.Static.FetchConfig,
handlers.static(),
discovery.FetchParams{
Mode: discovery.Static,
Https: cfg.Static.Https,
File: file,
},
cfg.Static.Filename,
client,
)
if err != nil {
return Runners{}, err
}
}
if cfg.Dynamic.Enable {
r.Dynamic, err = startPeriodic(
r.Dynamic, err = startPeriodicFetcher(
cfg.Dynamic,
handlers.dynamic(),
discovery.FetchParams{
Mode: discovery.Dynamic,
Https: cfg.Dynamic.Https,
File: file,
},
"",
client,
)
if err != nil {
Expand All @@ -128,6 +135,19 @@ func StartRunners(cfg Config, file discovery.File, handlers TopoHandlers,
return r, nil
}

// startPeriodicFetcher starts a runner that periodically fetches the topology.
func startPeriodicFetcher(cfg FetchConfig, handler TopoHandler, params discovery.FetchParams,
filename string, client *http.Client) (*periodic.Runner, error) {

fetcher, err := NewFetcher(handler, params, filename, client)
if err != nil {
return nil, err
}
runner := periodic.StartPeriodicTask(fetcher, periodic.NewTicker(cfg.Interval.Duration),
cfg.Timeout.Duration)
return runner, nil
}

// Stop stops all runners.
func (r *Runners) Stop() {
if r.Static != nil {
Expand All @@ -154,45 +174,35 @@ func (r *Runners) Kill() {
}
}

// startPeriodic starts a runner that periodically fetches the topology.
func startPeriodic(cfg FetchConfig, handler TopoHandler,
params discovery.FetchParams, client *http.Client) (*periodic.Runner, error) {

fetcher, err := NewFetcher(handler, params, client)
if err != nil {
return nil, err
}
runner := periodic.StartPeriodicTask(fetcher, periodic.NewTicker(cfg.Interval.Duration),
cfg.Timeout.Duration)
return runner, nil
}

// task is a periodic.Task that fetches the topology from the discovery service.
type task struct {
log.Logger
handler TopoHandler
fetcher *topofetcher.Fetcher
handler TopoHandler
fetcher *topofetcher.Fetcher
filename string
}

// NewFetcher creates a periodic.Task that fetches the topology from the discovery
// service and calls the provided handler on the received topology.
// service and calls the provided handler on the received topology. If the handler
// indicates an update, and filename is set, the topology is written.
func NewFetcher(handler TopoHandler, params discovery.FetchParams,
client *http.Client) (*task, error) {
filename string, client *http.Client) (*task, error) {

if handler == nil {
return nil, common.NewBasicError("handler must not be nil", nil)
}
t := &task{
Logger: log.New("Module", "Discovery", "Mode", params.Mode),
handler: handler,
Logger: log.New("Module", "Discovery", "Mode", params.Mode),
handler: handler,
filename: filename,
}
var err error
t.fetcher, err = topofetcher.New(
itopo.Get().DS,
params,
topofetcher.Callbacks{
Error: t.handleErr,
Update: t.handleTopo,
Error: t.handleErr,
Raw: t.handleRaw,
},
client,
)
Expand All @@ -214,11 +224,25 @@ func (t *task) handleErr(err error) {
t.Error("[discovery] Unable to fetch topology", "err", err)
}

func (t *task) handleTopo(topo *topology.Topo) {
func (t *task) handleRaw(raw common.RawBytes, topo *topology.Topo) {
updated, err := t.callHandler(topo)
if err != nil || t.filename == "" || !updated {
return
}
if err := util.WriteFile(t.filename, raw, 0644); err != nil {
t.Error("[discovery] Unable to write new topology to filesystem", "err", err)
return
}
t.Trace("[discovery] Topology written to filesystem",
"file", t.filename, "params", t.fetcher.Params)
}

func (t *task) callHandler(topo *topology.Topo) (bool, error) {
updated, err := t.handler(topo)
if err != nil {
t.Error("[discovery] Unable to handle topology", "err", err)
} else if updated {
t.Trace("[discovery] Set topology", "params", t.fetcher.Params)
}
return updated, err
}
48 changes: 48 additions & 0 deletions go/lib/util/file.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
// Copyright 2019 Anapaya Systems
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package util

import (
"io/ioutil"
"os"
"path"
)

// WriteFile writes data to the file. It first writes to a temporary file
// in the same directory and the renames that file to filename.
func WriteFile(filename string, data []byte, perm os.FileMode) error {
dir, file := path.Split(filename)
tmp, err := ioutil.TempFile(dir, file)
if err != nil {
return err
}
defer os.Remove(tmp.Name())
if _, err := tmp.Write(data); err != nil {
return err
}
if err := tmp.Sync(); err != nil {
return err
}
if err := tmp.Chmod(perm); err != nil {
return err
}
if err := tmp.Close(); err != nil {
return err
}
if err := os.Rename(tmp.Name(), filename); err != nil {
return err
}
return nil
}
4 changes: 4 additions & 0 deletions go/path_srv/internal/config/sample.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,10 @@ const Sample = `[general]
# Require https connection. (default false)
Https = false

# Filename where the updated static topologies are written. In case of the
# empty string, the updated topologies are not written. (default "")
Filename = ""

[discovery.dynamic]
# Enable periodic fetching of the dynamic topology. (default false)
Enable = false
Expand Down
2 changes: 2 additions & 0 deletions go/path_srv/internal/config/sample_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ func TestSampleCorrect(t *testing.T) {
cfg.Discovery.Dynamic.Https = true
cfg.Discovery.Static.Enable = true
cfg.Discovery.Static.Https = true
cfg.Discovery.Static.Filename = "topology.json"
_, err := toml.Decode(Sample, &cfg)
SoMsg("err", err, ShouldBeNil)

Expand All @@ -52,6 +53,7 @@ func TestSampleCorrect(t *testing.T) {
SoMsg("Discovery.Static.Timeout correct", cfg.Discovery.Static.Timeout.Duration,
ShouldEqual, idiscovery.DefaultFetchTimeout)
SoMsg("Discovery.Static.Https correct", cfg.Discovery.Static.Https, ShouldBeFalse)
SoMsg("Discovery.Static.Filename correct", cfg.Discovery.Static.Filename, ShouldBeBlank)
SoMsg("Discovery.Dynamic.Enable correct", cfg.Discovery.Dynamic.Enable, ShouldBeFalse)
SoMsg("Discovery.Dynamic.Interval correct", cfg.Discovery.Dynamic.Interval.Duration,
ShouldEqual, idiscovery.DefaultDynamicFetchInterval)
Expand Down