From 8aa8d8acca8cbc079f002cd330af3a67468aa873 Mon Sep 17 00:00:00 2001 From: roos Date: Thu, 7 Feb 2019 16:21:48 +0100 Subject: [PATCH 1/6] discovery: Add parsed topo to raw callback Call raw callback with raw bytes and the parsed topology to give the caller access without reparsing. --- go/examples/discovery_client/client.go | 2 +- go/lib/discovery/topofetcher/fetcher.go | 6 +++--- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/go/examples/discovery_client/client.go b/go/examples/discovery_client/client.go index e8de1fb181..8213b4225f 100644 --- a/go/examples/discovery_client/client.go +++ b/go/examples/discovery_client/client.go @@ -86,7 +86,7 @@ func realMain() int { Update: func(topo *topology.Topo) { log.Info("Fetched new topology", "ia", topo.ISD_AS, "ts", topo.Timestamp) }, - Raw: func(raw common.RawBytes) { + Raw: func(raw common.RawBytes, _ *topology.Topo) { writeOnce.Do(func() { fmt.Println(string(raw)) if *out == "" { diff --git a/go/lib/discovery/topofetcher/fetcher.go b/go/lib/discovery/topofetcher/fetcher.go index 3036c30da6..e096e7d664 100644 --- a/go/lib/discovery/topofetcher/fetcher.go +++ b/go/lib/discovery/topofetcher/fetcher.go @@ -29,8 +29,8 @@ var _ discovery.Fetcher = (*Fetcher)(nil) // Callbacks are used to inform the client. The functions are called when // an associated event occurs. If the function is nil, it is ignored. type Callbacks struct { - // Raw is called with the raw body from the discovery service response. - Raw func(common.RawBytes) + // Raw is called with the raw body from the discovery service response and the parsed topology. + Raw func(common.RawBytes, *topology.Topo) // Update is called with the parsed topology from the discovery service response. Update func(*topology.Topo) // Error is called with any error that occurs. @@ -102,7 +102,7 @@ func (f *Fetcher) run(ctx context.Context) error { } // Notify the client. if f.Callbacks.Raw != nil { - f.Callbacks.Raw(raw) + f.Callbacks.Raw(raw, topo) } if f.Callbacks.Update != nil { f.Callbacks.Update(topo) From 88df08f2c0fcea2682de4f9f57c56a274eb43f1a Mon Sep 17 00:00:00 2001 From: roos Date: Thu, 7 Feb 2019 16:24:51 +0100 Subject: [PATCH 2/6] util: atomic file write Add library support for atomically writing to file on disk. --- go/lib/util/file.go | 48 +++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 48 insertions(+) create mode 100644 go/lib/util/file.go diff --git a/go/lib/util/file.go b/go/lib/util/file.go new file mode 100644 index 0000000000..8845bce155 --- /dev/null +++ b/go/lib/util/file.go @@ -0,0 +1,48 @@ +// Copyright 2019 Anapaya Systems +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package util + +import ( + "io/ioutil" + "os" + "path" +) + +// WriteFile writes data to the file. It first writes to a temporary file +// in the same directory and the renames that file to filename. +func WriteFile(filename string, data []byte, perm os.FileMode) error { + dir, file := path.Split(filename) + tmp, err := ioutil.TempFile(dir, file) + if err != nil { + return err + } + defer os.Remove(tmp.Name()) + if _, err := tmp.Write(data); err != nil { + return err + } + if err := tmp.Sync(); err != nil { + return err + } + if err := tmp.Chmod(perm); err != nil { + return err + } + if err := tmp.Close(); err != nil { + return err + } + if err := os.Rename(tmp.Name(), filename); err != nil { + return err + } + return nil +} From 8423ac6c49a25d6e5dc14a7acf0ce01e751e47bf Mon Sep 17 00:00:00 2001 From: roos Date: Thu, 7 Feb 2019 16:48:32 +0100 Subject: [PATCH 3/6] config: Add filename to static config Add filename parameter to static discovery parameters. The filename indicates where an updated static topology shall be written to. In case of the empty string, the topology is not written. --- go/cert_srv/internal/config/config_test.go | 2 ++ go/cert_srv/internal/config/sample.go | 4 ++++ go/lib/infra/modules/idiscovery/config.go | 3 +++ go/path_srv/internal/config/sample.go | 4 ++++ go/path_srv/internal/config/sample_test.go | 2 ++ 5 files changed, 15 insertions(+) diff --git a/go/cert_srv/internal/config/config_test.go b/go/cert_srv/internal/config/config_test.go index e286a254f7..5fdfe4bb52 100644 --- a/go/cert_srv/internal/config/config_test.go +++ b/go/cert_srv/internal/config/config_test.go @@ -34,6 +34,7 @@ func TestSampleCorrect(t *testing.T) { cfg.Discovery.Dynamic.Https = true cfg.Discovery.Static.Enable = true cfg.Discovery.Static.Https = true + cfg.Discovery.Static.Filename = "topology.json" _, err := toml.Decode(Sample, &cfg) SoMsg("err", err, ShouldBeNil) @@ -53,6 +54,7 @@ func TestSampleCorrect(t *testing.T) { SoMsg("Discovery.Static.Timeout correct", cfg.Discovery.Static.Timeout.Duration, ShouldEqual, idiscovery.DefaultFetchTimeout) SoMsg("Discovery.Static.Https correct", cfg.Discovery.Static.Https, ShouldBeFalse) + SoMsg("Discovery.Static.Filename correct", cfg.Discovery.Static.Filename, ShouldBeBlank) SoMsg("Discovery.Dynamic.Enable correct", cfg.Discovery.Dynamic.Enable, ShouldBeFalse) SoMsg("Discovery.Dynamic.Interval correct", cfg.Discovery.Dynamic.Interval.Duration, ShouldEqual, idiscovery.DefaultDynamicFetchInterval) diff --git a/go/cert_srv/internal/config/sample.go b/go/cert_srv/internal/config/sample.go index 54a01448b9..01ba8c1f97 100644 --- a/go/cert_srv/internal/config/sample.go +++ b/go/cert_srv/internal/config/sample.go @@ -86,6 +86,10 @@ const Sample = `[general] # Require https connection. (default false) Https = false + # Filename where the updated static topologies are written. In case of the + # empty string, the updated topologies are not written. (default "") + Filename = "" + [discovery.dynamic] # Enable periodic fetching of the dynamic topology. (default false) Enable = false diff --git a/go/lib/infra/modules/idiscovery/config.go b/go/lib/infra/modules/idiscovery/config.go index e0ea70918e..3ef18ab10b 100644 --- a/go/lib/infra/modules/idiscovery/config.go +++ b/go/lib/infra/modules/idiscovery/config.go @@ -45,6 +45,9 @@ func (c *Config) InitDefaults() { type StaticConfig struct { FetchConfig + // Filename indicates the file that the static topology is written to on updates. + // The empty string indicates that the static topology is not written. + Filename string } func (s *StaticConfig) InitDefaults() { diff --git a/go/path_srv/internal/config/sample.go b/go/path_srv/internal/config/sample.go index 5d748a49a0..8cffab857d 100644 --- a/go/path_srv/internal/config/sample.go +++ b/go/path_srv/internal/config/sample.go @@ -79,6 +79,10 @@ const Sample = `[general] # Require https connection. (default false) Https = false + # Filename where the updated static topologies are written. In case of the + # empty string, the updated topologies are not written. (default "") + Filename = "" + [discovery.dynamic] # Enable periodic fetching of the dynamic topology. (default false) Enable = false diff --git a/go/path_srv/internal/config/sample_test.go b/go/path_srv/internal/config/sample_test.go index 012bacf7a0..e700add8ab 100644 --- a/go/path_srv/internal/config/sample_test.go +++ b/go/path_srv/internal/config/sample_test.go @@ -33,6 +33,7 @@ func TestSampleCorrect(t *testing.T) { cfg.Discovery.Dynamic.Https = true cfg.Discovery.Static.Enable = true cfg.Discovery.Static.Https = true + cfg.Discovery.Static.Filename = "topology.json" _, err := toml.Decode(Sample, &cfg) SoMsg("err", err, ShouldBeNil) @@ -52,6 +53,7 @@ func TestSampleCorrect(t *testing.T) { SoMsg("Discovery.Static.Timeout correct", cfg.Discovery.Static.Timeout.Duration, ShouldEqual, idiscovery.DefaultFetchTimeout) SoMsg("Discovery.Static.Https correct", cfg.Discovery.Static.Https, ShouldBeFalse) + SoMsg("Discovery.Static.Filename correct", cfg.Discovery.Static.Filename, ShouldBeBlank) SoMsg("Discovery.Dynamic.Enable correct", cfg.Discovery.Dynamic.Enable, ShouldBeFalse) SoMsg("Discovery.Dynamic.Interval correct", cfg.Discovery.Dynamic.Interval.Duration, ShouldEqual, idiscovery.DefaultDynamicFetchInterval) From 11d6a2c1309f8a786e70fdf94567173d751f0660 Mon Sep 17 00:00:00 2001 From: roos Date: Thu, 7 Feb 2019 17:39:30 +0100 Subject: [PATCH 4/6] idiscovery: Enable writing static topology updates Enable the idiscovery library to write static topology updates to the filesystem. This can be enabled by setting the filename in the StaticConfig. --- go/lib/infra/modules/idiscovery/idiscovery.go | 108 +++++++++++++++--- 1 file changed, 90 insertions(+), 18 deletions(-) diff --git a/go/lib/infra/modules/idiscovery/idiscovery.go b/go/lib/infra/modules/idiscovery/idiscovery.go index de8151a218..1d02b9d4e7 100644 --- a/go/lib/infra/modules/idiscovery/idiscovery.go +++ b/go/lib/infra/modules/idiscovery/idiscovery.go @@ -21,6 +21,10 @@ // By default changes to the semi-mutable section of static topologies is // not allowed. It can be enabled by providing a custom topo handler. // +// The periodic.Runner for the static topology can be instructed to +// write updated versions to the file system. To enable this, set +// the filename in StaticConfig. +// // A periodic.Task with a customized TopoHandler can be created with // NewFetcher, when the client package requires more control. package idiscovery @@ -37,6 +41,7 @@ import ( "github.com/scionproto/scion/go/lib/log" "github.com/scionproto/scion/go/lib/periodic" "github.com/scionproto/scion/go/lib/topology" + "github.com/scionproto/scion/go/lib/util" ) // TopoHandler handles a topology fetched from the discovery service, and @@ -93,8 +98,8 @@ func StartRunners(cfg Config, file discovery.File, handlers TopoHandlers, var err error r := Runners{} if cfg.Static.Enable { - r.Static, err = startPeriodic( - cfg.Static.FetchConfig, + r.Static, err = startPeriodicStaticFetcher( + cfg.Static, handlers.static(), discovery.FetchParams{ Mode: discovery.Static, @@ -108,7 +113,7 @@ func StartRunners(cfg Config, file discovery.File, handlers TopoHandlers, } } if cfg.Dynamic.Enable { - r.Dynamic, err = startPeriodic( + r.Dynamic, err = startPeriodicFetcher( cfg.Dynamic, handlers.dynamic(), discovery.FetchParams{ @@ -128,6 +133,37 @@ func StartRunners(cfg Config, file discovery.File, handlers TopoHandlers, return r, nil } +// startPeriodicFetcherWriter starts a runner that periodically fetches the topology. +// If a filename is specified, the topology is written to disk. +func startPeriodicStaticFetcher(cfg StaticConfig, handler TopoHandler, + params discovery.FetchParams, client *http.Client) (*periodic.Runner, error) { + + if cfg.Filename == "" { + return startPeriodicFetcher(cfg.FetchConfig, handler, params, client) + } + fetcher, err := NewWriteFetcher(handler, params, cfg.Filename, client) + if err != nil { + return nil, err + } + return startPeriodic(fetcher, cfg.FetchConfig), nil +} + +// startPeriodicFetcher starts a runner that periodically fetches the topology. +func startPeriodicFetcher(cfg FetchConfig, handler TopoHandler, + params discovery.FetchParams, client *http.Client) (*periodic.Runner, error) { + + fetcher, err := NewFetcher(handler, params, client) + if err != nil { + return nil, err + } + return startPeriodic(fetcher, cfg), nil +} + +func startPeriodic(fetcher *task, cfg FetchConfig) *periodic.Runner { + return periodic.StartPeriodicTask(fetcher, periodic.NewTicker(cfg.Interval.Duration), + cfg.Timeout.Duration) +} + // Stop stops all runners. func (r *Runners) Stop() { if r.Static != nil { @@ -154,24 +190,12 @@ func (r *Runners) Kill() { } } -// startPeriodic starts a runner that periodically fetches the topology. -func startPeriodic(cfg FetchConfig, handler TopoHandler, - params discovery.FetchParams, client *http.Client) (*periodic.Runner, error) { - - fetcher, err := NewFetcher(handler, params, client) - if err != nil { - return nil, err - } - runner := periodic.StartPeriodicTask(fetcher, periodic.NewTicker(cfg.Interval.Duration), - cfg.Timeout.Duration) - return runner, nil -} - // task is a periodic.Task that fetches the topology from the discovery service. type task struct { log.Logger - handler TopoHandler - fetcher *topofetcher.Fetcher + handler TopoHandler + fetcher *topofetcher.Fetcher + filename string } // NewFetcher creates a periodic.Task that fetches the topology from the discovery @@ -202,6 +226,36 @@ func NewFetcher(handler TopoHandler, params discovery.FetchParams, return t, nil } +// NewWriteFetcher creates a periodic.Task that fetches the topology from the discovery +// service and calls the provided handler on the received topology. If the handler indiacates +// an update, the topology is written to filename. +func NewWriteFetcher(handler TopoHandler, params discovery.FetchParams, filename string, + client *http.Client) (*task, error) { + + if handler == nil { + return nil, common.NewBasicError("handler must not be nil", nil) + } + t := &task{ + Logger: log.New("Module", "Discovery", "Mode", params.Mode), + handler: handler, + filename: filename, + } + var err error + t.fetcher, err = topofetcher.New( + itopo.Get().DS, + params, + topofetcher.Callbacks{ + Error: t.handleErr, + Raw: t.handleRaw, + }, + client, + ) + if err != nil { + return nil, common.NewBasicError("Unable to initialize fetcher", err) + } + return t, nil +} + func (t *task) Run(ctx context.Context) { if err := t.fetcher.UpdateInstances(itopo.Get().DS); err != nil { log.Error("[discovery] Unable to update instances", "err", err) @@ -214,11 +268,29 @@ func (t *task) handleErr(err error) { t.Error("[discovery] Unable to fetch topology", "err", err) } +func (t *task) handleRaw(raw common.RawBytes, topo *topology.Topo) { + updated, err := t.callHandler(topo) + if err != nil || !updated { + return + } + if err := util.WriteFile(t.filename, raw, 0644); err != nil { + t.Error("[discovery] Unable to write new topology to filesystem", "err", err) + return + } + t.Trace("[discovery] Topology written to filesystem", + "file", t.filename, "params", t.fetcher.Params) +} + func (t *task) handleTopo(topo *topology.Topo) { + t.callHandler(topo) +} + +func (t *task) callHandler(topo *topology.Topo) (bool, error) { updated, err := t.handler(topo) if err != nil { t.Error("[discovery] Unable to handle topology", "err", err) } else if updated { t.Trace("[discovery] Set topology", "params", t.fetcher.Params) } + return updated, err } From 10ca707676f9ca08c5dc04d5945acf3d04473a47 Mon Sep 17 00:00:00 2001 From: roos Date: Thu, 7 Feb 2019 17:42:45 +0100 Subject: [PATCH 5/6] Accept: Check that the update is written Check that the static topology update is written to disk, and that it does not differ from the version served by the discovery service. --- .../test | 22 +++++++++++++++---- 1 file changed, 18 insertions(+), 4 deletions(-) diff --git a/acceptance/discovery_infra_fetches_static_acceptance/test b/acceptance/discovery_infra_fetches_static_acceptance/test index 4838cfbbc7..11ee4c5ab5 100755 --- a/acceptance/discovery_infra_fetches_static_acceptance/test +++ b/acceptance/discovery_infra_fetches_static_acceptance/test @@ -1,7 +1,7 @@ #!/bin/bash # This test checks that the infra services fetch the static topology -# from the discovery service. +# from the discovery service, and that it writes the update to disk. PROGRAM=`basename "$0"` COMMAND="$1" @@ -14,9 +14,12 @@ test_setup() { set -e base_setup - for cfg in gen/ISD1/AS$AS_FILE/*/{cs,ps}config.toml; do - set_log_lvl "$cfg" - set_interval "$cfg" "static" + for elem in gen/ISD1/AS$AS_FILE/{cs,ps}*; do + for cfg in $elem/*.toml; do + set_log_lvl "$cfg" + set_interval "$cfg" "static" + sed -i -e "/\[discovery.static]/a Filename = \"/share/cache/${elem##*/}-topo.json\"" $cfg + done done base_start_scion @@ -27,14 +30,25 @@ test_run() { # Start serving static topology. jq ".BorderRouters[].InternalAddrs.IPv4.PublicOverlay = {Addr: \"127.42.42.42\", OverlayPort: 39999} | .Timestamp = $( date +%s) | .TTL = 3" $TOPO | sponge $STATIC_FULL sleep 6 + # Check that the mock ds serves the file check_file "static" + # Check that the logs contain setting and writing the topo. check_logs "ps$IA_FILE-1" check_logs "cs$IA_FILE-1" + # Check that the written file does not differ from the served file. + check_diff "ps$IA_FILE-1" + check_diff "cs$IA_FILE-1" } check_logs() { grep -q "\[discovery\] Set topology .* Mode=static" "logs/$1.log" || \ { echo "Setting static topology not found in logs. id=$1"; return 1; } + grep -q "\[discovery\] Topology written to filesystem .* Mode=static" "logs/$1.log" || \ + { echo "Writing static topology not found in logs. id=$1"; return 1; } +} + +check_diff () { + diff -q $STATIC_FULL gen-cache/$1-topo.json } shift From e4ca25305a9cc69a2a29b2f0e15a0a2b039a5baa Mon Sep 17 00:00:00 2001 From: roos Date: Fri, 8 Feb 2019 11:07:25 +0100 Subject: [PATCH 6/6] Refactor fetcher Reduce code duplication --- go/lib/infra/modules/idiscovery/idiscovery.go | 74 ++++--------------- 1 file changed, 13 insertions(+), 61 deletions(-) diff --git a/go/lib/infra/modules/idiscovery/idiscovery.go b/go/lib/infra/modules/idiscovery/idiscovery.go index 1d02b9d4e7..c15aa002f0 100644 --- a/go/lib/infra/modules/idiscovery/idiscovery.go +++ b/go/lib/infra/modules/idiscovery/idiscovery.go @@ -98,14 +98,15 @@ func StartRunners(cfg Config, file discovery.File, handlers TopoHandlers, var err error r := Runners{} if cfg.Static.Enable { - r.Static, err = startPeriodicStaticFetcher( - cfg.Static, + r.Static, err = startPeriodicFetcher( + cfg.Static.FetchConfig, handlers.static(), discovery.FetchParams{ Mode: discovery.Static, Https: cfg.Static.Https, File: file, }, + cfg.Static.Filename, client, ) if err != nil { @@ -121,6 +122,7 @@ func StartRunners(cfg Config, file discovery.File, handlers TopoHandlers, Https: cfg.Dynamic.Https, File: file, }, + "", client, ) if err != nil { @@ -133,35 +135,17 @@ func StartRunners(cfg Config, file discovery.File, handlers TopoHandlers, return r, nil } -// startPeriodicFetcherWriter starts a runner that periodically fetches the topology. -// If a filename is specified, the topology is written to disk. -func startPeriodicStaticFetcher(cfg StaticConfig, handler TopoHandler, - params discovery.FetchParams, client *http.Client) (*periodic.Runner, error) { - - if cfg.Filename == "" { - return startPeriodicFetcher(cfg.FetchConfig, handler, params, client) - } - fetcher, err := NewWriteFetcher(handler, params, cfg.Filename, client) - if err != nil { - return nil, err - } - return startPeriodic(fetcher, cfg.FetchConfig), nil -} - // startPeriodicFetcher starts a runner that periodically fetches the topology. -func startPeriodicFetcher(cfg FetchConfig, handler TopoHandler, - params discovery.FetchParams, client *http.Client) (*periodic.Runner, error) { +func startPeriodicFetcher(cfg FetchConfig, handler TopoHandler, params discovery.FetchParams, + filename string, client *http.Client) (*periodic.Runner, error) { - fetcher, err := NewFetcher(handler, params, client) + fetcher, err := NewFetcher(handler, params, filename, client) if err != nil { return nil, err } - return startPeriodic(fetcher, cfg), nil -} - -func startPeriodic(fetcher *task, cfg FetchConfig) *periodic.Runner { - return periodic.StartPeriodicTask(fetcher, periodic.NewTicker(cfg.Interval.Duration), + runner := periodic.StartPeriodicTask(fetcher, periodic.NewTicker(cfg.Interval.Duration), cfg.Timeout.Duration) + return runner, nil } // Stop stops all runners. @@ -199,38 +183,10 @@ type task struct { } // NewFetcher creates a periodic.Task that fetches the topology from the discovery -// service and calls the provided handler on the received topology. +// service and calls the provided handler on the received topology. If the handler +// indicates an update, and filename is set, the topology is written. func NewFetcher(handler TopoHandler, params discovery.FetchParams, - client *http.Client) (*task, error) { - - if handler == nil { - return nil, common.NewBasicError("handler must not be nil", nil) - } - t := &task{ - Logger: log.New("Module", "Discovery", "Mode", params.Mode), - handler: handler, - } - var err error - t.fetcher, err = topofetcher.New( - itopo.Get().DS, - params, - topofetcher.Callbacks{ - Error: t.handleErr, - Update: t.handleTopo, - }, - client, - ) - if err != nil { - return nil, common.NewBasicError("Unable to initialize fetcher", err) - } - return t, nil -} - -// NewWriteFetcher creates a periodic.Task that fetches the topology from the discovery -// service and calls the provided handler on the received topology. If the handler indiacates -// an update, the topology is written to filename. -func NewWriteFetcher(handler TopoHandler, params discovery.FetchParams, filename string, - client *http.Client) (*task, error) { + filename string, client *http.Client) (*task, error) { if handler == nil { return nil, common.NewBasicError("handler must not be nil", nil) @@ -270,7 +226,7 @@ func (t *task) handleErr(err error) { func (t *task) handleRaw(raw common.RawBytes, topo *topology.Topo) { updated, err := t.callHandler(topo) - if err != nil || !updated { + if err != nil || t.filename == "" || !updated { return } if err := util.WriteFile(t.filename, raw, 0644); err != nil { @@ -281,10 +237,6 @@ func (t *task) handleRaw(raw common.RawBytes, topo *topology.Topo) { "file", t.filename, "params", t.fetcher.Params) } -func (t *task) handleTopo(topo *topology.Topo) { - t.callHandler(topo) -} - func (t *task) callHandler(topo *topology.Topo) (bool, error) { updated, err := t.handler(topo) if err != nil {